[IMP] sql_db.py: improve logging
[odoo/odoo.git] / bin / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #    
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU Affero General Public License as
9 #    published by the Free Software Foundation, either version 3 of the
10 #    License, or (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU Affero General Public License for more details.
16 #
17 #    You should have received a copy of the GNU Affero General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.     
19 #
20 ##############################################################################
21
22 __all__ = ['db_connect', 'close_db']
23
24 import logging
25 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE
26 from psycopg2.psycopg1 import cursor as psycopg1cursor
27 from psycopg2.pool import PoolError
28
29 import psycopg2.extensions
30
31 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
32
33 types_mapping = {
34     'date': (1082,),
35     'time': (1083,),
36     'datetime': (1114,),
37 }
38
39 def unbuffer(symb, cr):
40     if symb is None: return None
41     return str(symb)
42
43 def undecimalize(symb, cr):
44     if symb is None: return None
45     return float(symb)
46
47 for name, typeoid in types_mapping.items():
48     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
49 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
50
51
52 import tools
53 from tools.func import wraps
54 from datetime import datetime as mdt
55 from datetime import timedelta
56 import threading
57 from inspect import stack
58
59 import re
60 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
61 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
62
63 sql_counter = 0
64
65 class Cursor(object):
66     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
67     __logger = logging.getLogger('db.cursor')
68
69     def check(f):
70         @wraps(f)
71         def wrapper(self, *args, **kwargs):
72             if self.__closed:
73                 raise psycopg2.ProgrammingError('Unable to use the cursor after having closed it')
74             return f(self, *args, **kwargs)
75         return wrapper
76
77     def __init__(self, pool, dbname, serialized=False):
78         self.sql_from_log = {}
79         self.sql_into_log = {}
80         self.sql_log = False
81         self.sql_log_count = 0
82         self.__closed = True    # avoid the call of close() (by __del__) if an exception
83                                 # is raised by any of the following initialisations
84         self._pool = pool
85         self.dbname = dbname
86         self._serialized = serialized
87         self._cnx = pool.borrow(dsn(dbname))
88         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
89         self.__closed = False   # real initialisation value
90         self.autocommit(False)
91         self.__caller = tuple(stack()[2][1:3])
92
93     def __del__(self):
94         if not self.__closed:
95             # Oops. 'self' has not been closed explicitly.
96             # The cursor will be deleted by the garbage collector,
97             # but the database connection is not put back into the connection
98             # pool, preventing some operation on the database like dropping it.
99             # This can also lead to a server overload.
100             msg = "Cursor not closed explicitly\n"  \
101                   "Cursor was created at %s:%s"
102             self.__logger.warn(msg, *self.__caller)
103             self._close(True)
104
105     @check
106     def execute(self, query, params=None):
107         if '%d' in query or '%f' in query:
108             self.__logger.warn(query)
109             self.__logger.warn("SQL queries cannot contain %d or %f anymore. "
110                                "Use only %s")
111             if params:
112                 query = query.replace('%d', '%s').replace('%f', '%s')
113
114         if self.sql_log:
115             now = mdt.now()
116
117         try:
118             params = params or None
119             res = self._obj.execute(query, params)
120         except psycopg2.ProgrammingError, pe:
121             self.__logger.error("Programming error: %s, in query %s", pe, query)
122             raise
123         except Exception:
124             self.__logger.exception("bad query: %s", self._obj.query)
125             raise
126
127         if self.sql_log:
128             delay = mdt.now() - now
129             delay = delay.seconds * 1E6 + delay.microseconds
130
131             self.__logger.debug("query: %s", self._obj.query)
132             self.sql_log_count+=1
133             res_from = re_from.match(query.lower())
134             if res_from:
135                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
136                 self.sql_from_log[res_from.group(1)][0] += 1
137                 self.sql_from_log[res_from.group(1)][1] += delay
138             res_into = re_into.match(query.lower())
139             if res_into:
140                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
141                 self.sql_into_log[res_into.group(1)][0] += 1
142                 self.sql_into_log[res_into.group(1)][1] += delay
143         return res
144
145
146     def split_for_in_conditions(self, ids):
147         """Split a list of identifiers into one or more smaller tuples
148            safe for IN conditions, after uniquifying them."""
149         return tools.misc.split_every(self.IN_MAX, set(ids))
150
151     def print_log(self):
152         global sql_counter
153         sql_counter += self.sql_log_count
154         if not self.sql_log:
155             return
156         def process(type):
157             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
158             sum = 0
159             if sqllogs[type]:
160                 sqllogitems = sqllogs[type].items()
161                 sqllogitems.sort(key=lambda k: k[1][1])
162                 self.__logger.debug("SQL LOG %s:", type)
163                 for r in sqllogitems:
164                     delay = timedelta(microseconds=r[1][1])
165                     self.__logger.debug("table: %s: %s/%s",
166                                         r[0], delay, r[1][0])
167                     sum+= r[1][1]
168                 sqllogs[type].clear()
169             sum = timedelta(microseconds=sum)
170             self.__logger.debug("SUM %s:%s/%d [%d]",
171                                 type, sum, self.sql_log_count, sql_counter)
172             sqllogs[type].clear()
173         process('from')
174         process('into')
175         self.sql_log_count = 0
176         self.sql_log = False
177
178     @check
179     def close(self):
180         return self._close(False)
181
182     def _close(self, leak=False):
183         if not self._obj:
184             return
185
186         self.print_log()
187
188         if not self._serialized:
189             self.rollback() # Ensure we close the current transaction.
190
191         self._obj.close()
192
193         # This force the cursor to be freed, and thus, available again. It is
194         # important because otherwise we can overload the server very easily
195         # because of a cursor shortage (because cursors are not garbage
196         # collected as fast as they should). The problem is probably due in
197         # part because browse records keep a reference to the cursor.
198         del self._obj
199         self.__closed = True
200
201         if leak:
202             self._cnx.leaked = True
203         else:
204             keep_in_pool = self.dbname not in ('template1', 'template0', 'postgres')
205             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
206
207     @check
208     def autocommit(self, on):
209         offlevel = [ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE][bool(self._serialized)]
210         self._cnx.set_isolation_level([offlevel, ISOLATION_LEVEL_AUTOCOMMIT][bool(on)])
211
212     @check
213     def commit(self):
214         return self._cnx.commit()
215
216     @check
217     def rollback(self):
218         return self._cnx.rollback()
219
220     @check
221     def __getattr__(self, name):
222         return getattr(self._obj, name)
223
224
225 class PsycoConnection(psycopg2.extensions.connection):
226     pass
227
228 class ConnectionPool(object):
229
230     __logger = logging.getLogger('db.connection_pool')
231
232     def locked(fun):
233         @wraps(fun)
234         def _locked(self, *args, **kwargs):
235             self._lock.acquire()
236             try:
237                 return fun(self, *args, **kwargs)
238             finally:
239                 self._lock.release()
240         return _locked
241
242
243     def __init__(self, maxconn=64):
244         self._connections = []
245         self._maxconn = max(maxconn, 1)
246         self._lock = threading.Lock()
247
248     def __repr__(self):
249         used = len([1 for c, u in self._connections[:] if u])
250         count = len(self._connections)
251         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
252
253     def _debug(self, msg, *args):
254         msg = '%r ' + msg
255         self.__logger.debug(msg, self, *args)
256
257     @locked
258     def borrow(self, dsn):
259         self._debug('Borrow connection to %r', dsn)
260
261         # free leaked connections
262         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
263             if getattr(cnx, 'leaked', False):
264                 delattr(cnx, 'leaked')
265                 self._connections.pop(i)
266                 self._connections.append((cnx, False))
267                 self._debug('Free leaked connection to %r', cnx.dsn)
268
269         for i, (cnx, used) in enumerate(self._connections):
270             if not used and dsn_are_equals(cnx.dsn, dsn):
271                 self._connections.pop(i)
272                 self._connections.append((cnx, True))
273                 self._debug('Existing connection found at index %d', i)
274
275                 return cnx
276
277         if len(self._connections) >= self._maxconn:
278             # try to remove the oldest connection not used
279             for i, (cnx, used) in enumerate(self._connections):
280                 if not used:
281                     self._connections.pop(i)
282                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
283                     break
284             else:
285                 # note: this code is called only if the for loop has completed (no break)
286                 raise PoolError('The Connection Pool Is Full')
287
288         result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
289         self._connections.append((result, True))
290         self._debug('Create new connection')
291         return result
292
293     @locked
294     def give_back(self, connection, keep_in_pool=True):
295         self._debug('Give back connection to %r', connection.dsn)
296         for i, (cnx, used) in enumerate(self._connections):
297             if cnx is connection:
298                 self._connections.pop(i)
299                 if keep_in_pool:
300                     self._connections.append((cnx, False))
301                     self._debug('Put connection to %r in pool', cnx.dsn)
302                 else:
303                     self._debug('Forgot connection to %r', cnx.dsn)
304                 break
305         else:
306             raise PoolError('This connection does not below to the pool')
307
308     @locked
309     def close_all(self, dsn):
310         self._debug('Close all connections to %r', dsn)
311         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
312             if dsn_are_equals(cnx.dsn, dsn):
313                 cnx.close()
314                 self._connections.pop(i)
315
316
317 class Connection(object):
318     __logger = logging.getLogger('db.connection')
319
320     def __init__(self, pool, dbname):
321         self.dbname = dbname
322         self._pool = pool
323
324     def cursor(self, serialized=False):
325         cursor_type = serialized and 'serialized ' or ''
326         self.__logger.debug('create %scursor to %r', cursor_type, self.dbname)
327         return Cursor(self._pool, self.dbname, serialized=serialized)
328
329     def serialized_cursor(self):
330         return self.cursor(True)
331
332     def __nonzero__(self):
333         """Check if connection is possible"""
334         try:
335             cr = self.cursor()
336             cr.close()
337             return True
338         except:
339             return False
340
341
342 _dsn = ''
343 for p in ('host', 'port', 'user', 'password'):
344     cfg = tools.config['db_' + p]
345     if cfg:
346         _dsn += '%s=%s ' % (p, cfg)
347
348 def dsn(db_name):
349     return '%sdbname=%s' % (_dsn, db_name)
350
351 def dsn_are_equals(first, second):
352     def key(dsn):
353         k = dict(x.split('=', 1) for x in dsn.strip().split())
354         k.pop('password', None) # password is not relevant
355         return k
356     return key(first) == key(second)
357
358
359 _Pool = ConnectionPool(int(tools.config['db_maxconn']))
360
361 def db_connect(db_name):
362     return Connection(_Pool, db_name)
363
364 def close_db(db_name):
365     _Pool.close_all(dsn(db_name))
366     tools.cache.clean_caches_for_db(db_name)
367
368
369 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
370