[IMP]module: improve statusbar_visible
[odoo/odoo.git] / openerp / cron.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 ##############################################################################
4 #
5 #    OpenERP, Open Source Management Solution
6 #    Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 """ Cron jobs scheduling
24
25 Cron jobs are defined in the ir_cron table/model. This module deals with all
26 cron jobs, for all databases of a single OpenERP server instance.
27
28 It defines a single master thread that will spawn (a bounded number of)
29 threads to process individual cron jobs.
30
31 The thread runs forever, checking every 60 seconds for new
32 'database wake-ups'. It maintains a heapq of database wake-ups. At each
33 wake-up, it will call ir_cron._run_jobs_multithread() for the given database. _run_jobs_multithread
34 will check the jobs defined in the ir_cron table and spawn accordingly threads
35 to process them.
36
37 This module's behavior depends on the following configuration variable:
38 openerp.conf.max_cron_threads.
39
40 """
41
42 import heapq
43 import logging
44 import threading
45 import time
46
47 import openerp
48 import tools
49
50 _logger = logging.getLogger(__name__)
51
52 # Heapq of database wake-ups. Note that 'database wake-up' meaning is in
53 # the context of the cron management. This is not originally about loading
54 # a database, although having the database name in the queue will
55 # cause it to be loaded when the schedule time is reached, even if it was
56 # unloaded in the mean time. Normally a database's wake-up is cancelled by
57 # the RegistryManager when the database is unloaded - so this should not
58 # cause it to be reloaded.
59 #
60 # TODO: perhaps in the future we could consider a flag on ir.cron jobs
61 # that would cause database wake-up even if the database has not been
62 # loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
63 #
64 # Each element is a triple (timestamp, database-name, boolean). The boolean
65 # specifies if the wake-up is canceled (so a wake-up can be canceled without
66 # relying on the heapq implementation detail; no need to remove the job from
67 # the heapq).
68 _wakeups = []
69
70 # Mapping of database names to the wake-up defined in the heapq,
71 # so that we can cancel the wake-up without messing with the heapq
72 # invariant: lookup the wake-up by database-name, then set
73 # its third element to True.
74 _wakeup_by_db = {}
75
76 # Re-entrant lock to protect the above _wakeups and _wakeup_by_db variables.
77 # We could use a simple (non-reentrant) lock if the runner function below
78 # was more fine-grained, but we are fine with the loop owning the lock
79 # while spawning a few threads.
80 _wakeups_lock = threading.RLock()
81
82 # Maximum number of threads allowed to process cron jobs concurrently. This
83 # variable is set by start_master_thread using openerp.conf.max_cron_threads.
84 _thread_slots = None
85
86 # A (non re-entrant) lock to protect the above _thread_slots variable.
87 _thread_slots_lock = threading.Lock()
88
89 # Sleep duration limits - must not loop too quickly, but can't sleep too long
90 # either, because a new job might be inserted in ir_cron with a much sooner
91 # execution date than current known ones. We won't see it until we wake!
92 MAX_SLEEP = 60 # 1 min
93 MIN_SLEEP = 1  # 1 sec
94
95 # Dummy wake-up timestamp that can be used to force a database wake-up asap
96 WAKE_UP_NOW = 1
97
98 def get_thread_slots():
99     """ Return the number of available thread slots """
100     return _thread_slots
101
102
103 def release_thread_slot():
104     """ Increment the number of available thread slots """
105     global _thread_slots
106     with _thread_slots_lock:
107         _thread_slots += 1
108
109
110 def take_thread_slot():
111     """ Decrement the number of available thread slots """
112     global _thread_slots
113     with _thread_slots_lock:
114         _thread_slots -= 1
115
116
117 def cancel(db_name):
118     """ Cancel the next wake-up of a given database, if any.
119
120     :param db_name: database name for which the wake-up is canceled.
121
122     """
123     _logger.debug("Cancel next wake-up for database '%s'.", db_name)
124     with _wakeups_lock:
125         if db_name in _wakeup_by_db:
126             _wakeup_by_db[db_name][2] = True
127
128
129 def cancel_all():
130     """ Cancel all database wake-ups. """
131     _logger.debug("Cancel all database wake-ups")
132     global _wakeups
133     global _wakeup_by_db
134     with _wakeups_lock:
135         _wakeups = []
136         _wakeup_by_db = {}
137
138
139 def schedule_wakeup(timestamp, db_name):
140     """ Schedule a new wake-up for a database.
141
142     If an earlier wake-up is already defined, the new wake-up is discarded.
143     If another wake-up is defined, that wake-up is discarded and the new one
144     is scheduled.
145
146     :param db_name: database name for which a new wake-up is scheduled.
147     :param timestamp: when the wake-up is scheduled.
148
149     """
150     if not timestamp:
151         return
152     with _wakeups_lock:
153         if db_name in _wakeup_by_db:
154             task = _wakeup_by_db[db_name]
155             if not task[2] and timestamp > task[0]:
156                 # existing wakeup is valid and occurs earlier than new one
157                 return
158             task[2] = True # cancel existing task
159         task = [timestamp, db_name, False]
160         heapq.heappush(_wakeups, task)
161         _wakeup_by_db[db_name] = task
162         _logger.debug("Wake-up scheduled for database '%s' @ %s", db_name,
163                       'NOW' if timestamp == WAKE_UP_NOW else timestamp)
164
165 def runner():
166     """Neverending function (intended to be run in a dedicated thread) that
167        checks every 60 seconds the next database wake-up. TODO: make configurable
168     """
169     while True:
170         runner_body()
171
172 def runner_body():
173     with _wakeups_lock:
174         while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots():
175             task = heapq.heappop(_wakeups)
176             timestamp, db_name, canceled = task
177             if canceled:
178                 continue
179             del _wakeup_by_db[db_name]
180             registry = openerp.pooler.get_pool(db_name)
181             if not registry._init:
182                 _logger.debug("Database '%s' wake-up! Firing multi-threaded cron job processing", db_name)
183                 registry['ir.cron']._run_jobs_multithread()
184     amount = MAX_SLEEP
185     with _wakeups_lock:
186         # Sleep less than MAX_SLEEP if the next known wake-up will happen before that.
187         if _wakeups and get_thread_slots():
188             amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time()))
189     _logger.debug("Going to sleep for %ss", amount)
190     time.sleep(amount)
191
192 def start_master_thread():
193     """ Start the above runner function in a daemon thread.
194
195     The thread is a typical daemon thread: it will never quit and must be
196     terminated when the main process exits - with no consequence (the processing
197     threads it spawns are not marked daemon).
198
199     """
200     global _thread_slots
201     _thread_slots = openerp.conf.max_cron_threads
202     db_maxconn = tools.config['db_maxconn']
203     if _thread_slots >= tools.config.get('db_maxconn', 64):
204         _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
205                         "this may cause trouble if you reach that number of parallel cron tasks.",
206                         db_maxconn, _thread_slots)
207     t = threading.Thread(target=runner, name="openerp.cron.master_thread")
208     t.setDaemon(True)
209     t.start()
210     _logger.debug("Master cron daemon started!")
211
212 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: