# -*- coding: utf-8 -*-
##############################################################################
#
#    OpenERP, Open Source Management Solution
#    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
#    Copyright (C) 2010-2011 OpenERP s.a. (<http://openerp.com>).
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################

#.apidoc title: PostgreSQL interface

"""
The PostgreSQL connector is a connectivity layer between the OpenERP code and
the database, *not* a database abstraction toolkit. Database abstraction is
the job of the ORM.

See also: the `pooler` module
"""

#.apidoc add-functions: print_stats
#.apidoc add-classes: Cursor Connection ConnectionPool
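
# Illustrative usage sketch (the database name and query below are
# hypothetical): obtain a Connection with db_connect(), open a Cursor from it,
# run parameterized queries, then commit and close.
#
#     cr = db_connect('somedb').cursor()
#     try:
#         cr.execute('SELECT id, login FROM res_users WHERE active = %s', (True,))
#         rows = cr.fetchall()
#         cr.commit()
#     finally:
#         cr.close()
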
__all__ = ['db_connect', 'close_db']

from functools import wraps
import logging
import psycopg2.extensions
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
from psycopg2.pool import PoolError
from psycopg2.psycopg1 import cursor as psycopg1cursor
from threading import currentThread

psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)

_logger = logging.getLogger(__name__)

def unbuffer(symb, cr):
    if symb is None: return None
    return str(symb)

def undecimalize(symb, cr):
    if symb is None: return None
    return float(symb)

# Register straight-through casters for the OIDs listed in types_mapping, and
# cast float4/float8/numeric (OIDs 700, 701, 1700) to Python float.
for name, typeoid in types_mapping.items():
    psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))

import tools
from tools.func import frame_codeinfo
from datetime import datetime as mdt
from datetime import timedelta
import threading
from inspect import currentframe

import re
re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')

# running total of SQL statements executed by all cursors
sql_counter = 0

class Cursor(object):
    """Represents an open transaction to the PostgreSQL DB backend,
    acting as a lightweight wrapper around psycopg2's
    ``psycopg1cursor`` objects.

    ``Cursor`` is the object behind the ``cr`` variable used all
    over the OpenERP code.

    .. rubric:: Transaction Isolation

    One very important property of database transactions is the
    level of isolation between concurrent transactions.
    The SQL standard defines four levels of transaction isolation,
    ranging from the most strict *Serializable* level, to the least
    strict *Read Uncommitted* level. These levels are defined in
    terms of the phenomena that must not occur between concurrent
    transactions, such as *dirty read*, etc.
    In the context of a generic business data management software
    such as OpenERP, we need the best guarantees that no data
    corruption can ever be caused by simply running multiple
    transactions in parallel. Therefore, the preferred level would
    be the *serializable* level, which ensures that a set of
    transactions is guaranteed to produce the same effect as
    running them one at a time in some order.

    However, most database management systems implement a limited
    serializable isolation in the form of
    `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
    providing most of the same advantages as true serializability,
    with a fraction of the performance cost.
    With PostgreSQL up to version 9.0, this snapshot isolation was
    the implementation of both the ``REPEATABLE READ`` and
    ``SERIALIZABLE`` levels of the SQL standard.
    As of PostgreSQL 9.1, the previous snapshot isolation implementation
    was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
    level was introduced, providing some additional heuristics to
    detect a concurrent update by parallel transactions, and forcing
    one of them to roll back.

    OpenERP implements its own level of locking protection
    for transactions that are highly likely to provoke concurrent
    updates, such as stock reservations or document sequences updates.
    Therefore we mostly care about the properties of snapshot isolation,
    but we don't really need additional heuristics to trigger transaction
    rollbacks, as we are taking care of triggering instant rollbacks
    ourselves when it matters (and we can save the additional performance
    hit of these heuristics).

    As a result of the above, we have selected ``REPEATABLE READ`` as
    the default transaction isolation level for OpenERP cursors, as
    it will be mapped to the desired ``snapshot isolation`` level for
    all supported PostgreSQL versions (8.3 - 9.x).

    Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
    read level to serializable before sending it to the database, so it would
    actually select the new serializable mode on PostgreSQL 9.1. Make
    sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
    the performance hit is a concern for you.
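
    .. rubric:: Usage example

    A minimal illustrative sketch (the database name, table and query
    below are hypothetical)::

        cr = db_connect('somedb').cursor()
        try:
            cr.execute('SELECT id FROM res_partner WHERE name = %s', ('Acme',))
            partner_ids = [row[0] for row in cr.fetchall()]
            cr.commit()
        except Exception:
            cr.rollback()
            raise
        finally:
            cr.close()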

    Cache dictionary with a "request" (-ish) lifecycle: it only lives as
    long as the cursor itself does and is proactively cleared when the
    cursor is closed.

    This cache should *only* be used to store repeatable reads as it
    ignores rollbacks and savepoints; it should not be used to store
    *any* data which may be modified during the life of the cursor.
    """

    IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit

    def check(f):
        @wraps(f)
        def wrapper(self, *args, **kwargs):
            if self.__closed:
                msg = 'Unable to use a closed cursor.'
                if self.__closer:
                    msg += ' It was closed at %s, line %s' % self.__closer
                raise psycopg2.OperationalError(msg)
            return f(self, *args, **kwargs)
        return wrapper

    def __init__(self, pool, dbname, serialized=True):
        self.sql_from_log = {}
        self.sql_into_log = {}

        # default log level determined at cursor creation, could be
        # overridden later for debugging purposes
        self.sql_log = _logger.isEnabledFor(logging.DEBUG)

        self.sql_log_count = 0
        self.__closed = True    # avoid the call of close() (by __del__) if an exception
                                # is raised by any of the following initialisations
        self._pool = pool
        self.dbname = dbname

        # Whether to enable snapshot isolation level for this cursor.
        # See also the docstring of Cursor.
        self._serialized = serialized

        self._cnx = pool.borrow(dsn(dbname))
        self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
        if self.sql_log:
            self.__caller = frame_codeinfo(currentframe(), 2)
        else:
            self.__caller = False
        self.__closed = False   # real initialisation value
        self.autocommit(False)
        self.__closer = False

        self._default_log_exceptions = True

    def __del__(self):
        if not self.__closed and not self._cnx.closed:
            # Oops. 'self' has not been closed explicitly.
            # The cursor will be deleted by the garbage collector,
            # but the database connection is not put back into the connection
            # pool, preventing some operation on the database like dropping it.
            # This can also lead to a server overload.
            msg = "Cursor not closed explicitly\n"
            if self.__caller:
                msg += "Cursor was created at %s:%s" % self.__caller
            else:
                msg += "Please enable sql debugging to trace the caller."
            _logger.warning(msg)
            self._close(True)

    def execute(self, query, params=None, log_exceptions=None):
        if '%d' in query or '%f' in query:
            _logger.warning(query)
            _logger.warning("SQL queries cannot contain %d or %f anymore. "
                            "Use only %s")
        if params and not isinstance(params, (tuple, list, dict)):
            _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
            raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))

        if self.sql_log:
            now = mdt.now()

        try:
            params = params or None
            res = self._obj.execute(query, params)
        except psycopg2.ProgrammingError, pe:
            if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
                _logger.error("Programming error: %s, in query %s", pe, query)
            raise
        except Exception:
            if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
                _logger.exception("bad query: %s", self._obj.query or query)
            raise

        if self.sql_log:
            delay = mdt.now() - now
            delay = delay.seconds * 1E6 + delay.microseconds

            _logger.debug("query: %s", self._obj.query)
            self.sql_log_count += 1
            res_from = re_from.match(query.lower())
            if res_from:
                self.sql_from_log.setdefault(res_from.group(1), [0, 0])
                self.sql_from_log[res_from.group(1)][0] += 1
                self.sql_from_log[res_from.group(1)][1] += delay
            res_into = re_into.match(query.lower())
            if res_into:
                self.sql_into_log.setdefault(res_into.group(1), [0, 0])
                self.sql_into_log[res_into.group(1)][0] += 1
                self.sql_into_log[res_into.group(1)][1] += delay
        return res
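
    # Illustrative sketch of correct parameter passing (table and values are
    # hypothetical): pass parameters separately through %s placeholders rather
    # than interpolating them into the query string, and never use %d or %f.
    #
    #     cr.execute('UPDATE res_partner SET credit_limit = %s WHERE id = %s',
    #                (1000.0, partner_id))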

    def split_for_in_conditions(self, ids):
        """Split a list of identifiers into one or more smaller tuples
           safe for IN conditions, after uniquifying them."""
        return tools.misc.split_every(self.IN_MAX, set(ids))
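
    # Illustrative sketch (table name and ids are hypothetical): iterate over
    # the chunks returned by split_for_in_conditions() so each IN clause stays
    # below IN_MAX elements.
    #
    #     results = []
    #     for sub_ids in cr.split_for_in_conditions(ids):
    #         cr.execute('SELECT id, name FROM res_partner WHERE id IN %s', (sub_ids,))
    #         results.extend(cr.fetchall())
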
    def print_log(self):
        global sql_counter
        sql_counter += self.sql_log_count
        if not self.sql_log:
            return

        def process(type):
            sqllogs = {'from': self.sql_from_log, 'into': self.sql_into_log}
            sum = 0
            if sqllogs[type]:
                sqllogitems = sqllogs[type].items()
                sqllogitems.sort(key=lambda k: k[1][1])
                _logger.debug("SQL LOG %s:", type)
                sqllogitems.sort(lambda x, y: cmp(x[1][0], y[1][0]))
                for r in sqllogitems:
                    delay = timedelta(microseconds=r[1][1])
                    _logger.debug("table: %s: %s/%s",
                                  r[0], delay, r[1][0])
                    sum += r[1][1]
                sqllogs[type].clear()
            sum = timedelta(microseconds=sum)
            _logger.debug("SUM %s:%s/%d [%d]",
                          type, sum, self.sql_log_count, sql_counter)
            sqllogs[type].clear()
        process('from')
        process('into')
        self.sql_log_count = 0

    def close(self):
        return self._close(False)

    def _close(self, leak=False):
        if not self._obj:
            return
        self.print_log()
        if self.sql_log:
            self.__closer = frame_codeinfo(currentframe(), 3)
        self._obj.close()

        # This forces the cursor to be freed, and thus, available again. It is
        # important because otherwise we can overload the server very easily
        # because of a cursor shortage (because cursors are not garbage
        # collected as fast as they should). The problem is probably due in
        # part to browse records keeping a reference to the cursor.
        del self._obj
        self.__closed = True

        # Clean the underlying connection.
        self._cnx.rollback()

        if leak:
            self._cnx.leaked = True
        else:
            chosen_template = tools.config['db_template']
            templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
            keep_in_pool = self.dbname not in templates_list
            self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)

    def autocommit(self, on):
        if on:
            isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
        else:
            # If a serializable cursor was requested, we
            # use the appropriate PostgreSQL isolation level
            # that maps to snapshot isolation.
            # For all supported PostgreSQL versions (8.3-9.x),
            # this is currently the ISOLATION_REPEATABLE_READ.
            # See also the docstring of this class.
            # NOTE: up to psycopg 2.4.2, repeatable read
            #       is remapped to serializable before being
            #       sent to the database, so it is in fact
            #       unavailable for use with pg 9.1.
            isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
                if self._serialized \
                else ISOLATION_LEVEL_READ_COMMITTED
        self._cnx.set_isolation_level(isolation_level)
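
    # Illustrative sketch: a cursor opened with serialized=True (the default)
    # runs at REPEATABLE READ; autocommit(True) switches the underlying
    # connection to AUTOCOMMIT, which is required for statements that cannot
    # run inside a transaction block.
    #
    #     cr.autocommit(True)
    #     cr.execute('VACUUM')    # hypothetical example of such a statement
    #     cr.autocommit(False)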

    def commit(self):
        """ Perform an SQL `COMMIT`
        """
        return self._cnx.commit()

    def rollback(self):
        """ Perform an SQL `ROLLBACK`
        """
        return self._cnx.rollback()

    def __getattr__(self, name):
        return getattr(self._obj, name)

""" Set the mode of postgres operations for all cursors
"""

"""Obtain the mode of postgres operations for all cursors
"""

class PsycoConnection(psycopg2.extensions.connection):
    pass

class ConnectionPool(object):
    """ The pool of connections to database(s)

    Keep a set of connections to pg databases open, and reuse them
    to open cursors for all transactions.

    The connections are *not* automatically closed. Only a close_db()
    can trigger that.
    """

    def locked(fun):
        @wraps(fun)
        def _locked(self, *args, **kwargs):
            self._lock.acquire()
            try:
                return fun(self, *args, **kwargs)
            finally:
                self._lock.release()
        return _locked

    def __init__(self, maxconn=64):
        self._connections = []
        self._maxconn = max(maxconn, 1)
        self._lock = threading.Lock()

    def __repr__(self):
        used = len([1 for c, u in self._connections[:] if u])
        count = len(self._connections)
        return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)

    def _debug(self, msg, *args):
        _logger.debug(('%r ' + msg), self, *args)

    def borrow(self, dsn):
        self._debug('Borrow connection to %r', dsn)

        # free leaked connections
        for i, (cnx, _) in tools.reverse_enumerate(self._connections):
            if cnx.closed:
                self._connections.pop(i)
                self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
                continue
            if getattr(cnx, 'leaked', False):
                delattr(cnx, 'leaked')
                self._connections.pop(i)
                self._connections.append((cnx, False))
                _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)

        for i, (cnx, used) in enumerate(self._connections):
            if not used and dsn_are_equals(cnx.dsn, dsn):
                self._connections.pop(i)
                self._connections.append((cnx, True))
                self._debug('Existing connection found at index %d', i)
                return cnx

        if len(self._connections) >= self._maxconn:
            # try to remove the oldest connection not used
            for i, (cnx, used) in enumerate(self._connections):
                if not used:
                    self._connections.pop(i)
                    self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
                    break
            else:
                # note: this code is called only if the for loop has completed (no break)
                raise PoolError('The Connection Pool Is Full')

        try:
            result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
        except psycopg2.Error:
            _logger.exception('Connection to the database failed')
            raise
        self._connections.append((result, True))
        self._debug('Create new connection')
        return result

    def give_back(self, connection, keep_in_pool=True):
        self._debug('Give back connection to %r', connection.dsn)
        for i, (cnx, used) in enumerate(self._connections):
            if cnx is connection:
                self._connections.pop(i)
                if keep_in_pool:
                    self._connections.append((cnx, False))
                    self._debug('Put connection to %r in pool', cnx.dsn)
                else:
                    self._debug('Forgot connection to %r', cnx.dsn)
                    cnx.close()
                break
        else:
            raise PoolError('This connection does not belong to the pool')

    def close_all(self, dsn):
        _logger.info('%r: Close all connections to %r', self, dsn)
        for i, (cnx, used) in tools.reverse_enumerate(self._connections):
            if dsn_are_equals(cnx.dsn, dsn):
                cnx.close()
                self._connections.pop(i)

class Connection(object):
    """ A lightweight instance of a connection to postgres
    """

    def __init__(self, pool, dbname):
        self.dbname = dbname
        self._pool = pool

    def cursor(self, serialized=True):
        cursor_type = serialized and 'serialized ' or ''
        _logger.debug('create %scursor to %r', cursor_type, self.dbname)
        return Cursor(self._pool, self.dbname, serialized=serialized)

    # serialized_cursor is deprecated - cursors are serialized by default
    serialized_cursor = cursor

    def __nonzero__(self):
        """Check if connection is possible"""
        try:
            _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
            cr = self.cursor()
            cr.close()
            return True
        except Exception:
            return False

def dsn(db_name):
    _dsn = ''
    for p in ('host', 'port', 'user', 'password'):
        cfg = tools.config['db_' + p]
        if cfg:
            _dsn += '%s=%s ' % (p, cfg)
    return '%sdbname=%s' % (_dsn, db_name)

def dsn_are_equals(first, second):
    def key(dsn):
        k = dict(x.split('=', 1) for x in dsn.strip().split())
        k.pop('password', None) # password is not relevant
        return k
    return key(first) == key(second)
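
# Illustrative example (hypothetical configuration values): with
# db_host=localhost, db_port=5432 and db_user=openerp set in the configuration,
# dsn('somedb') returns a libpq-style connection string such as
#
#     'host=localhost port=5432 user=openerp dbname=somedb'
#
# and dsn_are_equals() compares two such strings while ignoring the password.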

_Pool = None

def db_connect(db_name):
    global _Pool
    if _Pool is None:
        _Pool = ConnectionPool(int(tools.config['db_maxconn']))
    currentThread().dbname = db_name
    return Connection(_Pool, db_name)

def close_db(db_name):
    """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along with this function."""
    global _Pool
    if _Pool:
        _Pool.close_all(dsn(db_name))
    ct = currentThread()
    if hasattr(ct, 'dbname'):
        delattr(ct, 'dbname')
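
# Illustrative sketch (hypothetical database name): close_db() releases the
# pooled connections to a database, which is needed before operations such as
# dropping it from another connection.
#
#     close_db('somedb')
#     cr = db_connect('postgres').cursor()
#     cr.autocommit(True)     # DROP DATABASE cannot run inside a transaction
#     cr.execute('DROP DATABASE "somedb"')
#     cr.close()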

# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: