1 # -*- coding: utf-8 -*-
2 ##############################################################################
4 # OpenERP, Open Source Management Solution
5 # Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 # Copyright (C) 2010-2014 OpenERP s.a. (<http://openerp.com>).
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU Affero General Public License as
10 # published by the Free Software Foundation, either version 3 of the
11 # License, or (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Affero General Public License for more details.
18 # You should have received a copy of the GNU Affero General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 ##############################################################################
25 The PostgreSQL connector is a connectivity layer between the OpenERP code and
26 the database, *not* a database abstraction toolkit. Database abstraction is what
27 the ORM does, in fact.
30 from contextlib import contextmanager
31 from functools import wraps
35 import psycopg2.extensions
36 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
37 from psycopg2.pool import PoolError
38 from psycopg2.psycopg1 import cursor as psycopg1cursor
# Have psycopg2 return PostgreSQL text data as unicode objects rather than
# encoded byte strings.
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)

_logger = logging.getLogger(__name__)
def unbuffer(symb, cr):
    # psycopg2 typecaster helper: SQL NULL maps to None.
    if symb is None: return None
def undecimalize(symb, cr):
    # psycopg2 typecaster registered below for the float/numeric oids
    # (700, 701, 1700): SQL NULL maps to None.
    if symb is None: return None
# Register the custom typecasters: every entry of types_mapping (presumably
# a name -> oid-tuple mapping defined above — confirm) is returned as-is,
# while float/numeric oids (700, 701, 1700) go through undecimalize.
for name, typeoid in types_mapping.items():
    psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
64 from tools.func import frame_codeinfo
65 from datetime import datetime as mdt
66 from datetime import timedelta
68 from inspect import currentframe
# Crude regexes used by the SQL statistics logging (see Cursor.execute) to
# extract the main table name from "... from <table> ..." and
# "... into <table> ..." queries.
re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
77 """Represents an open transaction to the PostgreSQL DB backend,
78 acting as a lightweight wrapper around psycopg2's
79 ``psycopg1cursor`` objects.
81 ``Cursor`` is the object behind the ``cr`` variable used all
82 over the OpenERP code.
84 .. rubric:: Transaction Isolation
86 One very important property of database transactions is the
87 level of isolation between concurrent transactions.
88 The SQL standard defines four levels of transaction isolation,
89 ranging from the most strict *Serializable* level, to the least
90 strict *Read Uncommitted* level. These levels are defined in
91 terms of the phenomena that must not occur between concurrent
92 transactions, such as *dirty read*, etc.
93 In the context of a generic business data management software
94 such as OpenERP, we need the best guarantees that no data
corruption can ever be caused by simply running multiple
96 transactions in parallel. Therefore, the preferred level would
97 be the *serializable* level, which ensures that a set of
98 transactions is guaranteed to produce the same effect as
99 running them one at a time in some order.
101 However, most database management systems implement a limited
102 serializable isolation in the form of
103 `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
104 providing most of the same advantages as True Serializability,
105 with a fraction of the performance cost.
106 With PostgreSQL up to version 9.0, this snapshot isolation was
107 the implementation of both the ``REPEATABLE READ`` and
108 ``SERIALIZABLE`` levels of the SQL standard.
109 As of PostgreSQL 9.1, the previous snapshot isolation implementation
110 was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
111 level was introduced, providing some additional heuristics to
112 detect a concurrent update by parallel transactions, and forcing
113 one of them to rollback.
115 OpenERP implements its own level of locking protection
116 for transactions that are highly likely to provoke concurrent
117 updates, such as stock reservations or document sequences updates.
118 Therefore we mostly care about the properties of snapshot isolation,
119 but we don't really need additional heuristics to trigger transaction
120 rollbacks, as we are taking care of triggering instant rollbacks
121 ourselves when it matters (and we can save the additional performance
122 hit of these heuristics).
124 As a result of the above, we have selected ``REPEATABLE READ`` as
125 the default transaction isolation level for OpenERP cursors, as
126 it will be mapped to the desired ``snapshot isolation`` level for
all supported PostgreSQL versions (8.3 - 9.x).
129 Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
130 read level to serializable before sending it to the database, so it would
131 actually select the new serializable mode on PostgreSQL 9.1. Make
132 sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
133 the performance hit is a concern for you.
137 Cache dictionary with a "request" (-ish) lifecycle, only lives as
138 long as the cursor itself does and proactively cleared when the
141 This cache should *only* be used to store repeatable reads as it
142 ignores rollbacks and savepoints, it should not be used to store
143 *any* data which may be modified during the life of the cursor.
    # Upper bound used by split_for_in_conditions() when chunking id lists.
    IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
    def wrapper(self, *args, **kwargs):
        # Refuse to operate on a cursor that has already been closed.
        msg = 'Unable to use a closed cursor.'
        # self.__closer is the (filename, line) pair recorded by _close()
        # when sql debugging was enabled at close time.
        msg += ' It was closed at %s, line %s' % self.__closer
        raise psycopg2.OperationalError(msg)
        return f(self, *args, **kwargs)
    def __init__(self, pool, dbname, serialized=True):
        # Per-table SQL statistics filled by execute() when sql_log is on:
        # {table_name: [query_count, cumulated_delay_in_microseconds]}
        self.sql_from_log = {}
        self.sql_into_log = {}

        # default log level determined at cursor creation, could be
        # overridden later for debugging purposes
        self.sql_log = _logger.isEnabledFor(logging.DEBUG)

        self.sql_log_count = 0
        self._closed = True # avoid the call of close() (by __del__) if an exception
                            # is raised by any of the following initialisations

        # Whether to enable snapshot isolation level for this cursor.
        # see also the docstring of Cursor.
        self._serialized = serialized

        # Borrow a raw connection from the pool and open a psycopg1-style
        # cursor on it (the API the rest of OpenERP expects).
        self._cnx = pool.borrow(dsn(dbname))
        self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
        # Creation site, used to report leaked cursors in __del__.
        # NOTE(review): as written the second assignment immediately
        # overwrites the first — these were presumably on two branches of an
        # ``if self.sql_log`` (frame inspection is costly) — confirm.
        self.__caller = frame_codeinfo(currentframe(),2)
        self.__caller = False
        self._closed = False # real initialisation value
        self.autocommit(False)
        self.__closer = False

        self._default_log_exceptions = True
        # (__del__ body) warn loudly when a cursor is reclaimed by the
        # garbage collector without having been closed explicitly.
        if not self._closed and not self._cnx.closed:
            # Oops. 'self' has not been closed explicitly.
            # The cursor will be deleted by the garbage collector,
            # but the database connection is not put back into the connection
            # pool, preventing some operation on the database like dropping it.
            # This can also lead to a server overload.
            msg = "Cursor not closed explicitly\n"
            # __caller holds the (file, line) of creation when sql_log was on.
            msg += "Cursor was created at %s:%s" % self.__caller
            msg += "Please enable sql debugging to trace the caller."
    def execute(self, query, params=None, log_exceptions=None):
        """Execute *query* with *params* on the underlying cursor, after
        validating the parameters, and collect per-table timing statistics
        when sql debugging is enabled.

        :param log_exceptions: when None, falls back to the cursor's
            _default_log_exceptions flag.
        """
        if '%d' in query or '%f' in query:
            # psycopg2 only supports %s placeholders; reject legacy ones.
            _logger.warning(query)
            _logger.warning("SQL queries cannot contain %d or %f anymore. "
        if params and not isinstance(params, (tuple, list, dict)):
            # Fail fast with a clear message rather than letting psycopg2
            # raise something confusing later.
            _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
            raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))

            # Normalize falsy containers to None for psycopg2.
            params = params or None
            res = self._obj.execute(query, params)
        except psycopg2.ProgrammingError, pe:
            if self._default_log_exceptions if log_exceptions is None else log_exceptions:
                _logger.error("Programming error: %s, in query %s", pe, query)
            if self._default_log_exceptions if log_exceptions is None else log_exceptions:
                _logger.exception("bad query: %s", self._obj.query or query)

        # Statistics path (only meaningful when sql_log is enabled):
        # elapsed time in microseconds since ``now`` was taken above.
        delay = mdt.now() - now
        delay = delay.seconds * 1E6 + delay.microseconds

        _logger.debug("query: %s", self._obj.query)
        self.sql_log_count+=1
        # Bucket the query per table into sql_from_log / sql_into_log;
        # each entry is [query_count, cumulated_delay_in_microseconds].
        res_from = re_from.match(query.lower())
        # NOTE(review): presumably guarded by ``if res_from:`` — confirm.
        self.sql_from_log.setdefault(res_from.group(1), [0, 0])
        self.sql_from_log[res_from.group(1)][0] += 1
        self.sql_from_log[res_from.group(1)][1] += delay
        res_into = re_into.match(query.lower())
        # NOTE(review): presumably guarded by ``if res_into:`` — confirm.
        self.sql_into_log.setdefault(res_into.group(1), [0, 0])
        self.sql_into_log[res_into.group(1)][0] += 1
        self.sql_into_log[res_into.group(1)][1] += delay
250 def split_for_in_conditions(self, ids):
251 """Split a list of identifiers into one or more smaller tuples
252 safe for IN conditions, after uniquifying them."""
253 return tools.misc.split_every(self.IN_MAX, set(ids))
        # Fold this cursor's query count into the module-wide counter.
        sql_counter += self.sql_log_count

        # Per-table summaries for SELECT-ish ('from') and INSERT-ish ('into')
        # statements collected by execute().
        sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}

            sqllogitems = sqllogs[type].items()
            sqllogitems.sort(key=lambda k: k[1][1])
            _logger.debug("SQL LOG %s:", type)
            # NOTE(review): this second sort (by query count) makes the sort
            # by cumulated delay just above redundant.
            sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
            for r in sqllogitems:
                # r is (table, [count, delay_us]).
                delay = timedelta(microseconds=r[1][1])
                _logger.debug("table: %s: %s/%s",
                              r[0], delay, r[1][0])

            sqllogs[type].clear()
            sum = timedelta(microseconds=sum)
            _logger.debug("SUM %s:%s/%d [%d]",
                          type, sum, self.sql_log_count, sql_counter)
            sqllogs[type].clear()
        self.sql_log_count = 0
        # Public close(): delegate to _close() without flagging the
        # underlying connection as leaked.
        return self._close(False)
    def _close(self, leak=False):
        # Record the closing site so check() can report it if the cursor is
        # used again after being closed.
        self.__closer = frame_codeinfo(currentframe(),3)

        # This force the cursor to be freed, and thus, available again. It is
        # important because otherwise we can overload the server very easily
        # because of a cursor shortage (because cursors are not garbage
        # collected as fast as they should). The problem is probably due in
        # part because browse records keep a reference to the cursor.

        # Clean the underlying connection.

        # Tag the connection so ConnectionPool.borrow() can reclaim it.
        self._cnx.leaked = True

        chosen_template = tools.config['db_template']
        templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
        # Template databases are never kept in the pool — presumably because
        # an open connection to a template blocks database creation; confirm.
        keep_in_pool = self.dbname not in templates_list
        self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
    def autocommit(self, on):
        """Switch the underlying connection between autocommit mode and the
        transactional isolation level selected for this cursor."""
        isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
        # If a serializable cursor was requested, we
        # use the appropriate PostgreSQL isolation level
        # that maps to snapshot isolation.
        # For all supported PostgreSQL versions (8.3-9.x),
        # this is currently the ISOLATION_REPEATABLE_READ.
        # See also the docstring of this class.
        # NOTE: up to psycopg 2.4.2, repeatable read
        # is remapped to serializable before being
        # sent to the database, so it is in fact
        # unavailable for use with pg 9.1.
        isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
            if self._serialized \
            else ISOLATION_LEVEL_READ_COMMITTED
        self._cnx.set_isolation_level(isolation_level)
        """ Perform an SQL `COMMIT` on the underlying connection.
        """
        return self._cnx.commit()
        """ Perform an SQL `ROLLBACK` on the underlying connection.
        """
        return self._cnx.rollback()
        """ Using the cursor as a contextmanager automatically commits and
        closes it. """
        # cr is committed if no failure occurred
        # cr is closed in any case

    def __exit__(self, exc_type, exc_value, traceback):
        # Commit on a clean exit, then close the cursor in any case.
        """context manager entering in a new savepoint"""
        # uuid1-based name stays unique even across concurrent savepoints.
        name = uuid.uuid1().hex
        self.execute('SAVEPOINT "%s"' % name)
        # Success path releases the savepoint; the failure path rolls back
        # to it. NOTE(review): these two statements were presumably on
        # try/except branches around a ``yield`` — confirm.
        self.execute('RELEASE SAVEPOINT "%s"' % name)
        self.execute('ROLLBACK TO SAVEPOINT "%s"' % name)
381 def __getattr__(self, name):
382 return getattr(self._obj, name)
class TestCursor(Cursor):
    """ A cursor to be used for tests. It keeps the transaction open across
    several requests, and simulates committing, rolling back, and closing.
    """
    def __init__(self, *args, **kwargs):
        super(TestCursor, self).__init__(*args, **kwargs)
        # in order to simulate commit and rollback, the cursor maintains a
        # savepoint at its last commit
        self.execute("SAVEPOINT test_cursor")
        # we use a lock to serialize concurrent requests
        self._lock = threading.RLock()

    def close(self, force=False):
        # A real close presumably only happens when forced; otherwise the
        # uncommitted work is rolled back and the transaction stays open
        # for the next request — confirm against the elided branch.
        super(TestCursor, self).close()
        elif not self._closed:
            self.rollback() # for stuff that has not been committed

    def autocommit(self, on):
        # Autocommit would end the long-lived test transaction, so it is
        # deliberately a no-op on test cursors.
        _logger.debug("TestCursor.autocommit(%r) does nothing", on)

        # (simulated commit) make prior work permanent within the test
        # transaction by replacing the savepoint with a fresh one.
        self.execute("RELEASE SAVEPOINT test_cursor")
        self.execute("SAVEPOINT test_cursor")

        # (simulated rollback) revert to the savepoint, then re-establish it.
        self.execute("ROLLBACK TO SAVEPOINT test_cursor")
        self.execute("SAVEPOINT test_cursor")
class PsycoConnection(psycopg2.extensions.connection):
    """Connection subclass used as psycopg2 ``connection_factory`` by
    ConnectionPool (see ``borrow``); adds no behaviour of its own."""
    pass
class ConnectionPool(object):
    """ The pool of connections to database(s)

    Keep a set of connections to pg databases open, and reuse them
    to open cursors for all transactions.

    The connections are *not* automatically closed. Only a close_db()
    call will do so.
    """

    def _locked(self, *args, **kwargs):
        # NOTE(review): presumably the inner wrapper of a ``locked(fun)``
        # decorator that holds self._lock around pool methods — confirm.
        return fun(self, *args, **kwargs)
444 def __init__(self, maxconn=64):
445 self._connections = []
446 self._maxconn = max(maxconn, 1)
447 self._lock = threading.Lock()
        # (__repr__ body) e.g. "ConnectionPool(used=3/count=5/max=64)";
        # iterates a copy of the pair list since other threads may mutate it.
        used = len([1 for c, u in self._connections[:] if u])
        count = len(self._connections)
        return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
454 def _debug(self, msg, *args):
455 _logger.debug(('%r ' + msg), self, *args)
    def borrow(self, dsn):
        """Return a connection to *dsn*: reclaim dead/leaked entries, reuse
        a free pooled connection when one matches, otherwise open a new one.

        Raises PoolError when the pool is full of in-use connections.
        """
        self._debug('Borrow connection to %r', dsn)

        # free dead and leaked connections
        for i, (cnx, _) in tools.reverse_enumerate(self._connections):
            self._connections.pop(i)
            self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
            if getattr(cnx, 'leaked', False):
                # A Cursor was garbage-collected without close(); its
                # connection was tagged in Cursor._close — reclaim it as free.
                delattr(cnx, 'leaked')
                self._connections.pop(i)
                self._connections.append((cnx, False))
                _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)

        # Reuse an existing free connection to the same dsn, if any.
        for i, (cnx, used) in enumerate(self._connections):
            if not used and dsn_are_equals(cnx.dsn, dsn):
                except psycopg2.OperationalError:
                    self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn)
                    # psycopg2 2.4.4 and earlier do not allow closing a closed connection
                # Move the pair to the end of the list and mark it in use.
                self._connections.pop(i)
                self._connections.append((cnx, True))
                self._debug('Existing connection found at index %d', i)

        if len(self._connections) >= self._maxconn:
            # try to remove the oldest connection not used
            for i, (cnx, used) in enumerate(self._connections):
                self._connections.pop(i)
                self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
            # note: this code is called only if the for loop has completed (no break)
            raise PoolError('The Connection Pool Is Full')

        # No reusable connection: open a fresh one via our marker factory.
        result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
        except psycopg2.Error:
            _logger.exception('Connection to the database failed')
        self._connections.append((result, True))
        self._debug('Create new connection')
    def give_back(self, connection, keep_in_pool=True):
        """Return *connection* to the pool as free, or forget it entirely
        when keep_in_pool is False; raise PoolError for foreign connections."""
        self._debug('Give back connection to %r', connection.dsn)
        for i, (cnx, used) in enumerate(self._connections):
            if cnx is connection:
                self._connections.pop(i)
                # Re-append as free so it can be borrowed again.
                self._connections.append((cnx, False))
                self._debug('Put connection to %r in pool', cnx.dsn)
                self._debug('Forgot connection to %r', cnx.dsn)
        # NOTE(review): runtime-string typo — "below" should read "belong";
        # left unchanged here since it is program output.
        raise PoolError('This connection does not below to the pool')
    def close_all(self, dsn=None):
        """Close every pooled connection matching *dsn*, or all of them
        when dsn is None."""
        _logger.info('%r: Close all connections to %r', self, dsn)
        # Reverse iteration so pop(i) does not shift unvisited indices.
        for i, (cnx, used) in tools.reverse_enumerate(self._connections):
            if dsn is None or dsn_are_equals(cnx.dsn, dsn):
                self._connections.pop(i)
class Connection(object):
    """ A lightweight instance of a connection to postgres
    """

    def __init__(self, pool, dbname):
        # Presumably stores *pool* and *dbname* as self._pool / self.dbname,
        # which the methods below read — confirm the elided body.

    def cursor(self, serialized=True):
        """Open a new Cursor on this connection's database (snapshot
        isolation by default, see the Cursor docstring)."""
        cursor_type = serialized and 'serialized ' or ''
        _logger.debug('create %scursor to %r', cursor_type, self.dbname)
        return Cursor(self._pool, self.dbname, serialized=serialized)

    def test_cursor(self, serialized=True):
        """Open a TestCursor (transaction kept open, commit/rollback
        simulated) on this connection's database."""
        cursor_type = serialized and 'serialized ' or ''
        _logger.debug('create test %scursor to %r', cursor_type, self.dbname)
        return TestCursor(self._pool, self.dbname, serialized=serialized)

    # serialized_cursor is deprecated - cursors are serialized by default
    serialized_cursor = cursor

    def __nonzero__(self):
        """Check if connection is possible"""
        _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
    # (dsn body) build a libpq-style connection string from the db_host,
    # db_port, db_user and db_password config options, ending with the
    # database name.
    for p in ('host', 'port', 'user', 'password'):
        cfg = tools.config['db_' + p]
        _dsn += '%s=%s ' % (p, cfg)

    return '%sdbname=%s' % (_dsn, db_name)
def dsn_are_equals(first, second):
    """Return True when both dsn strings target the same server and
    database; the password component is ignored."""
        # (inner ``key`` helper, def line elided) parse "k=v" tokens into a
        # dict and drop the password before comparing.
        k = dict(x.split('=', 1) for x in dsn.strip().split())
        k.pop('password', None) # password is not relevant
    return key(first) == key(second)
def db_connect(db_name):
    # Size the shared pool from the db_maxconn configuration option.
    # NOTE(review): presumably guarded so the module-level _Pool is only
    # created once — confirm the elided lines.
    _Pool = ConnectionPool(int(tools.config['db_maxconn']))
    return Connection(_Pool, db_name)
def close_db(db_name):
    """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
    # Drop every pooled connection whose dsn matches this database.
    _Pool.close_all(dsn(db_name))
602 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: