[FIX] Fields.function of type integer with multi attribute behaviour improved
[odoo/odoo.git] / bin / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #    
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU Affero General Public License as
9 #    published by the Free Software Foundation, either version 3 of the
10 #    License, or (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU Affero General Public License for more details.
16 #
17 #    You should have received a copy of the GNU Affero General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.     
19 #
20 ##############################################################################
21
22 __all__ = ['db_connect', 'close_db']
23
24 import logging
25 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE
26 from psycopg2.psycopg1 import cursor as psycopg1cursor
27 from psycopg2.pool import PoolError
28
29 import psycopg2.extensions
30
31 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
32
33 types_mapping = {
34     'date': (1082,),
35     'time': (1083,),
36     'datetime': (1114,),
37 }
38
39 def unbuffer(symb, cr):
40     if symb is None: return None
41     return str(symb)
42
43 def undecimalize(symb, cr):
44     if symb is None: return None
45     return float(symb)
46
47 for name, typeoid in types_mapping.items():
48     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
49 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
50
51
52 import tools
53 from tools.func import wraps
54 from datetime import datetime as mdt
55 from datetime import timedelta
56 import threading
57 from inspect import stack
58
59 import re
60 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
61 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
62
63 sql_counter = 0
64
65 class Cursor(object):
66     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
67     __logger = logging.getLogger('db.cursor')
68
69     def check(f):
70         @wraps(f)
71         def wrapper(self, *args, **kwargs):
72             if self.__closed:
73                 raise psycopg2.ProgrammingError('Unable to use the cursor after having closed it')
74             return f(self, *args, **kwargs)
75         return wrapper
76
77     def __init__(self, pool, dbname, serialized=False):
78         self.sql_from_log = {}
79         self.sql_into_log = {}
80         self.sql_log = False
81         self.sql_log_count = 0
82         self.__closed = True    # avoid the call of close() (by __del__) if an exception
83                                 # is raised by any of the following initialisations
84         self._pool = pool
85         self.dbname = dbname
86         self._serialized = serialized
87         self._cnx = pool.borrow(dsn(dbname))
88         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
89         self.__closed = False   # real initialisation value
90         self.autocommit(False)
91         self.__caller = tuple(stack()[2][1:3])
92
93     def __del__(self):
94         if not self.__closed:
95             # Oops. 'self' has not been closed explicitly.
96             # The cursor will be deleted by the garbage collector,
97             # but the database connection is not put back into the connection
98             # pool, preventing some operation on the database like dropping it.
99             # This can also lead to a server overload.
100             msg = "Cursor not closed explicitly\n"  \
101                   "Cursor was created at %s:%s"
102             self.__logger.warn(msg, *self.__caller)
103             self.close()
104
105     @check
106     def execute(self, query, params=None):
107         if '%d' in query or '%f' in query:
108             self.__logger.warn(query)
109             self.__logger.warn("SQL queries cannot contain %d or %f anymore. "
110                                "Use only %s")
111             if params:
112                 query = query.replace('%d', '%s').replace('%f', '%s')
113
114         if self.sql_log:
115             now = mdt.now()
116
117         try:
118             params = params or None
119             res = self._obj.execute(query, params)
120         except psycopg2.ProgrammingError, pe:
121             self.__logger.error("Programming error: %s, in query %s" % (pe, query))
122             raise
123         except Exception:
124             self.__logger.exception("bad query: %s", self._obj.query)
125             raise
126
127         if self.sql_log:
128             delay = mdt.now() - now
129             delay = delay.seconds * 1E6 + delay.microseconds
130
131             self.__logger.debug("query: %s", self._obj.query)
132             self.sql_log_count+=1
133             res_from = re_from.match(query.lower())
134             if res_from:
135                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
136                 self.sql_from_log[res_from.group(1)][0] += 1
137                 self.sql_from_log[res_from.group(1)][1] += delay
138             res_into = re_into.match(query.lower())
139             if res_into:
140                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
141                 self.sql_into_log[res_into.group(1)][0] += 1
142                 self.sql_into_log[res_into.group(1)][1] += delay
143         return res
144
145
146     def split_for_in_conditions(self, ids):
147         """Split a list of identifiers into one or more smaller tuples
148            safe for IN conditions, after uniquifying them."""
149         return tools.misc.split_every(self.IN_MAX, set(ids))
150
151     def print_log(self):
152         global sql_counter
153         sql_counter += self.sql_log_count
154         if not self.sql_log:
155             return
156         def process(type):
157             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
158             sum = 0
159             if sqllogs[type]:
160                 sqllogitems = sqllogs[type].items()
161                 sqllogitems.sort(key=lambda k: k[1][1])
162                 self.__logger.debug("SQL LOG %s:", type)
163                 for r in sqllogitems:
164                     delay = timedelta(microseconds=r[1][1])
165                     self.__logger.debug("table: %s: %s/%s",
166                                         r[0], delay, r[1][0])
167                     sum+= r[1][1]
168                 sqllogs[type].clear()
169             sum = timedelta(microseconds=sum)
170             self.__logger.debug("SUM %s:%s/%d [%d]",
171                                 type, sum, self.sql_log_count, sql_counter)
172             sqllogs[type].clear()
173         process('from')
174         process('into')
175         self.sql_log_count = 0
176         self.sql_log = False
177
178     @check
179     def close(self):
180         if not self._obj:
181             return
182
183         self.print_log()
184
185         if not self._serialized:
186             self.rollback() # Ensure we close the current transaction.
187
188         self._obj.close()
189
190         # This force the cursor to be freed, and thus, available again. It is
191         # important because otherwise we can overload the server very easily
192         # because of a cursor shortage (because cursors are not garbage
193         # collected as fast as they should). The problem is probably due in
194         # part because browse records keep a reference to the cursor.
195         del self._obj
196         self.__closed = True
197         self._pool.give_back(self._cnx)
198
199     @check
200     def autocommit(self, on):
201         offlevel = [ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE][bool(self._serialized)]
202         self._cnx.set_isolation_level([offlevel, ISOLATION_LEVEL_AUTOCOMMIT][bool(on)])
203
204     @check
205     def commit(self):
206         return self._cnx.commit()
207
208     @check
209     def rollback(self):
210         return self._cnx.rollback()
211
212     @check
213     def __getattr__(self, name):
214         return getattr(self._obj, name)
215
216
217 class ConnectionPool(object):
218
219     __logger = logging.getLogger('db.connection_pool')
220
221     def locked(fun):
222         @wraps(fun)
223         def _locked(self, *args, **kwargs):
224             self._lock.acquire()
225             try:
226                 return fun(self, *args, **kwargs)
227             finally:
228                 self._lock.release()
229         return _locked
230
231
232     def __init__(self, maxconn=64):
233         self._connections = []
234         self._maxconn = max(maxconn, 1)
235         self._lock = threading.Lock()
236
237     def __repr__(self):
238         used = len([1 for c, u in self._connections[:] if u])
239         count = len(self._connections)
240         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
241
242     def _debug(self, msg):
243         self.__logger.debug(repr(self))
244         self.__logger.debug(msg)
245
246     @locked
247     def borrow(self, dsn):
248         self._debug('Borrow connection to %s' % (dsn,))
249
250         result = None
251         for i, (cnx, used) in enumerate(self._connections):
252             if not used and dsn_are_equals(cnx.dsn, dsn):
253                 self._debug('Existing connection found at index %d' % i)
254
255                 self._connections.pop(i)
256                 self._connections.append((cnx, True))
257
258                 result = cnx
259                 break
260         if result:
261             return result
262
263         if len(self._connections) >= self._maxconn:
264             # try to remove the oldest connection not used
265             for i, (cnx, used) in enumerate(self._connections):
266                 if not used:
267                     self._debug('Removing old connection at index %d: %s' % (i, cnx.dsn))
268                     self._connections.pop(i)
269                     break
270             else:
271                 # note: this code is called only if the for loop has completed (no break)
272                 raise PoolError('The Connection Pool Is Full')
273
274         self._debug('Create new connection')
275         result = psycopg2.connect(dsn=dsn)
276         self._connections.append((result, True))
277         return result
278
279     @locked
280     def give_back(self, connection):
281         self._debug('Give back connection to %s' % (connection.dsn,))
282         for i, (cnx, used) in enumerate(self._connections):
283             if cnx is connection:
284                 self._connections.pop(i)
285                 self._connections.append((cnx, False))
286                 break
287         else:
288             raise PoolError('This connection does not below to the pool')
289
290     @locked
291     def close_all(self, dsn):
292         self._debug('Close all connections to %s' % (dsn,))
293         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
294             if dsn_are_equals(cnx.dsn, dsn):
295                 cnx.close()
296                 self._connections.pop(i)
297
298
299 class Connection(object):
300     __logger = logging.getLogger('db.connection')
301
302     def __init__(self, pool, dbname):
303         self.dbname = dbname
304         self._pool = pool
305
306     def cursor(self, serialized=False):
307         cursor_type = serialized and 'serialized ' or ''
308         self.__logger.debug('create %scursor to "%s"' % (cursor_type, self.dbname,))
309         return Cursor(self._pool, self.dbname, serialized=serialized)
310
311     def serialized_cursor(self):
312         return self.cursor(True)
313
314     def __nonzero__(self):
315         """Check if connection is possible"""
316         try:
317             cr = self.cursor()
318             cr.close()
319             return True
320         except:
321             return False
322
323
324 _dsn = ''
325 for p in ('host', 'port', 'user', 'password'):
326     cfg = tools.config['db_' + p]
327     if cfg:
328         _dsn += '%s=%s ' % (p, cfg)
329
330 def dsn(db_name):
331     return '%sdbname=%s' % (_dsn, db_name)
332
333 def dsn_are_equals(first, second):
334     def key(dsn):
335         k = dict(x.split('=', 1) for x in dsn.strip().split())
336         k.pop('password', None) # password is not relevant
337         return k
338     return key(first) == key(second)
339
340
341 _Pool = ConnectionPool(int(tools.config['db_maxconn']))
342
343 def db_connect(db_name):
344     return Connection(_Pool, db_name)
345
346 def close_db(db_name):
347     _Pool.close_all(dsn(db_name))
348     tools.cache.clean_caches_for_db(db_name)
349
350
351 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
352