# -*- coding: utf-8 -*-
##############################################################################
#
#    OpenERP, Open Source Management Solution
#    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
#    Copyright (C) 2010-2013 OpenERP s.a. (<http://openerp.com>).
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
"""
The PostgreSQL connector is a connectivity layer between the OpenERP code and
the database, *not* a database abstraction toolkit. Database abstraction is
what the ORM does, in fact.
"""
from functools import wraps
import logging
import psycopg2.extensions
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
from psycopg2.pool import PoolError
from psycopg2.psycopg1 import cursor as psycopg1cursor

psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)

_logger = logging.getLogger(__name__)
# PostgreSQL type OIDs for date, time and timestamp, registered below so that
# their values are returned as plain strings instead of Python date objects.
types_mapping = {'date': (1082,), 'time': (1083,), 'datetime': (1114,)}

def unbuffer(symb, cr):
    if symb is None: return None
    return str(symb)

def undecimalize(symb, cr):
    if symb is None: return None
    return float(symb)

for name, typeoid in types_mapping.items():
    psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
import tools
from tools.func import frame_codeinfo
from datetime import datetime as mdt
from datetime import timedelta
import threading
from inspect import currentframe

import re
re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')

sql_counter = 0
class Cursor(object):
    """Represents an open transaction to the PostgreSQL DB backend,
       acting as a lightweight wrapper around psycopg2's
       ``psycopg1cursor`` objects.

       ``Cursor`` is the object behind the ``cr`` variable used all
       over the OpenERP code.
       .. rubric:: Transaction Isolation

       One very important property of database transactions is the
       level of isolation between concurrent transactions.
       The SQL standard defines four levels of transaction isolation,
       ranging from the most strict *Serializable* level, to the least
       strict *Read Uncommitted* level. These levels are defined in
       terms of the phenomena that must not occur between concurrent
       transactions, such as *dirty read*, etc.
       In the context of a generic business data management software
       such as OpenERP, we need the best guarantees that no data
       corruption can ever be caused by simply running multiple
       transactions in parallel. Therefore, the preferred level would
       be the *serializable* level, which ensures that a set of
       transactions is guaranteed to produce the same effect as
       running them one at a time in some order.

       However, most database management systems implement a limited
       serializable isolation in the form of
       `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
       providing most of the same advantages as True Serializability,
       with a fraction of the performance cost.
       With PostgreSQL up to version 9.0, this snapshot isolation was
       the implementation of both the ``REPEATABLE READ`` and
       ``SERIALIZABLE`` levels of the SQL standard.
       As of PostgreSQL 9.1, the previous snapshot isolation implementation
       was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
       level was introduced, providing some additional heuristics to
       detect a concurrent update by parallel transactions, and forcing
       one of them to rollback.

       OpenERP implements its own level of locking protection
       for transactions that are highly likely to provoke concurrent
       updates, such as stock reservations or document sequences updates.
       Therefore we mostly care about the properties of snapshot isolation,
       but we don't really need additional heuristics to trigger transaction
       rollbacks, as we are taking care of triggering instant rollbacks
       ourselves when it matters (and we can save the additional performance
       hit of these heuristics).

       As a result of the above, we have selected ``REPEATABLE READ`` as
       the default transaction isolation level for OpenERP cursors, as
       it will be mapped to the desired ``snapshot isolation`` level for
       all supported PostgreSQL versions (8.3 - 9.x).

       Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
       read level to serializable before sending it to the database, so it would
       actually select the new serializable mode on PostgreSQL 9.1. Make
       sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
       the performance hit is a concern for you.
       .. attribute:: cache

           Cache dictionary with a "request" (-ish) lifecycle, only lives as
           long as the cursor itself does and is proactively cleared when the
           cursor is closed.

           This cache should *only* be used to store repeatable reads as it
           ignores rollbacks and savepoints; it should not be used to store
           *any* data which may be modified during the life of the cursor.
    """
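    # Minimal usage sketch (illustrative, not part of the original code):
    # obtain a cursor through db_connect() below, run a parameterized query,
    # then commit and close. The database name 'somedb' and the res_users
    # table are placeholder assumptions.
    #
    #   cr = db_connect('somedb').cursor()
    #   try:
    #       cr.execute('SELECT id FROM res_users WHERE login = %s', ('admin',))
    #       uid = cr.fetchone()
    #       cr.commit()
    #   finally:
    #       cr.close()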
    IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
    def check(f):
        @wraps(f)
        def wrapper(self, *args, **kwargs):
            if self.__closed:
                msg = 'Unable to use a closed cursor.'
                if self.__closer:
                    msg += ' It was closed at %s, line %s' % self.__closer
                raise psycopg2.OperationalError(msg)
            return f(self, *args, **kwargs)
        return wrapper
    def __init__(self, pool, dbname, serialized=True):
        self.sql_from_log = {}
        self.sql_into_log = {}

        # default log level determined at cursor creation, could be
        # overridden later for debugging purposes
        self.sql_log = _logger.isEnabledFor(logging.DEBUG)

        self.sql_log_count = 0
        self.__closed = True    # avoid the call of close() (by __del__) if an exception
                                # is raised by any of the following initialisations
        self._pool = pool
        self.dbname = dbname

        # Whether to enable snapshot isolation level for this cursor.
        # see also the docstring of Cursor.
        self._serialized = serialized

        self._cnx = pool.borrow(dsn(dbname))
        self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
        if self.sql_log:
            self.__caller = frame_codeinfo(currentframe(), 2)
        else:
            self.__caller = False
        self.__closed = False   # real initialisation value
        self.autocommit(False)
        self.__closer = False

        self._default_log_exceptions = True
    def __del__(self):
        if not self.__closed and not self._cnx.closed:
            # Oops. 'self' has not been closed explicitly.
            # The cursor will be deleted by the garbage collector,
            # but the database connection is not put back into the connection
            # pool, preventing some operation on the database like dropping it.
            # This can also lead to a server overload.
            msg = "Cursor not closed explicitly\n"
            if self.__caller:
                msg += "Cursor was created at %s:%s" % self.__caller
            else:
                msg += "Please enable sql debugging to trace the caller."
            _logger.warning(msg)
            self._close(True)
    @check
    def execute(self, query, params=None, log_exceptions=None):
        if '%d' in query or '%f' in query:
            _logger.warning(query)
            _logger.warning("SQL queries cannot contain %d or %f anymore. "
                            "Use only %s")
        if params and not isinstance(params, (tuple, list, dict)):
            _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
            raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
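        # Illustrative example (not from the original source): parameters are
        # always passed separately so psycopg2 can quote them, never
        # interpolated into the query string. The table and values below are
        # placeholder assumptions:
        #
        #   cr.execute('UPDATE res_partner SET name = %s WHERE id = %s',
        #              ('Agrolait', 42))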
        if self.sql_log:
            now = mdt.now()

        try:
            params = params or None
            res = self._obj.execute(query, params)
        except psycopg2.ProgrammingError, pe:
            if self._default_log_exceptions if log_exceptions is None else log_exceptions:
                _logger.error("Programming error: %s, in query %s", pe, query)
            raise
        except Exception:
            if self._default_log_exceptions if log_exceptions is None else log_exceptions:
                _logger.exception("bad query: %s", self._obj.query or query)
            raise
        if self.sql_log:
            delay = mdt.now() - now
            delay = delay.seconds * 1E6 + delay.microseconds

            _logger.debug("query: %s", self._obj.query)
            self.sql_log_count += 1
            res_from = re_from.match(query.lower())
            if res_from:
                self.sql_from_log.setdefault(res_from.group(1), [0, 0])
                self.sql_from_log[res_from.group(1)][0] += 1
                self.sql_from_log[res_from.group(1)][1] += delay
            res_into = re_into.match(query.lower())
            if res_into:
                self.sql_into_log.setdefault(res_into.group(1), [0, 0])
                self.sql_into_log[res_into.group(1)][0] += 1
                self.sql_into_log[res_into.group(1)][1] += delay
        return res
    def split_for_in_conditions(self, ids):
        """Split a list of identifiers into one or more smaller tuples
           safe for IN conditions, after uniquifying them."""
        return tools.misc.split_every(self.IN_MAX, set(ids))
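    # Illustrative sketch (not from the original source): with IN_MAX = 1000,
    # a list of 2500 unique ids comes back as three tuples of at most 1000
    # elements, each safe to pass as the parameter of an "id IN %s" clause.
    # The res_partner table is a placeholder assumption:
    #
    #   for sub_ids in cr.split_for_in_conditions(ids):
    #       cr.execute('SELECT id FROM res_partner WHERE id IN %s', (sub_ids,))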
    def print_log(self):
        global sql_counter
        sql_counter += self.sql_log_count
        if not self.sql_log:
            return
        def process(type):
            sqllogs = {'from': self.sql_from_log, 'into': self.sql_into_log}
            sum = 0
            if sqllogs[type]:
                sqllogitems = sqllogs[type].items()
                sqllogitems.sort(key=lambda k: k[1][1])
                _logger.debug("SQL LOG %s:", type)
                sqllogitems.sort(lambda x, y: cmp(x[1][0], y[1][0]))
                for r in sqllogitems:
                    delay = timedelta(microseconds=r[1][1])
                    _logger.debug("table: %s: %s/%s",
                                  r[0], delay, r[1][0])
                    sum += r[1][1]
                sqllogs[type].clear()
            sum = timedelta(microseconds=sum)
            _logger.debug("SUM %s:%s/%d [%d]",
                          type, sum, self.sql_log_count, sql_counter)
            sqllogs[type].clear()
        process('from')
        process('into')
        self.sql_log_count = 0
        self.sql_log = False
    @check
    def close(self):
        return self._close(False)

    def _close(self, leak=False):
        if not self._obj:
            return
        if self.sql_log:
            self.__closer = frame_codeinfo(currentframe(), 3)
        self.print_log()

        self._obj.close()

        # This forces the cursor to be freed, and thus, available again. It is
        # important because otherwise we can overload the server very easily
        # because of a cursor shortage (because cursors are not garbage
        # collected as fast as they should). The problem is probably due in
        # part because browse records keep a reference to the cursor.
        del self._obj
        self.__closed = True

        # Clean the underlying connection.
        self._cnx.rollback()

        if leak:
            self._cnx.leaked = True
        else:
            chosen_template = tools.config['db_template']
            templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
            keep_in_pool = self.dbname not in templates_list
            self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
    @check
    def autocommit(self, on):
        if on:
            isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
        else:
            # If a serializable cursor was requested, we
            # use the appropriate PostgreSQL isolation level
            # that maps to snapshot isolation.
            # For all supported PostgreSQL versions (8.3-9.x),
            # this is currently the ISOLATION_REPEATABLE_READ.
            # See also the docstring of this class.
            # NOTE: up to psycopg 2.4.2, repeatable read
            #       is remapped to serializable before being
            #       sent to the database, so it is in fact
            #       unavailable for use with pg 9.1.
            isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
                if self._serialized \
                else ISOLATION_LEVEL_READ_COMMITTED
        self._cnx.set_isolation_level(isolation_level)
    @check
    def commit(self):
        """ Perform an SQL `COMMIT`
        """
        return self._cnx.commit()

    @check
    def rollback(self):
        """ Perform an SQL `ROLLBACK`
        """
        return self._cnx.rollback()
    def __getattr__(self, name):
        return getattr(self._obj, name)


class PsycoConnection(psycopg2.extensions.connection):
    pass
class ConnectionPool(object):
    """ The pool of connections to database(s)

        Keep a set of connections to pg databases open, and reuse them
        to open cursors for all transactions.

        The connections are *not* automatically closed. Only a close_db()
        can trigger that.
    """
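    # Illustrative sketch (not part of the original code) of the
    # borrow/give_back cycle that Cursor.__init__() and Cursor._close()
    # perform against the module-level pool; 'somedb' is a placeholder name:
    #
    #   cnx = _Pool.borrow(dsn('somedb'))
    #   cr = cnx.cursor(cursor_factory=psycopg1cursor)
    #   cr.execute('SELECT 1')
    #   cr.close()
    #   _Pool.give_back(cnx)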
    def locked(fun):
        @wraps(fun)
        def _locked(self, *args, **kwargs):
            self._lock.acquire()
            try:
                return fun(self, *args, **kwargs)
            finally:
                self._lock.release()
        return _locked
    def __init__(self, maxconn=64):
        self._connections = []
        self._maxconn = max(maxconn, 1)
        self._lock = threading.Lock()
    def __repr__(self):
        used = len([1 for c, u in self._connections[:] if u])
        count = len(self._connections)
        return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
    def _debug(self, msg, *args):
        _logger.debug(('%r ' + msg), self, *args)
    @locked
    def borrow(self, dsn):
        self._debug('Borrow connection to %r', dsn)

        # free dead and leaked connections
        for i, (cnx, _) in tools.reverse_enumerate(self._connections):
            if cnx.closed:
                self._connections.pop(i)
                self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
                continue
            if getattr(cnx, 'leaked', False):
                delattr(cnx, 'leaked')
                self._connections.pop(i)
                self._connections.append((cnx, False))
                _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)
        for i, (cnx, used) in enumerate(self._connections):
            if not used and dsn_are_equals(cnx.dsn, dsn):
                try:
                    cnx.reset()
                except psycopg2.OperationalError:
                    self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn)
                    # psycopg2 2.4.4 and earlier do not allow closing a closed connection
                    if not cnx.closed:
                        cnx.close()
                    continue
                self._connections.pop(i)
                self._connections.append((cnx, True))
                self._debug('Existing connection found at index %d', i)
                return cnx
        if len(self._connections) >= self._maxconn:
            # try to remove the oldest connection not used
            for i, (cnx, used) in enumerate(self._connections):
                if not used:
                    self._connections.pop(i)
                    self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
                    break
            else:
                # note: this code is called only if the for loop has completed (no break)
                raise PoolError('The Connection Pool Is Full')

        try:
            result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
        except psycopg2.Error:
            _logger.exception('Connection to the database failed')
            raise
        self._connections.append((result, True))
        self._debug('Create new connection')
        return result
    @locked
    def give_back(self, connection, keep_in_pool=True):
        self._debug('Give back connection to %r', connection.dsn)
        for i, (cnx, used) in enumerate(self._connections):
            if cnx is connection:
                self._connections.pop(i)
                if keep_in_pool:
                    self._connections.append((cnx, False))
                    self._debug('Put connection to %r in pool', cnx.dsn)
                else:
                    self._debug('Forgot connection to %r', cnx.dsn)
                    cnx.close()
                break
        else:
            raise PoolError('This connection does not belong to the pool')
    @locked
    def close_all(self, dsn=None):
        _logger.info('%r: Close all connections to %r', self, dsn)
        for i, (cnx, used) in tools.reverse_enumerate(self._connections):
            if dsn is None or dsn_are_equals(cnx.dsn, dsn):
                cnx.close()
                self._connections.pop(i)
class Connection(object):
    """ A lightweight instance of a connection to postgres
    """

    def __init__(self, pool, dbname):
        self.dbname = dbname
        self._pool = pool

    def cursor(self, serialized=True):
        cursor_type = serialized and 'serialized ' or ''
        _logger.debug('create %scursor to %r', cursor_type, self.dbname)
        return Cursor(self._pool, self.dbname, serialized=serialized)

    # serialized_cursor is deprecated - cursors are serialized by default
    serialized_cursor = cursor
    def __nonzero__(self):
        """Check if connection is possible"""
        try:
            _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
            cr = self.cursor()
            cr.close()
            return True
        except Exception:
            return False
def dsn(db_name):
    _dsn = ''
    for p in ('host', 'port', 'user', 'password'):
        cfg = tools.config['db_' + p]
        if cfg:
            _dsn += '%s=%s ' % (p, cfg)

    return '%sdbname=%s' % (_dsn, db_name)
def dsn_are_equals(first, second):
    def key(dsn):
        k = dict(x.split('=', 1) for x in dsn.strip().split())
        k.pop('password', None) # password is not relevant
        return k
    return key(first) == key(second)
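# Illustrative example (not from the original source), assuming the config
# values db_host=localhost and db_user=odoo with everything else unset:
#
#   dsn('somedb')       # 'host=localhost user=odoo dbname=somedb'
#   dsn_are_equals(dsn('somedb'),
#                  'user=odoo host=localhost password=x dbname=somedb')  # True,
#   # because the password component is ignored in the comparison.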
_Pool = None

def db_connect(db_name):
    global _Pool
    if _Pool is None:
        _Pool = ConnectionPool(int(tools.config['db_maxconn']))
    return Connection(_Pool, db_name)
def close_db(db_name):
    """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along with this function."""
    global _Pool
    if _Pool:
        _Pool.close_all(dsn(db_name))
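# Illustrative sketch (not from the original source): close_db() drops the
# pooled connections to one database, e.g. before dropping or renaming it;
# 'somedb' is a placeholder name, and the registry call is the one suggested
# in the docstring above:
#
#   close_db('somedb')
#   openerp.modules.registry.RegistryManager.delete('somedb')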
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: