2 # -*- encoding: utf-8 -*-
3 ##############################################################################
5 # OpenERP, Open Source Management Solution
6 # Copyright (C) 2004-2008 Tiny SPRL (<http://tiny.be>). All Rights Reserved
7 # The refactoring of the OpenSSL support comes from Tryton
8 # Copyright (C) 2007-2008 Cédric Krier.
9 # Copyright (C) 2007-2008 Bertrand Chenal.
10 # Copyright (C) 2008 B2CK SPRL.
12 # This program is free software: you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation, either version 3 of the License, or
15 # (at your option) any later version.
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
22 # You should have received a copy of the GNU General Public License
23 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 ##############################################################################
28 import SimpleXMLRPCServer
31 import logging.handlers
45 class ServiceEndPointCall(object):
46 def __init__(self, id, method):
50 def __call__(self, *args):
51 _res[self._id] = self._meth(*args)
55 class ServiceEndPoint(object):
56 def __init__(self, name, id):
61 self._meth[m] = s._method[m]
63 def __getattr__(self, name):
64 return ServiceEndPointCall(self._id, self._meth[name])
67 class Service(object):
68 _serviceEndPointID = 0
70 def __init__(self, name, audience=''):
74 self.exportedMethods = None
75 self._response_process = None
76 self._response_process_id = None
79 def joinGroup(self, name):
80 if not name in _group:
82 _group[name][self.__name] = self
84 def exportMethod(self, m):
86 self._method[m.__name__] = m
def serviceEndPoint(self, s):
    """Create a ServiceEndPoint for the service named *s* with a fresh id.

    The id comes from the counter kept in the ``Service._serviceEndPointID``
    class attribute, which is shared by every Service instance.  Once the
    counter has reached 2**16 it is reset to 0 before being incremented,
    so the ids handed out cycle through 1 .. 2**16.

    :param s: service name, passed as the first argument of ServiceEndPoint
    :return: the newly created ServiceEndPoint
    """
    if Service._serviceEndPointID >= 2**16:
        Service._serviceEndPointID = 0
    Service._serviceEndPointID += 1
    # Read the counter from the class that was just updated: going through
    # ``self`` would return a stale value if an instance or subclass
    # attribute with the same name shadowed the shared counter.
    return ServiceEndPoint(s, Service._serviceEndPointID)
94 def conversationId(self):
97 def processResponse(self, s, id):
98 self._response_process, self._response_process_id = s, id
100 def processFailure(self, s, id):
103 def resumeResponse(self, s):
106 def cancelResponse(self, s):
def suspendResponse(self, s):
    """Run the pending response processor, if any, then store ``s(id)``.

    :param s: callable invoked with ``self._response_process_id``; its
              return value is kept in ``self._response``
    """
    if self._response_process:
        # Give the registered processor its id and the entry found under
        # that id in _res (presumably the value written by
        # ServiceEndPointCall.__call__ -- confirm against the callers).
        self._response_process(self._response_process_id,
                               _res[self._response_process_id])
    # The processor is dropped here whether or not it was called above.
    self._response_process = None
    # The id is read again after the processor ran, so a processor that
    # changes self._response_process_id affects the value passed to s.
    self._response = s(self._response_process_id)
116 def abortResponse(self, error, description, origin, details):
118 if not tools.config['debug_mode']:
119 raise Exception("%s -- %s\n\n%s"%(origin, description, details))
123 def currentFailure(self, s):
127 class LocalService(Service):
128 def __init__(self, name):
134 setattr(self, m, s._method[m])
135 except KeyError, keyError:
136 Logger().notifyChannel('module', LOG_ERROR, 'This service does not exists: %s' % (str(keyError),) )
def service_exist(name):
    """Tell whether *name* is registered in ``_service`` with a truthy entry.

    :param name: key to look up in ``_service``
    :return: False when *name* is absent, otherwise the truth value of
             ``_service[name]``
    """
    if name not in _service:
        return False
    return bool(_service[name])
142 LOG_DEBUG_RPC = 'debug_rpc'
147 LOG_CRITICAL = 'critical'
149 # add new log level below DEBUG
150 logging.DEBUG_RPC = logging.DEBUG - 1
153 from tools import config
156 if config['logfile']:
157 logf = config['logfile']
159 dirname = os.path.dirname(logf)
160 if dirname and not os.path.isdir(dirname):
162 handler = logging.handlers.TimedRotatingFileHandler(logf,'D',1,30)
163 except Exception, ex:
164 sys.stderr.write("ERROR: couldn't create the logfile directory\n")
165 handler = logging.StreamHandler(sys.stdout)
167 handler = logging.StreamHandler(sys.stdout)
169 # create a format for log messages and dates
170 formatter = logging.Formatter('[%(asctime)s] %(levelname)s:%(name)s:%(message)s', '%a %b %d %H:%M:%S %Y')
172 # tell the handler to use this format
173 handler.setFormatter(formatter)
175 # add the handler to the root logger
176 logging.getLogger().addHandler(handler)
177 logging.getLogger().setLevel(config['log_level'])
180 if isinstance(handler, logging.StreamHandler) and os.name != 'nt':
181 # change color of level names
182 # uses of ANSI color codes
183 # see http://pueblo.sourceforge.net/doc/manual/ansi_color_codes.html
184 # maybe use http://code.activestate.com/recipes/574451/
185 colors = ['black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white', None, 'default']
186 foreground = lambda f: 30 + colors.index(f)
187 background = lambda f: 40 + colors.index(f)
190 'DEBUG_RPC': ('blue', 'white'),
191 'DEBUG': ('blue', 'default'),
192 'INFO': ('green', 'default'),
193 'WARNING': ('yellow', 'default'),
194 'ERROR': ('red', 'default'),
195 'CRITICAL': ('white', 'red'),
198 for level, (fg, bg) in mapping.items():
199 msg = "\x1b[%dm\x1b[%dm%s\x1b[0m" % (foreground(fg), background(bg), level)
200 logging.addLevelName(getattr(logging, level), msg)
203 class Logger(object):
204 def notifyChannel(self, name, level, msg):
205 log = logging.getLogger(name)
207 if level == LOG_DEBUG_RPC and not hasattr(log, level):
208 fct = lambda msg, *args, **kwargs: log.log(logging.DEBUG_RPC, msg, *args, **kwargs)
209 setattr(log, LOG_DEBUG_RPC, fct)
211 level_method = getattr(log, level)
213 result = str(msg).strip().split('\n')
215 for idx, s in enumerate(result):
216 level_method('[%02d]: %s' % (idx+1, s,))
218 level_method(result[0])
226 def setAlarm(self, fn, dt, args=None, kwargs=None):
231 wait = dt - time.time()
233 self._logger.notifyChannel('timers', LOG_DEBUG, "Job scheduled in %s seconds for %s.%s" % (wait, fn.im_class.__name__, fn.func_name))
234 timer = threading.Timer(wait, fn, args, kwargs)
236 self._timers.append(timer)
237 for timer in self._timers[:]:
238 if not timer.isAlive():
239 self._timers.remove(timer)
242 for timer in cls._timers:
244 quit = classmethod(quit)
246 class xmlrpc(object):
247 class RpcGateway(object):
248 def __init__(self, name):
251 class GenericXMLRPCRequestHandler:
def log(self, title, msg):
    """Log *msg*, pretty-printed, on the ``XMLRPC-<title>`` channel.

    The message is emitted through Logger.notifyChannel at the
    LOG_DEBUG_RPC level.

    :param title: suffix of the channel name
    :param msg: any object; it is rendered with pprint.pformat
    """
    from pprint import pformat
    notify = Logger().notifyChannel
    channel = 'XMLRPC-%s' % title
    notify(channel, LOG_DEBUG_RPC, pformat(msg))
256 def _dispatch(self, method, params):
259 self.log('method', method)
260 self.log('params', params)
261 n = self.path.split("/")[-1]
263 m = getattr(s, method)
264 s._service._response = None
266 self.log('result', r)
267 res = s._service._response
273 self.log('exception', e)
274 tb_s = reduce(lambda x, y: x+y, traceback.format_exception(sys.exc_type, sys.exc_value, sys.exc_traceback))
277 if tools.config['debug_mode']:
279 tb = sys.exc_info()[2]
281 raise xmlrpclib.Fault(s, tb_s)
283 class SSLSocket(object):
284 def __init__(self, socket):
285 if not hasattr(socket, 'sock_shutdown'):
286 from OpenSSL import SSL
287 ctx = SSL.Context(SSL.SSLv23_METHOD)
288 ctx.use_privatekey_file('server.pkey')
289 ctx.use_certificate_file('server.cert')
290 self.socket = SSL.Connection(ctx, socket)
294 def shutdown(self, how):
295 return self.socket.sock_shutdown(how)
297 def __getattr__(self, name):
298 return getattr(self.socket, name)
class SimpleXMLRPCRequestHandler(GenericXMLRPCRequestHandler, SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
    """XML-RPC request handler combining GenericXMLRPCRequestHandler with the
    standard library SimpleXMLRPCRequestHandler."""
    # One '/xmlrpc/<name>' path per name found in _service.  This expression
    # runs once, when the class statement is executed, so it only reflects
    # what _service holds at that moment.
    # NOTE(review): confirm whether services registered afterwards are
    # expected to be reachable through rpc_paths.
    rpc_paths = map(lambda s: '/xmlrpc/%s' % s, _service)
303 class SecureXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
305 self.connection = SSLSocket(self.request)
306 self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
307 self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
class SimpleThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer.SimpleXMLRPCServer):
    """SimpleXMLRPCServer mixed with SocketServer.ThreadingMixIn."""

    def server_bind(self):
        """Set SO_REUSEADDR on the listening socket, then bind it.

        The binding itself is left to SimpleXMLRPCServer.server_bind.
        """
        listener = self.socket
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        base_server = SimpleXMLRPCServer.SimpleXMLRPCServer
        base_server.server_bind(self)
314 class SecureThreadedXMLRPCServer(SimpleThreadedXMLRPCServer):
315 def __init__(self, server_address, HandlerClass, logRequests=1):
316 SimpleThreadedXMLRPCServer.__init__(self, server_address, HandlerClass, logRequests)
317 self.socket = SSLSocket(socket.socket(self.address_family, self.socket_type))
319 self.server_activate()
321 class HttpDaemon(threading.Thread):
322 def __init__(self, interface, port, secure=False):
323 threading.Thread.__init__(self)
325 self.__interface = interface
326 self.secure = bool(secure)
327 handler_class = (SimpleXMLRPCRequestHandler, SecureXMLRPCRequestHandler)[self.secure]
328 server_class = (SimpleThreadedXMLRPCServer, SecureThreadedXMLRPCServer)[self.secure]
331 from OpenSSL.SSL import Error as SSLError
333 class SSLError(Exception): pass
336 self.server = server_class((interface, port), handler_class, 0)
338 Logger().notifyChannel('xml-rpc-ssl', LOG_CRITICAL, "Can't load the certificate and/or the private key files")
341 Logger().notifyChannel('xml-rpc', LOG_CRITICAL, "Error occur when strarting the server daemon: %s" % (e,))
345 def attach(self, path, gw):
351 self.server.socket.shutdown( hasattr(socket, 'SHUT_RDWR') and socket.SHUT_RDWR or 2 )
352 self.server.socket.close()
355 self.server.register_introspection_functions()
359 self.server.handle_request()
362 # If the server need to be run recursively
364 #signal.signal(signal.SIGALRM, self.my_handler)
367 # self.server.handle_request()
368 #signal.alarm(0) # Disable the alarm
371 class TinySocketClientThread(threading.Thread):
372 def __init__(self, sock, threads):
373 threading.Thread.__init__(self)
375 self.threads = threads
376 self._logger = Logger()
379 self._logger.notifyChannel('NETRPC', LOG_DEBUG_RPC, msg)
387 ts = tiny_socket.mysocket(self.sock)
390 self.threads.remove(self)
397 self.threads.remove(self)
401 service = LocalService(msg[0])
402 method = getattr(service, msg[1])
403 service._service._response = None
404 result_from_method = method(*msg[2:])
405 res = service._service._response
407 result_from_method = res
408 self.log(result_from_method)
409 ts.mysend(result_from_method)
412 tb_s = reduce(lambda x, y: x+y, traceback.format_exception(sys.exc_type, sys.exc_value, sys.exc_traceback))
414 if tools.config['debug_mode']:
416 tb = sys.exc_info()[2]
418 e = Exception(str(e))
420 ts.mysend(e, exception=True, traceback=tb_s)
424 self.threads.remove(self)
431 class TinySocketServerThread(threading.Thread):
432 def __init__(self, interface, port, secure=False):
433 threading.Thread.__init__(self)
435 self.__interface = interface
436 self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
437 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
438 self.socket.bind((self.__interface, self.__port))
439 self.socket.listen(5)
447 (clientsocket, address) = self.socket.accept()
448 ct = TinySocketClientThread(clientsocket, self.threads)
449 self.threads.append(ct)
458 for t in self.threads:
461 if hasattr(socket, 'SHUT_RDWR'):
462 self.socket.shutdown(socket.SHUT_RDWR)
464 self.socket.shutdown(2)
472 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: