1 # -*- coding: utf-8 -*-
2 ##############################################################################
4 # OpenERP, Open Source Management Solution
5 # Copyright (C) 2004-TODAY OpenERP S.A. <http://www.openerp.com>
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as
9 # published by the Free Software Foundation, either version 3 of the
10 # License, or (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 ##############################################################################
27 from datetime import datetime
28 from dateutil.relativedelta import relativedelta
34 from openerp.cron import WAKE_UP_NOW
35 from osv import fields, osv
36 from tools import DEFAULT_SERVER_DATETIME_FORMAT
37 from tools.safe_eval import safe_eval as eval
38 from tools.translate import _
40 _logger = logging.getLogger(__name__)
42 # This variable can be set by a signal handler to stop the infinite loop in
# ir_cron._run(), which re-tests this flag at several points of its polling loop.
44 quit_signal_received = False
46 # This variable can be checked to know if ir_cron._run() is processing a job or
# not.
# NOTE(review): _run() sets it to False and then back to True at the end of a
# polling round -- presumably around an idle wait; confirm.
48 job_in_progress = True
# Evaluate the textual argument list `s` as a tuple; a None/empty value is
# evaluated as 'tuple()'. Note that `eval` is tools.safe_eval.safe_eval here
# (see the imports at the top of the file), not the builtin.
51 return eval('tuple(%s)' % (s or ''))
# Each entry maps an interval unit to a factory building the relativedelta that
# represents `interval` such units.
# 'work_days' is handled exactly like 'days': no business calendar is applied.
54 'work_days': lambda interval: relativedelta(days=interval),
55 'days': lambda interval: relativedelta(days=interval),
56 'hours': lambda interval: relativedelta(hours=interval),
# A week is expressed as 7 days.
57 'weeks': lambda interval: relativedelta(days=7*interval),
58 'months': lambda interval: relativedelta(months=interval),
59 'minutes': lambda interval: relativedelta(minutes=interval),
62 class ir_cron(osv.osv):
63 """ Model describing cron jobs (also called actions or tasks).
66 # TODO: perhaps in the future we could consider a flag on ir.cron jobs
67 # that would cause database wake-up even if the database has not been
68 # loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
69 # See also openerp.cron
# Field definitions: 'nextcall', 'interval_number'/'interval_type', 'numbercall'
# and 'doall' drive the scheduling; 'model', 'function' and 'args' identify the
# method to invoke; 'priority' is the sort key used when selecting due jobs.
74 'name': fields.char('Name', size=60, required=True),
75 'user_id': fields.many2one('res.users', 'User', required=True),
76 'active': fields.boolean('Active'),
77 'interval_number': fields.integer('Interval Number',help="Repeat every x."),
78 'interval_type': fields.selection( [('minutes', 'Minutes'),
79 ('hours', 'Hours'), ('work_days','Work Days'), ('days', 'Days'),('weeks', 'Weeks'), ('months', 'Months')], 'Interval Unit'),
80 'numbercall': fields.integer('Number of Calls', help='How many times the method is called,\na negative number indicates no limit.'),
81 'doall' : fields.boolean('Repeat Missed', help="Specify if missed occurrences should be executed when the server restarts."),
82 'nextcall' : fields.datetime('Next Execution Date', required=True, help="Next planned execution date for this job."),
83 'model': fields.char('Object', size=64, help="Model name on which the method to be called is located, e.g. 'res.partner'."),
84 'function': fields.char('Method', size=64, help="Name of the method to be called when this job is processed."),
85 'args': fields.text('Arguments', help="Arguments to be passed to the method, e.g. (uid,)."),
86 'priority': fields.integer('Priority', help='The priority of the job, as an integer: 0 means higher priority, 10 means lower priority.')
# Default values: a new job is owned by the creating user and is due right away.
# NOTE(review): time.strftime() without an explicit time tuple formats the local
# time, whereas due jobs are selected by comparing nextcall with the UTC clock of
# the database -- confirm the server process runs with a UTC timezone.
90 'nextcall' : lambda *a: time.strftime(DEFAULT_SERVER_DATETIME_FORMAT),
91 'priority' : lambda *a: 5,
92 'user_id' : lambda obj,cr,uid,context: uid,
93 'interval_number' : lambda *a: 1,
94 'interval_type' : lambda *a: 'months',
95 'numbercall' : lambda *a: 1,
96 'active' : lambda *a: 1,
97 'doall' : lambda *a: 1
# Validation helper for the 'args' field; it walks over the browsed records.
# NOTE(review): the per-record check itself is not visible in this listing --
# presumably it verifies that 'args' can be parsed by str2tuple(); confirm.
100 def _check_args(self, cr, uid, ids, context=None):
102 for this in self.browse(cr, uid, ids, context):
# Constraint entry tying _check_args to the 'args' field, with the message
# 'Invalid arguments'.
109 (_check_args, 'Invalid arguments', ['args']),
def _handle_callback_exception(self, cr, uid, model_name, method_name, args, job_id, job_exception):
    """ Hook invoked when a job method raised an exception.

    The default behaviour is to roll back the job transaction and to log the
    failure together with the traceback of the exception being handled.

    :param cr: cursor holding the transaction of the failed job.
    :param uid: id of the user the job was run as.
    :param model_name: model name on which the job method is located.
    :param method_name: name of the method to call when this job is processed.
    :param args: arguments of the method (without the usual self, cr, uid).
    :param job_id: job id.
    :param job_exception: exception raised by the job.
    """
    # Discard whatever the failed job did before reporting the failure.
    cr.rollback()
    # Logger.exception() appends the traceback of the exception in flight;
    # the message arguments are handed over to the logger for formatting.
    _logger.exception(
        "Call of self.pool.get('%s').%s(cr, uid, *%r) failed in Job %s",
        model_name, method_name, args, job_id)
def _callback(self, cr, uid, model_name, method_name, args, job_id):
    """ Run the method associated to a given job

    It takes care of logging and exception handling: any exception raised
    by the job method is passed to _handle_callback_exception() instead of
    being propagated to the caller.

    :param cr: cursor of the transaction the job runs in.
    :param uid: id of the user to run the job as.
    :param model_name: model name on which the job method is located.
    :param method_name: name of the method to call when this job is processed.
    :param args: arguments of the method (without the usual self, cr, uid),
                 in the textual form parsed by str2tuple().
    :param job_id: job id.
    """
    args = str2tuple(args)
    model = self.pool.get(model_name)
    method = getattr(model, method_name, None) if model else None
    if method is None:
        # Report a misconfigured job instead of skipping it without a trace.
        _logger.warning("Job %s not run: method `%s` cannot be found on model `%s`.",
                        job_id, method_name, model_name)
        return

    # Evaluate the log level once, so that the timing start/stop pair below
    # cannot get out of sync if the level changes while the job is running.
    debug_enabled = _logger.isEnabledFor(logging.DEBUG)
    try:
        log_depth = None if debug_enabled else 1
        netsvc.log(_logger, logging.DEBUG, 'cron.object.execute',
                   (cr.dbname, uid, '*', model_name, method_name) + tuple(args),
                   depth=log_depth)
        if debug_enabled:
            start_time = time.time()
        method(cr, uid, *args)
        if debug_enabled:
            _logger.debug('%.3fs (%s, %s)', time.time() - start_time, model_name, method_name)
    except Exception as e:
        # Job failures must never escape: delegate to the overridable hook.
        self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e)
153 def _run_job(self, cr, job, now):
154 """ Run a given job taking care of the repetition.
156 The cursor has a lock on the job (acquired by _run_jobs_multithread()) and this
157 method is run in a worker thread (spawned by _run_jobs_multithread()).
159 :param job: job to be run (as a dictionary).
160 :param now: timestamp (result of datetime.now(), no need to call it multiple times).
# nextcall is read from the job dictionary as text in the server datetime
# format; parse it so that intervals can be added to it.
164 nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
165 numbercall = job['numbercall']
# Go through every occurrence that is already due (nextcall earlier than `now`)
# for as long as numbercall is non-zero.
168 while nextcall < now and numbercall:
# With 'doall' set the callback runs for each due occurrence.
# NOTE(review): `ok` is assigned on lines not visible here -- presumably it
# records that one occurrence was already run, so that missed occurrences are
# collapsed into a single call when 'doall' is unset; confirm.
171 if not ok or job['doall']:
172 self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
# Move the schedule forward by one interval of the job's unit.
174 nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
# Extra SET clause appended to the UPDATE below to deactivate the job.
178 addsql = ', active=False'
179 cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
180 (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
183 # Reschedule our own main cron thread if necessary.
184 # This is really needed if this job runs longer than its rescheduling period.
# schedule_wakeup() is given an epoch timestamp: timegm() converts the naive
# datetime, interpreting it as UTC.
185 nextcall = calendar.timegm(nextcall.timetuple())
186 openerp.cron.schedule_wakeup(nextcall, cr.dbname)
# Hand back the thread slot taken by _run_jobs_multithread() for this worker.
190 openerp.cron.release_thread_slot()
192 def _run_jobs_multithread(self):
193 # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
194 """ Process the cron jobs by spawning worker threads.
196 This selects in database all the jobs that should be processed. It then
197 tries to lock each of them and, if it succeeds, spawns a thread to run
198 the cron job (if it doesn't succeed, it means the job was already
199 locked to be taken care of by another thread).
201 The cursor used to lock the job in database is given to the worker
202 thread (which has to close it itself).
209 jobs = {} # mapping job ids to jobs for all jobs being processed.
211 # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
212 cr.execute("""SELECT * FROM ir_cron
213 WHERE numbercall != 0
214 AND active AND nextcall <= (now() at time zone 'UTC')
215 ORDER BY priority""")
216 for job in cr.dictfetchall():
# Checked for every due job: whether openerp.cron still reports a free
# worker-thread slot.
217 if not openerp.cron.get_thread_slots():
219 jobs[job['id']] = job
# Each job gets its own cursor, i.e. its own transaction holding the row lock.
221 task_cr = db.cursor()
223 # Try to grab an exclusive lock on the job row from within the task transaction
224 acquired_lock = False
225 task_cr.execute("""SELECT *
228 FOR UPDATE NOWAIT""",
229 (job['id'],), log_exceptions=False)
231 except psycopg2.OperationalError, e:
232 if e.pgcode == '55P03':
233 # Class 55: Object not in prerequisite state; 55P03: lock_not_available
234 _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
237 # Unexpected OperationalError
240 if not acquired_lock:
241 # we're exiting due to an exception while acquiring the lock
244 # Force call to strptime just before starting the cron thread
245 # to prevent time.strptime AttributeError within the thread.
246 # See: http://bugs.python.org/issue7980
247 datetime.strptime('2012-01-01', '%Y-%m-%d')
249 # Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock
250 task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now))
251 # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
252 task_thread.setDaemon(False)
# Book a thread slot for the worker; _run_job() releases it when it is done.
253 openerp.cron.take_thread_slot()
255 _logger.debug('Cron execution thread for job `%s` spawned', job['name'])
257 # Find next earliest job ignoring currently processed jobs (by this and other cron threads)
258 find_next_time_query = """SELECT min(nextcall) AS min_next_call
259 FROM ir_cron WHERE numbercall != 0 AND active"""
# Two variants of the query: one excluding the jobs picked up above, one plain.
# NOTE(review): the test choosing between them is not visible here -- presumably
# the plain form is used when `jobs` is empty, as an empty tuple cannot be used
# with NOT IN; confirm.
261 cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),))
263 cr.execute(find_next_time_query)
264 next_call = cr.dictfetchone()['min_next_call']
# Convert the textual datetime into the epoch timestamp passed to schedule_wakeup().
267 next_call = calendar.timegm(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT))
269 # no matching cron job found in database, re-schedule arbitrarily in 1 day,
270 # this delay will likely be modified when running jobs complete their tasks
271 next_call = time.time() + (24*3600)
273 openerp.cron.schedule_wakeup(next_call, db_name)
# Any other failure is only logged, with its traceback, at warning level.
275 except Exception, ex:
276 _logger.warning('Exception in cron:', exc_info=True)
282 def _process_job(self, cr, job):
283 """ Run a given job taking care of the repetition.
285 The cursor has a lock on the job (acquired by _acquire_job()).
287 :param job: job to be run (as a dictionary).
# nextcall is read from the job dictionary as text in the server datetime
# format; parse it so that intervals can be added to it.
291 nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
292 numbercall = job['numbercall']
# Go through every occurrence that is already due for as long as numbercall is
# non-zero.
# NOTE(review): `now` is not a parameter of this method and its assignment is not
# visible in this listing; confirm where it is set.
295 while nextcall < now and numbercall:
# With 'doall' set the callback runs for each due occurrence.
# NOTE(review): `ok` is assigned on lines not visible here -- presumably it
# records that one occurrence was already run; confirm.
298 if not ok or job['doall']:
299 self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
# Move the schedule forward by one interval of the job's unit.
301 nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
# Extra SET clause appended to the UPDATE below to deactivate the job.
305 addsql = ', active=False'
306 cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
307 (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
314 def _acquire_job(cls, db_name):
315 # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
316 """ Try to process one cron job.
318 This selects in database all the jobs that should be processed. It then
319 tries to lock each of them and, if it succeeds, runs the cron job (if it
320 doesn't succeed, it means the job was already locked to be taken care
321 of by another thread) and returns.
323 If a job was processed, returns True, otherwise returns False.
325 db = openerp.sql_db.db_connect(db_name)
328 # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
329 cr.execute("""SELECT * FROM ir_cron
330 WHERE numbercall != 0
331 AND active AND nextcall <= (now() at time zone 'UTC')
332 ORDER BY priority""")
333 for job in cr.dictfetchall():
# Each candidate job gets its own cursor, i.e. its own transaction for the lock.
334 task_cr = db.cursor()
336 # Try to grab an exclusive lock on the job row from within the task transaction
337 acquired_lock = False
338 task_cr.execute("""SELECT *
341 FOR UPDATE NOWAIT""",
342 (job['id'],), log_exceptions=False)
344 except psycopg2.OperationalError, e:
345 if e.pgcode == '55P03':
346 # Class 55: Object not in prerequisite state; 55P03: lock_not_available
347 _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
350 # Unexpected OperationalError
353 if not acquired_lock:
354 # we're exiting due to an exception while acquiring the lock
357 # Got the lock on the job row, run its code
358 _logger.debug('Starting job `%s`.', job['name'])
# The job runs through the model instance fetched from the registry of this
# database, on the cursor holding the row lock; the registry signaling calls
# bracket the run.
359 openerp.modules.registry.RegistryManager.check_registry_signaling(db_name)
360 registry = openerp.pooler.get_pool(db_name)
361 registry[cls._name]._process_job(task_cr, job)
362 openerp.modules.registry.RegistryManager.signal_caches_change(db_name)
365 except psycopg2.ProgrammingError, e:
366 if e.pgcode == '42P01':
367 # Class 42 — Syntax Error or Access Rule Violation; 42P01: undefined_table
368 # The table ir_cron does not exist; this is probably not an OpenERP database.
369 _logger.warning('Tried to poll an undefined table on database %s.', db_name)
# Any other failure is only logged, with its traceback, at warning level.
372 except Exception, ex:
373 _logger.warning('Exception in cron:', exc_info=True)
382 def _run(cls, db_names):
384 Class method intended to be run in a dedicated process to handle jobs.
385 This polls the database for jobs that can be run every 60 seconds.
387 :param db_names: list of database names to poll or callable to
388 generate such a list.
# The loop runs until the module-level quit_signal_received flag becomes true.
390 global quit_signal_received
391 while not quit_signal_received:
# db_names may be a callable producing the list of databases (see the docstring).
392 if callable(db_names):
397 if quit_signal_received:
400 for db_name in names:
402 # Small hack to re-use the openerp-server config:
403 # If the cpu_time_limit does not have its default value, we
404 # truly want to establish limits.
405 if openerp.tools.config['cpu_time_limit'] != 60:
406 openerp.wsgi.core.pre_request('dummy', 'dummy')
# _acquire_job() is documented to return True when a job was processed.
407 acquired = cls._acquire_job(db_name)
408 if openerp.tools.config['cpu_time_limit'] != 60:
412 openerp.wsgi.core.post_request(worker, 'dummy', 'dummy')
417 if quit_signal_received:
421 global job_in_progress
423 _logger.warning('Cron worker: processing all jobs took more than 1 minute to complete (%ss.).', int(t))
# job_in_progress is lowered and then raised again.
# NOTE(review): the statement between these two assignments is not visible in
# this listing -- presumably a wait for the rest of the minute; confirm.
425 job_in_progress = False
427 job_in_progress = True
def update_running_cron(self, cr):
    """ Schedule as soon as possible a wake-up for this database.

    :param cr: cursor whose pending changes are committed before the cron
               agent is woken up.
    """
    if self.pool._init:
        # The server is only starting or loading modules: committing here
        # must absolutely be avoided, so leave the cron agent alone.
        return
    # The server is already started. As soon as the cron agent is woken up it
    # queries the database with its own cursor, possibly before the end of
    # the current transaction, hence the immediate commit of our changes.
    cr.commit()
    openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname)
def _try_lock(self, cr, uid, ids, context=None):
    """Try to grab a dummy exclusive write-lock to the rows with the given ids,
    to make sure a following write() or unlink() will not block due
    to a process currently executing those cron tasks.

    :param ids: id, or iterable of ids, of the cron jobs to lock.
    :raise osv.except_osv: when the lock cannot be obtained right away,
                           i.e. the query fails with an OperationalError.
    """
    # Accept a bare id as well as any iterable of ids.
    try:
        ids = tuple(ids)
    except TypeError:
        ids = (ids,)
    if not ids:
        # Nothing to lock; an empty tuple would also be rendered as the
        # invalid SQL "IN ()".
        return
    try:
        # NOWAIT makes the query fail immediately instead of waiting for a
        # row that is locked by another transaction.
        cr.execute("""SELECT id FROM "%s" WHERE id IN %%s FOR UPDATE NOWAIT""" % self._table,
                   (ids,), log_exceptions=False)
    except psycopg2.OperationalError:
        cr.rollback()  # early rollback to allow translations to work for the user feedback
        raise osv.except_osv(_("Record cannot be modified right now"),
                             _("This cron task is currently being executed and may not be modified, "
                               "please try again in a few minutes"))
def create(self, cr, uid, vals, context=None):
    """ Create a cron job, then ask for a wake-up of the cron agent so that
    the new job is taken into account.

    :return: the value returned by the inherited create().
    """
    created = super(ir_cron, self).create(cr, uid, vals, context=context)
    self.update_running_cron(cr)
    return created
def write(self, cr, uid, ids, vals, context=None):
    """ Update cron jobs, then ask for a wake-up of the cron agent.

    The rows are locked first through _try_lock(), so that the update
    fails fast instead of blocking on a job that is being executed.

    :return: the value returned by the inherited write().
    """
    self._try_lock(cr, uid, ids, context)
    outcome = super(ir_cron, self).write(cr, uid, ids, vals, context=context)
    self.update_running_cron(cr)
    return outcome
def unlink(self, cr, uid, ids, context=None):
    """ Delete cron jobs, then ask for a wake-up of the cron agent.

    The rows are locked first through _try_lock(), so that the deletion
    fails fast instead of blocking on a job that is being executed.

    :return: the value returned by the inherited unlink().
    """
    self._try_lock(cr, uid, ids, context)
    outcome = super(ir_cron, self).unlink(cr, uid, ids, context=context)
    self.update_running_cron(cr)
    return outcome
473 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: