[FIX] osv: Automatically retry the typical transaction serialization errors
[odoo/odoo.git] / openerp / osv / osv.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU Affero General Public License as
9 #    published by the Free Software Foundation, either version 3 of the
10 #    License, or (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU Affero General Public License for more details.
16 #
17 #    You should have received a copy of the GNU Affero General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 ##############################################################################
21
22 #.apidoc title: Objects Services (OSV)
23
24 from functools import wraps
25 import logging
26 from psycopg2 import IntegrityError, OperationalError, errorcodes
27
28 import orm
29 import openerp
30 import openerp.netsvc as netsvc
31 import openerp.pooler as pooler
32 import openerp.sql_db as sql_db
33 from openerp.tools.translate import translate
34 from openerp.osv.orm import MetaModel, Model, TransientModel, AbstractModel
35 import openerp.exceptions
36
37 import time
38 import random
39
40 _logger = logging.getLogger(__name__)
41
42 PG_CONCURRENCY_ERRORS_TO_RETRY = (errorcodes.LOCK_NOT_AVAILABLE, errorcodes.SERIALIZATION_FAILURE, errorcodes.DEADLOCK_DETECTED)
43 MAX_TRIES_ON_CONCURRENCY_FAILURE = 5
44
45 # Deprecated.
46 class except_osv(Exception):
47     def __init__(self, name, value):
48         self.name = name
49         self.value = value
50         self.args = (name, value)
51
52 service = None
53
54 class object_proxy(object):
55     def __init__(self):
56         global service
57         service = self
58
59     def check(f):
60         @wraps(f)
61         def wrapper(self, dbname, *args, **kwargs):
62             """ Wraps around OSV functions and normalises a few exceptions
63             """
64
65             def tr(src, ttype):
66                 # We try to do the same as the _(), but without the frame
67                 # inspection, since we aready are wrapping an osv function
68                 # trans_obj = self.get('ir.translation') cannot work yet :(
69                 ctx = {}
70                 if not kwargs:
71                     if args and isinstance(args[-1], dict):
72                         ctx = args[-1]
73                 elif isinstance(kwargs, dict):
74                     ctx = kwargs.get('context', {})
75
76                 uid = 1
77                 if args and isinstance(args[0], (long, int)):
78                     uid = args[0]
79
80                 lang = ctx and ctx.get('lang')
81                 if not (lang or hasattr(src, '__call__')):
82                     return src
83
84                 # We open a *new* cursor here, one reason is that failed SQL
85                 # queries (as in IntegrityError) will invalidate the current one.
86                 cr = False
87
88                 if hasattr(src, '__call__'):
89                     # callable. We need to find the right parameters to call
90                     # the  orm._sql_message(self, cr, uid, ids, context) function,
91                     # or we skip..
92                     # our signature is f(osv_pool, dbname [,uid, obj, method, args])
93                     try:
94                         if args and len(args) > 1:
95                             obj = self.get(args[1])
96                             if len(args) > 3 and isinstance(args[3], (long, int, list)):
97                                 ids = args[3]
98                             else:
99                                 ids = []
100                         cr = sql_db.db_connect(dbname).cursor()
101                         return src(obj, cr, uid, ids, context=(ctx or {}))
102                     except Exception:
103                         pass
104                     finally:
105                         if cr: cr.close()
106
107                     return False # so that the original SQL error will
108                                  # be returned, it is the best we have.
109
110                 try:
111                     cr = sql_db.db_connect(dbname).cursor()
112                     res = translate(cr, name=False, source_type=ttype,
113                                     lang=lang, source=src)
114                     if res:
115                         return res
116                     else:
117                         return src
118                 finally:
119                     if cr: cr.close()
120
121             def _(src):
122                 return tr(src, 'code')
123
124             tries = 0
125             while True:
126                 try:
127                     if pooler.get_pool(dbname)._init:
128                         raise except_osv('Database not ready', 'Currently, this database is not fully loaded and can not be used.')
129                     return f(self, dbname, *args, **kwargs)
130                 except OperationalError, e:
131                     # Automatically retry the typical transaction serialization errors
132                     if not e.pgcode in PG_CONCURRENCY_ERRORS_TO_RETRY or tries >= MAX_TRIES_ON_CONCURRENCY_FAILURE:
133                         self.logger.warning("%s, maximum number of tries reached" % errorcodes.lookup(e.pgcode))
134                         raise
135                     wait_time = random.uniform(0.0, 2 ** tries)
136                     tries += 1
137                     self.logger.info("%s, retrying %d/%d in %.04f sec..." % (errorcodes.lookup(e.pgcode), tries, MAX_TRIES_ON_CONCURRENCY_FAILURE, wait_time))
138                     time.sleep(wait_time)
139                 except orm.except_orm, inst:
140                     raise except_osv(inst.name, inst.value)
141                 except except_osv:
142                     raise
143                 except IntegrityError, inst:
144                     osv_pool = pooler.get_pool(dbname)
145                     for key in osv_pool._sql_error.keys():
146                         if key in inst[0]:
147                             netsvc.abort_response(1, _('Constraint Error'), 'warning',
148                                             tr(osv_pool._sql_error[key], 'sql_constraint') or inst[0])
149                     if inst.pgcode in (errorcodes.NOT_NULL_VIOLATION, errorcodes.FOREIGN_KEY_VIOLATION, errorcodes.RESTRICT_VIOLATION):
150                         msg = _('The operation cannot be completed, probably due to the following:\n- deletion: you may be trying to delete a record while other records still reference it\n- creation/update: a mandatory field is not correctly set')
151                         _logger.debug("IntegrityError", exc_info=True)
152                         try:
153                             errortxt = inst.pgerror.replace('«','"').replace('»','"')
154                             if '"public".' in errortxt:
155                                 context = errortxt.split('"public".')[1]
156                                 model_name = table = context.split('"')[1]
157                             else:
158                                 last_quote_end = errortxt.rfind('"')
159                                 last_quote_begin = errortxt.rfind('"', 0, last_quote_end)
160                                 model_name = table = errortxt[last_quote_begin+1:last_quote_end].strip()
161                             model = table.replace("_",".")
162                             model_obj = osv_pool.get(model)
163                             if model_obj:
164                                 model_name = model_obj._description or model_obj._name
165                             msg += _('\n\n[object with reference: %s - %s]') % (model_name, model)
166                         except Exception:
167                             pass
168                         netsvc.abort_response(1, _('Integrity Error'), 'warning', msg)
169                     else:
170                         netsvc.abort_response(1, _('Integrity Error'), 'warning', inst[0])
171                 except Exception:
172                     _logger.exception("Uncaught exception")
173                     raise
174
175         return wrapper
176
177     def execute_cr(self, cr, uid, obj, method, *args, **kw):
178         object = pooler.get_pool(cr.dbname).get(obj)
179         if not object:
180             raise except_osv('Object Error', 'Object %s doesn\'t exist' % str(obj))
181         return getattr(object, method)(cr, uid, *args, **kw)
182
183     def execute_kw(self, db, uid, obj, method, args, kw=None):
184         return self.execute(db, uid, obj, method, *args, **kw or {})
185
186     @check
187     def execute(self, db, uid, obj, method, *args, **kw):
188         cr = pooler.get_db(db).cursor()
189         try:
190             try:
191                 if method.startswith('_'):
192                     raise except_osv('Access Denied', 'Private methods (such as %s) cannot be called remotely.' % (method,))
193                 res = self.execute_cr(cr, uid, obj, method, *args, **kw)
194                 if res is None:
195                     _logger.warning('The method %s of the object %s can not return `None` !', method, obj)
196                 cr.commit()
197             except Exception:
198                 cr.rollback()
199                 raise
200         finally:
201             cr.close()
202         return res
203
204     def exec_workflow_cr(self, cr, uid, obj, method, *args):
205         wf_service = netsvc.LocalService("workflow")
206         return wf_service.trg_validate(uid, obj, args[0], method, cr)
207
208     @check
209     def exec_workflow(self, db, uid, obj, method, *args):
210         cr = pooler.get_db(db).cursor()
211         try:
212             try:
213                 res = self.exec_workflow_cr(cr, uid, obj, method, *args)
214                 cr.commit()
215             except Exception:
216                 cr.rollback()
217                 raise
218         finally:
219             cr.close()
220         return res
221
222 # deprecated - for backward compatibility.
223 osv = Model
224 osv_memory = TransientModel
225 osv_abstract = AbstractModel # ;-)
226
227
228 def start_object_proxy():
229     object_proxy()
230
231 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
232