'doall' : lambda *a: 1
}
+ thread_count_lock = threading.Lock()
+ thread_count = 1 # maximum allowed number of thread.
+
+ @classmethod
+ def get_thread_count(cls):
+ return cls.thread_count
+
+ @classmethod
+ def inc_thread_count(cls):
+ cls.thread_count_lock.acquire()
+ cls.thread_count += 1
+ cls.thread_count_lock.release()
+
+ @classmethod
+ def dec_thread_count(cls):
+ cls.thread_count_lock.acquire()
+ cls.thread_count -= 1
+ cls.thread_count_lock.release()
+
def f(a, b, c):
print ">>> in f"
self._handle_callback_exception(cr, uid, model, func, args, job_id, e)
def _compute_nextcall(self, job, now):
- """ Compute the nextcall for a job exactly as _run_job does. """
+ """ Compute the nextcall for a job exactly as _run_job does.
+
+ Return either the nextcall or None if it shouldn't be called.
+
+ """
nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
numbercall = job['numbercall']
if numbercall:
nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
+ if not numbercall:
+ return None
return nextcall.strftime('%Y-%m-%d %H:%M:%S')
def _run_job(self, cr, job, now):
- """ Run a given job. """
+ """ Run a given job taking care of the repetition. """
try:
nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
numbercall = job['numbercall']
if not numbercall:
addsql = ', active=False'
cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
- # TODO re-schedule the master thread to nextcall if its wake-up time is later than nextcall.
- # TODO NOTIFY the 'ir_cron' channel.
+
+ if numbercall:
+ # Reschedule our own main cron thread if necessary.
+ # This is really needed if this job run longer that its rescheduling period.
+ print ">>> advance at", nextcall
+ nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S'))
+ self.reschedule_in_advance(self._poolJobs, nextcall, cr.dbname, cr.dbname)
finally:
cr.commit()
cr.close()
def _poolJobs(self, db_name):
+ return self._run_jobs(db_name)
+
+ def _run_jobs(self, db_name):
# TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
+ """ Process the cron jobs by spawning worker threads.
+
+ This selects in database all the jobs that should be processed. It then
+ try to lock each of them and, if it succeeds, spawn a thread to run the
+ cron job (if doesn't succeed, it means another the job was already
+ locked to be taken care of by another thread.
+
+ """
try:
db, pool = pooler.get_db_and_pool(db_name)
except:
return False
+ print ">>> _run_jobs"
cr = db.cursor()
try:
jobs = {} # mapping job ids to jobs for all jobs being processed.
for job in cr.dictfetchall():
task_cr = db.cursor()
task_job = None
+ jobs[job['id']] = job
try:
+ # Try to lock the job...
task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False)
task_job = task_cr.dictfetchall()[0]
- jobs[job['id']] = job
except psycopg2.OperationalError, e:
if e.pgcode == '55P03':
# Class 55: Object not in prerequisite state, 55P03: lock_not_available
+ # ... and fail.
+ print ">>>", job['name'], " is already being processed"
continue
else:
raise
if not task_job:
task_cr.close()
+ # ... and succeed.
+ print ">>> taking care of", job['name']
task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now))
# force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
task_thread.setDaemon(False)
else:
cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active')
next_call = cr.dictfetchone()['min_next_call']
- if next_call:
- next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S'))
- else:
- next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day
+ print ">>> possibility at ", next_call
- # Take the smallest nextcall value.
+ # Wake up time, taking the smallest processed job nextcall value.
for job in jobs.values():
nextcall = self._compute_nextcall(job, now)
- if nextcall < next_call:
+ print ">>> or at ", nextcall
+ if not nextcall:
+ continue
+ if not next_call or nextcall < next_call:
next_call = nextcall
+ print ">>> rescheduling at", next_call
+
+ if next_call:
+ next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S'))
+ else:
+ next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day
self.setAlarm(self._poolJobs, next_call, db_name, db_name)