[IMP] float_utils: fix HALF_UP rounding according to discussions on bug 882036
[odoo/odoo.git] / openerp / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2011 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 #.apidoc title: PostgreSQL interface
24
25 """
26 The PostgreSQL connector is a connectivity layer between the OpenERP code and
27 the database, *not* a database abstraction toolkit. Database abstraction is what
28 the ORM does, in fact.
29
30 See also: the `pooler` module
31 """
32
33 #.apidoc add-functions: print_stats
34 #.apidoc add-classes: Cursor Connection ConnectionPool
35
36 __all__ = ['db_connect', 'close_db']
37
38 from threading import currentThread
39 import logging
40 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE,\
41     ISOLATION_LEVEL_REPEATABLE_READ
42 from psycopg2.psycopg1 import cursor as psycopg1cursor
43 from psycopg2.pool import PoolError
44
45 import psycopg2.extensions
46 import warnings
47
48 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
49
50 types_mapping = {
51     'date': (1082,),
52     'time': (1083,),
53     'datetime': (1114,),
54 }
55
56 def unbuffer(symb, cr):
57     if symb is None: return None
58     return str(symb)
59
60 def undecimalize(symb, cr):
61     if symb is None: return None
62     return float(symb)
63
64 for name, typeoid in types_mapping.items():
65     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
66 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
67
68
69 import tools
70 from tools.func import wraps, frame_codeinfo
71 from datetime import datetime as mdt
72 from datetime import timedelta
73 import threading
74 from inspect import currentframe
75
76 import re
77 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
78 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
79
80 sql_counter = 0
81
82 class Cursor(object):
83     """Represents an open transaction to the PostgreSQL DB backend,
84        acting as a lightweight wrapper around psycopg2's
85        ``psycopg1cursor`` objects.
86
87         ``Cursor`` is the object behind the ``cr`` variable used all
88         over the OpenERP code.
89
90         .. rubric:: Transaction Isolation
91
92         One very important property of database transactions is the
93         level of isolation between concurrent transactions.
94         The SQL standard defines four levels of transaction isolation,
95         ranging from the most strict *Serializable* level, to the least
96         strict *Read Uncommitted* level. These levels are defined in
97         terms of the phenomena that must not occur between concurrent
98         transactions, such as *dirty read*, etc.
99         In the context of a generic business data management software
100         such as OpenERP, we need the best guarantees that no data
101         corruption can ever be cause by simply running multiple
102         transactions in parallel. Therefore, the preferred level would
103         be the *serializable* level, which ensures that a set of
104         transactions is guaranteed to produce the same effect as
105         running them one at a time in some order.
106
107         However, most database management systems implement a limited
108         serializable isolation in the form of
109         `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
110         providing most of the same advantages as True Serializability,
111         with a fraction of the performance cost.
112         With PostgreSQL up to version 9.0, this snapshot isolation was
113         the implementation of both the ``REPEATABLE READ`` and
114         ``SERIALIZABLE`` levels of the SQL standard.
115         As of PostgreSQL 9.1, the previous snapshot isolation implementation
116         was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
117         level was introduced, providing some additional heuristics to
118         detect a concurrent update by parallel transactions, and forcing
119         one of them to rollback.
120
121         OpenERP implements its own level of locking protection
122         for transactions that are highly likely to provoke concurrent
123         updates, such as stock reservations or document sequences updates.
124         Therefore we mostly care about the properties of snapshot isolation,
125         but we don't really need additional heuristics to trigger transaction
126         rollbacks, as we are taking care of triggering instant rollbacks
127         ourselves when it matters (and we can save the additional performance
128         hit of these heuristics).
129
130         As a result of the above, we have selected ``REPEATABLE READ`` as
131         the default transaction isolation level for OpenERP cursors, as
132         it will be mapped to the desired ``snapshot isolation`` level for
133         all supported PostgreSQL version (8.3 - 9.x).
134
135         Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
136         read level to serializable before sending it to the database, so it would
137         actually select the new serializable mode on PostgreSQL 9.1. Make
138         sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
139         the performance hit is a concern for you.
140
141     """
142     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
143     __logger = None
144
145     def check(f):
146         @wraps(f)
147         def wrapper(self, *args, **kwargs):
148             if self.__closed:
149                 msg = 'Unable to use a closed cursor.'
150                 if self.__closer:
151                     msg += ' It was closed at %s, line %s' % self.__closer
152                 raise psycopg2.OperationalError(msg)
153             return f(self, *args, **kwargs)
154         return wrapper
155
156     def __init__(self, pool, dbname, serialized=True):
157         if self.__class__.__logger is None:
158             self.__class__.__logger = logging.getLogger('db.cursor')
159         self.sql_from_log = {}
160         self.sql_into_log = {}
161
162         # default log level determined at cursor creation, could be
163         # overridden later for debugging purposes
164         self.sql_log = self.__logger.isEnabledFor(logging.DEBUG_SQL)
165
166         self.sql_log_count = 0
167         self.__closed = True    # avoid the call of close() (by __del__) if an exception
168                                 # is raised by any of the following initialisations
169         self._pool = pool
170         self.dbname = dbname
171
172         # Whether to enable snapshot isolation level for this cursor.
173         # see also the docstring of Cursor.  
174         self._serialized = serialized
175
176         self._cnx = pool.borrow(dsn(dbname))
177         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
178         self.__closed = False   # real initialisation value
179         self.autocommit(False)
180         if self.sql_log:
181             self.__caller = frame_codeinfo(currentframe(),2)
182         else:
183             self.__caller = False
184         self.__closer = False
185
186         self._default_log_exceptions = True
187
188     def __del__(self):
189         if not self.__closed:
190             # Oops. 'self' has not been closed explicitly.
191             # The cursor will be deleted by the garbage collector,
192             # but the database connection is not put back into the connection
193             # pool, preventing some operation on the database like dropping it.
194             # This can also lead to a server overload.
195             msg = "Cursor not closed explicitly\n"
196             if self.__caller:
197                 msg += "Cursor was created at %s:%s" % self.__caller
198             else:
199                 msg += "Please enable sql debugging to trace the caller."
200             self.__logger.warn(msg)
201             self._close(True)
202
203     @check
204     def execute(self, query, params=None, log_exceptions=None):
205         if '%d' in query or '%f' in query:
206             self.__logger.warn(query)
207             self.__logger.warn("SQL queries cannot contain %d or %f anymore. "
208                                "Use only %s")
209
210         if self.sql_log:
211             now = mdt.now()
212
213         try:
214             params = params or None
215             res = self._obj.execute(query, params)
216         except psycopg2.ProgrammingError, pe:
217             if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
218                 self.__logger.error("Programming error: %s, in query %s", pe, query)
219             raise
220         except Exception:
221             if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
222                 self.__logger.exception("bad query: %s", self._obj.query or query)
223             raise
224
225         if self.sql_log:
226             delay = mdt.now() - now
227             delay = delay.seconds * 1E6 + delay.microseconds
228
229             self.__logger.log(logging.DEBUG_SQL, "query: %s", self._obj.query)
230             self.sql_log_count+=1
231             res_from = re_from.match(query.lower())
232             if res_from:
233                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
234                 self.sql_from_log[res_from.group(1)][0] += 1
235                 self.sql_from_log[res_from.group(1)][1] += delay
236             res_into = re_into.match(query.lower())
237             if res_into:
238                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
239                 self.sql_into_log[res_into.group(1)][0] += 1
240                 self.sql_into_log[res_into.group(1)][1] += delay
241         return res
242
243
244     def split_for_in_conditions(self, ids):
245         """Split a list of identifiers into one or more smaller tuples
246            safe for IN conditions, after uniquifying them."""
247         return tools.misc.split_every(self.IN_MAX, set(ids))
248
249     def print_log(self):
250         global sql_counter
251         sql_counter += self.sql_log_count
252         if not self.sql_log:
253             return
254         def process(type):
255             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
256             sum = 0
257             if sqllogs[type]:
258                 sqllogitems = sqllogs[type].items()
259                 sqllogitems.sort(key=lambda k: k[1][1])
260                 self.__logger.log(logging.DEBUG_SQL, "SQL LOG %s:", type)
261                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
262                 for r in sqllogitems:
263                     delay = timedelta(microseconds=r[1][1])
264                     self.__logger.log(logging.DEBUG_SQL, "table: %s: %s/%s",
265                                         r[0], delay, r[1][0])
266                     sum+= r[1][1]
267                 sqllogs[type].clear()
268             sum = timedelta(microseconds=sum)
269             self.__logger.log(logging.DEBUG_SQL, "SUM %s:%s/%d [%d]",
270                                 type, sum, self.sql_log_count, sql_counter)
271             sqllogs[type].clear()
272         process('from')
273         process('into')
274         self.sql_log_count = 0
275         self.sql_log = False
276
277     @check
278     def close(self):
279         return self._close(False)
280
281     def _close(self, leak=False):
282         if not self._obj:
283             return
284
285         if self.sql_log:
286             self.__closer = frame_codeinfo(currentframe(),3)
287         self.print_log()
288
289         self._obj.close()
290
291         # This force the cursor to be freed, and thus, available again. It is
292         # important because otherwise we can overload the server very easily
293         # because of a cursor shortage (because cursors are not garbage
294         # collected as fast as they should). The problem is probably due in
295         # part because browse records keep a reference to the cursor.
296         del self._obj
297         self.__closed = True
298
299         # Clean the underlying connection.
300         self._cnx.rollback()
301
302         if leak:
303             self._cnx.leaked = True
304         else:
305             keep_in_pool = self.dbname not in ('template1', 'template0', 'postgres')
306             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
307
308     @check
309     def autocommit(self, on):
310         if on:
311             isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
312         else:
313             # If a serializable cursor was requested, we
314             # use the appropriate PotsgreSQL isolation level
315             # that maps to snaphsot isolation.
316             # For all supported PostgreSQL versions (8.3-9.x),
317             # this is currently the ISOLATION_REPEATABLE_READ.
318             # See also the docstring of this class.
319             # NOTE: up to psycopg 2.4.2, repeatable read
320             #       is remapped to serializable before being
321             #       sent to the database, so it is in fact
322             #       unavailable for use with pg 9.1.
323             isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
324                                   if self._serialized \
325                                   else ISOLATION_LEVEL_READ_COMMITTED
326         self._cnx.set_isolation_level(isolation_level)
327
328     @check
329     def commit(self):
330         """ Perform an SQL `COMMIT`
331         """
332         return self._cnx.commit()
333
334     @check
335     def rollback(self):
336         """ Perform an SQL `ROLLBACK`
337         """
338         return self._cnx.rollback()
339
340     @check
341     def __getattr__(self, name):
342         return getattr(self._obj, name)
343
344         """ Set the mode of postgres operations for all cursors
345         """
346         """Obtain the mode of postgres operations for all cursors
347         """
348
349 class PsycoConnection(psycopg2.extensions.connection):
350     pass
351
352 class ConnectionPool(object):
353     """ The pool of connections to database(s)
354     
355         Keep a set of connections to pg databases open, and reuse them
356         to open cursors for all transactions.
357         
358         The connections are *not* automatically closed. Only a close_db()
359         can trigger that.
360     """
361     __logger = logging.getLogger('db.connection_pool')
362
363     def locked(fun):
364         @wraps(fun)
365         def _locked(self, *args, **kwargs):
366             self._lock.acquire()
367             try:
368                 return fun(self, *args, **kwargs)
369             finally:
370                 self._lock.release()
371         return _locked
372
373
374     def __init__(self, maxconn=64):
375         self._connections = []
376         self._maxconn = max(maxconn, 1)
377         self._lock = threading.Lock()
378
379     def __repr__(self):
380         used = len([1 for c, u in self._connections[:] if u])
381         count = len(self._connections)
382         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
383
384     def _debug(self, msg, *args):
385         self.__logger.log(logging.DEBUG_SQL, ('%r ' + msg), self, *args)
386
387     @locked
388     def borrow(self, dsn):
389         self._debug('Borrow connection to %r', dsn)
390
391         # free leaked connections
392         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
393             if getattr(cnx, 'leaked', False):
394                 delattr(cnx, 'leaked')
395                 self._connections.pop(i)
396                 self._connections.append((cnx, False))
397                 self.__logger.warn('%r: Free leaked connection to %r', self, cnx.dsn)
398
399         for i, (cnx, used) in enumerate(self._connections):
400             if not used and dsn_are_equals(cnx.dsn, dsn):
401                 self._connections.pop(i)
402                 self._connections.append((cnx, True))
403                 self._debug('Existing connection found at index %d', i)
404
405                 return cnx
406
407         if len(self._connections) >= self._maxconn:
408             # try to remove the oldest connection not used
409             for i, (cnx, used) in enumerate(self._connections):
410                 if not used:
411                     self._connections.pop(i)
412                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
413                     break
414             else:
415                 # note: this code is called only if the for loop has completed (no break)
416                 raise PoolError('The Connection Pool Is Full')
417
418         try:
419             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
420         except psycopg2.Error, e:
421             self.__logger.exception('Connection to the database failed')
422             raise
423         self._connections.append((result, True))
424         self._debug('Create new connection')
425         return result
426
427     @locked
428     def give_back(self, connection, keep_in_pool=True):
429         self._debug('Give back connection to %r', connection.dsn)
430         for i, (cnx, used) in enumerate(self._connections):
431             if cnx is connection:
432                 self._connections.pop(i)
433                 if keep_in_pool:
434                     self._connections.append((cnx, False))
435                     self._debug('Put connection to %r in pool', cnx.dsn)
436                 else:
437                     self._debug('Forgot connection to %r', cnx.dsn)
438                     cnx.close()
439                 break
440         else:
441             raise PoolError('This connection does not below to the pool')
442
443     @locked
444     def close_all(self, dsn):
445         self.__logger.info('%r: Close all connections to %r', self, dsn)
446         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
447             if dsn_are_equals(cnx.dsn, dsn):
448                 cnx.close()
449                 self._connections.pop(i)
450
451
452 class Connection(object):
453     """ A lightweight instance of a connection to postgres
454     """
455     __logger = logging.getLogger('db.connection')
456
457     def __init__(self, pool, dbname):
458         self.dbname = dbname
459         self._pool = pool
460
461     def cursor(self, serialized=True):
462         cursor_type = serialized and 'serialized ' or ''
463         self.__logger.log(logging.DEBUG_SQL, 'create %scursor to %r', cursor_type, self.dbname)
464         return Cursor(self._pool, self.dbname, serialized=serialized)
465
466     # serialized_cursor is deprecated - cursors are serialized by default
467     serialized_cursor = cursor
468
469     def __nonzero__(self):
470         """Check if connection is possible"""
471         try:
472             warnings.warn("You use an expensive function to test a connection.",
473                       DeprecationWarning, stacklevel=1)
474             cr = self.cursor()
475             cr.close()
476             return True
477         except Exception:
478             return False
479
480 def dsn(db_name):
481     _dsn = ''
482     for p in ('host', 'port', 'user', 'password'):
483         cfg = tools.config['db_' + p]
484         if cfg:
485             _dsn += '%s=%s ' % (p, cfg)
486
487     return '%sdbname=%s' % (_dsn, db_name)
488
489 def dsn_are_equals(first, second):
490     def key(dsn):
491         k = dict(x.split('=', 1) for x in dsn.strip().split())
492         k.pop('password', None) # password is not relevant
493         return k
494     return key(first) == key(second)
495
496
497 _Pool = None
498
499 def db_connect(db_name):
500     global _Pool
501     if _Pool is None:
502         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
503     currentThread().dbname = db_name
504     return Connection(_Pool, db_name)
505
506 def close_db(db_name):
507     """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
508     _Pool.close_all(dsn(db_name))
509     ct = currentThread()
510     if hasattr(ct, 'dbname'):
511         delattr(ct, 'dbname')
512
513
514 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
515