[IMP] cron: forgot to add the new openerp.cron module.
authorVo Minh Thu <vmt@openerp.com>
Fri, 15 Jul 2011 10:01:27 +0000 (12:01 +0200)
committerVo Minh Thu <vmt@openerp.com>
Fri, 15 Jul 2011 10:01:27 +0000 (12:01 +0200)
bzr revid: vmt@openerp.com-20110715100127-8btlo3bluaju3em6

openerp/cron.py [new file with mode: 0644]

diff --git a/openerp/cron.py b/openerp/cron.py
new file mode 100644 (file)
index 0000000..3271e13
--- /dev/null
@@ -0,0 +1,162 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+##############################################################################
+#
+#    OpenERP, Open Source Management Solution
+#    Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU Affero General Public License as
+#    published by the Free Software Foundation, either version 3 of the
+#    License, or (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU Affero General Public License for more details.
+#
+#    You should have received a copy of the GNU Affero General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+##############################################################################
+
+""" Cron jobs scheduling
+
+Cron jobs are defined in the ir_cron table/model. This module deals with all
+cron jobs, for all databases of a single OpenERP server instance.
+
+It defines a single master thread that will spawn (a bounded number of)
+threads to process individual cron jobs.
+
+"""
+
+import heapq
+import logging
+import threading
+import time
+
+import openerp
+
+""" Singleton that keeps track of cancellable tasks to run at a given
+    timestamp.
+   
+    The tasks are characterised by:
+   
+        * a timestamp
+        * the database on which the task run
+        * a boolean attribute specifying if the task is canceled
+
+    Implementation details:
+    
+      - Tasks are stored as list, allowing the cancellation by setting
+        the boolean to True.
+      - A heapq is used to store tasks, so we don't need to sort
+        tasks ourself.
+"""
+
+# Heapq of database wake-ups. Note that 'database wake-up' meaning is in
+# the context of the cron management. This is not about loading a database
+# or otherwise making anything about it.
+_wakeups = [] # TODO protect this variable with a lock?
+
+# Mapping of database names to the wake-up defined in the heapq,
+# so that we can cancel the wake-up without messing with the heapq
+# internal structure.
+_wakeup_by_db = {}
+
+_logger = logging.getLogger('cron')
+
+_thread_count_lock = threading.Lock()
+
+# Maximum number of threads allowed to process cron jobs concurrently.
+_thread_count = 2
+
+
+def get_thread_count():
+    return _thread_count
+
+
+def inc_thread_count():
+    global _thread_count
+    with _thread_count_lock:
+        _thread_count += 1
+
+
+def dec_thread_count():
+    global _thread_count
+    with _thread_count_lock:
+        _thread_count -= 1
+
+
+def cancel(db_name):
+    """ Cancel the next wake-up of a given database, if any. """
+    _logger.debug("Cancel next wake-up for database '%s'.", db_name)
+    if db_name in _wakeup_by_db:
+        _wakeup_by_db[db_name][2] = True
+
+
+def cancel_all():
+    """ Cancel all database wake-ups. """
+    global _wakeups
+    global _wakeup_by_db
+    _wakeups = []
+    _wakeup_by_db = {}
+
+
+def schedule_in_advance(timestamp, db_name):
+    """ Schedule a wake-up for a new database.
+
+    If an earlier wake-up is already defined, the new wake-up is discarded.
+    If another wake-up is defined, it is discarded.
+
+    """
+    if not timestamp:
+        return
+    # Cancel the previous wakeup if any.
+    add_wakeup = False
+    if db_name in _wakeup_by_db:
+        task = _wakeup_by_db[db_name]
+        if task[2] or timestamp < task[0]:
+            add_wakeup = True
+            task[2] = True
+    else:
+        add_wakeup = True
+    if add_wakeup:
+        task = [timestamp, db_name, False]
+        heapq.heappush(_wakeups, task)
+        _wakeup_by_db[db_name] = task
+
+
+def runner():
+    """Neverending function (intended to be ran in a dedicated thread) that
+       checks every 60 seconds the next database wake-up. TODO: make configurable
+    """
+    while True:
+        while _wakeups and _wakeups[0][0] < time.time() and get_thread_count():
+            task = heapq.heappop(_wakeups)
+            timestamp, db_name, canceled = task
+            if canceled:
+                continue
+            task[2] = True
+            registry = openerp.pooler.get_pool(db_name)
+            if not registry._init:
+                registry['ir.cron']._run_jobs()
+        if _wakeups and get_thread_count():
+            time.sleep(min(60, _wakeups[0][0] - time.time()))
+        else:
+            time.sleep(60)
+
+
+def start_master_thread():
+    """ Start the above runner function in a daemon thread.
+
+    The thread is a typical daemon thread: it will never quit and must be
+    terminated when the main process exits - with no consequence (the processing
+    threads it spawns are not marked daemon).
+
+    """
+    t = threading.Thread(target=runner, name="openerp.cron.master_thread")
+    t.setDaemon(True)
+    t.start()
+
+# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: