[FIX]ir_cron: Attribute Error _strptime occurred when more than one schedule action...
[odoo/odoo.git] / openerp / addons / base / ir / ir_cron.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-TODAY OpenERP S.A. <http://www.openerp.com>
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU Affero General Public License as
9 #    published by the Free Software Foundation, either version 3 of the
10 #    License, or (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU Affero General Public License for more details.
16 #
17 #    You should have received a copy of the GNU Affero General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 ##############################################################################
21
22 import calendar
23 import time
24 import logging
25 import threading
26 import psycopg2
27 from datetime import datetime
28 from dateutil.relativedelta import relativedelta
29
30 import netsvc
31 import openerp
32 import pooler
33 import tools
34 from openerp.cron import WAKE_UP_NOW
35 from osv import fields, osv
36 from tools import DEFAULT_SERVER_DATETIME_FORMAT
37 from tools.safe_eval import safe_eval as eval
38 from tools.translate import _
39
40 _logger = logging.getLogger(__name__)
41
42 # This variable can be set by a signal handler to stop the infinite loop in
43 # ir_cron._run()
44 quit_signal_received = False
45
46 # This variable can be checked to know if ir_cron._run() is processing a job or
47 # sleeping.
48 job_in_progress = True
49
50 def str2tuple(s):
51     return eval('tuple(%s)' % (s or ''))
52
53 _intervalTypes = {
54     'work_days': lambda interval: relativedelta(days=interval),
55     'days': lambda interval: relativedelta(days=interval),
56     'hours': lambda interval: relativedelta(hours=interval),
57     'weeks': lambda interval: relativedelta(days=7*interval),
58     'months': lambda interval: relativedelta(months=interval),
59     'minutes': lambda interval: relativedelta(minutes=interval),
60 }
61
62 class ir_cron(osv.osv):
63     """ Model describing cron jobs (also called actions or tasks).
64     """
65
66     # TODO: perhaps in the future we could consider a flag on ir.cron jobs
67     # that would cause database wake-up even if the database has not been
68     # loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
69     # See also openerp.cron
70
71     _name = "ir.cron"
72     _order = 'name'
73     _columns = {
74         'name': fields.char('Name', size=60, required=True),
75         'user_id': fields.many2one('res.users', 'User', required=True),
76         'active': fields.boolean('Active'),
77         'interval_number': fields.integer('Interval Number',help="Repeat every x."),
78         'interval_type': fields.selection( [('minutes', 'Minutes'),
79             ('hours', 'Hours'), ('work_days','Work Days'), ('days', 'Days'),('weeks', 'Weeks'), ('months', 'Months')], 'Interval Unit'),
80         'numbercall': fields.integer('Number of Calls', help='How many times the method is called,\na negative number indicates no limit.'),
81         'doall' : fields.boolean('Repeat Missed', help="Specify if missed occurrences should be executed when the server restarts."),
82         'nextcall' : fields.datetime('Next Execution Date', required=True, help="Next planned execution date for this job."),
83         'model': fields.char('Object', size=64, help="Model name on which the method to be called is located, e.g. 'res.partner'."),
84         'function': fields.char('Method', size=64, help="Name of the method to be called when this job is processed."),
85         'args': fields.text('Arguments', help="Arguments to be passed to the method, e.g. (uid,)."),
86         'priority': fields.integer('Priority', help='The priority of the job, as an integer: 0 means higher priority, 10 means lower priority.')
87     }
88
89     _defaults = {
90         'nextcall' : lambda *a: time.strftime(DEFAULT_SERVER_DATETIME_FORMAT),
91         'priority' : lambda *a: 5,
92         'user_id' : lambda obj,cr,uid,context: uid,
93         'interval_number' : lambda *a: 1,
94         'interval_type' : lambda *a: 'months',
95         'numbercall' : lambda *a: 1,
96         'active' : lambda *a: 1,
97         'doall' : lambda *a: 1
98     }
99
100     def _check_args(self, cr, uid, ids, context=None):
101         try:
102             for this in self.browse(cr, uid, ids, context):
103                 str2tuple(this.args)
104         except Exception:
105             return False
106         return True
107
108     _constraints = [
109         (_check_args, 'Invalid arguments', ['args']),
110     ]
111
112     def _handle_callback_exception(self, cr, uid, model_name, method_name, args, job_id, job_exception):
113         """ Method called when an exception is raised by a job.
114
115         Simply logs the exception and rollback the transaction.
116
117         :param model_name: model name on which the job method is located.
118         :param method_name: name of the method to call when this job is processed.
119         :param args: arguments of the method (without the usual self, cr, uid).
120         :param job_id: job id.
121         :param job_exception: exception raised by the job.
122
123         """
124         cr.rollback()
125         _logger.exception("Call of self.pool.get('%s').%s(cr, uid, *%r) failed in Job %s" % (model_name, method_name, args, job_id))
126
127     def _callback(self, cr, uid, model_name, method_name, args, job_id):
128         """ Run the method associated to a given job
129
130         It takes care of logging and exception handling.
131
132         :param model_name: model name on which the job method is located.
133         :param method_name: name of the method to call when this job is processed.
134         :param args: arguments of the method (without the usual self, cr, uid).
135         :param job_id: job id.
136         """
137         args = str2tuple(args)
138         model = self.pool.get(model_name)
139         if model and hasattr(model, method_name):
140             method = getattr(model, method_name)
141             try:
142                 log_depth = (None if _logger.isEnabledFor(logging.DEBUG) else 1)
143                 netsvc.log(_logger, logging.DEBUG, 'cron.object.execute', (cr.dbname,uid,'*',model_name,method_name)+tuple(args), depth=log_depth)
144                 if _logger.isEnabledFor(logging.DEBUG):
145                     start_time = time.time()
146                 method(cr, uid, *args)
147                 if _logger.isEnabledFor(logging.DEBUG):
148                     end_time = time.time()
149                     _logger.debug('%.3fs (%s, %s)' % (end_time - start_time, model_name, method_name))
150             except Exception, e:
151                 self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e)
152
153     def _run_job(self, cr, job, now):
154         """ Run a given job taking care of the repetition.
155
156         The cursor has a lock on the job (aquired by _run_jobs_multithread()) and this
157         method is run in a worker thread (spawned by _run_jobs_multithread())).
158
159         :param job: job to be run (as a dictionary).
160         :param now: timestamp (result of datetime.now(), no need to call it multiple time).
161
162         """
163         try:
164             nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
165             numbercall = job['numbercall']
166
167             ok = False
168             while nextcall < now and numbercall:
169                 if numbercall > 0:
170                     numbercall -= 1
171                 if not ok or job['doall']:
172                     self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
173                 if numbercall:
174                     nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
175                 ok = True
176             addsql = ''
177             if not numbercall:
178                 addsql = ', active=False'
179             cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
180                        (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
181
182             if numbercall:
183                 # Reschedule our own main cron thread if necessary.
184                 # This is really needed if this job runs longer than its rescheduling period.
185                 nextcall = calendar.timegm(nextcall.timetuple())
186                 openerp.cron.schedule_wakeup(nextcall, cr.dbname)
187         finally:
188             cr.commit()
189             cr.close()
190             openerp.cron.release_thread_slot()
191
192     def _run_jobs_multithread(self):
193         # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
194         """ Process the cron jobs by spawning worker threads.
195
196         This selects in database all the jobs that should be processed. It then
197         tries to lock each of them and, if it succeeds, spawns a thread to run
198         the cron job (if it doesn't succeed, it means the job was already
199         locked to be taken care of by another thread).
200
201         The cursor used to lock the job in database is given to the worker
202         thread (which has to close it itself).
203
204         """
205         db = self.pool.db
206         cr = db.cursor()
207         db_name = db.dbname
208         try:
209             jobs = {} # mapping job ids to jobs for all jobs being processed.
210             now = datetime.now() 
211             # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
212             cr.execute("""SELECT * FROM ir_cron
213                           WHERE numbercall != 0
214                               AND active AND nextcall <= (now() at time zone 'UTC')
215                           ORDER BY priority""")
216             for job in cr.dictfetchall():
217                 if not openerp.cron.get_thread_slots():
218                     break
219                 jobs[job['id']] = job
220
221                 task_cr = db.cursor()
222                 try:
223                     # Try to grab an exclusive lock on the job row from within the task transaction
224                     acquired_lock = False
225                     task_cr.execute("""SELECT *
226                                        FROM ir_cron
227                                        WHERE id=%s
228                                        FOR UPDATE NOWAIT""",
229                                    (job['id'],), log_exceptions=False)
230                     acquired_lock = True
231                 except psycopg2.OperationalError, e:
232                     if e.pgcode == '55P03':
233                         # Class 55: Object not in prerequisite state; 55P03: lock_not_available
234                         _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
235                         continue
236                     else:
237                         # Unexpected OperationalError
238                         raise
239                 finally:
240                     if not acquired_lock:
241                         # we're exiting due to an exception while acquiring the lot
242                         task_cr.close()
243                 datetime.strptime('2012-01-01', '%Y-%m-%d')
244                 # Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock
245                 task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now))
246                 # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
247                 task_thread.setDaemon(False)
248                 openerp.cron.take_thread_slot()
249                 task_thread.start()
250                 _logger.debug('Cron execution thread for job `%s` spawned', job['name'])
251
252             # Find next earliest job ignoring currently processed jobs (by this and other cron threads)
253             find_next_time_query = """SELECT min(nextcall) AS min_next_call
254                                       FROM ir_cron WHERE numbercall != 0 AND active""" 
255             if jobs:
256                 cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),))
257             else:
258                 cr.execute(find_next_time_query)
259             next_call = cr.dictfetchone()['min_next_call']
260
261             if next_call:
262                 next_call = calendar.timegm(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT))
263             else:
264                 # no matching cron job found in database, re-schedule arbitrarily in 1 day,
265                 # this delay will likely be modified when running jobs complete their tasks
266                 next_call = time.time() + (24*3600)
267
268             openerp.cron.schedule_wakeup(next_call, db_name)
269
270         except Exception, ex:
271             _logger.warning('Exception in cron:', exc_info=True)
272
273         finally:
274             cr.commit()
275             cr.close()
276
277     def _process_job(self, cr, job):
278         """ Run a given job taking care of the repetition.
279
280         The cursor has a lock on the job (aquired by _acquire_job()).
281
282         :param job: job to be run (as a dictionary).
283         """
284         try:
285             now = datetime.now() 
286             nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
287             numbercall = job['numbercall']
288
289             ok = False
290             while nextcall < now and numbercall:
291                 if numbercall > 0:
292                     numbercall -= 1
293                 if not ok or job['doall']:
294                     self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
295                 if numbercall:
296                     nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
297                 ok = True
298             addsql = ''
299             if not numbercall:
300                 addsql = ', active=False'
301             cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
302                        (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
303
304         finally:
305             cr.commit()
306             cr.close()
307
308     @classmethod
309     def _acquire_job(cls, db_name):
310         # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
311         """ Try to process one cron job.
312
313         This selects in database all the jobs that should be processed. It then
314         tries to lock each of them and, if it succeeds, run the cron job (if it
315         doesn't succeed, it means the job was already locked to be taken care
316         of by another thread) and return.
317
318         If a job was processed, returns True, otherwise returns False.
319         """
320         db = openerp.sql_db.db_connect(db_name)
321         cr = db.cursor()
322         try:
323             # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
324             cr.execute("""SELECT * FROM ir_cron
325                           WHERE numbercall != 0
326                               AND active AND nextcall <= (now() at time zone 'UTC')
327                           ORDER BY priority""")
328             for job in cr.dictfetchall():
329                 task_cr = db.cursor()
330                 try:
331                     # Try to grab an exclusive lock on the job row from within the task transaction
332                     acquired_lock = False
333                     task_cr.execute("""SELECT *
334                                        FROM ir_cron
335                                        WHERE id=%s
336                                        FOR UPDATE NOWAIT""",
337                                    (job['id'],), log_exceptions=False)
338                     acquired_lock = True
339                 except psycopg2.OperationalError, e:
340                     if e.pgcode == '55P03':
341                         # Class 55: Object not in prerequisite state; 55P03: lock_not_available
342                         _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
343                         continue
344                     else:
345                         # Unexpected OperationalError
346                         raise
347                 finally:
348                     if not acquired_lock:
349                         # we're exiting due to an exception while acquiring the lot
350                         task_cr.close()
351
352                 # Got the lock on the job row, run its code
353                 _logger.debug('Starting job `%s`.', job['name'])
354                 openerp.modules.registry.RegistryManager.check_registry_signaling(db_name)
355                 registry = openerp.pooler.get_pool(db_name)
356                 registry[cls._name]._process_job(task_cr, job)
357                 openerp.modules.registry.RegistryManager.signal_caches_change(db_name)
358                 return True
359
360         except psycopg2.ProgrammingError, e:
361             if e.pgcode == '42P01':
362                 # Class 42 — Syntax Error or Access Rule Violation; 42P01: undefined_table
363                 # The table ir_cron does not exist; this is probably not an OpenERP database.
364                 _logger.warning('Tried to poll an undefined table on database %s.', db_name)
365             else:
366                 raise
367         except Exception, ex:
368             _logger.warning('Exception in cron:', exc_info=True)
369
370         finally:
371             cr.commit()
372             cr.close()
373
374         return False
375
376     @classmethod
377     def _run(cls, db_names):
378         """
379         Class method intended to be run in a dedicated process to handle jobs.
380         This polls the database for jobs that can be run every 60 seconds.
381
382         :param db_names: list of database names to poll or callable to
383             generate such a list.
384         """
385         global quit_signal_received
386         while not quit_signal_received:
387             if callable(db_names):
388                 names = db_names()
389             else:
390                 names = db_names
391             for x in xrange(5):
392                 if quit_signal_received:
393                     return
394                 t1 = time.time()
395                 for db_name in names:
396                     while True:
397                         # Small hack to re-use the openerp-server config:
398                         # If the cpu_time_limit has not its default value, we
399                         # truly want to establish limits.
400                         if openerp.tools.config['cpu_time_limit'] != 60:
401                             openerp.wsgi.core.pre_request('dummy', 'dummy')
402                         acquired = cls._acquire_job(db_name)
403                         if openerp.tools.config['cpu_time_limit'] != 60:
404                             class W(object):
405                                 alive = True
406                             worker = W()
407                             openerp.wsgi.core.post_request(worker, 'dummy', 'dummy')
408                             if not worker.alive:
409                                 return
410                         if not acquired:
411                             break
412                         if quit_signal_received:
413                             return
414                 t2 = time.time()
415                 t = t2 - t1
416                 global job_in_progress
417                 if t > 60:
418                     _logger.warning('Cron worker: processing all jobs took more than 1 minute to complete (%ss.).', int(t))
419                 else:
420                     job_in_progress = False
421                     time.sleep(60 - t)
422                     job_in_progress = True
423
424     def update_running_cron(self, cr):
425         """ Schedule as soon as possible a wake-up for this database. """
426         # Verify whether the server is already started and thus whether we need to commit
427         # immediately our changes and restart the cron agent in order to apply the change
428         # immediately. The commit() is needed because as soon as the cron is (re)started it
429         # will query the database with its own cursor, possibly before the end of the
430         # current transaction.
431         # This commit() is not an issue in most cases, but we must absolutely avoid it
432         # when the server is only starting or loading modules (hence the test on pool._init).
433         if not self.pool._init:
434             cr.commit()
435             openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname)
436
437     def _try_lock(self, cr, uid, ids, context=None):
438         """Try to grab a dummy exclusive write-lock to the rows with the given ids,
439            to make sure a following write() or unlink() will not block due
440            to a process currently executing those cron tasks"""
441         try:
442             cr.execute("""SELECT id FROM "%s" WHERE id IN %%s FOR UPDATE NOWAIT""" % self._table,
443                        (tuple(ids),), log_exceptions=False)
444         except psycopg2.OperationalError:
445             cr.rollback() # early rollback to allow translations to work for the user feedback
446             raise osv.except_osv(_("Record cannot be modified right now"),
447                                  _("This cron task is currently being executed and may not be modified, "
448                                   "please try again in a few minutes"))
449
450     def create(self, cr, uid, vals, context=None):
451         res = super(ir_cron, self).create(cr, uid, vals, context=context)
452         self.update_running_cron(cr)
453         return res
454
455     def write(self, cr, uid, ids, vals, context=None):
456         self._try_lock(cr, uid, ids, context)
457         res = super(ir_cron, self).write(cr, uid, ids, vals, context=context)
458         self.update_running_cron(cr)
459         return res
460
461     def unlink(self, cr, uid, ids, context=None):
462         self._try_lock(cr, uid, ids, context)
463         res = super(ir_cron, self).unlink(cr, uid, ids, context=context)
464         self.update_running_cron(cr)
465         return res
466 ir_cron()
467
468 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: