[FIX] osv: Automatically retry the typical transaction serialization errors
[odoo/odoo.git] / bin / osv / osv.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU Affero General Public License as
9 #    published by the Free Software Foundation, either version 3 of the
10 #    License, or (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU Affero General Public License for more details.
16 #
17 #    You should have received a copy of the GNU Affero General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 ##############################################################################
21
22 #
23 # OSV: Objects Services
24 #
25
26 import orm
27 import netsvc
28 import pooler
29 import copy
30 import logging
31 from psycopg2 import IntegrityError, OperationalError, errorcodes
32 from tools.func import wraps
33 from tools.translate import translate
34 import time
35 import random
36
37 module_list = []
38 module_class_list = {}
39 class_pool = {}
40
41 PG_CONCURRENCY_ERRORS_TO_RETRY = (errorcodes.LOCK_NOT_AVAILABLE, errorcodes.SERIALIZATION_FAILURE, errorcodes.DEADLOCK_DETECTED)
42 MAX_TRIES_ON_CONCURRENCY_FAILURE = 5
43
44 class except_osv(Exception):
45     def __init__(self, name, value, exc_type='warning'):
46         self.name = name
47         self.exc_type = exc_type
48         self.value = value
49         self.args = (exc_type, name)
50
51
52 class object_proxy(netsvc.Service):
53     def __init__(self):
54         self.logger = logging.getLogger('web-services')
55         netsvc.Service.__init__(self, 'object_proxy', audience='')
56         self.exportMethod(self.exec_workflow)
57         self.exportMethod(self.execute)
58
59     def check(f):
60         @wraps(f)
61         def wrapper(self, dbname, *args, **kwargs):
62             """ Wraps around OSV functions and normalises a few exceptions
63             """
64
65             def tr(src, ttype):
66                 # We try to do the same as the _(), but without the frame
67                 # inspection, since we aready are wrapping an osv function
68                 # trans_obj = self.get('ir.translation') cannot work yet :(
69                 ctx = {}
70                 if not kwargs:
71                     if args and isinstance(args[-1], dict):
72                         ctx = args[-1]
73                 elif isinstance(kwargs, dict):
74                     ctx = kwargs.get('context', {})
75
76                 uid = 1
77                 if args and isinstance(args[0], (long, int)):
78                     uid = args[0]
79
80                 lang = ctx and ctx.get('lang')
81                 if not (lang or hasattr(src, '__call__')):
82                     return src
83
84                 # We open a *new* cursor here, one reason is that failed SQL
85                 # queries (as in IntegrityError) will invalidate the current one.
86                 cr = False
87                 
88                 if hasattr(src, '__call__'):
89                     # callable. We need to find the right parameters to call
90                     # the  orm._sql_message(self, cr, uid, ids, context) function,
91                     # or we skip..
92                     # our signature is f(osv_pool, dbname [,uid, obj, method, args])
93                     try:
94                         if args and len(args) > 1:
95                             obj = self.get(args[1])
96                             if len(args) > 3 and isinstance(args[3], (long, int, list)):
97                                 ids = args[3]
98                             else:
99                                 ids = []
100                         cr = pooler.get_db_only(dbname).cursor()
101                         return src(obj, cr, uid, ids, context=(ctx or {}))
102                     except Exception:
103                         pass
104                     finally:
105                         if cr: cr.close()
106                    
107                     return False # so that the original SQL error will
108                                  # be returned, it is the best we have.
109
110                 try:
111                     cr = pooler.get_db_only(dbname).cursor()
112                     res = translate(cr, name=False, source_type=ttype,
113                                     lang=lang, source=src)
114                     if res:
115                         return res
116                     else:
117                         return src
118                 finally:
119                     if cr: cr.close()
120
121             def _(src):
122                 return tr(src, 'code')
123
124             tries = 0
125             while True:
126                 try:
127                     if not pooler.get_pool(dbname)._ready:
128                         raise except_osv('Database not ready', 'Currently, this database is not fully loaded and can not be used.')
129                     return f(self, dbname, *args, **kwargs)
130                 except OperationalError, e:
131                     # Automatically retry the typical transaction serialization errors
132                     if not e.pgcode in PG_CONCURRENCY_ERRORS_TO_RETRY or tries >= MAX_TRIES_ON_CONCURRENCY_FAILURE:
133                         self.logger.warning("%s, maximum number of tries reached" % errorcodes.lookup(e.pgcode))
134                         raise
135                     wait_time = random.uniform(0.0, 2 ** tries)
136                     tries += 1
137                     self.logger.info("%s, retrying %d/%d in %.04f sec..." % (errorcodes.lookup(e.pgcode), tries, MAX_TRIES_ON_CONCURRENCY_FAILURE, wait_time))
138                     time.sleep(wait_time)
139                 except orm.except_orm, inst:
140                     if inst.name == 'AccessError':
141                         self.logger.debug("AccessError", exc_info=True)
142                     self.abortResponse(1, inst.name, 'warning', inst.value)
143                 except except_osv, inst:
144                     self.abortResponse(1, inst.name, inst.exc_type, inst.value)
145                 except IntegrityError, inst:
146                     osv_pool = pooler.get_pool(dbname)
147                     for key in osv_pool._sql_error.keys():
148                         if key in inst[0]:
149                             self.abortResponse(1, _('Constraint Error'), 'warning',
150                                             tr(osv_pool._sql_error[key], 'sql_constraint') or inst[0])
151                     if inst.pgcode in (errorcodes.NOT_NULL_VIOLATION, errorcodes.FOREIGN_KEY_VIOLATION, errorcodes.RESTRICT_VIOLATION):
152                         msg = _('The operation cannot be completed, probably due to the following:\n- deletion: you may be trying to delete a record while other records still reference it\n- creation/update: a mandatory field is not correctly set')
153                         self.logger.debug("IntegrityError", exc_info=True)
154                         try:
155                             errortxt = inst.pgerror.replace('«','"').replace('»','"')
156                             if '"public".' in errortxt:
157                                 context = errortxt.split('"public".')[1]
158                                 model_name = table = context.split('"')[1]
159                             else:
160                                 last_quote_end = errortxt.rfind('"')
161                                 last_quote_begin = errortxt.rfind('"', 0, last_quote_end)
162                                 model_name = table = errortxt[last_quote_begin+1:last_quote_end].strip()
163                             model = table.replace("_",".")
164                             model_obj = osv_pool.get(model)
165                             if model_obj:
166                                 model_name = model_obj._description or model_obj._name
167                             msg += _('\n\n[object with reference: %s - %s]') % (model_name, model)
168                         except Exception:
169                             pass
170                         self.abortResponse(1, _('Integrity Error'), 'warning', msg)
171                     else:
172                         self.abortResponse(1, _('Integrity Error'), 'warning', inst[0])
173                 except Exception:
174                     self.logger.exception("Uncaught exception")
175                     raise
176
177         return wrapper
178
179     def execute_cr(self, cr, uid, obj, method, *args, **kw):
180         object = pooler.get_pool(cr.dbname).get(obj)
181         if not object:
182             raise except_osv('Object Error', 'Object %s doesn\'t exist' % str(obj))
183         return getattr(object, method)(cr, uid, *args, **kw)
184
185     @check
186     def execute(self, db, uid, obj, method, *args, **kw):
187         cr = pooler.get_db(db).cursor()
188         try:
189             try:
190                 if method.startswith('_'):
191                     raise except_osv('Access Denied', 'Private methods (such as %s) cannot be called remotely.' % (method,))
192                 res = self.execute_cr(cr, uid, obj, method, *args, **kw)
193                 if res is None:
194                     self.logger.warning('The method %s of the object %s can not return `None` !', method, obj)
195                 cr.commit()
196             except Exception:
197                 cr.rollback()
198                 raise
199         finally:
200             cr.close()
201         return res
202
203     def exec_workflow_cr(self, cr, uid, obj, method, *args):
204         wf_service = netsvc.LocalService("workflow")
205         return wf_service.trg_validate(uid, obj, args[0], method, cr)
206
207     @check
208     def exec_workflow(self, db, uid, obj, method, *args):
209         cr = pooler.get_db(db).cursor()
210         try:
211             try:
212                 res = self.exec_workflow_cr(cr, uid, obj, method, *args)
213                 cr.commit()
214             except Exception:
215                 cr.rollback()
216                 raise
217         finally:
218             cr.close()
219         return res
220
221 object_proxy()
222
223 class osv_pool(object):
224     def __init__(self):
225         self._ready = False
226         self.obj_pool = {}
227         self.module_object_list = {}
228         self.created = []
229         self._sql_error = {}
230         self._store_function = {}
231         self._init = True
232         self._init_parent = {}
233         self.logger = logging.getLogger("pool")
234
235     def init_set(self, cr, mode):
236         different = mode != self._init
237         if different:
238             if mode:
239                 self._init_parent = {}
240             if not mode:
241                 for o in self._init_parent:
242                     self.get(o)._parent_store_compute(cr)
243             self._init = mode
244
245         self._ready = True
246         return different
247
248
249     def obj_list(self):
250         return self.obj_pool.keys()
251
252     # adds a new object instance to the object pool.
253     # if it already existed, the instance is replaced
254     def add(self, name, obj_inst):
255         if name in self.obj_pool:
256             del self.obj_pool[name]
257         self.obj_pool[name] = obj_inst
258
259         module = str(obj_inst.__class__)[6:]
260         module = module[:len(module)-1]
261         module = module.split('.')[0][2:]
262         self.module_object_list.setdefault(module, []).append(obj_inst)
263
264     # Return None if object does not exist
265     def get(self, name):
266         obj = self.obj_pool.get(name, None)
267         return obj
268
269     #TODO: pass a list of modules to load
270     def instanciate(self, module, cr):
271         res = []
272         class_list = module_class_list.get(module, [])
273         for klass in class_list:
274             res.append(klass.createInstance(self, module, cr))
275         return res
276
277 class osv_base(object):
278     def __init__(self, pool, cr):
279         pool.add(self._name, self)
280         self.pool = pool
281         super(osv_base, self).__init__(cr)
282
283     def __new__(cls):
284         module = str(cls)[6:]
285         module = module[:len(module)-1]
286         module = module.split('.')[0][2:]
287         if not hasattr(cls, '_module'):
288             cls._module = module
289         module_class_list.setdefault(cls._module, []).append(cls)
290         class_pool[cls._name] = cls
291         if module not in module_list:
292             module_list.append(cls._module)
293         return None
294
295 class osv_memory(osv_base, orm.orm_memory):
296     #
297     # Goal: try to apply inheritancy at the instanciation level and
298     #       put objects in the pool var
299     #
300     def createInstance(cls, pool, module, cr):
301         parent_names = getattr(cls, '_inherit', None)
302         if parent_names:
303             if isinstance(parent_names, (str, unicode)):
304                 name = cls._name or parent_names
305                 parent_names = [parent_names]
306             else:
307                 name = cls._name
308             if not name:
309                 raise TypeError('_name is mandatory in case of multiple inheritance')
310
311             for parent_name in ((type(parent_names)==list) and parent_names or [parent_names]):
312                 parent_class = pool.get(parent_name).__class__
313                 assert pool.get(parent_name), "parent class %s does not exist in module %s !" % (parent_name, module)
314                 nattr = {}
315                 for s in ('_columns', '_defaults'):
316                     new = copy.copy(getattr(pool.get(parent_name), s))
317                     if s == '_columns':
318                         # Don't _inherit custom fields.
319                         for c in new.keys():
320                             if new[c].manual:
321                                 del new[c]
322                         # Duplicate float fields because they have a .digits
323                         # cache (which must be per-pool, not server-wide).
324                         for c in new.keys():
325                             if new[c]._type == 'float':
326                                 new[c] = copy.copy(new[c])
327                     if hasattr(new, 'update'):
328                         new.update(cls.__dict__.get(s, {}))
329                     else:
330                         new.extend(cls.__dict__.get(s, []))
331                     nattr[s] = new
332                 cls = type(name, (cls, parent_class), nattr)
333         else:
334             # Duplicate float fields because they have a .digits
335             # cache (which must be per-pool, not server-wide).
336             for field_name, field in cls._columns.items():
337                 if field._type == 'float':
338                     cls._columns[field_name] = copy.copy(field)
339
340         obj = object.__new__(cls)
341         obj.__init__(pool, cr)
342         return obj
343     createInstance = classmethod(createInstance)
344
345 class osv(osv_base, orm.orm):
346     #
347     # Goal: try to apply inheritancy at the instanciation level and
348     #       put objects in the pool var
349     #
350     def createInstance(cls, pool, module, cr):
351         parent_names = getattr(cls, '_inherit', None)
352         if parent_names:
353             if isinstance(parent_names, (str, unicode)):
354                 name = cls._name or parent_names
355                 parent_names = [parent_names]
356             else:
357                 name = cls._name
358             if not name:
359                 raise TypeError('_name is mandatory in case of multiple inheritance')
360
361             for parent_name in ((type(parent_names)==list) and parent_names or [parent_names]):
362                 parent_class = pool.get(parent_name).__class__
363                 assert pool.get(parent_name), "parent class %s does not exist in module %s !" % (parent_name, module)
364                 nattr = {}
365                 for s in ('_columns', '_defaults', '_inherits', '_constraints', '_sql_constraints'):
366                     new = copy.copy(getattr(pool.get(parent_name), s))
367                     if s == '_columns':
368                         # Don't _inherit custom fields.
369                         for c in new.keys():
370                             if new[c].manual:
371                                 del new[c]
372                         # Duplicate float fields because they have a .digits
373                         # cache (which must be per-pool, not server-wide).
374                         for c in new.keys():
375                             if new[c]._type == 'float':
376                                 new[c] = copy.copy(new[c])
377
378                     if hasattr(new, 'update'):
379                         new.update(cls.__dict__.get(s, {}))
380                     else:
381                         if s=='_constraints':
382                             for c in cls.__dict__.get(s, []):
383                                 exist = False
384                                 for c2 in range(len(new)):
385                                      #For _constraints, we should check field and methods as well
386                                      if new[c2][2]==c[2] and (new[c2][0] == c[0] \
387                                             or getattr(new[c2][0],'__name__', True) == \
388                                                 getattr(c[0],'__name__', False)):
389                                         # If new class defines a constraint with
390                                         # same function name, we let it override
391                                         # the old one.
392                                         new[c2] = c
393                                         exist = True
394                                         break
395                                 if not exist:
396                                     new.append(c)
397                         else:
398                             new.extend(cls.__dict__.get(s, []))
399                     nattr[s] = new
400                 cls = type(name, (cls, parent_class), nattr)
401         else:
402             # Duplicate float fields because they have a .digits
403             # cache (which must be per-pool, not server-wide).
404             for field_name, field in cls._columns.items():
405                 if field._type == 'float':
406                     cls._columns[field_name] = copy.copy(field)
407         obj = object.__new__(cls)
408         obj.__init__(pool, cr)
409         return obj
410     createInstance = classmethod(createInstance)
411
412 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
413