7b6f4469b8d1ed99041ab7c2a62e52754e64e0db
[odoo/odoo.git] / openerp / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2011 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 #.apidoc title: PostgreSQL interface
24
25 """
26 The PostgreSQL connector is a connectivity layer between the OpenERP code and
27 the database, *not* a database abstraction toolkit. Database abstraction is what
28 the ORM does, in fact.
29
30 See also: the `pooler` module
31 """
32
33 #.apidoc add-functions: print_stats
34 #.apidoc add-classes: Cursor Connection ConnectionPool
35
36 __all__ = ['db_connect', 'close_db']
37
38 from functools import wraps
39 import logging
40 import psycopg2.extensions
41 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
42 from psycopg2.pool import PoolError
43 from psycopg2.psycopg1 import cursor as psycopg1cursor
44 from threading import currentThread
45
46 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
47
48 _logger = logging.getLogger(__name__)
49
50 types_mapping = {
51     'date': (1082,),
52     'time': (1083,),
53     'datetime': (1114,),
54 }
55
56 def unbuffer(symb, cr):
57     if symb is None: return None
58     return str(symb)
59
60 def undecimalize(symb, cr):
61     if symb is None: return None
62     return float(symb)
63
64 for name, typeoid in types_mapping.items():
65     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
66 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
67
68
69 import tools
70 from tools.func import frame_codeinfo
71 from datetime import datetime as mdt
72 from datetime import timedelta
73 import threading
74 from inspect import currentframe
75
76 import re
77 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
78 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
79
80 sql_counter = 0
81
82 class Cursor(object):
83     """Represents an open transaction to the PostgreSQL DB backend,
84        acting as a lightweight wrapper around psycopg2's
85        ``psycopg1cursor`` objects.
86
87         ``Cursor`` is the object behind the ``cr`` variable used all
88         over the OpenERP code.
89
90         .. rubric:: Transaction Isolation
91
92         One very important property of database transactions is the
93         level of isolation between concurrent transactions.
94         The SQL standard defines four levels of transaction isolation,
95         ranging from the most strict *Serializable* level, to the least
96         strict *Read Uncommitted* level. These levels are defined in
97         terms of the phenomena that must not occur between concurrent
98         transactions, such as *dirty read*, etc.
99         In the context of a generic business data management software
100         such as OpenERP, we need the best guarantees that no data
101         corruption can ever be cause by simply running multiple
102         transactions in parallel. Therefore, the preferred level would
103         be the *serializable* level, which ensures that a set of
104         transactions is guaranteed to produce the same effect as
105         running them one at a time in some order.
106
107         However, most database management systems implement a limited
108         serializable isolation in the form of
109         `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
110         providing most of the same advantages as True Serializability,
111         with a fraction of the performance cost.
112         With PostgreSQL up to version 9.0, this snapshot isolation was
113         the implementation of both the ``REPEATABLE READ`` and
114         ``SERIALIZABLE`` levels of the SQL standard.
115         As of PostgreSQL 9.1, the previous snapshot isolation implementation
116         was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
117         level was introduced, providing some additional heuristics to
118         detect a concurrent update by parallel transactions, and forcing
119         one of them to rollback.
120
121         OpenERP implements its own level of locking protection
122         for transactions that are highly likely to provoke concurrent
123         updates, such as stock reservations or document sequences updates.
124         Therefore we mostly care about the properties of snapshot isolation,
125         but we don't really need additional heuristics to trigger transaction
126         rollbacks, as we are taking care of triggering instant rollbacks
127         ourselves when it matters (and we can save the additional performance
128         hit of these heuristics).
129
130         As a result of the above, we have selected ``REPEATABLE READ`` as
131         the default transaction isolation level for OpenERP cursors, as
132         it will be mapped to the desired ``snapshot isolation`` level for
133         all supported PostgreSQL version (8.3 - 9.x).
134
135         Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
136         read level to serializable before sending it to the database, so it would
137         actually select the new serializable mode on PostgreSQL 9.1. Make
138         sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
139         the performance hit is a concern for you.
140
141     """
142     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
143
144     def check(f):
145         @wraps(f)
146         def wrapper(self, *args, **kwargs):
147             if self.__closed:
148                 msg = 'Unable to use a closed cursor.'
149                 if self.__closer:
150                     msg += ' It was closed at %s, line %s' % self.__closer
151                 raise psycopg2.OperationalError(msg)
152             return f(self, *args, **kwargs)
153         return wrapper
154
155     def __init__(self, pool, dbname, serialized=True):
156         self.sql_from_log = {}
157         self.sql_into_log = {}
158
159         # default log level determined at cursor creation, could be
160         # overridden later for debugging purposes
161         self.sql_log = _logger.isEnabledFor(logging.DEBUG)
162
163         self.sql_log_count = 0
164         self.__closed = True    # avoid the call of close() (by __del__) if an exception
165                                 # is raised by any of the following initialisations
166         self._pool = pool
167         self.dbname = dbname
168
169         # Whether to enable snapshot isolation level for this cursor.
170         # see also the docstring of Cursor.  
171         self._serialized = serialized
172
173         self._cnx = pool.borrow(dsn(dbname))
174         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
175         if self.sql_log:
176             self.__caller = frame_codeinfo(currentframe(),2)
177         else:
178             self.__caller = False
179         self.__closed = False   # real initialisation value
180         self.autocommit(False)
181         self.__closer = False
182
183         self._default_log_exceptions = True
184
185     def __del__(self):
186         if not self.__closed and not self._cnx.closed:
187             # Oops. 'self' has not been closed explicitly.
188             # The cursor will be deleted by the garbage collector,
189             # but the database connection is not put back into the connection
190             # pool, preventing some operation on the database like dropping it.
191             # This can also lead to a server overload.
192             msg = "Cursor not closed explicitly\n"
193             if self.__caller:
194                 msg += "Cursor was created at %s:%s" % self.__caller
195             else:
196                 msg += "Please enable sql debugging to trace the caller."
197             _logger.warning(msg)
198             self._close(True)
199
200     @check
201     def execute(self, query, params=None, log_exceptions=None):
202         if '%d' in query or '%f' in query:
203             _logger.warning(query)
204             _logger.warning("SQL queries cannot contain %d or %f anymore. "
205                          "Use only %s")
206
207         if self.sql_log:
208             now = mdt.now()
209
210         try:
211             params = params or None
212             res = self._obj.execute(query, params)
213         except psycopg2.ProgrammingError, pe:
214             if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
215                 _logger.error("Programming error: %s, in query %s", pe, query)
216             raise
217         except Exception:
218             if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
219                 _logger.exception("bad query: %s", self._obj.query or query)
220             raise
221
222         if self.sql_log:
223             delay = mdt.now() - now
224             delay = delay.seconds * 1E6 + delay.microseconds
225
226             _logger.debug("query: %s", self._obj.query)
227             self.sql_log_count+=1
228             res_from = re_from.match(query.lower())
229             if res_from:
230                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
231                 self.sql_from_log[res_from.group(1)][0] += 1
232                 self.sql_from_log[res_from.group(1)][1] += delay
233             res_into = re_into.match(query.lower())
234             if res_into:
235                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
236                 self.sql_into_log[res_into.group(1)][0] += 1
237                 self.sql_into_log[res_into.group(1)][1] += delay
238         return res
239
240
241     def split_for_in_conditions(self, ids):
242         """Split a list of identifiers into one or more smaller tuples
243            safe for IN conditions, after uniquifying them."""
244         return tools.misc.split_every(self.IN_MAX, set(ids))
245
246     def print_log(self):
247         global sql_counter
248         sql_counter += self.sql_log_count
249         if not self.sql_log:
250             return
251         def process(type):
252             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
253             sum = 0
254             if sqllogs[type]:
255                 sqllogitems = sqllogs[type].items()
256                 sqllogitems.sort(key=lambda k: k[1][1])
257                 _logger.debug("SQL LOG %s:", type)
258                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
259                 for r in sqllogitems:
260                     delay = timedelta(microseconds=r[1][1])
261                     _logger.debug("table: %s: %s/%s",
262                                         r[0], delay, r[1][0])
263                     sum+= r[1][1]
264                 sqllogs[type].clear()
265             sum = timedelta(microseconds=sum)
266             _logger.debug("SUM %s:%s/%d [%d]",
267                                 type, sum, self.sql_log_count, sql_counter)
268             sqllogs[type].clear()
269         process('from')
270         process('into')
271         self.sql_log_count = 0
272         self.sql_log = False
273
274     @check
275     def close(self):
276         return self._close(False)
277
278     def _close(self, leak=False):
279         if not self._obj:
280             return
281
282         if self.sql_log:
283             self.__closer = frame_codeinfo(currentframe(),3)
284         self.print_log()
285
286         self._obj.close()
287
288         # This force the cursor to be freed, and thus, available again. It is
289         # important because otherwise we can overload the server very easily
290         # because of a cursor shortage (because cursors are not garbage
291         # collected as fast as they should). The problem is probably due in
292         # part because browse records keep a reference to the cursor.
293         del self._obj
294         self.__closed = True
295
296         # Clean the underlying connection.
297         self._cnx.rollback()
298
299         if leak:
300             self._cnx.leaked = True
301         else:
302             chosen_template = tools.config['db_template']
303             templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
304             keep_in_pool = self.dbname not in templates_list
305             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
306
307     @check
308     def autocommit(self, on):
309         if on:
310             isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
311         else:
312             # If a serializable cursor was requested, we
313             # use the appropriate PotsgreSQL isolation level
314             # that maps to snaphsot isolation.
315             # For all supported PostgreSQL versions (8.3-9.x),
316             # this is currently the ISOLATION_REPEATABLE_READ.
317             # See also the docstring of this class.
318             # NOTE: up to psycopg 2.4.2, repeatable read
319             #       is remapped to serializable before being
320             #       sent to the database, so it is in fact
321             #       unavailable for use with pg 9.1.
322             isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
323                                   if self._serialized \
324                                   else ISOLATION_LEVEL_READ_COMMITTED
325         self._cnx.set_isolation_level(isolation_level)
326
327     @check
328     def commit(self):
329         """ Perform an SQL `COMMIT`
330         """
331         return self._cnx.commit()
332
333     @check
334     def rollback(self):
335         """ Perform an SQL `ROLLBACK`
336         """
337         return self._cnx.rollback()
338
339     @check
340     def __getattr__(self, name):
341         return getattr(self._obj, name)
342
343         """ Set the mode of postgres operations for all cursors
344         """
345         """Obtain the mode of postgres operations for all cursors
346         """
347
348 class PsycoConnection(psycopg2.extensions.connection):
349     pass
350
351 class ConnectionPool(object):
352     """ The pool of connections to database(s)
353     
354         Keep a set of connections to pg databases open, and reuse them
355         to open cursors for all transactions.
356         
357         The connections are *not* automatically closed. Only a close_db()
358         can trigger that.
359     """
360
361     def locked(fun):
362         @wraps(fun)
363         def _locked(self, *args, **kwargs):
364             self._lock.acquire()
365             try:
366                 return fun(self, *args, **kwargs)
367             finally:
368                 self._lock.release()
369         return _locked
370
371
372     def __init__(self, maxconn=64):
373         self._connections = []
374         self._maxconn = max(maxconn, 1)
375         self._lock = threading.Lock()
376
377     def __repr__(self):
378         used = len([1 for c, u in self._connections[:] if u])
379         count = len(self._connections)
380         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
381
382     def _debug(self, msg, *args):
383         _logger.debug(('%r ' + msg), self, *args)
384
385     @locked
386     def borrow(self, dsn):
387         self._debug('Borrow connection to %r', dsn)
388
389         # free leaked connections
390         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
391             if cnx.closed:
392                 self._connections.pop(i)
393                 self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
394                 continue
395             if getattr(cnx, 'leaked', False):
396                 delattr(cnx, 'leaked')
397                 self._connections.pop(i)
398                 self._connections.append((cnx, False))
399                 _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)
400
401         for i, (cnx, used) in enumerate(self._connections):
402             if not used and dsn_are_equals(cnx.dsn, dsn):
403                 self._connections.pop(i)
404                 self._connections.append((cnx, True))
405                 self._debug('Existing connection found at index %d', i)
406
407                 return cnx
408
409         if len(self._connections) >= self._maxconn:
410             # try to remove the oldest connection not used
411             for i, (cnx, used) in enumerate(self._connections):
412                 if not used:
413                     self._connections.pop(i)
414                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
415                     break
416             else:
417                 # note: this code is called only if the for loop has completed (no break)
418                 raise PoolError('The Connection Pool Is Full')
419
420         try:
421             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
422         except psycopg2.Error:
423             _logger.exception('Connection to the database failed')
424             raise
425         self._connections.append((result, True))
426         self._debug('Create new connection')
427         return result
428
429     @locked
430     def give_back(self, connection, keep_in_pool=True):
431         self._debug('Give back connection to %r', connection.dsn)
432         for i, (cnx, used) in enumerate(self._connections):
433             if cnx is connection:
434                 self._connections.pop(i)
435                 if keep_in_pool:
436                     self._connections.append((cnx, False))
437                     self._debug('Put connection to %r in pool', cnx.dsn)
438                 else:
439                     self._debug('Forgot connection to %r', cnx.dsn)
440                     cnx.close()
441                 break
442         else:
443             raise PoolError('This connection does not below to the pool')
444
445     @locked
446     def close_all(self, dsn):
447         _logger.info('%r: Close all connections to %r', self, dsn)
448         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
449             if dsn_are_equals(cnx.dsn, dsn):
450                 cnx.close()
451                 self._connections.pop(i)
452
453
454 class Connection(object):
455     """ A lightweight instance of a connection to postgres
456     """
457
458     def __init__(self, pool, dbname):
459         self.dbname = dbname
460         self._pool = pool
461
462     def cursor(self, serialized=True):
463         cursor_type = serialized and 'serialized ' or ''
464         _logger.debug('create %scursor to %r', cursor_type, self.dbname)
465         return Cursor(self._pool, self.dbname, serialized=serialized)
466
467     # serialized_cursor is deprecated - cursors are serialized by default
468     serialized_cursor = cursor
469
470     def __nonzero__(self):
471         """Check if connection is possible"""
472         try:
473             _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
474             cr = self.cursor()
475             cr.close()
476             return True
477         except Exception:
478             return False
479
480 def dsn(db_name):
481     _dsn = ''
482     for p in ('host', 'port', 'user', 'password'):
483         cfg = tools.config['db_' + p]
484         if cfg:
485             _dsn += '%s=%s ' % (p, cfg)
486
487     return '%sdbname=%s' % (_dsn, db_name)
488
489 def dsn_are_equals(first, second):
490     def key(dsn):
491         k = dict(x.split('=', 1) for x in dsn.strip().split())
492         k.pop('password', None) # password is not relevant
493         return k
494     return key(first) == key(second)
495
496
497 _Pool = None
498
499 def db_connect(db_name):
500     global _Pool
501     if _Pool is None:
502         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
503     currentThread().dbname = db_name
504     return Connection(_Pool, db_name)
505
506 def close_db(db_name):
507     global _Pool
508     """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
509     if _Pool:
510         _Pool.close_all(dsn(db_name))
511     ct = currentThread()
512     if hasattr(ct, 'dbname'):
513         delattr(ct, 'dbname')
514
515
516 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
517