Package trac :: Package db :: Package tests :: Module api

Source Code for Module trac.db.tests.api

  1  # -*- coding: utf-8 -*- 
  2  # 
  3  # Copyright (C) 2005-2020 Edgewall Software 
  4  # All rights reserved. 
  5  # 
  6  # This software is licensed as described in the file COPYING, which 
  7  # you should have received as part of this distribution. The terms 
  8  # are also available at https://trac.edgewall.org/wiki/TracLicense. 
  9  # 
 10  # This software consists of voluntary contributions made by many 
 11  # individuals. For the exact contribution history, see the revision 
 12  # history and logs, available at https://trac.edgewall.org/log/. 
 13   
 14  from __future__ import with_statement 
 15   
 16  import os 
 17  import unittest 
 18   
 19  import trac.tests.compat 
 20  from trac.config import ConfigurationError 
 21  from trac.db.api import DatabaseManager, _parse_db_str, get_column_names, \ 
 22                          with_transaction 
 23  from trac.db_default import schema as default_schema 
 24  from trac.db.schema import Column, Table 
 25  from trac.test import EnvironmentStub, Mock 
 26  from trac.util.concurrency import ThreadLocal 
27 28 29 -class Connection(object):
30 31 committed = False 32 rolledback = False 33
34 - def commit(self):
35 self.committed = True
36
37 - def rollback(self):
38 self.rolledback = True
39
40 41 -class Error(Exception):
42 pass
43
44 45 -def make_env(get_cnx):
46 from trac.core import ComponentManager 47 return Mock(ComponentManager, components={DatabaseManager: 48 Mock(get_connection=get_cnx, 49 _transaction_local=ThreadLocal(wdb=None, rdb=None))})
50
51 52 -class WithTransactionTest(unittest.TestCase):
53
55 db = Connection() 56 env = make_env(lambda: db) 57 @with_transaction(env) 58 def do_transaction(db): 59 self.assertTrue(not db.committed and not db.rolledback)
60 self.assertTrue(db.committed and not db.rolledback)
61
62 - def test_failed_transaction(self):
63 db = Connection() 64 env = make_env(lambda: db) 65 try: 66 @with_transaction(env) 67 def do_transaction(db): 68 self.assertTrue(not db.committed and not db.rolledback) 69 raise Error()
70 self.fail() 71 except Error: 72 pass 73 self.assertTrue(not db.committed and db.rolledback) 74
75 - def test_implicit_nesting_success(self):
76 env = make_env(Connection) 77 dbs = [None, None] 78 @with_transaction(env) 79 def level0(db): 80 dbs[0] = db 81 @with_transaction(env) 82 def level1(db): 83 dbs[1] = db 84 self.assertTrue(not db.committed and not db.rolledback)
85 self.assertTrue(not db.committed and not db.rolledback) 86 self.assertTrue(dbs[0] is not None) 87 self.assertTrue(dbs[0] is dbs[1]) 88 self.assertTrue(dbs[0].committed and not dbs[0].rolledback) 89
90 - def test_implicit_nesting_failure(self):
91 env = make_env(Connection) 92 dbs = [None, None] 93 try: 94 @with_transaction(env) 95 def level0(db): 96 dbs[0] = db 97 try: 98 @with_transaction(env) 99 def level1(db): 100 dbs[1] = db 101 self.assertTrue(not db.committed and not db.rolledback) 102 raise Error()
103 self.fail() 104 except Error: 105 self.assertTrue(not db.committed and not db.rolledback) 106 raise 107 self.fail() 108 except Error: 109 pass 110 self.assertTrue(dbs[0] is not None) 111 self.assertTrue(dbs[0] is dbs[1]) 112 self.assertTrue(not dbs[0].committed and dbs[0].rolledback) 113
114 - def test_explicit_success(self):
115 db = Connection() 116 env = make_env(lambda: None) 117 @with_transaction(env, db) 118 def do_transaction(idb): 119 self.assertTrue(idb is db) 120 self.assertTrue(not db.committed and not db.rolledback)
121 self.assertTrue(not db.committed and not db.rolledback) 122
123 - def test_explicit_failure(self):
124 db = Connection() 125 env = make_env(lambda: None) 126 try: 127 @with_transaction(env, db) 128 def do_transaction(idb): 129 self.assertTrue(idb is db) 130 self.assertTrue(not db.committed and not db.rolledback) 131 raise Error()
132 self.fail() 133 except Error: 134 pass 135 self.assertTrue(not db.committed and not db.rolledback) 136
137 - def test_implicit_in_explicit_success(self):
138 db = Connection() 139 env = make_env(lambda: db) 140 dbs = [None, None] 141 @with_transaction(env, db) 142 def level0(db): 143 dbs[0] = db 144 @with_transaction(env) 145 def level1(db): 146 dbs[1] = db 147 self.assertTrue(not db.committed and not db.rolledback)
148 self.assertTrue(not db.committed and not db.rolledback) 149 self.assertTrue(dbs[0] is not None) 150 self.assertTrue(dbs[0] is dbs[1]) 151 self.assertTrue(not dbs[0].committed and not dbs[0].rolledback) 152
153 - def test_implicit_in_explicit_failure(self):
154 db = Connection() 155 env = make_env(lambda: db) 156 dbs = [None, None] 157 try: 158 @with_transaction(env, db) 159 def level0(db): 160 dbs[0] = db 161 @with_transaction(env) 162 def level1(db): 163 dbs[1] = db 164 self.assertTrue(not db.committed and not db.rolledback) 165 raise Error()
166 self.fail() 167 self.fail() 168 except Error: 169 pass 170 self.assertTrue(dbs[0] is not None) 171 self.assertTrue(dbs[0] is dbs[1]) 172 self.assertTrue(not dbs[0].committed and not dbs[0].rolledback) 173
174 - def test_explicit_in_implicit_success(self):
175 db = Connection() 176 env = make_env(lambda: db) 177 dbs = [None, None] 178 @with_transaction(env) 179 def level0(db): 180 dbs[0] = db 181 @with_transaction(env, db) 182 def level1(db): 183 dbs[1] = db 184 self.assertTrue(not db.committed and not db.rolledback)
185 self.assertTrue(not db.committed and not db.rolledback) 186 self.assertTrue(dbs[0] is not None) 187 self.assertTrue(dbs[0] is dbs[1]) 188 self.assertTrue(dbs[0].committed and not dbs[0].rolledback) 189
190 - def test_explicit_in_implicit_failure(self):
191 db = Connection() 192 env = make_env(lambda: db) 193 dbs = [None, None] 194 try: 195 @with_transaction(env) 196 def level0(db): 197 dbs[0] = db 198 @with_transaction(env, db) 199 def level1(db): 200 dbs[1] = db 201 self.assertTrue(not db.committed and not db.rolledback) 202 raise Error()
203 self.fail() 204 self.fail() 205 except Error: 206 pass 207 self.assertTrue(dbs[0] is not None) 208 self.assertTrue(dbs[0] is dbs[1]) 209 self.assertTrue(not dbs[0].committed and dbs[0].rolledback) 210
211 - def test_invalid_nesting(self):
212 env = make_env(Connection) 213 try: 214 @with_transaction(env) 215 def level0(db): 216 @with_transaction(env, Connection()) 217 def level1(db): 218 raise Error()
219 raise Error() 220 raise Error() 221 except AssertionError: 222 pass 223
224 225 226 -class ParseConnectionStringTestCase(unittest.TestCase):
227
228 - def test_sqlite_relative(self):
229 # Default syntax for specifying DB path relative to the environment 230 # directory 231 self.assertEqual(('sqlite', {'path': 'db/trac.db'}), 232 _parse_db_str('sqlite:db/trac.db'))
233
234 - def test_sqlite_absolute(self):
235 # Standard syntax 236 self.assertEqual(('sqlite', {'path': '/var/db/trac.db'}), 237 _parse_db_str('sqlite:///var/db/trac.db')) 238 # Legacy syntax 239 self.assertEqual(('sqlite', {'path': '/var/db/trac.db'}), 240 _parse_db_str('sqlite:/var/db/trac.db'))
241
243 # In-memory database 244 self.assertEqual(('sqlite', {'path': 'db/trac.db', 245 'params': {'timeout': '10000'}}), 246 _parse_db_str('sqlite:db/trac.db?timeout=10000'))
247
248 - def test_sqlite_windows_path(self):
249 # In-memory database 250 os_name = os.name 251 try: 252 os.name = 'nt' 253 self.assertEqual(('sqlite', {'path': 'C:/project/db/trac.db'}), 254 _parse_db_str('sqlite:C|/project/db/trac.db')) 255 finally: 256 os.name = os_name
257
258 - def test_postgres_simple(self):
259 self.assertEqual(('postgres', {'host': 'localhost', 'path': '/trac'}), 260 _parse_db_str('postgres://localhost/trac'))
261
262 - def test_postgres_with_port(self):
263 self.assertEqual(('postgres', {'host': 'localhost', 'port': 9431, 264 'path': '/trac'}), 265 _parse_db_str('postgres://localhost:9431/trac'))
266
267 - def test_postgres_with_creds(self):
268 self.assertEqual(('postgres', {'user': 'john', 'password': 'letmein', 269 'host': 'localhost', 'port': 9431, 270 'path': '/trac'}), 271 _parse_db_str('postgres://john:letmein@localhost:9431/trac'))
272
274 self.assertEqual(('postgres', {'user': 'john', 'password': ':@/', 275 'host': 'localhost', 'path': '/trac'}), 276 _parse_db_str('postgres://john:%3a%40%2f@localhost/trac'))
277
278 - def test_mysql_simple(self):
279 self.assertEqual(('mysql', {'host': 'localhost', 'path': '/trac'}), 280 _parse_db_str('mysql://localhost/trac'))
281
282 - def test_mysql_with_creds(self):
283 self.assertEqual(('mysql', {'user': 'john', 'password': 'letmein', 284 'host': 'localhost', 'port': 3306, 285 'path': '/trac'}), 286 _parse_db_str('mysql://john:letmein@localhost:3306/trac'))
287
288 - def test_empty_string(self):
289 self.assertRaises(ConfigurationError, _parse_db_str, '')
290
291 - def test_invalid_port(self):
292 self.assertRaises(ConfigurationError, _parse_db_str, 293 'postgres://localhost:42:42')
294
295 - def test_invalid_schema(self):
296 self.assertRaises(ConfigurationError, _parse_db_str, 297 'sqlitedb/trac.db')
298
299 - def test_no_path(self):
300 self.assertRaises(ConfigurationError, _parse_db_str, 301 'sqlite:')
302
304 self.assertRaises(ConfigurationError, _parse_db_str, 305 'postgres://localhost/schema?name')
306
307 308 -class StringsTestCase(unittest.TestCase):
309
310 - def setUp(self):
311 self.env = EnvironmentStub()
312
313 - def tearDown(self):
314 self.env.reset_db()
315
316 - def test_insert_unicode(self):
317 with self.env.db_transaction as db: 318 quoted = db.quote('system') 319 db("INSERT INTO " + quoted + " (name,value) VALUES (%s,%s)", 320 ('test-unicode', u'ünicöde')) 321 self.assertEqual([(u'ünicöde',)], self.env.db_query( 322 "SELECT value FROM " + quoted + " WHERE name='test-unicode'"))
323
324 - def test_insert_empty(self):
325 from trac.util.text import empty 326 with self.env.db_transaction as db: 327 quoted = db.quote('system') 328 db("INSERT INTO " + quoted + " (name,value) VALUES (%s,%s)", 329 ('test-empty', empty)) 330 self.assertEqual([(u'',)], self.env.db_query( 331 "SELECT value FROM " + quoted + " WHERE name='test-empty'"))
332
333 - def test_insert_markup(self):
334 from genshi.core import Markup 335 with self.env.db_transaction as db: 336 quoted = db.quote('system') 337 db("INSERT INTO " + quoted + " (name,value) VALUES (%s,%s)", 338 ('test-markup', Markup(u'<em>märkup</em>'))) 339 self.assertEqual([(u'<em>märkup</em>',)], self.env.db_query( 340 "SELECT value FROM " + quoted + " WHERE name='test-markup'"))
341
342 - def test_quote(self):
343 db = self.env.get_db_cnx() 344 cursor = db.cursor() 345 cursor.execute('SELECT 1 AS %s' % \ 346 db.quote(r'alpha\`\"\'\\beta``gamma""delta')) 347 self.assertEqual(r'alpha\`\"\'\\beta``gamma""delta', 348 get_column_names(cursor)[0])
349
351 db = self.env.get_read_db() 352 name = """%?`%s"%'%%""" 353 354 def test(db, logging=False): 355 cursor = db.cursor() 356 if logging: 357 cursor.log = self.env.log 358 359 cursor.execute('SELECT 1 AS ' + db.quote(name)) 360 self.assertEqual(name, get_column_names(cursor)[0]) 361 cursor.execute('SELECT %s AS ' + db.quote(name), (42,)) 362 self.assertEqual(name, get_column_names(cursor)[0]) 363 stmt = "UPDATE " + db.quote('system') + " SET value=%s " + \ 364 "WHERE 1=(SELECT 0 AS " + db.quote(name) + ")" 365 cursor.executemany(stmt, []) 366 cursor.executemany(stmt, [('42',), ('43',)])
367 368 test(db) 369 test(db, logging=True)
370
371 - def test_prefix_match_case_sensitive(self):
372 @self.env.with_transaction() 373 def do_insert(db): 374 cursor = db.cursor() 375 cursor.executemany("INSERT INTO " + db.quote('system') + 376 " (name,value) VALUES (%s,1)", 377 [('blahblah',), ('BlahBlah',), ('BLAHBLAH',), 378 (u'BlähBlah',), (u'BlahBläh',)])
379 380 db = self.env.get_read_db() 381 cursor = db.cursor() 382 cursor.execute("SELECT name FROM " + db.quote('system') + 383 " WHERE name " + db.prefix_match(), 384 (db.prefix_match_value('Blah'),)) 385 names = sorted(name for name, in cursor) 386 self.assertEqual('BlahBlah', names[0]) 387 self.assertEqual(u'BlahBläh', names[1]) 388 self.assertEqual(2, len(names)) 389
390 - def test_prefix_match_metachars(self):
391 def do_query(prefix): 392 db = self.env.get_read_db() 393 cursor = db.cursor() 394 cursor.execute("SELECT name FROM " + db.quote('system') + 395 " WHERE name " + db.prefix_match() + 396 " ORDER BY name", 397 (db.prefix_match_value(prefix),)) 398 return [name for name, in cursor]
399 400 @self.env.with_transaction() 401 def do_insert(db): 402 values = ['foo*bar', 'foo*bar!', 'foo?bar', 'foo?bar!', 403 'foo[bar', 'foo[bar!', 'foo]bar', 'foo]bar!', 404 'foo%bar', 'foo%bar!', 'foo_bar', 'foo_bar!', 405 'foo/bar', 'foo/bar!', 'fo*ob?ar[fo]ob%ar_fo/obar'] 406 cursor = db.cursor() 407 cursor.executemany("INSERT INTO " + db.quote('system') + 408 " (name,value) VALUES (%s,1)", 409 [(value,) for value in values]) 410 411 self.assertEqual(['foo*bar', 'foo*bar!'], do_query('foo*')) 412 self.assertEqual(['foo?bar', 'foo?bar!'], do_query('foo?')) 413 self.assertEqual(['foo[bar', 'foo[bar!'], do_query('foo[')) 414 self.assertEqual(['foo]bar', 'foo]bar!'], do_query('foo]')) 415 self.assertEqual(['foo%bar', 'foo%bar!'], do_query('foo%')) 416 self.assertEqual(['foo_bar', 'foo_bar!'], do_query('foo_')) 417 self.assertEqual(['foo/bar', 'foo/bar!'], do_query('foo/')) 418 self.assertEqual(['fo*ob?ar[fo]ob%ar_fo/obar'], do_query('fo*')) 419 self.assertEqual(['fo*ob?ar[fo]ob%ar_fo/obar'], 420 do_query('fo*ob?ar[fo]ob%ar_fo/obar')) 421
422 423 -class ConnectionTestCase(unittest.TestCase):
424 - def setUp(self):
425 self.env = EnvironmentStub() 426 self.schema = [ 427 Table('HOURS', key='ID')[ 428 Column('ID', auto_increment=True), 429 Column('AUTHOR')], 430 Table('blog', key='bid')[ 431 Column('bid', auto_increment=True), 432 Column('author') 433 ] 434 ] 435 self.env.global_databasemanager.drop_tables(self.schema) 436 self.env.global_databasemanager.create_tables(self.schema)
437
438 - def tearDown(self):
441
442 - def test_get_last_id(self):
443 q = "INSERT INTO report (author) VALUES ('anonymous')" 444 with self.env.db_transaction as db: 445 cursor = db.cursor() 446 cursor.execute(q) 447 # Row ID correct before... 448 id1 = db.get_last_id(cursor, 'report') 449 db.commit() 450 cursor.execute(q) 451 # ... and after commit() 452 db.commit() 453 id2 = db.get_last_id(cursor, 'report') 454 455 self.assertNotEqual(0, id1) 456 self.assertEqual(id1 + 1, id2)
457
459 with self.env.db_transaction as db: 460 db("INSERT INTO report (id, author) VALUES (42, 'anonymous')") 461 cursor = db.cursor() 462 db.update_sequence(cursor, 'report', 'id') 463 464 self.env.db_transaction( 465 "INSERT INTO report (author) VALUES ('next-id')") 466 467 self.assertEqual(43, self.env.db_query( 468 "SELECT id FROM report WHERE author='next-id'")[0][0])
469
471 with self.env.db_transaction as db: 472 cursor = db.cursor() 473 cursor.execute( 474 "INSERT INTO blog (bid, author) VALUES (42, 'anonymous')") 475 db.update_sequence(cursor, 'blog', 'bid') 476 477 self.env.db_transaction( 478 "INSERT INTO blog (author) VALUES ('next-id')") 479 480 self.assertEqual(43, self.env.db_query( 481 "SELECT bid FROM blog WHERE author='next-id'")[0][0])
482
484 """Test for regression described in comment:4:ticket:11512.""" 485 with self.env.db_transaction as db: 486 db("INSERT INTO %s (%s, %s) VALUES (42, 'anonymous')" 487 % (db.quote('HOURS'), db.quote('ID'), db.quote('AUTHOR'))) 488 cursor = db.cursor() 489 db.update_sequence(cursor, 'HOURS', 'ID') 490 491 with self.env.db_transaction as db: 492 cursor = db.cursor() 493 cursor.execute( 494 "INSERT INTO %s (%s) VALUES ('next-id')" 495 % (db.quote('HOURS'), db.quote('AUTHOR'))) 496 last_id = db.get_last_id(cursor, 'HOURS', 'ID') 497 498 self.assertEqual(43, last_id)
499
500 - def test_table_names(self):
501 schema = default_schema + self.schema 502 with self.env.db_query as db: 503 db_tables = db.get_table_names() 504 self.assertEqual(len(schema), len(db_tables)) 505 # Some DB (e.g. MariaDB) normalize the table names to lower case 506 db_tables = [t.lower() for t in db_tables] 507 for table in schema: 508 self.assertIn(table.name.lower(), db_tables)
509
510 - def test_get_column_names(self):
511 schema = default_schema + self.schema 512 with self.env.db_transaction as db: 513 for table in schema: 514 db_columns = db.get_column_names(table.name) 515 self.assertEqual(len(table.columns), len(db_columns)) 516 for column in table.columns: 517 self.assertIn(column.name, db_columns)
518
519 520 -def suite():
521 suite = unittest.TestSuite() 522 suite.addTest(unittest.makeSuite(ParseConnectionStringTestCase)) 523 suite.addTest(unittest.makeSuite(StringsTestCase)) 524 suite.addTest(unittest.makeSuite(ConnectionTestCase)) 525 suite.addTest(unittest.makeSuite(WithTransactionTest)) 526 return suite
527 528 529 if __name__ == '__main__': 530 unittest.main(defaultTest='suite') 531