[IMP] ir_cron: reschedule the main cron thread if a worker takes too long.
authorVo Minh Thu <vmt@openerp.com>
Thu, 7 Jul 2011 13:58:43 +0000 (15:58 +0200)
committerVo Minh Thu <vmt@openerp.com>
Thu, 7 Jul 2011 13:58:43 +0000 (15:58 +0200)
bzr revid: vmt@openerp.com-20110707135843-z38f4r8s373ctnd2

openerp-server
openerp/addons/base/ir/ir_cron.py
openerp/netsvc.py

index 347cd9f..a2ec11b 100755 (executable)
@@ -108,8 +108,8 @@ if config['db_name']:
             openerp.tools.convert_yaml_import(cr, 'base', file(config["test_file"]), {}, 'test', True)
             cr.rollback()
 
-        pool.get('ir.cron')._poolJobs(db.dbname)
-        # pool.get('ir.cron').restart(db.dbname) # jobs will start to be processed later, when start_agent below is called.
+        # jobs will start to be processed later, when start_agent below is called.
+        pool.get('ir.cron').restart(db.dbname)
 
         cr.close()
 
index ba25f59..9360442 100644 (file)
@@ -76,6 +76,25 @@ class ir_cron(osv.osv, netsvc.Agent):
         'doall' : lambda *a: 1
     }
 
+    thread_count_lock = threading.Lock()
+    thread_count = 1 # maximum allowed number of thread.
+
+    @classmethod
+    def get_thread_count(cls):
+        return cls.thread_count
+
+    @classmethod
+    def inc_thread_count(cls):
+        cls.thread_count_lock.acquire()
+        cls.thread_count += 1
+        cls.thread_count_lock.release()
+
+    @classmethod
+    def dec_thread_count(cls):
+        cls.thread_count_lock.acquire()
+        cls.thread_count -= 1
+        cls.thread_count_lock.release()
+
     def f(a, b, c):
         print ">>> in f"
 
@@ -125,7 +144,11 @@ class ir_cron(osv.osv, netsvc.Agent):
                 self._handle_callback_exception(cr, uid, model, func, args, job_id, e)
 
     def _compute_nextcall(self, job, now):
-        """ Compute the nextcall for a job exactly as _run_job does. """
+        """ Compute the nextcall for a job exactly as _run_job does.
+
+        Return either the nextcall or None if it shouldn't be called.
+
+        """
         nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
         numbercall = job['numbercall']
 
@@ -135,10 +158,12 @@ class ir_cron(osv.osv, netsvc.Agent):
             if numbercall:
                 nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
 
+        if not numbercall:
+            return None
         return nextcall.strftime('%Y-%m-%d %H:%M:%S')
 
     def _run_job(self, cr, job, now):
-        """ Run a given job. """
+        """ Run a given job taking care of the repetition. """
         try:
             nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
             numbercall = job['numbercall']
@@ -156,18 +181,35 @@ class ir_cron(osv.osv, netsvc.Agent):
             if not numbercall:
                 addsql = ', active=False'
             cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
-            # TODO re-schedule the master thread to nextcall if its wake-up time is later than nextcall.
-            # TODO NOTIFY the 'ir_cron' channel.
+
+            if numbercall:
+                # Reschedule our own main cron thread if necessary.
+                # This is really needed if this job run longer that its rescheduling period.
+                print ">>> advance at", nextcall
+                nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S'))
+                self.reschedule_in_advance(self._poolJobs, nextcall, cr.dbname, cr.dbname)
         finally:
             cr.commit()
             cr.close()
 
     def _poolJobs(self, db_name):
+        return self._run_jobs(db_name)
+
+    def _run_jobs(self, db_name):
         # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
+        """ Process the cron jobs by spawning worker threads.
+
+        This selects in database all the jobs that should be processed. It then
+        try to lock each of them and, if it succeeds, spawn a thread to run the
+        cron job (if doesn't succeed, it means another the job was already
+        locked to be taken care of by another thread.
+
+        """
         try:
             db, pool = pooler.get_db_and_pool(db_name)
         except:
             return False
+        print ">>> _run_jobs"
         cr = db.cursor()
         try:
             jobs = {} # mapping job ids to jobs for all jobs being processed.
@@ -177,13 +219,16 @@ class ir_cron(osv.osv, netsvc.Agent):
                 for job in cr.dictfetchall():
                     task_cr = db.cursor()
                     task_job = None
+                    jobs[job['id']] = job
                     try:
+                        # Try to lock the job...
                         task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False)
                         task_job = task_cr.dictfetchall()[0]
-                        jobs[job['id']] = job
                     except psycopg2.OperationalError, e:
                         if e.pgcode == '55P03':
                             # Class 55: Object not in prerequisite state, 55P03: lock_not_available
+                            # ... and fail.
+                            print ">>>", job['name'], " is already being processed"
                             continue
                         else:
                             raise
@@ -191,6 +236,8 @@ class ir_cron(osv.osv, netsvc.Agent):
                         if not task_job:
                             task_cr.close()
 
+                    # ... and succeed.
+                    print ">>> taking care of", job['name']
                     task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now))
                     # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
                     task_thread.setDaemon(False)
@@ -202,16 +249,22 @@ class ir_cron(osv.osv, netsvc.Agent):
             else:
                 cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active')
             next_call = cr.dictfetchone()['min_next_call']
-            if next_call:
-                next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S'))
-            else:
-                next_call = int(time.time()) + 3600   # if do not find active cron job from database, it will run again after 1 day
+            print ">>> possibility at ", next_call
 
-            # Take the smallest nextcall value.
+            # Wake up time, taking the smallest processed job nextcall value.
             for job in jobs.values():
                 nextcall = self._compute_nextcall(job, now)
-                if nextcall < next_call:
+                print ">>> or at ", nextcall
+                if not nextcall:
+                    continue
+                if not next_call or nextcall < next_call:
                     next_call = nextcall
+            print ">>> rescheduling at", next_call
+
+            if next_call:
+                next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S'))
+            else:
+                next_call = int(time.time()) + 3600   # if do not find active cron job from database, it will run again after 1 day
 
             self.setAlarm(self._poolJobs, next_call, db_name, db_name)
 
index 3a2add2..26f7cbe 100644 (file)
@@ -284,6 +284,18 @@ class Agent(object):
                     task[0] = 0
 
     @classmethod
+    def reschedule_in_advance(cls, function, timestamp, db_name, *args, **kwargs):
+        # Cancel the previous task if any.
+        old_timestamp = None
+        if db_name in cls.__tasks_by_db:
+            for task in cls.__tasks_by_db[db_name]:
+                if task[2] == function and timestamp < task[0]:
+                    old_timestamp = task[0]
+                    task[0] = 0
+        if not old_timestamp or timestamp < old_timestamp:
+            cls.setAlarm(function, timestamp, db_name, *args, **kwargs)
+
+    @classmethod
     def quit(cls):
         cls.cancel(None)