##############################################################################
import errno
+import heapq
import logging
import logging.handlers
import os
import sys
import threading
import time
-import traceback
import types
from pprint import pformat
# TODO modules that import netsvc only for things from loglevels must be changed to use loglevels.
from loglevels import *
import tools
-import openerp
def close_socket(sock):
""" Closes a socket instance cleanly
logger.addHandler(handler)
logger.setLevel(logging.ERROR)
+class Agent(object):
+ """ Singleton that keeps track of cancellable tasks to run at a given
+ timestamp.
+
+ The tasks are characterised by:
+
+ * a timestamp
+ * the database on which the task run
+ * the function to call
+ * the arguments and keyword arguments to pass to the function
+
+ Implementation details:
+
+ - Tasks are stored as list, allowing the cancellation by setting
+ the timestamp to 0.
+ - A heapq is used to store tasks, so we don't need to sort
+ tasks ourself.
+ """
+ __tasks = []
+ __tasks_by_db = {}
+ _logger = logging.getLogger('netsvc.agent')
+
+ @classmethod
+ def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
+ task = [timestamp, db_name, function, args, kwargs]
+ heapq.heappush(cls.__tasks, task)
+ cls.__tasks_by_db.setdefault(db_name, []).append(task)
+
+ @classmethod
+ def cancel(cls, db_name):
+ """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
+ cls._logger.debug("Cancel timers for %s db", db_name or 'all')
+ if db_name is None:
+ cls.__tasks, cls.__tasks_by_db = [], {}
+ else:
+ if db_name in cls.__tasks_by_db:
+ for task in cls.__tasks_by_db[db_name]:
+ task[0] = 0
+
+ @classmethod
+ def quit(cls):
+ cls.cancel(None)
+
+ @classmethod
+ def runner(cls):
+ """Neverending function (intended to be ran in a dedicated thread) that
+ checks every 60 seconds tasks to run. TODO: make configurable
+ """
+ current_thread = threading.currentThread()
+ while True:
+ while cls.__tasks and cls.__tasks[0][0] < time.time():
+ task = heapq.heappop(cls.__tasks)
+ timestamp, dbname, function, args, kwargs = task
+ cls.__tasks_by_db[dbname].remove(task)
+ if not timestamp:
+ # null timestamp -> cancelled task
+ continue
+ current_thread.dbname = dbname # hack hack
+ cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs)
+ delattr(current_thread, 'dbname')
+ task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
+ # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
+ task_thread.setDaemon(False)
+ task_thread.start()
+ time.sleep(1)
+ time.sleep(60)
+
+def start_agent():
+ agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
+ # the agent runner is a typical daemon thread, that will never quit and must be
+ # terminated when the main process exits - with no consequence (the processing
+ # threads it spawns are not marked daemon)
+ agent_runner.setDaemon(True)
+ agent_runner.start()
+
+import traceback
+
class Server:
""" Generic interface for all servers with an event loop etc.
Override this to impement http, net-rpc etc. servers.