# -*- coding: utf-8 -*-
##############################################################################
#
#    OpenERP, Open Source Management Solution
#    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
#    Copyright (C) 2010-2011 OpenERP s.a. (<http://openerp.com>).
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
The PostgreSQL connector is a connectivity layer between the OpenERP code and
the database, *not* a database abstraction toolkit. Database abstraction is what
the ORM does, in fact.

See also: the `pooler` module
__all__ = ['db_connect', 'close_db']

from functools import wraps
import logging

import psycopg2.extensions
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
from psycopg2.pool import PoolError
from psycopg2.psycopg1 import cursor as psycopg1cursor
from threading import currentThread

psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)

_logger = logging.getLogger(__name__)
def unbuffer(symb, cr):
    if symb is None: return None

def undecimalize(symb, cr):
    if symb is None: return None

for name, typeoid in types_mapping.items():
    psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
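
# Note on the registration above: PostgreSQL float4, float8 and numeric columns
# (type OIDs 700, 701 and 1700) are passed through undecimalize, so query
# results are expected to come back as Python floats rather than Decimal
# instances. A minimal illustration, assuming a hypothetical table with a
# numeric "amount" column:
#
#     cr.execute("SELECT amount FROM some_table LIMIT 1")
#     value, = cr.fetchone()
#     # value is a float such as 150.0, not decimal.Decimal('150.0')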
import tools
from tools.func import frame_codeinfo
from datetime import datetime as mdt
from datetime import timedelta
import threading
from inspect import currentframe

import re
re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
    """Represents an open transaction to the PostgreSQL DB backend,
       acting as a lightweight wrapper around psycopg2's
       ``psycopg1cursor`` objects.

        ``Cursor`` is the object behind the ``cr`` variable used all
        over the OpenERP code.

        .. rubric:: Transaction Isolation

        One very important property of database transactions is the
        level of isolation between concurrent transactions.
        The SQL standard defines four levels of transaction isolation,
        ranging from the most strict *Serializable* level to the least
        strict *Read Uncommitted* level. These levels are defined in
        terms of the phenomena that must not occur between concurrent
        transactions, such as *dirty read*, etc.
        In the context of generic business data management software
        such as OpenERP, we need the best guarantees that no data
        corruption can ever be caused by simply running multiple
        transactions in parallel. Therefore, the preferred level would
        be the *serializable* level, which ensures that a set of
        transactions is guaranteed to produce the same effect as
        running them one at a time in some order.

        However, most database management systems implement a limited
        serializable isolation in the form of
        `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
        providing most of the same advantages as true serializability,
        with a fraction of the performance cost.
        With PostgreSQL up to version 9.0, this snapshot isolation was
        the implementation of both the ``REPEATABLE READ`` and
        ``SERIALIZABLE`` levels of the SQL standard.
        As of PostgreSQL 9.1, the previous snapshot isolation implementation
        was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
        level was introduced, providing some additional heuristics to
        detect a concurrent update by parallel transactions, and forcing
        one of them to roll back.

        OpenERP implements its own level of locking protection
        for transactions that are highly likely to provoke concurrent
        updates, such as stock reservations or document sequence updates.
        Therefore we mostly care about the properties of snapshot isolation,
        but we don't really need additional heuristics to trigger transaction
        rollbacks, as we are taking care of triggering instant rollbacks
        ourselves when it matters (and we can save the additional performance
        hit of these heuristics).

        As a result of the above, we have selected ``REPEATABLE READ`` as
        the default transaction isolation level for OpenERP cursors, as
        it will be mapped to the desired ``snapshot isolation`` level for
        all supported PostgreSQL versions (8.3 - 9.x).

        Note: up to psycopg2 v2.4.2, psycopg2 itself remapped the repeatable
        read level to serializable before sending it to the database, so it would
        actually select the new serializable mode on PostgreSQL 9.1. Make
        sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
        the performance hit is a concern for you.
        Cache dictionary with a "request" (-ish) lifecycle: it only lives as
        long as the cursor itself does and is proactively cleared when the
        cursor is closed.

        This cache should *only* be used to store repeatable reads as it
        ignores rollbacks and savepoints; it should not be used to store
        *any* data which may be modified during the life of the cursor.
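
        For illustration, a typical cursor lifecycle looks like the following;
        the database name, table and login used here are only placeholders::

            cr = db_connect('somedb').cursor()                   # REPEATABLE READ
            cr2 = db_connect('somedb').cursor(serialized=False)  # READ COMMITTED
            try:
                cr.execute("SELECT id FROM res_users WHERE login = %s", ('admin',))
                row = cr.fetchone()
                cr.commit()
            finally:
                cr.close()
                cr2.close()
    """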
    IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit

    def check(f):
        @wraps(f)
        def wrapper(self, *args, **kwargs):
            if self.__closed:
                msg = 'Unable to use a closed cursor.'
                if self.__closer:
                    msg += ' It was closed at %s, line %s' % self.__closer
                raise psycopg2.OperationalError(msg)
            return f(self, *args, **kwargs)
        return wrapper
    def __init__(self, pool, dbname, serialized=True):
        self.sql_from_log = {}
        self.sql_into_log = {}

        # default log level determined at cursor creation, could be
        # overridden later for debugging purposes
        self.sql_log = _logger.isEnabledFor(logging.DEBUG)

        self.sql_log_count = 0
        self.__closed = True    # avoid the call of close() (by __del__) if an exception
                                # is raised by any of the following initialisations
        self._pool = pool
        self.dbname = dbname

        # Whether to enable snapshot isolation level for this cursor.
        # see also the docstring of Cursor.
        self._serialized = serialized

        self._cnx = pool.borrow(dsn(dbname))
        self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
        if self.sql_log:
            self.__caller = frame_codeinfo(currentframe(),2)
        else:
            self.__caller = False
        self.__closed = False  # real initialisation value
        self.autocommit(False)
        self.__closer = False

        self._default_log_exceptions = True
    def __del__(self):
        if not self.__closed and not self._cnx.closed:
            # Oops. 'self' has not been closed explicitly.
            # The cursor will be deleted by the garbage collector,
            # but the database connection is not put back into the connection
            # pool, preventing some operations on the database, like dropping it.
            # This can also lead to a server overload.
            msg = "Cursor not closed explicitly\n"
            if self.__caller:
                msg += "Cursor was created at %s:%s" % self.__caller
            else:
                msg += "Please enable sql debugging to trace the caller."
            _logger.warning(msg)
            self._close(True)
    def execute(self, query, params=None, log_exceptions=None):
        if '%d' in query or '%f' in query:
            _logger.warning(query)
            _logger.warning("SQL queries cannot contain %d or %f anymore. "
        if params and not isinstance(params, (tuple, list, dict)):
            _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
            raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
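        # A minimal illustration of the parameter passing that the checks above
        # enforce: values go in a separate tuple/list/dict and the query uses %s
        # placeholders, so psycopg2 handles the quoting. Table and column names
        # here are placeholders.
        #
        #     cr.execute("SELECT name FROM res_partner WHERE id = %s", (partner_id,))
        #     cr.execute("UPDATE res_partner SET name = %(name)s WHERE id = %(id)s",
        #                {'name': 'Azure Interior', 'id': partner_id})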
        if self.sql_log:
            now = mdt.now()

        try:
            params = params or None
            res = self._obj.execute(query, params)
        except psycopg2.ProgrammingError, pe:
            if self._default_log_exceptions if log_exceptions is None else log_exceptions:
                _logger.error("Programming error: %s, in query %s", pe, query)
            raise
        except Exception:
            if self._default_log_exceptions if log_exceptions is None else log_exceptions:
                _logger.exception("bad query: %s", self._obj.query or query)
            raise

        if self.sql_log:
            delay = mdt.now() - now
            delay = delay.seconds * 1E6 + delay.microseconds

            _logger.debug("query: %s", self._obj.query)
            self.sql_log_count+=1
            res_from = re_from.match(query.lower())
            if res_from:
                self.sql_from_log.setdefault(res_from.group(1), [0, 0])
                self.sql_from_log[res_from.group(1)][0] += 1
                self.sql_from_log[res_from.group(1)][1] += delay
            res_into = re_into.match(query.lower())
            if res_into:
                self.sql_into_log.setdefault(res_into.group(1), [0, 0])
                self.sql_into_log[res_into.group(1)][0] += 1
                self.sql_into_log[res_into.group(1)][1] += delay
        return res
    def split_for_in_conditions(self, ids):
        """Split a list of identifiers into one or more smaller tuples
           safe for IN conditions, after uniquifying them."""
        return tools.misc.split_every(self.IN_MAX, set(ids))
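
    # Sketch of the intended use of split_for_in_conditions (illustrative; the
    # table name is a placeholder): chunk a large id collection so that each
    # query stays within IN_MAX elements.
    #
    #     names = {}
    #     for sub_ids in cr.split_for_in_conditions(ids):
    #         cr.execute("SELECT id, name FROM res_partner WHERE id IN %s", (sub_ids,))
    #         names.update(dict(cr.fetchall()))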
        sql_counter += self.sql_log_count

            sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
                sqllogitems = sqllogs[type].items()
                sqllogitems.sort(key=lambda k: k[1][1])
                _logger.debug("SQL LOG %s:", type)
                sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
                for r in sqllogitems:
                    delay = timedelta(microseconds=r[1][1])
                    _logger.debug("table: %s: %s/%s",
                                  r[0], delay, r[1][0])
                sqllogs[type].clear()
            sum = timedelta(microseconds=sum)
            _logger.debug("SUM %s:%s/%d [%d]",
                          type, sum, self.sql_log_count, sql_counter)
            sqllogs[type].clear()
        self.sql_log_count = 0
    def close(self):
        return self._close(False)

    def _close(self, leak=False):
        if self.sql_log:
            self.__closer = frame_codeinfo(currentframe(),3)

        # This forces the cursor to be freed, and thus, available again. It is
        # important because otherwise we can overload the server very easily
        # because of a cursor shortage (because cursors are not garbage
        # collected as fast as they should). The problem is probably due in
        # part to browse records keeping a reference to the cursor.
        del self._obj

        # Clean the underlying connection.
        self._cnx.rollback()

        if leak:
            self._cnx.leaked = True
        else:
            chosen_template = tools.config['db_template']
            templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
            keep_in_pool = self.dbname not in templates_list
            self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
    def autocommit(self, on):
        if on:
            isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
        else:
            # If a serializable cursor was requested, we
            # use the appropriate PostgreSQL isolation level
            # that maps to snapshot isolation.
            # For all supported PostgreSQL versions (8.3-9.x),
            # this is currently ISOLATION_LEVEL_REPEATABLE_READ.
            # See also the docstring of this class.
            # NOTE: up to psycopg 2.4.2, repeatable read
            #       is remapped to serializable before being
            #       sent to the database, so it is in fact
            #       unavailable for use with pg 9.1.
            isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
                if self._serialized \
                else ISOLATION_LEVEL_READ_COMMITTED
        self._cnx.set_isolation_level(isolation_level)
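
    # Illustrative sketch (not a pattern mandated by this class): autocommit(True)
    # switches the connection to ISOLATION_LEVEL_AUTOCOMMIT, which is required for
    # statements PostgreSQL refuses to run inside a transaction block, such as
    # CREATE DATABASE. The database names below are placeholders.
    #
    #     cr = db_connect('postgres').cursor()
    #     try:
    #         cr.autocommit(True)
    #         cr.execute('CREATE DATABASE "new_db"')
    #     finally:
    #         cr.close()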
    def commit(self):
        """ Perform an SQL `COMMIT`
        """
        return self._cnx.commit()

    def rollback(self):
        """ Perform an SQL `ROLLBACK`
        """
        return self._cnx.rollback()

    def __getattr__(self, name):
        return getattr(self._obj, name)
class PsycoConnection(psycopg2.extensions.connection):
    pass

class ConnectionPool(object):
    """ The pool of connections to database(s)

        Keep a set of connections to pg databases open, and reuse them
        to open cursors for all transactions.

        The connections are *not* automatically closed. Only a close_db()
        can trigger that.
    """
        def _locked(self, *args, **kwargs):
                return fun(self, *args, **kwargs)

    def __init__(self, maxconn=64):
        self._connections = []
        self._maxconn = max(maxconn, 1)
        self._lock = threading.Lock()

        used = len([1 for c, u in self._connections[:] if u])
        count = len(self._connections)
        return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)

    def _debug(self, msg, *args):
        _logger.debug(('%r ' + msg), self, *args)
    def borrow(self, dsn):
        self._debug('Borrow connection to %r', dsn)

        # free leaked connections
        for i, (cnx, _) in tools.reverse_enumerate(self._connections):
                self._connections.pop(i)
                self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
            if getattr(cnx, 'leaked', False):
                delattr(cnx, 'leaked')
                self._connections.pop(i)
                self._connections.append((cnx, False))
                _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)

        for i, (cnx, used) in enumerate(self._connections):
            if not used and dsn_are_equals(cnx.dsn, dsn):
                self._connections.pop(i)
                self._connections.append((cnx, True))
                self._debug('Existing connection found at index %d', i)

        if len(self._connections) >= self._maxconn:
            # try to remove the oldest connection not used
            for i, (cnx, used) in enumerate(self._connections):
                    self._connections.pop(i)
                    self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
                # note: this code is called only if the for loop has completed (no break)
                raise PoolError('The Connection Pool Is Full')

        try:
            result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
        except psycopg2.Error:
            _logger.exception('Connection to the database failed')
        self._connections.append((result, True))
        self._debug('Create new connection')
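
    # Rough usage sketch (illustrative; 'somedb' is a placeholder): the pool is
    # normally driven by Cursor, which borrows a connection in __init__ and gives
    # it back in _close(). When all _maxconn connections are in use, borrow()
    # raises PoolError, which callers may catch and retry.
    #
    #     pool = ConnectionPool(maxconn=2)
    #     cnx = pool.borrow(dsn('somedb'))
    #     try:
    #         cnx.cursor().execute("SELECT 1")
    #     finally:
    #         pool.give_back(cnx)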
    def give_back(self, connection, keep_in_pool=True):
        self._debug('Give back connection to %r', connection.dsn)
        for i, (cnx, used) in enumerate(self._connections):
            if cnx is connection:
                self._connections.pop(i)
                if keep_in_pool:
                    self._connections.append((cnx, False))
                    self._debug('Put connection to %r in pool', cnx.dsn)
                else:
                    self._debug('Forgot connection to %r', cnx.dsn)
                    cnx.close()
                break
        else:
            raise PoolError('This connection does not belong to the pool')
    def close_all(self, dsn):
        _logger.info('%r: Close all connections to %r', self, dsn)
        for i, (cnx, used) in tools.reverse_enumerate(self._connections):
            if dsn_are_equals(cnx.dsn, dsn):
                self._connections.pop(i)
class Connection(object):
    """ A lightweight instance of a connection to postgres
    """

    def __init__(self, pool, dbname):
        self.dbname = dbname
        self._pool = pool

    def cursor(self, serialized=True):
        cursor_type = serialized and 'serialized ' or ''
        _logger.debug('create %scursor to %r', cursor_type, self.dbname)
        return Cursor(self._pool, self.dbname, serialized=serialized)

    # serialized_cursor is deprecated - cursors are serialized by default
    serialized_cursor = cursor

    def __nonzero__(self):
        """Check if connection is possible"""
        _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
    for p in ('host', 'port', 'user', 'password'):
        cfg = tools.config['db_' + p]
            _dsn += '%s=%s ' % (p, cfg)

    return '%sdbname=%s' % (_dsn, db_name)
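
# For illustration (assuming example configuration values rather than actual
# defaults): with db_host=localhost, db_port=5432, db_user=odoo and db_password
# left unset, dsn('prod') would return a string along the lines of
#
#     'host=localhost port=5432 user=odoo dbname=prod'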
def dsn_are_equals(first, second):
    def key(dsn):
        k = dict(x.split('=', 1) for x in dsn.strip().split())
        k.pop('password', None) # password is not relevant
    return key(first) == key(second)
_Pool = None

def db_connect(db_name):
    global _Pool
    if _Pool is None:
        _Pool = ConnectionPool(int(tools.config['db_maxconn']))
    currentThread().dbname = db_name
    return Connection(_Pool, db_name)
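
# Typical usage sketch (illustrative; the database name and query are placeholders):
#
#     db = db_connect('somedb')
#     cr = db.cursor()
#     try:
#         cr.execute("SELECT id, login FROM res_users")
#         users = cr.fetchall()
#         cr.commit()
#     except Exception:
#         cr.rollback()
#         raise
#     finally:
#         cr.close()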
def close_db(db_name):
    """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along with this function."""
    if _Pool:
        _Pool.close_all(dsn(db_name))
    ct = currentThread()
    if hasattr(ct, 'dbname'):
        delattr(ct, 'dbname')
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: