[IMP] Make wizard work
[odoo/odoo.git] / openerp / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2011 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 #.apidoc title: PostgreSQL interface
24
25 """
26 The PostgreSQL connector is a connectivity layer between the OpenERP code and
27 the database, *not* a database abstraction toolkit. Database abstraction is what
28 the ORM does, in fact.
29
30 See also: the `pooler` module
31 """
32
33 #.apidoc add-functions: print_stats
34 #.apidoc add-classes: Cursor Connection ConnectionPool
35
36 __all__ = ['db_connect', 'close_db']
37
38 from functools import wraps
39 import logging
40 import psycopg2.extensions
41 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
42 from psycopg2.pool import PoolError
43 from psycopg2.psycopg1 import cursor as psycopg1cursor
44 from threading import currentThread
45
46 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
47
48 _logger = logging.getLogger(__name__)
49
50 types_mapping = {
51     'date': (1082,),
52     'time': (1083,),
53     'datetime': (1114,),
54 }
55
56 def unbuffer(symb, cr):
57     if symb is None: return None
58     return str(symb)
59
60 def undecimalize(symb, cr):
61     if symb is None: return None
62     return float(symb)
63
64 for name, typeoid in types_mapping.items():
65     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
66 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
67
68
69 import tools
70 from tools.func import frame_codeinfo
71 from datetime import datetime as mdt
72 from datetime import timedelta
73 import threading
74 from inspect import currentframe
75
76 import re
77 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
78 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
79
80 sql_counter = 0
81
82 class Cursor(object):
83     """Represents an open transaction to the PostgreSQL DB backend,
84        acting as a lightweight wrapper around psycopg2's
85        ``psycopg1cursor`` objects.
86
87         ``Cursor`` is the object behind the ``cr`` variable used all
88         over the OpenERP code.
89
90         .. rubric:: Transaction Isolation
91
92         One very important property of database transactions is the
93         level of isolation between concurrent transactions.
94         The SQL standard defines four levels of transaction isolation,
95         ranging from the most strict *Serializable* level, to the least
96         strict *Read Uncommitted* level. These levels are defined in
97         terms of the phenomena that must not occur between concurrent
98         transactions, such as *dirty read*, etc.
99         In the context of a generic business data management software
100         such as OpenERP, we need the best guarantees that no data
101         corruption can ever be cause by simply running multiple
102         transactions in parallel. Therefore, the preferred level would
103         be the *serializable* level, which ensures that a set of
104         transactions is guaranteed to produce the same effect as
105         running them one at a time in some order.
106
107         However, most database management systems implement a limited
108         serializable isolation in the form of
109         `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
110         providing most of the same advantages as True Serializability,
111         with a fraction of the performance cost.
112         With PostgreSQL up to version 9.0, this snapshot isolation was
113         the implementation of both the ``REPEATABLE READ`` and
114         ``SERIALIZABLE`` levels of the SQL standard.
115         As of PostgreSQL 9.1, the previous snapshot isolation implementation
116         was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
117         level was introduced, providing some additional heuristics to
118         detect a concurrent update by parallel transactions, and forcing
119         one of them to rollback.
120
121         OpenERP implements its own level of locking protection
122         for transactions that are highly likely to provoke concurrent
123         updates, such as stock reservations or document sequences updates.
124         Therefore we mostly care about the properties of snapshot isolation,
125         but we don't really need additional heuristics to trigger transaction
126         rollbacks, as we are taking care of triggering instant rollbacks
127         ourselves when it matters (and we can save the additional performance
128         hit of these heuristics).
129
130         As a result of the above, we have selected ``REPEATABLE READ`` as
131         the default transaction isolation level for OpenERP cursors, as
132         it will be mapped to the desired ``snapshot isolation`` level for
133         all supported PostgreSQL version (8.3 - 9.x).
134
135         Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
136         read level to serializable before sending it to the database, so it would
137         actually select the new serializable mode on PostgreSQL 9.1. Make
138         sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
139         the performance hit is a concern for you.
140
141         .. attribute:: cache
142
143             Cache dictionary with a "request" (-ish) lifecycle, only lives as
144             long as the cursor itself does and proactively cleared when the
145             cursor is closed.
146
147             This cache should *only* be used to store repeatable reads as it
148             ignores rollbacks and savepoints, it should not be used to store
149             *any* data which may be modified during the life of the cursor.
150
151     """
152     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
153
154     def check(f):
155         @wraps(f)
156         def wrapper(self, *args, **kwargs):
157             if self.__closed:
158                 msg = 'Unable to use a closed cursor.'
159                 if self.__closer:
160                     msg += ' It was closed at %s, line %s' % self.__closer
161                 raise psycopg2.OperationalError(msg)
162             return f(self, *args, **kwargs)
163         return wrapper
164
165     def __init__(self, pool, dbname, serialized=True):
166         self.sql_from_log = {}
167         self.sql_into_log = {}
168
169         # default log level determined at cursor creation, could be
170         # overridden later for debugging purposes
171         self.sql_log = _logger.isEnabledFor(logging.DEBUG)
172
173         self.sql_log_count = 0
174         self.__closed = True    # avoid the call of close() (by __del__) if an exception
175                                 # is raised by any of the following initialisations
176         self._pool = pool
177         self.dbname = dbname
178
179         # Whether to enable snapshot isolation level for this cursor.
180         # see also the docstring of Cursor.  
181         self._serialized = serialized
182
183         self._cnx = pool.borrow(dsn(dbname))
184         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
185         if self.sql_log:
186             self.__caller = frame_codeinfo(currentframe(),2)
187         else:
188             self.__caller = False
189         self.__closed = False   # real initialisation value
190         self.autocommit(False)
191         self.__closer = False
192
193         self._default_log_exceptions = True
194
195         self.cache = {}
196
197     def __del__(self):
198         if not self.__closed and not self._cnx.closed:
199             # Oops. 'self' has not been closed explicitly.
200             # The cursor will be deleted by the garbage collector,
201             # but the database connection is not put back into the connection
202             # pool, preventing some operation on the database like dropping it.
203             # This can also lead to a server overload.
204             msg = "Cursor not closed explicitly\n"
205             if self.__caller:
206                 msg += "Cursor was created at %s:%s" % self.__caller
207             else:
208                 msg += "Please enable sql debugging to trace the caller."
209             _logger.warning(msg)
210             self._close(True)
211
212     @check
213     def execute(self, query, params=None, log_exceptions=None):
214         if '%d' in query or '%f' in query:
215             _logger.warning(query)
216             _logger.warning("SQL queries cannot contain %d or %f anymore. "
217                          "Use only %s")
218         if params and not isinstance(params, (tuple, list, dict)):
219             _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
220             raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
221
222         if self.sql_log:
223             now = mdt.now()
224
225         try:
226             params = params or None
227             res = self._obj.execute(query, params)
228         except psycopg2.ProgrammingError, pe:
229             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
230                 _logger.error("Programming error: %s, in query %s", pe, query)
231             raise
232         except Exception:
233             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
234                 _logger.exception("bad query: %s", self._obj.query or query)
235             raise
236
237         if self.sql_log:
238             delay = mdt.now() - now
239             delay = delay.seconds * 1E6 + delay.microseconds
240
241             _logger.debug("query: %s", self._obj.query)
242             self.sql_log_count+=1
243             res_from = re_from.match(query.lower())
244             if res_from:
245                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
246                 self.sql_from_log[res_from.group(1)][0] += 1
247                 self.sql_from_log[res_from.group(1)][1] += delay
248             res_into = re_into.match(query.lower())
249             if res_into:
250                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
251                 self.sql_into_log[res_into.group(1)][0] += 1
252                 self.sql_into_log[res_into.group(1)][1] += delay
253         return res
254
255
256     def split_for_in_conditions(self, ids):
257         """Split a list of identifiers into one or more smaller tuples
258            safe for IN conditions, after uniquifying them."""
259         return tools.misc.split_every(self.IN_MAX, set(ids))
260
261     def print_log(self):
262         global sql_counter
263         sql_counter += self.sql_log_count
264         if not self.sql_log:
265             return
266         def process(type):
267             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
268             sum = 0
269             if sqllogs[type]:
270                 sqllogitems = sqllogs[type].items()
271                 sqllogitems.sort(key=lambda k: k[1][1])
272                 _logger.debug("SQL LOG %s:", type)
273                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
274                 for r in sqllogitems:
275                     delay = timedelta(microseconds=r[1][1])
276                     _logger.debug("table: %s: %s/%s",
277                                         r[0], delay, r[1][0])
278                     sum+= r[1][1]
279                 sqllogs[type].clear()
280             sum = timedelta(microseconds=sum)
281             _logger.debug("SUM %s:%s/%d [%d]",
282                                 type, sum, self.sql_log_count, sql_counter)
283             sqllogs[type].clear()
284         process('from')
285         process('into')
286         self.sql_log_count = 0
287         self.sql_log = False
288
289     @check
290     def close(self):
291         return self._close(False)
292
293     def _close(self, leak=False):
294         if not self._obj:
295             return
296
297         del self.cache
298
299         if self.sql_log:
300             self.__closer = frame_codeinfo(currentframe(),3)
301         self.print_log()
302
303         self._obj.close()
304
305         # This force the cursor to be freed, and thus, available again. It is
306         # important because otherwise we can overload the server very easily
307         # because of a cursor shortage (because cursors are not garbage
308         # collected as fast as they should). The problem is probably due in
309         # part because browse records keep a reference to the cursor.
310         del self._obj
311         self.__closed = True
312
313         # Clean the underlying connection.
314         self._cnx.rollback()
315
316         if leak:
317             self._cnx.leaked = True
318         else:
319             chosen_template = tools.config['db_template']
320             templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
321             keep_in_pool = self.dbname not in templates_list
322             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
323
324     @check
325     def autocommit(self, on):
326         if on:
327             isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
328         else:
329             # If a serializable cursor was requested, we
330             # use the appropriate PotsgreSQL isolation level
331             # that maps to snaphsot isolation.
332             # For all supported PostgreSQL versions (8.3-9.x),
333             # this is currently the ISOLATION_REPEATABLE_READ.
334             # See also the docstring of this class.
335             # NOTE: up to psycopg 2.4.2, repeatable read
336             #       is remapped to serializable before being
337             #       sent to the database, so it is in fact
338             #       unavailable for use with pg 9.1.
339             isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
340                                   if self._serialized \
341                                   else ISOLATION_LEVEL_READ_COMMITTED
342         self._cnx.set_isolation_level(isolation_level)
343
344     @check
345     def commit(self):
346         """ Perform an SQL `COMMIT`
347         """
348         return self._cnx.commit()
349
350     @check
351     def rollback(self):
352         """ Perform an SQL `ROLLBACK`
353         """
354         return self._cnx.rollback()
355
356     @check
357     def __getattr__(self, name):
358         return getattr(self._obj, name)
359
360 class PsycoConnection(psycopg2.extensions.connection):
361     pass
362
363 class ConnectionPool(object):
364     """ The pool of connections to database(s)
365     
366         Keep a set of connections to pg databases open, and reuse them
367         to open cursors for all transactions.
368         
369         The connections are *not* automatically closed. Only a close_db()
370         can trigger that.
371     """
372
373     def locked(fun):
374         @wraps(fun)
375         def _locked(self, *args, **kwargs):
376             self._lock.acquire()
377             try:
378                 return fun(self, *args, **kwargs)
379             finally:
380                 self._lock.release()
381         return _locked
382
383
384     def __init__(self, maxconn=64):
385         self._connections = []
386         self._maxconn = max(maxconn, 1)
387         self._lock = threading.Lock()
388
389     def __repr__(self):
390         used = len([1 for c, u in self._connections[:] if u])
391         count = len(self._connections)
392         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
393
394     def _debug(self, msg, *args):
395         _logger.debug(('%r ' + msg), self, *args)
396
397     @locked
398     def borrow(self, dsn):
399         self._debug('Borrow connection to %r', dsn)
400
401         # free leaked connections
402         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
403             if cnx.closed:
404                 self._connections.pop(i)
405                 self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
406                 continue
407             if getattr(cnx, 'leaked', False):
408                 delattr(cnx, 'leaked')
409                 self._connections.pop(i)
410                 self._connections.append((cnx, False))
411                 _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)
412
413         for i, (cnx, used) in enumerate(self._connections):
414             if not used and dsn_are_equals(cnx.dsn, dsn):
415                 self._connections.pop(i)
416                 self._connections.append((cnx, True))
417                 self._debug('Existing connection found at index %d', i)
418
419                 return cnx
420
421         if len(self._connections) >= self._maxconn:
422             # try to remove the oldest connection not used
423             for i, (cnx, used) in enumerate(self._connections):
424                 if not used:
425                     self._connections.pop(i)
426                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
427                     break
428             else:
429                 # note: this code is called only if the for loop has completed (no break)
430                 raise PoolError('The Connection Pool Is Full')
431
432         try:
433             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
434         except psycopg2.Error:
435             _logger.exception('Connection to the database failed')
436             raise
437         self._connections.append((result, True))
438         self._debug('Create new connection')
439         return result
440
441     @locked
442     def give_back(self, connection, keep_in_pool=True):
443         self._debug('Give back connection to %r', connection.dsn)
444         for i, (cnx, used) in enumerate(self._connections):
445             if cnx is connection:
446                 self._connections.pop(i)
447                 if keep_in_pool:
448                     self._connections.append((cnx, False))
449                     self._debug('Put connection to %r in pool', cnx.dsn)
450                 else:
451                     self._debug('Forgot connection to %r', cnx.dsn)
452                     cnx.close()
453                 break
454         else:
455             raise PoolError('This connection does not below to the pool')
456
457     @locked
458     def close_all(self, dsn):
459         _logger.info('%r: Close all connections to %r', self, dsn)
460         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
461             if dsn_are_equals(cnx.dsn, dsn):
462                 cnx.close()
463                 self._connections.pop(i)
464
465
466 class Connection(object):
467     """ A lightweight instance of a connection to postgres
468     """
469
470     def __init__(self, pool, dbname):
471         self.dbname = dbname
472         self._pool = pool
473
474     def cursor(self, serialized=True):
475         cursor_type = serialized and 'serialized ' or ''
476         _logger.debug('create %scursor to %r', cursor_type, self.dbname)
477         return Cursor(self._pool, self.dbname, serialized=serialized)
478
479     # serialized_cursor is deprecated - cursors are serialized by default
480     serialized_cursor = cursor
481
482     def __nonzero__(self):
483         """Check if connection is possible"""
484         try:
485             _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
486             cr = self.cursor()
487             cr.close()
488             return True
489         except Exception:
490             return False
491
492 def dsn(db_name):
493     _dsn = ''
494     for p in ('host', 'port', 'user', 'password'):
495         cfg = tools.config['db_' + p]
496         if cfg:
497             _dsn += '%s=%s ' % (p, cfg)
498
499     return '%sdbname=%s' % (_dsn, db_name)
500
501 def dsn_are_equals(first, second):
502     def key(dsn):
503         k = dict(x.split('=', 1) for x in dsn.strip().split())
504         k.pop('password', None) # password is not relevant
505         return k
506     return key(first) == key(second)
507
508
509 _Pool = None
510
511 def db_connect(db_name):
512     global _Pool
513     if _Pool is None:
514         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
515     currentThread().dbname = db_name
516     return Connection(_Pool, db_name)
517
518 def close_db(db_name):
519     """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
520     global _Pool
521     if _Pool:
522         _Pool.close_all(dsn(db_name))
523     ct = currentThread()
524     if hasattr(ct, 'dbname'):
525         delattr(ct, 'dbname')
526
527
528 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
529