import time
import logging
+import threading
+import psycopg2
from datetime import datetime
from dateutil.relativedelta import relativedelta
import netsvc
'doall' : lambda *a: 1
}
+ def f(a, b, c):
+ print ">>> in f"
+
+ def expensive(a, b, c):
+ print ">>> in expensive"
+ time.sleep(80)
+ print ">>> out expensive"
+
+ def expensive_2(a, b, c):
+ print ">>> in expensive_2"
+ time.sleep(80)
+ print ">>> out expensive_2"
+
def _check_args(self, cr, uid, ids, context=None):
try:
for this in self.browse(cr, uid, ids, context):
except Exception, e:
self._handle_callback_exception(cr, uid, model, func, args, job_id, e)
- def _poolJobs(self, db_name, check=False):
+ def _compute_nextcall(self, job, now):
+ """ Compute the nextcall for a job exactly as _run_job does. """
+ nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
+ numbercall = job['numbercall']
+
+ while nextcall < now and numbercall:
+ if numbercall > 0:
+ numbercall -= 1
+ if numbercall:
+ nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
+
+ return nextcall.strftime('%Y-%m-%d %H:%M:%S')
+
+ def _run_job(self, cr, job, now):
+ """ Run a given job. """
+ try:
+ nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
+ numbercall = job['numbercall']
+
+ ok = False
+ while nextcall < now and numbercall:
+ if numbercall > 0:
+ numbercall -= 1
+ if not ok or job['doall']:
+ self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
+ if numbercall:
+ nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
+ ok = True
+ addsql = ''
+ if not numbercall:
+ addsql = ', active=False'
+ cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
+ # TODO re-schedule the master thread to nextcall if its wake-up time is later than nextcall.
+ # TODO NOTIFY the 'ir_cron' channel.
+ finally:
+ cr.commit()
+ cr.close()
+
+ def _poolJobs(self, db_name):
+ # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
try:
db, pool = pooler.get_db_and_pool(db_name)
except:
return False
cr = db.cursor()
try:
+ jobs = {} # mapping job ids to jobs for all jobs being processed.
if not pool._init:
now = datetime.now()
cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
for job in cr.dictfetchall():
- nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
- numbercall = job['numbercall']
-
- ok = False
- while nextcall < now and numbercall:
- if numbercall > 0:
- numbercall -= 1
- if not ok or job['doall']:
- self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
- if numbercall:
- nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
- ok = True
- addsql = ''
- if not numbercall:
- addsql = ', active=False'
- cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
- cr.commit()
-
-
- cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active')
+ task_cr = db.cursor()
+ task_job = None
+ try:
+ task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False)
+ task_job = task_cr.dictfetchall()[0]
+ jobs[job['id']] = job
+ except psycopg2.OperationalError, e:
+ if e.pgcode == '55P03':
+ # Class 55: Object not in prerequisite state, 55P03: lock_not_available
+ continue
+ else:
+ raise
+ finally:
+ if not task_job:
+ task_cr.close()
+
+ task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now))
+ # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
+ task_thread.setDaemon(False)
+ task_thread.start()
+
+ # Wake up time, without considering the currently processed jobs.
+ if jobs.keys():
+ cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active and id not in %s', (tuple(jobs.keys()),))
+ else:
+ cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active')
next_call = cr.dictfetchone()['min_next_call']
if next_call:
next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S'))
else:
next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day
- if not check:
- self.setAlarm(self._poolJobs, next_call, db_name, db_name)
+ # Take the smallest nextcall value.
+ for job in jobs.values():
+ nextcall = self._compute_nextcall(job, now)
+ if nextcall < next_call:
+ next_call = nextcall
+
+ self.setAlarm(self._poolJobs, next_call, db_name, db_name)
except Exception, ex:
self._logger.warning('Exception in cron:', exc_info=True)
cr.commit()
cr.close()
+ def restart_all(self):
+ import openerp.models.registry
+ for dbname in openerp.models.registry.RegistryManager.registries:
+ self.restart(self, dbname)
+
def restart(self, dbname):
self.cancel(dbname)
# Reschedule cron processing job asap, but not in the current thread