[FIX] export_data: avoid access error while auto-generating external IDs during export
[odoo/odoo.git] / openerp / cron.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 ##############################################################################
4 #
5 #    OpenERP, Open Source Management Solution
6 #    Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 """ Cron jobs scheduling
24
25 Cron jobs are defined in the ir_cron table/model. This module deals with all
26 cron jobs, for all databases of a single OpenERP server instance.
27
28 It defines a single master thread that will spawn (a bounded number of)
29 threads to process individual cron jobs.
30
31 The thread runs forever, checking every 60 seconds for new
32 'database wake-ups'. It maintains a heapq of database wake-ups. At each
33 wake-up, it will call ir_cron._run_jobs_multithread() for the given database. _run_jobs_multithread
34 will check the jobs defined in the ir_cron table and spawn accordingly threads
35 to process them.
36
37 This module's behavior depends on the following configuration variable:
38 openerp.conf.max_cron_threads.
39
40 """
41
42 import heapq
43 import logging
44 import threading
45 import time
46
47 import openerp
48 import tools
49
50 _logger = logging.getLogger(__name__)
51
52 # Scheduling wake-ups (see below) can be disabled when the polling process
53 # workers are used instead of the managed thread workers. (I.e. wake-ups are
54 # not used since polling is used. And polling is used when the cron are
55 # handled by running special processes, e.g. openerp-cron-worker, instead
56 # of the general openerp-server script.)
57 enable_schedule_wakeup = True
58
59 # Heapq of database wake-ups. Note that 'database wake-up' meaning is in
60 # the context of the cron management. This is not originally about loading
61 # a database, although having the database name in the queue will
62 # cause it to be loaded when the schedule time is reached, even if it was
63 # unloaded in the mean time. Normally a database's wake-up is cancelled by
64 # the RegistryManager when the database is unloaded - so this should not
65 # cause it to be reloaded.
66 #
67 # TODO: perhaps in the future we could consider a flag on ir.cron jobs
68 # that would cause database wake-up even if the database has not been
69 # loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
70 #
71 # Each element is a triple (timestamp, database-name, boolean). The boolean
72 # specifies if the wake-up is canceled (so a wake-up can be canceled without
73 # relying on the heapq implementation detail; no need to remove the job from
74 # the heapq).
75 _wakeups = []
76
77 # Mapping of database names to the wake-up defined in the heapq,
78 # so that we can cancel the wake-up without messing with the heapq
79 # invariant: lookup the wake-up by database-name, then set
80 # its third element to True.
81 _wakeup_by_db = {}
82
83 # Re-entrant lock to protect the above _wakeups and _wakeup_by_db variables.
84 # We could use a simple (non-reentrant) lock if the runner function below
85 # was more fine-grained, but we are fine with the loop owning the lock
86 # while spawning a few threads.
87 _wakeups_lock = threading.RLock()
88
89 # Maximum number of threads allowed to process cron jobs concurrently. This
90 # variable is set by start_master_thread using openerp.conf.max_cron_threads.
91 _thread_slots = None
92
93 # A (non re-entrant) lock to protect the above _thread_slots variable.
94 _thread_slots_lock = threading.Lock()
95
96 # Sleep duration limits - must not loop too quickly, but can't sleep too long
97 # either, because a new job might be inserted in ir_cron with a much sooner
98 # execution date than current known ones. We won't see it until we wake!
99 MAX_SLEEP = 60 # 1 min
100 MIN_SLEEP = 1  # 1 sec
101
102 # Dummy wake-up timestamp that can be used to force a database wake-up asap
103 WAKE_UP_NOW = 1
104
105 def get_thread_slots():
106     """ Return the number of available thread slots """
107     return _thread_slots
108
109
110 def release_thread_slot():
111     """ Increment the number of available thread slots """
112     global _thread_slots
113     with _thread_slots_lock:
114         _thread_slots += 1
115
116
117 def take_thread_slot():
118     """ Decrement the number of available thread slots """
119     global _thread_slots
120     with _thread_slots_lock:
121         _thread_slots -= 1
122
123
124 def cancel(db_name):
125     """ Cancel the next wake-up of a given database, if any.
126
127     :param db_name: database name for which the wake-up is canceled.
128
129     """
130     _logger.debug("Cancel next wake-up for database '%s'.", db_name)
131     with _wakeups_lock:
132         if db_name in _wakeup_by_db:
133             _wakeup_by_db[db_name][2] = True
134
135
136 def cancel_all():
137     """ Cancel all database wake-ups. """
138     _logger.debug("Cancel all database wake-ups")
139     global _wakeups
140     global _wakeup_by_db
141     with _wakeups_lock:
142         _wakeups = []
143         _wakeup_by_db = {}
144
145 def schedule_wakeup(timestamp, db_name):
146     """ Schedule a new wake-up for a database.
147
148     If an earlier wake-up is already defined, the new wake-up is discarded.
149     If another wake-up is defined, that wake-up is discarded and the new one
150     is scheduled.
151
152     :param db_name: database name for which a new wake-up is scheduled.
153     :param timestamp: when the wake-up is scheduled.
154
155     """
156     global enable_schedule_wakeup
157     if not enable_schedule_wakeup:
158         return
159     if not timestamp:
160         return
161     with _wakeups_lock:
162         if db_name in _wakeup_by_db:
163             task = _wakeup_by_db[db_name]
164             if not task[2] and timestamp > task[0]:
165                 # existing wakeup is valid and occurs earlier than new one
166                 return
167             task[2] = True # cancel existing task
168         task = [timestamp, db_name, False]
169         heapq.heappush(_wakeups, task)
170         _wakeup_by_db[db_name] = task
171         _logger.debug("Wake-up scheduled for database '%s' @ %s", db_name,
172                       'NOW' if timestamp == WAKE_UP_NOW else timestamp)
173
174 def runner():
175     """Neverending function (intended to be run in a dedicated thread) that
176        checks every 60 seconds the next database wake-up. TODO: make configurable
177     """
178     while True:
179         runner_body()
180
181 def runner_body():
182     with _wakeups_lock:
183         while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots():
184             task = heapq.heappop(_wakeups)
185             timestamp, db_name, canceled = task
186             if canceled:
187                 continue
188             del _wakeup_by_db[db_name]
189             registry = openerp.pooler.get_pool(db_name)
190             if not registry._init:
191                 _logger.debug("Database '%s' wake-up! Firing multi-threaded cron job processing", db_name)
192                 registry['ir.cron']._run_jobs_multithread()
193     amount = MAX_SLEEP
194     with _wakeups_lock:
195         # Sleep less than MAX_SLEEP if the next known wake-up will happen before that.
196         if _wakeups and get_thread_slots():
197             amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time()))
198     _logger.debug("Going to sleep for %ss", amount)
199     time.sleep(amount)
200
201 def start_master_thread():
202     """ Start the above runner function in a daemon thread.
203
204     The thread is a typical daemon thread: it will never quit and must be
205     terminated when the main process exits - with no consequence (the processing
206     threads it spawns are not marked daemon).
207
208     """
209     global _thread_slots
210     _thread_slots = openerp.conf.max_cron_threads
211     db_maxconn = tools.config['db_maxconn']
212     if _thread_slots >= tools.config.get('db_maxconn', 64):
213         _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
214                         "this may cause trouble if you reach that number of parallel cron tasks.",
215                         db_maxconn, _thread_slots)
216     if _thread_slots:
217         t = threading.Thread(target=runner, name="openerp.cron.master_thread")
218         t.setDaemon(True)
219         t.start()
220         _logger.debug("Master cron daemon started!")
221     else:
222         _logger.info("No master cron daemon (0 workers needed).")
223
224 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: