#
# OpenERP, Open Source Management Solution
# Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
-# Copyright (C) 2010-2011 OpenERP s.a. (<http://openerp.com>).
+# Copyright (C) 2010-2014 OpenERP s.a. (<http://openerp.com>).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
#
##############################################################################
-#.apidoc title: PostgreSQL interface
"""
The PostgreSQL connector is a connectivity layer between the OpenERP code and
the database, *not* a database abstraction toolkit. Database abstraction is what
the ORM does, in fact.
-
-See also: the `pooler` module
"""
-#.apidoc add-functions: print_stats
-#.apidoc add-classes: Cursor Connection ConnectionPool
-
-__all__ = ['db_connect', 'close_db']
-
import logging
import threading
import time
import uuid
from contextlib import contextmanager
from functools import wraps

import psycopg2.extensions
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
from psycopg2.pool import PoolError
from psycopg2.psycopg1 import cursor as psycopg1cursor
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
+_logger = logging.getLogger(__name__)
+
types_mapping = {
'date': (1082,),
'time': (1083,),
from inspect import currentframe
import re
-re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
-re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
+re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
+re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
sql_counter = 0
sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
the performance hit is a concern for you.
+ .. attribute:: cache
+
+        Cache dictionary with a "request"-ish lifecycle: it only lives as
+        long as the cursor itself does, and is proactively cleared when the
+        cursor is closed.
+
+ This cache should *only* be used to store repeatable reads as it
+ ignores rollbacks and savepoints, it should not be used to store
+ *any* data which may be modified during the life of the cursor.
+
"""
IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
- __logger = None
def check(f):
@wraps(f)
def wrapper(self, *args, **kwargs):
- if self.__closed:
+ if self._closed:
msg = 'Unable to use a closed cursor.'
if self.__closer:
msg += ' It was closed at %s, line %s' % self.__closer
return wrapper
def __init__(self, pool, dbname, serialized=True):
- if self.__class__.__logger is None:
- self.__class__.__logger = logging.getLogger('db.cursor')
self.sql_from_log = {}
self.sql_into_log = {}
# default log level determined at cursor creation, could be
# overridden later for debugging purposes
- self.sql_log = self.__logger.isEnabledFor(logging.DEBUG_SQL)
+ self.sql_log = _logger.isEnabledFor(logging.DEBUG)
self.sql_log_count = 0
- self.__closed = True # avoid the call of close() (by __del__) if an exception
+ self._closed = True # avoid the call of close() (by __del__) if an exception
# is raised by any of the following initialisations
- self._pool = pool
+ self.__pool = pool
self.dbname = dbname
# Whether to enable snapshot isolation level for this cursor.
self.__caller = frame_codeinfo(currentframe(),2)
else:
self.__caller = False
- self.__closed = False # real initialisation value
+ self._closed = False # real initialisation value
self.autocommit(False)
self.__closer = False
self._default_log_exceptions = True
+ self.cache = {}
+
def __del__(self):
- if not self.__closed and not self._cnx.closed:
+ if not self._closed and not self._cnx.closed:
# Oops. 'self' has not been closed explicitly.
# The cursor will be deleted by the garbage collector,
# but the database connection is not put back into the connection
msg += "Cursor was created at %s:%s" % self.__caller
else:
msg += "Please enable sql debugging to trace the caller."
- self.__logger.warn(msg)
+ _logger.warning(msg)
self._close(True)
@check
def execute(self, query, params=None, log_exceptions=None):
if '%d' in query or '%f' in query:
- self.__logger.warn(query)
- self.__logger.warn("SQL queries cannot contain %d or %f anymore. "
- "Use only %s")
+ _logger.warning(query)
+ _logger.warning("SQL queries cannot contain %d or %f anymore. "
+ "Use only %s")
+ if params and not isinstance(params, (tuple, list, dict)):
+ _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
+ raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
if self.sql_log:
now = mdt.now()
params = params or None
res = self._obj.execute(query, params)
except psycopg2.ProgrammingError, pe:
- if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
- self.__logger.error("Programming error: %s, in query %s", pe, query)
+ if self._default_log_exceptions if log_exceptions is None else log_exceptions:
+ _logger.error("Programming error: %s, in query %s", pe, query)
raise
except Exception:
- if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
- self.__logger.exception("bad query: %s", self._obj.query or query)
+ if self._default_log_exceptions if log_exceptions is None else log_exceptions:
+ _logger.exception("bad query: %s", self._obj.query or query)
raise
if self.sql_log:
delay = mdt.now() - now
delay = delay.seconds * 1E6 + delay.microseconds
- self.__logger.log(logging.DEBUG_SQL, "query: %s", self._obj.query)
+ _logger.debug("query: %s", self._obj.query)
self.sql_log_count+=1
res_from = re_from.match(query.lower())
if res_from:
if sqllogs[type]:
sqllogitems = sqllogs[type].items()
sqllogitems.sort(key=lambda k: k[1][1])
- self.__logger.log(logging.DEBUG_SQL, "SQL LOG %s:", type)
+ _logger.debug("SQL LOG %s:", type)
sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
for r in sqllogitems:
delay = timedelta(microseconds=r[1][1])
- self.__logger.log(logging.DEBUG_SQL, "table: %s: %s/%s",
+ _logger.debug("table: %s: %s/%s",
r[0], delay, r[1][0])
sum+= r[1][1]
sqllogs[type].clear()
sum = timedelta(microseconds=sum)
- self.__logger.log(logging.DEBUG_SQL, "SUM %s:%s/%d [%d]",
+ _logger.debug("SUM %s:%s/%d [%d]",
type, sum, self.sql_log_count, sql_counter)
sqllogs[type].clear()
process('from')
if not self._obj:
return
+ del self.cache
+
if self.sql_log:
self.__closer = frame_codeinfo(currentframe(),3)
self.print_log()
# collected as fast as they should). The problem is probably due in
# part because browse records keep a reference to the cursor.
del self._obj
- self.__closed = True
+ self._closed = True
# Clean the underlying connection.
self._cnx.rollback()
if leak:
self._cnx.leaked = True
else:
- keep_in_pool = self.dbname not in ('template1', 'template0', 'postgres')
- self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
+ chosen_template = tools.config['db_template']
+ templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
+ keep_in_pool = self.dbname not in templates_list
+ self.__pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
@check
def autocommit(self, on):
"""
return self._cnx.rollback()
+ def __enter__(self):
+ """ Using the cursor as a contextmanager automatically commits and
+ closes it::
+
+ with cr:
+ cr.execute(...)
+
+ # cr is committed if no failure occurred
+ # cr is closed in any case
+ """
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ if exc_type is None:
+ self.commit()
+ self.close()
+
+ @contextmanager
+ @check
+ def savepoint(self):
+ """context manager entering in a new savepoint"""
+ name = uuid.uuid1().hex
+ self.execute('SAVEPOINT "%s"' % name)
+ try:
+ yield
+ self.execute('RELEASE SAVEPOINT "%s"' % name)
+ except:
+ self.execute('ROLLBACK TO SAVEPOINT "%s"' % name)
+ raise
+
@check
def __getattr__(self, name):
return getattr(self._obj, name)
- """ Set the mode of postgres operations for all cursors
- """
- """Obtain the mode of postgres operations for all cursors
- """
class TestCursor(Cursor):
    """ A cursor to be used for tests. It keeps the transaction open across
        several requests, and simulates committing, rolling back, and closing.
    """
    def __init__(self, *args, **kwargs):
        super(TestCursor, self).__init__(*args, **kwargs)
        # The real transaction stays open for the cursor's whole life;
        # commit/rollback are emulated with a savepoint kept at the position
        # of the last simulated commit.
        self.execute("SAVEPOINT test_cursor")
        # Serializes concurrent requests sharing this cursor.
        # NOTE(review): relies on a module-level `threading` import that is
        # not visible in this chunk -- confirm it exists at the top of file.
        self._lock = threading.RLock()

    def acquire(self):
        self._lock.acquire()

    def release(self):
        self._lock.release()

    def force_close(self):
        # Really close the cursor; close() below only simulates closing.
        super(TestCursor, self).close()

    def close(self):
        if self._closed:
            return
        self.rollback()  # for stuff that has not been committed
        self.release()

    def autocommit(self, on):
        # Autocommit would break the simulated-transaction scheme.
        _logger.debug("TestCursor.autocommit(%r) does nothing", on)

    def commit(self):
        # Simulated commit: persist work done so far, open a new savepoint.
        self.execute("RELEASE SAVEPOINT test_cursor")
        self.execute("SAVEPOINT test_cursor")

    def rollback(self):
        # Simulated rollback: discard work since the last simulated commit.
        self.execute("ROLLBACK TO SAVEPOINT test_cursor")
        self.execute("SAVEPOINT test_cursor")
class PsycoConnection(psycopg2.extensions.connection):
pass
The connections are *not* automatically closed. Only a close_db()
can trigger that.
"""
- __logger = logging.getLogger('db.connection_pool')
def locked(fun):
@wraps(fun)
return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
def _debug(self, msg, *args):
- self.__logger.log(logging.DEBUG_SQL, ('%r ' + msg), self, *args)
+ _logger.debug(('%r ' + msg), self, *args)
@locked
def borrow(self, dsn):
self._debug('Borrow connection to %r', dsn)
- # free leaked connections
+ # free dead and leaked connections
for i, (cnx, _) in tools.reverse_enumerate(self._connections):
if cnx.closed:
self._connections.pop(i)
delattr(cnx, 'leaked')
self._connections.pop(i)
self._connections.append((cnx, False))
- self.__logger.warn('%r: Free leaked connection to %r', self, cnx.dsn)
+ _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)
for i, (cnx, used) in enumerate(self._connections):
if not used and dsn_are_equals(cnx.dsn, dsn):
+ try:
+ cnx.reset()
+ except psycopg2.OperationalError:
+ self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn)
+ # psycopg2 2.4.4 and earlier do not allow closing a closed connection
+ if not cnx.closed:
+ cnx.close()
+ continue
self._connections.pop(i)
self._connections.append((cnx, True))
self._debug('Existing connection found at index %d', i)
try:
result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
except psycopg2.Error:
- self.__logger.exception('Connection to the database failed')
+ _logger.exception('Connection to the database failed')
raise
self._connections.append((result, True))
self._debug('Create new connection')
raise PoolError('This connection does not below to the pool')
@locked
- def close_all(self, dsn):
- self.__logger.info('%r: Close all connections to %r', self, dsn)
+ def close_all(self, dsn=None):
+ _logger.info('%r: Close all connections to %r', self, dsn)
for i, (cnx, used) in tools.reverse_enumerate(self._connections):
- if dsn_are_equals(cnx.dsn, dsn):
+ if dsn is None or dsn_are_equals(cnx.dsn, dsn):
cnx.close()
self._connections.pop(i)
class Connection(object):
""" A lightweight instance of a connection to postgres
"""
- __logger = logging.getLogger('db.connection')
def __init__(self, pool, dbname):
self.dbname = dbname
- self._pool = pool
+ self.__pool = pool
def cursor(self, serialized=True):
cursor_type = serialized and 'serialized ' or ''
- self.__logger.log(logging.DEBUG_SQL, 'create %scursor to %r', cursor_type, self.dbname)
- return Cursor(self._pool, self.dbname, serialized=serialized)
+ _logger.debug('create %scursor to %r', cursor_type, self.dbname)
+ return Cursor(self.__pool, self.dbname, serialized=serialized)
+
+ def test_cursor(self, serialized=True):
+ cursor_type = serialized and 'serialized ' or ''
+ _logger.debug('create test %scursor to %r', cursor_type, self.dbname)
+ return TestCursor(self.__pool, self.dbname, serialized=serialized)
# serialized_cursor is deprecated - cursors are serialized by default
serialized_cursor = cursor
def __nonzero__(self):
"""Check if connection is possible"""
try:
- warnings.warn("You use an expensive function to test a connection.",
- DeprecationWarning, stacklevel=1)
+ _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
cr = self.cursor()
cr.close()
return True
global _Pool
if _Pool is None:
_Pool = ConnectionPool(int(tools.config['db_maxconn']))
- currentThread().dbname = db_name
return Connection(_Pool, db_name)
def close_db(db_name):
    """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
    global _Pool
    # No pool has ever been created: nothing to close.
    if not _Pool:
        return
    _Pool.close_all(dsn(db_name))
+
def close_all():
    """Close every pooled connection, regardless of database."""
    global _Pool
    # No pool has ever been created: nothing to close.
    if not _Pool:
        return
    _Pool.close_all()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: