return databases
if __name__ == '__main__':
+ os.environ['TZ'] = 'UTC'
openerp.tools.config.parse_config(sys.argv[1:])
config = openerp.tools.config
if config['log_handler'] == [':INFO']:
# Replace the default value, which is suitable for openerp-server.
config['log_handler'].append('openerp.addons.base.ir.ir_cron:DEBUG')
- if config['db_name']:
- db_names = config['db_name'].split(',')
- print "Monitoring %s databases." % len(db_names)
- else:
- db_names = list_databases
- print "Monitored databases are auto-discovered."
setup_signal_handlers()
openerp.modules.module.initialize_sys_path()
openerp.modules.loading.open_openerp_namespace()
openerp.netsvc.init_logger()
+ openerp.cron.enable_schedule_wakeup = False
import openerp.addons.base
print "OpenERP cron jobs worker. Hit Ctrl-C to exit."
+ if config['db_name']:
+ db_names = config['db_name'].split(',')
+ print "Monitoring %s databases." % len(db_names)
+ else:
+ db_names = list_databases
+ print "Monitored databases are auto-discovered."
openerp.addons.base.ir.ir_cron.ir_cron._run(db_names)
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
import addons
import conf
+import cron
import loglevels
import modules
import netsvc
import openerp
import pooler
import tools
+from openerp.cron import WAKE_UP_NOW
from osv import fields, osv
from tools import DEFAULT_SERVER_DATETIME_FORMAT
from tools.safe_eval import safe_eval as eval
except Exception, e:
self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e)
- def _run_job(self, cr, job):
+ def _run_job(self, cr, job, now):
+ """ Run a given job taking care of the repetition.
+
+ The cursor has a lock on the job (aquired by _run_jobs_multithread()) and this
+ method is run in a worker thread (spawned by _run_jobs_multithread())).
+
+ :param job: job to be run (as a dictionary).
+ :param now: timestamp (result of datetime.now(), no need to call it multiple time).
+
+ """
+ try:
+ nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
+ numbercall = job['numbercall']
+
+ ok = False
+ while nextcall < now and numbercall:
+ if numbercall > 0:
+ numbercall -= 1
+ if not ok or job['doall']:
+ self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
+ if numbercall:
+ nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
+ ok = True
+ addsql = ''
+ if not numbercall:
+ addsql = ', active=False'
+ cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
+ (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
+
+ if numbercall:
+ # Reschedule our own main cron thread if necessary.
+ # This is really needed if this job runs longer than its rescheduling period.
+ nextcall = calendar.timegm(nextcall.timetuple())
+ openerp.cron.schedule_wakeup(nextcall, cr.dbname)
+ finally:
+ cr.commit()
+ cr.close()
+ openerp.cron.release_thread_slot()
+
+ def _run_jobs_multithread(self):
+ # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
+ """ Process the cron jobs by spawning worker threads.
+
+ This selects in database all the jobs that should be processed. It then
+ tries to lock each of them and, if it succeeds, spawns a thread to run
+ the cron job (if it doesn't succeed, it means the job was already
+ locked to be taken care of by another thread).
+
+ The cursor used to lock the job in database is given to the worker
+ thread (which has to close it itself).
+
+ """
+ db = self.pool.db
+ cr = db.cursor()
+ db_name = db.dbname
+ try:
+ jobs = {} # mapping job ids to jobs for all jobs being processed.
+ now = datetime.now()
+ # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
+ cr.execute("""SELECT * FROM ir_cron
+ WHERE numbercall != 0
+ AND active AND nextcall <= (now() at time zone 'UTC')
+ ORDER BY priority""")
+ for job in cr.dictfetchall():
+ if not openerp.cron.get_thread_slots():
+ break
+ jobs[job['id']] = job
+
+ task_cr = db.cursor()
+ try:
+ # Try to grab an exclusive lock on the job row from within the task transaction
+ acquired_lock = False
+ task_cr.execute("""SELECT *
+ FROM ir_cron
+ WHERE id=%s
+ FOR UPDATE NOWAIT""",
+ (job['id'],), log_exceptions=False)
+ acquired_lock = True
+ except psycopg2.OperationalError, e:
+ if e.pgcode == '55P03':
+ # Class 55: Object not in prerequisite state; 55P03: lock_not_available
+ _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
+ continue
+ else:
+ # Unexpected OperationalError
+ raise
+ finally:
+ if not acquired_lock:
+ # we're exiting due to an exception while acquiring the lot
+ task_cr.close()
+
+ # Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock
+ task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now))
+ # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
+ task_thread.setDaemon(False)
+ openerp.cron.take_thread_slot()
+ task_thread.start()
+ _logger.debug('Cron execution thread for job `%s` spawned', job['name'])
+
+ # Find next earliest job ignoring currently processed jobs (by this and other cron threads)
+ find_next_time_query = """SELECT min(nextcall) AS min_next_call
+ FROM ir_cron WHERE numbercall != 0 AND active"""
+ if jobs:
+ cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),))
+ else:
+ cr.execute(find_next_time_query)
+ next_call = cr.dictfetchone()['min_next_call']
+
+ if next_call:
+ next_call = calendar.timegm(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT))
+ else:
+ # no matching cron job found in database, re-schedule arbitrarily in 1 day,
+ # this delay will likely be modified when running jobs complete their tasks
+ next_call = time.time() + (24*3600)
+
+ openerp.cron.schedule_wakeup(next_call, db_name)
+
+ except Exception, ex:
+ _logger.warning('Exception in cron:', exc_info=True)
+
+ finally:
+ cr.commit()
+ cr.close()
+
+ def _process_job(self, cr, job):
""" Run a given job taking care of the repetition.
The cursor has a lock on the job (aquired by _acquire_job()).
:param job: job to be run (as a dictionary).
"""
try:
- now = datetime.utcnow()
+ now = datetime.now()
nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
numbercall = job['numbercall']
# Got the lock on the job row, run its code
_logger.debug('Starting job `%s`.', job['name'])
registry = openerp.pooler.get_pool(db_name)
- registry[cls._name]._run_job(task_cr, job)
+ registry[cls._name]._process_job(task_cr, job)
return True
except psycopg2.ProgrammingError, e:
time.sleep(60 - t)
job_in_progress = True
+ def update_running_cron(self, cr):
+ """ Schedule as soon as possible a wake-up for this database. """
+ # Verify whether the server is already started and thus whether we need to commit
+ # immediately our changes and restart the cron agent in order to apply the change
+ # immediately. The commit() is needed because as soon as the cron is (re)started it
+ # will query the database with its own cursor, possibly before the end of the
+ # current transaction.
+ # This commit() is not an issue in most cases, but we must absolutely avoid it
+ # when the server is only starting or loading modules (hence the test on pool._init).
+ if not self.pool._init:
+ cr.commit()
+ openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname)
+
def _try_lock(self, cr, uid, ids, context=None):
"""Try to grab a dummy exclusive write-lock to the rows with the given ids,
to make sure a following write() or unlink() will not block due
def create(self, cr, uid, vals, context=None):
res = super(ir_cron, self).create(cr, uid, vals, context=context)
+ self.update_running_cron(cr)
return res
def write(self, cr, uid, ids, vals, context=None):
self._try_lock(cr, uid, ids, context)
res = super(ir_cron, self).write(cr, uid, ids, vals, context=context)
+ self.update_running_cron(cr)
return res
def unlink(self, cr, uid, ids, context=None):
self._try_lock(cr, uid, ids, context)
res = super(ir_cron, self).unlink(cr, uid, ids, context=context)
+ self.update_running_cron(cr)
return res
+ir_cron()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
_logger = logging.getLogger(__name__)
+# Scheduling wake-ups (see below) can be disabled when the polling process
+# workers are used instead of the managed thread workers. (I.e. wake-ups are
+# not used since polling is used. And polling is used when the cron are
+# handled by running special processes, e.g. openerp-cron-worker, instead
+# of the general openerp-server script.)
+enable_schedule_wakeup = True
+
+# Heapq of database wake-ups. Note that 'database wake-up' meaning is in
+# the context of the cron management. This is not originally about loading
+# a database, although having the database name in the queue will
+# cause it to be loaded when the schedule time is reached, even if it was
+# unloaded in the mean time. Normally a database's wake-up is cancelled by
+# the RegistryManager when the database is unloaded - so this should not
+# cause it to be reloaded.
+#
# TODO: perhaps in the future we could consider a flag on ir.cron jobs
# that would cause database wake-up even if the database has not been
# loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
+#
+# Each element is a triple (timestamp, database-name, boolean). The boolean
+# specifies if the wake-up is canceled (so a wake-up can be canceled without
+# relying on the heapq implementation detail; no need to remove the job from
+# the heapq).
+_wakeups = []
+
+# Mapping of database names to the wake-up defined in the heapq,
+# so that we can cancel the wake-up without messing with the heapq
+# invariant: lookup the wake-up by database-name, then set
+# its third element to True.
+_wakeup_by_db = {}
+
+# Re-entrant lock to protect the above _wakeups and _wakeup_by_db variables.
+# We could use a simple (non-reentrant) lock if the runner function below
+# was more fine-grained, but we are fine with the loop owning the lock
+# while spawning a few threads.
+_wakeups_lock = threading.RLock()
+
+# Maximum number of threads allowed to process cron jobs concurrently. This
+# variable is set by start_master_thread using openerp.conf.max_cron_threads.
+_thread_slots = None
+
+# A (non re-entrant) lock to protect the above _thread_slots variable.
+_thread_slots_lock = threading.Lock()
+
+# Sleep duration limits - must not loop too quickly, but can't sleep too long
+# either, because a new job might be inserted in ir_cron with a much sooner
+# execution date than current known ones. We won't see it until we wake!
+MAX_SLEEP = 60 # 1 min
+MIN_SLEEP = 1 # 1 sec
+
+# Dummy wake-up timestamp that can be used to force a database wake-up asap
+WAKE_UP_NOW = 1
+
+def get_thread_slots():
+ """ Return the number of available thread slots """
+ return _thread_slots
+
+
+def release_thread_slot():
+ """ Increment the number of available thread slots """
+ global _thread_slots
+ with _thread_slots_lock:
+ _thread_slots += 1
+
+
+def take_thread_slot():
+ """ Decrement the number of available thread slots """
+ global _thread_slots
+ with _thread_slots_lock:
+ _thread_slots -= 1
+
+
+def cancel(db_name):
+ """ Cancel the next wake-up of a given database, if any.
+
+ :param db_name: database name for which the wake-up is canceled.
+
+ """
+ _logger.debug("Cancel next wake-up for database '%s'.", db_name)
+ with _wakeups_lock:
+ if db_name in _wakeup_by_db:
+ _wakeup_by_db[db_name][2] = True
+
+
+def cancel_all():
+ """ Cancel all database wake-ups. """
+ _logger.debug("Cancel all database wake-ups")
+ global _wakeups
+ global _wakeup_by_db
+ with _wakeups_lock:
+ _wakeups = []
+ _wakeup_by_db = {}
+
+def schedule_wakeup(timestamp, db_name):
+ """ Schedule a new wake-up for a database.
+
+ If an earlier wake-up is already defined, the new wake-up is discarded.
+ If another wake-up is defined, that wake-up is discarded and the new one
+ is scheduled.
+
+ :param db_name: database name for which a new wake-up is scheduled.
+ :param timestamp: when the wake-up is scheduled.
+
+ """
+ global enable_schedule_wakeup
+ if not enable_schedule_wakeup:
+ return
+ if not timestamp:
+ return
+ with _wakeups_lock:
+ if db_name in _wakeup_by_db:
+ task = _wakeup_by_db[db_name]
+ if not task[2] and timestamp > task[0]:
+ # existing wakeup is valid and occurs earlier than new one
+ return
+ task[2] = True # cancel existing task
+ task = [timestamp, db_name, False]
+ heapq.heappush(_wakeups, task)
+ _wakeup_by_db[db_name] = task
+ _logger.debug("Wake-up scheduled for database '%s' @ %s", db_name,
+ 'NOW' if timestamp == WAKE_UP_NOW else timestamp)
+
+def runner():
+ """Neverending function (intended to be run in a dedicated thread) that
+ checks every 60 seconds the next database wake-up. TODO: make configurable
+ """
+ while True:
+ runner_body()
+
+def runner_body():
+ with _wakeups_lock:
+ while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots():
+ task = heapq.heappop(_wakeups)
+ timestamp, db_name, canceled = task
+ if canceled:
+ continue
+ del _wakeup_by_db[db_name]
+ registry = openerp.pooler.get_pool(db_name)
+ if not registry._init:
+ _logger.debug("Database '%s' wake-up! Firing multi-threaded cron job processing", db_name)
+ registry['ir.cron']._run_jobs_multithread()
+ amount = MAX_SLEEP
+ with _wakeups_lock:
+ # Sleep less than MAX_SLEEP if the next known wake-up will happen before that.
+ if _wakeups and get_thread_slots():
+ amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time()))
+ _logger.debug("Going to sleep for %ss", amount)
+ time.sleep(amount)
+
+def start_master_thread():
+ """ Start the above runner function in a daemon thread.
+
+ The thread is a typical daemon thread: it will never quit and must be
+ terminated when the main process exits - with no consequence (the processing
+ threads it spawns are not marked daemon).
+ """
+ global _thread_slots
+ _thread_slots = openerp.conf.max_cron_threads
+ db_maxconn = tools.config['db_maxconn']
+ if _thread_slots >= tools.config.get('db_maxconn', 64):
+ _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
+ "this may cause trouble if you reach that number of parallel cron tasks.",
+ db_maxconn, _thread_slots)
+ if _thread_slots:
+ t = threading.Thread(target=runner, name="openerp.cron.master_thread")
+ t.setDaemon(True)
+ t.start()
+ _logger.debug("Master cron daemon started!")
+ else:
+ _logger.info("No master cron daemon (0 workers needed).")
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
import openerp.sql_db
import openerp.osv.orm
+import openerp.cron
import openerp.tools
import openerp.modules.db
import openerp.tools.config
return res
+ def schedule_cron_jobs(self):
+ """ Make the cron thread care about this registry/database jobs.
+ This will initiate the cron thread to check for any pending jobs for
+ this registry/database as soon as possible. Then it will continuously
+ monitor the ir.cron model for future jobs. See openerp.cron for
+ details.
+ """
+ openerp.cron.schedule_wakeup(openerp.cron.WAKE_UP_NOW, self.db.dbname)
+
def clear_caches(self):
""" Clear the caches
This clears the caches associated to methods decorated with
finally:
cr.close()
+ if pooljobs:
+ registry.schedule_cron_jobs()
+
return registry
@classmethod
if db_name in cls.registries:
cls.registries[db_name].clear_caches()
del cls.registries[db_name]
+ openerp.cron.cancel(db_name)
+
@classmethod
def delete_all(cls):
import web_services
import websrv_lib
+import openerp.cron
import openerp.modules
import openerp.netsvc
import openerp.osv
def start_services():
""" Start all services.
- Services include the different servers.
+ Services include the different servers and cron threads.
"""
# Instantiate local services (this is a legacy design).
#http_server.init_static_http()
netrpc_server.init_servers()
+ # Start the main cron thread.
+ openerp.cron.start_master_thread()
+
# Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC).
openerp.netsvc.Server.startAll()
def stop_services():
""" Stop all services. """
+ # stop scheduling new jobs; we will have to wait for the jobs to complete below
+ openerp.cron.cancel_all()
+
openerp.netsvc.Server.quitAll()
openerp.wsgi.core.stop_server()
config = openerp.tools.config