# -*- coding: utf-8 -*-
##############################################################################
#
#    OpenERP, Open Source Management Solution
#    Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
""" Cron jobs scheduling

Cron jobs are defined in the ir_cron table/model. This module deals with all
cron jobs, for all databases of a single OpenERP server instance.

It defines a single master thread that will spawn (a bounded number of)
threads to process individual cron jobs.

The master thread runs forever, sleeping at most 60 seconds between checks
for due 'database wake-ups'. It maintains a heapq of database wake-ups. At
each wake-up, it calls ir_cron._run_jobs_multithread() for the given
database; _run_jobs_multithread checks the jobs defined in the ir_cron table
and spawns threads accordingly to process them.

This module's behavior depends on the following configuration variable:
openerp.conf.max_cron_threads.

"""
# Heapq of database wake-ups. Note that 'database wake-up' meaning is in
# the context of the cron management. This is not originally about loading
# a database, although having the database name in the queue will
# cause it to be loaded when the schedule time is reached, even if it was
# unloaded in the mean time. Normally a database's wake-up is cancelled by
# the RegistryManager when the database is unloaded - so this should not
# cause it to be reloaded.
#
# TODO: perhaps in the future we could consider a flag on ir.cron jobs
# that would cause database wake-up even if the database has not been
# loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
#
# Each element is a mutable 3-item list [timestamp, database-name, canceled].
# The boolean specifies if the wake-up is canceled (so a wake-up can be
# canceled in place without relying on the heapq implementation detail; no
# need to remove the job from the heapq).
_wakeups = []

# Mapping of database names to the wake-up defined in the heapq,
# so that we can cancel the wake-up without messing with the heapq
# invariant: lookup the wake-up by database-name, then set
# its third element to True.
_wakeup_by_db = {}

# Re-entrant lock to protect the above _wakeups and _wakeup_by_db variables.
# We could use a simple (non-reentrant) lock if the runner function below
# was more fine-grained, but we are fine with the loop owning the lock
# while spawning a few threads.
_wakeups_lock = threading.RLock()

# Maximum number of threads allowed to process cron jobs concurrently. This
# variable is set by start_master_thread using openerp.conf.max_cron_threads.
# NOTE(review): the initial value was lost from the reviewed listing; None
# (i.e. "not started yet") is assumed - confirm.
_thread_slots = None

# A (non re-entrant) lock to protect the above _thread_slots variable.
_thread_slots_lock = threading.Lock()

_logger = logging.getLogger('cron')

# Sleep duration limits - must not loop too quickly, but can't sleep too long
# either, because a new job might be inserted in ir_cron with a much sooner
# execution date than current known ones. We won't see it until we wake!
MAX_SLEEP = 60 # 1 min
# NOTE(review): lower bound restored after extraction loss - confirm value.
MIN_SLEEP = 1  # 1 sec

# Dummy wake-up timestamp that can be used to force a database wake-up asap
# (any timestamp far in the past is already due).
# NOTE(review): value restored after extraction loss - confirm.
WAKE_UP_NOW = 1
def get_thread_slots():
    """ Return the number of available thread slots.

    This is the current value of the module-level ``_thread_slots`` counter,
    which start_master_thread() sets from openerp.conf.max_cron_threads.
    """
    return _thread_slots
def release_thread_slot():
    """ Increment the number of available thread slots.

    The update of the module-level ``_thread_slots`` counter is done while
    holding ``_thread_slots_lock``.
    """
    global _thread_slots
    with _thread_slots_lock:
        _thread_slots += 1
def take_thread_slot():
    """ Decrement the number of available thread slots.

    The update of the module-level ``_thread_slots`` counter is done while
    holding ``_thread_slots_lock``.
    """
    global _thread_slots
    with _thread_slots_lock:
        _thread_slots -= 1
# NOTE(review): the ``def`` line of this function was missing from the
# reviewed listing; the name ``cancel_db`` is inferred - confirm against the
# callers before merging.
def cancel_db(db_name):
    """ Cancel the next wake-up of a given database, if any.

    The wake-up is not removed from the heapq: its third element (the
    'canceled' flag) is set to True so that it is ignored when it is reached.

    :param db_name: database name for which the wake-up is canceled.

    """
    _logger.debug("Cancel next wake-up for database '%s'.", db_name)
    with _wakeups_lock:
        if db_name in _wakeup_by_db:
            _wakeup_by_db[db_name][2] = True
# NOTE(review): the ``def`` line and the body of this function were missing
# from the reviewed listing; the name ``cancel_all`` and the reset below are
# inferred from the docstring - confirm against the callers before merging.
def cancel_all():
    """ Cancel all database wake-ups. """
    _logger.debug("Cancel all database wake-ups")
    with _wakeups_lock:
        # Empty both containers in place: an empty list is a valid heap and
        # no wake-up can be looked up by database name afterwards.
        del _wakeups[:]
        _wakeup_by_db.clear()
def schedule_wakeup(timestamp, db_name):
    """ Schedule a new wake-up for a database.

    If a valid (non-canceled) wake-up that does not occur later than the new
    one is already defined for the database, the new wake-up is discarded.
    If another wake-up is defined, that wake-up is canceled and the new one
    is scheduled.

    :param timestamp: when the wake-up is scheduled. A false value (None, 0)
        is ignored: it cannot be ordered meaningfully in the heapq.
    :param db_name: database name for which a new wake-up is scheduled.

    """
    if not timestamp:
        return
    with _wakeups_lock:
        existing = _wakeup_by_db.get(db_name)
        if existing is not None:
            if not existing[2] and timestamp >= existing[0]:
                # existing wakeup is valid and does not occur later than the
                # new one: keep it (re-pushing an identical wake-up would
                # only grow the heapq with canceled entries)
                return
            existing[2] = True # cancel existing task
        task = [timestamp, db_name, False]
        heapq.heappush(_wakeups, task)
        _wakeup_by_db[db_name] = task
    _logger.debug("Wake-up scheduled for database '%s' @ %s", db_name,
                  'NOW' if timestamp == WAKE_UP_NOW else timestamp)
def runner():
    """Neverending function (intended to be run in a dedicated thread) that
       fires the due database wake-ups, then sleeps until the next known
       wake-up: at least MIN_SLEEP and at most MAX_SLEEP seconds.
       TODO: make configurable
    """
    while True:
        runner_body()


def runner_body():
    """Run a single iteration of runner(): process due wake-ups, then sleep."""
    with _wakeups_lock:
        # Only consume wake-ups that are due, and only while a thread slot is
        # available to process their jobs.
        while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots():
            task = heapq.heappop(_wakeups)
            timestamp, db_name, canceled = task
            if canceled:
                # Canceled wake-ups stay in the heapq until they are reached;
                # they are dropped here. _wakeup_by_db is left alone because
                # it may already reference a newer wake-up for that database.
                continue
            del _wakeup_by_db[db_name]
            registry = openerp.pooler.get_pool(db_name)
            # NOTE(review): presumably _init is set while the registry is
            # still being initialised - confirm.
            if not registry._init:
                _logger.debug("Database '%s' wake-up! Firing multi-threaded cron job processing", db_name)
                registry['ir.cron']._run_jobs_multithread()
    amount = MAX_SLEEP
    with _wakeups_lock:
        # Sleep less than MAX_SLEEP if the next known wake-up will happen before that.
        if _wakeups and get_thread_slots():
            amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time()))
    _logger.debug("Going to sleep for %ss", amount)
    time.sleep(amount)
def start_master_thread():
    """ Start the above runner function in a daemon thread.

    The thread is a typical daemon thread: it will never quit and must be
    terminated when the main process exits - with no consequence (the processing
    threads it spawns are not marked daemon).

    As a side effect, the module-level number of available thread slots is
    initialised from openerp.conf.max_cron_threads, and a warning is logged
    when that number reaches the size of the database connection pool.

    """
    global _thread_slots
    _thread_slots = openerp.conf.max_cron_threads
    # Read the pool size once, with the same fallback for the comparison and
    # for the log message.
    db_maxconn = tools.config.get('db_maxconn', 64)
    if _thread_slots >= db_maxconn:
        _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
                        "this may cause trouble if you reach that number of parallel cron tasks.",
                        db_maxconn, _thread_slots)
    t = threading.Thread(target=runner, name="openerp.cron.master_thread")
    t.daemon = True
    t.start()
    _logger.debug("Master cron daemon started!")
212 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: