[MERGE] forward port of branch 8.0 up to 591e329
[odoo/odoo.git] / openerp / service / server.py
index 6a87c85..2189cc6 100644 (file)
@@ -9,7 +9,6 @@ import os.path
 import platform
 import psutil
 import random
-import resource
 import select
 import signal
 import socket
@@ -17,34 +16,53 @@ import subprocess
 import sys
 import threading
 import time
+import unittest2
 
 import werkzeug.serving
 
-try:
+if os.name == 'posix':
+    # Unix only for workers
     import fcntl
-except ImportError:
-    pass
+    import resource
+else:
+    # Windows shim
+    signal.SIGHUP = -1
+
+# Optional process names for workers
 try:
     from setproctitle import setproctitle
 except ImportError:
     setproctitle = lambda x: None
 
 import openerp
-import openerp.tools.config as config
+from openerp.modules.registry import RegistryManager
 from openerp.release import nt_service_name
+import openerp.tools.config as config
 from openerp.tools.misc import stripped_sys_argv, dumpstacks
 
-import wsgi_server
-
 _logger = logging.getLogger(__name__)
 
-SLEEP_INTERVAL = 60 # 1 min
+try:
+    import watchdog
+    from watchdog.observers import Observer
+    from watchdog.events import FileCreatedEvent, FileModifiedEvent
+except ImportError:
+    watchdog = None
+
+SLEEP_INTERVAL = 60     # 1 min
 
 #----------------------------------------------------------
 # Werkzeug WSGI servers patched
 #----------------------------------------------------------
-
-class BaseWSGIServerNoBind(werkzeug.serving.BaseWSGIServer):
+class LoggingBaseWSGIServerMixIn(object):
+    def handle_error(self, request, client_address):
+        t, e, _ = sys.exc_info()
+        if t == socket.error and e.errno == errno.EPIPE:
+            # broken pipe, ignore error
+            return
+        _logger.exception('Exception happened during processing of request from %s', client_address)
+
+class BaseWSGIServerNoBind(LoggingBaseWSGIServerMixIn, werkzeug.serving.BaseWSGIServer):
     """ werkzeug Base WSGI Server patched to skip socket binding. PreforkServer
     use this class, sets the socket and calls the process_request() manually
     """
@@ -59,15 +77,27 @@ class BaseWSGIServerNoBind(werkzeug.serving.BaseWSGIServer):
         # dont listen as we use PreforkServer#socket
         pass
 
+
+class RequestHandler(werkzeug.serving.WSGIRequestHandler):
+    def setup(self):
+        # flag the current thread as handling a http request
+        super(RequestHandler, self).setup()
+        me = threading.currentThread()
+        me.name = 'openerp.service.http.request.%s' % (me.ident,)
+
 # _reexec() should set LISTEN_* to avoid connection refused during reload time. It
 # should also work with systemd socket activation. This is currently untested
 # and not yet used.
 
-class ThreadedWSGIServerReloadable(werkzeug.serving.ThreadedWSGIServer):
+class ThreadedWSGIServerReloadable(LoggingBaseWSGIServerMixIn, werkzeug.serving.ThreadedWSGIServer):
     """ werkzeug Threaded WSGI Server patched to allow reusing a listen socket
     given by the environement, this is used by autoreload to keep the listen
     socket open when a reload happens.
     """
+    def __init__(self, host, port, app):
+        super(ThreadedWSGIServerReloadable, self).__init__(host, port, app,
+                                                           handler=RequestHandler)
+
     def server_bind(self):
         envfd = os.environ.get('LISTEN_FDS')
         if envfd and os.environ.get('LISTEN_PID') == str(os.getpid()):
@@ -83,91 +113,37 @@ class ThreadedWSGIServerReloadable(werkzeug.serving.ThreadedWSGIServer):
             super(ThreadedWSGIServerReloadable, self).server_activate()
 
 #----------------------------------------------------------
-# AutoReload watcher
+# FileSystem Watcher for autoreload and cache invalidation
 #----------------------------------------------------------
-
-class AutoReload(object):
-    def __init__(self, server):
-        self.server = server
-        self.files = {}
-        self.modules = {}
-        import pyinotify
-        class EventHandler(pyinotify.ProcessEvent):
-            def __init__(self, autoreload):
-                self.autoreload = autoreload
-
-            def process_IN_CREATE(self, event):
-                _logger.debug('File created: %s', event.pathname)
-                self.autoreload.files[event.pathname] = 1
-
-            def process_IN_MODIFY(self, event):
-                _logger.debug('File modified: %s', event.pathname)
-                self.autoreload.files[event.pathname] = 1
-
-        self.wm = pyinotify.WatchManager()
-        self.handler = EventHandler(self)
-        self.notifier = pyinotify.Notifier(self.wm, self.handler, timeout=0)
-        mask = pyinotify.IN_MODIFY | pyinotify.IN_CREATE  # IN_MOVED_FROM, IN_MOVED_TO ?
-        for path in openerp.tools.config.options["addons_path"].split(','):
+class FSWatcher(object):
+    def __init__(self):
+        self.observer = Observer()
+        for path in openerp.modules.module.ad_paths:
             _logger.info('Watching addons folder %s', path)
-            self.wm.add_watch(path, mask, rec=True)
-
-    def process_data(self, files):
-        xml_files = [i for i in files if i.endswith('.xml')]
-        addons_path = openerp.tools.config.options["addons_path"].split(',')
-        for i in xml_files:
-            for path in addons_path:
-                if i.startswith(path):
-                    # find out wich addons path the file belongs to
-                    # and extract it's module name
-                    right = i[len(path) + 1:].split('/')
-                    if len(right) < 2:
-                        continue
-                    module = right[0]
-                    self.modules[module]=1
-        if self.modules:
-            _logger.info('autoreload: xml change detected, autoreload activated')
-            restart()
-
-    def process_python(self, files):
-        # process python changes
-        py_files = [i for i in files if i.endswith('.py')]
-        py_errors = []
-        # TODO keep python errors until they are ok
-        if py_files:
-            for i in py_files:
-                try:
-                    source = open(i, 'rb').read() + '\n'
-                    compile(source, i, 'exec')
-                except SyntaxError:
-                    py_errors.append(i)
-            if py_errors:
-                _logger.info('autoreload: python code change detected, errors found')
-                for i in py_errors:
-                    _logger.info('autoreload: SyntaxError %s',i)
-            else:
-                _logger.info('autoreload: python code updated, autoreload activated')
-                restart()
-
-    def check_thread(self):
-        # Check if some files have been touched in the addons path.
-        # If true, check if the touched file belongs to an installed module
-        # in any of the database used in the registry manager.
-        while 1:
-            while self.notifier.check_events(1000):
-                self.notifier.read_events()
-                self.notifier.process_events()
-            l = self.files.keys()
-            self.files.clear()
-            self.process_data(l)
-            self.process_python(l)
+            self.observer.schedule(self, path, recursive=True)
+
+    def dispatch(self, event):
+        if isinstance(event, (FileCreatedEvent, FileModifiedEvent)):
+            if not event.is_directory:
+                path = event.src_path
+                if path.endswith('.py'):
+                    try:
+                        source = open(path, 'rb').read() + '\n'
+                        compile(source, path, 'exec')
+                    except SyntaxError:
+                        _logger.error('autoreload: python code change detected, SyntaxError in %s', path)
+                    else:
+                        _logger.info('autoreload: python code updated, autoreload activated')
+                        restart()
 
-    def run(self):
-        t = threading.Thread(target=self.check_thread)
-        t.setDaemon(True)
-        t.start()
+    def start(self):
+        self.observer.start()
         _logger.info('AutoReload watcher running')
 
+    def stop(self):
+        self.observer.stop()
+        self.observer.join()
+
 #----------------------------------------------------------
 # Servers: Threaded, Gevented and Prefork
 #----------------------------------------------------------
@@ -211,7 +187,7 @@ class ThreadedServer(CommonServer):
         self.httpd = None
 
     def signal_handler(self, sig, frame):
-        if sig in [signal.SIGINT,signal.SIGTERM]:
+        if sig in [signal.SIGINT, signal.SIGTERM]:
             # shutdown on kill -INT or -TERM
             self.quit_signals_received += 1
             if self.quit_signals_received > 1:
@@ -225,11 +201,11 @@ class ThreadedServer(CommonServer):
 
     def cron_thread(self, number):
         while True:
-            time.sleep(SLEEP_INTERVAL + number) # Steve Reich timing style
+            time.sleep(SLEEP_INTERVAL + number)     # Steve Reich timing style
             registries = openerp.modules.registry.RegistryManager.registries
             _logger.debug('cron%d polling for jobs', number)
-            for db_name, registry in registries.items():
-                while True and registry.ready:
+            for db_name, registry in registries.iteritems():
+                while registry.ready:
                     acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
                     if not acquired:
                         break
@@ -255,16 +231,18 @@ class ThreadedServer(CommonServer):
             _logger.debug("cron%d started!" % i)
 
     def http_thread(self):
-        def app(e,s):
-            return self.app(e,s)
+        def app(e, s):
+            return self.app(e, s)
         self.httpd = ThreadedWSGIServerReloadable(self.interface, self.port, app)
         self.httpd.serve_forever()
 
     def http_spawn(self):
-        threading.Thread(target=self.http_thread).start()
+        t = threading.Thread(target=self.http_thread, name="openerp.service.httpd")
+        t.setDaemon(True)
+        t.start()
         _logger.info('HTTP service (werkzeug) running on %s:%s', self.interface, self.port)
 
-    def start(self):
+    def start(self, stop=False):
         _logger.debug("Setting signal handlers")
         if os.name == 'posix':
             signal.signal(signal.SIGINT, self.signal_handler)
@@ -274,9 +252,16 @@ class ThreadedServer(CommonServer):
             signal.signal(signal.SIGQUIT, dumpstacks)
         elif os.name == 'nt':
             import win32api
-            win32api.SetConsoleCtrlHandler(lambda sig: signal_handler(sig, None), 1)
-        self.cron_spawn()
-        self.http_spawn()
+            win32api.SetConsoleCtrlHandler(lambda sig: self.signal_handler(sig, None), 1)
+
+        test_mode = config['test_enable'] or config['test_file']
+        if not stop or test_mode:
+            # some tests need the http deamon to be available...
+            self.http_spawn()
+
+        if not stop:
+            # only relevant if we are not in "--stop-after-init" mode
+            self.cron_spawn()
 
     def stop(self):
         """ Shutdown the WSGI server. Wait for non deamon threads.
@@ -284,8 +269,9 @@ class ThreadedServer(CommonServer):
         _logger.info("Initiating shutdown")
         _logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
 
-        self.httpd.shutdown()
-        self.close_socket(self.httpd.socket)
+        if self.httpd:
+            self.httpd.shutdown()
+            self.close_socket(self.httpd.socket)
 
         # Manually join() all threads before calling sys.exit() to allow a second signal
         # to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
@@ -306,13 +292,19 @@ class ThreadedServer(CommonServer):
         openerp.modules.registry.RegistryManager.delete_all()
         logging.shutdown()
 
-    def run(self):
+    def run(self, preload=None, stop=False):
         """ Start the http server and the cron thread then wait for a signal.
 
         The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
         a second one if any will force an immediate exit.
         """
-        self.start()
+        self.start(stop=stop)
+
+        rc = preload_registries(preload)
+
+        if stop:
+            self.stop()
+            return rc
 
         # Wait for a first signal to be handled. (time.sleep will be interrupted
         # by the signal handler.) The try/except is for the win32 case.
@@ -355,14 +347,18 @@ class GeventServer(CommonServer):
         gevent.spawn(self.watch_parent)
         self.httpd = WSGIServer((self.interface, self.port), self.app)
         _logger.info('Evented Service (longpolling) running on %s:%s', self.interface, self.port)
-        self.httpd.serve_forever()
+        try:
+            self.httpd.serve_forever()
+        except:
+            _logger.exception("Evented Service (longpolling): uncaught error during main loop")
+            raise
 
     def stop(self):
         import gevent
         self.httpd.stop()
         gevent.shutdown()
 
-    def run(self):
+    def run(self, preload, stop):
         self.start()
         self.stop()
 
@@ -433,15 +429,17 @@ class PreforkServer(CommonServer):
         cmd = nargs[0]
         cmd = os.path.join(os.path.dirname(cmd), "openerp-gevent")
         nargs[0] = cmd
-        popen = subprocess.Popen(nargs)
+        popen = subprocess.Popen([sys.executable] + nargs)
         self.long_polling_pid = popen.pid
 
     def worker_pop(self, pid):
+        if pid == self.long_polling_pid:
+            self.long_polling_pid = None
         if pid in self.workers:
-            _logger.debug("Worker (%s) unregistered",pid)
+            _logger.debug("Worker (%s) unregistered", pid)
             try:
-                self.workers_http.pop(pid,None)
-                self.workers_cron.pop(pid,None)
+                self.workers_http.pop(pid, None)
+                self.workers_cron.pop(pid, None)
                 u = self.workers.pop(pid)
                 u.close()
             except OSError:
@@ -457,7 +455,7 @@ class PreforkServer(CommonServer):
     def process_signals(self):
         while len(self.queue):
             sig = self.queue.pop(0)
-            if sig in [signal.SIGINT,signal.SIGTERM]:
+            if sig in [signal.SIGINT, signal.SIGTERM]:
                 raise KeyboardInterrupt
             elif sig == signal.SIGHUP:
                 # restart on kill -HUP
@@ -493,8 +491,8 @@ class PreforkServer(CommonServer):
     def process_timeout(self):
         now = time.time()
         for (pid, worker) in self.workers.items():
-            if (worker.watchdog_timeout is not None) and \
-                (now - worker.watchdog_time >= worker.watchdog_timeout):
+            if worker.watchdog_timeout is not None and \
+                    (now - worker.watchdog_time) >= worker.watchdog_timeout:
                 _logger.error("Worker (%s) timeout", pid)
                 self.worker_kill(pid, signal.SIGKILL)
 
@@ -509,7 +507,7 @@ class PreforkServer(CommonServer):
     def sleep(self):
         try:
             # map of fd -> worker
-            fds = dict([(w.watchdog_pipe[0],w) for k,w in self.workers.items()])
+            fds = dict([(w.watchdog_pipe[0], w) for k, w in self.workers.items()])
             fd_in = fds.keys() + [self.pipe[0]]
             # check for ping or internal wakeups
             ready = select.select(fd_in, [], [], self.beat)
@@ -529,8 +527,6 @@ class PreforkServer(CommonServer):
                 raise
 
     def start(self):
-        # Empty the cursor pool, we dont want them to be shared among forked workers.
-        openerp.sql_db.close_all()
         # wakeup pipe, python doesnt throw EINTR when a syscall is interrupted
         # by a signal simulating a pseudo SA_RESTART. We write to a pipe in the
         # signal handler to overcome this behaviour
@@ -549,11 +545,12 @@ class PreforkServer(CommonServer):
         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
         self.socket.setblocking(0)
         self.socket.bind(self.address)
-        self.socket.listen(8*self.population)
+        self.socket.listen(8 * self.population)
 
     def stop(self, graceful=True):
         if self.long_polling_pid is not None:
-            self.worker_kill(self.long_polling_pid, signal.SIGKILL)     # FIXME make longpolling process handle SIGTERM correctly
+            # FIXME make longpolling process handle SIGTERM correctly
+            self.worker_kill(self.long_polling_pid, signal.SIGKILL)
             self.long_polling_pid = None
         if graceful:
             _logger.info("Stopping gracefully")
@@ -569,8 +566,18 @@ class PreforkServer(CommonServer):
             self.worker_kill(pid, signal.SIGTERM)
         self.socket.close()
 
-    def run(self):
+    def run(self, preload, stop):
         self.start()
+
+        rc = preload_registries(preload)
+
+        if stop:
+            self.stop()
+            return rc
+
+        # Empty the cursor pool, we dont want them to be shared among forked workers.
+        openerp.sql_db.close_all()
+
         _logger.debug("Multiprocess starting")
         while 1:
             try:
@@ -584,10 +591,10 @@ class PreforkServer(CommonServer):
                 _logger.debug("Multiprocess clean stop")
                 self.stop()
                 break
-            except Exception,e:
+            except Exception, e:
                 _logger.exception(e)
                 self.stop(False)
-                sys.exit(-1)
+                return -1
 
 class Worker(object):
     """ Workers """
@@ -616,7 +623,7 @@ class Worker(object):
 
     def sleep(self):
         try:
-            ret = select.select([self.multi.socket], [], [], self.multi.beat)
+            select.select([self.multi.socket], [], [], self.multi.beat)
         except select.error, e:
             if e[0] not in [errno.EINTR]:
                 raise
@@ -634,7 +641,7 @@ class Worker(object):
         rss, vms = psutil.Process(os.getpid()).get_memory_info()
         if vms > config['limit_memory_soft']:
             _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, vms)
-            self.alive = False # Commit suicide after the request.
+            self.alive = False      # Commit suicide after the request.
 
         # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space
         soft, hard = resource.getrlimit(resource.RLIMIT_AS)
@@ -682,7 +689,7 @@ class Worker(object):
                 self.process_work()
             _logger.info("Worker (%s) exiting. request_count: %s.", self.pid, self.request_count)
             self.stop()
-        except Exception,e:
+        except Exception:
             _logger.exception("Worker (%s) Exception occured, exiting..." % self.pid)
             # should we use 3 to abort everything ?
             sys.exit(1)
@@ -700,7 +707,7 @@ class WorkerHTTP(Worker):
         # tolerate broken pipe when the http client closes the socket before
         # receiving the full reply
         try:
-            self.server.process_request(client,addr)
+            self.server.process_request(client, addr)
         except IOError, e:
             if e.errno != errno.EPIPE:
                 raise
@@ -731,7 +738,7 @@ class WorkerCron(Worker):
     def sleep(self):
         # Really sleep once all the databases have been processed.
         if self.db_index == 0:
-            interval = SLEEP_INTERVAL + self.pid % 10 # chorus effect
+            interval = SLEEP_INTERVAL + self.pid % 10   # chorus effect
             time.sleep(interval)
 
     def _db_list(self):
@@ -753,7 +760,7 @@ class WorkerCron(Worker):
             if rpc_request_flag:
                 start_time = time.time()
                 start_rss, start_vms = psutil.Process(os.getpid()).get_memory_info()
-            
+
             import openerp.addons.base as base
             base.ir.ir_cron.ir_cron._acquire_job(db_name)
             openerp.modules.registry.RegistryManager.delete(db_name)
@@ -762,16 +769,18 @@ class WorkerCron(Worker):
             if len(db_names) > 1:
                 openerp.sql_db.close_db(db_name)
             if rpc_request_flag:
-                end_time = time.time()
+                run_time = time.time() - start_time
                 end_rss, end_vms = psutil.Process(os.getpid()).get_memory_info()
-                logline = '%s time:%.3fs mem: %sk -> %sk (diff: %sk)' % (db_name, end_time - start_time, start_vms / 1024, end_vms / 1024, (end_vms - start_vms)/1024)
+                vms_diff = (end_vms - start_vms) / 1024
+                logline = '%s time:%.3fs mem: %sk -> %sk (diff: %sk)' % \
+                    (db_name, run_time, start_vms / 1024, end_vms / 1024, vms_diff)
                 _logger.debug("WorkerCron (%s) %s", self.pid, logline)
 
             self.request_count += 1
             if self.request_count >= self.request_max and self.request_max < len(db_names):
                 _logger.error("There are more dabatases to process than allowed "
-                    "by the `limit_request` configuration variable: %s more.",
-                    len(db_names) - self.request_max)
+                              "by the `limit_request` configuration variable: %s more.",
+                              len(db_names) - self.request_max)
         else:
             self.db_index = 0
 
@@ -804,12 +813,68 @@ def _reexec(updated_modules=None):
         subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True)
     exe = os.path.basename(sys.executable)
     args = stripped_sys_argv()
-    args +=  ["-u", ','.join(updated_modules)]
+    if updated_modules:
+        args += ["-u", ','.join(updated_modules)]
     if not args or args[0] != exe:
         args.insert(0, exe)
     os.execv(sys.executable, args)
 
-def start():
+def load_test_file_yml(registry, test_file):
+    with registry.cursor() as cr:
+        openerp.tools.convert_yaml_import(cr, 'base', file(test_file), 'test', {}, 'init')
+        if config['test_commit']:
+            _logger.info('test %s has been commited', test_file)
+            cr.commit()
+        else:
+            _logger.info('test %s has been rollbacked', test_file)
+            cr.rollback()
+
+def load_test_file_py(registry, test_file):
+    # Locate python module based on its filename and run the tests
+    test_path, _ = os.path.splitext(os.path.abspath(test_file))
+    for mod_name, mod_mod in sys.modules.items():
+        if mod_mod:
+            mod_path, _ = os.path.splitext(getattr(mod_mod, '__file__', ''))
+            if test_path == mod_path:
+                suite = unittest2.TestSuite()
+                for t in unittest2.TestLoader().loadTestsFromModule(mod_mod):
+                    suite.addTest(t)
+                _logger.log(logging.INFO, 'running tests %s.', mod_mod.__name__)
+                stream = openerp.modules.module.TestStream()
+                result = unittest2.TextTestRunner(verbosity=2, stream=stream).run(suite)
+                success = result.wasSuccessful()
+                if hasattr(registry._assertion_report,'report_result'):
+                    registry._assertion_report.report_result(success)
+                if not success:
+                    _logger.error('%s: at least one error occurred in a test', test_file)
+
+def preload_registries(dbnames):
+    """ Preload a registries, possibly run a test file."""
+    # TODO: move all config checks to args dont check tools.config here
+    config = openerp.tools.config
+    test_file = config['test_file']
+    dbnames = dbnames or []
+    rc = 0
+    for dbname in dbnames:
+        try:
+            update_module = config['init'] or config['update']
+            registry = RegistryManager.new(dbname, update_module=update_module)
+            # run test_file if provided
+            if test_file:
+                _logger.info('loading test file %s', test_file)
+                if test_file.endswith('yml'):
+                    load_test_file_yml(registry, test_file)
+                elif test_file.endswith('py'):
+                    load_test_file_py(registry, test_file)
+
+            if registry._assertion_report.failures:
+                rc += 1
+        except Exception:
+            _logger.critical('Failed to initialize database `%s`.', dbname, exc_info=True)
+            return -1
+    return rc
+
+def start(preload=None, stop=False):
     """ Start the openerp http server and cron processor.
     """
     global server
@@ -821,19 +886,23 @@ def start():
     else:
         server = ThreadedServer(openerp.service.wsgi_server.application)
 
-    if config['auto_reload']:
-        autoreload = AutoReload(server)
-        autoreload.run()
+    watcher = None
+    if config['dev_mode']:
+        if watchdog:
+            watcher = FSWatcher()
+            watcher.start()
+        else:
+            _logger.warning("'watchdog' module not installed. Code autoreload feature is disabled")
 
-    server.run()
+    rc = server.run(preload, stop)
 
     # like the legend of the phoenix, all ends with beginnings
     if getattr(openerp, 'phoenix', False):
-        modules = []
-        if config['auto_reload']:
-            modules = autoreload.modules.keys()
-        _reexec(modules)
-    sys.exit(0)
+        if watcher:
+            watcher.stop()
+        _reexec()
+
+    return rc if rc else 0
 
 def restart():
     """ Restart the server