# -*- coding: utf-8 -*-
##############################################################################
#
#    OpenERP, Open Source Management Solution
#    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
#    Copyright (C) 2010-2014 OpenERP s.a. (<http://openerp.com>).
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
The PostgreSQL connector is a connectivity layer between the OpenERP code and
the database, *not* a database abstraction toolkit. Database abstraction is what
the ORM does, in fact.
from contextlib import contextmanager
from functools import wraps
import logging
import urlparse
import uuid

import psycopg2.extras
import psycopg2.extensions
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
from psycopg2.pool import PoolError

psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)

_logger = logging.getLogger(__name__)
def unbuffer(symb, cr):

def undecimalize(symb, cr):
    if symb is None:
        return None
    return float(symb)

for name, typeoid in types_mapping.items():
    psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
import tools
from tools.func import frame_codeinfo
from datetime import datetime as mdt
from datetime import timedelta
import threading
from inspect import currentframe

import re
re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')

sql_counter = 0
79 """Represents an open transaction to the PostgreSQL DB backend,
80 acting as a lightweight wrapper around psycopg2's
83 ``Cursor`` is the object behind the ``cr`` variable used all
84 over the OpenERP code.
86 .. rubric:: Transaction Isolation
88 One very important property of database transactions is the
89 level of isolation between concurrent transactions.
90 The SQL standard defines four levels of transaction isolation,
91 ranging from the most strict *Serializable* level, to the least
92 strict *Read Uncommitted* level. These levels are defined in
93 terms of the phenomena that must not occur between concurrent
94 transactions, such as *dirty read*, etc.
95 In the context of a generic business data management software
96 such as OpenERP, we need the best guarantees that no data
97 corruption can ever be cause by simply running multiple
98 transactions in parallel. Therefore, the preferred level would
99 be the *serializable* level, which ensures that a set of
100 transactions is guaranteed to produce the same effect as
101 running them one at a time in some order.
103 However, most database management systems implement a limited
104 serializable isolation in the form of
105 `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
106 providing most of the same advantages as True Serializability,
107 with a fraction of the performance cost.
108 With PostgreSQL up to version 9.0, this snapshot isolation was
109 the implementation of both the ``REPEATABLE READ`` and
110 ``SERIALIZABLE`` levels of the SQL standard.
111 As of PostgreSQL 9.1, the previous snapshot isolation implementation
112 was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
113 level was introduced, providing some additional heuristics to
114 detect a concurrent update by parallel transactions, and forcing
115 one of them to rollback.
117 OpenERP implements its own level of locking protection
118 for transactions that are highly likely to provoke concurrent
119 updates, such as stock reservations or document sequences updates.
120 Therefore we mostly care about the properties of snapshot isolation,
121 but we don't really need additional heuristics to trigger transaction
122 rollbacks, as we are taking care of triggering instant rollbacks
123 ourselves when it matters (and we can save the additional performance
124 hit of these heuristics).
126 As a result of the above, we have selected ``REPEATABLE READ`` as
127 the default transaction isolation level for OpenERP cursors, as
128 it will be mapped to the desired ``snapshot isolation`` level for
129 all supported PostgreSQL version (8.3 - 9.x).
131 Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
132 read level to serializable before sending it to the database, so it would
133 actually select the new serializable mode on PostgreSQL 9.1. Make
134 sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
135 the performance hit is a concern for you.
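
    A minimal usage sketch (the database name, login and query are purely
    illustrative; in OpenERP code the cursor is normally provided by the
    framework as ``cr``)::

        db = openerp.sql_db.db_connect('somedb')
        cr = db.cursor()
        try:
            cr.execute('SELECT id FROM res_users WHERE login = %s', ('admin',))
            ids = [row[0] for row in cr.fetchall()]
            cr.commit()
        except Exception:
            cr.rollback()
            raise
        finally:
            cr.close()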
    Cache dictionary with a "request" (-ish) lifecycle: it only lives as
    long as the cursor itself does, and is proactively cleared when the
    cursor is closed.

    This cache should *only* be used to store repeatable reads, as it
    ignores rollbacks and savepoints; it should not be used to store
    *any* data which may be modified during the life of the cursor.
    IN_MAX = 1000   # decent limit on size of IN queries - guideline = Oracle limit

    def check(f):
        @wraps(f)
        def wrapper(self, *args, **kwargs):
            if self._closed:
                msg = 'Unable to use a closed cursor.'
                if self.__closer:
                    msg += ' It was closed at %s, line %s' % self.__closer
                raise psycopg2.OperationalError(msg)
            return f(self, *args, **kwargs)
        return wrapper
    def __init__(self, pool, dbname, dsn, serialized=True):
        self.sql_from_log = {}
        self.sql_into_log = {}

        # default log level determined at cursor creation, could be
        # overridden later for debugging purposes
        self.sql_log = _logger.isEnabledFor(logging.DEBUG)

        self.sql_log_count = 0

        # avoid the call of close() (by __del__) if an exception
        # is raised by any of the following initialisations
        self._closed = True

        self.__pool = pool
        self.dbname = dbname

        # Whether to enable snapshot isolation level for this cursor.
        # see also the docstring of Cursor.
        self._serialized = serialized

        self._cnx = pool.borrow(dsn)
        self._obj = self._cnx.cursor()
        if self.sql_log:
            self.__caller = frame_codeinfo(currentframe(), 2)
        else:
            self.__caller = False
        self._closed = False    # real initialisation value
        self.autocommit(False)
        self.__closer = False

        self._default_log_exceptions = True
    def __build_dict(self, row):
        return {d.name: row[i] for i, d in enumerate(self._obj.description)}

    def dictfetchone(self):
        row = self._obj.fetchone()
        return row and self.__build_dict(row)

    def dictfetchmany(self, size):
        return map(self.__build_dict, self._obj.fetchmany(size))

    def dictfetchall(self):
        return map(self.__build_dict, self._obj.fetchall())
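
    # Illustrative sketch of the dict-style fetch helpers above (query and
    # values are made up): after cr.execute("SELECT id, login FROM res_users"),
    # cr.dictfetchall() returns rows keyed by column name, e.g.
    #   [{'id': 1, 'login': 'admin'}, {'id': 2, 'login': 'demo'}]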
    def __del__(self):
        if not self._closed and not self._cnx.closed:
            # Oops. 'self' has not been closed explicitly.
            # The cursor will be deleted by the garbage collector,
            # but the database connection is not put back into the connection
            # pool, preventing some operations on the database, like dropping it.
            # This can also lead to a server overload.
            msg = "Cursor not closed explicitly\n"
            if self.__caller:
                msg += "Cursor was created at %s:%s" % self.__caller
            else:
                msg += "Please enable sql debugging to trace the caller."
            _logger.warning(msg)
            self._close(True)
    def execute(self, query, params=None, log_exceptions=None):
        if '%d' in query or '%f' in query:
            _logger.warning(query)
            _logger.warning("SQL queries cannot contain %d or %f anymore. Use only %s")
        if params and not isinstance(params, (tuple, list, dict)):
            _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
            raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
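
        # Hedged illustration of the expected call style (table and values are
        # made up):
        #   good: cr.execute("SELECT name FROM res_partner WHERE id = %s", (partner_id,))
        #   bad:  cr.execute("SELECT name FROM res_partner WHERE id = %d" % partner_id)
        # Passing params separately lets psycopg2 do the quoting and escaping.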
        if self.sql_log:
            now = mdt.now()

        try:
            params = params or None
            res = self._obj.execute(query, params)
        except psycopg2.ProgrammingError, pe:
            if self._default_log_exceptions if log_exceptions is None else log_exceptions:
                _logger.error("Programming error: %s, in query %s", pe, query)
            raise
        except Exception:
            if self._default_log_exceptions if log_exceptions is None else log_exceptions:
                _logger.exception("bad query: %s", self._obj.query or query)
            raise

        # simple query count is always computed
        self.sql_log_count += 1

        # advanced stats only if sql_log is enabled
        if self.sql_log:
            delay = mdt.now() - now
            delay = delay.seconds * 1E6 + delay.microseconds

            _logger.debug("query: %s", self._obj.query)
            res_from = re_from.match(query.lower())
            if res_from:
                self.sql_from_log.setdefault(res_from.group(1), [0, 0])
                self.sql_from_log[res_from.group(1)][0] += 1
                self.sql_from_log[res_from.group(1)][1] += delay
            res_into = re_into.match(query.lower())
            if res_into:
                self.sql_into_log.setdefault(res_into.group(1), [0, 0])
                self.sql_into_log[res_into.group(1)][0] += 1
                self.sql_into_log[res_into.group(1)][1] += delay
        return res
    def split_for_in_conditions(self, ids):
        """Split a list of identifiers into one or more smaller tuples
           safe for IN conditions, after uniquifying them."""
        return tools.misc.split_every(self.IN_MAX, set(ids))
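
    # Hedged usage sketch of split_for_in_conditions (table and ids are
    # illustrative):
    #   for sub_ids in cr.split_for_in_conditions(ids):
    #       cr.execute("SELECT id, name FROM res_partner WHERE id IN %s", (sub_ids,))
    # Each sub_ids chunk (a piece produced by tools.misc.split_every) holds at
    # most IN_MAX de-duplicated identifiers, keeping the IN clause bounded.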
    def print_log(self):
        def process(type):
            sqllogs = {'from': self.sql_from_log, 'into': self.sql_into_log}
            sum = 0
            if sqllogs[type]:
                sqllogitems = sqllogs[type].items()
                sqllogitems.sort(key=lambda k: k[1][1])
                _logger.debug("SQL LOG %s:", type)
                sqllogitems.sort(lambda x, y: cmp(x[1][0], y[1][0]))
                for r in sqllogitems:
                    delay = timedelta(microseconds=r[1][1])
                    _logger.debug("table: %s: %s/%s", r[0], delay, r[1][0])
                    sum += r[1][1]
                sqllogs[type].clear()
            sum = timedelta(microseconds=sum)
            _logger.debug("SUM %s:%s/%d [%d]", type, sum, self.sql_log_count, sql_counter)
            sqllogs[type].clear()
        process('from')
        process('into')
        self.sql_log_count = 0
    def close(self):
        return self._close(False)

    def _close(self, leak=False):
        global sql_counter

        if self.sql_log:
            self.__closer = frame_codeinfo(currentframe(), 3)

        # simple query count is always computed
        sql_counter += self.sql_log_count

        # advanced stats only if sql_log is enabled
        if self.sql_log:
            self.print_log()

        self._obj.close()

        # This forces the cursor to be freed, and thus, available again. It is
        # important because otherwise we can overload the server very easily
        # because of a cursor shortage (because cursors are not garbage
        # collected as fast as they should). The problem is probably due in
        # part to browse records keeping a reference to the cursor.
        del self._obj
        self._closed = True

        # Clean the underlying connection.
        self._cnx.rollback()

        if leak:
            self._cnx.leaked = True
        else:
            chosen_template = tools.config['db_template']
            templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
            keep_in_pool = self.dbname not in templates_list
            self.__pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
    def autocommit(self, on):
        if on:
            isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
        else:
            # If a serializable cursor was requested, we
            # use the appropriate PostgreSQL isolation level
            # that maps to snapshot isolation.
            # For all supported PostgreSQL versions (8.3-9.x),
            # this is currently ISOLATION_LEVEL_REPEATABLE_READ.
            # See also the docstring of this class.
            # NOTE: up to psycopg 2.4.2, repeatable read
            #       is remapped to serializable before being
            #       sent to the database, so it is in fact
            #       unavailable for use with pg 9.1.
            isolation_level = \
                ISOLATION_LEVEL_REPEATABLE_READ \
                if self._serialized \
                else ISOLATION_LEVEL_READ_COMMITTED
        self._cnx.set_isolation_level(isolation_level)
361 """ Perform an SQL `COMMIT`
363 return self._cnx.commit()
367 """ Perform an SQL `ROLLBACK`
369 return self._cnx.rollback()
372 """ Using the cursor as a contextmanager automatically commits and
378 # cr is committed if no failure occurred
379 # cr is closed in any case
383 def __exit__(self, exc_type, exc_value, traceback):
391 """context manager entering in a new savepoint"""
392 name = uuid.uuid1().hex
393 self.execute('SAVEPOINT "%s"' % name)
396 self.execute('RELEASE SAVEPOINT "%s"' % name)
398 self.execute('ROLLBACK TO SAVEPOINT "%s"' % name)
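
    # Hedged usage sketch of savepoint() (the query is illustrative): a failure
    # inside the block only rolls back to the savepoint, not the whole
    # transaction.
    #   with cr.savepoint():
    #       cr.execute("INSERT INTO res_partner_category (name) VALUES (%s)", ('Test',))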
    def __getattr__(self, name):
        return getattr(self._obj, name)
class TestCursor(Cursor):
    """ A cursor to be used for tests. It keeps the transaction open across
        several requests, and simulates committing, rolling back, and closing.
    """
    def __init__(self, *args, **kwargs):
        super(TestCursor, self).__init__(*args, **kwargs)
        # in order to simulate commit and rollback, the cursor maintains a
        # savepoint at its last commit
        self.execute("SAVEPOINT test_cursor")
        # we use a lock to serialize concurrent requests
        self._lock = threading.RLock()

    def force_close(self):
        super(TestCursor, self).close()

    def close(self):
        if not self._closed:
            self.rollback()     # for stuff that has not been committed

    def autocommit(self, on):
        _logger.debug("TestCursor.autocommit(%r) does nothing", on)

    def commit(self):
        self.execute("RELEASE SAVEPOINT test_cursor")
        self.execute("SAVEPOINT test_cursor")

    def rollback(self):
        self.execute("ROLLBACK TO SAVEPOINT test_cursor")
        self.execute("SAVEPOINT test_cursor")
class PsycoConnection(psycopg2.extensions.connection):
    pass
class ConnectionPool(object):
    """ The pool of connections to database(s)

        Keep a set of connections to pg databases open, and reuse them
        to open cursors for all transactions.

        The connections are *not* automatically closed. Only a close_db()
        can trigger that.
    """

    def locked(fun):
        @wraps(fun)
        def _locked(self, *args, **kwargs):
            self._lock.acquire()
            try:
                return fun(self, *args, **kwargs)
            finally:
                self._lock.release()
        return _locked

    def __init__(self, maxconn=64):
        self._connections = []
        self._maxconn = max(maxconn, 1)
        self._lock = threading.Lock()

    def __repr__(self):
        used = len([1 for c, u in self._connections[:] if u])
        count = len(self._connections)
        return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)

    def _debug(self, msg, *args):
        _logger.debug(('%r ' + msg), self, *args)
    @locked
    def borrow(self, dsn):
        # free dead and leaked connections
        for i, (cnx, _) in tools.reverse_enumerate(self._connections):
            if cnx.closed:
                self._connections.pop(i)
                self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
                continue
            if getattr(cnx, 'leaked', False):
                delattr(cnx, 'leaked')
                self._connections.pop(i)
                self._connections.append((cnx, False))
                _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)

        for i, (cnx, used) in enumerate(self._connections):
            if not used and cnx._original_dsn == dsn:
                try:
                    cnx.reset()
                except psycopg2.OperationalError:
                    self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn)
                    # psycopg2 2.4.4 and earlier do not allow closing a closed connection
                    if not cnx.closed:
                        cnx.close()
                    continue
                self._connections.pop(i)
                self._connections.append((cnx, True))
                self._debug('Borrow existing connection to %r at index %d', cnx.dsn, i)
                return cnx

        if len(self._connections) >= self._maxconn:
            # try to remove the oldest connection not used
            for i, (cnx, used) in enumerate(self._connections):
                if not used:
                    self._connections.pop(i)
                    self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
                    break
            else:
                # note: this code is called only if the for loop has completed (no break)
                raise PoolError('The Connection Pool Is Full')

        try:
            result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
        except psycopg2.Error:
            _logger.exception('Connection to the database failed')
            raise
        result._original_dsn = dsn
        self._connections.append((result, True))
        self._debug('Create new connection')
        return result
    @locked
    def give_back(self, connection, keep_in_pool=True):
        self._debug('Give back connection to %r', connection.dsn)
        for i, (cnx, used) in enumerate(self._connections):
            if cnx is connection:
                self._connections.pop(i)
                if keep_in_pool:
                    self._connections.append((cnx, False))
                    self._debug('Put connection to %r in pool', cnx.dsn)
                else:
                    self._debug('Forgot connection to %r', cnx.dsn)
                    cnx.close()
                break
        else:
            raise PoolError('This connection does not belong to the pool')
    @locked
    def close_all(self, dsn=None):
        count = 0
        last = None
        for i, (cnx, used) in tools.reverse_enumerate(self._connections):
            if dsn is None or cnx._original_dsn == dsn:
                cnx.close()
                last = self._connections.pop(i)[0]
                count += 1
        _logger.info('%r: Closed %d connections %s', self, count,
                     (dsn and last and 'to %r' % last.dsn) or '')
class Connection(object):
    """ A lightweight instance of a connection to postgres
    """
    def __init__(self, pool, dbname, dsn):
        self.dbname = dbname
        self.dsn = dsn
        self.__pool = pool

    def cursor(self, serialized=True):
        cursor_type = serialized and 'serialized ' or ''
        _logger.debug('create %scursor to %r', cursor_type, self.dsn)
        return Cursor(self.__pool, self.dbname, self.dsn, serialized=serialized)

    def test_cursor(self, serialized=True):
        cursor_type = serialized and 'serialized ' or ''
        _logger.debug('create test %scursor to %r', cursor_type, self.dsn)
        return TestCursor(self.__pool, self.dbname, self.dsn, serialized=serialized)

    # serialized_cursor is deprecated - cursors are serialized by default
    serialized_cursor = cursor

    def __nonzero__(self):
        """Check if connection is possible"""
        _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
590 """parse the given `db_or_uri` and return a 2-tuple (dbname, uri)"""
591 if db_or_uri.startswith(('postgresql://', 'postgres://')):
592 # extract db from uri
593 us = urlparse.urlsplit(db_or_uri)
595 db_name = us.path[1:]
597 db_name = us.username
599 db_name = us.hostname
600 return db_name, db_or_uri
603 for p in ('host', 'port', 'user', 'password'):
604 cfg = tools.config['db_' + p]
606 _dsn += '%s=%s ' % (p, cfg)
608 return db_or_uri, '%sdbname=%s' % (_dsn, db_or_uri)
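
# Hedged examples of what dsn() returns (database names and config values are
# illustrative; the non-URI form depends on what tools.config provides):
#   dsn('prod')                         -> ('prod', 'host=localhost port=5432 user=odoo dbname=prod')
#   dsn('postgresql://user@host/prod')  -> ('prod', 'postgresql://user@host/prod')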
_Pool = None

def db_connect(to, allow_uri=False):
    global _Pool
    if _Pool is None:
        _Pool = ConnectionPool(int(tools.config['db_maxconn']))

    db, uri = dsn(to)
    if not allow_uri and db != to:
        raise ValueError('URI connections not allowed')
    return Connection(_Pool, db, uri)
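
# Hedged end-to-end sketch ('somedb' and the query are illustrative): get a
# Connection, then use the cursor as a context manager so the transaction is
# committed on success and the cursor is always given back to the pool.
#   db = db_connect('somedb')
#   with db.cursor() as cr:
#       cr.execute("SELECT count(*) FROM res_users")
#       print cr.fetchone()[0]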
def close_db(db_name):
    """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along with this function."""
    global _Pool
    if _Pool:
        _Pool.close_all(dsn(db_name)[1])
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: