[FIX] convert endpoint responses only if it's the current request endpoint
[odoo/odoo.git] / openerp / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2013 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23
24 """
25 The PostgreSQL connector is a connectivity layer between the OpenERP code and
26 the database, *not* a database abstraction toolkit. Database abstraction is what
27 the ORM does, in fact.
28 """
29
30 from functools import wraps
31 import logging
32 import psycopg2.extensions
33 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
34 from psycopg2.pool import PoolError
35 from psycopg2.psycopg1 import cursor as psycopg1cursor
36
37 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
38
39 _logger = logging.getLogger(__name__)
40
41 types_mapping = {
42     'date': (1082,),
43     'time': (1083,),
44     'datetime': (1114,),
45 }
46
47 def unbuffer(symb, cr):
48     if symb is None: return None
49     return str(symb)
50
51 def undecimalize(symb, cr):
52     if symb is None: return None
53     return float(symb)
54
55 for name, typeoid in types_mapping.items():
56     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
57 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
58
59
60 import tools
61 from tools.func import frame_codeinfo
62 from datetime import datetime as mdt
63 from datetime import timedelta
64 import threading
65 from inspect import currentframe
66
67 import re
68 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
69 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
70
71 sql_counter = 0
72
73 class Cursor(object):
74     """Represents an open transaction to the PostgreSQL DB backend,
75        acting as a lightweight wrapper around psycopg2's
76        ``psycopg1cursor`` objects.
77
78         ``Cursor`` is the object behind the ``cr`` variable used all
79         over the OpenERP code.
80
81         .. rubric:: Transaction Isolation
82
83         One very important property of database transactions is the
84         level of isolation between concurrent transactions.
85         The SQL standard defines four levels of transaction isolation,
86         ranging from the most strict *Serializable* level, to the least
87         strict *Read Uncommitted* level. These levels are defined in
88         terms of the phenomena that must not occur between concurrent
89         transactions, such as *dirty read*, etc.
90         In the context of a generic business data management software
91         such as OpenERP, we need the best guarantees that no data
92         corruption can ever be cause by simply running multiple
93         transactions in parallel. Therefore, the preferred level would
94         be the *serializable* level, which ensures that a set of
95         transactions is guaranteed to produce the same effect as
96         running them one at a time in some order.
97
98         However, most database management systems implement a limited
99         serializable isolation in the form of
100         `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
101         providing most of the same advantages as True Serializability,
102         with a fraction of the performance cost.
103         With PostgreSQL up to version 9.0, this snapshot isolation was
104         the implementation of both the ``REPEATABLE READ`` and
105         ``SERIALIZABLE`` levels of the SQL standard.
106         As of PostgreSQL 9.1, the previous snapshot isolation implementation
107         was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
108         level was introduced, providing some additional heuristics to
109         detect a concurrent update by parallel transactions, and forcing
110         one of them to rollback.
111
112         OpenERP implements its own level of locking protection
113         for transactions that are highly likely to provoke concurrent
114         updates, such as stock reservations or document sequences updates.
115         Therefore we mostly care about the properties of snapshot isolation,
116         but we don't really need additional heuristics to trigger transaction
117         rollbacks, as we are taking care of triggering instant rollbacks
118         ourselves when it matters (and we can save the additional performance
119         hit of these heuristics).
120
121         As a result of the above, we have selected ``REPEATABLE READ`` as
122         the default transaction isolation level for OpenERP cursors, as
123         it will be mapped to the desired ``snapshot isolation`` level for
124         all supported PostgreSQL version (8.3 - 9.x).
125
126         Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
127         read level to serializable before sending it to the database, so it would
128         actually select the new serializable mode on PostgreSQL 9.1. Make
129         sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
130         the performance hit is a concern for you.
131
132         .. attribute:: cache
133
134             Cache dictionary with a "request" (-ish) lifecycle, only lives as
135             long as the cursor itself does and proactively cleared when the
136             cursor is closed.
137
138             This cache should *only* be used to store repeatable reads as it
139             ignores rollbacks and savepoints, it should not be used to store
140             *any* data which may be modified during the life of the cursor.
141
142     """
143     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
144
145     def check(f):
146         @wraps(f)
147         def wrapper(self, *args, **kwargs):
148             if self.__closed:
149                 msg = 'Unable to use a closed cursor.'
150                 if self.__closer:
151                     msg += ' It was closed at %s, line %s' % self.__closer
152                 raise psycopg2.OperationalError(msg)
153             return f(self, *args, **kwargs)
154         return wrapper
155
156     def __init__(self, pool, dbname, serialized=True):
157         self.sql_from_log = {}
158         self.sql_into_log = {}
159
160         # default log level determined at cursor creation, could be
161         # overridden later for debugging purposes
162         self.sql_log = _logger.isEnabledFor(logging.DEBUG)
163
164         self.sql_log_count = 0
165         self.__closed = True    # avoid the call of close() (by __del__) if an exception
166                                 # is raised by any of the following initialisations
167         self._pool = pool
168         self.dbname = dbname
169
170         # Whether to enable snapshot isolation level for this cursor.
171         # see also the docstring of Cursor.  
172         self._serialized = serialized
173
174         self._cnx = pool.borrow(dsn(dbname))
175         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
176         if self.sql_log:
177             self.__caller = frame_codeinfo(currentframe(),2)
178         else:
179             self.__caller = False
180         self.__closed = False   # real initialisation value
181         self.autocommit(False)
182         self.__closer = False
183
184         self._default_log_exceptions = True
185
186         self.cache = {}
187
188     def __del__(self):
189         if not self.__closed and not self._cnx.closed:
190             # Oops. 'self' has not been closed explicitly.
191             # The cursor will be deleted by the garbage collector,
192             # but the database connection is not put back into the connection
193             # pool, preventing some operation on the database like dropping it.
194             # This can also lead to a server overload.
195             msg = "Cursor not closed explicitly\n"
196             if self.__caller:
197                 msg += "Cursor was created at %s:%s" % self.__caller
198             else:
199                 msg += "Please enable sql debugging to trace the caller."
200             _logger.warning(msg)
201             self._close(True)
202
203     @check
204     def execute(self, query, params=None, log_exceptions=None):
205         if '%d' in query or '%f' in query:
206             _logger.warning(query)
207             _logger.warning("SQL queries cannot contain %d or %f anymore. "
208                          "Use only %s")
209         if params and not isinstance(params, (tuple, list, dict)):
210             _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
211             raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
212
213         if self.sql_log:
214             now = mdt.now()
215
216         try:
217             params = params or None
218             res = self._obj.execute(query, params)
219         except psycopg2.ProgrammingError, pe:
220             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
221                 _logger.error("Programming error: %s, in query %s", pe, query)
222             raise
223         except Exception:
224             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
225                 _logger.exception("bad query: %s", self._obj.query or query)
226             raise
227
228         if self.sql_log:
229             delay = mdt.now() - now
230             delay = delay.seconds * 1E6 + delay.microseconds
231
232             _logger.debug("query: %s", self._obj.query)
233             self.sql_log_count+=1
234             res_from = re_from.match(query.lower())
235             if res_from:
236                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
237                 self.sql_from_log[res_from.group(1)][0] += 1
238                 self.sql_from_log[res_from.group(1)][1] += delay
239             res_into = re_into.match(query.lower())
240             if res_into:
241                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
242                 self.sql_into_log[res_into.group(1)][0] += 1
243                 self.sql_into_log[res_into.group(1)][1] += delay
244         return res
245
246
247     def split_for_in_conditions(self, ids):
248         """Split a list of identifiers into one or more smaller tuples
249            safe for IN conditions, after uniquifying them."""
250         return tools.misc.split_every(self.IN_MAX, set(ids))
251
252     def print_log(self):
253         global sql_counter
254         sql_counter += self.sql_log_count
255         if not self.sql_log:
256             return
257         def process(type):
258             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
259             sum = 0
260             if sqllogs[type]:
261                 sqllogitems = sqllogs[type].items()
262                 sqllogitems.sort(key=lambda k: k[1][1])
263                 _logger.debug("SQL LOG %s:", type)
264                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
265                 for r in sqllogitems:
266                     delay = timedelta(microseconds=r[1][1])
267                     _logger.debug("table: %s: %s/%s",
268                                         r[0], delay, r[1][0])
269                     sum+= r[1][1]
270                 sqllogs[type].clear()
271             sum = timedelta(microseconds=sum)
272             _logger.debug("SUM %s:%s/%d [%d]",
273                                 type, sum, self.sql_log_count, sql_counter)
274             sqllogs[type].clear()
275         process('from')
276         process('into')
277         self.sql_log_count = 0
278         self.sql_log = False
279
280     @check
281     def close(self):
282         return self._close(False)
283
284     def _close(self, leak=False):
285         if not self._obj:
286             return
287
288         del self.cache
289
290         if self.sql_log:
291             self.__closer = frame_codeinfo(currentframe(),3)
292         self.print_log()
293
294         self._obj.close()
295
296         # This force the cursor to be freed, and thus, available again. It is
297         # important because otherwise we can overload the server very easily
298         # because of a cursor shortage (because cursors are not garbage
299         # collected as fast as they should). The problem is probably due in
300         # part because browse records keep a reference to the cursor.
301         del self._obj
302         self.__closed = True
303
304         # Clean the underlying connection.
305         self._cnx.rollback()
306
307         if leak:
308             self._cnx.leaked = True
309         else:
310             chosen_template = tools.config['db_template']
311             templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
312             keep_in_pool = self.dbname not in templates_list
313             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
314
315     @check
316     def autocommit(self, on):
317         if on:
318             isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
319         else:
320             # If a serializable cursor was requested, we
321             # use the appropriate PotsgreSQL isolation level
322             # that maps to snaphsot isolation.
323             # For all supported PostgreSQL versions (8.3-9.x),
324             # this is currently the ISOLATION_REPEATABLE_READ.
325             # See also the docstring of this class.
326             # NOTE: up to psycopg 2.4.2, repeatable read
327             #       is remapped to serializable before being
328             #       sent to the database, so it is in fact
329             #       unavailable for use with pg 9.1.
330             isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
331                                   if self._serialized \
332                                   else ISOLATION_LEVEL_READ_COMMITTED
333         self._cnx.set_isolation_level(isolation_level)
334
335     @check
336     def commit(self):
337         """ Perform an SQL `COMMIT`
338         """
339         return self._cnx.commit()
340
341     @check
342     def rollback(self):
343         """ Perform an SQL `ROLLBACK`
344         """
345         return self._cnx.rollback()
346
347     @check
348     def __getattr__(self, name):
349         return getattr(self._obj, name)
350
351 class PsycoConnection(psycopg2.extensions.connection):
352     pass
353
354 class ConnectionPool(object):
355     """ The pool of connections to database(s)
356     
357         Keep a set of connections to pg databases open, and reuse them
358         to open cursors for all transactions.
359         
360         The connections are *not* automatically closed. Only a close_db()
361         can trigger that.
362     """
363
364     def locked(fun):
365         @wraps(fun)
366         def _locked(self, *args, **kwargs):
367             self._lock.acquire()
368             try:
369                 return fun(self, *args, **kwargs)
370             finally:
371                 self._lock.release()
372         return _locked
373
374
375     def __init__(self, maxconn=64):
376         self._connections = []
377         self._maxconn = max(maxconn, 1)
378         self._lock = threading.Lock()
379
380     def __repr__(self):
381         used = len([1 for c, u in self._connections[:] if u])
382         count = len(self._connections)
383         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
384
385     def _debug(self, msg, *args):
386         _logger.debug(('%r ' + msg), self, *args)
387
388     @locked
389     def borrow(self, dsn):
390         self._debug('Borrow connection to %r', dsn)
391
392         # free dead and leaked connections
393         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
394             if cnx.closed:
395                 self._connections.pop(i)
396                 self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
397                 continue
398             if getattr(cnx, 'leaked', False):
399                 delattr(cnx, 'leaked')
400                 self._connections.pop(i)
401                 self._connections.append((cnx, False))
402                 _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)
403
404         for i, (cnx, used) in enumerate(self._connections):
405             if not used and dsn_are_equals(cnx.dsn, dsn):
406                 try:
407                     cnx.reset()
408                 except psycopg2.OperationalError:
409                     self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn)
410                     # psycopg2 2.4.4 and earlier do not allow closing a closed connection
411                     if not cnx.closed:
412                         cnx.close()
413                     continue
414                 self._connections.pop(i)
415                 self._connections.append((cnx, True))
416                 self._debug('Existing connection found at index %d', i)
417
418                 return cnx
419
420         if len(self._connections) >= self._maxconn:
421             # try to remove the oldest connection not used
422             for i, (cnx, used) in enumerate(self._connections):
423                 if not used:
424                     self._connections.pop(i)
425                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
426                     break
427             else:
428                 # note: this code is called only if the for loop has completed (no break)
429                 raise PoolError('The Connection Pool Is Full')
430
431         try:
432             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
433         except psycopg2.Error:
434             _logger.exception('Connection to the database failed')
435             raise
436         self._connections.append((result, True))
437         self._debug('Create new connection')
438         return result
439
440     @locked
441     def give_back(self, connection, keep_in_pool=True):
442         self._debug('Give back connection to %r', connection.dsn)
443         for i, (cnx, used) in enumerate(self._connections):
444             if cnx is connection:
445                 self._connections.pop(i)
446                 if keep_in_pool:
447                     self._connections.append((cnx, False))
448                     self._debug('Put connection to %r in pool', cnx.dsn)
449                 else:
450                     self._debug('Forgot connection to %r', cnx.dsn)
451                     cnx.close()
452                 break
453         else:
454             raise PoolError('This connection does not below to the pool')
455
456     @locked
457     def close_all(self, dsn=None):
458         _logger.info('%r: Close all connections to %r', self, dsn)
459         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
460             if dsn is None or dsn_are_equals(cnx.dsn, dsn):
461                 cnx.close()
462                 self._connections.pop(i)
463
464
465 class Connection(object):
466     """ A lightweight instance of a connection to postgres
467     """
468
469     def __init__(self, pool, dbname):
470         self.dbname = dbname
471         self._pool = pool
472
473     def cursor(self, serialized=True):
474         cursor_type = serialized and 'serialized ' or ''
475         _logger.debug('create %scursor to %r', cursor_type, self.dbname)
476         return Cursor(self._pool, self.dbname, serialized=serialized)
477
478     # serialized_cursor is deprecated - cursors are serialized by default
479     serialized_cursor = cursor
480
481     def __nonzero__(self):
482         """Check if connection is possible"""
483         try:
484             _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
485             cr = self.cursor()
486             cr.close()
487             return True
488         except Exception:
489             return False
490
491 def dsn(db_name):
492     _dsn = ''
493     for p in ('host', 'port', 'user', 'password'):
494         cfg = tools.config['db_' + p]
495         if cfg:
496             _dsn += '%s=%s ' % (p, cfg)
497
498     return '%sdbname=%s' % (_dsn, db_name)
499
500 def dsn_are_equals(first, second):
501     def key(dsn):
502         k = dict(x.split('=', 1) for x in dsn.strip().split())
503         k.pop('password', None) # password is not relevant
504         return k
505     return key(first) == key(second)
506
507
508 _Pool = None
509
510 def db_connect(db_name):
511     global _Pool
512     if _Pool is None:
513         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
514     return Connection(_Pool, db_name)
515
516 def close_db(db_name):
517     """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
518     global _Pool
519     if _Pool:
520         _Pool.close_all(dsn(db_name))
521
522 def close_all():
523     global _Pool
524     if _Pool:
525         _Pool.close_all()
526
527
528 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
529