+
+ __logger = logging.getLogger('db.connection_pool')
+
+ def locked(fun):
+ @wraps(fun)
+ def _locked(self, *args, **kwargs):
+ self._lock.acquire()
+ try:
+ return fun(self, *args, **kwargs)
+ finally:
+ self._lock.release()
+ return _locked
+
+
+ def __init__(self, maxconn=64):
+ self._connections = []
+ self._maxconn = max(maxconn, 1)
+ self._lock = threading.Lock()
+
+ def __repr__(self):
+ used = len([1 for c, u in self._connections[:] if u])
+ count = len(self._connections)
+ return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
+
+ def _debug(self, msg, *args):
+ self.__logger.log(logging.DEBUG_SQL, ('%r ' + msg), self, *args)
+
+ @locked
+ def borrow(self, dsn):
+ self._debug('Borrow connection to %r', dsn)
+
+ # free leaked connections
+ for i, (cnx, _) in tools.reverse_enumerate(self._connections):
+ if getattr(cnx, 'leaked', False):
+ delattr(cnx, 'leaked')
+ self._connections.pop(i)
+ self._connections.append((cnx, False))
+ self.__logger.warn('%r: Free leaked connection to %r', self, cnx.dsn)
+
+ for i, (cnx, used) in enumerate(self._connections):
+ if not used and dsn_are_equals(cnx.dsn, dsn):
+ self._connections.pop(i)
+ self._connections.append((cnx, True))
+ self._debug('Existing connection found at index %d', i)
+
+ return cnx
+
+ if len(self._connections) >= self._maxconn:
+ # try to remove the oldest connection not used
+ for i, (cnx, used) in enumerate(self._connections):
+ if not used:
+ self._connections.pop(i)
+ self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
+ break
+ else:
+ # note: this code is called only if the for loop has completed (no break)
+ raise PoolError('The Connection Pool Is Full')
+
+ try:
+ result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
+ except psycopg2.Error, e:
+ self.__logger.exception('Connection to the database failed')
+ raise
+ self._connections.append((result, True))
+ self._debug('Create new connection')
+ return result
+
+ @locked
+ def give_back(self, connection, keep_in_pool=True):
+ self._debug('Give back connection to %r', connection.dsn)
+ for i, (cnx, used) in enumerate(self._connections):
+ if cnx is connection:
+ self._connections.pop(i)
+ if keep_in_pool:
+ self._connections.append((cnx, False))
+ self._debug('Put connection to %r in pool', cnx.dsn)
+ else:
+ self._debug('Forgot connection to %r', cnx.dsn)
+ break
+ else:
+ raise PoolError('This connection does not below to the pool')
+
+ @locked
+ def close_all(self, dsn):
+ self.__logger.info('%r: Close all connections to %r', self, dsn)
+ for i, (cnx, used) in tools.reverse_enumerate(self._connections):
+ if dsn_are_equals(cnx.dsn, dsn):
+ cnx.close()
+ self._connections.pop(i)
+
+
+class Connection(object):
+ __logger = logging.getLogger('db.connection')
+