_logger = logging.getLogger(__name__)
+# Heapq of database wake-ups. Note that 'database wake-up' meaning is in
+# the context of the cron management. This is not originally about loading
+# a database, although having the database name in the queue will
+# cause it to be loaded when the schedule time is reached, even if it was
+# unloaded in the mean time. Normally a database's wake-up is cancelled by
+# the RegistryManager when the database is unloaded - so this should not
+# cause it to be reloaded.
+#
# TODO: perhaps in the future we could consider a flag on ir.cron jobs
# that would cause database wake-up even if the database has not been
# loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
+#
+# Each element is a triple (timestamp, database-name, boolean). The boolean
+# specifies if the wake-up is canceled (so a wake-up can be canceled without
+# relying on the heapq implementation detail; no need to remove the job from
+# the heapq).
+_wakeups = []
+
+# Mapping of database names to the wake-up defined in the heapq,
+# so that we can cancel the wake-up without messing with the heapq
+# invariant: lookup the wake-up by database-name, then set
+# its third element to True.
+_wakeup_by_db = {}
+
+# Re-entrant lock to protect the above _wakeups and _wakeup_by_db variables.
+# We could use a simple (non-reentrant) lock if the runner function below
+# was more fine-grained, but we are fine with the loop owning the lock
+# while spawning a few threads.
+_wakeups_lock = threading.RLock()
+
+# Maximum number of threads allowed to process cron jobs concurrently. This
+# variable is set by start_master_thread using openerp.conf.max_cron_threads.
+_thread_slots = None
+
+# A (non re-entrant) lock to protect the above _thread_slots variable.
+_thread_slots_lock = threading.Lock()
+
+# Sleep duration limits - must not loop too quickly, but can't sleep too long
+# either, because a new job might be inserted in ir_cron with a much sooner
+# execution date than current known ones. We won't see it until we wake!
+MAX_SLEEP = 60 # 1 min
+MIN_SLEEP = 1 # 1 sec
+
+# Dummy wake-up timestamp that can be used to force a database wake-up asap
+WAKE_UP_NOW = 1
+
+def get_thread_slots():
+ """ Return the number of available thread slots """
+ return _thread_slots
+
+
+def release_thread_slot():
+ """ Increment the number of available thread slots """
+ global _thread_slots
+ with _thread_slots_lock:
+ _thread_slots += 1
+
+
+def take_thread_slot():
+ """ Decrement the number of available thread slots """
+ global _thread_slots
+ with _thread_slots_lock:
+ _thread_slots -= 1
+
+
+def cancel(db_name):
+ """ Cancel the next wake-up of a given database, if any.
+
+ :param db_name: database name for which the wake-up is canceled.
+
+ """
+ _logger.debug("Cancel next wake-up for database '%s'.", db_name)
+ with _wakeups_lock:
+ if db_name in _wakeup_by_db:
+ _wakeup_by_db[db_name][2] = True
+
+
+def cancel_all():
+ """ Cancel all database wake-ups. """
+ _logger.debug("Cancel all database wake-ups")
+ global _wakeups
+ global _wakeup_by_db
+ with _wakeups_lock:
+ _wakeups = []
+ _wakeup_by_db = {}
+
+
+def schedule_wakeup(timestamp, db_name):
+ """ Schedule a new wake-up for a database.
+
+ If an earlier wake-up is already defined, the new wake-up is discarded.
+ If another wake-up is defined, that wake-up is discarded and the new one
+ is scheduled.
+
+ :param db_name: database name for which a new wake-up is scheduled.
+ :param timestamp: when the wake-up is scheduled.
+
+ """
+ if not timestamp:
+ return
+ with _wakeups_lock:
+ if db_name in _wakeup_by_db:
+ task = _wakeup_by_db[db_name]
+ if not task[2] and timestamp > task[0]:
+ # existing wakeup is valid and occurs earlier than new one
+ return
+ task[2] = True # cancel existing task
+ task = [timestamp, db_name, False]
+ heapq.heappush(_wakeups, task)
+ _wakeup_by_db[db_name] = task
+ _logger.debug("Wake-up scheduled for database '%s' @ %s", db_name,
+ 'NOW' if timestamp == WAKE_UP_NOW else timestamp)
+
+def runner():
+ """Neverending function (intended to be run in a dedicated thread) that
+ checks every 60 seconds the next database wake-up. TODO: make configurable
+ """
+ while True:
+ runner_body()
+
+def runner_body():
+ with _wakeups_lock:
+ while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots():
+ task = heapq.heappop(_wakeups)
+ timestamp, db_name, canceled = task
+ if canceled:
+ continue
+ del _wakeup_by_db[db_name]
+ registry = openerp.pooler.get_pool(db_name)
+ if not registry._init:
+ _logger.debug("Database '%s' wake-up! Firing multi-threaded cron job processing", db_name)
+ registry['ir.cron']._run_jobs_multithread()
+ amount = MAX_SLEEP
+ with _wakeups_lock:
+ # Sleep less than MAX_SLEEP if the next known wake-up will happen before that.
+ if _wakeups and get_thread_slots():
+ amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time()))
+ _logger.debug("Going to sleep for %ss", amount)
+ time.sleep(amount)
+
+def start_master_thread():
+ """ Start the above runner function in a daemon thread.
+
+ The thread is a typical daemon thread: it will never quit and must be
+ terminated when the main process exits - with no consequence (the processing
+ threads it spawns are not marked daemon).
+ """
+ global _thread_slots
+ _thread_slots = openerp.conf.max_cron_threads
+ db_maxconn = tools.config['db_maxconn']
+ if _thread_slots >= tools.config.get('db_maxconn', 64):
+ _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
+ "this may cause trouble if you reach that number of parallel cron tasks.",
+ db_maxconn, _thread_slots)
+ t = threading.Thread(target=runner, name="openerp.cron.master_thread")
+ t.setDaemon(True)
+ t.start()
+ _logger.debug("Master cron daemon started!")
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: