[IMP] ir_cron: each job in its own thread, first stab.
authorVo Minh Thu <vmt@openerp.com>
Tue, 5 Jul 2011 17:00:53 +0000 (19:00 +0200)
committerVo Minh Thu <vmt@openerp.com>
Tue, 5 Jul 2011 17:00:53 +0000 (19:00 +0200)
bzr revid: vmt@openerp.com-20110705170053-q3xgeoq21oc7dh8h

openerp-server
openerp/addons/base/ir/ir_cron.py

index 3586b11..347cd9f 100755 (executable)
@@ -109,6 +109,7 @@ if config['db_name']:
             cr.rollback()
 
         pool.get('ir.cron')._poolJobs(db.dbname)
+        # pool.get('ir.cron').restart(db.dbname) # jobs will start to be processed later, when start_agent below is called.
 
         cr.close()
 
index 054a152..ba25f59 100644 (file)
@@ -21,6 +21,8 @@
 
 import time
 import logging
+import threading
+import psycopg2
 from datetime import datetime
 from dateutil.relativedelta import relativedelta
 import netsvc
@@ -74,6 +76,19 @@ class ir_cron(osv.osv, netsvc.Agent):
         'doall' : lambda *a: 1
     }
 
+    def f(a, b, c):
+        print ">>> in f"
+
+    def expensive(a, b, c):
+        print ">>> in expensive"
+        time.sleep(80)
+        print ">>> out expensive"
+
+    def expensive_2(a, b, c):
+        print ">>> in expensive_2"
+        time.sleep(80)
+        print ">>> out expensive_2"
+
     def _check_args(self, cr, uid, ids, context=None):
         try:
             for this in self.browse(cr, uid, ids, context):
@@ -109,45 +124,96 @@ class ir_cron(osv.osv, netsvc.Agent):
             except Exception, e:
                 self._handle_callback_exception(cr, uid, model, func, args, job_id, e)
 
-    def _poolJobs(self, db_name, check=False):
+    def _compute_nextcall(self, job, now):
+        """ Compute the nextcall for a job exactly as _run_job does. """
+        nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
+        numbercall = job['numbercall']
+
+        while nextcall < now and numbercall:
+            if numbercall > 0:
+                numbercall -= 1
+            if numbercall:
+                nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
+
+        return nextcall.strftime('%Y-%m-%d %H:%M:%S')
+
+    def _run_job(self, cr, job, now):
+        """ Run a given job. """
+        try:
+            nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
+            numbercall = job['numbercall']
+
+            ok = False
+            while nextcall < now and numbercall:
+                if numbercall > 0:
+                    numbercall -= 1
+                if not ok or job['doall']:
+                    self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
+                if numbercall:
+                    nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
+                ok = True
+            addsql = ''
+            if not numbercall:
+                addsql = ', active=False'
+            cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
+            # TODO re-schedule the master thread to nextcall if its wake-up time is later than nextcall.
+            # TODO NOTIFY the 'ir_cron' channel.
+        finally:
+            cr.commit()
+            cr.close()
+
+    def _poolJobs(self, db_name):
+        # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
         try:
             db, pool = pooler.get_db_and_pool(db_name)
         except:
             return False
         cr = db.cursor()
         try:
+            jobs = {} # mapping job ids to jobs for all jobs being processed.
             if not pool._init:
                 now = datetime.now()
                 cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
                 for job in cr.dictfetchall():
-                    nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
-                    numbercall = job['numbercall']
-
-                    ok = False
-                    while nextcall < now and numbercall:
-                        if numbercall > 0:
-                            numbercall -= 1
-                        if not ok or job['doall']:
-                            self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
-                        if numbercall:
-                            nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
-                        ok = True
-                    addsql = ''
-                    if not numbercall:
-                        addsql = ', active=False'
-                    cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
-                    cr.commit()
-
-
-            cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active')
+                    task_cr = db.cursor()
+                    task_job = None
+                    try:
+                        task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False)
+                        task_job = task_cr.dictfetchall()[0]
+                        jobs[job['id']] = job
+                    except psycopg2.OperationalError, e:
+                        if e.pgcode == '55P03':
+                            # Class 55: Object not in prerequisite state, 55P03: lock_not_available
+                            continue
+                        else:
+                            raise
+                    finally:
+                        if not task_job:
+                            task_cr.close()
+
+                    task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now))
+                    # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
+                    task_thread.setDaemon(False)
+                    task_thread.start()
+
+            # Wake up time, without considering the currently processed jobs.
+            if jobs.keys():
+                cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active and id not in %s', (tuple(jobs.keys()),))
+            else:
+                cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active')
             next_call = cr.dictfetchone()['min_next_call']
             if next_call:
                 next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S'))
             else:
                 next_call = int(time.time()) + 3600   # if do not find active cron job from database, it will run again after 1 day
 
-            if not check:
-                self.setAlarm(self._poolJobs, next_call, db_name, db_name)
+            # Take the smallest nextcall value.
+            for job in jobs.values():
+                nextcall = self._compute_nextcall(job, now)
+                if nextcall < next_call:
+                    next_call = nextcall
+
+            self.setAlarm(self._poolJobs, next_call, db_name, db_name)
 
         except Exception, ex:
             self._logger.warning('Exception in cron:', exc_info=True)
@@ -156,6 +222,11 @@ class ir_cron(osv.osv, netsvc.Agent):
             cr.commit()
             cr.close()
 
+    def restart_all(self):
+        import openerp.models.registry
+        for dbname in openerp.models.registry.RegistryManager.registries:
+            self.restart(self, dbname)
+
     def restart(self, dbname):
         self.cancel(dbname)
         # Reschedule cron processing job asap, but not in the current thread