[MERGE] forward port of branch 8.0 up to 2e092ac
[odoo/odoo.git] / openerp / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2014 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23
24 """
25 The PostgreSQL connector is a connectivity layer between the OpenERP code and
26 the database, *not* a database abstraction toolkit. Database abstraction is what
27 the ORM does, in fact.
28 """
29
30 from contextlib import contextmanager
31 from functools import wraps
32 import logging
33 import urlparse
34 import uuid
35 import psycopg2.extras
36 import psycopg2.extensions
37 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
38 from psycopg2.pool import PoolError
39
40 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
41
42 _logger = logging.getLogger(__name__)
43
44 types_mapping = {
45     'date': (1082,),
46     'time': (1083,),
47     'datetime': (1114,),
48 }
49
50 def unbuffer(symb, cr):
51     if symb is None:
52         return None
53     return str(symb)
54
55 def undecimalize(symb, cr):
56     if symb is None:
57         return None
58     return float(symb)
59
60 for name, typeoid in types_mapping.items():
61     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
62 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
63
64
65 import tools
66 from tools.func import frame_codeinfo
67 from datetime import datetime as mdt
68 from datetime import timedelta
69 import threading
70 from inspect import currentframe
71
72 import re
73 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
74 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
75
76 sql_counter = 0
77
78 class Cursor(object):
79     """Represents an open transaction to the PostgreSQL DB backend,
80        acting as a lightweight wrapper around psycopg2's
81        ``cursor`` objects.
82
83         ``Cursor`` is the object behind the ``cr`` variable used all
84         over the OpenERP code.
85
86         .. rubric:: Transaction Isolation
87
88         One very important property of database transactions is the
89         level of isolation between concurrent transactions.
90         The SQL standard defines four levels of transaction isolation,
91         ranging from the most strict *Serializable* level, to the least
92         strict *Read Uncommitted* level. These levels are defined in
93         terms of the phenomena that must not occur between concurrent
94         transactions, such as *dirty read*, etc.
95         In the context of a generic business data management software
96         such as OpenERP, we need the best guarantees that no data
97         corruption can ever be cause by simply running multiple
98         transactions in parallel. Therefore, the preferred level would
99         be the *serializable* level, which ensures that a set of
100         transactions is guaranteed to produce the same effect as
101         running them one at a time in some order.
102
103         However, most database management systems implement a limited
104         serializable isolation in the form of
105         `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
106         providing most of the same advantages as True Serializability,
107         with a fraction of the performance cost.
108         With PostgreSQL up to version 9.0, this snapshot isolation was
109         the implementation of both the ``REPEATABLE READ`` and
110         ``SERIALIZABLE`` levels of the SQL standard.
111         As of PostgreSQL 9.1, the previous snapshot isolation implementation
112         was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
113         level was introduced, providing some additional heuristics to
114         detect a concurrent update by parallel transactions, and forcing
115         one of them to rollback.
116
117         OpenERP implements its own level of locking protection
118         for transactions that are highly likely to provoke concurrent
119         updates, such as stock reservations or document sequences updates.
120         Therefore we mostly care about the properties of snapshot isolation,
121         but we don't really need additional heuristics to trigger transaction
122         rollbacks, as we are taking care of triggering instant rollbacks
123         ourselves when it matters (and we can save the additional performance
124         hit of these heuristics).
125
126         As a result of the above, we have selected ``REPEATABLE READ`` as
127         the default transaction isolation level for OpenERP cursors, as
128         it will be mapped to the desired ``snapshot isolation`` level for
129         all supported PostgreSQL version (8.3 - 9.x).
130
131         Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
132         read level to serializable before sending it to the database, so it would
133         actually select the new serializable mode on PostgreSQL 9.1. Make
134         sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
135         the performance hit is a concern for you.
136
137         .. attribute:: cache
138
139             Cache dictionary with a "request" (-ish) lifecycle, only lives as
140             long as the cursor itself does and proactively cleared when the
141             cursor is closed.
142
143             This cache should *only* be used to store repeatable reads as it
144             ignores rollbacks and savepoints, it should not be used to store
145             *any* data which may be modified during the life of the cursor.
146
147     """
148     IN_MAX = 1000   # decent limit on size of IN queries - guideline = Oracle limit
149
150     def check(f):
151         @wraps(f)
152         def wrapper(self, *args, **kwargs):
153             if self._closed:
154                 msg = 'Unable to use a closed cursor.'
155                 if self.__closer:
156                     msg += ' It was closed at %s, line %s' % self.__closer
157                 raise psycopg2.OperationalError(msg)
158             return f(self, *args, **kwargs)
159         return wrapper
160
161     def __init__(self, pool, dbname, dsn, serialized=True):
162         self.sql_from_log = {}
163         self.sql_into_log = {}
164
165         # default log level determined at cursor creation, could be
166         # overridden later for debugging purposes
167         self.sql_log = _logger.isEnabledFor(logging.DEBUG)
168
169         self.sql_log_count = 0
170
171         # avoid the call of close() (by __del__) if an exception
172         # is raised by any of the following initialisations
173         self._closed = True
174
175         self.__pool = pool
176         self.dbname = dbname
177         # Whether to enable snapshot isolation level for this cursor.
178         # see also the docstring of Cursor.
179         self._serialized = serialized
180
181         self._cnx = pool.borrow(dsn)
182         self._obj = self._cnx.cursor()
183         if self.sql_log:
184             self.__caller = frame_codeinfo(currentframe(), 2)
185         else:
186             self.__caller = False
187         self._closed = False   # real initialisation value
188         self.autocommit(False)
189         self.__closer = False
190
191         self._default_log_exceptions = True
192
193         self.cache = {}
194
195     def __build_dict(self, row):
196         return {d.name: row[i] for i, d in enumerate(self._obj.description)}
197     def dictfetchone(self):
198         row = self._obj.fetchone()
199         return row and self.__build_dict(row)
200     def dictfetchmany(self, size):
201         return map(self.__build_dict, self._obj.fetchmany(size))
202     def dictfetchall(self):
203         return map(self.__build_dict, self._obj.fetchall())
204
205     def __del__(self):
206         if not self._closed and not self._cnx.closed:
207             # Oops. 'self' has not been closed explicitly.
208             # The cursor will be deleted by the garbage collector,
209             # but the database connection is not put back into the connection
210             # pool, preventing some operation on the database like dropping it.
211             # This can also lead to a server overload.
212             msg = "Cursor not closed explicitly\n"
213             if self.__caller:
214                 msg += "Cursor was created at %s:%s" % self.__caller
215             else:
216                 msg += "Please enable sql debugging to trace the caller."
217             _logger.warning(msg)
218             self._close(True)
219
220     @check
221     def execute(self, query, params=None, log_exceptions=None):
222         if '%d' in query or '%f' in query:
223             _logger.warning(query)
224             _logger.warning("SQL queries cannot contain %d or %f anymore. Use only %s")
225         if params and not isinstance(params, (tuple, list, dict)):
226             _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
227             raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
228
229         if self.sql_log:
230             now = mdt.now()
231
232         try:
233             params = params or None
234             res = self._obj.execute(query, params)
235         except psycopg2.ProgrammingError, pe:
236             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
237                 _logger.error("Programming error: %s, in query %s", pe, query)
238             raise
239         except Exception:
240             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
241                 _logger.exception("bad query: %s", self._obj.query or query)
242             raise
243
244         # simple query count is always computed
245         self.sql_log_count += 1
246
247         # advanced stats only if sql_log is enabled
248         if self.sql_log:
249             delay = mdt.now() - now
250             delay = delay.seconds * 1E6 + delay.microseconds
251
252             _logger.debug("query: %s", self._obj.query)
253             res_from = re_from.match(query.lower())
254             if res_from:
255                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
256                 self.sql_from_log[res_from.group(1)][0] += 1
257                 self.sql_from_log[res_from.group(1)][1] += delay
258             res_into = re_into.match(query.lower())
259             if res_into:
260                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
261                 self.sql_into_log[res_into.group(1)][0] += 1
262                 self.sql_into_log[res_into.group(1)][1] += delay
263         return res
264
265     def split_for_in_conditions(self, ids):
266         """Split a list of identifiers into one or more smaller tuples
267            safe for IN conditions, after uniquifying them."""
268         return tools.misc.split_every(self.IN_MAX, set(ids))
269
270     def print_log(self):
271         global sql_counter
272
273         if not self.sql_log:
274             return
275         def process(type):
276             sqllogs = {'from': self.sql_from_log, 'into': self.sql_into_log}
277             sum = 0
278             if sqllogs[type]:
279                 sqllogitems = sqllogs[type].items()
280                 sqllogitems.sort(key=lambda k: k[1][1])
281                 _logger.debug("SQL LOG %s:", type)
282                 sqllogitems.sort(lambda x, y: cmp(x[1][0], y[1][0]))
283                 for r in sqllogitems:
284                     delay = timedelta(microseconds=r[1][1])
285                     _logger.debug("table: %s: %s/%s", r[0], delay, r[1][0])
286                     sum += r[1][1]
287                 sqllogs[type].clear()
288             sum = timedelta(microseconds=sum)
289             _logger.debug("SUM %s:%s/%d [%d]", type, sum, self.sql_log_count, sql_counter)
290             sqllogs[type].clear()
291         process('from')
292         process('into')
293         self.sql_log_count = 0
294         self.sql_log = False
295
296     @check
297     def close(self):
298         return self._close(False)
299
300     def _close(self, leak=False):
301         global sql_counter
302
303         if not self._obj:
304             return
305
306         del self.cache
307
308         if self.sql_log:
309             self.__closer = frame_codeinfo(currentframe(), 3)
310
311         # simple query count is always computed
312         sql_counter += self.sql_log_count
313
314         # advanced stats only if sql_log is enabled
315         self.print_log()
316
317         self._obj.close()
318
319         # This force the cursor to be freed, and thus, available again. It is
320         # important because otherwise we can overload the server very easily
321         # because of a cursor shortage (because cursors are not garbage
322         # collected as fast as they should). The problem is probably due in
323         # part because browse records keep a reference to the cursor.
324         del self._obj
325         self._closed = True
326
327         # Clean the underlying connection.
328         self._cnx.rollback()
329
330         if leak:
331             self._cnx.leaked = True
332         else:
333             chosen_template = tools.config['db_template']
334             templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
335             keep_in_pool = self.dbname not in templates_list
336             self.__pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
337
338     @check
339     def autocommit(self, on):
340         if on:
341             isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
342         else:
343             # If a serializable cursor was requested, we
344             # use the appropriate PotsgreSQL isolation level
345             # that maps to snaphsot isolation.
346             # For all supported PostgreSQL versions (8.3-9.x),
347             # this is currently the ISOLATION_REPEATABLE_READ.
348             # See also the docstring of this class.
349             # NOTE: up to psycopg 2.4.2, repeatable read
350             #       is remapped to serializable before being
351             #       sent to the database, so it is in fact
352             #       unavailable for use with pg 9.1.
353             isolation_level = \
354                 ISOLATION_LEVEL_REPEATABLE_READ \
355                 if self._serialized \
356                 else ISOLATION_LEVEL_READ_COMMITTED
357         self._cnx.set_isolation_level(isolation_level)
358
359     @check
360     def commit(self):
361         """ Perform an SQL `COMMIT`
362         """
363         return self._cnx.commit()
364
365     @check
366     def rollback(self):
367         """ Perform an SQL `ROLLBACK`
368         """
369         return self._cnx.rollback()
370
371     def __enter__(self):
372         """ Using the cursor as a contextmanager automatically commits and
373             closes it::
374
375                 with cr:
376                     cr.execute(...)
377
378                 # cr is committed if no failure occurred
379                 # cr is closed in any case
380         """
381         return self
382
383     def __exit__(self, exc_type, exc_value, traceback):
384         if exc_type is None:
385             self.commit()
386         self.close()
387
388     @contextmanager
389     @check
390     def savepoint(self):
391         """context manager entering in a new savepoint"""
392         name = uuid.uuid1().hex
393         self.execute('SAVEPOINT "%s"' % name)
394         try:
395             yield
396             self.execute('RELEASE SAVEPOINT "%s"' % name)
397         except:
398             self.execute('ROLLBACK TO SAVEPOINT "%s"' % name)
399             raise
400
401     @check
402     def __getattr__(self, name):
403         return getattr(self._obj, name)
404
405 class TestCursor(Cursor):
406     """ A cursor to be used for tests. It keeps the transaction open across
407         several requests, and simulates committing, rolling back, and closing.
408     """
409     def __init__(self, *args, **kwargs):
410         super(TestCursor, self).__init__(*args, **kwargs)
411         # in order to simulate commit and rollback, the cursor maintains a
412         # savepoint at its last commit
413         self.execute("SAVEPOINT test_cursor")
414         # we use a lock to serialize concurrent requests
415         self._lock = threading.RLock()
416
417     def acquire(self):
418         self._lock.acquire()
419
420     def release(self):
421         self._lock.release()
422
423     def force_close(self):
424         super(TestCursor, self).close()
425
426     def close(self):
427         if not self._closed:
428             self.rollback()             # for stuff that has not been committed
429         self.release()
430
431     def autocommit(self, on):
432         _logger.debug("TestCursor.autocommit(%r) does nothing", on)
433
434     def commit(self):
435         self.execute("RELEASE SAVEPOINT test_cursor")
436         self.execute("SAVEPOINT test_cursor")
437
438     def rollback(self):
439         self.execute("ROLLBACK TO SAVEPOINT test_cursor")
440         self.execute("SAVEPOINT test_cursor")
441
442 class PsycoConnection(psycopg2.extensions.connection):
443     pass
444
445 class ConnectionPool(object):
446     """ The pool of connections to database(s)
447
448         Keep a set of connections to pg databases open, and reuse them
449         to open cursors for all transactions.
450
451         The connections are *not* automatically closed. Only a close_db()
452         can trigger that.
453     """
454
455     def locked(fun):
456         @wraps(fun)
457         def _locked(self, *args, **kwargs):
458             self._lock.acquire()
459             try:
460                 return fun(self, *args, **kwargs)
461             finally:
462                 self._lock.release()
463         return _locked
464
465     def __init__(self, maxconn=64):
466         self._connections = []
467         self._maxconn = max(maxconn, 1)
468         self._lock = threading.Lock()
469
470     def __repr__(self):
471         used = len([1 for c, u in self._connections[:] if u])
472         count = len(self._connections)
473         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
474
475     def _debug(self, msg, *args):
476         _logger.debug(('%r ' + msg), self, *args)
477
478     @locked
479     def borrow(self, dsn):
480         # free dead and leaked connections
481         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
482             if cnx.closed:
483                 self._connections.pop(i)
484                 self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
485                 continue
486             if getattr(cnx, 'leaked', False):
487                 delattr(cnx, 'leaked')
488                 self._connections.pop(i)
489                 self._connections.append((cnx, False))
490                 _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)
491
492         for i, (cnx, used) in enumerate(self._connections):
493             if not used and cnx._original_dsn == dsn:
494                 try:
495                     cnx.reset()
496                 except psycopg2.OperationalError:
497                     self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn)
498                     # psycopg2 2.4.4 and earlier do not allow closing a closed connection
499                     if not cnx.closed:
500                         cnx.close()
501                     continue
502                 self._connections.pop(i)
503                 self._connections.append((cnx, True))
504                 self._debug('Borrow existing connection to %r at index %d', cnx.dsn, i)
505
506                 return cnx
507
508         if len(self._connections) >= self._maxconn:
509             # try to remove the oldest connection not used
510             for i, (cnx, used) in enumerate(self._connections):
511                 if not used:
512                     self._connections.pop(i)
513                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
514                     break
515             else:
516                 # note: this code is called only if the for loop has completed (no break)
517                 raise PoolError('The Connection Pool Is Full')
518
519         try:
520             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
521         except psycopg2.Error:
522             _logger.exception('Connection to the database failed')
523             raise
524         result._original_dsn = dsn
525         self._connections.append((result, True))
526         self._debug('Create new connection')
527         return result
528
529     @locked
530     def give_back(self, connection, keep_in_pool=True):
531         self._debug('Give back connection to %r', connection.dsn)
532         for i, (cnx, used) in enumerate(self._connections):
533             if cnx is connection:
534                 self._connections.pop(i)
535                 if keep_in_pool:
536                     self._connections.append((cnx, False))
537                     self._debug('Put connection to %r in pool', cnx.dsn)
538                 else:
539                     self._debug('Forgot connection to %r', cnx.dsn)
540                     cnx.close()
541                 break
542         else:
543             raise PoolError('This connection does not below to the pool')
544
545     @locked
546     def close_all(self, dsn=None):
547         count = 0
548         last = None
549         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
550             if dsn is None or cnx._original_dsn == dsn:
551                 cnx.close()
552                 last = self._connections.pop(i)[0]
553                 count += 1
554         _logger.info('%r: Closed %d connections %s', self, count,
555                     (dsn and last and 'to %r' % last.dsn) or '')
556
557
558 class Connection(object):
559     """ A lightweight instance of a connection to postgres
560     """
561     def __init__(self, pool, dbname, dsn):
562         self.dbname = dbname
563         self.dsn = dsn
564         self.__pool = pool
565
566     def cursor(self, serialized=True):
567         cursor_type = serialized and 'serialized ' or ''
568         _logger.debug('create %scursor to %r', cursor_type, self.dsn)
569         return Cursor(self.__pool, self.dbname, self.dsn, serialized=serialized)
570
571     def test_cursor(self, serialized=True):
572         cursor_type = serialized and 'serialized ' or ''
573         _logger.debug('create test %scursor to %r', cursor_type, self.dsn)
574         return TestCursor(self.__pool, self.dbname, self.dsn, serialized=serialized)
575
576     # serialized_cursor is deprecated - cursors are serialized by default
577     serialized_cursor = cursor
578
579     def __nonzero__(self):
580         """Check if connection is possible"""
581         try:
582             _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
583             cr = self.cursor()
584             cr.close()
585             return True
586         except Exception:
587             return False
588
589 def dsn(db_or_uri):
590     """parse the given `db_or_uri` and return a 2-tuple (dbname, uri)"""
591     if db_or_uri.startswith(('postgresql://', 'postgres://')):
592         # extract db from uri
593         us = urlparse.urlsplit(db_or_uri)
594         if len(us.path) > 1:
595             db_name = us.path[1:]
596         elif us.username:
597             db_name = us.username
598         else:
599             db_name = us.hostname
600         return db_name, db_or_uri
601
602     _dsn = ''
603     for p in ('host', 'port', 'user', 'password'):
604         cfg = tools.config['db_' + p]
605         if cfg:
606             _dsn += '%s=%s ' % (p, cfg)
607
608     return db_or_uri, '%sdbname=%s' % (_dsn, db_or_uri)
609
610 _Pool = None
611
612 def db_connect(to, allow_uri=False):
613     global _Pool
614     if _Pool is None:
615         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
616
617     db, uri = dsn(to)
618     if not allow_uri and db != to:
619         raise ValueError('URI connections not allowed')
620     return Connection(_Pool, db, uri)
621
622 def close_db(db_name):
623     """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
624     global _Pool
625     if _Pool:
626         _Pool.close_all(dsn(db_name)[1])
627
628 def close_all():
629     global _Pool
630     if _Pool:
631         _Pool.close_all()
632
633 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: