dbc89278ece9f5b7ac3cd4441e7dbf1046ed1c4c
[odoo/odoo.git] / openerp / addons / base / ir / ir_cron.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-TODAY OpenERP S.A. <http://www.openerp.com>
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU Affero General Public License as
9 #    published by the Free Software Foundation, either version 3 of the
10 #    License, or (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU Affero General Public License for more details.
16 #
17 #    You should have received a copy of the GNU Affero General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 ##############################################################################
21
22 import calendar
23 import time
24 import logging
25 import threading
26 import psycopg2
27 from datetime import datetime
28 from dateutil.relativedelta import relativedelta
29
30 import netsvc
31 import openerp
32 import pooler
33 import tools
34 from openerp.cron import WAKE_UP_NOW
35 from osv import fields, osv
36 from tools import DEFAULT_SERVER_DATETIME_FORMAT
37 from tools.safe_eval import safe_eval as eval
38 from tools.translate import _
39
40 _logger = logging.getLogger(__name__)
41
42 def str2tuple(s):
43     return eval('tuple(%s)' % (s or ''))
44
45 _intervalTypes = {
46     'work_days': lambda interval: relativedelta(days=interval),
47     'days': lambda interval: relativedelta(days=interval),
48     'hours': lambda interval: relativedelta(hours=interval),
49     'weeks': lambda interval: relativedelta(days=7*interval),
50     'months': lambda interval: relativedelta(months=interval),
51     'minutes': lambda interval: relativedelta(minutes=interval),
52 }
53
54 class ir_cron(osv.osv):
55     """ Model describing cron jobs (also called actions or tasks).
56     """
57
58     # TODO: perhaps in the future we could consider a flag on ir.cron jobs
59     # that would cause database wake-up even if the database has not been
60     # loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
61     # See also openerp.cron
62
63     _name = "ir.cron"
64     _order = 'name'
65     _columns = {
66         'name': fields.char('Name', size=60, required=True),
67         'user_id': fields.many2one('res.users', 'User', required=True),
68         'active': fields.boolean('Active'),
69         'interval_number': fields.integer('Interval Number',help="Repeat every x."),
70         'interval_type': fields.selection( [('minutes', 'Minutes'),
71             ('hours', 'Hours'), ('work_days','Work Days'), ('days', 'Days'),('weeks', 'Weeks'), ('months', 'Months')], 'Interval Unit'),
72         'numbercall': fields.integer('Number of Calls', help='How many times the method is called,\na negative number indicates no limit.'),
73         'doall' : fields.boolean('Repeat Missed', help="Specify if missed occurrences should be executed when the server restarts."),
74         'nextcall' : fields.datetime('Next Execution Date', required=True, help="Next planned execution date for this job."),
75         'model': fields.char('Object', size=64, help="Model name on which the method to be called is located, e.g. 'res.partner'."),
76         'function': fields.char('Method', size=64, help="Name of the method to be called when this job is processed."),
77         'args': fields.text('Arguments', help="Arguments to be passed to the method, e.g. (uid,)."),
78         'priority': fields.integer('Priority', help='The priority of the job, as an integer: 0 means higher priority, 10 means lower priority.')
79     }
80
81     _defaults = {
82         'nextcall' : lambda *a: time.strftime(DEFAULT_SERVER_DATETIME_FORMAT),
83         'priority' : lambda *a: 5,
84         'user_id' : lambda obj,cr,uid,context: uid,
85         'interval_number' : lambda *a: 1,
86         'interval_type' : lambda *a: 'months',
87         'numbercall' : lambda *a: 1,
88         'active' : lambda *a: 1,
89         'doall' : lambda *a: 1
90     }
91
92     def _check_args(self, cr, uid, ids, context=None):
93         try:
94             for this in self.browse(cr, uid, ids, context):
95                 str2tuple(this.args)
96         except Exception:
97             return False
98         return True
99
100     _constraints = [
101         (_check_args, 'Invalid arguments', ['args']),
102     ]
103
104     def _handle_callback_exception(self, cr, uid, model_name, method_name, args, job_id, job_exception):
105         """ Method called when an exception is raised by a job.
106
107         Simply logs the exception and rollback the transaction.
108
109         :param model_name: model name on which the job method is located.
110         :param method_name: name of the method to call when this job is processed.
111         :param args: arguments of the method (without the usual self, cr, uid).
112         :param job_id: job id.
113         :param job_exception: exception raised by the job.
114
115         """
116         cr.rollback()
117         _logger.exception("Call of self.pool.get('%s').%s(cr, uid, *%r) failed in Job %s" % (model_name, method_name, args, job_id))
118
119     def _callback(self, cr, uid, model_name, method_name, args, job_id):
120         """ Run the method associated to a given job
121
122         It takes care of logging and exception handling.
123
124         :param model_name: model name on which the job method is located.
125         :param method_name: name of the method to call when this job is processed.
126         :param args: arguments of the method (without the usual self, cr, uid).
127         :param job_id: job id.
128         """
129         args = str2tuple(args)
130         model = self.pool.get(model_name)
131         if model and hasattr(model, method_name):
132             method = getattr(model, method_name)
133             try:
134                 log_depth = (None if _logger.isEnabledFor(logging.DEBUG) else 1)
135                 netsvc.log(_logger, logging.DEBUG, 'cron.object.execute', (cr.dbname,uid,'*',model_name,method_name)+tuple(args), depth=log_depth)
136                 if _logger.isEnabledFor(logging.DEBUG):
137                     start_time = time.time()
138                 method(cr, uid, *args)
139                 if _logger.isEnabledFor(logging.DEBUG):
140                     end_time = time.time()
141                     _logger.debug('%.3fs (%s, %s)' % (end_time - start_time, model_name, method_name))
142             except Exception, e:
143                 self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e)
144
145     def _run_job(self, cr, job, now):
146         """ Run a given job taking care of the repetition.
147
148         The cursor has a lock on the job (aquired by _run_jobs_multithread()) and this
149         method is run in a worker thread (spawned by _run_jobs_multithread())).
150
151         :param job: job to be run (as a dictionary).
152         :param now: timestamp (result of datetime.now(), no need to call it multiple time).
153
154         """
155         try:
156             nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
157             numbercall = job['numbercall']
158
159             ok = False
160             while nextcall < now and numbercall:
161                 if numbercall > 0:
162                     numbercall -= 1
163                 if not ok or job['doall']:
164                     self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
165                 if numbercall:
166                     nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
167                 ok = True
168             addsql = ''
169             if not numbercall:
170                 addsql = ', active=False'
171             cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
172                        (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
173
174             if numbercall:
175                 # Reschedule our own main cron thread if necessary.
176                 # This is really needed if this job runs longer than its rescheduling period.
177                 nextcall = calendar.timegm(nextcall.timetuple())
178                 openerp.cron.schedule_wakeup(nextcall, cr.dbname)
179         finally:
180             cr.commit()
181             cr.close()
182             openerp.cron.release_thread_slot()
183
184     def _run_jobs_multithread(self):
185         # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
186         """ Process the cron jobs by spawning worker threads.
187
188         This selects in database all the jobs that should be processed. It then
189         tries to lock each of them and, if it succeeds, spawns a thread to run
190         the cron job (if it doesn't succeed, it means the job was already
191         locked to be taken care of by another thread).
192
193         The cursor used to lock the job in database is given to the worker
194         thread (which has to close it itself).
195
196         """
197         db = self.pool.db
198         cr = db.cursor()
199         db_name = db.dbname
200         try:
201             jobs = {} # mapping job ids to jobs for all jobs being processed.
202             now = datetime.now() 
203             # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
204             cr.execute("""SELECT * FROM ir_cron
205                           WHERE numbercall != 0
206                               AND active AND nextcall <= (now() at time zone 'UTC')
207                           ORDER BY priority""")
208             for job in cr.dictfetchall():
209                 if not openerp.cron.get_thread_slots():
210                     break
211                 jobs[job['id']] = job
212
213                 task_cr = db.cursor()
214                 try:
215                     # Try to grab an exclusive lock on the job row from within the task transaction
216                     acquired_lock = False
217                     task_cr.execute("""SELECT *
218                                        FROM ir_cron
219                                        WHERE id=%s
220                                        FOR UPDATE NOWAIT""",
221                                    (job['id'],), log_exceptions=False)
222                     acquired_lock = True
223                 except psycopg2.OperationalError, e:
224                     if e.pgcode == '55P03':
225                         # Class 55: Object not in prerequisite state; 55P03: lock_not_available
226                         _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
227                         continue
228                     else:
229                         # Unexpected OperationalError
230                         raise
231                 finally:
232                     if not acquired_lock:
233                         # we're exiting due to an exception while acquiring the lot
234                         task_cr.close()
235
236                 # Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock
237                 task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now))
238                 # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
239                 task_thread.setDaemon(False)
240                 openerp.cron.take_thread_slot()
241                 task_thread.start()
242                 _logger.debug('Cron execution thread for job `%s` spawned', job['name'])
243
244             # Find next earliest job ignoring currently processed jobs (by this and other cron threads)
245             find_next_time_query = """SELECT min(nextcall) AS min_next_call
246                                       FROM ir_cron WHERE numbercall != 0 AND active""" 
247             if jobs:
248                 cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),))
249             else:
250                 cr.execute(find_next_time_query)
251             next_call = cr.dictfetchone()['min_next_call']
252
253             if next_call:
254                 next_call = calendar.timegm(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT))
255             else:
256                 # no matching cron job found in database, re-schedule arbitrarily in 1 day,
257                 # this delay will likely be modified when running jobs complete their tasks
258                 next_call = time.time() + (24*3600)
259
260             openerp.cron.schedule_wakeup(next_call, db_name)
261
262         except Exception, ex:
263             _logger.warning('Exception in cron:', exc_info=True)
264
265         finally:
266             cr.commit()
267             cr.close()
268
269     def _process_job(self, cr, job):
270         """ Run a given job taking care of the repetition.
271
272         The cursor has a lock on the job (aquired by _acquire_job()).
273
274         :param job: job to be run (as a dictionary).
275         """
276         try:
277             now = datetime.now() 
278             nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
279             numbercall = job['numbercall']
280
281             ok = False
282             while nextcall < now and numbercall:
283                 if numbercall > 0:
284                     numbercall -= 1
285                 if not ok or job['doall']:
286                     self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
287                 if numbercall:
288                     nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
289                 ok = True
290             addsql = ''
291             if not numbercall:
292                 addsql = ', active=False'
293             cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
294                        (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
295
296         finally:
297             cr.commit()
298             cr.close()
299
300     @classmethod
301     def _acquire_job(cls, db_name):
302         # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
303         """ Try to process one cron job.
304
305         This selects in database all the jobs that should be processed. It then
306         tries to lock each of them and, if it succeeds, run the cron job (if it
307         doesn't succeed, it means the job was already locked to be taken care
308         of by another thread) and return.
309
310         If a job was processed, returns True, otherwise returns False.
311         """
312         db = openerp.sql_db.db_connect(db_name)
313         cr = db.cursor()
314         try:
315             # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
316             cr.execute("""SELECT * FROM ir_cron
317                           WHERE numbercall != 0
318                               AND active AND nextcall <= (now() at time zone 'UTC')
319                           ORDER BY priority""")
320             for job in cr.dictfetchall():
321                 task_cr = db.cursor()
322                 try:
323                     # Try to grab an exclusive lock on the job row from within the task transaction
324                     acquired_lock = False
325                     task_cr.execute("""SELECT *
326                                        FROM ir_cron
327                                        WHERE id=%s
328                                        FOR UPDATE NOWAIT""",
329                                    (job['id'],), log_exceptions=False)
330                     acquired_lock = True
331                 except psycopg2.OperationalError, e:
332                     if e.pgcode == '55P03':
333                         # Class 55: Object not in prerequisite state; 55P03: lock_not_available
334                         _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
335                         continue
336                     else:
337                         # Unexpected OperationalError
338                         raise
339                 finally:
340                     if not acquired_lock:
341                         # we're exiting due to an exception while acquiring the lot
342                         task_cr.close()
343
344                 # Got the lock on the job row, run its code
345                 _logger.debug('Starting job `%s`.', job['name'])
346                 openerp.modules.registry.RegistryManager.check_registry_signaling(db_name)
347                 registry = openerp.pooler.get_pool(db_name)
348                 registry[cls._name]._process_job(task_cr, job)
349                 openerp.modules.registry.RegistryManager.signal_caches_change(db_name)
350                 return True
351
352         except psycopg2.ProgrammingError, e:
353             if e.pgcode == '42P01':
354                 # Class 42 — Syntax Error or Access Rule Violation; 42P01: undefined_table
355                 # The table ir_cron does not exist; this is probably not an OpenERP database.
356                 _logger.warning('Tried to poll an undefined table on database %s.', db_name)
357             else:
358                 raise
359         except Exception, ex:
360             _logger.warning('Exception in cron:', exc_info=True)
361
362         finally:
363             cr.commit()
364             cr.close()
365
366         return False
367
368     def update_running_cron(self, cr):
369         """ Schedule as soon as possible a wake-up for this database. """
370         # Verify whether the server is already started and thus whether we need to commit
371         # immediately our changes and restart the cron agent in order to apply the change
372         # immediately. The commit() is needed because as soon as the cron is (re)started it
373         # will query the database with its own cursor, possibly before the end of the
374         # current transaction.
375         # This commit() is not an issue in most cases, but we must absolutely avoid it
376         # when the server is only starting or loading modules (hence the test on pool._init).
377         if not self.pool._init:
378             cr.commit()
379             openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname)
380
381     def _try_lock(self, cr, uid, ids, context=None):
382         """Try to grab a dummy exclusive write-lock to the rows with the given ids,
383            to make sure a following write() or unlink() will not block due
384            to a process currently executing those cron tasks"""
385         try:
386             cr.execute("""SELECT id FROM "%s" WHERE id IN %%s FOR UPDATE NOWAIT""" % self._table,
387                        (tuple(ids),), log_exceptions=False)
388         except psycopg2.OperationalError:
389             cr.rollback() # early rollback to allow translations to work for the user feedback
390             raise osv.except_osv(_("Record cannot be modified right now"),
391                                  _("This cron task is currently being executed and may not be modified, "
392                                   "please try again in a few minutes"))
393
394     def create(self, cr, uid, vals, context=None):
395         res = super(ir_cron, self).create(cr, uid, vals, context=context)
396         self.update_running_cron(cr)
397         return res
398
399     def write(self, cr, uid, ids, vals, context=None):
400         self._try_lock(cr, uid, ids, context)
401         res = super(ir_cron, self).write(cr, uid, ids, vals, context=context)
402         self.update_running_cron(cr)
403         return res
404
405     def unlink(self, cr, uid, ids, context=None):
406         self._try_lock(cr, uid, ids, context)
407         res = super(ir_cron, self).unlink(cr, uid, ids, context=context)
408         self.update_running_cron(cr)
409         return res
410 ir_cron()
411
412 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: