[MERGE] forward port of branch 7.0 up to revid 4067 chs@openerp.com-20131114142639...
[odoo/odoo.git] / openerp / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2013 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23
24 """
25 The PostgreSQL connector is a connectivity layer between the OpenERP code and
26 the database, *not* a database abstraction toolkit. Database abstraction is what
27 the ORM does, in fact.
28 """
29
30
31 __all__ = ['db_connect', 'close_db']
32
33 from functools import wraps
34 import logging
35 import psycopg2.extensions
36 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
37 from psycopg2.pool import PoolError
38 from psycopg2.psycopg1 import cursor as psycopg1cursor
39
40 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
41
42 _logger = logging.getLogger(__name__)
43
44 types_mapping = {
45     'date': (1082,),
46     'time': (1083,),
47     'datetime': (1114,),
48 }
49
50 def unbuffer(symb, cr):
51     if symb is None: return None
52     return str(symb)
53
54 def undecimalize(symb, cr):
55     if symb is None: return None
56     return float(symb)
57
58 for name, typeoid in types_mapping.items():
59     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
60 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
61
62
63 import tools
64 from tools.func import frame_codeinfo
65 from datetime import datetime as mdt
66 from datetime import timedelta
67 import threading
68 from inspect import currentframe
69
70 import re
71 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
72 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
73
74 sql_counter = 0
75
76 class Cursor(object):
77     """Represents an open transaction to the PostgreSQL DB backend,
78        acting as a lightweight wrapper around psycopg2's
79        ``psycopg1cursor`` objects.
80
81         ``Cursor`` is the object behind the ``cr`` variable used all
82         over the OpenERP code.
83
84         .. rubric:: Transaction Isolation
85
86         One very important property of database transactions is the
87         level of isolation between concurrent transactions.
88         The SQL standard defines four levels of transaction isolation,
89         ranging from the most strict *Serializable* level, to the least
90         strict *Read Uncommitted* level. These levels are defined in
91         terms of the phenomena that must not occur between concurrent
92         transactions, such as *dirty read*, etc.
93         In the context of a generic business data management software
94         such as OpenERP, we need the best guarantees that no data
95         corruption can ever be cause by simply running multiple
96         transactions in parallel. Therefore, the preferred level would
97         be the *serializable* level, which ensures that a set of
98         transactions is guaranteed to produce the same effect as
99         running them one at a time in some order.
100
101         However, most database management systems implement a limited
102         serializable isolation in the form of
103         `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
104         providing most of the same advantages as True Serializability,
105         with a fraction of the performance cost.
106         With PostgreSQL up to version 9.0, this snapshot isolation was
107         the implementation of both the ``REPEATABLE READ`` and
108         ``SERIALIZABLE`` levels of the SQL standard.
109         As of PostgreSQL 9.1, the previous snapshot isolation implementation
110         was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
111         level was introduced, providing some additional heuristics to
112         detect a concurrent update by parallel transactions, and forcing
113         one of them to rollback.
114
115         OpenERP implements its own level of locking protection
116         for transactions that are highly likely to provoke concurrent
117         updates, such as stock reservations or document sequences updates.
118         Therefore we mostly care about the properties of snapshot isolation,
119         but we don't really need additional heuristics to trigger transaction
120         rollbacks, as we are taking care of triggering instant rollbacks
121         ourselves when it matters (and we can save the additional performance
122         hit of these heuristics).
123
124         As a result of the above, we have selected ``REPEATABLE READ`` as
125         the default transaction isolation level for OpenERP cursors, as
126         it will be mapped to the desired ``snapshot isolation`` level for
127         all supported PostgreSQL version (8.3 - 9.x).
128
129         Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
130         read level to serializable before sending it to the database, so it would
131         actually select the new serializable mode on PostgreSQL 9.1. Make
132         sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
133         the performance hit is a concern for you.
134
135         .. attribute:: cache
136
137             Cache dictionary with a "request" (-ish) lifecycle, only lives as
138             long as the cursor itself does and proactively cleared when the
139             cursor is closed.
140
141             This cache should *only* be used to store repeatable reads as it
142             ignores rollbacks and savepoints, it should not be used to store
143             *any* data which may be modified during the life of the cursor.
144
145     """
146     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
147
148     def check(f):
149         @wraps(f)
150         def wrapper(self, *args, **kwargs):
151             if self.__closed:
152                 msg = 'Unable to use a closed cursor.'
153                 if self.__closer:
154                     msg += ' It was closed at %s, line %s' % self.__closer
155                 raise psycopg2.OperationalError(msg)
156             return f(self, *args, **kwargs)
157         return wrapper
158
159     def __init__(self, pool, dbname, serialized=True):
160         self.sql_from_log = {}
161         self.sql_into_log = {}
162
163         # default log level determined at cursor creation, could be
164         # overridden later for debugging purposes
165         self.sql_log = _logger.isEnabledFor(logging.DEBUG)
166
167         self.sql_log_count = 0
168         self.__closed = True    # avoid the call of close() (by __del__) if an exception
169                                 # is raised by any of the following initialisations
170         self._pool = pool
171         self.dbname = dbname
172
173         # Whether to enable snapshot isolation level for this cursor.
174         # see also the docstring of Cursor.  
175         self._serialized = serialized
176
177         self._cnx = pool.borrow(dsn(dbname))
178         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
179         if self.sql_log:
180             self.__caller = frame_codeinfo(currentframe(),2)
181         else:
182             self.__caller = False
183         self.__closed = False   # real initialisation value
184         self.autocommit(False)
185         self.__closer = False
186
187         self._default_log_exceptions = True
188
189         self.cache = {}
190
191     def __del__(self):
192         if not self.__closed and not self._cnx.closed:
193             # Oops. 'self' has not been closed explicitly.
194             # The cursor will be deleted by the garbage collector,
195             # but the database connection is not put back into the connection
196             # pool, preventing some operation on the database like dropping it.
197             # This can also lead to a server overload.
198             msg = "Cursor not closed explicitly\n"
199             if self.__caller:
200                 msg += "Cursor was created at %s:%s" % self.__caller
201             else:
202                 msg += "Please enable sql debugging to trace the caller."
203             _logger.warning(msg)
204             self._close(True)
205
206     @check
207     def execute(self, query, params=None, log_exceptions=None):
208         if '%d' in query or '%f' in query:
209             _logger.warning(query)
210             _logger.warning("SQL queries cannot contain %d or %f anymore. "
211                          "Use only %s")
212         if params and not isinstance(params, (tuple, list, dict)):
213             _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
214             raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
215
216         if self.sql_log:
217             now = mdt.now()
218
219         try:
220             params = params or None
221             res = self._obj.execute(query, params)
222         except psycopg2.ProgrammingError, pe:
223             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
224                 _logger.error("Programming error: %s, in query %s", pe, query)
225             raise
226         except Exception:
227             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
228                 _logger.exception("bad query: %s", self._obj.query or query)
229             raise
230
231         if self.sql_log:
232             delay = mdt.now() - now
233             delay = delay.seconds * 1E6 + delay.microseconds
234
235             _logger.debug("query: %s", self._obj.query)
236             self.sql_log_count+=1
237             res_from = re_from.match(query.lower())
238             if res_from:
239                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
240                 self.sql_from_log[res_from.group(1)][0] += 1
241                 self.sql_from_log[res_from.group(1)][1] += delay
242             res_into = re_into.match(query.lower())
243             if res_into:
244                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
245                 self.sql_into_log[res_into.group(1)][0] += 1
246                 self.sql_into_log[res_into.group(1)][1] += delay
247         return res
248
249
250     def split_for_in_conditions(self, ids):
251         """Split a list of identifiers into one or more smaller tuples
252            safe for IN conditions, after uniquifying them."""
253         return tools.misc.split_every(self.IN_MAX, set(ids))
254
255     def print_log(self):
256         global sql_counter
257         sql_counter += self.sql_log_count
258         if not self.sql_log:
259             return
260         def process(type):
261             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
262             sum = 0
263             if sqllogs[type]:
264                 sqllogitems = sqllogs[type].items()
265                 sqllogitems.sort(key=lambda k: k[1][1])
266                 _logger.debug("SQL LOG %s:", type)
267                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
268                 for r in sqllogitems:
269                     delay = timedelta(microseconds=r[1][1])
270                     _logger.debug("table: %s: %s/%s",
271                                         r[0], delay, r[1][0])
272                     sum+= r[1][1]
273                 sqllogs[type].clear()
274             sum = timedelta(microseconds=sum)
275             _logger.debug("SUM %s:%s/%d [%d]",
276                                 type, sum, self.sql_log_count, sql_counter)
277             sqllogs[type].clear()
278         process('from')
279         process('into')
280         self.sql_log_count = 0
281         self.sql_log = False
282
283     @check
284     def close(self):
285         return self._close(False)
286
287     def _close(self, leak=False):
288         if not self._obj:
289             return
290
291         del self.cache
292
293         if self.sql_log:
294             self.__closer = frame_codeinfo(currentframe(),3)
295         self.print_log()
296
297         self._obj.close()
298
299         # This force the cursor to be freed, and thus, available again. It is
300         # important because otherwise we can overload the server very easily
301         # because of a cursor shortage (because cursors are not garbage
302         # collected as fast as they should). The problem is probably due in
303         # part because browse records keep a reference to the cursor.
304         del self._obj
305         self.__closed = True
306
307         # Clean the underlying connection.
308         self._cnx.rollback()
309
310         if leak:
311             self._cnx.leaked = True
312         else:
313             chosen_template = tools.config['db_template']
314             templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
315             keep_in_pool = self.dbname not in templates_list
316             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
317
318     @check
319     def autocommit(self, on):
320         if on:
321             isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
322         else:
323             # If a serializable cursor was requested, we
324             # use the appropriate PotsgreSQL isolation level
325             # that maps to snaphsot isolation.
326             # For all supported PostgreSQL versions (8.3-9.x),
327             # this is currently the ISOLATION_REPEATABLE_READ.
328             # See also the docstring of this class.
329             # NOTE: up to psycopg 2.4.2, repeatable read
330             #       is remapped to serializable before being
331             #       sent to the database, so it is in fact
332             #       unavailable for use with pg 9.1.
333             isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
334                                   if self._serialized \
335                                   else ISOLATION_LEVEL_READ_COMMITTED
336         self._cnx.set_isolation_level(isolation_level)
337
338     @check
339     def commit(self):
340         """ Perform an SQL `COMMIT`
341         """
342         return self._cnx.commit()
343
344     @check
345     def rollback(self):
346         """ Perform an SQL `ROLLBACK`
347         """
348         return self._cnx.rollback()
349
350     @check
351     def __getattr__(self, name):
352         return getattr(self._obj, name)
353
354 class PsycoConnection(psycopg2.extensions.connection):
355     pass
356
357 class ConnectionPool(object):
358     """ The pool of connections to database(s)
359     
360         Keep a set of connections to pg databases open, and reuse them
361         to open cursors for all transactions.
362         
363         The connections are *not* automatically closed. Only a close_db()
364         can trigger that.
365     """
366
367     def locked(fun):
368         @wraps(fun)
369         def _locked(self, *args, **kwargs):
370             self._lock.acquire()
371             try:
372                 return fun(self, *args, **kwargs)
373             finally:
374                 self._lock.release()
375         return _locked
376
377
378     def __init__(self, maxconn=64):
379         self._connections = []
380         self._maxconn = max(maxconn, 1)
381         self._lock = threading.Lock()
382
383     def __repr__(self):
384         used = len([1 for c, u in self._connections[:] if u])
385         count = len(self._connections)
386         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
387
388     def _debug(self, msg, *args):
389         _logger.debug(('%r ' + msg), self, *args)
390
391     @locked
392     def borrow(self, dsn):
393         self._debug('Borrow connection to %r', dsn)
394
395         # free dead and leaked connections
396         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
397             if cnx.closed:
398                 self._connections.pop(i)
399                 self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
400                 continue
401             if getattr(cnx, 'leaked', False):
402                 delattr(cnx, 'leaked')
403                 self._connections.pop(i)
404                 self._connections.append((cnx, False))
405                 _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)
406
407         for i, (cnx, used) in enumerate(self._connections):
408             if not used and dsn_are_equals(cnx.dsn, dsn):
409                 try:
410                     cnx.reset()
411                 except psycopg2.OperationalError:
412                     self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn)
413                     # psycopg2 2.4.4 and earlier do not allow closing a closed connection
414                     if not cnx.closed:
415                         cnx.close()
416                     continue
417                 self._connections.pop(i)
418                 self._connections.append((cnx, True))
419                 self._debug('Existing connection found at index %d', i)
420
421                 return cnx
422
423         if len(self._connections) >= self._maxconn:
424             # try to remove the oldest connection not used
425             for i, (cnx, used) in enumerate(self._connections):
426                 if not used:
427                     self._connections.pop(i)
428                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
429                     break
430             else:
431                 # note: this code is called only if the for loop has completed (no break)
432                 raise PoolError('The Connection Pool Is Full')
433
434         try:
435             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
436         except psycopg2.Error:
437             _logger.exception('Connection to the database failed')
438             raise
439         self._connections.append((result, True))
440         self._debug('Create new connection')
441         return result
442
443     @locked
444     def give_back(self, connection, keep_in_pool=True):
445         self._debug('Give back connection to %r', connection.dsn)
446         for i, (cnx, used) in enumerate(self._connections):
447             if cnx is connection:
448                 self._connections.pop(i)
449                 if keep_in_pool:
450                     self._connections.append((cnx, False))
451                     self._debug('Put connection to %r in pool', cnx.dsn)
452                 else:
453                     self._debug('Forgot connection to %r', cnx.dsn)
454                     cnx.close()
455                 break
456         else:
457             raise PoolError('This connection does not below to the pool')
458
459     @locked
460     def close_all(self, dsn):
461         _logger.info('%r: Close all connections to %r', self, dsn)
462         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
463             if dsn_are_equals(cnx.dsn, dsn):
464                 cnx.close()
465                 self._connections.pop(i)
466
467
468 class Connection(object):
469     """ A lightweight instance of a connection to postgres
470     """
471
472     def __init__(self, pool, dbname):
473         self.dbname = dbname
474         self._pool = pool
475
476     def cursor(self, serialized=True):
477         cursor_type = serialized and 'serialized ' or ''
478         _logger.debug('create %scursor to %r', cursor_type, self.dbname)
479         return Cursor(self._pool, self.dbname, serialized=serialized)
480
481     # serialized_cursor is deprecated - cursors are serialized by default
482     serialized_cursor = cursor
483
484     def __nonzero__(self):
485         """Check if connection is possible"""
486         try:
487             _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
488             cr = self.cursor()
489             cr.close()
490             return True
491         except Exception:
492             return False
493
494 def dsn(db_name):
495     _dsn = ''
496     for p in ('host', 'port', 'user', 'password'):
497         cfg = tools.config['db_' + p]
498         if cfg:
499             _dsn += '%s=%s ' % (p, cfg)
500
501     return '%sdbname=%s' % (_dsn, db_name)
502
503 def dsn_are_equals(first, second):
504     def key(dsn):
505         k = dict(x.split('=', 1) for x in dsn.strip().split())
506         k.pop('password', None) # password is not relevant
507         return k
508     return key(first) == key(second)
509
510
511 _Pool = None
512
513 def db_connect(db_name):
514     global _Pool
515     if _Pool is None:
516         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
517     return Connection(_Pool, db_name)
518
519 def close_db(db_name):
520     """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
521     global _Pool
522     if _Pool:
523         _Pool.close_all(dsn(db_name))
524
525
526 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
527