X-Git-Url: http://git.inspyration.org/?a=blobdiff_plain;f=openerp%2Fsql_db.py;h=3e401347a18930249488fb55e8968c49b123b3f7;hb=1f97c40c9c109165caef58ee1a87f925b5c7d884;hp=f18414bdff8b47db7301c57853e00332cf192183;hpb=11051f32e0ff2cc65a65311f76415996242fac67;p=odoo%2Fodoo.git diff --git a/openerp/sql_db.py b/openerp/sql_db.py index f18414b..3e40134 100644 --- a/openerp/sql_db.py +++ b/openerp/sql_db.py @@ -3,7 +3,7 @@ # # OpenERP, Open Source Management Solution # Copyright (C) 2004-2009 Tiny SPRL (). -# Copyright (C) 2010-2011 OpenERP s.a. (). +# Copyright (C) 2010-2014 OpenERP s.a. (). # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as @@ -20,28 +20,22 @@ # ############################################################################## -#.apidoc title: PostgreSQL interface """ The PostgreSQL connector is a connectivity layer between the OpenERP code and the database, *not* a database abstraction toolkit. Database abstraction is what the ORM does, in fact. - -See also: the `pooler` module """ -#.apidoc add-functions: print_stats -#.apidoc add-classes: Cursor Connection ConnectionPool - -__all__ = ['db_connect', 'close_db'] - +from contextlib import contextmanager from functools import wraps import logging +import time +import uuid import psycopg2.extensions from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ from psycopg2.pool import PoolError from psycopg2.psycopg1 import cursor as psycopg1cursor -from threading import currentThread psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) @@ -74,8 +68,8 @@ import threading from inspect import currentframe import re -re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$'); -re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$'); +re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$') +re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$') sql_counter = 0 @@ -154,7 +148,7 @@ class Cursor(object): def check(f): @wraps(f) def wrapper(self, *args, **kwargs): - if self.__closed: + if self._closed: msg = 'Unable to use a closed cursor.' if self.__closer: msg += ' It was closed at %s, line %s' % self.__closer @@ -171,9 +165,9 @@ class Cursor(object): self.sql_log = _logger.isEnabledFor(logging.DEBUG) self.sql_log_count = 0 - self.__closed = True # avoid the call of close() (by __del__) if an exception + self._closed = True # avoid the call of close() (by __del__) if an exception # is raised by any of the following initialisations - self._pool = pool + self.__pool = pool self.dbname = dbname # Whether to enable snapshot isolation level for this cursor. @@ -186,7 +180,7 @@ class Cursor(object): self.__caller = frame_codeinfo(currentframe(),2) else: self.__caller = False - self.__closed = False # real initialisation value + self._closed = False # real initialisation value self.autocommit(False) self.__closer = False @@ -195,7 +189,7 @@ class Cursor(object): self.cache = {} def __del__(self): - if not self.__closed and not self._cnx.closed: + if not self._closed and not self._cnx.closed: # Oops. 'self' has not been closed explicitly. # The cursor will be deleted by the garbage collector, # but the database connection is not put back into the connection @@ -226,11 +220,11 @@ class Cursor(object): params = params or None res = self._obj.execute(query, params) except psycopg2.ProgrammingError, pe: - if (self._default_log_exceptions if log_exceptions is None else log_exceptions): + if self._default_log_exceptions if log_exceptions is None else log_exceptions: _logger.error("Programming error: %s, in query %s", pe, query) raise except Exception: - if (self._default_log_exceptions if log_exceptions is None else log_exceptions): + if self._default_log_exceptions if log_exceptions is None else log_exceptions: _logger.exception("bad query: %s", self._obj.query or query) raise @@ -308,7 +302,7 @@ class Cursor(object): # collected as fast as they should). The problem is probably due in # part because browse records keep a reference to the cursor. del self._obj - self.__closed = True + self._closed = True # Clean the underlying connection. self._cnx.rollback() @@ -319,7 +313,7 @@ class Cursor(object): chosen_template = tools.config['db_template'] templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template])) keep_in_pool = self.dbname not in templates_list - self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool) + self.__pool.give_back(self._cnx, keep_in_pool=keep_in_pool) @check def autocommit(self, on): @@ -353,14 +347,76 @@ class Cursor(object): """ return self._cnx.rollback() + def __enter__(self): + """ Using the cursor as a contextmanager automatically commits and + closes it:: + + with cr: + cr.execute(...) + + # cr is committed if no failure occurred + # cr is closed in any case + """ + return self + + def __exit__(self, exc_type, exc_value, traceback): + if exc_type is None: + self.commit() + self.close() + + @contextmanager + @check + def savepoint(self): + """context manager entering in a new savepoint""" + name = uuid.uuid1().hex + self.execute('SAVEPOINT "%s"' % name) + try: + yield + self.execute('RELEASE SAVEPOINT "%s"' % name) + except: + self.execute('ROLLBACK TO SAVEPOINT "%s"' % name) + raise + @check def __getattr__(self, name): return getattr(self._obj, name) - """ Set the mode of postgres operations for all cursors - """ - """Obtain the mode of postgres operations for all cursors - """ +class TestCursor(Cursor): + """ A cursor to be used for tests. It keeps the transaction open across + several requests, and simulates committing, rolling back, and closing. + """ + def __init__(self, *args, **kwargs): + super(TestCursor, self).__init__(*args, **kwargs) + # in order to simulate commit and rollback, the cursor maintains a + # savepoint at its last commit + self.execute("SAVEPOINT test_cursor") + # we use a lock to serialize concurrent requests + self._lock = threading.RLock() + + def acquire(self): + self._lock.acquire() + + def release(self): + self._lock.release() + + def force_close(self): + super(TestCursor, self).close() + + def close(self): + if not self._closed: + self.rollback() # for stuff that has not been committed + self.release() + + def autocommit(self, on): + _logger.debug("TestCursor.autocommit(%r) does nothing", on) + + def commit(self): + self.execute("RELEASE SAVEPOINT test_cursor") + self.execute("SAVEPOINT test_cursor") + + def rollback(self): + self.execute("ROLLBACK TO SAVEPOINT test_cursor") + self.execute("SAVEPOINT test_cursor") class PsycoConnection(psycopg2.extensions.connection): pass @@ -403,7 +459,7 @@ class ConnectionPool(object): def borrow(self, dsn): self._debug('Borrow connection to %r', dsn) - # free leaked connections + # free dead and leaked connections for i, (cnx, _) in tools.reverse_enumerate(self._connections): if cnx.closed: self._connections.pop(i) @@ -417,6 +473,14 @@ class ConnectionPool(object): for i, (cnx, used) in enumerate(self._connections): if not used and dsn_are_equals(cnx.dsn, dsn): + try: + cnx.reset() + except psycopg2.OperationalError: + self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn) + # psycopg2 2.4.4 and earlier do not allow closing a closed connection + if not cnx.closed: + cnx.close() + continue self._connections.pop(i) self._connections.append((cnx, True)) self._debug('Existing connection found at index %d', i) @@ -460,10 +524,10 @@ class ConnectionPool(object): raise PoolError('This connection does not below to the pool') @locked - def close_all(self, dsn): + def close_all(self, dsn=None): _logger.info('%r: Close all connections to %r', self, dsn) for i, (cnx, used) in tools.reverse_enumerate(self._connections): - if dsn_are_equals(cnx.dsn, dsn): + if dsn is None or dsn_are_equals(cnx.dsn, dsn): cnx.close() self._connections.pop(i) @@ -474,12 +538,17 @@ class Connection(object): def __init__(self, pool, dbname): self.dbname = dbname - self._pool = pool + self.__pool = pool def cursor(self, serialized=True): cursor_type = serialized and 'serialized ' or '' _logger.debug('create %scursor to %r', cursor_type, self.dbname) - return Cursor(self._pool, self.dbname, serialized=serialized) + return Cursor(self.__pool, self.dbname, serialized=serialized) + + def test_cursor(self, serialized=True): + cursor_type = serialized and 'serialized ' or '' + _logger.debug('create test %scursor to %r', cursor_type, self.dbname) + return TestCursor(self.__pool, self.dbname, serialized=serialized) # serialized_cursor is deprecated - cursors are serialized by default serialized_cursor = cursor @@ -517,17 +586,18 @@ def db_connect(db_name): global _Pool if _Pool is None: _Pool = ConnectionPool(int(tools.config['db_maxconn'])) - currentThread().dbname = db_name return Connection(_Pool, db_name) def close_db(db_name): - global _Pool """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function.""" + global _Pool if _Pool: _Pool.close_all(dsn(db_name)) - ct = currentThread() - if hasattr(ct, 'dbname'): - delattr(ct, 'dbname') + +def close_all(): + global _Pool + if _Pool: + _Pool.close_all() # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: