Package trac :: Package db :: Module pool

Source Code for Module trac.db.pool

  1  # -*- coding: utf-8 -*- 
  2  # 
  3  # Copyright (C) 2005-2023 Edgewall Software 
  4  # Copyright (C) 2005 Christopher Lenz <[email protected]> 
  5  # All rights reserved. 
  6  # 
  7  # This software is licensed as described in the file COPYING, which 
  8  # you should have received as part of this distribution. The terms 
  9  # are also available at https://trac.edgewall.org/wiki/TracLicense. 
 10  # 
 11  # This software consists of voluntary contributions made by many 
 12  # individuals. For the exact contribution history, see the revision 
 13  # history and logs, available at https://trac.edgewall.org/log/. 
 14  # 
 15  # Author: Christopher Lenz <[email protected]> 
 16   
 17  import os 
 18  import sys 
 19   
 20  from trac.core import TracError 
 21  from trac.db.util import ConnectionWrapper 
 22  from trac.util.concurrency import get_thread_id, threading 
 23  from trac.util.datefmt import time_now 
 24  from trac.util.text import exception_to_unicode 
 25  from trac.util.translation import _ 
 26   
 27   
class TimeoutError(TracError):
    """Signal that the connection pool could not provide a connection
    before the allowed waiting time ran out.
    """
31 32
class PooledConnection(ConnectionWrapper):
    """Wrapper around a database connection owned by a pool.

    Calling `close()` does not close the underlying connection; it hands
    the connection back to the pool it came from.
    """

    def __init__(self, pool, cnx, key, tid, log=None):
        """Wrap `cnx` and remember where it has to be returned to.

        :param pool: the pool backend; its `_return_cnx()` is called by
                     `close()`
        :param cnx: the underlying connection (forwarded, together with
                    `log`, to `ConnectionWrapper.__init__`)
        :param key: pool key under which the connection is registered
        :param tid: identifier of the thread the connection was handed to
        :param log: optional logger
        """
        ConnectionWrapper.__init__(self, cnx, log)
        self._tid = tid
        self._key = key
        self._pool = pool

    def close(self):
        """Return the wrapped connection to the pool.

        Does nothing when the connection has already been released.
        """
        if not self.cnx:
            return
        released = self.cnx
        # Detach before returning, so that a second close() (for example
        # the one issued by __del__) finds nothing left to release.
        self.cnx = None
        self.log = None
        self._pool._return_cnx(released, self._key, self._tid)

    def __del__(self):
        # Make sure a wrapper that is garbage collected without having
        # been closed explicitly still gives its connection back.
        self.close()
53 54 55
56 -class ConnectionPoolBackend(object):
57 """A process-wide LRU-based connection pool. 58 """
59 - def __init__(self, maxsize):
60 self._available = threading.Condition(threading.RLock()) 61 self._maxsize = maxsize 62 self._active = {} 63 self._pool = [] 64 self._pool_key = [] 65 self._pool_time = [] 66 self._waiters = 0
67
68 - def get_cnx(self, connector, kwargs, timeout=None):
69 cnx = None 70 log = kwargs.get('log') 71 key = unicode(kwargs) 72 start = time_now() 73 tid = get_thread_id() 74 # Get a Connection, either directly or a deferred one 75 with self._available: 76 # First choice: Return the same cnx already used by the thread 77 if (tid, key) in self._active: 78 cnx, num = self._active[(tid, key)] 79 num += 1 80 else: 81 if self._waiters == 0: 82 cnx = self._take_cnx(connector, kwargs, key, tid) 83 if not cnx: 84 self._waiters += 1 85 self._available.wait() 86 self._waiters -= 1 87 cnx = self._take_cnx(connector, kwargs, key, tid) 88 num = 1 89 if cnx: 90 self._active[(tid, key)] = (cnx, num) 91 92 deferred = num == 1 and isinstance(cnx, tuple) 93 exc_info = (None, None, None) 94 if deferred: 95 # Potentially lengthy operations must be done without lock held 96 op, cnx = cnx 97 try: 98 if op == 'ping': 99 cnx.ping() 100 elif op == 'close': 101 cnx.close() 102 if op in ('close', 'create'): 103 cnx = connector.get_connection(**kwargs) 104 except TracError: 105 exc_info = sys.exc_info() 106 cnx = None 107 except Exception: 108 exc_info = sys.exc_info() 109 if log: 110 log.error('Exception caught on %s', op, exc_info=True) 111 cnx = None 112 113 if cnx and not isinstance(cnx, tuple): 114 if deferred: 115 # replace placeholder with real Connection 116 with self._available: 117 self._active[(tid, key)] = (cnx, num) 118 return PooledConnection(self, cnx, key, tid, log) 119 120 if deferred: 121 # cnx couldn't be reused, clear placeholder 122 with self._available: 123 del self._active[(tid, key)] 124 if op == 'ping': # retry 125 return self.get_cnx(connector, kwargs) 126 127 # if we didn't get a cnx after wait(), something's fishy... 
128 if isinstance(exc_info[1], TracError): 129 raise exc_info[0], exc_info[1], exc_info[2] 130 timeout = time_now() - start 131 errmsg = _("Unable to get database connection within %(time)d seconds.", 132 time=timeout) 133 if exc_info[1]: 134 errmsg += " (%s)" % exception_to_unicode(exc_info[1]) 135 raise TimeoutError(errmsg)
136
137 - def _take_cnx(self, connector, kwargs, key, tid):
138 """Note: _available lock must be held when calling this method.""" 139 # Second best option: Reuse a live pooled connection 140 if key in self._pool_key: 141 idx = self._pool_key.index(key) 142 self._pool_key.pop(idx) 143 self._pool_time.pop(idx) 144 cnx = self._pool.pop(idx) 145 # If possible, verify that the pooled connection is 146 # still available and working. 147 if hasattr(cnx, 'ping'): 148 return 'ping', cnx 149 return cnx 150 # Third best option: Create a new connection 151 elif len(self._active) + len(self._pool) < self._maxsize: 152 return 'create', None 153 # Forth best option: Replace a pooled connection with a new one 154 elif len(self._active) < self._maxsize: 155 # Remove the LRU connection in the pool 156 cnx = self._pool.pop(0) 157 self._pool_key.pop(0) 158 self._pool_time.pop(0) 159 return 'close', cnx
160
161 - def _return_cnx(self, cnx, key, tid):
162 # Decrement active refcount, clear slot if 1 163 with self._available: 164 assert (tid, key) in self._active 165 cnx, num = self._active[(tid, key)] 166 if num == 1: 167 del self._active[(tid, key)] 168 else: 169 self._active[(tid, key)] = (cnx, num - 1) 170 if num == 1: 171 # Reset connection outside of critical section 172 try: 173 cnx.rollback() # resets the connection 174 except Exception: 175 cnx.close() 176 cnx = None 177 # Connection available, from reuse or from creation of a new one 178 with self._available: 179 if cnx and cnx.poolable: 180 self._pool.append(cnx) 181 self._pool_key.append(key) 182 self._pool_time.append(time_now()) 183 self._available.notify()
184
185 - def shutdown(self, tid=None):
186 """Close pooled connections not used in a while""" 187 delay = 120 188 if tid is None: 189 delay = 0 190 when = time_now() - delay 191 with self._available: 192 if tid is None: # global shutdown, also close active connections 193 for db, num in self._active.values(): 194 db.close() 195 self._active = {} 196 while self._pool_time and self._pool_time[0] <= when: 197 db = self._pool.pop(0) 198 db.close() 199 self._pool_key.pop(0) 200 self._pool_time.pop(0)
# Capacity of the shared backend; can be overridden through the
# TRAC_DB_POOL_SIZE environment variable (defaults to 10).
_pool_size = int(os.getenv('TRAC_DB_POOL_SIZE', 10))

# The single backend instance used by every ConnectionPool in this module.
_backend = ConnectionPoolBackend(_pool_size)
class ConnectionPool(object):
    """Front-end binding a connector and its connection arguments to the
    module-level `_backend`, which does the actual pooling.
    """

    def __init__(self, maxsize, connector, **kwargs):
        """Remember how connections for this pool are to be created.

        :param maxsize: not used right now but kept for api compatibility
        :param connector: passed to the backend on every `get_cnx()`
        :param kwargs: connection arguments, passed to the backend as well
        """
        self._kwargs = kwargs
        self._connector = connector

    def get_cnx(self, timeout=None):
        """Ask the backend for a connection matching this pool's connector
        and arguments, forwarding `timeout` unchanged.
        """
        connector, kwargs = self._connector, self._kwargs
        return _backend.get_cnx(connector, kwargs, timeout)

    def shutdown(self, tid=None):
        """Forward the shutdown request for `tid` to the backend."""
        _backend.shutdown(tid)
218