X-Git-Url: http://git.inspyration.org/?a=blobdiff_plain;f=openerp%2Fcron.py;h=7b67877f0fe85c1c3e149071386970f7361e6c62;hb=f05c4faf5b40910d25ceda3506d6e3bc090fac58;hp=f6e8a5d49a2a133d51613e534cf2da46b9a3b90a;hpb=ed1b2a92cad8058a30fd8ba65ef9137b9cd23554;p=odoo%2Fodoo.git diff --git a/openerp/cron.py b/openerp/cron.py index f6e8a5d..7b67877 100644 --- a/openerp/cron.py +++ b/openerp/cron.py @@ -30,10 +30,13 @@ threads to process individual cron jobs. The thread runs forever, checking every 60 seconds for new 'database wake-ups'. It maintains a heapq of database wake-ups. At each -wake-up, it will call ir_cron._run_jobs() for the given database. _run_jobs +wake-up, it will call ir_cron._run_jobs_multithread() for the given database. _run_jobs_multithread will check the jobs defined in the ir_cron table and spawn accordingly threads to process them. +This module's behavior depends on the following configuration variable: +openerp.conf.max_cron_threads. + """ import heapq @@ -42,10 +45,22 @@ import threading import time import openerp +import tools + +_logger = logging.getLogger(__name__) # Heapq of database wake-ups. Note that 'database wake-up' meaning is in -# the context of the cron management. This is not about loading a database -# or otherwise making anything about it. +# the context of the cron management. This is not originally about loading +# a database, although having the database name in the queue will +# cause it to be loaded when the schedule time is reached, even if it was +# unloaded in the mean time. Normally a database's wake-up is cancelled by +# the RegistryManager when the database is unloaded - so this should not +# cause it to be reloaded. +# +# TODO: perhaps in the future we could consider a flag on ir.cron jobs +# that would cause database wake-up even if the database has not been +# loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something) +# # Each element is a triple (timestamp, database-name, boolean). The boolean # specifies if the wake-up is canceled (so a wake-up can be canceled without # relying on the heapq implementation detail; no need to remove the job from @@ -54,7 +69,7 @@ _wakeups = [] # Mapping of database names to the wake-up defined in the heapq, # so that we can cancel the wake-up without messing with the heapq -# internal structure: lookup the wake-up by database-name, then set +# invariant: lookup the wake-up by database-name, then set # its third element to True. _wakeup_by_db = {} @@ -64,32 +79,39 @@ _wakeup_by_db = {} # while spawning a few threads. _wakeups_lock = threading.RLock() -# Maximum number of threads allowed to process cron jobs concurrently. -_thread_count = 2 # TODO make it configurable +# Maximum number of threads allowed to process cron jobs concurrently. This +# variable is set by start_master_thread using openerp.conf.max_cron_threads. +_thread_slots = None -# A (non re-entrant) lock to protect the above _thread_count variable. -_thread_count_lock = threading.Lock() +# A (non re-entrant) lock to protect the above _thread_slots variable. +_thread_slots_lock = threading.Lock() -_logger = logging.getLogger('cron') +# Sleep duration limits - must not loop too quickly, but can't sleep too long +# either, because a new job might be inserted in ir_cron with a much sooner +# execution date than current known ones. We won't see it until we wake! +MAX_SLEEP = 60 # 1 min +MIN_SLEEP = 1 # 1 sec +# Dummy wake-up timestamp that can be used to force a database wake-up asap +WAKE_UP_NOW = 1 -def get_thread_count(): - """ Return the number of available threads. """ - return _thread_count +def get_thread_slots(): + """ Return the number of available thread slots """ + return _thread_slots -def inc_thread_count(): - """ Increment by the number of available threads. """ - global _thread_count - with _thread_count_lock: - _thread_count += 1 +def release_thread_slot(): + """ Increment the number of available thread slots """ + global _thread_slots + with _thread_slots_lock: + _thread_slots += 1 -def dec_thread_count(): - """ Decrement by the number of available threads. """ - global _thread_count - with _thread_count_lock: - _thread_count -= 1 +def take_thread_slot(): + """ Decrement the number of available thread slots """ + global _thread_slots + with _thread_slots_lock: + _thread_slots -= 1 def cancel(db_name): @@ -106,6 +128,7 @@ def cancel(db_name): def cancel_all(): """ Cancel all database wake-ups. """ + _logger.debug("Cancel all database wake-ups") global _wakeups global _wakeup_by_db with _wakeups_lock: @@ -113,7 +136,7 @@ def cancel_all(): _wakeup_by_db = {} -def schedule_in_advance(timestamp, db_name): +def schedule_wakeup(timestamp, db_name): """ Schedule a new wake-up for a database. If an earlier wake-up is already defined, the new wake-up is discarded. @@ -127,43 +150,44 @@ def schedule_in_advance(timestamp, db_name): if not timestamp: return with _wakeups_lock: - # Cancel the previous wake-up if any. - add_wakeup = False if db_name in _wakeup_by_db: task = _wakeup_by_db[db_name] - if task[2] or timestamp < task[0]: - add_wakeup = True - task[2] = True - else: - add_wakeup = True - if add_wakeup: - task = [timestamp, db_name, False] - heapq.heappush(_wakeups, task) - _wakeup_by_db[db_name] = task - + if not task[2] and timestamp > task[0]: + # existing wakeup is valid and occurs earlier than new one + return + task[2] = True # cancel existing task + task = [timestamp, db_name, False] + heapq.heappush(_wakeups, task) + _wakeup_by_db[db_name] = task + _logger.debug("Wake-up scheduled for database '%s' @ %s", db_name, + 'NOW' if timestamp == WAKE_UP_NOW else timestamp) def runner(): - """Neverending function (intended to be ran in a dedicated thread) that + """Neverending function (intended to be run in a dedicated thread) that checks every 60 seconds the next database wake-up. TODO: make configurable """ while True: - with _wakeups_lock: - while _wakeups and _wakeups[0][0] < time.time() and get_thread_count(): - task = heapq.heappop(_wakeups) - timestamp, db_name, canceled = task - if canceled: - continue - task[2] = True - registry = openerp.pooler.get_pool(db_name) - if not registry._init: - registry['ir.cron']._run_jobs() - amount = 60 - with _wakeups_lock: - # Sleep less than 60s if the next known wake-up will happen before. - if _wakeups and get_thread_count(): - amount = min(60, _wakeups[0][0] - time.time()) - time.sleep(amount) + runner_body() +def runner_body(): + with _wakeups_lock: + while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots(): + task = heapq.heappop(_wakeups) + timestamp, db_name, canceled = task + if canceled: + continue + del _wakeup_by_db[db_name] + registry = openerp.pooler.get_pool(db_name) + if not registry._init: + _logger.debug("Database '%s' wake-up! Firing multi-threaded cron job processing", db_name) + registry['ir.cron']._run_jobs_multithread() + amount = MAX_SLEEP + with _wakeups_lock: + # Sleep less than MAX_SLEEP if the next known wake-up will happen before that. + if _wakeups and get_thread_slots(): + amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time())) + _logger.debug("Going to sleep for %ss", amount) + time.sleep(amount) def start_master_thread(): """ Start the above runner function in a daemon thread. @@ -173,8 +197,16 @@ def start_master_thread(): threads it spawns are not marked daemon). """ + global _thread_slots + _thread_slots = openerp.conf.max_cron_threads + db_maxconn = tools.config['db_maxconn'] + if _thread_slots >= tools.config.get('db_maxconn', 64): + _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), " + "this may cause trouble if you reach that number of parallel cron tasks.", + db_maxconn, _thread_slots) t = threading.Thread(target=runner, name="openerp.cron.master_thread") t.setDaemon(True) t.start() + _logger.debug("Master cron daemon started!") # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: