3271e131f016e675dbf252c170a5ebd56afbf561
[odoo/odoo.git] / openerp / cron.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 ##############################################################################
4 #
5 #    OpenERP, Open Source Management Solution
6 #    Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 """ Cron jobs scheduling
24
25 Cron jobs are defined in the ir_cron table/model. This module deals with all
26 cron jobs, for all databases of a single OpenERP server instance.
27
28 It defines a single master thread that will spawn (a bounded number of)
29 threads to process individual cron jobs.
30
31 """
32
33 import heapq
34 import logging
35 import threading
36 import time
37
38 import openerp
39
40 """ Singleton that keeps track of cancellable tasks to run at a given
41     timestamp.
42    
43     The tasks are characterised by:
44    
45         * a timestamp
46         * the database on which the task run
47         * a boolean attribute specifying if the task is canceled
48
49     Implementation details:
50     
51       - Tasks are stored as list, allowing the cancellation by setting
52         the boolean to True.
53       - A heapq is used to store tasks, so we don't need to sort
54         tasks ourself.
55 """
56
57 # Heapq of database wake-ups. Note that 'database wake-up' meaning is in
58 # the context of the cron management. This is not about loading a database
59 # or otherwise making anything about it.
60 _wakeups = [] # TODO protect this variable with a lock?
61
62 # Mapping of database names to the wake-up defined in the heapq,
63 # so that we can cancel the wake-up without messing with the heapq
64 # internal structure.
65 _wakeup_by_db = {}
66
67 _logger = logging.getLogger('cron')
68
69 _thread_count_lock = threading.Lock()
70
71 # Maximum number of threads allowed to process cron jobs concurrently.
72 _thread_count = 2
73
74
75 def get_thread_count():
76     return _thread_count
77
78
79 def inc_thread_count():
80     global _thread_count
81     with _thread_count_lock:
82         _thread_count += 1
83
84
85 def dec_thread_count():
86     global _thread_count
87     with _thread_count_lock:
88         _thread_count -= 1
89
90
91 def cancel(db_name):
92     """ Cancel the next wake-up of a given database, if any. """
93     _logger.debug("Cancel next wake-up for database '%s'.", db_name)
94     if db_name in _wakeup_by_db:
95         _wakeup_by_db[db_name][2] = True
96
97
98 def cancel_all():
99     """ Cancel all database wake-ups. """
100     global _wakeups
101     global _wakeup_by_db
102     _wakeups = []
103     _wakeup_by_db = {}
104
105
106 def schedule_in_advance(timestamp, db_name):
107     """ Schedule a wake-up for a new database.
108
109     If an earlier wake-up is already defined, the new wake-up is discarded.
110     If another wake-up is defined, it is discarded.
111
112     """
113     if not timestamp:
114         return
115     # Cancel the previous wakeup if any.
116     add_wakeup = False
117     if db_name in _wakeup_by_db:
118         task = _wakeup_by_db[db_name]
119         if task[2] or timestamp < task[0]:
120             add_wakeup = True
121             task[2] = True
122     else:
123         add_wakeup = True
124     if add_wakeup:
125         task = [timestamp, db_name, False]
126         heapq.heappush(_wakeups, task)
127         _wakeup_by_db[db_name] = task
128
129
130 def runner():
131     """Neverending function (intended to be ran in a dedicated thread) that
132        checks every 60 seconds the next database wake-up. TODO: make configurable
133     """
134     while True:
135         while _wakeups and _wakeups[0][0] < time.time() and get_thread_count():
136             task = heapq.heappop(_wakeups)
137             timestamp, db_name, canceled = task
138             if canceled:
139                 continue
140             task[2] = True
141             registry = openerp.pooler.get_pool(db_name)
142             if not registry._init:
143                 registry['ir.cron']._run_jobs()
144         if _wakeups and get_thread_count():
145             time.sleep(min(60, _wakeups[0][0] - time.time()))
146         else:
147             time.sleep(60)
148
149
150 def start_master_thread():
151     """ Start the above runner function in a daemon thread.
152
153     The thread is a typical daemon thread: it will never quit and must be
154     terminated when the main process exits - with no consequence (the processing
155     threads it spawns are not marked daemon).
156
157     """
158     t = threading.Thread(target=runner, name="openerp.cron.master_thread")
159     t.setDaemon(True)
160     t.start()
161
162 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: