[MERGE] report branch with <pto> support in RML rendering
[odoo/odoo.git] / bin / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU Affero General Public License as
9 #    published by the Free Software Foundation, either version 3 of the
10 #    License, or (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU Affero General Public License for more details.
16 #
17 #    You should have received a copy of the GNU Affero General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 ##############################################################################
21
22 __all__ = ['db_connect', 'close_db']
23
24 from threading import currentThread
25 import logging
26 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE
27 from psycopg2.psycopg1 import cursor as psycopg1cursor
28 from psycopg2.pool import PoolError
29
30 import psycopg2.extensions
31
32 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
33
34 types_mapping = {
35     'date': (1082,),
36     'time': (1083,),
37     'datetime': (1114,),
38 }
39
40 def unbuffer(symb, cr):
41     if symb is None: return None
42     return str(symb)
43
44 def undecimalize(symb, cr):
45     if symb is None: return None
46     return float(symb)
47
48 for name, typeoid in types_mapping.items():
49     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
50 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
51
52
53 import tools
54 from tools.func import wraps
55 from datetime import datetime as mdt
56 from datetime import timedelta
57 import threading
58 from inspect import stack
59
60 import re
61 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
62 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
63
64 sql_counter = 0
65
66 class Cursor(object):
67     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
68     __logger = logging.getLogger('db.cursor')
69
70     def check(f):
71         @wraps(f)
72         def wrapper(self, *args, **kwargs):
73             if self.__closed:
74                 raise psycopg2.ProgrammingError('Unable to use the cursor after having closed it')
75             return f(self, *args, **kwargs)
76         return wrapper
77
78     def __init__(self, pool, dbname, serialized=False):
79         self.sql_from_log = {}
80         self.sql_into_log = {}
81
82         # default log level determined at cursor creation, could be
83         # overridden later for debugging purposes
84         self.sql_log = self.__logger.isEnabledFor(logging.DEBUG_SQL)
85
86         self.sql_log_count = 0
87         self.__closed = True    # avoid the call of close() (by __del__) if an exception
88                                 # is raised by any of the following initialisations
89         self._pool = pool
90         self.dbname = dbname
91         self._serialized = serialized
92         self._cnx = pool.borrow(dsn(dbname))
93         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
94         self.__closed = False   # real initialisation value
95         self.autocommit(False)
96         self.__caller = tuple(stack()[2][1:3])
97
98     def __del__(self):
99         if not self.__closed:
100             # Oops. 'self' has not been closed explicitly.
101             # The cursor will be deleted by the garbage collector,
102             # but the database connection is not put back into the connection
103             # pool, preventing some operation on the database like dropping it.
104             # This can also lead to a server overload.
105             msg = "Cursor not closed explicitly\n"  \
106                   "Cursor was created at %s:%s"
107             self.__logger.warn(msg, *self.__caller)
108             self._close(True)
109
110     @check
111     def execute(self, query, params=None, log_exceptions=True):
112         if '%d' in query or '%f' in query:
113             self.__logger.warn(query)
114             self.__logger.warn("SQL queries cannot contain %d or %f anymore. "
115                                "Use only %s")
116
117         if self.sql_log:
118             now = mdt.now()
119
120         try:
121             params = params or None
122             res = self._obj.execute(query, params)
123         except psycopg2.ProgrammingError, pe:
124             if log_exceptions or self.sql_log:
125                 self.__logger.error("Programming error: %s, in query %s", pe, query)
126             raise
127         except Exception:
128             if log_exceptions or self.sql_log:
129                 self.__logger.exception("bad query: %s", self._obj.query or query)
130             raise
131
132         if self.sql_log:
133             delay = mdt.now() - now
134             delay = delay.seconds * 1E6 + delay.microseconds
135
136             self.__logger.log(logging.DEBUG_SQL, "query: %s", self._obj.query)
137             self.sql_log_count+=1
138             res_from = re_from.match(query.lower())
139             if res_from:
140                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
141                 self.sql_from_log[res_from.group(1)][0] += 1
142                 self.sql_from_log[res_from.group(1)][1] += delay
143             res_into = re_into.match(query.lower())
144             if res_into:
145                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
146                 self.sql_into_log[res_into.group(1)][0] += 1
147                 self.sql_into_log[res_into.group(1)][1] += delay
148         return res
149
150
151     def split_for_in_conditions(self, ids):
152         """Split a list of identifiers into one or more smaller tuples
153            safe for IN conditions, after uniquifying them."""
154         return tools.misc.split_every(self.IN_MAX, set(ids))
155
156     def print_log(self):
157         global sql_counter
158         sql_counter += self.sql_log_count
159         if not self.sql_log:
160             return
161         def process(type):
162             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
163             sum = 0
164             if sqllogs[type]:
165                 sqllogitems = sqllogs[type].items()
166                 sqllogitems.sort(key=lambda k: k[1][1])
167                 self.__logger.log(logging.DEBUG_SQL, "SQL LOG %s:", type)
168                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
169                 for r in sqllogitems:
170                     delay = timedelta(microseconds=r[1][1])
171                     self.__logger.log(logging.DEBUG_SQL, "table: %s: %s/%s",
172                                         r[0], delay, r[1][0])
173                     sum+= r[1][1]
174                 sqllogs[type].clear()
175             sum = timedelta(microseconds=sum)
176             self.__logger.log(logging.DEBUG_SQL, "SUM %s:%s/%d [%d]",
177                                 type, sum, self.sql_log_count, sql_counter)
178             sqllogs[type].clear()
179         process('from')
180         process('into')
181         self.sql_log_count = 0
182         self.sql_log = False
183
184     @check
185     def close(self):
186         return self._close(False)
187
188     def _close(self, leak=False):
189         if not self._obj:
190             return
191
192         self.print_log()
193
194         if not self._serialized:
195             self.rollback() # Ensure we close the current transaction.
196
197         self._obj.close()
198
199         # This force the cursor to be freed, and thus, available again. It is
200         # important because otherwise we can overload the server very easily
201         # because of a cursor shortage (because cursors are not garbage
202         # collected as fast as they should). The problem is probably due in
203         # part because browse records keep a reference to the cursor.
204         del self._obj
205         self.__closed = True
206
207         if leak:
208             self._cnx.leaked = True
209         else:
210             keep_in_pool = self.dbname not in ('template1', 'template0', 'postgres')
211             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
212
213     @check
214     def autocommit(self, on):
215         offlevel = [ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE][bool(self._serialized)]
216         self._cnx.set_isolation_level([offlevel, ISOLATION_LEVEL_AUTOCOMMIT][bool(on)])
217
218     @check
219     def commit(self):
220         return self._cnx.commit()
221
222     @check
223     def rollback(self):
224         return self._cnx.rollback()
225
226     @check
227     def __getattr__(self, name):
228         return getattr(self._obj, name)
229
230
231 class PsycoConnection(psycopg2.extensions.connection):
232     pass
233
234 class ConnectionPool(object):
235
236     __logger = logging.getLogger('db.connection_pool')
237
238     def locked(fun):
239         @wraps(fun)
240         def _locked(self, *args, **kwargs):
241             self._lock.acquire()
242             try:
243                 return fun(self, *args, **kwargs)
244             finally:
245                 self._lock.release()
246         return _locked
247
248
249     def __init__(self, maxconn=64):
250         self._connections = []
251         self._maxconn = max(maxconn, 1)
252         self._lock = threading.Lock()
253
254     def __repr__(self):
255         used = len([1 for c, u in self._connections[:] if u])
256         count = len(self._connections)
257         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
258
259     def _debug(self, msg, *args):
260         self.__logger.log(logging.DEBUG_SQL, ('%r ' + msg), self, *args)
261
262     @locked
263     def borrow(self, dsn):
264         self._debug('Borrow connection to %r', dsn)
265
266         # free leaked connections
267         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
268             if getattr(cnx, 'leaked', False):
269                 delattr(cnx, 'leaked')
270                 self._connections.pop(i)
271                 self._connections.append((cnx, False))
272                 self.__logger.warn('%r: Free leaked connection to %r', self, cnx.dsn)
273
274         for i, (cnx, used) in enumerate(self._connections):
275             if not used and dsn_are_equals(cnx.dsn, dsn):
276                 self._connections.pop(i)
277                 self._connections.append((cnx, True))
278                 self._debug('Existing connection found at index %d', i)
279
280                 return cnx
281
282         if len(self._connections) >= self._maxconn:
283             # try to remove the oldest connection not used
284             for i, (cnx, used) in enumerate(self._connections):
285                 if not used:
286                     self._connections.pop(i)
287                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
288                     break
289             else:
290                 # note: this code is called only if the for loop has completed (no break)
291                 raise PoolError('The Connection Pool Is Full')
292
293         result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
294         self._connections.append((result, True))
295         self._debug('Create new connection')
296         return result
297
298     @locked
299     def give_back(self, connection, keep_in_pool=True):
300         self._debug('Give back connection to %r', connection.dsn)
301         for i, (cnx, used) in enumerate(self._connections):
302             if cnx is connection:
303                 self._connections.pop(i)
304                 if keep_in_pool:
305                     self._connections.append((cnx, False))
306                     self._debug('Put connection to %r in pool', cnx.dsn)
307                 else:
308                     self._debug('Forgot connection to %r', cnx.dsn)
309                 break
310         else:
311             raise PoolError('This connection does not below to the pool')
312
313     @locked
314     def close_all(self, dsn):
315         self.__logger.info('%r: Close all connections to %r', self, dsn)
316         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
317             if dsn_are_equals(cnx.dsn, dsn):
318                 cnx.close()
319                 self._connections.pop(i)
320
321
322 class Connection(object):
323     __logger = logging.getLogger('db.connection')
324
325     def __init__(self, pool, dbname):
326         self.dbname = dbname
327         self._pool = pool
328
329     def cursor(self, serialized=False):
330         cursor_type = serialized and 'serialized ' or ''
331         self.__logger.log(logging.DEBUG_SQL, 'create %scursor to %r', cursor_type, self.dbname)
332         return Cursor(self._pool, self.dbname, serialized=serialized)
333
334     def serialized_cursor(self):
335         return self.cursor(True)
336
337     def __nonzero__(self):
338         """Check if connection is possible"""
339         try:
340             cr = self.cursor()
341             cr.close()
342             return True
343         except:
344             return False
345
346
347 _dsn = ''
348 for p in ('host', 'port', 'user', 'password'):
349     cfg = tools.config['db_' + p]
350     if cfg:
351         _dsn += '%s=%s ' % (p, cfg)
352
353 def dsn(db_name):
354     return '%sdbname=%s' % (_dsn, db_name)
355
356 def dsn_are_equals(first, second):
357     def key(dsn):
358         k = dict(x.split('=', 1) for x in dsn.strip().split())
359         k.pop('password', None) # password is not relevant
360         return k
361     return key(first) == key(second)
362
363
364 _Pool = ConnectionPool(int(tools.config['db_maxconn']))
365
366 def db_connect(db_name):
367     currentThread().dbname = db_name
368     return Connection(_Pool, db_name)
369
370 def close_db(db_name):
371     _Pool.close_all(dsn(db_name))
372     tools.cache.clean_caches_for_db(db_name)
373     ct = currentThread()
374     if hasattr(ct, 'dbname'):
375         delattr(ct, 'dbname')
376
377
378 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
379