from tools.safe_eval import safe_eval as eval
import pooler
from osv import fields, osv
+import openerp
def str2tuple(s):
return eval('tuple(%s)' % (s or ''))
'minutes': lambda interval: relativedelta(minutes=interval),
}
-class ir_cron(osv.osv, netsvc.Agent):
+class ir_cron(osv.osv):
""" This is the ORM object that periodically executes actions.
- Note that we use the netsvc.Agent()._logger member.
"""
_name = "ir.cron"
_order = 'name'
'doall' : lambda *a: 1
}
- thread_count_lock = threading.Lock()
- thread_count = 2 # maximum allowed number of thread.
-
- def get_thread_count(self):
- return self.thread_count
-
- def dec_thread_count(self):
- self.thread_count_lock.acquire()
- self.thread_count -= 1
- self.thread_count_lock.release()
+ _logger = logging.getLogger('cron')
def f(a, b, c):
print ">>> in f"
# This is really needed if this job run longer that its rescheduling period.
print ">>> advance at", nextcall
nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S'))
- with self.thread_count_lock:
- self.schedule_in_advance(nextcall, cr.dbname)
+ openerp.cron.schedule_in_advance(nextcall, cr.dbname)
finally:
cr.commit()
cr.close()
- with self.thread_count_lock:
- self.thread_count += 1
- # reschedule the master thread in advance, using its saved next_call value.
- self.schedule_in_advance(self.next_call, cr.dbname)
- self.next_call = None
+ openerp.cron.inc_thread_count()
def _run_jobs(self):
# TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
locked to be taken care of by another thread.
"""
- db_name = self.pool.db.dbname
- try:
- db, pool = self.pool.db, self.pool
- except:
- return False
print ">>> _run_jobs"
- self.next_call = None
+ db = self.pool.db
cr = db.cursor()
+ db_name = db.dbname
try:
jobs = {} # mapping job ids to jobs for all jobs being processed.
- if not pool._init:
- now = datetime.now()
- cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
- for job in cr.dictfetchall():
- print ">>>", self.get_thread_count(), "threads"
- if not self.get_thread_count():
- break
- task_cr = db.cursor()
- task_job = None
- jobs[job['id']] = job
-
- try:
- # Try to lock the job...
- task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False)
- task_job = task_cr.dictfetchall()[0]
- except psycopg2.OperationalError, e:
- if e.pgcode == '55P03':
- # Class 55: Object not in prerequisite state, 55P03: lock_not_available
- # ... and fail.
- print ">>>", job['name'], " is already being processed"
- continue
- else:
- raise
- finally:
- if not task_job:
- task_cr.close()
-
- # ... and succeed.
- print ">>> taking care of", job['name']
- task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now))
- # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
- task_thread.setDaemon(False)
- self.dec_thread_count()
- task_thread.start()
+ now = datetime.now()
+ cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
+ for job in cr.dictfetchall():
+ print ">>>", openerp.cron.get_thread_count(), "threads"
+ if not openerp.cron.get_thread_count():
+ break
+ task_cr = db.cursor()
+ task_job = None
+ jobs[job['id']] = job
+
+ try:
+ # Try to lock the job...
+ task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False)
+ task_job = task_cr.dictfetchall()[0]
+ except psycopg2.OperationalError, e:
+ if e.pgcode == '55P03':
+ # Class 55: Object not in prerequisite state, 55P03: lock_not_available
+ # ... and fail.
+ print ">>>", job['name'], " is already being processed"
+ continue
+ else:
+ raise
+ finally:
+ if not task_job:
+ task_cr.close()
+
+ # ... and succeed.
+ print ">>> taking care of", job['name']
+ task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now))
+ # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
+ task_thread.setDaemon(False)
+ openerp.cron.dec_thread_count()
+ task_thread.start()
# Wake up time, without considering the currently processed jobs.
if jobs.keys():
else:
next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day
- # avoid race condition: the thread rescheduled the main thread, then the main thread puts +3600.
- with self.thread_count_lock:
- if not self.thread_count:
- print ">>> no more threads"
- self.next_call = next_call
- next_call = int(time.time()) + 3600 # no available thread, it will run again after 1 day
-
- self.schedule_in_advance(next_call, db_name)
+ openerp.cron.schedule_in_advance(next_call, db_name)
except Exception, ex:
self._logger.warning('Exception in cron:', exc_info=True)
self.restart(self, dbname)
def restart(self, dbname):
- self.cancel(dbname)
+ openerp.cron.cancel(dbname)
# Reschedule cron processing job asap, but not in the current thread
- self.schedule_in_advance(time.time(), dbname)
+ openerp.cron.schedule_in_advance(time.time(), dbname)
def update_running_cron(self, cr):
# Verify whether the server is already started and thus whether we need to commit
##############################################################################
import errno
-import heapq
import logging
import logging.handlers
import os
logger.addHandler(handler)
logger.setLevel(logging.ERROR)
-class Agent(object):
- """ Singleton that keeps track of cancellable tasks to run at a given
- timestamp.
-
- The tasks are characterised by:
-
- * a timestamp
- * the database on which the task run
- * a boolean attribute specifying if the task is canceled
-
- Implementation details:
-
- - Tasks are stored as list, allowing the cancellation by setting
- the boolean to True.
- - A heapq is used to store tasks, so we don't need to sort
- tasks ourself.
- """
- _wakeups = []
- _wakeup_by_db = {}
- _logger = logging.getLogger('netsvc.agent')
-
- @classmethod
- def cancel(cls, db_name):
- """ Cancel next wakeup for a given database. """
- cls._logger.debug("Cancel next wake-up for database '%s'.", db_name)
- if db_name in cls._wakeup_by_db:
- cls._wakeup_by_db[db_name][2] = True
-
- @classmethod
- def cancel_all(cls):
- cls._wakeups = []
- cls._wakeup_by_db = {}
-
- @classmethod
- def schedule_in_advance(cls, timestamp, db_name):
- if not timestamp:
- return
- # Cancel the previous wakeup if any.
- add_wakeup = False
- if db_name in cls._wakeup_by_db:
- task = cls._wakeup_by_db[db_name]
- if task[2] or timestamp < task[0]:
- add_wakeup = True
- task[2] = True
- else:
- add_wakeup = True
- if add_wakeup:
- print ">>> rescheduled earlier", timestamp
- task = [timestamp, db_name, False]
- heapq.heappush(cls._wakeups, task)
- cls._wakeup_by_db[db_name] = task
-
- @classmethod
- def runner(cls):
- """Neverending function (intended to be ran in a dedicated thread) that
- checks every 60 seconds tasks to run. TODO: make configurable
- """
- while True:
- print ">>>>> cron for"
- while cls._wakeups and cls._wakeups[0][0] < time.time():
- task = heapq.heappop(cls._wakeups)
- timestamp, db_name, canceled = task
- del cls._wakeup_by_db[db_name]
- if canceled:
- continue
- ir_cron = openerp.pooler.get_pool(db_name).get('ir.cron')
- ir_cron._run_jobs()
- time.sleep(60)
-
-def start_agent():
- agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
- # the agent runner is a typical daemon thread, that will never quit and must be
- # terminated when the main process exits - with no consequence (the processing
- # threads it spawns are not marked daemon)
- agent_runner.setDaemon(True)
- agent_runner.start()
-
import traceback
class Server: