diff --git a/openerp/sql_db.py b/openerp/sql_db.py
index f18414b..589da7f 100644
--- a/openerp/sql_db.py
+++ b/openerp/sql_db.py
@@ -3,7 +3,7 @@
 #
 #    OpenERP, Open Source Management Solution
 #    Copyright (C) 2004-2009 Tiny SPRL ().
-#    Copyright (C) 2010-2011 OpenERP s.a. ().
+#    Copyright (C) 2010-2014 OpenERP s.a. ().
 #
 #    This program is free software: you can redistribute it and/or modify
 #    it under the terms of the GNU Affero General Public License as
@@ -20,28 +20,22 @@
 #
 ##############################################################################
 
-#.apidoc title: PostgreSQL interface
 
 """
 The PostgreSQL connector is a connectivity layer between the OpenERP code and
 the database, *not* a database abstraction toolkit. Database abstraction is
 what the ORM does, in fact.
-
-See also: the `pooler` module
 """
 
-#.apidoc add-functions: print_stats
-#.apidoc add-classes: Cursor Connection ConnectionPool
-
-__all__ = ['db_connect', 'close_db']
-
+from contextlib import contextmanager
 from functools import wraps
 import logging
+import urlparse
+import uuid
+import psycopg2.extras
 import psycopg2.extensions
 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
 from psycopg2.pool import PoolError
-from psycopg2.psycopg1 import cursor as psycopg1cursor
-from threading import currentThread
 
 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
 
@@ -54,11 +48,13 @@ types_mapping = {
 }
 
 def unbuffer(symb, cr):
-    if symb is None: return None
+    if symb is None:
+        return None
     return str(symb)
 
 def undecimalize(symb, cr):
-    if symb is None: return None
+    if symb is None:
+        return None
     return float(symb)
 
 for name, typeoid in types_mapping.items():
@@ -74,15 +70,15 @@ import threading
 from inspect import currentframe
 import re
 
-re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
-re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
+re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
+re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
 
 sql_counter = 0
 
 class Cursor(object):
     """Represents an open transaction to the PostgreSQL DB backend,
        acting as a lightweight wrapper around psycopg2's
-       ``psycopg1cursor`` objects.
+       ``cursor`` objects.
 
        ``Cursor`` is the object behind the ``cr`` variable used all
        over the OpenERP code.
@@ -149,12 +145,12 @@ class Cursor(object):
           *any* data which may be modified during the life of the cursor.
     """
 
-    IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
+    IN_MAX = 1000  # decent limit on size of IN queries - guideline = Oracle limit
 
     def check(f):
        @wraps(f)
        def wrapper(self, *args, **kwargs):
-            if self.__closed:
+            if self._closed:
                 msg = 'Unable to use a closed cursor.'
                 if self.__closer:
                     msg += ' It was closed at %s, line %s' % self.__closer
@@ -162,7 +158,7 @@
             return f(self, *args, **kwargs)
         return wrapper
 
-    def __init__(self, pool, dbname, serialized=True):
+    def __init__(self, pool, dbname, dsn, serialized=True):
         self.sql_from_log = {}
         self.sql_into_log = {}
@@ -171,22 +167,24 @@
         self.sql_log = _logger.isEnabledFor(logging.DEBUG)
         self.sql_log_count = 0
 
-        self.__closed = True    # avoid the call of close() (by __del__) if an exception
-                                # is raised by any of the following initialisations
-        self._pool = pool
-        self.dbname = dbname
+        # avoid the call of close() (by __del__) if an exception
+        # is raised by any of the following initialisations
+        self._closed = True
+
+        self.__pool = pool
+        self.dbname = dbname
+
         # Whether to enable snapshot isolation level for this cursor.
-        # see also the docstring of Cursor.
+        # see also the docstring of Cursor.
         self._serialized = serialized
 
-        self._cnx = pool.borrow(dsn(dbname))
-        self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
+        self._cnx = pool.borrow(dsn)
+        self._obj = self._cnx.cursor()
         if self.sql_log:
-            self.__caller = frame_codeinfo(currentframe(),2)
+            self.__caller = frame_codeinfo(currentframe(), 2)
         else:
             self.__caller = False
-        self.__closed = False   # real initialisation value
+        self._closed = False  # real initialisation value
         self.autocommit(False)
         self.__closer = False
@@ -194,8 +192,18 @@
 
         self.cache = {}
 
+    def __build_dict(self, row):
+        return {d.name: row[i] for i, d in enumerate(self._obj.description)}
+    def dictfetchone(self):
+        row = self._obj.fetchone()
+        return row and self.__build_dict(row)
+    def dictfetchmany(self, size):
+        return map(self.__build_dict, self._obj.fetchmany(size))
+    def dictfetchall(self):
+        return map(self.__build_dict, self._obj.fetchall())
+
     def __del__(self):
-        if not self.__closed and not self._cnx.closed:
+        if not self._closed and not self._cnx.closed:
             # Oops. 'self' has not been closed explicitly.
             # The cursor will be deleted by the garbage collector,
             # but the database connection is not put back into the connection
@@ -213,8 +221,7 @@
     def execute(self, query, params=None, log_exceptions=None):
         if '%d' in query or '%f' in query:
             _logger.warning(query)
-            _logger.warning("SQL queries cannot contain %d or %f anymore. "
-                            "Use only %s")
+            _logger.warning("SQL queries cannot contain %d or %f anymore. Use only %s")
         if params and not isinstance(params, (tuple, list, dict)):
             _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
             raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
@@ -226,20 +233,23 @@
             params = params or None
             res = self._obj.execute(query, params)
         except psycopg2.ProgrammingError, pe:
-            if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
+            if self._default_log_exceptions if log_exceptions is None else log_exceptions:
                 _logger.error("Programming error: %s, in query %s", pe, query)
             raise
         except Exception:
-            if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
+            if self._default_log_exceptions if log_exceptions is None else log_exceptions:
                 _logger.exception("bad query: %s", self._obj.query or query)
             raise
 
+        # simple query count is always computed
+        self.sql_log_count += 1
+
+        # advanced stats only if sql_log is enabled
         if self.sql_log:
             delay = mdt.now() - now
             delay = delay.seconds * 1E6 + delay.microseconds
 
             _logger.debug("query: %s", self._obj.query)
-            self.sql_log_count+=1
             res_from = re_from.match(query.lower())
             if res_from:
                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
@@ -252,7 +262,6 @@
                 self.sql_into_log[res_into.group(1)][1] += delay
         return res
 
-
     def split_for_in_conditions(self, ids):
         """Split a list of identifiers into one or more smaller tuples
            safe for IN conditions, after uniquifying them."""
@@ -260,26 +269,24 @@
 
     def print_log(self):
         global sql_counter
-        sql_counter += self.sql_log_count
+
         if not self.sql_log:
             return
         def process(type):
-            sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
+            sqllogs = {'from': self.sql_from_log, 'into': self.sql_into_log}
             sum = 0
             if sqllogs[type]:
                 sqllogitems = sqllogs[type].items()
                 sqllogitems.sort(key=lambda k: k[1][1])
                 _logger.debug("SQL LOG %s:", type)
-                sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
                 for r in sqllogitems:
                     delay = timedelta(microseconds=r[1][1])
-                    _logger.debug("table: %s: %s/%s",
-                                  r[0], delay, r[1][0])
-                    sum+= r[1][1]
+                    _logger.debug("table: %s: %s/%s", r[0], delay, r[1][0])
+                    sum += r[1][1]
                 sqllogs[type].clear()
             sum = timedelta(microseconds=sum)
-            _logger.debug("SUM %s:%s/%d [%d]",
-                          type, sum, self.sql_log_count, sql_counter)
+            _logger.debug("SUM %s:%s/%d [%d]", type, sum, self.sql_log_count, sql_counter)
             sqllogs[type].clear()
         process('from')
         process('into')
@@ -291,13 +298,20 @@
         return self._close(False)
 
     def _close(self, leak=False):
+        global sql_counter
+
         if not self._obj:
             return
 
         del self.cache
 
         if self.sql_log:
-            self.__closer = frame_codeinfo(currentframe(),3)
+            self.__closer = frame_codeinfo(currentframe(), 3)
+
+        # simple query count is always computed
+        sql_counter += self.sql_log_count
+
+        # advanced stats only if sql_log is enabled
         self.print_log()
 
         self._obj.close()
@@ -308,7 +322,7 @@
         # collected as fast as they should). The problem is probably due in
         # part because browse records keep a reference to the cursor.
         del self._obj
-        self.__closed = True
+        self._closed = True
 
         # Clean the underlying connection.
         self._cnx.rollback()
@@ -319,7 +333,7 @@
         chosen_template = tools.config['db_template']
         templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
         keep_in_pool = self.dbname not in templates_list
-        self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
+        self.__pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
 
     @check
     def autocommit(self, on):
@@ -336,9 +350,10 @@
             # is remapped to serializable before being
             # sent to the database, so it is in fact
             # unavailable for use with pg 9.1.
-            isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
-                    if self._serialized \
-                    else ISOLATION_LEVEL_READ_COMMITTED
+            isolation_level = \
+                ISOLATION_LEVEL_REPEATABLE_READ \
+                if self._serialized \
+                else ISOLATION_LEVEL_READ_COMMITTED
         self._cnx.set_isolation_level(isolation_level)
 
     @check
@@ -353,24 +368,86 @@
         """
         return self._cnx.rollback()
 
+    def __enter__(self):
+        """ Using the cursor as a contextmanager automatically commits and
+            closes it::
+
+                with cr:
+                    cr.execute(...)
+
+                # cr is committed if no failure occurred
+                # cr is closed in any case
+        """
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        if exc_type is None:
+            self.commit()
+        self.close()
+
+    @contextmanager
+    @check
+    def savepoint(self):
+        """context manager entering in a new savepoint"""
+        name = uuid.uuid1().hex
+        self.execute('SAVEPOINT "%s"' % name)
+        try:
+            yield
+            self.execute('RELEASE SAVEPOINT "%s"' % name)
+        except:
+            self.execute('ROLLBACK TO SAVEPOINT "%s"' % name)
+            raise
+
     @check
     def __getattr__(self, name):
         return getattr(self._obj, name)
 
-    """ Set the mode of postgres operations for all cursors
-    """
-    """Obtain the mode of postgres operations for all cursors
-    """
+
+class TestCursor(Cursor):
+    """ A cursor to be used for tests. It keeps the transaction open across
+        several requests, and simulates committing, rolling back, and closing.
+    """
+    def __init__(self, *args, **kwargs):
+        super(TestCursor, self).__init__(*args, **kwargs)
+        # in order to simulate commit and rollback, the cursor maintains a
+        # savepoint at its last commit
+        self.execute("SAVEPOINT test_cursor")
+        # we use a lock to serialize concurrent requests
+        self._lock = threading.RLock()
+
+    def acquire(self):
+        self._lock.acquire()
+
+    def release(self):
+        self._lock.release()
+
+    def force_close(self):
+        super(TestCursor, self).close()
+
+    def close(self):
+        if not self._closed:
+            self.rollback()  # for stuff that has not been committed
+        self.release()
+
+    def autocommit(self, on):
+        _logger.debug("TestCursor.autocommit(%r) does nothing", on)
+
+    def commit(self):
+        self.execute("RELEASE SAVEPOINT test_cursor")
+        self.execute("SAVEPOINT test_cursor")
+
+    def rollback(self):
+        self.execute("ROLLBACK TO SAVEPOINT test_cursor")
+        self.execute("SAVEPOINT test_cursor")
+
 
 class PsycoConnection(psycopg2.extensions.connection):
     pass
 
 class ConnectionPool(object):
     """ The pool of connections to database(s)
-    
+
     Keep a set of connections to pg databases open, and reuse them
     to open cursors for all transactions.
-    
+
     The connections are *not* automatically closed. Only a close_db()
     can trigger that.
""" @@ -385,7 +462,6 @@ class ConnectionPool(object): self._lock.release() return _locked - def __init__(self, maxconn=64): self._connections = [] self._maxconn = max(maxconn, 1) @@ -401,9 +477,7 @@ class ConnectionPool(object): @locked def borrow(self, dsn): - self._debug('Borrow connection to %r', dsn) - - # free leaked connections + # free dead and leaked connections for i, (cnx, _) in tools.reverse_enumerate(self._connections): if cnx.closed: self._connections.pop(i) @@ -416,10 +490,18 @@ class ConnectionPool(object): _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn) for i, (cnx, used) in enumerate(self._connections): - if not used and dsn_are_equals(cnx.dsn, dsn): + if not used and cnx._original_dsn == dsn: + try: + cnx.reset() + except psycopg2.OperationalError: + self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn) + # psycopg2 2.4.4 and earlier do not allow closing a closed connection + if not cnx.closed: + cnx.close() + continue self._connections.pop(i) self._connections.append((cnx, True)) - self._debug('Existing connection found at index %d', i) + self._debug('Borrow existing connection to %r at index %d', cnx.dsn, i) return cnx @@ -439,6 +521,7 @@ class ConnectionPool(object): except psycopg2.Error: _logger.exception('Connection to the database failed') raise + result._original_dsn = dsn self._connections.append((result, True)) self._debug('Create new connection') return result @@ -460,26 +543,35 @@ class ConnectionPool(object): raise PoolError('This connection does not below to the pool') @locked - def close_all(self, dsn): - _logger.info('%r: Close all connections to %r', self, dsn) + def close_all(self, dsn=None): + count = 0 + last = None for i, (cnx, used) in tools.reverse_enumerate(self._connections): - if dsn_are_equals(cnx.dsn, dsn): + if dsn is None or cnx._original_dsn == dsn: cnx.close() - self._connections.pop(i) + last = self._connections.pop(i)[0] + count += 1 + _logger.info('%r: Closed %d connections %s', self, count, + (dsn and last and 'to %r' % last.dsn) or '') class Connection(object): """ A lightweight instance of a connection to postgres """ - - def __init__(self, pool, dbname): + def __init__(self, pool, dbname, dsn): self.dbname = dbname - self._pool = pool + self.dsn = dsn + self.__pool = pool def cursor(self, serialized=True): cursor_type = serialized and 'serialized ' or '' - _logger.debug('create %scursor to %r', cursor_type, self.dbname) - return Cursor(self._pool, self.dbname, serialized=serialized) + _logger.debug('create %scursor to %r', cursor_type, self.dsn) + return Cursor(self.__pool, self.dbname, self.dsn, serialized=serialized) + + def test_cursor(self, serialized=True): + cursor_type = serialized and 'serialized ' or '' + _logger.debug('create test %scursor to %r', cursor_type, self.dsn) + return TestCursor(self.__pool, self.dbname, self.dsn, serialized=serialized) # serialized_cursor is deprecated - cursors are serialized by default serialized_cursor = cursor @@ -494,41 +586,48 @@ class Connection(object): except Exception: return False -def dsn(db_name): +def dsn(db_or_uri): + """parse the given `db_or_uri` and return a 2-tuple (dbname, uri)""" + if db_or_uri.startswith(('postgresql://', 'postgres://')): + # extract db from uri + us = urlparse.urlsplit(db_or_uri) + if len(us.path) > 1: + db_name = us.path[1:] + elif us.username: + db_name = us.username + else: + db_name = us.hostname + return db_name, db_or_uri + _dsn = '' for p in ('host', 'port', 'user', 'password'): cfg = tools.config['db_' + p] if cfg: 
             _dsn += '%s=%s ' % (p, cfg)
 
-    return '%sdbname=%s' % (_dsn, db_name)
-
-def dsn_are_equals(first, second):
-    def key(dsn):
-        k = dict(x.split('=', 1) for x in dsn.strip().split())
-        k.pop('password', None) # password is not relevant
-        return k
-    return key(first) == key(second)
-
+    return db_or_uri, '%sdbname=%s' % (_dsn, db_or_uri)
 
 _Pool = None
 
-def db_connect(db_name):
+def db_connect(to, allow_uri=False):
     global _Pool
     if _Pool is None:
         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
-    currentThread().dbname = db_name
-    return Connection(_Pool, db_name)
+
+    db, uri = dsn(to)
+    if not allow_uri and db != to:
+        raise ValueError('URI connections not allowed')
+    return Connection(_Pool, db, uri)
 
 def close_db(db_name):
-    global _Pool
     """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
+    global _Pool
    if _Pool:
-        _Pool.close_all(dsn(db_name))
-    ct = currentThread()
-    if hasattr(ct, 'dbname'):
-        delattr(ct, 'dbname')
+        _Pool.close_all(dsn(db_name)[1])
+
+def close_all():
+    global _Pool
+    if _Pool:
+        _Pool.close_all()
 
 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
-
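
Usage note: taken together, the hunks above replace the old dsn-string plumbing with a (db_name, uri) pair threaded through Connection and Cursor, and make cursors usable as context managers. A minimal sketch of the resulting API follows (Python 2, like the module itself; the database name 'mydb' is a placeholder and a properly initialised openerp.tools.config with valid db_* settings is assumed, neither being part of the diff):

    import openerp.sql_db

    # db_connect() lazily creates the shared ConnectionPool, resolves the
    # database name (or, with allow_uri=True, a postgresql:// URI) through
    # dsn(), and returns a lightweight Connection.
    connection = openerp.sql_db.db_connect('mydb')

    # Cursor.__enter__/__exit__ commit on success and close in every case.
    with connection.cursor() as cr:
        cr.execute('SELECT 1')
        print cr.fetchone()

        # Cursor.savepoint() wraps a block in a uuid-named SAVEPOINT that is
        # released on success and rolled back on error, without aborting the
        # enclosing transaction.
        with cr.savepoint():
            cr.execute('SELECT 1')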