[MERGE] lp:912793 (account/demo: fix last day of February period)
[odoo/odoo.git] / openerp / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2011 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 #.apidoc title: PostgreSQL interface
24
25 """
26 The PostgreSQL connector is a connectivity layer between the OpenERP code and
27 the database, *not* a database abstraction toolkit. Database abstraction is what
28 the ORM does, in fact.
29
30 See also: the `pooler` module
31 """
32
33 #.apidoc add-functions: print_stats
34 #.apidoc add-classes: Cursor Connection ConnectionPool
35
36 __all__ = ['db_connect', 'close_db']
37
38 from functools import wraps
39 import logging
40 import psycopg2.extensions
41 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
42 from psycopg2.pool import PoolError
43 from psycopg2.psycopg1 import cursor as psycopg1cursor
44 from threading import currentThread
45 import warnings
46
47 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
48
49 types_mapping = {
50     'date': (1082,),
51     'time': (1083,),
52     'datetime': (1114,),
53 }
54
55 def unbuffer(symb, cr):
56     if symb is None: return None
57     return str(symb)
58
59 def undecimalize(symb, cr):
60     if symb is None: return None
61     return float(symb)
62
63 for name, typeoid in types_mapping.items():
64     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
65 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
66
67
68 import tools
69 from tools.func import frame_codeinfo
70 from datetime import datetime as mdt
71 from datetime import timedelta
72 import threading
73 from inspect import currentframe
74
75 import re
76 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
77 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
78
79 sql_counter = 0
80
81 class Cursor(object):
82     """Represents an open transaction to the PostgreSQL DB backend,
83        acting as a lightweight wrapper around psycopg2's
84        ``psycopg1cursor`` objects.
85
86         ``Cursor`` is the object behind the ``cr`` variable used all
87         over the OpenERP code.
88
89         .. rubric:: Transaction Isolation
90
91         One very important property of database transactions is the
92         level of isolation between concurrent transactions.
93         The SQL standard defines four levels of transaction isolation,
94         ranging from the most strict *Serializable* level, to the least
95         strict *Read Uncommitted* level. These levels are defined in
96         terms of the phenomena that must not occur between concurrent
97         transactions, such as *dirty read*, etc.
98         In the context of a generic business data management software
99         such as OpenERP, we need the best guarantees that no data
100         corruption can ever be cause by simply running multiple
101         transactions in parallel. Therefore, the preferred level would
102         be the *serializable* level, which ensures that a set of
103         transactions is guaranteed to produce the same effect as
104         running them one at a time in some order.
105
106         However, most database management systems implement a limited
107         serializable isolation in the form of
108         `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
109         providing most of the same advantages as True Serializability,
110         with a fraction of the performance cost.
111         With PostgreSQL up to version 9.0, this snapshot isolation was
112         the implementation of both the ``REPEATABLE READ`` and
113         ``SERIALIZABLE`` levels of the SQL standard.
114         As of PostgreSQL 9.1, the previous snapshot isolation implementation
115         was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
116         level was introduced, providing some additional heuristics to
117         detect a concurrent update by parallel transactions, and forcing
118         one of them to rollback.
119
120         OpenERP implements its own level of locking protection
121         for transactions that are highly likely to provoke concurrent
122         updates, such as stock reservations or document sequences updates.
123         Therefore we mostly care about the properties of snapshot isolation,
124         but we don't really need additional heuristics to trigger transaction
125         rollbacks, as we are taking care of triggering instant rollbacks
126         ourselves when it matters (and we can save the additional performance
127         hit of these heuristics).
128
129         As a result of the above, we have selected ``REPEATABLE READ`` as
130         the default transaction isolation level for OpenERP cursors, as
131         it will be mapped to the desired ``snapshot isolation`` level for
132         all supported PostgreSQL version (8.3 - 9.x).
133
134         Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
135         read level to serializable before sending it to the database, so it would
136         actually select the new serializable mode on PostgreSQL 9.1. Make
137         sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
138         the performance hit is a concern for you.
139
140     """
141     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
142     __logger = None
143
144     def check(f):
145         @wraps(f)
146         def wrapper(self, *args, **kwargs):
147             if self.__closed:
148                 msg = 'Unable to use a closed cursor.'
149                 if self.__closer:
150                     msg += ' It was closed at %s, line %s' % self.__closer
151                 raise psycopg2.OperationalError(msg)
152             return f(self, *args, **kwargs)
153         return wrapper
154
155     def __init__(self, pool, dbname, serialized=True):
156         if self.__class__.__logger is None:
157             self.__class__.__logger = logging.getLogger('db.cursor')
158         self.sql_from_log = {}
159         self.sql_into_log = {}
160
161         # default log level determined at cursor creation, could be
162         # overridden later for debugging purposes
163         self.sql_log = self.__logger.isEnabledFor(logging.DEBUG_SQL)
164
165         self.sql_log_count = 0
166         self.__closed = True    # avoid the call of close() (by __del__) if an exception
167                                 # is raised by any of the following initialisations
168         self._pool = pool
169         self.dbname = dbname
170
171         # Whether to enable snapshot isolation level for this cursor.
172         # see also the docstring of Cursor.  
173         self._serialized = serialized
174
175         self._cnx = pool.borrow(dsn(dbname))
176         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
177         if self.sql_log:
178             self.__caller = frame_codeinfo(currentframe(),2)
179         else:
180             self.__caller = False
181         self.__closed = False   # real initialisation value
182         self.autocommit(False)
183         self.__closer = False
184
185         self._default_log_exceptions = True
186
187     def __del__(self):
188         if not self.__closed and not self._cnx.closed:
189             # Oops. 'self' has not been closed explicitly.
190             # The cursor will be deleted by the garbage collector,
191             # but the database connection is not put back into the connection
192             # pool, preventing some operation on the database like dropping it.
193             # This can also lead to a server overload.
194             msg = "Cursor not closed explicitly\n"
195             if self.__caller:
196                 msg += "Cursor was created at %s:%s" % self.__caller
197             else:
198                 msg += "Please enable sql debugging to trace the caller."
199             self.__logger.warn(msg)
200             self._close(True)
201
202     @check
203     def execute(self, query, params=None, log_exceptions=None):
204         if '%d' in query or '%f' in query:
205             self.__logger.warn(query)
206             self.__logger.warn("SQL queries cannot contain %d or %f anymore. "
207                                "Use only %s")
208
209         if self.sql_log:
210             now = mdt.now()
211
212         try:
213             params = params or None
214             res = self._obj.execute(query, params)
215         except psycopg2.ProgrammingError, pe:
216             if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
217                 self.__logger.error("Programming error: %s, in query %s", pe, query)
218             raise
219         except Exception:
220             if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
221                 self.__logger.exception("bad query: %s", self._obj.query or query)
222             raise
223
224         if self.sql_log:
225             delay = mdt.now() - now
226             delay = delay.seconds * 1E6 + delay.microseconds
227
228             self.__logger.log(logging.DEBUG_SQL, "query: %s", self._obj.query)
229             self.sql_log_count+=1
230             res_from = re_from.match(query.lower())
231             if res_from:
232                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
233                 self.sql_from_log[res_from.group(1)][0] += 1
234                 self.sql_from_log[res_from.group(1)][1] += delay
235             res_into = re_into.match(query.lower())
236             if res_into:
237                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
238                 self.sql_into_log[res_into.group(1)][0] += 1
239                 self.sql_into_log[res_into.group(1)][1] += delay
240         return res
241
242
243     def split_for_in_conditions(self, ids):
244         """Split a list of identifiers into one or more smaller tuples
245            safe for IN conditions, after uniquifying them."""
246         return tools.misc.split_every(self.IN_MAX, set(ids))
247
248     def print_log(self):
249         global sql_counter
250         sql_counter += self.sql_log_count
251         if not self.sql_log:
252             return
253         def process(type):
254             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
255             sum = 0
256             if sqllogs[type]:
257                 sqllogitems = sqllogs[type].items()
258                 sqllogitems.sort(key=lambda k: k[1][1])
259                 self.__logger.log(logging.DEBUG_SQL, "SQL LOG %s:", type)
260                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
261                 for r in sqllogitems:
262                     delay = timedelta(microseconds=r[1][1])
263                     self.__logger.log(logging.DEBUG_SQL, "table: %s: %s/%s",
264                                         r[0], delay, r[1][0])
265                     sum+= r[1][1]
266                 sqllogs[type].clear()
267             sum = timedelta(microseconds=sum)
268             self.__logger.log(logging.DEBUG_SQL, "SUM %s:%s/%d [%d]",
269                                 type, sum, self.sql_log_count, sql_counter)
270             sqllogs[type].clear()
271         process('from')
272         process('into')
273         self.sql_log_count = 0
274         self.sql_log = False
275
276     @check
277     def close(self):
278         return self._close(False)
279
280     def _close(self, leak=False):
281         if not self._obj:
282             return
283
284         if self.sql_log:
285             self.__closer = frame_codeinfo(currentframe(),3)
286         self.print_log()
287
288         self._obj.close()
289
290         # This force the cursor to be freed, and thus, available again. It is
291         # important because otherwise we can overload the server very easily
292         # because of a cursor shortage (because cursors are not garbage
293         # collected as fast as they should). The problem is probably due in
294         # part because browse records keep a reference to the cursor.
295         del self._obj
296         self.__closed = True
297
298         # Clean the underlying connection.
299         self._cnx.rollback()
300
301         if leak:
302             self._cnx.leaked = True
303         else:
304             chosen_template = tools.config['db_template']
305             templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
306             keep_in_pool = self.dbname not in templates_list
307             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
308
309     @check
310     def autocommit(self, on):
311         if on:
312             isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
313         else:
314             # If a serializable cursor was requested, we
315             # use the appropriate PotsgreSQL isolation level
316             # that maps to snaphsot isolation.
317             # For all supported PostgreSQL versions (8.3-9.x),
318             # this is currently the ISOLATION_REPEATABLE_READ.
319             # See also the docstring of this class.
320             # NOTE: up to psycopg 2.4.2, repeatable read
321             #       is remapped to serializable before being
322             #       sent to the database, so it is in fact
323             #       unavailable for use with pg 9.1.
324             isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
325                                   if self._serialized \
326                                   else ISOLATION_LEVEL_READ_COMMITTED
327         self._cnx.set_isolation_level(isolation_level)
328
329     @check
330     def commit(self):
331         """ Perform an SQL `COMMIT`
332         """
333         return self._cnx.commit()
334
335     @check
336     def rollback(self):
337         """ Perform an SQL `ROLLBACK`
338         """
339         return self._cnx.rollback()
340
341     @check
342     def __getattr__(self, name):
343         return getattr(self._obj, name)
344
345         """ Set the mode of postgres operations for all cursors
346         """
347         """Obtain the mode of postgres operations for all cursors
348         """
349
350 class PsycoConnection(psycopg2.extensions.connection):
351     pass
352
353 class ConnectionPool(object):
354     """ The pool of connections to database(s)
355     
356         Keep a set of connections to pg databases open, and reuse them
357         to open cursors for all transactions.
358         
359         The connections are *not* automatically closed. Only a close_db()
360         can trigger that.
361     """
362     __logger = logging.getLogger('db.connection_pool')
363
364     def locked(fun):
365         @wraps(fun)
366         def _locked(self, *args, **kwargs):
367             self._lock.acquire()
368             try:
369                 return fun(self, *args, **kwargs)
370             finally:
371                 self._lock.release()
372         return _locked
373
374
375     def __init__(self, maxconn=64):
376         self._connections = []
377         self._maxconn = max(maxconn, 1)
378         self._lock = threading.Lock()
379
380     def __repr__(self):
381         used = len([1 for c, u in self._connections[:] if u])
382         count = len(self._connections)
383         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
384
385     def _debug(self, msg, *args):
386         self.__logger.log(logging.DEBUG_SQL, ('%r ' + msg), self, *args)
387
388     @locked
389     def borrow(self, dsn):
390         self._debug('Borrow connection to %r', dsn)
391
392         # free leaked connections
393         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
394             if cnx.closed:
395                 self._connections.pop(i)
396                 self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
397                 continue
398             if getattr(cnx, 'leaked', False):
399                 delattr(cnx, 'leaked')
400                 self._connections.pop(i)
401                 self._connections.append((cnx, False))
402                 self.__logger.warn('%r: Free leaked connection to %r', self, cnx.dsn)
403
404         for i, (cnx, used) in enumerate(self._connections):
405             if not used and dsn_are_equals(cnx.dsn, dsn):
406                 self._connections.pop(i)
407                 self._connections.append((cnx, True))
408                 self._debug('Existing connection found at index %d', i)
409
410                 return cnx
411
412         if len(self._connections) >= self._maxconn:
413             # try to remove the oldest connection not used
414             for i, (cnx, used) in enumerate(self._connections):
415                 if not used:
416                     self._connections.pop(i)
417                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
418                     break
419             else:
420                 # note: this code is called only if the for loop has completed (no break)
421                 raise PoolError('The Connection Pool Is Full')
422
423         try:
424             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
425         except psycopg2.Error:
426             self.__logger.exception('Connection to the database failed')
427             raise
428         self._connections.append((result, True))
429         self._debug('Create new connection')
430         return result
431
432     @locked
433     def give_back(self, connection, keep_in_pool=True):
434         self._debug('Give back connection to %r', connection.dsn)
435         for i, (cnx, used) in enumerate(self._connections):
436             if cnx is connection:
437                 self._connections.pop(i)
438                 if keep_in_pool:
439                     self._connections.append((cnx, False))
440                     self._debug('Put connection to %r in pool', cnx.dsn)
441                 else:
442                     self._debug('Forgot connection to %r', cnx.dsn)
443                     cnx.close()
444                 break
445         else:
446             raise PoolError('This connection does not below to the pool')
447
448     @locked
449     def close_all(self, dsn):
450         self.__logger.info('%r: Close all connections to %r', self, dsn)
451         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
452             if dsn_are_equals(cnx.dsn, dsn):
453                 cnx.close()
454                 self._connections.pop(i)
455
456
457 class Connection(object):
458     """ A lightweight instance of a connection to postgres
459     """
460     __logger = logging.getLogger('db.connection')
461
462     def __init__(self, pool, dbname):
463         self.dbname = dbname
464         self._pool = pool
465
466     def cursor(self, serialized=True):
467         cursor_type = serialized and 'serialized ' or ''
468         self.__logger.log(logging.DEBUG_SQL, 'create %scursor to %r', cursor_type, self.dbname)
469         return Cursor(self._pool, self.dbname, serialized=serialized)
470
471     # serialized_cursor is deprecated - cursors are serialized by default
472     serialized_cursor = cursor
473
474     def __nonzero__(self):
475         """Check if connection is possible"""
476         try:
477             warnings.warn("You use an expensive function to test a connection.",
478                       DeprecationWarning, stacklevel=1)
479             cr = self.cursor()
480             cr.close()
481             return True
482         except Exception:
483             return False
484
485 def dsn(db_name):
486     _dsn = ''
487     for p in ('host', 'port', 'user', 'password'):
488         cfg = tools.config['db_' + p]
489         if cfg:
490             _dsn += '%s=%s ' % (p, cfg)
491
492     return '%sdbname=%s' % (_dsn, db_name)
493
494 def dsn_are_equals(first, second):
495     def key(dsn):
496         k = dict(x.split('=', 1) for x in dsn.strip().split())
497         k.pop('password', None) # password is not relevant
498         return k
499     return key(first) == key(second)
500
501
502 _Pool = None
503
504 def db_connect(db_name):
505     global _Pool
506     if _Pool is None:
507         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
508     currentThread().dbname = db_name
509     return Connection(_Pool, db_name)
510
511 def close_db(db_name):
512     """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
513     _Pool.close_all(dsn(db_name))
514     ct = currentThread()
515     if hasattr(ct, 'dbname'):
516         delattr(ct, 'dbname')
517
518
519 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
520