1 # -*- encoding: utf-8 -*-
2 ##############################################################################
4 # OpenERP, Open Source Management Solution
5 # Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). All Rights Reserved
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 ##############################################################################
# Public API of this module: connection factory and database teardown helper.
__all__ = ['db_connect', 'close_db']
25 from threading import currentThread
27 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE
28 from psycopg2.psycopg1 import cursor as psycopg1cursor
29 from psycopg2.pool import PoolError
31 import psycopg2.extensions
# Have psycopg2 return unicode objects for textual columns instead of byte strings.
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)

def unbuffer(symb, cr):
    """Typecast helper for binary/buffer columns; None maps to None.

    NOTE(review): the non-None conversion path is not visible in this
    excerpt (upstream returns str(symb)) -- confirm before relying on it.
    """
    if symb is None: return None

def undecimalize(symb, cr):
    """Typecast helper for numeric columns; None maps to None.

    NOTE(review): the non-None conversion path is not visible in this
    excerpt (upstream returns float(symb)) -- confirm before relying on it.
    """
    if symb is None: return None

# Register a pass-through typecaster for every OID listed in types_mapping
# (defined outside this excerpt), then route float/numeric columns
# (OIDs 700, 701, 1700) through undecimalize.
for name, typeoid in types_mapping.items():
    psycopg2.extensions.register_type(psycopg2.extensions.new_type(typeoid, name, lambda x, cr: x))
psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,), 'float', undecimalize))
55 from tools.func import wraps, partial
56 from datetime import datetime as mdt
57 from datetime import timedelta
59 from inspect import stack
# Patterns used by Cursor.execute() to pull the target table name out of a
# (lowercased) query so per-table timing statistics can be accumulated.
# Fixed: use raw strings for regex literals and drop the stray trailing
# semicolons (a C/JS-ism, flagged by every Python linter).
re_from = re.compile(r'.* from "?([a-zA-Z_0-9]+)"? .*$')
re_into = re.compile(r'.* into "?([a-zA-Z_0-9]+)"? .*$')
def log(msg, lvl=netsvc.LOG_DEBUG):
    """Emit *msg* on the 'sql' logging channel at severity *lvl*."""
    netsvc.Logger().notifyChannel('sql', lvl, msg)
    # Inner wrapper of the cursor-state-checking decorator: refuses to run a
    # cursor method once the cursor has been closed, otherwise delegates to
    # the wrapped function f.
    # NOTE(review): the guard condition (presumably "if self.__closed:") is
    # elided from this excerpt; only the raise and the delegation are shown.
    def wrapper(self, *args, **kwargs):
        raise psycopg2.ProgrammingError('Unable to use the cursor after having closing it')
        return f(self, *args, **kwargs)
    def __init__(self, pool, dbname, serialized=False):
        """Wrap a psycopg connection borrowed from *pool* for database *dbname*.

        :param pool: ConnectionPool the underlying connection is borrowed from
        :param dbname: database name, used to build the DSN
        :param serialized: when True the cursor runs at SERIALIZABLE isolation

        NOTE(review): self._pool, self.dbname and self.sql_log are read by
        other methods but their assignments are not visible in this excerpt
        -- presumably set here upstream; confirm.
        """
        # Per-table query statistics: {table_name: [call_count, microseconds]}
        self.sql_from_log = {}
        self.sql_into_log = {}
        self.sql_log_count = 0
        self.__closed = True # avoid the call of close() (by __del__) if an exception
        # is raised by any of the following initialisations
        self._serialized = serialized
        self._cnx = pool.borrow(dsn(dbname))
        # psycopg1-compatible cursor, for backward compatibility of callers.
        self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
        self.__closed = False # real initialisation value
        self.autocommit(False)
        # Remember (filename, lineno) of the creating frame; reported by
        # __del__ when a cursor is garbage-collected without being closed.
        self.__caller = tuple(stack()[2][1:3])
        # Tail of Cursor.__del__ (its "def" line is not visible in this
        # excerpt; upstream it runs only when the cursor was never closed).
        # Oops. 'self' has not been closed explicitly.
        # The cursor will be deleted by the garbage collector,
        # but the database connection is not put back into the connection
        # pool, preventing some operation on the database like dropping it.
        # This can also lead to a server overload.
        msg = "Cursor not closed explicitly\n" \
              "Cursor was created at %s:%s" % self.__caller
        log(msg, netsvc.LOG_WARNING)
    def execute(self, query, params=None):
        """Execute *query* with *params* on the wrapped cursor, keeping
        per-table timing statistics when SQL logging is enabled.

        NOTE(review): several lines of this method are elided from this
        excerpt: the try/except around the underlying execute call (the
        "bad query" logging below belongs to its except branch), the
        "if self.sql_log:" guards around the timing code (including the
        assignment of 'now'), the "if res_from:/if res_into:" match guards,
        and the final "return res".  The flat reproduction below keeps only
        the visible lines.
        """
        # Old-style %d/%f placeholders are deprecated: warn and rewrite to %s.
        if '%d' in query or '%f' in query:
            log(query, netsvc.LOG_WARNING)
            log("SQL queries mustn't contain %d or %f anymore. Use only %s", netsvc.LOG_WARNING)
            query = query.replace('%d', '%s').replace('%f', '%s')
        params = params or None
        res = self._obj.execute(query, params)
        # Failure path (upstream: inside "except"): log the offending query
        # as reported by psycopg, falling back to the raw query text.
        q = self._obj.query or query
        log("bad query: %s" % (q,))
        from traceback import format_stack
        log(''.join(format_stack()))
        # Timing bookkeeping; 'now' is assigned upstream before the execute
        # when sql_log is on -- it is undefined as shown here.
        delay = mdt.now() - now
        delay = delay.seconds * 1E6 + delay.microseconds  # elapsed microseconds
        log("query: %s" % self._obj.query)
        self.sql_log_count+=1
        # Attribute the elapsed time to the table named in the query
        # (res_from/res_into may be None; upstream guards are elided).
        res_from = re_from.match(query.lower())
        self.sql_from_log.setdefault(res_from.group(1), [0, 0])
        self.sql_from_log[res_from.group(1)][0] += 1
        self.sql_from_log[res_from.group(1)][1] += delay
        res_into = re_into.match(query.lower())
        self.sql_into_log.setdefault(res_into.group(1), [0, 0])
        self.sql_into_log[res_into.group(1)][0] += 1
        self.sql_into_log[res_into.group(1)][1] += delay
        # Fragment of print_log(): dumps the accumulated per-table SQL
        # statistics and resets them.  NOTE(review): its "def" line, the loop
        # supplying 'type' (over 'from'/'into') and 'sum', and the body of
        # the empty-log guard below (upstream: continue) are not visible in
        # this excerpt, so the guard appears body-less here.
        sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
        if not sqllogs[type]:
        sqllogitems = sqllogs[type].items()
        # Sort tables by cumulated delay (microseconds), cheapest first.
        sqllogitems.sort(key=lambda k: k[1][1])
        log("SQL LOG %s:" % (type,))
        for r in sqllogitems:
            delay = timedelta(microseconds=r[1][1])
            log("table: %s: %s/%s" %(r[0], str(delay), r[1][0]))
        # NB: 'sum' shadows the builtin within this scope.
        sum = timedelta(microseconds=sum)
        log("SUM:%s/%d" % (str(sum), self.sql_log_count))
        sqllogs[type].clear()
        self.sql_log_count = 0
        # Body of close() (def line not visible): delegate to _close()
        # without flagging the connection as leaked.
        return self._close(False)
    def _close(self, leak=False):
        """Release the wrapped cursor and give the connection back to the pool.

        :param leak: when True, flag the connection as leaked instead of
                     returning it to the pool normally.

        NOTE(review): several lines are elided from this excerpt: the early
        return when already closed, the deletion of self._obj and the
        setting of the closed flag, and the "if leak:"/"else:" branch around
        the two code paths below (as shown, both paths would run).
        """
        if not self._serialized:
            self.rollback() # Ensure we close the current transaction.
        # Freeing the cursor promptly matters: cursors are not garbage
        # collected as fast as they should be (browse records keep references
        # to them), and a cursor shortage can overload the server.
        self._cnx.leaked = True
        # Never keep connections to the maintenance databases in the pool --
        # a pooled connection would prevent dropping/re-creating them.
        keep_in_pool = self.dbname not in ('template1', 'template0', 'postgres')
        self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
205 def autocommit(self, on):
206 offlevel = [ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE][bool(self._serialized)]
207 self._cnx.set_isolation_level([offlevel, ISOLATION_LEVEL_AUTOCOMMIT][bool(on)])
        # Body of commit() (def line not visible): flush the pending
        # transaction on the underlying connection.
        return self._cnx.commit()
        # Body of rollback() (def line not visible): abort the pending
        # transaction on the underlying connection.
        return self._cnx.rollback()
218 def __getattr__(self, name):
219 return getattr(self._obj, name)
# Connection subclass passed as connection_factory to psycopg2.connect() so
# the pool can hang extra attributes (e.g. the 'leaked' flag) on instances.
# NOTE(review): the class body ("pass" upstream) is not visible in this excerpt.
class PsycoConnection(psycopg2.extensions.connection):
class ConnectionPool(object):
    # Fragment of the "locked" decorator used on pool methods: the inner
    # function runs the decorated method while holding self._lock.
    # NOTE(review): the enclosing "def locked(fun):" and the lock
    # acquire/release lines around this call are not visible in this excerpt.
    def _locked(self, *args, **kwargs):
        return fun(self, *args, **kwargs)
238 def __init__(self, maxconn=64):
239 self._connections = []
240 self._maxconn = max(maxconn, 1)
241 self._lock = threading.Lock()
242 self._log = partial(netsvc.Logger().notifyChannel, 'ConnectionPool')
        # Body of __repr__ (def line not visible): summarise pool occupancy,
        # e.g. "ConnectionPool(used=3/count=5/max=64)".
        used = len([1 for c, u in self._connections[:] if u])
        count = len(self._connections)
        return "ConnectionPool(used=%d/count=%d/max=%d)" % (used, count, self._maxconn)
249 def _debug(self, msg):
250 msg = "%s %s" % (repr(self), msg)
251 self._log(netsvc.LOG_DEBUG, msg)
252 def _info(self, msg):
253 msg = "%s %s" % (repr(self), msg)
254 self._log(netsvc.LOG_INFO, msg)
255 def _warn(self, msg):
256 msg = "%s %s" % (repr(self), msg)
257 self._log(netsvc.LOG_WARNING, msg)
    def borrow(self, dsn):
        """Return a connection for *dsn*: reclaim leaked ones, reuse a free
        matching one, evict an idle one when full, or open a fresh one.

        NOTE(review): several lines are elided from this excerpt: the
        "return cnx" after re-flagging a free matching connection as used,
        the "if not used:" guard plus "break" in the eviction loop (with its
        "else:" carrying the raise), and the final "return result".  The
        comments below describe only the visible logic.
        """
        # free leaked connections: anything flagged 'leaked' by Cursor._close
        # is moved back to the free list instead of staying marked busy.
        for i, (cnx, _) in tools.reverse_enumerate(self._connections):
            if getattr(cnx, 'leaked', False):
                delattr(cnx, 'leaked')
                self._connections.pop(i)
                self._connections.append((cnx, False))
                self._warn('Free leaked connection to %s' % (cnx.dsn,))
        # Reuse an existing free connection whose DSN matches.
        for i, (cnx, used) in enumerate(self._connections):
            if not used and dsn_are_equals(cnx.dsn, dsn):
                self._connections.pop(i)
                self._connections.append((cnx, True))
        # Pool full: evict an idle connection, otherwise give up.
        if len(self._connections) >= self._maxconn:
            # try to remove the oldest connection not used
            for i, (cnx, used) in enumerate(self._connections):
                self._connections.pop(i)
                self._debug('Removing old connection at index %d: %s' % (i, cnx.dsn))
            # note: this code is called only if the for loop has completed (no break)
            raise PoolError('The Connection Pool Is Full')
        # Nothing reusable: open a fresh connection and mark it busy.
        result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection)
        self._connections.append((result, True))
292 def give_back(self, connection, keep_in_pool=True):
293 for i, (cnx, used) in enumerate(self._connections):
294 if cnx is connection:
295 self._connections.pop(i)
297 self._connections.append((cnx, False))
300 raise PoolError('This connection does not below to the pool')
303 def close_all(self, dsn):
304 self._info('Close all connections to %s' % (dsn,))
305 for i, (cnx, used) in tools.reverse_enumerate(self._connections):
306 if dsn_are_equals(cnx.dsn, dsn):
308 self._connections.pop(i)
class Connection(object):
    """A lightweight handle on one database, from which cursors are opened."""
    def __init__(self, pool, dbname):
        """Bind this Connection to *pool* and *dbname*.

        NOTE(review): the attribute assignments (upstream: self.dbname and
        self._pool, both read by cursor()) are not visible in this excerpt.
        """
316 def cursor(self, serialized=False):
317 return Cursor(self._pool, self.dbname, serialized=serialized)
319 def serialized_cursor(self):
320 return self.cursor(True)
    def __nonzero__(self):
        """Check if connection is possible"""
        # NOTE(review): the body is not visible in this excerpt (upstream:
        # try to open and close a cursor, returning True/False accordingly).
    # Fragment of dsn(db_name) -- its "def" line and the "_dsn = ''"
    # initialisation are not visible in this excerpt.
    # Build a libpq DSN string from the db_* server configuration entries.
    for p in ('host', 'port', 'user', 'password'):
        cfg = tools.config['db_' + p]
        # NOTE(review): upstream guards the append below with "if cfg:";
        # that guard line is elided here.
        _dsn += '%s=%s ' % (p, cfg)
    return '%sdbname=%s' % (_dsn, db_name)
def dsn_are_equals(first, second):
    """Return True when two DSN strings denote the same server/database,
    ignoring the password component.

    NOTE(review): the inner helper's "def key(dsn):" line and its
    "return k" are not visible in this excerpt; the two parsing lines below
    are the body of that helper.
    """
    k = dict(x.split('=', 1) for x in dsn.strip().split())
    k.pop('password', None) # password is not relevant
    return key(first) == key(second)
# Module-level singleton pool, sized from the db_maxconn configuration option.
_Pool = ConnectionPool(int(tools.config['db_maxconn']))
def db_connect(db_name):
    """Tag the current thread with *db_name* and return a Connection to it."""
    thread = currentThread()
    thread.dbname = db_name
    return Connection(_Pool, db_name)
def close_db(db_name):
    """Close all pooled connections to *db_name* and flush its caches."""
    _Pool.close_all(dsn(db_name))
    tools.cache.clean_caches_for_db(db_name)
    # Drop the thread-local dbname tag set by db_connect().
    # NOTE(review): the assignment "ct = currentThread()" is not visible in
    # this excerpt.
    if hasattr(ct, 'dbname'):
        delattr(ct, 'dbname')
363 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: