[FIX] registry: another pass of cleanup for registry signaling
[odoo/odoo.git] / openerp / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2013 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 #.apidoc title: PostgreSQL interface
24
25 """
26 The PostgreSQL connector is a connectivity layer between the OpenERP code and
27 the database, *not* a database abstraction toolkit. Database abstraction is what
28 the ORM does, in fact.
29
30 See also: the `pooler` module
31 """
32
33 #.apidoc add-functions: print_stats
34 #.apidoc add-classes: Cursor Connection ConnectionPool
35
36 __all__ = ['db_connect', 'close_db']
37
38 from functools import wraps
39 import logging
40 import psycopg2.extensions
41 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
42 from psycopg2.pool import PoolError
43 from psycopg2.psycopg1 import cursor as psycopg1cursor
44 from threading import currentThread
45
46 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
47
48 _logger = logging.getLogger(__name__)
49
50 types_mapping = {
51     'date': (1082,),
52     'time': (1083,),
53     'datetime': (1114,),
54 }
55
56 def unbuffer(symb, cr):
57     if symb is None: return None
58     return str(symb)
59
60 def undecimalize(symb, cr):
61     if symb is None: return None
62     return float(symb)
63
64 for name, typeoid in types_mapping.items():
65     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
66 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
67
68
69 import tools
70 from tools.func import frame_codeinfo
71 from datetime import datetime as mdt
72 from datetime import timedelta
73 import threading
74 from inspect import currentframe
75
76 import re
77 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
78 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
79
80 sql_counter = 0
81
82 class Cursor(object):
83     """Represents an open transaction to the PostgreSQL DB backend,
84        acting as a lightweight wrapper around psycopg2's
85        ``psycopg1cursor`` objects.
86
87         ``Cursor`` is the object behind the ``cr`` variable used all
88         over the OpenERP code.
89
90         .. rubric:: Transaction Isolation
91
92         One very important property of database transactions is the
93         level of isolation between concurrent transactions.
94         The SQL standard defines four levels of transaction isolation,
95         ranging from the most strict *Serializable* level, to the least
96         strict *Read Uncommitted* level. These levels are defined in
97         terms of the phenomena that must not occur between concurrent
98         transactions, such as *dirty read*, etc.
99         In the context of a generic business data management software
100         such as OpenERP, we need the best guarantees that no data
101         corruption can ever be cause by simply running multiple
102         transactions in parallel. Therefore, the preferred level would
103         be the *serializable* level, which ensures that a set of
104         transactions is guaranteed to produce the same effect as
105         running them one at a time in some order.
106
107         However, most database management systems implement a limited
108         serializable isolation in the form of
109         `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
110         providing most of the same advantages as True Serializability,
111         with a fraction of the performance cost.
112         With PostgreSQL up to version 9.0, this snapshot isolation was
113         the implementation of both the ``REPEATABLE READ`` and
114         ``SERIALIZABLE`` levels of the SQL standard.
115         As of PostgreSQL 9.1, the previous snapshot isolation implementation
116         was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
117         level was introduced, providing some additional heuristics to
118         detect a concurrent update by parallel transactions, and forcing
119         one of them to rollback.
120
121         OpenERP implements its own level of locking protection
122         for transactions that are highly likely to provoke concurrent
123         updates, such as stock reservations or document sequences updates.
124         Therefore we mostly care about the properties of snapshot isolation,
125         but we don't really need additional heuristics to trigger transaction
126         rollbacks, as we are taking care of triggering instant rollbacks
127         ourselves when it matters (and we can save the additional performance
128         hit of these heuristics).
129
130         As a result of the above, we have selected ``REPEATABLE READ`` as
131         the default transaction isolation level for OpenERP cursors, as
132         it will be mapped to the desired ``snapshot isolation`` level for
133         all supported PostgreSQL version (8.3 - 9.x).
134
135         Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
136         read level to serializable before sending it to the database, so it would
137         actually select the new serializable mode on PostgreSQL 9.1. Make
138         sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
139         the performance hit is a concern for you.
140
141         .. attribute:: cache
142
143             Cache dictionary with a "request" (-ish) lifecycle, only lives as
144             long as the cursor itself does and proactively cleared when the
145             cursor is closed.
146
147             This cache should *only* be used to store repeatable reads as it
148             ignores rollbacks and savepoints, it should not be used to store
149             *any* data which may be modified during the life of the cursor.
150
151     """
152     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
153
154     def check(f):
155         @wraps(f)
156         def wrapper(self, *args, **kwargs):
157             if self.__closed:
158                 msg = 'Unable to use a closed cursor.'
159                 if self.__closer:
160                     msg += ' It was closed at %s, line %s' % self.__closer
161                 raise psycopg2.OperationalError(msg)
162             return f(self, *args, **kwargs)
163         return wrapper
164
165     def __init__(self, pool, dbname, serialized=True):
166         self.sql_from_log = {}
167         self.sql_into_log = {}
168
169         # default log level determined at cursor creation, could be
170         # overridden later for debugging purposes
171         self.sql_log = _logger.isEnabledFor(logging.DEBUG)
172
173         self.sql_log_count = 0
174         self.__closed = True    # avoid the call of close() (by __del__) if an exception
175                                 # is raised by any of the following initialisations
176         self._pool = pool
177         self.dbname = dbname
178
179         # Whether to enable snapshot isolation level for this cursor.
180         # see also the docstring of Cursor.  
181         self._serialized = serialized
182
183         self._cnx = pool.borrow(dsn(dbname))
184         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
185         if self.sql_log:
186             self.__caller = frame_codeinfo(currentframe(),2)
187         else:
188             self.__caller = False
189         self.__closed = False   # real initialisation value
190         self.autocommit(False)
191         self.__closer = False
192
193         self._default_log_exceptions = True
194
195         self.cache = {}
196
197     def __del__(self):
198         if not self.__closed and not self._cnx.closed:
199             # Oops. 'self' has not been closed explicitly.
200             # The cursor will be deleted by the garbage collector,
201             # but the database connection is not put back into the connection
202             # pool, preventing some operation on the database like dropping it.
203             # This can also lead to a server overload.
204             msg = "Cursor not closed explicitly\n"
205             if self.__caller:
206                 msg += "Cursor was created at %s:%s" % self.__caller
207             else:
208                 msg += "Please enable sql debugging to trace the caller."
209             _logger.warning(msg)
210             self._close(True)
211
212     @check
213     def execute(self, query, params=None, log_exceptions=None):
214         if '%d' in query or '%f' in query:
215             _logger.warning(query)
216             _logger.warning("SQL queries cannot contain %d or %f anymore. "
217                          "Use only %s")
218         if params and not isinstance(params, (tuple, list, dict)):
219             _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
220             raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
221
222         if self.sql_log:
223             now = mdt.now()
224
225         try:
226             params = params or None
227             res = self._obj.execute(query, params)
228         except psycopg2.ProgrammingError, pe:
229             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
230                 _logger.error("Programming error: %s, in query %s", pe, query)
231             raise
232         except Exception:
233             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
234                 _logger.exception("bad query: %s", self._obj.query or query)
235             raise
236
237         if self.sql_log:
238             delay = mdt.now() - now
239             delay = delay.seconds * 1E6 + delay.microseconds
240
241             _logger.debug("query: %s", self._obj.query)
242             self.sql_log_count+=1
243             res_from = re_from.match(query.lower())
244             if res_from:
245                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
246                 self.sql_from_log[res_from.group(1)][0] += 1
247                 self.sql_from_log[res_from.group(1)][1] += delay
248             res_into = re_into.match(query.lower())
249             if res_into:
250                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
251                 self.sql_into_log[res_into.group(1)][0] += 1
252                 self.sql_into_log[res_into.group(1)][1] += delay
253         return res
254
255
256     def split_for_in_conditions(self, ids):
257         """Split a list of identifiers into one or more smaller tuples
258            safe for IN conditions, after uniquifying them."""
259         return tools.misc.split_every(self.IN_MAX, set(ids))
260
261     def print_log(self):
262         global sql_counter
263         sql_counter += self.sql_log_count
264         if not self.sql_log:
265             return
266         def process(type):
267             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
268             sum = 0
269             if sqllogs[type]:
270                 sqllogitems = sqllogs[type].items()
271                 sqllogitems.sort(key=lambda k: k[1][1])
272                 _logger.debug("SQL LOG %s:", type)
273                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
274                 for r in sqllogitems:
275                     delay = timedelta(microseconds=r[1][1])
276                     _logger.debug("table: %s: %s/%s",
277                                         r[0], delay, r[1][0])
278                     sum+= r[1][1]
279                 sqllogs[type].clear()
280             sum = timedelta(microseconds=sum)
281             _logger.debug("SUM %s:%s/%d [%d]",
282                                 type, sum, self.sql_log_count, sql_counter)
283             sqllogs[type].clear()
284         process('from')
285         process('into')
286         self.sql_log_count = 0
287         self.sql_log = False
288
289     @check
290     def close(self):
291         return self._close(False)
292
293     def _close(self, leak=False):
294         if not self._obj:
295             return
296
297         del self.cache
298
299         if self.sql_log:
300             self.__closer = frame_codeinfo(currentframe(),3)
301         self.print_log()
302
303         self._obj.close()
304
305         # This force the cursor to be freed, and thus, available again. It is
306         # important because otherwise we can overload the server very easily
307         # because of a cursor shortage (because cursors are not garbage
308         # collected as fast as they should). The problem is probably due in
309         # part because browse records keep a reference to the cursor.
310         del self._obj
311         self.__closed = True
312
313         # Clean the underlying connection.
314         self._cnx.rollback()
315
316         if leak:
317             self._cnx.leaked = True
318         else:
319             chosen_template = tools.config['db_template']
320             templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
321             keep_in_pool = self.dbname not in templates_list
322             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
323
324     @check
325     def autocommit(self, on):
326         if on:
327             isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
328         else:
329             # If a serializable cursor was requested, we
330             # use the appropriate PotsgreSQL isolation level
331             # that maps to snaphsot isolation.
332             # For all supported PostgreSQL versions (8.3-9.x),
333             # this is currently the ISOLATION_REPEATABLE_READ.
334             # See also the docstring of this class.
335             # NOTE: up to psycopg 2.4.2, repeatable read
336             #       is remapped to serializable before being
337             #       sent to the database, so it is in fact
338             #       unavailable for use with pg 9.1.
339             isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
340                                   if self._serialized \
341                                   else ISOLATION_LEVEL_READ_COMMITTED
342         self._cnx.set_isolation_level(isolation_level)
343
344     @check
345     def commit(self):
346         """ Perform an SQL `COMMIT`
347         """
348         return self._cnx.commit()
349
350     @check
351     def rollback(self):
352         """ Perform an SQL `ROLLBACK`
353         """
354         return self._cnx.rollback()
355
356     @check
357     def __getattr__(self, name):
358         return getattr(self._obj, name)
359
360 class PsycoConnection(psycopg2.extensions.connection):
361     pass
362
363 class ConnectionPool(object):
364     """ The pool of connections to database(s)
365     
366         Keep a set of connections to pg databases open, and reuse them
367         to open cursors for all transactions.
368         
369         The connections are *not* automatically closed. Only a close_db()
370         can trigger that.
371     """
372
373     def locked(fun):
374         @wraps(fun)
375         def _locked(self, *args, **kwargs):
376             self._lock.acquire()
377             try:
378                 return fun(self, *args, **kwargs)
379             finally:
380                 self._lock.release()
381         return _locked
382
383
384     def __init__(self, maxconn=64):
385         self._connections = []
386         self._maxconn = max(maxconn, 1)
387         self._lock = threading.Lock()
388
389     def __repr__(self):
390         used = len([1 for c, u in self._connections[:] if u])
391         count = len(self._connections)
392         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
393
394     def _debug(self, msg, *args):
395         _logger.debug(('%r ' + msg), self, *args)
396
397     @locked
398     def borrow(self, dsn):
399         self._debug('Borrow connection to %r', dsn)
400
401         # free dead and leaked connections
402         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
403             if cnx.closed:
404                 self._connections.pop(i)
405                 self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
406                 continue
407             if getattr(cnx, 'leaked', False):
408                 delattr(cnx, 'leaked')
409                 self._connections.pop(i)
410                 self._connections.append((cnx, False))
411                 _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)
412
413         for i, (cnx, used) in enumerate(self._connections):
414             if not used and dsn_are_equals(cnx.dsn, dsn):
415                 try:
416                     cnx.reset()
417                 except psycopg2.OperationalError:
418                     self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn)
419                     # psycopg2 2.4.4 and earlier do not allow closing a closed connection
420                     if not cnx.closed:
421                         cnx.close()
422                     continue
423                 self._connections.pop(i)
424                 self._connections.append((cnx, True))
425                 self._debug('Existing connection found at index %d', i)
426
427                 return cnx
428
429         if len(self._connections) >= self._maxconn:
430             # try to remove the oldest connection not used
431             for i, (cnx, used) in enumerate(self._connections):
432                 if not used:
433                     self._connections.pop(i)
434                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
435                     break
436             else:
437                 # note: this code is called only if the for loop has completed (no break)
438                 raise PoolError('The Connection Pool Is Full')
439
440         try:
441             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
442         except psycopg2.Error:
443             _logger.exception('Connection to the database failed')
444             raise
445         self._connections.append((result, True))
446         self._debug('Create new connection')
447         return result
448
449     @locked
450     def give_back(self, connection, keep_in_pool=True):
451         self._debug('Give back connection to %r', connection.dsn)
452         for i, (cnx, used) in enumerate(self._connections):
453             if cnx is connection:
454                 self._connections.pop(i)
455                 if keep_in_pool:
456                     self._connections.append((cnx, False))
457                     self._debug('Put connection to %r in pool', cnx.dsn)
458                 else:
459                     self._debug('Forgot connection to %r', cnx.dsn)
460                     cnx.close()
461                 break
462         else:
463             raise PoolError('This connection does not below to the pool')
464
465     @locked
466     def close_all(self, dsn):
467         _logger.info('%r: Close all connections to %r', self, dsn)
468         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
469             if dsn_are_equals(cnx.dsn, dsn):
470                 cnx.close()
471                 self._connections.pop(i)
472
473
474 class Connection(object):
475     """ A lightweight instance of a connection to postgres
476     """
477
478     def __init__(self, pool, dbname):
479         self.dbname = dbname
480         self._pool = pool
481
482     def cursor(self, serialized=True):
483         cursor_type = serialized and 'serialized ' or ''
484         _logger.debug('create %scursor to %r', cursor_type, self.dbname)
485         return Cursor(self._pool, self.dbname, serialized=serialized)
486
487     # serialized_cursor is deprecated - cursors are serialized by default
488     serialized_cursor = cursor
489
490     def __nonzero__(self):
491         """Check if connection is possible"""
492         try:
493             _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
494             cr = self.cursor()
495             cr.close()
496             return True
497         except Exception:
498             return False
499
500 def dsn(db_name):
501     _dsn = ''
502     for p in ('host', 'port', 'user', 'password'):
503         cfg = tools.config['db_' + p]
504         if cfg:
505             _dsn += '%s=%s ' % (p, cfg)
506
507     return '%sdbname=%s' % (_dsn, db_name)
508
509 def dsn_are_equals(first, second):
510     def key(dsn):
511         k = dict(x.split('=', 1) for x in dsn.strip().split())
512         k.pop('password', None) # password is not relevant
513         return k
514     return key(first) == key(second)
515
516
517 _Pool = None
518
519 def db_connect(db_name):
520     global _Pool
521     if _Pool is None:
522         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
523     currentThread().dbname = db_name
524     return Connection(_Pool, db_name)
525
526 def close_db(db_name):
527     """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
528     global _Pool
529     if _Pool:
530         _Pool.close_all(dsn(db_name))
531     ct = currentThread()
532     if hasattr(ct, 'dbname'):
533         delattr(ct, 'dbname')
534
535
536 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
537