[IMP] cron: added yaml test.
[odoo/odoo.git] / openerp / cron.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 ##############################################################################
4 #
5 #    OpenERP, Open Source Management Solution
6 #    Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 """ Cron jobs scheduling
24
25 Cron jobs are defined in the ir_cron table/model. This module deals with all
26 cron jobs, for all databases of a single OpenERP server instance.
27
28 It defines a single master thread that will spawn (a bounded number of)
29 threads to process individual cron jobs.
30
31 The thread runs forever, checking every 60 seconds for new
32 'database wake-ups'. It maintains a heapq of database wake-ups. At each
33 wake-up, it will call ir_cron._run_jobs() for the given database. _run_jobs
34 will check the jobs defined in the ir_cron table and spawn accordingly threads
35 to process them.
36
37 This module behavior depends on the following configuration variable:
38 openerp.conf.max_cron_threads.
39
40 """
41
42 import heapq
43 import logging
44 import threading
45 import time
46
47 import openerp
48
49 # Heapq of database wake-ups. Note that 'database wake-up' meaning is in
50 # the context of the cron management. This is not about loading a database
51 # or otherwise making anything about it.
52 # Each element is a triple (timestamp, database-name, boolean). The boolean
53 # specifies if the wake-up is canceled (so a wake-up can be canceled without
54 # relying on the heapq implementation detail; no need to remove the job from
55 # the heapq).
56 _wakeups = []
57
58 # Mapping of database names to the wake-up defined in the heapq,
59 # so that we can cancel the wake-up without messing with the heapq
60 # internal structure: lookup the wake-up by database-name, then set
61 # its third element to True.
62 _wakeup_by_db = {}
63
64 # Re-entrant lock to protect the above _wakeups and _wakeup_by_db variables.
65 # We could use a simple (non-reentrant) lock if the runner function below
66 # was more fine-grained, but we are fine with the loop owning the lock
67 # while spawning a few threads.
68 _wakeups_lock = threading.RLock()
69
70 # Maximum number of threads allowed to process cron jobs concurrently. This
71 # variable is set by start_master_thread using openerp.conf.max_cron_threads.
72 _thread_count = None
73
74 # A (non re-entrant) lock to protect the above _thread_count variable.
75 _thread_count_lock = threading.Lock()
76
77 _logger = logging.getLogger('cron')
78
79
80 def get_thread_count():
81     """ Return the number of available threads. """
82     return _thread_count
83
84
85 def inc_thread_count():
86     """ Increment by the number of available threads. """
87     global _thread_count
88     with _thread_count_lock:
89         _thread_count += 1
90
91
92 def dec_thread_count():
93     """ Decrement by the number of available threads. """
94     global _thread_count
95     with _thread_count_lock:
96         _thread_count -= 1
97
98
99 def cancel(db_name):
100     """ Cancel the next wake-up of a given database, if any.
101
102     :param db_name: database name for which the wake-up is canceled.
103
104     """
105     _logger.debug("Cancel next wake-up for database '%s'.", db_name)
106     with _wakeups_lock:
107         if db_name in _wakeup_by_db:
108             _wakeup_by_db[db_name][2] = True
109
110
111 def cancel_all():
112     """ Cancel all database wake-ups. """
113     global _wakeups
114     global _wakeup_by_db
115     with _wakeups_lock:
116         _wakeups = []
117         _wakeup_by_db = {}
118
119
120 def schedule_in_advance(timestamp, db_name):
121     """ Schedule a new wake-up for a database.
122
123     If an earlier wake-up is already defined, the new wake-up is discarded.
124     If another wake-up is defined, that wake-up is discarded and the new one
125     is scheduled.
126
127     :param db_name: database name for which a new wake-up is scheduled.
128     :param timestamp: when the wake-up is scheduled.
129
130     """
131     if not timestamp:
132         return
133     with _wakeups_lock:
134         # Cancel the previous wake-up if any.
135         add_wakeup = False
136         if db_name in _wakeup_by_db:
137             task = _wakeup_by_db[db_name]
138             if task[2] or timestamp < task[0]:
139                 add_wakeup = True
140                 task[2] = True
141         else:
142             add_wakeup = True
143         if add_wakeup:
144             task = [timestamp, db_name, False]
145             heapq.heappush(_wakeups, task)
146             _wakeup_by_db[db_name] = task
147
148
149 def runner():
150     """Neverending function (intended to be ran in a dedicated thread) that
151        checks every 60 seconds the next database wake-up. TODO: make configurable
152     """
153     while True:
154         runner_body()
155
156 def runner_body():
157     with _wakeups_lock:
158         while _wakeups and _wakeups[0][0] < time.time() and get_thread_count():
159             task = heapq.heappop(_wakeups)
160             timestamp, db_name, canceled = task
161             if canceled:
162                 continue
163             task[2] = True
164             registry = openerp.pooler.get_pool(db_name)
165             if not registry._init:
166                 registry['ir.cron']._run_jobs()
167     amount = 60
168     with _wakeups_lock:
169         # Sleep less than 60s if the next known wake-up will happen before.
170         if _wakeups and get_thread_count():
171             amount = min(60, _wakeups[0][0] - time.time())
172     time.sleep(amount)
173
174
175 def start_master_thread():
176     """ Start the above runner function in a daemon thread.
177
178     The thread is a typical daemon thread: it will never quit and must be
179     terminated when the main process exits - with no consequence (the processing
180     threads it spawns are not marked daemon).
181
182     """
183     global _thread_count
184     _thread_count = openerp.conf.max_cron_threads
185     t = threading.Thread(target=runner, name="openerp.cron.master_thread")
186     t.setDaemon(True)
187     t.start()
188
189 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: