X-Git-Url: http://git.inspyration.org/?a=blobdiff_plain;f=openerp%2Fsql_db.py;h=3e401347a18930249488fb55e8968c49b123b3f7;hb=1f97c40c9c109165caef58ee1a87f925b5c7d884;hp=337964f336811b76570612f676395e80678b6258;hpb=b6ece5d65f895a2d9ab4b1a5130594de3d80d95f;p=odoo%2Fodoo.git diff --git a/openerp/sql_db.py b/openerp/sql_db.py index 337964f..3e40134 100644 --- a/openerp/sql_db.py +++ b/openerp/sql_db.py @@ -3,7 +3,7 @@ # # OpenERP, Open Source Management Solution # Copyright (C) 2004-2009 Tiny SPRL (). -# Copyright (C) 2010-2011 OpenERP s.a. (). +# Copyright (C) 2010-2014 OpenERP s.a. (). # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as @@ -20,28 +20,22 @@ # ############################################################################## -#.apidoc title: PostgreSQL interface """ The PostgreSQL connector is a connectivity layer between the OpenERP code and the database, *not* a database abstraction toolkit. Database abstraction is what the ORM does, in fact. - -See also: the `pooler` module """ -#.apidoc add-functions: print_stats -#.apidoc add-classes: Cursor Connection ConnectionPool - -__all__ = ['db_connect', 'close_db'] - +from contextlib import contextmanager from functools import wraps import logging +import time +import uuid import psycopg2.extensions from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ from psycopg2.pool import PoolError from psycopg2.psycopg1 import cursor as psycopg1cursor -from threading import currentThread psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) @@ -154,7 +148,7 @@ class Cursor(object): def check(f): @wraps(f) def wrapper(self, *args, **kwargs): - if self.__closed: + if self._closed: msg = 'Unable to use a closed cursor.' if self.__closer: msg += ' It was closed at %s, line %s' % self.__closer @@ -171,9 +165,9 @@ class Cursor(object): self.sql_log = _logger.isEnabledFor(logging.DEBUG) self.sql_log_count = 0 - self.__closed = True # avoid the call of close() (by __del__) if an exception + self._closed = True # avoid the call of close() (by __del__) if an exception # is raised by any of the following initialisations - self._pool = pool + self.__pool = pool self.dbname = dbname # Whether to enable snapshot isolation level for this cursor. @@ -186,7 +180,7 @@ class Cursor(object): self.__caller = frame_codeinfo(currentframe(),2) else: self.__caller = False - self.__closed = False # real initialisation value + self._closed = False # real initialisation value self.autocommit(False) self.__closer = False @@ -195,7 +189,7 @@ class Cursor(object): self.cache = {} def __del__(self): - if not self.__closed and not self._cnx.closed: + if not self._closed and not self._cnx.closed: # Oops. 'self' has not been closed explicitly. # The cursor will be deleted by the garbage collector, # but the database connection is not put back into the connection @@ -308,7 +302,7 @@ class Cursor(object): # collected as fast as they should). The problem is probably due in # part because browse records keep a reference to the cursor. del self._obj - self.__closed = True + self._closed = True # Clean the underlying connection. self._cnx.rollback() @@ -319,7 +313,7 @@ class Cursor(object): chosen_template = tools.config['db_template'] templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template])) keep_in_pool = self.dbname not in templates_list - self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool) + self.__pool.give_back(self._cnx, keep_in_pool=keep_in_pool) @check def autocommit(self, on): @@ -353,10 +347,77 @@ class Cursor(object): """ return self._cnx.rollback() + def __enter__(self): + """ Using the cursor as a contextmanager automatically commits and + closes it:: + + with cr: + cr.execute(...) + + # cr is committed if no failure occurred + # cr is closed in any case + """ + return self + + def __exit__(self, exc_type, exc_value, traceback): + if exc_type is None: + self.commit() + self.close() + + @contextmanager + @check + def savepoint(self): + """context manager entering in a new savepoint""" + name = uuid.uuid1().hex + self.execute('SAVEPOINT "%s"' % name) + try: + yield + self.execute('RELEASE SAVEPOINT "%s"' % name) + except: + self.execute('ROLLBACK TO SAVEPOINT "%s"' % name) + raise + @check def __getattr__(self, name): return getattr(self._obj, name) +class TestCursor(Cursor): + """ A cursor to be used for tests. It keeps the transaction open across + several requests, and simulates committing, rolling back, and closing. + """ + def __init__(self, *args, **kwargs): + super(TestCursor, self).__init__(*args, **kwargs) + # in order to simulate commit and rollback, the cursor maintains a + # savepoint at its last commit + self.execute("SAVEPOINT test_cursor") + # we use a lock to serialize concurrent requests + self._lock = threading.RLock() + + def acquire(self): + self._lock.acquire() + + def release(self): + self._lock.release() + + def force_close(self): + super(TestCursor, self).close() + + def close(self): + if not self._closed: + self.rollback() # for stuff that has not been committed + self.release() + + def autocommit(self, on): + _logger.debug("TestCursor.autocommit(%r) does nothing", on) + + def commit(self): + self.execute("RELEASE SAVEPOINT test_cursor") + self.execute("SAVEPOINT test_cursor") + + def rollback(self): + self.execute("ROLLBACK TO SAVEPOINT test_cursor") + self.execute("SAVEPOINT test_cursor") + class PsycoConnection(psycopg2.extensions.connection): pass @@ -398,7 +459,7 @@ class ConnectionPool(object): def borrow(self, dsn): self._debug('Borrow connection to %r', dsn) - # free leaked connections + # free dead and leaked connections for i, (cnx, _) in tools.reverse_enumerate(self._connections): if cnx.closed: self._connections.pop(i) @@ -412,6 +473,14 @@ class ConnectionPool(object): for i, (cnx, used) in enumerate(self._connections): if not used and dsn_are_equals(cnx.dsn, dsn): + try: + cnx.reset() + except psycopg2.OperationalError: + self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn) + # psycopg2 2.4.4 and earlier do not allow closing a closed connection + if not cnx.closed: + cnx.close() + continue self._connections.pop(i) self._connections.append((cnx, True)) self._debug('Existing connection found at index %d', i) @@ -455,10 +524,10 @@ class ConnectionPool(object): raise PoolError('This connection does not below to the pool') @locked - def close_all(self, dsn): + def close_all(self, dsn=None): _logger.info('%r: Close all connections to %r', self, dsn) for i, (cnx, used) in tools.reverse_enumerate(self._connections): - if dsn_are_equals(cnx.dsn, dsn): + if dsn is None or dsn_are_equals(cnx.dsn, dsn): cnx.close() self._connections.pop(i) @@ -469,12 +538,17 @@ class Connection(object): def __init__(self, pool, dbname): self.dbname = dbname - self._pool = pool + self.__pool = pool def cursor(self, serialized=True): cursor_type = serialized and 'serialized ' or '' _logger.debug('create %scursor to %r', cursor_type, self.dbname) - return Cursor(self._pool, self.dbname, serialized=serialized) + return Cursor(self.__pool, self.dbname, serialized=serialized) + + def test_cursor(self, serialized=True): + cursor_type = serialized and 'serialized ' or '' + _logger.debug('create test %scursor to %r', cursor_type, self.dbname) + return TestCursor(self.__pool, self.dbname, serialized=serialized) # serialized_cursor is deprecated - cursors are serialized by default serialized_cursor = cursor @@ -512,7 +586,6 @@ def db_connect(db_name): global _Pool if _Pool is None: _Pool = ConnectionPool(int(tools.config['db_maxconn'])) - currentThread().dbname = db_name return Connection(_Pool, db_name) def close_db(db_name): @@ -520,9 +593,11 @@ def close_db(db_name): global _Pool if _Pool: _Pool.close_all(dsn(db_name)) - ct = currentThread() - if hasattr(ct, 'dbname'): - delattr(ct, 'dbname') + +def close_all(): + global _Pool + if _Pool: + _Pool.close_all() # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: