2 # -*- coding: utf-8 -*-
3 ##############################################################################
5 # OpenERP, Open Source Management Solution
6 # Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). All Rights Reserved
# The refactoring of the OpenSSL support comes from Tryton
8 # Copyright (C) 2007-2009 Cédric Krier.
9 # Copyright (C) 2007-2009 Bertrand Chenal.
10 # Copyright (C) 2008 B2CK SPRL.
12 # This program is free software: you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation, either version 3 of the License, or
15 # (at your option) any later version.
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
22 # You should have received a copy of the GNU General Public License
23 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 ##############################################################################
import cgitb
import errno
import heapq
import logging
import logging.handlers
import os
import platform
import socket
import sys
import threading
import time

from pprint import pformat

import release
import tools
# TODO modules that import netsvc only for things from loglevels must be changed to use loglevels.
from loglevels import *
def close_socket(sock):
    """ Closes a socket instance cleanly

    :param sock: the network socket to close
    :type sock: socket.socket
    """
    try:
        sock.shutdown(socket.SHUT_RDWR)
    except socket.error as e:
        # On OSX, socket shutdowns both sides if any side closes it
        # causing an error 57 'Socket is not connected' on shutdown
        # of the other side (or something), see
        # http://bugs.python.org/issue4397
        # note: stdlib fixed test, not behavior
        if e.errno != errno.ENOTCONN or platform.system() != 'Darwin':
            raise
    sock.close()
class Service(object):
    """ Base class for *Local* services

        Functionality here is trusted, no authentication.
    """
    # registry of all local services, keyed by service name
    _services = {}

    def __init__(self, name, audience=''):
        Service._services[name] = self
        self.__name = name
        # methods explicitly exported through exportMethod(), by name
        self._methods = {}

    def joinGroup(self, name):
        # local services are not groupable
        raise Exception("No group for local services")
        #GROUPS.setdefault(name, {})[self.__name] = self

    @classmethod
    def exists(cls, name):
        """Return True if a service called *name* is registered."""
        return name in cls._services

    @classmethod
    def remove(cls, name):
        """Unregister the service *name* (no-op if absent)."""
        if cls.exists(name):
            cls._services.pop(name)

    def exportMethod(self, method):
        """Expose *method* (a callable) under its own __name__."""
        if callable(method):
            self._methods[method.__name__] = method

    def abortResponse(self, error, description, origin, details):
        # in debug mode, let the original exception propagate with its traceback
        if not tools.config['debug_mode']:
            raise Exception("%s -- %s\n\n%s"%(origin, description, details))
        else:
            raise
class LocalService(object):
    """ Proxy for local services.

        Any instance of this class will behave like the single registered
        instance of Service(name): exported methods are copied onto it.
    """
    __logger = logging.getLogger('service')

    def __init__(self, name):
        self.__name = name
        try:
            self._service = Service._services[name]
            # mirror every exported method on the proxy instance
            for method_name, method_definition in self._service._methods.items():
                setattr(self, method_name, method_definition)
        except KeyError as keyError:
            self.__logger.error('This service does not exist: %s' % (str(keyError),) )
            raise

    def __call__(self, method, *params):
        # allow proxy('method', args...) as an alternative to proxy.method(args...)
        return getattr(self, method)(*params)
class ExportService(object):
    """ Proxy for exported services.

    All methods here should take an AuthProxy as their first parameter. It
    will be appended by the calling framework.

    Note that this class has no direct proxy, capable of calling
    eservice.method(). Rather, the proxy should call
    dispatch(method, auth, params)
    """
    # registry of all exported services, keyed by service name
    _services = {}
    # optional service groups: group name -> {service name -> service}
    _groups = {}
    _logger = logging.getLogger('web-services')

    def __init__(self, name, audience=''):
        ExportService._services[name] = self
        self.__name = name
        self._logger.debug("Registered an exported service: %s" % name)

    def joinGroup(self, name):
        """Register this service in group *name*."""
        ExportService._groups.setdefault(name, {})[self.__name] = self

    @classmethod
    def getService(cls, name):
        """Return the exported service *name* (raises KeyError if missing)."""
        return cls._services[name]

    def dispatch(self, method, auth, params):
        # to be overridden by concrete services
        raise Exception("stub dispatch at %s" % self.__name)

    def new_dispatch(self, method, auth, params):
        # to be overridden by concrete services
        raise Exception("stub dispatch at %s" % self.__name)

    def abortResponse(self, error, description, origin, details):
        # in debug mode, let the original exception propagate with its traceback
        if not tools.config['debug_mode']:
            raise Exception("%s -- %s\n\n%s"%(origin, description, details))
        else:
            raise
# ANSI color setup for colored console logging.
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, _NOTHING, DEFAULT = range(10)
#The background is set with 40 plus the number of the color, and the foreground with 30
#These are the sequences needed to get colored output
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
BOLD_SEQ = "\033[1m"
# "<fg escape><bg escape>%s<reset>" - filled in by ColoredFormatter
COLOR_PATTERN = "%s%s%%s%s" % (COLOR_SEQ, COLOR_SEQ, RESET_SEQ)
# (foreground, background) color pair per log level.
# NOTE(review): DEBUG_SQL/DEBUG_RPC/DEBUG_RPC_ANSWER/TEST are custom levels
# injected into the logging module by the loglevels import above.
LEVEL_COLOR_MAPPING = {
    logging.DEBUG_SQL: (WHITE, MAGENTA),
    logging.DEBUG_RPC: (BLUE, WHITE),
    logging.DEBUG_RPC_ANSWER: (BLUE, WHITE),
    logging.DEBUG: (BLUE, DEFAULT),
    logging.INFO: (GREEN, DEFAULT),
    logging.TEST: (WHITE, BLUE),
    logging.WARNING: (YELLOW, DEFAULT),
    logging.ERROR: (RED, DEFAULT),
    logging.CRITICAL: (WHITE, RED),
}
class DBFormatter(logging.Formatter):
    """Formatter exposing the current database name as %(dbname)s.

    The server stores the database being worked on as a ``dbname``
    attribute on the handling thread; records formatted from a thread
    without that attribute show '?' instead.
    """
    def format(self, record):
        handling_thread = threading.currentThread()
        record.dbname = getattr(handling_thread, 'dbname', '?')
        return logging.Formatter.format(self, record)
class ColoredFormatter(DBFormatter):
    """DBFormatter that additionally wraps the record's level name in
    ANSI escape sequences, colored per LEVEL_COLOR_MAPPING.
    """
    def format(self, record):
        foreground, background = LEVEL_COLOR_MAPPING[record.levelno]
        # 30 + color selects the foreground code, 40 + color the background
        colored_level = COLOR_PATTERN % (30 + foreground, 40 + background, record.levelname)
        record.levelname = colored_level
        return DBFormatter.format(self, record)
def init_logger():
    """Configure the root logger from tools.config.

    Picks a handler - syslog (NT event log on Windows), a (possibly
    rotating) log file, or stdout - attaches a DBFormatter (colored when
    writing to a real terminal) and sets the root log level.

    NOTE(review): the ``def init_logger():`` header was missing from this
    mangled copy; the name is restored from the surrounding code base -
    confirm against callers.
    """
    from tools.translate import resetlocale
    resetlocale()

    # create a format for log messages and dates
    log_format = '[%(asctime)s][%(dbname)s] %(levelname)s:%(name)s:%(message)s'

    if tools.config['syslog']:
        # SysLog Handler
        if os.name == 'nt':
            handler = logging.handlers.NTEventLogHandler("%s %s" % (release.description, release.version))
        else:
            handler = logging.handlers.SysLogHandler('/dev/log')
        log_format = '%s %s' % (release.description, release.version) \
                + ':%(dbname)s:%(levelname)s:%(name)s:%(message)s'

    elif tools.config['logfile']:
        # LogFile Handler
        logf = tools.config['logfile']
        try:
            # create the log directory if it does not exist yet
            dirname = os.path.dirname(logf)
            if dirname and not os.path.isdir(dirname):
                os.makedirs(dirname)
            if tools.config['logrotate'] is not False:
                handler = logging.handlers.TimedRotatingFileHandler(logf, 'D', 1, 30)
            elif os.name == 'posix':
                handler = logging.handlers.WatchedFileHandler(logf)
            else:
                handler = logging.handlers.FileHandler(logf)
        except Exception:
            sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n")
            handler = logging.StreamHandler(sys.stdout)
    else:
        # Normal Handler on standard output
        handler = logging.StreamHandler(sys.stdout)

    # colorize only when the stream is a real terminal
    if isinstance(handler, logging.StreamHandler) and os.isatty(handler.stream.fileno()):
        formatter = ColoredFormatter(log_format)
    else:
        formatter = DBFormatter(log_format)
    handler.setFormatter(formatter)

    # add the handler to the root logger
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(int(tools.config['log_level'] or '0'))
# An alternative logging scheme for automated runs of the
# server intended to test it.
def init_alternative_logger():
    """Install a minimal logging setup for automated test runs.

    Adds a handler that writes any record above level 20 (WARNING and up)
    to stdout, and silences the rest by setting the root level to ERROR.
    """
    class H(logging.Handler):
        def emit(self, record):
            if record.levelno > 20:
                # equivalent to the old Python 2 ``print a, b, c`` statement
                sys.stdout.write("%s %s %s\n" % (record.levelno, record.pathname, record.msg))
    handler = H()
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(logging.ERROR)
class Agent(object):
    """Singleton that keeps track of cancellable tasks to run at a given
       timestamp.

       The tasks are characterised by:

            * a timestamp
            * the database on which the task runs
            * the function to call
            * the arguments and keyword arguments to pass to the function

       Implementation details:
         - Tasks are stored as lists, allowing cancellation by setting
           the timestamp to 0.
         - A heapq is used to store tasks, so we don't need to sort
           tasks ourselves.

       NOTE(review): the ``class Agent(object):`` header was missing from
       this mangled copy and has been restored.
    """
    # min-heap of [timestamp, db_name, function, args, kwargs] lists
    __tasks = []
    # same task lists, grouped by database name for cancellation
    __tasks_by_db = {}
    _logger = logging.getLogger('netsvc.agent')

    @classmethod
    def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
        """Schedule *function* to run at *timestamp* for database *db_name*."""
        task = [timestamp, db_name, function, args, kwargs]
        heapq.heappush(cls.__tasks, task)
        cls.__tasks_by_db.setdefault(db_name, []).append(task)

    @classmethod
    def cancel(cls, db_name):
        """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
        cls._logger.debug("Cancel timers for %s db", db_name or 'all')
        if db_name is None:
            cls.__tasks, cls.__tasks_by_db = [], {}
        else:
            if db_name in cls.__tasks_by_db:
                for task in cls.__tasks_by_db[db_name]:
                    # a null timestamp marks the task as cancelled
                    task[0] = 0

    @classmethod
    def quit(cls):
        cls.cancel(None)

    @classmethod
    def runner(cls):
        """Neverending function (intended to be ran in a dedicated thread) that
           checks every 60 seconds tasks to run. TODO: make configurable
        """
        current_thread = threading.currentThread()
        while True:
            while cls.__tasks and cls.__tasks[0][0] < time.time():
                task = heapq.heappop(cls.__tasks)
                timestamp, dbname, function, args, kwargs = task
                cls.__tasks_by_db[dbname].remove(task)
                if not timestamp:
                    # null timestamp -> cancelled task
                    continue
                current_thread.dbname = dbname   # hack hack
                cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs)
                delattr(current_thread, 'dbname')
                task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
                # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
                task_thread.setDaemon(False)
                task_thread.start()
                time.sleep(1)
            time.sleep(60)
# Launch the Agent task runner at import time. NOTE(review): the
# ``agent_runner.start()`` call was missing from this mangled copy; without
# it the runner thread is created but never runs.
agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
# the agent runner is a typical daemon thread, that will never quit and must be
# terminated when the main process exits - with no consequence (the processing
# threads it spawns are not marked daemon)
agent_runner.setDaemon(True)
agent_runner.start()
class Server(object):
    """ Generic interface for all servers with an event loop etc.
        Override this to implement http, net-rpc etc. servers.

        Servers here must have threaded behaviour. start() must not block.

        NOTE(review): the ``class Server`` header was missing from this
        mangled copy and has been restored.
    """
    __is_started = False
    # all instantiated servers, started/stopped together
    __servers = []
    # Timer threads created for servers instantiated after startAll()
    __starter_threads = []

    # we don't want blocking server calls (think select()) to
    # wait forever and possibly prevent exiting the process,
    # but instead we want a form of polling/busy_wait pattern, where
    # _server_timeout should be used as the default timeout for
    # all I/O blocking operations
    _busywait_timeout = 0.5

    __logger = logging.getLogger('server')

    def __init__(self):
        Server.__servers.append(self)
        if Server.__is_started:
            # raise Exception('All instances of servers must be inited before the startAll()')
            # Since the startAll() won't be called again, allow this server to
            # init and then start it after 1sec (hopefully). Register that
            # timer thread in a list, so that we can abort the start if quitAll
            # is called in the meantime
            t = threading.Timer(1.0, self._late_start)
            t.name = 'Late start timer for %s' % str(self.__class__)
            Server.__starter_threads.append(t)
            t.start()

    def start(self):
        self.__logger.debug("called stub Server.start")

    def _late_start(self):
        self.start()
        # iterate over a snapshot: removing from the list we iterate would
        # silently skip entries
        for thr in Server.__starter_threads[:]:
            if thr.finished.is_set():
                Server.__starter_threads.remove(thr)

    def stop(self):
        self.__logger.debug("called stub Server.stop")

    def stats(self):
        """ This function should return statistics about the server """
        return "%s: No statistics" % str(self.__class__)

    @classmethod
    def startAll(cls):
        cls.__logger.info("Starting %d services" % len(cls.__servers))
        for srv in cls.__servers:
            srv.start()
        cls.__is_started = True

    @classmethod
    def quitAll(cls):
        if not cls.__is_started:
            return
        cls.__logger.info("Stopping %d services" % len(cls.__servers))
        # snapshot: we remove entries while walking the list
        for thr in cls.__starter_threads[:]:
            if not thr.finished.is_set():
                thr.cancel()
            cls.__starter_threads.remove(thr)
        for srv in cls.__servers:
            srv.stop()
        cls.__is_started = False

    @classmethod
    def allStats(cls):
        """Return a human-readable status line plus one line per server."""
        res = ["Servers %s" % ('stopped', 'started')[cls.__is_started]]
        res.extend(srv.stats() for srv in cls.__servers)
        return '\n'.join(res)

    def _close_socket(self):
        close_socket(self.socket)
class OpenERPDispatcherException(Exception):
    """Carries an exception raised during RPC dispatch together with its
    rendered traceback text, so transports can ship both to the client.
    """
    def __init__(self, exception, traceback):
        # keep both the original exception object and the traceback text
        self.traceback = traceback
        self.exception = exception
def replace_request_password(args):
    """Return *args* with the password argument masked for logging.

    NOTE(review): the body was missing from this mangled copy and has been
    restored - when masking, a new list is returned, never mutating the
    caller's sequence.
    """
    # password is always 3rd argument in a request, we replace it in RPC logs
    # so it's easier to forward logs for diagnostics/debugging purposes...
    if len(args) > 2:
        args = list(args)
        args[2] = '*'
    return args
def log(title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
    """Pretty-print *msg* to the *title* logger on the given channel.

    *fn* is prepended to the first line; continuation lines are indented
    by the width of *fn* so multi-line pformat output stays aligned.
    *depth* bounds pformat nesting. Does nothing when the channel is
    disabled for that logger.
    """
    logger = logging.getLogger(title)
    if logger.isEnabledFor(channel):
        indent = ''
        indent_after = ' ' * len(fn)
        for line in (fn + pformat(msg, depth=depth)).split('\n'):
            logger.log(channel, indent + line)
            # only the first emitted line carries the fn prefix
            indent = indent_after
class OpenERPDispatcher:
    """Mixin for RPC transports: logs the request (with the password
    masked), routes it to the matching ExportService and wraps any failure,
    together with its formatted traceback, into OpenERPDispatcherException.

    NOTE(review): the try/except scaffolding of dispatch() was missing from
    this mangled copy and has been restored.
    """
    def log(self, title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
        log(title, msg, channel=channel, depth=depth, fn=fn)

    def dispatch(self, service_name, method, params):
        try:
            logger = logging.getLogger('result')
            # log full params only when the answer channel is enabled
            self.log('service', tuple(replace_request_password(params)), depth=(None if logger.isEnabledFor(logging.DEBUG_RPC_ANSWER) else 1), fn='%s.%s'%(service_name,method))
            auth = getattr(self, 'auth_provider', None)
            result = ExportService.getService(service_name).dispatch(method, auth, params)
            self.log('result', result, channel=logging.DEBUG_RPC_ANSWER)
            return result
        except Exception as e:
            self.log('exception', tools.exception_to_unicode(e))
            # prefer a traceback the exception itself carries, if any
            tb = getattr(e, 'traceback', sys.exc_info())
            tb_s = cgitb.text(tb)
            if tools.config['debug_mode']:
                import pdb
                pdb.post_mortem(tb[2])
            raise OpenERPDispatcherException(e, tb_s)
448 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: