[Fix] report: if header is false then not display header
[odoo/odoo.git] / bin / sql_db.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU Affero General Public License as
9 #    published by the Free Software Foundation, either version 3 of the
10 #    License, or (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU Affero General Public License for more details.
16 #
17 #    You should have received a copy of the GNU Affero General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 ##############################################################################
21
22 __all__ = ['db_connect', 'close_db']
23
24 from threading import currentThread
25 import logging
26 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE
27 from psycopg2.psycopg1 import cursor as psycopg1cursor
28 from psycopg2.pool import PoolError
29
30 import psycopg2.extensions
31 import warnings
32
33 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
34
35 types_mapping = {
36     'date': (1082,),
37     'time': (1083,),
38     'datetime': (1114,),
39 }
40
41 def unbuffer(symb, cr):
42     if symb is None: return None
43     return str(symb)
44
45 def undecimalize(symb, cr):
46     if symb is None: return None
47     return float(symb)
48
49 for name, typeoid in types_mapping.items():
50     psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
51 psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
52
53
54 import tools
55 from tools.func import wraps, frame_codeinfo
56 from datetime import datetime as mdt
57 from datetime import timedelta
58 import threading
59 from inspect import currentframe
60
61 import re
62 re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
63 re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
64
65 sql_counter = 0
66
67 class Cursor(object):
68     IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
69     __logger = logging.getLogger('db.cursor')
70
71     def check(f):
72         @wraps(f)
73         def wrapper(self, *args, **kwargs):
74             if self.__closed:
75                 raise psycopg2.OperationalError('Unable to use the cursor after having closed it')
76             return f(self, *args, **kwargs)
77         return wrapper
78
79     def __init__(self, pool, dbname, serialized=False):
80         self.sql_from_log = {}
81         self.sql_into_log = {}
82
83         # default log level determined at cursor creation, could be
84         # overridden later for debugging purposes
85         self.sql_log = self.__logger.isEnabledFor(logging.DEBUG_SQL)
86
87         self.sql_log_count = 0
88         self.__closed = True    # avoid the call of close() (by __del__) if an exception
89                                 # is raised by any of the following initialisations
90         self._pool = pool
91         self.dbname = dbname
92         self._serialized = serialized
93         self._cnx = pool.borrow(dsn(dbname))
94         self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
95         self.__closed = False   # real initialisation value
96         self.autocommit(False)
97         if self.sql_log:
98             self.__caller = frame_codeinfo(currentframe(),2)
99         else:
100             self.__caller = False
101
102     def __del__(self):
103         if not self.__closed:
104             # Oops. 'self' has not been closed explicitly.
105             # The cursor will be deleted by the garbage collector,
106             # but the database connection is not put back into the connection
107             # pool, preventing some operation on the database like dropping it.
108             # This can also lead to a server overload.
109             msg = "Cursor not closed explicitly\n"
110             if self.__caller:
111                 msg += "Cursor was created at %s:%s" % self.__caller
112             else:
113                 msg += "Please enable sql debugging to trace the caller."
114             self.__logger.warn(msg)
115             self._close(True)
116
117     @check
118     def execute(self, query, params=None, log_exceptions=True):
119         if '%d' in query or '%f' in query:
120             self.__logger.warn(query)
121             self.__logger.warn("SQL queries cannot contain %d or %f anymore. "
122                                "Use only %s")
123
124         if self.sql_log:
125             now = mdt.now()
126
127         try:
128             params = params or None
129             res = self._obj.execute(query, params)
130         except psycopg2.ProgrammingError, pe:
131             if log_exceptions or self.sql_log:
132                 self.__logger.error("Programming error: %s, in query %s", pe, query)
133             raise
134         except Exception:
135             if log_exceptions or self.sql_log:
136                 self.__logger.exception("bad query: %s", self._obj.query or query)
137             raise
138
139         if self.sql_log:
140             delay = mdt.now() - now
141             delay = delay.seconds * 1E6 + delay.microseconds
142
143             self.__logger.log(logging.DEBUG_SQL, "query: %s", self._obj.query)
144             self.sql_log_count+=1
145             res_from = re_from.match(query.lower())
146             if res_from:
147                 self.sql_from_log.setdefault(res_from.group(1), [0, 0])
148                 self.sql_from_log[res_from.group(1)][0] += 1
149                 self.sql_from_log[res_from.group(1)][1] += delay
150             res_into = re_into.match(query.lower())
151             if res_into:
152                 self.sql_into_log.setdefault(res_into.group(1), [0, 0])
153                 self.sql_into_log[res_into.group(1)][0] += 1
154                 self.sql_into_log[res_into.group(1)][1] += delay
155         return res
156
157
158     def split_for_in_conditions(self, ids):
159         """Split a list of identifiers into one or more smaller tuples
160            safe for IN conditions, after uniquifying them."""
161         return tools.misc.split_every(self.IN_MAX, set(ids))
162
163     def print_log(self):
164         global sql_counter
165         sql_counter += self.sql_log_count
166         if not self.sql_log:
167             return
168         def process(type):
169             sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
170             sum = 0
171             if sqllogs[type]:
172                 sqllogitems = sqllogs[type].items()
173                 sqllogitems.sort(key=lambda k: k[1][1])
174                 self.__logger.log(logging.DEBUG_SQL, "SQL LOG %s:", type)
175                 sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0]))
176                 for r in sqllogitems:
177                     delay = timedelta(microseconds=r[1][1])
178                     self.__logger.log(logging.DEBUG_SQL, "table: %s: %s/%s",
179                                         r[0], delay, r[1][0])
180                     sum+= r[1][1]
181                 sqllogs[type].clear()
182             sum = timedelta(microseconds=sum)
183             self.__logger.log(logging.DEBUG_SQL, "SUM %s:%s/%d [%d]",
184                                 type, sum, self.sql_log_count, sql_counter)
185             sqllogs[type].clear()
186         process('from')
187         process('into')
188         self.sql_log_count = 0
189         self.sql_log = False
190
191     @check
192     def close(self):
193         return self._close(False)
194
195     def _close(self, leak=False):
196         if not self._obj:
197             return
198
199         self.print_log()
200
201         if not self._serialized:
202             self.rollback() # Ensure we close the current transaction.
203
204         self._obj.close()
205
206         # This force the cursor to be freed, and thus, available again. It is
207         # important because otherwise we can overload the server very easily
208         # because of a cursor shortage (because cursors are not garbage
209         # collected as fast as they should). The problem is probably due in
210         # part because browse records keep a reference to the cursor.
211         del self._obj
212         self.__closed = True
213
214         if leak:
215             self._cnx.leaked = True
216         else:
217             keep_in_pool = self.dbname not in ('template1', 'template0', 'postgres')
218             self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
219
220     @check
221     def autocommit(self, on):
222         offlevel = [ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE][bool(self._serialized)]
223         self._cnx.set_isolation_level([offlevel, ISOLATION_LEVEL_AUTOCOMMIT][bool(on)])
224
225     @check
226     def commit(self):
227         return self._cnx.commit()
228
229     @check
230     def rollback(self):
231         return self._cnx.rollback()
232
233     @check
234     def __getattr__(self, name):
235         return getattr(self._obj, name)
236
237
238 class PsycoConnection(psycopg2.extensions.connection):
239     pass
240
241 class ConnectionPool(object):
242
243     __logger = logging.getLogger('db.connection_pool')
244
245     def locked(fun):
246         @wraps(fun)
247         def _locked(self, *args, **kwargs):
248             self._lock.acquire()
249             try:
250                 return fun(self, *args, **kwargs)
251             finally:
252                 self._lock.release()
253         return _locked
254
255
256     def __init__(self, maxconn=64):
257         self._connections = []
258         self._maxconn = max(maxconn, 1)
259         self._lock = threading.Lock()
260
261     def __repr__(self):
262         used = len([1 for c, u in self._connections[:] if u])
263         count = len(self._connections)
264         return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
265
266     def _debug(self, msg, *args):
267         self.__logger.log(logging.DEBUG_SQL, ('%r ' + msg), self, *args)
268
269     @locked
270     def borrow(self, dsn):
271         self._debug('Borrow connection to %r', dsn)
272
273         # free leaked connections
274         for i, (cnx, _) in tools.reverse_enumerate(self._connections):
275             if getattr(cnx, 'leaked', False):
276                 delattr(cnx, 'leaked')
277                 self._connections.pop(i)
278                 self._connections.append((cnx, False))
279                 self.__logger.warn('%r: Free leaked connection to %r', self, cnx.dsn)
280
281         for i, (cnx, used) in enumerate(self._connections):
282             if not used and dsn_are_equals(cnx.dsn, dsn):
283                 self._connections.pop(i)
284                 self._connections.append((cnx, True))
285                 self._debug('Existing connection found at index %d', i)
286
287                 return cnx
288
289         if len(self._connections) >= self._maxconn:
290             # try to remove the oldest connection not used
291             for i, (cnx, used) in enumerate(self._connections):
292                 if not used:
293                     self._connections.pop(i)
294                     self._debug('Removing old connection at index %d: %r', i, cnx.dsn)
295                     break
296             else:
297                 # note: this code is called only if the for loop has completed (no break)
298                 raise PoolError('The Connection Pool Is Full')
299
300         try:
301             result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
302         except psycopg2.Error, e:
303             self.__logger.exception('Connection to the database failed')
304             raise
305         self._connections.append((result, True))
306         self._debug('Create new connection')
307         return result
308
309     @locked
310     def give_back(self, connection, keep_in_pool=True):
311         self._debug('Give back connection to %r', connection.dsn)
312         for i, (cnx, used) in enumerate(self._connections):
313             if cnx is connection:
314                 self._connections.pop(i)
315                 if keep_in_pool:
316                     self._connections.append((cnx, False))
317                     self._debug('Put connection to %r in pool', cnx.dsn)
318                 else:
319                     self._debug('Forgot connection to %r', cnx.dsn)
320                 break
321         else:
322             raise PoolError('This connection does not below to the pool')
323
324     @locked
325     def close_all(self, dsn):
326         self.__logger.info('%r: Close all connections to %r', self, dsn)
327         for i, (cnx, used) in tools.reverse_enumerate(self._connections):
328             if dsn_are_equals(cnx.dsn, dsn):
329                 cnx.close()
330                 self._connections.pop(i)
331
332
333 class Connection(object):
334     __logger = logging.getLogger('db.connection')
335
336     def __init__(self, pool, dbname):
337         self.dbname = dbname
338         self._pool = pool
339
340     def cursor(self, serialized=False):
341         cursor_type = serialized and 'serialized ' or ''
342         self.__logger.log(logging.DEBUG_SQL, 'create %scursor to %r', cursor_type, self.dbname)
343         return Cursor(self._pool, self.dbname, serialized=serialized)
344
345     def serialized_cursor(self):
346         return self.cursor(True)
347
348     def __nonzero__(self):
349         """Check if connection is possible"""
350         try:
351             warnings.warn("You use an expensive function to test a connection.",
352                       DeprecationWarning, stacklevel=1)
353             cr = self.cursor()
354             cr.close()
355             return True
356         except Exception:
357             return False
358
359
360 _dsn = ''
361 for p in ('host', 'port', 'user', 'password'):
362     cfg = tools.config['db_' + p]
363     if cfg:
364         _dsn += '%s=%s ' % (p, cfg)
365
366 def dsn(db_name):
367     return '%sdbname=%s' % (_dsn, db_name)
368
369 def dsn_are_equals(first, second):
370     def key(dsn):
371         k = dict(x.split('=', 1) for x in dsn.strip().split())
372         k.pop('password', None) # password is not relevant
373         return k
374     return key(first) == key(second)
375
376
377 _Pool = ConnectionPool(int(tools.config['db_maxconn']))
378
379 def db_connect(db_name):
380     currentThread().dbname = db_name
381     return Connection(_Pool, db_name)
382
383 def close_db(db_name):
384     _Pool.close_all(dsn(db_name))
385     tools.cache.clean_caches_for_db(db_name)
386     ct = currentThread()
387     if hasattr(ct, 'dbname'):
388         delattr(ct, 'dbname')
389
390
391 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
392