1 # -*- coding: utf-8 -*-
2 ##############################################################################
4 # OpenERP, Open Source Management Solution
5 # Copyright (C) 2004-TODAY OpenERP S.A. <http://www.openerp.com>
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as
9 # published by the Free Software Foundation, either version 3 of the
10 # License, or (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 ##############################################################################
25 from datetime import datetime
26 from dateutil.relativedelta import relativedelta
29 from openerp import SUPERUSER_ID, netsvc, api
30 from openerp.osv import fields, osv
31 from openerp.tools import DEFAULT_SERVER_DATETIME_FORMAT
32 from openerp.tools.safe_eval import safe_eval as eval
33 from openerp.tools.translate import _
34 from openerp.modules import load_information_from_description_file
36 _logger = logging.getLogger(__name__)
38 BASE_VERSION = load_information_from_description_file('base')['version']
41 return eval('tuple(%s)' % (s or ''))
44 'work_days': lambda interval: relativedelta(days=interval),
45 'days': lambda interval: relativedelta(days=interval),
46 'hours': lambda interval: relativedelta(hours=interval),
47 'weeks': lambda interval: relativedelta(days=7*interval),
48 'months': lambda interval: relativedelta(months=interval),
49 'minutes': lambda interval: relativedelta(minutes=interval),
52 class ir_cron(osv.osv):
53 """ Model describing cron jobs (also called actions or tasks).
56 # TODO: perhaps in the future we could consider a flag on ir.cron jobs
57 # that would cause database wake-up even if the database has not been
58 # loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
59 # See also openerp.cron
64 'name': fields.char('Name', required=True),
65 'user_id': fields.many2one('res.users', 'User', required=True),
66 'active': fields.boolean('Active'),
67 'interval_number': fields.integer('Interval Number',help="Repeat every x."),
68 'interval_type': fields.selection( [('minutes', 'Minutes'),
69 ('hours', 'Hours'), ('work_days','Work Days'), ('days', 'Days'),('weeks', 'Weeks'), ('months', 'Months')], 'Interval Unit'),
70 'numbercall': fields.integer('Number of Calls', help='How many times the method is called,\na negative number indicates no limit.'),
71 'doall' : fields.boolean('Repeat Missed', help="Specify if missed occurrences should be executed when the server restarts."),
72 'nextcall' : fields.datetime('Next Execution Date', required=True, help="Next planned execution date for this job."),
73 'model': fields.char('Object', help="Model name on which the method to be called is located, e.g. 'res.partner'."),
74 'function': fields.char('Method', help="Name of the method to be called when this job is processed."),
75 'args': fields.text('Arguments', help="Arguments to be passed to the method, e.g. (uid,)."),
76 'priority': fields.integer('Priority', help='The priority of the job, as an integer: 0 means higher priority, 10 means lower priority.')
80 'nextcall' : lambda *a: time.strftime(DEFAULT_SERVER_DATETIME_FORMAT),
82 'user_id' : lambda obj,cr,uid,context: uid,
83 'interval_number' : 1,
84 'interval_type' : 'months',
90 def _check_args(self, cr, uid, ids, context=None):
92 for this in self.browse(cr, uid, ids, context):
99 (_check_args, 'Invalid arguments', ['args']),
102 def _handle_callback_exception(self, cr, uid, model_name, method_name, args, job_id, job_exception):
103 """ Method called when an exception is raised by a job.
105 Simply logs the exception and rollback the transaction.
107 :param model_name: model name on which the job method is located.
108 :param method_name: name of the method to call when this job is processed.
109 :param args: arguments of the method (without the usual self, cr, uid).
110 :param job_id: job id.
111 :param job_exception: exception raised by the job.
115 _logger.exception("Call of self.pool.get('%s').%s(cr, uid, *%r) failed in Job %s" % (model_name, method_name, args, job_id))
117 def _callback(self, cr, uid, model_name, method_name, args, job_id):
118 """ Run the method associated to a given job
120 It takes care of logging and exception handling.
122 :param model_name: model name on which the job method is located.
123 :param method_name: name of the method to call when this job is processed.
124 :param args: arguments of the method (without the usual self, cr, uid).
125 :param job_id: job id.
128 args = str2tuple(args)
129 openerp.modules.registry.RegistryManager.check_registry_signaling(cr.dbname)
130 registry = openerp.registry(cr.dbname)
131 if model_name in registry:
132 model = registry[model_name]
133 if hasattr(model, method_name):
134 log_depth = (None if _logger.isEnabledFor(logging.DEBUG) else 1)
135 netsvc.log(_logger, logging.DEBUG, 'cron.object.execute', (cr.dbname,uid,'*',model_name,method_name)+tuple(args), depth=log_depth)
136 if _logger.isEnabledFor(logging.DEBUG):
137 start_time = time.time()
138 getattr(model, method_name)(cr, uid, *args)
139 if _logger.isEnabledFor(logging.DEBUG):
140 end_time = time.time()
141 _logger.debug('%.3fs (%s, %s)' % (end_time - start_time, model_name, method_name))
142 openerp.modules.registry.RegistryManager.signal_caches_change(cr.dbname)
144 msg = "Method `%s.%s` does not exist." % (model_name, method_name)
147 msg = "Model `%s` does not exist." % model_name
150 self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e)
152 def _process_job(self, cr, job, cron_cr):
153 """ Run a given job taking care of the repetition.
155 :param cr: cursor to use to execute the job, safe to commit/rollback
156 :param job: job to be run (as a dictionary).
157 :param cron_cr: cursor holding lock on the cron job row, to use to update the next exec date,
158 must not be committed/rolled back!
161 with api.Environment.manage():
163 nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
164 numbercall = job['numbercall']
167 while nextcall < now and numbercall:
170 if not ok or job['doall']:
171 self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
173 nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
177 addsql = ', active=False'
178 cron_cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
179 (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
180 self.invalidate_cache(cr, SUPERUSER_ID)
187 def _acquire_job(cls, db_name):
188 # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
189 """ Try to process one cron job.
191 This selects in database all the jobs that should be processed. It then
192 tries to lock each of them and, if it succeeds, run the cron job (if it
193 doesn't succeed, it means the job was already locked to be taken care
194 of by another thread) and return.
196 If a job was processed, returns True, otherwise returns False.
198 db = openerp.sql_db.db_connect(db_name)
199 threading.current_thread().dbname = db_name
203 # Make sure the database we poll has the same version as the code of base
204 cr.execute("SELECT 1 FROM ir_module_module WHERE name=%s AND latest_version=%s", ('base', BASE_VERSION))
206 # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
207 cr.execute("""SELECT * FROM ir_cron
208 WHERE numbercall != 0
209 AND active AND nextcall <= (now() at time zone 'UTC')
210 ORDER BY priority""")
211 jobs = cr.dictfetchall()
213 _logger.warning('Skipping database %s as its base version is not %s.', db_name, BASE_VERSION)
214 except psycopg2.ProgrammingError, e:
215 if e.pgcode == '42P01':
216 # Class 42 — Syntax Error or Access Rule Violation; 42P01: undefined_table
217 # The table ir_cron does not exist; this is probably not an OpenERP database.
218 _logger.warning('Tried to poll an undefined table on database %s.', db_name)
222 _logger.warning('Exception in cron:', exc_info=True)
227 lock_cr = db.cursor()
229 # Try to grab an exclusive lock on the job row from within the task transaction
230 # Restrict to the same conditions as for the search since the job may have already
231 # been run by an other thread when cron is running in multi thread
232 lock_cr.execute("""SELECT *
234 WHERE numbercall != 0
236 AND nextcall <= (now() at time zone 'UTC')
238 FOR UPDATE NOWAIT""",
239 (job['id'],), log_exceptions=False)
241 locked_job = lock_cr.fetchone()
243 _logger.debug("Job `%s` already executed by another process/thread. skipping it", job['name'])
245 # Got the lock on the job row, run its code
246 _logger.debug('Starting job `%s`.', job['name'])
249 registry = openerp.registry(db_name)
250 registry[cls._name]._process_job(job_cr, job, lock_cr)
252 _logger.exception('Unexpected exception while processing cron job %r', job)
256 except psycopg2.OperationalError, e:
257 if e.pgcode == '55P03':
258 # Class 55: Object not in prerequisite state; 55P03: lock_not_available
259 _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
262 # Unexpected OperationalError
265 # we're exiting due to an exception while acquiring the lock
268 if hasattr(threading.current_thread(), 'dbname'): # cron job could have removed it as side-effect
269 del threading.current_thread().dbname
271 def _try_lock(self, cr, uid, ids, context=None):
272 """Try to grab a dummy exclusive write-lock to the rows with the given ids,
273 to make sure a following write() or unlink() will not block due
274 to a process currently executing those cron tasks"""
276 cr.execute("""SELECT id FROM "%s" WHERE id IN %%s FOR UPDATE NOWAIT""" % self._table,
277 (tuple(ids),), log_exceptions=False)
278 except psycopg2.OperationalError:
279 cr.rollback() # early rollback to allow translations to work for the user feedback
280 raise osv.except_osv(_("Record cannot be modified right now"),
281 _("This cron task is currently being executed and may not be modified, "
282 "please try again in a few minutes"))
284 def create(self, cr, uid, vals, context=None):
285 res = super(ir_cron, self).create(cr, uid, vals, context=context)
288 def write(self, cr, uid, ids, vals, context=None):
289 self._try_lock(cr, uid, ids, context)
290 res = super(ir_cron, self).write(cr, uid, ids, vals, context=context)
293 def unlink(self, cr, uid, ids, context=None):
294 self._try_lock(cr, uid, ids, context)
295 res = super(ir_cron, self).unlink(cr, uid, ids, context=context)
298 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: