[IMP] cron: re-added the threaded cron:
authorVo Minh Thu <vmt@openerp.com>
Thu, 29 Mar 2012 11:55:31 +0000 (13:55 +0200)
committerVo Minh Thu <vmt@openerp.com>
Thu, 29 Mar 2012 11:55:31 +0000 (13:55 +0200)
- the new process workers implementation is present along the old one
- new code impacts minimally the old one: just one flag is added to
schedule_wakeup().

bzr revid: vmt@openerp.com-20120329115531-9mbh4mtvktyyi9bv

openerp-cron-worker
openerp/__init__.py
openerp/addons/base/ir/ir_cron.py
openerp/cron.py
openerp/modules/registry.py
openerp/service/__init__.py

index 5db5c31..a3b2ab0 100755 (executable)
@@ -66,23 +66,25 @@ def list_databases():
     return databases
 
 if __name__ == '__main__':
+    os.environ['TZ'] = 'UTC'
     openerp.tools.config.parse_config(sys.argv[1:])
     config = openerp.tools.config
     if config['log_handler'] == [':INFO']:
         # Replace the default value, which is suitable for openerp-server.
         config['log_handler'].append('openerp.addons.base.ir.ir_cron:DEBUG')
-    if config['db_name']:
-        db_names = config['db_name'].split(',')
-        print "Monitoring %s databases." % len(db_names)
-    else:
-        db_names = list_databases
-        print "Monitored databases are auto-discovered."
     setup_signal_handlers()
     openerp.modules.module.initialize_sys_path()
     openerp.modules.loading.open_openerp_namespace()
     openerp.netsvc.init_logger()
+    openerp.cron.enable_schedule_wakeup = False
     import openerp.addons.base
     print "OpenERP cron jobs worker. Hit Ctrl-C to exit."
+    if config['db_name']:
+        db_names = config['db_name'].split(',')
+        print "Monitoring %s databases." % len(db_names)
+    else:
+        db_names = list_databases
+        print "Monitored databases are auto-discovered."
     openerp.addons.base.ir.ir_cron.ir_cron._run(db_names)
 
 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
index 521a9ef..cc35566 100644 (file)
@@ -27,6 +27,7 @@ SUPERUSER_ID = 1
 
 import addons
 import conf
+import cron
 import loglevels
 import modules
 import netsvc
index a2cee29..cf54a90 100644 (file)
@@ -31,6 +31,7 @@ import netsvc
 import openerp
 import pooler
 import tools
+from openerp.cron import WAKE_UP_NOW
 from osv import fields, osv
 from tools import DEFAULT_SERVER_DATETIME_FORMAT
 from tools.safe_eval import safe_eval as eval
@@ -149,7 +150,131 @@ class ir_cron(osv.osv):
             except Exception, e:
                 self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e)
 
-    def _run_job(self, cr, job):
+    def _run_job(self, cr, job, now):
+        """ Run a given job taking care of the repetition.
+
+        The cursor has a lock on the job (aquired by _run_jobs_multithread()) and this
+        method is run in a worker thread (spawned by _run_jobs_multithread())).
+
+        :param job: job to be run (as a dictionary).
+        :param now: timestamp (result of datetime.now(), no need to call it multiple time).
+
+        """
+        try:
+            nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
+            numbercall = job['numbercall']
+
+            ok = False
+            while nextcall < now and numbercall:
+                if numbercall > 0:
+                    numbercall -= 1
+                if not ok or job['doall']:
+                    self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
+                if numbercall:
+                    nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
+                ok = True
+            addsql = ''
+            if not numbercall:
+                addsql = ', active=False'
+            cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
+                       (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
+
+            if numbercall:
+                # Reschedule our own main cron thread if necessary.
+                # This is really needed if this job runs longer than its rescheduling period.
+                nextcall = calendar.timegm(nextcall.timetuple())
+                openerp.cron.schedule_wakeup(nextcall, cr.dbname)
+        finally:
+            cr.commit()
+            cr.close()
+            openerp.cron.release_thread_slot()
+
+    def _run_jobs_multithread(self):
+        # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
+        """ Process the cron jobs by spawning worker threads.
+
+        This selects in database all the jobs that should be processed. It then
+        tries to lock each of them and, if it succeeds, spawns a thread to run
+        the cron job (if it doesn't succeed, it means the job was already
+        locked to be taken care of by another thread).
+
+        The cursor used to lock the job in database is given to the worker
+        thread (which has to close it itself).
+
+        """
+        db = self.pool.db
+        cr = db.cursor()
+        db_name = db.dbname
+        try:
+            jobs = {} # mapping job ids to jobs for all jobs being processed.
+            now = datetime.now() 
+            # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
+            cr.execute("""SELECT * FROM ir_cron
+                          WHERE numbercall != 0
+                              AND active AND nextcall <= (now() at time zone 'UTC')
+                          ORDER BY priority""")
+            for job in cr.dictfetchall():
+                if not openerp.cron.get_thread_slots():
+                    break
+                jobs[job['id']] = job
+
+                task_cr = db.cursor()
+                try:
+                    # Try to grab an exclusive lock on the job row from within the task transaction
+                    acquired_lock = False
+                    task_cr.execute("""SELECT *
+                                       FROM ir_cron
+                                       WHERE id=%s
+                                       FOR UPDATE NOWAIT""",
+                                   (job['id'],), log_exceptions=False)
+                    acquired_lock = True
+                except psycopg2.OperationalError, e:
+                    if e.pgcode == '55P03':
+                        # Class 55: Object not in prerequisite state; 55P03: lock_not_available
+                        _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
+                        continue
+                    else:
+                        # Unexpected OperationalError
+                        raise
+                finally:
+                    if not acquired_lock:
+                        # we're exiting due to an exception while acquiring the lot
+                        task_cr.close()
+
+                # Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock
+                task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now))
+                # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
+                task_thread.setDaemon(False)
+                openerp.cron.take_thread_slot()
+                task_thread.start()
+                _logger.debug('Cron execution thread for job `%s` spawned', job['name'])
+
+            # Find next earliest job ignoring currently processed jobs (by this and other cron threads)
+            find_next_time_query = """SELECT min(nextcall) AS min_next_call
+                                      FROM ir_cron WHERE numbercall != 0 AND active""" 
+            if jobs:
+                cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),))
+            else:
+                cr.execute(find_next_time_query)
+            next_call = cr.dictfetchone()['min_next_call']
+
+            if next_call:
+                next_call = calendar.timegm(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT))
+            else:
+                # no matching cron job found in database, re-schedule arbitrarily in 1 day,
+                # this delay will likely be modified when running jobs complete their tasks
+                next_call = time.time() + (24*3600)
+
+            openerp.cron.schedule_wakeup(next_call, db_name)
+
+        except Exception, ex:
+            _logger.warning('Exception in cron:', exc_info=True)
+
+        finally:
+            cr.commit()
+            cr.close()
+
+    def _process_job(self, cr, job):
         """ Run a given job taking care of the repetition.
 
         The cursor has a lock on the job (aquired by _acquire_job()).
@@ -157,7 +282,7 @@ class ir_cron(osv.osv):
         :param job: job to be run (as a dictionary).
         """
         try:
-            now = datetime.utcnow() 
+            now = datetime.now() 
             nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
             numbercall = job['numbercall']
 
@@ -227,7 +352,7 @@ class ir_cron(osv.osv):
                 # Got the lock on the job row, run its code
                 _logger.debug('Starting job `%s`.', job['name'])
                 registry = openerp.pooler.get_pool(db_name)
-                registry[cls._name]._run_job(task_cr, job)
+                registry[cls._name]._process_job(task_cr, job)
                 return True
 
         except psycopg2.ProgrammingError, e:
@@ -279,6 +404,19 @@ class ir_cron(osv.osv):
                     time.sleep(60 - t)
                     job_in_progress = True
 
+    def update_running_cron(self, cr):
+        """ Schedule as soon as possible a wake-up for this database. """
+        # Verify whether the server is already started and thus whether we need to commit
+        # immediately our changes and restart the cron agent in order to apply the change
+        # immediately. The commit() is needed because as soon as the cron is (re)started it
+        # will query the database with its own cursor, possibly before the end of the
+        # current transaction.
+        # This commit() is not an issue in most cases, but we must absolutely avoid it
+        # when the server is only starting or loading modules (hence the test on pool._init).
+        if not self.pool._init:
+            cr.commit()
+            openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname)
+
     def _try_lock(self, cr, uid, ids, context=None):
         """Try to grab a dummy exclusive write-lock to the rows with the given ids,
            to make sure a following write() or unlink() will not block due
@@ -294,16 +432,20 @@ class ir_cron(osv.osv):
 
     def create(self, cr, uid, vals, context=None):
         res = super(ir_cron, self).create(cr, uid, vals, context=context)
+        self.update_running_cron(cr)
         return res
 
     def write(self, cr, uid, ids, vals, context=None):
         self._try_lock(cr, uid, ids, context)
         res = super(ir_cron, self).write(cr, uid, ids, vals, context=context)
+        self.update_running_cron(cr)
         return res
 
     def unlink(self, cr, uid, ids, context=None):
         self._try_lock(cr, uid, ids, context)
         res = super(ir_cron, self).unlink(cr, uid, ids, context=context)
+        self.update_running_cron(cr)
         return res
+ir_cron()
 
 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
index 5c6b860..317d695 100644 (file)
@@ -49,9 +49,176 @@ import tools
 
 _logger = logging.getLogger(__name__)
 
+# Scheduling wake-ups (see below) can be disabled when the polling process
+# workers are used instead of the managed thread workers. (I.e. wake-ups are
+# not used since polling is used. And polling is used when the cron are
+# handled by running special processes, e.g. openerp-cron-worker, instead
+# of the general openerp-server script.)
+enable_schedule_wakeup = True
+
+# Heapq of database wake-ups. Note that 'database wake-up' meaning is in
+# the context of the cron management. This is not originally about loading
+# a database, although having the database name in the queue will
+# cause it to be loaded when the schedule time is reached, even if it was
+# unloaded in the mean time. Normally a database's wake-up is cancelled by
+# the RegistryManager when the database is unloaded - so this should not
+# cause it to be reloaded.
+#
 # TODO: perhaps in the future we could consider a flag on ir.cron jobs
 # that would cause database wake-up even if the database has not been
 # loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
+#
+# Each element is a triple (timestamp, database-name, boolean). The boolean
+# specifies if the wake-up is canceled (so a wake-up can be canceled without
+# relying on the heapq implementation detail; no need to remove the job from
+# the heapq).
+_wakeups = []
+
+# Mapping of database names to the wake-up defined in the heapq,
+# so that we can cancel the wake-up without messing with the heapq
+# invariant: lookup the wake-up by database-name, then set
+# its third element to True.
+_wakeup_by_db = {}
+
+# Re-entrant lock to protect the above _wakeups and _wakeup_by_db variables.
+# We could use a simple (non-reentrant) lock if the runner function below
+# was more fine-grained, but we are fine with the loop owning the lock
+# while spawning a few threads.
+_wakeups_lock = threading.RLock()
+
+# Maximum number of threads allowed to process cron jobs concurrently. This
+# variable is set by start_master_thread using openerp.conf.max_cron_threads.
+_thread_slots = None
+
+# A (non re-entrant) lock to protect the above _thread_slots variable.
+_thread_slots_lock = threading.Lock()
+
+# Sleep duration limits - must not loop too quickly, but can't sleep too long
+# either, because a new job might be inserted in ir_cron with a much sooner
+# execution date than current known ones. We won't see it until we wake!
+MAX_SLEEP = 60 # 1 min
+MIN_SLEEP = 1  # 1 sec
+
+# Dummy wake-up timestamp that can be used to force a database wake-up asap
+WAKE_UP_NOW = 1
+
+def get_thread_slots():
+    """ Return the number of available thread slots """
+    return _thread_slots
+
+
+def release_thread_slot():
+    """ Increment the number of available thread slots """
+    global _thread_slots
+    with _thread_slots_lock:
+        _thread_slots += 1
+
+
+def take_thread_slot():
+    """ Decrement the number of available thread slots """
+    global _thread_slots
+    with _thread_slots_lock:
+        _thread_slots -= 1
+
+
+def cancel(db_name):
+    """ Cancel the next wake-up of a given database, if any.
+
+    :param db_name: database name for which the wake-up is canceled.
+
+    """
+    _logger.debug("Cancel next wake-up for database '%s'.", db_name)
+    with _wakeups_lock:
+        if db_name in _wakeup_by_db:
+            _wakeup_by_db[db_name][2] = True
+
+
+def cancel_all():
+    """ Cancel all database wake-ups. """
+    _logger.debug("Cancel all database wake-ups")
+    global _wakeups
+    global _wakeup_by_db
+    with _wakeups_lock:
+        _wakeups = []
+        _wakeup_by_db = {}
+
+def schedule_wakeup(timestamp, db_name):
+    """ Schedule a new wake-up for a database.
+
+    If an earlier wake-up is already defined, the new wake-up is discarded.
+    If another wake-up is defined, that wake-up is discarded and the new one
+    is scheduled.
+
+    :param db_name: database name for which a new wake-up is scheduled.
+    :param timestamp: when the wake-up is scheduled.
+
+    """
+    global enable_schedule_wakeup
+    if not enable_schedule_wakeup:
+        return
+    if not timestamp:
+        return
+    with _wakeups_lock:
+        if db_name in _wakeup_by_db:
+            task = _wakeup_by_db[db_name]
+            if not task[2] and timestamp > task[0]:
+                # existing wakeup is valid and occurs earlier than new one
+                return
+            task[2] = True # cancel existing task
+        task = [timestamp, db_name, False]
+        heapq.heappush(_wakeups, task)
+        _wakeup_by_db[db_name] = task
+        _logger.debug("Wake-up scheduled for database '%s' @ %s", db_name,
+                      'NOW' if timestamp == WAKE_UP_NOW else timestamp)
+
+def runner():
+    """Neverending function (intended to be run in a dedicated thread) that
+       checks every 60 seconds the next database wake-up. TODO: make configurable
+    """
+    while True:
+        runner_body()
+
+def runner_body():
+    with _wakeups_lock:
+        while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots():
+            task = heapq.heappop(_wakeups)
+            timestamp, db_name, canceled = task
+            if canceled:
+                continue
+            del _wakeup_by_db[db_name]
+            registry = openerp.pooler.get_pool(db_name)
+            if not registry._init:
+                _logger.debug("Database '%s' wake-up! Firing multi-threaded cron job processing", db_name)
+                registry['ir.cron']._run_jobs_multithread()
+    amount = MAX_SLEEP
+    with _wakeups_lock:
+        # Sleep less than MAX_SLEEP if the next known wake-up will happen before that.
+        if _wakeups and get_thread_slots():
+            amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time()))
+    _logger.debug("Going to sleep for %ss", amount)
+    time.sleep(amount)
+
+def start_master_thread():
+    """ Start the above runner function in a daemon thread.
+
+    The thread is a typical daemon thread: it will never quit and must be
+    terminated when the main process exits - with no consequence (the processing
+    threads it spawns are not marked daemon).
 
+    """
+    global _thread_slots
+    _thread_slots = openerp.conf.max_cron_threads
+    db_maxconn = tools.config['db_maxconn']
+    if _thread_slots >= tools.config.get('db_maxconn', 64):
+        _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
+                        "this may cause trouble if you reach that number of parallel cron tasks.",
+                        db_maxconn, _thread_slots)
+    if _thread_slots:
+        t = threading.Thread(target=runner, name="openerp.cron.master_thread")
+        t.setDaemon(True)
+        t.start()
+        _logger.debug("Master cron daemon started!")
+    else:
+        _logger.info("No master cron daemon (0 workers needed).")
 
 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
index 5ed8ae3..9ec3e6f 100644 (file)
@@ -27,6 +27,7 @@ import threading
 
 import openerp.sql_db
 import openerp.osv.orm
+import openerp.cron
 import openerp.tools
 import openerp.modules.db
 import openerp.tools.config
@@ -109,6 +110,15 @@ class Registry(object):
 
         return res
 
+    def schedule_cron_jobs(self):
+        """ Make the cron thread care about this registry/database jobs.
+        This will initiate the cron thread to check for any pending jobs for
+        this registry/database as soon as possible. Then it will continuously
+        monitor the ir.cron model for future jobs. See openerp.cron for
+        details.
+        """
+        openerp.cron.schedule_wakeup(openerp.cron.WAKE_UP_NOW, self.db.dbname)
+
     def clear_caches(self):
         """ Clear the caches
         This clears the caches associated to methods decorated with
@@ -203,6 +213,9 @@ class RegistryManager(object):
             finally:
                 cr.close()
 
+        if pooljobs:
+            registry.schedule_cron_jobs()
+
         return registry
 
     @classmethod
@@ -220,6 +233,8 @@ class RegistryManager(object):
             if db_name in cls.registries:
                 cls.registries[db_name].clear_caches()
                 del cls.registries[db_name]
+                openerp.cron.cancel(db_name)
+
 
     @classmethod
     def delete_all(cls):
index 41e7200..c387d27 100644 (file)
@@ -28,6 +28,7 @@ import netrpc_server
 import web_services
 import websrv_lib
 
+import openerp.cron
 import openerp.modules
 import openerp.netsvc
 import openerp.osv
@@ -50,7 +51,7 @@ _logger = logging.getLogger(__name__)
 def start_services():
     """ Start all services.
 
-    Services include the different servers.
+    Services include the different servers and cron threads.
 
     """
     # Instantiate local services (this is a legacy design).
@@ -63,6 +64,9 @@ def start_services():
     #http_server.init_static_http()
     netrpc_server.init_servers()
 
+    # Start the main cron thread.
+    openerp.cron.start_master_thread()
+
     # Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC).
     openerp.netsvc.Server.startAll()
 
@@ -72,6 +76,9 @@ def start_services():
 
 def stop_services():
     """ Stop all services. """
+    # stop scheduling new jobs; we will have to wait for the jobs to complete below
+    openerp.cron.cancel_all()
+
     openerp.netsvc.Server.quitAll()
     openerp.wsgi.core.stop_server()
     config = openerp.tools.config