[FIX] event: reset tax for each event to invoice
[odoo/odoo.git] / bin / netsvc.py
1 #!/usr/bin/env python
2 # -*- encoding: utf-8 -*-
3 ##############################################################################
4 #
5 #    OpenERP, Open Source Management Solution
6 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). All Rights Reserved
7 #    The refactoring about the OpenSSL support come from Tryton
8 #    Copyright (C) 2007-2009 Cédric Krier.
9 #    Copyright (C) 2007-2009 Bertrand Chenal.
10 #    Copyright (C) 2008 B2CK SPRL.
11 #
12 #    This program is free software: you can redistribute it and/or modify
13 #    it under the terms of the GNU General Public License as published by
14 #    the Free Software Foundation, either version 3 of the License, or
15 #    (at your option) any later version.
16 #
17 #    This program is distributed in the hope that it will be useful,
18 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
19 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 #    GNU General Public License for more details.
21 #
22 #    You should have received a copy of the GNU General Public License
23 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
24 #
25 ##############################################################################
26
27 import SimpleXMLRPCServer
28 import SocketServer
29 import logging
30 import logging.handlers
31 import os
32 import signal
33 import socket
34 import select
35 import errno
36 import sys
37 import threading
38 import time
39 import xmlrpclib
40 import release
41 from pprint import pformat
42 import heapq
43
44 SERVICES = {}
45 GROUPS = {}
46
47 class Service(object):
48     def __init__(self, name, audience=''):
49         SERVICES[name] = self
50         self.__name = name
51         self._methods = {}
52
53     def joinGroup(self, name):
54         GROUPS.setdefault(name, {})[self.__name] = self
55
56     def exportMethod(self, method):
57         if callable(method):
58             self._methods[method.__name__] = method
59
60     def abortResponse(self, error, description, origin, details):
61         if not tools.config['debug_mode']:
62             raise Exception("%s -- %s\n\n%s"%(origin, description, details))
63         else:
64             raise
65
66 class LocalService(Service):
67     def __init__(self, name):
68         self.__name = name
69         try:
70             self._service = SERVICES[name]
71             for method_name, method_definition in self._service._methods.items():
72                 setattr(self, method_name, method_definition)
73         except KeyError, keyError:
74             Logger().notifyChannel('module', LOG_ERROR, 'This service does not exists: %s' % (str(keyError),) )
75             raise
76     def __call__(self, method, *params):
77         return getattr(self, method)(*params)
78
79 def service_exist(name):
80     return SERVICES.get(name, False)
81
82 LOG_NOTSET = 'notset'
83 LOG_DEBUG_RPC = 'debug_rpc'
84 LOG_DEBUG = 'debug'
85 LOG_INFO = 'info'
86 LOG_WARNING = 'warn'
87 LOG_ERROR = 'error'
88 LOG_CRITICAL = 'critical'
89
90 # add new log level below DEBUG
91 logging.DEBUG_RPC = logging.DEBUG - 1
92
93
94 class DBLogger(logging.getLoggerClass()):
95     def makeRecord(self, *args, **kwargs):
96         record = logging.Logger.makeRecord(self, *args, **kwargs)
97         record.dbname = getattr(threading.currentThread(), 'dbname', '?')
98         return record
99
100 def init_logger():
101     import os
102     from tools.translate import resetlocale
103     resetlocale()
104
105     logging.setLoggerClass(DBLogger)
106
107     # create a format for log messages and dates
108     formatter = logging.Formatter('[%(asctime)s][%(dbname)s] %(levelname)s:%(name)s:%(message)s')
109
110     logging_to_stdout = False
111     if tools.config['syslog']:
112         # SysLog Handler
113         if os.name == 'nt':
114             handler = logging.handlers.NTEventLogHandler("%s %s" %
115                                                          (release.description,
116                                                           release.version))
117         else:
118             handler = logging.handlers.SysLogHandler('/dev/log')
119         formatter = logging.Formatter("%s %s" % (release.description, release.version) + ':%(dbname)s:%(levelname)s:%(name)s:%(message)s')
120
121     elif tools.config['logfile']:
122         # LogFile Handler
123         logf = tools.config['logfile']
124         try:
125             dirname = os.path.dirname(logf)
126             if dirname and not os.path.isdir(dirname):
127                 os.makedirs(dirname)
128             handler = logging.handlers.TimedRotatingFileHandler(logf,'D',1,30)
129         except Exception, ex:
130             sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n")
131             handler = logging.StreamHandler(sys.stdout)
132             logging_to_stdout = True
133     else:
134         # Normal Handler on standard output
135         handler = logging.StreamHandler(sys.stdout)
136         logging_to_stdout = True
137
138
139     # tell the handler to use this format
140     handler.setFormatter(formatter)
141
142     # add the handler to the root logger
143     logger = logging.getLogger()
144     logger.addHandler(handler)
145     logger.setLevel(tools.config['log_level'] or '0')
146
147     if logging_to_stdout and os.name != 'nt':
148         # change color of level names
149         # uses of ANSI color codes
150         # see http://pueblo.sourceforge.net/doc/manual/ansi_color_codes.html
151         # maybe use http://code.activestate.com/recipes/574451/
152         colors = ['black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white', None, 'default']
153         foreground = lambda f: 30 + colors.index(f)
154         background = lambda f: 40 + colors.index(f)
155
156         mapping = {
157             'DEBUG_RPC': ('blue', 'white'),
158             'DEBUG': ('blue', 'default'),
159             'INFO': ('green', 'default'),
160             'WARNING': ('yellow', 'default'),
161             'ERROR': ('red', 'default'),
162             'CRITICAL': ('white', 'red'),
163         }
164
165         for level, (fg, bg) in mapping.items():
166             msg = "\x1b[%dm\x1b[%dm%s\x1b[0m" % (foreground(fg), background(bg), level)
167             logging.addLevelName(getattr(logging, level), msg)
168
169
170 class Logger(object):
171
172     def notifyChannel(self, name, level, msg):
173         from service.web_services import common
174
175         log = logging.getLogger(tools.ustr(name))
176
177         if level == LOG_DEBUG_RPC and not hasattr(log, level):
178             fct = lambda msg, *args, **kwargs: log.log(logging.DEBUG_RPC, msg, *args, **kwargs)
179             setattr(log, LOG_DEBUG_RPC, fct)
180
181         level_method = getattr(log, level)
182
183         if isinstance(msg, Exception):
184             msg = tools.exception_to_unicode(msg)
185
186         msg = tools.ustr(msg).strip()
187
188         if level in (LOG_ERROR,LOG_CRITICAL):
189             msg = common().get_server_environment() + '\n' + msg
190
191         result = msg.split('\n')
192         if len(result)>1:
193             for idx, s in enumerate(result):
194                 level_method('[%02d]: %s' % (idx+1, s,))
195         elif result:
196             level_method(result[0])
197
198     def shutdown(self):
199         logging.shutdown()
200
201 import tools
202 init_logger()
203
204 class Agent(object):
205     """Singleton that keeps track of cancellable tasks to run at a given
206        timestamp.
207        The tasks are caracterised by:
208             * a timestamp
209             * the database on which the task run
210             * the function to call
211             * the arguments and keyword arguments to pass to the function
212
213         Implementation details:
214           Tasks are stored as list, allowing the cancellation by setting
215           the timestamp to 0.
216           A heapq is used to store tasks, so we don't need to sort
217           tasks ourself.
218     """
219     __tasks = []
220     __tasks_by_db = {}
221     _logger = Logger()
222
223     @classmethod
224     def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
225         task = [timestamp, db_name, function, args, kwargs]
226         heapq.heappush(cls.__tasks, task)
227         cls.__tasks_by_db.setdefault(db_name, []).append(task)
228
229     @classmethod
230     def cancel(cls, db_name):
231         """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
232         if db_name is None:
233             cls.__tasks, cls.__tasks_by_db = [], {}
234         else:
235             if db_name in cls.__tasks_by_db:
236                 for task in cls.__tasks_by_db[db_name]:
237                     task[0] = 0
238
239     @classmethod
240     def quit(cls):
241         cls.cancel(None)
242
243     @classmethod
244     def runner(cls):
245         """Neverending function (intended to be ran in a dedicated thread) that
246            checks every 60 seconds tasks to run.
247         """
248         current_thread = threading.currentThread()
249         while True:
250             while cls.__tasks and cls.__tasks[0][0] < time.time():
251                 task = heapq.heappop(cls.__tasks)
252                 timestamp, dbname, function, args, kwargs = task
253                 cls.__tasks_by_db[dbname].remove(task)
254                 if not timestamp:
255                     # null timestamp -> cancelled task
256                     continue
257                 current_thread.dbname = dbname   # hack hack
258                 cls._logger.notifyChannel('timers', LOG_DEBUG, "Run %s.%s(*%r, **%r)" % (function.im_class.__name__, function.func_name, args, kwargs))
259                 delattr(current_thread, 'dbname')
260                 task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
261                 # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
262                 task_thread.daemon = False
263                 task_thread.start()
264                 time.sleep(1)
265             time.sleep(60)
266
267 agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
268 # the agent runner is a typical daemon thread, that will never quit and must be
269 # terminated when the main process exits - with no consequence (the processing
270 # threads it spawns are not marked daemon)
271 agent_runner.daemon = True
272 agent_runner.start()
273
274
275 import traceback
276
277 class xmlrpc(object):
278     class RpcGateway(object):
279         def __init__(self, name):
280             self.name = name
281
282 class OpenERPDispatcherException(Exception):
283     def __init__(self, exception, traceback):
284         self.exception = exception
285         self.traceback = traceback
286
287 class OpenERPDispatcher:
288     def log(self, title, msg):
289         if tools.config['log_level'] == logging.DEBUG_RPC:
290             Logger().notifyChannel('%s' % title, LOG_DEBUG_RPC, pformat(msg))
291
292     def dispatch(self, service_name, method, params):
293         if service_name not in GROUPS['web-services']:
294             raise Exception('AccessDenied')
295         try:
296             self.log('service', service_name)
297             self.log('method', method)
298             self.log('params', params)
299             result = LocalService(service_name)(method, *params)
300             if result is None: #we cannot marshal none in XMLRPC
301                 result = False
302             self.log('result', result)
303             return result
304         except Exception, e:
305             self.log('exception', tools.exception_to_unicode(e))
306             if hasattr(e, 'traceback'):
307                 tb = e.traceback
308             else:
309                 tb = sys.exc_info()
310             tb_s = "".join(traceback.format_exception(*tb))
311             if tools.config['debug_mode']:
312                 import pdb
313                 pdb.post_mortem(tb[2])
314             raise OpenERPDispatcherException(e, tb_s)
315
316 class GenericXMLRPCRequestHandler(OpenERPDispatcher):
317     def _dispatch(self, method, params):
318         try:
319             service_name = self.path.split("/")[-1]
320             return self.dispatch(service_name, method, params)
321         except OpenERPDispatcherException, e:
322             raise xmlrpclib.Fault(tools.exception_to_unicode(e.exception), e.traceback)
323
324 class SSLSocket(object):
325     def __init__(self, socket):
326         if not hasattr(socket, 'sock_shutdown'):
327             from OpenSSL import SSL
328             ctx = SSL.Context(SSL.SSLv23_METHOD)
329             ctx.use_privatekey_file(tools.config['secure_pkey_file'])
330             ctx.use_certificate_file(tools.config['secure_cert_file'])
331             self.socket = SSL.Connection(ctx, socket)
332         else:
333             self.socket = socket
334
335     def shutdown(self, how):
336         return self.socket.sock_shutdown(how)
337
338     def __getattr__(self, name):
339         return getattr(self.socket, name)
340
341 class SimpleXMLRPCRequestHandler(GenericXMLRPCRequestHandler, SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
342     rpc_paths = map(lambda s: '/xmlrpc/%s' % s, GROUPS.get('web-services', {}).keys())
343
344 class SecureXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
345     def setup(self):
346         self.connection = SSLSocket(self.request)
347         self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
348         self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
349
350 class SimpleThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer.SimpleXMLRPCServer):
351     def server_bind(self):
352         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
353         SimpleXMLRPCServer.SimpleXMLRPCServer.server_bind(self)
354
355 class SecureThreadedXMLRPCServer(SimpleThreadedXMLRPCServer):
356     def __init__(self, server_address, HandlerClass, logRequests=1):
357         SimpleThreadedXMLRPCServer.__init__(self, server_address, HandlerClass, logRequests)
358         self.socket = SSLSocket(socket.socket(self.address_family, self.socket_type))
359         self.server_bind()
360         self.server_activate()
361
362
363 def _close_socket(sock):
364     if os.name != 'nt':     # XXX if someone know why, please leave a comment.
365         try:
366             sock.shutdown(getattr(socket, 'SHUT_RDWR', 2))
367         except socket.error, e:
368             if e.errno != errno.ENOTCONN: raise
369             # OSX, socket shutdowns both sides if any side closes it
370             # causing an error 57 'Socket is not connected' on shutdown
371             # of the other side (or something), see
372             # http://bugs.python.org/issue4397
373             Logger().notifyChannel(
374                 'server', LOG_DEBUG,
375                 '"%s" when shutting down server socket, '
376                 'this is normal under OS X'%e)
377     sock.close()
378
379
380 class HttpDaemon(threading.Thread):
381     def __init__(self, interface, port, secure=False):
382         threading.Thread.__init__(self)
383         self.__port = port
384         self.__interface = interface
385         self.secure = bool(secure)
386         handler_class = (SimpleXMLRPCRequestHandler, SecureXMLRPCRequestHandler)[self.secure]
387         server_class = (SimpleThreadedXMLRPCServer, SecureThreadedXMLRPCServer)[self.secure]
388
389         if self.secure:
390             from OpenSSL.SSL import Error as SSLError
391         else:
392             class SSLError(Exception): pass
393         try:
394             self.server = server_class((interface, port), handler_class, 0)
395         except SSLError, e:
396             Logger().notifyChannel('xml-rpc-ssl', LOG_CRITICAL, "Can not load the certificate and/or the private key files")
397             sys.exit(1)
398         except Exception, e:
399             Logger().notifyChannel('xml-rpc', LOG_CRITICAL, "Error occur when starting the server daemon: %s" % (e,))
400             sys.exit(1)
401
402
403     def attach(self, path, gw):
404         pass
405
406     def stop(self):
407         self.running = False
408         _close_socket(self.server.socket)
409
410     def run(self):
411         self.server.register_introspection_functions()
412
413         self.running = True
414         while self.running:
415             try:
416                 self.server.handle_request()
417             except (socket.error, select.error), e:
418                 if self.running or e.args[0] != errno.EBADF:
419                     raise
420         return True
421
422         # If the server need to be run recursively
423         #
424         #signal.signal(signal.SIGALRM, self.my_handler)
425         #signal.alarm(6)
426         #while True:
427         #   self.server.handle_request()
428         #signal.alarm(0)          # Disable the alarm
429
430 import tiny_socket
431 class TinySocketClientThread(threading.Thread, OpenERPDispatcher):
432     def __init__(self, sock, threads):
433         threading.Thread.__init__(self)
434         self.sock = sock
435         self.threads = threads
436
437     def run(self):
438         self.running = True
439         try:
440             ts = tiny_socket.mysocket(self.sock)
441         except:
442             self.sock.close()
443             self.threads.remove(self)
444             return False
445         while self.running:
446             try:
447                 msg = ts.myreceive()
448             except:
449                 self.sock.close()
450                 self.threads.remove(self)
451                 return False
452             try:
453                 result = self.dispatch(msg[0], msg[1], msg[2:])
454                 ts.mysend(result)
455             except OpenERPDispatcherException, e:
456                 new_e = Exception(tools.exception_to_unicode(e.exception)) # avoid problems of pickeling
457                 try:
458                     ts.mysend(new_e, exception=True, traceback=e.traceback)
459                 except:
460                     self.sock.close()
461                     self.threads.remove(self)
462                     return False
463
464             self.sock.close()
465             self.threads.remove(self)
466             return True
467
468     def stop(self):
469         self.running = False
470
471
472 class TinySocketServerThread(threading.Thread):
473     def __init__(self, interface, port, secure=False):
474         threading.Thread.__init__(self)
475         self.__port = port
476         self.__interface = interface
477         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
478         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
479         self.socket.bind((self.__interface, self.__port))
480         self.socket.listen(5)
481         self.threads = []
482
483     def run(self):
484         try:
485             self.running = True
486             while self.running:
487                 timeout = self.socket.gettimeout()
488                 fd_sets = select.select([self.socket], [], [], timeout)
489                 if not fd_sets[0]:
490                     continue
491                 (clientsocket, address) = self.socket.accept()
492                 ct = TinySocketClientThread(clientsocket, self.threads)
493                 self.threads.append(ct)
494                 ct.start()
495             self.socket.close()
496         except Exception, e:
497             self.socket.close()
498             return False
499
500     def stop(self):
501         self.running = False
502         for t in self.threads:
503             t.stop()
504         _close_socket(self.socket)
505
506 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: