[FIX] Allow missing opcodes, harden check for private attributes (dunder), check...
[odoo/odoo.git] / bin / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #    Copyright (C) 2010-2011 OpenERP s.a. (<http://openerp.com>).
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 __all__ = ['db_connect', 'close_db']
24
25 from threading import currentThread
26 import logging
27 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE
28 from psycopg2.psycopg1 import cursor as psycopg1cursor
29 from psycopg2.pool import PoolError
30
31 import psycopg2.extensions
32 import warnings
33
34 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
35
36 types_mapping = {
37     'date': (1082,),
38     'time': (1083,),
39     'datetime': (1114,),
40 }
41
42 def unbuffer(symb, cr):
43     if symb is None: return None
44     return str(symb)
45
46 def undecimalize(symb, cr):
47     if symb is None: return None
48     return float(symb)
49
50 for name, typeoid in types_mapping.items():
51     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
52 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
53
54
55 import tools
56 from tools.func import wraps, frame_codeinfo
57 from netsvc import Agent
58 from datetime import datetime as mdt
59 from datetime import timedelta
60 import threading
61 from inspect import currentframe
62
63 import re
64 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
65 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
66
67 sql_counter = 0
68
69 class Cursor(object):
70     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
71     __logger = logging.getLogger('db.cursor')
72
73     def check(f):
74         @wraps(f)
75         def wrapper(self, *args, **kwargs):
76             if self.__closed:
77                 raise psycopg2.OperationalError('Unable to use the cursor after having closed it')
78             return f(self, *args, **kwargs)
79         return wrapper
80
81     def __init__(self, pool, dbname, serialized=False):
82         self.sql_from_log = {}
83         self.sql_into_log = {}
84
85         # default log level determined at cursor creation, could be
86         # overridden later for debugging purposes
87         self.sql_log = self.__logger.isEnabledFor(logging.DEBUG_SQL)
88
89         self.sql_log_count = 0
90         self.__closed = True    # avoid the call of close() (by __del__) if an exception
91                                 # is raised by any of the following initialisations
92         self.__pool = pool
93         self.dbname = dbname
94         self._serialized = serialized
95         self._cnx = pool.borrow(dsn(dbname))
96         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
97         self.__closed = False   # real initialisation value
98         self.autocommit(False)
99         if self.sql_log:
100             self.__caller = frame_codeinfo(currentframe(),2)
101         else:
102             self.__caller = False
103
104     def __del__(self):
105         if not self.__closed:
106             # Oops. 'self' has not been closed explicitly.
107             # The cursor will be deleted by the garbage collector,
108             # but the database connection is not put back into the connection
109             # pool, preventing some operation on the database like dropping it.
110             # This can also lead to a server overload.
111             msg = "Cursor not closed explicitly\n"
112             if self.__caller:
113                 msg += "Cursor was created at %s:%s" % self.__caller
114             else:
115                 msg += "Please enable sql debugging to trace the caller."
116             self.__logger.warn(msg)
117             self._close(True)
118
119     @check
120     def execute(self, query, params=None, log_exceptions=True):
121         if '%d' in query or '%f' in query:
122             self.__logger.warn(query)
123             self.__logger.warn("SQL queries cannot contain %d or %f anymore. "
124                                "Use only %s")
125
126         if self.sql_log:
127             now = mdt.now()
128
129         try:
130             params = params or None
131             res = self._obj.execute(query, params)
132         except psycopg2.ProgrammingError, pe:
133             if log_exceptions:
134                 self.__logger.error("Programming error: %s, in query %s", pe, query)
135             raise
136         except Exception:
137             if log_exceptions:
138                 self.__logger.exception("bad query: %s", self._obj.query or query)
139             raise
140
141         if self.sql_log:
142             delay = mdt.now() - now
143             delay = delay.seconds * 1E6 + delay.microseconds
144
145             self.__logger.log(logging.DEBUG_SQL, "query: %s", self._obj.query)
146             self.sql_log_count+=1
147             res_from = re_from.match(query.lower())
148             if res_from:
149                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
150                 self.sql_from_log[res_from.group(1)][0] += 1
151                 self.sql_from_log[res_from.group(1)][1] += delay
152             res_into = re_into.match(query.lower())
153             if res_into:
154                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
155                 self.sql_into_log[res_into.group(1)][0] += 1
156                 self.sql_into_log[res_into.group(1)][1] += delay
157         return res
158
159
160     def split_for_in_conditions(self, ids):
161         """Split a list of identifiers into one or more smaller tuples
162            safe for IN conditions, after uniquifying them."""
163         return tools.misc.split_every(self.IN_MAX, set(ids))
164
165     def print_log(self):
166         global sql_counter
167         sql_counter += self.sql_log_count
168         if not self.sql_log:
169             return
170         def process(type):
171             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
172             sum = 0
173             if sqllogs[type]:
174                 sqllogitems = sqllogs[type].items()
175                 sqllogitems.sort(key=lambda k: k[1][1])
176                 self.__logger.log(logging.DEBUG_SQL, "SQL LOG %s:", type)
177                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
178                 for r in sqllogitems:
179                     delay = timedelta(microseconds=r[1][1])
180                     self.__logger.log(logging.DEBUG_SQL, "table: %s: %s/%s",
181                                         r[0], delay, r[1][0])
182                     sum+= r[1][1]
183                 sqllogs[type].clear()
184             sum = timedelta(microseconds=sum)
185             self.__logger.log(logging.DEBUG_SQL, "SUM %s:%s/%d [%d]",
186                                 type, sum, self.sql_log_count, sql_counter)
187             sqllogs[type].clear()
188         process('from')
189         process('into')
190         self.sql_log_count = 0
191         self.sql_log = False
192
193     @check
194     def close(self):
195         return self._close(False)
196
197     def _close(self, leak=False):
198         if not self._obj:
199             return
200
201         self.print_log()
202
203         if not self._serialized:
204             self.rollback() # Ensure we close the current transaction.
205
206         self._obj.close()
207
208         # This force the cursor to be freed, and thus, available again. It is
209         # important because otherwise we can overload the server very easily
210         # because of a cursor shortage (because cursors are not garbage
211         # collected as fast as they should). The problem is probably due in
212         # part because browse records keep a reference to the cursor.
213         del self._obj
214         self.__closed = True
215
216         if leak:
217             self._cnx.leaked = True
218         else:
219             keep_in_pool = self.dbname not in ('template1', 'template0', 'postgres')
220             self.__pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
221
222     @check
223     def autocommit(self, on):
224         offlevel = [ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE][bool(self._serialized)]
225         self._cnx.set_isolation_level([offlevel, ISOLATION_LEVEL_AUTOCOMMIT][bool(on)])
226
227     @check
228     def commit(self):
229         return self._cnx.commit()
230
231     @check
232     def rollback(self):
233         return self._cnx.rollback()
234
235     @check
236     def __getattr__(self, name):
237         return getattr(self._obj, name)
238
239
240 class PsycoConnection(psycopg2.extensions.connection):
241     pass
242
243 class ConnectionPool(object):
244
245     __logger = logging.getLogger('db.connection_pool')
246
247     def locked(fun):
248         @wraps(fun)
249         def _locked(self, *args, **kwargs):
250             self._lock.acquire()
251             try:
252                 return fun(self, *args, **kwargs)
253             finally:
254                 self._lock.release()
255         return _locked
256
257
258     def __init__(self, maxconn=64):
259         self._connections = []
260         self._maxconn = max(maxconn, 1)
261         self._lock = threading.Lock()
262
263     def __repr__(self):
264         used = len([1 for c, u in self._connections[:] if u])
265         count = len(self._connections)
266         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
267
268     def _debug(self, msg, *args):
269         self.__logger.log(logging.DEBUG_SQL, ('%r ' + msg), self, *args)
270
271     @locked
272     def borrow(self, dsn):
273         self._debug('Borrow connection to %r', dsn)
274
275         # free leaked connections
276         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
277             if getattr(cnx, 'leaked', False):
278                 delattr(cnx, 'leaked')
279                 self._connections.pop(i)
280                 self._connections.append((cnx, False))
281                 self.__logger.warn('%r: Free leaked connection to %r', self, cnx.dsn)
282
283         for i, (cnx, used) in enumerate(self._connections):
284             if not used and dsn_are_equals(cnx.dsn, dsn):
285                 self._connections.pop(i)
286                 self._connections.append((cnx, True))
287                 self._debug('Existing connection found at index %d', i)
288
289                 return cnx
290
291         if len(self._connections) >= self._maxconn:
292             # try to remove the oldest connection not used
293             for i, (cnx, used) in enumerate(self._connections):
294                 if not used:
295                     self._connections.pop(i)
296                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
297                     break
298             else:
299                 # note: this code is called only if the for loop has completed (no break)
300                 raise PoolError('The Connection Pool Is Full')
301
302         try:
303             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
304         except psycopg2.Error, e:
305             self.__logger.exception('Connection to the database failed')
306             raise
307         self._connections.append((result, True))
308         self._debug('Create new connection')
309         return result
310
311     @locked
312     def give_back(self, connection, keep_in_pool=True):
313         self._debug('Give back connection to %r', connection.dsn)
314         for i, (cnx, used) in enumerate(self._connections):
315             if cnx is connection:
316                 self._connections.pop(i)
317                 if keep_in_pool:
318                     self._connections.append((cnx, False))
319                     self._debug('Put connection to %r in pool', cnx.dsn)
320                 else:
321                     self._debug('Forgot connection to %r', cnx.dsn)
322                 break
323         else:
324             raise PoolError('This connection does not below to the pool')
325
326     @locked
327     def close_all(self, dsn):
328         self.__logger.info('%r: Close all connections to %r', self, dsn)
329         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
330             if dsn_are_equals(cnx.dsn, dsn):
331                 cnx.close()
332                 self._connections.pop(i)
333
334
335 class Connection(object):
336     __logger = logging.getLogger('db.connection')
337
338     def __init__(self, pool, dbname):
339         self.dbname = dbname
340         self.__pool = pool
341
342     def cursor(self, serialized=False):
343         cursor_type = serialized and 'serialized ' or ''
344         self.__logger.log(logging.DEBUG_SQL, 'create %scursor to %r', cursor_type, self.dbname)
345         return Cursor(self.__pool, self.dbname, serialized=serialized)
346
347     def serialized_cursor(self):
348         return self.cursor(True)
349
350     def __nonzero__(self):
351         """Check if connection is possible"""
352         try:
353             warnings.warn("You use an expensive function to test a connection.",
354                       DeprecationWarning, stacklevel=1)
355             cr = self.cursor()
356             cr.close()
357             return True
358         except Exception:
359             return False
360
361
362 _dsn = ''
363 for p in ('host', 'port', 'user', 'password'):
364     cfg = tools.config['db_' + p]
365     if cfg:
366         _dsn += '%s=%s ' % (p, cfg)
367
368 def dsn(db_name):
369     return '%sdbname=%s' % (_dsn, db_name)
370
371 def dsn_are_equals(first, second):
372     def key(dsn):
373         k = dict(x.split('=', 1) for x in dsn.strip().split())
374         k.pop('password', None) # password is not relevant
375         return k
376     return key(first) == key(second)
377
378
379 _Pool = ConnectionPool(int(tools.config['db_maxconn']))
380
381 def db_connect(db_name):
382     currentThread().dbname = db_name
383     return Connection(_Pool, db_name)
384
385 def close_db(db_name):
386     _Pool.close_all(dsn(db_name))
387     Agent.cancel(db_name)
388     tools.cache.clean_caches_for_db(db_name)
389     ct = currentThread()
390     if hasattr(ct, 'dbname'):
391         delattr(ct, 'dbname')
392
393
394 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
395