[MERGE] OPW 581239: cron: fix _strptime import errors when multiple scheduled actions...
[odoo/odoo.git] / openerp / addons / base / ir / ir_cron.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-TODAY OpenERP S.A. <http://www.openerp.com>
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU Affero General Public License as
9 #    published by the Free Software Foundation, either version 3 of the
10 #    License, or (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU Affero General Public License for more details.
16 #
17 #    You should have received a copy of the GNU Affero General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 ##############################################################################
21
22 import calendar
23 import time
24 import logging
25 import threading
26 import psycopg2
27 from datetime import datetime
28 from dateutil.relativedelta import relativedelta
29
30 import netsvc
31 import openerp
32 import pooler
33 import tools
34 from openerp.cron import WAKE_UP_NOW
35 from osv import fields, osv
36 from tools import DEFAULT_SERVER_DATETIME_FORMAT
37 from tools.safe_eval import safe_eval as eval
38 from tools.translate import _
39
40 _logger = logging.getLogger(__name__)
41
42 # This variable can be set by a signal handler to stop the infinite loop in
43 # ir_cron._run()
44 quit_signal_received = False
45
46 # This variable can be checked to know if ir_cron._run() is processing a job or
47 # sleeping.
48 job_in_progress = True
49
50 def str2tuple(s):
51     return eval('tuple(%s)' % (s or ''))
52
53 _intervalTypes = {
54     'work_days': lambda interval: relativedelta(days=interval),
55     'days': lambda interval: relativedelta(days=interval),
56     'hours': lambda interval: relativedelta(hours=interval),
57     'weeks': lambda interval: relativedelta(days=7*interval),
58     'months': lambda interval: relativedelta(months=interval),
59     'minutes': lambda interval: relativedelta(minutes=interval),
60 }
61
62 class ir_cron(osv.osv):
63     """ Model describing cron jobs (also called actions or tasks).
64     """
65
66     # TODO: perhaps in the future we could consider a flag on ir.cron jobs
67     # that would cause database wake-up even if the database has not been
68     # loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
69     # See also openerp.cron
70
71     _name = "ir.cron"
72     _order = 'name'
73     _columns = {
74         'name': fields.char('Name', size=60, required=True),
75         'user_id': fields.many2one('res.users', 'User', required=True),
76         'active': fields.boolean('Active'),
77         'interval_number': fields.integer('Interval Number',help="Repeat every x."),
78         'interval_type': fields.selection( [('minutes', 'Minutes'),
79             ('hours', 'Hours'), ('work_days','Work Days'), ('days', 'Days'),('weeks', 'Weeks'), ('months', 'Months')], 'Interval Unit'),
80         'numbercall': fields.integer('Number of Calls', help='How many times the method is called,\na negative number indicates no limit.'),
81         'doall' : fields.boolean('Repeat Missed', help="Specify if missed occurrences should be executed when the server restarts."),
82         'nextcall' : fields.datetime('Next Execution Date', required=True, help="Next planned execution date for this job."),
83         'model': fields.char('Object', size=64, help="Model name on which the method to be called is located, e.g. 'res.partner'."),
84         'function': fields.char('Method', size=64, help="Name of the method to be called when this job is processed."),
85         'args': fields.text('Arguments', help="Arguments to be passed to the method, e.g. (uid,)."),
86         'priority': fields.integer('Priority', help='The priority of the job, as an integer: 0 means higher priority, 10 means lower priority.')
87     }
88
89     _defaults = {
90         'nextcall' : lambda *a: time.strftime(DEFAULT_SERVER_DATETIME_FORMAT),
91         'priority' : lambda *a: 5,
92         'user_id' : lambda obj,cr,uid,context: uid,
93         'interval_number' : lambda *a: 1,
94         'interval_type' : lambda *a: 'months',
95         'numbercall' : lambda *a: 1,
96         'active' : lambda *a: 1,
97         'doall' : lambda *a: 1
98     }
99
100     def _check_args(self, cr, uid, ids, context=None):
101         try:
102             for this in self.browse(cr, uid, ids, context):
103                 str2tuple(this.args)
104         except Exception:
105             return False
106         return True
107
108     _constraints = [
109         (_check_args, 'Invalid arguments', ['args']),
110     ]
111
112     def _handle_callback_exception(self, cr, uid, model_name, method_name, args, job_id, job_exception):
113         """ Method called when an exception is raised by a job.
114
115         Simply logs the exception and rollback the transaction.
116
117         :param model_name: model name on which the job method is located.
118         :param method_name: name of the method to call when this job is processed.
119         :param args: arguments of the method (without the usual self, cr, uid).
120         :param job_id: job id.
121         :param job_exception: exception raised by the job.
122
123         """
124         cr.rollback()
125         _logger.exception("Call of self.pool.get('%s').%s(cr, uid, *%r) failed in Job %s" % (model_name, method_name, args, job_id))
126
127     def _callback(self, cr, uid, model_name, method_name, args, job_id):
128         """ Run the method associated to a given job
129
130         It takes care of logging and exception handling.
131
132         :param model_name: model name on which the job method is located.
133         :param method_name: name of the method to call when this job is processed.
134         :param args: arguments of the method (without the usual self, cr, uid).
135         :param job_id: job id.
136         """
137         args = str2tuple(args)
138         model = self.pool.get(model_name)
139         if model and hasattr(model, method_name):
140             method = getattr(model, method_name)
141             try:
142                 log_depth = (None if _logger.isEnabledFor(logging.DEBUG) else 1)
143                 netsvc.log(_logger, logging.DEBUG, 'cron.object.execute', (cr.dbname,uid,'*',model_name,method_name)+tuple(args), depth=log_depth)
144                 if _logger.isEnabledFor(logging.DEBUG):
145                     start_time = time.time()
146                 method(cr, uid, *args)
147                 if _logger.isEnabledFor(logging.DEBUG):
148                     end_time = time.time()
149                     _logger.debug('%.3fs (%s, %s)' % (end_time - start_time, model_name, method_name))
150             except Exception, e:
151                 self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e)
152
153     def _run_job(self, cr, job, now):
154         """ Run a given job taking care of the repetition.
155
156         The cursor has a lock on the job (aquired by _run_jobs_multithread()) and this
157         method is run in a worker thread (spawned by _run_jobs_multithread())).
158
159         :param job: job to be run (as a dictionary).
160         :param now: timestamp (result of datetime.now(), no need to call it multiple time).
161
162         """
163         try:
164             nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
165             numbercall = job['numbercall']
166
167             ok = False
168             while nextcall < now and numbercall:
169                 if numbercall > 0:
170                     numbercall -= 1
171                 if not ok or job['doall']:
172                     self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
173                 if numbercall:
174                     nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
175                 ok = True
176             addsql = ''
177             if not numbercall:
178                 addsql = ', active=False'
179             cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
180                        (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
181
182             if numbercall:
183                 # Reschedule our own main cron thread if necessary.
184                 # This is really needed if this job runs longer than its rescheduling period.
185                 nextcall = calendar.timegm(nextcall.timetuple())
186                 openerp.cron.schedule_wakeup(nextcall, cr.dbname)
187         finally:
188             cr.commit()
189             cr.close()
190             openerp.cron.release_thread_slot()
191
192     def _run_jobs_multithread(self):
193         # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
194         """ Process the cron jobs by spawning worker threads.
195
196         This selects in database all the jobs that should be processed. It then
197         tries to lock each of them and, if it succeeds, spawns a thread to run
198         the cron job (if it doesn't succeed, it means the job was already
199         locked to be taken care of by another thread).
200
201         The cursor used to lock the job in database is given to the worker
202         thread (which has to close it itself).
203
204         """
205         db = self.pool.db
206         cr = db.cursor()
207         db_name = db.dbname
208         try:
209             jobs = {} # mapping job ids to jobs for all jobs being processed.
210             now = datetime.now() 
211             # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
212             cr.execute("""SELECT * FROM ir_cron
213                           WHERE numbercall != 0
214                               AND active AND nextcall <= (now() at time zone 'UTC')
215                           ORDER BY priority""")
216             for job in cr.dictfetchall():
217                 if not openerp.cron.get_thread_slots():
218                     break
219                 jobs[job['id']] = job
220
221                 task_cr = db.cursor()
222                 try:
223                     # Try to grab an exclusive lock on the job row from within the task transaction
224                     acquired_lock = False
225                     task_cr.execute("""SELECT *
226                                        FROM ir_cron
227                                        WHERE id=%s
228                                        FOR UPDATE NOWAIT""",
229                                    (job['id'],), log_exceptions=False)
230                     acquired_lock = True
231                 except psycopg2.OperationalError, e:
232                     if e.pgcode == '55P03':
233                         # Class 55: Object not in prerequisite state; 55P03: lock_not_available
234                         _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
235                         continue
236                     else:
237                         # Unexpected OperationalError
238                         raise
239                 finally:
240                     if not acquired_lock:
241                         # we're exiting due to an exception while acquiring the lot
242                         task_cr.close()
243
244                 # Force call to strptime just before starting the cron thread
245                 # to prevent time.strptime AttributeError within the thread.
246                 # See: http://bugs.python.org/issue7980
247                 datetime.strptime('2012-01-01', '%Y-%m-%d')
248
249                 # Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock
250                 task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now))
251                 # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
252                 task_thread.setDaemon(False)
253                 openerp.cron.take_thread_slot()
254                 task_thread.start()
255                 _logger.debug('Cron execution thread for job `%s` spawned', job['name'])
256
257             # Find next earliest job ignoring currently processed jobs (by this and other cron threads)
258             find_next_time_query = """SELECT min(nextcall) AS min_next_call
259                                       FROM ir_cron WHERE numbercall != 0 AND active""" 
260             if jobs:
261                 cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),))
262             else:
263                 cr.execute(find_next_time_query)
264             next_call = cr.dictfetchone()['min_next_call']
265
266             if next_call:
267                 next_call = calendar.timegm(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT))
268             else:
269                 # no matching cron job found in database, re-schedule arbitrarily in 1 day,
270                 # this delay will likely be modified when running jobs complete their tasks
271                 next_call = time.time() + (24*3600)
272
273             openerp.cron.schedule_wakeup(next_call, db_name)
274
275         except Exception, ex:
276             _logger.warning('Exception in cron:', exc_info=True)
277
278         finally:
279             cr.commit()
280             cr.close()
281
282     def _process_job(self, cr, job):
283         """ Run a given job taking care of the repetition.
284
285         The cursor has a lock on the job (aquired by _acquire_job()).
286
287         :param job: job to be run (as a dictionary).
288         """
289         try:
290             now = datetime.now() 
291             nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
292             numbercall = job['numbercall']
293
294             ok = False
295             while nextcall < now and numbercall:
296                 if numbercall > 0:
297                     numbercall -= 1
298                 if not ok or job['doall']:
299                     self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
300                 if numbercall:
301                     nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
302                 ok = True
303             addsql = ''
304             if not numbercall:
305                 addsql = ', active=False'
306             cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
307                        (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
308
309         finally:
310             cr.commit()
311             cr.close()
312
313     @classmethod
314     def _acquire_job(cls, db_name):
315         # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
316         """ Try to process one cron job.
317
318         This selects in database all the jobs that should be processed. It then
319         tries to lock each of them and, if it succeeds, run the cron job (if it
320         doesn't succeed, it means the job was already locked to be taken care
321         of by another thread) and return.
322
323         If a job was processed, returns True, otherwise returns False.
324         """
325         db = openerp.sql_db.db_connect(db_name)
326         cr = db.cursor()
327         try:
328             # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
329             cr.execute("""SELECT * FROM ir_cron
330                           WHERE numbercall != 0
331                               AND active AND nextcall <= (now() at time zone 'UTC')
332                           ORDER BY priority""")
333             for job in cr.dictfetchall():
334                 task_cr = db.cursor()
335                 try:
336                     # Try to grab an exclusive lock on the job row from within the task transaction
337                     acquired_lock = False
338                     task_cr.execute("""SELECT *
339                                        FROM ir_cron
340                                        WHERE id=%s
341                                        FOR UPDATE NOWAIT""",
342                                    (job['id'],), log_exceptions=False)
343                     acquired_lock = True
344                 except psycopg2.OperationalError, e:
345                     if e.pgcode == '55P03':
346                         # Class 55: Object not in prerequisite state; 55P03: lock_not_available
347                         _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
348                         continue
349                     else:
350                         # Unexpected OperationalError
351                         raise
352                 finally:
353                     if not acquired_lock:
354                         # we're exiting due to an exception while acquiring the lot
355                         task_cr.close()
356
357                 # Got the lock on the job row, run its code
358                 _logger.debug('Starting job `%s`.', job['name'])
359                 openerp.modules.registry.RegistryManager.check_registry_signaling(db_name)
360                 registry = openerp.pooler.get_pool(db_name)
361                 registry[cls._name]._process_job(task_cr, job)
362                 openerp.modules.registry.RegistryManager.signal_caches_change(db_name)
363                 return True
364
365         except psycopg2.ProgrammingError, e:
366             if e.pgcode == '42P01':
367                 # Class 42 — Syntax Error or Access Rule Violation; 42P01: undefined_table
368                 # The table ir_cron does not exist; this is probably not an OpenERP database.
369                 _logger.warning('Tried to poll an undefined table on database %s.', db_name)
370             else:
371                 raise
372         except Exception, ex:
373             _logger.warning('Exception in cron:', exc_info=True)
374
375         finally:
376             cr.commit()
377             cr.close()
378
379         return False
380
381     @classmethod
382     def _run(cls, db_names):
383         """
384         Class method intended to be run in a dedicated process to handle jobs.
385         This polls the database for jobs that can be run every 60 seconds.
386
387         :param db_names: list of database names to poll or callable to
388             generate such a list.
389         """
390         global quit_signal_received
391         while not quit_signal_received:
392             if callable(db_names):
393                 names = db_names()
394             else:
395                 names = db_names
396             for x in xrange(5):
397                 if quit_signal_received:
398                     return
399                 t1 = time.time()
400                 for db_name in names:
401                     while True:
402                         # Small hack to re-use the openerp-server config:
403                         # If the cpu_time_limit has not its default value, we
404                         # truly want to establish limits.
405                         if openerp.tools.config['cpu_time_limit'] != 60:
406                             openerp.wsgi.core.pre_request('dummy', 'dummy')
407                         acquired = cls._acquire_job(db_name)
408                         if openerp.tools.config['cpu_time_limit'] != 60:
409                             class W(object):
410                                 alive = True
411                             worker = W()
412                             openerp.wsgi.core.post_request(worker, 'dummy', 'dummy')
413                             if not worker.alive:
414                                 return
415                         if not acquired:
416                             break
417                         if quit_signal_received:
418                             return
419                 t2 = time.time()
420                 t = t2 - t1
421                 global job_in_progress
422                 if t > 60:
423                     _logger.warning('Cron worker: processing all jobs took more than 1 minute to complete (%ss.).', int(t))
424                 else:
425                     job_in_progress = False
426                     time.sleep(60 - t)
427                     job_in_progress = True
428
429     def update_running_cron(self, cr):
430         """ Schedule as soon as possible a wake-up for this database. """
431         # Verify whether the server is already started and thus whether we need to commit
432         # immediately our changes and restart the cron agent in order to apply the change
433         # immediately. The commit() is needed because as soon as the cron is (re)started it
434         # will query the database with its own cursor, possibly before the end of the
435         # current transaction.
436         # This commit() is not an issue in most cases, but we must absolutely avoid it
437         # when the server is only starting or loading modules (hence the test on pool._init).
438         if not self.pool._init:
439             cr.commit()
440             openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname)
441
442     def _try_lock(self, cr, uid, ids, context=None):
443         """Try to grab a dummy exclusive write-lock to the rows with the given ids,
444            to make sure a following write() or unlink() will not block due
445            to a process currently executing those cron tasks"""
446         try:
447             cr.execute("""SELECT id FROM "%s" WHERE id IN %%s FOR UPDATE NOWAIT""" % self._table,
448                        (tuple(ids),), log_exceptions=False)
449         except psycopg2.OperationalError:
450             cr.rollback() # early rollback to allow translations to work for the user feedback
451             raise osv.except_osv(_("Record cannot be modified right now"),
452                                  _("This cron task is currently being executed and may not be modified, "
453                                   "please try again in a few minutes"))
454
455     def create(self, cr, uid, vals, context=None):
456         res = super(ir_cron, self).create(cr, uid, vals, context=context)
457         self.update_running_cron(cr)
458         return res
459
460     def write(self, cr, uid, ids, vals, context=None):
461         self._try_lock(cr, uid, ids, context)
462         res = super(ir_cron, self).write(cr, uid, ids, vals, context=context)
463         self.update_running_cron(cr)
464         return res
465
466     def unlink(self, cr, uid, ids, context=None):
467         self._try_lock(cr, uid, ids, context)
468         res = super(ir_cron, self).unlink(cr, uid, ids, context=context)
469         self.update_running_cron(cr)
470         return res
471 ir_cron()
472
473 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: