[REF] explicit service objects instanciation.
[odoo/odoo.git] / openerp / netsvc.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 ##############################################################################
4 #
5 #    OpenERP, Open Source Management Solution
6 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). All Rights Reserved
7 #    The refactoring about the OpenSSL support come from Tryton
8 #    Copyright (C) 2007-2009 Cédric Krier.
9 #    Copyright (C) 2007-2009 Bertrand Chenal.
10 #    Copyright (C) 2008 B2CK SPRL.
11 #
12 #    This program is free software: you can redistribute it and/or modify
13 #    it under the terms of the GNU General Public License as published by
14 #    the Free Software Foundation, either version 3 of the License, or
15 #    (at your option) any later version.
16 #
17 #    This program is distributed in the hope that it will be useful,
18 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
19 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 #    GNU General Public License for more details.
21 #
22 #    You should have received a copy of the GNU General Public License
23 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
24 #
25 ##############################################################################
26
27 import errno
28 import logging
29 import logging.handlers
30 import os
31 import socket
32 import sys
33 import threading
34 import time
35 import release
36 from pprint import pformat
37 import warnings
38 import heapq
39
40 # TODO modules that import netsvc only for things from loglevels must be changed to use loglevels.
41 from loglevels import *
42 import tools
43
44 class Service(object):
45     """ Base class for *Local* services
46
47         Functionality here is trusted, no authentication.
48     """
49     _services = {}
50     def __init__(self, name, audience=''):
51         Service._services[name] = self
52         self.__name = name
53         self._methods = {}
54
55     def joinGroup(self, name):
56         raise Exception("No group for local services")
57         #GROUPS.setdefault(name, {})[self.__name] = self
58
59     @classmethod
60     def exists(cls, name):
61         return name in cls._services
62
63     @classmethod
64     def remove(cls, name):
65         if cls.exists(name):
66             cls._services.pop(name)
67
68     def exportMethod(self, method):
69         if callable(method):
70             self._methods[method.__name__] = method
71
72     def abortResponse(self, error, description, origin, details):
73         if not tools.config['debug_mode']:
74             raise Exception("%s -- %s\n\n%s"%(origin, description, details))
75         else:
76             raise
77
78 class LocalService(object):
79     """ Proxy for local services. 
80     
81         Any instance of this class will behave like the single instance
82         of Service(name)
83     """
84     __logger = logging.getLogger('service')
85     def __init__(self, name):
86         self.__name = name
87         try:
88             self._service = Service._services[name]
89             for method_name, method_definition in self._service._methods.items():
90                 setattr(self, method_name, method_definition)
91         except KeyError, keyError:
92             self.__logger.error('This service does not exist: %s' % (str(keyError),) )
93             raise
94
95     def __call__(self, method, *params):
96         return getattr(self, method)(*params)
97
98 class ExportService(object):
99     """ Proxy for exported services. 
100
101     All methods here should take an AuthProxy as their first parameter. It
102     will be appended by the calling framework.
103
104     Note that this class has no direct proxy, capable of calling 
105     eservice.method(). Rather, the proxy should call 
106     dispatch(method,auth,params)
107     """
108     
109     _services = {}
110     _groups = {}
111     _logger = logging.getLogger('web-services')
112     
113     def __init__(self, name, audience=''):
114         ExportService._services[name] = self
115         self.__name = name
116         self._logger.debug("Registered an exported service: %s" % name)
117
118     def joinGroup(self, name):
119         ExportService._groups.setdefault(name, {})[self.__name] = self
120
121     @classmethod
122     def getService(cls,name):
123         return cls._services[name]
124
125     def dispatch(self, method, auth, params):
126         raise Exception("stub dispatch at %s" % self.__name)
127         
128     def new_dispatch(self,method,auth,params):
129         raise Exception("stub dispatch at %s" % self.__name)
130
131     def abortResponse(self, error, description, origin, details):
132         if not tools.config['debug_mode']:
133             raise Exception("%s -- %s\n\n%s"%(origin, description, details))
134         else:
135             raise
136
137 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, _NOTHING, DEFAULT = range(10)
138 #The background is set with 40 plus the number of the color, and the foreground with 30
139 #These are the sequences need to get colored ouput
140 RESET_SEQ = "\033[0m"
141 COLOR_SEQ = "\033[1;%dm"
142 BOLD_SEQ = "\033[1m"
143 COLOR_PATTERN = "%s%s%%s%s" % (COLOR_SEQ, COLOR_SEQ, RESET_SEQ)
144 LEVEL_COLOR_MAPPING = {
145     logging.DEBUG_SQL: (WHITE, MAGENTA),
146     logging.DEBUG_RPC: (BLUE, WHITE),
147     logging.DEBUG_RPC_ANSWER: (BLUE, WHITE),
148     logging.DEBUG: (BLUE, DEFAULT),
149     logging.INFO: (GREEN, DEFAULT),
150     logging.TEST: (WHITE, BLUE),
151     logging.WARNING: (YELLOW, DEFAULT),
152     logging.ERROR: (RED, DEFAULT),
153     logging.CRITICAL: (WHITE, RED),
154 }
155
156 class DBFormatter(logging.Formatter):
157     def format(self, record):
158         record.dbname = getattr(threading.currentThread(), 'dbname', '?')
159         return logging.Formatter.format(self, record)
160
161 class ColoredFormatter(DBFormatter):
162     def format(self, record):
163         fg_color, bg_color = LEVEL_COLOR_MAPPING[record.levelno]
164         record.levelname = COLOR_PATTERN % (30 + fg_color, 40 + bg_color, record.levelname)
165         return DBFormatter.format(self, record)
166
167 def init_logger():
168     import os
169     from tools.translate import resetlocale
170     resetlocale()
171
172     # create a format for log messages and dates
173     format = '[%(asctime)s][%(dbname)s] %(levelname)s:%(name)s:%(message)s'
174
175     if tools.config['syslog']:
176         # SysLog Handler
177         if os.name == 'nt':
178             handler = logging.handlers.NTEventLogHandler("%s %s" % (release.description, release.version))
179         else:
180             handler = logging.handlers.SysLogHandler('/dev/log')
181         format = '%s %s' % (release.description, release.version) \
182                 + ':%(dbname)s:%(levelname)s:%(name)s:%(message)s'
183
184     elif tools.config['logfile']:
185         # LogFile Handler
186         logf = tools.config['logfile']
187         try:
188             dirname = os.path.dirname(logf)
189             if dirname and not os.path.isdir(dirname):
190                 os.makedirs(dirname)
191             if tools.config['logrotate'] is not False:
192                 handler = logging.handlers.TimedRotatingFileHandler(logf,'D',1,30)
193             elif os.name == 'posix':
194                 handler = logging.handlers.WatchedFileHandler(logf)
195             else:
196                 handler = logging.handlers.FileHandler(logf)
197         except Exception:
198             sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n")
199             handler = logging.StreamHandler(sys.stdout)
200     else:
201         # Normal Handler on standard output
202         handler = logging.StreamHandler(sys.stdout)
203
204     if isinstance(handler, logging.StreamHandler) and os.isatty(handler.stream.fileno()):
205         formatter = ColoredFormatter(format)
206     else:
207         formatter = DBFormatter(format)
208     handler.setFormatter(formatter)
209
210     # add the handler to the root logger
211     logger = logging.getLogger()
212     logger.handlers = []
213     logger.addHandler(handler)
214     logger.setLevel(int(tools.config['log_level'] or '0'))
215
216 # A alternative logging scheme for automated runs of the
217 # server intended to test it.
218 def init_alternative_logger():
219     class H(logging.Handler):
220       def emit(self, record):
221         if record.levelno > 20:
222           print record.levelno, record.pathname, record.msg
223     handler = H()
224     logger = logging.getLogger()
225     logger.handlers = []
226     logger.addHandler(handler)
227     logger.setLevel(logging.ERROR)
228
229 class Agent(object):
230     """Singleton that keeps track of cancellable tasks to run at a given
231        timestamp.
232        The tasks are caracterised by:
233             * a timestamp
234             * the database on which the task run
235             * the function to call
236             * the arguments and keyword arguments to pass to the function
237
238         Implementation details:
239           Tasks are stored as list, allowing the cancellation by setting
240           the timestamp to 0.
241           A heapq is used to store tasks, so we don't need to sort
242           tasks ourself.
243     """
244     __tasks = []
245     __tasks_by_db = {}
246     _logger = logging.getLogger('netsvc.agent')
247
248     @classmethod
249     def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
250         task = [timestamp, db_name, function, args, kwargs]
251         heapq.heappush(cls.__tasks, task)
252         cls.__tasks_by_db.setdefault(db_name, []).append(task)
253
254     @classmethod
255     def cancel(cls, db_name):
256         """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
257         cls._logger.debug("Cancel timers for %s db", db_name or 'all')
258         if db_name is None:
259             cls.__tasks, cls.__tasks_by_db = [], {}
260         else:
261             if db_name in cls.__tasks_by_db:
262                 for task in cls.__tasks_by_db[db_name]:
263                     task[0] = 0
264
265     @classmethod
266     def quit(cls):
267         cls.cancel(None)
268
269     @classmethod
270     def runner(cls):
271         """Neverending function (intended to be ran in a dedicated thread) that
272            checks every 60 seconds tasks to run. TODO: make configurable
273         """
274         current_thread = threading.currentThread()
275         while True:
276             while cls.__tasks and cls.__tasks[0][0] < time.time():
277                 task = heapq.heappop(cls.__tasks)
278                 timestamp, dbname, function, args, kwargs = task
279                 cls.__tasks_by_db[dbname].remove(task)
280                 if not timestamp:
281                     # null timestamp -> cancelled task
282                     continue
283                 current_thread.dbname = dbname   # hack hack
284                 cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs)
285                 delattr(current_thread, 'dbname')
286                 task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
287                 # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
288                 task_thread.setDaemon(False)
289                 task_thread.start()
290                 time.sleep(1)
291             time.sleep(60)
292
293 def start_agent():
294     agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
295     # the agent runner is a typical daemon thread, that will never quit and must be
296     # terminated when the main process exits - with no consequence (the processing
297     # threads it spawns are not marked daemon)
298     agent_runner.setDaemon(True)
299     agent_runner.start()
300
301 import traceback
302
303 class Server:
304     """ Generic interface for all servers with an event loop etc.
305         Override this to impement http, net-rpc etc. servers.
306
307         Servers here must have threaded behaviour. start() must not block,
308         there is no run().
309     """
310     __is_started = False
311     __servers = []
312     __starter_threads = []
313
314     # we don't want blocking server calls (think select()) to
315     # wait forever and possibly prevent exiting the process,
316     # but instead we want a form of polling/busy_wait pattern, where
317     # _server_timeout should be used as the default timeout for
318     # all I/O blocking operations
319     _busywait_timeout = 0.5
320
321
322     __logger = logging.getLogger('server')
323
324     def __init__(self):
325         Server.__servers.append(self)
326         if Server.__is_started:
327             # raise Exception('All instances of servers must be inited before the startAll()')
328             # Since the startAll() won't be called again, allow this server to
329             # init and then start it after 1sec (hopefully). Register that
330             # timer thread in a list, so that we can abort the start if quitAll
331             # is called in the meantime
332             t = threading.Timer(1.0, self._late_start)
333             t.name = 'Late start timer for %s' % str(self.__class__)
334             Server.__starter_threads.append(t)
335             t.start()
336
337     def start(self):
338         self.__logger.debug("called stub Server.start")
339         
340     def _late_start(self):
341         self.start()
342         for thr in Server.__starter_threads:
343             if thr.finished.is_set():
344                 Server.__starter_threads.remove(thr)
345
346     def stop(self):
347         self.__logger.debug("called stub Server.stop")
348
349     def stats(self):
350         """ This function should return statistics about the server """
351         return "%s: No statistics" % str(self.__class__)
352
353     @classmethod
354     def startAll(cls):
355         if cls.__is_started:
356             return
357         cls.__logger.info("Starting %d services" % len(cls.__servers))
358         for srv in cls.__servers:
359             srv.start()
360         cls.__is_started = True
361
362     @classmethod
363     def quitAll(cls):
364         if not cls.__is_started:
365             return
366         cls.__logger.info("Stopping %d services" % len(cls.__servers))
367         for thr in cls.__starter_threads:
368             if not thr.finished.is_set():
369                 thr.cancel()
370             cls.__starter_threads.remove(thr)
371
372         for srv in cls.__servers:
373             srv.stop()
374         cls.__is_started = False
375
376     @classmethod
377     def allStats(cls):
378         res = ["Servers %s" % ('stopped', 'started')[cls.__is_started]]
379         res.extend(srv.stats() for srv in cls.__servers)
380         return '\n'.join(res)
381
382     def _close_socket(self):
383         if os.name != 'nt':
384             try:
385                 self.socket.shutdown(getattr(socket, 'SHUT_RDWR', 2))
386             except socket.error, e:
387                 if e.errno != errno.ENOTCONN: raise
388                 # OSX, socket shutdowns both sides if any side closes it
389                 # causing an error 57 'Socket is not connected' on shutdown
390                 # of the other side (or something), see
391                 # http://bugs.python.org/issue4397
392                 self.__logger.debug(
393                     '"%s" when shutting down server socket, '
394                     'this is normal under OS X', e)
395         self.socket.close()
396
397 class OpenERPDispatcherException(Exception):
398     def __init__(self, exception, traceback):
399         self.exception = exception
400         self.traceback = traceback
401
402 def replace_request_password(args):
403     # password is always 3rd argument in a request, we replace it in RPC logs
404     # so it's easier to forward logs for diagnostics/debugging purposes...
405     args = list(args)
406     if len(args) > 2:
407         args[2] = '*'
408     return args
409
410 def log(title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
411     logger = logging.getLogger(title)
412     if logger.isEnabledFor(channel):
413         indent=''
414         indent_after=' '*len(fn)
415         for line in (fn+pformat(msg, depth=depth)).split('\n'):
416             logger.log(channel, indent+line)
417             indent=indent_after
418
419 class OpenERPDispatcher:
420     def log(self, title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
421         log(title, msg, channel=channel, depth=depth, fn=fn)
422     def dispatch(self, service_name, method, params):
423         try:
424             logger = logging.getLogger('result')
425             self.log('service', tuple(replace_request_password(params)), depth=(None if logger.isEnabledFor(logging.DEBUG_RPC_ANSWER) else 1), fn='%s.%s'%(service_name,method))
426             auth = getattr(self, 'auth_provider', None)
427             result = ExportService.getService(service_name).dispatch(method, auth, params)
428             self.log('result', result, channel=logging.DEBUG_RPC_ANSWER)
429             return result
430         except Exception, e:
431             self.log('exception', tools.exception_to_unicode(e))
432             tb = getattr(e, 'traceback', sys.exc_info())
433             tb_s = "".join(traceback.format_exception(*tb))
434             if tools.config['debug_mode']:
435                 import pdb
436                 pdb.post_mortem(tb[2])
437             raise OpenERPDispatcherException(e, tb_s)
438
439 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: