[MERGE] OPW 382063 - account fiscal year closing: fix default date in opening entries
[odoo/odoo.git] / bin / netsvc.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 ##############################################################################
4 #
5 #    OpenERP, Open Source Management Solution
6 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). All Rights Reserved
7 #    The refactoring about the OpenSSL support come from Tryton
8 #    Copyright (C) 2007-2009 Cédric Krier.
9 #    Copyright (C) 2007-2009 Bertrand Chenal.
10 #    Copyright (C) 2008 B2CK SPRL.
11 #
12 #    This program is free software: you can redistribute it and/or modify
13 #    it under the terms of the GNU General Public License as published by
14 #    the Free Software Foundation, either version 3 of the License, or
15 #    (at your option) any later version.
16 #
17 #    This program is distributed in the hope that it will be useful,
18 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
19 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 #    GNU General Public License for more details.
21 #
22 #    You should have received a copy of the GNU General Public License
23 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
24 #
25 ##############################################################################
26
27 import errno
28 import logging
29 import logging.handlers
30 import os
31 import socket
32 import sys
33 import threading
34 import time
35 import release
36 from pprint import pformat
37 import warnings
38 import heapq
39
40 class Service(object):
41     """ Base class for *Local* services
42
43         Functionality here is trusted, no authentication.
44     """
45     _services = {}
46     def __init__(self, name, audience=''):
47         Service._services[name] = self
48         self.__name = name
49         self._methods = {}
50
51     def joinGroup(self, name):
52         raise Exception("No group for local services")
53         #GROUPS.setdefault(name, {})[self.__name] = self
54
55     @classmethod
56     def exists(cls, name):
57         return name in cls._services
58
59     @classmethod
60     def remove(cls, name):
61         if cls.exists(name):
62             cls._services.pop(name)
63
64     def exportMethod(self, method):
65         if callable(method):
66             self._methods[method.__name__] = method
67
68     def abortResponse(self, error, description, origin, details):
69         if not tools.config['debug_mode']:
70             raise Exception("%s -- %s\n\n%s"%(origin, description, details))
71         else:
72             raise
73
74 class LocalService(object):
75     """ Proxy for local services. 
76     
77         Any instance of this class will behave like the single instance
78         of Service(name)
79     """
80     __logger = logging.getLogger('service')
81     def __init__(self, name):
82         self.__name = name
83         try:
84             self._service = Service._services[name]
85             for method_name, method_definition in self._service._methods.items():
86                 setattr(self, method_name, method_definition)
87         except KeyError, keyError:
88             self.__logger.error('This service does not exist: %s' % (str(keyError),) )
89             raise
90
91     def __call__(self, method, *params):
92         return getattr(self, method)(*params)
93
94 class ExportService(object):
95     """ Proxy for exported services. 
96
97     All methods here should take an AuthProxy as their first parameter. It
98     will be appended by the calling framework.
99
100     Note that this class has no direct proxy, capable of calling 
101     eservice.method(). Rather, the proxy should call 
102     dispatch(method,auth,params)
103     """
104     
105     _services = {}
106     _groups = {}
107     _logger = logging.getLogger('web-services')
108     
109     def __init__(self, name, audience=''):
110         ExportService._services[name] = self
111         self.__name = name
112         self._logger.debug("Registered an exported service: %s" % name)
113
114     def joinGroup(self, name):
115         ExportService._groups.setdefault(name, {})[self.__name] = self
116
117     @classmethod
118     def getService(cls,name):
119         return cls._services[name]
120
121     def dispatch(self, method, auth, params):
122         raise Exception("stub dispatch at %s" % self.__name)
123         
124     def new_dispatch(self,method,auth,params):
125         raise Exception("stub dispatch at %s" % self.__name)
126
127     def abortResponse(self, error, description, origin, details):
128         if not tools.config['debug_mode']:
129             raise Exception("%s -- %s\n\n%s"%(origin, description, details))
130         else:
131             raise
132
133 LOG_NOTSET = 'notset'
134 LOG_DEBUG_SQL = 'debug_sql'
135 LOG_DEBUG_RPC_ANSWER = 'debug_rpc_answer'
136 LOG_DEBUG_RPC = 'debug_rpc'
137 LOG_DEBUG = 'debug'
138 LOG_TEST = 'test'
139 LOG_INFO = 'info'
140 LOG_WARNING = 'warn'
141 LOG_ERROR = 'error'
142 LOG_CRITICAL = 'critical'
143
144 logging.DEBUG_RPC_ANSWER = logging.DEBUG - 4
145 logging.addLevelName(logging.DEBUG_RPC_ANSWER, 'DEBUG_RPC_ANSWER')
146 logging.DEBUG_RPC = logging.DEBUG - 2
147 logging.addLevelName(logging.DEBUG_RPC, 'DEBUG_RPC')
148 logging.DEBUG_SQL = logging.DEBUG_RPC - 3
149 logging.addLevelName(logging.DEBUG_SQL, 'DEBUG_SQL')
150
151 logging.TEST = logging.INFO - 5
152 logging.addLevelName(logging.TEST, 'TEST')
153
154 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, _NOTHING, DEFAULT = range(10)
155 #The background is set with 40 plus the number of the color, and the foreground with 30
156 #These are the sequences need to get colored ouput
157 RESET_SEQ = "\033[0m"
158 COLOR_SEQ = "\033[1;%dm"
159 BOLD_SEQ = "\033[1m"
160 COLOR_PATTERN = "%s%s%%s%s" % (COLOR_SEQ, COLOR_SEQ, RESET_SEQ)
161 LEVEL_COLOR_MAPPING = {
162     logging.DEBUG_SQL: (WHITE, MAGENTA),
163     logging.DEBUG_RPC: (BLUE, WHITE),
164     logging.DEBUG_RPC_ANSWER: (BLUE, WHITE),
165     logging.DEBUG: (BLUE, DEFAULT),
166     logging.INFO: (GREEN, DEFAULT),
167     logging.TEST: (WHITE, BLUE),
168     logging.WARNING: (YELLOW, DEFAULT),
169     logging.ERROR: (RED, DEFAULT),
170     logging.CRITICAL: (WHITE, RED),
171 }
172
173 class DBFormatter(logging.Formatter):
174     def format(self, record):
175         record.dbname = getattr(threading.currentThread(), 'dbname', '?')
176         return logging.Formatter.format(self, record)
177
178 class ColoredFormatter(DBFormatter):
179     def format(self, record):
180         fg_color, bg_color = LEVEL_COLOR_MAPPING[record.levelno]
181         record.levelname = COLOR_PATTERN % (30 + fg_color, 40 + bg_color, record.levelname)
182         return DBFormatter.format(self, record)
183
184 def init_logger():
185     import os
186     from tools.translate import resetlocale
187     resetlocale()
188
189     # create a format for log messages and dates
190     format = '[%(asctime)s][%(dbname)s] %(levelname)s:%(name)s:%(message)s'
191
192     if tools.config['syslog']:
193         # SysLog Handler
194         if os.name == 'nt':
195             handler = logging.handlers.NTEventLogHandler("%s %s" % (release.description, release.version))
196         else:
197             handler = logging.handlers.SysLogHandler('/dev/log')
198         format = '%s %s' % (release.description, release.version) \
199                 + ':%(dbname)s:%(levelname)s:%(name)s:%(message)s'
200
201     elif tools.config['logfile']:
202         # LogFile Handler
203         logf = tools.config['logfile']
204         try:
205             dirname = os.path.dirname(logf)
206             if dirname and not os.path.isdir(dirname):
207                 os.makedirs(dirname)
208             if tools.config['logrotate'] is not False:
209                 handler = logging.handlers.TimedRotatingFileHandler(logf,'D',1,30)
210             elif os.name == 'posix':
211                 handler = logging.handlers.WatchedFileHandler(logf)
212             else:
213                 handler = logging.handlers.FileHandler(logf)
214         except Exception:
215             sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n")
216             handler = logging.StreamHandler(sys.stdout)
217     else:
218         # Normal Handler on standard output
219         handler = logging.StreamHandler(sys.stdout)
220
221     if isinstance(handler, logging.StreamHandler) and os.isatty(handler.stream.fileno()):
222         formatter = ColoredFormatter(format)
223     else:
224         formatter = DBFormatter(format)
225     handler.setFormatter(formatter)
226
227     # add the handler to the root logger
228     logger = logging.getLogger()
229     logger.addHandler(handler)
230     logger.setLevel(int(tools.config['log_level'] or '0'))
231
232
233 class Logger(object):
234     def __init__(self):
235         warnings.warn("The netsvc.Logger API shouldn't be used anymore, please "
236                       "use the standard `logging.getLogger` API instead",
237                       PendingDeprecationWarning, stacklevel=2)
238         super(Logger, self).__init__()
239
240     def notifyChannel(self, name, level, msg):
241         warnings.warn("notifyChannel API shouldn't be used anymore, please use "
242                       "the standard `logging` module instead",
243                       PendingDeprecationWarning, stacklevel=2)
244         from service.web_services import common
245
246         log = logging.getLogger(tools.ustr(name))
247
248         if level in [LOG_DEBUG_RPC, LOG_TEST] and not hasattr(log, level):
249             fct = lambda msg, *args, **kwargs: log.log(getattr(logging, level.upper()), msg, *args, **kwargs)
250             setattr(log, level, fct)
251
252
253         level_method = getattr(log, level)
254
255         if isinstance(msg, Exception):
256             msg = tools.exception_to_unicode(msg)
257
258         try:
259             msg = tools.ustr(msg).strip()
260             if level in (LOG_ERROR, LOG_CRITICAL) and tools.config.get_misc('debug','env_info',False):
261                 msg = common().exp_get_server_environment() + "\n" + msg
262
263             result = msg.split('\n')
264         except UnicodeDecodeError:
265             result = msg.strip().split('\n')
266         try:
267             if len(result)>1:
268                 for idx, s in enumerate(result):
269                     level_method('[%02d]: %s' % (idx+1, s,))
270             elif result:
271                 level_method(result[0])
272         except IOError:
273             # TODO: perhaps reset the logger streams?
274             #if logrotate closes our files, we end up here..
275             pass
276         except Exception:
277             # better ignore the exception and carry on..
278             pass
279
280     def set_loglevel(self, level, logger=None):
281         if logger is not None:
282             log = logging.getLogger(str(logger))
283         else:
284             log = logging.getLogger()
285         log.setLevel(logging.INFO) # make sure next msg is printed
286         log.info("Log level changed to %s" % logging.getLevelName(level))
287         log.setLevel(level)
288
289     def shutdown(self):
290         logging.shutdown()
291
292 import tools
293 init_logger()
294
295 class Agent(object):
296     """Singleton that keeps track of cancellable tasks to run at a given
297        timestamp.
298        The tasks are caracterised by:
299             * a timestamp
300             * the database on which the task run
301             * the function to call
302             * the arguments and keyword arguments to pass to the function
303
304         Implementation details:
305           Tasks are stored as list, allowing the cancellation by setting
306           the timestamp to 0.
307           A heapq is used to store tasks, so we don't need to sort
308           tasks ourself.
309     """
310     __tasks = []
311     __tasks_by_db = {}
312     _logger = logging.getLogger('netsvc.agent')
313
314     @classmethod
315     def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
316         task = [timestamp, db_name, function, args, kwargs]
317         heapq.heappush(cls.__tasks, task)
318         cls.__tasks_by_db.setdefault(db_name, []).append(task)
319
320     @classmethod
321     def cancel(cls, db_name):
322         """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
323         cls._logger.debug("Cancel timers for %s db", db_name or 'all')
324         if db_name is None:
325             cls.__tasks, cls.__tasks_by_db = [], {}
326         else:
327             if db_name in cls.__tasks_by_db:
328                 for task in cls.__tasks_by_db[db_name]:
329                     task[0] = 0
330
331     @classmethod
332     def quit(cls):
333         cls.cancel(None)
334
335     @classmethod
336     def runner(cls):
337         """Neverending function (intended to be ran in a dedicated thread) that
338            checks every 60 seconds tasks to run. TODO: make configurable
339         """
340         current_thread = threading.currentThread()
341         while True:
342             while cls.__tasks and cls.__tasks[0][0] < time.time():
343                 task = heapq.heappop(cls.__tasks)
344                 timestamp, dbname, function, args, kwargs = task
345                 cls.__tasks_by_db[dbname].remove(task)
346                 if not timestamp:
347                     # null timestamp -> cancelled task
348                     continue
349                 current_thread.dbname = dbname   # hack hack
350                 cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs)
351                 delattr(current_thread, 'dbname')
352                 task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
353                 # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
354                 task_thread.setDaemon(False)
355                 task_thread.start()
356                 time.sleep(1)
357             time.sleep(60)
358
359 agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
360 # the agent runner is a typical daemon thread, that will never quit and must be
361 # terminated when the main process exits - with no consequence (the processing
362 # threads it spawns are not marked daemon)
363 agent_runner.setDaemon(True)
364 agent_runner.start()
365
366 import traceback
367
368 class Server:
369     """ Generic interface for all servers with an event loop etc.
370         Override this to impement http, net-rpc etc. servers.
371
372         Servers here must have threaded behaviour. start() must not block,
373         there is no run().
374     """
375     __is_started = False
376     __servers = []
377     __starter_threads = []
378
379     # we don't want blocking server calls (think select()) to
380     # wait forever and possibly prevent exiting the process,
381     # but instead we want a form of polling/busy_wait pattern, where
382     # _server_timeout should be used as the default timeout for
383     # all I/O blocking operations
384     _busywait_timeout = 0.5
385
386
387     __logger = logging.getLogger('server')
388
389     def __init__(self):
390         Server.__servers.append(self)
391         if Server.__is_started:
392             # raise Exception('All instances of servers must be inited before the startAll()')
393             # Since the startAll() won't be called again, allow this server to
394             # init and then start it after 1sec (hopefully). Register that
395             # timer thread in a list, so that we can abort the start if quitAll
396             # is called in the meantime
397             t = threading.Timer(1.0, self._late_start)
398             t.name = 'Late start timer for %s' % str(self.__class__)
399             Server.__starter_threads.append(t)
400             t.start()
401
402     def start(self):
403         self.__logger.debug("called stub Server.start")
404         
405     def _late_start(self):
406         self.start()
407         for thr in Server.__starter_threads:
408             if thr.finished.is_set():
409                 Server.__starter_threads.remove(thr)
410
411     def stop(self):
412         self.__logger.debug("called stub Server.stop")
413
414     def stats(self):
415         """ This function should return statistics about the server """
416         return "%s: No statistics" % str(self.__class__)
417
418     @classmethod
419     def startAll(cls):
420         if cls.__is_started:
421             return
422         cls.__logger.info("Starting %d services" % len(cls.__servers))
423         for srv in cls.__servers:
424             srv.start()
425         cls.__is_started = True
426
427     @classmethod
428     def quitAll(cls):
429         if not cls.__is_started:
430             return
431         cls.__logger.info("Stopping %d services" % len(cls.__servers))
432         for thr in cls.__starter_threads:
433             if not thr.finished.is_set():
434                 thr.cancel()
435             cls.__starter_threads.remove(thr)
436
437         for srv in cls.__servers:
438             srv.stop()
439         cls.__is_started = False
440
441     @classmethod
442     def allStats(cls):
443         res = ["Servers %s" % ('stopped', 'started')[cls.__is_started]]
444         res.extend(srv.stats() for srv in cls.__servers)
445         return '\n'.join(res)
446
447     def _close_socket(self):
448         if os.name != 'nt':
449             try:
450                 self.socket.shutdown(getattr(socket, 'SHUT_RDWR', 2))
451             except socket.error, e:
452                 if e.errno != errno.ENOTCONN: raise
453                 # OSX, socket shutdowns both sides if any side closes it
454                 # causing an error 57 'Socket is not connected' on shutdown
455                 # of the other side (or something), see
456                 # http://bugs.python.org/issue4397
457                 self.__logger.debug(
458                     '"%s" when shutting down server socket, '
459                     'this is normal under OS X', e)
460         self.socket.close()
461
462 class OpenERPDispatcherException(Exception):
463     def __init__(self, exception, traceback):
464         self.exception = exception
465         self.traceback = traceback
466
467 def replace_request_password(args):
468     # password is always 3rd argument in a request, we replace it in RPC logs
469     # so it's easier to forward logs for diagnostics/debugging purposes...
470     args = list(args)
471     if len(args) > 2:
472         args[2] = '*'
473     return args
474
475 class OpenERPDispatcher:
476     def log(self, title, msg, channel=logging.DEBUG_RPC, depth=None):
477         logger = logging.getLogger(title)
478         if logger.isEnabledFor(channel):
479             for line in pformat(msg, depth=depth).split('\n'):
480                 logger.log(channel, line)
481
482     def dispatch(self, service_name, method, params):
483         try:
484             logger = logging.getLogger('result')
485             self.log('service', service_name)
486             self.log('method', method)
487             self.log('params', replace_request_password(params), depth=(None if logger.isEnabledFor(logging.DEBUG_RPC_ANSWER) else 1))
488             auth = getattr(self, 'auth_provider', None)
489             result = ExportService.getService(service_name).dispatch(method, auth, params)
490             self.log('result', result, channel=logging.DEBUG_RPC_ANSWER)
491             return result
492         except Exception, e:
493             self.log('exception', tools.exception_to_unicode(e))
494             tb = getattr(e, 'traceback', sys.exc_info())
495             tb_s = "".join(traceback.format_exception(*tb))
496             if tools.config['debug_mode']:
497                 import pdb
498                 pdb.post_mortem(tb[2])
499             raise OpenERPDispatcherException(e, tb_s)
500
501 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: