2 # -*- encoding: utf-8 -*-
3 ##############################################################################
5 # OpenERP, Open Source Management Solution
6 # Copyright (C) 2004-2008 Tiny SPRL (<http://tiny.be>). All Rights Reserved
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 ##############################################################################
27 import SimpleXMLRPCServer, signal, sys, xmlrpclib
31 import logging.handlers
40 class ServiceEndPointCall(object):
41 def __init__(self, id, method):
45 def __call__(self, *args):
46 _res[self._id] = self._meth(*args)
50 class ServiceEndPoint(object):
51 def __init__(self, name, id):
56 self._meth[m] = s._method[m]
58 def __getattr__(self, name):
59 return ServiceEndPointCall(self._id, self._meth[name])
62 class Service(object):
63 _serviceEndPointID = 0
65 def __init__(self, name, audience=''):
69 self.exportedMethods = None
70 self._response_process = None
71 self._response_process_id = None
74 def joinGroup(self, name):
75 if not name in _group:
77 _group[name][self.__name] = self
79 def exportMethod(self, m):
81 self._method[m.__name__] = m
83 def serviceEndPoint(self, s):
84 if Service._serviceEndPointID >= 2**16:
85 Service._serviceEndPointID = 0
86 Service._serviceEndPointID += 1
87 return ServiceEndPoint(s, self._serviceEndPointID)
89 def conversationId(self):
92 def processResponse(self, s, id):
93 self._response_process, self._response_process_id = s, id
95 def processFailure(self, s, id):
98 def resumeResponse(self, s):
101 def cancelResponse(self, s):
104 def suspendResponse(self, s):
105 if self._response_process:
106 self._response_process(self._response_process_id,
107 _res[self._response_process_id])
108 self._response_process = None
109 self._response = s(self._response_process_id)
111 def abortResponse(self, error, description, origin, details):
113 if not tools.config['debug_mode']:
114 raise Exception("%s -- %s\n\n%s"%(origin, description, details))
118 def currentFailure(self, s):
122 class LocalService(Service):
123 def __init__(self, name):
129 setattr(self, m, s._method[m])
130 except KeyError, keyError:
131 Logger().notifyChannel('module', LOG_ERROR, 'This service does not exists: %s' % (str(keyError),) )
136 class ServiceUnavailable(Exception):
140 def service_exist(name):
141 return (name in _service) and bool(_service[name])
145 return map(lambda s: '/xmlrpc/%s' % s, _service)
148 LOG_DEBUG_RPC = 'debug_rpc'
153 LOG_CRITICAL = 'critical'
155 # add new log level below DEBUG
156 logging.DEBUG_RPC = logging.DEBUG - 1
159 from tools import config
162 if config['logfile']:
163 logf = config['logfile']
165 dirname = os.path.dirname(logf)
166 if dirname and not os.path.isdir(dirname):
168 handler = logging.handlers.TimedRotatingFileHandler(logf,'D',1,30)
169 except Exception, ex:
170 sys.stderr.write("ERROR: couldn't create the logfile directory\n")
171 handler = logging.StreamHandler(sys.stdout)
173 handler = logging.StreamHandler(sys.stdout)
175 # create a format for log messages and dates
176 formatter = logging.Formatter('[%(asctime)s] %(levelname)s:%(name)s:%(message)s', '%a %b %d %H:%M:%S %Y')
178 # tell the handler to use this format
179 handler.setFormatter(formatter)
181 # add the handler to the root logger
182 logging.getLogger().addHandler(handler)
183 logging.getLogger().setLevel(config['log_level'])
185 if (not isinstance(handler, logging.FileHandler)) and os.name != 'nt':
186 # change color of level names
187 # uses of ANSI color codes
188 # see http://pueblo.sourceforge.net/doc/manual/ansi_color_codes.html
189 # maybe use http://code.activestate.com/recipes/574451/
190 colors = ['black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white', None, 'default']
191 foreground = lambda f: 30 + colors.index(f)
192 background = lambda f: 40 + colors.index(f)
195 'DEBUG_RPC': ('blue', 'white'),
196 'DEBUG': ('blue', 'default'),
197 'INFO': ('green', 'default'),
198 'WARNING': ('yellow', 'default'),
199 'ERROR': ('red', 'default'),
200 'CRITICAL': ('white', 'red'),
203 for level, (fg, bg) in mapping.items():
204 msg = "\x1b[%dm\x1b[%dm%s\x1b[0m" % (foreground(fg), background(bg), level)
205 logging.addLevelName(getattr(logging, level), msg)
208 class Logger(object):
209 def notifyChannel(self, name, level, msg):
210 log = logging.getLogger(name)
212 if level == LOG_DEBUG_RPC and not hasattr(log, level):
213 fct = lambda msg, *args, **kwargs: log.log(logging.DEBUG_RPC, msg, *args, **kwargs)
214 setattr(log, LOG_DEBUG_RPC, fct)
216 level_method = getattr(log, level)
218 result = str(msg).strip().split('\n')
220 for idx, s in enumerate(result):
221 level_method('[%02d]: %s' % (idx+1, s,))
223 level_method(result[0])
231 def setAlarm(self, fn, dt, args=None, kwargs=None):
236 wait = dt - time.time()
238 self._logger.notifyChannel('timers', LOG_DEBUG, "Job scheduled in %s seconds for %s.%s" % (wait, fn.im_class.__name__, fn.func_name))
239 timer = threading.Timer(wait, fn, args, kwargs)
241 self._timers.append(timer)
242 for timer in self._timers[:]:
243 if not timer.isAlive():
244 self._timers.remove(timer)
247 for timer in cls._timers:
249 quit = classmethod(quit)
252 class RpcGateway(object):
253 def __init__(self, name):
257 class Dispatcher(object):
261 def monitor(self, signal):
268 class xmlrpc(object):
269 class RpcGateway(object):
270 def __init__(self, name):
274 class GenericXMLRPCRequestHandler:
275 def _dispatch(self, method, params):
276 # print 'TERP-CALL : ',method, params
279 n = self.path.split("/")[-1]
281 m = getattr(s, method)
282 s._service._response = None
284 res = s._service._response
289 tb_s = reduce(lambda x, y: x+y, traceback.format_exception(
290 sys.exc_type, sys.exc_value, sys.exc_traceback))
293 if tools.config['debug_mode']:
295 tb = sys.exc_info()[2]
297 raise xmlrpclib.Fault(s, tb_s)
300 class SimpleXMLRPCRequestHandler(GenericXMLRPCRequestHandler, SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
301 SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.rpc_paths = get_rpc_paths()
303 class SimpleThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer.SimpleXMLRPCServer):
305 def server_bind(self):
307 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
308 SimpleXMLRPCServer.SimpleXMLRPCServer.server_bind(self)
310 Logger().notifyChannel('init', LOG_CRITICAL, 'Address already in use')
313 class HttpDaemon(threading.Thread):
315 def __init__(self, interface, port, secure=False):
316 threading.Thread.__init__(self)
318 self.__interface = interface
321 from ssl import SecureXMLRPCServer
323 class SecureXMLRPCRequestHandler(GenericXMLRPCRequestHandler, SecureXMLRPCServer.SecureXMLRPCRequestHandler):
324 SecureXMLRPCServer.SecureXMLRPCRequestHandler.rpc_paths = get_rpc_paths()
326 class SecureThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SecureXMLRPCServer.SecureXMLRPCServer):
327 def server_bind(self):
329 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
330 SecureXMLRPCServer.SecureXMLRPCServer.server_bind(self)
332 sys.stderr.write("ERROR: address already in use\n")
335 self.server = SecureThreadedXMLRPCServer((interface, port),
336 SecureXMLRPCRequestHandler, 0)
338 self.server = SimpleThreadedXMLRPCServer((interface, port),
339 SimpleXMLRPCRequestHandler, 0)
341 def attach(self, path, gw):
347 if hasattr(socket, 'SHUT_RDWR'):
349 self.server.socket.sock_shutdown(socket.SHUT_RDWR)
351 self.server.socket.shutdown(socket.SHUT_RDWR)
354 self.server.socket.sock_shutdown(2)
356 self.server.socket.shutdown(2)
357 self.server.socket.close()
360 self.server.register_introspection_functions()
364 self.server.handle_request()
367 # If the server need to be run recursively
369 #signal.signal(signal.SIGALRM, self.my_handler)
372 # self.server.handle_request()
373 #signal.alarm(0) # Disable the alarm
376 class TinySocketClientThread(threading.Thread):
377 def __init__(self, sock, threads):
378 threading.Thread.__init__(self)
380 self.threads = threads
381 self._logger = Logger()
384 self._logger.notifyChannel('NETRPC', LOG_DEBUG_RPC, msg)
392 ts = tiny_socket.mysocket(self.sock)
395 self.threads.remove(self)
402 self.threads.remove(self)
406 s = LocalService(msg[0])
407 m = getattr(s, msg[1])
408 s._service._response = None
410 res = s._service._response
416 tb_s = reduce(lambda x, y: x+y, traceback.format_exception(sys.exc_type, sys.exc_value, sys.exc_traceback))
418 if tools.config['debug_mode']:
420 tb = sys.exc_info()[2]
422 e = Exception(str(e))
424 ts.mysend(e, exception=True, traceback=tb_s)
428 self.threads.remove(self)
435 class TinySocketServerThread(threading.Thread):
436 def __init__(self, interface, port, secure=False):
437 threading.Thread.__init__(self)
439 self.__interface = interface
440 self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
441 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
442 self.socket.bind((self.__interface, self.__port))
443 self.socket.listen(5)
451 (clientsocket, address) = self.socket.accept()
452 ct = TinySocketClientThread(clientsocket, self.threads)
453 self.threads.append(ct)
462 for t in self.threads:
465 if hasattr(socket, 'SHUT_RDWR'):
466 self.socket.shutdown(socket.SHUT_RDWR)
468 self.socket.shutdown(2)
476 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: