X-Git-Url: http://git.inspyration.org/?a=blobdiff_plain;f=bin%2Fnetsvc.py;h=eae53826a74038e5514288b714622478f2bf509c;hb=a21dcd7e1c8623f4aaf3a3659e81247273f652e9;hp=b03dfad7e319302e4abbc178e83bdf05e0b7ef89;hpb=b69c19fbb2a56de017f466c76d06addd968c96ce;p=odoo%2Fodoo.git diff --git a/bin/netsvc.py b/bin/netsvc.py index b03dfad..eae5382 100644 --- a/bin/netsvc.py +++ b/bin/netsvc.py @@ -1,5 +1,5 @@ -#!/usr/bin/python -# -*- encoding: utf-8 -*- +#!/usr/bin/env python +# -*- coding: utf-8 -*- ############################################################################## # # OpenERP, Open Source Management Solution @@ -24,31 +24,38 @@ # ############################################################################## - -import SimpleXMLRPCServer -import SocketServer import logging import logging.handlers -import os -import signal -import socket import sys import threading import time -import xmlrpclib import release - -SERVICES = {} -GROUPS = {} +from pprint import pformat +import warnings class Service(object): + """ Base class for *Local* services + + Functionality here is trusted, no authentication. + """ + _services = {} def __init__(self, name, audience=''): - SERVICES[name] = self + Service._services[name] = self self.__name = name self._methods = {} def joinGroup(self, name): - GROUPS.setdefault(name, {})[self.__name] = self + raise Exception("No group for local services") + #GROUPS.setdefault(name, {})[self.__name] = self + + @classmethod + def exists(cls, name): + return name in cls._services + + @classmethod + def remove(cls, name): + if cls.exists(name): + cls._services.pop(name) def exportMethod(self, method): if callable(method): @@ -60,39 +67,86 @@ class Service(object): else: raise -class LocalService(Service): +class LocalService(object): + """ Proxy for local services. + + Any instance of this class will behave like the single instance + of Service(name) + """ + __logger = logging.getLogger('service') def __init__(self, name): self.__name = name try: - self._service = SERVICES[name] + self._service = Service._services[name] for method_name, method_definition in self._service._methods.items(): setattr(self, method_name, method_definition) except KeyError, keyError: - Logger().notifyChannel('module', LOG_ERROR, 'This service does not exists: %s' % (str(keyError),) ) + self.__logger.error('This service does not exist: %s' % (str(keyError),) ) raise + def __call__(self, method, *params): return getattr(self, method)(*params) -def service_exist(name): - return SERVICES.get(name, False) +class ExportService(object): + """ Proxy for exported services. + + All methods here should take an AuthProxy as their first parameter. It + will be appended by the calling framework. + + Note that this class has no direct proxy, capable of calling + eservice.method(). Rather, the proxy should call + dispatch(method,auth,params) + """ + + _services = {} + _groups = {} + + def __init__(self, name, audience=''): + ExportService._services[name] = self + self.__name = name + + def joinGroup(self, name): + ExportService._groups.setdefault(name, {})[self.__name] = self + + @classmethod + def getService(cls,name): + return cls._services[name] + + def dispatch(self, method, auth, params): + raise Exception("stub dispatch at %s" % self.__name) + + def new_dispatch(self,method,auth,params): + raise Exception("stub dispatch at %s" % self.__name) + + def abortResponse(self, error, description, origin, details): + if not tools.config['debug_mode']: + raise Exception("%s -- %s\n\n%s"%(origin, description, details)) + else: + raise LOG_NOTSET = 'notset' LOG_DEBUG_RPC = 'debug_rpc' LOG_DEBUG = 'debug' +LOG_TEST = 'test' LOG_INFO = 'info' LOG_WARNING = 'warn' LOG_ERROR = 'error' LOG_CRITICAL = 'critical' -# add new log level below DEBUG -logging.DEBUG_RPC = logging.DEBUG - 1 +logging.DEBUG_RPC = logging.DEBUG - 2 +logging.addLevelName(logging.DEBUG_RPC, 'DEBUG_RPC') + +logging.TEST = logging.INFO - 5 +logging.addLevelName(logging.TEST, 'TEST') def init_logger(): import os + from tools.translate import resetlocale + resetlocale() logger = logging.getLogger() # create a format for log messages and dates - formatter = logging.Formatter('[%(asctime)s] %(levelname)s:%(name)s:%(message)s', '%Y-%m-%d %H:%M:%S') + formatter = logging.Formatter('[%(asctime)s] %(levelname)s:%(name)s:%(message)s') if tools.config['syslog']: # SysLog Handler @@ -111,10 +165,15 @@ def init_logger(): dirname = os.path.dirname(logf) if dirname and not os.path.isdir(dirname): os.makedirs(dirname) - handler = logging.handlers.TimedRotatingFileHandler(logf,'D',1,30) + if tools.config['logrotate'] is not False: + handler = logging.handlers.TimedRotatingFileHandler(logf,'D',1,30) + elif os.name == 'posix': + handler = logging.handlers.WatchedFileHandler(logf) + else: + handler = logging.handlers.FileHandler(logf) except Exception, ex: sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n") - handler = logging.StreamHandler(sys.stdout) + handler = logging.StreamHandler(sys.stdout) else: # Normal Handler on standard output handler = logging.StreamHandler(sys.stdout) @@ -140,6 +199,7 @@ def init_logger(): 'DEBUG_RPC': ('blue', 'white'), 'DEBUG': ('blue', 'default'), 'INFO': ('green', 'default'), + 'TEST': ('white', 'blue'), 'WARNING': ('yellow', 'default'), 'ERROR': ('red', 'default'), 'CRITICAL': ('white', 'red'), @@ -151,27 +211,57 @@ def init_logger(): class Logger(object): + def __init__(self): + warnings.warn("The netsvc.Logger API shouldn't be used anymore, please " + "use the standard `logging.getLogger` API instead", + PendingDeprecationWarning, stacklevel=2) + super(Logger, self).__init__() + def notifyChannel(self, name, level, msg): - log = logging.getLogger(name) + warnings.warn("notifyChannel API shouldn't be used anymore, please use " + "the standard `logging` module instead", + PendingDeprecationWarning, stacklevel=2) + from service.web_services import common + + log = logging.getLogger(tools.ustr(name)) + + if level in [LOG_DEBUG_RPC, LOG_TEST] and not hasattr(log, level): + fct = lambda msg, *args, **kwargs: log.log(getattr(logging, level.upper()), msg, *args, **kwargs) + setattr(log, level, fct) - if level == LOG_DEBUG_RPC and not hasattr(log, level): - fct = lambda msg, *args, **kwargs: log.log(logging.DEBUG_RPC, msg, *args, **kwargs) - setattr(log, LOG_DEBUG_RPC, fct) level_method = getattr(log, level) if isinstance(msg, Exception): msg = tools.exception_to_unicode(msg) - try: - result = tools.ustr(msg).strip().split('\n') - except UnicodeDecodeError: - result = msg.strip().split('\n') - if len(result)>1: - for idx, s in enumerate(result): - level_method('[%02d]: %s' % (idx+1, s,)) - elif result: - level_method(result[0]) + try: + msg = tools.ustr(msg).strip() + if level in (LOG_ERROR, LOG_CRITICAL) and tools.config.get_misc('debug','env_info',False): + msg = common().exp_get_server_environment() + "\n" + msg + + result = msg.split('\n') + except UnicodeDecodeError: + result = msg.strip().split('\n') + try: + if len(result)>1: + for idx, s in enumerate(result): + level_method('[%02d]: %s' % (idx+1, s,)) + elif result: + level_method(result[0]) + except IOError,e: + # TODO: perhaps reset the logger streams? + #if logrotate closes our files, we end up here.. + pass + except: + # better ignore the exception and carry on.. + pass + + def set_loglevel(self, level): + log = logging.getLogger() + log.setLevel(logging.INFO) # make sure next msg is printed + log.info("Log level changed to %s" % logging.getLevelName(level)) + log.setLevel(level) def shutdown(self): logging.shutdown() @@ -180,36 +270,89 @@ import tools init_logger() class Agent(object): - _timers = [] + _timers = {} _logger = Logger() - def setAlarm(self, fn, dt, args=None, kwargs=None): - if not args: - args = [] - if not kwargs: - kwargs = {} + __logger = logging.getLogger('timer') + + def setAlarm(self, fn, dt, db_name, *args, **kwargs): wait = dt - time.time() if wait > 0: - self._logger.notifyChannel('timers', LOG_DEBUG, "Job scheduled in %s seconds for %s.%s" % (wait, fn.im_class.__name__, fn.func_name)) + self.__logger.debug("Job scheduled in %.3g seconds for %s.%s" % (wait, fn.im_class.__name__, fn.func_name)) timer = threading.Timer(wait, fn, args, kwargs) timer.start() - self._timers.append(timer) - for timer in self._timers[:]: - if not timer.isAlive(): - self._timers.remove(timer) - + self._timers.setdefault(db_name, []).append(timer) + + for db in self._timers: + for timer in self._timers[db]: + if not timer.isAlive(): + self._timers[db].remove(timer) + + @classmethod + def cancel(cls, db_name): + """Cancel all timers for a given database. If None passed, all timers are cancelled""" + for db in cls._timers: + if db_name is None or db == db_name: + for timer in cls._timers[db]: + timer.cancel() + + @classmethod def quit(cls): - for timer in cls._timers: - timer.cancel() - quit = classmethod(quit) - + cls.cancel(None) import traceback -class xmlrpc(object): - class RpcGateway(object): - def __init__(self, name): - self.name = name +class Server: + """ Generic interface for all servers with an event loop etc. + Override this to impement http, net-rpc etc. servers. + + Servers here must have threaded behaviour. start() must not block, + there is no run(). + """ + __is_started = False + __servers = [] + + + __logger = logging.getLogger('server') + + def __init__(self): + if Server.__is_started: + raise Exception('All instances of servers must be inited before the startAll()') + Server.__servers.append(self) + + def start(self): + self.__logger.debug("called stub Server.start") + + def stop(self): + self.__logger.debug("called stub Server.stop") + + def stats(self): + """ This function should return statistics about the server """ + return "%s: No statistics" % str(self.__class__) + + @classmethod + def startAll(cls): + if cls.__is_started: + return + cls.__logger.info("Starting %d services" % len(cls.__servers)) + for srv in cls.__servers: + srv.start() + cls.__is_started = True + + @classmethod + def quitAll(cls): + if not cls.__is_started: + return + cls.__logger.info("Stopping %d services" % len(cls.__servers)) + for srv in cls.__servers: + srv.stop() + cls.__is_started = False + + @classmethod + def allStats(cls): + res = ["Servers %s" % ('stopped', 'started')[cls.__is_started]] + res.extend(srv.stats() for srv in cls.__servers) + return '\n'.join(res) class OpenERPDispatcherException(Exception): def __init__(self, exception, traceback): @@ -218,7 +361,6 @@ class OpenERPDispatcherException(Exception): class OpenERPDispatcher: def log(self, title, msg): - from pprint import pformat Logger().notifyChannel('%s' % title, LOG_DEBUG_RPC, pformat(msg)) def dispatch(self, service_name, method, params): @@ -226,223 +368,20 @@ class OpenERPDispatcher: self.log('service', service_name) self.log('method', method) self.log('params', params) - result = LocalService(service_name)(method, *params) + auth = getattr(self, 'auth_provider', None) + result = ExportService.getService(service_name).dispatch(method, auth, params) self.log('result', result) + # We shouldn't marshall None, + if result == None: + result = False return result except Exception, e: self.log('exception', tools.exception_to_unicode(e)) - if hasattr(e, 'traceback'): - tb = e.traceback - else: - tb = sys.exc_info() + tb = getattr(e, 'traceback', sys.exc_info()) tb_s = "".join(traceback.format_exception(*tb)) if tools.config['debug_mode']: import pdb pdb.post_mortem(tb[2]) raise OpenERPDispatcherException(e, tb_s) -class GenericXMLRPCRequestHandler(OpenERPDispatcher): - def _dispatch(self, method, params): - try: - service_name = self.path.split("/")[-1] - return self.dispatch(service_name, method, params) - except OpenERPDispatcherException, e: - raise xmlrpclib.Fault(tools.exception_to_unicode(e.exception), e.traceback) - -class SSLSocket(object): - def __init__(self, socket): - if not hasattr(socket, 'sock_shutdown'): - from OpenSSL import SSL - ctx = SSL.Context(SSL.SSLv23_METHOD) - ctx.use_privatekey_file(tools.config['secure_pkey_file']) - ctx.use_certificate_file(tools.config['secure_cert_file']) - - self.socket = SSL.Connection(ctx, socket) - else: - self.socket = socket - - def shutdown(self, how): - return self.socket.sock_shutdown(how) - - def __getattr__(self, name): - return getattr(self.socket, name) - -class doesitgohere(): - def recv(self, bufsize): - """ Another bugfix: SSL's recv() may raise - recoverable exceptions, which simply need us to retry - the call - """ - while True: - try: - return self.socket.recv(bufsize) - except SSL.WantReadError: - pass - except SSL.WantWriteError: - pass - - -class SimpleXMLRPCRequestHandler(GenericXMLRPCRequestHandler, SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): - rpc_paths = map(lambda s: '/xmlrpc/%s' % s, SERVICES.keys()) - -class SecureXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): - def setup(self): - self.connection = SSLSocket(self.request) - self.rfile = socket._fileobject(self.request, "rb", self.rbufsize) - self.wfile = socket._fileobject(self.request, "wb", self.wbufsize) - -class SimpleThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer.SimpleXMLRPCServer): - encoding = None - allow_none = False - - def server_bind(self): - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - SimpleXMLRPCServer.SimpleXMLRPCServer.server_bind(self) - - def handle_error(self, request, client_address): - """ Override the error handler - """ - import traceback - Logger().notifyChannel("init", LOG_ERROR,"Server error in request from %s:\n%s" % - (client_address,traceback.format_exc())) - -class SecureThreadedXMLRPCServer(SimpleThreadedXMLRPCServer): - def __init__(self, server_address, HandlerClass, logRequests=1): - SimpleThreadedXMLRPCServer.__init__(self, server_address, HandlerClass, logRequests) - self.socket = SSLSocket(socket.socket(self.address_family, self.socket_type)) - self.server_bind() - self.server_activate() - - def handle_error(self, request, client_address): - """ Override the error handler - """ - import traceback - e_type, e_value, e_traceback = sys.exc_info() - Logger().notifyChannel("init", LOG_ERROR,"SSL Request handler error in request from %s: %s\n%s" % - (client_address,str(e_type),traceback.format_exc())) - -class HttpDaemon(threading.Thread): - def __init__(self, interface, port, secure=False): - threading.Thread.__init__(self) - self.__port = port - self.__interface = interface - self.secure = bool(secure) - handler_class = (SimpleXMLRPCRequestHandler, SecureXMLRPCRequestHandler)[self.secure] - server_class = (SimpleThreadedXMLRPCServer, SecureThreadedXMLRPCServer)[self.secure] - - if self.secure: - from OpenSSL.SSL import Error as SSLError - else: - class SSLError(Exception): pass - try: - self.server = server_class((interface, port), handler_class, 0) - except SSLError, e: - Logger().notifyChannel('xml-rpc-ssl', LOG_CRITICAL, "Can not load the certificate and/or the private key files") - sys.exit(1) - except Exception, e: - Logger().notifyChannel('xml-rpc', LOG_CRITICAL, "Error occur when starting the server daemon: %s" % (e,)) - sys.exit(1) - - - def attach(self, path, gw): - pass - - def stop(self): - self.running = False - if os.name != 'nt': - self.server.socket.shutdown( hasattr(socket, 'SHUT_RDWR') and socket.SHUT_RDWR or 2 ) - self.server.socket.close() - - def run(self): - self.server.register_introspection_functions() - - self.running = True - while self.running: - self.server.handle_request() - return True - - # If the server need to be run recursively - # - #signal.signal(signal.SIGALRM, self.my_handler) - #signal.alarm(6) - #while True: - # self.server.handle_request() - #signal.alarm(0) # Disable the alarm - -import tiny_socket -class TinySocketClientThread(threading.Thread, OpenERPDispatcher): - def __init__(self, sock, threads): - threading.Thread.__init__(self) - self.sock = sock - self.threads = threads - - def run(self): - import select - self.running = True - try: - ts = tiny_socket.mysocket(self.sock) - except: - self.sock.close() - self.threads.remove(self) - return False - while self.running: - try: - msg = ts.myreceive() - except: - self.sock.close() - self.threads.remove(self) - return False - try: - result = self.dispatch(msg[0], msg[1], msg[2:]) - ts.mysend(result) - except OpenERPDispatcherException, e: - new_e = Exception(tools.exception_to_unicode(e.exception)) # avoid problems of pickeling - ts.mysend(new_e, exception=True, traceback=e.traceback) - - self.sock.close() - self.threads.remove(self) - return True - - def stop(self): - self.running = False - - -class TinySocketServerThread(threading.Thread): - def __init__(self, interface, port, secure=False): - threading.Thread.__init__(self) - self.__port = port - self.__interface = interface - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.socket.bind((self.__interface, self.__port)) - self.socket.listen(5) - self.threads = [] - - def run(self): - import select - try: - self.running = True - while self.running: - (clientsocket, address) = self.socket.accept() - ct = TinySocketClientThread(clientsocket, self.threads) - self.threads.append(ct) - ct.start() - self.socket.close() - except Exception, e: - self.socket.close() - return False - - def stop(self): - self.running = False - for t in self.threads: - t.stop() - try: - if hasattr(socket, 'SHUT_RDWR'): - self.socket.shutdown(socket.SHUT_RDWR) - else: - self.socket.shutdown(2) - self.socket.close() - except: - return False - # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: