multiprocessing signaling manually backported from 6.1
authorAntony Lesuisse <al@openerp.com>
Sat, 8 Dec 2012 18:11:51 +0000 (19:11 +0100)
committerAntony Lesuisse <al@openerp.com>
Sat, 8 Dec 2012 18:11:51 +0000 (19:11 +0100)
bzr revid: al@openerp.com-20121208181151-lfy956ysnok5b5hf

openerp/addons/base/ir/ir_ui_menu.py
openerp/addons/base/module/module.py
openerp/cron.py
openerp/modules/registry.py
openerp/osv/orm.py
openerp/service/web_services.py
openerp/tools/cache.py

index a8475d3..94b5c8a 100644 (file)
@@ -43,7 +43,7 @@ class ir_ui_menu(osv.osv):
 
     def __init__(self, *args, **kwargs):
         self.cache_lock = threading.RLock()
-        self.clear_cache()
+        self._cache = {}
         r = super(ir_ui_menu, self).__init__(*args, **kwargs)
         self.pool.get('ir.model.access').register_cache_clearing_method(self._name, 'clear_cache')
         return r
@@ -51,6 +51,10 @@ class ir_ui_menu(osv.osv):
     def clear_cache(self):
         with self.cache_lock:
             # radical but this doesn't frequently happen
+            if self._cache:
+                # Normally this is done by openerp.tools.ormcache
+                # but since we do not use it, set it by ourself.
+                self.pool._any_cache_cleared = True
             self._cache = {}
 
     def _filter_visible_menus(self, cr, uid, ids, context=None):
index dbb8d6a..cbd046a 100644 (file)
@@ -30,6 +30,7 @@ import re
 import urllib
 import zipimport
 
+import openerp
 from openerp import modules, pooler, release, tools, addons
 from openerp.modules.db import create_categories
 from openerp.tools.parse_version import parse_version
@@ -385,6 +386,8 @@ class module(osv.osv):
         # Mark them to be installed.
         if to_install_ids:
             self.button_install(cr, uid, to_install_ids, context=context)
+
+        openerp.modules.registry.RegistryManager.signal_registry_change(cr.dbname)
         return dict(ACTION_DICT, name=_('Install'))
 
     def button_immediate_install(self, cr, uid, ids, context=None):
index 7b67877..8551ed7 100644 (file)
@@ -204,9 +204,12 @@ def start_master_thread():
         _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
                         "this may cause trouble if you reach that number of parallel cron tasks.",
                         db_maxconn, _thread_slots)
-    t = threading.Thread(target=runner, name="openerp.cron.master_thread")
-    t.setDaemon(True)
-    t.start()
-    _logger.debug("Master cron daemon started!")
+    if _thread_slots:
+        t = threading.Thread(target=runner, name="openerp.cron.master_thread")
+        t.setDaemon(True)
+        t.start()
+        _logger.debug("Master cron daemon started!")
+    else:
+        _logger.info("No master cron daemon (0 workers needed).")
 
 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
index 5aa2eca..cd59a0e 100644 (file)
@@ -58,6 +58,18 @@ class Registry(object):
         self.db_name = db_name
         self.db = openerp.sql_db.db_connect(db_name)
 
+        # Inter-process signaling (used only when openerp.multi_process is True):
+        # The `base_registry_signaling` sequence indicates the whole registry
+        # must be reloaded.
+        # The `base_cache_signaling sequence` indicates all caches must be
+        # invalidated (i.e. cleared).
+        self.base_registry_signaling_sequence = 1
+        self.base_cache_signaling_sequence = 1
+
+        # Flag indicating if at least one model cache has been cleared.
+        # Useful only in a multi-process context.
+        self._any_cache_cleared = False
+
         cr = self.db.cursor()
         has_unaccent = openerp.modules.db.has_unaccent(cr)
         if openerp.tools.config['unaccent'] and not has_unaccent:
@@ -121,6 +133,36 @@ class Registry(object):
         """
         for model in self.models.itervalues():
             model.clear_caches()
+        # Special case for ir_ui_menu which does not use openerp.tools.ormcache.
+        ir_ui_menu = self.models.get('ir.ui.menu')
+        if ir_ui_menu:
+            ir_ui_menu.clear_cache()
+
+
+    # Useful only in a multi-process context.
+    def reset_any_cache_cleared(self):
+        self._any_cache_cleared = False
+
+    # Useful only in a multi-process context.
+    def any_cache_cleared(self):
+        return self._any_cache_cleared
+
+    @classmethod
+    def setup_multi_process_signaling(cls, cr):
+        if not openerp.multi_process:
+            return
+
+        # Inter-process signaling:
+        # The `base_registry_signaling` sequence indicates the whole registry
+        # must be reloaded.
+        # The `base_cache_signaling sequence` indicates all caches must be
+        # invalidated (i.e. cleared).
+        cr.execute("""SELECT sequence_name FROM information_schema.sequences WHERE sequence_name='base_registry_signaling'""")
+        if not cr.fetchall():
+            cr.execute("""CREATE SEQUENCE base_registry_signaling INCREMENT BY 1 START WITH 1""")
+            cr.execute("""SELECT nextval('base_registry_signaling')""")
+            cr.execute("""CREATE SEQUENCE base_cache_signaling INCREMENT BY 1 START WITH 1""")
+            cr.execute("""SELECT nextval('base_cache_signaling')""")
 
     @contextmanager
     def cursor(self, auto_commit=True):
@@ -182,6 +224,7 @@ class RegistryManager(object):
 
             cr = registry.db.cursor()
             try:
+                Registry.setup_multi_process_signaling(cr)
                 registry.do_parent_store(cr)
                 registry.get('ir.actions.report.xml').register_all(cr)
                 cr.commit()
@@ -232,5 +275,71 @@ class RegistryManager(object):
             if db_name in cls.registries:
                 cls.registries[db_name].clear_caches()
 
+    @classmethod
+    def check_registry_signaling(cls, db_name):
+        if openerp.multi_process and db_name in cls.registries:
+            registry = cls.get(db_name, pooljobs=False)
+            cr = registry.db.cursor()
+            try:
+                cr.execute("""
+                    SELECT base_registry_signaling.last_value,
+                           base_cache_signaling.last_value
+                    FROM base_registry_signaling, base_cache_signaling""")
+                r, c = cr.fetchone()
+                # Check if the model registry must be reloaded (e.g. after the
+                # database has been updated by another process).
+                if registry.base_registry_signaling_sequence != r:
+                    _logger.info("Reloading the model registry after database signaling.")
+                    # Don't run the cron in the Gunicorn worker.
+                    registry = cls.new(db_name, pooljobs=False)
+                    registry.base_registry_signaling_sequence = r
+                # Check if the model caches must be invalidated (e.g. after a write
+                # occured on another process). Don't clear right after a registry
+                # has been reload.
+                elif registry.base_cache_signaling_sequence != c:
+                    _logger.info("Invalidating all model caches after database signaling.")
+                    registry.base_cache_signaling_sequence = c
+                    registry.clear_caches()
+                    registry.reset_any_cache_cleared()
+                    # One possible reason caches have been invalidated is the
+                    # use of decimal_precision.write(), in which case we need
+                    # to refresh fields.float columns.
+                    for model in registry.models.values():
+                        for column in model._columns.values():
+                            if hasattr(column, 'digits_change'):
+                                column.digits_change(cr)
+            finally:
+                cr.close()
+
+    @classmethod
+    def signal_caches_change(cls, db_name):
+        if openerp.multi_process and db_name in cls.registries:
+            # Check the registries if any cache has been cleared and signal it
+            # through the database to other processes.
+            registry = cls.get(db_name, pooljobs=False)
+            if registry.any_cache_cleared():
+                _logger.info("At least one model cache has been cleared, signaling through the database.")
+                cr = registry.db.cursor()
+                r = 1
+                try:
+                    cr.execute("select nextval('base_cache_signaling')")
+                    r = cr.fetchone()[0]
+                finally:
+                    cr.close()
+                registry.base_cache_signaling_sequence = r
+                registry.reset_any_cache_cleared()
+
+    @classmethod
+    def signal_registry_change(cls, db_name):
+        if openerp.multi_process and db_name in cls.registries:
+            registry = cls.get(db_name, pooljobs=False)
+            cr = registry.db.cursor()
+            r = 1
+            try:
+                cr.execute("select nextval('base_registry_signaling')")
+                r = cr.fetchone()[0]
+            finally:
+                cr.close()
+            registry.base_registry_signaling_sequence = r
 
 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
index ef13253..faa71e9 100644 (file)
@@ -2509,6 +2509,7 @@ class BaseModel(object):
         try:
             getattr(self, '_ormcache')
             self._ormcache = {}
+            self.pool._any_cache_cleared = True
         except AttributeError:
             pass
 
index 4824cea..168c930 100644 (file)
@@ -597,8 +597,10 @@ class objects_proxy(netsvc.ExportService):
             raise NameError("Method not available %s" % method)
         security.check(db,uid,passwd)
         assert openerp.osv.osv.service, "The object_proxy class must be started with start_object_proxy."
+        openerp.modules.registry.RegistryManager.check_registry_signaling(db)
         fn = getattr(openerp.osv.osv.service, method)
         res = fn(db, uid, *params)
+        openerp.modules.registry.RegistryManager.signal_caches_change(db)
         return res
 
 
@@ -680,8 +682,10 @@ class report_spool(netsvc.ExportService):
         if method not in ['report', 'report_get', 'render_report']:
             raise KeyError("Method not supported %s" % method)
         security.check(db,uid,passwd)
+        openerp.modules.registry.RegistryManager.check_registry_signaling(db)
         fn = getattr(self, 'exp_' + method)
         res = fn(db, uid, *params)
+        openerp.modules.registry.RegistryManager.signal_caches_change(db)
         return res
 
     def exp_render_report(self, db, uid, object, ids, datas=None, context=None):
index 6e18007..4b4dcea 100644 (file)
@@ -57,10 +57,12 @@ class ormcache(object):
             try:
                 key = args[self.skiparg-2:]
                 del d[key]
+                self2.pool._any_cache_cleared = True
             except KeyError:
                 pass
         else:
             d.clear()
+            self2.pool._any_cache_cleared = True
 
 class ormcache_multi(ormcache):
     def __init__(self, skiparg=2, size=8192, multi=3):