[FIX] gamification: call _send_badge on right object
[odoo/odoo.git] / openerp / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2014 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23
24 """
25 The PostgreSQL connector is a connectivity layer between the OpenERP code and
26 the database, *not* a database abstraction toolkit. Database abstraction is what
27 the ORM does, in fact.
28 """
29
30 from contextlib import contextmanager
31 from functools import wraps
32 import logging
33 import time
34 import uuid
35 import psycopg2.extensions
36 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
37 from psycopg2.pool import PoolError
38 from psycopg2.psycopg1 import cursor as psycopg1cursor
39
40 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
41
42 _logger = logging.getLogger(__name__)
43
44 types_mapping = {
45     'date': (1082,),
46     'time': (1083,),
47     'datetime': (1114,),
48 }
49
50 def unbuffer(symb, cr):
51     if symb is None: return None
52     return str(symb)
53
54 def undecimalize(symb, cr):
55     if symb is None: return None
56     return float(symb)
57
58 for name, typeoid in types_mapping.items():
59     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
60 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
61
62
63 import tools
64 from tools.func import frame_codeinfo
65 from datetime import datetime as mdt
66 from datetime import timedelta
67 import threading
68 from inspect import currentframe
69
70 import re
71 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
72 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
73
74 sql_counter = 0
75
76 class Cursor(object):
77     """Represents an open transaction to the PostgreSQL DB backend,
78        acting as a lightweight wrapper around psycopg2's
79        ``psycopg1cursor`` objects.
80
81         ``Cursor`` is the object behind the ``cr`` variable used all
82         over the OpenERP code.
83
84         .. rubric:: Transaction Isolation
85
86         One very important property of database transactions is the
87         level of isolation between concurrent transactions.
88         The SQL standard defines four levels of transaction isolation,
89         ranging from the most strict *Serializable* level, to the least
90         strict *Read Uncommitted* level. These levels are defined in
91         terms of the phenomena that must not occur between concurrent
92         transactions, such as *dirty read*, etc.
93         In the context of a generic business data management software
94         such as OpenERP, we need the best guarantees that no data
95         corruption can ever be cause by simply running multiple
96         transactions in parallel. Therefore, the preferred level would
97         be the *serializable* level, which ensures that a set of
98         transactions is guaranteed to produce the same effect as
99         running them one at a time in some order.
100
101         However, most database management systems implement a limited
102         serializable isolation in the form of
103         `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
104         providing most of the same advantages as True Serializability,
105         with a fraction of the performance cost.
106         With PostgreSQL up to version 9.0, this snapshot isolation was
107         the implementation of both the ``REPEATABLE READ`` and
108         ``SERIALIZABLE`` levels of the SQL standard.
109         As of PostgreSQL 9.1, the previous snapshot isolation implementation
110         was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
111         level was introduced, providing some additional heuristics to
112         detect a concurrent update by parallel transactions, and forcing
113         one of them to rollback.
114
115         OpenERP implements its own level of locking protection
116         for transactions that are highly likely to provoke concurrent
117         updates, such as stock reservations or document sequences updates.
118         Therefore we mostly care about the properties of snapshot isolation,
119         but we don't really need additional heuristics to trigger transaction
120         rollbacks, as we are taking care of triggering instant rollbacks
121         ourselves when it matters (and we can save the additional performance
122         hit of these heuristics).
123
124         As a result of the above, we have selected ``REPEATABLE READ`` as
125         the default transaction isolation level for OpenERP cursors, as
126         it will be mapped to the desired ``snapshot isolation`` level for
127         all supported PostgreSQL version (8.3 - 9.x).
128
129         Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
130         read level to serializable before sending it to the database, so it would
131         actually select the new serializable mode on PostgreSQL 9.1. Make
132         sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
133         the performance hit is a concern for you.
134
135         .. attribute:: cache
136
137             Cache dictionary with a "request" (-ish) lifecycle, only lives as
138             long as the cursor itself does and proactively cleared when the
139             cursor is closed.
140
141             This cache should *only* be used to store repeatable reads as it
142             ignores rollbacks and savepoints, it should not be used to store
143             *any* data which may be modified during the life of the cursor.
144
145     """
146     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
147
148     def check(f):
149         @wraps(f)
150         def wrapper(self, *args, **kwargs):
151             if self.__closed:
152                 msg = 'Unable to use a closed cursor.'
153                 if self.__closer:
154                     msg += ' It was closed at %s, line %s' % self.__closer
155                 raise psycopg2.OperationalError(msg)
156             return f(self, *args, **kwargs)
157         return wrapper
158
159     def __init__(self, pool, dbname, serialized=True):
160         self.sql_from_log = {}
161         self.sql_into_log = {}
162
163         # default log level determined at cursor creation, could be
164         # overridden later for debugging purposes
165         self.sql_log = _logger.isEnabledFor(logging.DEBUG)
166
167         self.sql_log_count = 0
168         self.__closed = True    # avoid the call of close() (by __del__) if an exception
169                                 # is raised by any of the following initialisations
170         self.__pool = pool
171         self.dbname = dbname
172
173         # Whether to enable snapshot isolation level for this cursor.
174         # see also the docstring of Cursor.  
175         self._serialized = serialized
176
177         self._cnx = pool.borrow(dsn(dbname))
178         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
179         if self.sql_log:
180             self.__caller = frame_codeinfo(currentframe(),2)
181         else:
182             self.__caller = False
183         self.__closed = False   # real initialisation value
184         self.autocommit(False)
185         self.__closer = False
186
187         self._default_log_exceptions = True
188
189         self.cache = {}
190
191     def __del__(self):
192         if not self.__closed and not self._cnx.closed:
193             # Oops. 'self' has not been closed explicitly.
194             # The cursor will be deleted by the garbage collector,
195             # but the database connection is not put back into the connection
196             # pool, preventing some operation on the database like dropping it.
197             # This can also lead to a server overload.
198             msg = "Cursor not closed explicitly\n"
199             if self.__caller:
200                 msg += "Cursor was created at %s:%s" % self.__caller
201             else:
202                 msg += "Please enable sql debugging to trace the caller."
203             _logger.warning(msg)
204             self._close(True)
205
206     @check
207     def execute(self, query, params=None, log_exceptions=None):
208         if '%d' in query or '%f' in query:
209             _logger.warning(query)
210             _logger.warning("SQL queries cannot contain %d or %f anymore. "
211                          "Use only %s")
212         if params and not isinstance(params, (tuple, list, dict)):
213             _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
214             raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
215
216         if self.sql_log:
217             now = mdt.now()
218
219         try:
220             params = params or None
221             res = self._obj.execute(query, params)
222         except psycopg2.ProgrammingError, pe:
223             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
224                 _logger.error("Programming error: %s, in query %s", pe, query)
225             raise
226         except Exception:
227             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
228                 _logger.exception("bad query: %s", self._obj.query or query)
229             raise
230
231         if self.sql_log:
232             delay = mdt.now() - now
233             delay = delay.seconds * 1E6 + delay.microseconds
234
235             _logger.debug("query: %s", self._obj.query)
236             self.sql_log_count+=1
237             res_from = re_from.match(query.lower())
238             if res_from:
239                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
240                 self.sql_from_log[res_from.group(1)][0] += 1
241                 self.sql_from_log[res_from.group(1)][1] += delay
242             res_into = re_into.match(query.lower())
243             if res_into:
244                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
245                 self.sql_into_log[res_into.group(1)][0] += 1
246                 self.sql_into_log[res_into.group(1)][1] += delay
247         return res
248
249
250     def split_for_in_conditions(self, ids):
251         """Split a list of identifiers into one or more smaller tuples
252            safe for IN conditions, after uniquifying them."""
253         return tools.misc.split_every(self.IN_MAX, set(ids))
254
255     def print_log(self):
256         global sql_counter
257         sql_counter += self.sql_log_count
258         if not self.sql_log:
259             return
260         def process(type):
261             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
262             sum = 0
263             if sqllogs[type]:
264                 sqllogitems = sqllogs[type].items()
265                 sqllogitems.sort(key=lambda k: k[1][1])
266                 _logger.debug("SQL LOG %s:", type)
267                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
268                 for r in sqllogitems:
269                     delay = timedelta(microseconds=r[1][1])
270                     _logger.debug("table: %s: %s/%s",
271                                         r[0], delay, r[1][0])
272                     sum+= r[1][1]
273                 sqllogs[type].clear()
274             sum = timedelta(microseconds=sum)
275             _logger.debug("SUM %s:%s/%d [%d]",
276                                 type, sum, self.sql_log_count, sql_counter)
277             sqllogs[type].clear()
278         process('from')
279         process('into')
280         self.sql_log_count = 0
281         self.sql_log = False
282
283     @check
284     def close(self):
285         return self._close(False)
286
287     def _close(self, leak=False):
288         if not self._obj:
289             return
290
291         del self.cache
292
293         if self.sql_log:
294             self.__closer = frame_codeinfo(currentframe(),3)
295         self.print_log()
296
297         self._obj.close()
298
299         # This force the cursor to be freed, and thus, available again. It is
300         # important because otherwise we can overload the server very easily
301         # because of a cursor shortage (because cursors are not garbage
302         # collected as fast as they should). The problem is probably due in
303         # part because browse records keep a reference to the cursor.
304         del self._obj
305         self.__closed = True
306
307         # Clean the underlying connection.
308         self._cnx.rollback()
309
310         if leak:
311             self._cnx.leaked = True
312         else:
313             chosen_template = tools.config['db_template']
314             templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
315             keep_in_pool = self.dbname not in templates_list
316             self.__pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
317
318     @check
319     def autocommit(self, on):
320         if on:
321             isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
322         else:
323             # If a serializable cursor was requested, we
324             # use the appropriate PotsgreSQL isolation level
325             # that maps to snaphsot isolation.
326             # For all supported PostgreSQL versions (8.3-9.x),
327             # this is currently the ISOLATION_REPEATABLE_READ.
328             # See also the docstring of this class.
329             # NOTE: up to psycopg 2.4.2, repeatable read
330             #       is remapped to serializable before being
331             #       sent to the database, so it is in fact
332             #       unavailable for use with pg 9.1.
333             isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
334                                   if self._serialized \
335                                   else ISOLATION_LEVEL_READ_COMMITTED
336         self._cnx.set_isolation_level(isolation_level)
337
338     @check
339     def commit(self):
340         """ Perform an SQL `COMMIT`
341         """
342         return self._cnx.commit()
343
344     @check
345     def rollback(self):
346         """ Perform an SQL `ROLLBACK`
347         """
348         return self._cnx.rollback()
349
350     @contextmanager
351     @check
352     def savepoint(self):
353         """context manager entering in a new savepoint"""
354         name = uuid.uuid1().hex
355         self.execute('SAVEPOINT "%s"' % name)
356         try:
357             yield
358             self.execute('RELEASE SAVEPOINT "%s"' % name)
359         except:
360             self.execute('ROLLBACK TO SAVEPOINT "%s"' % name)
361             raise
362
363     @check
364     def __getattr__(self, name):
365         return getattr(self._obj, name)
366
367 class PsycoConnection(psycopg2.extensions.connection):
368     pass
369
370 class ConnectionPool(object):
371     """ The pool of connections to database(s)
372     
373         Keep a set of connections to pg databases open, and reuse them
374         to open cursors for all transactions.
375         
376         The connections are *not* automatically closed. Only a close_db()
377         can trigger that.
378     """
379
380     def locked(fun):
381         @wraps(fun)
382         def _locked(self, *args, **kwargs):
383             self._lock.acquire()
384             try:
385                 return fun(self, *args, **kwargs)
386             finally:
387                 self._lock.release()
388         return _locked
389
390
391     def __init__(self, maxconn=64):
392         self._connections = []
393         self._maxconn = max(maxconn, 1)
394         self._lock = threading.Lock()
395
396     def __repr__(self):
397         used = len([1 for c, u in self._connections[:] if u])
398         count = len(self._connections)
399         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
400
401     def _debug(self, msg, *args):
402         _logger.debug(('%r ' + msg), self, *args)
403
404     @locked
405     def borrow(self, dsn):
406         self._debug('Borrow connection to %r', dsn)
407
408         # free dead and leaked connections
409         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
410             if cnx.closed:
411                 self._connections.pop(i)
412                 self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
413                 continue
414             if getattr(cnx, 'leaked', False):
415                 delattr(cnx, 'leaked')
416                 self._connections.pop(i)
417                 self._connections.append((cnx, False))
418                 _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)
419
420         for i, (cnx, used) in enumerate(self._connections):
421             if not used and dsn_are_equals(cnx.dsn, dsn):
422                 try:
423                     cnx.reset()
424                 except psycopg2.OperationalError:
425                     self._debug('Cannot reset connection at index %d: %r', i, cnx.dsn)
426                     # psycopg2 2.4.4 and earlier do not allow closing a closed connection
427                     if not cnx.closed:
428                         cnx.close()
429                     continue
430                 self._connections.pop(i)
431                 self._connections.append((cnx, True))
432                 self._debug('Existing connection found at index %d', i)
433
434                 return cnx
435
436         if len(self._connections) >= self._maxconn:
437             # try to remove the oldest connection not used
438             for i, (cnx, used) in enumerate(self._connections):
439                 if not used:
440                     self._connections.pop(i)
441                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
442                     break
443             else:
444                 # note: this code is called only if the for loop has completed (no break)
445                 raise PoolError('The Connection Pool Is Full')
446
447         try:
448             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
449         except psycopg2.Error:
450             _logger.exception('Connection to the database failed')
451             raise
452         self._connections.append((result, True))
453         self._debug('Create new connection')
454         return result
455
456     @locked
457     def give_back(self, connection, keep_in_pool=True):
458         self._debug('Give back connection to %r', connection.dsn)
459         for i, (cnx, used) in enumerate(self._connections):
460             if cnx is connection:
461                 self._connections.pop(i)
462                 if keep_in_pool:
463                     self._connections.append((cnx, False))
464                     self._debug('Put connection to %r in pool', cnx.dsn)
465                 else:
466                     self._debug('Forgot connection to %r', cnx.dsn)
467                     cnx.close()
468                 break
469         else:
470             raise PoolError('This connection does not below to the pool')
471
472     @locked
473     def close_all(self, dsn=None):
474         _logger.info('%r: Close all connections to %r', self, dsn)
475         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
476             if dsn is None or dsn_are_equals(cnx.dsn, dsn):
477                 cnx.close()
478                 self._connections.pop(i)
479
480
481 class Connection(object):
482     """ A lightweight instance of a connection to postgres
483     """
484
485     def __init__(self, pool, dbname):
486         self.dbname = dbname
487         self.__pool = pool
488
489     def cursor(self, serialized=True):
490         cursor_type = serialized and 'serialized ' or ''
491         _logger.debug('create %scursor to %r', cursor_type, self.dbname)
492         return Cursor(self.__pool, self.dbname, serialized=serialized)
493
494     # serialized_cursor is deprecated - cursors are serialized by default
495     serialized_cursor = cursor
496
497     def __nonzero__(self):
498         """Check if connection is possible"""
499         try:
500             _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
501             cr = self.cursor()
502             cr.close()
503             return True
504         except Exception:
505             return False
506
507 def dsn(db_name):
508     _dsn = ''
509     for p in ('host', 'port', 'user', 'password'):
510         cfg = tools.config['db_' + p]
511         if cfg:
512             _dsn += '%s=%s ' % (p, cfg)
513
514     return '%sdbname=%s' % (_dsn, db_name)
515
516 def dsn_are_equals(first, second):
517     def key(dsn):
518         k = dict(x.split('=', 1) for x in dsn.strip().split())
519         k.pop('password', None) # password is not relevant
520         return k
521     return key(first) == key(second)
522
523
524 _Pool = None
525
526 def db_connect(db_name):
527     global _Pool
528     if _Pool is None:
529         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
530     return Connection(_Pool, db_name)
531
532 def close_db(db_name):
533     """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
534     global _Pool
535     if _Pool:
536         _Pool.close_all(dsn(db_name))
537
538 def close_all():
539     global _Pool
540     if _Pool:
541         _Pool.close_all()
542
543
544 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
545