import openerp
import pooler
import tools
-from openerp.cron import WAKE_UP_NOW
from osv import fields, osv
from tools import DEFAULT_SERVER_DATETIME_FORMAT
from tools.safe_eval import safe_eval as eval
except Exception, e:
self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e)
- def _run_job(self, cr, job, now):
- """ Run a given job taking care of the repetition.
-
- The cursor has a lock on the job (aquired by _run_jobs_multithread()) and this
- method is run in a worker thread (spawned by _run_jobs_multithread())).
-
- :param job: job to be run (as a dictionary).
- :param now: timestamp (result of datetime.now(), no need to call it multiple time).
-
- """
- try:
- nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
- numbercall = job['numbercall']
-
- ok = False
- while nextcall < now and numbercall:
- if numbercall > 0:
- numbercall -= 1
- if not ok or job['doall']:
- self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
- if numbercall:
- nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
- ok = True
- addsql = ''
- if not numbercall:
- addsql = ', active=False'
- cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
- (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
-
- if numbercall:
- # Reschedule our own main cron thread if necessary.
- # This is really needed if this job runs longer than its rescheduling period.
- nextcall = calendar.timegm(nextcall.timetuple())
- openerp.cron.schedule_wakeup(nextcall, cr.dbname)
- finally:
- cr.commit()
- cr.close()
- openerp.cron.release_thread_slot()
-
- def _run_jobs_multithread(self):
- # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
- """ Process the cron jobs by spawning worker threads.
-
- This selects in database all the jobs that should be processed. It then
- tries to lock each of them and, if it succeeds, spawns a thread to run
- the cron job (if it doesn't succeed, it means the job was already
- locked to be taken care of by another thread).
-
- The cursor used to lock the job in database is given to the worker
- thread (which has to close it itself).
-
- """
- db = self.pool.db
- cr = db.cursor()
- db_name = db.dbname
- try:
- jobs = {} # mapping job ids to jobs for all jobs being processed.
- now = datetime.now()
- # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
- cr.execute("""SELECT * FROM ir_cron
- WHERE numbercall != 0
- AND active AND nextcall <= (now() at time zone 'UTC')
- ORDER BY priority""")
- for job in cr.dictfetchall():
- if not openerp.cron.get_thread_slots():
- break
- jobs[job['id']] = job
-
- task_cr = db.cursor()
- try:
- # Try to grab an exclusive lock on the job row from within the task transaction
- acquired_lock = False
- task_cr.execute("""SELECT *
- FROM ir_cron
- WHERE id=%s
- FOR UPDATE NOWAIT""",
- (job['id'],), log_exceptions=False)
- acquired_lock = True
- except psycopg2.OperationalError, e:
- if e.pgcode == '55P03':
- # Class 55: Object not in prerequisite state; 55P03: lock_not_available
- _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
- continue
- else:
- # Unexpected OperationalError
- raise
- finally:
- if not acquired_lock:
- # we're exiting due to an exception while acquiring the lot
- task_cr.close()
-
- # Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock
- task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now))
- # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
- task_thread.setDaemon(False)
- openerp.cron.take_thread_slot()
- task_thread.start()
- _logger.debug('Cron execution thread for job `%s` spawned', job['name'])
-
- # Find next earliest job ignoring currently processed jobs (by this and other cron threads)
- find_next_time_query = """SELECT min(nextcall) AS min_next_call
- FROM ir_cron WHERE numbercall != 0 AND active"""
- if jobs:
- cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),))
- else:
- cr.execute(find_next_time_query)
- next_call = cr.dictfetchone()['min_next_call']
-
- if next_call:
- next_call = calendar.timegm(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT))
- else:
- # no matching cron job found in database, re-schedule arbitrarily in 1 day,
- # this delay will likely be modified when running jobs complete their tasks
- next_call = time.time() + (24*3600)
-
- openerp.cron.schedule_wakeup(next_call, db_name)
-
- except Exception, ex:
- _logger.warning('Exception in cron:', exc_info=True)
-
- finally:
- cr.commit()
- cr.close()
-
def _process_job(self, cr, job):
""" Run a given job taking care of the repetition.
return False
- def update_running_cron(self, cr):
- """ Schedule as soon as possible a wake-up for this database. """
- # Verify whether the server is already started and thus whether we need to commit
- # immediately our changes and restart the cron agent in order to apply the change
- # immediately. The commit() is needed because as soon as the cron is (re)started it
- # will query the database with its own cursor, possibly before the end of the
- # current transaction.
- # This commit() is not an issue in most cases, but we must absolutely avoid it
- # when the server is only starting or loading modules (hence the test on pool._init).
- if not self.pool._init:
- cr.commit()
- openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname)
-
def _try_lock(self, cr, uid, ids, context=None):
"""Try to grab a dummy exclusive write-lock to the rows with the given ids,
to make sure a following write() or unlink() will not block due
def create(self, cr, uid, vals, context=None):
res = super(ir_cron, self).create(cr, uid, vals, context=context)
- self.update_running_cron(cr)
return res
def write(self, cr, uid, ids, vals, context=None):
self._try_lock(cr, uid, ids, context)
res = super(ir_cron, self).write(cr, uid, ids, vals, context=context)
- self.update_running_cron(cr)
return res
def unlink(self, cr, uid, ids, context=None):
self._try_lock(cr, uid, ids, context)
res = super(ir_cron, self).unlink(cr, uid, ids, context=context)
- self.update_running_cron(cr)
return res
-ir_cron()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
import deprecation
-# Maximum number of threads processing concurrently cron jobs.
-max_cron_threads = 4 # Actually the default value here is meaningless,
- # look at tools.config for the default value.
-
# Paths to search for OpenERP addons.
addons_paths = []
+++ /dev/null
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-##############################################################################
-#
-# OpenERP, Open Source Management Solution
-# Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-##############################################################################
-
-""" Cron jobs scheduling
-
-Cron jobs are defined in the ir_cron table/model. This module deals with all
-cron jobs, for all databases of a single OpenERP server instance.
-
-It defines a single master thread that will spawn (a bounded number of)
-threads to process individual cron jobs.
-
-The thread runs forever, checking every 60 seconds for new
-'database wake-ups'. It maintains a heapq of database wake-ups. At each
-wake-up, it will call ir_cron._run_jobs_multithread() for the given database. _run_jobs_multithread
-will check the jobs defined in the ir_cron table and spawn accordingly threads
-to process them.
-
-This module's behavior depends on the following configuration variable:
-openerp.conf.max_cron_threads.
-
-"""
-
-import heapq
-import logging
-import threading
-import time
-
-import openerp
-import tools
-
-_logger = logging.getLogger(__name__)
-
-# Heapq of database wake-ups. Note that 'database wake-up' meaning is in
-# the context of the cron management. This is not originally about loading
-# a database, although having the database name in the queue will
-# cause it to be loaded when the schedule time is reached, even if it was
-# unloaded in the mean time. Normally a database's wake-up is cancelled by
-# the RegistryManager when the database is unloaded - so this should not
-# cause it to be reloaded.
-#
-# TODO: perhaps in the future we could consider a flag on ir.cron jobs
-# that would cause database wake-up even if the database has not been
-# loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
-#
-# Each element is a triple (timestamp, database-name, boolean). The boolean
-# specifies if the wake-up is canceled (so a wake-up can be canceled without
-# relying on the heapq implementation detail; no need to remove the job from
-# the heapq).
-_wakeups = []
-
-# Mapping of database names to the wake-up defined in the heapq,
-# so that we can cancel the wake-up without messing with the heapq
-# invariant: lookup the wake-up by database-name, then set
-# its third element to True.
-_wakeup_by_db = {}
-
-# Re-entrant lock to protect the above _wakeups and _wakeup_by_db variables.
-# We could use a simple (non-reentrant) lock if the runner function below
-# was more fine-grained, but we are fine with the loop owning the lock
-# while spawning a few threads.
-_wakeups_lock = threading.RLock()
-
-# Maximum number of threads allowed to process cron jobs concurrently. This
-# variable is set by start_master_thread using openerp.conf.max_cron_threads.
-_thread_slots = None
-
-# A (non re-entrant) lock to protect the above _thread_slots variable.
-_thread_slots_lock = threading.Lock()
-
-# Sleep duration limits - must not loop too quickly, but can't sleep too long
-# either, because a new job might be inserted in ir_cron with a much sooner
-# execution date than current known ones. We won't see it until we wake!
-MAX_SLEEP = 60 # 1 min
-MIN_SLEEP = 1 # 1 sec
-
-# Dummy wake-up timestamp that can be used to force a database wake-up asap
-WAKE_UP_NOW = 1
-
-def get_thread_slots():
- """ Return the number of available thread slots """
- return _thread_slots
-
-
-def release_thread_slot():
- """ Increment the number of available thread slots """
- global _thread_slots
- with _thread_slots_lock:
- _thread_slots += 1
-
-
-def take_thread_slot():
- """ Decrement the number of available thread slots """
- global _thread_slots
- with _thread_slots_lock:
- _thread_slots -= 1
-
-
-def cancel(db_name):
- """ Cancel the next wake-up of a given database, if any.
-
- :param db_name: database name for which the wake-up is canceled.
-
- """
- _logger.debug("Cancel next wake-up for database '%s'.", db_name)
- with _wakeups_lock:
- if db_name in _wakeup_by_db:
- _wakeup_by_db[db_name][2] = True
-
-
-def cancel_all():
- """ Cancel all database wake-ups. """
- _logger.debug("Cancel all database wake-ups")
- global _wakeups
- global _wakeup_by_db
- with _wakeups_lock:
- _wakeups = []
- _wakeup_by_db = {}
-
-
-def schedule_wakeup(timestamp, db_name):
- """ Schedule a new wake-up for a database.
-
- If an earlier wake-up is already defined, the new wake-up is discarded.
- If another wake-up is defined, that wake-up is discarded and the new one
- is scheduled.
-
- :param db_name: database name for which a new wake-up is scheduled.
- :param timestamp: when the wake-up is scheduled.
-
- """
- if not timestamp:
- return
- with _wakeups_lock:
- if db_name in _wakeup_by_db:
- task = _wakeup_by_db[db_name]
- if not task[2] and timestamp > task[0]:
- # existing wakeup is valid and occurs earlier than new one
- return
- task[2] = True # cancel existing task
- task = [timestamp, db_name, False]
- heapq.heappush(_wakeups, task)
- _wakeup_by_db[db_name] = task
- _logger.debug("Wake-up scheduled for database '%s' @ %s", db_name,
- 'NOW' if timestamp == WAKE_UP_NOW else timestamp)
-
-def runner():
- """Neverending function (intended to be run in a dedicated thread) that
- checks every 60 seconds the next database wake-up. TODO: make configurable
- """
- while True:
- runner_body()
-
-def runner_body():
- with _wakeups_lock:
- while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots():
- task = heapq.heappop(_wakeups)
- timestamp, db_name, canceled = task
- if canceled:
- continue
- del _wakeup_by_db[db_name]
- registry = openerp.pooler.get_pool(db_name)
- if not registry._init:
- _logger.debug("Database '%s' wake-up! Firing multi-threaded cron job processing", db_name)
- registry['ir.cron']._run_jobs_multithread()
- amount = MAX_SLEEP
- with _wakeups_lock:
- # Sleep less than MAX_SLEEP if the next known wake-up will happen before that.
- if _wakeups and get_thread_slots():
- amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time()))
- _logger.debug("Going to sleep for %ss", amount)
- time.sleep(amount)
-
-def start_master_thread():
- """ Start the above runner function in a daemon thread.
-
- The thread is a typical daemon thread: it will never quit and must be
- terminated when the main process exits - with no consequence (the processing
- threads it spawns are not marked daemon).
-
- """
- global _thread_slots
- _thread_slots = openerp.conf.max_cron_threads
- db_maxconn = tools.config['db_maxconn']
- if _thread_slots >= tools.config.get('db_maxconn', 64):
- _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
- "this may cause trouble if you reach that number of parallel cron tasks.",
- db_maxconn, _thread_slots)
- if _thread_slots:
- t = threading.Thread(target=runner, name="openerp.cron.master_thread")
- t.setDaemon(True)
- t.start()
- _logger.debug("Master cron daemon started!")
- else:
- _logger.info("No master cron daemon (0 workers needed).")
-
-# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
import openerp.sql_db
import openerp.osv.orm
-import openerp.cron
import openerp.tools
import openerp.modules.db
import openerp.tools.config
self.db_name = db_name
self.db = openerp.sql_db.db_connect(db_name)
+ # In monoprocess cron jobs flag (pooljobs)
+ self.cron = False
+
# Inter-process signaling (used only when openerp.multi_process is True):
# The `base_registry_signaling` sequence indicates the whole registry
# must be reloaded.
monitor the ir.cron model for future jobs. See openerp.cron for
details.
"""
- openerp.cron.schedule_wakeup(openerp.cron.WAKE_UP_NOW, self.db.dbname)
+ self.cron = True
def clear_caches(self):
""" Clear the caches
import threading
import time
+import cron
import netrpc_server
import web_services
+import web_services
-import openerp.cron
import openerp.modules
import openerp.netsvc
import openerp.osv
netrpc_server.init_servers()
# Start the main cron thread.
- if openerp.conf.max_cron_threads:
- openerp.cron.start_master_thread()
+ cron.start_master_thread()
# Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC).
openerp.netsvc.Server.startAll()
def stop_services():
""" Stop all services. """
# stop scheduling new jobs; we will have to wait for the jobs to complete below
- openerp.cron.cancel_all()
-
openerp.netsvc.Server.quitAll()
openerp.service.wsgi_server.stop_server()
_logger.info("Initiating shutdown")
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+##############################################################################
+#
+# OpenERP, Open Source Management Solution
+# Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+##############################################################################
+
+""" Cron jobs scheduling
+
+Cron jobs are defined in the ir_cron table/model. This module deals with all
+cron jobs, for all databases of a single OpenERP server instance.
+
+"""
+
+import logging
+import threading
+import time
+
+import openerp
+
+_logger = logging.getLogger(__name__)
+
+SLEEP_INTERVAL = 60 # 1 min
+
+def cron_runner(number):
+ while True:
+ time.sleep(SLEEP_INTERVAL + number) # Steve Reich timing style
+ registries = openerp.modules.registry.RegistryManager.registries
+ _logger.debug('cron%d polling for jobs', number)
+ for db_name, registry in registries.items():
+ while True and registry.cron:
+ # acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
+ # TODO why isnt openerp.addons.base defined ?
+ import sys
+ base = sys.modules['addons.base']
+ acquired = base.ir.ir_cron.ir_cron._acquire_job(db_name)
+ if not acquired:
+ break
+
+def start_master_thread():
+ """ Start the above runner function in a daemon thread.
+
+ The thread is a typical daemon thread: it will never quit and must be
+ terminated when the main process exits - with no consequence (the processing
+ threads it spawns are not marked daemon).
+
+ """
+ for i in range(openerp.tools.config['max_cron_threads']):
+ def target():
+ cron_runner(i)
+ t = threading.Thread(target=target, name="openerp.service.cron.cron_runner%d" % i)
+ t.setDaemon(True)
+ t.start()
+ _logger.debug("cron%d started!" % i)
+
+# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
"osv_memory tables. This is a decimal value expressed in hours, "
"and the default is 1 hour.",
type="float")
- group.add_option("--max-cron-threads", dest="max_cron_threads", my_default=4,
+ group.add_option("--max-cron-threads", dest="max_cron_threads", my_default=2,
help="Maximum number of threads processing concurrently cron jobs.",
type="int")
group.add_option("--unaccent", dest="unaccent", my_default=False, action="store_true",
if opt.save:
self.save()
- openerp.conf.max_cron_threads = self.options['max_cron_threads']
-
openerp.conf.addons_paths = self.options['addons_path'].split(',')
if opt.server_wide_modules:
openerp.conf.server_wide_modules = map(lambda m: m.strip(), opt.server_wide_modules.split(','))