import workflow
import wsgi
+# Is the server running in multi-process mode (e.g. behind Gunicorn).
+# If this is True, the processes have to communicate some events,
+# e.g. database update or cache invalidation. Each process has also
+# its own copy of the data structure and we don't need to care about
+# locks between threads.
+multi_process = False
+
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
-- The `base_cache_signaling sequence` indicates all caches must be
-- invalidated (i.e. cleared).
CREATE SEQUENCE base_registry_signaling INCREMENT BY 1 START WITH 1;
+SELECT nextval('base_registry_signaling');
CREATE SEQUENCE base_cache_signaling INCREMENT BY 1 START WITH 1;
+SELECT nextval('base_cache_signaling');
---------------------------------
-- Users
self._init_parent = {}
self.db_name = db_name
self.db = openerp.sql_db.db_connect(db_name)
+ # Flag indicating if at least one model cache has been cleared.
+ # Useful only in a multi-process context.
+ self._any_cache_cleared = False
cr = self.db.cursor()
has_unaccent = openerp.modules.db.has_unaccent(cr)
for model in self.models.itervalues():
model.clear_caches()
+ # Useful only in a multi-process context.
+ def reset_any_cache_cleared(self):
+ self._any_cache_cleared = False
+
+ # Useful only in a multi-process context.
+ def any_cache_cleared(self):
+ return self._any_cache_cleared
+
class RegistryManager(object):
""" Model registries manager.
registries = {}
registries_lock = threading.RLock()
+ # Inter-process signaling (used only when openerp.multi_process is True):
+ # The `base_registry_signaling` sequence indicates the whole registry
+ # must be reloaded.
+ # The `base_cache_signaling sequence` indicates all caches must be
+ # invalidated (i.e. cleared).
+ base_registry_signaling_sequence = 1
+ base_cache_signaling_sequence = 1
+
@classmethod
def get(cls, db_name, force_demo=False, status=None, update_module=False,
pooljobs=True):
if db_name in cls.registries:
cls.registries[db_name].clear_caches()
+ @classmethod
+ def check_registry_signaling(cls, db_name):
+ if openerp.multi_process:
+ # Check if the model registry must be reloaded (e.g. after the
+ # database has been updated by another process).
+ cr = openerp.sql_db.db_connect(db_name).cursor()
+ registry_reloaded = False
+ try:
+ cr.execute('SELECT last_value FROM base_registry_signaling')
+ r = cr.fetchone()[0]
+ if cls.base_registry_signaling_sequence != r:
+ _logger.info("Reloading the model registry after database signaling.")
+ cls.base_registry_signaling_sequence = r
+ # Don't run the cron in the Gunicorn worker.
+ cls.new(db_name, pooljobs=False)
+ registry_reloaded = True
+ finally:
+ cr.close()
+
+ # Check if the model caches must be invalidated (e.g. after a write
+ # occured on another process). Don't clear right after a registry
+ # has been reload.
+ cr = openerp.sql_db.db_connect(db_name).cursor()
+ try:
+ cr.execute('SELECT last_value FROM base_cache_signaling')
+ r = cr.fetchone()[0]
+ if cls.base_cache_signaling_sequence != r and not registry_reloaded:
+ _logger.info("Invalidating all model caches after database signaling.")
+ cls.base_cache_signaling_sequence = r
+ registry = cls.get(db_name, pooljobs=False)
+ registry.clear_caches()
+ finally:
+ cr.close()
+
+ @classmethod
+ def signal_caches_change(cls, db_name):
+ if openerp.multi_process:
+ # Check the registries if any cache has been cleared and signal it
+ # through the database to other processes.
+ registry = cls.get(db_name, pooljobs=False)
+ if registry.any_cache_cleared():
+ _logger.info("At least one model cache has been cleare, signaling through the database.")
+ cr = openerp.sql_db.db_connect(db_name).cursor()
+ try:
+ pass
+ # cr.execute("select nextval('base_registry_signaling')")
+ # cls.base_cache_signaling to = result
+ finally:
+ cr.close()
+ registry.reset_any_cache_cleared()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
try:
getattr(self, '_ormcache')
self._ormcache = {}
+ self.pool._any_cache_cleared = True
except AttributeError:
pass
service = None
-# Inter-process signaling:
-# The `base_registry_signaling` sequence indicates the whole registry
-# must be reloaded.
-# The `base_cache_signaling sequence` indicates all caches must be
-# invalidated (i.e. cleared).
-base_registry_signaling_sequence = None
-base_cache_signaling_sequence = None
-
class object_proxy(object):
def __init__(self):
global service
@check
def execute(self, db, uid, obj, method, *args, **kw):
-
- # Check if the model registry must be reloaded (e.g. after the
- # database has been updated by another process).
- cr = pooler.get_db(db).cursor()
- registry_reloaded = False
- try:
- cr.execute('select last_value from base_registry_signaling')
- r = cr.fetchone()[0]
- global base_registry_signaling_sequence
- if base_registry_signaling_sequence != r:
- _logger.info("Reloading the model registry after database signaling.")
- base_registry_signaling_sequence = r
- # Don't run the cron in the Gunicorn worker.
- openerp.modules.registry.RegistryManager.new(db, pooljobs=False)
- registry_reloaded = True
- finally:
- cr.close()
-
- # Check if the model caches must be invalidated (e.g. after a write
- # occured on another process). Don't clear right after a registry
- # has been reload.
- cr = pooler.get_db(db).cursor()
- try:
- cr.execute('select last_value from base_cache_signaling')
- r = cr.fetchone()[0]
- global base_cache_signaling_sequence
- if base_cache_signaling_sequence != r and not registry_reloaded:
- _logger.info("Invalidating all model caches after database signaling.")
- base_cache_signaling_sequence = r
- registry = openerp.modules.registry.RegistryManager.get(db, pooljobs=False)
- registry.clear_caches()
- finally:
- cr.close()
+ openerp.modules.registry.RegistryManager.check_registry_signaling(db)
cr = pooler.get_db(db).cursor()
try:
raise
finally:
cr.close()
+
+ openerp.modules.registry.RegistryManager.signal_caches_change(db)
return res
def exec_workflow_cr(self, cr, uid, obj, method, *args):
try:
key = args[self.skiparg-2:]
del d[key]
+ self2.pool._any_cache_cleared = True
except KeyError:
pass
else:
d.clear()
+ self2.pool._any_cache_cleared = True
class ormcache_multi(ormcache):
def __init__(self, skiparg=2, size=8192, multi=3):
def on_starting(server):
global arbiter_pid
arbiter_pid = os.getpid() # TODO check if this is true even after replacing the executable
+ openerp.multi_process = True # Yay!
#openerp.tools.cache = kill_workers_cache
openerp.netsvc.init_logger()
openerp.osv.osv.start_object_proxy()