Convert form views @version="6.1"
[odoo/odoo.git] / openerp / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2014 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23
24 """
25 The PostgreSQL connector is a connectivity layer between the OpenERP code and
26 the database, *not* a database abstraction toolkit. Database abstraction is what
27 the ORM does, in fact.
28 """
29
30 from contextlib import contextmanager
31 from functools import wraps
32 import logging
33 import time
34 import uuid
35 import psycopg2.extensions
36 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
37 from psycopg2.pool import PoolError
38 from psycopg2.psycopg1 import cursor as psycopg1cursor
39
40 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
41
42 _logger = logging.getLogger(__name__)
43
44 types_mapping = {
45     'date': (1082,),
46     'time': (1083,),
47     'datetime': (1114,),
48 }
49
50 def unbuffer(symb, cr):
51     if symb is None: return None
52     return str(symb)
53
54 def undecimalize(symb, cr):
55     if symb is None: return None
56     return float(symb)
57
58 for name, typeoid in types_mapping.items():
59     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
60 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
61
62
63 import tools
64 from tools.func import frame_codeinfo
65 from datetime import datetime as mdt
66 from datetime import timedelta
67 import threading
68 from inspect import currentframe
69
70 import re
71 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
72 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
73
74 sql_counter = 0
75
76 class Cursor(object):
77     """Represents an open transaction to the PostgreSQL DB backend,
78        acting as a lightweight wrapper around psycopg2's
79        ``psycopg1cursor`` objects.
80
81         ``Cursor`` is the object behind the ``cr`` variable used all
82         over the OpenERP code.
83
84         .. rubric:: Transaction Isolation
85
86         One very important property of database transactions is the
87         level of isolation between concurrent transactions.
88         The SQL standard defines four levels of transaction isolation,
89         ranging from the most strict *Serializable* level, to the least
90         strict *Read Uncommitted* level. These levels are defined in
91         terms of the phenomena that must not occur between concurrent
92         transactions, such as *dirty read*, etc.
93         In the context of a generic business data management software
94         such as OpenERP, we need the best guarantees that no data
95         corruption can ever be cause by simply running multiple
96         transactions in parallel. Therefore, the preferred level would
97         be the *serializable* level, which ensures that a set of
98         transactions is guaranteed to produce the same effect as
99         running them one at a time in some order.
100
101         However, most database management systems implement a limited
102         serializable isolation in the form of
103         `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
104         providing most of the same advantages as True Serializability,
105         with a fraction of the performance cost.
106         With PostgreSQL up to version 9.0, this snapshot isolation was
107         the implementation of both the ``REPEATABLE READ`` and
108         ``SERIALIZABLE`` levels of the SQL standard.
109         As of PostgreSQL 9.1, the previous snapshot isolation implementation
110         was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
111         level was introduced, providing some additional heuristics to
112         detect a concurrent update by parallel transactions, and forcing
113         one of them to rollback.
114
115         OpenERP implements its own level of locking protection
116         for transactions that are highly likely to provoke concurrent
117         updates, such as stock reservations or document sequences updates.
118         Therefore we mostly care about the properties of snapshot isolation,
119         but we don't really need additional heuristics to trigger transaction
120         rollbacks, as we are taking care of triggering instant rollbacks
121         ourselves when it matters (and we can save the additional performance
122         hit of these heuristics).
123
124         As a result of the above, we have selected ``REPEATABLE READ`` as
125         the default transaction isolation level for OpenERP cursors, as
126         it will be mapped to the desired ``snapshot isolation`` level for
127         all supported PostgreSQL version (8.3 - 9.x).
128
129         Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
130         read level to serializable before sending it to the database, so it would
131         actually select the new serializable mode on PostgreSQL 9.1. Make
132         sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
133         the performance hit is a concern for you.
134
135         .. attribute:: cache
136
137             Cache dictionary with a "request" (-ish) lifecycle, only lives as
138             long as the cursor itself does and proactively cleared when the
139             cursor is closed.
140
141             This cache should *only* be used to store repeatable reads as it
142             ignores rollbacks and savepoints, it should not be used to store
143             *any* data which may be modified during the life of the cursor.
144
145     """
146     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
147
148     def check(f):
149         @wraps(f)
150         def wrapper(self, *args, **kwargs):
151             if self._closed:
152                 msg = 'Unable to use a closed cursor.'
153                 if self.__closer:
154                     msg += ' It was closed at %s, line %s' % self.__closer
155                 raise psycopg2.OperationalError(msg)
156             return f(self, *args, **kwargs)
157         return wrapper
158
159     def __init__(self, pool, dbname, serialized=True):
160         self.sql_from_log = {}
161         self.sql_into_log = {}
162
163         # default log level determined at cursor creation, could be
164         # overridden later for debugging purposes
165         self.sql_log = _logger.isEnabledFor(logging.DEBUG)
166
167         self.sql_log_count = 0
168         self._closed = True    # avoid the call of close() (by __del__) if an exception
169                                 # is raised by any of the following initialisations
170         self.__pool = pool
171         self.dbname = dbname
172
173         # Whether to enable snapshot isolation level for this cursor.
174         # see also the docstring of Cursor.  
175         self._serialized = serialized
176
177         self._cnx = pool.borrow(dsn(dbname))
178         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
179         if self.sql_log:
180             self.__caller = frame_codeinfo(currentframe(),2)
181         else:
182             self.__caller = False
183         self._closed = False   # real initialisation value
184         self.autocommit(False)
185         self.__closer = False
186
187         self._default_log_exceptions = True
188
189         self.cache = {}
190
191     def __del__(self):
192         if not self._closed and not self._cnx.closed:
193             # Oops. 'self' has not been closed explicitly.
194             # The cursor will be deleted by the garbage collector,
195             # but the database connection is not put back into the connection
196             # pool, preventing some operation on the database like dropping it.
197             # This can also lead to a server overload.
198             msg = "Cursor not closed explicitly\n"
199             if self.__caller:
200                 msg += "Cursor was created at %s:%s" % self.__caller
201             else:
202                 msg += "Please enable sql debugging to trace the caller."
203             _logger.warning(msg)
204             self._close(True)
205
206     @check
207     def execute(self, query, params=None, log_exceptions=None):
208         if '%d' in query or '%f' in query:
209             _logger.warning(query)
210             _logger.warning("SQL queries cannot contain %d or %f anymore. "
211                          "Use only %s")
212         if params and not isinstance(params, (tuple, list, dict)):
213             _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
214             raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
215
216         if self.sql_log:
217             now = mdt.now()
218
219         try:
220             params = params or None
221             res = self._obj.execute(query, params)
222         except psycopg2.ProgrammingError, pe:
223             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
224                 _logger.error("Programming error: %s, in query %s", pe, query)
225             raise
226         except Exception:
227             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
228                 _logger.exception("bad query: %s", self._obj.query or query)
229             raise
230
231         if self.sql_log:
232             delay = mdt.now() - now
233             delay = delay.seconds * 1E6 + delay.microseconds
234
235             _logger.debug("query: %s", self._obj.query)
236             self.sql_log_count+=1
237             res_from = re_from.match(query.lower())
238             if res_from:
239                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
240                 self.sql_from_log[res_from.group(1)][0] += 1
241                 self.sql_from_log[res_from.group(1)][1] += delay
242             res_into = re_into.match(query.lower())
243             if res_into:
244                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
245                 self.sql_into_log[res_into.group(1)][0] += 1
246                 self.sql_into_log[res_into.group(1)][1] += delay
247         return res
248
249
250     def split_for_in_conditions(self, ids):
251         """Split a list of identifiers into one or more smaller tuples
252            safe for IN conditions, after uniquifying them."""
253         return tools.misc.split_every(self.IN_MAX, set(ids))
254
255     def print_log(self):
256         global sql_counter
257         sql_counter += self.sql_log_count
258         if not self.sql_log:
259             return
260         def process(type):
261             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
262             sum = 0
263             if sqllogs[type]:
264                 sqllogitems = sqllogs[type].items()
265                 sqllogitems.sort(key=lambda k: k[1][1])
266                 _logger.debug("SQL LOG %s:", type)
267                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
268                 for r in sqllogitems:
269                     delay = timedelta(microseconds=r[1][1])
270                     _logger.debug("table: %s: %s/%s",
271                                         r[0], delay, r[1][0])
272                     sum+= r[1][1]
273                 sqllogs[type].clear()
274             sum = timedelta(microseconds=sum)
275             _logger.debug("SUM %s:%s/%d [%d]",
276                                 type, sum, self.sql_log_count, sql_counter)
277             sqllogs[type].clear()
278         process('from')
279         process('into')
280         self.sql_log_count = 0
281         self.sql_log = False
282
283     @check
284     def close(self):
285         return self._close(False)
286
287     def _close(self, leak=False):
288         if not self._obj:
289             return
290
291         del self.cache
292
293         if self.sql_log:
294             self.__closer = frame_codeinfo(currentframe(),3)
295         self.print_log()
296
297         self._obj.close()
298
299         # This force the cursor to be freed, and thus, available again. It is
300         # important because otherwise we can overload the server very easily
301         # because of a cursor shortage (because cursors are not garbage
302         # collected as fast as they should). The problem is probably due in
303         # part because browse records keep a reference to the cursor.
304         del self._obj
305         self._closed = True
306
307         # Clean the underlying connection.
308         self._cnx.rollback()
309
310         if leak:
311             self._cnx.leaked = True
312         else:
313             chosen_template = tools.config['db_template']
314             templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
315             keep_in_pool = self.dbname not in templates_list
316             self.__pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
317
318     @check
319     def autocommit(self, on):
320         if on:
321             isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
322         else:
323             # If a serializable cursor was requested, we
324             # use the appropriate PotsgreSQL isolation level
325             # that maps to snaphsot isolation.
326             # For all supported PostgreSQL versions (8.3-9.x),
327             # this is currently the ISOLATION_REPEATABLE_READ.
328             # See also the docstring of this class.
329             # NOTE: up to psycopg 2.4.2, repeatable read
330             #       is remapped to serializable before being
331             #       sent to the database, so it is in fact
332             #       unavailable for use with pg 9.1.
333             isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
334                                   if self._serialized \
335                                   else ISOLATION_LEVEL_READ_COMMITTED
336         self._cnx.set_isolation_level(isolation_level)
337
338     @check
339     def commit(self):
340         """ Perform an SQL `COMMIT`
341         """
342         return self._cnx.commit()
343
344     @check
345     def rollback(self):
346         """ Perform an SQL `ROLLBACK`
347         """
348         return self._cnx.rollback()
349
350     def __enter__(self):
351         """ Using the cursor as a contextmanager automatically commits and
352             closes it::
353
354                 with cr:
355                     cr.execute(...)
356
357                 # cr is committed if no failure occurred
358                 # cr is closed in any case
359         """
360         return self
361
362     def __exit__(self, exc_type, exc_value, traceback):
363         if exc_type is None:
364             self.commit()
365         self.close()
366
367     @contextmanager
368     @check
369     def savepoint(self):
370         """context manager entering in a new savepoint"""
371         name = uuid.uuid1().hex
372         self.execute('SAVEPOINT "%s"' % name)
373         try:
374             yield
375             self.execute('RELEASE SAVEPOINT "%s"' % name)
376         except:
377             self.execute('ROLLBACK TO SAVEPOINT "%s"' % name)
378             raise
379
380     @check
381     def __getattr__(self, name):
382         return getattr(self._obj, name)
383
384 class TestCursor(Cursor):
385     """ A cursor to be used for tests. It keeps the transaction open across
386         several requests, and simulates committing, rolling back, and closing.
387     """
388     def __init__(self, *args, **kwargs):
389         super(TestCursor, self).__init__(*args, **kwargs)
390         # in order to simulate commit and rollback, the cursor maintains a
391         # savepoint at its last commit
392         self.execute("SAVEPOINT test_cursor")
393         # we use a lock to serialize concurrent requests
394         self._lock = threading.RLock()
395
396     def acquire(self):
397         self._lock.acquire()
398
399     def release(self):
400         self._lock.release()
401
402     def force_close(self):
403         super(TestCursor, self).close()
404
405     def close(self):
406         if not self._closed:
407             self.rollback()             # for stuff that has not been committed
408         self.release()
409
410     def autocommit(self, on):
411         _logger.debug("TestCursor.autocommit(%r) does nothing", on)
412
413     def commit(self):
414         self.execute("RELEASE SAVEPOINT test_cursor")
415         self.execute("SAVEPOINT test_cursor")
416
417     def rollback(self):
418         self.execute("ROLLBACK TO SAVEPOINT test_cursor")
419         self.execute("SAVEPOINT test_cursor")
420
421 class PsycoConnection(psycopg2.extensions.connection):
422     pass
423
424 class ConnectionPool(object):
425     """ The pool of connections to database(s)
426     
427         Keep a set of connections to pg databases open, and reuse them
428         to open cursors for all transactions.
429         
430         The connections are *not* automatically closed. Only a close_db()
431         can trigger that.
432     """
433
434     def locked(fun):
435         @wraps(fun)
436         def _locked(self, *args, **kwargs):
437             self._lock.acquire()
438             try:
439                 return fun(self, *args, **kwargs)
440             finally:
441                 self._lock.release()
442         return _locked
443
444
445     def __init__(self, maxconn=64):
446         self._connections = []
447         self._maxconn = max(maxconn, 1)
448         self._lock = threading.Lock()
449
450     def __repr__(self):
451         used = len([1 for c, u in self._connections[:] if u])
452         count = len(self._connections)
453         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
454
455     def _debug(self, msg, *args):
456         _logger.debug(('%r ' + msg), self, *args)
457
458     @locked
459     def borrow(self, dsn):
460         self._debug('Borrow connection to %r', dsn)
461
462         # free dead and leaked connections
463         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
464             if cnx.closed:
465                 self._connections.pop(i)
466                 self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
467                 continue
468             if getattr(cnx, 'leaked', False):
469                 delattr(cnx, 'leaked')
470                 self._connections.pop(i)
471                 self._connections.append((cnx, False))
472                 _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)
473
474         for i, (cnx, used) in enumerate(self._connections):
475             if not used and dsn_are_equals(cnx.dsn, dsn):
476                 try:
477                     cnx.reset()
478                 except psycopg2.OperationalError:
479                     self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn)
480                     # psycopg2 2.4.4 and earlier do not allow closing a closed connection
481                     if not cnx.closed:
482                         cnx.close()
483                     continue
484                 self._connections.pop(i)
485                 self._connections.append((cnx, True))
486                 self._debug('Existing connection found at index %d', i)
487
488                 return cnx
489
490         if len(self._connections) >= self._maxconn:
491             # try to remove the oldest connection not used
492             for i, (cnx, used) in enumerate(self._connections):
493                 if not used:
494                     self._connections.pop(i)
495                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
496                     break
497             else:
498                 # note: this code is called only if the for loop has completed (no break)
499                 raise PoolError('The Connection Pool Is Full')
500
501         try:
502             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
503         except psycopg2.Error:
504             _logger.exception('Connection to the database failed')
505             raise
506         self._connections.append((result, True))
507         self._debug('Create new connection')
508         return result
509
510     @locked
511     def give_back(self, connection, keep_in_pool=True):
512         self._debug('Give back connection to %r', connection.dsn)
513         for i, (cnx, used) in enumerate(self._connections):
514             if cnx is connection:
515                 self._connections.pop(i)
516                 if keep_in_pool:
517                     self._connections.append((cnx, False))
518                     self._debug('Put connection to %r in pool', cnx.dsn)
519                 else:
520                     self._debug('Forgot connection to %r', cnx.dsn)
521                     cnx.close()
522                 break
523         else:
524             raise PoolError('This connection does not below to the pool')
525
526     @locked
527     def close_all(self, dsn=None):
528         _logger.info('%r: Close all connections to %r', self, dsn)
529         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
530             if dsn is None or dsn_are_equals(cnx.dsn, dsn):
531                 cnx.close()
532                 self._connections.pop(i)
533
534
535 class Connection(object):
536     """ A lightweight instance of a connection to postgres
537     """
538
539     def __init__(self, pool, dbname):
540         self.dbname = dbname
541         self.__pool = pool
542
543     def cursor(self, serialized=True):
544         cursor_type = serialized and 'serialized ' or ''
545         _logger.debug('create %scursor to %r', cursor_type, self.dbname)
546         return Cursor(self.__pool, self.dbname, serialized=serialized)
547
548     def test_cursor(self, serialized=True):
549         cursor_type = serialized and 'serialized ' or ''
550         _logger.debug('create test %scursor to %r', cursor_type, self.dbname)
551         return TestCursor(self.__pool, self.dbname, serialized=serialized)
552
553     # serialized_cursor is deprecated - cursors are serialized by default
554     serialized_cursor = cursor
555
556     def __nonzero__(self):
557         """Check if connection is possible"""
558         try:
559             _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
560             cr = self.cursor()
561             cr.close()
562             return True
563         except Exception:
564             return False
565
566 def dsn(db_name):
567     _dsn = ''
568     for p in ('host', 'port', 'user', 'password'):
569         cfg = tools.config['db_' + p]
570         if cfg:
571             _dsn += '%s=%s ' % (p, cfg)
572
573     return '%sdbname=%s' % (_dsn, db_name)
574
575 def dsn_are_equals(first, second):
576     def key(dsn):
577         k = dict(x.split('=', 1) for x in dsn.strip().split())
578         k.pop('password', None) # password is not relevant
579         return k
580     return key(first) == key(second)
581
582
583 _Pool = None
584
585 def db_connect(db_name):
586     global _Pool
587     if _Pool is None:
588         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
589     return Connection(_Pool, db_name)
590
591 def close_db(db_name):
592     """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
593     global _Pool
594     if _Pool:
595         _Pool.close_all(dsn(db_name))
596
597 def close_all():
598     global _Pool
599     if _Pool:
600         _Pool.close_all()
601
602
603 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
604