[IMP] logging of rpc calls and cron jobs
[odoo/odoo.git] / openerp / netsvc.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 ##############################################################################
4 #
5 #    OpenERP, Open Source Management Solution
6 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). All Rights Reserved
7 #    The refactoring about the OpenSSL support come from Tryton
8 #    Copyright (C) 2007-2009 Cédric Krier.
9 #    Copyright (C) 2007-2009 Bertrand Chenal.
10 #    Copyright (C) 2008 B2CK SPRL.
11 #
12 #    This program is free software: you can redistribute it and/or modify
13 #    it under the terms of the GNU General Public License as published by
14 #    the Free Software Foundation, either version 3 of the License, or
15 #    (at your option) any later version.
16 #
17 #    This program is distributed in the hope that it will be useful,
18 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
19 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 #    GNU General Public License for more details.
21 #
22 #    You should have received a copy of the GNU General Public License
23 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
24 #
25 ##############################################################################
26
27 import errno
28 import logging
29 import logging.handlers
30 import os
31 import socket
32 import sys
33 import threading
34 import time
35 import release
36 from pprint import pformat
37 import warnings
38 import heapq
39
40 # TODO modules that import netsvc only for things from loglevels must be changed to use loglevels.
41 from loglevels import *
42 import tools
43
44 class Service(object):
45     """ Base class for *Local* services
46
47         Functionality here is trusted, no authentication.
48     """
49     _services = {}
50     def __init__(self, name, audience=''):
51         Service._services[name] = self
52         self.__name = name
53         self._methods = {}
54
55     def joinGroup(self, name):
56         raise Exception("No group for local services")
57         #GROUPS.setdefault(name, {})[self.__name] = self
58
59     @classmethod
60     def exists(cls, name):
61         return name in cls._services
62
63     @classmethod
64     def remove(cls, name):
65         if cls.exists(name):
66             cls._services.pop(name)
67
68     def exportMethod(self, method):
69         if callable(method):
70             self._methods[method.__name__] = method
71
72     def abortResponse(self, error, description, origin, details):
73         if not tools.config['debug_mode']:
74             raise Exception("%s -- %s\n\n%s"%(origin, description, details))
75         else:
76             raise
77
78 class LocalService(object):
79     """ Proxy for local services. 
80     
81         Any instance of this class will behave like the single instance
82         of Service(name)
83     """
84     __logger = logging.getLogger('service')
85     def __init__(self, name):
86         self.__name = name
87         try:
88             self._service = Service._services[name]
89             for method_name, method_definition in self._service._methods.items():
90                 setattr(self, method_name, method_definition)
91         except KeyError, keyError:
92             self.__logger.error('This service does not exist: %s' % (str(keyError),) )
93             raise
94
95     def __call__(self, method, *params):
96         return getattr(self, method)(*params)
97
98 class ExportService(object):
99     """ Proxy for exported services. 
100
101     All methods here should take an AuthProxy as their first parameter. It
102     will be appended by the calling framework.
103
104     Note that this class has no direct proxy, capable of calling 
105     eservice.method(). Rather, the proxy should call 
106     dispatch(method,auth,params)
107     """
108     
109     _services = {}
110     _groups = {}
111     _logger = logging.getLogger('web-services')
112     
113     def __init__(self, name, audience=''):
114         ExportService._services[name] = self
115         self.__name = name
116         self._logger.debug("Registered an exported service: %s" % name)
117
118     def joinGroup(self, name):
119         ExportService._groups.setdefault(name, {})[self.__name] = self
120
121     @classmethod
122     def getService(cls,name):
123         return cls._services[name]
124
125     def dispatch(self, method, auth, params):
126         raise Exception("stub dispatch at %s" % self.__name)
127         
128     def new_dispatch(self,method,auth,params):
129         raise Exception("stub dispatch at %s" % self.__name)
130
131     def abortResponse(self, error, description, origin, details):
132         if not tools.config['debug_mode']:
133             raise Exception("%s -- %s\n\n%s"%(origin, description, details))
134         else:
135             raise
136
137 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, _NOTHING, DEFAULT = range(10)
138 #The background is set with 40 plus the number of the color, and the foreground with 30
139 #These are the sequences need to get colored ouput
140 RESET_SEQ = "\033[0m"
141 COLOR_SEQ = "\033[1;%dm"
142 BOLD_SEQ = "\033[1m"
143 COLOR_PATTERN = "%s%s%%s%s" % (COLOR_SEQ, COLOR_SEQ, RESET_SEQ)
144 LEVEL_COLOR_MAPPING = {
145     logging.DEBUG_SQL: (WHITE, MAGENTA),
146     logging.DEBUG_RPC: (BLUE, WHITE),
147     logging.DEBUG_RPC_ANSWER: (BLUE, WHITE),
148     logging.DEBUG: (BLUE, DEFAULT),
149     logging.INFO: (GREEN, DEFAULT),
150     logging.TEST: (WHITE, BLUE),
151     logging.WARNING: (YELLOW, DEFAULT),
152     logging.ERROR: (RED, DEFAULT),
153     logging.CRITICAL: (WHITE, RED),
154 }
155
156 class DBFormatter(logging.Formatter):
157     def format(self, record):
158         record.dbname = getattr(threading.currentThread(), 'dbname', '?')
159         return logging.Formatter.format(self, record)
160
161 class ColoredFormatter(DBFormatter):
162     def format(self, record):
163         fg_color, bg_color = LEVEL_COLOR_MAPPING[record.levelno]
164         record.levelname = COLOR_PATTERN % (30 + fg_color, 40 + bg_color, record.levelname)
165         return DBFormatter.format(self, record)
166
167 def init_logger():
168     import os
169     from tools.translate import resetlocale
170     resetlocale()
171
172     # create a format for log messages and dates
173     format = '[%(asctime)s][%(dbname)s] %(levelname)s:%(name)s:%(message)s'
174
175     if tools.config['syslog']:
176         # SysLog Handler
177         if os.name == 'nt':
178             handler = logging.handlers.NTEventLogHandler("%s %s" % (release.description, release.version))
179         else:
180             handler = logging.handlers.SysLogHandler('/dev/log')
181         format = '%s %s' % (release.description, release.version) \
182                 + ':%(dbname)s:%(levelname)s:%(name)s:%(message)s'
183
184     elif tools.config['logfile']:
185         # LogFile Handler
186         logf = tools.config['logfile']
187         try:
188             dirname = os.path.dirname(logf)
189             if dirname and not os.path.isdir(dirname):
190                 os.makedirs(dirname)
191             if tools.config['logrotate'] is not False:
192                 handler = logging.handlers.TimedRotatingFileHandler(logf,'D',1,30)
193             elif os.name == 'posix':
194                 handler = logging.handlers.WatchedFileHandler(logf)
195             else:
196                 handler = logging.handlers.FileHandler(logf)
197         except Exception:
198             sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n")
199             handler = logging.StreamHandler(sys.stdout)
200     else:
201         # Normal Handler on standard output
202         handler = logging.StreamHandler(sys.stdout)
203
204     if isinstance(handler, logging.StreamHandler) and os.isatty(handler.stream.fileno()):
205         formatter = ColoredFormatter(format)
206     else:
207         formatter = DBFormatter(format)
208     handler.setFormatter(formatter)
209
210     # add the handler to the root logger
211     logger = logging.getLogger()
212     logger.handlers = []
213     logger.addHandler(handler)
214     logger.setLevel(int(tools.config['log_level'] or '0'))
215
216 class Agent(object):
217     """Singleton that keeps track of cancellable tasks to run at a given
218        timestamp.
219        The tasks are caracterised by:
220             * a timestamp
221             * the database on which the task run
222             * the function to call
223             * the arguments and keyword arguments to pass to the function
224
225         Implementation details:
226           Tasks are stored as list, allowing the cancellation by setting
227           the timestamp to 0.
228           A heapq is used to store tasks, so we don't need to sort
229           tasks ourself.
230     """
231     __tasks = []
232     __tasks_by_db = {}
233     _logger = logging.getLogger('netsvc.agent')
234
235     @classmethod
236     def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
237         task = [timestamp, db_name, function, args, kwargs]
238         heapq.heappush(cls.__tasks, task)
239         cls.__tasks_by_db.setdefault(db_name, []).append(task)
240
241     @classmethod
242     def cancel(cls, db_name):
243         """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
244         cls._logger.debug("Cancel timers for %s db", db_name or 'all')
245         if db_name is None:
246             cls.__tasks, cls.__tasks_by_db = [], {}
247         else:
248             if db_name in cls.__tasks_by_db:
249                 for task in cls.__tasks_by_db[db_name]:
250                     task[0] = 0
251
252     @classmethod
253     def quit(cls):
254         cls.cancel(None)
255
256     @classmethod
257     def runner(cls):
258         """Neverending function (intended to be ran in a dedicated thread) that
259            checks every 60 seconds tasks to run. TODO: make configurable
260         """
261         current_thread = threading.currentThread()
262         while True:
263             while cls.__tasks and cls.__tasks[0][0] < time.time():
264                 task = heapq.heappop(cls.__tasks)
265                 timestamp, dbname, function, args, kwargs = task
266                 cls.__tasks_by_db[dbname].remove(task)
267                 if not timestamp:
268                     # null timestamp -> cancelled task
269                     continue
270                 current_thread.dbname = dbname   # hack hack
271                 cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs)
272                 delattr(current_thread, 'dbname')
273                 task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
274                 # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
275                 task_thread.setDaemon(False)
276                 task_thread.start()
277                 time.sleep(1)
278             time.sleep(60)
279
280 agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
281 # the agent runner is a typical daemon thread, that will never quit and must be
282 # terminated when the main process exits - with no consequence (the processing
283 # threads it spawns are not marked daemon)
284 agent_runner.setDaemon(True)
285 agent_runner.start()
286
287 import traceback
288
289 class Server:
290     """ Generic interface for all servers with an event loop etc.
291         Override this to impement http, net-rpc etc. servers.
292
293         Servers here must have threaded behaviour. start() must not block,
294         there is no run().
295     """
296     __is_started = False
297     __servers = []
298     __starter_threads = []
299
300     # we don't want blocking server calls (think select()) to
301     # wait forever and possibly prevent exiting the process,
302     # but instead we want a form of polling/busy_wait pattern, where
303     # _server_timeout should be used as the default timeout for
304     # all I/O blocking operations
305     _busywait_timeout = 0.5
306
307
308     __logger = logging.getLogger('server')
309
310     def __init__(self):
311         Server.__servers.append(self)
312         if Server.__is_started:
313             # raise Exception('All instances of servers must be inited before the startAll()')
314             # Since the startAll() won't be called again, allow this server to
315             # init and then start it after 1sec (hopefully). Register that
316             # timer thread in a list, so that we can abort the start if quitAll
317             # is called in the meantime
318             t = threading.Timer(1.0, self._late_start)
319             t.name = 'Late start timer for %s' % str(self.__class__)
320             Server.__starter_threads.append(t)
321             t.start()
322
323     def start(self):
324         self.__logger.debug("called stub Server.start")
325         
326     def _late_start(self):
327         self.start()
328         for thr in Server.__starter_threads:
329             if thr.finished.is_set():
330                 Server.__starter_threads.remove(thr)
331
332     def stop(self):
333         self.__logger.debug("called stub Server.stop")
334
335     def stats(self):
336         """ This function should return statistics about the server """
337         return "%s: No statistics" % str(self.__class__)
338
339     @classmethod
340     def startAll(cls):
341         if cls.__is_started:
342             return
343         cls.__logger.info("Starting %d services" % len(cls.__servers))
344         for srv in cls.__servers:
345             srv.start()
346         cls.__is_started = True
347
348     @classmethod
349     def quitAll(cls):
350         if not cls.__is_started:
351             return
352         cls.__logger.info("Stopping %d services" % len(cls.__servers))
353         for thr in cls.__starter_threads:
354             if not thr.finished.is_set():
355                 thr.cancel()
356             cls.__starter_threads.remove(thr)
357
358         for srv in cls.__servers:
359             srv.stop()
360         cls.__is_started = False
361
362     @classmethod
363     def allStats(cls):
364         res = ["Servers %s" % ('stopped', 'started')[cls.__is_started]]
365         res.extend(srv.stats() for srv in cls.__servers)
366         return '\n'.join(res)
367
368     def _close_socket(self):
369         if os.name != 'nt':
370             try:
371                 self.socket.shutdown(getattr(socket, 'SHUT_RDWR', 2))
372             except socket.error, e:
373                 if e.errno != errno.ENOTCONN: raise
374                 # OSX, socket shutdowns both sides if any side closes it
375                 # causing an error 57 'Socket is not connected' on shutdown
376                 # of the other side (or something), see
377                 # http://bugs.python.org/issue4397
378                 self.__logger.debug(
379                     '"%s" when shutting down server socket, '
380                     'this is normal under OS X', e)
381         self.socket.close()
382
383 class OpenERPDispatcherException(Exception):
384     def __init__(self, exception, traceback):
385         self.exception = exception
386         self.traceback = traceback
387
388 def replace_request_password(args):
389     # password is always 3rd argument in a request, we replace it in RPC logs
390     # so it's easier to forward logs for diagnostics/debugging purposes...
391     args = list(args)
392     if len(args) > 2:
393         args[2] = '*'
394     return args
395
396 def log(title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
397     logger = logging.getLogger(title)
398     if logger.isEnabledFor(channel):
399         indent=0
400         for line in (fn+pformat(msg, depth=depth)).split('\n'):
401             logger.log(channel, ' '*indent+line)
402             indent=len(fn)
403
404 class OpenERPDispatcher:
405     def log(self, title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
406         log(title, msg, channel=channel, depth=depth, fn=fn)
407     def dispatch(self, service_name, method, params):
408         try:
409             logger = logging.getLogger('result')
410             self.log('service', tuple(replace_request_password(params)), depth=(None if logger.isEnabledFor(logging.DEBUG_RPC_ANSWER) else 1), fn='%s.%s'%(service_name,method))
411             auth = getattr(self, 'auth_provider', None)
412             result = ExportService.getService(service_name).dispatch(method, auth, params)
413             self.log('result', result, channel=logging.DEBUG_RPC_ANSWER)
414             return result
415         except Exception, e:
416             self.log('exception', tools.exception_to_unicode(e))
417             tb = getattr(e, 'traceback', sys.exc_info())
418             tb_s = "".join(traceback.format_exception(*tb))
419             if tools.config['debug_mode']:
420                 import pdb
421                 pdb.post_mortem(tb[2])
422             raise OpenERPDispatcherException(e, tb_s)
423
424 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: