[IMP] cron: the maximum number of cron threads is configurable.
[odoo/odoo.git] / openerp / cron.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 ##############################################################################
4 #
5 #    OpenERP, Open Source Management Solution
6 #    Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 """ Cron jobs scheduling
24
25 Cron jobs are defined in the ir_cron table/model. This module deals with all
26 cron jobs, for all databases of a single OpenERP server instance.
27
28 It defines a single master thread that will spawn (a bounded number of)
29 threads to process individual cron jobs.
30
31 The thread runs forever, checking every 60 seconds for new
32 'database wake-ups'. It maintains a heapq of database wake-ups. At each
33 wake-up, it will call ir_cron._run_jobs() for the given database. _run_jobs
34 will check the jobs defined in the ir_cron table and spawn accordingly threads
35 to process them.
36
37 """
38
39 import heapq
40 import logging
41 import threading
42 import time
43
44 import openerp
45
46 # Heapq of database wake-ups. Note that 'database wake-up' meaning is in
47 # the context of the cron management. This is not about loading a database
48 # or otherwise making anything about it.
49 # Each element is a triple (timestamp, database-name, boolean). The boolean
50 # specifies if the wake-up is canceled (so a wake-up can be canceled without
51 # relying on the heapq implementation detail; no need to remove the job from
52 # the heapq).
53 _wakeups = []
54
55 # Mapping of database names to the wake-up defined in the heapq,
56 # so that we can cancel the wake-up without messing with the heapq
57 # internal structure: lookup the wake-up by database-name, then set
58 # its third element to True.
59 _wakeup_by_db = {}
60
61 # Re-entrant lock to protect the above _wakeups and _wakeup_by_db variables.
62 # We could use a simple (non-reentrant) lock if the runner function below
63 # was more fine-grained, but we are fine with the loop owning the lock
64 # while spawning a few threads.
65 _wakeups_lock = threading.RLock()
66
67 # A (non re-entrant) lock to protect the openerp.conf.max_cron_threads
68 # variable.
69 _thread_count_lock = threading.Lock()
70
71 _logger = logging.getLogger('cron')
72
73
74 def get_thread_count():
75     """ Return the number of available threads. """
76     return openerp.conf.max_cron_threads
77
78
79 def inc_thread_count():
80     """ Increment by the number of available threads. """
81     with _thread_count_lock:
82         openerp.conf.max_cron_threads += 1
83
84
85 def dec_thread_count():
86     """ Decrement by the number of available threads. """
87     with _thread_count_lock:
88         openerp.conf.max_cron_threads -= 1
89
90
91 def cancel(db_name):
92     """ Cancel the next wake-up of a given database, if any.
93
94     :param db_name: database name for which the wake-up is canceled.
95
96     """
97     _logger.debug("Cancel next wake-up for database '%s'.", db_name)
98     with _wakeups_lock:
99         if db_name in _wakeup_by_db:
100             _wakeup_by_db[db_name][2] = True
101
102
103 def cancel_all():
104     """ Cancel all database wake-ups. """
105     global _wakeups
106     global _wakeup_by_db
107     with _wakeups_lock:
108         _wakeups = []
109         _wakeup_by_db = {}
110
111
112 def schedule_in_advance(timestamp, db_name):
113     """ Schedule a new wake-up for a database.
114
115     If an earlier wake-up is already defined, the new wake-up is discarded.
116     If another wake-up is defined, that wake-up is discarded and the new one
117     is scheduled.
118
119     :param db_name: database name for which a new wake-up is scheduled.
120     :param timestamp: when the wake-up is scheduled.
121
122     """
123     if not timestamp:
124         return
125     with _wakeups_lock:
126         # Cancel the previous wake-up if any.
127         add_wakeup = False
128         if db_name in _wakeup_by_db:
129             task = _wakeup_by_db[db_name]
130             if task[2] or timestamp < task[0]:
131                 add_wakeup = True
132                 task[2] = True
133         else:
134             add_wakeup = True
135         if add_wakeup:
136             task = [timestamp, db_name, False]
137             heapq.heappush(_wakeups, task)
138             _wakeup_by_db[db_name] = task
139
140
141 def runner():
142     """Neverending function (intended to be ran in a dedicated thread) that
143        checks every 60 seconds the next database wake-up. TODO: make configurable
144     """
145     while True:
146         with _wakeups_lock:
147             while _wakeups and _wakeups[0][0] < time.time() and get_thread_count():
148                 task = heapq.heappop(_wakeups)
149                 timestamp, db_name, canceled = task
150                 if canceled:
151                     continue
152                 task[2] = True
153                 registry = openerp.pooler.get_pool(db_name)
154                 if not registry._init:
155                     registry['ir.cron']._run_jobs()
156         amount = 60
157         with _wakeups_lock:
158             # Sleep less than 60s if the next known wake-up will happen before.
159             if _wakeups and get_thread_count():
160                 amount = min(60, _wakeups[0][0] - time.time())
161         time.sleep(amount)
162
163
164 def start_master_thread():
165     """ Start the above runner function in a daemon thread.
166
167     The thread is a typical daemon thread: it will never quit and must be
168     terminated when the main process exits - with no consequence (the processing
169     threads it spawns are not marked daemon).
170
171     """
172     t = threading.Thread(target=runner, name="openerp.cron.master_thread")
173     t.setDaemon(True)
174     t.start()
175
176 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: