17bfa3406530318d683258ba17a3c0484251918f
[odoo/odoo.git] / bin / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #    
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU Affero General Public License as
9 #    published by the Free Software Foundation, either version 3 of the
10 #    License, or (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU Affero General Public License for more details.
16 #
17 #    You should have received a copy of the GNU Affero General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.     
19 #
20 ##############################################################################
21
22 __all__ = ['db_connect', 'close_db']
23
24 import logging
25 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE
26 from psycopg2.psycopg1 import cursor as psycopg1cursor
27 from psycopg2.pool import PoolError
28
29 import psycopg2.extensions
30
31 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
32
33 types_mapping = {
34     'date': (1082,),
35     'time': (1083,),
36     'datetime': (1114,),
37 }
38
39 def unbuffer(symb, cr):
40     if symb is None: return None
41     return str(symb)
42
43 def undecimalize(symb, cr):
44     if symb is None: return None
45     return float(symb)
46
47 for name, typeoid in types_mapping.items():
48     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
49 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
50
51
52 import tools
53 from tools.func import wraps
54 from datetime import datetime as mdt
55 from datetime import timedelta
56 import threading
57 from inspect import stack
58
59 import re
60 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
61 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
62
63 sql_counter = 0
64
65 class Cursor(object):
66     IN_MAX = 1000
67     __logger = logging.getLogger('db.cursor')
68
69     def check(f):
70         @wraps(f)
71         def wrapper(self, *args, **kwargs):
72             if self.__closed:
73                 raise psycopg2.ProgrammingError('Unable to use the cursor after having closed it')
74             return f(self, *args, **kwargs)
75         return wrapper
76
77     def __init__(self, pool, dbname, serialized=False):
78         self.sql_from_log = {}
79         self.sql_into_log = {}
80         self.sql_log = False
81         self.sql_log_count = 0
82         self.__closed = True    # avoid the call of close() (by __del__) if an exception
83                                 # is raised by any of the following initialisations
84         self._pool = pool
85         self.dbname = dbname
86         self._serialized = serialized
87         self._cnx = pool.borrow(dsn(dbname))
88         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
89         self.__closed = False   # real initialisation value
90         self.autocommit(False)
91         self.__caller = tuple(stack()[2][1:3])
92
93     def __del__(self):
94         if not self.__closed:
95             # Oops. 'self' has not been closed explicitly.
96             # The cursor will be deleted by the garbage collector,
97             # but the database connection is not put back into the connection
98             # pool, preventing some operation on the database like dropping it.
99             # This can also lead to a server overload.
100             msg = "Cursor not closed explicitly\n"  \
101                   "Cursor was created at %s:%s"
102             self.__logger.warn(msg, *self.__caller)
103             self.close()
104
105     @check
106     def execute(self, query, params=None):
107         if '%d' in query or '%f' in query:
108             self.__logger.warn(query)
109             self.__logger.warn("SQL queries cannot contain %d or %f anymore. "
110                                "Use only %s")
111             if params:
112                 query = query.replace('%d', '%s').replace('%f', '%s')
113
114         if self.sql_log:
115             now = mdt.now()
116
117         try:
118             params = params or None
119             res = self._obj.execute(query, params)
120         except psycopg2.ProgrammingError, pe:
121             self.__logger.error("Programming error: %s, in query %s" % (pe, query))
122             raise
123         except Exception:
124             self.__logger.exception("bad query: %s", self._obj.query)
125             raise
126
127         if self.sql_log:
128             delay = mdt.now() - now
129             delay = delay.seconds * 1E6 + delay.microseconds
130
131             self.__logger.debug("query: %s", self._obj.query)
132             self.sql_log_count+=1
133             res_from = re_from.match(query.lower())
134             if res_from:
135                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
136                 self.sql_from_log[res_from.group(1)][0] += 1
137                 self.sql_from_log[res_from.group(1)][1] += delay
138             res_into = re_into.match(query.lower())
139             if res_into:
140                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
141                 self.sql_into_log[res_into.group(1)][0] += 1
142                 self.sql_into_log[res_into.group(1)][1] += delay
143         return res
144
145     def print_log(self):
146         global sql_counter
147         sql_counter += self.sql_log_count
148         if not self.sql_log:
149             return
150         def process(type):
151             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
152             sum = 0
153             if sqllogs[type]:
154                 sqllogitems = sqllogs[type].items()
155                 sqllogitems.sort(key=lambda k: k[1][1])
156                 self.__logger.debug("SQL LOG %s:", type)
157                 for r in sqllogitems:
158                     delay = timedelta(microseconds=r[1][1])
159                     self.__logger.debug("table: %s: %s/%s",
160                                         r[0], delay, r[1][0])
161                     sum+= r[1][1]
162                 sqllogs[type].clear()
163             sum = timedelta(microseconds=sum)
164             self.__logger.debug("SUM %s:%s/%d [%d]",
165                                 type, sum, self.sql_log_count, sql_counter)
166             sqllogs[type].clear()
167         process('from')
168         process('into')
169         self.sql_log_count = 0
170         self.sql_log = False
171
172     @check
173     def close(self):
174         if not self._obj:
175             return
176
177         self.print_log()
178
179         if not self._serialized:
180             self.rollback() # Ensure we close the current transaction.
181
182         self._obj.close()
183
184         # This force the cursor to be freed, and thus, available again. It is
185         # important because otherwise we can overload the server very easily
186         # because of a cursor shortage (because cursors are not garbage
187         # collected as fast as they should). The problem is probably due in
188         # part because browse records keep a reference to the cursor.
189         del self._obj
190         self.__closed = True
191         self._pool.give_back(self._cnx)
192
193     @check
194     def autocommit(self, on):
195         offlevel = [ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE][bool(self._serialized)]
196         self._cnx.set_isolation_level([offlevel, ISOLATION_LEVEL_AUTOCOMMIT][bool(on)])
197
198     @check
199     def commit(self):
200         return self._cnx.commit()
201
202     @check
203     def rollback(self):
204         return self._cnx.rollback()
205
206     @check
207     def __getattr__(self, name):
208         return getattr(self._obj, name)
209
210
211 class ConnectionPool(object):
212
213     __logger = logging.getLogger('db.connection_pool')
214
215     def locked(fun):
216         @wraps(fun)
217         def _locked(self, *args, **kwargs):
218             self._lock.acquire()
219             try:
220                 return fun(self, *args, **kwargs)
221             finally:
222                 self._lock.release()
223         return _locked
224
225
226     def __init__(self, maxconn=64):
227         self._connections = []
228         self._maxconn = max(maxconn, 1)
229         self._lock = threading.Lock()
230
231     def __repr__(self):
232         used = len([1 for c, u in self._connections[:] if u])
233         count = len(self._connections)
234         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
235
236     def _debug(self, msg):
237         self.__logger.debug(repr(self))
238         self.__logger.debug(msg)
239
240     @locked
241     def borrow(self, dsn):
242         self._debug('Borrow connection to %s' % (dsn,))
243
244         result = None
245         for i, (cnx, used) in enumerate(self._connections):
246             if not used and dsn_are_equals(cnx.dsn, dsn):
247                 self._debug('Existing connection found at index %d' % i)
248
249                 self._connections.pop(i)
250                 self._connections.append((cnx, True))
251
252                 result = cnx
253                 break
254         if result:
255             return result
256
257         if len(self._connections) >= self._maxconn:
258             # try to remove the oldest connection not used
259             for i, (cnx, used) in enumerate(self._connections):
260                 if not used:
261                     self._debug('Removing old connection at index %d: %s' % (i, cnx.dsn))
262                     self._connections.pop(i)
263                     break
264             else:
265                 # note: this code is called only if the for loop has completed (no break)
266                 raise PoolError('The Connection Pool Is Full')
267
268         self._debug('Create new connection')
269         result = psycopg2.connect(dsn=dsn)
270         self._connections.append((result, True))
271         return result
272
273     @locked
274     def give_back(self, connection):
275         self._debug('Give back connection to %s' % (connection.dsn,))
276         for i, (cnx, used) in enumerate(self._connections):
277             if cnx is connection:
278                 self._connections.pop(i)
279                 self._connections.append((cnx, False))
280                 break
281         else:
282             raise PoolError('This connection does not below to the pool')
283
284     @locked
285     def close_all(self, dsn):
286         self._debug('Close all connections to %s' % (dsn,))
287         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
288             if dsn_are_equals(cnx.dsn, dsn):
289                 cnx.close()
290                 self._connections.pop(i)
291
292
293 class Connection(object):
294     __logger = logging.getLogger('db.connection')
295
296     def __init__(self, pool, dbname):
297         self.dbname = dbname
298         self._pool = pool
299
300     def cursor(self, serialized=False):
301         cursor_type = serialized and 'serialized ' or ''
302         self.__logger.debug('create %scursor to "%s"' % (cursor_type, self.dbname,))
303         return Cursor(self._pool, self.dbname, serialized=serialized)
304
305     def serialized_cursor(self):
306         return self.cursor(True)
307
308     def __nonzero__(self):
309         """Check if connection is possible"""
310         try:
311             cr = self.cursor()
312             cr.close()
313             return True
314         except:
315             return False
316
317
318 _dsn = ''
319 for p in ('host', 'port', 'user', 'password'):
320     cfg = tools.config['db_' + p]
321     if cfg:
322         _dsn += '%s=%s ' % (p, cfg)
323
324 def dsn(db_name):
325     return '%sdbname=%s' % (_dsn, db_name)
326
327 def dsn_are_equals(first, second):
328     def key(dsn):
329         k = dict(x.split('=', 1) for x in dsn.strip().split())
330         k.pop('password', None) # password is not relevant
331         return k
332     return key(first) == key(second)
333
334
335 _Pool = ConnectionPool(int(tools.config['db_maxconn']))
336
337 def db_connect(db_name):
338     return Connection(_Pool, db_name)
339
340 def close_db(db_name):
341     _Pool.close_all(dsn(db_name))
342     tools.cache.clean_caches_for_db(db_name)
343
344
345 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
346