1 # -*- coding: utf-8 -*-
2 ##############################################################################
4 # OpenERP, Open Source Management Solution
5 # Copyright (C) 2004-TODAY OpenERP S.A. <http://www.openerp.com>
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as
9 # published by the Free Software Foundation, either version 3 of the
10 # License, or (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 ##############################################################################
27 from datetime import datetime
28 from dateutil.relativedelta import relativedelta
34 from openerp.cron import WAKE_UP_NOW
35 from osv import fields, osv
36 from tools import DEFAULT_SERVER_DATETIME_FORMAT
37 from tools.safe_eval import safe_eval as eval
38 from tools.translate import _
40 _logger = logging.getLogger(__name__)
43 return eval('tuple(%s)' % (s or ''))
46 'work_days': lambda interval: relativedelta(days=interval),
47 'days': lambda interval: relativedelta(days=interval),
48 'hours': lambda interval: relativedelta(hours=interval),
49 'weeks': lambda interval: relativedelta(days=7*interval),
50 'months': lambda interval: relativedelta(months=interval),
51 'minutes': lambda interval: relativedelta(minutes=interval),
54 class ir_cron(osv.osv):
55 """ Model describing cron jobs (also called actions or tasks).
58 # TODO: perhaps in the future we could consider a flag on ir.cron jobs
59 # that would cause database wake-up even if the database has not been
60 # loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
61 # See also openerp.cron
66 'name': fields.char('Name', size=60, required=True),
67 'user_id': fields.many2one('res.users', 'User', required=True),
68 'active': fields.boolean('Active'),
69 'interval_number': fields.integer('Interval Number',help="Repeat every x."),
70 'interval_type': fields.selection( [('minutes', 'Minutes'),
71 ('hours', 'Hours'), ('work_days','Work Days'), ('days', 'Days'),('weeks', 'Weeks'), ('months', 'Months')], 'Interval Unit'),
72 'numbercall': fields.integer('Number of Calls', help='How many times the method is called,\na negative number indicates no limit.'),
73 'doall' : fields.boolean('Repeat Missed', help="Specify if missed occurrences should be executed when the server restarts."),
74 'nextcall' : fields.datetime('Next Execution Date', required=True, help="Next planned execution date for this job."),
75 'model': fields.char('Object', size=64, help="Model name on which the method to be called is located, e.g. 'res.partner'."),
76 'function': fields.char('Method', size=64, help="Name of the method to be called when this job is processed."),
77 'args': fields.text('Arguments', help="Arguments to be passed to the method, e.g. (uid,)."),
78 'priority': fields.integer('Priority', help='The priority of the job, as an integer: 0 means higher priority, 10 means lower priority.')
82 'nextcall' : lambda *a: time.strftime(DEFAULT_SERVER_DATETIME_FORMAT),
84 'user_id' : lambda obj,cr,uid,context: uid,
85 'interval_number' : 1,
86 'interval_type' : 'months',
92 def _check_args(self, cr, uid, ids, context=None):
94 for this in self.browse(cr, uid, ids, context):
101 (_check_args, 'Invalid arguments', ['args']),
104 def _handle_callback_exception(self, cr, uid, model_name, method_name, args, job_id, job_exception):
105 """ Method called when an exception is raised by a job.
107 Simply logs the exception and rollback the transaction.
109 :param model_name: model name on which the job method is located.
110 :param method_name: name of the method to call when this job is processed.
111 :param args: arguments of the method (without the usual self, cr, uid).
112 :param job_id: job id.
113 :param job_exception: exception raised by the job.
117 _logger.exception("Call of self.pool.get('%s').%s(cr, uid, *%r) failed in Job %s" % (model_name, method_name, args, job_id))
119 def _callback(self, cr, uid, model_name, method_name, args, job_id):
120 """ Run the method associated to a given job
122 It takes care of logging and exception handling.
124 :param model_name: model name on which the job method is located.
125 :param method_name: name of the method to call when this job is processed.
126 :param args: arguments of the method (without the usual self, cr, uid).
127 :param job_id: job id.
129 args = str2tuple(args)
130 model = self.pool.get(model_name)
131 if model and hasattr(model, method_name):
132 method = getattr(model, method_name)
134 log_depth = (None if _logger.isEnabledFor(logging.DEBUG) else 1)
135 netsvc.log(_logger, logging.DEBUG, 'cron.object.execute', (cr.dbname,uid,'*',model_name,method_name)+tuple(args), depth=log_depth)
136 if _logger.isEnabledFor(logging.DEBUG):
137 start_time = time.time()
138 method(cr, uid, *args)
139 if _logger.isEnabledFor(logging.DEBUG):
140 end_time = time.time()
141 _logger.debug('%.3fs (%s, %s)' % (end_time - start_time, model_name, method_name))
143 self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e)
145 def _run_job(self, cr, job, now):
146 """ Run a given job taking care of the repetition.
148 The cursor has a lock on the job (aquired by _run_jobs_multithread()) and this
149 method is run in a worker thread (spawned by _run_jobs_multithread())).
151 :param job: job to be run (as a dictionary).
152 :param now: timestamp (result of datetime.now(), no need to call it multiple time).
156 nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
157 numbercall = job['numbercall']
160 while nextcall < now and numbercall:
163 if not ok or job['doall']:
164 self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
166 nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
170 addsql = ', active=False'
171 cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
172 (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
175 # Reschedule our own main cron thread if necessary.
176 # This is really needed if this job runs longer than its rescheduling period.
177 nextcall = calendar.timegm(nextcall.timetuple())
178 openerp.cron.schedule_wakeup(nextcall, cr.dbname)
182 openerp.cron.release_thread_slot()
184 def _run_jobs_multithread(self):
185 # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
186 """ Process the cron jobs by spawning worker threads.
188 This selects in database all the jobs that should be processed. It then
189 tries to lock each of them and, if it succeeds, spawns a thread to run
190 the cron job (if it doesn't succeed, it means the job was already
191 locked to be taken care of by another thread).
193 The cursor used to lock the job in database is given to the worker
194 thread (which has to close it itself).
201 jobs = {} # mapping job ids to jobs for all jobs being processed.
203 # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
204 cr.execute("""SELECT * FROM ir_cron
205 WHERE numbercall != 0
206 AND active AND nextcall <= (now() at time zone 'UTC')
207 ORDER BY priority""")
208 for job in cr.dictfetchall():
209 if not openerp.cron.get_thread_slots():
211 jobs[job['id']] = job
213 task_cr = db.cursor()
215 # Try to grab an exclusive lock on the job row from within the task transaction
216 acquired_lock = False
217 task_cr.execute("""SELECT *
220 FOR UPDATE NOWAIT""",
221 (job['id'],), log_exceptions=False)
223 except psycopg2.OperationalError, e:
224 if e.pgcode == '55P03':
225 # Class 55: Object not in prerequisite state; 55P03: lock_not_available
226 _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
229 # Unexpected OperationalError
232 if not acquired_lock:
233 # we're exiting due to an exception while acquiring the lot
236 # Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock
237 task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now))
238 # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
239 task_thread.setDaemon(False)
240 openerp.cron.take_thread_slot()
242 _logger.debug('Cron execution thread for job `%s` spawned', job['name'])
244 # Find next earliest job ignoring currently processed jobs (by this and other cron threads)
245 find_next_time_query = """SELECT min(nextcall) AS min_next_call
246 FROM ir_cron WHERE numbercall != 0 AND active"""
248 cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),))
250 cr.execute(find_next_time_query)
251 next_call = cr.dictfetchone()['min_next_call']
254 next_call = calendar.timegm(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT))
256 # no matching cron job found in database, re-schedule arbitrarily in 1 day,
257 # this delay will likely be modified when running jobs complete their tasks
258 next_call = time.time() + (24*3600)
260 openerp.cron.schedule_wakeup(next_call, db_name)
262 except Exception, ex:
263 _logger.warning('Exception in cron:', exc_info=True)
269 def _process_job(self, cr, job):
270 """ Run a given job taking care of the repetition.
272 The cursor has a lock on the job (aquired by _acquire_job()).
274 :param job: job to be run (as a dictionary).
278 nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
279 numbercall = job['numbercall']
282 while nextcall < now and numbercall:
285 if not ok or job['doall']:
286 self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
288 nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
292 addsql = ', active=False'
293 cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
294 (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
301 def _acquire_job(cls, db_name):
302 # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
303 """ Try to process one cron job.
305 This selects in database all the jobs that should be processed. It then
306 tries to lock each of them and, if it succeeds, run the cron job (if it
307 doesn't succeed, it means the job was already locked to be taken care
308 of by another thread) and return.
310 If a job was processed, returns True, otherwise returns False.
312 db = openerp.sql_db.db_connect(db_name)
315 # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
316 cr.execute("""SELECT * FROM ir_cron
317 WHERE numbercall != 0
318 AND active AND nextcall <= (now() at time zone 'UTC')
319 ORDER BY priority""")
320 for job in cr.dictfetchall():
321 task_cr = db.cursor()
323 # Try to grab an exclusive lock on the job row from within the task transaction
324 acquired_lock = False
325 task_cr.execute("""SELECT *
328 FOR UPDATE NOWAIT""",
329 (job['id'],), log_exceptions=False)
331 except psycopg2.OperationalError, e:
332 if e.pgcode == '55P03':
333 # Class 55: Object not in prerequisite state; 55P03: lock_not_available
334 _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
337 # Unexpected OperationalError
340 if not acquired_lock:
341 # we're exiting due to an exception while acquiring the lot
344 # Got the lock on the job row, run its code
345 _logger.debug('Starting job `%s`.', job['name'])
346 openerp.modules.registry.RegistryManager.check_registry_signaling(db_name)
347 registry = openerp.pooler.get_pool(db_name)
348 registry[cls._name]._process_job(task_cr, job)
349 openerp.modules.registry.RegistryManager.signal_caches_change(db_name)
352 except psycopg2.ProgrammingError, e:
353 if e.pgcode == '42P01':
354 # Class 42 — Syntax Error or Access Rule Violation; 42P01: undefined_table
355 # The table ir_cron does not exist; this is probably not an OpenERP database.
356 _logger.warning('Tried to poll an undefined table on database %s.', db_name)
359 except Exception, ex:
360 _logger.warning('Exception in cron:', exc_info=True)
368 def update_running_cron(self, cr):
369 """ Schedule as soon as possible a wake-up for this database. """
370 # Verify whether the server is already started and thus whether we need to commit
371 # immediately our changes and restart the cron agent in order to apply the change
372 # immediately. The commit() is needed because as soon as the cron is (re)started it
373 # will query the database with its own cursor, possibly before the end of the
374 # current transaction.
375 # This commit() is not an issue in most cases, but we must absolutely avoid it
376 # when the server is only starting or loading modules (hence the test on pool._init).
377 if not self.pool._init:
379 openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname)
381 def _try_lock(self, cr, uid, ids, context=None):
382 """Try to grab a dummy exclusive write-lock to the rows with the given ids,
383 to make sure a following write() or unlink() will not block due
384 to a process currently executing those cron tasks"""
386 cr.execute("""SELECT id FROM "%s" WHERE id IN %%s FOR UPDATE NOWAIT""" % self._table,
387 (tuple(ids),), log_exceptions=False)
388 except psycopg2.OperationalError:
389 cr.rollback() # early rollback to allow translations to work for the user feedback
390 raise osv.except_osv(_("Record cannot be modified right now"),
391 _("This cron task is currently being executed and may not be modified, "
392 "please try again in a few minutes"))
394 def create(self, cr, uid, vals, context=None):
395 res = super(ir_cron, self).create(cr, uid, vals, context=context)
396 self.update_running_cron(cr)
399 def write(self, cr, uid, ids, vals, context=None):
400 self._try_lock(cr, uid, ids, context)
401 res = super(ir_cron, self).write(cr, uid, ids, vals, context=context)
402 self.update_running_cron(cr)
405 def unlink(self, cr, uid, ids, context=None):
406 self._try_lock(cr, uid, ids, context)
407 res = super(ir_cron, self).unlink(cr, uid, ids, context=context)
408 self.update_running_cron(cr)
412 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: