1 # -*- coding: utf-8 -*-
2 ##############################################################################
4 # OpenERP, Open Source Management Solution
5 # Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 # Copyright (C) 2010-2013 OpenERP s.a. (<http://openerp.com>).
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU Affero General Public License as
10 # published by the Free Software Foundation, either version 3 of the
11 # License, or (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Affero General Public License for more details.
18 # You should have received a copy of the GNU Affero General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 ##############################################################################
23 #.apidoc title: PostgreSQL interface
26 The PostgreSQL connector is a connectivity layer between the OpenERP code and
27 the database, *not* a database abstraction toolkit. Database abstraction is what
28 the ORM does, in fact.
30 See also: the `pooler` module
33 #.apidoc add-functions: print_stats
34 #.apidoc add-classes: Cursor Connection ConnectionPool
36 __all__ = ['db_connect', 'close_db']
38 from functools import wraps
40 import psycopg2.extensions
41 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
42 from psycopg2.pool import PoolError
43 from psycopg2.psycopg1 import cursor as psycopg1cursor
44 from threading import currentThread
# Globally register the UNICODE typecaster so psycopg2 decodes text
# columns to unicode objects.
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)

_logger = logging.getLogger(__name__)
def unbuffer(symb, cr):
    # psycopg2 typecast helper: pass NULL through as None.  The non-None
    # conversion branch is not visible in this excerpt.
    if symb is None: return None
def undecimalize(symb, cr):
    # psycopg2 typecast helper registered below for the float/numeric
    # oids (700, 701, 1700): pass NULL through as None.  The non-None
    # conversion branch is not visible in this excerpt.
    if symb is None: return None
# Register custom typecasters: every oid listed in types_mapping (defined
# above, not visible in this excerpt) is returned verbatim (identity
# cast), while float/numeric oids (700, 701, 1700) go through undecimalize.
for name, typeoid in types_mapping.items():
    psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
70 from tools.func import frame_codeinfo
71 from datetime import datetime as mdt
72 from datetime import timedelta
74 from inspect import currentframe
# Crude regexes used only for SQL-logging statistics in Cursor.execute():
# extract the table name following FROM / INTO in a lowercased query.
re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
83 """Represents an open transaction to the PostgreSQL DB backend,
84 acting as a lightweight wrapper around psycopg2's
85 ``psycopg1cursor`` objects.
87 ``Cursor`` is the object behind the ``cr`` variable used all
88 over the OpenERP code.
90 .. rubric:: Transaction Isolation
92 One very important property of database transactions is the
93 level of isolation between concurrent transactions.
94 The SQL standard defines four levels of transaction isolation,
95 ranging from the most strict *Serializable* level, to the least
96 strict *Read Uncommitted* level. These levels are defined in
97 terms of the phenomena that must not occur between concurrent
98 transactions, such as *dirty read*, etc.
99 In the context of a generic business data management software
100 such as OpenERP, we need the best guarantees that no data
corruption can ever be caused by simply running multiple
102 transactions in parallel. Therefore, the preferred level would
103 be the *serializable* level, which ensures that a set of
104 transactions is guaranteed to produce the same effect as
105 running them one at a time in some order.
107 However, most database management systems implement a limited
108 serializable isolation in the form of
109 `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
110 providing most of the same advantages as True Serializability,
111 with a fraction of the performance cost.
112 With PostgreSQL up to version 9.0, this snapshot isolation was
113 the implementation of both the ``REPEATABLE READ`` and
114 ``SERIALIZABLE`` levels of the SQL standard.
115 As of PostgreSQL 9.1, the previous snapshot isolation implementation
116 was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
117 level was introduced, providing some additional heuristics to
118 detect a concurrent update by parallel transactions, and forcing
119 one of them to rollback.
121 OpenERP implements its own level of locking protection
122 for transactions that are highly likely to provoke concurrent
123 updates, such as stock reservations or document sequences updates.
124 Therefore we mostly care about the properties of snapshot isolation,
125 but we don't really need additional heuristics to trigger transaction
126 rollbacks, as we are taking care of triggering instant rollbacks
127 ourselves when it matters (and we can save the additional performance
128 hit of these heuristics).
130 As a result of the above, we have selected ``REPEATABLE READ`` as
131 the default transaction isolation level for OpenERP cursors, as
132 it will be mapped to the desired ``snapshot isolation`` level for
all supported PostgreSQL versions (8.3 - 9.x).
135 Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
136 read level to serializable before sending it to the database, so it would
137 actually select the new serializable mode on PostgreSQL 9.1. Make
138 sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
139 the performance hit is a concern for you.
143 Cache dictionary with a "request" (-ish) lifecycle, only lives as
144 long as the cursor itself does and proactively cleared when the
147 This cache should *only* be used to store repeatable reads as it
148 ignores rollbacks and savepoints, it should not be used to store
149 *any* data which may be modified during the life of the cursor.
152 IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
def wrapper(self, *args, **kwargs):
    # Guard applied to cursor methods: refuse to operate on a closed
    # cursor.  NOTE(review): the enclosing decorator definition, the
    # `if self.__closed:` test and the `if self.__closer:` guard are
    # not visible in this excerpt.
    msg = 'Unable to use a closed cursor.'
    # __closer holds (filename, line) of the close() call when sql
    # logging was enabled -- see _close()
    msg += ' It was closed at %s, line %s' % self.__closer
    raise psycopg2.OperationalError(msg)
    return f(self, *args, **kwargs)
def __init__(self, pool, dbname, serialized=True):
    """Create a cursor on a connection borrowed from ``pool``.

    :param pool: ConnectionPool to borrow the underlying connection from
    :param dbname: name of the database to connect to
    :param serialized: when True (the default), run the transaction at
        snapshot isolation (REPEATABLE READ) -- see the class docstring
    NOTE(review): a few initialisations (e.g. the ``self.dbname`` /
    ``self._pool`` assignments read by ``_close()``) are not visible in
    this excerpt.
    """
    # per-table statistics for SQL logging: table -> [count, total_delay]
    self.sql_from_log = {}
    self.sql_into_log = {}

    # default log level determined at cursor creation, could be
    # overridden later for debugging purposes
    self.sql_log = _logger.isEnabledFor(logging.DEBUG)

    self.sql_log_count = 0
    self.__closed = True # avoid the call of close() (by __del__) if an exception
                         # is raised by any of the following initialisations

    # Whether to enable snapshot isolation level for this cursor.
    # see also the docstring of Cursor.
    self._serialized = serialized

    self._cnx = pool.borrow(dsn(dbname))
    self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
    # remember where the cursor was created, for leak reports (the
    # `if self.sql_log:` / `else:` lines are not visible in this excerpt)
    self.__caller = frame_codeinfo(currentframe(),2)
    self.__caller = False
    self.__closed = False # real initialisation value
    self.autocommit(False)
    self.__closer = False

    self._default_log_exceptions = True
# Body of __del__() (its `def __del__(self):` line is not visible in
# this excerpt): warn when a cursor is garbage-collected while still
# holding its connection.
if not self.__closed and not self._cnx.closed:
    # Oops. 'self' has not been closed explicitly.
    # The cursor will be deleted by the garbage collector,
    # but the database connection is not put back into the connection
    # pool, preventing some operation on the database like dropping it.
    # This can also lead to a server overload.
    msg = "Cursor not closed explicitly\n"
    # __caller is (filename, line) when SQL logging was enabled at
    # creation time (the `if self.__caller:` / `else:` lines are not
    # visible in this excerpt)
    msg += "Cursor was created at %s:%s" % self.__caller
    msg += "Please enable sql debugging to trace the caller."
def execute(self, query, params=None, log_exceptions=None):
    """Execute ``query`` with ``params`` on the wrapped psycopg cursor.

    :param query: SQL text with psycopg-style ``%s`` placeholders
    :param params: tuple, list or dict of parameters, or None
    :param log_exceptions: when not None, overrides
        ``self._default_log_exceptions`` for this call
    NOTE(review): several lines of this method (the ``try:``, the timing
    start, the re-raise statements and the ``if self.sql_log:`` guards)
    are not visible in this excerpt.
    """
    # %d / %f were legal with psycopg1-style formatting but are not
    # valid psycopg2 placeholders
    if '%d' in query or '%f' in query:
        _logger.warning(query)
        _logger.warning("SQL queries cannot contain %d or %f anymore. "
    # reject anything but tuple/list/dict: a lone string, for instance,
    # would be interpolated character by character
    if params and not isinstance(params, (tuple, list, dict)):
        _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
        raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))

    params = params or None
    res = self._obj.execute(query, params)
    # Python 2 except syntax; the matching `try:` is not visible in this excerpt
    except psycopg2.ProgrammingError, pe:
        if self._default_log_exceptions if log_exceptions is None else log_exceptions:
            _logger.error("Programming error: %s, in query %s", pe, query)
        if self._default_log_exceptions if log_exceptions is None else log_exceptions:
            _logger.exception("bad query: %s", self._obj.query or query)

    # accumulate per-query timing (in microseconds) for the SQL log
    delay = mdt.now() - now
    delay = delay.seconds * 1E6 + delay.microseconds

    _logger.debug("query: %s", self._obj.query)
    self.sql_log_count+=1
    # record per-table stats for "... from <table> ..." and
    # "... into <table> ..." queries (see re_from / re_into above)
    res_from = re_from.match(query.lower())
    self.sql_from_log.setdefault(res_from.group(1), [0, 0])
    self.sql_from_log[res_from.group(1)][0] += 1
    self.sql_from_log[res_from.group(1)][1] += delay
    res_into = re_into.match(query.lower())
    self.sql_into_log.setdefault(res_into.group(1), [0, 0])
    self.sql_into_log[res_into.group(1)][0] += 1
    self.sql_into_log[res_into.group(1)][1] += delay
def split_for_in_conditions(self, ids):
    """Uniquify ``ids`` and return them regrouped in tuples of at most
    ``IN_MAX`` elements, safe for use in SQL ``IN (...)`` conditions."""
    unique_ids = set(ids)
    return tools.misc.split_every(self.IN_MAX, unique_ids)
# Tail of print_log() (its `def print_log(self):` line, the
# `global sql_counter` declaration and the inner helper's `def` line
# are not visible in this excerpt): dump the per-table statistics
# collected by execute(), then reset the counters.
sql_counter += self.sql_log_count

sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
sqllogitems = sqllogs[type].items()
sqllogitems.sort(key=lambda k: k[1][1])
_logger.debug("SQL LOG %s:", type)
# Python 2 in-place sort with a cmp function: order by query count
sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
for r in sqllogitems:
    delay = timedelta(microseconds=r[1][1])
    _logger.debug("table: %s: %s/%s",
                  r[0], delay, r[1][0])
sqllogs[type].clear()
sum = timedelta(microseconds=sum)
_logger.debug("SUM %s:%s/%d [%d]",
              type, sum, self.sql_log_count, sql_counter)
sqllogs[type].clear()

self.sql_log_count = 0
# close(): only its `return` line is visible here (the
# `def close(self):` line is not in this excerpt).
return self._close(False)

def _close(self, leak=False):
    """Free the psycopg cursor and return the connection to the pool
    (or mark it as leaked when ``leak`` is True).

    NOTE(review): the initial guard and cursor-cleanup lines, and the
    ``if leak:`` / ``else:`` structure, are not visible in this excerpt.
    """
    # remember where the cursor was closed, for the "closed cursor" error
    self.__closer = frame_codeinfo(currentframe(),3)

    # This forces the cursor to be freed, and thus, available again. It is
    # important because otherwise we can overload the server very easily
    # because of a cursor shortage (because cursors are not garbage
    # collected as fast as they should). The problem is probably due in
    # part because browse records keep a reference to the cursor.

    # Clean the underlying connection.
    self._cnx.leaked = True
    chosen_template = tools.config['db_template']
    templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
    # never keep connections to template databases in the pool, so those
    # databases can be dropped or copied freely
    keep_in_pool = self.dbname not in templates_list
    self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
def autocommit(self, on):
    """Switch the underlying connection in or out of autocommit mode.

    :param on: when True, select ISOLATION_LEVEL_AUTOCOMMIT; otherwise
        pick the isolation level matching this cursor's ``serialized``
        setting.  NOTE(review): the ``if on:`` / ``else:`` lines are not
        visible in this excerpt.
    """
    isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
    # If a serializable cursor was requested, we
    # use the appropriate PostgreSQL isolation level
    # that maps to snapshot isolation.
    # For all supported PostgreSQL versions (8.3-9.x),
    # this is currently the ISOLATION_REPEATABLE_READ.
    # See also the docstring of this class.
    # NOTE: up to psycopg 2.4.2, repeatable read
    # is remapped to serializable before being
    # sent to the database, so it is in fact
    # unavailable for use with pg 9.1.
    isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
        if self._serialized \
        else ISOLATION_LEVEL_READ_COMMITTED
    self._cnx.set_isolation_level(isolation_level)
346 """ Perform an SQL `COMMIT`
348 return self._cnx.commit()
352 """ Perform an SQL `ROLLBACK`
354 return self._cnx.rollback()
def __getattr__(self, name):
    """Delegate any attribute not defined on this wrapper to the
    underlying psycopg cursor object (fetchall, rowcount, ...)."""
    delegate = self._obj
    return getattr(delegate, name)
360 class PsycoConnection(psycopg2.extensions.connection):
class ConnectionPool(object):
    """ The pool of connections to database(s)

    Keep a set of connections to pg databases open, and reuse them
    to open cursors for all transactions.

    The connections are *not* automatically closed. Only a close_db()
    (the rest of this docstring is not visible in this excerpt)
    """

    # Tail of the `locked` method decorator: its outer `def locked(fun):`
    # line, the @wraps decoration and the `with self._lock:` statement
    # are not visible in this excerpt.
    def _locked(self, *args, **kwargs):
        return fun(self, *args, **kwargs)
def __init__(self, maxconn=64):
    """Initialise an empty pool holding at most ``maxconn`` connections."""
    # never allow a pool smaller than a single connection
    self._maxconn = max(maxconn, 1)
    # list of (connection, in_use) pairs, oldest first
    self._connections = []
    self._lock = threading.Lock()
# Body of __repr__() (its `def __repr__(self):` line is not visible in
# this excerpt): summarise used/total/max connection counts.
used = len([1 for c, u in self._connections[:] if u])
count = len(self._connections)
return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
def _debug(self, msg, *args):
    """Log ``msg`` at DEBUG level, prefixed with this pool's repr."""
    fmt = '%r ' + msg
    _logger.debug(fmt, self, *args)
def borrow(self, dsn):
    """Hand out a connection for ``dsn``: reuse a free pooled one when
    available, otherwise open a new one (up to ``_maxconn``).

    NOTE(review): several control-flow lines of this method (the
    `@locked` decoration, `if` guards, `try:` statements, `break`,
    `raise` and the final `return result`) are not visible in this
    excerpt.
    """
    self._debug('Borrow connection to %r', dsn)

    # free dead and leaked connections
    for i, (cnx, _) in tools.reverse_enumerate(self._connections):
        # drop connections whose underlying socket is gone
        self._connections.pop(i)
        self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
        if getattr(cnx, 'leaked', False):
            # a cursor was never closed and marked this connection as
            # leaked (see Cursor._close); reclaim it as a free one
            delattr(cnx, 'leaked')
            self._connections.pop(i)
            self._connections.append((cnx, False))
            _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)

    # try to reuse an existing free connection to the same dsn
    for i, (cnx, used) in enumerate(self._connections):
        if not used and dsn_are_equals(cnx.dsn, dsn):
            # Python 2 except; the matching `try:` is not visible here
            except psycopg2.OperationalError:
                self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn)
                # psycopg2 2.4.4 and earlier do not allow closing a closed connection
            # mark the connection as used and move it to the back
            # (most recently used position)
            self._connections.pop(i)
            self._connections.append((cnx, True))
            self._debug('Existing connection found at index %d', i)

    if len(self._connections) >= self._maxconn:
        # try to remove the oldest connection not used
        for i, (cnx, used) in enumerate(self._connections):
            self._connections.pop(i)
            self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
        # note: this code is called only if the for loop has completed (no break)
        raise PoolError('The Connection Pool Is Full')

    # open a brand new connection
    result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
    except psycopg2.Error:
        _logger.exception('Connection to the database failed')
    self._connections.append((result, True))
    self._debug('Create new connection')
def give_back(self, connection, keep_in_pool=True):
    """Return a borrowed connection to the pool.

    :param connection: a connection previously handed out by borrow()
    :param keep_in_pool: when True (default) the connection is marked as
        free for reuse; when False it is closed and dropped from the pool
    :raise PoolError: if ``connection`` was not issued by this pool
    """
    self._debug('Give back connection to %r', connection.dsn)
    for i, (cnx, used) in enumerate(self._connections):
        if cnx is connection:
            self._connections.pop(i)
            if keep_in_pool:
                # mark it as unused, ready for the next borrow()
                self._connections.append((cnx, False))
                self._debug('Put connection to %r in pool', cnx.dsn)
            else:
                self._debug('Forgot connection to %r', cnx.dsn)
                cnx.close()
            break
    else:
        # fixed error message: it previously read "does not below to the pool"
        raise PoolError('This connection does not belong to the pool')
def close_all(self, dsn):
    """Drop every pooled connection matching ``dsn`` from the pool.

    NOTE(review): the `@locked` decoration and the `cnx.close()` call
    are not visible in this excerpt.
    """
    _logger.info('%r: Close all connections to %r', self, dsn)
    for i, (cnx, used) in tools.reverse_enumerate(self._connections):
        if dsn_are_equals(cnx.dsn, dsn):
            self._connections.pop(i)
class Connection(object):
    """ A lightweight instance of a connection to postgres
    (the rest of this docstring is not visible in this excerpt)
    """

    def __init__(self, pool, dbname):
        # __init__ body not visible in this excerpt; cursor() below reads
        # self._pool and self.dbname, so presumably they are stored here
        # -- TODO confirm against the full source.

    def cursor(self, serialized=True):
        """Open a new Cursor on this connection's database.

        :param serialized: forwarded to Cursor -- when True, the cursor
            runs at snapshot isolation (see Cursor's docstring)
        """
        cursor_type = serialized and 'serialized ' or ''
        _logger.debug('create %scursor to %r', cursor_type, self.dbname)
        return Cursor(self._pool, self.dbname, serialized=serialized)

    # serialized_cursor is deprecated - cursors are serialized by default
    serialized_cursor = cursor

    def __nonzero__(self):
        """Check if connection is possible"""
        # Python 2 truth-test hook (__bool__ in Python 3); the
        # surrounding try/except and return statements are not visible
        # in this excerpt.
        _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
# Body of dsn(db_name) -- its `def` line and the `_dsn = ''` initialiser
# are not visible in this excerpt.  Builds a libpq DSN string from the
# db_host/db_port/db_user/db_password configuration entries.
for p in ('host', 'port', 'user', 'password'):
    cfg = tools.config['db_' + p]
    # only parameters actually configured are included (the `if cfg:`
    # guard is not visible in this excerpt)
    _dsn += '%s=%s ' % (p, cfg)

return '%sdbname=%s' % (_dsn, db_name)
def dsn_are_equals(first, second):
    """Return True when two DSN strings designate the same target,
    ignoring parameter order and the password."""
    # inner helper parsing 'k=v' pairs into a dict; its `def key(dsn):`
    # line is not visible in this excerpt
    k = dict(x.split('=', 1) for x in dsn.strip().split())
    k.pop('password', None) # password is not relevant
    return key(first) == key(second)
def db_connect(db_name):
    """Return a Connection to ``db_name``, lazily creating the module's
    global ConnectionPool on first use.

    NOTE(review): the `global _Pool` and `if _Pool is None:` lines are
    not visible in this excerpt.
    """
    _Pool = ConnectionPool(int(tools.config['db_maxconn']))
    # tag the current thread with the database it works on
    currentThread().dbname = db_name
    return Connection(_Pool, db_name)
def close_db(db_name):
    """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
    # close every pooled connection to this database and untag the
    # current thread (the `if _Pool:` guard and the `ct = currentThread()`
    # assignment are not visible in this excerpt)
    _Pool.close_all(dsn(db_name))
    if hasattr(ct, 'dbname'):
        delattr(ct, 'dbname')
536 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: