[FIX] base : Index added on ir.property to improve performance
[odoo/odoo.git] / bin / netsvc.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 ##############################################################################
4 #
5 #    OpenERP, Open Source Management Solution
6 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>). All Rights Reserved
7 #    The refactoring about the OpenSSL support come from Tryton
8 #    Copyright (C) 2007-2009 Cédric Krier.
9 #    Copyright (C) 2007-2009 Bertrand Chenal.
10 #    Copyright (C) 2008 B2CK SPRL.
11 #
12 #    This program is free software: you can redistribute it and/or modify
13 #    it under the terms of the GNU General Public License as published by
14 #    the Free Software Foundation, either version 3 of the License, or
15 #    (at your option) any later version.
16 #
17 #    This program is distributed in the hope that it will be useful,
18 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
19 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 #    GNU General Public License for more details.
21 #
22 #    You should have received a copy of the GNU General Public License
23 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
24 #
25 ##############################################################################
26
27 import errno
28 import logging
29 import logging.handlers
30 import os
31 import socket
32 import sys
33 import threading
34 import time
35 import release
36 from pprint import pformat
37 import warnings
38 import heapq
39
class Service(object):
    """Base class for *local* (in-process) services.

    Every instance registers itself in the class-wide ``_services``
    registry under its name.  Calls made through this layer are
    trusted: no authentication is performed.
    """
    _services = {}

    def __init__(self, name, audience=''):
        # `audience` is accepted for interface compatibility but unused.
        Service._services[name] = self
        self.__name = name
        self._methods = {}

    def joinGroup(self, name):
        # Grouping only exists for exported services.
        raise Exception("No group for local services")
        #GROUPS.setdefault(name, {})[self.__name] = self

    @classmethod
    def exists(cls, name):
        """Return True when a service is registered under `name`."""
        return name in cls._services

    @classmethod
    def remove(cls, name):
        """Unregister the service named `name`; no-op when absent."""
        cls._services.pop(name, None)

    def exportMethod(self, method):
        """Expose `method` under its own __name__; non-callables are ignored."""
        if not callable(method):
            return
        self._methods[method.__name__] = method

    def abortResponse(self, error, description, origin, details):
        # Outside debug mode, wrap the failure in a plain Exception;
        # in debug mode re-raise the active exception unchanged.
        if not tools.config['debug_mode']:
            raise Exception("%s -- %s\n\n%s"%(origin, description, details))
        else:
            raise
73
class LocalService(object):
    """ Proxy for local services.

        Any instance of this class behaves like the single registered
        instance of Service(name): the service's exported methods are
        copied onto the proxy so they can be called directly, and the
        proxy itself is callable via proxy(method, *params).
    """
    __logger = logging.getLogger('service')
    def __init__(self, name):
        # Copy each exported method of the underlying Service onto this
        # instance, so attribute access dispatches straight to it.
        self.__name = name
        try:
            self._service = Service._services[name]
            for method_name, method_definition in self._service._methods.items():
                setattr(self, method_name, method_definition)
        except KeyError, keyError:
            # Unknown service name: log it, then let callers see the KeyError.
            self.__logger.error('This service does not exist: %s' % (str(keyError),) )
            raise

    def __call__(self, method, *params):
        # Convenience: proxy('method', a, b) == proxy.method(a, b).
        return getattr(self, method)(*params)
93
class ExportService(object):
    """Base class for services exported over the network.

    All methods exposed here take an AuthProxy as their first
    parameter; it is appended by the calling framework.

    There is no direct proxy capable of calling eservice.method();
    callers must go through dispatch(method, auth, params).
    """

    _services = {}
    _groups = {}
    _logger = logging.getLogger('web-services')

    def __init__(self, name, audience=''):
        # `audience` is accepted for interface compatibility but unused.
        ExportService._services[name] = self
        self.__name = name
        self._logger.debug("Registered an exported service: %s" % name)

    def joinGroup(self, name):
        """Attach this service to the named group, creating it on demand."""
        group = ExportService._groups.setdefault(name, {})
        group[self.__name] = self

    @classmethod
    def getService(cls, name):
        """Return the service registered under `name` (KeyError if absent)."""
        return cls._services[name]

    def dispatch(self, method, auth, params):
        """Stub; concrete services override this to route the call."""
        raise Exception("stub dispatch at %s" % self.__name)

    def new_dispatch(self, method, auth, params):
        """Stub; concrete services override this to route the call."""
        raise Exception("stub dispatch at %s" % self.__name)

    def abortResponse(self, error, description, origin, details):
        # Outside debug mode, wrap the failure in a plain Exception;
        # in debug mode re-raise the active exception unchanged.
        if not tools.config['debug_mode']:
            raise Exception("%s -- %s\n\n%s" % (origin, description, details))
        else:
            raise
132
# Symbolic level names accepted by the legacy Logger.notifyChannel() API.
LOG_NOTSET = 'notset'
LOG_DEBUG_SQL = 'debug_sql'
LOG_DEBUG_RPC = 'debug_rpc'
LOG_DEBUG_RPC_ANSWER = 'debug_rpc_answer'
LOG_DEBUG = 'debug'
LOG_TEST = 'test'
LOG_INFO = 'info'
LOG_WARNING = 'warn'
LOG_ERROR = 'error'
LOG_CRITICAL = 'critical'

# Register custom numeric levels below logging.DEBUG so RPC and SQL
# traffic can be filtered independently of normal debug output.
logging.DEBUG_RPC_ANSWER = logging.DEBUG - 4
logging.addLevelName(logging.DEBUG_RPC_ANSWER, 'DEBUG_RPC_ANSWER')
logging.DEBUG_RPC = logging.DEBUG - 2
logging.addLevelName(logging.DEBUG_RPC, 'DEBUG_RPC')
logging.DEBUG_SQL = logging.DEBUG_RPC - 3
logging.addLevelName(logging.DEBUG_SQL, 'DEBUG_SQL')

# TEST sits between DEBUG and INFO.
logging.TEST = logging.INFO - 5
logging.addLevelName(logging.TEST, 'TEST')

# ANSI color codes (index order matters; _NOTHING/DEFAULT are placeholders).
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, _NOTHING, DEFAULT = range(10)
#The background is set with 40 plus the number of the color, and the foreground with 30
#These are the sequences needed to get colored output
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
BOLD_SEQ = "\033[1m"
# COLOR_PATTERN % (fg_code, bg_code, text) wraps `text` in escape sequences.
COLOR_PATTERN = "%s%s%%s%s" % (COLOR_SEQ, COLOR_SEQ, RESET_SEQ)
# Maps each numeric log level to its (foreground, background) color pair.
LEVEL_COLOR_MAPPING = {
    logging.DEBUG_SQL: (WHITE, MAGENTA),
    logging.DEBUG_RPC: (BLUE, WHITE),
    logging.DEBUG_RPC_ANSWER: (BLUE, WHITE),
    logging.DEBUG: (BLUE, DEFAULT),
    logging.INFO: (GREEN, DEFAULT),
    logging.TEST: (WHITE, BLUE),
    logging.WARNING: (YELLOW, DEFAULT),
    logging.ERROR: (RED, DEFAULT),
    logging.CRITICAL: (WHITE, RED),
}
172
class DBFormatter(logging.Formatter):
    """Log formatter that exposes the current thread's database name.

    The record gains a ``dbname`` attribute (usable as %(dbname)s in
    the format string), taken from the handling thread; '?' when the
    thread carries no database name.
    """

    def format(self, record):
        current = threading.currentThread()
        record.dbname = getattr(current, 'dbname', '?')
        return logging.Formatter.format(self, record)
177
class ColoredFormatter(DBFormatter):
    """DBFormatter variant that wraps the record's level name in ANSI
    color escape codes, per the LEVEL_COLOR_MAPPING table."""

    def format(self, record):
        foreground, background = LEVEL_COLOR_MAPPING[record.levelno]
        colored = COLOR_PATTERN % (30 + foreground, 40 + background, record.levelname)
        record.levelname = colored
        return DBFormatter.format(self, record)
183
def init_logger():
    """Configure the root logger from tools.config.

    Chooses a handler (syslog/NT event log, timed-rotating / watched /
    plain log file, or stdout), attaches a DBFormatter (colored when
    writing to an interactive terminal) and applies the configured level.
    """
    import os
    from tools.translate import resetlocale
    resetlocale()

    # create a format for log messages and dates
    format = '[%(asctime)s][%(dbname)s] %(levelname)s:%(name)s:%(message)s'

    if tools.config['syslog']:
        # SysLog Handler
        if os.name == 'nt':
            handler = logging.handlers.NTEventLogHandler("%s %s" % (release.description, release.version))
        else:
            handler = logging.handlers.SysLogHandler('/dev/log')
        # syslog adds its own timestamp, so prefix messages with the
        # release identification instead of asctime.
        format = '%s %s' % (release.description, release.version) \
                + ':%(dbname)s:%(levelname)s:%(name)s:%(message)s'

    elif tools.config['logfile']:
        # LogFile Handler
        logf = tools.config['logfile']
        try:
            # create the log directory on demand
            dirname = os.path.dirname(logf)
            if dirname and not os.path.isdir(dirname):
                os.makedirs(dirname)
            if tools.config['logrotate'] is not False:
                # daily rotation, keeping up to 30 backup files
                handler = logging.handlers.TimedRotatingFileHandler(logf,'D',1,30)
            elif os.name == 'posix':
                # reopens the file if an external tool (e.g. logrotate) moves it
                handler = logging.handlers.WatchedFileHandler(logf)
            else:
                handler = logging.handlers.FileHandler(logf)
        except Exception, ex:
            # Fall back to stdout rather than aborting startup.
            sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n")
            handler = logging.StreamHandler(sys.stdout)
    else:
        # Normal Handler on standard output
        handler = logging.StreamHandler(sys.stdout)

    # colorize only when the stream is an interactive terminal
    if isinstance(handler, logging.StreamHandler) and os.isatty(handler.stream.fileno()):
        formatter = ColoredFormatter(format)
    else:
        formatter = DBFormatter(format)
    handler.setFormatter(formatter)

    # add the handler to the root logger
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(int(tools.config['log_level'] or '0'))
231
232
class Logger(object):
    """Deprecated logging facade kept for backward compatibility.

    New code should use the standard `logging` module directly; every
    entry point here emits a PendingDeprecationWarning.
    """
    def __init__(self):
        warnings.warn("The netsvc.Logger API shouldn't be used anymore, please "
                      "use the standard `logging.getLogger` API instead",
                      PendingDeprecationWarning, stacklevel=2)
        super(Logger, self).__init__()

    def notifyChannel(self, name, level, msg):
        """Log `msg` on channel `name` at symbolic `level` (a LOG_* string).

        Multi-line messages are logged one line at a time with an
        "[NN]: " prefix; Exceptions are converted to text first.
        """
        warnings.warn("notifyChannel API shouldn't be used anymore, please use "
                      "the standard `logging` module instead",
                      PendingDeprecationWarning, stacklevel=2)
        from service.web_services import common

        log = logging.getLogger(tools.ustr(name))

        # The custom debug_rpc/test levels have no shortcut method on the
        # logger; lazily add one that forwards to log.log().
        if level in [LOG_DEBUG_RPC, LOG_TEST] and not hasattr(log, level):
            fct = lambda msg, *args, **kwargs: log.log(getattr(logging, level.upper()), msg, *args, **kwargs)
            setattr(log, level, fct)


        level_method = getattr(log, level)

        if isinstance(msg, Exception):
            msg = tools.exception_to_unicode(msg)

        try:
            msg = tools.ustr(msg).strip()
            # Optionally prepend server environment info to errors when
            # the [debug] env_info option is enabled.
            if level in (LOG_ERROR, LOG_CRITICAL) and tools.config.get_misc('debug','env_info',False):
                msg = common().exp_get_server_environment() + "\n" + msg

            result = msg.split('\n')
        except UnicodeDecodeError:
            result = msg.strip().split('\n')
        try:
            if len(result)>1:
                for idx, s in enumerate(result):
                    level_method('[%02d]: %s' % (idx+1, s,))
            elif result:
                level_method(result[0])
        except IOError,e:
            # TODO: perhaps reset the logger streams?
            #if logrotate closes our files, we end up here..
            pass
        except:
            # better ignore the exception and carry on..
            # NOTE(review): this bare except silently hides all failures,
            # including programming errors in the logging call itself.
            pass

    def set_loglevel(self, level, logger=None):
        """Change the level of `logger` (root logger when None)."""
        if logger is not None:
            log = logging.getLogger(str(logger))
        else:
            log = logging.getLogger()
        log.setLevel(logging.INFO) # make sure next msg is printed
        log.info("Log level changed to %s" % logging.getLevelName(level))
        log.setLevel(level)

    def shutdown(self):
        """Flush and close all logging handlers."""
        logging.shutdown()
291
# NOTE(review): `tools` is imported here rather than at the top of the
# file, presumably to avoid a circular import -- confirm before moving.
# The logger is initialised as soon as the configuration is importable.
import tools
init_logger()
294
class Agent(object):
    """Singleton that keeps track of cancellable tasks to run at a given
       timestamp.
       The tasks are caracterised by:
            * a timestamp
            * the database on which the task run
            * the function to call
            * the arguments and keyword arguments to pass to the function

        Implementation details:
          Tasks are stored as list, allowing the cancellation by setting
          the timestamp to 0.
          A heapq is used to store tasks, so we don't need to sort
          tasks ourself.
    """
    # Heap of [timestamp, db_name, function, args, kwargs] entries.
    __tasks = []
    # The same task lists indexed by database name, for per-db cancellation.
    __tasks_by_db = {}
    _logger = Logger()

    @classmethod
    def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
        """Schedule `function(*args, **kwargs)` for `db_name` at `timestamp`."""
        # Tasks are mutable lists (not tuples) so cancel() can zero the
        # timestamp in place without rebuilding the heap.
        task = [timestamp, db_name, function, args, kwargs]
        heapq.heappush(cls.__tasks, task)
        cls.__tasks_by_db.setdefault(db_name, []).append(task)

    @classmethod
    def cancel(cls, db_name):
        """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
        if db_name is None:
            cls.__tasks, cls.__tasks_by_db = [], {}
        else:
            if db_name in cls.__tasks_by_db:
                for task in cls.__tasks_by_db[db_name]:
                    # A zero timestamp marks the task cancelled; it is
                    # discarded when it reaches the top of the heap.
                    task[0] = 0

    @classmethod
    def quit(cls):
        cls.cancel(None)

    @classmethod
    def runner(cls):
        """Neverending function (intended to be ran in a dedicated thread) that
           checks every 60 seconds tasks to run.
        """
        current_thread = threading.currentThread()
        while True:
            # Pop and run every task whose timestamp is already due.
            while cls.__tasks and cls.__tasks[0][0] < time.time():
                task = heapq.heappop(cls.__tasks)
                timestamp, dbname, function, args, kwargs = task
                cls.__tasks_by_db[dbname].remove(task)
                if not timestamp:
                    # null timestamp -> cancelled task
                    continue
                current_thread.dbname = dbname   # hack hack
                # NOTE(review): im_class/func_name are Python 2 only and
                # assume `function` is a bound method -- confirm callers.
                cls._logger.notifyChannel('timers', LOG_DEBUG, "Run %s.%s(*%r, **%r)" % (function.im_class.__name__, function.func_name, args, kwargs))
                delattr(current_thread, 'dbname')
                # Each task runs in its own thread so a slow task does
                # not stall the scheduler loop.
                threading.Thread(target=function, args=args, kwargs=kwargs).start()
                time.sleep(1)
            time.sleep(60)
354
# Start the task-scheduler loop as soon as this module is imported.
threading.Thread(target=Agent.runner).start()
356
357
358 import traceback
359
class Server:
    """ Generic interface for all servers with an event loop etc.
        Override this to implement http, net-rpc etc. servers.

        Servers here must have threaded behaviour. start() must not block,
        there is no run().
    """
    # Set once startAll() has run; later instances are "late started".
    __is_started = False
    # All registered server instances.
    __servers = []
    # Timer threads created for servers instantiated after startAll().
    __starter_threads = []

    # we don't want blocking server calls (think select()) to
    # wait forever and possibly prevent exiting the process,
    # but instead we want a form of polling/busy_wait pattern, where
    # _server_timeout should be used as the default timeout for
    # all I/O blocking operations
    _busywait_timeout = 0.5


    __logger = logging.getLogger('server')

    def __init__(self):
        Server.__servers.append(self)
        if Server.__is_started:
            # raise Exception('All instances of servers must be inited before the startAll()')
            # Since the startAll() won't be called again, allow this server to
            # init and then start it after 1sec (hopefully). Register that
            # timer thread in a list, so that we can abort the start if quitAll
            # is called in the meantime
            t = threading.Timer(1.0, self._late_start)
            t.name = 'Late start timer for %s' % str(self.__class__)
            Server.__starter_threads.append(t)
            t.start()

    def start(self):
        # Stub; subclasses override with the real (non-blocking) startup.
        self.__logger.debug("called stub Server.start")

    def _late_start(self):
        self.start()
        # NOTE(review): removing from __starter_threads while iterating it
        # can skip entries; iterating over a copy (list(...)) would be safer.
        for thr in Server.__starter_threads:
            if thr.finished.is_set():
                Server.__starter_threads.remove(thr)

    def stop(self):
        # Stub; subclasses override with the real shutdown.
        self.__logger.debug("called stub Server.stop")

    def stats(self):
        """ This function should return statistics about the server """
        return "%s: No statistics" % str(self.__class__)

    @classmethod
    def startAll(cls):
        """Start every registered server exactly once."""
        if cls.__is_started:
            return
        cls.__logger.info("Starting %d services" % len(cls.__servers))
        for srv in cls.__servers:
            srv.start()
        cls.__is_started = True

    @classmethod
    def quitAll(cls):
        """Cancel pending late starts and stop every registered server."""
        if not cls.__is_started:
            return
        cls.__logger.info("Stopping %d services" % len(cls.__servers))
        # NOTE(review): same mutate-while-iterating issue as _late_start;
        # entries can be skipped when several timers are pending.
        for thr in cls.__starter_threads:
            if not thr.finished.is_set():
                thr.cancel()
            cls.__starter_threads.remove(thr)

        for srv in cls.__servers:
            srv.stop()
        cls.__is_started = False

    @classmethod
    def allStats(cls):
        """Return a newline-joined status line plus per-server statistics."""
        res = ["Servers %s" % ('stopped', 'started')[cls.__is_started]]
        res.extend(srv.stats() for srv in cls.__servers)
        return '\n'.join(res)

    def _close_socket(self):
        # Shut down both directions before closing; skipped on Windows
        # where shutdown of a listening socket behaves differently.
        if os.name != 'nt':
            try:
                self.socket.shutdown(getattr(socket, 'SHUT_RDWR', 2))
            except socket.error, e:
                if e.errno != errno.ENOTCONN: raise
                # OSX, socket shutdowns both sides if any side closes it
                # causing an error 57 'Socket is not connected' on shutdown
                # of the other side (or something), see
                # http://bugs.python.org/issue4397
                self.__logger.debug(
                    '"%s" when shutting down server socket, '
                    'this is normal under OS X', e)
        self.socket.close()
453
class OpenERPDispatcherException(Exception):
    """Wraps an exception raised during RPC dispatch together with the
    formatted traceback text captured at the point of failure."""

    def __init__(self, exception, traceback):
        # Keep both the original exception object and its rendered
        # traceback so the transport layer can ship them to the client.
        self.traceback = traceback
        self.exception = exception
458
class OpenERPDispatcher:
    def log(self, title, msg, channel=logging.DEBUG_RPC, depth=2):
        """Pretty-print `msg` line by line on logger `title` at level
        `channel`; skipped entirely when that level is disabled."""
        logger = logging.getLogger(title)
        if logger.isEnabledFor(channel):
            for line in pformat(msg, depth=depth).split('\n'):
                logger.log(channel, line)

    def dispatch(self, service_name, method, params):
        """Route an RPC call to the ExportService registered under
        `service_name`, logging request and result; any failure is
        re-raised as OpenERPDispatcherException carrying the traceback.
        """
        try:
            self.log('service', service_name)
            self.log('method', method)
            self.log('params', params)
            auth = getattr(self, 'auth_provider', None)
            result = ExportService.getService(service_name).dispatch(method, auth, params)
            logger = logging.getLogger('result')
            # Log the answer deeper when SQL-level debugging is enabled.
            self.log('result', result, channel=logging.DEBUG_RPC_ANSWER, depth=(logger.isEnabledFor(logging.DEBUG_SQL) and 1 or None))
            return result
        except Exception, e:
            self.log('exception', tools.exception_to_unicode(e))
            tb = getattr(e, 'traceback', sys.exc_info())
            tb_s = "".join(traceback.format_exception(*tb))
            if tools.config['debug_mode']:
                # Drop into the post-mortem debugger in debug mode.
                import pdb
                pdb.post_mortem(tb[2])
            raise OpenERPDispatcherException(e, tb_s)
484
485 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: