Package trac :: Package db :: Package tests :: Module mysql_test

Source Code for Module trac.db.tests.mysql_test

  1  # -*- coding: utf-8 -*- 
  2  # 
  3  # Copyright (C) 2010-2023 Edgewall Software 
  4  # All rights reserved. 
  5  # 
  6  # This software is licensed as described in the file COPYING, which 
  7  # you should have received as part of this distribution. The terms 
  8  # are also available at https://trac.edgewall.org/wiki/TracLicense. 
  9  # 
 10  # This software consists of voluntary contributions made by many 
 11  # individuals. For the exact contribution history, see the revision 
 12  # history and logs, available at https://trac.edgewall.org/log/. 
 13   
 14  import itertools 
 15  import unittest 
 16   
 17  from trac.db.api import DatabaseManager, get_column_names 
 18  from trac.db.mysql_backend import MySQLConnector 
 19  from trac.db.schema import Table, Column, Index 
 20  from trac.test import EnvironmentStub, Mock, get_dburi 
 21   
 22   
23 -class MySQLTableAlterationSQLTest(unittest.TestCase):
24 - def setUp(self):
25 self.env = EnvironmentStub()
26
27 - def test_alter_column_types(self):
28 connector = MySQLConnector(self.env) 29 sql = connector.alter_column_types('milestone', 30 {'due': ('int', 'int64'), 31 'completed': ('int', 'int64')}) 32 sql = list(sql) 33 self.assertEqual([ 34 "ALTER TABLE milestone " 35 "MODIFY completed bigint, " 36 "MODIFY due bigint", 37 ], sql)
38
40 connector = MySQLConnector(self.env) 41 sql = connector.alter_column_types('milestone', 42 {'due': ('int', 'int'), 43 'completed': ('int', 'int64')}) 44 sql = list(sql) 45 self.assertEqual([ 46 "ALTER TABLE milestone " 47 "MODIFY completed bigint", 48 ], sql)
49
51 connector = MySQLConnector(self.env) 52 sql = connector.alter_column_types('milestone', 53 {'due': ('int', 'int')}) 54 self.assertEqual([], list(sql))
55
56 - def test_utf8_size(self):
57 connector = MySQLConnector(self.env) 58 self.assertEqual(3, connector._max_bytes(Mock(charset='utf8'))) 59 self.assertEqual(4, connector._max_bytes(Mock(charset='utf8mb4')))
60
61 - def test_to_sql(self):
62 connector = MySQLConnector(self.env) 63 tab = Table('blah', key=('col1', 'col2', 'col3', 'col4', 'col5')) \ 64 [Column('col1'), Column('col2'), Column('col3'), Column('col4'), 65 Column('col5'), Column('col6'), 66 Index(['col2', 'col3', 'col4', 'col5'])] 67 68 sql = list(connector.to_sql(tab, max_bytes=3)) 69 self.assertEqual(2, len(sql)) 70 self.assertIn(' PRIMARY KEY (`col1`(204),`col2`(204),`col3`(204),' 71 '`col4`(204),`col5`(204))', sql[0]) 72 self.assertIn(' `blah_col2_col3_col4_col5_idx` ON `blah` (`col2`(255),' 73 '`col3`(255),`col4`(255),`col5`(255))', sql[1]) 74 75 sql = list(connector.to_sql(tab, max_bytes=4)) 76 self.assertEqual(2, len(sql)) 77 self.assertIn(' PRIMARY KEY (`col1`(153),`col2`(153),`col3`(153),' 78 '`col4`(153),`col5`(153))', sql[0]) 79 self.assertIn(' `blah_col2_col3_col4_col5_idx` ON `blah` (`col2`(191),' 80 '`col3`(191),`col4`(191),`col5`(191))', sql[1])
81 82
83 -class MySQLConnectionTestCase(unittest.TestCase):
84
85 - def setUp(self):
86 self.env = EnvironmentStub() 87 self.schema = [ 88 Table('test_simple', key='id')[ 89 Column('id', auto_increment=True), 90 Column('username'), 91 Column('email'), 92 Column('enabled', type='int'), 93 Column('extra'), 94 Index(['username'], unique=True), 95 Index(['email'], unique=False), 96 ], 97 Table('test_composite', key=['id', 'name'])[ 98 Column('id', type='int'), 99 Column('name'), 100 Column('value'), 101 Column('enabled', type='int'), 102 Index(['name', 'value'], unique=False), 103 Index(['name', 'enabled'], unique=True), 104 ], 105 ] 106 self.dbm = DatabaseManager(self.env) 107 self.dbm.drop_tables(self.schema) 108 self.dbm.create_tables(self.schema) 109 self.dbm.insert_into_tables([ 110 ('test_simple', 111 ('username', 'email', 'enabled'), 112 [('joe', '[email protected]', 1), (u'joé', '[email protected]', 0)]), 113 ('test_composite', 114 ('id', 'name', 'value', 'enabled'), 115 [(1, 'foo', '42', 1), 116 (1, 'bar', '42', 1), 117 (2, 'foo', '43', 0), 118 (2, 'bar', '43', 0)]), 119 ])
120
121 - def tearDown(self):
122 DatabaseManager(self.env).drop_tables(self.schema) 123 self.env.reset_db()
124
125 - def _show_index(self, table):
126 with self.env.db_query as db: 127 cursor = db.cursor() 128 cursor.execute("SHOW INDEX FROM " + db.quote(table)) 129 columns = get_column_names(cursor) 130 rows = [dict(zip(columns, row)) for row in cursor] 131 results = {} 132 for index, group in itertools.groupby(rows, lambda v: v['Key_name']): 133 group = list(group) 134 results[index] = { 135 'unique': not group[0]['Non_unique'], 136 'columns': [row['Column_name'] for row in group], 137 } 138 return results
139
140 - def _drop_column(self, table, column):
141 with self.env.db_transaction as db: 142 db.drop_column(table, column)
143
144 - def _query(self, stmt, *args):
145 return self.env.db_query(stmt, args)
146
147 - def test_remove_simple_keys(self):
148 indices_0 = self._show_index('test_simple') 149 self.assertEqual(['PRIMARY', 'test_simple_email_idx', 150 'test_simple_username_idx'], 151 sorted(indices_0)) 152 self.assertEqual({'unique': True, 'columns': ['id']}, 153 indices_0['PRIMARY']) 154 self.assertEqual({'unique': True, 'columns': ['username']}, 155 indices_0['test_simple_username_idx']) 156 self.assertEqual({'unique': False, 'columns': ['email']}, 157 indices_0['test_simple_email_idx']) 158 159 self._drop_column('test_simple', 'enabled') 160 self.assertEqual(indices_0, self._show_index('test_simple')) 161 162 self._drop_column('test_simple', 'username') 163 indices_1 = self._show_index('test_simple') 164 self.assertEqual(['PRIMARY', 'test_simple_email_idx'], 165 sorted(indices_1)) 166 167 self._drop_column('test_simple', 'email') 168 indices_2 = self._show_index('test_simple') 169 self.assertEqual(['PRIMARY'], sorted(indices_2)) 170 171 self._drop_column('test_simple', 'id') 172 indices_3 = self._show_index('test_simple') 173 self.assertEqual({}, indices_3)
174
176 indices_0 = self._show_index('test_composite') 177 self.assertEqual(['PRIMARY', 'test_composite_name_enabled_idx', 178 'test_composite_name_value_idx'], 179 sorted(indices_0)) 180 self.assertEqual({'unique': True, 'columns': ['id', 'name']}, 181 indices_0['PRIMARY']) 182 self.assertEqual({'unique': False, 'columns': ['name', 'value']}, 183 indices_0['test_composite_name_value_idx']) 184 self.assertEqual({'unique': True, 'columns': ['name', 'enabled']}, 185 indices_0['test_composite_name_enabled_idx']) 186 187 self._drop_column('test_composite', 'id') 188 indices_1 = self._show_index('test_composite') 189 self.assertEqual(['test_composite_name_enabled_idx', 190 'test_composite_name_value_idx'], 191 sorted(indices_1)) 192 self.assertEqual(indices_0['test_composite_name_value_idx'], 193 indices_1['test_composite_name_value_idx']) 194 self.assertEqual(indices_0['test_composite_name_enabled_idx'], 195 indices_1['test_composite_name_enabled_idx']) 196 rows = self._query("""SELECT * FROM test_composite 197 ORDER BY name, value, enabled""") 198 self.assertEqual([('bar', '42', 1), ('bar', '43', 0), 199 ('foo', '42', 1), ('foo', '43', 0)], rows) 200 201 self._drop_column('test_composite', 'name') 202 self.assertEqual({}, self._show_index('test_composite')) 203 rows = self._query("""SELECT * FROM test_composite 204 ORDER BY value, enabled""") 205 self.assertEqual([('42', 1), ('42', 1), ('43', 0), ('43', 0)], rows)
206 207
208 -def test_suite():
209 suite = unittest.TestSuite() 210 suite.addTest(unittest.makeSuite(MySQLTableAlterationSQLTest)) 211 if get_dburi().startswith('mysql:'): 212 suite.addTest(unittest.makeSuite(MySQLConnectionTestCase)) 213 return suite
214 215 216 if __name__ == '__main__': 217 unittest.main(defaultTest='test_suite') 218