[IMP] gunicorn: moved database signaling to RegistryManager.
authorVo Minh Thu <vmt@openerp.com>
Wed, 8 Feb 2012 14:28:34 +0000 (15:28 +0100)
committerVo Minh Thu <vmt@openerp.com>
Wed, 8 Feb 2012 14:28:34 +0000 (15:28 +0100)
bzr revid: vmt@openerp.com-20120208142834-52oxaq72gghj687h

openerp/__init__.py
openerp/addons/base/base.sql
openerp/modules/registry.py
openerp/osv/orm.py
openerp/osv/osv.py
openerp/tools/cache.py
openerp/wsgi.py

index 7e723c4..521a9ef 100644 (file)
@@ -45,5 +45,12 @@ import wizard
 import workflow
 import wsgi
 
+# Is the server running in multi-process mode (e.g. behind Gunicorn).
+# If this is True, the processes have to communicate some events,
+# e.g. database update or cache invalidation. Each process has also
+# its own copy of the data structure and we don't need to care about
+# locks between threads.
+multi_process = False
+
 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
 
index 3f7f2e3..06a13d3 100644 (file)
@@ -353,7 +353,9 @@ CREATE TABLE ir_model_data (
 -- The `base_cache_signaling sequence` indicates all caches must be
 -- invalidated (i.e. cleared).
 CREATE SEQUENCE base_registry_signaling INCREMENT BY 1 START WITH 1;
+SELECT nextval('base_registry_signaling');
 CREATE SEQUENCE base_cache_signaling INCREMENT BY 1 START WITH 1;
+SELECT nextval('base_cache_signaling');
 
 ---------------------------------
 -- Users
index 82928c3..26e153f 100644 (file)
@@ -50,6 +50,9 @@ class Registry(object):
         self._init_parent = {}
         self.db_name = db_name
         self.db = openerp.sql_db.db_connect(db_name)
+        # Flag indicating if at least one model cache has been cleared.
+        # Useful only in a multi-process context.
+        self._any_cache_cleared = False
 
         cr = self.db.cursor()
         has_unaccent = openerp.modules.db.has_unaccent(cr)
@@ -115,6 +118,14 @@ class Registry(object):
         for model in self.models.itervalues():
             model.clear_caches()
 
+    # Useful only in a multi-process context.
+    def reset_any_cache_cleared(self):
+        self._any_cache_cleared = False
+
+    # Useful only in a multi-process context.
+    def any_cache_cleared(self):
+        return self._any_cache_cleared
+
 class RegistryManager(object):
     """ Model registries manager.
 
@@ -127,6 +138,14 @@ class RegistryManager(object):
     registries = {}
     registries_lock = threading.RLock()
 
+    # Inter-process signaling (used only when openerp.multi_process is True):
+    # The `base_registry_signaling` sequence indicates the whole registry
+    # must be reloaded.
+    # The `base_cache_signaling sequence` indicates all caches must be
+    # invalidated (i.e. cleared).
+    base_registry_signaling_sequence = 1
+    base_cache_signaling_sequence = 1
+
     @classmethod
     def get(cls, db_name, force_demo=False, status=None, update_module=False,
             pooljobs=True):
@@ -215,5 +234,55 @@ class RegistryManager(object):
             if db_name in cls.registries:
                 cls.registries[db_name].clear_caches()
 
+    @classmethod
+    def check_registry_signaling(cls, db_name):
+        if openerp.multi_process:
+            # Check if the model registry must be reloaded (e.g. after the
+            # database has been updated by another process).
+            cr = openerp.sql_db.db_connect(db_name).cursor()
+            registry_reloaded = False
+            try:
+                cr.execute('SELECT last_value FROM base_registry_signaling')
+                r = cr.fetchone()[0]
+                if cls.base_registry_signaling_sequence != r:
+                    _logger.info("Reloading the model registry after database signaling.")
+                    cls.base_registry_signaling_sequence = r
+                    # Don't run the cron in the Gunicorn worker.
+                    cls.new(db_name, pooljobs=False)
+                    registry_reloaded = True
+            finally:
+                cr.close()
+
+            # Check if the model caches must be invalidated (e.g. after a write
+            # occured on another process). Don't clear right after a registry
+            # has been reload.
+            cr = openerp.sql_db.db_connect(db_name).cursor()
+            try:
+                cr.execute('SELECT last_value FROM base_cache_signaling')
+                r = cr.fetchone()[0]
+                if cls.base_cache_signaling_sequence != r and not registry_reloaded:
+                    _logger.info("Invalidating all model caches after database signaling.")
+                    cls.base_cache_signaling_sequence = r
+                    registry = cls.get(db_name, pooljobs=False)
+                    registry.clear_caches()
+            finally:
+                cr.close()
+
+    @classmethod
+    def signal_caches_change(cls, db_name):
+        if openerp.multi_process:
+            # Check the registries if any cache has been cleared and signal it
+            # through the database to other processes.
+            registry = cls.get(db_name, pooljobs=False)
+            if registry.any_cache_cleared():
+                _logger.info("At least one model cache has been cleare, signaling through the database.")
+                cr = openerp.sql_db.db_connect(db_name).cursor()
+                try:
+                    pass
+                    # cr.execute("select nextval('base_registry_signaling')")
+                    # cls.base_cache_signaling to = result
+                finally:
+                    cr.close()
+                registry.reset_any_cache_cleared()
 
 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
index 3e19093..47b9c9c 100644 (file)
@@ -2384,6 +2384,7 @@ class BaseModel(object):
         try:
             getattr(self, '_ormcache')
             self._ormcache = {}
+            self.pool._any_cache_cleared = True
         except AttributeError:
             pass
 
index 15eb90d..7ae672b 100644 (file)
@@ -45,14 +45,6 @@ class except_osv(Exception):
 
 service = None
 
-# Inter-process signaling:
-# The `base_registry_signaling` sequence indicates the whole registry
-# must be reloaded.
-# The `base_cache_signaling sequence` indicates all caches must be
-# invalidated (i.e. cleared).
-base_registry_signaling_sequence = None
-base_cache_signaling_sequence = None
-
 class object_proxy(object):
     def __init__(self):
         global service
@@ -176,39 +168,7 @@ class object_proxy(object):
 
     @check
     def execute(self, db, uid, obj, method, *args, **kw):
-
-        # Check if the model registry must be reloaded (e.g. after the
-        # database has been updated by another process).
-        cr = pooler.get_db(db).cursor()
-        registry_reloaded = False
-        try:
-            cr.execute('select last_value from base_registry_signaling')
-            r = cr.fetchone()[0]
-            global base_registry_signaling_sequence
-            if base_registry_signaling_sequence != r:
-                _logger.info("Reloading the model registry after database signaling.")
-                base_registry_signaling_sequence = r
-                # Don't run the cron in the Gunicorn worker.
-                openerp.modules.registry.RegistryManager.new(db, pooljobs=False)
-                registry_reloaded = True
-        finally:
-            cr.close()
-
-        # Check if the model caches must be invalidated (e.g. after a write
-        # occured on another process). Don't clear right after a registry
-        # has been reload.
-        cr = pooler.get_db(db).cursor()
-        try:
-            cr.execute('select last_value from base_cache_signaling')
-            r = cr.fetchone()[0]
-            global base_cache_signaling_sequence
-            if base_cache_signaling_sequence != r and not registry_reloaded:
-                _logger.info("Invalidating all model caches after database signaling.")
-                base_cache_signaling_sequence = r
-                registry = openerp.modules.registry.RegistryManager.get(db, pooljobs=False)
-                registry.clear_caches()
-        finally:
-            cr.close()
+        openerp.modules.registry.RegistryManager.check_registry_signaling(db)
 
         cr = pooler.get_db(db).cursor()
         try:
@@ -224,6 +184,8 @@ class object_proxy(object):
                 raise
         finally:
             cr.close()
+
+        openerp.modules.registry.RegistryManager.signal_caches_change(db)
         return res
 
     def exec_workflow_cr(self, cr, uid, obj, method, *args):
index 2c5c4a4..68474fb 100644 (file)
@@ -57,10 +57,12 @@ class ormcache(object):
             try:
                 key = args[self.skiparg-2:]
                 del d[key]
+                self2.pool._any_cache_cleared = True
             except KeyError:
                 pass
         else:
             d.clear()
+            self2.pool._any_cache_cleared = True
 
 class ormcache_multi(ormcache):
     def __init__(self, skiparg=2, size=8192, multi=3):
index f62c94d..b865a84 100644 (file)
@@ -456,6 +456,7 @@ arbiter_pid = None
 def on_starting(server):
     global arbiter_pid
     arbiter_pid = os.getpid() # TODO check if this is true even after replacing the executable
+    openerp.multi_process = True # Yay!
     #openerp.tools.cache = kill_workers_cache
     openerp.netsvc.init_logger()
     openerp.osv.osv.start_object_proxy()