_logger = logging.getLogger('cron')
+# We could use a simple (non-reentrant) lock if the runner function below
+# was more fine-grained, but we are fine with the loop owning the lock
+# while spawning a few threads.
+_wakeups_lock = threading.RLock()
+
_thread_count_lock = threading.Lock()
# Maximum number of threads allowed to process cron jobs concurrently.
def cancel(db_name):
""" Cancel the next wake-up of a given database, if any. """
_logger.debug("Cancel next wake-up for database '%s'.", db_name)
- if db_name in _wakeup_by_db:
- _wakeup_by_db[db_name][2] = True
+ with _wakeups_lock:
+ if db_name in _wakeup_by_db:
+ _wakeup_by_db[db_name][2] = True
def cancel_all():
""" Cancel all database wake-ups. """
global _wakeups
global _wakeup_by_db
- _wakeups = []
- _wakeup_by_db = {}
+ with _wakeups_lock:
+ _wakeups = []
+ _wakeup_by_db = {}
def schedule_in_advance(timestamp, db_name):
"""
if not timestamp:
return
- # Cancel the previous wakeup if any.
- add_wakeup = False
- if db_name in _wakeup_by_db:
- task = _wakeup_by_db[db_name]
- if task[2] or timestamp < task[0]:
+ with _wakeups_lock:
+ # Cancel the previous wakeup if any.
+ add_wakeup = False
+ if db_name in _wakeup_by_db:
+ task = _wakeup_by_db[db_name]
+ if task[2] or timestamp < task[0]:
+ add_wakeup = True
+ task[2] = True
+ else:
add_wakeup = True
- task[2] = True
- else:
- add_wakeup = True
- if add_wakeup:
- task = [timestamp, db_name, False]
- heapq.heappush(_wakeups, task)
- _wakeup_by_db[db_name] = task
+ if add_wakeup:
+ task = [timestamp, db_name, False]
+ heapq.heappush(_wakeups, task)
+ _wakeup_by_db[db_name] = task
def runner():
checks every 60 seconds the next database wake-up. TODO: make configurable
"""
while True:
- while _wakeups and _wakeups[0][0] < time.time() and get_thread_count():
- task = heapq.heappop(_wakeups)
- timestamp, db_name, canceled = task
- if canceled:
- continue
- task[2] = True
- registry = openerp.pooler.get_pool(db_name)
- if not registry._init:
- registry['ir.cron']._run_jobs()
- if _wakeups and get_thread_count():
- time.sleep(min(60, _wakeups[0][0] - time.time()))
- else:
- time.sleep(60)
+ with _wakeups_lock:
+ while _wakeups and _wakeups[0][0] < time.time() and get_thread_count():
+ task = heapq.heappop(_wakeups)
+ timestamp, db_name, canceled = task
+ if canceled:
+ continue
+ task[2] = True
+ registry = openerp.pooler.get_pool(db_name)
+ if not registry._init:
+ registry['ir.cron']._run_jobs()
+ amount = 60
+ with _wakeups_lock:
+ if _wakeups and get_thread_count():
+ amount = min(60, _wakeups[0][0] - time.time())
+ time.sleep(amount)
def start_master_thread():