1 # -*- coding: utf-8 -*-
2 ##############################################################################
4 # OpenERP, Open Source Management Solution
5 # Copyright (C) 2004-TODAY OpenERP S.A. <http://www.openerp.com>
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as
9 # published by the Free Software Foundation, either version 3 of the
10 # License, or (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 ##############################################################################
import calendar
import logging
import threading
import time
from datetime import datetime

import psycopg2
from dateutil.relativedelta import relativedelta

import netsvc
import openerp
from openerp.cron import WAKE_UP_NOW
from osv import fields, osv
from tools import DEFAULT_SERVER_DATETIME_FORMAT
from tools.safe_eval import safe_eval as eval
from tools.translate import _
def str2tuple(s):
    """ Evaluate the textual arguments of a cron job into a tuple.

    The string is wrapped in a ``tuple(...)`` call before being evaluated, so
    it must be the literal form of a single iterable, e.g. ``"(uid,)"`` or
    ``"[1, 2]"``. An empty string or ``None`` yields an empty tuple.

    ``eval`` is whatever the module has bound to that name; this file imports
    ``safe_eval`` under that name, so the expression is not evaluated by the
    unrestricted builtin.

    :param s: textual representation of the arguments (may be empty or None).
    :return: the arguments, as a tuple.
    """
    return eval('tuple(%s)' % (s or ''))
44 'work_days': lambda interval: relativedelta(days=interval),
45 'days': lambda interval: relativedelta(days=interval),
46 'hours': lambda interval: relativedelta(hours=interval),
47 'weeks': lambda interval: relativedelta(days=7*interval),
48 'months': lambda interval: relativedelta(months=interval),
49 'minutes': lambda interval: relativedelta(minutes=interval),
class ir_cron(osv.osv):
    """ Model describing cron jobs (also called actions or tasks).

    Each record names a model method to call (``model``/``function``/``args``),
    the user to call it as, and its repetition settings. Due jobs are picked
    up by ``_run_jobs_multithread()`` and each one is executed in its own
    worker thread by ``_run_job()``.
    """

    # TODO: perhaps in the future we could consider a flag on ir.cron jobs
    # that would cause database wake-up even if the database has not been
    # loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
    # See also openerp.cron

    # NOTE(review): the model name matches the ``ir_cron`` table used by the
    # raw SQL below; confirm the default ordering against the deployed model.
    _name = "ir.cron"
    _order = 'name'
    _columns = {
        'name': fields.char('Name', size=60, required=True),
        'user_id': fields.many2one('res.users', 'User', required=True),
        'active': fields.boolean('Active'),
        'interval_number': fields.integer('Interval Number',help="Repeat every x."),
        'interval_type': fields.selection( [('minutes', 'Minutes'),
            ('hours', 'Hours'), ('work_days','Work Days'), ('days', 'Days'),('weeks', 'Weeks'), ('months', 'Months')], 'Interval Unit'),
        'numbercall': fields.integer('Number of Calls', help='How many times the method is called,\na negative number indicates no limit.'),
        'doall' : fields.boolean('Repeat Missed', help="Specify if missed occurrences should be executed when the server restarts."),
        'nextcall' : fields.datetime('Next Execution Date', required=True, help="Next planned execution date for this job."),
        'model': fields.char('Object', size=64, help="Model name on which the method to be called is located, e.g. 'res.partner'."),
        'function': fields.char('Method', size=64, help="Name of the method to be called when this job is processed."),
        'args': fields.text('Arguments', help="Arguments to be passed to the method, e.g. (uid,)."),
        'priority': fields.integer('Priority', help='The priority of the job, as an integer: 0 means higher priority, 10 means lower priority.')
    }

    _defaults = {
        'nextcall' : lambda *a: time.strftime(DEFAULT_SERVER_DATETIME_FORMAT),
        'priority' : lambda *a: 5,
        'user_id' : lambda obj,cr,uid,context: uid,
        'interval_number' : lambda *a: 1,
        'interval_type' : lambda *a: 'months',
        'numbercall' : lambda *a: 1,
        'active' : lambda *a: 1,
        'doall' : lambda *a: 1
    }

    _logger = logging.getLogger('cron')

    def _check_args(self, cr, uid, ids, context=None):
        """ Constraint: the ``args`` of every given job must evaluate to a tuple.

        :return: False as soon as one job has arguments that cannot be
                 evaluated by ``str2tuple()``, True otherwise.
        """
        try:
            for this in self.browse(cr, uid, ids, context):
                str2tuple(this.args)
        except Exception:
            # Any evaluation failure simply means the arguments are invalid.
            return False
        return True

    _constraints = [
        (_check_args, 'Invalid arguments', ['args']),
    ]

    def _handle_callback_exception(self, cr, uid, model_name, method_name, args, job_id, job_exception):
        """ Method called when an exception is raised by a job.

        Simply logs the exception and rollback the transaction.

        :param model_name: model name on which the job method is located.
        :param method_name: name of the method to call when this job is processed.
        :param args: arguments of the method (without the usual self, cr, uid).
        :param job_id: job id.
        :param job_exception: exception raised by the job.

        """
        cr.rollback()
        # Must be called from within the ``except`` block handling the job
        # failure: ``exception()`` appends the current traceback to the message.
        self._logger.exception("Call of self.pool.get('%s').%s(cr, uid, *%r) failed in Job %s",
                               model_name, method_name, args, job_id)

    def _callback(self, cr, uid, model_name, method_name, args, job_id):
        """ Run the method associated to a given job

        It takes care of logging and exception handling.

        :param model_name: model name on which the job method is located.
        :param method_name: name of the method to call when this job is processed.
        :param args: arguments of the method (without the usual self, cr, uid).
        :param job_id: job id.
        """
        args = str2tuple(args)
        model = self.pool.get(model_name)
        if not model:
            # Nothing to call: report it instead of silently skipping the job.
            self._logger.warning("Model `%s` does not exist.", model_name)
            return
        if not hasattr(model, method_name):
            self._logger.warning("Method `%s.%s` does not exist.", model_name, method_name)
            return

        method = getattr(model, method_name)
        try:
            netsvc.log('cron', (cr.dbname,uid,'*',model_name,method_name)+tuple(args), channel=logging.DEBUG,
                       depth=(None if self._logger.isEnabledFor(logging.DEBUG_RPC_ANSWER) else 1), fn='object.execute')
            logger = logging.getLogger('execution time')
            # Decide once whether to time the call, so that a log level change
            # during the call cannot leave ``start_time`` undefined.
            timed = logger.isEnabledFor(logging.DEBUG)
            if timed:
                start_time = time.time()
            method(cr, uid, *args)
            if timed:
                end_time = time.time()
                logger.log(logging.DEBUG, '%.3fs (%s, %s)' % (end_time - start_time, model_name, method_name))
        except Exception as e:
            self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e)

    def _run_job(self, cr, job, now):
        """ Run a given job taking care of the repetition.

        The cursor has a lock on the job (acquired by _run_jobs_multithread()) and this
        method is run in a worker thread (spawned by _run_jobs_multithread()).

        The cursor is always committed and closed, and the thread slot always
        released, when this method exits.

        :param job: job to be run (as a dictionary).
        :param now: timestamp (result of datetime.now(), no need to call it multiple time).

        """
        try:
            nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
            numbercall = job['numbercall']

            # ``ok`` becomes True after the first occurrence: later (missed)
            # occurrences are only executed when the job has ``doall`` set.
            ok = False
            while nextcall < now and numbercall:
                # A negative numbercall means "no limit" and is never decremented.
                if numbercall > 0:
                    numbercall -= 1
                if not ok or job['doall']:
                    self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
                if numbercall:
                    nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
                ok = True

            # A job that has used up all its calls is deactivated.
            addsql = ''
            if not numbercall:
                addsql = ', active=False'
            cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
                       (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))

            if numbercall:
                # Reschedule our own main cron thread if necessary.
                # This is really needed if this job runs longer than its rescheduling period.
                nextcall = calendar.timegm(nextcall.timetuple())
                openerp.cron.schedule_wakeup(nextcall, cr.dbname)
        finally:
            cr.commit()
            cr.close()
            openerp.cron.release_thread_slot()

    def _run_jobs_multithread(self):
        # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
        """ Process the cron jobs by spawning worker threads.

        This selects in database all the jobs that should be processed. It then
        tries to lock each of them and, if it succeeds, spawns a thread to run
        the cron job (if it doesn't succeed, it means the job was already
        locked to be taken care of by another thread).

        The cursor used to lock the job in database is given to the worker
        thread (which has to close it itself).

        """
        db = self.pool.db
        cr = db.cursor()
        db_name = db.dbname
        try:
            jobs = {} # mapping job ids to jobs for all jobs being processed.
            # NOTE(review): the query below compares nextcall in UTC while this
            # is the server local time - confirm the server runs in UTC.
            now = datetime.now()
            # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
            cr.execute("""SELECT * FROM ir_cron
                          WHERE numbercall != 0
                              AND active AND nextcall <= (now() at time zone 'UTC')
                          ORDER BY priority""")
            for job in cr.dictfetchall():
                if not openerp.cron.get_thread_slots():
                    break
                jobs[job['id']] = job

                task_cr = db.cursor()
                try:
                    # Try to grab an exclusive lock on the job row from within the task transaction
                    acquired_lock = False
                    task_cr.execute("""SELECT *
                                       FROM ir_cron
                                       WHERE id=%s
                                       FOR UPDATE NOWAIT""",
                                   (job['id'],), log_exceptions=False)
                    acquired_lock = True
                except psycopg2.OperationalError as e:
                    if e.pgcode == '55P03':
                        # Class 55: Object not in prerequisite state; 55P03: lock_not_available
                        self._logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
                        continue
                    else:
                        # Unexpected OperationalError
                        raise
                finally:
                    if not acquired_lock:
                        # we're exiting due to an exception while acquiring the lock
                        task_cr.close()

                # Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock
                task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now))
                # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
                task_thread.setDaemon(False)
                openerp.cron.take_thread_slot()
                task_thread.start()
                self._logger.debug('Cron execution thread for job `%s` spawned', job['name'])

            # Find next earliest job ignoring currently processed jobs (by this and other cron threads)
            find_next_time_query = """SELECT min(nextcall) AS min_next_call
                                      FROM ir_cron WHERE numbercall != 0 AND active"""
            if jobs:
                cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),))
            else:
                cr.execute(find_next_time_query)
            next_call = cr.dictfetchone()['min_next_call']

            if next_call:
                next_call = calendar.timegm(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT))
            else:
                # no matching cron job found in database, re-schedule arbitrarily in 1 day,
                # this delay will likely be modified when running jobs complete their tasks
                next_call = time.time() + (24*3600)

            openerp.cron.schedule_wakeup(next_call, db_name)

        except Exception:
            # Best effort: a failure here is logged rather than propagated.
            self._logger.warning('Exception in cron:', exc_info=True)

        finally:
            cr.commit()
            cr.close()

    def update_running_cron(self, cr):
        """ Schedule as soon as possible a wake-up for this database. """
        # Verify whether the server is already started and thus whether we need to commit
        # immediately our changes and restart the cron agent in order to apply the change
        # immediately. The commit() is needed because as soon as the cron is (re)started it
        # will query the database with its own cursor, possibly before the end of the
        # current transaction.
        # This commit() is not an issue in most cases, but we must absolutely avoid it
        # when the server is only starting or loading modules (hence the test on pool._init).
        if not self.pool._init:
            cr.commit()
            openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname)

    def _try_lock(self, cr, uid, ids, context=None):
        """Try to grab a dummy exclusive write-lock to the rows with the given ids,
           to make sure a following write() or unlink() will not block due
           to a process currently executing those cron tasks

           :raise osv.except_osv: if one of the rows is currently locked.
        """
        if not ids:
            # Nothing to lock; an empty tuple would render as an invalid
            # ``IN ()`` clause.
            return
        try:
            cr.execute("""SELECT id FROM "%s" WHERE id IN %%s FOR UPDATE NOWAIT""" % self._table,
                       (tuple(ids),), log_exceptions=False)
        except psycopg2.OperationalError:
            cr.rollback() # early rollback to allow translations to work for the user feedback
            raise osv.except_osv(_("Record cannot be modified right now"),
                                 _("This cron task is currently being executed and may not be modified, "
                                   "please try again in a few minutes"))

    def create(self, cr, uid, vals, context=None):
        """ Create the job, then wake the cron up so it is taken into account. """
        res = super(ir_cron, self).create(cr, uid, vals, context=context)
        self.update_running_cron(cr)
        return res

    def write(self, cr, uid, ids, vals, context=None):
        """ Update the jobs (unless one is being executed), then wake the cron up. """
        self._try_lock(cr, uid, ids, context)
        res = super(ir_cron, self).write(cr, uid, ids, vals, context=context)
        self.update_running_cron(cr)
        return res

    def unlink(self, cr, uid, ids, context=None):
        """ Delete the jobs (unless one is being executed), then wake the cron up. """
        self._try_lock(cr, uid, ids, context)
        res = super(ir_cron, self).unlink(cr, uid, ids, context=context)
        self.update_running_cron(cr)
        return res
314 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: