# -*- coding: utf-8 -*-
##############################################################################
#
#    OpenERP, Open Source Management Solution
#    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
#    Copyright (C) 2010-2011 OpenERP s.a. (<http://openerp.com>).
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################

"""
The PostgreSQL connector is a connectivity layer between the OpenERP code and
the database, *not* a database abstraction toolkit. Database abstraction is what
the ORM does, in fact.
"""
__all__ = ['db_connect', 'close_db']
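
# Illustrative usage, for reference only (database name and query are made up;
# tools.config must be loaded so dsn() can build the connection string):
#
#   cnx = db_connect('somedb')
#   cr = cnx.cursor()
#   try:
#       cr.execute('SELECT login FROM res_users WHERE id = %s', (1,))
#       login = cr.fetchone()
#       cr.commit()
#   finally:
#       cr.close()        # always give the connection back to the pool
#   close_db('somedb')    # closes all pooled connections to that database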

from functools import wraps
import logging
import psycopg2.extensions
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
from psycopg2.pool import PoolError
from psycopg2.psycopg1 import cursor as psycopg1cursor
from threading import currentThread

psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)

_logger = logging.getLogger(__name__)

types_mapping = {
    'date': (1082,),
    'time': (1083,),
    'datetime': (1114,),
}

def unbuffer(symb, cr):
    if symb is None: return None
    return str(symb)

def undecimalize(symb, cr):
    if symb is None: return None
    return float(symb)

for name, typeoid in types_mapping.items():
    psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
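
# Effect of the casters registered above (sketch): date/time/datetime values
# come back unparsed as strings, while FLOAT4/FLOAT8/NUMERIC (oids 700, 701,
# 1700) come back as Python floats instead of Decimal, e.g.:
#   cr.execute('SELECT 1.5::numeric')
#   cr.fetchone()[0]   # -> 1.5 (a float, not Decimal('1.5'))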

import tools
from tools.func import frame_codeinfo
from datetime import datetime as mdt
from datetime import timedelta
import threading
from inspect import currentframe

import re
re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
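
# For illustration: these patterns extract the target table name from a
# lowercased query, feeding the per-table SQL log below, e.g.
#   re_from.match('select * from "res_users" where active').group(1)
#   # -> 'res_users'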
78 """Represents an open transaction to the PostgreSQL DB backend,
79 acting as a lightweight wrapper around psycopg2's
80 ``psycopg1cursor`` objects.
82 ``Cursor`` is the object behind the ``cr`` variable used all
83 over the OpenERP code.

    .. rubric:: Transaction Isolation

    One very important property of database transactions is the
    level of isolation between concurrent transactions.
    The SQL standard defines four levels of transaction isolation,
    ranging from the most strict *Serializable* level to the least
    strict *Read Uncommitted* level. These levels are defined in
    terms of the phenomena that must not occur between concurrent
    transactions, such as *dirty read*, etc.
    In the context of a generic business data management software
    such as OpenERP, we need the best guarantees that no data
    corruption can ever be caused by simply running multiple
    transactions in parallel. Therefore, the preferred level would
    be the *serializable* level, which ensures that a set of
    transactions is guaranteed to produce the same effect as
    running them one at a time in some order.

    However, most database management systems implement a limited
    serializable isolation in the form of
    `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
    providing most of the same advantages as true serializability,
    at a fraction of the performance cost.
    With PostgreSQL up to version 9.0, this snapshot isolation was
    the implementation of both the ``REPEATABLE READ`` and
    ``SERIALIZABLE`` levels of the SQL standard.
    As of PostgreSQL 9.1, the previous snapshot isolation implementation
    was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
    level was introduced, providing additional heuristics to
    detect concurrent updates by parallel transactions, and forcing
    one of them to roll back.

    OpenERP implements its own level of locking protection
    for transactions that are highly likely to provoke concurrent
    updates, such as stock reservations or document sequence updates.
    Therefore we mostly care about the properties of snapshot isolation,
    but we don't really need the additional heuristics to trigger transaction
    rollbacks, as we take care of triggering instant rollbacks
    ourselves when it matters (and we can save the additional performance
    hit of these heuristics).

    As a result of the above, we have selected ``REPEATABLE READ`` as
    the default transaction isolation level for OpenERP cursors, as
    it will be mapped to the desired ``snapshot isolation`` level for
    all supported PostgreSQL versions (8.3 - 9.x).

    Note: up to psycopg2 v2.4.2, psycopg2 itself remapped the repeatable
    read level to serializable before sending it to the database, so it would
    actually select the new serializable mode on PostgreSQL 9.1. Make
    sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
    the performance hit is a concern for you.
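
    For illustration only, a cursor at the default (snapshot) isolation
    level is typically used as follows; the table and query are assumptions
    made up for the example::

        cr = db_connect('somedb').cursor()
        try:
            cr.execute('SELECT name FROM res_partner WHERE id = %s', (1,))
            name = cr.fetchone()
            cr.commit()      # publish this transaction's writes
        except Exception:
            cr.rollback()    # discard the whole transaction on error
            raise
        finally:
            cr.close()       # always give the connection back to the pool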

    .. attribute:: cache

        Cache dictionary with a "request" (-ish) lifecycle, only lives as
        long as the cursor itself does and proactively cleared when the
        cursor is closed.

        This cache should *only* be used to store repeatable reads as it
        ignores rollbacks and savepoints, it should not be used to store
        *any* data which may be modified during the life of the cursor.

    """
    IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit

    def check(f):
        @wraps(f)
        def wrapper(self, *args, **kwargs):
            if self.__closed:
                msg = 'Unable to use a closed cursor.'
                if self.__closer:
                    msg += ' It was closed at %s, line %s' % self.__closer
                raise psycopg2.OperationalError(msg)
            return f(self, *args, **kwargs)
        return wrapper

    def __init__(self, pool, dbname, serialized=True):
        self.sql_from_log = {}
        self.sql_into_log = {}

        # default log level determined at cursor creation, could be
        # overridden later for debugging purposes
        self.sql_log = _logger.isEnabledFor(logging.DEBUG)

        self.sql_log_count = 0
        self.__closed = True    # avoid the call of close() (by __del__) if an exception
                                # is raised by any of the following initialisations

        self._pool = pool
        self.dbname = dbname

        # Whether to enable snapshot isolation level for this cursor.
        # See also the docstring of Cursor.
        self._serialized = serialized

        self._cnx = pool.borrow(dsn(dbname))
        self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
        if self.sql_log:
            self.__caller = frame_codeinfo(currentframe(), 2)
        else:
            self.__caller = False
        self.__closed = False   # real initialisation value
        self.autocommit(False)
        self.__closer = False

        self._default_log_exceptions = True

    def __del__(self):
        if not self.__closed and not self._cnx.closed:
            # Oops. 'self' has not been closed explicitly.
            # The cursor will be deleted by the garbage collector,
            # but the database connection is not put back into the connection
            # pool, preventing some operations on the database like dropping it.
            # This can also lead to a server overload.
            msg = "Cursor not closed explicitly\n"
            if self.__caller:
                msg += "Cursor was created at %s:%s" % self.__caller
            else:
                msg += "Please enable sql debugging to trace the caller."
            _logger.warning(msg)
            self._close(True)

    @check
    def execute(self, query, params=None, log_exceptions=None):
        if '%d' in query or '%f' in query:
            _logger.warning(query)
            _logger.warning("SQL queries cannot contain %d or %f anymore. "
                            "Use only %s")
        if params and not isinstance(params, (tuple, list, dict)):
            _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
            raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
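        # Correct parameter passing, for illustration (model and values made up):
        #   cr.execute('SELECT id FROM res_users WHERE login = %s', (login,))
        #   cr.execute('UPDATE res_users SET active = %(active)s', {'active': True})
        # Never interpolate values into the query string yourself.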

        if self.sql_log:
            now = mdt.now()

        try:
            params = params or None
            res = self._obj.execute(query, params)
        except psycopg2.ProgrammingError, pe:
            if self._default_log_exceptions if log_exceptions is None else log_exceptions:
                _logger.error("Programming error: %s, in query %s", pe, query)
            raise
        except Exception:
            if self._default_log_exceptions if log_exceptions is None else log_exceptions:
                _logger.exception("bad query: %s", self._obj.query or query)
            raise

        if self.sql_log:
            delay = mdt.now() - now
            delay = delay.seconds * 1E6 + delay.microseconds

            _logger.debug("query: %s", self._obj.query)
            self.sql_log_count += 1
            res_from = re_from.match(query.lower())
            if res_from:
                self.sql_from_log.setdefault(res_from.group(1), [0, 0])
                self.sql_from_log[res_from.group(1)][0] += 1
                self.sql_from_log[res_from.group(1)][1] += delay
            res_into = re_into.match(query.lower())
            if res_into:
                self.sql_into_log.setdefault(res_into.group(1), [0, 0])
                self.sql_into_log[res_into.group(1)][0] += 1
                self.sql_into_log[res_into.group(1)][1] += delay
        return res

    def split_for_in_conditions(self, ids):
        """Split a list of identifiers into one or more smaller tuples
           safe for IN conditions, after uniquifying them."""
        return tools.misc.split_every(self.IN_MAX, set(ids))
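
    # Illustrative use (sketch; model and variable names are made up):
    #   for chunk in cr.split_for_in_conditions(partner_ids):
    #       cr.execute('SELECT id FROM res_partner WHERE id IN %s', (chunk,))
    # Each chunk is a tuple of at most IN_MAX ids, keeping IN clauses bounded.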

    def print_log(self):
        global sql_counter
        sql_counter += self.sql_log_count
        if not self.sql_log:
            return

        def process(type):
            sqllogs = {'from': self.sql_from_log, 'into': self.sql_into_log}
            sum = 0
            if sqllogs[type]:
                sqllogitems = sqllogs[type].items()
                sqllogitems.sort(key=lambda k: k[1][1])
                _logger.debug("SQL LOG %s:", type)
                sqllogitems.sort(lambda x, y: cmp(x[1][0], y[1][0]))
                for r in sqllogitems:
                    delay = timedelta(microseconds=r[1][1])
                    _logger.debug("table: %s: %s/%s",
                                  r[0], delay, r[1][0])
                    sum += r[1][1]
                sqllogs[type].clear()
            sum = timedelta(microseconds=sum)
            _logger.debug("SUM %s:%s/%d [%d]",
                          type, sum, self.sql_log_count, sql_counter)
            sqllogs[type].clear()
        process('from')
        process('into')
        self.sql_log_count = 0
        self.sql_log = False

    @check
    def close(self):
        return self._close(False)

    def _close(self, leak=False):
        if not self._obj:
            return

        if self.sql_log:
            self.__closer = frame_codeinfo(currentframe(), 3)
        self.print_log()

        self._obj.close()

        # This forces the cursor to be freed, and thus, available again. It is
        # important because otherwise we can overload the server very easily
        # because of a cursor shortage (because cursors are not garbage
        # collected as fast as they should). The problem is probably due in
        # part to browse records keeping a reference to the cursor.
        del self._obj
        self.__closed = True

        # Clean the underlying connection.
        self._cnx.rollback()

        if leak:
            self._cnx.leaked = True
        else:
            chosen_template = tools.config['db_template']
            templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
            keep_in_pool = self.dbname not in templates_list
            self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)

    @check
    def autocommit(self, on):
        if on:
            isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
        else:
            # If a serializable cursor was requested, we
            # use the appropriate PostgreSQL isolation level
            # that maps to snapshot isolation.
            # For all supported PostgreSQL versions (8.3-9.x),
            # this is currently ISOLATION_LEVEL_REPEATABLE_READ.
            # See also the docstring of this class.
            # NOTE: up to psycopg2 2.4.2, repeatable read
            #       is remapped to serializable before being
            #       sent to the database, so it is in fact
            #       unavailable for use with pg 9.1.
            isolation_level = (ISOLATION_LEVEL_REPEATABLE_READ
                               if self._serialized
                               else ISOLATION_LEVEL_READ_COMMITTED)
        self._cnx.set_isolation_level(isolation_level)
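
    # For illustration only: callers switch to autocommit before statements
    # that cannot run inside a transaction block (database name is made up):
    #   cr.autocommit(True)
    #   cr.execute('CREATE DATABASE somedb')
    #   cr.autocommit(False)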
341 """ Perform an SQL `COMMIT`
343 return self._cnx.commit()
347 """ Perform an SQL `ROLLBACK`
349 return self._cnx.rollback()

    @check
    def __getattr__(self, name):
        return getattr(self._obj, name)

class PsycoConnection(psycopg2.extensions.connection):
    pass

class ConnectionPool(object):
    """ The pool of connections to database(s)

        Keep a set of connections to pg databases open, and reuse them
        to open cursors for all transactions.

        The connections are *not* automatically closed. Only a close_db()
        can trigger that.
    """
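
    # Borrow/give-back contract, for illustration only; the pool is normally
    # driven indirectly via db_connect() and Cursor (dsn string made up):
    #   pool = ConnectionPool(maxconn=2)
    #   cnx = pool.borrow('dbname=somedb')   # raises PoolError once full and busy
    #   pool.give_back(cnx)                  # marks the connection reusable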

    def locked(fun):
        @wraps(fun)
        def _locked(self, *args, **kwargs):
            self._lock.acquire()
            try:
                return fun(self, *args, **kwargs)
            finally:
                self._lock.release()
        return _locked

    def __init__(self, maxconn=64):
        self._connections = []
        self._maxconn = max(maxconn, 1)
        self._lock = threading.Lock()

    def __repr__(self):
        used = len([1 for c, u in self._connections[:] if u])
        count = len(self._connections)
        return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)

    def _debug(self, msg, *args):
        _logger.debug(('%r ' + msg), self, *args)

    @locked
    def borrow(self, dsn):
        self._debug('Borrow connection to %r', dsn)

        # free leaked connections
        for i, (cnx, _) in tools.reverse_enumerate(self._connections):
            if cnx.closed:
                self._connections.pop(i)
                self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
                continue
            if getattr(cnx, 'leaked', False):
                delattr(cnx, 'leaked')
                self._connections.pop(i)
                self._connections.append((cnx, False))
                _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)

        for i, (cnx, used) in enumerate(self._connections):
            if not used and dsn_are_equals(cnx.dsn, dsn):
                self._connections.pop(i)
                self._connections.append((cnx, True))
                self._debug('Existing connection found at index %d', i)
                return cnx

        if len(self._connections) >= self._maxconn:
            # try to remove the oldest connection not used
            for i, (cnx, used) in enumerate(self._connections):
                if not used:
                    self._connections.pop(i)
                    self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
                    break
            else:
                # note: only reached if the for loop completed without a break
                raise PoolError('The Connection Pool Is Full')

        try:
            result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
        except psycopg2.Error:
            _logger.exception('Connection to the database failed')
            raise
        self._connections.append((result, True))
        self._debug('Create new connection')
        return result

    @locked
    def give_back(self, connection, keep_in_pool=True):
        self._debug('Give back connection to %r', connection.dsn)
        for i, (cnx, used) in enumerate(self._connections):
            if cnx is connection:
                self._connections.pop(i)
                if keep_in_pool:
                    self._connections.append((cnx, False))
                    self._debug('Put connection to %r in pool', cnx.dsn)
                else:
                    self._debug('Forgot connection to %r', cnx.dsn)
                    cnx.close()
                break
        else:
            raise PoolError('This connection does not belong to the pool')

    @locked
    def close_all(self, dsn):
        _logger.info('%r: Close all connections to %r', self, dsn)
        for i, (cnx, used) in tools.reverse_enumerate(self._connections):
            if dsn_are_equals(cnx.dsn, dsn):
                cnx.close()
                self._connections.pop(i)

class Connection(object):
    """ A lightweight instance of a connection to postgres
    """

    def __init__(self, pool, dbname):
        self.dbname = dbname
        self._pool = pool

    def cursor(self, serialized=True):
        cursor_type = serialized and 'serialized ' or ''
        _logger.debug('create %scursor to %r', cursor_type, self.dbname)
        return Cursor(self._pool, self.dbname, serialized=serialized)

    # serialized_cursor is deprecated - cursors are serialized by default
    serialized_cursor = cursor

    def __nonzero__(self):
        """Check if connection is possible"""
        try:
            _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
            cr = self.cursor()
            cr.close()
            return True
        except Exception:
            return False

def dsn(db_name):
    _dsn = ''
    for p in ('host', 'port', 'user', 'password'):
        cfg = tools.config['db_' + p]
        if cfg:
            _dsn += '%s=%s ' % (p, cfg)

    return '%sdbname=%s' % (_dsn, db_name)

def dsn_are_equals(first, second):
    def key(dsn):
        k = dict(x.split('=', 1) for x in dsn.strip().split())
        k.pop('password', None) # password is not relevant
        return k
    return key(first) == key(second)
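
# For illustration (values made up): the comparison ignores the password and
# the key order, so these two DSNs compare equal:
#   dsn_are_equals('host=db user=oe password=x dbname=prod',
#                  'user=oe host=db dbname=prod')   # -> True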

_Pool = None

def db_connect(db_name):
    global _Pool
    if _Pool is None:
        _Pool = ConnectionPool(int(tools.config['db_maxconn']))
    currentThread().dbname = db_name
    return Connection(_Pool, db_name)

def close_db(db_name):
    """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
    global _Pool
    if _Pool:
        _Pool.close_all(dsn(db_name))
    ct = currentThread()
    if hasattr(ct, 'dbname'):
        delattr(ct, 'dbname')

# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: