2 # -*- coding: utf-8 -*-
3 ##############################################################################
5 # OpenERP, Open Source Management Solution
6 # Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). All Rights Reserved
7 # The refactoring about the OpenSSL support come from Tryton
8 # Copyright (C) 2007-2009 Cédric Krier.
9 # Copyright (C) 2007-2009 Bertrand Chenal.
10 # Copyright (C) 2008 B2CK SPRL.
12 # This program is free software: you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation, either version 3 of the License, or
15 # (at your option) any later version.
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
22 # You should have received a copy of the GNU General Public License
23 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 ##############################################################################
29 import logging.handlers
36 from pprint import pformat
40 # TODO modules that import netsvc only for things from loglevels must be changed to use loglevels.
41 from loglevels import *
class Service(object):
    """ Base class for *Local* services.

        Functionality here is trusted, no authentication.
    """
    # NOTE(review): the class-level registry (presumably `_services = {}`)
    # and `@classmethod` decorators for exists()/remove() are elided from
    # this chunk -- confirm upstream before relying on this listing.

    def __init__(self, name, audience=''):
        # Register this instance in the shared service registry under *name*.
        # (additional assignments, e.g. self.__name / self._methods, appear
        # to be elided from this chunk)
        Service._services[name] = self

    def joinGroup(self, name):
        # Local services cannot be grouped; fail loudly instead of silently.
        raise Exception("No group for local services")
        #GROUPS.setdefault(name, {})[self.__name] = self

    def exists(cls, name):
        # True when a service was registered under *name*.
        return name in cls._services

    def remove(cls, name):
        # Drop *name* from the registry.
        # (a guard line, e.g. `if cls.exists(name):`, is elided from this chunk)
        cls._services.pop(name)

    def exportMethod(self, method):
        # Expose *method* under its own __name__ so LocalService proxies
        # can copy it onto themselves.
        self._methods[method.__name__] = method

    def abortResponse(self, error, description, origin, details):
        # Outside debug mode, convert the failure into a generic Exception
        # carrying origin/description/details for the RPC layer.
        if not tools.config['debug_mode']:
            raise Exception("%s -- %s\n\n%s"%(origin, description, details))
class LocalService(object):
    """ Proxy for local services.

        Any instance of this class will behave like the single instance
        of the Service registered under the same name: the registered
        service's exported methods are copied onto the proxy.
    """
    __logger = logging.getLogger('service')

    def __init__(self, name):
        # NOTE(review): the opening `try:` (and an apparent
        # `self.__name = name` assignment) are elided from this chunk; the
        # `except` below handles a lookup of an unregistered service name.
        self._service = Service._services[name]
        # Copy every exported method onto this proxy so that
        # proxy.method(...) calls the registered service directly.
        for method_name, method_definition in self._service._methods.items():
            setattr(self, method_name, method_definition)
        except KeyError, keyError:
            self.__logger.error('This service does not exist: %s' % (str(keyError),) )

    def __call__(self, method, *params):
        # Allow proxy('method', args...) as an alternative to proxy.method(args...).
        return getattr(self, method)(*params)
class ExportService(object):
    """ Proxy for exported services.

    All methods here should take an AuthProxy as their first parameter. It
    will be appended by the calling framework.

    Note that this class has no direct proxy, capable of calling
    eservice.method(). Rather, the proxy should call
    dispatch(method,auth,params)
    """
    # NOTE(review): the class-level registries (presumably `_services = {}`
    # and `_groups = {}`), a `self.__name = name` assignment in __init__,
    # and the @classmethod decorator on getService are elided from this
    # chunk -- confirm upstream.

    _logger = logging.getLogger('web-services')

    def __init__(self, name, audience=''):
        # Register this exported service in the shared registry.
        ExportService._services[name] = self
        self._logger.debug("Registered an exported service: %s" % name)

    def joinGroup(self, name):
        # Record this service in the named group registry.
        ExportService._groups.setdefault(name, {})[self.__name] = self

    def getService(cls,name):
        # Look up a previously registered exported service; raises KeyError
        # when *name* is unknown.
        return cls._services[name]

    def dispatch(self, method, auth, params):
        # Stub: concrete services must override with real RPC dispatching.
        raise Exception("stub dispatch at %s" % self.__name)

    def new_dispatch(self,method,auth,params):
        # Stub: alternate dispatch entry point, also meant to be overridden.
        raise Exception("stub dispatch at %s" % self.__name)

    def abortResponse(self, error, description, origin, details):
        # Outside debug mode, convert the failure into a generic Exception
        # carrying origin/description/details for the RPC layer.
        if not tools.config['debug_mode']:
            raise Exception("%s -- %s\n\n%s"%(origin, description, details))
# ANSI color indices: foreground is 30 + index, background is 40 + index.
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, _NOTHING, DEFAULT = range(10)
#The background is set with 40 plus the number of the color, and the foreground with 30
#These are the sequences needed to get colored output
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
# Two escape sequences (foreground, background) wrapped around a %s payload;
# the doubled %% keeps one %s free for the text to colorize.
COLOR_PATTERN = "%s%s%%s%s" % (COLOR_SEQ, COLOR_SEQ, RESET_SEQ)
# (foreground, background) color pair per log level.
# NOTE(review): DEBUG_SQL / DEBUG_RPC / DEBUG_RPC_ANSWER / TEST are custom
# levels injected into the logging module by the loglevels import above.
# The dict's closing brace appears to be elided from this chunk.
LEVEL_COLOR_MAPPING = {
    logging.DEBUG_SQL: (WHITE, MAGENTA),
    logging.DEBUG_RPC: (BLUE, WHITE),
    logging.DEBUG_RPC_ANSWER: (BLUE, WHITE),
    logging.DEBUG: (BLUE, DEFAULT),
    logging.INFO: (GREEN, DEFAULT),
    logging.TEST: (WHITE, BLUE),
    logging.WARNING: (YELLOW, DEFAULT),
    logging.ERROR: (RED, DEFAULT),
    logging.CRITICAL: (WHITE, RED),
class DBFormatter(logging.Formatter):
    """Formatter that stamps each record with the current database name.

    The database name is taken from the ``dbname`` attribute of the
    current thread (set by the RPC layer); '?' is used when no database
    is associated with the thread.
    """
    def format(self, record):
        try:
            record.dbname = threading.current_thread().dbname
        except AttributeError:
            # Thread carries no database context (e.g. main thread).
            record.dbname = '?'
        return logging.Formatter.format(self, record)
class ColoredFormatter(DBFormatter):
    """DBFormatter variant that wraps the level name in ANSI color codes.

    The (foreground, background) pair is looked up per level in
    LEVEL_COLOR_MAPPING; intended for tty output only.
    """
    def format(self, record):
        foreground, background = LEVEL_COLOR_MAPPING[record.levelno]
        # ANSI: foreground codes are 30 + color index, background 40 + index.
        colored_level = COLOR_PATTERN % (foreground + 30, background + 40, record.levelname)
        record.levelname = colored_level
        return DBFormatter.format(self, record)
def init_logger():
    """Configure the root logger according to tools.config.

    Chooses exactly one handler -- syslog/NT event log, (rotating) logfile,
    or stdout -- attaches a DBFormatter (colored on a tty) and sets the
    level from the 'log_level' option.

    NOTE(review): the `def` header and several branch lines were elided
    from the reviewed chunk; the control flow below is reconstructed so
    that exactly one handler is selected -- the visible listing assigned
    two handlers unconditionally in the syslog branch and left the
    directory-creation branch empty, which cannot be the intent.
    """
    from tools.translate import resetlocale
    resetlocale()

    # create a format for log messages and dates
    format = '[%(asctime)s][%(dbname)s] %(levelname)s:%(name)s:%(message)s'

    if tools.config['syslog']:
        # SysLog Handler: NT event log on Windows, /dev/log elsewhere.
        if os.name == 'nt':
            handler = logging.handlers.NTEventLogHandler("%s %s" % (release.description, release.version))
        else:
            handler = logging.handlers.SysLogHandler('/dev/log')
        # Syslog supplies its own timestamp, so drop asctime from the format.
        format = '%s %s' % (release.description, release.version) \
                + ':%(dbname)s:%(levelname)s:%(name)s:%(message)s'

    elif tools.config['logfile']:
        # LogFile Handler
        logf = tools.config['logfile']
        try:
            # Make sure the log directory exists before opening the file.
            dirname = os.path.dirname(logf)
            if dirname and not os.path.isdir(dirname):
                os.makedirs(dirname)
            if tools.config['logrotate'] is not False:
                # Daily rotation, 30 files kept.
                handler = logging.handlers.TimedRotatingFileHandler(logf,'D',1,30)
            elif os.name == 'posix':
                # Follows external logrotate-style renames.
                handler = logging.handlers.WatchedFileHandler(logf)
            else:
                handler = logging.handlers.FileHandler(logf)
        except Exception:
            # Best-effort fallback: never abort startup because of logging.
            sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n")
            handler = logging.StreamHandler(sys.stdout)
    else:
        # Normal Handler on standard output
        handler = logging.StreamHandler(sys.stdout)

    # Colorize only when writing to an interactive terminal.
    if isinstance(handler, logging.StreamHandler) and os.isatty(handler.stream.fileno()):
        formatter = ColoredFormatter(format)
    else:
        formatter = DBFormatter(format)
    handler.setFormatter(formatter)

    # add the handler to the root logger
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(int(tools.config['log_level'] or '0'))
216 # A alternative logging scheme for automated runs of the
217 # server intended to test it.
218 def init_alternative_logger():
219 class H(logging.Handler):
220 def emit(self, record):
221 if record.levelno > 20:
222 print record.levelno, record.pathname, record.msg
224 logger = logging.getLogger()
226 logger.addHandler(handler)
227 logger.setLevel(logging.ERROR)
230 """Singleton that keeps track of cancellable tasks to run at a given
232 The tasks are caracterised by:
234 * the database on which the task run
235 * the function to call
236 * the arguments and keyword arguments to pass to the function
238 Implementation details:
239 Tasks are stored as list, allowing the cancellation by setting
241 A heapq is used to store tasks, so we don't need to sort
246 _logger = logging.getLogger('netsvc.agent')
249 def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
250 task = [timestamp, db_name, function, args, kwargs]
251 heapq.heappush(cls.__tasks, task)
252 cls.__tasks_by_db.setdefault(db_name, []).append(task)
255 def cancel(cls, db_name):
256 """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
257 cls._logger.debug("Cancel timers for %s db", db_name or 'all')
259 cls.__tasks, cls.__tasks_by_db = [], {}
261 if db_name in cls.__tasks_by_db:
262 for task in cls.__tasks_by_db[db_name]:
271 """Neverending function (intended to be ran in a dedicated thread) that
272 checks every 60 seconds tasks to run. TODO: make configurable
274 current_thread = threading.currentThread()
276 while cls.__tasks and cls.__tasks[0][0] < time.time():
277 task = heapq.heappop(cls.__tasks)
278 timestamp, dbname, function, args, kwargs = task
279 cls.__tasks_by_db[dbname].remove(task)
281 # null timestamp -> cancelled task
283 current_thread.dbname = dbname # hack hack
284 cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs)
285 delattr(current_thread, 'dbname')
286 task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
287 # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
288 task_thread.setDaemon(False)
# Launch the Agent.runner loop in a dedicated background thread.
# NOTE(review): the agent_runner.start() call appears to be elided from
# this chunk -- confirm upstream.
agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
# the agent runner is a typical daemon thread, that will never quit and must be
# terminated when the main process exits - with no consequence (the processing
# threads it spawns are not marked daemon)
agent_runner.setDaemon(True)
304 """ Generic interface for all servers with an event loop etc.
305 Override this to impement http, net-rpc etc. servers.
307 Servers here must have threaded behaviour. start() must not block,
312 __starter_threads = []
314 # we don't want blocking server calls (think select()) to
315 # wait forever and possibly prevent exiting the process,
316 # but instead we want a form of polling/busy_wait pattern, where
317 # _server_timeout should be used as the default timeout for
318 # all I/O blocking operations
319 _busywait_timeout = 0.5
322 __logger = logging.getLogger('server')
325 Server.__servers.append(self)
326 if Server.__is_started:
327 # raise Exception('All instances of servers must be inited before the startAll()')
328 # Since the startAll() won't be called again, allow this server to
329 # init and then start it after 1sec (hopefully). Register that
330 # timer thread in a list, so that we can abort the start if quitAll
331 # is called in the meantime
332 t = threading.Timer(1.0, self._late_start)
333 t.name = 'Late start timer for %s' % str(self.__class__)
334 Server.__starter_threads.append(t)
338 self.__logger.debug("called stub Server.start")
340 def _late_start(self):
342 for thr in Server.__starter_threads:
343 if thr.finished.is_set():
344 Server.__starter_threads.remove(thr)
347 self.__logger.debug("called stub Server.stop")
350 """ This function should return statistics about the server """
351 return "%s: No statistics" % str(self.__class__)
357 cls.__logger.info("Starting %d services" % len(cls.__servers))
358 for srv in cls.__servers:
360 cls.__is_started = True
364 if not cls.__is_started:
366 cls.__logger.info("Stopping %d services" % len(cls.__servers))
367 for thr in cls.__starter_threads:
368 if not thr.finished.is_set():
370 cls.__starter_threads.remove(thr)
372 for srv in cls.__servers:
374 cls.__is_started = False
378 res = ["Servers %s" % ('stopped', 'started')[cls.__is_started]]
379 res.extend(srv.stats() for srv in cls.__servers)
380 return '\n'.join(res)
382 def _close_socket(self):
385 self.socket.shutdown(getattr(socket, 'SHUT_RDWR', 2))
386 except socket.error, e:
387 if e.errno != errno.ENOTCONN: raise
388 # OSX, socket shutdowns both sides if any side closes it
389 # causing an error 57 'Socket is not connected' on shutdown
390 # of the other side (or something), see
391 # http://bugs.python.org/issue4397
393 '"%s" when shutting down server socket, '
394 'this is normal under OS X', e)
class OpenERPDispatcherException(Exception):
    """Failure wrapper raised by the RPC dispatcher.

    Carries the original exception object plus its already-formatted
    traceback string so transport layers can serialize the error for
    remote clients.
    """
    def __init__(self, exception, traceback):
        # Keep both pieces verbatim; no message formatting happens here.
        self.exception, self.traceback = exception, traceback
def replace_request_password(args):
    # password is always 3rd argument in a request, we replace it in RPC logs
    # so it's easier to forward logs for diagnostics/debugging purposes...
    #
    # Fix(review): the function body was missing from the reviewed listing;
    # restored the documented masking behavior. A copy is taken so the
    # caller's argument sequence is never mutated; inputs shorter than
    # three elements are returned unchanged.
    if len(args) > 2:
        args = list(args)
        args[2] = '*'
    return args
def log(title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
    """Pretty-print *msg* on logger *title* at level *channel*.

    *fn* is prefixed to the first line; continuation lines are indented
    by len(fn) so multi-line pformat output stays aligned under it.
    *depth* is forwarded to pprint.pformat to limit nesting.

    Fix(review): the reviewed listing used `indent` without ever
    assigning it (NameError on the first logged line); restored the
    first-line-empty / subsequent-lines-indented toggle.
    """
    logger = logging.getLogger(title)
    if logger.isEnabledFor(channel):
        indent = ''
        indent_after = ' ' * len(fn)
        for line in (fn + pformat(msg, depth=depth)).split('\n'):
            logger.log(channel, indent + line)
            # After the first line, align continuations under the prefix.
            indent = indent_after
class OpenERPDispatcher:
    def log(self, title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
        # Thin wrapper over the module-level log() helper.
        log(title, msg, channel=channel, depth=depth, fn=fn)

    def dispatch(self, service_name, method, params):
        # Route an RPC call to the exported service, logging the request
        # (with the password masked) and the result.
        # NOTE(review): the enclosing `try:`, the `return result` line and
        # the `except Exception, e:` header are elided from this chunk;
        # the exception-handling tail below belongs to that except block.
        logger = logging.getLogger('result')
        # Log full depth only when the answer channel is enabled, otherwise
        # keep the request log shallow (depth=1).
        self.log('service', tuple(replace_request_password(params)), depth=(None if logger.isEnabledFor(logging.DEBUG_RPC_ANSWER) else 1), fn='%s.%s'%(service_name,method))
        auth = getattr(self, 'auth_provider', None)
        result = ExportService.getService(service_name).dispatch(method, auth, params)
        self.log('result', result, channel=logging.DEBUG_RPC_ANSWER)
        # --- exception path (header elided) ---
        self.log('exception', tools.exception_to_unicode(e))
        # Prefer a traceback attached to the exception, else the live one.
        tb = getattr(e, 'traceback', sys.exc_info())
        tb_s = "".join(traceback.format_exception(*tb))
        if tools.config['debug_mode']:
            # (the `import pdb` line appears to be elided from this chunk)
            pdb.post_mortem(tb[2])
        # Re-raise wrapped so transports can serialize exception + traceback.
        raise OpenERPDispatcherException(e, tb_s)
439 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: