cleanups
[odoo/odoo.git] / openerp / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2011 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 #.apidoc title: PostgreSQL interface
24
25 """
26 The PostgreSQL connector is a connectivity layer between the OpenERP code and
27 the database, *not* a database abstraction toolkit. Database abstraction is what
28 the ORM does, in fact.
29
30 See also: the `pooler` module
31 """
32
33 #.apidoc add-functions: print_stats
34 #.apidoc add-classes: Cursor Connection ConnectionPool
35
36 __all__ = ['db_connect', 'close_db']
37
38 from functools import wraps
39 import logging
40 import psycopg2.extensions
41 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
42 from psycopg2.pool import PoolError
43 from psycopg2.psycopg1 import cursor as psycopg1cursor
44 from threading import currentThread
45
46 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
47
48 _logger = logging.getLogger(__name__)
49
50 types_mapping = {
51     'date': (1082,),
52     'time': (1083,),
53     'datetime': (1114,),
54 }
55
56 def unbuffer(symb, cr):
57     if symb is None: return None
58     return str(symb)
59
60 def undecimalize(symb, cr):
61     if symb is None: return None
62     return float(symb)
63
64 for name, typeoid in types_mapping.items():
65     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
66 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
67
68
69 import tools
70 from tools.func import frame_codeinfo
71 from datetime import datetime as mdt
72 from datetime import timedelta
73 import threading
74 from inspect import currentframe
75
76 import re
77 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
78 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
79
80 sql_counter = 0
81
82 class Cursor(object):
83     """Represents an open transaction to the PostgreSQL DB backend,
84        acting as a lightweight wrapper around psycopg2's
85        ``psycopg1cursor`` objects.
86
87         ``Cursor`` is the object behind the ``cr`` variable used all
88         over the OpenERP code.
89
90         .. rubric:: Transaction Isolation
91
92         One very important property of database transactions is the
93         level of isolation between concurrent transactions.
94         The SQL standard defines four levels of transaction isolation,
95         ranging from the most strict *Serializable* level, to the least
96         strict *Read Uncommitted* level. These levels are defined in
97         terms of the phenomena that must not occur between concurrent
98         transactions, such as *dirty read*, etc.
99         In the context of a generic business data management software
100         such as OpenERP, we need the best guarantees that no data
101         corruption can ever be cause by simply running multiple
102         transactions in parallel. Therefore, the preferred level would
103         be the *serializable* level, which ensures that a set of
104         transactions is guaranteed to produce the same effect as
105         running them one at a time in some order.
106
107         However, most database management systems implement a limited
108         serializable isolation in the form of
109         `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
110         providing most of the same advantages as True Serializability,
111         with a fraction of the performance cost.
112         With PostgreSQL up to version 9.0, this snapshot isolation was
113         the implementation of both the ``REPEATABLE READ`` and
114         ``SERIALIZABLE`` levels of the SQL standard.
115         As of PostgreSQL 9.1, the previous snapshot isolation implementation
116         was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
117         level was introduced, providing some additional heuristics to
118         detect a concurrent update by parallel transactions, and forcing
119         one of them to rollback.
120
121         OpenERP implements its own level of locking protection
122         for transactions that are highly likely to provoke concurrent
123         updates, such as stock reservations or document sequences updates.
124         Therefore we mostly care about the properties of snapshot isolation,
125         but we don't really need additional heuristics to trigger transaction
126         rollbacks, as we are taking care of triggering instant rollbacks
127         ourselves when it matters (and we can save the additional performance
128         hit of these heuristics).
129
130         As a result of the above, we have selected ``REPEATABLE READ`` as
131         the default transaction isolation level for OpenERP cursors, as
132         it will be mapped to the desired ``snapshot isolation`` level for
133         all supported PostgreSQL version (8.3 - 9.x).
134
135         Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
136         read level to serializable before sending it to the database, so it would
137         actually select the new serializable mode on PostgreSQL 9.1. Make
138         sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
139         the performance hit is a concern for you.
140
141         .. attribute:: cache
142
143             Cache dictionary with a "request" (-ish) lifecycle, only lives as
144             long as the cursor itself does and proactively cleared when the
145             cursor is closed.
146
147             This cache should *only* be used to store repeatable reads as it
148             ignores rollbacks and savepoints, it should not be used to store
149             *any* data which may be modified during the life of the cursor.
150
151     """
152     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
153
154     def check(f):
155         @wraps(f)
156         def wrapper(self, *args, **kwargs):
157             if self.__closed:
158                 msg = 'Unable to use a closed cursor.'
159                 if self.__closer:
160                     msg += ' It was closed at %s, line %s' % self.__closer
161                 raise psycopg2.OperationalError(msg)
162             return f(self, *args, **kwargs)
163         return wrapper
164
165     def __init__(self, pool, dbname, serialized=True):
166         self.sql_from_log = {}
167         self.sql_into_log = {}
168
169         # default log level determined at cursor creation, could be
170         # overridden later for debugging purposes
171         self.sql_log = _logger.isEnabledFor(logging.DEBUG)
172
173         self.sql_log_count = 0
174         self.__closed = True    # avoid the call of close() (by __del__) if an exception
175                                 # is raised by any of the following initialisations
176         self._pool = pool
177         self.dbname = dbname
178
179         # Whether to enable snapshot isolation level for this cursor.
180         # see also the docstring of Cursor.  
181         self._serialized = serialized
182
183         self._cnx = pool.borrow(dsn(dbname))
184         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
185         if self.sql_log:
186             self.__caller = frame_codeinfo(currentframe(),2)
187         else:
188             self.__caller = False
189         self.__closed = False   # real initialisation value
190         self.autocommit(False)
191         self.__closer = False
192
193         self._default_log_exceptions = True
194
195         self.cache = {}
196
197     def __del__(self):
198         if not self.__closed and not self._cnx.closed:
199             # Oops. 'self' has not been closed explicitly.
200             # The cursor will be deleted by the garbage collector,
201             # but the database connection is not put back into the connection
202             # pool, preventing some operation on the database like dropping it.
203             # This can also lead to a server overload.
204             msg = "Cursor not closed explicitly\n"
205             if self.__caller:
206                 msg += "Cursor was created at %s:%s" % self.__caller
207             else:
208                 msg += "Please enable sql debugging to trace the caller."
209             _logger.warning(msg)
210             self._close(True)
211
212     @check
213     def execute(self, query, params=None, log_exceptions=None):
214         if '%d' in query or '%f' in query:
215             _logger.warning(query)
216             _logger.warning("SQL queries cannot contain %d or %f anymore. "
217                          "Use only %s")
218         if params and not isinstance(params, (tuple, list, dict)):
219             _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
220             raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
221
222         if self.sql_log:
223             now = mdt.now()
224
225         try:
226             params = params or None
227             res = self._obj.execute(query, params)
228         except psycopg2.ProgrammingError, pe:
229             if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
230                 _logger.error("Programming error: %s, in query %s", pe, query)
231             raise
232         except Exception:
233             if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
234                 _logger.exception("bad query: %s", self._obj.query or query)
235             raise
236
237         if self.sql_log:
238             delay = mdt.now() - now
239             delay = delay.seconds * 1E6 + delay.microseconds
240
241             _logger.debug("query: %s", self._obj.query)
242             self.sql_log_count+=1
243             res_from = re_from.match(query.lower())
244             if res_from:
245                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
246                 self.sql_from_log[res_from.group(1)][0] += 1
247                 self.sql_from_log[res_from.group(1)][1] += delay
248             res_into = re_into.match(query.lower())
249             if res_into:
250                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
251                 self.sql_into_log[res_into.group(1)][0] += 1
252                 self.sql_into_log[res_into.group(1)][1] += delay
253         return res
254
255
256     def split_for_in_conditions(self, ids):
257         """Split a list of identifiers into one or more smaller tuples
258            safe for IN conditions, after uniquifying them."""
259         return tools.misc.split_every(self.IN_MAX, set(ids))
260
261     def print_log(self):
262         global sql_counter
263         sql_counter += self.sql_log_count
264         if not self.sql_log:
265             return
266         def process(type):
267             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
268             sum = 0
269             if sqllogs[type]:
270                 sqllogitems = sqllogs[type].items()
271                 sqllogitems.sort(key=lambda k: k[1][1])
272                 _logger.debug("SQL LOG %s:", type)
273                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
274                 for r in sqllogitems:
275                     delay = timedelta(microseconds=r[1][1])
276                     _logger.debug("table: %s: %s/%s",
277                                         r[0], delay, r[1][0])
278                     sum+= r[1][1]
279                 sqllogs[type].clear()
280             sum = timedelta(microseconds=sum)
281             _logger.debug("SUM %s:%s/%d [%d]",
282                                 type, sum, self.sql_log_count, sql_counter)
283             sqllogs[type].clear()
284         process('from')
285         process('into')
286         self.sql_log_count = 0
287         self.sql_log = False
288
289     @check
290     def close(self):
291         return self._close(False)
292
293     def _close(self, leak=False):
294         if not self._obj:
295             return
296
297         del self.cache
298
299         if self.sql_log:
300             self.__closer = frame_codeinfo(currentframe(),3)
301         self.print_log()
302
303         self._obj.close()
304
305         # This force the cursor to be freed, and thus, available again. It is
306         # important because otherwise we can overload the server very easily
307         # because of a cursor shortage (because cursors are not garbage
308         # collected as fast as they should). The problem is probably due in
309         # part because browse records keep a reference to the cursor.
310         del self._obj
311         self.__closed = True
312
313         # Clean the underlying connection.
314         self._cnx.rollback()
315
316         if leak:
317             self._cnx.leaked = True
318         else:
319             chosen_template = tools.config['db_template']
320             templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
321             keep_in_pool = self.dbname not in templates_list
322             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
323
324     @check
325     def autocommit(self, on):
326         if on:
327             isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
328         else:
329             # If a serializable cursor was requested, we
330             # use the appropriate PotsgreSQL isolation level
331             # that maps to snaphsot isolation.
332             # For all supported PostgreSQL versions (8.3-9.x),
333             # this is currently the ISOLATION_REPEATABLE_READ.
334             # See also the docstring of this class.
335             # NOTE: up to psycopg 2.4.2, repeatable read
336             #       is remapped to serializable before being
337             #       sent to the database, so it is in fact
338             #       unavailable for use with pg 9.1.
339             isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
340                                   if self._serialized \
341                                   else ISOLATION_LEVEL_READ_COMMITTED
342         self._cnx.set_isolation_level(isolation_level)
343
344     @check
345     def commit(self):
346         """ Perform an SQL `COMMIT`
347         """
348         return self._cnx.commit()
349
350     @check
351     def rollback(self):
352         """ Perform an SQL `ROLLBACK`
353         """
354         return self._cnx.rollback()
355
356     @check
357     def __getattr__(self, name):
358         return getattr(self._obj, name)
359
360         """ Set the mode of postgres operations for all cursors
361         """
362         """Obtain the mode of postgres operations for all cursors
363         """
364
365 class PsycoConnection(psycopg2.extensions.connection):
366     pass
367
368 class ConnectionPool(object):
369     """ The pool of connections to database(s)
370     
371         Keep a set of connections to pg databases open, and reuse them
372         to open cursors for all transactions.
373         
374         The connections are *not* automatically closed. Only a close_db()
375         can trigger that.
376     """
377
378     def locked(fun):
379         @wraps(fun)
380         def _locked(self, *args, **kwargs):
381             self._lock.acquire()
382             try:
383                 return fun(self, *args, **kwargs)
384             finally:
385                 self._lock.release()
386         return _locked
387
388
389     def __init__(self, maxconn=64):
390         self._connections = []
391         self._maxconn = max(maxconn, 1)
392         self._lock = threading.Lock()
393
394     def __repr__(self):
395         used = len([1 for c, u in self._connections[:] if u])
396         count = len(self._connections)
397         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
398
399     def _debug(self, msg, *args):
400         _logger.debug(('%r ' + msg), self, *args)
401
402     @locked
403     def borrow(self, dsn):
404         self._debug('Borrow connection to %r', dsn)
405
406         # free leaked connections
407         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
408             if cnx.closed:
409                 self._connections.pop(i)
410                 self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
411                 continue
412             if getattr(cnx, 'leaked', False):
413                 delattr(cnx, 'leaked')
414                 self._connections.pop(i)
415                 self._connections.append((cnx, False))
416                 _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)
417
418         for i, (cnx, used) in enumerate(self._connections):
419             if not used and dsn_are_equals(cnx.dsn, dsn):
420                 self._connections.pop(i)
421                 self._connections.append((cnx, True))
422                 self._debug('Existing connection found at index %d', i)
423
424                 return cnx
425
426         if len(self._connections) >= self._maxconn:
427             # try to remove the oldest connection not used
428             for i, (cnx, used) in enumerate(self._connections):
429                 if not used:
430                     self._connections.pop(i)
431                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
432                     break
433             else:
434                 # note: this code is called only if the for loop has completed (no break)
435                 raise PoolError('The Connection Pool Is Full')
436
437         try:
438             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
439         except psycopg2.Error:
440             _logger.exception('Connection to the database failed')
441             raise
442         self._connections.append((result, True))
443         self._debug('Create new connection')
444         return result
445
446     @locked
447     def give_back(self, connection, keep_in_pool=True):
448         self._debug('Give back connection to %r', connection.dsn)
449         for i, (cnx, used) in enumerate(self._connections):
450             if cnx is connection:
451                 self._connections.pop(i)
452                 if keep_in_pool:
453                     self._connections.append((cnx, False))
454                     self._debug('Put connection to %r in pool', cnx.dsn)
455                 else:
456                     self._debug('Forgot connection to %r', cnx.dsn)
457                     cnx.close()
458                 break
459         else:
460             raise PoolError('This connection does not below to the pool')
461
462     @locked
463     def close_all(self, dsn):
464         _logger.info('%r: Close all connections to %r', self, dsn)
465         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
466             if dsn_are_equals(cnx.dsn, dsn):
467                 cnx.close()
468                 self._connections.pop(i)
469
470
471 class Connection(object):
472     """ A lightweight instance of a connection to postgres
473     """
474
475     def __init__(self, pool, dbname):
476         self.dbname = dbname
477         self._pool = pool
478
479     def cursor(self, serialized=True):
480         cursor_type = serialized and 'serialized ' or ''
481         _logger.debug('create %scursor to %r', cursor_type, self.dbname)
482         return Cursor(self._pool, self.dbname, serialized=serialized)
483
484     # serialized_cursor is deprecated - cursors are serialized by default
485     serialized_cursor = cursor
486
487     def __nonzero__(self):
488         """Check if connection is possible"""
489         try:
490             _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
491             cr = self.cursor()
492             cr.close()
493             return True
494         except Exception:
495             return False
496
497 def dsn(db_name):
498     _dsn = ''
499     for p in ('host', 'port', 'user', 'password'):
500         cfg = tools.config['db_' + p]
501         if cfg:
502             _dsn += '%s=%s ' % (p, cfg)
503
504     return '%sdbname=%s' % (_dsn, db_name)
505
506 def dsn_are_equals(first, second):
507     def key(dsn):
508         k = dict(x.split('=', 1) for x in dsn.strip().split())
509         k.pop('password', None) # password is not relevant
510         return k
511     return key(first) == key(second)
512
513
514 _Pool = None
515
516 def db_connect(db_name):
517     global _Pool
518     if _Pool is None:
519         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
520     currentThread().dbname = db_name
521     return Connection(_Pool, db_name)
522
523 def close_db(db_name):
524     global _Pool
525     """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
526     if _Pool:
527         _Pool.close_all(dsn(db_name))
528     ct = currentThread()
529     if hasattr(ct, 'dbname'):
530         delattr(ct, 'dbname')
531
532
533 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
534