Minor improvement in poolJobs debug string.
[odoo/odoo.git] / bin / netsvc.py
1 #!/usr/bin/python
2 # -*- encoding: utf-8 -*-
3 ##############################################################################
4 #
5 #    OpenERP, Open Source Management Solution
6 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). All Rights Reserved
7 #    The refactoring about the OpenSSL support come from Tryton
8 #    Copyright (C) 2007-2009 Cédric Krier.
9 #    Copyright (C) 2007-2009 Bertrand Chenal.
10 #    Copyright (C) 2008 B2CK SPRL.
11 #
12 #    This program is free software: you can redistribute it and/or modify
13 #    it under the terms of the GNU General Public License as published by
14 #    the Free Software Foundation, either version 3 of the License, or
15 #    (at your option) any later version.
16 #
17 #    This program is distributed in the hope that it will be useful,
18 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
19 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 #    GNU General Public License for more details.
21 #
22 #    You should have received a copy of the GNU General Public License
23 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
24 #
25 ##############################################################################
26
27 import SimpleXMLRPCServer
28 import SocketServer
29 import logging
30 import logging.handlers
31 import os
32 import signal
33 import socket
34 import sys
35 import threading
36 import time
37 import xmlrpclib
38 import release
39
40 SERVICES = {}
41 GROUPS = {}
42
43 class Service(object):
44     def __init__(self, name, audience=''):
45         SERVICES[name] = self
46         self.__name = name
47         self._methods = {}
48
49     def joinGroup(self, name):
50         GROUPS.setdefault(name, {})[self.__name] = self
51
52     def exportMethod(self, method):
53         if callable(method):
54             self._methods[method.__name__] = method
55
56     def abortResponse(self, error, description, origin, details):
57         if not tools.config['debug_mode']:
58             raise Exception("%s -- %s\n\n%s"%(origin, description, details))
59         else:
60             raise
61
62 class LocalService(Service):
63     def __init__(self, name):
64         self.__name = name
65         try:
66             self._service = SERVICES[name]
67             for method_name, method_definition in self._service._methods.items():
68                 setattr(self, method_name, method_definition)
69         except KeyError, keyError:
70             Logger().notifyChannel('module', LOG_ERROR, 'This service does not exists: %s' % (str(keyError),) )
71             raise
72     def __call__(self, method, *params):
73         return getattr(self, method)(*params)
74
75 def service_exist(name):
76     return SERVICES.get(name, False)
77
78 LOG_NOTSET = 'notset'
79 LOG_DEBUG_RPC = 'debug_rpc'
80 LOG_DEBUG = 'debug'
81 LOG_DEBUG2 = 'debug2'
82 LOG_INFO = 'info'
83 LOG_WARNING = 'warn'
84 LOG_ERROR = 'error'
85 LOG_CRITICAL = 'critical'
86
87 # add new log level below DEBUG
88 logging.DEBUG2 = logging.DEBUG - 1
89 logging.DEBUG_RPC = logging.DEBUG2 - 1
90
91 def init_logger():
92     import os
93     from tools.translate import resetlocale
94     resetlocale()
95
96     logger = logging.getLogger()
97     # create a format for log messages and dates
98     formatter = logging.Formatter('[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
99
100     if tools.config['syslog']:
101         # SysLog Handler
102         if os.name == 'nt':
103             handler = logging.handlers.NTEventLogHandler("%s %s" %
104                                                          (release.description,
105                                                           release.version))
106         else:
107             handler = logging.handlers.SysLogHandler('/dev/log')
108         formatter = logging.Formatter("%s %s" % (release.description, release.version) + ':%(levelname)s:%(name)s:%(message)s')
109
110     elif tools.config['logfile']:
111         # LogFile Handler
112         logf = tools.config['logfile']
113         try:
114             dirname = os.path.dirname(logf)
115             if dirname and not os.path.isdir(dirname):
116                 os.makedirs(dirname)
117             if tools.config['logrotate'] is not False:
118                 handler = logging.handlers.TimedRotatingFileHandler(logf,'D',1,30)
119             elif os.name == 'posix':
120                 handler = logging.handlers.WatchedFileHandler(logf)
121             else:
122                 handler = logging.handlers.FileHandler(logf)
123         except Exception, ex:
124             sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n")
125             handler = logging.StreamHandler(sys.stdout)
126     else:
127         # Normal Handler on standard output
128         handler = logging.StreamHandler(sys.stdout)
129
130
131     # tell the handler to use this format
132     handler.setFormatter(formatter)
133
134     # add the handler to the root logger
135     logger.addHandler(handler)
136     logger.setLevel(int(tools.config['log_level'] or '0'))
137
138     if (not isinstance(handler, logging.FileHandler)) and os.name != 'nt':
139         # change color of level names
140         # uses of ANSI color codes
141         # see http://pueblo.sourceforge.net/doc/manual/ansi_color_codes.html
142         # maybe use http://code.activestate.com/recipes/574451/
143         colors = ['black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white', None, 'default']
144         foreground = lambda f: 30 + colors.index(f)
145         background = lambda f: 40 + colors.index(f)
146
147         mapping = {
148             'DEBUG_RPC': ('blue', 'white'),
149             'DEBUG2': ('green', 'white'),
150             'DEBUG': ('blue', 'default'),
151             'INFO': ('green', 'default'),
152             'WARNING': ('yellow', 'default'),
153             'ERROR': ('red', 'default'),
154             'CRITICAL': ('white', 'red'),
155         }
156
157         for level, (fg, bg) in mapping.items():
158             msg = "\x1b[%dm\x1b[%dm%s\x1b[0m" % (foreground(fg), background(bg), level)
159             logging.addLevelName(getattr(logging, level), msg)
160
161
162 class Logger(object):
163
164     def notifyChannel(self, name, level, msg):
165         from service.web_services import common
166
167         log = logging.getLogger(tools.ustr(name))
168
169         if level == LOG_DEBUG2 and not hasattr(log, level):
170             fct = lambda msg, *args, **kwargs: log.log(logging.DEBUG2, msg, *args, **kwargs)
171             setattr(log, LOG_DEBUG2, fct)
172
173         if level == LOG_DEBUG_RPC and not hasattr(log, level):
174             fct = lambda msg, *args, **kwargs: log.log(logging.DEBUG_RPC, msg, *args, **kwargs)
175             setattr(log, LOG_DEBUG_RPC, fct)
176
177         level_method = getattr(log, level)
178
179         if isinstance(msg, Exception):
180             msg = tools.exception_to_unicode(msg)
181
182         try:
183             msg = tools.ustr(msg).strip()
184             if level in (LOG_ERROR,LOG_CRITICAL) and tools.config.get_misc('debug','env_info',True):
185                 msg = common().get_server_environment() + msg
186
187             result = msg.split('\n')
188         except UnicodeDecodeError:
189                 result = msg.strip().split('\n')
190         try:
191             if len(result)>1:
192                 for idx, s in enumerate(result):
193                     level_method('[%02d]: %s' % (idx+1, s,))
194             elif result:
195                 level_method(result[0])
196         except IOError,e:
197                 # TODO: perhaps reset the logger streams?
198                 #if logrotate closes our files, we end up here..
199                 pass
200         except:
201                 # better ignore the exception and carry on..
202                 pass
203
204     def set_loglevel(self, level):
205         log = logging.getLogger()
206         log.setLevel(logging.INFO) # make sure next msg is printed
207         log.info("Log level changed to %s" % logging.getLevelName(level))
208         log.setLevel(level)
209
210     def shutdown(self):
211         logging.shutdown()
212
213 import tools
214 init_logger()
215
216 class Agent(object):
217     _timers = {}
218     _logger = Logger()
219
220     def setAlarm(self, fn, dt, db_name, *args, **kwargs):
221         wait = dt - time.time()
222         if wait > 0:
223             self._logger.notifyChannel('timers', LOG_DEBUG, "Job scheduled in %.3g seconds for %s.%s" % (wait, fn.im_class.__name__, fn.func_name))
224             timer = threading.Timer(wait, fn, args, kwargs)
225             timer.start()
226             self._timers.setdefault(db_name, []).append(timer)
227
228         for db in self._timers:
229             for timer in self._timers[db]:
230                 if not timer.isAlive():
231                     self._timers[db].remove(timer)
232
233     @classmethod
234     def cancel(cls, db_name):
235         """Cancel all timers for a given database. If None passed, all timers are cancelled"""
236         for db in cls._timers:
237             if db_name is None or db == db_name:
238                 for timer in cls._timers[db]:
239                     timer.cancel()
240
241     @classmethod
242     def quit(cls):
243         cls.cancel(None)
244
245 import traceback
246
247 class xmlrpc(object):
248     class RpcGateway(object):
249         def __init__(self, name):
250             self.name = name
251
252 class OpenERPDispatcherException(Exception):
253     def __init__(self, exception, traceback):
254         self.exception = exception
255         self.traceback = traceback
256
257 class OpenERPDispatcher:
258     def log(self, title, msg):
259         from pprint import pformat
260         Logger().notifyChannel('%s' % title, LOG_DEBUG_RPC, pformat(msg))
261
262     def dispatch(self, service_name, method, params):
263         try:
264             self.log('service', service_name)
265             self.log('method', method)
266             self.log('params', params)
267             result = LocalService(service_name)(method, *params)
268             self.log('result', result)
269             return result
270         except Exception, e:
271             self.log('exception', tools.exception_to_unicode(e))
272             if hasattr(e, 'traceback'):
273                 tb = e.traceback
274             else:
275                 tb = sys.exc_info()
276             tb_s = "".join(traceback.format_exception(*tb))
277             if tools.config['debug_mode']:
278                 import pdb
279                 pdb.post_mortem(tb[2])
280             raise OpenERPDispatcherException(e, tb_s)
281
282 class GenericXMLRPCRequestHandler(OpenERPDispatcher):
283     def _dispatch(self, method, params):
284         try:
285             service_name = self.path.split("/")[-1]
286             return self.dispatch(service_name, method, params)
287         except OpenERPDispatcherException, e:
288             raise xmlrpclib.Fault(tools.exception_to_unicode(e.exception), e.traceback)
289
290 class SSLSocket(object):
291     def __init__(self, socket):
292         if not hasattr(socket, 'sock_shutdown'):
293             from OpenSSL import SSL
294             ctx = SSL.Context(SSL.SSLv23_METHOD)
295             ctx.use_privatekey_file(tools.config['secure_pkey_file'])
296             ctx.use_certificate_file(tools.config['secure_cert_file'])
297
298             self.socket = SSL.Connection(ctx, socket)
299         else:
300             self.socket = socket
301
302     def shutdown(self, how):
303         return self.socket.sock_shutdown(how)
304
305     def __getattr__(self, name):
306         return getattr(self.socket, name)
307
308 class SimpleXMLRPCRequestHandler(GenericXMLRPCRequestHandler, SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
309     rpc_paths = map(lambda s: '/xmlrpc/%s' % s, SERVICES.keys())
310
311 class SecureXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
312     def setup(self):
313         self.connection = SSLSocket(self.request)
314         self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
315         self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
316
317 class SimpleThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer.SimpleXMLRPCServer):
318     encoding = None
319     allow_none = False
320
321     def server_bind(self):
322         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
323         SimpleXMLRPCServer.SimpleXMLRPCServer.server_bind(self)
324
325     def handle_error(self, request, client_address):
326         """ Override the error handler
327         """
328         import traceback
329         Logger().notifyChannel("init", LOG_ERROR,"Server error in request from %s:\n%s" %
330                 (client_address,traceback.format_exc()))
331
332 class SecureThreadedXMLRPCServer(SimpleThreadedXMLRPCServer):
333     def __init__(self, server_address, HandlerClass, logRequests=1):
334         SimpleThreadedXMLRPCServer.__init__(self, server_address, HandlerClass, logRequests)
335         self.socket = SSLSocket(socket.socket(self.address_family, self.socket_type))
336         self.server_bind()
337         self.server_activate()
338
339     def handle_error(self, request, client_address):
340         """ Override the error handler
341         """
342         import traceback
343         e_type, e_value, e_traceback = sys.exc_info()
344         Logger().notifyChannel("init", LOG_ERROR,"SSL Request handler error in request from %s: %s\n%s" % 
345                         (client_address,str(e_type),traceback.format_exc()))
346
347 class HttpDaemon(threading.Thread):
348     def __init__(self, interface, port, secure=False):
349         threading.Thread.__init__(self)
350         self.__port = port
351         self.__interface = interface
352         self.secure = bool(secure)
353         handler_class = (SimpleXMLRPCRequestHandler, SecureXMLRPCRequestHandler)[self.secure]
354         server_class = (SimpleThreadedXMLRPCServer, SecureThreadedXMLRPCServer)[self.secure]
355
356         if self.secure:
357             from OpenSSL.SSL import Error as SSLError
358         else:
359             class SSLError(Exception): pass
360         try:
361             self.server = server_class((interface, port), handler_class, 0)
362         except SSLError, e:
363             Logger().notifyChannel('xml-rpc-ssl', LOG_CRITICAL, "Can not load the certificate and/or the private key files")
364             sys.exit(1)
365         except Exception, e:
366             Logger().notifyChannel('xml-rpc', LOG_CRITICAL, "Error occur when starting the server daemon: %s" % (e,))
367             sys.exit(1)
368
369
370     def attach(self, path, gw):
371         pass
372
373     def stop(self):
374         self.running = False
375         if os.name != 'nt':
376             self.server.socket.shutdown( hasattr(socket, 'SHUT_RDWR') and socket.SHUT_RDWR or 2 )
377         self.server.socket.close()
378
379     def run(self):
380         self.server.register_introspection_functions()
381
382         self.running = True
383         while self.running:
384             self.server.handle_request()
385         return True
386
387         # If the server need to be run recursively
388         #
389         #signal.signal(signal.SIGALRM, self.my_handler)
390         #signal.alarm(6)
391         #while True:
392         #   self.server.handle_request()
393         #signal.alarm(0)          # Disable the alarm
394
395 import tiny_socket
396 class TinySocketClientThread(threading.Thread, OpenERPDispatcher):
397     def __init__(self, sock, threads):
398         threading.Thread.__init__(self)
399         self.sock = sock
400         self.threads = threads
401
402     def run(self):
403         import select
404         self.running = True
405         try:
406             ts = tiny_socket.mysocket(self.sock)
407         except:
408             self.sock.close()
409             self.threads.remove(self)
410             return False
411         while self.running:
412             try:
413                 msg = ts.myreceive()
414             except:
415                 self.sock.close()
416                 self.threads.remove(self)
417                 return False
418             try:
419                 result = self.dispatch(msg[0], msg[1], msg[2:])
420                 ts.mysend(result)
421             except OpenERPDispatcherException, e:
422                 new_e = Exception(tools.exception_to_unicode(e.exception)) # avoid problems of pickeling
423                 ts.mysend(new_e, exception=True, traceback=e.traceback)
424
425             self.sock.close()
426             self.threads.remove(self)
427             return True
428
429     def stop(self):
430         self.running = False
431
432
433 class TinySocketServerThread(threading.Thread):
434     def __init__(self, interface, port, secure=False):
435         threading.Thread.__init__(self)
436         self.__port = port
437         self.__interface = interface
438         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
439         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
440         self.socket.bind((self.__interface, self.__port))
441         self.socket.listen(5)
442         self.threads = []
443
444     def run(self):
445         import select
446         try:
447             self.running = True
448             while self.running:
449                 (clientsocket, address) = self.socket.accept()
450                 ct = TinySocketClientThread(clientsocket, self.threads)
451                 self.threads.append(ct)
452                 ct.start()
453             self.socket.close()
454         except Exception, e:
455             self.socket.close()
456             return False
457
458     def stop(self):
459         self.running = False
460         for t in self.threads:
461             t.stop()
462         try:
463             if hasattr(socket, 'SHUT_RDWR'):
464                 self.socket.shutdown(socket.SHUT_RDWR)
465             else:
466                 self.socket.shutdown(2)
467             self.socket.close()
468         except:
469             return False
470
471 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: