[FIX] debug by default until we fix css merging
[odoo/odoo.git] / openerp / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2011 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 #.apidoc title: PostgreSQL interface
24
25 """
26 The PostgreSQL connector is a connectivity layer between the OpenERP code and
27 the database, *not* a database abstraction toolkit. Database abstraction is what
28 the ORM does, in fact.
29
30 See also: the `pooler` module
31 """
32
33 #.apidoc add-functions: print_stats
34 #.apidoc add-classes: Cursor Connection ConnectionPool
35
36 __all__ = ['db_connect', 'close_db']
37
38 from threading import currentThread
39 import logging
40 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE,\
41     ISOLATION_LEVEL_REPEATABLE_READ
42 from psycopg2.psycopg1 import cursor as psycopg1cursor
43 from psycopg2.pool import PoolError
44
45 import psycopg2.extensions
46 import warnings
47
48 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
49
50 types_mapping = {
51     'date': (1082,),
52     'time': (1083,),
53     'datetime': (1114,),
54 }
55
56 def unbuffer(symb, cr):
57     if symb is None: return None
58     return str(symb)
59
60 def undecimalize(symb, cr):
61     if symb is None: return None
62     return float(symb)
63
64 for name, typeoid in types_mapping.items():
65     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
66 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
67
68
69 import tools
70 from tools.func import wraps, frame_codeinfo
71 from datetime import datetime as mdt
72 from datetime import timedelta
73 import threading
74 from inspect import currentframe
75
76 import re
77 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
78 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
79
80 sql_counter = 0
81
82 class Cursor(object):
83     """Represents an open transaction to the PostgreSQL DB backend,
84        acting as a lightweight wrapper around psycopg2's
85        ``psycopg1cursor`` objects.
86
87         ``Cursor`` is the object behind the ``cr`` variable used all
88         over the OpenERP code.
89
90         .. rubric:: Transaction Isolation
91
92         One very important property of database transactions is the
93         level of isolation between concurrent transactions.
94         The SQL standard defines four levels of transaction isolation,
95         ranging from the most strict *Serializable* level, to the least
96         strict *Read Uncommitted* level. These levels are defined in
97         terms of the phenomena that must not occur between concurrent
98         transactions, such as *dirty read*, etc.
99         In the context of a generic business data management software
100         such as OpenERP, we need the best guarantees that no data
101         corruption can ever be cause by simply running multiple
102         transactions in parallel. Therefore, the preferred level would
103         be the *serializable* level, which ensures that a set of
104         transactions is guaranteed to produce the same effect as
105         running them one at a time in some order.
106
107         However, most database management systems implement a limited
108         serializable isolation in the form of
109         `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
110         providing most of the same advantages as True Serializability,
111         with a fraction of the performance cost.
112         With PostgreSQL up to version 9.0, this snapshot isolation was
113         the implementation of both the ``REPEATABLE READ`` and
114         ``SERIALIZABLE`` levels of the SQL standard.
115         As of PostgreSQL 9.1, the previous snapshot isolation implementation
116         was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
117         level was introduced, providing some additional heuristics to
118         detect a concurrent update by parallel transactions, and forcing
119         one of them to rollback.
120
121         OpenERP implements its own level of locking protection
122         for transactions that are highly likely to provoke concurrent
123         updates, such as stock reservations or document sequences updates.
124         Therefore we mostly care about the properties of snapshot isolation,
125         but we don't really need additional heuristics to trigger transaction
126         rollbacks, as we are taking care of triggering instant rollbacks
127         ourselves when it matters (and we can save the additional performance
128         hit of these heuristics).
129
130         As a result of the above, we have selected ``REPEATABLE READ`` as
131         the default transaction isolation level for OpenERP cursors, as
132         it will be mapped to the desired ``snapshot isolation`` level for
133         all supported PostgreSQL version (8.3 - 9.x).
134
135         Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
136         read level to serializable before sending it to the database, so it would
137         actually select the new serializable mode on PostgreSQL 9.1. Make
138         sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
139         the performance hit is a concern for you.
140
141     """
142     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
143     __logger = None
144
145     def check(f):
146         @wraps(f)
147         def wrapper(self, *args, **kwargs):
148             if self.__closed:
149                 msg = 'Unable to use a closed cursor.'
150                 if self.__closer:
151                     msg += ' It was closed at %s, line %s' % self.__closer
152                 raise psycopg2.OperationalError(msg)
153             return f(self, *args, **kwargs)
154         return wrapper
155
156     def __init__(self, pool, dbname, serialized=True):
157         if self.__class__.__logger is None:
158             self.__class__.__logger = logging.getLogger('db.cursor')
159         self.sql_from_log = {}
160         self.sql_into_log = {}
161
162         # default log level determined at cursor creation, could be
163         # overridden later for debugging purposes
164         self.sql_log = self.__logger.isEnabledFor(logging.DEBUG_SQL)
165
166         self.sql_log_count = 0
167         self.__closed = True    # avoid the call of close() (by __del__) if an exception
168                                 # is raised by any of the following initialisations
169         self._pool = pool
170         self.dbname = dbname
171
172         # Whether to enable snapshot isolation level for this cursor.
173         # see also the docstring of Cursor.  
174         self._serialized = serialized
175
176         self._cnx = pool.borrow(dsn(dbname))
177         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
178         self.__closed = False   # real initialisation value
179         self.autocommit(False)
180         if self.sql_log:
181             self.__caller = frame_codeinfo(currentframe(),2)
182         else:
183             self.__caller = False
184         self.__closer = False
185
186     def __del__(self):
187         if not self.__closed:
188             # Oops. 'self' has not been closed explicitly.
189             # The cursor will be deleted by the garbage collector,
190             # but the database connection is not put back into the connection
191             # pool, preventing some operation on the database like dropping it.
192             # This can also lead to a server overload.
193             msg = "Cursor not closed explicitly\n"
194             if self.__caller:
195                 msg += "Cursor was created at %s:%s" % self.__caller
196             else:
197                 msg += "Please enable sql debugging to trace the caller."
198             self.__logger.warn(msg)
199             self._close(True)
200
201     @check
202     def execute(self, query, params=None, log_exceptions=True):
203         if '%d' in query or '%f' in query:
204             self.__logger.warn(query)
205             self.__logger.warn("SQL queries cannot contain %d or %f anymore. "
206                                "Use only %s")
207
208         if self.sql_log:
209             now = mdt.now()
210
211         try:
212             params = params or None
213             res = self._obj.execute(query, params)
214         except psycopg2.ProgrammingError, pe:
215             if log_exceptions:
216                 self.__logger.error("Programming error: %s, in query %s", pe, query)
217             raise
218         except Exception:
219             if log_exceptions:
220                 self.__logger.exception("bad query: %s", self._obj.query or query)
221             raise
222
223         if self.sql_log:
224             delay = mdt.now() - now
225             delay = delay.seconds * 1E6 + delay.microseconds
226
227             self.__logger.log(logging.DEBUG_SQL, "query: %s", self._obj.query)
228             self.sql_log_count+=1
229             res_from = re_from.match(query.lower())
230             if res_from:
231                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
232                 self.sql_from_log[res_from.group(1)][0] += 1
233                 self.sql_from_log[res_from.group(1)][1] += delay
234             res_into = re_into.match(query.lower())
235             if res_into:
236                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
237                 self.sql_into_log[res_into.group(1)][0] += 1
238                 self.sql_into_log[res_into.group(1)][1] += delay
239         return res
240
241
242     def split_for_in_conditions(self, ids):
243         """Split a list of identifiers into one or more smaller tuples
244            safe for IN conditions, after uniquifying them."""
245         return tools.misc.split_every(self.IN_MAX, set(ids))
246
247     def print_log(self):
248         global sql_counter
249         sql_counter += self.sql_log_count
250         if not self.sql_log:
251             return
252         def process(type):
253             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
254             sum = 0
255             if sqllogs[type]:
256                 sqllogitems = sqllogs[type].items()
257                 sqllogitems.sort(key=lambda k: k[1][1])
258                 self.__logger.log(logging.DEBUG_SQL, "SQL LOG %s:", type)
259                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
260                 for r in sqllogitems:
261                     delay = timedelta(microseconds=r[1][1])
262                     self.__logger.log(logging.DEBUG_SQL, "table: %s: %s/%s",
263                                         r[0], delay, r[1][0])
264                     sum+= r[1][1]
265                 sqllogs[type].clear()
266             sum = timedelta(microseconds=sum)
267             self.__logger.log(logging.DEBUG_SQL, "SUM %s:%s/%d [%d]",
268                                 type, sum, self.sql_log_count, sql_counter)
269             sqllogs[type].clear()
270         process('from')
271         process('into')
272         self.sql_log_count = 0
273         self.sql_log = False
274
275     @check
276     def close(self):
277         return self._close(False)
278
279     def _close(self, leak=False):
280         if not self._obj:
281             return
282
283         if self.sql_log:
284             self.__closer = frame_codeinfo(currentframe(),3)
285         self.print_log()
286
287         self._obj.close()
288
289         # This force the cursor to be freed, and thus, available again. It is
290         # important because otherwise we can overload the server very easily
291         # because of a cursor shortage (because cursors are not garbage
292         # collected as fast as they should). The problem is probably due in
293         # part because browse records keep a reference to the cursor.
294         del self._obj
295         self.__closed = True
296
297         if leak:
298             self._cnx.leaked = True
299         else:
300             keep_in_pool = self.dbname not in ('template1', 'template0', 'postgres')
301             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
302
303     @check
304     def autocommit(self, on):
305         if on:
306             isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
307         else:
308             # If a serializable cursor was requested, we
309             # use the appropriate PotsgreSQL isolation level
310             # that maps to snaphsot isolation.
311             # For all supported PostgreSQL versions (8.3-9.x),
312             # this is currently the ISOLATION_REPEATABLE_READ.
313             # See also the docstring of this class.
314             # NOTE: up to psycopg 2.4.2, repeatable read
315             #       is remapped to serializable before being
316             #       sent to the database, so it is in fact
317             #       unavailable for use with pg 9.1.
318             isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
319                                   if self._serialized \
320                                   else ISOLATION_LEVEL_READ_COMMITTED
321         self._cnx.set_isolation_level(isolation_level)
322
323     @check
324     def commit(self):
325         """ Perform an SQL `COMMIT`
326         """
327         return self._cnx.commit()
328
329     @check
330     def rollback(self):
331         """ Perform an SQL `ROLLBACK`
332         """
333         return self._cnx.rollback()
334
335     @check
336     def __getattr__(self, name):
337         return getattr(self._obj, name)
338
339         """ Set the mode of postgres operations for all cursors
340         """
341         """Obtain the mode of postgres operations for all cursors
342         """
343
344 class PsycoConnection(psycopg2.extensions.connection):
345     pass
346
347 class ConnectionPool(object):
348     """ The pool of connections to database(s)
349     
350         Keep a set of connections to pg databases open, and reuse them
351         to open cursors for all transactions.
352         
353         The connections are *not* automatically closed. Only a close_db()
354         can trigger that.
355     """
356     __logger = logging.getLogger('db.connection_pool')
357
358     def locked(fun):
359         @wraps(fun)
360         def _locked(self, *args, **kwargs):
361             self._lock.acquire()
362             try:
363                 return fun(self, *args, **kwargs)
364             finally:
365                 self._lock.release()
366         return _locked
367
368
369     def __init__(self, maxconn=64):
370         self._connections = []
371         self._maxconn = max(maxconn, 1)
372         self._lock = threading.Lock()
373
374     def __repr__(self):
375         used = len([1 for c, u in self._connections[:] if u])
376         count = len(self._connections)
377         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
378
379     def _debug(self, msg, *args):
380         self.__logger.log(logging.DEBUG_SQL, ('%r ' + msg), self, *args)
381
382     @locked
383     def borrow(self, dsn):
384         self._debug('Borrow connection to %r', dsn)
385
386         # free leaked connections
387         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
388             if getattr(cnx, 'leaked', False):
389                 delattr(cnx, 'leaked')
390                 self._connections.pop(i)
391                 self._connections.append((cnx, False))
392                 self.__logger.warn('%r: Free leaked connection to %r', self, cnx.dsn)
393
394         for i, (cnx, used) in enumerate(self._connections):
395             if not used and dsn_are_equals(cnx.dsn, dsn):
396                 self._connections.pop(i)
397                 self._connections.append((cnx, True))
398                 self._debug('Existing connection found at index %d', i)
399
400                 return cnx
401
402         if len(self._connections) >= self._maxconn:
403             # try to remove the oldest connection not used
404             for i, (cnx, used) in enumerate(self._connections):
405                 if not used:
406                     self._connections.pop(i)
407                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
408                     break
409             else:
410                 # note: this code is called only if the for loop has completed (no break)
411                 raise PoolError('The Connection Pool Is Full')
412
413         try:
414             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
415         except psycopg2.Error, e:
416             self.__logger.exception('Connection to the database failed')
417             raise
418         self._connections.append((result, True))
419         self._debug('Create new connection')
420         return result
421
422     @locked
423     def give_back(self, connection, keep_in_pool=True):
424         self._debug('Give back connection to %r', connection.dsn)
425         for i, (cnx, used) in enumerate(self._connections):
426             if cnx is connection:
427                 self._connections.pop(i)
428                 if keep_in_pool:
429                     self._connections.append((cnx, False))
430                     self._debug('Put connection to %r in pool', cnx.dsn)
431                 else:
432                     self._debug('Forgot connection to %r', cnx.dsn)
433                 break
434         else:
435             raise PoolError('This connection does not below to the pool')
436
437     @locked
438     def close_all(self, dsn):
439         self.__logger.info('%r: Close all connections to %r', self, dsn)
440         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
441             if dsn_are_equals(cnx.dsn, dsn):
442                 cnx.close()
443                 self._connections.pop(i)
444
445
446 class Connection(object):
447     """ A lightweight instance of a connection to postgres
448     """
449     __logger = logging.getLogger('db.connection')
450
451     def __init__(self, pool, dbname):
452         self.dbname = dbname
453         self._pool = pool
454
455     def cursor(self, serialized=True):
456         cursor_type = serialized and 'serialized ' or ''
457         self.__logger.log(logging.DEBUG_SQL, 'create %scursor to %r', cursor_type, self.dbname)
458         return Cursor(self._pool, self.dbname, serialized=serialized)
459
460     # serialized_cursor is deprecated - cursors are serialized by default
461     serialized_cursor = cursor
462
463     def __nonzero__(self):
464         """Check if connection is possible"""
465         try:
466             warnings.warn("You use an expensive function to test a connection.",
467                       DeprecationWarning, stacklevel=1)
468             cr = self.cursor()
469             cr.close()
470             return True
471         except Exception:
472             return False
473
474 def dsn(db_name):
475     _dsn = ''
476     for p in ('host', 'port', 'user', 'password'):
477         cfg = tools.config['db_' + p]
478         if cfg:
479             _dsn += '%s=%s ' % (p, cfg)
480
481     return '%sdbname=%s' % (_dsn, db_name)
482
483 def dsn_are_equals(first, second):
484     def key(dsn):
485         k = dict(x.split('=', 1) for x in dsn.strip().split())
486         k.pop('password', None) # password is not relevant
487         return k
488     return key(first) == key(second)
489
490
491 _Pool = None
492
493 def db_connect(db_name):
494     global _Pool
495     if _Pool is None:
496         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
497     currentThread().dbname = db_name
498     return Connection(_Pool, db_name)
499
500 def close_db(db_name):
501     """ You might want to call openerp.netsvc.Agent.cancel(db_name) along this function."""
502     _Pool.close_all(dsn(db_name))
503     ct = currentThread()
504     if hasattr(ct, 'dbname'):
505         delattr(ct, 'dbname')
506
507
508 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
509