Merge remote-tracking branch 'odoo/7.0' into 7.0
[odoo/odoo.git] / openerp / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2013 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 #.apidoc title: PostgreSQL interface
24
25 """
26 The PostgreSQL connector is a connectivity layer between the OpenERP code and
27 the database, *not* a database abstraction toolkit. Database abstraction is what
28 the ORM does, in fact.
29
30 See also: the `pooler` module
31 """
32
33 #.apidoc add-functions: print_stats
34 #.apidoc add-classes: Cursor Connection ConnectionPool
35
36 __all__ = ['db_connect', 'close_db']
37
38 from functools import wraps
39 import logging
40 import psycopg2.extensions
41 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
42 from psycopg2.pool import PoolError
43 from psycopg2.psycopg1 import cursor as psycopg1cursor
44
45 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
46
47 _logger = logging.getLogger(__name__)
48
49 types_mapping = {
50     'date': (1082,),
51     'time': (1083,),
52     'datetime': (1114,),
53 }
54
55 def unbuffer(symb, cr):
56     if symb is None: return None
57     return str(symb)
58
59 def undecimalize(symb, cr):
60     if symb is None: return None
61     return float(symb)
62
63 for name, typeoid in types_mapping.items():
64     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
65 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
66
67
68 import tools
69 from tools.func import frame_codeinfo
70 from datetime import datetime as mdt
71 from datetime import timedelta
72 import threading
73 from inspect import currentframe
74
75 import re
76 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
77 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
78
79 sql_counter = 0
80
81 class Cursor(object):
82     """Represents an open transaction to the PostgreSQL DB backend,
83        acting as a lightweight wrapper around psycopg2's
84        ``psycopg1cursor`` objects.
85
86         ``Cursor`` is the object behind the ``cr`` variable used all
87         over the OpenERP code.
88
89         .. rubric:: Transaction Isolation
90
91         One very important property of database transactions is the
92         level of isolation between concurrent transactions.
93         The SQL standard defines four levels of transaction isolation,
94         ranging from the most strict *Serializable* level, to the least
95         strict *Read Uncommitted* level. These levels are defined in
96         terms of the phenomena that must not occur between concurrent
97         transactions, such as *dirty read*, etc.
98         In the context of a generic business data management software
99         such as OpenERP, we need the best guarantees that no data
100         corruption can ever be cause by simply running multiple
101         transactions in parallel. Therefore, the preferred level would
102         be the *serializable* level, which ensures that a set of
103         transactions is guaranteed to produce the same effect as
104         running them one at a time in some order.
105
106         However, most database management systems implement a limited
107         serializable isolation in the form of
108         `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
109         providing most of the same advantages as True Serializability,
110         with a fraction of the performance cost.
111         With PostgreSQL up to version 9.0, this snapshot isolation was
112         the implementation of both the ``REPEATABLE READ`` and
113         ``SERIALIZABLE`` levels of the SQL standard.
114         As of PostgreSQL 9.1, the previous snapshot isolation implementation
115         was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
116         level was introduced, providing some additional heuristics to
117         detect a concurrent update by parallel transactions, and forcing
118         one of them to rollback.
119
120         OpenERP implements its own level of locking protection
121         for transactions that are highly likely to provoke concurrent
122         updates, such as stock reservations or document sequences updates.
123         Therefore we mostly care about the properties of snapshot isolation,
124         but we don't really need additional heuristics to trigger transaction
125         rollbacks, as we are taking care of triggering instant rollbacks
126         ourselves when it matters (and we can save the additional performance
127         hit of these heuristics).
128
129         As a result of the above, we have selected ``REPEATABLE READ`` as
130         the default transaction isolation level for OpenERP cursors, as
131         it will be mapped to the desired ``snapshot isolation`` level for
132         all supported PostgreSQL version (8.3 - 9.x).
133
134         Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
135         read level to serializable before sending it to the database, so it would
136         actually select the new serializable mode on PostgreSQL 9.1. Make
137         sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
138         the performance hit is a concern for you.
139
140         .. attribute:: cache
141
142             Cache dictionary with a "request" (-ish) lifecycle, only lives as
143             long as the cursor itself does and proactively cleared when the
144             cursor is closed.
145
146             This cache should *only* be used to store repeatable reads as it
147             ignores rollbacks and savepoints, it should not be used to store
148             *any* data which may be modified during the life of the cursor.
149
150     """
151     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
152
153     def check(f):
154         @wraps(f)
155         def wrapper(self, *args, **kwargs):
156             if self.__closed:
157                 msg = 'Unable to use a closed cursor.'
158                 if self.__closer:
159                     msg += ' It was closed at %s, line %s' % self.__closer
160                 raise psycopg2.OperationalError(msg)
161             return f(self, *args, **kwargs)
162         return wrapper
163
164     def __init__(self, pool, dbname, serialized=True):
165         self.sql_from_log = {}
166         self.sql_into_log = {}
167
168         # default log level determined at cursor creation, could be
169         # overridden later for debugging purposes
170         self.sql_log = _logger.isEnabledFor(logging.DEBUG)
171
172         self.sql_log_count = 0
173         self.__closed = True    # avoid the call of close() (by __del__) if an exception
174                                 # is raised by any of the following initialisations
175         self.__pool = pool
176         self.dbname = dbname
177
178         # Whether to enable snapshot isolation level for this cursor.
179         # see also the docstring of Cursor.  
180         self._serialized = serialized
181
182         self._cnx = pool.borrow(dsn(dbname))
183         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
184         if self.sql_log:
185             self.__caller = frame_codeinfo(currentframe(),2)
186         else:
187             self.__caller = False
188         self.__closed = False   # real initialisation value
189         self.autocommit(False)
190         self.__closer = False
191
192         self._default_log_exceptions = True
193
194         self.cache = {}
195
196     def __del__(self):
197         if not self.__closed and not self._cnx.closed:
198             # Oops. 'self' has not been closed explicitly.
199             # The cursor will be deleted by the garbage collector,
200             # but the database connection is not put back into the connection
201             # pool, preventing some operation on the database like dropping it.
202             # This can also lead to a server overload.
203             msg = "Cursor not closed explicitly\n"
204             if self.__caller:
205                 msg += "Cursor was created at %s:%s" % self.__caller
206             else:
207                 msg += "Please enable sql debugging to trace the caller."
208             _logger.warning(msg)
209             self._close(True)
210
211     @check
212     def execute(self, query, params=None, log_exceptions=None):
213         if '%d' in query or '%f' in query:
214             _logger.warning(query)
215             _logger.warning("SQL queries cannot contain %d or %f anymore. "
216                          "Use only %s")
217         if params and not isinstance(params, (tuple, list, dict)):
218             _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
219             raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
220
221         if self.sql_log:
222             now = mdt.now()
223
224         try:
225             params = params or None
226             res = self._obj.execute(query, params)
227         except psycopg2.ProgrammingError, pe:
228             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
229                 _logger.error("Programming error: %s, in query %s", pe, query)
230             raise
231         except Exception:
232             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
233                 _logger.exception("bad query: %s", self._obj.query or query)
234             raise
235
236         if self.sql_log:
237             delay = mdt.now() - now
238             delay = delay.seconds * 1E6 + delay.microseconds
239
240             _logger.debug("query: %s", self._obj.query)
241             self.sql_log_count+=1
242             res_from = re_from.match(query.lower())
243             if res_from:
244                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
245                 self.sql_from_log[res_from.group(1)][0] += 1
246                 self.sql_from_log[res_from.group(1)][1] += delay
247             res_into = re_into.match(query.lower())
248             if res_into:
249                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
250                 self.sql_into_log[res_into.group(1)][0] += 1
251                 self.sql_into_log[res_into.group(1)][1] += delay
252         return res
253
254
255     def split_for_in_conditions(self, ids):
256         """Split a list of identifiers into one or more smaller tuples
257            safe for IN conditions, after uniquifying them."""
258         return tools.misc.split_every(self.IN_MAX, set(ids))
259
260     def print_log(self):
261         global sql_counter
262         sql_counter += self.sql_log_count
263         if not self.sql_log:
264             return
265         def process(type):
266             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
267             sum = 0
268             if sqllogs[type]:
269                 sqllogitems = sqllogs[type].items()
270                 sqllogitems.sort(key=lambda k: k[1][1])
271                 _logger.debug("SQL LOG %s:", type)
272                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
273                 for r in sqllogitems:
274                     delay = timedelta(microseconds=r[1][1])
275                     _logger.debug("table: %s: %s/%s",
276                                         r[0], delay, r[1][0])
277                     sum+= r[1][1]
278                 sqllogs[type].clear()
279             sum = timedelta(microseconds=sum)
280             _logger.debug("SUM %s:%s/%d [%d]",
281                                 type, sum, self.sql_log_count, sql_counter)
282             sqllogs[type].clear()
283         process('from')
284         process('into')
285         self.sql_log_count = 0
286         self.sql_log = False
287
288     @check
289     def close(self):
290         return self._close(False)
291
292     def _close(self, leak=False):
293         if not self._obj:
294             return
295
296         del self.cache
297
298         if self.sql_log:
299             self.__closer = frame_codeinfo(currentframe(),3)
300         self.print_log()
301
302         self._obj.close()
303
304         # This force the cursor to be freed, and thus, available again. It is
305         # important because otherwise we can overload the server very easily
306         # because of a cursor shortage (because cursors are not garbage
307         # collected as fast as they should). The problem is probably due in
308         # part because browse records keep a reference to the cursor.
309         del self._obj
310         self.__closed = True
311
312         # Clean the underlying connection.
313         self._cnx.rollback()
314
315         if leak:
316             self._cnx.leaked = True
317         else:
318             chosen_template = tools.config['db_template']
319             templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
320             keep_in_pool = self.dbname not in templates_list
321             self.__pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
322
323     @check
324     def autocommit(self, on):
325         if on:
326             isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
327         else:
328             # If a serializable cursor was requested, we
329             # use the appropriate PotsgreSQL isolation level
330             # that maps to snaphsot isolation.
331             # For all supported PostgreSQL versions (8.3-9.x),
332             # this is currently the ISOLATION_REPEATABLE_READ.
333             # See also the docstring of this class.
334             # NOTE: up to psycopg 2.4.2, repeatable read
335             #       is remapped to serializable before being
336             #       sent to the database, so it is in fact
337             #       unavailable for use with pg 9.1.
338             isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
339                                   if self._serialized \
340                                   else ISOLATION_LEVEL_READ_COMMITTED
341         self._cnx.set_isolation_level(isolation_level)
342
343     @check
344     def commit(self):
345         """ Perform an SQL `COMMIT`
346         """
347         return self._cnx.commit()
348
349     @check
350     def rollback(self):
351         """ Perform an SQL `ROLLBACK`
352         """
353         return self._cnx.rollback()
354
355     @check
356     def __getattr__(self, name):
357         return getattr(self._obj, name)
358
359 class PsycoConnection(psycopg2.extensions.connection):
360     pass
361
362 class ConnectionPool(object):
363     """ The pool of connections to database(s)
364     
365         Keep a set of connections to pg databases open, and reuse them
366         to open cursors for all transactions.
367         
368         The connections are *not* automatically closed. Only a close_db()
369         can trigger that.
370     """
371
372     def locked(fun):
373         @wraps(fun)
374         def _locked(self, *args, **kwargs):
375             self._lock.acquire()
376             try:
377                 return fun(self, *args, **kwargs)
378             finally:
379                 self._lock.release()
380         return _locked
381
382
383     def __init__(self, maxconn=64):
384         self._connections = []
385         self._maxconn = max(maxconn, 1)
386         self._lock = threading.Lock()
387
388     def __repr__(self):
389         used = len([1 for c, u in self._connections[:] if u])
390         count = len(self._connections)
391         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
392
393     def _debug(self, msg, *args):
394         _logger.debug(('%r ' + msg), self, *args)
395
396     @locked
397     def borrow(self, dsn):
398         self._debug('Borrow connection to %r', dsn)
399
400         # free dead and leaked connections
401         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
402             if cnx.closed:
403                 self._connections.pop(i)
404                 self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
405                 continue
406             if getattr(cnx, 'leaked', False):
407                 delattr(cnx, 'leaked')
408                 self._connections.pop(i)
409                 self._connections.append((cnx, False))
410                 _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)
411
412         for i, (cnx, used) in enumerate(self._connections):
413             if not used and dsn_are_equals(cnx.dsn, dsn):
414                 try:
415                     cnx.reset()
416                 except psycopg2.OperationalError:
417                     self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn)
418                     # psycopg2 2.4.4 and earlier do not allow closing a closed connection
419                     if not cnx.closed:
420                         cnx.close()
421                     continue
422                 self._connections.pop(i)
423                 self._connections.append((cnx, True))
424                 self._debug('Existing connection found at index %d', i)
425
426                 return cnx
427
428         if len(self._connections) >= self._maxconn:
429             # try to remove the oldest connection not used
430             for i, (cnx, used) in enumerate(self._connections):
431                 if not used:
432                     self._connections.pop(i)
433                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
434                     break
435             else:
436                 # note: this code is called only if the for loop has completed (no break)
437                 raise PoolError('The Connection Pool Is Full')
438
439         try:
440             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
441         except psycopg2.Error:
442             _logger.exception('Connection to the database failed')
443             raise
444         self._connections.append((result, True))
445         self._debug('Create new connection')
446         return result
447
448     @locked
449     def give_back(self, connection, keep_in_pool=True):
450         self._debug('Give back connection to %r', connection.dsn)
451         for i, (cnx, used) in enumerate(self._connections):
452             if cnx is connection:
453                 self._connections.pop(i)
454                 if keep_in_pool:
455                     self._connections.append((cnx, False))
456                     self._debug('Put connection to %r in pool', cnx.dsn)
457                 else:
458                     self._debug('Forgot connection to %r', cnx.dsn)
459                     cnx.close()
460                 break
461         else:
462             raise PoolError('This connection does not below to the pool')
463
464     @locked
465     def close_all(self, dsn):
466         _logger.info('%r: Close all connections to %r', self, dsn)
467         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
468             if dsn_are_equals(cnx.dsn, dsn):
469                 cnx.close()
470                 self._connections.pop(i)
471
472
473 class Connection(object):
474     """ A lightweight instance of a connection to postgres
475     """
476
477     def __init__(self, pool, dbname):
478         self.dbname = dbname
479         self.__pool = pool
480
481     def cursor(self, serialized=True):
482         cursor_type = serialized and 'serialized ' or ''
483         _logger.debug('create %scursor to %r', cursor_type, self.dbname)
484         return Cursor(self.__pool, self.dbname, serialized=serialized)
485
486     # serialized_cursor is deprecated - cursors are serialized by default
487     serialized_cursor = cursor
488
489     def __nonzero__(self):
490         """Check if connection is possible"""
491         try:
492             _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
493             cr = self.cursor()
494             cr.close()
495             return True
496         except Exception:
497             return False
498
499 def dsn(db_name):
500     _dsn = ''
501     for p in ('host', 'port', 'user', 'password'):
502         cfg = tools.config['db_' + p]
503         if cfg:
504             _dsn += '%s=%s ' % (p, cfg)
505
506     return '%sdbname=%s' % (_dsn, db_name)
507
508 def dsn_are_equals(first, second):
509     def key(dsn):
510         k = dict(x.split('=', 1) for x in dsn.strip().split())
511         k.pop('password', None) # password is not relevant
512         return k
513     return key(first) == key(second)
514
515
516 _Pool = None
517
518 def db_connect(db_name):
519     global _Pool
520     if _Pool is None:
521         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
522     return Connection(_Pool, db_name)
523
524 def close_db(db_name):
525     """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
526     global _Pool
527     if _Pool:
528         _Pool.close_all(dsn(db_name))
529
530
531 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
532