Merge commit 'origin/master' into mdv-gpl3-fwd
[odoo/odoo.git] / bin / netsvc.py
1 #!/usr/bin/python
2 # -*- encoding: utf-8 -*-
3 ##############################################################################
4 #
5 #    OpenERP, Open Source Management Solution
6 #    Copyright (C) 2004-2008 Tiny SPRL (<http://tiny.be>). All Rights Reserved
7 #    $Id$
8 #
9 #    This program is free software: you can redistribute it and/or modify
10 #    it under the terms of the GNU General Public License as published by
11 #    the Free Software Foundation, either version 3 of the License, or
12 #    (at your option) any later version.
13 #
14 #    This program is distributed in the hope that it will be useful,
15 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
16 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 #    GNU General Public License for more details.
18 #
19 #    You should have received a copy of the GNU General Public License
20 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22 ##############################################################################
23
24 import time
25 import threading
26
27 import SimpleXMLRPCServer, signal, sys, xmlrpclib
28 import SocketServer
29 import socket
30 import logging
31 import logging.handlers
32 import os
33
34 _service = {}
35 _group = {}
36 _res_id = 1
37 _res = {}
38
39
40 class ServiceEndPointCall(object):
41     def __init__(self, id, method):
42         self._id = id
43         self._meth = method
44
45     def __call__(self, *args):
46         _res[self._id] = self._meth(*args)
47         return self._id
48
49
50 class ServiceEndPoint(object):
51     def __init__(self, name, id):
52         self._id = id
53         self._meth = {}
54         s = _service[name]
55         for m in s._method:
56             self._meth[m] = s._method[m]
57
58     def __getattr__(self, name):
59         return ServiceEndPointCall(self._id, self._meth[name])
60
61
62 class Service(object):
63     _serviceEndPointID = 0
64
65     def __init__(self, name, audience=''):
66         _service[name] = self
67         self.__name = name
68         self._method = {}
69         self.exportedMethods = None
70         self._response_process = None
71         self._response_process_id = None
72         self._response = None
73
74     def joinGroup(self, name):
75         if not name in _group:
76             _group[name] = {}
77         _group[name][self.__name] = self
78
79     def exportMethod(self, m):
80         if callable(m):
81             self._method[m.__name__] = m
82
83     def serviceEndPoint(self, s):
84         if Service._serviceEndPointID >= 2**16:
85             Service._serviceEndPointID = 0
86         Service._serviceEndPointID += 1
87         return ServiceEndPoint(s, self._serviceEndPointID)
88
89     def conversationId(self):
90         return 1
91
92     def processResponse(self, s, id):
93         self._response_process, self._response_process_id = s, id
94
95     def processFailure(self, s, id):
96         pass
97
98     def resumeResponse(self, s):
99         pass
100
101     def cancelResponse(self, s):
102         pass
103
104     def suspendResponse(self, s):
105         if self._response_process:
106             self._response_process(self._response_process_id,
107                                    _res[self._response_process_id])
108         self._response_process = None
109         self._response = s(self._response_process_id)
110
111     def abortResponse(self, error, description, origin, details):
112         import tools
113         if not tools.config['debug_mode']:
114             raise Exception("%s -- %s\n\n%s"%(origin, description, details))
115         else:
116             raise
117
118     def currentFailure(self, s):
119         pass
120
121
122 class LocalService(Service):
123     def __init__(self, name):
124         self.__name = name
125         try:
126             s = _service[name]
127             self._service = s
128             for m in s._method:
129                 setattr(self, m, s._method[m])
130         except KeyError, keyError:
131             Logger().notifyChannel('module', LOG_ERROR, 'This service does not exists: %s' % (str(keyError),) )
132             raise
133
134
135
136 class ServiceUnavailable(Exception):
137     pass
138
139
140 def service_exist(name):
141     return (name in _service) and bool(_service[name])
142
143
144 def get_rpc_paths():
145     return map(lambda s: '/xmlrpc/%s' % s, _service)
146
147
148 LOG_DEBUG_RPC = 'debug_rpc'
149 LOG_DEBUG = 'debug'
150 LOG_INFO = 'info'
151 LOG_WARNING = 'warn'
152 LOG_ERROR = 'error'
153 LOG_CRITICAL = 'critical'
154
155 # add new log level below DEBUG
156 logging.DEBUG_RPC = logging.DEBUG - 1
157
158 def init_logger():
159     from tools import config
160     import os
161
162     if config['logfile']:
163         logf = config['logfile']
164         try:
165             dirname = os.path.dirname(logf)
166             if dirname and not os.path.isdir(dirname):
167                 os.makedirs(dirname)
168             handler = logging.handlers.TimedRotatingFileHandler(logf,'D',1,30)
169         except Exception, ex:
170             sys.stderr.write("ERROR: couldn't create the logfile directory\n")
171             handler = logging.StreamHandler(sys.stdout)
172     else:
173         handler = logging.StreamHandler(sys.stdout)
174
175     # create a format for log messages and dates
176     formatter = logging.Formatter('[%(asctime)s] %(levelname)s:%(name)s:%(message)s', '%a %b %d %H:%M:%S %Y')
177
178     # tell the handler to use this format
179     handler.setFormatter(formatter)
180
181     # add the handler to the root logger
182     logging.getLogger().addHandler(handler)
183     logging.getLogger().setLevel(config['log_level'])
184
185     if (not isinstance(handler, logging.FileHandler)) and os.name != 'nt':
186         # change color of level names
187         # uses of ANSI color codes
188         # see http://pueblo.sourceforge.net/doc/manual/ansi_color_codes.html
189         # maybe use http://code.activestate.com/recipes/574451/
190         colors = ['black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white', None, 'default']
191         foreground = lambda f: 30 + colors.index(f)
192         background = lambda f: 40 + colors.index(f)
193
194         mapping = {
195             'DEBUG_RPC': ('blue', 'white'),
196             'DEBUG': ('blue', 'default'),
197             'INFO': ('green', 'default'),
198             'WARNING': ('yellow', 'default'),
199             'ERROR': ('red', 'default'),
200             'CRITICAL': ('white', 'red'),
201         }
202
203         for level, (fg, bg) in mapping.items():
204             msg = "\x1b[%dm\x1b[%dm%s\x1b[0m" % (foreground(fg), background(bg), level)
205             logging.addLevelName(getattr(logging, level), msg)
206
207
208 class Logger(object):
209     def notifyChannel(self, name, level, msg):
210         log = logging.getLogger(name)
211
212         if level == LOG_DEBUG_RPC and not hasattr(log, level):
213             fct = lambda msg, *args, **kwargs: log.log(logging.DEBUG_RPC, msg, *args, **kwargs)
214             setattr(log, LOG_DEBUG_RPC, fct)
215
216         level_method = getattr(log, level)
217
218         result = str(msg).strip().split('\n')
219         if len(result)>1:
220             for idx, s in enumerate(result):
221                 level_method('[%02d]: %s' % (idx+1, s,))
222         elif result:
223             level_method(result[0])
224
225 init_logger()
226
227 class Agent(object):
228     _timers = []
229     _logger = Logger()
230
231     def setAlarm(self, fn, dt, args=None, kwargs=None):
232         if not args:
233             args = []
234         if not kwargs:
235             kwargs = {}
236         wait = dt - time.time()
237         if wait > 0:
238             self._logger.notifyChannel('timers', LOG_DEBUG, "Job scheduled in %s seconds for %s.%s" % (wait, fn.im_class.__name__, fn.func_name))
239             timer = threading.Timer(wait, fn, args, kwargs)
240             timer.start()
241             self._timers.append(timer)
242         for timer in self._timers[:]:
243             if not timer.isAlive():
244                 self._timers.remove(timer)
245
246     def quit(cls):
247         for timer in cls._timers:
248             timer.cancel()
249     quit = classmethod(quit)
250
251
252 class RpcGateway(object):
253     def __init__(self, name):
254         self.name = name
255
256
257 class Dispatcher(object):
258     def __init__(self):
259         pass
260
261     def monitor(self, signal):
262         pass
263
264     def run(self):
265         pass
266
267
268 class xmlrpc(object):
269     class RpcGateway(object):
270         def __init__(self, name):
271             self.name = name
272
273
274 class GenericXMLRPCRequestHandler:
275     def _dispatch(self, method, params):
276 #        print 'TERP-CALL : ',method, params
277         import traceback
278         try:
279             n = self.path.split("/")[-1]
280             s = LocalService(n)
281             m = getattr(s, method)
282             s._service._response = None
283             r = m(*params)
284             res = s._service._response
285             if res != None:
286                 r = res
287             return r
288         except Exception, e:
289             tb_s = reduce(lambda x, y: x+y, traceback.format_exception(
290                 sys.exc_type, sys.exc_value, sys.exc_traceback))
291             s = str(e)
292             import tools
293             if tools.config['debug_mode']:
294                 import pdb
295                 tb = sys.exc_info()[2]
296                 pdb.post_mortem(tb)
297             raise xmlrpclib.Fault(s, tb_s)
298
299
300 class SimpleXMLRPCRequestHandler(GenericXMLRPCRequestHandler, SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
301     SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.rpc_paths = get_rpc_paths()
302
303 class SimpleThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer.SimpleXMLRPCServer):
304
305     def server_bind(self):
306         try:
307             self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
308             SimpleXMLRPCServer.SimpleXMLRPCServer.server_bind(self)
309         except:
310             Logger().notifyChannel('init', LOG_CRITICAL, 'Address already in use')
311             sys.exit(1)
312
313 class HttpDaemon(threading.Thread):
314
315     def __init__(self, interface, port, secure=False):
316         threading.Thread.__init__(self)
317         self.__port = port
318         self.__interface = interface
319         self.secure = secure
320         if secure:
321             from ssl import SecureXMLRPCServer
322
323             class SecureXMLRPCRequestHandler(GenericXMLRPCRequestHandler, SecureXMLRPCServer.SecureXMLRPCRequestHandler):
324                 SecureXMLRPCServer.SecureXMLRPCRequestHandler.rpc_paths = get_rpc_paths()
325
326             class SecureThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SecureXMLRPCServer.SecureXMLRPCServer):
327                 def server_bind(self):
328                     try:
329                         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
330                         SecureXMLRPCServer.SecureXMLRPCServer.server_bind(self)
331                     except:
332                         sys.stderr.write("ERROR: address already in use\n")
333                         sys.exit(1)
334
335             self.server = SecureThreadedXMLRPCServer((interface, port),
336                     SecureXMLRPCRequestHandler, 0)
337         else:
338             self.server = SimpleThreadedXMLRPCServer((interface, port),
339                     SimpleXMLRPCRequestHandler, 0)
340
341     def attach(self, path, gw):
342         pass
343
344     def stop(self):
345         self.running = False
346         if os.name != 'nt':
347             if hasattr(socket, 'SHUT_RDWR'):
348                 if self.secure:
349                     self.server.socket.sock_shutdown(socket.SHUT_RDWR)
350                 else:
351                     self.server.socket.shutdown(socket.SHUT_RDWR)
352             else:
353                 if self.secure:
354                     self.server.socket.sock_shutdown(2)
355                 else:
356                     self.server.socket.shutdown(2)
357         self.server.socket.close()
358
359     def run(self):
360         self.server.register_introspection_functions()
361
362         self.running = True
363         while self.running:
364             self.server.handle_request()
365         return True
366
367         # If the server need to be run recursively
368         #
369         #signal.signal(signal.SIGALRM, self.my_handler)
370         #signal.alarm(6)
371         #while True:
372         #   self.server.handle_request()
373         #signal.alarm(0)          # Disable the alarm
374
375 import tiny_socket
376 class TinySocketClientThread(threading.Thread):
377     def __init__(self, sock, threads):
378         threading.Thread.__init__(self)
379         self.sock = sock
380         self.threads = threads
381         self._logger = Logger()
382
383     def log(self, msg):
384         self._logger.notifyChannel('NETRPC', LOG_DEBUG_RPC, msg)
385
386     def run(self):
387         import traceback
388         import time
389         import select
390         self.running = True
391         try:
392             ts = tiny_socket.mysocket(self.sock)
393         except:
394             self.sock.close()
395             self.threads.remove(self)
396             return False
397         while self.running:
398             try:
399                 msg = ts.myreceive()
400             except:
401                 self.sock.close()
402                 self.threads.remove(self)
403                 return False
404             try:
405                 self.log(msg)
406                 s = LocalService(msg[0])
407                 m = getattr(s, msg[1])
408                 s._service._response = None
409                 r = m(*msg[2:])
410                 res = s._service._response
411                 if res != None:
412                     r = res
413                 self.log(r)
414                 ts.mysend(r)
415             except Exception, e:
416                 tb_s = reduce(lambda x, y: x+y, traceback.format_exception(sys.exc_type, sys.exc_value, sys.exc_traceback))
417                 import tools
418                 if tools.config['debug_mode']:
419                     import pdb
420                     tb = sys.exc_info()[2]
421                     pdb.post_mortem(tb)
422                 e = Exception(str(e))
423                 self.log(str(e))
424                 ts.mysend(e, exception=True, traceback=tb_s)
425             except:
426                 pass
427             self.sock.close()
428             self.threads.remove(self)
429             return True
430
431     def stop(self):
432         self.running = False
433
434
435 class TinySocketServerThread(threading.Thread):
436     def __init__(self, interface, port, secure=False):
437         threading.Thread.__init__(self)
438         self.__port = port
439         self.__interface = interface
440         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
441         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
442         self.socket.bind((self.__interface, self.__port))
443         self.socket.listen(5)
444         self.threads = []
445
446     def run(self):
447         import select
448         try:
449             self.running = True
450             while self.running:
451                 (clientsocket, address) = self.socket.accept()
452                 ct = TinySocketClientThread(clientsocket, self.threads)
453                 self.threads.append(ct)
454                 ct.start()
455             self.socket.close()
456         except Exception, e:
457             self.socket.close()
458             return False
459
460     def stop(self):
461         self.running = False
462         for t in self.threads:
463             t.stop()
464         try:
465             if hasattr(socket, 'SHUT_RDWR'):
466                 self.socket.shutdown(socket.SHUT_RDWR)
467             else:
468                 self.socket.shutdown(2)
469             self.socket.close()
470         except:
471             return False
472
473
474
475
476 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
477