[IMP] cron: alternative implementation:
authorVo Minh Thu <vmt@openerp.com>
Wed, 28 Mar 2012 09:03:20 +0000 (11:03 +0200)
committerVo Minh Thu <vmt@openerp.com>
Wed, 28 Mar 2012 09:03:20 +0000 (11:03 +0200)
- The previous implementation was optimized to make few queries to the database
  but needed to keep some internal state.
- That state was updated whenever the ir_cron table was modified by the ORM
  (this works only when the cron and web processes/threads are inside a single
  OpenERP server instance).
- The new implementation is instead polling the database.
- This is deemed acceptable in `normal` situation (i.e. not a SaaS with
  thousand of databases).
- This makes it possible to avoid sharing state or the use of IPC.
- This makes it possible to add/remove additional worker processes,
  possibly on different machines.
- The code of the older implementation is removed in this commit but
  will be added back in a later commit: this is the 6.1 stable branch
  and we don't want to change the existing installation, but simply
  provide a solution for those running OpenERP with Gunicorn (which
  uses processes for which no cron state were shared).

bzr revid: vmt@openerp.com-20120328090320-vshsfv3gt1ck34s1

openerp-cron-worker [new file with mode: 0755]
openerp-server
openerp/addons/base/ir/ir_cron.py
openerp/cron.py
openerp/modules/registry.py
openerp/service/__init__.py

diff --git a/openerp-cron-worker b/openerp-cron-worker
new file mode 100755 (executable)
index 0000000..4f86da9
--- /dev/null
@@ -0,0 +1,22 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+OpenERP cron jobs worker
+
+This script executes OpenERP cron jobs. Normally, cron jobs are handled by the
+OpenERP server but depending on deployment needs, independent worker processes
+can be used.
+"""
+
+import openerp
+
+if __name__ == '__main__':
+    openerp.modules.module.initialize_sys_path()
+    openerp.modules.loading.open_openerp_namespace()
+    openerp.tools.config['log_handler'] = ['openerp.addons.base.ir.ir_cron:DEBUG']
+    openerp.netsvc.init_logger()
+    import openerp.addons.base
+    openerp.addons.base.ir.ir_cron.ir_cron._run(['xx'])
+
+# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
index f574ebb..19c6039 100755 (executable)
@@ -93,9 +93,6 @@ def preload_registry(dbname):
     """ Preload a registry, and start the cron."""
     try:
         db, registry = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False)
-
-        # jobs will start to be processed later, when openerp.cron.start_master_thread() is called by openerp.service.start_services()
-        registry.schedule_cron_jobs()
     except Exception:
         _logger.exception('Failed to initialize database `%s`.', dbname)
 
index 3c437c9..2514116 100644 (file)
@@ -31,7 +31,6 @@ import netsvc
 import openerp
 import pooler
 import tools
-from openerp.cron import WAKE_UP_NOW
 from osv import fields, osv
 from tools import DEFAULT_SERVER_DATETIME_FORMAT
 from tools.safe_eval import safe_eval as eval
@@ -142,17 +141,15 @@ class ir_cron(osv.osv):
             except Exception, e:
                 self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e)
 
-    def _run_job(self, cr, job, now):
+    def _run_job(self, cr, job):
         """ Run a given job taking care of the repetition.
 
-        The cursor has a lock on the job (aquired by _run_jobs_multithread()) and this
-        method is run in a worker thread (spawned by _run_jobs_multithread())).
+        The cursor has a lock on the job (aquired by _acquire_job()).
 
         :param job: job to be run (as a dictionary).
-        :param now: timestamp (result of datetime.now(), no need to call it multiple time).
-
         """
         try:
+            now = datetime.utcnow() 
             nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
             numbercall = job['numbercall']
 
@@ -171,45 +168,29 @@ class ir_cron(osv.osv):
             cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
                        (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
 
-            if numbercall:
-                # Reschedule our own main cron thread if necessary.
-                # This is really needed if this job runs longer than its rescheduling period.
-                nextcall = calendar.timegm(nextcall.timetuple())
-                openerp.cron.schedule_wakeup(nextcall, cr.dbname)
         finally:
             cr.commit()
             cr.close()
-            openerp.cron.release_thread_slot()
 
-    def _run_jobs_multithread(self):
+    @classmethod
+    def _acquire_job(cls, db_name):
         # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
-        """ Process the cron jobs by spawning worker threads.
+        """ Try to process one cron job.
 
         This selects in database all the jobs that should be processed. It then
-        tries to lock each of them and, if it succeeds, spawns a thread to run
+        tries to lock each of them and, if it succeeds, run
         the cron job (if it doesn't succeed, it means the job was already
-        locked to be taken care of by another thread).
-
-        The cursor used to lock the job in database is given to the worker
-        thread (which has to close it itself).
-
+        locked to be taken care of by another thread) and stop.
         """
-        db = self.pool.db
+        db = openerp.sql_db.db_connect(db_name)
         cr = db.cursor()
-        db_name = db.dbname
         try:
-            jobs = {} # mapping job ids to jobs for all jobs being processed.
-            now = datetime.now() 
             # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
             cr.execute("""SELECT * FROM ir_cron
                           WHERE numbercall != 0
                               AND active AND nextcall <= (now() at time zone 'UTC')
                           ORDER BY priority""")
             for job in cr.dictfetchall():
-                if not openerp.cron.get_thread_slots():
-                    break
-                jobs[job['id']] = job
-
                 task_cr = db.cursor()
                 try:
                     # Try to grab an exclusive lock on the job row from within the task transaction
@@ -233,31 +214,11 @@ class ir_cron(osv.osv):
                         # we're exiting due to an exception while acquiring the lot
                         task_cr.close()
 
-                # Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock
-                task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now))
-                # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
-                task_thread.setDaemon(False)
-                openerp.cron.take_thread_slot()
-                task_thread.start()
-                _logger.debug('Cron execution thread for job `%s` spawned', job['name'])
-
-            # Find next earliest job ignoring currently processed jobs (by this and other cron threads)
-            find_next_time_query = """SELECT min(nextcall) AS min_next_call
-                                      FROM ir_cron WHERE numbercall != 0 AND active""" 
-            if jobs:
-                cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),))
-            else:
-                cr.execute(find_next_time_query)
-            next_call = cr.dictfetchone()['min_next_call']
-
-            if next_call:
-                next_call = calendar.timegm(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT))
-            else:
-                # no matching cron job found in database, re-schedule arbitrarily in 1 day,
-                # this delay will likely be modified when running jobs complete their tasks
-                next_call = time.time() + (24*3600)
-
-            openerp.cron.schedule_wakeup(next_call, db_name)
+                # Got the lock on the job row, run its code
+                _logger.debug('Starting job `%s`.', job['name'])
+                registry = openerp.pooler.get_pool(db_name)
+                registry[cls._name]._run_job(task_cr, job)
+                return True
 
         except Exception, ex:
             _logger.warning('Exception in cron:', exc_info=True)
@@ -266,18 +227,21 @@ class ir_cron(osv.osv):
             cr.commit()
             cr.close()
 
-    def update_running_cron(self, cr):
-        """ Schedule as soon as possible a wake-up for this database. """
-        # Verify whether the server is already started and thus whether we need to commit
-        # immediately our changes and restart the cron agent in order to apply the change
-        # immediately. The commit() is needed because as soon as the cron is (re)started it
-        # will query the database with its own cursor, possibly before the end of the
-        # current transaction.
-        # This commit() is not an issue in most cases, but we must absolutely avoid it
-        # when the server is only starting or loading modules (hence the test on pool._init).
-        if not self.pool._init:
-            cr.commit()
-            openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname)
+        return False
+
+    @classmethod
+    def _run(cls, db_names):
+        while True:
+            t1 = time.time()
+            for db_name in db_names:
+                while(cls._acquire_job(db_name)):
+                    pass
+            t2 = time.time()
+            t = t2 - t1
+            if t > 60:
+                _logger.warning('Cron worker: processing all jobs took more than 1 minute to complete (%ss.).', int(t))
+            else:
+                time.sleep(60 - t)
 
     def _try_lock(self, cr, uid, ids, context=None):
         """Try to grab a dummy exclusive write-lock to the rows with the given ids,
@@ -294,20 +258,16 @@ class ir_cron(osv.osv):
 
     def create(self, cr, uid, vals, context=None):
         res = super(ir_cron, self).create(cr, uid, vals, context=context)
-        self.update_running_cron(cr)
         return res
 
     def write(self, cr, uid, ids, vals, context=None):
         self._try_lock(cr, uid, ids, context)
         res = super(ir_cron, self).write(cr, uid, ids, vals, context=context)
-        self.update_running_cron(cr)
         return res
 
     def unlink(self, cr, uid, ids, context=None):
         self._try_lock(cr, uid, ids, context)
         res = super(ir_cron, self).unlink(cr, uid, ids, context=context)
-        self.update_running_cron(cr)
         return res
-ir_cron()
 
 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
index 8551ed7..5c6b860 100644 (file)
@@ -49,167 +49,9 @@ import tools
 
 _logger = logging.getLogger(__name__)
 
-# Heapq of database wake-ups. Note that 'database wake-up' meaning is in
-# the context of the cron management. This is not originally about loading
-# a database, although having the database name in the queue will
-# cause it to be loaded when the schedule time is reached, even if it was
-# unloaded in the mean time. Normally a database's wake-up is cancelled by
-# the RegistryManager when the database is unloaded - so this should not
-# cause it to be reloaded.
-#
 # TODO: perhaps in the future we could consider a flag on ir.cron jobs
 # that would cause database wake-up even if the database has not been
 # loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
-#
-# Each element is a triple (timestamp, database-name, boolean). The boolean
-# specifies if the wake-up is canceled (so a wake-up can be canceled without
-# relying on the heapq implementation detail; no need to remove the job from
-# the heapq).
-_wakeups = []
-
-# Mapping of database names to the wake-up defined in the heapq,
-# so that we can cancel the wake-up without messing with the heapq
-# invariant: lookup the wake-up by database-name, then set
-# its third element to True.
-_wakeup_by_db = {}
-
-# Re-entrant lock to protect the above _wakeups and _wakeup_by_db variables.
-# We could use a simple (non-reentrant) lock if the runner function below
-# was more fine-grained, but we are fine with the loop owning the lock
-# while spawning a few threads.
-_wakeups_lock = threading.RLock()
-
-# Maximum number of threads allowed to process cron jobs concurrently. This
-# variable is set by start_master_thread using openerp.conf.max_cron_threads.
-_thread_slots = None
-
-# A (non re-entrant) lock to protect the above _thread_slots variable.
-_thread_slots_lock = threading.Lock()
-
-# Sleep duration limits - must not loop too quickly, but can't sleep too long
-# either, because a new job might be inserted in ir_cron with a much sooner
-# execution date than current known ones. We won't see it until we wake!
-MAX_SLEEP = 60 # 1 min
-MIN_SLEEP = 1  # 1 sec
-
-# Dummy wake-up timestamp that can be used to force a database wake-up asap
-WAKE_UP_NOW = 1
-
-def get_thread_slots():
-    """ Return the number of available thread slots """
-    return _thread_slots
-
-
-def release_thread_slot():
-    """ Increment the number of available thread slots """
-    global _thread_slots
-    with _thread_slots_lock:
-        _thread_slots += 1
-
-
-def take_thread_slot():
-    """ Decrement the number of available thread slots """
-    global _thread_slots
-    with _thread_slots_lock:
-        _thread_slots -= 1
-
-
-def cancel(db_name):
-    """ Cancel the next wake-up of a given database, if any.
-
-    :param db_name: database name for which the wake-up is canceled.
-
-    """
-    _logger.debug("Cancel next wake-up for database '%s'.", db_name)
-    with _wakeups_lock:
-        if db_name in _wakeup_by_db:
-            _wakeup_by_db[db_name][2] = True
-
-
-def cancel_all():
-    """ Cancel all database wake-ups. """
-    _logger.debug("Cancel all database wake-ups")
-    global _wakeups
-    global _wakeup_by_db
-    with _wakeups_lock:
-        _wakeups = []
-        _wakeup_by_db = {}
-
-
-def schedule_wakeup(timestamp, db_name):
-    """ Schedule a new wake-up for a database.
-
-    If an earlier wake-up is already defined, the new wake-up is discarded.
-    If another wake-up is defined, that wake-up is discarded and the new one
-    is scheduled.
-
-    :param db_name: database name for which a new wake-up is scheduled.
-    :param timestamp: when the wake-up is scheduled.
-
-    """
-    if not timestamp:
-        return
-    with _wakeups_lock:
-        if db_name in _wakeup_by_db:
-            task = _wakeup_by_db[db_name]
-            if not task[2] and timestamp > task[0]:
-                # existing wakeup is valid and occurs earlier than new one
-                return
-            task[2] = True # cancel existing task
-        task = [timestamp, db_name, False]
-        heapq.heappush(_wakeups, task)
-        _wakeup_by_db[db_name] = task
-        _logger.debug("Wake-up scheduled for database '%s' @ %s", db_name,
-                      'NOW' if timestamp == WAKE_UP_NOW else timestamp)
-
-def runner():
-    """Neverending function (intended to be run in a dedicated thread) that
-       checks every 60 seconds the next database wake-up. TODO: make configurable
-    """
-    while True:
-        runner_body()
-
-def runner_body():
-    with _wakeups_lock:
-        while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots():
-            task = heapq.heappop(_wakeups)
-            timestamp, db_name, canceled = task
-            if canceled:
-                continue
-            del _wakeup_by_db[db_name]
-            registry = openerp.pooler.get_pool(db_name)
-            if not registry._init:
-                _logger.debug("Database '%s' wake-up! Firing multi-threaded cron job processing", db_name)
-                registry['ir.cron']._run_jobs_multithread()
-    amount = MAX_SLEEP
-    with _wakeups_lock:
-        # Sleep less than MAX_SLEEP if the next known wake-up will happen before that.
-        if _wakeups and get_thread_slots():
-            amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time()))
-    _logger.debug("Going to sleep for %ss", amount)
-    time.sleep(amount)
-
-def start_master_thread():
-    """ Start the above runner function in a daemon thread.
-
-    The thread is a typical daemon thread: it will never quit and must be
-    terminated when the main process exits - with no consequence (the processing
-    threads it spawns are not marked daemon).
 
-    """
-    global _thread_slots
-    _thread_slots = openerp.conf.max_cron_threads
-    db_maxconn = tools.config['db_maxconn']
-    if _thread_slots >= tools.config.get('db_maxconn', 64):
-        _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
-                        "this may cause trouble if you reach that number of parallel cron tasks.",
-                        db_maxconn, _thread_slots)
-    if _thread_slots:
-        t = threading.Thread(target=runner, name="openerp.cron.master_thread")
-        t.setDaemon(True)
-        t.start()
-        _logger.debug("Master cron daemon started!")
-    else:
-        _logger.info("No master cron daemon (0 workers needed).")
 
 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
index 9ec3e6f..5ed8ae3 100644 (file)
@@ -27,7 +27,6 @@ import threading
 
 import openerp.sql_db
 import openerp.osv.orm
-import openerp.cron
 import openerp.tools
 import openerp.modules.db
 import openerp.tools.config
@@ -110,15 +109,6 @@ class Registry(object):
 
         return res
 
-    def schedule_cron_jobs(self):
-        """ Make the cron thread care about this registry/database jobs.
-        This will initiate the cron thread to check for any pending jobs for
-        this registry/database as soon as possible. Then it will continuously
-        monitor the ir.cron model for future jobs. See openerp.cron for
-        details.
-        """
-        openerp.cron.schedule_wakeup(openerp.cron.WAKE_UP_NOW, self.db.dbname)
-
     def clear_caches(self):
         """ Clear the caches
         This clears the caches associated to methods decorated with
@@ -213,9 +203,6 @@ class RegistryManager(object):
             finally:
                 cr.close()
 
-        if pooljobs:
-            registry.schedule_cron_jobs()
-
         return registry
 
     @classmethod
@@ -233,8 +220,6 @@ class RegistryManager(object):
             if db_name in cls.registries:
                 cls.registries[db_name].clear_caches()
                 del cls.registries[db_name]
-                openerp.cron.cancel(db_name)
-
 
     @classmethod
     def delete_all(cls):
index c387d27..41e7200 100644 (file)
@@ -28,7 +28,6 @@ import netrpc_server
 import web_services
 import websrv_lib
 
-import openerp.cron
 import openerp.modules
 import openerp.netsvc
 import openerp.osv
@@ -51,7 +50,7 @@ _logger = logging.getLogger(__name__)
 def start_services():
     """ Start all services.
 
-    Services include the different servers and cron threads.
+    Services include the different servers.
 
     """
     # Instantiate local services (this is a legacy design).
@@ -64,9 +63,6 @@ def start_services():
     #http_server.init_static_http()
     netrpc_server.init_servers()
 
-    # Start the main cron thread.
-    openerp.cron.start_master_thread()
-
     # Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC).
     openerp.netsvc.Server.startAll()
 
@@ -76,9 +72,6 @@ def start_services():
 
 def stop_services():
     """ Stop all services. """
-    # stop scheduling new jobs; we will have to wait for the jobs to complete below
-    openerp.cron.cancel_all()
-
     openerp.netsvc.Server.quitAll()
     openerp.wsgi.core.stop_server()
     config = openerp.tools.config