[MERGE] merged long-polling branch:
authorVo Minh Thu <vmt@openerp.com>
Mon, 18 Feb 2013 15:32:25 +0000 (16:32 +0100)
committerVo Minh Thu <vmt@openerp.com>
Mon, 18 Feb 2013 15:32:25 +0000 (16:32 +0100)
When using --workers, an evented (for longpolling) worker is started.
When passing --gevent, all the server is evented but the cron is disabled.

bzr revid: vmt@openerp.com-20130218153225-w1yft9je0b15owdv

openerp/cli/server.py
openerp/service/__init__.py
openerp/service/workers.py
openerp/service/wsgi_server.py
openerp/tools/config.py

index 2408703..dbb77d7 100644 (file)
@@ -230,6 +230,14 @@ def main(args):
     check_root_user()
     openerp.tools.config.parse_config(args)
 
+    if openerp.tools.config.options["gevent"]:
+        openerp.evented = True
+        _logger.info('Using gevent mode')
+        import gevent.monkey
+        gevent.monkey.patch_all()
+        import gevent_psycopg2
+        gevent_psycopg2.monkey_patch()
+
     check_postgres_user()
     openerp.netsvc.init_logger()
     report_configuration()
index 2eb80e5..d669046 100644 (file)
@@ -31,6 +31,7 @@ import time
 import cron
 import wsgi_server
 
+import openerp
 import openerp.modules
 import openerp.netsvc
 import openerp.osv
@@ -85,12 +86,14 @@ def start_services():
     # Start the WSGI server.
     wsgi_server.start_service()
     # Start the main cron thread.
-    cron.start_service()
+    if not openerp.evented:
+        cron.start_service()
 
 def stop_services():
     """ Stop all services. """
     # stop services
-    cron.stop_service()
+    if not openerp.evented:
+        cron.stop_service()
     wsgi_server.stop_service()
 
     _logger.info("Initiating shutdown")
index 8d9905c..0f962cd 100644 (file)
@@ -35,6 +35,7 @@ class Multicorn(object):
     def __init__(self, app):
         # config
         self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
+        self.long_polling_address = (config['xmlrpc_interface'] or '0.0.0.0', config['longpolling_port'])
         self.population = config['workers']
         self.timeout = config['limit_time_real']
         self.limit_request = config['limit_request']
@@ -45,6 +46,7 @@ class Multicorn(object):
         self.socket = None
         self.workers_http = {}
         self.workers_cron = {}
+        self.workers_longpolling = {}
         self.workers = {}
         self.generation = 0
         self.queue = []
@@ -131,7 +133,8 @@ class Multicorn(object):
     def process_timeout(self):
         now = time.time()
         for (pid, worker) in self.workers.items():
-            if now - worker.watchdog_time >= worker.watchdog_timeout:
+            if (worker.watchdog_timeout is not None) and \
+                (now - worker.watchdog_time >= worker.watchdog_timeout):
                 _logger.error("Worker (%s) timeout", pid)
                 self.worker_kill(pid, signal.SIGKILL)
 
@@ -140,6 +143,8 @@ class Multicorn(object):
             self.worker_spawn(WorkerHTTP, self.workers_http)
         while len(self.workers_cron) < config['max_cron_threads']:
             self.worker_spawn(WorkerCron, self.workers_cron)
+        while len(self.workers_longpolling) < 1:
+            self.worker_spawn(WorkerLongPolling, self.workers_longpolling)
 
     def sleep(self):
         try:
@@ -178,6 +183,12 @@ class Multicorn(object):
         self.socket.setblocking(0)
         self.socket.bind(self.address)
         self.socket.listen(8)
+        # long polling socket
+        self.long_polling_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.long_polling_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        self.long_polling_socket.setblocking(0)
+        self.long_polling_socket.bind(self.long_polling_address)
+        self.long_polling_socket.listen(8)
 
     def stop(self, graceful=True):
         if graceful:
@@ -221,6 +232,7 @@ class Worker(object):
         self.multi = multi
         self.watchdog_time = time.time()
         self.watchdog_pipe = multi.pipe_new()
+        # Can be set to None if no watchdog is desired.
         self.watchdog_timeout = multi.timeout
         self.ppid = os.getpid()
         self.pid = None
@@ -340,6 +352,26 @@ class WorkerHTTP(Worker):
         Worker.start(self)
         self.server = WorkerBaseWSGIServer(self.multi.app)
 
+class WorkerLongPolling(Worker):
+    """ Long polling workers """
+    def __init__(self, multi):
+        super(WorkerLongPolling, self).__init__(multi)
+        # Disable the watchdog feature for this kind of worker.
+        self.watchdog_timeout = None
+
+    def start(self):
+        openerp.evented = True
+        _logger.info('Using gevent mode')
+        import gevent.monkey
+        gevent.monkey.patch_all()
+        import gevent_psycopg2
+        gevent_psycopg2.monkey_patch()
+
+        Worker.start(self)
+        from gevent.wsgi import WSGIServer
+        self.server = WSGIServer(self.multi.long_polling_socket, self.multi.app)
+        self.server.serve_forever()
+
 class WorkerBaseWSGIServer(werkzeug.serving.BaseWSGIServer):
     """ werkzeug WSGI Server patched to allow using an external listen socket
     """
index 1513094..e9e391d 100644 (file)
@@ -426,7 +426,11 @@ def serve(interface, port, threaded):
     """
 
     global httpd
-    httpd = werkzeug.serving.make_server(interface, port, application, threaded=threaded)
+    if not openerp.evented:
+        httpd = werkzeug.serving.make_server(interface, port, application, threaded=threaded)
+    else:
+        from gevent.wsgi import WSGIServer
+        httpd = WSGIServer((interface, port), application)
     httpd.serve_forever()
 
 def start_service():
@@ -446,8 +450,13 @@ def stop_service():
     The server is supposed to have been started by start_server() above.
     """
     if httpd:
-        httpd.shutdown()
-        close_socket(httpd.socket)
+        if not openerp.evented:
+            httpd.shutdown()
+            close_socket(httpd.socket)
+        else:
+            import gevent
+            httpd.stop()
+            gevent.shutdown()
 
 def close_socket(sock):
     """ Closes a socket instance cleanly
index 3b04881..8fa65f9 100644 (file)
@@ -107,6 +107,7 @@ class configmanager(object):
                          help="specify additional addons paths (separated by commas).",
                          action="callback", callback=self._check_addons_path, nargs=1, type="string")
         group.add_option("--load", dest="server_wide_modules", help="Comma-separated list of server-wide modules default=web")
+        group.add_option("--gevent", dest="gevent", action="store_true", my_default=False, help="Activate the GEvent mode, this also desactivate the cron.")
         parser.add_option_group(group)
 
         # XML-RPC / HTTP
@@ -119,6 +120,8 @@ class configmanager(object):
                          help="disable the XML-RPC protocol")
         group.add_option("--proxy-mode", dest="proxy_mode", action="store_true", my_default=False,
                          help="Enable correct behavior when behind a reverse proxy")
+        group.add_option("--longpolling-port", dest="longpolling_port", my_default=8072,
+                         help="specify the TCP port for longpolling requests", type="int")
         parser.add_option_group(group)
 
         # XML-RPC / HTTPS
@@ -376,7 +379,8 @@ class configmanager(object):
             self.options['pidfile'] = False
 
         # if defined dont take the configfile value even if the defined value is None
-        keys = ['xmlrpc_interface', 'xmlrpc_port', 'db_name', 'db_user', 'db_password', 'db_host',
+        keys = ['xmlrpc_interface', 'xmlrpc_port', 'longpolling_port',
+                'db_name', 'db_user', 'db_password', 'db_host',
                 'db_port', 'db_template', 'logfile', 'pidfile', 'smtp_port',
                 'email_from', 'smtp_server', 'smtp_user', 'smtp_password',
                 'netrpc_interface', 'netrpc_port', 'db_maxconn', 'import_partial', 'addons_path',
@@ -407,7 +411,7 @@ class configmanager(object):
             'list_db', 'xmlrpcs', 'proxy_mode',
             'test_file', 'test_enable', 'test_commit', 'test_report_directory',
             'osv_memory_count_limit', 'osv_memory_age_limit', 'max_cron_threads', 'unaccent',
-            'workers', 'limit_memory_hard', 'limit_memory_soft', 'limit_time_cpu', 'limit_time_real', 'limit_request'
+            'workers', 'limit_memory_hard', 'limit_memory_soft', 'limit_time_cpu', 'limit_time_real', 'limit_request', 'gevent'
         ]
 
         for arg in keys: