Package trac :: Package db :: Module mysql_backend

Source Code for Module trac.db.mysql_backend

  1  # -*- coding: utf-8 -*- 
  2  # 
  3  # Copyright (C) 2005-2023 Edgewall Software 
  4  # Copyright (C) 2005-2006 Christopher Lenz <[email protected]> 
  5  # Copyright (C) 2005 Jeff Weiss <[email protected]> 
  6  # Copyright (C) 2006 Andres Salomon <[email protected]> 
  7  # All rights reserved. 
  8  # 
  9  # This software is licensed as described in the file COPYING, which 
 10  # you should have received as part of this distribution. The terms 
 11  # are also available at https://trac.edgewall.org/wiki/TracLicense. 
 12  # 
 13  # This software consists of voluntary contributions made by many 
 14  # individuals. For the exact contribution history, see the revision 
 15  # history and logs, available at https://trac.edgewall.org/log/. 
 16   
 17  import os 
 18  import re 
 19  import sys 
 20  from contextlib import closing 
 21   
 22  from trac.api import IEnvironmentSetupParticipant 
 23  from trac.core import * 
 24  from trac.config import Option 
 25  from trac.db.api import ConnectionBase, DatabaseManager, IDatabaseConnector, \ 
 26                          get_column_names, parse_connection_uri 
 27  from trac.db.util import ConnectionWrapper, IterableCursor 
 28  from trac.util import as_int, get_pkginfo 
 29  from trac.util.html import Markup 
 30  from trac.util.compat import close_fds 
 31  from trac.util.text import exception_to_unicode, to_unicode 
 32  from trac.util.translation import _ 
 33   
 34  _like_escape_re = re.compile(r'([/_%])') 
 35   
 36  try: 
 37      import pymysql 
 38  except ImportError: 
 39      pymysql = None 
 40      pymsql_version = None 
 41  else: 
 42      pymsql_version = get_pkginfo(pymysql).get('version', pymysql.__version__) 
 43   
44 - class MySQLUnicodeCursor(pymysql.cursors.Cursor):
45 - def _convert_row(self, row):
46 return tuple(v.decode('utf-8') if isinstance(v, str) else v 47 for v in row)
48
49 - def fetchone(self):
50 row = super(MySQLUnicodeCursor, self).fetchone() 51 return self._convert_row(row) if row else None
52
53 - def fetchmany(self, num):
54 rows = super(MySQLUnicodeCursor, self).fetchmany(num) 55 return [self._convert_row(row) for row in rows] \ 56 if rows is not None else []
57
58 - def fetchall(self):
59 rows = super(MySQLUnicodeCursor, self).fetchall() 60 return [self._convert_row(row) for row in rows] \ 61 if rows is not None else []
62
63 - class MySQLSilentCursor(MySQLUnicodeCursor):
64 - def _show_warnings(self, conn=None):
65 pass
66 67 68 # Mapping from "abstract" SQL types to DB-specific types 69 _type_map = { 70 'int64': 'bigint', 71 'text': 'mediumtext', 72 } 73 74
75 -def _quote(identifier):
76 return "`%s`" % identifier.replace('`', '``')
77 78
79 -class MySQLConnector(Component):
80 """Database connector for MySQL version 4.1 and greater. 81 82 Database URLs should be of the form:: 83 84 {{{ 85 mysql://user[:password]@host[:port]/database[?param1=value&param2=value] 86 }}} 87 88 The following parameters are supported: 89 * `compress`: Enable compression (0 or 1) 90 * `init_command`: Command to run once the connection is created 91 * `named_pipe`: Use a named pipe to connect on Windows (0 or 1) 92 * `read_default_file`: Read default client values from the given file 93 * `read_default_group`: Configuration group to use from the default file 94 * `unix_socket`: Use a Unix socket at the given path to connect 95 """ 96 implements(IDatabaseConnector, IEnvironmentSetupParticipant) 97 98 required = False 99 100 mysqldump_path = Option('trac', 'mysqldump_path', 'mysqldump', 101 """Location of mysqldump for MySQL database backups""") 102
103 - def __init__(self):
104 if pymysql: 105 self._mysql_version = \ 106 'server: (not-connected), client: "%s", thread-safe: %s' % \ 107 (pymysql.get_client_info(), pymysql.thread_safe()) 108 else: 109 self._mysql_version = None
110 111 # IDatabaseConnector methods 112
113 - def get_supported_schemes(self):
114 yield 'mysql', 1
115
116 - def get_connection(self, path, log=None, user=None, password=None, 117 host=None, port=None, params={}):
118 cnx = MySQLConnection(path, log, user, password, host, port, params) 119 if not self.required: 120 self._mysql_version = \ 121 'server: "%s", client: "%s", thread-safe: %s' \ 122 % (cnx.cnx.get_server_info(), pymysql.get_client_info(), 123 pymysql.thread_safe()) 124 self.required = True 125 return cnx
126
127 - def get_exceptions(self):
128 return pymysql
129
130 - def init_db(self, path, schema=None, log=None, user=None, password=None, 131 host=None, port=None, params={}):
132 cnx = self.get_connection(path, log, user, password, host, port, 133 params) 134 self._verify_variables(cnx) 135 max_bytes = self._max_bytes(cnx) 136 cursor = cnx.cursor() 137 if schema is None: 138 from trac.db_default import schema 139 for table in schema: 140 for stmt in self.to_sql(table, max_bytes=max_bytes): 141 self.log.debug(stmt) 142 cursor.execute(stmt) 143 self._verify_table_status(cnx) 144 cnx.commit()
145
146 - def destroy_db(self, path, log=None, user=None, password=None, host=None, 147 port=None, params={}):
148 cnx = self.get_connection(path, log, user, password, host, port, 149 params) 150 for table_name in cnx.get_table_names(): 151 cnx.drop_table(table_name) 152 cnx.commit()
153
154 - def db_exists(self, path, log=None, user=None, password=None, host=None, 155 port=None, params={}):
156 cnx = self.get_connection(path, log, user, password, host, port, 157 params) 158 return bool(cnx.get_table_names())
159
160 - def _max_bytes(self, cnx):
161 if cnx is None: 162 connector, args = DatabaseManager(self.env).get_connector() 163 with closing(connector.get_connection(**args)) as cnx: 164 charset = cnx.charset 165 else: 166 charset = cnx.charset 167 return 4 if charset == 'utf8mb4' else 3
168 169 _max_key_length = 3072 170
171 - def _collist(self, table, columns, max_bytes):
172 """Take a list of columns and impose limits on each so that indexing 173 works properly. 174 175 Some Versions of MySQL limit each index prefix to 3072 bytes total, 176 with a max of 767 bytes per column. 177 """ 178 cols = [] 179 limit_col = 767 / max_bytes 180 limit = min(self._max_key_length / (max_bytes * len(columns)), 181 limit_col) 182 for c in columns: 183 name = _quote(c) 184 table_col = filter((lambda x: x.name == c), table.columns) 185 if len(table_col) == 1 and table_col[0].type.lower() == 'text': 186 if table_col[0].key_size is not None: 187 name += '(%d)' % min(table_col[0].key_size, limit_col) 188 else: 189 name += '(%s)' % limit 190 # For non-text columns, we simply throw away the extra bytes. 191 # That could certainly be optimized better, but for now let's KISS. 192 cols.append(name) 193 return ','.join(cols)
194
195 - def to_sql(self, table, max_bytes=None):
196 if max_bytes is None: 197 max_bytes = self._max_bytes(None) 198 sql = ['CREATE TABLE %s (' % _quote(table.name)] 199 coldefs = [] 200 for column in table.columns: 201 ctype = column.type 202 ctype = _type_map.get(ctype, ctype) 203 if column.auto_increment: 204 ctype = 'INT UNSIGNED NOT NULL AUTO_INCREMENT' 205 # Override the column type, as a text field cannot 206 # use auto_increment. 207 column.type = 'int' 208 coldefs.append(' %s %s' % (_quote(column.name), ctype)) 209 if len(table.key) > 0: 210 coldefs.append(' PRIMARY KEY (%s)' % 211 self._collist(table, table.key, 212 max_bytes=max_bytes)) 213 sql.append(',\n'.join(coldefs) + '\n)') 214 yield '\n'.join(sql) 215 216 for index in table.indices: 217 unique = 'UNIQUE' if index.unique else '' 218 idxname = '%s_%s_idx' % (table.name, '_'.join(index.columns)) 219 yield 'CREATE %s INDEX %s ON %s (%s)' % \ 220 (unique, _quote(idxname), _quote(table.name), 221 self._collist(table, index.columns, max_bytes=max_bytes))
222
223 - def alter_column_types(self, table, columns):
224 """Yield SQL statements altering the type of one or more columns of 225 a table. 226 227 Type changes are specified as a `columns` dict mapping column names 228 to `(from, to)` SQL type tuples. 229 """ 230 alterations = [] 231 for name, (from_, to) in sorted(columns.iteritems()): 232 to = _type_map.get(to, to) 233 if to != _type_map.get(from_, from_): 234 alterations.append((name, to)) 235 if alterations: 236 yield "ALTER TABLE %s %s" % (table, 237 ', '.join("MODIFY %s %s" % each 238 for each in alterations))
239
240 - def backup(self, dest_file):
241 from subprocess import Popen, PIPE 242 db_url = self.env.config.get('trac', 'database') 243 scheme, db_prop = parse_connection_uri(db_url) 244 db_params = db_prop.setdefault('params', {}) 245 db_name = os.path.basename(db_prop['path']) 246 247 args = [self.mysqldump_path, '--no-defaults'] 248 if 'host' in db_prop: 249 args.extend(['-h', db_prop['host']]) 250 if 'port' in db_prop: 251 args.extend(['-P', str(db_prop['port'])]) 252 if 'user' in db_prop: 253 args.extend(['-u', db_prop['user']]) 254 for name, value in db_params.iteritems(): 255 if name == 'compress' and as_int(value, 0): 256 args.append('--compress') 257 elif name == 'named_pipe' and as_int(value, 0): 258 args.append('--protocol=pipe') 259 elif name == 'read_default_file': # Must be first 260 args.insert(1, '--defaults-file=' + value) 261 elif name == 'unix_socket': 262 args.extend(['--protocol=socket', '--socket=' + value]) 263 elif name not in ('init_command', 'read_default_group'): 264 self.log.warning("Invalid connection string parameter '%s'", 265 name) 266 args.extend(['-r', dest_file, db_name]) 267 268 environ = os.environ.copy() 269 if 'password' in db_prop: 270 environ['MYSQL_PWD'] = str(db_prop['password']) 271 try: 272 p = Popen(args, env=environ, stderr=PIPE, close_fds=close_fds) 273 except OSError as e: 274 raise TracError(_("Unable to run %(path)s: %(msg)s", 275 path=self.mysqldump_path, 276 msg=exception_to_unicode(e))) 277 errmsg = p.communicate()[1] 278 if p.returncode != 0: 279 raise TracError(_("mysqldump failed: %(msg)s", 280 msg=to_unicode(errmsg.strip()))) 281 if not os.path.exists(dest_file): 282 raise TracError(_("No destination file created")) 283 return dest_file
284
285 - def get_system_info(self):
286 yield 'MySQL', self._mysql_version 287 yield pymysql.__name__, pymsql_version
288 289 # IEnvironmentSetupParticipant methods 290
291 - def environment_created(self):
292 pass
293
295 if self.required: 296 with self.env.db_query as db: 297 self._verify_table_status(db) 298 self._verify_variables(db) 299 return False
300
301 - def upgrade_environment(self):
302 pass
303 304 UNSUPPORTED_ENGINES = ('MyISAM', 'EXAMPLE', 'ARCHIVE', 'CSV', 'ISAM') 305
306 - def _verify_table_status(self, db):
307 from trac.db_default import schema 308 tables = [t.name for t in schema] 309 cursor = db.cursor() 310 cursor.execute("SHOW TABLE STATUS WHERE name IN (%s)" % 311 ','.join(('%s',) * len(tables)), 312 tables) 313 cols = get_column_names(cursor) 314 rows = [dict(zip(cols, row)) for row in cursor] 315 316 engines = [row['Name'] for row in rows 317 if row['Engine'] in self.UNSUPPORTED_ENGINES] 318 if engines: 319 raise TracError(_( 320 "All tables must be created as InnoDB or NDB storage engine " 321 "to support transactions. The following tables have been " 322 "created as storage engine which doesn't support " 323 "transactions: %(tables)s", tables=', '.join(engines))) 324 325 non_utf8bin = [row['Name'] for row in rows 326 if row['Collation'] not in ('utf8_bin', 'utf8mb4_bin', 327 None)] 328 if non_utf8bin: 329 raise TracError(_("All tables must be created with utf8_bin or " 330 "utf8mb4_bin as collation. The following tables " 331 "don't have the collations: %(tables)s", 332 tables=', '.join(non_utf8bin)))
333 334 SUPPORTED_COLLATIONS = ( 335 ('utf8mb4', 'utf8mb4_bin'), 336 ('utf8mb3', 'utf8_bin'), 337 ('utf8', 'utf8_bin'), 338 ) 339
340 - def _verify_variables(self, db):
341 cursor = db.cursor() 342 cursor.execute("SHOW VARIABLES WHERE variable_name IN (" 343 "'default_storage_engine','storage_engine'," 344 "'default_tmp_storage_engine'," 345 "'character_set_database','collation_database')") 346 vars = {row[0].lower(): row[1] for row in cursor} 347 348 engine = vars.get('default_storage_engine') or \ 349 vars.get('storage_engine') 350 if engine in self.UNSUPPORTED_ENGINES: 351 raise TracError(_("The current storage engine is %(engine)s. " 352 "It must be InnoDB or NDB storage engine to " 353 "support transactions.", engine=engine)) 354 355 tmp_engine = vars.get('default_tmp_storage_engine') 356 if tmp_engine in self.UNSUPPORTED_ENGINES: 357 raise TracError(_("The current storage engine for TEMPORARY " 358 "tables is %(engine)s. It must be InnoDB or NDB " 359 "storage engine to support transactions.", 360 engine=tmp_engine)) 361 362 charset = vars['character_set_database'] 363 collation = vars['collation_database'] 364 if (charset, collation) not in self.SUPPORTED_COLLATIONS: 365 raise TracError(_( 366 "The charset and collation of database are '%(charset)s' and " 367 "'%(collation)s'. The database must be created with one of " 368 "%(supported)s.", charset=charset, collation=collation, 369 supported=repr(self.SUPPORTED_COLLATIONS)))
370 371
372 -class MySQLConnection(ConnectionBase, ConnectionWrapper):
373 """Connection wrapper for MySQL.""" 374 375 poolable = True 376
377 - def __init__(self, path, log, user=None, password=None, host=None, 378 port=None, params={}):
379 if path.startswith('/'): 380 path = path[1:] 381 if password is None: 382 password = '' 383 if port is None: 384 port = 3306 385 opts = {'charset': 'utf8'} 386 for name, value in params.iteritems(): 387 key = name.encode('utf-8') 388 if name == 'read_default_group': 389 opts[key] = value 390 elif name == 'init_command': 391 opts[key] = value.encode('utf-8') 392 elif name in ('read_default_file', 'unix_socket'): 393 opts[key] = value.encode(sys.getfilesystemencoding()) 394 elif name in ('compress', 'named_pipe'): 395 opts[key] = as_int(value, 0) 396 elif name == 'charset': 397 value = value.lower() 398 if value in ('utf8', 'utf8mb4'): 399 opts[key] = value 400 elif log: 401 log.warning("Invalid connection string parameter '%s=%s'", 402 name, value) 403 elif log: 404 log.warning("Invalid connection string parameter '%s'", name) 405 cnx = pymysql.connect(db=path, user=user, passwd=password, host=host, 406 port=port, **opts) 407 cursor = cnx.cursor() 408 cursor.execute("SHOW VARIABLES WHERE " 409 " variable_name='character_set_database'") 410 charset = cursor.fetchone()[1] 411 if charset == 'utf8mb3': 412 charset = 'utf8' 413 self.charset = charset 414 cursor.close() 415 if self.charset != opts['charset']: 416 cnx.close() 417 opts['charset'] = self.charset 418 cnx = pymysql.connect(db=path, user=user, passwd=password, 419 host=host, port=port, **opts) 420 self.schema = path 421 if hasattr(cnx, 'encoders'): 422 # 'encoders' undocumented but present since 1.2.1 (r422) 423 cnx.encoders[Markup] = cnx.encoders[unicode] 424 ConnectionWrapper.__init__(self, cnx, log) 425 self._is_closed = False
426
427 - def cursor(self):
428 return IterableCursor(MySQLUnicodeCursor(self.cnx), self.log)
429
430 - def rollback(self):
431 self.cnx.ping() 432 try: 433 self.cnx.rollback() 434 except pymysql.ProgrammingError: 435 self._is_closed = True
436
437 - def close(self):
438 if not self._is_closed: 439 try: 440 self.cnx.close() 441 except pymysql.ProgrammingError: 442 pass # this error would mean it's already closed. So, ignore 443 self._is_closed = True
444
445 - def cast(self, column, type):
446 if type in ('int', 'int64'): 447 type = 'signed' 448 elif type == 'text': 449 type = 'char' 450 return 'CAST(%s AS %s)' % (column, type)
451
452 - def concat(self, *args):
453 return 'concat(%s)' % ', '.join(args)
454
455 - def drop_column(self, table, column):
456 cursor = pymysql.cursors.Cursor(self.cnx) 457 if column in self.get_column_names(table): 458 quoted_table = self.quote(table) 459 cursor.execute("SHOW INDEX FROM %s" % quoted_table) 460 columns = get_column_names(cursor) 461 keys = {} 462 for row in cursor.fetchall(): 463 row = dict(zip(columns, row)) 464 keys.setdefault(row['Key_name'], []).append(row['Column_name']) 465 # drop all composite indices which in the given column is involved 466 for key, columns in keys.iteritems(): 467 if len(columns) > 1 and column in columns: 468 if key == 'PRIMARY': 469 cursor.execute("ALTER TABLE %s DROP PRIMARY KEY" % 470 quoted_table) 471 else: 472 cursor.execute("ALTER TABLE %s DROP KEY %s" % 473 (quoted_table, self.quote(key))) 474 cursor.execute("ALTER TABLE %s DROP COLUMN %s " % 475 (quoted_table, self.quote(column)))
476
477 - def drop_table(self, table):
478 cursor = MySQLSilentCursor(self.cnx) 479 cursor.execute("DROP TABLE IF EXISTS " + self.quote(table))
480
481 - def get_column_names(self, table):
482 rows = self.execute(""" 483 SELECT column_name FROM information_schema.columns 484 WHERE table_schema=%s AND table_name=%s 485 ORDER BY ordinal_position 486 """, (self.schema, table)) 487 return [row[0] for row in rows]
488
489 - def get_last_id(self, cursor, table, column='id'):
490 return cursor.lastrowid
491
492 - def get_sequence_names(self):
493 return []
494
495 - def get_table_names(self):
496 rows = self.execute(""" 497 SELECT table_name FROM information_schema.tables 498 WHERE table_schema=%s 499 """, (self.schema,)) 500 return [row[0] for row in rows]
501
502 - def has_table(self, table):
503 rows = self.execute(""" 504 SELECT EXISTS (SELECT * FROM information_schema.columns 505 WHERE table_schema=%s AND table_name=%s) 506 """, (self.schema, table)) 507 return bool(rows[0][0])
508
509 - def like(self):
510 return "LIKE %%s COLLATE %s_general_ci ESCAPE '/'" % self.charset
511
512 - def like_escape(self, text):
513 return _like_escape_re.sub(r'/\1', text)
514
515 - def reset_tables(self):
516 table_names = [] 517 if not self.schema: 518 return table_names 519 cursor = self.cursor() 520 cursor.execute(""" 521 SELECT t.table_name, 522 EXISTS (SELECT * FROM information_schema.columns AS c 523 WHERE c.table_schema=t.table_schema 524 AND c.table_name=t.table_name 525 AND extra='auto_increment') 526 FROM information_schema.tables AS t 527 WHERE t.table_schema=%s 528 """, (self.schema,)) 529 for table, has_autoinc in cursor.fetchall(): 530 table_names.append(table) 531 quoted = self.quote(table) 532 if not has_autoinc: 533 # DELETE FROM is preferred to TRUNCATE TABLE, as the 534 # auto_increment is not used. 535 cursor.execute("DELETE FROM %s" % quoted) 536 else: 537 # TRUNCATE TABLE is preferred to DELETE FROM, as we 538 # need to reset the auto_increment in MySQL. 539 cursor.execute("TRUNCATE TABLE %s" % quoted) 540 return table_names
541
542 - def prefix_match(self):
543 return "LIKE %s ESCAPE '/'"
544
545 - def prefix_match_value(self, prefix):
546 return self.like_escape(prefix) + '%'
547
548 - def quote(self, identifier):
549 """Return the quoted identifier.""" 550 return _quote(identifier)
551
552 - def update_sequence(self, cursor, table, column='id'):
553 # MySQL handles sequence updates automagically 554 pass
555