[IMP] cron: moved netsvc.Agent to openerp.cron.
authorVo Minh Thu <vmt@openerp.com>
Thu, 14 Jul 2011 14:32:09 +0000 (16:32 +0200)
committerVo Minh Thu <vmt@openerp.com>
Thu, 14 Jul 2011 14:32:09 +0000 (16:32 +0200)
bzr revid: vmt@openerp.com-20110714143209-bebn6xg91fcrxro9

openerp-server
openerp/addons/base/ir/ir_cron.py
openerp/modules/registry.py
openerp/netsvc.py

index d587611..f06da60 100755 (executable)
@@ -153,7 +153,7 @@ if config["translate_in"]:
 if config["stop_after_init"]:
     sys.exit(0)
 
-openerp.netsvc.start_agent()
+openerp.cron.start_master_thread()
 
 #----------------------------------------------------------
 # Launch Servers
@@ -200,7 +200,7 @@ if os.name == 'posix':
 
 def quit():
     # stop scheduling new jobs; we will have to wait for the jobs to complete below
-    openerp.netsvc.Agent.cancel_all()
+    openerp.cron.cancel_all()
     openerp.netsvc.Server.quitAll()
     if config['pidfile']:
         os.unlink(config['pidfile'])
index 61a8b6d..f1f4547 100644 (file)
@@ -30,6 +30,7 @@ import tools
 from tools.safe_eval import safe_eval as eval
 import pooler
 from osv import fields, osv
+import openerp
 
 def str2tuple(s):
     return eval('tuple(%s)' % (s or ''))
@@ -43,9 +44,8 @@ _intervalTypes = {
     'minutes': lambda interval: relativedelta(minutes=interval),
 }
 
-class ir_cron(osv.osv, netsvc.Agent):
+class ir_cron(osv.osv):
     """ This is the ORM object that periodically executes actions.
-        Note that we use the netsvc.Agent()._logger member.
     """
     _name = "ir.cron"
     _order = 'name'
@@ -76,16 +76,7 @@ class ir_cron(osv.osv, netsvc.Agent):
         'doall' : lambda *a: 1
     }
 
-    thread_count_lock = threading.Lock()
-    thread_count = 2 # maximum allowed number of thread.
-
-    def get_thread_count(self):
-        return self.thread_count
-
-    def dec_thread_count(self):
-        self.thread_count_lock.acquire()
-        self.thread_count -= 1
-        self.thread_count_lock.release()
+    _logger = logging.getLogger('cron')
 
     def f(a, b, c):
         print ">>> in f"
@@ -160,16 +151,11 @@ class ir_cron(osv.osv, netsvc.Agent):
                 # This is really needed if this job run longer that its rescheduling period.
                 print ">>> advance at", nextcall
                 nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S'))
-                with self.thread_count_lock:
-                    self.schedule_in_advance(nextcall, cr.dbname)
+                openerp.cron.schedule_in_advance(nextcall, cr.dbname)
         finally:
             cr.commit()
             cr.close()
-            with self.thread_count_lock:
-                self.thread_count += 1
-                # reschedule the master thread in advance, using its saved next_call value.
-                self.schedule_in_advance(self.next_call, cr.dbname)
-                self.next_call = None
+            openerp.cron.inc_thread_count()
 
     def _run_jobs(self):
         # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
@@ -181,50 +167,45 @@ class ir_cron(osv.osv, netsvc.Agent):
         locked to be taken care of by another thread.
 
         """
-        db_name = self.pool.db.dbname
-        try:
-            db, pool = self.pool.db, self.pool
-        except:
-            return False
         print ">>> _run_jobs"
-        self.next_call = None
+        db = self.pool.db
         cr = db.cursor()
+        db_name = db.dbname
         try:
             jobs = {} # mapping job ids to jobs for all jobs being processed.
-            if not pool._init:
-                now = datetime.now()
-                cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
-                for job in cr.dictfetchall():
-                    print ">>>", self.get_thread_count(), "threads"
-                    if not self.get_thread_count():
-                        break
-                    task_cr = db.cursor()
-                    task_job = None
-                    jobs[job['id']] = job
-
-                    try:
-                        # Try to lock the job...
-                        task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False)
-                        task_job = task_cr.dictfetchall()[0]
-                    except psycopg2.OperationalError, e:
-                        if e.pgcode == '55P03':
-                            # Class 55: Object not in prerequisite state, 55P03: lock_not_available
-                            # ... and fail.
-                            print ">>>", job['name'], " is already being processed"
-                            continue
-                        else:
-                            raise
-                    finally:
-                        if not task_job:
-                            task_cr.close()
-
-                    # ... and succeed.
-                    print ">>> taking care of", job['name']
-                    task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now))
-                    # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
-                    task_thread.setDaemon(False)
-                    self.dec_thread_count()
-                    task_thread.start()
+            now = datetime.now()
+            cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
+            for job in cr.dictfetchall():
+                print ">>>", openerp.cron.get_thread_count(), "threads"
+                if not openerp.cron.get_thread_count():
+                    break
+                task_cr = db.cursor()
+                task_job = None
+                jobs[job['id']] = job
+
+                try:
+                    # Try to lock the job...
+                    task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False)
+                    task_job = task_cr.dictfetchall()[0]
+                except psycopg2.OperationalError, e:
+                    if e.pgcode == '55P03':
+                        # Class 55: Object not in prerequisite state, 55P03: lock_not_available
+                        # ... and fail.
+                        print ">>>", job['name'], " is already being processed"
+                        continue
+                    else:
+                        raise
+                finally:
+                    if not task_job:
+                        task_cr.close()
+
+                # ... and succeed.
+                print ">>> taking care of", job['name']
+                task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now))
+                # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
+                task_thread.setDaemon(False)
+                openerp.cron.dec_thread_count()
+                task_thread.start()
 
             # Wake up time, without considering the currently processed jobs.
             if jobs.keys():
@@ -239,14 +220,7 @@ class ir_cron(osv.osv, netsvc.Agent):
             else:
                 next_call = int(time.time()) + 3600   # if do not find active cron job from database, it will run again after 1 day
 
-            # avoid race condition: the thread rescheduled the main thread, then the main thread puts +3600.
-            with self.thread_count_lock:
-                if not self.thread_count:
-                    print ">>> no more threads"
-                    self.next_call = next_call
-                    next_call = int(time.time()) + 3600   # no available thread, it will run again after 1 day
-
-                self.schedule_in_advance(next_call, db_name)
+            openerp.cron.schedule_in_advance(next_call, db_name)
 
         except Exception, ex:
             self._logger.warning('Exception in cron:', exc_info=True)
@@ -261,9 +235,9 @@ class ir_cron(osv.osv, netsvc.Agent):
             self.restart(self, dbname)
 
     def restart(self, dbname):
-        self.cancel(dbname)
+        openerp.cron.cancel(dbname)
         # Reschedule cron processing job asap, but not in the current thread
-        self.schedule_in_advance(time.time(), dbname)
+        openerp.cron.schedule_in_advance(time.time(), dbname)
 
     def update_running_cron(self, cr):
         # Verify whether the server is already started and thus whether we need to commit
index cb81284..965ba00 100644 (file)
@@ -25,7 +25,7 @@
 
 import openerp.sql_db
 import openerp.osv.orm
-import openerp.netsvc
+import openerp.cron
 import openerp.tools
 
 
@@ -161,14 +161,14 @@ class RegistryManager(object):
         cancels the associated cron job. But please note that the cron job can
         be running and take some time before ending, and that you should not
         remove a registry if it can still be used by some thread. So it might
-        be necessary to call yourself openerp.netsvc.Agent.cancel(db_name) and
+        be necessary to call yourself openerp.cron.Agent.cancel(db_name) and
         and join (i.e. wait for) the thread.
 
         """
         if db_name in cls.registries:
             del cls.registries[db_name]
         openerp.tools.cache.clean_caches_for_db(db_name)
-        openerp.netsvc.Agent.cancel(db_name)
+        openerp.cron.cancel(db_name)
 
 
     @classmethod
index 7ad5778..dc5de9a 100644 (file)
@@ -21,7 +21,6 @@
 ##############################################################################
 
 import errno
-import heapq
 import logging
 import logging.handlers
 import os
@@ -245,83 +244,6 @@ def init_alternative_logger():
     logger.addHandler(handler)
     logger.setLevel(logging.ERROR)
 
-class Agent(object):
-    """ Singleton that keeps track of cancellable tasks to run at a given
-        timestamp.
-       
-        The tasks are characterised by:
-       
-            * a timestamp
-            * the database on which the task run
-            * a boolean attribute specifying if the task is canceled
-
-        Implementation details:
-        
-          - Tasks are stored as list, allowing the cancellation by setting
-            the boolean to True.
-          - A heapq is used to store tasks, so we don't need to sort
-            tasks ourself.
-    """
-    _wakeups = []
-    _wakeup_by_db = {}
-    _logger = logging.getLogger('netsvc.agent')
-
-    @classmethod
-    def cancel(cls, db_name):
-        """ Cancel next wakeup for a given database. """
-        cls._logger.debug("Cancel next wake-up for database '%s'.", db_name)
-        if db_name in cls._wakeup_by_db:
-            cls._wakeup_by_db[db_name][2] = True
-
-    @classmethod
-    def cancel_all(cls):
-        cls._wakeups = []
-        cls._wakeup_by_db = {}
-
-    @classmethod
-    def schedule_in_advance(cls, timestamp, db_name):
-        if not timestamp:
-            return
-        # Cancel the previous wakeup if any.
-        add_wakeup = False
-        if db_name in cls._wakeup_by_db:
-            task = cls._wakeup_by_db[db_name]
-            if task[2] or timestamp < task[0]:
-                add_wakeup = True
-                task[2] = True
-        else:
-            add_wakeup = True
-        if add_wakeup:
-            print ">>> rescheduled earlier", timestamp
-            task = [timestamp, db_name, False]
-            heapq.heappush(cls._wakeups, task)
-            cls._wakeup_by_db[db_name] = task
-
-    @classmethod
-    def runner(cls):
-        """Neverending function (intended to be ran in a dedicated thread) that
-           checks every 60 seconds tasks to run. TODO: make configurable
-        """
-        while True:
-            print ">>>>> cron for"
-            while cls._wakeups and cls._wakeups[0][0] < time.time():
-                task = heapq.heappop(cls._wakeups)
-                timestamp, db_name, canceled = task
-                del cls._wakeup_by_db[db_name]
-                if canceled:
-                    continue
-                ir_cron = openerp.pooler.get_pool(db_name).get('ir.cron')
-                ir_cron._run_jobs()
-            time.sleep(60)
-
-def start_agent():
-    agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
-    # the agent runner is a typical daemon thread, that will never quit and must be
-    # terminated when the main process exits - with no consequence (the processing
-    # threads it spawns are not marked daemon)
-    agent_runner.setDaemon(True)
-    agent_runner.start()
-
 import traceback
 
 class Server: