X-Git-Url: http://git.inspyration.org/?a=blobdiff_plain;f=openerp%2Fcron.py;h=317d69522a235739f780113dca772344504e7afe;hb=2c2ce5805e55787757fdcbe65001eca076e7a2b1;hp=64edf04208ddfe3048903fedff80ce0826684447;hpb=e93d018a394ed09c1e8420052b24c5c6c15dd568;p=odoo%2Fodoo.git diff --git a/openerp/cron.py b/openerp/cron.py index 64edf04..317d695 100644 --- a/openerp/cron.py +++ b/openerp/cron.py @@ -30,11 +30,11 @@ threads to process individual cron jobs. The thread runs forever, checking every 60 seconds for new 'database wake-ups'. It maintains a heapq of database wake-ups. At each -wake-up, it will call ir_cron._run_jobs() for the given database. _run_jobs +wake-up, it will call ir_cron._run_jobs_multithread() for the given database. _run_jobs_multithread will check the jobs defined in the ir_cron table and spawn accordingly threads to process them. -This module behavior depends on the following configuration variable: +This module's behavior depends on the following configuration variable: openerp.conf.max_cron_threads. """ @@ -45,10 +45,29 @@ import threading import time import openerp +import tools + +_logger = logging.getLogger(__name__) + +# Scheduling wake-ups (see below) can be disabled when the polling process +# workers are used instead of the managed thread workers. (I.e. wake-ups are +# not used since polling is used. And polling is used when the cron are +# handled by running special processes, e.g. openerp-cron-worker, instead +# of the general openerp-server script.) +enable_schedule_wakeup = True # Heapq of database wake-ups. Note that 'database wake-up' meaning is in -# the context of the cron management. This is not about loading a database -# or otherwise making anything about it. +# the context of the cron management. This is not originally about loading +# a database, although having the database name in the queue will +# cause it to be loaded when the schedule time is reached, even if it was +# unloaded in the mean time. Normally a database's wake-up is cancelled by +# the RegistryManager when the database is unloaded - so this should not +# cause it to be reloaded. +# +# TODO: perhaps in the future we could consider a flag on ir.cron jobs +# that would cause database wake-up even if the database has not been +# loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something) +# # Each element is a triple (timestamp, database-name, boolean). The boolean # specifies if the wake-up is canceled (so a wake-up can be canceled without # relying on the heapq implementation detail; no need to remove the job from @@ -57,7 +76,7 @@ _wakeups = [] # Mapping of database names to the wake-up defined in the heapq, # so that we can cancel the wake-up without messing with the heapq -# internal structure: lookup the wake-up by database-name, then set +# invariant: lookup the wake-up by database-name, then set # its third element to True. _wakeup_by_db = {} @@ -69,31 +88,37 @@ _wakeups_lock = threading.RLock() # Maximum number of threads allowed to process cron jobs concurrently. This # variable is set by start_master_thread using openerp.conf.max_cron_threads. -_thread_count = None +_thread_slots = None -# A (non re-entrant) lock to protect the above _thread_count variable. -_thread_count_lock = threading.Lock() +# A (non re-entrant) lock to protect the above _thread_slots variable. +_thread_slots_lock = threading.Lock() -_logger = logging.getLogger('cron') +# Sleep duration limits - must not loop too quickly, but can't sleep too long +# either, because a new job might be inserted in ir_cron with a much sooner +# execution date than current known ones. We won't see it until we wake! +MAX_SLEEP = 60 # 1 min +MIN_SLEEP = 1 # 1 sec +# Dummy wake-up timestamp that can be used to force a database wake-up asap +WAKE_UP_NOW = 1 -def get_thread_count(): - """ Return the number of available threads. """ - return _thread_count +def get_thread_slots(): + """ Return the number of available thread slots """ + return _thread_slots -def inc_thread_count(): - """ Increment by the number of available threads. """ - global _thread_count - with _thread_count_lock: - _thread_count += 1 +def release_thread_slot(): + """ Increment the number of available thread slots """ + global _thread_slots + with _thread_slots_lock: + _thread_slots += 1 -def dec_thread_count(): - """ Decrement by the number of available threads. """ - global _thread_count - with _thread_count_lock: - _thread_count -= 1 +def take_thread_slot(): + """ Decrement the number of available thread slots """ + global _thread_slots + with _thread_slots_lock: + _thread_slots -= 1 def cancel(db_name): @@ -110,14 +135,14 @@ def cancel(db_name): def cancel_all(): """ Cancel all database wake-ups. """ + _logger.debug("Cancel all database wake-ups") global _wakeups global _wakeup_by_db with _wakeups_lock: _wakeups = [] _wakeup_by_db = {} - -def schedule_in_advance(timestamp, db_name): +def schedule_wakeup(timestamp, db_name): """ Schedule a new wake-up for a database. If an earlier wake-up is already defined, the new wake-up is discarded. @@ -128,26 +153,26 @@ def schedule_in_advance(timestamp, db_name): :param timestamp: when the wake-up is scheduled. """ + global enable_schedule_wakeup + if not enable_schedule_wakeup: + return if not timestamp: return with _wakeups_lock: - # Cancel the previous wake-up if any. - add_wakeup = False if db_name in _wakeup_by_db: task = _wakeup_by_db[db_name] - if task[2] or timestamp < task[0]: - add_wakeup = True - task[2] = True - else: - add_wakeup = True - if add_wakeup: - task = [timestamp, db_name, False] - heapq.heappush(_wakeups, task) - _wakeup_by_db[db_name] = task - + if not task[2] and timestamp > task[0]: + # existing wakeup is valid and occurs earlier than new one + return + task[2] = True # cancel existing task + task = [timestamp, db_name, False] + heapq.heappush(_wakeups, task) + _wakeup_by_db[db_name] = task + _logger.debug("Wake-up scheduled for database '%s' @ %s", db_name, + 'NOW' if timestamp == WAKE_UP_NOW else timestamp) def runner(): - """Neverending function (intended to be ran in a dedicated thread) that + """Neverending function (intended to be run in a dedicated thread) that checks every 60 seconds the next database wake-up. TODO: make configurable """ while True: @@ -155,23 +180,24 @@ def runner(): def runner_body(): with _wakeups_lock: - while _wakeups and _wakeups[0][0] < time.time() and get_thread_count(): + while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots(): task = heapq.heappop(_wakeups) timestamp, db_name, canceled = task if canceled: continue - task[2] = True + del _wakeup_by_db[db_name] registry = openerp.pooler.get_pool(db_name) if not registry._init: - registry['ir.cron']._run_jobs() - amount = 60 + _logger.debug("Database '%s' wake-up! Firing multi-threaded cron job processing", db_name) + registry['ir.cron']._run_jobs_multithread() + amount = MAX_SLEEP with _wakeups_lock: - # Sleep less than 60s if the next known wake-up will happen before. - if _wakeups and get_thread_count(): - amount = min(60, _wakeups[0][0] - time.time()) + # Sleep less than MAX_SLEEP if the next known wake-up will happen before that. + if _wakeups and get_thread_slots(): + amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time())) + _logger.debug("Going to sleep for %ss", amount) time.sleep(amount) - def start_master_thread(): """ Start the above runner function in a daemon thread. @@ -180,10 +206,19 @@ def start_master_thread(): threads it spawns are not marked daemon). """ - global _thread_count - _thread_count = openerp.conf.max_cron_threads - t = threading.Thread(target=runner, name="openerp.cron.master_thread") - t.setDaemon(True) - t.start() + global _thread_slots + _thread_slots = openerp.conf.max_cron_threads + db_maxconn = tools.config['db_maxconn'] + if _thread_slots >= tools.config.get('db_maxconn', 64): + _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), " + "this may cause trouble if you reach that number of parallel cron tasks.", + db_maxconn, _thread_slots) + if _thread_slots: + t = threading.Thread(target=runner, name="openerp.cron.master_thread") + t.setDaemon(True) + t.start() + _logger.debug("Master cron daemon started!") + else: + _logger.info("No master cron daemon (0 workers needed).") # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: