import release
from pprint import pformat
import warnings
+import heapq
class Service(object):
""" Base class for *Local* services
_services = {}
_groups = {}
+ _logger = logging.getLogger('web-services')
def __init__(self, name, audience=''):
ExportService._services[name] = self
self.__name = name
+ self._logger.debug("Registered an exported service: %s" % name)
def joinGroup(self, name):
ExportService._groups.setdefault(name, {})[self.__name] = self
raise
LOG_NOTSET = 'notset'
+LOG_DEBUG_SQL = 'debug_sql'
+LOG_DEBUG_RPC_ANSWER = 'debug_rpc_answer'
LOG_DEBUG_RPC = 'debug_rpc'
LOG_DEBUG = 'debug'
LOG_TEST = 'test'
LOG_ERROR = 'error'
LOG_CRITICAL = 'critical'
+logging.DEBUG_RPC_ANSWER = logging.DEBUG - 4
+logging.addLevelName(logging.DEBUG_RPC_ANSWER, 'DEBUG_RPC_ANSWER')
logging.DEBUG_RPC = logging.DEBUG - 2
logging.addLevelName(logging.DEBUG_RPC, 'DEBUG_RPC')
+logging.DEBUG_SQL = logging.DEBUG_RPC - 3
+logging.addLevelName(logging.DEBUG_SQL, 'DEBUG_SQL')
logging.TEST = logging.INFO - 5
logging.addLevelName(logging.TEST, 'TEST')
BOLD_SEQ = "\033[1m"
COLOR_PATTERN = "%s%s%%s%s" % (COLOR_SEQ, COLOR_SEQ, RESET_SEQ)
LEVEL_COLOR_MAPPING = {
+ logging.DEBUG_SQL: (WHITE, MAGENTA),
logging.DEBUG_RPC: (BLUE, WHITE),
+ logging.DEBUG_RPC_ANSWER: (BLUE, WHITE),
logging.DEBUG: (BLUE, DEFAULT),
logging.INFO: (GREEN, DEFAULT),
logging.TEST: (WHITE, BLUE),
logging.CRITICAL: (WHITE, RED),
}
-class ColoredFormatter(logging.Formatter):
+class DBFormatter(logging.Formatter):
+    """logging.Formatter that stamps each record with the current db name.
+
+    Reads ``dbname`` from the current thread ('?' when the attribute is
+    not set) so log format strings may include ``%(dbname)s``.
+    """
    def format(self, record):
-        fg_color, bg_color = LEVEL_COLOR_MAPPING[record.levelno]
-        record.levelname = COLOR_PATTERN % (30 + fg_color, 40 + bg_color, record.levelname)
+        record.dbname = getattr(threading.currentThread(), 'dbname', '?')
        return logging.Formatter.format(self, record)
+class ColoredFormatter(DBFormatter):
+    """DBFormatter that wraps the level name in ANSI color escapes.
+
+    Colors are looked up in LEVEL_COLOR_MAPPING by record.levelno;
+    30+fg / 40+bg are the standard ANSI SGR color-code offsets.
+    """
+    def format(self, record):
+        # NOTE(review): LEVEL_COLOR_MAPPING as visible here has no WARNING or
+        # ERROR entries — a record at those levels would raise KeyError unless
+        # the entries exist in elided context lines; confirm.
+        fg_color, bg_color = LEVEL_COLOR_MAPPING[record.levelno]
+        record.levelname = COLOR_PATTERN % (30 + fg_color, 40 + bg_color, record.levelname)
+        return DBFormatter.format(self, record)
def init_logger():
import os
from tools.translate import resetlocale
resetlocale()
- logger = logging.getLogger()
# create a format for log messages and dates
- format = '[%(asctime)s] %(levelname)s:%(name)s:%(message)s'
+ format = '[%(asctime)s][%(dbname)s] %(levelname)s:%(name)s:%(message)s'
if tools.config['syslog']:
# SysLog Handler
else:
handler = logging.handlers.SysLogHandler('/dev/log')
format = '%s %s' % (release.description, release.version) \
- + ':%(levelname)s:%(name)s:%(message)s'
+ + ':%(dbname)s:%(levelname)s:%(name)s:%(message)s'
elif tools.config['logfile']:
# LogFile Handler
handler = logging.handlers.WatchedFileHandler(logf)
else:
handler = logging.handlers.FileHandler(logf)
- except Exception, ex:
+ except Exception:
sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n")
handler = logging.StreamHandler(sys.stdout)
else:
if isinstance(handler, logging.StreamHandler) and os.isatty(handler.stream.fileno()):
formatter = ColoredFormatter(format)
else:
- formatter = logging.Formatter(format)
+ formatter = DBFormatter(format)
handler.setFormatter(formatter)
# add the handler to the root logger
+ logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(int(tools.config['log_level'] or '0'))
-
class Logger(object):
def __init__(self):
warnings.warn("The netsvc.Logger API shouldn't be used anymore, please "
level_method('[%02d]: %s' % (idx+1, s,))
elif result:
level_method(result[0])
- except IOError,e:
+ except IOError:
# TODO: perhaps reset the logger streams?
#if logrotate closes our files, we end up here..
pass
- except:
+ except Exception:
# better ignore the exception and carry on..
pass
- def set_loglevel(self, level):
- log = logging.getLogger()
+ def set_loglevel(self, level, logger=None):
+ if logger is not None:
+ log = logging.getLogger(str(logger))
+ else:
+ log = logging.getLogger()
log.setLevel(logging.INFO) # make sure next msg is printed
log.info("Log level changed to %s" % logging.getLevelName(level))
log.setLevel(level)
init_logger()
class Agent(object):
- _timers = {}
- _logger = Logger()
-
- __logger = logging.getLogger('timer')
-
- def setAlarm(self, fn, dt, db_name, *args, **kwargs):
- wait = dt - time.time()
- if wait > 0:
- self.__logger.debug("Job scheduled in %.3g seconds for %s.%s" % (wait, fn.im_class.__name__, fn.func_name))
- timer = threading.Timer(wait, fn, args, kwargs)
- timer.start()
- self._timers.setdefault(db_name, []).append(timer)
+    """Singleton that keeps track of cancellable tasks to run at a given
+    timestamp.
+    The tasks are characterised by:
+    * a timestamp
+    * the database on which the task run
+    * the function to call
+    * the arguments and keyword arguments to pass to the function
+
+    Implementation details:
+    Tasks are stored as list, allowing the cancellation by setting
+    the timestamp to 0.
+    A heapq is used to store tasks, so we don't need to sort
+    tasks ourselves.
+    """
+    # min-heap of [timestamp, db_name, function, args, kwargs] lists,
+    # ordered by timestamp (earliest task first)
+    __tasks = []
+    # db_name -> list of the same task lists, so cancel() can reach all
+    # of a database's tasks without scanning the whole heap
+    __tasks_by_db = {}
+    _logger = logging.getLogger('netsvc.agent')
- for db in self._timers:
- for timer in self._timers[db]:
- if not timer.isAlive():
- self._timers[db].remove(timer)
+    @classmethod
+    def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
+        """Schedule function(*args, **kwargs) to run for db_name at timestamp.
+
+        The task is stored as a mutable list (not a tuple) so cancel()
+        can later zero its timestamp in place to mark it cancelled.
+        """
+        task = [timestamp, db_name, function, args, kwargs]
+        heapq.heappush(cls.__tasks, task)
+        # secondary per-database index used by cancel(db_name)
+        cls.__tasks_by_db.setdefault(db_name, []).append(task)
    @classmethod
    def cancel(cls, db_name):
-        """Cancel all timers for a given database. If None passed, all timers are cancelled"""
-        for db in cls._timers:
-            if db_name is None or db == db_name:
-                for timer in cls._timers[db]:
-                    timer.cancel()
+        """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
+        cls._logger.debug("Cancel timers for %s db", db_name or 'all')
+        if db_name is None:
+            cls.__tasks, cls.__tasks_by_db = [], {}
+        else:
+            if db_name in cls.__tasks_by_db:
+                for task in cls.__tasks_by_db[db_name]:
+                    # cancel in place: runner() skips any popped task whose
+                    # timestamp is falsy (see the `if not timestamp` check)
+                    task[0] = 0
    @classmethod
    def quit(cls):
+        """Shutdown hook: cancel every pending task, for all databases."""
        cls.cancel(None)
+    @classmethod
+    def runner(cls):
+        """Neverending function (intended to be ran in a dedicated thread) that
+        checks every 60 seconds tasks to run. TODO: make configurable
+        """
+        current_thread = threading.currentThread()
+        while True:
+            # drain every task whose timestamp is already in the past
+            while cls.__tasks and cls.__tasks[0][0] < time.time():
+                task = heapq.heappop(cls.__tasks)
+                timestamp, dbname, function, args, kwargs = task
+                cls.__tasks_by_db[dbname].remove(task)
+                if not timestamp:
+                    # null timestamp -> cancelled task
+                    continue
+                # temporarily expose dbname on this thread so DBFormatter tags
+                # the debug line below with the right database
+                current_thread.dbname = dbname # hack hack
+                cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs)
+                delattr(current_thread, 'dbname')
+                task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
+                # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
+                task_thread.setDaemon(False)
+                task_thread.start()
+            # NOTE(review): upstream places time.sleep(1) inside the inner
+            # while (a one-second throttle between spawned tasks); as indented
+            # here both sleeps run once per outer iteration — confirm intent.
+            time.sleep(1)
+            time.sleep(60)
+
+agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
+# the agent runner is a typical daemon thread, that will never quit and must be
+# terminated when the main process exits - with no consequence (the processing
+# threads it spawns are not marked daemon)
+agent_runner.setDaemon(True)
+agent_runner.start()
+
import traceback
class Server:
self.exception = exception
self.traceback = traceback
+def replace_request_password(args):
+    """Return a copy of *args* (as a list) with the password masked.
+
+    The password is always the 3rd argument in a request; we replace it in
+    RPC logs so it's easier to forward logs for diagnostics/debugging
+    purposes without leaking credentials. The input sequence itself is not
+    mutated (a shallow copy is taken first).
+    """
+    args = list(args)
+    if len(args) > 2:
+        args[2] = '*'
+    return args
+
class OpenERPDispatcher:
- def log(self, title, msg):
- if tools.config['log_level'] == logging.DEBUG_RPC:
- Logger().notifyChannel('%s' % title, LOG_DEBUG_RPC, pformat(msg))
+    def log(self, title, msg, channel=logging.DEBUG_RPC, depth=None):
+        """Pretty-print msg on the logger named after title, if enabled.
+
+        :param channel: logging level to emit at (defaults to DEBUG_RPC)
+        :param depth: pformat nesting depth limit; None means unlimited
+        """
+        logger = logging.getLogger(title)
+        if logger.isEnabledFor(channel):
+            # emit one log record per line so multi-line structures stay readable
+            for line in pformat(msg, depth=depth).split('\n'):
+                logger.log(channel, line)
def dispatch(self, service_name, method, params):
try:
+ logger = logging.getLogger('result')
self.log('service', service_name)
self.log('method', method)
- self.log('params', params)
+ self.log('params', replace_request_password(params), depth=(None if logger.isEnabledFor(logging.DEBUG_RPC_ANSWER) else 1))
auth = getattr(self, 'auth_provider', None)
result = ExportService.getService(service_name).dispatch(method, auth, params)
- self.log('result', result)
- # We shouldn't marshall None,
- if result == None:
- result = False
+ self.log('result', result, channel=logging.DEBUG_RPC_ANSWER)
return result
except Exception, e:
self.log('exception', tools.exception_to_unicode(e))