[FIX] Use the currentThread function instead of current_thread
[odoo/odoo.git] / bin / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU Affero General Public License as
9 #    published by the Free Software Foundation, either version 3 of the
10 #    License, or (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU Affero General Public License for more details.
16 #
17 #    You should have received a copy of the GNU Affero General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 ##############################################################################
21
22 __all__ = ['db_connect', 'close_db']
23
24 from threading import currentThread
25 import logging
26 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE
27 from psycopg2.psycopg1 import cursor as psycopg1cursor
28 from psycopg2.pool import PoolError
29
30 import psycopg2.extensions
31
32 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
33
34 types_mapping = {
35     'date': (1082,),
36     'time': (1083,),
37     'datetime': (1114,),
38 }
39
40 def unbuffer(symb, cr):
41     if symb is None: return None
42     return str(symb)
43
44 def undecimalize(symb, cr):
45     if symb is None: return None
46     return float(symb)
47
48 for name, typeoid in types_mapping.items():
49     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
50 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
51
52
53 import tools
54 from tools.func import wraps
55 from datetime import datetime as mdt
56 from datetime import timedelta
57 import threading
58 from inspect import stack
59
60 import re
61 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
62 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
63
64 sql_counter = 0
65
66 class Cursor(object):
67     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
68     __logger = logging.getLogger('db.cursor')
69
70     def check(f):
71         @wraps(f)
72         def wrapper(self, *args, **kwargs):
73             if self.__closed:
74                 raise psycopg2.ProgrammingError('Unable to use the cursor after having closed it')
75             return f(self, *args, **kwargs)
76         return wrapper
77
78     def __init__(self, pool, dbname, serialized=False):
79         self.sql_from_log = {}
80         self.sql_into_log = {}
81
82         # default log level determined at cursor creation, could be
83         # overridden later for debugging purposes
84         self.sql_log = self.__logger.isEnabledFor(logging.DEBUG_SQL)
85
86         self.sql_log_count = 0
87         self.__closed = True    # avoid the call of close() (by __del__) if an exception
88                                 # is raised by any of the following initialisations
89         self._pool = pool
90         self.dbname = dbname
91         self._serialized = serialized
92         self._cnx = pool.borrow(dsn(dbname))
93         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
94         self.__closed = False   # real initialisation value
95         self.autocommit(False)
96         self.__caller = tuple(stack()[2][1:3])
97
98     def __del__(self):
99         if not self.__closed:
100             # Oops. 'self' has not been closed explicitly.
101             # The cursor will be deleted by the garbage collector,
102             # but the database connection is not put back into the connection
103             # pool, preventing some operation on the database like dropping it.
104             # This can also lead to a server overload.
105             msg = "Cursor not closed explicitly\n"  \
106                   "Cursor was created at %s:%s"
107             self.__logger.warn(msg, *self.__caller)
108             self._close(True)
109
110     @check
111     def execute(self, query, params=None, log_exceptions=True):
112         if '%d' in query or '%f' in query:
113             self.__logger.warn(query)
114             self.__logger.warn("SQL queries cannot contain %d or %f anymore. "
115                                "Use only %s")
116             if params:
117                 query = query.replace('%d', '%s').replace('%f', '%s')
118
119         if self.sql_log:
120             now = mdt.now()
121
122         try:
123             params = params or None
124             res = self._obj.execute(query, params)
125         except psycopg2.ProgrammingError, pe:
126             if log_exceptions or self.sql_log:
127                 self.__logger.error("Programming error: %s, in query %s", pe, query)
128             raise
129         except Exception:
130             if log_exceptions or self.sql_log:
131                 self.__logger.exception("bad query: %s", self._obj.query or query)
132             raise
133
134         if self.sql_log:
135             delay = mdt.now() - now
136             delay = delay.seconds * 1E6 + delay.microseconds
137
138             self.__logger.log(logging.DEBUG_SQL, "query: %s", self._obj.query)
139             self.sql_log_count+=1
140             res_from = re_from.match(query.lower())
141             if res_from:
142                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
143                 self.sql_from_log[res_from.group(1)][0] += 1
144                 self.sql_from_log[res_from.group(1)][1] += delay
145             res_into = re_into.match(query.lower())
146             if res_into:
147                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
148                 self.sql_into_log[res_into.group(1)][0] += 1
149                 self.sql_into_log[res_into.group(1)][1] += delay
150         return res
151
152
153     def split_for_in_conditions(self, ids):
154         """Split a list of identifiers into one or more smaller tuples
155            safe for IN conditions, after uniquifying them."""
156         return tools.misc.split_every(self.IN_MAX, set(ids))
157
158     def print_log(self):
159         global sql_counter
160         sql_counter += self.sql_log_count
161         if not self.sql_log:
162             return
163         def process(type):
164             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
165             sum = 0
166             if sqllogs[type]:
167                 sqllogitems = sqllogs[type].items()
168                 sqllogitems.sort(key=lambda k: k[1][1])
169                 self.__logger.log(logging.DEBUG_SQL, "SQL LOG %s:", type)
170                 for r in sqllogitems:
171                     delay = timedelta(microseconds=r[1][1])
172                     self.__logger.log(logging.DEBUG_SQL, "table: %s: %s/%s",
173                                         r[0], delay, r[1][0])
174                     sum+= r[1][1]
175                 sqllogs[type].clear()
176             sum = timedelta(microseconds=sum)
177             self.__logger.log(logging.DEBUG_SQL, "SUM %s:%s/%d [%d]",
178                                 type, sum, self.sql_log_count, sql_counter)
179             sqllogs[type].clear()
180         process('from')
181         process('into')
182         self.sql_log_count = 0
183         self.sql_log = False
184
185     @check
186     def close(self):
187         return self._close(False)
188
189     def _close(self, leak=False):
190         if not self._obj:
191             return
192
193         self.print_log()
194
195         if not self._serialized:
196             self.rollback() # Ensure we close the current transaction.
197
198         self._obj.close()
199
200         # This force the cursor to be freed, and thus, available again. It is
201         # important because otherwise we can overload the server very easily
202         # because of a cursor shortage (because cursors are not garbage
203         # collected as fast as they should). The problem is probably due in
204         # part because browse records keep a reference to the cursor.
205         del self._obj
206         self.__closed = True
207
208         if leak:
209             self._cnx.leaked = True
210         else:
211             keep_in_pool = self.dbname not in ('template1', 'template0', 'postgres')
212             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
213
214     @check
215     def autocommit(self, on):
216         offlevel = [ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE][bool(self._serialized)]
217         self._cnx.set_isolation_level([offlevel, ISOLATION_LEVEL_AUTOCOMMIT][bool(on)])
218
219     @check
220     def commit(self):
221         return self._cnx.commit()
222
223     @check
224     def rollback(self):
225         return self._cnx.rollback()
226
227     @check
228     def __getattr__(self, name):
229         return getattr(self._obj, name)
230
231
232 class PsycoConnection(psycopg2.extensions.connection):
233     pass
234
235 class ConnectionPool(object):
236
237     __logger = logging.getLogger('db.connection_pool')
238
239     def locked(fun):
240         @wraps(fun)
241         def _locked(self, *args, **kwargs):
242             self._lock.acquire()
243             try:
244                 return fun(self, *args, **kwargs)
245             finally:
246                 self._lock.release()
247         return _locked
248
249
250     def __init__(self, maxconn=64):
251         self._connections = []
252         self._maxconn = max(maxconn, 1)
253         self._lock = threading.Lock()
254
255     def __repr__(self):
256         used = len([1 for c, u in self._connections[:] if u])
257         count = len(self._connections)
258         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
259
260     def _debug(self, msg, *args):
261         self.__logger.log(logging.DEBUG_SQL, ('%r ' + msg), self, *args)
262
263     @locked
264     def borrow(self, dsn):
265         self._debug('Borrow connection to %r', dsn)
266
267         # free leaked connections
268         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
269             if getattr(cnx, 'leaked', False):
270                 delattr(cnx, 'leaked')
271                 self._connections.pop(i)
272                 self._connections.append((cnx, False))
273                 self.__logger.warn('%r: Free leaked connection to %r', self, cnx.dsn)
274
275         for i, (cnx, used) in enumerate(self._connections):
276             if not used and dsn_are_equals(cnx.dsn, dsn):
277                 self._connections.pop(i)
278                 self._connections.append((cnx, True))
279                 self._debug('Existing connection found at index %d', i)
280
281                 return cnx
282
283         if len(self._connections) >= self._maxconn:
284             # try to remove the oldest connection not used
285             for i, (cnx, used) in enumerate(self._connections):
286                 if not used:
287                     self._connections.pop(i)
288                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
289                     break
290             else:
291                 # note: this code is called only if the for loop has completed (no break)
292                 raise PoolError('The Connection Pool Is Full')
293
294         result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
295         self._connections.append((result, True))
296         self._debug('Create new connection')
297         return result
298
299     @locked
300     def give_back(self, connection, keep_in_pool=True):
301         self._debug('Give back connection to %r', connection.dsn)
302         for i, (cnx, used) in enumerate(self._connections):
303             if cnx is connection:
304                 self._connections.pop(i)
305                 if keep_in_pool:
306                     self._connections.append((cnx, False))
307                     self._debug('Put connection to %r in pool', cnx.dsn)
308                 else:
309                     self._debug('Forgot connection to %r', cnx.dsn)
310                 break
311         else:
312             raise PoolError('This connection does not below to the pool')
313
314     @locked
315     def close_all(self, dsn):
316         self.__logger.info('%r: Close all connections to %r', self, dsn)
317         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
318             if dsn_are_equals(cnx.dsn, dsn):
319                 cnx.close()
320                 self._connections.pop(i)
321
322
323 class Connection(object):
324     __logger = logging.getLogger('db.connection')
325
326     def __init__(self, pool, dbname):
327         self.dbname = dbname
328         self._pool = pool
329
330     def cursor(self, serialized=False):
331         cursor_type = serialized and 'serialized ' or ''
332         self.__logger.log(logging.DEBUG_SQL, 'create %scursor to %r', cursor_type, self.dbname)
333         return Cursor(self._pool, self.dbname, serialized=serialized)
334
335     def serialized_cursor(self):
336         return self.cursor(True)
337
338     def __nonzero__(self):
339         """Check if connection is possible"""
340         try:
341             cr = self.cursor()
342             cr.close()
343             return True
344         except:
345             return False
346
347
348 _dsn = ''
349 for p in ('host', 'port', 'user', 'password'):
350     cfg = tools.config['db_' + p]
351     if cfg:
352         _dsn += '%s=%s ' % (p, cfg)
353
354 def dsn(db_name):
355     return '%sdbname=%s' % (_dsn, db_name)
356
357 def dsn_are_equals(first, second):
358     def key(dsn):
359         k = dict(x.split('=', 1) for x in dsn.strip().split())
360         k.pop('password', None) # password is not relevant
361         return k
362     return key(first) == key(second)
363
364
365 _Pool = ConnectionPool(int(tools.config['db_maxconn']))
366
367 def db_connect(db_name):
368     currentThread().dbname = db_name
369     return Connection(_Pool, db_name)
370
371 def close_db(db_name):
372     _Pool.close_all(dsn(db_name))
373     tools.cache.clean_caches_for_db(db_name)
374     ct = currentThread()
375     if hasattr(ct, 'dbname'):
376         delattr(ct, 'dbname')
377
378
379 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
380