c27784433c08a3ab8b969b27904dec6f990ff1a6
[odoo/odoo.git] / openerp / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2011 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23
24 """
25 The PostgreSQL connector is a connectivity layer between the OpenERP code and
26 the database, *not* a database abstraction toolkit. Database abstraction is what
27 the ORM does, in fact.
28 """
29
30
31 __all__ = ['db_connect', 'close_db']
32
33 from functools import wraps
34 import logging
35 import psycopg2.extensions
36 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
37 from psycopg2.pool import PoolError
38 from psycopg2.psycopg1 import cursor as psycopg1cursor
39 from threading import currentThread
40
41 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
42
43 _logger = logging.getLogger(__name__)
44
45 types_mapping = {
46     'date': (1082,),
47     'time': (1083,),
48     'datetime': (1114,),
49 }
50
51 def unbuffer(symb, cr):
52     if symb is None: return None
53     return str(symb)
54
55 def undecimalize(symb, cr):
56     if symb is None: return None
57     return float(symb)
58
59 for name, typeoid in types_mapping.items():
60     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
61 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
62
63
64 import tools
65 from tools.func import frame_codeinfo
66 from datetime import datetime as mdt
67 from datetime import timedelta
68 import threading
69 from inspect import currentframe
70
71 import re
72 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
73 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
74
75 sql_counter = 0
76
77 class Cursor(object):
78     """Represents an open transaction to the PostgreSQL DB backend,
79        acting as a lightweight wrapper around psycopg2's
80        ``psycopg1cursor`` objects.
81
82         ``Cursor`` is the object behind the ``cr`` variable used all
83         over the OpenERP code.
84
85         .. rubric:: Transaction Isolation
86
87         One very important property of database transactions is the
88         level of isolation between concurrent transactions.
89         The SQL standard defines four levels of transaction isolation,
90         ranging from the most strict *Serializable* level, to the least
91         strict *Read Uncommitted* level. These levels are defined in
92         terms of the phenomena that must not occur between concurrent
93         transactions, such as *dirty read*, etc.
94         In the context of a generic business data management software
95         such as OpenERP, we need the best guarantees that no data
96         corruption can ever be cause by simply running multiple
97         transactions in parallel. Therefore, the preferred level would
98         be the *serializable* level, which ensures that a set of
99         transactions is guaranteed to produce the same effect as
100         running them one at a time in some order.
101
102         However, most database management systems implement a limited
103         serializable isolation in the form of
104         `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
105         providing most of the same advantages as True Serializability,
106         with a fraction of the performance cost.
107         With PostgreSQL up to version 9.0, this snapshot isolation was
108         the implementation of both the ``REPEATABLE READ`` and
109         ``SERIALIZABLE`` levels of the SQL standard.
110         As of PostgreSQL 9.1, the previous snapshot isolation implementation
111         was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
112         level was introduced, providing some additional heuristics to
113         detect a concurrent update by parallel transactions, and forcing
114         one of them to rollback.
115
116         OpenERP implements its own level of locking protection
117         for transactions that are highly likely to provoke concurrent
118         updates, such as stock reservations or document sequences updates.
119         Therefore we mostly care about the properties of snapshot isolation,
120         but we don't really need additional heuristics to trigger transaction
121         rollbacks, as we are taking care of triggering instant rollbacks
122         ourselves when it matters (and we can save the additional performance
123         hit of these heuristics).
124
125         As a result of the above, we have selected ``REPEATABLE READ`` as
126         the default transaction isolation level for OpenERP cursors, as
127         it will be mapped to the desired ``snapshot isolation`` level for
128         all supported PostgreSQL version (8.3 - 9.x).
129
130         Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
131         read level to serializable before sending it to the database, so it would
132         actually select the new serializable mode on PostgreSQL 9.1. Make
133         sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
134         the performance hit is a concern for you.
135
136         .. attribute:: cache
137
138             Cache dictionary with a "request" (-ish) lifecycle, only lives as
139             long as the cursor itself does and proactively cleared when the
140             cursor is closed.
141
142             This cache should *only* be used to store repeatable reads as it
143             ignores rollbacks and savepoints, it should not be used to store
144             *any* data which may be modified during the life of the cursor.
145
146     """
147     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
148
149     def check(f):
150         @wraps(f)
151         def wrapper(self, *args, **kwargs):
152             if self.__closed:
153                 msg = 'Unable to use a closed cursor.'
154                 if self.__closer:
155                     msg += ' It was closed at %s, line %s' % self.__closer
156                 raise psycopg2.OperationalError(msg)
157             return f(self, *args, **kwargs)
158         return wrapper
159
160     def __init__(self, pool, dbname, serialized=True):
161         self.sql_from_log = {}
162         self.sql_into_log = {}
163
164         # default log level determined at cursor creation, could be
165         # overridden later for debugging purposes
166         self.sql_log = _logger.isEnabledFor(logging.DEBUG)
167
168         self.sql_log_count = 0
169         self.__closed = True    # avoid the call of close() (by __del__) if an exception
170                                 # is raised by any of the following initialisations
171         self._pool = pool
172         self.dbname = dbname
173
174         # Whether to enable snapshot isolation level for this cursor.
175         # see also the docstring of Cursor.  
176         self._serialized = serialized
177
178         self._cnx = pool.borrow(dsn(dbname))
179         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
180         if self.sql_log:
181             self.__caller = frame_codeinfo(currentframe(),2)
182         else:
183             self.__caller = False
184         self.__closed = False   # real initialisation value
185         self.autocommit(False)
186         self.__closer = False
187
188         self._default_log_exceptions = True
189
190         self.cache = {}
191
192     def __del__(self):
193         if not self.__closed and not self._cnx.closed:
194             # Oops. 'self' has not been closed explicitly.
195             # The cursor will be deleted by the garbage collector,
196             # but the database connection is not put back into the connection
197             # pool, preventing some operation on the database like dropping it.
198             # This can also lead to a server overload.
199             msg = "Cursor not closed explicitly\n"
200             if self.__caller:
201                 msg += "Cursor was created at %s:%s" % self.__caller
202             else:
203                 msg += "Please enable sql debugging to trace the caller."
204             _logger.warning(msg)
205             self._close(True)
206
207     @check
208     def execute(self, query, params=None, log_exceptions=None):
209         if '%d' in query or '%f' in query:
210             _logger.warning(query)
211             _logger.warning("SQL queries cannot contain %d or %f anymore. "
212                          "Use only %s")
213         if params and not isinstance(params, (tuple, list, dict)):
214             _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
215             raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
216
217         if self.sql_log:
218             now = mdt.now()
219
220         try:
221             params = params or None
222             res = self._obj.execute(query, params)
223         except psycopg2.ProgrammingError, pe:
224             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
225                 _logger.error("Programming error: %s, in query %s", pe, query)
226             raise
227         except Exception:
228             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
229                 _logger.exception("bad query: %s", self._obj.query or query)
230             raise
231
232         if self.sql_log:
233             delay = mdt.now() - now
234             delay = delay.seconds * 1E6 + delay.microseconds
235
236             _logger.debug("query: %s", self._obj.query)
237             self.sql_log_count+=1
238             res_from = re_from.match(query.lower())
239             if res_from:
240                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
241                 self.sql_from_log[res_from.group(1)][0] += 1
242                 self.sql_from_log[res_from.group(1)][1] += delay
243             res_into = re_into.match(query.lower())
244             if res_into:
245                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
246                 self.sql_into_log[res_into.group(1)][0] += 1
247                 self.sql_into_log[res_into.group(1)][1] += delay
248         return res
249
250
251     def split_for_in_conditions(self, ids):
252         """Split a list of identifiers into one or more smaller tuples
253            safe for IN conditions, after uniquifying them."""
254         return tools.misc.split_every(self.IN_MAX, set(ids))
255
256     def print_log(self):
257         global sql_counter
258         sql_counter += self.sql_log_count
259         if not self.sql_log:
260             return
261         def process(type):
262             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
263             sum = 0
264             if sqllogs[type]:
265                 sqllogitems = sqllogs[type].items()
266                 sqllogitems.sort(key=lambda k: k[1][1])
267                 _logger.debug("SQL LOG %s:", type)
268                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
269                 for r in sqllogitems:
270                     delay = timedelta(microseconds=r[1][1])
271                     _logger.debug("table: %s: %s/%s",
272                                         r[0], delay, r[1][0])
273                     sum+= r[1][1]
274                 sqllogs[type].clear()
275             sum = timedelta(microseconds=sum)
276             _logger.debug("SUM %s:%s/%d [%d]",
277                                 type, sum, self.sql_log_count, sql_counter)
278             sqllogs[type].clear()
279         process('from')
280         process('into')
281         self.sql_log_count = 0
282         self.sql_log = False
283
284     @check
285     def close(self):
286         return self._close(False)
287
288     def _close(self, leak=False):
289         if not self._obj:
290             return
291
292         del self.cache
293
294         if self.sql_log:
295             self.__closer = frame_codeinfo(currentframe(),3)
296         self.print_log()
297
298         self._obj.close()
299
300         # This force the cursor to be freed, and thus, available again. It is
301         # important because otherwise we can overload the server very easily
302         # because of a cursor shortage (because cursors are not garbage
303         # collected as fast as they should). The problem is probably due in
304         # part because browse records keep a reference to the cursor.
305         del self._obj
306         self.__closed = True
307
308         # Clean the underlying connection.
309         self._cnx.rollback()
310
311         if leak:
312             self._cnx.leaked = True
313         else:
314             chosen_template = tools.config['db_template']
315             templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
316             keep_in_pool = self.dbname not in templates_list
317             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
318
319     @check
320     def autocommit(self, on):
321         if on:
322             isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
323         else:
324             # If a serializable cursor was requested, we
325             # use the appropriate PotsgreSQL isolation level
326             # that maps to snaphsot isolation.
327             # For all supported PostgreSQL versions (8.3-9.x),
328             # this is currently the ISOLATION_REPEATABLE_READ.
329             # See also the docstring of this class.
330             # NOTE: up to psycopg 2.4.2, repeatable read
331             #       is remapped to serializable before being
332             #       sent to the database, so it is in fact
333             #       unavailable for use with pg 9.1.
334             isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
335                                   if self._serialized \
336                                   else ISOLATION_LEVEL_READ_COMMITTED
337         self._cnx.set_isolation_level(isolation_level)
338
339     @check
340     def commit(self):
341         """ Perform an SQL `COMMIT`
342         """
343         return self._cnx.commit()
344
345     @check
346     def rollback(self):
347         """ Perform an SQL `ROLLBACK`
348         """
349         return self._cnx.rollback()
350
351     @check
352     def __getattr__(self, name):
353         return getattr(self._obj, name)
354
355 class PsycoConnection(psycopg2.extensions.connection):
356     pass
357
358 class ConnectionPool(object):
359     """ The pool of connections to database(s)
360     
361         Keep a set of connections to pg databases open, and reuse them
362         to open cursors for all transactions.
363         
364         The connections are *not* automatically closed. Only a close_db()
365         can trigger that.
366     """
367
368     def locked(fun):
369         @wraps(fun)
370         def _locked(self, *args, **kwargs):
371             self._lock.acquire()
372             try:
373                 return fun(self, *args, **kwargs)
374             finally:
375                 self._lock.release()
376         return _locked
377
378
379     def __init__(self, maxconn=64):
380         self._connections = []
381         self._maxconn = max(maxconn, 1)
382         self._lock = threading.Lock()
383
384     def __repr__(self):
385         used = len([1 for c, u in self._connections[:] if u])
386         count = len(self._connections)
387         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
388
389     def _debug(self, msg, *args):
390         _logger.debug(('%r ' + msg), self, *args)
391
392     @locked
393     def borrow(self, dsn):
394         self._debug('Borrow connection to %r', dsn)
395
396         # free leaked connections
397         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
398             if cnx.closed:
399                 self._connections.pop(i)
400                 self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
401                 continue
402             if getattr(cnx, 'leaked', False):
403                 delattr(cnx, 'leaked')
404                 self._connections.pop(i)
405                 self._connections.append((cnx, False))
406                 _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)
407
408         for i, (cnx, used) in enumerate(self._connections):
409             if not used and dsn_are_equals(cnx.dsn, dsn):
410                 self._connections.pop(i)
411                 self._connections.append((cnx, True))
412                 self._debug('Existing connection found at index %d', i)
413
414                 return cnx
415
416         if len(self._connections) >= self._maxconn:
417             # try to remove the oldest connection not used
418             for i, (cnx, used) in enumerate(self._connections):
419                 if not used:
420                     self._connections.pop(i)
421                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
422                     break
423             else:
424                 # note: this code is called only if the for loop has completed (no break)
425                 raise PoolError('The Connection Pool Is Full')
426
427         try:
428             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
429         except psycopg2.Error:
430             _logger.exception('Connection to the database failed')
431             raise
432         self._connections.append((result, True))
433         self._debug('Create new connection')
434         return result
435
436     @locked
437     def give_back(self, connection, keep_in_pool=True):
438         self._debug('Give back connection to %r', connection.dsn)
439         for i, (cnx, used) in enumerate(self._connections):
440             if cnx is connection:
441                 self._connections.pop(i)
442                 if keep_in_pool:
443                     self._connections.append((cnx, False))
444                     self._debug('Put connection to %r in pool', cnx.dsn)
445                 else:
446                     self._debug('Forgot connection to %r', cnx.dsn)
447                     cnx.close()
448                 break
449         else:
450             raise PoolError('This connection does not below to the pool')
451
452     @locked
453     def close_all(self, dsn):
454         _logger.info('%r: Close all connections to %r', self, dsn)
455         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
456             if dsn_are_equals(cnx.dsn, dsn):
457                 cnx.close()
458                 self._connections.pop(i)
459
460
461 class Connection(object):
462     """ A lightweight instance of a connection to postgres
463     """
464
465     def __init__(self, pool, dbname):
466         self.dbname = dbname
467         self._pool = pool
468
469     def cursor(self, serialized=True):
470         cursor_type = serialized and 'serialized ' or ''
471         _logger.debug('create %scursor to %r', cursor_type, self.dbname)
472         return Cursor(self._pool, self.dbname, serialized=serialized)
473
474     # serialized_cursor is deprecated - cursors are serialized by default
475     serialized_cursor = cursor
476
477     def __nonzero__(self):
478         """Check if connection is possible"""
479         try:
480             _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
481             cr = self.cursor()
482             cr.close()
483             return True
484         except Exception:
485             return False
486
487 def dsn(db_name):
488     _dsn = ''
489     for p in ('host', 'port', 'user', 'password'):
490         cfg = tools.config['db_' + p]
491         if cfg:
492             _dsn += '%s=%s ' % (p, cfg)
493
494     return '%sdbname=%s' % (_dsn, db_name)
495
496 def dsn_are_equals(first, second):
497     def key(dsn):
498         k = dict(x.split('=', 1) for x in dsn.strip().split())
499         k.pop('password', None) # password is not relevant
500         return k
501     return key(first) == key(second)
502
503
504 _Pool = None
505
506 def db_connect(db_name):
507     global _Pool
508     if _Pool is None:
509         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
510     currentThread().dbname = db_name
511     return Connection(_Pool, db_name)
512
513 def close_db(db_name):
514     """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
515     global _Pool
516     if _Pool:
517         _Pool.close_all(dsn(db_name))
518     ct = currentThread()
519     if hasattr(ct, 'dbname'):
520         delattr(ct, 'dbname')
521
522
523 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
524