[FIX] do not keep connections to template1, template0 and postgres in the connection...
[odoo/odoo.git] / bin / sql_db.py
1 # -*- encoding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution   
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). All Rights Reserved
6 #    $Id$
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU General Public License as published by
10 #    the Free Software Foundation, either version 3 of the License, or
11 #    (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU General Public License for more details.
17 #
18 #    You should have received a copy of the GNU General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 __all__ = ['db_connect', 'close_db']
24
25 import netsvc
26 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE
27 from psycopg2.psycopg1 import cursor as psycopg1cursor
28 from psycopg2.pool import PoolError
29
30 import psycopg2.extensions
31
32 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
33
34 types_mapping = {
35     'date': (1082,),
36     'time': (1083,),
37     'datetime': (1114,),
38 }
39
40 def unbuffer(symb, cr):
41     if symb is None: return None
42     return str(symb)
43
44 def undecimalize(symb, cr):
45     if symb is None: return None
46     return float(symb)
47
48 for name, typeoid in types_mapping.items():
49     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
50 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
51
52
53 import tools
54 from tools.func import wraps
55 from datetime import datetime as mdt
56 from datetime import timedelta
57 import threading
58 from inspect import stack
59
60 import re
61 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
62 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
63
64
65 def log(msg, lvl=netsvc.LOG_DEBUG):
66     logger = netsvc.Logger()
67     logger.notifyChannel('sql', lvl, msg)
68
69 class Cursor(object):
70     IN_MAX = 1000
71
72     def check(f):
73         @wraps(f)
74         def wrapper(self, *args, **kwargs):
75             if self.__closed:
76                 raise psycopg2.ProgrammingError('Unable to use the cursor after having closing it')
77             return f(self, *args, **kwargs)
78         return wrapper
79
80     def __init__(self, pool, dbname, serialized=False):
81         self.sql_from_log = {}
82         self.sql_into_log = {}
83         self.sql_log = False
84         self.sql_log_count = 0
85
86         self.__closed = True    # avoid the call of close() (by __del__) if an exception
87                                 # is raised by any of the following initialisations
88         self._pool = pool
89         self.dbname = dbname
90         self._serialized = serialized
91         self._cnx = pool.borrow(dsn(dbname))
92         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
93         self.__closed = False   # real initialisation value
94         self.autocommit(False)
95         self.__caller = tuple(stack()[2][1:3])
96
97     def __del__(self):
98         if not self.__closed:
99             # Oops. 'self' has not been closed explicitly.
100             # The cursor will be deleted by the garbage collector,
101             # but the database connection is not put back into the connection
102             # pool, preventing some operation on the database like dropping it.
103             # This can also lead to a server overload.
104             msg = "Cursor not closed explicitly\n"  \
105                   "Cursor was created at %s:%s" % self.__caller
106             log(msg, netsvc.LOG_WARNING)
107             self.close()
108
109     @check
110     def execute(self, query, params=None):
111         if '%d' in query or '%f' in query:
112             log(query, netsvc.LOG_WARNING)
113             log("SQL queries mustn't contain %d or %f anymore. Use only %s", netsvc.LOG_WARNING)
114             if params:
115                 query = query.replace('%d', '%s').replace('%f', '%s')
116
117         if self.sql_log:
118             now = mdt.now()
119
120         try:
121             params = params or None
122             res = self._obj.execute(query, params)
123         except Exception, e:
124             log("bad query: %s" % self._obj.query)
125             log(e)
126             raise
127
128         if self.sql_log:
129             delay = mdt.now() - now
130             delay = delay.seconds * 1E6 + delay.microseconds
131
132             log("query: %s" % self._obj.query)
133             self.sql_log_count+=1
134             res_from = re_from.match(query.lower())
135             if res_from:
136                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
137                 self.sql_from_log[res_from.group(1)][0] += 1
138                 self.sql_from_log[res_from.group(1)][1] += delay
139             res_into = re_into.match(query.lower())
140             if res_into:
141                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
142                 self.sql_into_log[res_into.group(1)][0] += 1
143                 self.sql_into_log[res_into.group(1)][1] += delay
144         return res
145
146     def print_log(self):
147         if not self.sql_log:
148             return
149
150         def process(type):
151             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
152             if not sqllogs[type]:
153                 return
154             sqllogitems = sqllogs[type].items()
155             sqllogitems.sort(key=lambda k: k[1][1])
156             sum = 0
157             log("SQL LOG %s:" % (type,))
158             for r in sqllogitems:
159                 delay = timedelta(microseconds=r[1][1])
160                 log("table: %s: %s/%s" %(r[0], str(delay), r[1][0]))
161                 sum+= r[1][1]
162             sum = timedelta(microseconds=sum)
163             log("SUM:%s/%d" % (str(sum), self.sql_log_count))
164             sqllogs[type].clear()
165         process('from')
166         process('into')
167         self.sql_log_count = 0
168         self.sql_log = False
169
170     @check
171     def close(self):
172         if not self._obj:
173             return
174
175         self.print_log()
176
177         if not self._serialized:
178             self.rollback() # Ensure we close the current transaction.
179
180         self._obj.close()
181
182         # This force the cursor to be freed, and thus, available again. It is
183         # important because otherwise we can overload the server very easily
184         # because of a cursor shortage (because cursors are not garbage
185         # collected as fast as they should). The problem is probably due in
186         # part because browse records keep a reference to the cursor.
187         del self._obj
188         self.__closed = True
189         keep_in_pool = self.dbname not in ('template1', 'template0', 'postgres')
190         self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
191
192     @check
193     def autocommit(self, on):
194         offlevel = [ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE][bool(self._serialized)]
195         self._cnx.set_isolation_level([offlevel, ISOLATION_LEVEL_AUTOCOMMIT][bool(on)])
196
197     @check
198     def commit(self):
199         return self._cnx.commit()
200
201     @check
202     def rollback(self):
203         return self._cnx.rollback()
204
205     @check
206     def __getattr__(self, name):
207         return getattr(self._obj, name)
208
209
210 class ConnectionPool(object):
211
212     def locked(fun):
213         @wraps(fun)
214         def _locked(self, *args, **kwargs):
215             self._lock.acquire()
216             try:
217                 return fun(self, *args, **kwargs)
218             finally:
219                 self._lock.release()
220         return _locked
221
222
223     def __init__(self, maxconn=64):
224         self._connections = []
225         self._maxconn = max(maxconn, 1)
226         self._lock = threading.Lock()
227         self._logger = netsvc.Logger()
228
229     def __repr__(self):
230         used = len([1 for c, u in self._connections[:] if u])
231         count = len(self._connections)
232         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
233
234     def _debug(self, msg):
235         self._logger.notifyChannel('ConnectionPool', netsvc.LOG_DEBUG, repr(self))
236         self._logger.notifyChannel('ConnectionPool', netsvc.LOG_DEBUG, msg)
237
238     @locked
239     def borrow(self, dsn):
240         self._debug('Borrow connection to %s' % (dsn,))
241
242         result = None
243         for i, (cnx, used) in enumerate(self._connections):
244             if not used and dsn_are_equals(cnx.dsn, dsn):
245                 self._debug('Existing connection found at index %d' % i)
246
247                 self._connections.pop(i)
248                 self._connections.append((cnx, True))
249
250                 result = cnx
251                 break
252         if result:
253             return result
254
255         if len(self._connections) >= self._maxconn:
256             # try to remove the oldest connection not used
257             for i, (cnx, used) in enumerate(self._connections):
258                 if not used:
259                     self._debug('Removing old connection at index %d: %s' % (i, cnx.dsn))
260                     self._connections.pop(i)
261                     break
262             else:
263                 # note: this code is called only if the for loop has completed (no break)
264                 raise PoolError('The Connection Pool Is Full')
265
266         self._debug('Create new connection')
267         result = psycopg2.connect(dsn=dsn)
268         self._connections.append((result, True))
269         return result
270
271     @locked
272     def give_back(self, connection, keep_in_pool=True):
273         self._debug('Give back connection to %s' % (connection.dsn,))
274         for i, (cnx, used) in enumerate(self._connections):
275             if cnx is connection:
276                 self._connections.pop(i)
277                 if keep_in_pool:
278                     self._connections.append((cnx, False))
279                     self._debug('Put connection to %s in pool' % (cnx.dsn,))
280                 break
281         else:
282             raise PoolError('This connection does not below to the pool')
283
284     @locked
285     def close_all(self, dsn):
286         self._debug('Close all connections to %s' % (dsn,))
287         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
288             if dsn_are_equals(cnx.dsn, dsn):
289                 cnx.close()
290                 self._connections.pop(i)
291
292
293 class Connection(object):
294     def _debug(self, msg):
295         self._logger.notifyChannel('Connection', netsvc.LOG_DEBUG, msg)
296
297     def __init__(self, pool, dbname):
298         self.dbname = dbname
299         self._pool = pool
300         self._logger = netsvc.Logger()
301
302     def cursor(self, serialized=False):
303         cursor_type = serialized and 'serialized ' or ''
304         self._debug('create %scursor to "%s"' % (cursor_type, self.dbname,))
305         return Cursor(self._pool, self.dbname, serialized=serialized)
306
307     def serialized_cursor(self):
308         return self.cursor(True)
309
310     def __nonzero__(self):
311         """Check if connection is possible"""
312         try:
313             cr = self.cursor()
314             cr.close()
315             return True
316         except:
317             return False
318
319
320 _dsn = ''
321 for p in ('host', 'port', 'user', 'password'):
322     cfg = tools.config['db_' + p]
323     if cfg:
324         _dsn += '%s=%s ' % (p, cfg)
325
326 def dsn(db_name):
327     return '%sdbname=%s' % (_dsn, db_name)
328
329 def dsn_are_equals(first, second):
330     def key(dsn):
331         k = dict(x.split('=', 1) for x in dsn.strip().split())
332         k.pop('password', None) # password is not relevant
333         return k
334     return key(first) == key(second)
335
336
337 _Pool = ConnectionPool(int(tools.config['db_maxconn']))
338
339 def db_connect(db_name):
340     return Connection(_Pool, db_name)
341
342 def close_db(db_name):
343     _Pool.close_all(dsn(db_name))
344     tools.cache.clean_caches_for_db(db_name)
345
346
347 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
348