Merged
[odoo/odoo.git] / bin / netsvc.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 ##############################################################################
4 #
5 #    OpenERP, Open Source Management Solution
6 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). All Rights Reserved
7 #    The refactoring about the OpenSSL support come from Tryton
8 #    Copyright (C) 2007-2009 Cédric Krier.
9 #    Copyright (C) 2007-2009 Bertrand Chenal.
10 #    Copyright (C) 2008 B2CK SPRL.
11 #
12 #    This program is free software: you can redistribute it and/or modify
13 #    it under the terms of the GNU General Public License as published by
14 #    the Free Software Foundation, either version 3 of the License, or
15 #    (at your option) any later version.
16 #
17 #    This program is distributed in the hope that it will be useful,
18 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
19 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 #    GNU General Public License for more details.
21 #
22 #    You should have received a copy of the GNU General Public License
23 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
24 #
25 ##############################################################################
26
27 import SimpleXMLRPCServer
28 import SocketServer
29 import logging
30 import logging.handlers
31 import os
32 import signal
33 import socket
34 import sys
35 import threading
36 import time
37 import xmlrpclib
38 import release
39
40 class Service(object):
41     """ Base class for *Local* services 
42    
43         Functionality here is trusted, no authentication.
44     """
45     _services = {}
46     def __init__(self, name, audience=''):
47         Service._services[name] = self
48         self.__name = name
49         self._methods = {}
50
51     def joinGroup(self, name):
52         raise Exception("No group for local services")
53         #GROUPS.setdefault(name, {})[self.__name] = self
54
55     def service_exist(self,name):
56         return Service._services.has_key(name)
57
58     def exportMethod(self, method):
59         if callable(method):
60             self._methods[method.__name__] = method
61
62     def abortResponse(self, error, description, origin, details):
63         if not tools.config['debug_mode']:
64             raise Exception("%s -- %s\n\n%s"%(origin, description, details))
65         else:
66             raise
67
68 class LocalService(object):
69     """ Proxy for local services. 
70     
71         Any instance of this class will behave like the single instance
72         of Service(name)
73     """
74     def __init__(self, name):
75         self.__name = name
76         try:
77             self._service = Service._services[name]
78             for method_name, method_definition in self._service._methods.items():
79                 setattr(self, method_name, method_definition)
80         except KeyError, keyError:
81             Logger().notifyChannel('module', LOG_ERROR, 'This service does not exist: %s' % (str(keyError),) )
82             raise
83     def __call__(self, method, *params):
84         return getattr(self, method)(*params)
85
86 class ExportService(object):
87     """ Proxy for exported services. 
88
89     All methods here should take an AuthProxy as their first parameter. It
90     will be appended by the calling framework.
91
92     Note that this class has no direct proxy, capable of calling 
93     eservice.method(). Rather, the proxy should call 
94     dispatch(method,auth,params)
95     """
96     
97     _services = {}
98     _groups = {}
99     
100     def __init__(self, name, audience=''):
101         ExportService._services[name] = self
102         self.__name = name
103
104     def joinGroup(self, name):
105         ExportService._groups.setdefault(name, {})[self.__name] = self
106
107     @classmethod
108     def getService(cls,name):
109         return cls._services[name]
110
111     def dispatch(self, method, auth, params):
112         raise Exception("stub dispatch at %s" % self.__name)
113         
114     def new_dispatch(self,method,auth,params):
115         raise Exception("stub dispatch at %s" % self.__name)
116
117     def abortResponse(self, error, description, origin, details):
118         if not tools.config['debug_mode']:
119             raise Exception("%s -- %s\n\n%s"%(origin, description, details))
120         else:
121             raise
122
123 LOG_NOTSET = 'notset'
124 LOG_DEBUG_RPC = 'debug_rpc'
125 LOG_DEBUG = 'debug'
126 LOG_DEBUG2 = 'debug2'
127 LOG_INFO = 'info'
128 LOG_WARNING = 'warn'
129 LOG_ERROR = 'error'
130 LOG_CRITICAL = 'critical'
131
132 # add new log level below DEBUG
133 logging.DEBUG2 = logging.DEBUG - 1
134 logging.DEBUG_RPC = logging.DEBUG2 - 1
135
136 def init_logger():
137     import os
138     from tools.translate import resetlocale
139     resetlocale()
140
141     logger = logging.getLogger()
142     # create a format for log messages and dates
143     formatter = logging.Formatter('[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
144
145     if tools.config['syslog']:
146         # SysLog Handler
147         if os.name == 'nt':
148             handler = logging.handlers.NTEventLogHandler("%s %s" %
149                                                          (release.description,
150                                                           release.version))
151         else:
152             handler = logging.handlers.SysLogHandler('/dev/log')
153         formatter = logging.Formatter("%s %s" % (release.description, release.version) + ':%(levelname)s:%(name)s:%(message)s')
154
155     elif tools.config['logfile']:
156         # LogFile Handler
157         logf = tools.config['logfile']
158         try:
159             dirname = os.path.dirname(logf)
160             if dirname and not os.path.isdir(dirname):
161                 os.makedirs(dirname)
162             if tools.config['logrotate'] is not False:
163                 handler = logging.handlers.TimedRotatingFileHandler(logf,'D',1,30)
164             elif os.name == 'posix':
165                 handler = logging.handlers.WatchedFileHandler(logf)
166             else:
167                 handler = logging.handlers.FileHandler(logf)
168         except Exception, ex:
169             sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n")
170             handler = logging.StreamHandler(sys.stdout)
171     else:
172         # Normal Handler on standard output
173         handler = logging.StreamHandler(sys.stdout)
174
175
176     # tell the handler to use this format
177     handler.setFormatter(formatter)
178
179     # add the handler to the root logger
180     logger.addHandler(handler)
181     logger.setLevel(int(tools.config['log_level'] or '0'))
182
183     if (not isinstance(handler, logging.FileHandler)) and os.name != 'nt':
184         # change color of level names
185         # uses of ANSI color codes
186         # see http://pueblo.sourceforge.net/doc/manual/ansi_color_codes.html
187         # maybe use http://code.activestate.com/recipes/574451/
188         colors = ['black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white', None, 'default']
189         foreground = lambda f: 30 + colors.index(f)
190         background = lambda f: 40 + colors.index(f)
191
192         mapping = {
193             'DEBUG_RPC': ('blue', 'white'),
194             'DEBUG2': ('green', 'white'),
195             'DEBUG': ('blue', 'default'),
196             'INFO': ('green', 'default'),
197             'WARNING': ('yellow', 'default'),
198             'ERROR': ('red', 'default'),
199             'CRITICAL': ('white', 'red'),
200         }
201
202         for level, (fg, bg) in mapping.items():
203             msg = "\x1b[%dm\x1b[%dm%s\x1b[0m" % (foreground(fg), background(bg), level)
204             logging.addLevelName(getattr(logging, level), msg)
205
206
207 class Logger(object):
208
209     def notifyChannel(self, name, level, msg):
210         from service.web_services import common
211
212         log = logging.getLogger(tools.ustr(name))
213
214         if level == LOG_DEBUG2 and not hasattr(log, level):
215             fct = lambda msg, *args, **kwargs: log.log(logging.DEBUG2, msg, *args, **kwargs)
216             setattr(log, LOG_DEBUG2, fct)
217
218         if level == LOG_DEBUG_RPC and not hasattr(log, level):
219             fct = lambda msg, *args, **kwargs: log.log(logging.DEBUG_RPC, msg, *args, **kwargs)
220             setattr(log, LOG_DEBUG_RPC, fct)
221
222         level_method = getattr(log, level)
223
224         if isinstance(msg, Exception):
225             msg = tools.exception_to_unicode(msg)
226
227         try:
228             msg = tools.ustr(msg).strip()
229             if level in (LOG_ERROR,LOG_CRITICAL) and tools.config.get_misc('debug','env_info',False):
230                 msg = common().exp_get_server_environment() + "\n" + msg
231
232             result = msg.split('\n')
233         except UnicodeDecodeError:
234             result = msg.strip().split('\n')
235         try:
236             if len(result)>1:
237                 for idx, s in enumerate(result):
238                     level_method('[%02d]: %s' % (idx+1, s,))
239             elif result:
240                 level_method(result[0])
241         except IOError,e:
242             # TODO: perhaps reset the logger streams?
243             #if logrotate closes our files, we end up here..
244             pass
245         except:
246             # better ignore the exception and carry on..
247             pass
248
249     def set_loglevel(self, level):
250         log = logging.getLogger()
251         log.setLevel(logging.INFO) # make sure next msg is printed
252         log.info("Log level changed to %s" % logging.getLevelName(level))
253         log.setLevel(level)
254
255     def shutdown(self):
256         logging.shutdown()
257
258 import tools
259 init_logger()
260
261 class Agent(object):
262     _timers = {}
263     _logger = Logger()
264
265     def setAlarm(self, fn, dt, db_name, *args, **kwargs):
266         wait = dt - time.time()
267         if wait > 0:
268             self._logger.notifyChannel('timers', LOG_DEBUG, "Job scheduled in %.3g seconds for %s.%s" % (wait, fn.im_class.__name__, fn.func_name))
269             timer = threading.Timer(wait, fn, args, kwargs)
270             timer.start()
271             self._timers.setdefault(db_name, []).append(timer)
272
273         for db in self._timers:
274             for timer in self._timers[db]:
275                 if not timer.isAlive():
276                     self._timers[db].remove(timer)
277
278     @classmethod
279     def cancel(cls, db_name):
280         """Cancel all timers for a given database. If None passed, all timers are cancelled"""
281         for db in cls._timers:
282             if db_name is None or db == db_name:
283                 for timer in cls._timers[db]:
284                     timer.cancel()
285
286     @classmethod
287     def quit(cls):
288         cls.cancel(None)
289
290 import traceback
291
292 class Server:
293     """ Generic interface for all servers with an event loop etc.
294         Override this to impement http, net-rpc etc. servers.
295         
296         Servers here must have threaded behaviour. start() must not block,
297         there is no run().
298     """
299     __is_started = False
300     __servers = []
301     
302     def __init__(self):
303         if Server.__is_started:
304             raise Exception('All instances of servers must be inited before the startAll()')
305         Server.__servers.append(self)
306
307     def start(self):
308         print "called stub Server.start"
309         pass
310         
311     def stop(self):
312         print "called stub Server.stop"
313         pass
314
315     def stats(self):
316         """ This function should return statistics about the server """
317         return "%s: No statistics" % str(self.__class__)
318
319     @classmethod
320     def startAll(cls):
321         if cls.__is_started:
322             return
323         Logger().notifyChannel("services", LOG_INFO, 
324             "Starting %d services" % len(cls.__servers))
325         for srv in cls.__servers:
326             srv.start()
327         cls.__is_started = True
328     
329     @classmethod
330     def quitAll(cls):
331         if not cls.__is_started:
332             return
333         Logger().notifyChannel("services", LOG_INFO, 
334             "Stopping %d services" % len(cls.__servers))
335         for srv in cls.__servers:
336             srv.stop()
337         cls.__is_started = False
338
339     @classmethod
340     def allStats(cls):
341         res = ''
342         if cls.__is_started:
343             res += "Servers started\n"
344         else:
345             res += "Servers stopped\n"
346         for srv in cls.__servers:
347             try:
348                 res += srv.stats() + "\n"
349             except:
350                 pass
351         return res
352
353 class OpenERPDispatcherException(Exception):
354     def __init__(self, exception, traceback):
355         self.exception = exception
356         self.traceback = traceback
357
358 class OpenERPDispatcher:
359     def log(self, title, msg):
360         from pprint import pformat
361         Logger().notifyChannel('%s' % title, LOG_DEBUG_RPC, pformat(msg))
362
363     def dispatch(self, service_name, method, params):
364         try:
365             self.log('service', service_name)
366             self.log('method', method)
367             self.log('params', params)
368             if hasattr(self,'auth_provider'):
369                 auth = self.auth_provider
370             else:
371                 auth = None
372             result = ExportService.getService(service_name).dispatch(method, auth, params)
373             self.log('result', result)
374             # We shouldn't marshall None,
375             if result == None:
376                 result = False
377             return result
378         except Exception, e:
379             self.log('exception', tools.exception_to_unicode(e))
380             if hasattr(e, 'traceback'):
381                 tb = e.traceback
382             else:
383                 tb = sys.exc_info()
384             tb_s = "".join(traceback.format_exception(*tb))
385             if tools.config['debug_mode']:
386                 import pdb
387                 pdb.post_mortem(tb[2])
388             raise OpenERPDispatcherException(e, tb_s)
389
390 class GenericXMLRPCRequestHandler(OpenERPDispatcher):
391     def _dispatch(self, method, params):
392         try:
393             service_name = self.path.split("/")[-1]
394             return self.dispatch(service_name, method, params)
395         except OpenERPDispatcherException, e:
396             raise xmlrpclib.Fault(tools.exception_to_unicode(e.exception), e.traceback)
397
398 class SSLSocket(object):
399     def __init__(self, socket):
400         if not hasattr(socket, 'sock_shutdown'):
401             from OpenSSL import SSL
402             ctx = SSL.Context(SSL.SSLv23_METHOD)
403             ctx.use_privatekey_file(tools.config['secure_pkey_file'])
404             ctx.use_certificate_file(tools.config['secure_cert_file'])
405             self.socket = SSL.Connection(ctx, socket)
406         else:
407             self.socket = socket
408
409     def shutdown(self, how):
410         return self.socket.sock_shutdown(how)
411
412     def __getattr__(self, name):
413         return getattr(self.socket, name)
414
415 class SimpleXMLRPCRequestHandler(GenericXMLRPCRequestHandler, SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
416     rpc_paths = map(lambda s: '/xmlrpc/%s' % s, GROUPS.get('web-services', {}).keys())
417
418 class SecureXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
419     def setup(self):
420         self.connection = SSLSocket(self.request)
421         self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
422         self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
423
424 class SimpleThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer.SimpleXMLRPCServer):
425     def server_bind(self):
426         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
427         SimpleXMLRPCServer.SimpleXMLRPCServer.server_bind(self)
428
429 class SecureThreadedXMLRPCServer(SimpleThreadedXMLRPCServer):
430     def __init__(self, server_address, HandlerClass, logRequests=1):
431         SimpleThreadedXMLRPCServer.__init__(self, server_address, HandlerClass, logRequests)
432         self.socket = SSLSocket(socket.socket(self.address_family, self.socket_type))
433         self.server_bind()
434         self.server_activate()
435
436 class HttpDaemon(threading.Thread):
437     def __init__(self, interface, port, secure=False):
438         threading.Thread.__init__(self)
439         self.__port = port
440         self.__interface = interface
441         self.secure = bool(secure)
442         handler_class = (SimpleXMLRPCRequestHandler, SecureXMLRPCRequestHandler)[self.secure]
443         server_class = (SimpleThreadedXMLRPCServer, SecureThreadedXMLRPCServer)[self.secure]
444
445         if self.secure:
446             from OpenSSL.SSL import Error as SSLError
447         else:
448             class SSLError(Exception): pass
449         try:
450             self.server = server_class((interface, port), handler_class, 0)
451         except SSLError, e:
452             Logger().notifyChannel('xml-rpc-ssl', LOG_CRITICAL, "Can not load the certificate and/or the private key files")
453             sys.exit(1)
454         except Exception, e:
455             Logger().notifyChannel('xml-rpc', LOG_CRITICAL, "Error occur when starting the server daemon: %s" % (e,))
456             sys.exit(1)
457
458
459     def attach(self, path, gw):
460         pass
461
462     def stop(self):
463         self.running = False
464         if os.name != 'nt':
465             try:
466                 self.server.socket.shutdown(
467                     hasattr(socket, 'SHUT_RDWR') and socket.SHUT_RDWR or 2)
468             except socket.error, e:
469                 if e.errno != 57: raise
470                 # OSX, socket shutdowns both sides if any side closes it
471                 # causing an error 57 'Socket is not connected' on shutdown
472                 # of the other side (or something), see
473                 # http://bugs.python.org/issue4397
474                 Logger().notifyChannel(
475                     'server', LOG_DEBUG,
476                     '"%s" when shutting down server socket, '
477                     'this is normal under OS X'%e)
478         self.server.socket.close()
479
480     def run(self):
481         self.server.register_introspection_functions()
482
483         self.running = True
484         while self.running:
485             self.server.handle_request()
486         return True
487
488         # If the server need to be run recursively
489         #
490         #signal.signal(signal.SIGALRM, self.my_handler)
491         #signal.alarm(6)
492         #while True:
493         #   self.server.handle_request()
494         #signal.alarm(0)          # Disable the alarm
495
496 import tiny_socket
497 class TinySocketClientThread(threading.Thread, OpenERPDispatcher):
498     def __init__(self, sock, threads):
499         threading.Thread.__init__(self)
500         self.sock = sock
501         self.threads = threads
502
503     def run(self):
504         import select
505         self.running = True
506         try:
507             ts = tiny_socket.mysocket(self.sock)
508         except:
509             self.sock.close()
510             self.threads.remove(self)
511             return False
512         while self.running:
513             try:
514                 msg = ts.myreceive()
515             except:
516                 self.sock.close()
517                 self.threads.remove(self)
518                 return False
519             try:
520                 result = self.dispatch(msg[0], msg[1], msg[2:])
521                 ts.mysend(result)
522             except OpenERPDispatcherException, e:
523                 new_e = Exception(tools.exception_to_unicode(e.exception)) # avoid problems of pickeling
524                 ts.mysend(new_e, exception=True, traceback=e.traceback)
525
526             self.sock.close()
527             self.threads.remove(self)
528             return True
529
530     def stop(self):
531         self.running = False
532
533
534 class TinySocketServerThread(threading.Thread):
535     def __init__(self, interface, port, secure=False):
536         threading.Thread.__init__(self)
537         self.__port = port
538         self.__interface = interface
539         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
540         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
541         self.socket.bind((self.__interface, self.__port))
542         self.socket.listen(5)
543         self.threads = []
544
545     def run(self):
546         import select
547         try:
548             self.running = True
549             while self.running:
550                 (clientsocket, address) = self.socket.accept()
551                 ct = TinySocketClientThread(clientsocket, self.threads)
552                 self.threads.append(ct)
553                 ct.start()
554             self.socket.close()
555         except Exception, e:
556             self.socket.close()
557             return False
558
559     def stop(self):
560         self.running = False
561         for t in self.threads:
562             t.stop()
563         try:
564             if hasattr(socket, 'SHUT_RDWR'):
565                 self.socket.shutdown(socket.SHUT_RDWR)
566             else:
567                 self.socket.shutdown(2)
568             self.socket.close()
569         except:
570             return False
571
572 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: