cron cleanup, back to the Kernighan KISS roots 1min poll time, rely only on database...
authorAntony Lesuisse <al@openerp.com>
Sun, 9 Dec 2012 17:04:47 +0000 (18:04 +0100)
committerAntony Lesuisse <al@openerp.com>
Sun, 9 Dec 2012 17:04:47 +0000 (18:04 +0100)
Nota: If we replace sequence signaling for cache invalidation with pg
listen/notify in the future, we will use the same mechanism for more accurate
cron timing.

bzr revid: al@openerp.com-20121209170447-zs0k3jazokylwvar

openerp/addons/base/ir/ir_cron.py
openerp/conf/__init__.py
openerp/cron.py [deleted file]
openerp/modules/registry.py
openerp/service/__init__.py
openerp/service/cron.py [new file with mode: 0644]
openerp/tools/config.py

index 243f4c7..e2d7912 100644 (file)
@@ -31,7 +31,6 @@ import netsvc
 import openerp
 import pooler
 import tools
-from openerp.cron import WAKE_UP_NOW
 from osv import fields, osv
 from tools import DEFAULT_SERVER_DATETIME_FORMAT
 from tools.safe_eval import safe_eval as eval
@@ -142,130 +141,6 @@ class ir_cron(osv.osv):
             except Exception, e:
                 self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e)
 
-    def _run_job(self, cr, job, now):
-        """ Run a given job taking care of the repetition.
-
-        The cursor has a lock on the job (aquired by _run_jobs_multithread()) and this
-        method is run in a worker thread (spawned by _run_jobs_multithread())).
-
-        :param job: job to be run (as a dictionary).
-        :param now: timestamp (result of datetime.now(), no need to call it multiple time).
-
-        """
-        try:
-            nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
-            numbercall = job['numbercall']
-
-            ok = False
-            while nextcall < now and numbercall:
-                if numbercall > 0:
-                    numbercall -= 1
-                if not ok or job['doall']:
-                    self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
-                if numbercall:
-                    nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
-                ok = True
-            addsql = ''
-            if not numbercall:
-                addsql = ', active=False'
-            cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
-                       (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
-
-            if numbercall:
-                # Reschedule our own main cron thread if necessary.
-                # This is really needed if this job runs longer than its rescheduling period.
-                nextcall = calendar.timegm(nextcall.timetuple())
-                openerp.cron.schedule_wakeup(nextcall, cr.dbname)
-        finally:
-            cr.commit()
-            cr.close()
-            openerp.cron.release_thread_slot()
-
-    def _run_jobs_multithread(self):
-        # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
-        """ Process the cron jobs by spawning worker threads.
-
-        This selects in database all the jobs that should be processed. It then
-        tries to lock each of them and, if it succeeds, spawns a thread to run
-        the cron job (if it doesn't succeed, it means the job was already
-        locked to be taken care of by another thread).
-
-        The cursor used to lock the job in database is given to the worker
-        thread (which has to close it itself).
-
-        """
-        db = self.pool.db
-        cr = db.cursor()
-        db_name = db.dbname
-        try:
-            jobs = {} # mapping job ids to jobs for all jobs being processed.
-            now = datetime.now() 
-            # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
-            cr.execute("""SELECT * FROM ir_cron
-                          WHERE numbercall != 0
-                              AND active AND nextcall <= (now() at time zone 'UTC')
-                          ORDER BY priority""")
-            for job in cr.dictfetchall():
-                if not openerp.cron.get_thread_slots():
-                    break
-                jobs[job['id']] = job
-
-                task_cr = db.cursor()
-                try:
-                    # Try to grab an exclusive lock on the job row from within the task transaction
-                    acquired_lock = False
-                    task_cr.execute("""SELECT *
-                                       FROM ir_cron
-                                       WHERE id=%s
-                                       FOR UPDATE NOWAIT""",
-                                   (job['id'],), log_exceptions=False)
-                    acquired_lock = True
-                except psycopg2.OperationalError, e:
-                    if e.pgcode == '55P03':
-                        # Class 55: Object not in prerequisite state; 55P03: lock_not_available
-                        _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
-                        continue
-                    else:
-                        # Unexpected OperationalError
-                        raise
-                finally:
-                    if not acquired_lock:
-                        # we're exiting due to an exception while acquiring the lot
-                        task_cr.close()
-
-                # Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock
-                task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now))
-                # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
-                task_thread.setDaemon(False)
-                openerp.cron.take_thread_slot()
-                task_thread.start()
-                _logger.debug('Cron execution thread for job `%s` spawned', job['name'])
-
-            # Find next earliest job ignoring currently processed jobs (by this and other cron threads)
-            find_next_time_query = """SELECT min(nextcall) AS min_next_call
-                                      FROM ir_cron WHERE numbercall != 0 AND active""" 
-            if jobs:
-                cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),))
-            else:
-                cr.execute(find_next_time_query)
-            next_call = cr.dictfetchone()['min_next_call']
-
-            if next_call:
-                next_call = calendar.timegm(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT))
-            else:
-                # no matching cron job found in database, re-schedule arbitrarily in 1 day,
-                # this delay will likely be modified when running jobs complete their tasks
-                next_call = time.time() + (24*3600)
-
-            openerp.cron.schedule_wakeup(next_call, db_name)
-
-        except Exception, ex:
-            _logger.warning('Exception in cron:', exc_info=True)
-
-        finally:
-            cr.commit()
-            cr.close()
-
     def _process_job(self, cr, job):
         """ Run a given job taking care of the repetition.
 
@@ -365,19 +240,6 @@ class ir_cron(osv.osv):
 
         return False
 
-    def update_running_cron(self, cr):
-        """ Schedule as soon as possible a wake-up for this database. """
-        # Verify whether the server is already started and thus whether we need to commit
-        # immediately our changes and restart the cron agent in order to apply the change
-        # immediately. The commit() is needed because as soon as the cron is (re)started it
-        # will query the database with its own cursor, possibly before the end of the
-        # current transaction.
-        # This commit() is not an issue in most cases, but we must absolutely avoid it
-        # when the server is only starting or loading modules (hence the test on pool._init).
-        if not self.pool._init:
-            cr.commit()
-            openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname)
-
     def _try_lock(self, cr, uid, ids, context=None):
         """Try to grab a dummy exclusive write-lock to the rows with the given ids,
            to make sure a following write() or unlink() will not block due
@@ -393,20 +255,16 @@ class ir_cron(osv.osv):
 
     def create(self, cr, uid, vals, context=None):
         res = super(ir_cron, self).create(cr, uid, vals, context=context)
-        self.update_running_cron(cr)
         return res
 
     def write(self, cr, uid, ids, vals, context=None):
         self._try_lock(cr, uid, ids, context)
         res = super(ir_cron, self).write(cr, uid, ids, vals, context=context)
-        self.update_running_cron(cr)
         return res
 
     def unlink(self, cr, uid, ids, context=None):
         self._try_lock(cr, uid, ids, context)
         res = super(ir_cron, self).unlink(cr, uid, ids, context=context)
-        self.update_running_cron(cr)
         return res
-ir_cron()
 
 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
index c89ddf4..0a975c5 100644 (file)
@@ -35,10 +35,6 @@ must be used.
 
 import deprecation
 
-# Maximum number of threads processing concurrently cron jobs.
-max_cron_threads = 4 # Actually the default value here is meaningless,
-                     # look at tools.config for the default value.
-
 # Paths to search for OpenERP addons.
 addons_paths = []
 
diff --git a/openerp/cron.py b/openerp/cron.py
deleted file mode 100644 (file)
index 8551ed7..0000000
+++ /dev/null
@@ -1,215 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-##############################################################################
-#
-#    OpenERP, Open Source Management Solution
-#    Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
-#
-#    This program is free software: you can redistribute it and/or modify
-#    it under the terms of the GNU Affero General Public License as
-#    published by the Free Software Foundation, either version 3 of the
-#    License, or (at your option) any later version.
-#
-#    This program is distributed in the hope that it will be useful,
-#    but WITHOUT ANY WARRANTY; without even the implied warranty of
-#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-#    GNU Affero General Public License for more details.
-#
-#    You should have received a copy of the GNU Affero General Public License
-#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-#
-##############################################################################
-
-""" Cron jobs scheduling
-
-Cron jobs are defined in the ir_cron table/model. This module deals with all
-cron jobs, for all databases of a single OpenERP server instance.
-
-It defines a single master thread that will spawn (a bounded number of)
-threads to process individual cron jobs.
-
-The thread runs forever, checking every 60 seconds for new
-'database wake-ups'. It maintains a heapq of database wake-ups. At each
-wake-up, it will call ir_cron._run_jobs_multithread() for the given database. _run_jobs_multithread
-will check the jobs defined in the ir_cron table and spawn accordingly threads
-to process them.
-
-This module's behavior depends on the following configuration variable:
-openerp.conf.max_cron_threads.
-
-"""
-
-import heapq
-import logging
-import threading
-import time
-
-import openerp
-import tools
-
-_logger = logging.getLogger(__name__)
-
-# Heapq of database wake-ups. Note that 'database wake-up' meaning is in
-# the context of the cron management. This is not originally about loading
-# a database, although having the database name in the queue will
-# cause it to be loaded when the schedule time is reached, even if it was
-# unloaded in the mean time. Normally a database's wake-up is cancelled by
-# the RegistryManager when the database is unloaded - so this should not
-# cause it to be reloaded.
-#
-# TODO: perhaps in the future we could consider a flag on ir.cron jobs
-# that would cause database wake-up even if the database has not been
-# loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
-#
-# Each element is a triple (timestamp, database-name, boolean). The boolean
-# specifies if the wake-up is canceled (so a wake-up can be canceled without
-# relying on the heapq implementation detail; no need to remove the job from
-# the heapq).
-_wakeups = []
-
-# Mapping of database names to the wake-up defined in the heapq,
-# so that we can cancel the wake-up without messing with the heapq
-# invariant: lookup the wake-up by database-name, then set
-# its third element to True.
-_wakeup_by_db = {}
-
-# Re-entrant lock to protect the above _wakeups and _wakeup_by_db variables.
-# We could use a simple (non-reentrant) lock if the runner function below
-# was more fine-grained, but we are fine with the loop owning the lock
-# while spawning a few threads.
-_wakeups_lock = threading.RLock()
-
-# Maximum number of threads allowed to process cron jobs concurrently. This
-# variable is set by start_master_thread using openerp.conf.max_cron_threads.
-_thread_slots = None
-
-# A (non re-entrant) lock to protect the above _thread_slots variable.
-_thread_slots_lock = threading.Lock()
-
-# Sleep duration limits - must not loop too quickly, but can't sleep too long
-# either, because a new job might be inserted in ir_cron with a much sooner
-# execution date than current known ones. We won't see it until we wake!
-MAX_SLEEP = 60 # 1 min
-MIN_SLEEP = 1  # 1 sec
-
-# Dummy wake-up timestamp that can be used to force a database wake-up asap
-WAKE_UP_NOW = 1
-
-def get_thread_slots():
-    """ Return the number of available thread slots """
-    return _thread_slots
-
-
-def release_thread_slot():
-    """ Increment the number of available thread slots """
-    global _thread_slots
-    with _thread_slots_lock:
-        _thread_slots += 1
-
-
-def take_thread_slot():
-    """ Decrement the number of available thread slots """
-    global _thread_slots
-    with _thread_slots_lock:
-        _thread_slots -= 1
-
-
-def cancel(db_name):
-    """ Cancel the next wake-up of a given database, if any.
-
-    :param db_name: database name for which the wake-up is canceled.
-
-    """
-    _logger.debug("Cancel next wake-up for database '%s'.", db_name)
-    with _wakeups_lock:
-        if db_name in _wakeup_by_db:
-            _wakeup_by_db[db_name][2] = True
-
-
-def cancel_all():
-    """ Cancel all database wake-ups. """
-    _logger.debug("Cancel all database wake-ups")
-    global _wakeups
-    global _wakeup_by_db
-    with _wakeups_lock:
-        _wakeups = []
-        _wakeup_by_db = {}
-
-
-def schedule_wakeup(timestamp, db_name):
-    """ Schedule a new wake-up for a database.
-
-    If an earlier wake-up is already defined, the new wake-up is discarded.
-    If another wake-up is defined, that wake-up is discarded and the new one
-    is scheduled.
-
-    :param db_name: database name for which a new wake-up is scheduled.
-    :param timestamp: when the wake-up is scheduled.
-
-    """
-    if not timestamp:
-        return
-    with _wakeups_lock:
-        if db_name in _wakeup_by_db:
-            task = _wakeup_by_db[db_name]
-            if not task[2] and timestamp > task[0]:
-                # existing wakeup is valid and occurs earlier than new one
-                return
-            task[2] = True # cancel existing task
-        task = [timestamp, db_name, False]
-        heapq.heappush(_wakeups, task)
-        _wakeup_by_db[db_name] = task
-        _logger.debug("Wake-up scheduled for database '%s' @ %s", db_name,
-                      'NOW' if timestamp == WAKE_UP_NOW else timestamp)
-
-def runner():
-    """Neverending function (intended to be run in a dedicated thread) that
-       checks every 60 seconds the next database wake-up. TODO: make configurable
-    """
-    while True:
-        runner_body()
-
-def runner_body():
-    with _wakeups_lock:
-        while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots():
-            task = heapq.heappop(_wakeups)
-            timestamp, db_name, canceled = task
-            if canceled:
-                continue
-            del _wakeup_by_db[db_name]
-            registry = openerp.pooler.get_pool(db_name)
-            if not registry._init:
-                _logger.debug("Database '%s' wake-up! Firing multi-threaded cron job processing", db_name)
-                registry['ir.cron']._run_jobs_multithread()
-    amount = MAX_SLEEP
-    with _wakeups_lock:
-        # Sleep less than MAX_SLEEP if the next known wake-up will happen before that.
-        if _wakeups and get_thread_slots():
-            amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time()))
-    _logger.debug("Going to sleep for %ss", amount)
-    time.sleep(amount)
-
-def start_master_thread():
-    """ Start the above runner function in a daemon thread.
-
-    The thread is a typical daemon thread: it will never quit and must be
-    terminated when the main process exits - with no consequence (the processing
-    threads it spawns are not marked daemon).
-
-    """
-    global _thread_slots
-    _thread_slots = openerp.conf.max_cron_threads
-    db_maxconn = tools.config['db_maxconn']
-    if _thread_slots >= tools.config.get('db_maxconn', 64):
-        _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
-                        "this may cause trouble if you reach that number of parallel cron tasks.",
-                        db_maxconn, _thread_slots)
-    if _thread_slots:
-        t = threading.Thread(target=runner, name="openerp.cron.master_thread")
-        t.setDaemon(True)
-        t.start()
-        _logger.debug("Master cron daemon started!")
-    else:
-        _logger.info("No master cron daemon (0 workers needed).")
-
-# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
index cd59a0e..397ab98 100644 (file)
@@ -28,7 +28,6 @@ import threading
 
 import openerp.sql_db
 import openerp.osv.orm
-import openerp.cron
 import openerp.tools
 import openerp.modules.db
 import openerp.tools.config
@@ -58,6 +57,9 @@ class Registry(object):
         self.db_name = db_name
         self.db = openerp.sql_db.db_connect(db_name)
 
+        # In monoprocess cron jobs flag (pooljobs)
+        self.cron = False
+
         # Inter-process signaling (used only when openerp.multi_process is True):
         # The `base_registry_signaling` sequence indicates the whole registry
         # must be reloaded.
@@ -124,7 +126,7 @@ class Registry(object):
         monitor the ir.cron model for future jobs. See openerp.cron for
         details.
         """
-        openerp.cron.schedule_wakeup(openerp.cron.WAKE_UP_NOW, self.db.dbname)
+        self.cron = True
 
     def clear_caches(self):
         """ Clear the caches
index 26740ec..1c8f27b 100644 (file)
@@ -24,10 +24,11 @@ import logging
 import threading
 import time
 
+import cron
 import netrpc_server
 import web_services
+import web_services
 
-import openerp.cron
 import openerp.modules
 import openerp.netsvc
 import openerp.osv
@@ -83,8 +84,7 @@ def start_services():
     netrpc_server.init_servers()
 
     # Start the main cron thread.
-    if openerp.conf.max_cron_threads:
-        openerp.cron.start_master_thread()
+    cron.start_master_thread()
 
     # Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC).
     openerp.netsvc.Server.startAll()
@@ -95,8 +95,6 @@ def start_services():
 def stop_services():
     """ Stop all services. """
     # stop scheduling new jobs; we will have to wait for the jobs to complete below
-    openerp.cron.cancel_all()
-
     openerp.netsvc.Server.quitAll()
     openerp.service.wsgi_server.stop_server()
     _logger.info("Initiating shutdown")
diff --git a/openerp/service/cron.py b/openerp/service/cron.py
new file mode 100644 (file)
index 0000000..08ce679
--- /dev/null
@@ -0,0 +1,71 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+##############################################################################
+#
+#    OpenERP, Open Source Management Solution
+#    Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU Affero General Public License as
+#    published by the Free Software Foundation, either version 3 of the
+#    License, or (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU Affero General Public License for more details.
+#
+#    You should have received a copy of the GNU Affero General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+##############################################################################
+
+""" Cron jobs scheduling
+
+Cron jobs are defined in the ir_cron table/model. This module deals with all
+cron jobs, for all databases of a single OpenERP server instance.
+
+"""
+
+import logging
+import threading
+import time
+
+import openerp
+
+_logger = logging.getLogger(__name__)
+
+SLEEP_INTERVAL = 60 # 1 min
+
+def cron_runner(number):
+    while True:
+        time.sleep(SLEEP_INTERVAL + number) # Steve Reich timing style
+        registries = openerp.modules.registry.RegistryManager.registries
+        _logger.debug('cron%d polling for jobs', number)
+        for db_name, registry in registries.items():
+            while True and registry.cron:
+                # acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
+                # TODO why isnt openerp.addons.base defined ?
+                import sys
+                base = sys.modules['addons.base']
+                acquired = base.ir.ir_cron.ir_cron._acquire_job(db_name)
+                if not acquired:
+                    break
+
+def start_master_thread():
+    """ Start the above runner function in a daemon thread.
+
+    The thread is a typical daemon thread: it will never quit and must be
+    terminated when the main process exits - with no consequence (the processing
+    threads it spawns are not marked daemon).
+
+    """
+    for i in range(openerp.tools.config['max_cron_threads']):
+        def target():
+            cron_runner(i)
+        t = threading.Thread(target=target, name="openerp.service.cron.cron_runner%d" % i)
+        t.setDaemon(True)
+        t.start()
+        _logger.debug("cron%d started!" % i)
+
+# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
index 0b5a95d..5b481c3 100644 (file)
@@ -269,7 +269,7 @@ class configmanager(object):
                               "osv_memory tables. This is a decimal value expressed in hours, "
                               "and the default is 1 hour.",
                          type="float")
-        group.add_option("--max-cron-threads", dest="max_cron_threads", my_default=4,
+        group.add_option("--max-cron-threads", dest="max_cron_threads", my_default=2,
                          help="Maximum number of threads processing concurrently cron jobs.",
                          type="int")
         group.add_option("--unaccent", dest="unaccent", my_default=False, action="store_true",
@@ -479,8 +479,6 @@ class configmanager(object):
         if opt.save:
             self.save()
 
-        openerp.conf.max_cron_threads = self.options['max_cron_threads']
-
         openerp.conf.addons_paths = self.options['addons_path'].split(',')
         if opt.server_wide_modules:
             openerp.conf.server_wide_modules = map(lambda m: m.strip(), opt.server_wide_modules.split(','))