1 # -*- coding: utf-8 -*-
2 ##############################################################################
4 # OpenERP, Open Source Management Solution
5 # Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 # Copyright (C) 2010-2011 OpenERP s.a. (<http://openerp.com>).
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU Affero General Public License as
10 # published by the Free Software Foundation, either version 3 of the
11 # License, or (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Affero General Public License for more details.
18 # You should have received a copy of the GNU Affero General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 ##############################################################################
23 #.apidoc title: PostgreSQL interface
26 The PostgreSQL connector is a connectivity layer between the OpenERP code and
27 the database, *not* a database abstraction toolkit. Database abstraction is what
28 the ORM does, in fact.
30 See also: the `pooler` module
33 #.apidoc add-functions: print_stats
34 #.apidoc add-classes: Cursor Connection ConnectionPool
36 __all__ = ['db_connect', 'close_db']
38 from threading import currentThread
40 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE,\
41 ISOLATION_LEVEL_REPEATABLE_READ
42 from psycopg2.psycopg1 import cursor as psycopg1cursor
43 from psycopg2.pool import PoolError
45 import psycopg2.extensions
48 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
def unbuffer(symb, cr):
    # psycopg2 typecaster: a SQL NULL arrives as None and passes through.
    # NOTE(review): handling of non-None values (presumably a str()
    # conversion) is not visible in this chunk -- confirm against the
    # full file.
    if symb is None: return None
def undecimalize(symb, cr):
    # psycopg2 typecaster for numeric/float OIDs: SQL NULL maps to None.
    # NOTE(review): conversion of non-None values (presumably a float()
    # conversion) is not visible in this chunk -- confirm against the
    # full file.
    if symb is None: return None
# Register pass-through typecasters for every OID listed in types_mapping
# (defined elsewhere in this file), so those values reach OpenERP as raw
# strings, then route the PostgreSQL float/numeric OIDs (700=float4,
# 701=float8, 1700=numeric) through undecimalize.
for name, typeoid in types_mapping.items():
    psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
