The thread runs forever, checking every 60 seconds for new
'database wake-ups'. It maintains a heapq of database wake-ups. At each
-wake-up, it will call ir_cron._run_jobs() for the given database. _run_jobs
+wake-up, it will call ir_cron._run_jobs_multithread() for the given database. _run_jobs_multithread
will check the jobs defined in the ir_cron table and spawn accordingly threads
to process them.
-This module behavior depends on the following configuration variable:
+This module's behavior depends on the following configuration variable:
openerp.conf.max_cron_threads.
"""
import time
import openerp
+import tools
+
+_logger = logging.getLogger(__name__)
# Heapq of database wake-ups. Note that 'database wake-up' meaning is in
-# the context of the cron management. This is not about loading a database
-# or otherwise making anything about it.
+# the context of the cron management. This is not originally about loading
+# a database, although having the database name in the queue will
+# cause it to be loaded when the schedule time is reached, even if it was
+# unloaded in the mean time. Normally a database's wake-up is cancelled by
+# the RegistryManager when the database is unloaded - so this should not
+# cause it to be reloaded.
+#
+# TODO: perhaps in the future we could consider a flag on ir.cron jobs
+# that would cause database wake-up even if the database has not been
+# loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
+#
# Each element is a triple (timestamp, database-name, boolean). The boolean
# specifies if the wake-up is canceled (so a wake-up can be canceled without
# relying on the heapq implementation detail; no need to remove the job from
# Mapping of database names to the wake-up defined in the heapq,
# so that we can cancel the wake-up without messing with the heapq
-# internal structure: lookup the wake-up by database-name, then set
+# invariant: lookup the wake-up by database-name, then set
# its third element to True.
_wakeup_by_db = {}
# Maximum number of threads allowed to process cron jobs concurrently. This
# variable is set by start_master_thread using openerp.conf.max_cron_threads.
-_thread_count = None
+_thread_slots = None
-# A (non re-entrant) lock to protect the above _thread_count variable.
-_thread_count_lock = threading.Lock()
+# A (non re-entrant) lock to protect the above _thread_slots variable.
+_thread_slots_lock = threading.Lock()
-_logger = logging.getLogger('cron')
+# Sleep duration limits - must not loop too quickly, but can't sleep too long
+# either, because a new job might be inserted in ir_cron with a much sooner
+# execution date than current known ones. We won't see it until we wake!
+MAX_SLEEP = 60 # 1 min
+MIN_SLEEP = 1 # 1 sec
+# Dummy wake-up timestamp that can be used to force a database wake-up asap
+WAKE_UP_NOW = 1
-def get_thread_count():
- """ Return the number of available threads. """
- return _thread_count
+def get_thread_slots():
+ """ Return the number of available thread slots """
+ return _thread_slots
-def inc_thread_count():
- """ Increment by the number of available threads. """
- global _thread_count
- with _thread_count_lock:
- _thread_count += 1
+def release_thread_slot():
+ """ Increment the number of available thread slots """
+ global _thread_slots
+ with _thread_slots_lock:
+ _thread_slots += 1
-def dec_thread_count():
- """ Decrement by the number of available threads. """
- global _thread_count
- with _thread_count_lock:
- _thread_count -= 1
+def take_thread_slot():
+ """ Decrement the number of available thread slots """
+ global _thread_slots
+ with _thread_slots_lock:
+ _thread_slots -= 1
def cancel(db_name):
def cancel_all():
""" Cancel all database wake-ups. """
+ _logger.debug("Cancel all database wake-ups")
global _wakeups
global _wakeup_by_db
with _wakeups_lock:
_wakeup_by_db = {}
-def schedule_in_advance(timestamp, db_name):
+def schedule_wakeup(timestamp, db_name):
""" Schedule a new wake-up for a database.
If an earlier wake-up is already defined, the new wake-up is discarded.
if not timestamp:
return
with _wakeups_lock:
- # Cancel the previous wake-up if any.
- add_wakeup = False
if db_name in _wakeup_by_db:
task = _wakeup_by_db[db_name]
- if task[2] or timestamp < task[0]:
- add_wakeup = True
- task[2] = True
- else:
- add_wakeup = True
- if add_wakeup:
- task = [timestamp, db_name, False]
- heapq.heappush(_wakeups, task)
- _wakeup_by_db[db_name] = task
-
+ if not task[2] and timestamp > task[0]:
+ # existing wakeup is valid and occurs earlier than new one
+ return
+ task[2] = True # cancel existing task
+ task = [timestamp, db_name, False]
+ heapq.heappush(_wakeups, task)
+ _wakeup_by_db[db_name] = task
+ _logger.debug("Wake-up scheduled for database '%s' @ %s", db_name,
+ 'NOW' if timestamp == WAKE_UP_NOW else timestamp)
def runner():
- """Neverending function (intended to be ran in a dedicated thread) that
+ """Neverending function (intended to be run in a dedicated thread) that
checks every 60 seconds the next database wake-up. TODO: make configurable
"""
while True:
- with _wakeups_lock:
- while _wakeups and _wakeups[0][0] < time.time() and get_thread_count():
- task = heapq.heappop(_wakeups)
- timestamp, db_name, canceled = task
- if canceled:
- continue
- task[2] = True
- registry = openerp.pooler.get_pool(db_name)
- if not registry._init:
- registry['ir.cron']._run_jobs()
- amount = 60
- with _wakeups_lock:
- # Sleep less than 60s if the next known wake-up will happen before.
- if _wakeups and get_thread_count():
- amount = min(60, _wakeups[0][0] - time.time())
- time.sleep(amount)
+ runner_body()
+def runner_body():
+ with _wakeups_lock:
+ while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots():
+ task = heapq.heappop(_wakeups)
+ timestamp, db_name, canceled = task
+ if canceled:
+ continue
+ del _wakeup_by_db[db_name]
+ registry = openerp.pooler.get_pool(db_name)
+ if not registry._init:
+ _logger.debug("Database '%s' wake-up! Firing multi-threaded cron job processing", db_name)
+ registry['ir.cron']._run_jobs_multithread()
+ amount = MAX_SLEEP
+ with _wakeups_lock:
+ # Sleep less than MAX_SLEEP if the next known wake-up will happen before that.
+ if _wakeups and get_thread_slots():
+ amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time()))
+ _logger.debug("Going to sleep for %ss", amount)
+ time.sleep(amount)
def start_master_thread():
""" Start the above runner function in a daemon thread.
threads it spawns are not marked daemon).
"""
- global _thread_count
- _thread_count = openerp.conf.max_cron_threads
+ global _thread_slots
+ _thread_slots = openerp.conf.max_cron_threads
+ db_maxconn = tools.config['db_maxconn']
+ if _thread_slots >= tools.config.get('db_maxconn', 64):
+ _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
+ "this may cause trouble if you reach that number of parallel cron tasks.",
+ db_maxconn, _thread_slots)
t = threading.Thread(target=runner, name="openerp.cron.master_thread")
t.setDaemon(True)
t.start()
+ _logger.debug("Master cron daemon started!")
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: