2 # -*- encoding: utf-8 -*-
3 ##############################################################################
5 # OpenERP, Open Source Management Solution
6 # Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). All Rights Reserved
7 # The refactoring about the OpenSSL support come from Tryton
8 # Copyright (C) 2007-2009 Cédric Krier.
9 # Copyright (C) 2007-2009 Bertrand Chenal.
10 # Copyright (C) 2008 B2CK SPRL.
12 # This program is free software: you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation, either version 3 of the License, or
15 # (at your option) any later version.
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
22 # You should have received a copy of the GNU General Public License
23 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 ##############################################################################
27 import SimpleXMLRPCServer
30 import logging.handlers
41 from pprint import pformat
47 class Service(object):
48 def __init__(self, name, audience=''):
53 def joinGroup(self, name):
54 GROUPS.setdefault(name, {})[self.__name] = self
56 def exportMethod(self, method):
58 self._methods[method.__name__] = method
60 def abortResponse(self, error, description, origin, details):
61 if not tools.config['debug_mode']:
62 raise Exception("%s -- %s\n\n%s"%(origin, description, details))
66 class LocalService(Service):
67 def __init__(self, name):
70 self._service = SERVICES[name]
71 for method_name, method_definition in self._service._methods.items():
72 setattr(self, method_name, method_definition)
73 except KeyError, keyError:
74 Logger().notifyChannel('module', LOG_ERROR, 'This service does not exists: %s' % (str(keyError),) )
76 def __call__(self, method, *params):
77 return getattr(self, method)(*params)
79 def service_exist(name):
80 return SERVICES.get(name, False)
83 LOG_DEBUG_RPC = 'debug_rpc'
88 LOG_CRITICAL = 'critical'
90 # add new log level below DEBUG
91 logging.DEBUG_RPC = logging.DEBUG - 1
94 class DBLogger(logging.getLoggerClass()):
95 def makeRecord(self, *args, **kwargs):
96 record = logging.Logger.makeRecord(self, *args, **kwargs)
97 record.dbname = getattr(threading.currentThread(), 'dbname', '?')
102 from tools.translate import resetlocale
105 logging.setLoggerClass(DBLogger)
107 # create a format for log messages and dates
108 formatter = logging.Formatter('[%(asctime)s][%(dbname)s] %(levelname)s:%(name)s:%(message)s')
110 logging_to_stdout = False
111 if tools.config['syslog']:
114 handler = logging.handlers.NTEventLogHandler("%s %s" %
115 (release.description,
118 handler = logging.handlers.SysLogHandler('/dev/log')
119 formatter = logging.Formatter("%s %s" % (release.description, release.version) + ':%(dbname)s:%(levelname)s:%(name)s:%(message)s')
121 elif tools.config['logfile']:
123 logf = tools.config['logfile']
125 dirname = os.path.dirname(logf)
126 if dirname and not os.path.isdir(dirname):
128 handler = logging.handlers.TimedRotatingFileHandler(logf,'D',1,30)
129 except Exception, ex:
130 sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n")
131 handler = logging.StreamHandler(sys.stdout)
132 logging_to_stdout = True
134 # Normal Handler on standard output
135 handler = logging.StreamHandler(sys.stdout)
136 logging_to_stdout = True
139 # tell the handler to use this format
140 handler.setFormatter(formatter)
142 # add the handler to the root logger
143 logger = logging.getLogger()
144 logger.addHandler(handler)
145 logger.setLevel(tools.config['log_level'] or '0')
147 if logging_to_stdout and os.name != 'nt':
148 # change color of level names
149 # uses of ANSI color codes
150 # see http://pueblo.sourceforge.net/doc/manual/ansi_color_codes.html
151 # maybe use http://code.activestate.com/recipes/574451/
152 colors = ['black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white', None, 'default']
153 foreground = lambda f: 30 + colors.index(f)
154 background = lambda f: 40 + colors.index(f)
157 'DEBUG_RPC': ('blue', 'white'),
158 'DEBUG': ('blue', 'default'),
159 'INFO': ('green', 'default'),
160 'WARNING': ('yellow', 'default'),
161 'ERROR': ('red', 'default'),
162 'CRITICAL': ('white', 'red'),
165 for level, (fg, bg) in mapping.items():
166 msg = "\x1b[%dm\x1b[%dm%s\x1b[0m" % (foreground(fg), background(bg), level)
167 logging.addLevelName(getattr(logging, level), msg)
170 class Logger(object):
172 def notifyChannel(self, name, level, msg):
173 from service.web_services import common
175 log = logging.getLogger(tools.ustr(name))
177 if level == LOG_DEBUG_RPC and not hasattr(log, level):
178 fct = lambda msg, *args, **kwargs: log.log(logging.DEBUG_RPC, msg, *args, **kwargs)
179 setattr(log, LOG_DEBUG_RPC, fct)
181 level_method = getattr(log, level)
183 if isinstance(msg, Exception):
184 msg = tools.exception_to_unicode(msg)
186 msg = tools.ustr(msg).strip()
188 if level in (LOG_ERROR,LOG_CRITICAL):
189 msg = common().get_server_environment() + '\n' + msg
191 result = msg.split('\n')
193 for idx, s in enumerate(result):
194 level_method('[%02d]: %s' % (idx+1, s,))
196 level_method(result[0])
205 """Singleton that keeps track of cancellable tasks to run at a given
207 The tasks are caracterised by:
209 * the database on which the task run
210 * the function to call
211 * the arguments and keyword arguments to pass to the function
213 Implementation details:
214 Tasks are stored as list, allowing the cancellation by setting
216 A heapq is used to store tasks, so we don't need to sort
224 def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
225 task = [timestamp, db_name, function, args, kwargs]
226 heapq.heappush(cls.__tasks, task)
227 cls.__tasks_by_db.setdefault(db_name, []).append(task)
230 def cancel(cls, db_name):
231 """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
233 cls.__tasks, cls.__tasks_by_db = [], {}
235 if db_name in cls.__tasks_by_db:
236 for task in cls.__tasks_by_db[db_name]:
245 """Neverending function (intended to be ran in a dedicated thread) that
246 checks every 60 seconds tasks to run.
248 current_thread = threading.currentThread()
250 while cls.__tasks and cls.__tasks[0][0] < time.time():
251 task = heapq.heappop(cls.__tasks)
252 timestamp, dbname, function, args, kwargs = task
253 cls.__tasks_by_db[dbname].remove(task)
255 # null timestamp -> cancelled task
257 current_thread.dbname = dbname # hack hack
258 cls._logger.notifyChannel('timers', LOG_DEBUG, "Run %s.%s(*%r, **%r)" % (function.im_class.__name__, function.func_name, args, kwargs))
259 delattr(current_thread, 'dbname')
260 task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
261 # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
262 task_thread.daemon = False
267 agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
268 # the agent runner is a typical daemon thread, that will never quit and must be
269 # terminated when the main process exits - with no consequence (the processing
270 # threads it spawns are not marked daemon)
271 agent_runner.daemon = True
277 class xmlrpc(object):
278 class RpcGateway(object):
279 def __init__(self, name):
282 class OpenERPDispatcherException(Exception):
283 def __init__(self, exception, traceback):
284 self.exception = exception
285 self.traceback = traceback
287 class OpenERPDispatcher:
288 def log(self, title, msg):
289 if tools.config['log_level'] == logging.DEBUG_RPC:
290 Logger().notifyChannel('%s' % title, LOG_DEBUG_RPC, pformat(msg))
292 def dispatch(self, service_name, method, params):
293 if service_name not in GROUPS['web-services']:
294 raise Exception('AccessDenied')
296 self.log('service', service_name)
297 self.log('method', method)
298 self.log('params', params)
299 result = LocalService(service_name)(method, *params)
300 if result is None: #we cannot marshal none in XMLRPC
302 self.log('result', result)
305 self.log('exception', tools.exception_to_unicode(e))
306 if hasattr(e, 'traceback'):
310 tb_s = "".join(traceback.format_exception(*tb))
311 if tools.config['debug_mode']:
313 pdb.post_mortem(tb[2])
314 raise OpenERPDispatcherException(e, tb_s)
316 class GenericXMLRPCRequestHandler(OpenERPDispatcher):
317 def _dispatch(self, method, params):
319 service_name = self.path.split("/")[-1]
320 return self.dispatch(service_name, method, params)
321 except OpenERPDispatcherException, e:
322 raise xmlrpclib.Fault(tools.exception_to_unicode(e.exception), e.traceback)
324 class SSLSocket(object):
325 def __init__(self, socket):
326 if not hasattr(socket, 'sock_shutdown'):
327 from OpenSSL import SSL
328 ctx = SSL.Context(SSL.SSLv23_METHOD)
329 ctx.use_privatekey_file(tools.config['secure_pkey_file'])
330 ctx.use_certificate_file(tools.config['secure_cert_file'])
331 self.socket = SSL.Connection(ctx, socket)
335 def shutdown(self, how):
336 return self.socket.sock_shutdown(how)
338 def __getattr__(self, name):
339 return getattr(self.socket, name)
341 class SimpleXMLRPCRequestHandler(GenericXMLRPCRequestHandler, SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
342 rpc_paths = map(lambda s: '/xmlrpc/%s' % s, GROUPS.get('web-services', {}).keys())
344 class SecureXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
346 self.connection = SSLSocket(self.request)
347 self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
348 self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
350 class SimpleThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer.SimpleXMLRPCServer):
351 def server_bind(self):
352 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
353 SimpleXMLRPCServer.SimpleXMLRPCServer.server_bind(self)
355 class SecureThreadedXMLRPCServer(SimpleThreadedXMLRPCServer):
356 def __init__(self, server_address, HandlerClass, logRequests=1):
357 SimpleThreadedXMLRPCServer.__init__(self, server_address, HandlerClass, logRequests)
358 self.socket = SSLSocket(socket.socket(self.address_family, self.socket_type))
360 self.server_activate()
363 def _close_socket(sock):
364 if os.name != 'nt': # XXX if someone know why, please leave a comment.
366 sock.shutdown(getattr(socket, 'SHUT_RDWR', 2))
367 except socket.error, e:
368 if e.errno != errno.ENOTCONN: raise
369 # OSX, socket shutdowns both sides if any side closes it
370 # causing an error 57 'Socket is not connected' on shutdown
371 # of the other side (or something), see
372 # http://bugs.python.org/issue4397
373 Logger().notifyChannel(
375 '"%s" when shutting down server socket, '
376 'this is normal under OS X'%e)
380 class HttpDaemon(threading.Thread):
381 def __init__(self, interface, port, secure=False):
382 threading.Thread.__init__(self)
384 self.__interface = interface
385 self.secure = bool(secure)
386 handler_class = (SimpleXMLRPCRequestHandler, SecureXMLRPCRequestHandler)[self.secure]
387 server_class = (SimpleThreadedXMLRPCServer, SecureThreadedXMLRPCServer)[self.secure]
390 from OpenSSL.SSL import Error as SSLError
392 class SSLError(Exception): pass
394 self.server = server_class((interface, port), handler_class, 0)
396 Logger().notifyChannel('xml-rpc-ssl', LOG_CRITICAL, "Can not load the certificate and/or the private key files")
399 Logger().notifyChannel('xml-rpc', LOG_CRITICAL, "Error occur when starting the server daemon: %s" % (e,))
403 def attach(self, path, gw):
408 _close_socket(self.server.socket)
411 self.server.register_introspection_functions()
416 self.server.handle_request()
417 except (socket.error, select.error), e:
418 if self.running or e.args[0] != errno.EBADF:
422 # If the server need to be run recursively
424 #signal.signal(signal.SIGALRM, self.my_handler)
427 # self.server.handle_request()
428 #signal.alarm(0) # Disable the alarm
431 class TinySocketClientThread(threading.Thread, OpenERPDispatcher):
432 def __init__(self, sock, threads):
433 threading.Thread.__init__(self)
435 self.threads = threads
440 ts = tiny_socket.mysocket(self.sock)
443 self.threads.remove(self)
450 self.threads.remove(self)
453 result = self.dispatch(msg[0], msg[1], msg[2:])
455 except OpenERPDispatcherException, e:
456 new_e = Exception(tools.exception_to_unicode(e.exception)) # avoid problems of pickeling
458 ts.mysend(new_e, exception=True, traceback=e.traceback)
461 self.threads.remove(self)
465 self.threads.remove(self)
472 class TinySocketServerThread(threading.Thread):
473 def __init__(self, interface, port, secure=False):
474 threading.Thread.__init__(self)
476 self.__interface = interface
477 self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
478 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
479 self.socket.bind((self.__interface, self.__port))
480 self.socket.listen(5)
487 timeout = self.socket.gettimeout()
488 fd_sets = select.select([self.socket], [], [], timeout)
491 (clientsocket, address) = self.socket.accept()
492 ct = TinySocketClientThread(clientsocket, self.threads)
493 self.threads.append(ct)
502 for t in self.threads:
504 _close_socket(self.socket)
506 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: