2 # -*- coding: utf-8 -*-
3 ##############################################################################
5 # OpenERP, Open Source Management Solution
6 # Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU Affero General Public License as
10 # published by the Free Software Foundation, either version 3 of the
11 # License, or (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Affero General Public License for more details.
18 # You should have received a copy of the GNU Affero General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 ##############################################################################
import errno
import heapq
import logging
import logging.handlers
import os
import platform
import socket
import sys
import threading
import time
import traceback
import types

from pprint import pformat

# TODO modules that import netsvc only for things from loglevels must be changed to use loglevels.
from loglevels import *
def close_socket(sock):
    """ Closes a socket instance cleanly.

    Shuts down both directions first, then closes the file descriptor.

    :param sock: the network socket to close
    :type sock: socket.socket
    """
    try:
        sock.shutdown(socket.SHUT_RDWR)
    except socket.error as e:
        # On OSX, socket shutdowns both sides if any side closes it
        # causing an error 57 'Socket is not connected' on shutdown
        # of the other side (or something), see
        # http://bugs.python.org/issue4397
        # note: stdlib fixed test, not behavior
        if e.errno != errno.ENOTCONN or platform.system() != 'Darwin':
            raise
    sock.close()
60 #.apidoc title: Common Services: netsvc
61 #.apidoc module-mods: member-order: bysource
class Service(object):
    """ Base class for *Local* services.

        Functionality here is trusted, no authentication.
        Instances register themselves into the class-level ``_services``
        registry at construction time, keyed by name.
    """
    # Registry of all local services, by name.
    _services = {}
    _groups = {}

    def __init__(self, name, audience=''):
        Service._services[name] = self
        self.__name = name
        # Methods explicitly exported through exportMethod(), by name.
        self._methods = {}

    def joinGroup(self, name):
        # Local services are never grouped.
        raise Exception("No group for local services")
        #GROUPS.setdefault(name, {})[self.__name] = self

    @classmethod
    def exists(cls, name):
        """Return True if a local service is registered under *name*."""
        return name in cls._services

    @classmethod
    def remove(cls, name):
        """Unregister the service *name*; a no-op if it is not registered."""
        if cls.exists(name):
            cls._services.pop(name)

    def exportMethod(self, method):
        """Record *method* as callable through this service, keyed by its name."""
        if callable(method):
            self._methods[method.__name__] = method

    def abortResponse(self, error, description, origin, details):
        # NOTE(review): relies on the module-level `tools` import (not visible
        # in this chunk); outside debug mode the error is wrapped in a plain
        # Exception so no internal details leak to the RPC layer.
        if not tools.config['debug_mode']:
            raise Exception("%s -- %s\n\n%s"%(origin, description, details))
def LocalService(name):
    """Return the *Local* :class:`Service` instance registered under ``name``.

    :raises KeyError: if no local service was registered under that name.
    """
    return Service._services[name]
class ExportService(object):
    """ Proxy for exported services.

    All methods here should take an AuthProxy as their first parameter. It
    will be appended by the calling framework.

    Note that this class has no direct proxy, capable of calling
    eservice.method(). Rather, the proxy should call
    dispatch(method, auth, params)
    """
    # Registry of all exported services, by name.
    _services = {}
    _groups = {}
    _logger = logging.getLogger('web-services')

    def __init__(self, name, audience=''):
        ExportService._services[name] = self
        self.__name = name
        self._logger.debug("Registered an exported service: %s" % name)

    def joinGroup(self, name):
        """Register this service into the (class-level) group *name*."""
        ExportService._groups.setdefault(name, {})[self.__name] = self

    @classmethod
    def getService(cls, name):
        """Return the exported service registered under *name* (KeyError if absent)."""
        return cls._services[name]

    def dispatch(self, method, auth, params):
        # Subclasses must override this; the base class is only a stub.
        raise Exception("stub dispatch at %s" % self.__name)

    def abortResponse(self, error, description, origin, details):
        # NOTE(review): relies on the module-level `tools` import (not visible
        # in this chunk).
        if not tools.config['debug_mode']:
            raise Exception("%s -- %s\n\n%s"%(origin, description, details))
# ANSI terminal color setup for log output.
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, _NOTHING, DEFAULT = range(10)
#The background is set with 40 plus the number of the color, and the foreground with 30
#These are the sequences needed to get colored output
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
# Two color escapes (foreground, background) around a %s placeholder, then reset.
COLOR_PATTERN = "%s%s%%s%s" % (COLOR_SEQ, COLOR_SEQ, RESET_SEQ)
# (foreground, background) pair per log level. The DEBUG_SQL/DEBUG_RPC/
# DEBUG_RPC_ANSWER/TEST levels are custom ones injected into the logging
# module by the `loglevels` star-import at the top of this file.
LEVEL_COLOR_MAPPING = {
    logging.DEBUG_SQL: (WHITE, MAGENTA),
    logging.DEBUG_RPC: (BLUE, WHITE),
    logging.DEBUG_RPC_ANSWER: (BLUE, WHITE),
    logging.DEBUG: (BLUE, DEFAULT),
    logging.INFO: (GREEN, DEFAULT),
    logging.TEST: (WHITE, BLUE),
    logging.WARNING: (YELLOW, DEFAULT),
    logging.ERROR: (RED, DEFAULT),
    logging.CRITICAL: (WHITE, RED),
}
class DBFormatter(logging.Formatter):
    """Formatter that exposes the current database name as ``%(dbname)s``.

    The RPC machinery stores the database name on the handling thread;
    records formatted outside such a thread show ``?`` instead.
    """
    def format(self, record):
        current = threading.currentThread()
        record.dbname = getattr(current, 'dbname', '?')
        return logging.Formatter.format(self, record)
class ColoredFormatter(DBFormatter):
    """DBFormatter variant that wraps the level name in ANSI color escapes.

    Colors come from the module-level LEVEL_COLOR_MAPPING table; foreground
    codes are 30 + color, background codes 40 + color.
    """
    def format(self, record):
        colors = LEVEL_COLOR_MAPPING[record.levelno]
        record.levelname = COLOR_PATTERN % (30 + colors[0], 40 + colors[1], record.levelname)
        return DBFormatter.format(self, record)
def init_logger():
    """Configure the root logger according to the server configuration.

    Picks a handler (syslog / NT event log, rotating or plain log file, or
    stdout), attaches a DBFormatter (colored when writing to a tty) and sets
    the root level from the ``log_level`` config option.

    NOTE(review): the ``def`` line and several control-flow lines were missing
    from the reviewed source and have been restored — confirm against history.
    """
    from tools.translate import resetlocale
    resetlocale()

    # create a format for log messages and dates
    format = '[%(asctime)s][%(dbname)s] %(levelname)s:%(name)s:%(message)s'

    if tools.config['syslog']:
        # SysLog Handler (NT event log on Windows)
        if os.name == 'nt':
            handler = logging.handlers.NTEventLogHandler("%s %s" % (release.description, release.version))
        else:
            handler = logging.handlers.SysLogHandler('/dev/log')
        format = '%s %s' % (release.description, release.version) \
               + ':%(dbname)s:%(levelname)s:%(name)s:%(message)s'
    elif tools.config['logfile']:
        # LogFile Handler
        logf = tools.config['logfile']
        try:
            # Create the log directory if it is missing.
            dirname = os.path.dirname(logf)
            if dirname and not os.path.isdir(dirname):
                os.makedirs(dirname)
            if tools.config['logrotate'] is not False:
                # Daily rotation, 30 files kept.
                handler = logging.handlers.TimedRotatingFileHandler(logf, 'D', 1, 30)
            elif os.name == 'posix':
                # Follows external rotation (e.g. logrotate) of the file.
                handler = logging.handlers.WatchedFileHandler(logf)
            else:
                handler = logging.handlers.FileHandler(logf)
        except Exception:
            # Deliberate best-effort: fall back to stdout rather than abort.
            sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n")
            handler = logging.StreamHandler(sys.stdout)
    else:
        # Normal Handler on standard output
        handler = logging.StreamHandler(sys.stdout)

    # Only colorize when actually writing to a terminal.
    if isinstance(handler, logging.StreamHandler) and os.isatty(handler.stream.fileno()):
        formatter = ColoredFormatter(format)
    else:
        formatter = DBFormatter(format)
    handler.setFormatter(formatter)

    # add the handler to the root logger
    logger = logging.getLogger()
    logger.handlers = []
    logger.addHandler(handler)
    logger.setLevel(int(tools.config['log_level'] or '0'))
# An alternative logging scheme for automated runs of the
# server intended to test it.
def init_alternative_logger():
    """Install a minimal root-logger setup for automated test runs.

    Replaces all root handlers with one that prints records above level 20
    (i.e. WARNING and up) and raises the root level to ERROR.
    """
    class H(logging.Handler):
        def emit(self, record):
            if record.levelno > 20:
                # Single %-formatted string: identical output under the
                # Python 2 print statement and the Python 3 print function.
                print("%s %s %s" % (record.levelno, record.pathname, record.msg))
    handler = H()
    logger = logging.getLogger()
    # NOTE(review): drop any previously-installed handlers — confirm this
    # matches the line missing from the reviewed source.
    logger.handlers = []
    logger.addHandler(handler)
    logger.setLevel(logging.ERROR)
class Agent(object):
    """ Singleton that keeps track of cancellable tasks to run at a given
        timestamp.

        The tasks are characterised by:

            * a timestamp
            * the database on which the task run
            * the function to call
            * the arguments and keyword arguments to pass to the function

        Implementation details:

          - Tasks are stored as list, allowing the cancellation by setting
            the timestamp to 0.
          - A heapq is used to store tasks, so we don't need to sort
            tasks ourselves.
    """
    # Min-heap of [timestamp, db_name, function, args, kwargs] lists.
    __tasks = []
    # Same task lists, grouped by database name for cancellation.
    __tasks_by_db = {}
    _logger = logging.getLogger('netsvc.agent')

    @classmethod
    def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
        """Schedule *function* to be run at *timestamp* for database *db_name*."""
        task = [timestamp, db_name, function, args, kwargs]
        heapq.heappush(cls.__tasks, task)
        cls.__tasks_by_db.setdefault(db_name, []).append(task)

    @classmethod
    def cancel(cls, db_name):
        """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
        cls._logger.debug("Cancel timers for %s db", db_name or 'all')
        if db_name is None:
            cls.__tasks, cls.__tasks_by_db = [], {}
        else:
            if db_name in cls.__tasks_by_db:
                for task in cls.__tasks_by_db[db_name]:
                    # Zero timestamp marks the task cancelled; the runner
                    # skips it when it pops off the heap.
                    task[0] = 0

    @classmethod
    def quit(cls):
        cls.cancel(None)

    @classmethod
    def runner(cls):
        """Neverending function (intended to be ran in a dedicated thread) that
           checks every 60 seconds tasks to run. TODO: make configurable
        """
        current_thread = threading.currentThread()
        while True:
            while cls.__tasks and cls.__tasks[0][0] < time.time():
                task = heapq.heappop(cls.__tasks)
                timestamp, dbname, function, args, kwargs = task
                cls.__tasks_by_db[dbname].remove(task)
                if not timestamp:
                    # null timestamp -> cancelled task
                    continue
                current_thread.dbname = dbname   # hack hack
                # NOTE(review): im_class/func_name are Python-2-only bound
                # method attributes; only used for this debug message.
                cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs)
                delattr(current_thread, 'dbname')
                task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
                # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
                task_thread.setDaemon(False)
                task_thread.start()
            time.sleep(60)
# Module-level: create the background thread that polls Agent's task heap.
agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
# the agent runner is a typical daemon thread, that will never quit and must be
# terminated when the main process exits - with no consequence (the processing
# threads it spawns are not marked daemon)
agent_runner.setDaemon(True)
class Server(object):
    """ Generic interface for all servers with an event loop etc.
        Override this to implement http, net-rpc etc. servers.

        Servers here must have threaded behaviour. start() must not block.

        NOTE(review): the class header and several decorators were missing
        from the reviewed source and have been restored — confirm the
        classmethod names (startAll/quitAll/allStats) against history.
    """
    __is_started = False
    __servers = []
    # Timer threads created for servers instantiated after startAll().
    __starter_threads = []

    # we don't want blocking server calls (think select()) to
    # wait forever and possibly prevent exiting the process,
    # but instead we want a form of polling/busy_wait pattern, where
    # _server_timeout should be used as the default timeout for
    # all I/O blocking operations
    _busywait_timeout = 0.5

    __logger = logging.getLogger('server')

    def __init__(self):
        Server.__servers.append(self)
        if Server.__is_started:
            # raise Exception('All instances of servers must be inited before the startAll()')
            # Since the startAll() won't be called again, allow this server to
            # init and then start it after 1sec (hopefully). Register that
            # timer thread in a list, so that we can abort the start if quitAll
            # is called in the meantime
            t = threading.Timer(1.0, self._late_start)
            t.name = 'Late start timer for %s' % str(self.__class__)
            Server.__starter_threads.append(t)
            t.start()

    def start(self):
        self.__logger.debug("called stub Server.start")

    def _late_start(self):
        self.start()
        # Iterate over a copy: removing from the list being iterated
        # skips elements.
        for thr in Server.__starter_threads[:]:
            if thr.finished.is_set():
                Server.__starter_threads.remove(thr)

    def stop(self):
        self.__logger.debug("called stub Server.stop")

    def stats(self):
        """ This function should return statistics about the server """
        return "%s: No statistics" % str(self.__class__)

    @classmethod
    def startAll(cls):
        if cls.__is_started:
            return
        cls.__logger.info("Starting %d services" % len(cls.__servers))
        for srv in cls.__servers:
            srv.start()
        cls.__is_started = True

    @classmethod
    def quitAll(cls):
        if not cls.__is_started:
            return
        cls.__logger.info("Stopping %d services" % len(cls.__servers))
        # Iterate over a copy (see _late_start): every timer is removed,
        # and pending ones are cancelled first.
        for thr in cls.__starter_threads[:]:
            if not thr.finished.is_set():
                thr.cancel()
            cls.__starter_threads.remove(thr)

        for srv in cls.__servers:
            srv.stop()
        cls.__is_started = False

    @classmethod
    def allStats(cls):
        res = ["Servers %s" % ('stopped', 'started')[cls.__is_started]]
        res.extend(srv.stats() for srv in cls.__servers)
        return '\n'.join(res)

    def _close_socket(self):
        close_socket(self.socket)
class OpenERPDispatcherException(Exception):
    """Carries an exception raised during RPC dispatch along with the
    pre-formatted traceback string describing where it happened."""
    def __init__(self, exception, traceback):
        # Original error object and its rendered traceback, for the
        # transport layer to serialize back to the client.
        self.exception = exception
        self.traceback = traceback
def replace_request_password(args):
    """Return *args* with the password argument masked for logging.

    NOTE(review): the function body was missing from the reviewed source;
    restored to mask the third positional argument — confirm against history.
    """
    # password is always 3rd argument in a request, we replace it in RPC logs
    # so it's easier to forward logs for diagnostics/debugging purposes...
    if len(args) > 2:
        args = list(args)
        args[2] = '*'
    return args
def log(title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
    """Pretty-print *msg* to the logger named *title* on level *channel*.

    The first line is prefixed with *fn*; continuation lines are indented to
    align under it. *depth* is forwarded to :func:`pprint.pformat`.
    """
    logger = logging.getLogger(title)
    if logger.isEnabledFor(channel):
        indent = ''
        indent_after = ' ' * len(fn)
        for line in (fn + pformat(msg, depth=depth)).split('\n'):
            logger.log(channel, indent + line)
            # After the first line, align under the fn prefix.
            indent = indent_after
class OpenERPDispatcher:
    """Mixin that routes an RPC call to the matching ExportService and
    wraps any failure into an OpenERPDispatcherException.

    NOTE(review): the try/except skeleton and `return result` were missing
    from the reviewed source and have been restored — confirm against history.
    """
    def log(self, title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
        log(title, msg, channel=channel, depth=depth, fn=fn)

    def dispatch(self, service_name, method, params):
        """Call ``method`` on the exported service ``service_name`` with
        ``params``, logging the request/result and timing when enabled."""
        try:
            auth = getattr(self, 'auth_provider', None)
            logger = logging.getLogger('result')
            start_time = end_time = 0
            if logger.isEnabledFor(logging.DEBUG_RPC_ANSWER):
                # Passwords are masked before anything reaches the logs.
                self.log('service', tuple(replace_request_password(params)), depth=None, fn='%s.%s'%(service_name,method))
            if logger.isEnabledFor(logging.DEBUG_RPC):
                start_time = time.time()
            result = ExportService.getService(service_name).dispatch(method, auth, params)
            if logger.isEnabledFor(logging.DEBUG_RPC):
                end_time = time.time()
            if not logger.isEnabledFor(logging.DEBUG_RPC_ANSWER):
                self.log('service (%.3fs)' % (end_time - start_time), tuple(replace_request_password(params)), depth=1, fn='%s.%s'%(service_name,method))
            self.log('execution time', '%.3fs' % (end_time - start_time), channel=logging.DEBUG_RPC_ANSWER)
            self.log('result', result, channel=logging.DEBUG_RPC_ANSWER)
            return result
        except Exception as e:
            self.log('exception', tools.exception_to_unicode(e))
            # Exceptions forwarded from workers may carry their own traceback.
            tb = getattr(e, 'traceback', sys.exc_info())
            tb_s = "".join(traceback.format_exception(*tb))
            if tools.config['debug_mode'] and isinstance(tb, types.TracebackType):
                import pdb
                pdb.post_mortem(tb[2])
            raise OpenERPDispatcherException(e, tb_s)
437 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: