tools.convert, misc: use pythonic logging
[odoo/odoo.git] / bin / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU Affero General Public License as
9 #    published by the Free Software Foundation, either version 3 of the
10 #    License, or (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU Affero General Public License for more details.
16 #
17 #    You should have received a copy of the GNU Affero General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 ##############################################################################
21
22 __all__ = ['db_connect', 'close_db']
23
24 from threading import currentThread
25 import logging
26 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE
27 from psycopg2.psycopg1 import cursor as psycopg1cursor
28 from psycopg2.pool import PoolError
29
30 import psycopg2.extensions
31 import warnings
32
33 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
34
35 types_mapping = {
36     'date': (1082,),
37     'time': (1083,),
38     'datetime': (1114,),
39 }
40
41 def unbuffer(symb, cr):
42     if symb is None: return None
43     return str(symb)
44
45 def undecimalize(symb, cr):
46     if symb is None: return None
47     return float(symb)
48
49 for name, typeoid in types_mapping.items():
50     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
51 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
52
53
54 import tools
55 from tools.func import wraps, frame_codeinfo
56 from datetime import datetime as mdt
57 from datetime import timedelta
58 import threading
59 from inspect import currentframe
60
61 import re
62 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
63 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
64
65 sql_counter = 0
66
67 class Cursor(object):
68     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
69     __logger = logging.getLogger('db.cursor')
70
71     def check(f):
72         @wraps(f)
73         def wrapper(self, *args, **kwargs):
74             if self.__closed:
75                 raise psycopg2.OperationalError('Unable to use the cursor after having closed it')
76             return f(self, *args, **kwargs)
77         return wrapper
78
79     def __init__(self, pool, dbname, serialized=False):
80         self.sql_from_log = {}
81         self.sql_into_log = {}
82
83         # default log level determined at cursor creation, could be
84         # overridden later for debugging purposes
85         self.sql_log = self.__logger.isEnabledFor(logging.DEBUG_SQL)
86
87         self.sql_log_count = 0
88         self.__closed = True    # avoid the call of close() (by __del__) if an exception
89                                 # is raised by any of the following initialisations
90         self._pool = pool
91         self.dbname = dbname
92         self._serialized = serialized
93         self._cnx = pool.borrow(dsn(dbname))
94         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
95         self.__closed = False   # real initialisation value
96         self.autocommit(False)
97         self.__caller = frame_codeinfo(currentframe(),2)
98
99     def __del__(self):
100         if not self.__closed:
101             # Oops. 'self' has not been closed explicitly.
102             # The cursor will be deleted by the garbage collector,
103             # but the database connection is not put back into the connection
104             # pool, preventing some operation on the database like dropping it.
105             # This can also lead to a server overload.
106             msg = "Cursor not closed explicitly\n"  \
107                   "Cursor was created at %s:%s"
108             self.__logger.warn(msg, *self.__caller)
109             self._close(True)
110
111     @check
112     def execute(self, query, params=None, log_exceptions=True):
113         if '%d' in query or '%f' in query:
114             self.__logger.warn(query)
115             self.__logger.warn("SQL queries cannot contain %d or %f anymore. "
116                                "Use only %s")
117
118         if self.sql_log:
119             now = mdt.now()
120
121         try:
122             params = params or None
123             res = self._obj.execute(query, params)
124         except psycopg2.ProgrammingError, pe:
125             if log_exceptions or self.sql_log:
126                 self.__logger.error("Programming error: %s, in query %s", pe, query)
127             raise
128         except Exception:
129             if log_exceptions or self.sql_log:
130                 self.__logger.exception("bad query: %s", self._obj.query or query)
131             raise
132
133         if self.sql_log:
134             delay = mdt.now() - now
135             delay = delay.seconds * 1E6 + delay.microseconds
136
137             self.__logger.log(logging.DEBUG_SQL, "query: %s", self._obj.query)
138             self.sql_log_count+=1
139             res_from = re_from.match(query.lower())
140             if res_from:
141                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
142                 self.sql_from_log[res_from.group(1)][0] += 1
143                 self.sql_from_log[res_from.group(1)][1] += delay
144             res_into = re_into.match(query.lower())
145             if res_into:
146                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
147                 self.sql_into_log[res_into.group(1)][0] += 1
148                 self.sql_into_log[res_into.group(1)][1] += delay
149         return res
150
151
152     def split_for_in_conditions(self, ids):
153         """Split a list of identifiers into one or more smaller tuples
154            safe for IN conditions, after uniquifying them."""
155         return tools.misc.split_every(self.IN_MAX, set(ids))
156
157     def print_log(self):
158         global sql_counter
159         sql_counter += self.sql_log_count
160         if not self.sql_log:
161             return
162         def process(type):
163             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
164             sum = 0
165             if sqllogs[type]:
166                 sqllogitems = sqllogs[type].items()
167                 sqllogitems.sort(key=lambda k: k[1][1])
168                 self.__logger.log(logging.DEBUG_SQL, "SQL LOG %s:", type)
169                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
170                 for r in sqllogitems:
171                     delay = timedelta(microseconds=r[1][1])
172                     self.__logger.log(logging.DEBUG_SQL, "table: %s: %s/%s",
173                                         r[0], delay, r[1][0])
174                     sum+= r[1][1]
175                 sqllogs[type].clear()
176             sum = timedelta(microseconds=sum)
177             self.__logger.log(logging.DEBUG_SQL, "SUM %s:%s/%d [%d]",
178                                 type, sum, self.sql_log_count, sql_counter)
179             sqllogs[type].clear()
180         process('from')
181         process('into')
182         self.sql_log_count = 0
183         self.sql_log = False
184
185     @check
186     def close(self):
187         return self._close(False)
188
189     def _close(self, leak=False):
190         if not self._obj:
191             return
192
193         self.print_log()
194
195         if not self._serialized:
196             self.rollback() # Ensure we close the current transaction.
197
198         self._obj.close()
199
200         # This force the cursor to be freed, and thus, available again. It is
201         # important because otherwise we can overload the server very easily
202         # because of a cursor shortage (because cursors are not garbage
203         # collected as fast as they should). The problem is probably due in
204         # part because browse records keep a reference to the cursor.
205         del self._obj
206         self.__closed = True
207
208         if leak:
209             self._cnx.leaked = True
210         else:
211             keep_in_pool = self.dbname not in ('template1', 'template0', 'postgres')
212             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
213
214     @check
215     def autocommit(self, on):
216         offlevel = [ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE][bool(self._serialized)]
217         self._cnx.set_isolation_level([offlevel, ISOLATION_LEVEL_AUTOCOMMIT][bool(on)])
218
219     @check
220     def commit(self):
221         return self._cnx.commit()
222
223     @check
224     def rollback(self):
225         return self._cnx.rollback()
226
227     @check
228     def __getattr__(self, name):
229         return getattr(self._obj, name)
230
231
232 class PsycoConnection(psycopg2.extensions.connection):
233     pass
234
235 class ConnectionPool(object):
236
237     __logger = logging.getLogger('db.connection_pool')
238
239     def locked(fun):
240         @wraps(fun)
241         def _locked(self, *args, **kwargs):
242             self._lock.acquire()
243             try:
244                 return fun(self, *args, **kwargs)
245             finally:
246                 self._lock.release()
247         return _locked
248
249
250     def __init__(self, maxconn=64):
251         self._connections = []
252         self._maxconn = max(maxconn, 1)
253         self._lock = threading.Lock()
254
255     def __repr__(self):
256         used = len([1 for c, u in self._connections[:] if u])
257         count = len(self._connections)
258         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
259
260     def _debug(self, msg, *args):
261         self.__logger.log(logging.DEBUG_SQL, ('%r ' + msg), self, *args)
262
263     @locked
264     def borrow(self, dsn):
265         self._debug('Borrow connection to %r', dsn)
266
267         # free leaked connections
268         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
269             if getattr(cnx, 'leaked', False):
270                 delattr(cnx, 'leaked')
271                 self._connections.pop(i)
272                 self._connections.append((cnx, False))
273                 self.__logger.warn('%r: Free leaked connection to %r', self, cnx.dsn)
274
275         for i, (cnx, used) in enumerate(self._connections):
276             if not used and dsn_are_equals(cnx.dsn, dsn):
277                 self._connections.pop(i)
278                 self._connections.append((cnx, True))
279                 self._debug('Existing connection found at index %d', i)
280
281                 return cnx
282
283         if len(self._connections) >= self._maxconn:
284             # try to remove the oldest connection not used
285             for i, (cnx, used) in enumerate(self._connections):
286                 if not used:
287                     self._connections.pop(i)
288                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
289                     break
290             else:
291                 # note: this code is called only if the for loop has completed (no break)
292                 raise PoolError('The Connection Pool Is Full')
293
294         result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
295         self._connections.append((result, True))
296         self._debug('Create new connection')
297         return result
298
299     @locked
300     def give_back(self, connection, keep_in_pool=True):
301         self._debug('Give back connection to %r', connection.dsn)
302         for i, (cnx, used) in enumerate(self._connections):
303             if cnx is connection:
304                 self._connections.pop(i)
305                 if keep_in_pool:
306                     self._connections.append((cnx, False))
307                     self._debug('Put connection to %r in pool', cnx.dsn)
308                 else:
309                     self._debug('Forgot connection to %r', cnx.dsn)
310                 break
311         else:
312             raise PoolError('This connection does not below to the pool')
313
314     @locked
315     def close_all(self, dsn):
316         self.__logger.info('%r: Close all connections to %r', self, dsn)
317         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
318             if dsn_are_equals(cnx.dsn, dsn):
319                 cnx.close()
320                 self._connections.pop(i)
321
322
323 class Connection(object):
324     __logger = logging.getLogger('db.connection')
325
326     def __init__(self, pool, dbname):
327         self.dbname = dbname
328         self._pool = pool
329
330     def cursor(self, serialized=False):
331         cursor_type = serialized and 'serialized ' or ''
332         self.__logger.log(logging.DEBUG_SQL, 'create %scursor to %r', cursor_type, self.dbname)
333         return Cursor(self._pool, self.dbname, serialized=serialized)
334
335     def serialized_cursor(self):
336         return self.cursor(True)
337
338     def __nonzero__(self):
339         """Check if connection is possible"""
340         try:
341             warnings.warn("You use an expensive function to test a connection.",
342                       DeprecationWarning, stacklevel=1)
343             cr = self.cursor()
344             cr.close()
345             return True
346         except Exception:
347             return False
348
349
350 _dsn = ''
351 for p in ('host', 'port', 'user', 'password'):
352     cfg = tools.config['db_' + p]
353     if cfg:
354         _dsn += '%s=%s ' % (p, cfg)
355
356 def dsn(db_name):
357     return '%sdbname=%s' % (_dsn, db_name)
358
359 def dsn_are_equals(first, second):
360     def key(dsn):
361         k = dict(x.split('=', 1) for x in dsn.strip().split())
362         k.pop('password', None) # password is not relevant
363         return k
364     return key(first) == key(second)
365
366
367 _Pool = ConnectionPool(int(tools.config['db_maxconn']))
368
369 def db_connect(db_name):
370     currentThread().dbname = db_name
371     return Connection(_Pool, db_name)
372
373 def close_db(db_name):
374     _Pool.close_all(dsn(db_name))
375     tools.cache.clean_caches_for_db(db_name)
376     ct = currentThread()
377     if hasattr(ct, 'dbname'):
378         delattr(ct, 'dbname')
379
380
381 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
382