[FIX]Improve code.
[odoo/odoo.git] / openerp / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2011 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 #.apidoc title: PostgreSQL interface
24
25 """
26 The PostgreSQL connector is a connectivity layer between the OpenERP code and
27 the database, *not* a database abstraction toolkit. Database abstraction is what
28 the ORM does, in fact.
29
30 See also: the `pooler` module
31 """
32
33 #.apidoc add-functions: print_stats
34 #.apidoc add-classes: Cursor Connection ConnectionPool
35
36 __all__ = ['db_connect', 'close_db']
37
38 from threading import currentThread
39 import logging
40 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE,\
41     ISOLATION_LEVEL_REPEATABLE_READ
42 from psycopg2.psycopg1 import cursor as psycopg1cursor
43 from psycopg2.pool import PoolError
44
45 import psycopg2.extensions
46 import warnings
47
48 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
49
50 types_mapping = {
51     'date': (1082,),
52     'time': (1083,),
53     'datetime': (1114,),
54 }
55
56 def unbuffer(symb, cr):
57     if symb is None: return None
58     return str(symb)
59
60 def undecimalize(symb, cr):
61     if symb is None: return None
62     return float(symb)
63
64 for name, typeoid in types_mapping.items():
65     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
66 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
67
68
69 import tools
70 from tools.func import wraps, frame_codeinfo
71 from datetime import datetime as mdt
72 from datetime import timedelta
73 import threading
74 from inspect import currentframe
75
76 import re
77 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
78 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
79
80 sql_counter = 0
81
82 class Cursor(object):
83     """Represents an open transaction to the PostgreSQL DB backend,
84        acting as a lightweight wrapper around psycopg2's
85        ``psycopg1cursor`` objects.
86
87         ``Cursor`` is the object behind the ``cr`` variable used all
88         over the OpenERP code.
89
90         .. rubric:: Transaction Isolation
91
92         One very important property of database transactions is the
93         level of isolation between concurrent transactions.
94         The SQL standard defines four levels of transaction isolation,
95         ranging from the most strict *Serializable* level, to the least
96         strict *Read Uncommitted* level. These levels are defined in
97         terms of the phenomena that must not occur between concurrent
98         transactions, such as *dirty read*, etc.
99         In the context of a generic business data management software
100         such as OpenERP, we need the best guarantees that no data
101         corruption can ever be cause by simply running multiple
102         transactions in parallel. Therefore, the preferred level would
103         be the *serializable* level, which ensures that a set of
104         transactions is guaranteed to produce the same effect as
105         running them one at a time in some order.
106
107         However, most database management systems implement a limited
108         serializable isolation in the form of
109         `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
110         providing most of the same advantages as True Serializability,
111         with a fraction of the performance cost.
112         With PostgreSQL up to version 9.0, this snapshot isolation was
113         the implementation of both the ``REPEATABLE READ`` and
114         ``SERIALIZABLE`` levels of the SQL standard.
115         As of PostgreSQL 9.1, the previous snapshot isolation implementation
116         was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
117         level was introduced, providing some additional heuristics to
118         detect a concurrent update by parallel transactions, and forcing
119         one of them to rollback.
120
121         OpenERP implements its own level of locking protection
122         for transactions that are highly likely to provoke concurrent
123         updates, such as stock reservations or document sequences updates.
124         Therefore we mostly care about the properties of snapshot isolation,
125         but we don't really need additional heuristics to trigger transaction
126         rollbacks, as we are taking care of triggering instant rollbacks
127         ourselves when it matters (and we can save the additional performance
128         hit of these heuristics).
129
130         As a result of the above, we have selected ``REPEATABLE READ`` as
131         the default transaction isolation level for OpenERP cursors, as
132         it will be mapped to the desired ``snapshot isolation`` level for
133         all supported PostgreSQL version (8.3 - 9.x).
134
135         Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
136         read level to serializable before sending it to the database, so it would
137         actually select the new serializable mode on PostgreSQL 9.1. Make
138         sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
139         the performance hit is a concern for you.
140
141     """
142     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
143     __logger = None
144
145     def check(f):
146         @wraps(f)
147         def wrapper(self, *args, **kwargs):
148             if self.__closed:
149                 msg = 'Unable to use a closed cursor.'
150                 if self.__closer:
151                     msg += ' It was closed at %s, line %s' % self.__closer
152                 raise psycopg2.OperationalError(msg)
153             return f(self, *args, **kwargs)
154         return wrapper
155
156     def __init__(self, pool, dbname, serialized=True):
157         if self.__class__.__logger is None:
158             self.__class__.__logger = logging.getLogger('db.cursor')
159         self.sql_from_log = {}
160         self.sql_into_log = {}
161
162         # default log level determined at cursor creation, could be
163         # overridden later for debugging purposes
164         self.sql_log = self.__logger.isEnabledFor(logging.DEBUG_SQL)
165
166         self.sql_log_count = 0
167         self.__closed = True    # avoid the call of close() (by __del__) if an exception
168                                 # is raised by any of the following initialisations
169         self._pool = pool
170         self.dbname = dbname
171
172         # Whether to enable snapshot isolation level for this cursor.
173         # see also the docstring of Cursor.  
174         self._serialized = serialized
175
176         self._cnx = pool.borrow(dsn(dbname))
177         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
178         if self.sql_log:
179             self.__caller = frame_codeinfo(currentframe(),2)
180         else:
181             self.__caller = False
182         self.__closed = False   # real initialisation value
183         self.autocommit(False)
184         self.__closer = False
185
186         self._default_log_exceptions = True
187
188     def __del__(self):
189         if not self.__closed and not self._cnx.closed:
190             # Oops. 'self' has not been closed explicitly.
191             # The cursor will be deleted by the garbage collector,
192             # but the database connection is not put back into the connection
193             # pool, preventing some operation on the database like dropping it.
194             # This can also lead to a server overload.
195             msg = "Cursor not closed explicitly\n"
196             if self.__caller:
197                 msg += "Cursor was created at %s:%s" % self.__caller
198             else:
199                 msg += "Please enable sql debugging to trace the caller."
200             self.__logger.warn(msg)
201             self._close(True)
202
203     @check
204     def execute(self, query, params=None, log_exceptions=None):
205         if '%d' in query or '%f' in query:
206             self.__logger.warn(query)
207             self.__logger.warn("SQL queries cannot contain %d or %f anymore. "
208                                "Use only %s")
209
210         if self.sql_log:
211             now = mdt.now()
212
213         try:
214             params = params or None
215             res = self._obj.execute(query, params)
216         except psycopg2.ProgrammingError, pe:
217             if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
218                 self.__logger.error("Programming error: %s, in query %s", pe, query)
219             raise
220         except Exception:
221             if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
222                 self.__logger.exception("bad query: %s", self._obj.query or query)
223             raise
224
225         if self.sql_log:
226             delay = mdt.now() - now
227             delay = delay.seconds * 1E6 + delay.microseconds
228
229             self.__logger.log(logging.DEBUG_SQL, "query: %s", self._obj.query)
230             self.sql_log_count+=1
231             res_from = re_from.match(query.lower())
232             if res_from:
233                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
234                 self.sql_from_log[res_from.group(1)][0] += 1
235                 self.sql_from_log[res_from.group(1)][1] += delay
236             res_into = re_into.match(query.lower())
237             if res_into:
238                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
239                 self.sql_into_log[res_into.group(1)][0] += 1
240                 self.sql_into_log[res_into.group(1)][1] += delay
241         return res
242
243
244     def split_for_in_conditions(self, ids):
245         """Split a list of identifiers into one or more smaller tuples
246            safe for IN conditions, after uniquifying them."""
247         return tools.misc.split_every(self.IN_MAX, set(ids))
248
249     def print_log(self):
250         global sql_counter
251         sql_counter += self.sql_log_count
252         if not self.sql_log:
253             return
254         def process(type):
255             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
256             sum = 0
257             if sqllogs[type]:
258                 sqllogitems = sqllogs[type].items()
259                 sqllogitems.sort(key=lambda k: k[1][1])
260                 self.__logger.log(logging.DEBUG_SQL, "SQL LOG %s:", type)
261                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
262                 for r in sqllogitems:
263                     delay = timedelta(microseconds=r[1][1])
264                     self.__logger.log(logging.DEBUG_SQL, "table: %s: %s/%s",
265                                         r[0], delay, r[1][0])
266                     sum+= r[1][1]
267                 sqllogs[type].clear()
268             sum = timedelta(microseconds=sum)
269             self.__logger.log(logging.DEBUG_SQL, "SUM %s:%s/%d [%d]",
270                                 type, sum, self.sql_log_count, sql_counter)
271             sqllogs[type].clear()
272         process('from')
273         process('into')
274         self.sql_log_count = 0
275         self.sql_log = False
276
277     @check
278     def close(self):
279         return self._close(False)
280
281     def _close(self, leak=False):
282         if not self._obj:
283             return
284
285         if self.sql_log:
286             self.__closer = frame_codeinfo(currentframe(),3)
287         self.print_log()
288
289         self._obj.close()
290
291         # This force the cursor to be freed, and thus, available again. It is
292         # important because otherwise we can overload the server very easily
293         # because of a cursor shortage (because cursors are not garbage
294         # collected as fast as they should). The problem is probably due in
295         # part because browse records keep a reference to the cursor.
296         del self._obj
297         self.__closed = True
298
299         # Clean the underlying connection.
300         self._cnx.rollback()
301
302         if leak:
303             self._cnx.leaked = True
304         else:
305             keep_in_pool = self.dbname not in ('template1', 'template0', 'postgres')
306             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
307
308     @check
309     def autocommit(self, on):
310         if on:
311             isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
312         else:
313             # If a serializable cursor was requested, we
314             # use the appropriate PotsgreSQL isolation level
315             # that maps to snaphsot isolation.
316             # For all supported PostgreSQL versions (8.3-9.x),
317             # this is currently the ISOLATION_REPEATABLE_READ.
318             # See also the docstring of this class.
319             # NOTE: up to psycopg 2.4.2, repeatable read
320             #       is remapped to serializable before being
321             #       sent to the database, so it is in fact
322             #       unavailable for use with pg 9.1.
323             isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
324                                   if self._serialized \
325                                   else ISOLATION_LEVEL_READ_COMMITTED
326         self._cnx.set_isolation_level(isolation_level)
327
328     @check
329     def commit(self):
330         """ Perform an SQL `COMMIT`
331         """
332         return self._cnx.commit()
333
334     @check
335     def rollback(self):
336         """ Perform an SQL `ROLLBACK`
337         """
338         return self._cnx.rollback()
339
340     @check
341     def __getattr__(self, name):
342         return getattr(self._obj, name)
343
344         """ Set the mode of postgres operations for all cursors
345         """
346         """Obtain the mode of postgres operations for all cursors
347         """
348
349 class PsycoConnection(psycopg2.extensions.connection):
350     pass
351
352 class ConnectionPool(object):
353     """ The pool of connections to database(s)
354     
355         Keep a set of connections to pg databases open, and reuse them
356         to open cursors for all transactions.
357         
358         The connections are *not* automatically closed. Only a close_db()
359         can trigger that.
360     """
361     __logger = logging.getLogger('db.connection_pool')
362
363     def locked(fun):
364         @wraps(fun)
365         def _locked(self, *args, **kwargs):
366             self._lock.acquire()
367             try:
368                 return fun(self, *args, **kwargs)
369             finally:
370                 self._lock.release()
371         return _locked
372
373
374     def __init__(self, maxconn=64):
375         self._connections = []
376         self._maxconn = max(maxconn, 1)
377         self._lock = threading.Lock()
378
379     def __repr__(self):
380         used = len([1 for c, u in self._connections[:] if u])
381         count = len(self._connections)
382         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
383
384     def _debug(self, msg, *args):
385         self.__logger.log(logging.DEBUG_SQL, ('%r ' + msg), self, *args)
386
387     @locked
388     def borrow(self, dsn):
389         self._debug('Borrow connection to %r', dsn)
390
391         # free leaked connections
392         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
393             if cnx.closed:
394                 self._connections.pop(i)
395                 self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
396                 continue
397             if getattr(cnx, 'leaked', False):
398                 delattr(cnx, 'leaked')
399                 self._connections.pop(i)
400                 self._connections.append((cnx, False))
401                 self.__logger.warn('%r: Free leaked connection to %r', self, cnx.dsn)
402
403         for i, (cnx, used) in enumerate(self._connections):
404             if not used and dsn_are_equals(cnx.dsn, dsn):
405                 self._connections.pop(i)
406                 self._connections.append((cnx, True))
407                 self._debug('Existing connection found at index %d', i)
408
409                 return cnx
410
411         if len(self._connections) >= self._maxconn:
412             # try to remove the oldest connection not used
413             for i, (cnx, used) in enumerate(self._connections):
414                 if not used:
415                     self._connections.pop(i)
416                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
417                     break
418             else:
419                 # note: this code is called only if the for loop has completed (no break)
420                 raise PoolError('The Connection Pool Is Full')
421
422         try:
423             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
424         except psycopg2.Error, e:
425             self.__logger.exception('Connection to the database failed')
426             raise
427         self._connections.append((result, True))
428         self._debug('Create new connection')
429         return result
430
431     @locked
432     def give_back(self, connection, keep_in_pool=True):
433         self._debug('Give back connection to %r', connection.dsn)
434         for i, (cnx, used) in enumerate(self._connections):
435             if cnx is connection:
436                 self._connections.pop(i)
437                 if keep_in_pool:
438                     self._connections.append((cnx, False))
439                     self._debug('Put connection to %r in pool', cnx.dsn)
440                 else:
441                     self._debug('Forgot connection to %r', cnx.dsn)
442                     cnx.close()
443                 break
444         else:
445             raise PoolError('This connection does not below to the pool')
446
447     @locked
448     def close_all(self, dsn):
449         self.__logger.info('%r: Close all connections to %r', self, dsn)
450         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
451             if dsn_are_equals(cnx.dsn, dsn):
452                 cnx.close()
453                 self._connections.pop(i)
454
455
456 class Connection(object):
457     """ A lightweight instance of a connection to postgres
458     """
459     __logger = logging.getLogger('db.connection')
460
461     def __init__(self, pool, dbname):
462         self.dbname = dbname
463         self._pool = pool
464
465     def cursor(self, serialized=True):
466         cursor_type = serialized and 'serialized ' or ''
467         self.__logger.log(logging.DEBUG_SQL, 'create %scursor to %r', cursor_type, self.dbname)
468         return Cursor(self._pool, self.dbname, serialized=serialized)
469
470     # serialized_cursor is deprecated - cursors are serialized by default
471     serialized_cursor = cursor
472
473     def __nonzero__(self):
474         """Check if connection is possible"""
475         try:
476             warnings.warn("You use an expensive function to test a connection.",
477                       DeprecationWarning, stacklevel=1)
478             cr = self.cursor()
479             cr.close()
480             return True
481         except Exception:
482             return False
483
484 def dsn(db_name):
485     _dsn = ''
486     for p in ('host', 'port', 'user', 'password'):
487         cfg = tools.config['db_' + p]
488         if cfg:
489             _dsn += '%s=%s ' % (p, cfg)
490
491     return '%sdbname=%s' % (_dsn, db_name)
492
493 def dsn_are_equals(first, second):
494     def key(dsn):
495         k = dict(x.split('=', 1) for x in dsn.strip().split())
496         k.pop('password', None) # password is not relevant
497         return k
498     return key(first) == key(second)
499
500
501 _Pool = None
502
503 def db_connect(db_name):
504     global _Pool
505     if _Pool is None:
506         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
507     currentThread().dbname = db_name
508     return Connection(_Pool, db_name)
509
510 def close_db(db_name):
511     """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
512     _Pool.close_all(dsn(db_name))
513     ct = currentThread()
514     if hasattr(ct, 'dbname'):
515         delattr(ct, 'dbname')
516
517
518 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
519