# -*- coding: utf-8 -*-
##############################################################################
#
#    OpenERP, Open Source Management Solution
#    Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
import errno
import heapq
import logging
import logging.handlers
import os
import platform
import release
import socket
import sys
import threading
import time
import traceback
import types
from pprint import pformat

# TODO modules that import netsvc only for things from loglevels must be changed to use loglevels.
from loglevels import *
import tools
def close_socket(sock):
    """ Closes a socket instance cleanly.

    :param sock: the network socket to close
    :type sock: socket.socket
    """
    try:
        sock.shutdown(socket.SHUT_RDWR)
    except socket.error, e:
        # On OSX, a socket is shut down on both sides if either side closes
        # it, causing an error 57 'Socket is not connected' on shutdown
        # of the other side (or something), see
        # http://bugs.python.org/issue4397
        # note: stdlib fixed test, not behavior
        if e.errno != errno.ENOTCONN or platform.system() != 'Darwin':
            raise
    sock.close()
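
# Illustrative sketch (not part of the original module): typical client-side
# use, pairing connect() with close_socket() for a clean teardown. The host,
# port and payload are hypothetical.
def _example_close_socket_usage(host='localhost', port=8070):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((host, port))
    try:
        s.sendall('ping')
    finally:
        close_socket(s)  # shutdown + close, tolerating Darwin's ENOTCONN quirk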
#.apidoc title: Common Services: netsvc
#.apidoc module-mods: member-order: bysource
class Service(object):
    """ Base class for *Local* services.

        Functionality here is trusted, no authentication.
    """
    _services = {}

    def __init__(self, name, audience=''):
        Service._services[name] = self
        self.__name = name
        self._methods = {}

    def joinGroup(self, name):
        raise Exception("No group for local services")
        #GROUPS.setdefault(name, {})[self.__name] = self

    @classmethod
    def exists(cls, name):
        return name in cls._services

    @classmethod
    def remove(cls, name):
        if cls.exists(name):
            cls._services.pop(name)

    def exportMethod(self, method):
        if callable(method):
            self._methods[method.__name__] = method

    def abortResponse(self, error, description, origin, details):
        if not tools.config['debug_mode']:
            raise Exception("%s -- %s\n\n%s" % (origin, description, details))
        else:
            raise
class LocalService(object):
    """ Proxy for local services.

        Any instance of this class will behave like the single instance
        of Service(name).
    """
    __logger = logging.getLogger('service')

    def __init__(self, name):
        self.__name = name
        try:
            self._service = Service._services[name]
            for method_name, method_definition in self._service._methods.items():
                setattr(self, method_name, method_definition)
        except KeyError, keyError:
            self.__logger.error('This service does not exist: %s' % (str(keyError),))
            raise

    def __call__(self, method, *params):
        return getattr(self, method)(*params)
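
# Illustrative sketch (not part of the original module): registering a local
# service and calling one of its exported methods through LocalService. The
# 'example' name and ping() helper are hypothetical.
def _example_local_service():
    def ping():
        return 'pong'
    svc = Service('example')
    svc.exportMethod(ping)           # exposed under its __name__, i.e. 'ping'
    proxy = LocalService('example')  # proxies the single Service('example')
    return proxy('ping')             # -> 'pong', via __call__ -> getattr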
class ExportService(object):
    """ Proxy for exported services.

    All methods here should take an AuthProxy as their first parameter. It
    will be appended by the calling framework.

    Note that this class has no direct proxy capable of calling
    eservice.method(). Rather, the proxy should call
    dispatch(method, auth, params).
    """

    _services = {}
    _groups = {}
    _logger = logging.getLogger('web-services')

    def __init__(self, name, audience=''):
        ExportService._services[name] = self
        self.__name = name
        self._logger.debug("Registered an exported service: %s" % name)

    def joinGroup(self, name):
        ExportService._groups.setdefault(name, {})[self.__name] = self

    @classmethod
    def getService(cls, name):
        return cls._services[name]

    def dispatch(self, method, auth, params):
        raise Exception("stub dispatch at %s" % self.__name)

    def new_dispatch(self, method, auth, params):
        raise Exception("stub dispatch at %s" % self.__name)

    def abortResponse(self, error, description, origin, details):
        if not tools.config['debug_mode']:
            raise Exception("%s -- %s\n\n%s" % (origin, description, details))
        else:
            raise
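
# Illustrative sketch (not part of the original module): the minimal shape of
# an exported service. The 'echo' service name and its single method are
# hypothetical; real services (db, common, object, ...) are registered the
# same way by the server at startup.
class _ExampleEchoService(ExportService):
    def __init__(self):
        ExportService.__init__(self, 'echo')

    def dispatch(self, method, auth, params):
        # a real service would check `auth` before doing any work
        if method == 'echo':
            return params
        raise Exception("Method not found: %s" % method)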
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, _NOTHING, DEFAULT = range(10)
# The background is set with 40 plus the number of the color, and the
# foreground with 30 plus it. These are the ANSI escape sequences needed to
# get colored output.
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
COLOR_PATTERN = "%s%s%%s%s" % (COLOR_SEQ, COLOR_SEQ, RESET_SEQ)
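# For example, COLOR_PATTERN % (30 + GREEN, 40 + DEFAULT, 'INFO') expands to
# "\033[1;32m\033[1;49mINFO\033[0m": a bold green "INFO" on the default
# background, followed by a reset of all attributes.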
LEVEL_COLOR_MAPPING = {
    logging.DEBUG_SQL: (WHITE, MAGENTA),
    logging.DEBUG_RPC: (BLUE, WHITE),
    logging.DEBUG_RPC_ANSWER: (BLUE, WHITE),
    logging.DEBUG: (BLUE, DEFAULT),
    logging.INFO: (GREEN, DEFAULT),
    logging.TEST: (WHITE, BLUE),
    logging.WARNING: (YELLOW, DEFAULT),
    logging.ERROR: (RED, DEFAULT),
    logging.CRITICAL: (WHITE, RED),
}
class DBFormatter(logging.Formatter):
    def format(self, record):
        record.dbname = getattr(threading.currentThread(), 'dbname', '?')
        return logging.Formatter.format(self, record)

class ColoredFormatter(DBFormatter):
    def format(self, record):
        fg_color, bg_color = LEVEL_COLOR_MAPPING[record.levelno]
        record.levelname = COLOR_PATTERN % (30 + fg_color, 40 + bg_color, record.levelname)
        return DBFormatter.format(self, record)
def init_logger():
    from tools.translate import resetlocale
    resetlocale()

    # create a format for log messages and dates
    format = '[%(asctime)s][%(dbname)s] %(levelname)s:%(name)s:%(message)s'

    if tools.config['syslog']:
        # SysLog Handler
        if os.name == 'nt':
            handler = logging.handlers.NTEventLogHandler("%s %s" % (release.description, release.version))
        else:
            handler = logging.handlers.SysLogHandler('/dev/log')
        format = '%s %s' % (release.description, release.version) \
                + ':%(dbname)s:%(levelname)s:%(name)s:%(message)s'

    elif tools.config['logfile']:
        # LogFile Handler
        logf = tools.config['logfile']
        try:
            dirname = os.path.dirname(logf)
            if dirname and not os.path.isdir(dirname):
                os.makedirs(dirname)
            if tools.config['logrotate'] is not False:
                handler = logging.handlers.TimedRotatingFileHandler(logf, 'D', 1, 30)
            elif os.name == 'posix':
                handler = logging.handlers.WatchedFileHandler(logf)
            else:
                handler = logging.handlers.FileHandler(logf)
        except Exception:
            sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n")
            handler = logging.StreamHandler(sys.stdout)
    else:
        # Normal Handler on standard output
        handler = logging.StreamHandler(sys.stdout)

    # Colorize the output only when logging to an interactive terminal
    if isinstance(handler, logging.StreamHandler) and os.isatty(handler.stream.fileno()):
        formatter = ColoredFormatter(format)
    else:
        formatter = DBFormatter(format)
    handler.setFormatter(formatter)

    # add the handler to the root logger
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(int(tools.config['log_level'] or '0'))
# An alternative logging scheme for automated runs of the
# server, intended to test it.
def init_alternative_logger():
    class H(logging.Handler):
        def emit(self, record):
            if record.levelno > 20:
                print record.levelno, record.pathname, record.msg
    handler = H()
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(logging.ERROR)
248 """ Singleton that keeps track of cancellable tasks to run at a given
251 The tasks are characterised by:
254 * the database on which the task run
255 * the function to call
256 * the arguments and keyword arguments to pass to the function
258 Implementation details:
260 - Tasks are stored as list, allowing the cancellation by setting
262 - A heapq is used to store tasks, so we don't need to sort
267 _logger = logging.getLogger('netsvc.agent')
270 def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
271 task = [timestamp, db_name, function, args, kwargs]
272 heapq.heappush(cls.__tasks, task)
273 cls.__tasks_by_db.setdefault(db_name, []).append(task)
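
    # Illustrative usage (not part of the original module); 'registry.vacuum'
    # is a hypothetical bound method -- runner() logs via function.im_class,
    # so a bound method is expected here:
    #
    #     Agent.setAlarm(registry.vacuum, time.time() + 300, 'mydb')
    #     Agent.cancel('mydb')  # later: drop everything scheduled for 'mydb'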
    @classmethod
    def cancel(cls, db_name):
        """ Cancel all tasks for a given database. If None is passed, all tasks are cancelled. """
        cls._logger.debug("Cancel timers for %s db", db_name or 'all')
        if db_name is None:
            cls.__tasks, cls.__tasks_by_db = [], {}
        else:
            if db_name in cls.__tasks_by_db:
                for task in cls.__tasks_by_db[db_name]:
                    task[0] = 0

    @classmethod
    def quit(cls):
        cls.cancel(None)

    @classmethod
    def runner(cls):
        """ Never-ending function (intended to be run in a dedicated thread)
            that checks every 60 seconds for tasks to run. TODO: make configurable
        """
        current_thread = threading.currentThread()
        while True:
            while cls.__tasks and cls.__tasks[0][0] < time.time():
                task = heapq.heappop(cls.__tasks)
                timestamp, dbname, function, args, kwargs = task
                cls.__tasks_by_db[dbname].remove(task)
                if not timestamp:
                    # null timestamp -> cancelled task
                    continue
                current_thread.dbname = dbname   # hack hack
                cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs)
                delattr(current_thread, 'dbname')
                task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
                # force non-daemon task threads (the runner thread must be daemon,
                # and this property is inherited by default)
                task_thread.setDaemon(False)
                task_thread.start()
                time.sleep(1)
            time.sleep(60)

agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
# the agent runner is a typical daemon thread, that will never quit and must be
# terminated when the main process exits - with no consequence (the processing
# threads it spawns are not marked daemon)
agent_runner.setDaemon(True)
agent_runner.start()
325 """ Generic interface for all servers with an event loop etc.
326 Override this to impement http, net-rpc etc. servers.
328 Servers here must have threaded behaviour. start() must not block,
333 __starter_threads = []
335 # we don't want blocking server calls (think select()) to
336 # wait forever and possibly prevent exiting the process,
337 # but instead we want a form of polling/busy_wait pattern, where
338 # _server_timeout should be used as the default timeout for
339 # all I/O blocking operations
340 _busywait_timeout = 0.5
343 __logger = logging.getLogger('server')
346 Server.__servers.append(self)
347 if Server.__is_started:
348 # raise Exception('All instances of servers must be inited before the startAll()')
349 # Since the startAll() won't be called again, allow this server to
350 # init and then start it after 1sec (hopefully). Register that
351 # timer thread in a list, so that we can abort the start if quitAll
352 # is called in the meantime
353 t = threading.Timer(1.0, self._late_start)
354 t.name = 'Late start timer for %s' % str(self.__class__)
355 Server.__starter_threads.append(t)
359 self.__logger.debug("called stub Server.start")
361 def _late_start(self):
363 for thr in Server.__starter_threads:
364 if thr.finished.is_set():
365 Server.__starter_threads.remove(thr)
368 self.__logger.debug("called stub Server.stop")
371 """ This function should return statistics about the server """
372 return "%s: No statistics" % str(self.__class__)
378 cls.__logger.info("Starting %d services" % len(cls.__servers))
379 for srv in cls.__servers:
381 cls.__is_started = True
385 if not cls.__is_started:
387 cls.__logger.info("Stopping %d services" % len(cls.__servers))
388 for thr in cls.__starter_threads:
389 if not thr.finished.is_set():
391 cls.__starter_threads.remove(thr)
393 for srv in cls.__servers:
395 cls.__is_started = False
399 res = ["Servers %s" % ('stopped', 'started')[cls.__is_started]]
400 res.extend(srv.stats() for srv in cls.__servers)
401 return '\n'.join(res)
403 def _close_socket(self):
404 close_socket(self.socket)
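
# Illustrative sketch (not part of the original module): the minimal shape of
# a concrete server. A real implementation (e.g. the HTTP or net-rpc server)
# would accept connections in its thread, using _busywait_timeout for its
# blocking calls so quitAll() can interrupt it promptly.
class _ExampleServer(Server):
    def __init__(self):
        Server.__init__(self)
        self._running = False

    def start(self):
        # start() must not block: spawn the event loop in its own thread
        self._running = True
        threading.Thread(target=self._loop, name='example.server').start()

    def _loop(self):
        while self._running:
            time.sleep(self._busywait_timeout)  # poll instead of blocking forever

    def stop(self):
        self._running = False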
class OpenERPDispatcherException(Exception):
    def __init__(self, exception, traceback):
        self.exception = exception
        self.traceback = traceback

def replace_request_password(args):
    # password is always the 3rd argument in a request, we replace it in RPC
    # logs so it's easier to forward logs for diagnostics/debugging purposes...
    if len(args) > 2:
        args = list(args)
        args[2] = '*'
    return args
def log(title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
    logger = logging.getLogger(title)
    if logger.isEnabledFor(channel):
        indent = ''
        indent_after = ' ' * len(fn)
        for line in (fn + pformat(msg, depth=depth)).split('\n'):
            logger.log(channel, indent + line)
            indent = indent_after
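
# Illustrative usage (not part of the original module): pretty-print an RPC
# payload on the default DEBUG_RPC channel, with continuation lines aligned
# under the prefix; the title and payload below are hypothetical:
#
#     log('service', {'ids': [1, 2], 'fields': ['name']},
#         fn='res.partner.read: ')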
class OpenERPDispatcher:
    def log(self, title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
        log(title, msg, channel=channel, depth=depth, fn=fn)

    def dispatch(self, service_name, method, params):
        try:
            auth = getattr(self, 'auth_provider', None)
            logger = logging.getLogger('result')
            start_time = end_time = 0
            if logger.isEnabledFor(logging.DEBUG_RPC_ANSWER):
                self.log('service', tuple(replace_request_password(params)), depth=None, fn='%s.%s' % (service_name, method))
            if logger.isEnabledFor(logging.DEBUG_RPC):
                start_time = time.time()
            result = ExportService.getService(service_name).dispatch(method, auth, params)
            if logger.isEnabledFor(logging.DEBUG_RPC):
                end_time = time.time()
            if not logger.isEnabledFor(logging.DEBUG_RPC_ANSWER):
                self.log('service (%.3fs)' % (end_time - start_time), tuple(replace_request_password(params)), depth=1, fn='%s.%s' % (service_name, method))
            self.log('execution time', '%.3fs' % (end_time - start_time), channel=logging.DEBUG_RPC_ANSWER)
            self.log('result', result, channel=logging.DEBUG_RPC_ANSWER)
            return result
        except Exception, e:
            self.log('exception', tools.exception_to_unicode(e))
            tb = getattr(e, 'traceback', sys.exc_info())
            tb_s = "".join(traceback.format_exception(*tb))
            if tools.config['debug_mode'] and isinstance(tb[2], types.TracebackType):
                import pdb
                pdb.post_mortem(tb[2])
            raise OpenERPDispatcherException(e, tb_s)
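
# Illustrative sketch (not part of the original module): a transport layer
# would mix OpenERPDispatcher into its request handler and funnel decoded
# requests through dispatch(); the names below are hypothetical.
#
#     class ExampleRequestHandler(SomeTransportHandler, OpenERPDispatcher):
#         def handle_request(self, service_name, method, params):
#             try:
#                 return self.dispatch(service_name, method, params)
#             except OpenERPDispatcherException, e:
#                 return self.send_fault(e.exception, e.traceback)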
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: