'doall' : lambda *a: 1
}
- def __init__(self, pool, cr):
- self.thread_count_lock = threading.Lock()
- self.thread_count = 2 # maximum allowed number of thread.
- super(osv.osv, self).__init__(pool, cr)
+ thread_count_lock = threading.Lock()
+ thread_count = 2 # maximum allowed number of thread.
def get_thread_count(self):
return self.thread_count
except Exception, e:
self._handle_callback_exception(cr, uid, model, func, args, job_id, e)
- def _compute_nextcall(self, job, now):
- """ Compute the nextcall for a job exactly as _run_job does.
-
- Return either the nextcall or None if it shouldn't be called.
-
- """
- nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
- numbercall = job['numbercall']
-
- while nextcall < now and numbercall:
- if numbercall > 0:
- numbercall -= 1
- if numbercall:
- nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
-
- if not numbercall:
- return None
- return nextcall.strftime('%Y-%m-%d %H:%M:%S')
-
def _run_job(self, cr, job, now):
""" Run a given job taking care of the repetition. """
try:
print ">>> advance at", nextcall
nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S'))
with self.thread_count_lock:
- self.reschedule_in_advance(self._poolJobs, nextcall, cr.dbname, cr.dbname)
+ self.schedule_in_advance(nextcall, cr.dbname)
finally:
cr.commit()
cr.close()
with self.thread_count_lock:
self.thread_count += 1
# reschedule the master thread in advance, using its saved next_call value.
- self.reschedule_in_advance(self._poolJobs, self.next_call, cr.dbname, cr.dbname)
+ self.schedule_in_advance(self.next_call, cr.dbname)
self.next_call = None
- def _poolJobs(self, db_name):
- return self._run_jobs(db_name)
-
- def _run_jobs(self, db_name):
+ def _run_jobs(self):
# TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
""" Process the cron jobs by spawning worker threads.
locked to be taken care of by another thread.
"""
+ db_name = self.pool.db.dbname
try:
- db, pool = pooler.get_db_and_pool(db_name)
+ db, pool = self.pool.db, self.pool
except:
return False
print ">>> _run_jobs"
self.next_call = next_call
next_call = int(time.time()) + 3600 # no available thread, it will run again after 1 day
- self.reschedule_in_advance(self._poolJobs, next_call, db_name, db_name)
+ self.schedule_in_advance(next_call, db_name)
except Exception, ex:
self._logger.warning('Exception in cron:', exc_info=True)
def restart(self, dbname):
self.cancel(dbname)
# Reschedule cron processing job asap, but not in the current thread
- self.setAlarm(self._poolJobs, time.time(), dbname, dbname)
+ self.schedule_in_advance(time.time(), dbname)
def update_running_cron(self, cr):
# Verify whether the server is already started and thus whether we need to commit
# TODO modules that import netsvc only for things from loglevels must be changed to use loglevels.
from loglevels import *
import tools
+import openerp
def close_socket(sock):
""" Closes a socket instance cleanly
* a timestamp
* the database on which the task run
- * the function to call
- * the arguments and keyword arguments to pass to the function
+ * a boolean attribute specifying if the task is canceled
Implementation details:
- Tasks are stored as list, allowing the cancellation by setting
- the timestamp to 0.
+ the boolean to True.
- A heapq is used to store tasks, so we don't need to sort
tasks ourself.
"""
- __tasks = []
- __tasks_by_db = {}
+ _wakeups = []
+ _wakeup_by_db = {}
_logger = logging.getLogger('netsvc.agent')
@classmethod
- def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
- task = [timestamp, db_name, function, args, kwargs]
- heapq.heappush(cls.__tasks, task)
- cls.__tasks_by_db.setdefault(db_name, []).append(task)
+ def cancel(cls, db_name):
+ """ Cancel next wakeup for a given database. """
+ cls._logger.debug("Cancel next wake-up for database '%s'.", db_name)
+ if db_name in cls._wakeup_by_db:
+ cls._wakeup_by_db[db_name][2] = True
@classmethod
- def cancel(cls, db_name):
- """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
- cls._logger.debug("Cancel timers for %s db", db_name or 'all')
- if db_name is None:
- cls.__tasks, cls.__tasks_by_db = [], {}
- else:
- if db_name in cls.__tasks_by_db:
- for task in cls.__tasks_by_db[db_name]:
- task[0] = 0
+ def cancel_all(cls):
+ cls._wakeups = []
+ cls._wakeup_by_db = {}
@classmethod
- def reschedule_in_advance(cls, function, timestamp, db_name, *args, **kwargs):
+ def schedule_in_advance(cls, timestamp, db_name):
if not timestamp:
return
- # Cancel the previous task if any.
- old_timestamp = False
- if db_name in cls.__tasks_by_db:
- for task in cls.__tasks_by_db[db_name]:
- print ">>> function:", function
- if task[2] == function and (not task[0] or timestamp < task[0]):
- old_timestamp = True
- task[0] = 0
- if old_timestamp or db_name not in cls.__tasks_by_db or not cls.__tasks_by_db[db_name]:
+ # Cancel the previous wakeup if any.
+ add_wakeup = False
+ if db_name in cls._wakeup_by_db:
+ task = cls._wakeup_by_db[db_name]
+ if task[2] or timestamp < task[0]:
+ add_wakeup = True
+ task[2] = True
+ else:
+ add_wakeup = True
+ if add_wakeup:
print ">>> rescheduled earlier", timestamp
- cls.setAlarm(function, timestamp, db_name, *args, **kwargs)
-
- @classmethod
- def quit(cls):
- cls.cancel(None)
+ task = [timestamp, db_name, False]
+ heapq.heappush(cls._wakeups, task)
+ cls._wakeup_by_db[db_name] = task
@classmethod
def runner(cls):
"""Neverending function (intended to be ran in a dedicated thread) that
checks every 60 seconds tasks to run. TODO: make configurable
"""
- current_thread = threading.currentThread()
while True:
- print ">>>>> starting thread for"
- while cls.__tasks and cls.__tasks[0][0] < time.time():
- task = heapq.heappop(cls.__tasks)
- timestamp, dbname, function, args, kwargs = task
- cls.__tasks_by_db[dbname].remove(task)
- if not timestamp:
- # null timestamp -> cancelled task
+ print ">>>>> cron for"
+ while cls._wakeups and cls._wakeups[0][0] < time.time():
+ task = heapq.heappop(cls._wakeups)
+ timestamp, db_name, canceled = task
+ del cls._wakeup_by_db[db_name]
+ if canceled:
continue
- current_thread.dbname = dbname # hack hack
- cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs)
- delattr(current_thread, 'dbname')
- task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
- # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
- task_thread.setDaemon(False)
- print ">>>>> -", function.func_name
- task_thread.start()
- time.sleep(1)
+ ir_cron = openerp.pooler.get_pool(db_name).get('ir.cron')
+ ir_cron._run_jobs()
time.sleep(60)
def start_agent():