70 from tools.func import wraps, frame_codeinfo
71 from datetime import datetime as mdt
72 from datetime import timedelta
74 from inspect import currentframe
# Crude SQL "parsers" used only for the per-table query statistics kept
# in Cursor.sql_from_log / Cursor.sql_into_log: they extract the first
# table name following "from" / "into" in a lowercased query string.
# Raw strings are used for the regex literals (idiomatic, even though
# these particular patterns contain no escapes), and the stray
# statement-terminating semicolons are dropped.
re_from = re.compile(r'.* from "?([a-zA-Z_0-9]+)"? .*$')
re_into = re.compile(r'.* into "?([a-zA-Z_0-9]+)"? .*$')
83 """Represents an open transaction to the PostgreSQL DB backend,
84 acting as a lightweight wrapper around psycopg2's
85 ``psycopg1cursor`` objects.
87 ``Cursor`` is the object behind the ``cr`` variable used all
88 over the OpenERP code.
90 .. rubric:: Transaction Isolation
92 One very important property of database transactions is the
93 level of isolation between concurrent transactions.
94 The SQL standard defines four levels of transaction isolation,
95 ranging from the most strict *Serializable* level, to the least
96 strict *Read Uncommitted* level. These levels are defined in
97 terms of the phenomena that must not occur between concurrent
98 transactions, such as *dirty read*, etc.
99 In the context of a generic business data management software
100 such as OpenERP, we need the best guarantees that no data
corruption can ever be caused by simply running multiple
102 transactions in parallel. Therefore, the preferred level would
103 be the *serializable* level, which ensures that a set of
104 transactions is guaranteed to produce the same effect as
105 running them one at a time in some order.
107 However, most database management systems implement a limited
108 serializable isolation in the form of
109 `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
110 providing most of the same advantages as True Serializability,
111 with a fraction of the performance cost.
112 With PostgreSQL up to version 9.0, this snapshot isolation was
113 the implementation of both the ``REPEATABLE READ`` and
114 ``SERIALIZABLE`` levels of the SQL standard.
115 As of PostgreSQL 9.1, the previous snapshot isolation implementation
116 was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
117 level was introduced, providing some additional heuristics to
118 detect a concurrent update by parallel transactions, and forcing
119 one of them to rollback.
121 OpenERP implements its own level of locking protection
122 for transactions that are highly likely to provoke concurrent
123 updates, such as stock reservations or document sequences updates.
124 Therefore we mostly care about the properties of snapshot isolation,
125 but we don't really need additional heuristics to trigger transaction
126 rollbacks, as we are taking care of triggering instant rollbacks
127 ourselves when it matters (and we can save the additional performance
128 hit of these heuristics).
130 As a result of the above, we have selected ``REPEATABLE READ`` as
131 the default transaction isolation level for OpenERP cursors, as
132 it will be mapped to the desired ``snapshot isolation`` level for
all supported PostgreSQL versions (8.3 - 9.x).
135 Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
136 read level to serializable before sending it to the database, so it would
137 actually select the new serializable mode on PostgreSQL 9.1. Make
138 sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
139 the performance hit is a concern for you.
IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit

def wrapper(self, *args, **kwargs):
    # Guard used by the @check decorator: a cursor that has already
    # been closed must not be used again; report where it was closed.
    # NOTE(review): the enclosing decorator definition and the
    # "if self.__closed:" test around this raise are not visible in
    # this chunk -- as written the raise looks unconditional; confirm
    # against the full file.
    msg = 'Unable to use a closed cursor.'
    msg += ' It was closed at %s, line %s' % self.__closer
    raise psycopg2.OperationalError(msg)
    return f(self, *args, **kwargs)
def __init__(self, pool, dbname, serialized=True):
    """Borrow a connection to *dbname* from *pool* and open a
    psycopg1-compatible cursor on it.

    :param serialized: request snapshot isolation for this cursor
                       (see the class docstring).
    """
    # Lazily create the shared class-level logger on first instantiation.
    if self.__class__.__logger is None:
        self.__class__.__logger = logging.getLogger('db.cursor')
    # Per-table query statistics: {table: [query_count, total_delay_us]}.
    self.sql_from_log = {}
    self.sql_into_log = {}

    # default log level determined at cursor creation, could be
    # overridden later for debugging purposes
    self.sql_log = self.__logger.isEnabledFor(logging.DEBUG_SQL)

    self.sql_log_count = 0
    self.__closed = True    # avoid the call of close() (by __del__) if an exception
                            # is raised by any of the following initialisations

    # Whether to enable snapshot isolation level for this cursor.
    # see also the docstring of Cursor.
    self._serialized = serialized

    self._cnx = pool.borrow(dsn(dbname))
    self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
    self.__closed = False  # real initialisation value
    self.autocommit(False)
    # Remember where the cursor was created, for leak reports.
    # NOTE(review): the sql_log condition selecting between the two
    # __caller assignments below is not visible in this chunk --
    # presumably only one of them executes; confirm.
    self.__caller = frame_codeinfo(currentframe(),2)
    self.__caller = False
    self.__closer = False

    self._default_log_exceptions = True
# Body of Cursor.__del__ (the "def __del__(self):" line is not visible
# in this chunk): warn when a cursor is garbage-collected without being
# closed explicitly, since that leaks its pooled connection.
if not self.__closed:
    # Oops. 'self' has not been closed explicitly.
    # The cursor will be deleted by the garbage collector,
    # but the database connection is not put back into the connection
    # pool, preventing some operation on the database like dropping it.
    # This can also lead to a server overload.
    msg = "Cursor not closed explicitly\n"
    # NOTE(review): the two msg += lines below look like alternative
    # branches of an elided "if self.__caller:" test -- confirm.
    msg += "Cursor was created at %s:%s" % self.__caller
    msg += "Please enable sql debugging to trace the caller."
    self.__logger.warn(msg)
def execute(self, query, params=None, log_exceptions=None):
    """Execute *query* with *params* on the underlying psycopg2 cursor,
    logging failures and, when SQL debugging is enabled, accumulating
    per-table timing statistics.

    NOTE(review): several lines of this method (the try statement, the
    second warn message's continuation, the timing start "now = ...",
    the re-raise statements and the sql_log guards) are not visible in
    this chunk -- the structure below is a fragment.
    """
    # Legacy placeholders %d/%f are forbidden: psycopg2 accepts only %s.
    if '%d' in query or '%f' in query:
        self.__logger.warn(query)
        self.__logger.warn("SQL queries cannot contain %d or %f anymore. "
    params = params or None
    res = self._obj.execute(query, params)
    except psycopg2.ProgrammingError, pe:
    # Bad SQL: logged (unless suppressed via log_exceptions) then
    # presumably re-raised in the elided lines.
    if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
        self.__logger.error("Programming error: %s, in query %s", pe, query)
    if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
        self.__logger.exception("bad query: %s", self._obj.query or query)
    # --- SQL debug statistics (only relevant when self.sql_log) ---
    # Delay is kept in microseconds.
    delay = mdt.now() - now
    delay = delay.seconds * 1E6 + delay.microseconds
    self.__logger.log(logging.DEBUG_SQL, "query: %s", self._obj.query)
    self.sql_log_count+=1
    # Attribute the query's delay to the table named after FROM / INTO.
    res_from = re_from.match(query.lower())
    self.sql_from_log.setdefault(res_from.group(1), [0, 0])
    self.sql_from_log[res_from.group(1)][0] += 1
    self.sql_from_log[res_from.group(1)][1] += delay
    res_into = re_into.match(query.lower())
    self.sql_into_log.setdefault(res_into.group(1), [0, 0])
    self.sql_into_log[res_into.group(1)][0] += 1
    self.sql_into_log[res_into.group(1)][1] += delay
def split_for_in_conditions(self, ids):
    """Break *ids* into de-duplicated chunks of at most ``IN_MAX``
    elements, so each chunk is safe to embed in an SQL ``IN (...)``
    clause."""
    deduplicated = set(ids)
    return tools.misc.split_every(self.IN_MAX, deduplicated)
# Fragment of Cursor.print_log (the def line, the "global sql_counter"
# statement, the inner process(type) helper definition and the sql_log
# guards are not visible in this chunk): dump the accumulated per-table
# SQL statistics at DEBUG_SQL level, then reset the counters.
sql_counter += self.sql_log_count

sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
sqllogitems = sqllogs[type].items()
# NOTE(review): this first sort (by cumulative delay) is immediately
# overridden by the second sort below (by query count) -- dead work in
# the original; left untouched here.
sqllogitems.sort(key=lambda k: k[1][1])
self.__logger.log(logging.DEBUG_SQL, "SQL LOG %s:", type)
sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
for r in sqllogitems:
    delay = timedelta(microseconds=r[1][1])
    self.__logger.log(logging.DEBUG_SQL, "table: %s: %s/%s",
                      r[0], delay, r[1][0])
sqllogs[type].clear()
# NOTE(review): 'sum' shadows the builtin and is re-bound from a
# microsecond count to a timedelta for display.
sum = timedelta(microseconds=sum)
self.__logger.log(logging.DEBUG_SQL, "SUM %s:%s/%d [%d]",
                  type, sum, self.sql_log_count, sql_counter)
sqllogs[type].clear()

self.sql_log_count = 0
# Tail of Cursor.close() (its def line is not visible in this chunk):
# closing is delegated to _close() without flagging a leak.
return self._close(False)

def _close(self, leak=False):
    """Release this cursor and return its connection to the pool.

    :param leak: when True the connection is merely flagged as leaked
                 (to be reclaimed later by ConnectionPool.borrow)
                 instead of being handed straight back to the pool.

    NOTE(review): several lines (closed-state guard, statistics flush,
    the actual self._obj.close() call and the if/else around the leak
    handling) are not visible in this chunk.
    """
    # Record where the cursor was closed, for the "closed cursor" error.
    self.__closer = frame_codeinfo(currentframe(),3)

    # This force the cursor to be freed, and thus, available again. It is
    # important because otherwise we can overload the server very easily
    # because of a cursor shortage (because cursors are not garbage
    # collected as fast as they should). The problem is probably due in
    # part because browse records keep a reference to the cursor.

    # Clean the underlying connection.

    self._cnx.leaked = True

    # Template databases must not linger in the pool: keeping an open
    # connection to them blocks CREATE DATABASE ... TEMPLATE.
    keep_in_pool = self.dbname not in ('template1', 'template0', 'postgres')
    self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
def autocommit(self, on):
    """Switch the underlying connection between autocommit mode
    (on=True) and explicit-transaction mode (on=False), picking the
    isolation level that matches this cursor's serialization request.

    NOTE(review): the if/else selecting between the two
    isolation_level assignments is not visible in this chunk -- as
    written the second assignment always wins; confirm against the
    full file.
    """
    isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
    # If a serializable cursor was requested, we
    # use the appropriate PostgreSQL isolation level
    # that maps to snapshot isolation.
    # For all supported PostgreSQL versions (8.3-9.x),
    # this is currently the ISOLATION_REPEATABLE_READ.
    # See also the docstring of this class.
    # NOTE: up to psycopg 2.4.2, repeatable read
    # is remapped to serializable before being
    # sent to the database, so it is in fact
    # unavailable for use with pg 9.1.
    isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
        if self._serialized \
        else ISOLATION_LEVEL_READ_COMMITTED
    self._cnx.set_isolation_level(isolation_level)
# Fragments of Cursor.commit() / Cursor.rollback() (their def lines and
# the docstring terminators are not visible in this chunk): both simply
# delegate to the pooled psycopg2 connection.
""" Perform an SQL `COMMIT`
return self._cnx.commit()

""" Perform an SQL `ROLLBACK`
return self._cnx.rollback()
def __getattr__(self, name):
    """Proxy every attribute lookup that Cursor itself does not define
    to the wrapped psycopg2 cursor object."""
    wrapped = self._obj
    return getattr(wrapped, name)
344 """ Set the mode of postgres operations for all cursors
346 """Obtain the mode of postgres operations for all cursors
349 class PsycoConnection(psycopg2.extensions.connection):
352 class ConnectionPool(object):
353 """ The pool of connections to database(s)
355 Keep a set of connections to pg databases open, and reuse them
356 to open cursors for all transactions.
The connections are *not* automatically closed. Only a close_db() can trigger that.
361 __logger = logging.getLogger('db.connection_pool')
def _locked(self, *args, **kwargs):
    # Wrapper produced by the pool's "locked" decorator: run the
    # wrapped method while holding the pool's mutex.
    # NOTE(review): the enclosing decorator definition and the
    # "with self._lock:" statement are not visible in this chunk.
    return fun(self, *args, **kwargs)
def __init__(self, maxconn=64):
    """Initialise an empty connection pool.

    :param maxconn: upper bound on the number of pooled connections,
                    clamped to at least 1.
    """
    self._maxconn = max(maxconn, 1)
    self._connections = list()
    self._lock = threading.Lock()
# Body of ConnectionPool.__repr__ (its def line is not visible in this
# chunk): summarise pool usage as used/count/max. Entries are
# (connection, used_flag) pairs; the [:] copy guards against concurrent
# mutation while counting.
used = len([1 for c, u in self._connections[:] if u])
count = len(self._connections)
return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
def _debug(self, msg, *args):
    """Log *msg* at the SQL debug level, prefixed with this pool's
    repr so concurrent pools can be told apart."""
    fmt = '%r ' + msg
    self.__logger.log(logging.DEBUG_SQL, fmt, self, *args)
388 def borrow(self, dsn):
389 self._debug('Borrow connection to %r', dsn)
391 # free leaked connections
392 for i, (cnx, _) in tools.reverse_enumerate(self._connections):
393 if getattr(cnx, 'leaked', False):
394 delattr(cnx, 'leaked')
395 self._connections.pop(i)
396 self._connections.append((cnx, False))
397 self.__logger.warn('%r: Free leaked connection to %r', self, cnx.dsn)
399 for i, (cnx, used) in enumerate(self._connections):
400 if not used and dsn_are_equals(cnx.dsn, dsn):
401 self._connections.pop(i)
402 self._connections.append((cnx, True))
403 self._debug('Existing connection found at index %d', i)
407 if len(self._connections) >= self._maxconn:
408 # try to remove the oldest connection not used
409 for i, (cnx, used) in enumerate(self._connections):
411 self._connections.pop(i)
412 self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
415 # note: this code is called only if the for loop has completed (no break)
416 raise PoolError('The Connection Pool Is Full')
419 result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
420 except psycopg2.Error, e:
421 self.__logger.exception('Connection to the database failed')
423 self._connections.append((result, True))
424 self._debug('Create new connection')
def give_back(self, connection, keep_in_pool=True):
    """Hand *connection* back to the pool: either keep it around as a
    free connection (keep_in_pool=True) or forget it entirely.

    :raises PoolError: when *connection* was never borrowed from this
                       pool.

    NOTE(review): the keep_in_pool if/else, the break, the cnx.close()
    call and the for/else placement of the raise are not visible in
    this chunk.
    """
    self._debug('Give back connection to %r', connection.dsn)
    for i, (cnx, used) in enumerate(self._connections):
        # Identity check: we must find the exact borrowed object.
        if cnx is connection:
            self._connections.pop(i)
            self._connections.append((cnx, False))
            self._debug('Put connection to %r in pool', cnx.dsn)
            self._debug('Forgot connection to %r', cnx.dsn)
    # NOTE(review): "below" is a typo for "belong" in this runtime
    # error message; left untouched because changing a user-visible
    # string is a behaviour change out of scope for a comment-only pass.
    raise PoolError('This connection does not below to the pool')
def close_all(self, dsn):
    """Close every pooled connection whose DSN matches *dsn* and drop
    it from the pool. Iterates in reverse so the pops do not disturb
    the remaining indices.

    NOTE(review): the @locked decorator and the cnx.close() call are
    not visible in this chunk.
    """
    self.__logger.info('%r: Close all connections to %r', self, dsn)
    for i, (cnx, used) in tools.reverse_enumerate(self._connections):
        if dsn_are_equals(cnx.dsn, dsn):
            self._connections.pop(i)
452 class Connection(object):
453 """ A lightweight instance of a connection to postgres
455 __logger = logging.getLogger('db.connection')
457 def __init__(self, pool, dbname):
def cursor(self, serialized=True):
    """Open a new Cursor on this connection's database.

    :param serialized: whether the cursor runs under snapshot
                       isolation (the default) rather than plain
                       read committed.
    """
    cursor_type = 'serialized ' if serialized else ''
    self.__logger.log(logging.DEBUG_SQL, 'create %scursor to %r', cursor_type, self.dbname)
    return Cursor(self._pool, self.dbname, serialized=serialized)
# serialized_cursor is deprecated - cursors are serialized by default
serialized_cursor = cursor

def __nonzero__(self):
    """Check if connection is possible"""
    # Deprecated: truth-testing a Connection used to open a throwaway
    # cursor just to probe connectivity, which is expensive.
    # NOTE(review): the try/except around the probe and the return
    # statements are not visible in this chunk.
    warnings.warn("You use an expensive function to test a connection.",
                  DeprecationWarning, stacklevel=1)
# Body of dsn(db_name) (its def line and the "_dsn = ''" initialisation
# are not visible in this chunk): build a libpq connection string from
# the db_host/db_port/db_user/db_password entries of the OpenERP
# configuration, appending only the parameters that are set
# (an "if cfg:" guard appears to be elided here).
for p in ('host', 'port', 'user', 'password'):
    cfg = tools.config['db_' + p]
    _dsn += '%s=%s ' % (p, cfg)

return '%sdbname=%s' % (_dsn, db_name)
def dsn_are_equals(first, second):
    """Return True when two libpq DSN strings refer to the same server
    and database, ignoring the password component.

    NOTE(review): the "def key(dsn):" line introducing the inner
    normalisation helper is not visible in this chunk.
    """
    # Parse "k=v k=v ..." into a dict so parameter order is irrelevant.
    k = dict(x.split('=', 1) for x in dsn.strip().split())
    k.pop('password', None) # password is not relevant
    return key(first) == key(second)
def db_connect(db_name):
    """Return a Connection to *db_name*, backed by the module-level
    connection pool, and tag the current thread with the database name.

    NOTE(review): the "global _Pool" statement and the "if _Pool is
    None:" guard are not visible in this chunk -- as written the pool
    would be recreated on every call; confirm against the full file.
    """
    _Pool = ConnectionPool(int(tools.config['db_maxconn']))
    currentThread().dbname = db_name
    return Connection(_Pool, db_name)
def close_db(db_name):
    """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
    # Close every pooled connection to db_name and drop the dbname
    # marker set on the current thread by db_connect().
    # NOTE(review): the "ct = currentThread()" assignment is not
    # visible in this chunk.
    _Pool.close_all(dsn(db_name))
    if hasattr(ct, 'dbname'):
        delattr(ct, 'dbname')
514 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: