[MERGE] currency symbol position update reports dsh
[odoo/odoo.git] / openerp / netsvc.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 ##############################################################################
4 #
5 #    OpenERP, Open Source Management Solution
6 #    Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 import errno
24 import heapq
25 import logging
26 import logging.handlers
27 import os
28 import platform
29 import release
30 import socket
31 import sys
32 import threading
33 import time
34 import types
35 from pprint import pformat
36
37 # TODO modules that import netsvc only for things from loglevels must be changed to use loglevels.
38 from loglevels import *
39 import tools
40
41 def close_socket(sock):
42     """ Closes a socket instance cleanly
43
44     :param sock: the network socket to close
45     :type sock: socket.socket
46     """
47     try:
48         sock.shutdown(socket.SHUT_RDWR)
49     except socket.error, e:
50         # On OSX, socket shutdowns both sides if any side closes it
51         # causing an error 57 'Socket is not connected' on shutdown
52         # of the other side (or something), see
53         # http://bugs.python.org/issue4397
54         # note: stdlib fixed test, not behavior
55         if e.errno != errno.ENOTCONN or platform.system() != 'Darwin':
56             raise
57     sock.close()
58
59
60 #.apidoc title: Common Services: netsvc
61 #.apidoc module-mods: member-order: bysource
62
63 class Service(object):
64     """ Base class for *Local* services
65
66         Functionality here is trusted, no authentication.
67     """
68     _services = {}
69     def __init__(self, name, audience=''):
70         Service._services[name] = self
71         self.__name = name
72         self._methods = {}
73
74     def joinGroup(self, name):
75         raise Exception("No group for local services")
76         #GROUPS.setdefault(name, {})[self.__name] = self
77
78     @classmethod
79     def exists(cls, name):
80         return name in cls._services
81
82     @classmethod
83     def remove(cls, name):
84         if cls.exists(name):
85             cls._services.pop(name)
86
87     def exportMethod(self, method):
88         if callable(method):
89             self._methods[method.__name__] = method
90
91     def abortResponse(self, error, description, origin, details):
92         if not tools.config['debug_mode']:
93             raise Exception("%s -- %s\n\n%s"%(origin, description, details))
94         else:
95             raise
96
97 class LocalService(object):
98     """ Proxy for local services.
99
100         Any instance of this class will behave like the single instance
101         of Service(name)
102     """
103     __logger = logging.getLogger('service')
104     def __init__(self, name):
105         self.__name = name
106         try:
107             self._service = Service._services[name]
108             for method_name, method_definition in self._service._methods.items():
109                 setattr(self, method_name, method_definition)
110         except KeyError, keyError:
111             self.__logger.error('This service does not exist: %s' % (str(keyError),) )
112             raise
113
114     def __call__(self, method, *params):
115         return getattr(self, method)(*params)
116
117 class ExportService(object):
118     """ Proxy for exported services.
119
120     All methods here should take an AuthProxy as their first parameter. It
121     will be appended by the calling framework.
122
123     Note that this class has no direct proxy, capable of calling
124     eservice.method(). Rather, the proxy should call
125     dispatch(method,auth,params)
126     """
127
128     _services = {}
129     _groups = {}
130     _logger = logging.getLogger('web-services')
131
132     def __init__(self, name, audience=''):
133         ExportService._services[name] = self
134         self.__name = name
135         self._logger.debug("Registered an exported service: %s" % name)
136
137     def joinGroup(self, name):
138         ExportService._groups.setdefault(name, {})[self.__name] = self
139
140     @classmethod
141     def getService(cls,name):
142         return cls._services[name]
143
144     def dispatch(self, method, auth, params):
145         raise Exception("stub dispatch at %s" % self.__name)
146
147     def new_dispatch(self,method,auth,params):
148         raise Exception("stub dispatch at %s" % self.__name)
149
150     def abortResponse(self, error, description, origin, details):
151         if not tools.config['debug_mode']:
152             raise Exception("%s -- %s\n\n%s"%(origin, description, details))
153         else:
154             raise
155
156 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, _NOTHING, DEFAULT = range(10)
157 #The background is set with 40 plus the number of the color, and the foreground with 30
158 #These are the sequences need to get colored ouput
159 RESET_SEQ = "\033[0m"
160 COLOR_SEQ = "\033[1;%dm"
161 BOLD_SEQ = "\033[1m"
162 COLOR_PATTERN = "%s%s%%s%s" % (COLOR_SEQ, COLOR_SEQ, RESET_SEQ)
163 LEVEL_COLOR_MAPPING = {
164     logging.DEBUG_SQL: (WHITE, MAGENTA),
165     logging.DEBUG_RPC: (BLUE, WHITE),
166     logging.DEBUG_RPC_ANSWER: (BLUE, WHITE),
167     logging.DEBUG: (BLUE, DEFAULT),
168     logging.INFO: (GREEN, DEFAULT),
169     logging.TEST: (WHITE, BLUE),
170     logging.WARNING: (YELLOW, DEFAULT),
171     logging.ERROR: (RED, DEFAULT),
172     logging.CRITICAL: (WHITE, RED),
173 }
174
175 class DBFormatter(logging.Formatter):
176     def format(self, record):
177         record.dbname = getattr(threading.currentThread(), 'dbname', '?')
178         return logging.Formatter.format(self, record)
179
180 class ColoredFormatter(DBFormatter):
181     def format(self, record):
182         fg_color, bg_color = LEVEL_COLOR_MAPPING[record.levelno]
183         record.levelname = COLOR_PATTERN % (30 + fg_color, 40 + bg_color, record.levelname)
184         return DBFormatter.format(self, record)
185
186 def init_logger():
187     from tools.translate import resetlocale
188     resetlocale()
189
190     # create a format for log messages and dates
191     format = '[%(asctime)s][%(dbname)s] %(levelname)s:%(name)s:%(message)s'
192
193     if tools.config['syslog']:
194         # SysLog Handler
195         if os.name == 'nt':
196             handler = logging.handlers.NTEventLogHandler("%s %s" % (release.description, release.version))
197         else:
198             handler = logging.handlers.SysLogHandler('/dev/log')
199         format = '%s %s' % (release.description, release.version) \
200                 + ':%(dbname)s:%(levelname)s:%(name)s:%(message)s'
201
202     elif tools.config['logfile']:
203         # LogFile Handler
204         logf = tools.config['logfile']
205         try:
206             dirname = os.path.dirname(logf)
207             if dirname and not os.path.isdir(dirname):
208                 os.makedirs(dirname)
209             if tools.config['logrotate'] is not False:
210                 handler = logging.handlers.TimedRotatingFileHandler(logf,'D',1,30)
211             elif os.name == 'posix':
212                 handler = logging.handlers.WatchedFileHandler(logf)
213             else:
214                 handler = logging.handlers.FileHandler(logf)
215         except Exception:
216             sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n")
217             handler = logging.StreamHandler(sys.stdout)
218     else:
219         # Normal Handler on standard output
220         handler = logging.StreamHandler(sys.stdout)
221
222     if isinstance(handler, logging.StreamHandler) and os.isatty(handler.stream.fileno()):
223         formatter = ColoredFormatter(format)
224     else:
225         formatter = DBFormatter(format)
226     handler.setFormatter(formatter)
227
228     # add the handler to the root logger
229     logger = logging.getLogger()
230     logger.handlers = []
231     logger.addHandler(handler)
232     logger.setLevel(int(tools.config['log_level'] or '0'))
233
234 # A alternative logging scheme for automated runs of the
235 # server intended to test it.
236 def init_alternative_logger():
237     class H(logging.Handler):
238       def emit(self, record):
239         if record.levelno > 20:
240           print record.levelno, record.pathname, record.msg
241     handler = H()
242     logger = logging.getLogger()
243     logger.handlers = []
244     logger.addHandler(handler)
245     logger.setLevel(logging.ERROR)
246
247 class Agent(object):
248     """ Singleton that keeps track of cancellable tasks to run at a given
249         timestamp.
250
251         The tasks are characterised by:
252
253             * a timestamp
254             * the database on which the task run
255             * the function to call
256             * the arguments and keyword arguments to pass to the function
257
258         Implementation details:
259
260           - Tasks are stored as list, allowing the cancellation by setting
261             the timestamp to 0.
262           - A heapq is used to store tasks, so we don't need to sort
263             tasks ourself.
264     """
265     __tasks = []
266     __tasks_by_db = {}
267     _logger = logging.getLogger('netsvc.agent')
268
269     @classmethod
270     def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
271         task = [timestamp, db_name, function, args, kwargs]
272         heapq.heappush(cls.__tasks, task)
273         cls.__tasks_by_db.setdefault(db_name, []).append(task)
274
275     @classmethod
276     def cancel(cls, db_name):
277         """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
278         cls._logger.debug("Cancel timers for %s db", db_name or 'all')
279         if db_name is None:
280             cls.__tasks, cls.__tasks_by_db = [], {}
281         else:
282             if db_name in cls.__tasks_by_db:
283                 for task in cls.__tasks_by_db[db_name]:
284                     task[0] = 0
285
286     @classmethod
287     def quit(cls):
288         cls.cancel(None)
289
290     @classmethod
291     def runner(cls):
292         """Neverending function (intended to be ran in a dedicated thread) that
293            checks every 60 seconds tasks to run. TODO: make configurable
294         """
295         current_thread = threading.currentThread()
296         while True:
297             while cls.__tasks and cls.__tasks[0][0] < time.time():
298                 task = heapq.heappop(cls.__tasks)
299                 timestamp, dbname, function, args, kwargs = task
300                 cls.__tasks_by_db[dbname].remove(task)
301                 if not timestamp:
302                     # null timestamp -> cancelled task
303                     continue
304                 current_thread.dbname = dbname   # hack hack
305                 cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs)
306                 delattr(current_thread, 'dbname')
307                 task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
308                 # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
309                 task_thread.setDaemon(False)
310                 task_thread.start()
311                 time.sleep(1)
312             time.sleep(60)
313
314 def start_agent():
315     agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
316     # the agent runner is a typical daemon thread, that will never quit and must be
317     # terminated when the main process exits - with no consequence (the processing
318     # threads it spawns are not marked daemon)
319     agent_runner.setDaemon(True)
320     agent_runner.start()
321
322 import traceback
323
324 class Server:
325     """ Generic interface for all servers with an event loop etc.
326         Override this to impement http, net-rpc etc. servers.
327
328         Servers here must have threaded behaviour. start() must not block,
329         there is no run().
330     """
331     __is_started = False
332     __servers = []
333     __starter_threads = []
334
335     # we don't want blocking server calls (think select()) to
336     # wait forever and possibly prevent exiting the process,
337     # but instead we want a form of polling/busy_wait pattern, where
338     # _server_timeout should be used as the default timeout for
339     # all I/O blocking operations
340     _busywait_timeout = 0.5
341
342
343     __logger = logging.getLogger('server')
344
345     def __init__(self):
346         Server.__servers.append(self)
347         if Server.__is_started:
348             # raise Exception('All instances of servers must be inited before the startAll()')
349             # Since the startAll() won't be called again, allow this server to
350             # init and then start it after 1sec (hopefully). Register that
351             # timer thread in a list, so that we can abort the start if quitAll
352             # is called in the meantime
353             t = threading.Timer(1.0, self._late_start)
354             t.name = 'Late start timer for %s' % str(self.__class__)
355             Server.__starter_threads.append(t)
356             t.start()
357
358     def start(self):
359         self.__logger.debug("called stub Server.start")
360
361     def _late_start(self):
362         self.start()
363         for thr in Server.__starter_threads:
364             if thr.finished.is_set():
365                 Server.__starter_threads.remove(thr)
366
367     def stop(self):
368         self.__logger.debug("called stub Server.stop")
369
370     def stats(self):
371         """ This function should return statistics about the server """
372         return "%s: No statistics" % str(self.__class__)
373
374     @classmethod
375     def startAll(cls):
376         if cls.__is_started:
377             return
378         cls.__logger.info("Starting %d services" % len(cls.__servers))
379         for srv in cls.__servers:
380             srv.start()
381         cls.__is_started = True
382
383     @classmethod
384     def quitAll(cls):
385         if not cls.__is_started:
386             return
387         cls.__logger.info("Stopping %d services" % len(cls.__servers))
388         for thr in cls.__starter_threads:
389             if not thr.finished.is_set():
390                 thr.cancel()
391             cls.__starter_threads.remove(thr)
392
393         for srv in cls.__servers:
394             srv.stop()
395         cls.__is_started = False
396
397     @classmethod
398     def allStats(cls):
399         res = ["Servers %s" % ('stopped', 'started')[cls.__is_started]]
400         res.extend(srv.stats() for srv in cls.__servers)
401         return '\n'.join(res)
402
403     def _close_socket(self):
404         close_socket(self.socket)
405
406 class OpenERPDispatcherException(Exception):
407     def __init__(self, exception, traceback):
408         self.exception = exception
409         self.traceback = traceback
410
411 def replace_request_password(args):
412     # password is always 3rd argument in a request, we replace it in RPC logs
413     # so it's easier to forward logs for diagnostics/debugging purposes...
414     args = list(args)
415     if len(args) > 2:
416         args[2] = '*'
417     return args
418
419 def log(title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
420     logger = logging.getLogger(title)
421     if logger.isEnabledFor(channel):
422         indent=''
423         indent_after=' '*len(fn)
424         for line in (fn+pformat(msg, depth=depth)).split('\n'):
425             logger.log(channel, indent+line)
426             indent=indent_after
427
428 class OpenERPDispatcher:
429     def log(self, title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
430         log(title, msg, channel=channel, depth=depth, fn=fn)
431     def dispatch(self, service_name, method, params):
432         try:
433             auth = getattr(self, 'auth_provider', None)
434             logger = logging.getLogger('result')
435             start_time = end_time = 0
436             if logger.isEnabledFor(logging.DEBUG_RPC_ANSWER):
437                 self.log('service', tuple(replace_request_password(params)), depth=None, fn='%s.%s'%(service_name,method))
438             if logger.isEnabledFor(logging.DEBUG_RPC):
439                 start_time = time.time()
440             result = ExportService.getService(service_name).dispatch(method, auth, params)
441             if logger.isEnabledFor(logging.DEBUG_RPC):
442                 end_time = time.time()
443             if not logger.isEnabledFor(logging.DEBUG_RPC_ANSWER):
444                 self.log('service (%.3fs)' % (end_time - start_time), tuple(replace_request_password(params)), depth=1, fn='%s.%s'%(service_name,method))
445             self.log('execution time', '%.3fs' % (end_time - start_time), channel=logging.DEBUG_RPC_ANSWER)
446             self.log('result', result, channel=logging.DEBUG_RPC_ANSWER)
447             return result
448         except Exception, e:
449             self.log('exception', tools.exception_to_unicode(e))
450             tb = getattr(e, 'traceback', sys.exc_info())
451             tb_s = "".join(traceback.format_exception(*tb))
452             if tools.config['debug_mode'] and isinstance(tb[2], types.TracebackType):
453                 import pdb
454                 pdb.post_mortem(tb[2])
455             raise OpenERPDispatcherException(e, tb_s)
456
457 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: