[IMP] cron: removed unnecessary multi-tasks per db in Agent, some cleaning.
[odoo/odoo.git] / openerp / netsvc.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 ##############################################################################
4 #
5 #    OpenERP, Open Source Management Solution
6 #    Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
7 #
8 #    This program is free software: you can redistribute it and/or modify
9 #    it under the terms of the GNU Affero General Public License as
10 #    published by the Free Software Foundation, either version 3 of the
11 #    License, or (at your option) any later version.
12 #
13 #    This program is distributed in the hope that it will be useful,
14 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 #    GNU Affero General Public License for more details.
17 #
18 #    You should have received a copy of the GNU Affero General Public License
19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21 ##############################################################################
22
23 import errno
24 import heapq
25 import logging
26 import logging.handlers
27 import os
28 import platform
29 import release
30 import socket
31 import sys
32 import threading
33 import time
34 import types
35 from pprint import pformat
36
37 # TODO modules that import netsvc only for things from loglevels must be changed to use loglevels.
38 from loglevels import *
39 import tools
40 import openerp
41
42 def close_socket(sock):
43     """ Closes a socket instance cleanly
44
45     :param sock: the network socket to close
46     :type sock: socket.socket
47     """
48     try:
49         sock.shutdown(socket.SHUT_RDWR)
50     except socket.error, e:
51         # On OSX, socket shutdowns both sides if any side closes it
52         # causing an error 57 'Socket is not connected' on shutdown
53         # of the other side (or something), see
54         # http://bugs.python.org/issue4397
55         # note: stdlib fixed test, not behavior
56         if e.errno != errno.ENOTCONN or platform.system() != 'Darwin':
57             raise
58     sock.close()
59
60
61 #.apidoc title: Common Services: netsvc
62 #.apidoc module-mods: member-order: bysource
63
64 class Service(object):
65     """ Base class for *Local* services
66
67         Functionality here is trusted, no authentication.
68     """
69     _services = {}
70     def __init__(self, name, audience=''):
71         Service._services[name] = self
72         self.__name = name
73         self._methods = {}
74
75     def joinGroup(self, name):
76         raise Exception("No group for local services")
77         #GROUPS.setdefault(name, {})[self.__name] = self
78
79     @classmethod
80     def exists(cls, name):
81         return name in cls._services
82
83     @classmethod
84     def remove(cls, name):
85         if cls.exists(name):
86             cls._services.pop(name)
87
88     def exportMethod(self, method):
89         if callable(method):
90             self._methods[method.__name__] = method
91
92     def abortResponse(self, error, description, origin, details):
93         if not tools.config['debug_mode']:
94             raise Exception("%s -- %s\n\n%s"%(origin, description, details))
95         else:
96             raise
97
98 class LocalService(object):
99     """ Proxy for local services. 
100     
101         Any instance of this class will behave like the single instance
102         of Service(name)
103     """
104     __logger = logging.getLogger('service')
105     def __init__(self, name):
106         self.__name = name
107         try:
108             self._service = Service._services[name]
109             for method_name, method_definition in self._service._methods.items():
110                 setattr(self, method_name, method_definition)
111         except KeyError, keyError:
112             self.__logger.error('This service does not exist: %s' % (str(keyError),) )
113             raise
114
115     def __call__(self, method, *params):
116         return getattr(self, method)(*params)
117
118 class ExportService(object):
119     """ Proxy for exported services. 
120
121     All methods here should take an AuthProxy as their first parameter. It
122     will be appended by the calling framework.
123
124     Note that this class has no direct proxy, capable of calling 
125     eservice.method(). Rather, the proxy should call 
126     dispatch(method,auth,params)
127     """
128     
129     _services = {}
130     _groups = {}
131     _logger = logging.getLogger('web-services')
132     
133     def __init__(self, name, audience=''):
134         ExportService._services[name] = self
135         self.__name = name
136         self._logger.debug("Registered an exported service: %s" % name)
137
138     def joinGroup(self, name):
139         ExportService._groups.setdefault(name, {})[self.__name] = self
140
141     @classmethod
142     def getService(cls,name):
143         return cls._services[name]
144
145     def dispatch(self, method, auth, params):
146         raise Exception("stub dispatch at %s" % self.__name)
147         
148     def new_dispatch(self,method,auth,params):
149         raise Exception("stub dispatch at %s" % self.__name)
150
151     def abortResponse(self, error, description, origin, details):
152         if not tools.config['debug_mode']:
153             raise Exception("%s -- %s\n\n%s"%(origin, description, details))
154         else:
155             raise
156
157 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, _NOTHING, DEFAULT = range(10)
158 #The background is set with 40 plus the number of the color, and the foreground with 30
159 #These are the sequences need to get colored ouput
160 RESET_SEQ = "\033[0m"
161 COLOR_SEQ = "\033[1;%dm"
162 BOLD_SEQ = "\033[1m"
163 COLOR_PATTERN = "%s%s%%s%s" % (COLOR_SEQ, COLOR_SEQ, RESET_SEQ)
164 LEVEL_COLOR_MAPPING = {
165     logging.DEBUG_SQL: (WHITE, MAGENTA),
166     logging.DEBUG_RPC: (BLUE, WHITE),
167     logging.DEBUG_RPC_ANSWER: (BLUE, WHITE),
168     logging.DEBUG: (BLUE, DEFAULT),
169     logging.INFO: (GREEN, DEFAULT),
170     logging.TEST: (WHITE, BLUE),
171     logging.WARNING: (YELLOW, DEFAULT),
172     logging.ERROR: (RED, DEFAULT),
173     logging.CRITICAL: (WHITE, RED),
174 }
175
176 class DBFormatter(logging.Formatter):
177     def format(self, record):
178         record.dbname = getattr(threading.currentThread(), 'dbname', '?')
179         return logging.Formatter.format(self, record)
180
181 class ColoredFormatter(DBFormatter):
182     def format(self, record):
183         fg_color, bg_color = LEVEL_COLOR_MAPPING[record.levelno]
184         record.levelname = COLOR_PATTERN % (30 + fg_color, 40 + bg_color, record.levelname)
185         return DBFormatter.format(self, record)
186
187 def init_logger():
188     from tools.translate import resetlocale
189     resetlocale()
190
191     # create a format for log messages and dates
192     format = '[%(asctime)s][%(dbname)s] %(levelname)s:%(name)s:%(message)s'
193
194     if tools.config['syslog']:
195         # SysLog Handler
196         if os.name == 'nt':
197             handler = logging.handlers.NTEventLogHandler("%s %s" % (release.description, release.version))
198         else:
199             handler = logging.handlers.SysLogHandler('/dev/log')
200         format = '%s %s' % (release.description, release.version) \
201                 + ':%(dbname)s:%(levelname)s:%(name)s:%(message)s'
202
203     elif tools.config['logfile']:
204         # LogFile Handler
205         logf = tools.config['logfile']
206         try:
207             dirname = os.path.dirname(logf)
208             if dirname and not os.path.isdir(dirname):
209                 os.makedirs(dirname)
210             if tools.config['logrotate'] is not False:
211                 handler = logging.handlers.TimedRotatingFileHandler(logf,'D',1,30)
212             elif os.name == 'posix':
213                 handler = logging.handlers.WatchedFileHandler(logf)
214             else:
215                 handler = logging.handlers.FileHandler(logf)
216         except Exception:
217             sys.stderr.write("ERROR: couldn't create the logfile directory. Logging to the standard output.\n")
218             handler = logging.StreamHandler(sys.stdout)
219     else:
220         # Normal Handler on standard output
221         handler = logging.StreamHandler(sys.stdout)
222
223     if isinstance(handler, logging.StreamHandler) and os.isatty(handler.stream.fileno()):
224         formatter = ColoredFormatter(format)
225     else:
226         formatter = DBFormatter(format)
227     handler.setFormatter(formatter)
228
229     # add the handler to the root logger
230     logger = logging.getLogger()
231     logger.handlers = []
232     logger.addHandler(handler)
233     logger.setLevel(int(tools.config['log_level'] or '0'))
234
235 # A alternative logging scheme for automated runs of the
236 # server intended to test it.
237 def init_alternative_logger():
238     class H(logging.Handler):
239       def emit(self, record):
240         if record.levelno > 20:
241           print record.levelno, record.pathname, record.msg
242     handler = H()
243     logger = logging.getLogger()
244     logger.handlers = []
245     logger.addHandler(handler)
246     logger.setLevel(logging.ERROR)
247
248 class Agent(object):
249     """ Singleton that keeps track of cancellable tasks to run at a given
250         timestamp.
251        
252         The tasks are characterised by:
253        
254             * a timestamp
255             * the database on which the task run
256             * a boolean attribute specifying if the task is canceled
257
258         Implementation details:
259         
260           - Tasks are stored as list, allowing the cancellation by setting
261             the boolean to True.
262           - A heapq is used to store tasks, so we don't need to sort
263             tasks ourself.
264     """
265     _wakeups = []
266     _wakeup_by_db = {}
267     _logger = logging.getLogger('netsvc.agent')
268
269     @classmethod
270     def cancel(cls, db_name):
271         """ Cancel next wakeup for a given database. """
272         cls._logger.debug("Cancel next wake-up for database '%s'.", db_name)
273         if db_name in cls._wakeup_by_db:
274             cls._wakeup_by_db[db_name][2] = True
275
276     @classmethod
277     def cancel_all(cls):
278         cls._wakeups = []
279         cls._wakeup_by_db = {}
280
281     @classmethod
282     def schedule_in_advance(cls, timestamp, db_name):
283         if not timestamp:
284             return
285         # Cancel the previous wakeup if any.
286         add_wakeup = False
287         if db_name in cls._wakeup_by_db:
288             task = cls._wakeup_by_db[db_name]
289             if task[2] or timestamp < task[0]:
290                 add_wakeup = True
291                 task[2] = True
292         else:
293             add_wakeup = True
294         if add_wakeup:
295             print ">>> rescheduled earlier", timestamp
296             task = [timestamp, db_name, False]
297             heapq.heappush(cls._wakeups, task)
298             cls._wakeup_by_db[db_name] = task
299
300     @classmethod
301     def runner(cls):
302         """Neverending function (intended to be ran in a dedicated thread) that
303            checks every 60 seconds tasks to run. TODO: make configurable
304         """
305         while True:
306             print ">>>>> cron for"
307             while cls._wakeups and cls._wakeups[0][0] < time.time():
308                 task = heapq.heappop(cls._wakeups)
309                 timestamp, db_name, canceled = task
310                 del cls._wakeup_by_db[db_name]
311                 if canceled:
312                     continue
313                 ir_cron = openerp.pooler.get_pool(db_name).get('ir.cron')
314                 ir_cron._run_jobs()
315             time.sleep(60)
316
317 def start_agent():
318     agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
319     # the agent runner is a typical daemon thread, that will never quit and must be
320     # terminated when the main process exits - with no consequence (the processing
321     # threads it spawns are not marked daemon)
322     agent_runner.setDaemon(True)
323     agent_runner.start()
324
325 import traceback
326
327 class Server:
328     """ Generic interface for all servers with an event loop etc.
329         Override this to impement http, net-rpc etc. servers.
330
331         Servers here must have threaded behaviour. start() must not block,
332         there is no run().
333     """
334     __is_started = False
335     __servers = []
336     __starter_threads = []
337
338     # we don't want blocking server calls (think select()) to
339     # wait forever and possibly prevent exiting the process,
340     # but instead we want a form of polling/busy_wait pattern, where
341     # _server_timeout should be used as the default timeout for
342     # all I/O blocking operations
343     _busywait_timeout = 0.5
344
345
346     __logger = logging.getLogger('server')
347
348     def __init__(self):
349         Server.__servers.append(self)
350         if Server.__is_started:
351             # raise Exception('All instances of servers must be inited before the startAll()')
352             # Since the startAll() won't be called again, allow this server to
353             # init and then start it after 1sec (hopefully). Register that
354             # timer thread in a list, so that we can abort the start if quitAll
355             # is called in the meantime
356             t = threading.Timer(1.0, self._late_start)
357             t.name = 'Late start timer for %s' % str(self.__class__)
358             Server.__starter_threads.append(t)
359             t.start()
360
361     def start(self):
362         self.__logger.debug("called stub Server.start")
363         
364     def _late_start(self):
365         self.start()
366         for thr in Server.__starter_threads:
367             if thr.finished.is_set():
368                 Server.__starter_threads.remove(thr)
369
370     def stop(self):
371         self.__logger.debug("called stub Server.stop")
372
373     def stats(self):
374         """ This function should return statistics about the server """
375         return "%s: No statistics" % str(self.__class__)
376
377     @classmethod
378     def startAll(cls):
379         if cls.__is_started:
380             return
381         cls.__logger.info("Starting %d services" % len(cls.__servers))
382         for srv in cls.__servers:
383             srv.start()
384         cls.__is_started = True
385
386     @classmethod
387     def quitAll(cls):
388         if not cls.__is_started:
389             return
390         cls.__logger.info("Stopping %d services" % len(cls.__servers))
391         for thr in cls.__starter_threads:
392             if not thr.finished.is_set():
393                 thr.cancel()
394             cls.__starter_threads.remove(thr)
395
396         for srv in cls.__servers:
397             srv.stop()
398         cls.__is_started = False
399
400     @classmethod
401     def allStats(cls):
402         res = ["Servers %s" % ('stopped', 'started')[cls.__is_started]]
403         res.extend(srv.stats() for srv in cls.__servers)
404         return '\n'.join(res)
405
406     def _close_socket(self):
407         close_socket(self.socket)
408
409 class OpenERPDispatcherException(Exception):
410     def __init__(self, exception, traceback):
411         self.exception = exception
412         self.traceback = traceback
413
414 def replace_request_password(args):
415     # password is always 3rd argument in a request, we replace it in RPC logs
416     # so it's easier to forward logs for diagnostics/debugging purposes...
417     args = list(args)
418     if len(args) > 2:
419         args[2] = '*'
420     return args
421
422 def log(title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
423     logger = logging.getLogger(title)
424     if logger.isEnabledFor(channel):
425         indent=''
426         indent_after=' '*len(fn)
427         for line in (fn+pformat(msg, depth=depth)).split('\n'):
428             logger.log(channel, indent+line)
429             indent=indent_after
430
431 class OpenERPDispatcher:
432     def log(self, title, msg, channel=logging.DEBUG_RPC, depth=None, fn=""):
433         log(title, msg, channel=channel, depth=depth, fn=fn)
434     def dispatch(self, service_name, method, params):
435         try:
436             auth = getattr(self, 'auth_provider', None)
437             logger = logging.getLogger('result')
438             start_time = end_time = 0
439             if logger.isEnabledFor(logging.DEBUG_RPC_ANSWER):
440                 self.log('service', tuple(replace_request_password(params)), depth=None, fn='%s.%s'%(service_name,method))
441             if logger.isEnabledFor(logging.DEBUG_RPC):
442                 start_time = time.time()
443             result = ExportService.getService(service_name).dispatch(method, auth, params)
444             if logger.isEnabledFor(logging.DEBUG_RPC):
445                 end_time = time.time()
446             if not logger.isEnabledFor(logging.DEBUG_RPC_ANSWER):
447                 self.log('service (%.3fs)' % (end_time - start_time), tuple(replace_request_password(params)), depth=1, fn='%s.%s'%(service_name,method))
448             self.log('execution time', '%.3fs' % (end_time - start_time), channel=logging.DEBUG_RPC_ANSWER)
449             self.log('result', result, channel=logging.DEBUG_RPC_ANSWER)
450             return result
451         except Exception, e:
452             self.log('exception', tools.exception_to_unicode(e))
453             tb = getattr(e, 'traceback', sys.exc_info())
454             tb_s = "".join(traceback.format_exception(*tb))
455             if tools.config['debug_mode'] and isinstance(tb, types.TracebackType):
456                 import pdb
457                 pdb.post_mortem(tb[2])
458             raise OpenERPDispatcherException(e, tb_s)
459
460 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: