2 # -*- coding: utf-8 -*-
3 ##############################################################################
5 # OpenERP, Open Source Management Solution
6 # Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU Affero General Public License as
10 # published by the Free Software Foundation, either version 3 of the
11 # License, or (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Affero General Public License for more details.
18 # You should have received a copy of the GNU Affero General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 ##############################################################################
26 import logging.handlers
35 from pprint import pformat
37 # TODO modules that import netsvc only for things from loglevels must be changed to use loglevels.
38 from loglevels import *
def close_socket(sock):
    """ Closes a socket instance cleanly.

    :param sock: the network socket to close
    :type sock: socket.socket
    """
    try:
        sock.shutdown(socket.SHUT_RDWR)
    except socket.error as e:
        # On OSX, socket shutdowns both sides if any side closes it
        # causing an error 57 'Socket is not connected' on shutdown
        # of the other side (or something), see
        # http://bugs.python.org/issue4397
        # note: stdlib fixed test, not behavior
        if e.errno != errno.ENOTCONN or platform.system() != 'Darwin':
            raise
    sock.close()
61 #.apidoc title: Common Services: netsvc
62 #.apidoc module-mods: member-order: bysource
class Service(object):
    """ Base class for *Local* services.

    Functionality here is trusted, no authentication.
    """
    # Registry of all local services, keyed by service name.
    _services = {}

    def __init__(self, name, audience=''):
        """ Register this service in the class-level registry.

        :param name: unique name of the service
        :param audience: unused, kept for backward compatibility
        """
        Service._services[name] = self
        self.__name = name
        self._methods = {}

    def joinGroup(self, name):
        # Groups are not supported for local services.
        raise Exception("No group for local services")
        #GROUPS.setdefault(name, {})[self.__name] = self

    @classmethod
    def exists(cls, name):
        """ Return True if a service is registered under ``name``. """
        return name in cls._services

    @classmethod
    def remove(cls, name):
        """ Unregister the service ``name`` (no-op when absent). """
        if cls.exists(name):
            cls._services.pop(name)

    def exportMethod(self, method):
        """ Expose the callable ``method`` through this service. """
        if callable(method):
            self._methods[method.__name__] = method

    def abortResponse(self, error, description, origin, details):
        # In debug mode, re-raise the original exception instead of a
        # generic one so the full context is preserved.
        if not tools.config['debug_mode']:
            raise Exception("%s -- %s\n\n%s"%(origin, description, details))
        else:
            raise
class LocalService(object):
    """ Proxy for local services.

    Any instance of this class will behave like the single registered
    instance of ``Service(name)``: the service's exported methods are
    bound onto the proxy at construction time.
    """
    __logger = logging.getLogger('service')

    def __init__(self, name):
        """ Bind the exported methods of the service registered as ``name``.

        :param name: name of a previously registered local service
        :raise KeyError: when no such service is registered (also logged)
        """
        self.__name = name
        try:
            self._service = Service._services[name]
            for method_name, method_definition in self._service._methods.items():
                setattr(self, method_name, method_definition)
        except KeyError as keyError:
            self.__logger.error('This service does not exist: %s' % (str(keyError),) )
            raise

    def __call__(self, method, *params):
        # Convenience: proxy('method', a, b) == proxy.method(a, b)
        return getattr(self, method)(*params)
class ExportService(object):
    """ Proxy for exported services.

    All methods here should take an AuthProxy as their first parameter. It
    will be appended by the calling framework.

    Note that this class has no direct proxy, capable of calling
    eservice.method(). Rather, the proxy should call
    dispatch(method, auth, params).
    """
    # Registry of exported services and service groups, keyed by name.
    _services = {}
    _groups = {}
    _logger = logging.getLogger('web-services')

    def __init__(self, name, audience=''):
        """ Register this exported service under ``name``.

        :param name: unique name of the service
        :param audience: unused, kept for backward compatibility
        """
        ExportService._services[name] = self
        self.__name = name
        self._logger.debug("Registered an exported service: %s" % name)

    def joinGroup(self, name):
        """ Add this service to the group ``name``. """
        ExportService._groups.setdefault(name, {})[self.__name] = self

    @classmethod
    def getService(cls, name):
        """ Return the service registered as ``name`` (KeyError if absent). """
        return cls._services[name]

    def dispatch(self, method, auth, params):
        # Subclasses must override this to actually serve requests.
        raise Exception("stub dispatch at %s" % self.__name)

    def new_dispatch(self, method, auth, params):
        # Subclasses must override this to actually serve requests.
        raise Exception("stub dispatch at %s" % self.__name)

    def abortResponse(self, error, description, origin, details):
        # In debug mode, re-raise the original exception instead of a
        # generic one so the full context is preserved.
        if not tools.config['debug_mode']:
            raise Exception("%s -- %s\n\n%s"%(origin, description, details))
        else:
            raise
# ANSI color indices. The background is set with 40 plus the number of
# the color, and the foreground with 30.
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, _NOTHING, DEFAULT = range(10)
# These are the escape sequences needed to get colored output.
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
# Template expanding to "<fg-seq><bg-seq>%s<reset-seq>".
COLOR_PATTERN = "%s%s%%s%s" % (COLOR_SEQ, COLOR_SEQ, RESET_SEQ)
# Log level -> (foreground, background) colors. The DEBUG_SQL / DEBUG_RPC /
# DEBUG_RPC_ANSWER / TEST levels are custom levels injected into the
# ``logging`` module by ``loglevels`` (imported above with ``*``).
LEVEL_COLOR_MAPPING = {
    logging.DEBUG_SQL: (WHITE, MAGENTA),
    logging.DEBUG_RPC: (BLUE, WHITE),
    logging.DEBUG_RPC_ANSWER: (BLUE, WHITE),
    logging.DEBUG: (BLUE, DEFAULT),
    logging.INFO: (GREEN, DEFAULT),
    logging.TEST: (WHITE, BLUE),
    logging.WARNING: (YELLOW, DEFAULT),
    logging.ERROR: (RED, DEFAULT),
    logging.CRITICAL: (WHITE, RED),
}
class DBFormatter(logging.Formatter):
    """ Formatter that exposes the current thread's database name on every
    record as ``dbname`` ('?' when the thread carries no such attribute). """
    def format(self, record):
        current = threading.currentThread()
        record.dbname = getattr(current, 'dbname', '?')
        return logging.Formatter.format(self, record)
class ColoredFormatter(DBFormatter):
    """ DBFormatter variant that wraps the record's level name in the ANSI
    color escape sequences selected from LEVEL_COLOR_MAPPING. """
    def format(self, record):
        foreground, background = LEVEL_COLOR_MAPPING[record.levelno]
        colored = COLOR_PATTERN % (30 + foreground, 40 + background, record.levelname)
        record.levelname = colored
        return DBFormatter.format(self, record)
def init_logger():
    """ Configure the root logger according to ``tools.config``: syslog /
    NT event log, a (possibly rotated) log file, or standard output, with
    colored output when attached to a real tty.
    """
    from tools.translate import resetlocale
    resetlocale()

    # create a format for log messages and dates
    format = '[%(asctime)s][%(dbname)s] %(levelname)s:%(name)s:%(message)s'

    if tools.config['syslog']:
        # SysLog Handler (NT event log on Windows)
        if os.name == 'nt':
            handler = logging.handlers.NTEventLogHandler("%s %s" % (release.description, release.version))
        else:
            handler = logging.handlers.SysLogHandler('/dev/log')
        format = '%s %s' % (release.description, release.version) \
                + ':%(dbname)s:%(levelname)s:%(name)s:%(message)s'

    elif tools.config['logfile']:
        # LogFile Handler
        logf = tools.config['logfile']
        try:
            # Make sure the directory holding the log file exists.
            dirname = os.path.dirname(logf)
            if dirname and not os.path.isdir(dirname):
                os.makedirs(dirname)
            if tools.config['logrotate'] is not False:
                # Rotate daily, keep 30 days of history.
                handler = logging.handlers.TimedRotatingFileHandler(logf, 'D', 1, 30)
            elif os.name == 'posix':
                # Survives external log rotation (reopens on inode change).
                handler = logging.handlers.WatchedFileHandler(logf)
            else:
                handler = logging.handlers.FileHandler(logf)
        except Exception:
            sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n")
            handler = logging.StreamHandler(sys.stdout)
    else:
        # Normal Handler on standard output
        handler = logging.StreamHandler(sys.stdout)

    # Only colorize when writing to a real tty.
    if isinstance(handler, logging.StreamHandler) and os.isatty(handler.stream.fileno()):
        formatter = ColoredFormatter(format)
    else:
        formatter = DBFormatter(format)
    handler.setFormatter(formatter)

    # add the handler to the root logger
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(int(tools.config['log_level'] or '0'))
# An alternative logging scheme for automated runs of the
# server, intended to test it.
def init_alternative_logger():
    """ Install a minimal root logging setup for automated test runs:
    records above INFO (levelno > 20) are printed to standard output,
    and the root level is set to ERROR.
    """
    class H(logging.Handler):
        def emit(self, record):
            if record.levelno > 20:
                # Portable across Python 2/3; same space-separated output.
                print("%s %s %s" % (record.levelno, record.pathname, record.msg))
    # The visible source used ``handler`` without defining it; instantiate
    # the handler before attaching it.
    handler = H()
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(logging.ERROR)
249 """ Singleton that keeps track of cancellable tasks to run at a given
252 The tasks are characterised by:
255 * the database on which the task run
256 * a boolean attribute specifying if the task is canceled
258 Implementation details:
260 - Tasks are stored as list, allowing the cancellation by setting
262 - A heapq is used to store tasks, so we don't need to sort
267 _logger = logging.getLogger('netsvc.agent')
270 def cancel(cls, db_name):
271 """ Cancel next wakeup for a given database. """
272 cls._logger.debug("Cancel next wake-up for database '%s'.", db_name)
273 if db_name in cls._wakeup_by_db:
274 cls._wakeup_by_db[db_name][2] = True
279 cls._wakeup_by_db = {}
282 def schedule_in_advance(cls, timestamp, db_name):
285 # Cancel the previous wakeup if any.
287 if db_name in cls._wakeup_by_db:
288 task = cls._wakeup_by_db[db_name]
289 if task[2] or timestamp < task[0]:
295 print ">>> rescheduled earlier", timestamp
296 task = [timestamp, db_name, False]
297 heapq.heappush(cls._wakeups, task)
298 cls._wakeup_by_db[db_name] = task
302 """Neverending function (intended to be ran in a dedicated thread) that
303 checks every 60 seconds tasks to run. TODO: make configurable
306 print ">>>>> cron for"
307 while cls._wakeups and cls._wakeups[0][0] < time.time():
308 task = heapq.heappop(cls._wakeups)
309 timestamp, db_name, canceled = task
310 del cls._wakeup_by_db[db_name]
313 ir_cron = openerp.pooler.get_pool(db_name).get('ir.cron')
# Background thread that executes Agent.runner to process scheduled
# cron wake-ups.
agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
# the agent runner is a typical daemon thread, that will never quit and must be
# terminated when the main process exits - with no consequence (the processing
# threads it spawns are not marked daemon)
agent_runner.setDaemon(True)
# NOTE(review): no agent_runner.start() call is visible in this excerpt —
# confirm the thread is actually started elsewhere in the file.
328 """ Generic interface for all servers with an event loop etc.
329 Override this to impement http, net-rpc etc. servers.
331 Servers here must have threaded behaviour. start() must not block,
336 __starter_threads = []
338 # we don't want blocking server calls (think select()) to
339 # wait forever and possibly prevent exiting the process,
340 # but instead we want a form of polling/busy_wait pattern, where
341 # _server_timeout should be used as the default timeout for
342 # all I/O blocking operations
343 _busywait_timeout = 0.5
346 __logger = logging.getLogger('server')
349 Server.__servers.append(self)
350 if Server.__is_started:
351 # raise Exception('All instances of servers must be inited before the startAll()')
352 # Since the startAll() won't be called again, allow this server to
353 # init and then start it after 1sec (hopefully). Register that
354 # timer thread in a list, so that we can abort the start if quitAll
355 # is called in the meantime
356 t = threading.Timer(1.0, self._late_start)
357 t.name = 'Late start timer for %s' % str(self.__class__)
358 Server.__starter_threads.append(t)
362 self.__logger.debug("called stub Server.start")
364 def _late_start(self):
366 for thr in Server.__starter_threads:
367 if thr.finished.is_set():
368 Server.__starter_threads.remove(thr)
371 self.__logger.debug("called stub Server.stop")
374 """ This function should return statistics about the server """
375 return "%s: No statistics" % str(self.__class__)
381 cls.__logger.info("Starting %d services" % len(cls.__servers))
382 for srv in cls.__servers:
384 cls.__is_started = True
388 if not cls.__is_started:
390 cls.__logger.info("Stopping %d services" % len(cls.__servers))
391 for thr in cls.__starter_threads:
392 if not thr.finished.is_set():
394 cls.__starter_threads.remove(thr)
396 for srv in cls.__servers:
398 cls.__is_started = False
402 res = ["Servers %s" % ('stopped', 'started')[cls.__is_started]]
403 res.extend(srv.stats() for srv in cls.__servers)
404 return '\n'.join(res)
406 def _close_socket(self):
407 close_socket(self.socket)
class OpenERPDispatcherException(Exception):
    """ Wraps an exception raised during RPC dispatch together with the
    formatted traceback captured at the point of failure, so the transport
    layer can serialize both back to the client. """
    def __init__(self, exception, traceback):
        self.exception, self.traceback = exception, traceback
def replace_request_password(args):
    """ Return ``args`` with the password masked out.

    The password is always the 3rd argument in a request; we replace it in
    RPC logs so it's easier to forward logs for diagnostics/debugging
    purposes without leaking credentials.

    :param args: sequence of RPC call arguments
    :return: a list copy with args[2] replaced by '*', or ``args``
             unchanged when there are fewer than three arguments
    """
    if len(args) > 2:
        args = list(args)
        args[2] = '*'
    return args
def log(title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
    """ Pretty-print ``msg`` line by line to the logger named ``title``.

    :param title: name of the logger to emit on
    :param msg: object to pretty-print (via ``pprint.pformat``)
    :param channel: log level to emit at (default: the custom DEBUG_RPC
                    level injected by ``loglevels``)
    :param depth: maximum nesting depth passed to ``pformat``
    :param fn: prefix for the first line; continuation lines are padded
               with spaces of the same width so they stay aligned
    """
    logger = logging.getLogger(title)
    if logger.isEnabledFor(channel):
        indent = ''
        indent_after = ' ' * len(fn)
        for line in (fn + pformat(msg, depth=depth)).split('\n'):
            logger.log(channel, indent + line)
            # First line carries ``fn``; subsequent lines get the padding.
            indent = indent_after
class OpenERPDispatcher:
    """ Mixin providing RPC request dispatching with optional debug
    logging of calls, timings, results and exceptions. """

    def log(self, title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
        # Thin wrapper around the module-level log() helper.
        log(title, msg, channel=channel, depth=depth, fn=fn)

    def dispatch(self, service_name, method, params):
        """ Forward the call to the ExportService registered as
        ``service_name`` and return its result.

        Any exception is logged and re-raised wrapped in an
        OpenERPDispatcherException carrying the formatted traceback, so
        transports can serialize it back to the client.
        """
        try:
            auth = getattr(self, 'auth_provider', None)
            logger = logging.getLogger('result')
            start_time = end_time = 0
            if logger.isEnabledFor(logging.DEBUG_RPC_ANSWER):
                # Mask the password (3rd argument) before logging.
                self.log('service', tuple(replace_request_password(params)), depth=None, fn='%s.%s'%(service_name,method))
            if logger.isEnabledFor(logging.DEBUG_RPC):
                start_time = time.time()
            result = ExportService.getService(service_name).dispatch(method, auth, params)
            if logger.isEnabledFor(logging.DEBUG_RPC):
                end_time = time.time()
            if not logger.isEnabledFor(logging.DEBUG_RPC_ANSWER):
                self.log('service (%.3fs)' % (end_time - start_time), tuple(replace_request_password(params)), depth=1, fn='%s.%s'%(service_name,method))
            self.log('execution time', '%.3fs' % (end_time - start_time), channel=logging.DEBUG_RPC_ANSWER)
            self.log('result', result, channel=logging.DEBUG_RPC_ANSWER)
            return result
        except Exception as e:
            self.log('exception', tools.exception_to_unicode(e))
            # Prefer a traceback attached to the exception (e.g. forwarded
            # from a worker) over the current exc_info tuple.
            tb = getattr(e, 'traceback', sys.exc_info())
            tb_s = "".join(traceback.format_exception(*tb))
            if tools.config['debug_mode'] and isinstance(tb, types.TracebackType):
                import pdb
                pdb.post_mortem(tb[2])
            raise OpenERPDispatcherException(e, tb_s)
460 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: