[IMP] improved function to open attachments.
[odoo/odoo.git] / openerp / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2011 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23
24 """
25 The PostgreSQL connector is a connectivity layer between the OpenERP code and
26 the database, *not* a database abstraction toolkit. Database abstraction is what
27 the ORM does, in fact.
28
29 See also: the `pooler` module
30 """
31
32
33 __all__ = ['db_connect', 'close_db']
34
35 from functools import wraps
36 import logging
37 import psycopg2.extensions
38 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ
39 from psycopg2.pool import PoolError
40 from psycopg2.psycopg1 import cursor as psycopg1cursor
41 from threading import currentThread
42
43 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
44
45 _logger = logging.getLogger(__name__)
46
47 types_mapping = {
48     'date': (1082,),
49     'time': (1083,),
50     'datetime': (1114,),
51 }
52
53 def unbuffer(symb, cr):
54     if symb is None: return None
55     return str(symb)
56
57 def undecimalize(symb, cr):
58     if symb is None: return None
59     return float(symb)
60
61 for name, typeoid in types_mapping.items():
62     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
63 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
64
65
66 import tools
67 from tools.func import frame_codeinfo
68 from datetime import datetime as mdt
69 from datetime import timedelta
70 import threading
71 from inspect import currentframe
72
73 import re
74 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$')
75 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$')
76
77 sql_counter = 0
78
79 class Cursor(object):
80     """Represents an open transaction to the PostgreSQL DB backend,
81        acting as a lightweight wrapper around psycopg2's
82        ``psycopg1cursor`` objects.
83
84         ``Cursor`` is the object behind the ``cr`` variable used all
85         over the OpenERP code.
86
87         .. rubric:: Transaction Isolation
88
89         One very important property of database transactions is the
90         level of isolation between concurrent transactions.
91         The SQL standard defines four levels of transaction isolation,
92         ranging from the most strict *Serializable* level, to the least
93         strict *Read Uncommitted* level. These levels are defined in
94         terms of the phenomena that must not occur between concurrent
95         transactions, such as *dirty read*, etc.
96         In the context of a generic business data management software
97         such as OpenERP, we need the best guarantees that no data
98         corruption can ever be cause by simply running multiple
99         transactions in parallel. Therefore, the preferred level would
100         be the *serializable* level, which ensures that a set of
101         transactions is guaranteed to produce the same effect as
102         running them one at a time in some order.
103
104         However, most database management systems implement a limited
105         serializable isolation in the form of
106         `snapshot isolation <http://en.wikipedia.org/wiki/Snapshot_isolation>`_,
107         providing most of the same advantages as True Serializability,
108         with a fraction of the performance cost.
109         With PostgreSQL up to version 9.0, this snapshot isolation was
110         the implementation of both the ``REPEATABLE READ`` and
111         ``SERIALIZABLE`` levels of the SQL standard.
112         As of PostgreSQL 9.1, the previous snapshot isolation implementation
113         was kept for ``REPEATABLE READ``, while a new ``SERIALIZABLE``
114         level was introduced, providing some additional heuristics to
115         detect a concurrent update by parallel transactions, and forcing
116         one of them to rollback.
117
118         OpenERP implements its own level of locking protection
119         for transactions that are highly likely to provoke concurrent
120         updates, such as stock reservations or document sequences updates.
121         Therefore we mostly care about the properties of snapshot isolation,
122         but we don't really need additional heuristics to trigger transaction
123         rollbacks, as we are taking care of triggering instant rollbacks
124         ourselves when it matters (and we can save the additional performance
125         hit of these heuristics).
126
127         As a result of the above, we have selected ``REPEATABLE READ`` as
128         the default transaction isolation level for OpenERP cursors, as
129         it will be mapped to the desired ``snapshot isolation`` level for
130         all supported PostgreSQL version (8.3 - 9.x).
131
132         Note: up to psycopg2 v.2.4.2, psycopg2 itself remapped the repeatable
133         read level to serializable before sending it to the database, so it would
134         actually select the new serializable mode on PostgreSQL 9.1. Make
135         sure you use psycopg2 v2.4.2 or newer if you use PostgreSQL 9.1 and
136         the performance hit is a concern for you.
137
138         .. attribute:: cache
139
140             Cache dictionary with a "request" (-ish) lifecycle, only lives as
141             long as the cursor itself does and proactively cleared when the
142             cursor is closed.
143
144             This cache should *only* be used to store repeatable reads as it
145             ignores rollbacks and savepoints, it should not be used to store
146             *any* data which may be modified during the life of the cursor.
147
148     """
149     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
150
151     def check(f):
152         @wraps(f)
153         def wrapper(self, *args, **kwargs):
154             if self.__closed:
155                 msg = 'Unable to use a closed cursor.'
156                 if self.__closer:
157                     msg += ' It was closed at %s, line %s' % self.__closer
158                 raise psycopg2.OperationalError(msg)
159             return f(self, *args, **kwargs)
160         return wrapper
161
162     def __init__(self, pool, dbname, serialized=True):
163         self.sql_from_log = {}
164         self.sql_into_log = {}
165
166         # default log level determined at cursor creation, could be
167         # overridden later for debugging purposes
168         self.sql_log = _logger.isEnabledFor(logging.DEBUG)
169
170         self.sql_log_count = 0
171         self.__closed = True    # avoid the call of close() (by __del__) if an exception
172                                 # is raised by any of the following initialisations
173         self._pool = pool
174         self.dbname = dbname
175
176         # Whether to enable snapshot isolation level for this cursor.
177         # see also the docstring of Cursor.  
178         self._serialized = serialized
179
180         self._cnx = pool.borrow(dsn(dbname))
181         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
182         if self.sql_log:
183             self.__caller = frame_codeinfo(currentframe(),2)
184         else:
185             self.__caller = False
186         self.__closed = False   # real initialisation value
187         self.autocommit(False)
188         self.__closer = False
189
190         self._default_log_exceptions = True
191
192         self.cache = {}
193
194     def __del__(self):
195         if not self.__closed and not self._cnx.closed:
196             # Oops. 'self' has not been closed explicitly.
197             # The cursor will be deleted by the garbage collector,
198             # but the database connection is not put back into the connection
199             # pool, preventing some operation on the database like dropping it.
200             # This can also lead to a server overload.
201             msg = "Cursor not closed explicitly\n"
202             if self.__caller:
203                 msg += "Cursor was created at %s:%s" % self.__caller
204             else:
205                 msg += "Please enable sql debugging to trace the caller."
206             _logger.warning(msg)
207             self._close(True)
208
209     @check
210     def execute(self, query, params=None, log_exceptions=None):
211         if '%d' in query or '%f' in query:
212             _logger.warning(query)
213             _logger.warning("SQL queries cannot contain %d or %f anymore. "
214                          "Use only %s")
215         if params and not isinstance(params, (tuple, list, dict)):
216             _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
217             raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
218
219         if self.sql_log:
220             now = mdt.now()
221
222         try:
223             params = params or None
224             res = self._obj.execute(query, params)
225         except psycopg2.ProgrammingError, pe:
226             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
227                 _logger.error("Programming error: %s, in query %s", pe, query)
228             raise
229         except Exception:
230             if self._default_log_exceptions if log_exceptions is None else log_exceptions:
231                 _logger.exception("bad query: %s", self._obj.query or query)
232             raise
233
234         if self.sql_log:
235             delay = mdt.now() - now
236             delay = delay.seconds * 1E6 + delay.microseconds
237
238             _logger.debug("query: %s", self._obj.query)
239             self.sql_log_count+=1
240             res_from = re_from.match(query.lower())
241             if res_from:
242                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
243                 self.sql_from_log[res_from.group(1)][0] += 1
244                 self.sql_from_log[res_from.group(1)][1] += delay
245             res_into = re_into.match(query.lower())
246             if res_into:
247                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
248                 self.sql_into_log[res_into.group(1)][0] += 1
249                 self.sql_into_log[res_into.group(1)][1] += delay
250         return res
251
252
253     def split_for_in_conditions(self, ids):
254         """Split a list of identifiers into one or more smaller tuples
255            safe for IN conditions, after uniquifying them."""
256         return tools.misc.split_every(self.IN_MAX, set(ids))
257
258     def print_log(self):
259         global sql_counter
260         sql_counter += self.sql_log_count
261         if not self.sql_log:
262             return
263         def process(type):
264             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
265             sum = 0
266             if sqllogs[type]:
267                 sqllogitems = sqllogs[type].items()
268                 sqllogitems.sort(key=lambda k: k[1][1])
269                 _logger.debug("SQL LOG %s:", type)
270                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
271                 for r in sqllogitems:
272                     delay = timedelta(microseconds=r[1][1])
273                     _logger.debug("table: %s: %s/%s",
274                                         r[0], delay, r[1][0])
275                     sum+= r[1][1]
276                 sqllogs[type].clear()
277             sum = timedelta(microseconds=sum)
278             _logger.debug("SUM %s:%s/%d [%d]",
279                                 type, sum, self.sql_log_count, sql_counter)
280             sqllogs[type].clear()
281         process('from')
282         process('into')
283         self.sql_log_count = 0
284         self.sql_log = False
285
286     @check
287     def close(self):
288         return self._close(False)
289
290     def _close(self, leak=False):
291         if not self._obj:
292             return
293
294         del self.cache
295
296         if self.sql_log:
297             self.__closer = frame_codeinfo(currentframe(),3)
298         self.print_log()
299
300         self._obj.close()
301
302         # This force the cursor to be freed, and thus, available again. It is
303         # important because otherwise we can overload the server very easily
304         # because of a cursor shortage (because cursors are not garbage
305         # collected as fast as they should). The problem is probably due in
306         # part because browse records keep a reference to the cursor.
307         del self._obj
308         self.__closed = True
309
310         # Clean the underlying connection.
311         self._cnx.rollback()
312
313         if leak:
314             self._cnx.leaked = True
315         else:
316             chosen_template = tools.config['db_template']
317             templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
318             keep_in_pool = self.dbname not in templates_list
319             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
320
321     @check
322     def autocommit(self, on):
323         if on:
324             isolation_level = ISOLATION_LEVEL_AUTOCOMMIT
325         else:
326             # If a serializable cursor was requested, we
327             # use the appropriate PotsgreSQL isolation level
328             # that maps to snaphsot isolation.
329             # For all supported PostgreSQL versions (8.3-9.x),
330             # this is currently the ISOLATION_REPEATABLE_READ.
331             # See also the docstring of this class.
332             # NOTE: up to psycopg 2.4.2, repeatable read
333             #       is remapped to serializable before being
334             #       sent to the database, so it is in fact
335             #       unavailable for use with pg 9.1.
336             isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \
337                                   if self._serialized \
338                                   else ISOLATION_LEVEL_READ_COMMITTED
339         self._cnx.set_isolation_level(isolation_level)
340
341     @check
342     def commit(self):
343         """ Perform an SQL `COMMIT`
344         """
345         return self._cnx.commit()
346
347     @check
348     def rollback(self):
349         """ Perform an SQL `ROLLBACK`
350         """
351         return self._cnx.rollback()
352
353     @check
354     def __getattr__(self, name):
355         return getattr(self._obj, name)
356
357 class PsycoConnection(psycopg2.extensions.connection):
358     pass
359
360 class ConnectionPool(object):
361     """ The pool of connections to database(s)
362     
363         Keep a set of connections to pg databases open, and reuse them
364         to open cursors for all transactions.
365         
366         The connections are *not* automatically closed. Only a close_db()
367         can trigger that.
368     """
369
370     def locked(fun):
371         @wraps(fun)
372         def _locked(self, *args, **kwargs):
373             self._lock.acquire()
374             try:
375                 return fun(self, *args, **kwargs)
376             finally:
377                 self._lock.release()
378         return _locked
379
380
381     def __init__(self, maxconn=64):
382         self._connections = []
383         self._maxconn = max(maxconn, 1)
384         self._lock = threading.Lock()
385
386     def __repr__(self):
387         used = len([1 for c, u in self._connections[:] if u])
388         count = len(self._connections)
389         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
390
391     def _debug(self, msg, *args):
392         _logger.debug(('%r ' + msg), self, *args)
393
394     @locked
395     def borrow(self, dsn):
396         self._debug('Borrow connection to %r', dsn)
397
398         # free leaked connections
399         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
400             if cnx.closed:
401                 self._connections.pop(i)
402                 self._debug('Removing closed connection at index %d: %r', i, cnx.dsn)
403                 continue
404             if getattr(cnx, 'leaked', False):
405                 delattr(cnx, 'leaked')
406                 self._connections.pop(i)
407                 self._connections.append((cnx, False))
408                 _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)
409
410         for i, (cnx, used) in enumerate(self._connections):
411             if not used and dsn_are_equals(cnx.dsn, dsn):
412                 self._connections.pop(i)
413                 self._connections.append((cnx, True))
414                 self._debug('Existing connection found at index %d', i)
415
416                 return cnx
417
418         if len(self._connections) >= self._maxconn:
419             # try to remove the oldest connection not used
420             for i, (cnx, used) in enumerate(self._connections):
421                 if not used:
422                     self._connections.pop(i)
423                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
424                     break
425             else:
426                 # note: this code is called only if the for loop has completed (no break)
427                 raise PoolError('The Connection Pool Is Full')
428
429         try:
430             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
431         except psycopg2.Error:
432             _logger.exception('Connection to the database failed')
433             raise
434         self._connections.append((result, True))
435         self._debug('Create new connection')
436         return result
437
438     @locked
439     def give_back(self, connection, keep_in_pool=True):
440         self._debug('Give back connection to %r', connection.dsn)
441         for i, (cnx, used) in enumerate(self._connections):
442             if cnx is connection:
443                 self._connections.pop(i)
444                 if keep_in_pool:
445                     self._connections.append((cnx, False))
446                     self._debug('Put connection to %r in pool', cnx.dsn)
447                 else:
448                     self._debug('Forgot connection to %r', cnx.dsn)
449                     cnx.close()
450                 break
451         else:
452             raise PoolError('This connection does not below to the pool')
453
454     @locked
455     def close_all(self, dsn):
456         _logger.info('%r: Close all connections to %r', self, dsn)
457         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
458             if dsn_are_equals(cnx.dsn, dsn):
459                 cnx.close()
460                 self._connections.pop(i)
461
462
463 class Connection(object):
464     """ A lightweight instance of a connection to postgres
465     """
466
467     def __init__(self, pool, dbname):
468         self.dbname = dbname
469         self._pool = pool
470
471     def cursor(self, serialized=True):
472         cursor_type = serialized and 'serialized ' or ''
473         _logger.debug('create %scursor to %r', cursor_type, self.dbname)
474         return Cursor(self._pool, self.dbname, serialized=serialized)
475
476     # serialized_cursor is deprecated - cursors are serialized by default
477     serialized_cursor = cursor
478
479     def __nonzero__(self):
480         """Check if connection is possible"""
481         try:
482             _logger.warning("__nonzero__() is deprecated. (It is too expensive to test a connection.)")
483             cr = self.cursor()
484             cr.close()
485             return True
486         except Exception:
487             return False
488
489 def dsn(db_name):
490     _dsn = ''
491     for p in ('host', 'port', 'user', 'password'):
492         cfg = tools.config['db_' + p]
493         if cfg:
494             _dsn += '%s=%s ' % (p, cfg)
495
496     return '%sdbname=%s' % (_dsn, db_name)
497
498 def dsn_are_equals(first, second):
499     def key(dsn):
500         k = dict(x.split('=', 1) for x in dsn.strip().split())
501         k.pop('password', None) # password is not relevant
502         return k
503     return key(first) == key(second)
504
505
506 _Pool = None
507
508 def db_connect(db_name):
509     global _Pool
510     if _Pool is None:
511         _Pool = ConnectionPool(int(tools.config['db_maxconn']))
512     currentThread().dbname = db_name
513     return Connection(_Pool, db_name)
514
515 def close_db(db_name):
516     """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
517     global _Pool
518     if _Pool:
519         _Pool.close_all(dsn(db_name))
520     ct = currentThread()
521     if hasattr(ct, 'dbname'):
522         delattr(ct, 'dbname')
523
524
525 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
526