[IMP] cron: removed unnecessary multi-tasks per db in Agent, some cleaning.
authorVo Minh Thu <vmt@openerp.com>
Thu, 14 Jul 2011 11:08:09 +0000 (13:08 +0200)
committerVo Minh Thu <vmt@openerp.com>
Thu, 14 Jul 2011 11:08:09 +0000 (13:08 +0200)
bzr revid: vmt@openerp.com-20110714110809-sgsoev9i24589sn8

openerp-server
openerp/addons/base/ir/ir_cron.py
openerp/netsvc.py

index 56aa278..d587611 100755 (executable)
@@ -199,7 +199,8 @@ if os.name == 'posix':
     signal.signal(signal.SIGQUIT, dumpstacks)
 
 def quit():
-    openerp.netsvc.Agent.quit()
+    # stop scheduling new jobs; we will have to wait for the jobs to complete below
+    openerp.netsvc.Agent.cancel_all()
     openerp.netsvc.Server.quitAll()
     if config['pidfile']:
         os.unlink(config['pidfile'])
@@ -215,7 +216,7 @@ def quit():
         if thread != threading.currentThread() and not thread.isDaemon():
             while thread.isAlive():
                 # need a busyloop here as thread.join() masks signals
-                # and would present the forced shutdown
+                # and would prevent the forced shutdown
                 thread.join(0.05)
                 time.sleep(0.05)
     openerp.modules.registry.RegistryManager.delete_all()
index 0998b43..61a8b6d 100644 (file)
@@ -76,10 +76,8 @@ class ir_cron(osv.osv, netsvc.Agent):
         'doall' : lambda *a: 1
     }
 
-    def __init__(self, pool, cr):
-        self.thread_count_lock = threading.Lock()
-        self.thread_count = 2 # maximum allowed number of thread.
-        super(osv.osv, self).__init__(pool, cr)
+    thread_count_lock = threading.Lock()
+    thread_count = 2 # maximum allowed number of thread.
 
     def get_thread_count(self):
         return self.thread_count
@@ -137,25 +135,6 @@ class ir_cron(osv.osv, netsvc.Agent):
             except Exception, e:
                 self._handle_callback_exception(cr, uid, model, func, args, job_id, e)
 
-    def _compute_nextcall(self, job, now):
-        """ Compute the nextcall for a job exactly as _run_job does.
-
-        Return either the nextcall or None if it shouldn't be called.
-
-        """
-        nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
-        numbercall = job['numbercall']
-
-        while nextcall < now and numbercall:
-            if numbercall > 0:
-                numbercall -= 1
-            if numbercall:
-                nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
-
-        if not numbercall:
-            return None
-        return nextcall.strftime('%Y-%m-%d %H:%M:%S')
-
     def _run_job(self, cr, job, now):
         """ Run a given job taking care of the repetition. """
         try:
@@ -182,20 +161,17 @@ class ir_cron(osv.osv, netsvc.Agent):
                 print ">>> advance at", nextcall
                 nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S'))
                 with self.thread_count_lock:
-                    self.reschedule_in_advance(self._poolJobs, nextcall, cr.dbname, cr.dbname)
+                    self.schedule_in_advance(nextcall, cr.dbname)
         finally:
             cr.commit()
             cr.close()
             with self.thread_count_lock:
                 self.thread_count += 1
                 # reschedule the master thread in advance, using its saved next_call value.
-                self.reschedule_in_advance(self._poolJobs, self.next_call, cr.dbname, cr.dbname)
+                self.schedule_in_advance(self.next_call, cr.dbname)
                 self.next_call = None
 
-    def _poolJobs(self, db_name):
-        return self._run_jobs(db_name)
-
-    def _run_jobs(self, db_name):
+    def _run_jobs(self):
         # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
         """ Process the cron jobs by spawning worker threads.
 
@@ -205,8 +181,9 @@ class ir_cron(osv.osv, netsvc.Agent):
         locked to be taken care of by another thread.
 
         """
+        db_name = self.pool.db.dbname
         try:
-            db, pool = pooler.get_db_and_pool(db_name)
+            db, pool = self.pool.db, self.pool
         except:
             return False
         print ">>> _run_jobs"
@@ -269,7 +246,7 @@ class ir_cron(osv.osv, netsvc.Agent):
                     self.next_call = next_call
                     next_call = int(time.time()) + 3600   # no available thread, it will run again after 1 day
 
-                self.reschedule_in_advance(self._poolJobs, next_call, db_name, db_name)
+                self.schedule_in_advance(next_call, db_name)
 
         except Exception, ex:
             self._logger.warning('Exception in cron:', exc_info=True)
@@ -286,7 +263,7 @@ class ir_cron(osv.osv, netsvc.Agent):
     def restart(self, dbname):
         self.cancel(dbname)
         # Reschedule cron processing job asap, but not in the current thread
-        self.setAlarm(self._poolJobs, time.time(), dbname, dbname)
+        self.schedule_in_advance(time.time(), dbname)
 
     def update_running_cron(self, cr):
         # Verify whether the server is already started and thus whether we need to commit
index 5ec114f..7ad5778 100644 (file)
@@ -37,6 +37,7 @@ from pprint import pformat
 # TODO modules that import netsvc only for things from loglevels must be changed to use loglevels.
 from loglevels import *
 import tools
+import openerp
 
 def close_socket(sock):
     """ Closes a socket instance cleanly
@@ -252,81 +253,65 @@ class Agent(object):
        
             * a timestamp
             * the database on which the task run
-            * the function to call
-            * the arguments and keyword arguments to pass to the function
+            * a boolean attribute specifying if the task is canceled
 
         Implementation details:
         
           - Tasks are stored as list, allowing the cancellation by setting
-            the timestamp to 0.
+            the boolean to True.
           - A heapq is used to store tasks, so we don't need to sort
             tasks ourself.
     """
-    __tasks = []
-    __tasks_by_db = {}
+    _wakeups = []
+    _wakeup_by_db = {}
     _logger = logging.getLogger('netsvc.agent')
 
     @classmethod
-    def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
-        task = [timestamp, db_name, function, args, kwargs]
-        heapq.heappush(cls.__tasks, task)
-        cls.__tasks_by_db.setdefault(db_name, []).append(task)
+    def cancel(cls, db_name):
+        """ Cancel next wakeup for a given database. """
+        cls._logger.debug("Cancel next wake-up for database '%s'.", db_name)
+        if db_name in cls._wakeup_by_db:
+            cls._wakeup_by_db[db_name][2] = True
 
     @classmethod
-    def cancel(cls, db_name):
-        """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
-        cls._logger.debug("Cancel timers for %s db", db_name or 'all')
-        if db_name is None:
-            cls.__tasks, cls.__tasks_by_db = [], {}
-        else:
-            if db_name in cls.__tasks_by_db:
-                for task in cls.__tasks_by_db[db_name]:
-                    task[0] = 0
+    def cancel_all(cls):
+        cls._wakeups = []
+        cls._wakeup_by_db = {}
 
     @classmethod
-    def reschedule_in_advance(cls, function, timestamp, db_name, *args, **kwargs):
+    def schedule_in_advance(cls, timestamp, db_name):
         if not timestamp:
             return
-        # Cancel the previous task if any.
-        old_timestamp = False
-        if db_name in cls.__tasks_by_db:
-            for task in cls.__tasks_by_db[db_name]:
-                print ">>> function:", function
-                if task[2] == function and (not task[0] or timestamp < task[0]):
-                    old_timestamp = True
-                    task[0] = 0
-        if old_timestamp or db_name not in cls.__tasks_by_db or not cls.__tasks_by_db[db_name]:
+        # Cancel the previous wakeup if any.
+        add_wakeup = False
+        if db_name in cls._wakeup_by_db:
+            task = cls._wakeup_by_db[db_name]
+            if task[2] or timestamp < task[0]:
+                add_wakeup = True
+                task[2] = True
+        else:
+            add_wakeup = True
+        if add_wakeup:
             print ">>> rescheduled earlier", timestamp
-            cls.setAlarm(function, timestamp, db_name, *args, **kwargs)
-
-    @classmethod
-    def quit(cls):
-        cls.cancel(None)
+            task = [timestamp, db_name, False]
+            heapq.heappush(cls._wakeups, task)
+            cls._wakeup_by_db[db_name] = task
 
     @classmethod
     def runner(cls):
         """Neverending function (intended to be ran in a dedicated thread) that
            checks every 60 seconds tasks to run. TODO: make configurable
         """
-        current_thread = threading.currentThread()
         while True:
-            print ">>>>> starting thread for"
-            while cls.__tasks and cls.__tasks[0][0] < time.time():
-                task = heapq.heappop(cls.__tasks)
-                timestamp, dbname, function, args, kwargs = task
-                cls.__tasks_by_db[dbname].remove(task)
-                if not timestamp:
-                    # null timestamp -> cancelled task
+            print ">>>>> cron for"
+            while cls._wakeups and cls._wakeups[0][0] < time.time():
+                task = heapq.heappop(cls._wakeups)
+                timestamp, db_name, canceled = task
+                del cls._wakeup_by_db[db_name]
+                if canceled:
                     continue
-                current_thread.dbname = dbname   # hack hack
-                cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs)
-                delattr(current_thread, 'dbname')
-                task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
-                # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
-                task_thread.setDaemon(False)
-                print ">>>>> -", function.func_name
-                task_thread.start()
-                time.sleep(1)
+                ir_cron = openerp.pooler.get_pool(db_name).get('ir.cron')
+                ir_cron._run_jobs()
             time.sleep(60)
 
 def start_agent():