[IMP] cron: bracketed the jobs heap/dict with a lock.
authorVo Minh Thu <vmt@openerp.com>
Fri, 15 Jul 2011 11:38:45 +0000 (13:38 +0200)
committerVo Minh Thu <vmt@openerp.com>
Fri, 15 Jul 2011 11:38:45 +0000 (13:38 +0200)
bzr revid: vmt@openerp.com-20110715113845-zokj6cf6z0adj6h4

openerp/addons/base/ir/ir_cron.py
openerp/cron.py

index 6073489..2cdb15b 100644 (file)
@@ -164,7 +164,7 @@ class ir_cron(osv.osv):
                 # Reschedule our own main cron thread if necessary.
                 # This is really needed if this job run longer that its rescheduling period.
                 print ">>> advance at", nextcall
-                nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S'))
+                nextcall = time.mktime(nextcall.timetuple())
                 openerp.cron.schedule_in_advance(nextcall, cr.dbname)
         finally:
             cr.commit()
index 3271e13..79347fb 100644 (file)
@@ -66,6 +66,11 @@ _wakeup_by_db = {}
 
 _logger = logging.getLogger('cron')
 
+# We could use a simple (non-reentrant) lock if the runner function below
+# was more fine-grained, but we are fine with the loop owning the lock
+# while spawning a few threads.
+_wakeups_lock = threading.RLock()
+
 _thread_count_lock = threading.Lock()
 
 # Maximum number of threads allowed to process cron jobs concurrently.
@@ -91,16 +96,18 @@ def dec_thread_count():
 def cancel(db_name):
     """ Cancel the next wake-up of a given database, if any. """
     _logger.debug("Cancel next wake-up for database '%s'.", db_name)
-    if db_name in _wakeup_by_db:
-        _wakeup_by_db[db_name][2] = True
+    with _wakeups_lock:
+        if db_name in _wakeup_by_db:
+            _wakeup_by_db[db_name][2] = True
 
 
 def cancel_all():
     """ Cancel all database wake-ups. """
     global _wakeups
     global _wakeup_by_db
-    _wakeups = []
-    _wakeup_by_db = {}
+    with _wakeups_lock:
+        _wakeups = []
+        _wakeup_by_db = {}
 
 
 def schedule_in_advance(timestamp, db_name):
@@ -112,19 +119,20 @@ def schedule_in_advance(timestamp, db_name):
     """
     if not timestamp:
         return
-    # Cancel the previous wakeup if any.
-    add_wakeup = False
-    if db_name in _wakeup_by_db:
-        task = _wakeup_by_db[db_name]
-        if task[2] or timestamp < task[0]:
+    with _wakeups_lock:
+        # Cancel the previous wakeup if any.
+        add_wakeup = False
+        if db_name in _wakeup_by_db:
+            task = _wakeup_by_db[db_name]
+            if task[2] or timestamp < task[0]:
+                add_wakeup = True
+                task[2] = True
+        else:
             add_wakeup = True
-            task[2] = True
-    else:
-        add_wakeup = True
-    if add_wakeup:
-        task = [timestamp, db_name, False]
-        heapq.heappush(_wakeups, task)
-        _wakeup_by_db[db_name] = task
+        if add_wakeup:
+            task = [timestamp, db_name, False]
+            heapq.heappush(_wakeups, task)
+            _wakeup_by_db[db_name] = task
 
 
 def runner():
@@ -132,19 +140,21 @@ def runner():
        checks every 60 seconds the next database wake-up. TODO: make configurable
     """
     while True:
-        while _wakeups and _wakeups[0][0] < time.time() and get_thread_count():
-            task = heapq.heappop(_wakeups)
-            timestamp, db_name, canceled = task
-            if canceled:
-                continue
-            task[2] = True
-            registry = openerp.pooler.get_pool(db_name)
-            if not registry._init:
-                registry['ir.cron']._run_jobs()
-        if _wakeups and get_thread_count():
-            time.sleep(min(60, _wakeups[0][0] - time.time()))
-        else:
-            time.sleep(60)
+        with _wakeups_lock:
+            while _wakeups and _wakeups[0][0] < time.time() and get_thread_count():
+                task = heapq.heappop(_wakeups)
+                timestamp, db_name, canceled = task
+                if canceled:
+                    continue
+                task[2] = True
+                registry = openerp.pooler.get_pool(db_name)
+                if not registry._init:
+                    registry['ir.cron']._run_jobs()
+        amount = 60
+        with _wakeups_lock:
+            if _wakeups and get_thread_count():
+                amount = min(60, _wakeups[0][0] - time.time())
+        time.sleep(amount)
 
 
 def start_master_thread():