Launchpad automatic translations update.
[odoo/odoo.git] / openerp / addons / base / ir / ir_cron.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    OpenERP, Open Source Management Solution
5 #    Copyright (C) 2004-TODAY OpenERP S.A. <http://www.openerp.com>
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU Affero General Public License as
9 #    published by the Free Software Foundation, either version 3 of the
10 #    License, or (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU Affero General Public License for more details.
16 #
17 #    You should have received a copy of the GNU Affero General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 ##############################################################################
21
22 import calendar
23 import time
24 import logging
25 import threading
26 import psycopg2
27 from datetime import datetime
28 from dateutil.relativedelta import relativedelta
29
30 import netsvc
31 import openerp
32 import pooler
33 import tools
34 from openerp.cron import WAKE_UP_NOW
35 from osv import fields, osv
36 from tools import DEFAULT_SERVER_DATETIME_FORMAT
37 from tools.safe_eval import safe_eval as eval
38 from tools.translate import _
39
40 def str2tuple(s):
41     return eval('tuple(%s)' % (s or ''))
42
43 _intervalTypes = {
44     'work_days': lambda interval: relativedelta(days=interval),
45     'days': lambda interval: relativedelta(days=interval),
46     'hours': lambda interval: relativedelta(hours=interval),
47     'weeks': lambda interval: relativedelta(days=7*interval),
48     'months': lambda interval: relativedelta(months=interval),
49     'minutes': lambda interval: relativedelta(minutes=interval),
50 }
51
52 class ir_cron(osv.osv):
53     """ Model describing cron jobs (also called actions or tasks).
54     """
55
56     # TODO: perhaps in the future we could consider a flag on ir.cron jobs
57     # that would cause database wake-up even if the database has not been
58     # loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
59     # See also openerp.cron
60
61     _name = "ir.cron"
62     _order = 'name'
63     _columns = {
64         'name': fields.char('Name', size=60, required=True),
65         'user_id': fields.many2one('res.users', 'User', required=True),
66         'active': fields.boolean('Active'),
67         'interval_number': fields.integer('Interval Number',help="Repeat every x."),
68         'interval_type': fields.selection( [('minutes', 'Minutes'),
69             ('hours', 'Hours'), ('work_days','Work Days'), ('days', 'Days'),('weeks', 'Weeks'), ('months', 'Months')], 'Interval Unit'),
70         'numbercall': fields.integer('Number of Calls', help='How many times the method is called,\na negative number indicates no limit.'),
71         'doall' : fields.boolean('Repeat Missed', help="Specify if missed occurrences should be executed when the server restarts."),
72         'nextcall' : fields.datetime('Next Execution Date', required=True, help="Next planned execution date for this job."),
73         'model': fields.char('Object', size=64, help="Model name on which the method to be called is located, e.g. 'res.partner'."),
74         'function': fields.char('Method', size=64, help="Name of the method to be called when this job is processed."),
75         'args': fields.text('Arguments', help="Arguments to be passed to the method, e.g. (uid,)."),
76         'priority': fields.integer('Priority', help='The priority of the job, as an integer: 0 means higher priority, 10 means lower priority.')
77     }
78
79     _defaults = {
80         'nextcall' : lambda *a: time.strftime(DEFAULT_SERVER_DATETIME_FORMAT),
81         'priority' : lambda *a: 5,
82         'user_id' : lambda obj,cr,uid,context: uid,
83         'interval_number' : lambda *a: 1,
84         'interval_type' : lambda *a: 'months',
85         'numbercall' : lambda *a: 1,
86         'active' : lambda *a: 1,
87         'doall' : lambda *a: 1
88     }
89
90     _logger = logging.getLogger('cron')
91
92     def _check_args(self, cr, uid, ids, context=None):
93         try:
94             for this in self.browse(cr, uid, ids, context):
95                 str2tuple(this.args)
96         except Exception:
97             return False
98         return True
99
100     _constraints = [
101         (_check_args, 'Invalid arguments', ['args']),
102     ]
103
104     def _handle_callback_exception(self, cr, uid, model_name, method_name, args, job_id, job_exception):
105         """ Method called when an exception is raised by a job.
106
107         Simply logs the exception and rollback the transaction.
108
109         :param model_name: model name on which the job method is located.
110         :param method_name: name of the method to call when this job is processed.
111         :param args: arguments of the method (without the usual self, cr, uid).
112         :param job_id: job id.
113         :param job_exception: exception raised by the job.
114
115         """
116         cr.rollback()
117         self._logger.exception("Call of self.pool.get('%s').%s(cr, uid, *%r) failed in Job %s" % (model_name, method_name, args, job_id))
118
119     def _callback(self, cr, uid, model_name, method_name, args, job_id):
120         """ Run the method associated to a given job
121
122         It takes care of logging and exception handling.
123
124         :param model_name: model name on which the job method is located.
125         :param method_name: name of the method to call when this job is processed.
126         :param args: arguments of the method (without the usual self, cr, uid).
127         :param job_id: job id.
128         """
129         args = str2tuple(args)
130         model = self.pool.get(model_name)
131         if model and hasattr(model, method_name):
132             method = getattr(model, method_name)
133             try:
134                 netsvc.log('cron', (cr.dbname,uid,'*',model_name,method_name)+tuple(args), channel=logging.DEBUG,
135                             depth=(None if self._logger.isEnabledFor(logging.DEBUG_RPC_ANSWER) else 1), fn='object.execute')
136                 logger = logging.getLogger('execution time')
137                 if logger.isEnabledFor(logging.DEBUG):
138                     start_time = time.time()
139                 method(cr, uid, *args)
140                 if logger.isEnabledFor(logging.DEBUG):
141                     end_time = time.time()
142                     logger.log(logging.DEBUG, '%.3fs (%s, %s)' % (end_time - start_time, model_name, method_name))
143             except Exception, e:
144                 self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e)
145
146     def _run_job(self, cr, job, now):
147         """ Run a given job taking care of the repetition.
148
149         The cursor has a lock on the job (aquired by _run_jobs_multithread()) and this
150         method is run in a worker thread (spawned by _run_jobs_multithread())).
151
152         :param job: job to be run (as a dictionary).
153         :param now: timestamp (result of datetime.now(), no need to call it multiple time).
154
155         """
156         try:
157             nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
158             numbercall = job['numbercall']
159
160             ok = False
161             while nextcall < now and numbercall:
162                 if numbercall > 0:
163                     numbercall -= 1
164                 if not ok or job['doall']:
165                     self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
166                 if numbercall:
167                     nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
168                 ok = True
169             addsql = ''
170             if not numbercall:
171                 addsql = ', active=False'
172             cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
173                        (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
174
175             if numbercall:
176                 # Reschedule our own main cron thread if necessary.
177                 # This is really needed if this job runs longer than its rescheduling period.
178                 nextcall = calendar.timegm(nextcall.timetuple())
179                 openerp.cron.schedule_wakeup(nextcall, cr.dbname)
180         finally:
181             cr.commit()
182             cr.close()
183             openerp.cron.release_thread_slot()
184
185     def _run_jobs_multithread(self):
186         # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
187         """ Process the cron jobs by spawning worker threads.
188
189         This selects in database all the jobs that should be processed. It then
190         tries to lock each of them and, if it succeeds, spawns a thread to run
191         the cron job (if it doesn't succeed, it means the job was already
192         locked to be taken care of by another thread).
193
194         The cursor used to lock the job in database is given to the worker
195         thread (which has to close it itself).
196
197         """
198         db = self.pool.db
199         cr = db.cursor()
200         db_name = db.dbname
201         try:
202             jobs = {} # mapping job ids to jobs for all jobs being processed.
203             now = datetime.now() 
204             # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
205             cr.execute("""SELECT * FROM ir_cron
206                           WHERE numbercall != 0
207                               AND active AND nextcall <= (now() at time zone 'UTC')
208                           ORDER BY priority""")
209             for job in cr.dictfetchall():
210                 if not openerp.cron.get_thread_slots():
211                     break
212                 jobs[job['id']] = job
213
214                 task_cr = db.cursor()
215                 try:
216                     # Try to grab an exclusive lock on the job row from within the task transaction
217                     acquired_lock = False
218                     task_cr.execute("""SELECT *
219                                        FROM ir_cron
220                                        WHERE id=%s
221                                        FOR UPDATE NOWAIT""",
222                                    (job['id'],), log_exceptions=False)
223                     acquired_lock = True
224                 except psycopg2.OperationalError, e:
225                     if e.pgcode == '55P03':
226                         # Class 55: Object not in prerequisite state; 55P03: lock_not_available
227                         self._logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
228                         continue
229                     else:
230                         # Unexpected OperationalError
231                         raise
232                 finally:
233                     if not acquired_lock:
234                         # we're exiting due to an exception while acquiring the lot
235                         task_cr.close()
236
237                 # Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock
238                 task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now))
239                 # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
240                 task_thread.setDaemon(False)
241                 openerp.cron.take_thread_slot()
242                 task_thread.start()
243                 self._logger.debug('Cron execution thread for job `%s` spawned', job['name'])
244
245             # Find next earliest job ignoring currently processed jobs (by this and other cron threads)
246             find_next_time_query = """SELECT min(nextcall) AS min_next_call
247                                       FROM ir_cron WHERE numbercall != 0 AND active""" 
248             if jobs:
249                 cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),))
250             else:
251                 cr.execute(find_next_time_query)
252             next_call = cr.dictfetchone()['min_next_call']
253
254             if next_call:
255                 next_call = calendar.timegm(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT))
256             else:
257                 # no matching cron job found in database, re-schedule arbitrarily in 1 day,
258                 # this delay will likely be modified when running jobs complete their tasks
259                 next_call = time.time() + (24*3600)
260
261             openerp.cron.schedule_wakeup(next_call, db_name)
262
263         except Exception, ex:
264             self._logger.warning('Exception in cron:', exc_info=True)
265
266         finally:
267             cr.commit()
268             cr.close()
269
270     def update_running_cron(self, cr):
271         """ Schedule as soon as possible a wake-up for this database. """
272         # Verify whether the server is already started and thus whether we need to commit
273         # immediately our changes and restart the cron agent in order to apply the change
274         # immediately. The commit() is needed because as soon as the cron is (re)started it
275         # will query the database with its own cursor, possibly before the end of the
276         # current transaction.
277         # This commit() is not an issue in most cases, but we must absolutely avoid it
278         # when the server is only starting or loading modules (hence the test on pool._init).
279         if not self.pool._init:
280             cr.commit()
281             openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname)
282
283     def _try_lock(self, cr, uid, ids, context=None):
284         """Try to grab a dummy exclusive write-lock to the rows with the given ids,
285            to make sure a following write() or unlink() will not block due
286            to a process currently executing those cron tasks"""
287         try:
288             cr.execute("""SELECT id FROM "%s" WHERE id IN %%s FOR UPDATE NOWAIT""" % self._table,
289                        (tuple(ids),), log_exceptions=False)
290         except psycopg2.OperationalError:
291             cr.rollback() # early rollback to allow translations to work for the user feedback
292             raise osv.except_osv(_("Record cannot be modified right now"),
293                                  _("This cron task is currently being executed and may not be modified, "
294                                   "please try again in a few minutes"))
295
296     def create(self, cr, uid, vals, context=None):
297         res = super(ir_cron, self).create(cr, uid, vals, context=context)
298         self.update_running_cron(cr)
299         return res
300
301     def write(self, cr, uid, ids, vals, context=None):
302         self._try_lock(cr, uid, ids, context)
303         res = super(ir_cron, self).write(cr, uid, ids, vals, context=context)
304         self.update_running_cron(cr)
305         return res
306
307     def unlink(self, cr, uid, ids, context=None):
308         self._try_lock(cr, uid, ids, context)
309         res = super(ir_cron, self).unlink(cr, uid, ids, context=context)
310         self.update_running_cron(cr)
311         return res
312 ir_cron()
313
314 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: