[REF] OpenERP --> Odoo in various UI texts (2)
[odoo/odoo.git] / openerp / addons / base / ir / ir_cron.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-TODAY OpenERP S.A. <http://www.openerp.com>
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU Affero General Public License as
9 #    published by the Free Software Foundation, either version 3 of the
10 #    License, or (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU Affero General Public License for more details.
16 #
17 #    You should have received a copy of the GNU Affero General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 ##############################################################################
21 import logging
22 import threading
23 import time
24 import psycopg2
25 from datetime import datetime
26 from dateutil.relativedelta import relativedelta
27
28 import openerp
29 from openerp import SUPERUSER_ID, netsvc, api
30 from openerp.osv import fields, osv
31 from openerp.tools import DEFAULT_SERVER_DATETIME_FORMAT
32 from openerp.tools.safe_eval import safe_eval as eval
33 from openerp.tools.translate import _
34 from openerp.modules import load_information_from_description_file
35
36 _logger = logging.getLogger(__name__)
37
38 BASE_VERSION = load_information_from_description_file('base')['version']
39
40 def str2tuple(s):
41     return eval('tuple(%s)' % (s or ''))
42
43 _intervalTypes = {
44     'work_days': lambda interval: relativedelta(days=interval),
45     'days': lambda interval: relativedelta(days=interval),
46     'hours': lambda interval: relativedelta(hours=interval),
47     'weeks': lambda interval: relativedelta(days=7*interval),
48     'months': lambda interval: relativedelta(months=interval),
49     'minutes': lambda interval: relativedelta(minutes=interval),
50 }
51
52 class ir_cron(osv.osv):
53     """ Model describing cron jobs (also called actions or tasks).
54     """
55
56     # TODO: perhaps in the future we could consider a flag on ir.cron jobs
57     # that would cause database wake-up even if the database has not been
58     # loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
59     # See also openerp.cron
60
61     _name = "ir.cron"
62     _order = 'name'
63     _columns = {
64         'name': fields.char('Name', required=True),
65         'user_id': fields.many2one('res.users', 'User', required=True),
66         'active': fields.boolean('Active'),
67         'interval_number': fields.integer('Interval Number',help="Repeat every x."),
68         'interval_type': fields.selection( [('minutes', 'Minutes'),
69             ('hours', 'Hours'), ('work_days','Work Days'), ('days', 'Days'),('weeks', 'Weeks'), ('months', 'Months')], 'Interval Unit'),
70         'numbercall': fields.integer('Number of Calls', help='How many times the method is called,\na negative number indicates no limit.'),
71         'doall' : fields.boolean('Repeat Missed', help="Specify if missed occurrences should be executed when the server restarts."),
72         'nextcall' : fields.datetime('Next Execution Date', required=True, help="Next planned execution date for this job."),
73         'model': fields.char('Object', help="Model name on which the method to be called is located, e.g. 'res.partner'."),
74         'function': fields.char('Method', help="Name of the method to be called when this job is processed."),
75         'args': fields.text('Arguments', help="Arguments to be passed to the method, e.g. (uid,)."),
76         'priority': fields.integer('Priority', help='The priority of the job, as an integer: 0 means higher priority, 10 means lower priority.')
77     }
78
79     _defaults = {
80         'nextcall' : lambda *a: time.strftime(DEFAULT_SERVER_DATETIME_FORMAT),
81         'priority' : 5,
82         'user_id' : lambda obj,cr,uid,context: uid,
83         'interval_number' : 1,
84         'interval_type' : 'months',
85         'numbercall' : 1,
86         'active' : 1,
87         'doall' : 1
88     }
89
90     def _check_args(self, cr, uid, ids, context=None):
91         try:
92             for this in self.browse(cr, uid, ids, context):
93                 str2tuple(this.args)
94         except Exception:
95             return False
96         return True
97
98     _constraints = [
99         (_check_args, 'Invalid arguments', ['args']),
100     ]
101
102     def _handle_callback_exception(self, cr, uid, model_name, method_name, args, job_id, job_exception):
103         """ Method called when an exception is raised by a job.
104
105         Simply logs the exception and rollback the transaction.
106
107         :param model_name: model name on which the job method is located.
108         :param method_name: name of the method to call when this job is processed.
109         :param args: arguments of the method (without the usual self, cr, uid).
110         :param job_id: job id.
111         :param job_exception: exception raised by the job.
112
113         """
114         cr.rollback()
115         _logger.exception("Call of self.pool.get('%s').%s(cr, uid, *%r) failed in Job %s" % (model_name, method_name, args, job_id))
116
117     def _callback(self, cr, uid, model_name, method_name, args, job_id):
118         """ Run the method associated to a given job
119
120         It takes care of logging and exception handling.
121
122         :param model_name: model name on which the job method is located.
123         :param method_name: name of the method to call when this job is processed.
124         :param args: arguments of the method (without the usual self, cr, uid).
125         :param job_id: job id.
126         """
127         try:
128             args = str2tuple(args)
129             openerp.modules.registry.RegistryManager.check_registry_signaling(cr.dbname)
130             registry = openerp.registry(cr.dbname)
131             if model_name in registry:
132                 model = registry[model_name]
133                 if hasattr(model, method_name):
134                     log_depth = (None if _logger.isEnabledFor(logging.DEBUG) else 1)
135                     netsvc.log(_logger, logging.DEBUG, 'cron.object.execute', (cr.dbname,uid,'*',model_name,method_name)+tuple(args), depth=log_depth)
136                     if _logger.isEnabledFor(logging.DEBUG):
137                         start_time = time.time()
138                     getattr(model, method_name)(cr, uid, *args)
139                     if _logger.isEnabledFor(logging.DEBUG):
140                         end_time = time.time()
141                         _logger.debug('%.3fs (%s, %s)' % (end_time - start_time, model_name, method_name))
142                     openerp.modules.registry.RegistryManager.signal_caches_change(cr.dbname)
143                 else:
144                     msg = "Method `%s.%s` does not exist." % (model_name, method_name)
145                     _logger.warning(msg)
146             else:
147                 msg = "Model `%s` does not exist." % model_name
148                 _logger.warning(msg)
149         except Exception, e:
150             self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e)
151
152     def _process_job(self, cr, job, cron_cr):
153         """ Run a given job taking care of the repetition.
154
155         :param cr: cursor to use to execute the job, safe to commit/rollback
156         :param job: job to be run (as a dictionary).
157         :param cron_cr: cursor holding lock on the cron job row, to use to update the next exec date,
158             must not be committed/rolled back!
159         """
160         try:
161             with api.Environment.manage():
162                 now = datetime.now() 
163                 nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
164                 numbercall = job['numbercall']
165
166                 ok = False
167                 while nextcall < now and numbercall:
168                     if numbercall > 0:
169                         numbercall -= 1
170                     if not ok or job['doall']:
171                         self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
172                     if numbercall:
173                         nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
174                     ok = True
175                 addsql = ''
176                 if not numbercall:
177                     addsql = ', active=False'
178                 cron_cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
179                            (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
180                 self.invalidate_cache(cr, SUPERUSER_ID)
181
182         finally:
183             cr.commit()
184             cron_cr.commit()
185
186     @classmethod
187     def _acquire_job(cls, db_name):
188         # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
189         """ Try to process one cron job.
190
191         This selects in database all the jobs that should be processed. It then
192         tries to lock each of them and, if it succeeds, run the cron job (if it
193         doesn't succeed, it means the job was already locked to be taken care
194         of by another thread) and return.
195
196         If a job was processed, returns True, otherwise returns False.
197         """
198         db = openerp.sql_db.db_connect(db_name)
199         threading.current_thread().dbname = db_name
200         cr = db.cursor()
201         jobs = []
202         try:
203             # Make sure the database we poll has the same version as the code of base
204             cr.execute("SELECT 1 FROM ir_module_module WHERE name=%s AND latest_version=%s", ('base', BASE_VERSION))
205             if cr.fetchone():
206                 # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
207                 cr.execute("""SELECT * FROM ir_cron
208                               WHERE numbercall != 0
209                                   AND active AND nextcall <= (now() at time zone 'UTC')
210                               ORDER BY priority""")
211                 jobs = cr.dictfetchall()
212             else:
213                 _logger.warning('Skipping database %s as its base version is not %s.', db_name, BASE_VERSION)
214         except psycopg2.ProgrammingError, e:
215             if e.pgcode == '42P01':
216                 # Class 42 — Syntax Error or Access Rule Violation; 42P01: undefined_table
217                 # The table ir_cron does not exist; this is probably not an OpenERP database.
218                 _logger.warning('Tried to poll an undefined table on database %s.', db_name)
219             else:
220                 raise
221         except Exception:
222             _logger.warning('Exception in cron:', exc_info=True)
223         finally:
224             cr.close()
225
226         for job in jobs:
227             lock_cr = db.cursor()
228             try:
229                 # Try to grab an exclusive lock on the job row from within the task transaction
230                 # Restrict to the same conditions as for the search since the job may have already
231                 # been run by an other thread when cron is running in multi thread
232                 lock_cr.execute("""SELECT *
233                                    FROM ir_cron
234                                    WHERE numbercall != 0
235                                       AND active
236                                       AND nextcall <= (now() at time zone 'UTC')
237                                       AND id=%s
238                                    FOR UPDATE NOWAIT""",
239                                (job['id'],), log_exceptions=False)
240
241                 locked_job = lock_cr.fetchone()
242                 if not locked_job:
243                     _logger.debug("Job `%s` already executed by another process/thread. skipping it", job['name'])
244                     continue
245                 # Got the lock on the job row, run its code
246                 _logger.debug('Starting job `%s`.', job['name'])
247                 job_cr = db.cursor()
248                 try:
249                     registry = openerp.registry(db_name)
250                     registry[cls._name]._process_job(job_cr, job, lock_cr)
251                 except Exception:
252                     _logger.exception('Unexpected exception while processing cron job %r', job)
253                 finally:
254                     job_cr.close()
255
256             except psycopg2.OperationalError, e:
257                 if e.pgcode == '55P03':
258                     # Class 55: Object not in prerequisite state; 55P03: lock_not_available
259                     _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
260                     continue
261                 else:
262                     # Unexpected OperationalError
263                     raise
264             finally:
265                 # we're exiting due to an exception while acquiring the lock
266                 lock_cr.close()
267
268         if hasattr(threading.current_thread(), 'dbname'): # cron job could have removed it as side-effect
269             del threading.current_thread().dbname
270
271     def _try_lock(self, cr, uid, ids, context=None):
272         """Try to grab a dummy exclusive write-lock to the rows with the given ids,
273            to make sure a following write() or unlink() will not block due
274            to a process currently executing those cron tasks"""
275         try:
276             cr.execute("""SELECT id FROM "%s" WHERE id IN %%s FOR UPDATE NOWAIT""" % self._table,
277                        (tuple(ids),), log_exceptions=False)
278         except psycopg2.OperationalError:
279             cr.rollback() # early rollback to allow translations to work for the user feedback
280             raise osv.except_osv(_("Record cannot be modified right now"),
281                                  _("This cron task is currently being executed and may not be modified, "
282                                   "please try again in a few minutes"))
283
284     def create(self, cr, uid, vals, context=None):
285         res = super(ir_cron, self).create(cr, uid, vals, context=context)
286         return res
287
288     def write(self, cr, uid, ids, vals, context=None):
289         self._try_lock(cr, uid, ids, context)
290         res = super(ir_cron, self).write(cr, uid, ids, vals, context=context)
291         return res
292
293     def unlink(self, cr, uid, ids, context=None):
294         self._try_lock(cr, uid, ids, context)
295         res = super(ir_cron, self).unlink(cr, uid, ids, context=context)
296         return res
297
298 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: