[FIX] event: reset tax for each event to invoice
[odoo/odoo.git] / bin / sql_db.py
1 # -*- encoding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution   
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). All Rights Reserved
6 #    $Id$
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU General Public License as published by
10 #    the Free Software Foundation, either version 3 of the License, or
11 #    (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU General Public License for more details.
17 #
18 #    You should have received a copy of the GNU General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 __all__ = ['db_connect', 'close_db']
24
25 from threading import currentThread
26 import netsvc
27 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE
28 from psycopg2.psycopg1 import cursor as psycopg1cursor
29 from psycopg2.pool import PoolError
30
31 import psycopg2.extensions
32
33 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
34
35 types_mapping = {
36     'date': (1082,),
37     'time': (1083,),
38     'datetime': (1114,),
39 }
40
41 def unbuffer(symb, cr):
42     if symb is None: return None
43     return str(symb)
44
45 def undecimalize(symb, cr):
46     if symb is None: return None
47     return float(symb)
48
49 for name, typeoid in types_mapping.items():
50     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
51 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
52
53
54 import tools
55 from tools.func import wraps, partial
56 from datetime import datetime as mdt
57 from datetime import timedelta
58 import threading
59 from inspect import stack
60
61 import re
62 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
63 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
64
65
66 def log(msg, lvl=netsvc.LOG_DEBUG):
67     logger = netsvc.Logger()
68     logger.notifyChannel('sql', lvl, msg)
69
70 class Cursor(object):
71     IN_MAX = 1000
72
73     def check(f):
74         @wraps(f)
75         def wrapper(self, *args, **kwargs):
76             if self.__closed:
77                 raise psycopg2.ProgrammingError('Unable to use the cursor after having closing it')
78             return f(self, *args, **kwargs)
79         return wrapper
80
81     def __init__(self, pool, dbname, serialized=False):
82         self.sql_from_log = {}
83         self.sql_into_log = {}
84         self.sql_log = False
85         self.sql_log_count = 0
86
87         self.__closed = True    # avoid the call of close() (by __del__) if an exception
88                                 # is raised by any of the following initialisations
89         self._pool = pool
90         self.dbname = dbname
91         self._serialized = serialized
92         self._cnx = pool.borrow(dsn(dbname))
93         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
94         self.__closed = False   # real initialisation value
95         self.autocommit(False)
96         self.__caller = tuple(stack()[2][1:3])
97
98     def __del__(self):
99         if not self.__closed:
100             # Oops. 'self' has not been closed explicitly.
101             # The cursor will be deleted by the garbage collector,
102             # but the database connection is not put back into the connection
103             # pool, preventing some operation on the database like dropping it.
104             # This can also lead to a server overload.
105             msg = "Cursor not closed explicitly\n"  \
106                   "Cursor was created at %s:%s" % self.__caller
107             log(msg, netsvc.LOG_WARNING)
108             self._close(True)
109
110     @check
111     def execute(self, query, params=None):
112         if '%d' in query or '%f' in query:
113             log(query, netsvc.LOG_WARNING)
114             log("SQL queries mustn't contain %d or %f anymore. Use only %s", netsvc.LOG_WARNING)
115             if params:
116                 query = query.replace('%d', '%s').replace('%f', '%s')
117
118         if self.sql_log:
119             now = mdt.now()
120
121         try:
122             params = params or None
123             res = self._obj.execute(query, params)
124         except Exception, e:
125             q = self._obj.query or query
126             log("bad query: %s" % (q,))
127             log(params)
128             log(e)
129             from traceback import format_stack
130             log(''.join(format_stack()))
131             raise
132
133         if self.sql_log:
134             delay = mdt.now() - now
135             delay = delay.seconds * 1E6 + delay.microseconds
136
137             log("query: %s" % self._obj.query)
138             self.sql_log_count+=1
139             res_from = re_from.match(query.lower())
140             if res_from:
141                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
142                 self.sql_from_log[res_from.group(1)][0] += 1
143                 self.sql_from_log[res_from.group(1)][1] += delay
144             res_into = re_into.match(query.lower())
145             if res_into:
146                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
147                 self.sql_into_log[res_into.group(1)][0] += 1
148                 self.sql_into_log[res_into.group(1)][1] += delay
149         return res
150
151     def print_log(self):
152         if not self.sql_log:
153             return
154
155         def process(type):
156             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
157             if not sqllogs[type]:
158                 return
159             sqllogitems = sqllogs[type].items()
160             sqllogitems.sort(key=lambda k: k[1][1])
161             sum = 0
162             log("SQL LOG %s:" % (type,))
163             for r in sqllogitems:
164                 delay = timedelta(microseconds=r[1][1])
165                 log("table: %s: %s/%s" %(r[0], str(delay), r[1][0]))
166                 sum+= r[1][1]
167             sum = timedelta(microseconds=sum)
168             log("SUM:%s/%d" % (str(sum), self.sql_log_count))
169             sqllogs[type].clear()
170         process('from')
171         process('into')
172         self.sql_log_count = 0
173         self.sql_log = False
174
175     @check
176     def close(self):
177         return self._close(False)
178
179     def _close(self, leak=False):
180         if not self._obj:
181             return
182
183         self.print_log()
184
185         if not self._serialized:
186             self.rollback() # Ensure we close the current transaction.
187
188         self._obj.close()
189
190         # This force the cursor to be freed, and thus, available again. It is
191         # important because otherwise we can overload the server very easily
192         # because of a cursor shortage (because cursors are not garbage
193         # collected as fast as they should). The problem is probably due in
194         # part because browse records keep a reference to the cursor.
195         del self._obj
196         self.__closed = True
197
198         if leak:
199             self._cnx.leaked = True
200         else:
201             keep_in_pool = self.dbname not in ('template1', 'template0', 'postgres')
202             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
203
204     @check
205     def autocommit(self, on):
206         offlevel = [ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE][bool(self._serialized)]
207         self._cnx.set_isolation_level([offlevel, ISOLATION_LEVEL_AUTOCOMMIT][bool(on)])
208
209     @check
210     def commit(self):
211         return self._cnx.commit()
212
213     @check
214     def rollback(self):
215         return self._cnx.rollback()
216
217     @check
218     def __getattr__(self, name):
219         return getattr(self._obj, name)
220
221
222 class PsycoConnection(psycopg2.extensions.connection):
223     pass
224
225 class ConnectionPool(object):
226
227     def locked(fun):
228         @wraps(fun)
229         def _locked(self, *args, **kwargs):
230             self._lock.acquire()
231             try:
232                 return fun(self, *args, **kwargs)
233             finally:
234                 self._lock.release()
235         return _locked
236
237
238     def __init__(self, maxconn=64):
239         self._connections = []
240         self._maxconn = max(maxconn, 1)
241         self._lock = threading.Lock()
242         self._log = partial(netsvc.Logger().notifyChannel, 'ConnectionPool')
243
244     def __repr__(self):
245         used = len([1 for c, u in self._connections[:] if u])
246         count = len(self._connections)
247         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
248
249     def _debug(self, msg):
250         msg = "%s %s" % (repr(self), msg)
251         self._log(netsvc.LOG_DEBUG, msg)
252     def _info(self, msg):
253         msg = "%s %s" % (repr(self), msg)
254         self._log(netsvc.LOG_INFO, msg)
255     def _warn(self, msg):
256         msg = "%s %s" % (repr(self), msg)
257         self._log(netsvc.LOG_WARNING, msg)
258
259     @locked
260     def borrow(self, dsn):
261         # free leaked connections
262         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
263             if getattr(cnx, 'leaked', False):
264                 delattr(cnx, 'leaked')
265                 self._connections.pop(i)
266                 self._connections.append((cnx, False))
267                 self._warn('Free leaked connection to %s' % (cnx.dsn,))
268
269         for i, (cnx, used) in enumerate(self._connections):
270             if not used and dsn_are_equals(cnx.dsn, dsn):
271                 self._connections.pop(i)
272                 self._connections.append((cnx, True))
273
274                 return cnx
275
276         if len(self._connections) >= self._maxconn:
277             # try to remove the oldest connection not used
278             for i, (cnx, used) in enumerate(self._connections):
279                 if not used:
280                     self._connections.pop(i)
281                     self._debug('Removing old connection at index %d: %s' % (i, cnx.dsn))
282                     break
283             else:
284                 # note: this code is called only if the for loop has completed (no break)
285                 raise PoolError('The Connection Pool Is Full')
286
287         result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
288         self._connections.append((result, True))
289         return result
290
291     @locked
292     def give_back(self, connection, keep_in_pool=True):
293         for i, (cnx, used) in enumerate(self._connections):
294             if cnx is connection:
295                 self._connections.pop(i)
296                 if keep_in_pool:
297                     self._connections.append((cnx, False))
298                 break
299         else:
300             raise PoolError('This connection does not below to the pool')
301
302     @locked
303     def close_all(self, dsn):
304         self._info('Close all connections to %s' % (dsn,))
305         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
306             if dsn_are_equals(cnx.dsn, dsn):
307                 cnx.close()
308                 self._connections.pop(i)
309
310
311 class Connection(object):
312     def __init__(self, pool, dbname):
313         self.dbname = dbname
314         self._pool = pool
315
316     def cursor(self, serialized=False):
317         return Cursor(self._pool, self.dbname, serialized=serialized)
318
319     def serialized_cursor(self):
320         return self.cursor(True)
321
322     def __nonzero__(self):
323         """Check if connection is possible"""
324         try:
325             cr = self.cursor()
326             cr.close()
327             return True
328         except:
329             return False
330
331
332 _dsn = ''
333 for p in ('host', 'port', 'user', 'password'):
334     cfg = tools.config['db_' + p]
335     if cfg:
336         _dsn += '%s=%s ' % (p, cfg)
337
338 def dsn(db_name):
339     return '%sdbname=%s' % (_dsn, db_name)
340
341 def dsn_are_equals(first, second):
342     def key(dsn):
343         k = dict(x.split('=', 1) for x in dsn.strip().split())
344         k.pop('password', None) # password is not relevant
345         return k
346     return key(first) == key(second)
347
348
349 _Pool = ConnectionPool(int(tools.config['db_maxconn']))
350
351 def db_connect(db_name):
352     currentThread().dbname = db_name
353     return Connection(_Pool, db_name)
354
355 def close_db(db_name):
356     _Pool.close_all(dsn(db_name))
357     tools.cache.clean_caches_for_db(db_name)
358     ct = currentThread()
359     if hasattr(ct, 'dbname'):
360         delattr(ct, 'dbname')
361
362
363 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
364