ee58503c40be4a5c3bd234467cf2eec40924d13d
[odoo/odoo.git] / openerp / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2014 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23
24 """
25 The PostgreSQL connector is a connectivity layer between the OpenERP code and
26 the database, *not* a database abstraction toolkit. Database abstraction is what
27 the ORM does, in fact.
28 """
29
30 from contextlib import contextmanager
31 from functools import wraps
32 import logging
33 import time
34 import uuid
35 import psycopg2.extensions
36 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
37 from psycopg2.pool import PoolError
38 from psycopg2.psycopg1 import cursor as psycopg1cursor
39
40 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
41
42 _logger = logging.getLogger(__name__)
43
44 types_mapping = {
45     'date': (1082,),
46     'time': (1083,),
47     'datetime': (1114,),
48 }
49
50 def unbuffer(symb, cr):
51     if symb is None: return None
52     return str(symb)
53
54 def undecimalize(symb, cr):
55     if symb is None: return None
56     return float(symb)
57
58 for name, typeoid in types_mapping.items():
59     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
60 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
61
62
63 import tools
64 from tools.func import frame_codeinfo
65 from datetime import datetime as mdt
66 from datetime import timedelta
67 import threading
68 from inspect import currentframe
69
70 import re
71 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
72 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
73
74 sql_counter = 0
75
76 class Cursor(object):
77     """Represents an open transaction to the PostgreSQL DB backend,
78        acting as a lightweight wrapper around psycopg2's
79        ``psycopg1cursor`` objects.
80
81         ``Cursor`` is the object behind the ``cr`` variable used all
82         over the OpenERP code.
83
84         .. rubric:: Transaction Isolation
85
86         One very important property of database transactions is the
87         level of isolation between concurrent transactions.
88         The SQL standard defines four levels of transaction isolation,
89         ranging from the most strict *Serializable* level, to the least
90         strict *Read Uncommitted* level. These levels are defined in
91         terms of the phenomena that must not occur between concurrent
92         transactions, such as *dirty read*, etc.
93         In the context of a generic business data management software
94         such as OpenERP, we need the best guarantees that no data
95         corruption can ever be cause by simply running multiple
96         transactions in parallel. Therefore, the preferred level would
97         be the *serializable* level, which ensures that a set of
98         transactions is guaranteed to produce the same effect as
99         running them one at a time in some order.
100
101         However, most database management systems implement a limited
102         serializable isolation in the form of
103         `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
104         providing most of the same advantages as True Serializability,
105         with a fraction of the performance cost.
106         With PostgreSQL up to version 9.0, this snapshot isolation was
107         the implementation of both the ``REPEATABLE READ`` and
108         ``SERIALIZABLE`` levels of the SQL standard.
109         As of PostgreSQL 9.1, the previous snapshot isolation implementation
110         was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
111         level was introduced, providing some additional heuristics to
112         detect a concurrent update by parallel transactions, and forcing
113         one of them to rollback.
114
115         OpenERP implements its own level of locking protection
116         for transactions that are highly likely to provoke concurrent
117         updates, such as stock reservations or document sequences updates.
118         Therefore we mostly care about the properties of snapshot isolation,
119         but we don't really need additional heuristics to trigger transaction
120         rollbacks, as we are taking care of triggering instant rollbacks
121         ourselves when it matters (and we can save the additional performance
122         hit of these heuristics).
123
124         As a result of the above, we have selected ``REPEATABLE READ`` as
125         the default transaction isolation level for OpenERP cursors, as
126         it will be mapped to the desired ``snapshot isolation`` level for
127         all supported PostgreSQL version (8.3 - 9.x).
128
129         Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
130         read level to serializable before sending it to the database, so it would
131         actually select the new serializable mode on PostgreSQL 9.1. Make
132         sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
133         the performance hit is a concern for you.
134
135         .. attribute:: cache
136
137             Cache dictionary with a "request" (-ish) lifecycle, only lives as
138             long as the cursor itself does and proactively cleared when the
139             cursor is closed.
140
141             This cache should *only* be used to store repeatable reads as it
142             ignores rollbacks and savepoints, it should not be used to store
143             *any* data which may be modified during the life of the cursor.
144
145     """
146     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
147
148     def check(f):
149         @wraps(f)
150         def wrapper(self, *args, **kwargs):
151             if self._closed:
152                 msg = 'Unable to use a closed cursor.'
153                 if self.__closer:
154                     msg += ' It was closed at %s, line %s' % self.__closer
155                 raise psycopg2.OperationalError(msg)
156             return f(self, *args, **kwargs)
157         return wrapper
158
159     def __init__(self, pool, dbname, serialized=True):
160         self.sql_from_log = {}
161         self.sql_into_log = {}
162
163         # default log level determined at cursor creation, could be
164         # overridden later for debugging purposes
165         self.sql_log = _logger.isEnabledFor(logging.DEBUG)
166
167         self.sql_log_count = 0
168         self._closed = True    # avoid the call of close() (by __del__) if an exception
169                                 # is raised by any of the following initialisations
170         self._pool = pool
171         self.dbname = dbname
172
173         # Whether to enable snapshot isolation level for this cursor.
174         # see also the docstring of Cursor.  
175         self._serialized = serialized
176
177         self._cnx = pool.borrow(dsn(dbname))
178         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
179         if self.sql_log:
180             self.__caller = frame_codeinfo(currentframe(),2)
181         else:
182             self.__caller = False
183         self._closed = False   # real initialisation value
184         self.autocommit(False)
185         self.__closer = False
186
187         self._default_log_exceptions = True
188
189         self.cache = {}
190
191     def __del__(self):
192         if not self._closed and not self._cnx.closed:
193             # Oops. 'self' has not been closed explicitly.
194             # The cursor will be deleted by the garbage collector,
195             # but the database connection is not put back into the connection
196             # pool, preventing some operation on the database like dropping it.
197             # This can also lead to a server overload.
198             msg = "Cursor not closed explicitly\n"
199             if self.__caller:
200                 msg += "Cursor was created at %s:%s" % self.__caller
201             else:
202                 msg += "Please enable sql debugging to trace the caller."
203             _logger.warning(msg)
204             self._close(True)
205
206     @check
207     def execute(self, query, params=None, log_exceptions=None):
208         if '%d' in query or '%f' in query:
209             _logger.warning(query)
210             _logger.warning("SQL queries cannot contain %d or %f anymore. "
211                          "Use only %s")
212         if params and not isinstance(params, (tuple, list, dict)):
213             _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
214             raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
215
216         if self.sql_log:
217             now = mdt.now()
218
219         try:
220             params = params or None
221             res = self._obj.execute(query, params)
222         except psycopg2.ProgrammingError, pe:
223             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
224                 _logger.error("Programming error: %s, in query %s", pe, query)
225             raise
226         except Exception:
227             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
228                 _logger.exception("bad query: %s", self._obj.query or query)
229             raise
230
231         if self.sql_log:
232             delay = mdt.now() - now
233             delay = delay.seconds * 1E6 + delay.microseconds
234
235             _logger.debug("query: %s", self._obj.query)
236             self.sql_log_count+=1
237             res_from = re_from.match(query.lower())
238             if res_from:
239                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
240                 self.sql_from_log[res_from.group(1)][0] += 1
241                 self.sql_from_log[res_from.group(1)][1] += delay
242             res_into = re_into.match(query.lower())
243             if res_into:
244                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
245                 self.sql_into_log[res_into.group(1)][0] += 1
246                 self.sql_into_log[res_into.group(1)][1] += delay
247         return res
248
249
250     def split_for_in_conditions(self, ids):
251         """Split a list of identifiers into one or more smaller tuples
252            safe for IN conditions, after uniquifying them."""
253         return tools.misc.split_every(self.IN_MAX, set(ids))
254
255     def print_log(self):
256         global sql_counter
257         sql_counter += self.sql_log_count
258         if not self.sql_log:
259             return
260         def process(type):
261             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
262             sum = 0
263             if sqllogs[type]:
264                 sqllogitems = sqllogs[type].items()
265                 sqllogitems.sort(key=lambda k: k[1][1])
266                 _logger.debug("SQL LOG %s:", type)
267                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
268                 for r in sqllogitems:
269                     delay = timedelta(microseconds=r[1][1])
270                     _logger.debug("table: %s: %s/%s",
271                                         r[0], delay, r[1][0])
272                     sum+= r[1][1]
273                 sqllogs[type].clear()
274             sum = timedelta(microseconds=sum)
275             _logger.debug("SUM %s:%s/%d [%d]",
276                                 type, sum, self.sql_log_count, sql_counter)
277             sqllogs[type].clear()
278         process('from')
279         process('into')
280         self.sql_log_count = 0
281         self.sql_log = False
282
283     @check
284     def close(self):
285         return self._close(False)
286
287     def _close(self, leak=False):
288         if not self._obj:
289             return
290
291         del self.cache
292
293         if self.sql_log:
294             self.__closer = frame_codeinfo(currentframe(),3)
295         self.print_log()
296
297         self._obj.close()
298
299         # This force the cursor to be freed, and thus, available again. It is
300         # important because otherwise we can overload the server very easily
301         # because of a cursor shortage (because cursors are not garbage
302         # collected as fast as they should). The problem is probably due in
303         # part because browse records keep a reference to the cursor.
304         del self._obj
305         self._closed = True
306
307         # Clean the underlying connection.
308         self._cnx.rollback()
309
310         if leak:
311             self._cnx.leaked = True
312         else:
313             chosen_template = tools.config['db_template']
314             templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
315             keep_in_pool = self.dbname not in templates_list
316             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
317
318     @check
319     def autocommit(self, on):
320         if on:
321             isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
322         else:
323             # If a serializable cursor was requested, we
324             # use the appropriate PotsgreSQL isolation level
325             # that maps to snaphsot isolation.
326             # For all supported PostgreSQL versions (8.3-9.x),
327             # this is currently the ISOLATION_REPEATABLE_READ.
328             # See also the docstring of this class.
329             # NOTE: up to psycopg 2.4.2, repeatable read
330             #       is remapped to serializable before being
331             #       sent to the database, so it is in fact
332             #       unavailable for use with pg 9.1.
333             isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
334                                   if self._serialized \
335                                   else ISOLATION_LEVEL_READ_COMMITTED
336         self._cnx.set_isolation_level(isolation_level)
337
338     @check
339     def commit(self):
340         """ Perform an SQL `COMMIT`
341         """
342         return self._cnx.commit()
343
344     @check
345     def rollback(self):
346         """ Perform an SQL `ROLLBACK`
347         """
348         return self._cnx.rollback()
349
350     def __enter__(self):
351         """ Using the cursor as a contextmanager automatically commits and
352             closes it::
353
354                 with cr:
355                     cr.execute(...)
356
357                 # cr is committed if no failure occurred
358                 # cr is closed in any case
359         """
360         return self
361
362     def __exit__(self, exc_type, exc_value, traceback):
363         if exc_type is None:
364             self.commit()
365         self.close()
366
367     @contextmanager
368     @check
369     def savepoint(self):
370         """context manager entering in a new savepoint"""
371         name = uuid.uuid1().hex
372         self.execute('SAVEPOINT "%s"' % name)
373         try:
374             yield
375             self.execute('RELEASE SAVEPOINT "%s"' % name)
376         except:
377             self.execute('ROLLBACK TO SAVEPOINT "%s"' % name)
378             raise
379
380     @check
381     def __getattr__(self, name):
382         return getattr(self._obj, name)
383
384 class TestCursor(Cursor):
385     """ A cursor to be used for tests. It keeps the transaction open across
386         several requests, and simulates committing, rolling back, and closing.
387     """
388     def __init__(self, *args, **kwargs):
389         super(TestCursor, self).__init__(*args, **kwargs)
390         # in order to simulate commit and rollback, the cursor maintains a
391         # savepoint at its last commit
392         self.execute("SAVEPOINT test_cursor")
393         # we use a lock to serialize concurrent requests
394         self._lock = threading.RLock()
395
396     def acquire(self):
397         self._lock.acquire()
398
399     def release(self):
400         self._lock.release()
401
402     def close(self, force=False):
403         if force:
404             super(TestCursor, self).close()
405         elif not self._closed:
406             self.rollback()             # for stuff that has not been committed
407             self.release()
408
409     def autocommit(self, on):
410         _logger.debug("TestCursor.autocommit(%r) does nothing", on)
411
412     def commit(self):
413         self.execute("RELEASE SAVEPOINT test_cursor")
414         self.execute("SAVEPOINT test_cursor")
415
416     def rollback(self):
417         self.execute("ROLLBACK TO SAVEPOINT test_cursor")
418         self.execute("SAVEPOINT test_cursor")
419
420 class PsycoConnection(psycopg2.extensions.connection):
421     pass
422
423 class ConnectionPool(object):
424     """ The pool of connections to database(s)
425     
426         Keep a set of connections to pg databases open, and reuse them
427         to open cursors for all transactions.
428         
429         The connections are *not* automatically closed. Only a close_db()
430         can trigger that.
431     """
432
433     def locked(fun):
434         @wraps(fun)
435         def _locked(self, *args, **kwargs):
436             self._lock.acquire()
437             try:
438                 return fun(self, *args, **kwargs)
439             finally:
440                 self._lock.release()
441         return _locked
442
443
444     def __init__(self, maxconn=64):
445         self._connections = []
446         self._maxconn = max(maxconn, 1)
447         self._lock = threading.Lock()
448
449     def __repr__(self):
450         used = len([1 for c, u in self._connections[:] if u])
451         count = len(self._connections)
452         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
453
454     def _debug(self, msg, *args):
455         _logger.debug(('%r ' + msg), self, *args)
456
457     @locked
458     def borrow(self, dsn):
459         self._debug('Borrow connection to %r', dsn)
460
461         # free dead and leaked connections
462         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
463             if cnx.closed:
464                 self._connections.pop(i)
465                 self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
466                 continue
467             if getattr(cnx, 'leaked', False):
468                 delattr(cnx, 'leaked')
469                 self._connections.pop(i)
470                 self._connections.append((cnx, False))
471                 _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)
472
473         for i, (cnx, used) in enumerate(self._connections):
474             if not used and dsn_are_equals(cnx.dsn, dsn):
475                 try:
476                     cnx.reset()
477                 except psycopg2.OperationalError:
478                     self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn)
479                     # psycopg2 2.4.4 and earlier do not allow closing a closed connection
480                     if not cnx.closed:
481                         cnx.close()
482                     continue
483                 self._connections.pop(i)
484                 self._connections.append((cnx, True))
485                 self._debug('Existing connection found at index %d', i)
486
487                 return cnx
488
489         if len(self._connections) >= self._maxconn:
490             # try to remove the oldest connection not used
491             for i, (cnx, used) in enumerate(self._connections):
492                 if not used:
493                     self._connections.pop(i)
494                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
495                     break
496             else:
497                 # note: this code is called only if the for loop has completed (no break)
498                 raise PoolError('The Connection Pool Is Full')
499
500         try:
501             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
502         except psycopg2.Error:
503             _logger.exception('Connection to the database failed')
504             raise
505         self._connections.append((result, True))
506         self._debug('Create new connection')
507         return result
508
509     @locked
510     def give_back(self, connection, keep_in_pool=True):
511         self._debug('Give back connection to %r', connection.dsn)
512         for i, (cnx, used) in enumerate(self._connections):
513             if cnx is connection:
514                 self._connections.pop(i)
515                 if keep_in_pool:
516                     self._connections.append((cnx, False))
517                     self._debug('Put connection to %r in pool', cnx.dsn)
518                 else:
519                     self._debug('Forgot connection to %r', cnx.dsn)
520                     cnx.close()
521                 break
522         else:
523             raise PoolError('This connection does not below to the pool')
524
525     @locked
526     def close_all(self, dsn=None):
527         _logger.info('%r: Close all connections to %r', self, dsn)
528         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
529             if dsn is None or dsn_are_equals(cnx.dsn, dsn):
530                 cnx.close()
531                 self._connections.pop(i)
532
533
534 class Connection(object):
535     """ A lightweight instance of a connection to postgres
536     """
537
538     def __init__(self, pool, dbname):
539         self.dbname = dbname
540         self._pool = pool
541
542     def cursor(self, serialized=True):
543         cursor_type = serialized and 'serialized ' or ''
544         _logger.debug('create %scursor to %r', cursor_type, self.dbname)
545         return Cursor(self._pool, self.dbname, serialized=serialized)
546
547     def test_cursor(self, serialized=True):
548         cursor_type = serialized and 'serialized ' or ''
549         _logger.debug('create test %scursor to %r', cursor_type, self.dbname)
550         return TestCursor(self._pool, self.dbname, serialized=serialized)
551
552     # serialized_cursor is deprecated - cursors are serialized by default
553     serialized_cursor = cursor
554
555     def __nonzero__(self):
556         """Check if connection is possible"""
557         try:
558             _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
559             cr = self.cursor()
560             cr.close()
561             return True
562         except Exception:
563             return False
564
565 def dsn(db_name):
566     _dsn = ''
567     for p in ('host', 'port', 'user', 'password'):
568         cfg = tools.config['db_' + p]
569         if cfg:
570             _dsn += '%s=%s ' % (p, cfg)
571
572     return '%sdbname=%s' % (_dsn, db_name)
573
574 def dsn_are_equals(first, second):
575     def key(dsn):
576         k = dict(x.split('=', 1) for x in dsn.strip().split())
577         k.pop('password', None) # password is not relevant
578         return k
579     return key(first) == key(second)
580
581
582 _Pool = None
583
584 def db_connect(db_name):
585     global _Pool
586     if _Pool is None:
587         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
588     return Connection(_Pool, db_name)
589
590 def close_db(db_name):
591     """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
592     global _Pool
593     if _Pool:
594         _Pool.close_all(dsn(db_name))
595
596 def close_all():
597     global _Pool
598     if _Pool:
599         _Pool.close_all()
600
601
602 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
603