# -*- coding: utf-8 -*-
##############################################################################
#
#    OpenERP, Open Source Management Solution
#    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
#    Copyright (C) 2010-2013 OpenERP s.a. (<http://openerp.com>).
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################

#.apidoc title: PostgreSQL interface

"""
The PostgreSQL connector is a connectivity layer between the OpenERP code and
the database, *not* a database abstraction toolkit. Database abstraction is what
the ORM does, in fact.

See also: the `pooler` module
"""

#.apidoc add-functions: print_stats
#.apidoc add-classes: Cursor Connection ConnectionPool
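
# A typical interaction with this module, as a minimal sketch: db_connect()
# returns a Connection bound to the shared pool, and its cursor() gives the
# ``cr`` object used throughout the OpenERP code.  The database name and the
# queried table below are purely illustrative.
#
#     cnx = db_connect('mydb')
#     cr = cnx.cursor()
#     try:
#         cr.execute('SELECT id, login FROM res_users WHERE active = %s', (True,))
#         for uid, login in cr.fetchall():
#             print uid, login
#         cr.commit()
#     finally:
#         cr.close()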

__all__ = ['db_connect', 'close_db']

from functools import wraps
import logging

import psycopg2
import psycopg2.extensions
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
from psycopg2.pool import PoolError
from psycopg2.psycopg1 import cursor as psycopg1cursor

psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)

_logger = logging.getLogger(__name__)

types_mapping = {
    'date': (1082,),
    'time': (1083,),
    'datetime': (1114,),
}

def unbuffer(symb, cr):
    if symb is None: return None
    return str(symb)

def undecimalize(symb, cr):
    if symb is None: return None
    return float(symb)

# Keep date/time/datetime values as plain strings, and convert the numeric
# types (OIDs 700, 701 and 1700: float4, float8 and numeric) to Python floats.
for name, typeoid in types_mapping.items():
    psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))

import tools
from tools.func import frame_codeinfo
from datetime import datetime as mdt
from datetime import timedelta
import threading
from inspect import currentframe

import re
re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
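
# These patterns extract the main table name from a (lower-cased) query so
# per-table timing can be accumulated in Cursor.sql_from_log / sql_into_log.
# A small sketch with an illustrative table name:
#
#     m = re_from.match('select name from "res_partner" where id = 1')
#     m.group(1)   # -> 'res_partner'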
82 """Represents an open transaction to the PostgreSQL DB backend,
83 acting as a lightweight wrapper around psycopg2's
84 ``psycopg1cursor`` objects.
86 ``Cursor`` is the object behind the ``cr`` variable used all
87 over the OpenERP code.

       .. rubric:: Transaction Isolation

       One very important property of database transactions is the
       level of isolation between concurrent transactions.
       The SQL standard defines four levels of transaction isolation,
       ranging from the most strict *Serializable* level, to the least
       strict *Read Uncommitted* level. These levels are defined in
       terms of the phenomena that must not occur between concurrent
       transactions, such as *dirty read*, etc.
       In the context of a generic business data management software
       such as OpenERP, we need the best guarantees that no data
       corruption can ever be caused by simply running multiple
       transactions in parallel. Therefore, the preferred level would
       be the *serializable* level, which ensures that a set of
       transactions is guaranteed to produce the same effect as
       running them one at a time in some order.

       However, most database management systems implement a limited
       serializable isolation in the form of
       `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
       providing most of the same advantages as True Serializability,
       with a fraction of the performance cost.
       With PostgreSQL up to version 9.0, this snapshot isolation was
       the implementation of both the ``REPEATABLE READ`` and
       ``SERIALIZABLE`` levels of the SQL standard.
       As of PostgreSQL 9.1, the previous snapshot isolation implementation
       was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
       level was introduced, providing some additional heuristics to
       detect a concurrent update by parallel transactions, and forcing
       one of them to roll back.

       OpenERP implements its own level of locking protection
       for transactions that are highly likely to provoke concurrent
       updates, such as stock reservations or document sequences updates.
       Therefore we mostly care about the properties of snapshot isolation,
       but we don't really need additional heuristics to trigger transaction
       rollbacks, as we are taking care of triggering instant rollbacks
       ourselves when it matters (and we can save the additional performance
       hit of these heuristics).

       As a result of the above, we have selected ``REPEATABLE READ`` as
       the default transaction isolation level for OpenERP cursors, as
       it will be mapped to the desired ``snapshot isolation`` level for
       all supported PostgreSQL versions (8.3 - 9.x).

       Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
       read level to serializable before sending it to the database, so it would
       actually select the new serializable mode on PostgreSQL 9.1. Make
       sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
       the performance hit is a concern for you.

       .. attribute:: cache

          Cache dictionary with a "request" (-ish) lifecycle, only lives as
          long as the cursor itself does and proactively cleared when the
          cursor is closed.

          This cache should *only* be used to store repeatable reads as it
          ignores rollbacks and savepoints, it should not be used to store
          *any* data which may be modified during the life of the cursor.
    """

    IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit

    def check(f):
        @wraps(f)
        def wrapper(self, *args, **kwargs):
            if self.__closed:
                msg = 'Unable to use a closed cursor.'
                if self.__closer:
                    msg += ' It was closed at %s, line %s' % self.__closer
                raise psycopg2.OperationalError(msg)
            return f(self, *args, **kwargs)
        return wrapper

    def __init__(self, pool, dbname, serialized=True):
        self.sql_from_log = {}
        self.sql_into_log = {}

        # default log level determined at cursor creation, could be
        # overridden later for debugging purposes
        self.sql_log = _logger.isEnabledFor(logging.DEBUG)

        self.sql_log_count = 0
        self.__closed = True  # avoid the call of close() (by __del__) if an exception
                              # is raised by any of the following initialisations
        self._pool = pool
        self.dbname = dbname

        # Whether to enable snapshot isolation level for this cursor.
        # see also the docstring of Cursor.
        self._serialized = serialized

        self._cnx = pool.borrow(dsn(dbname))
        self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
        if self.sql_log:
            self.__caller = frame_codeinfo(currentframe(), 2)
        else:
            self.__caller = False
        self.__closed = False  # real initialisation value
        self.autocommit(False)
        self.__closer = False

        self._default_log_exceptions = True

    def __del__(self):
        if not self.__closed and not self._cnx.closed:
            # Oops. 'self' has not been closed explicitly.
            # The cursor will be deleted by the garbage collector,
            # but the database connection is not put back into the connection
            # pool, preventing some operation on the database like dropping it.
            # This can also lead to a server overload.
            msg = "Cursor not closed explicitly\n"
            if self.__caller:
                msg += "Cursor was created at %s:%s" % self.__caller
            else:
                msg += "Please enable sql debugging to trace the caller."
            _logger.warning(msg)
            self._close(True)

    @check
    def execute(self, query, params=None, log_exceptions=None):
        if '%d' in query or '%f' in query:
            _logger.warning(query)
            _logger.warning("SQL queries cannot contain %d or %f anymore. "
                            "Use only %s")
        if params and not isinstance(params, (tuple, list, dict)):
            _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
            raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))

        if self.sql_log:
            now = mdt.now()

        try:
            params = params or None
            res = self._obj.execute(query, params)
        except psycopg2.ProgrammingError, pe:
            if self._default_log_exceptions if log_exceptions is None else log_exceptions:
                _logger.error("Programming error: %s, in query %s", pe, query)
            raise
        except Exception:
            if self._default_log_exceptions if log_exceptions is None else log_exceptions:
                _logger.exception("bad query: %s", self._obj.query or query)
            raise

        if self.sql_log:
            delay = mdt.now() - now
            delay = delay.seconds * 1E6 + delay.microseconds

            _logger.debug("query: %s", self._obj.query)
            self.sql_log_count += 1
            res_from = re_from.match(query.lower())
            if res_from:
                self.sql_from_log.setdefault(res_from.group(1), [0, 0])
                self.sql_from_log[res_from.group(1)][0] += 1
                self.sql_from_log[res_from.group(1)][1] += delay
            res_into = re_into.match(query.lower())
            if res_into:
                self.sql_into_log.setdefault(res_into.group(1), [0, 0])
                self.sql_into_log[res_into.group(1)][0] += 1
                self.sql_into_log[res_into.group(1)][1] += delay
        return res

    @check
    def split_for_in_conditions(self, ids):
        """Split a list of identifiers into one or more smaller tuples
           safe for IN conditions, after uniquifying them."""
        return tools.misc.split_every(self.IN_MAX, set(ids))
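
    # Usage sketch: chunking a large id list so each query stays under IN_MAX
    # parameters.  The table name and the ``ids`` iterable are illustrative.
    #
    #     names = []
    #     for chunk in cr.split_for_in_conditions(ids):
    #         cr.execute('SELECT name FROM res_partner WHERE id IN %s', (chunk,))
    #         names.extend(row[0] for row in cr.fetchall())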

    def print_log(self):
        global sql_counter
        sql_counter += self.sql_log_count

        def process(type):
            sqllogs = {'from': self.sql_from_log, 'into': self.sql_into_log}
            sum = 0
            if sqllogs[type]:
                sqllogitems = sqllogs[type].items()
                sqllogitems.sort(key=lambda k: k[1][1])
                _logger.debug("SQL LOG %s:", type)
                sqllogitems.sort(lambda x, y: cmp(x[1][0], y[1][0]))
                for r in sqllogitems:
                    delay = timedelta(microseconds=r[1][1])
                    _logger.debug("table: %s: %s/%s",
                                  r[0], delay, r[1][0])
                    sum += r[1][1]
                sqllogs[type].clear()
            sum = timedelta(microseconds=sum)
            _logger.debug("SUM %s:%s/%d [%d]",
                          type, sum, self.sql_log_count, sql_counter)
            sqllogs[type].clear()
        process('from')
        process('into')
        self.sql_log_count = 0

    @check
    def close(self):
        return self._close(False)

    def _close(self, leak=False):
        if not self._obj:
            return

        if self.sql_log:
            self.__closer = frame_codeinfo(currentframe(), 3)
        self.print_log()

        self._obj.close()

        # This forces the cursor to be freed, and thus, available again. It is
        # important because otherwise we can overload the server very easily
        # because of a cursor shortage (because cursors are not garbage
        # collected as fast as they should). The problem is probably due in
        # part to browse records keeping a reference to the cursor.
        del self._obj
        self.__closed = True

        # Clean the underlying connection.
        self._cnx.rollback()

        if leak:
            self._cnx.leaked = True
        else:
            chosen_template = tools.config['db_template']
            templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
            keep_in_pool = self.dbname not in templates_list
            self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)

    @check
    def autocommit(self, on):
        if on:
            isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
        else:
            # If a serializable cursor was requested, we
            # use the appropriate PostgreSQL isolation level
            # that maps to snapshot isolation.
            # For all supported PostgreSQL versions (8.3-9.x),
            # this is currently ISOLATION_LEVEL_REPEATABLE_READ.
            # See also the docstring of this class.
            # NOTE: up to psycopg2 v2.4.2, repeatable read
            # is remapped to serializable before being
            # sent to the database, so it is in fact
            # unavailable for use with pg 9.1.
            isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
                if self._serialized \
                else ISOLATION_LEVEL_READ_COMMITTED
        self._cnx.set_isolation_level(isolation_level)
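
    # Example sketch: statements that PostgreSQL refuses to run inside a
    # transaction block (e.g. CREATE DATABASE) need an autocommit cursor.
    # The database name below is illustrative.
    #
    #     cr.autocommit(True)
    #     cr.execute('CREATE DATABASE testdb')
    #     cr.autocommit(False)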
345 """ Perform an SQL `COMMIT`
347 return self._cnx.commit()
351 """ Perform an SQL `ROLLBACK`
353 return self._cnx.rollback()

    def __getattr__(self, name):
        return getattr(self._obj, name)

class PsycoConnection(psycopg2.extensions.connection):
    pass

class ConnectionPool(object):
    """ The pool of connections to database(s)

        Keep a set of connections to pg databases open, and reuse them
        to open cursors for all transactions.

        The connections are *not* automatically closed. Only a close_db()
        can trigger that.
    """

    def locked(fun):
        @wraps(fun)
        def _locked(self, *args, **kwargs):
            self._lock.acquire()
            try:
                return fun(self, *args, **kwargs)
            finally:
                self._lock.release()
        return _locked

    def __init__(self, maxconn=64):
        self._connections = []
        self._maxconn = max(maxconn, 1)
        self._lock = threading.Lock()

    def __repr__(self):
        used = len([1 for c, u in self._connections[:] if u])
        count = len(self._connections)
        return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)

    def _debug(self, msg, *args):
        _logger.debug(('%r ' + msg), self, *args)

    @locked
    def borrow(self, dsn):
        self._debug('Borrow connection to %r', dsn)

        # free dead and leaked connections
        for i, (cnx, _) in tools.reverse_enumerate(self._connections):
            if cnx.closed:
                self._connections.pop(i)
                self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
                continue
            if getattr(cnx, 'leaked', False):
                delattr(cnx, 'leaked')
                self._connections.pop(i)
                self._connections.append((cnx, False))
                _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)

        for i, (cnx, used) in enumerate(self._connections):
            if not used and dsn_are_equals(cnx.dsn, dsn):
                try:
                    cnx.reset()
                except psycopg2.OperationalError:
                    self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn)
                    # psycopg2 2.4.4 and earlier do not allow closing a closed connection
                    if not cnx.closed:
                        cnx.close()
                    continue
                self._connections.pop(i)
                self._connections.append((cnx, True))
                self._debug('Existing connection found at index %d', i)
                return cnx

        if len(self._connections) >= self._maxconn:
            # try to remove the oldest connection not used
            for i, (cnx, used) in enumerate(self._connections):
                if not used:
                    self._connections.pop(i)
                    self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
                    break
            else:
                # note: this code is called only if the for loop has completed (no break)
                raise PoolError('The Connection Pool Is Full')

        try:
            result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
        except psycopg2.Error:
            _logger.exception('Connection to the database failed')
            raise
        self._connections.append((result, True))
        self._debug('Create new connection')
        return result

    @locked
    def give_back(self, connection, keep_in_pool=True):
        self._debug('Give back connection to %r', connection.dsn)
        for i, (cnx, used) in enumerate(self._connections):
            if cnx is connection:
                self._connections.pop(i)
                if keep_in_pool:
                    self._connections.append((cnx, False))
                    self._debug('Put connection to %r in pool', cnx.dsn)
                else:
                    self._debug('Forgot connection to %r', cnx.dsn)
                    cnx.close()
                break
        else:
            raise PoolError('This connection does not belong to the pool')

    @locked
    def close_all(self, dsn):
        _logger.info('%r: Close all connections to %r', self, dsn)
        for i, (cnx, used) in tools.reverse_enumerate(self._connections):
            if dsn_are_equals(cnx.dsn, dsn):
                cnx.close()
                self._connections.pop(i)

class Connection(object):
    """ A lightweight instance of a connection to postgres
    """

    def __init__(self, pool, dbname):
        self.dbname = dbname
        self._pool = pool

    def cursor(self, serialized=True):
        cursor_type = serialized and 'serialized ' or ''
        _logger.debug('create %scursor to %r', cursor_type, self.dbname)
        return Cursor(self._pool, self.dbname, serialized=serialized)
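
    # Sketch: a throwaway read that does not need snapshot isolation can ask
    # for a READ COMMITTED cursor instead (see Cursor.autocommit):
    #
    #     cr = connection.cursor(serialized=False)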

    # serialized_cursor is deprecated - cursors are serialized by default
    serialized_cursor = cursor

    def __nonzero__(self):
        """Check if connection is possible"""
        try:
            _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
            cr = self.cursor()
            cr.close()
            return True
        except Exception:
            return False

def dsn(db_name):
    _dsn = ''
    for p in ('host', 'port', 'user', 'password'):
        cfg = tools.config['db_' + p]
        if cfg:
            _dsn += '%s=%s ' % (p, cfg)

    return '%sdbname=%s' % (_dsn, db_name)

def dsn_are_equals(first, second):
    def key(dsn):
        k = dict(x.split('=', 1) for x in dsn.strip().split())
        k.pop('password', None) # password is not relevant
        return k
    return key(first) == key(second)
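
# Sketch of the two helpers above, with illustrative configuration values
# (db_host=localhost, db_port=5432, no user/password configured):
#
#     dsn('prod')
#     # -> 'host=localhost port=5432 dbname=prod'
#     dsn_are_equals('host=localhost dbname=prod password=x',
#                    'host=localhost dbname=prod')
#     # -> True, because the password component is ignored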

_Pool = None

def db_connect(db_name):
    global _Pool
    if _Pool is None:
        _Pool = ConnectionPool(int(tools.config['db_maxconn']))
    return Connection(_Pool, db_name)

def close_db(db_name):
    """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
    global _Pool
    if _Pool:
        _Pool.close_all(dsn(db_name))

# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: