__author__ = openerp.release.author
__version__ = openerp.release.version
- if os.name == 'posix':
- import pwd
- # We DON't log this using the standard logger, because we might mess
- # with the logfile's permissions. Just do a quick exit here.
- if pwd.getpwuid(os.getuid())[0] == 'root' :
- sys.stderr.write("Attempted to run OpenERP server as root. This is not good, aborting.\n")
+ def check_root_user():
+ """ Exit if the process's user is 'root' (on POSIX system)."""
+ if os.name == 'posix':
+ import pwd
+ if pwd.getpwuid(os.getuid())[0] == 'root' :
+ sys.stderr.write("Running as user 'root' is a security risk, aborting.\n")
+ sys.exit(1)
+
+ def check_postgres_user():
+ """ Exit if the configured database user is 'postgres'.
+
+ This function assumes the configuration has been initialized.
+ """
+ config = openerp.tools.config
+ if config['db_user'] == 'postgres':
+ sys.stderr.write("Using the database user 'postgres' is a security risk, aborting.")
sys.exit(1)
- #-----------------------------------------------------------------------
- # parse the command line
- #-----------------------------------------------------------------------
- openerp.tools.config.parse_config(sys.argv[1:])
- config = openerp.tools.config
-
- #----------------------------------------------------------
- # get logger
- #----------------------------------------------------------
- openerp.netsvc.init_logger()
- logger = logging.getLogger('server')
-
- logger.info("OpenERP version - %s", __version__)
- for name, value in [('addons_path', config['addons_path']),
- ('database hostname', config['db_host'] or 'localhost'),
- ('database port', config['db_port'] or '5432'),
- ('database user', config['db_user'])]:
- logger.info("%s - %s", name, value)
-
- # Don't allow if the connection to PostgreSQL done by postgres user
- if config['db_user'] == 'postgres':
- logger.error("Connecting to the database as 'postgres' user is forbidden, as it present major security issues. Shutting down.")
- sys.exit(1)
-
- #----------------------------------------------------------
- # init net service
- #----------------------------------------------------------
- logger.info('initialising distributed objects services')
-
- #----------------------------------------------------------
- # Load and update databases if requested
- #----------------------------------------------------------
-
- if not ( config["stop_after_init"] or \
- config["translate_in"] or \
- config["translate_out"] ):
- openerp.osv.osv.start_object_proxy()
- openerp.service.web_services.start_web_services()
- http_server = openerp.service.http_server
- netrpc_server = openerp.service.netrpc_server
- http_server.init_servers()
- http_server.init_xmlrpc()
- http_server.init_static_http()
- netrpc_server.init_servers()
-
- if config['db_name']:
- for dbname in config['db_name'].split(','):
- db, registry = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False)
- cr = db.cursor()
+ def report_configuration():
+ """ Log the server version and some configuration values.
- if config["test_file"]:
- logger.info('loading test file %s', config["test_file"])
- openerp.tools.convert_yaml_import(cr, 'base', file(config["test_file"]), {}, 'test', True)
- cr.rollback()
+ This function assumes the configuration has been initialized.
+ """
+ config = openerp.tools.config
+ logger = logging.getLogger('server')
+ logger.info("OpenERP version %s", __version__)
+ for name, value in [('addons paths', config['addons_path']),
+ ('database hostname', config['db_host'] or 'localhost'),
+ ('database port', config['db_port'] or '5432'),
+ ('database user', config['db_user'])]:
+ logger.info("%s: %s", name, value)
+
+ def setup_pid_file():
+ """ Create a file with the process id written in it.
+
+ This function assumes the configuration has been initialized.
+ """
+ config = openerp.tools.config
+ if config['pidfile']:
+ fd = open(config['pidfile'], 'w')
+ pidtext = "%d" % (os.getpid())
+ fd.write(pidtext)
+ fd.close()
+
+ def preload_registry(dbname):
+ """ Preload a registry, and start the cron."""
+ try:
- db, pool = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False)
- pool.get('ir.cron').restart(db.dbname)
++ db, registry = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False)
+
- # jobs will start to be processed later, when openerp.cron.start_master_thread below is called.
++ # jobs will start to be processed later, when openerp.cron.start_master_thread() is called by openerp.service.start_services()
+ registry.schedule_cron_jobs()
+ except Exception:
+ logging.exception('Failed to initialize database `%s`.', dbname)
+ def run_test_file(dbname, test_file):
+ """ Preload a registry, possibly run a test file, and start the cron."""
+ try:
- db, pool = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False)
++ db, registry = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False)
+ cr = db.cursor()
+ logger = logging.getLogger('server')
+ logger.info('loading test file %s', test_file)
+ openerp.tools.convert_yaml_import(cr, 'base', file(test_file), {}, 'test', True)
+ cr.rollback()
cr.close()
+ except Exception:
+ logging.exception('Failed to initialize database `%s` and run test file `%s`.', dbname, test_file)
+
+
+ def export_translation():
+ config = openerp.tools.config
+ dbname = config['db_name']
+ logger = logging.getLogger('server')
- #----------------------------------------------------------
- # translation stuff
- #----------------------------------------------------------
- if config["translate_out"]:
if config["language"]:
msg = "language %s" % (config["language"],)
else:
--- /dev/null
+-
+ Test the cron jobs scheduling.
+-
+ Disable the existing cron jobs if any during the tests.
+-
+ !python {model: ir.cron }: |
+ # For this test to work, as it involves multiple database cursors,
+ # we have to commit changes. But YAML tests must be rollbacked, so
+ # the final database state is left untouched. So we have to be a bit
+ # ugly here: use our own cursor, commit, and clean after ourselves.
+ # We also pass around some ids using setattr/delattr, and we have to
+ # rollback the previous tests otherwise we won't be able to touch the
+ # db.
+ # Well, this should probably be a standalone, or regular unit test,
+ # instead of using the YAML infrastructure.
+ cr.rollback()
+ our_cr = self.pool.db.cursor()
+ try:
+ ids = self.search(our_cr, uid, [], {})
+ setattr(self, 'saved_ids', ids)
+ self.write(our_cr, uid, ids, {'active': False}, {})
+ our_cr.commit()
+ finally:
+ our_cr.close()
+-
+ Three concurrent jobs started with a slight time gap. Assume 4 cron threads.
+ This will take about 2 minutes.
+-
+ !python {model: ir.cron }: |
+ # Pretend initialization is already done. We the use a try/finally
+ # to reset _init correctly.
+ self.pool._init = False
+ our_cr = self.pool.db.cursor()
+ try:
+ self.test_00(our_cr, uid) # this will commit using the passed cursor
+ import openerp.cron
- openerp.cron._thread_count = 4
++ openerp.cron._thread_slots = 4
+ # Wake up this db as soon as the master cron thread starts.
- openerp.cron.schedule_in_advance(1, self.pool.db.dbname)
++ openerp.cron.schedule_wakeup(1, self.pool.db.dbname)
+ # Pretend to be the master thread, for 4 iterations.
+ openerp.cron.runner_body()
+ openerp.cron.runner_body()
+ openerp.cron.runner_body()
+ openerp.cron.runner_body()
+ finally:
+ self.pool._init = True
+ our_cr.close()
+-
+ Clean after ourselves.
+-
+ !python {model: ir.cron }: |
+ our_cr = self.pool.db.cursor()
+ try:
+ ids = [x for x in self.search(our_cr, uid, ['|', ('active', '=', True), ('active', '=', False)], {}) if x not in self.saved_ids]
+ self.unlink(our_cr, uid, ids, {})
+ ids = self.saved_ids
+ delattr(self, 'saved_ids')
+ self.write(our_cr, uid, ids, {'active': True}, {})
+ our_cr.commit()
+ finally:
+ our_cr.close()
import deprecation
+# Maximum number of threads processing concurrently cron jobs.
+max_cron_threads = 4 # Actually the default value here is meaningless,
+ # look at tools.config for the default value.
+
+ # Paths to search for OpenERP addons.
+ addons_paths = []
+
+ # List of server-wide modules to load. Those modules are supposed to provide
+ # features not necessarily tied to a particular database. This is in contrast
+ # to modules that are always bound to a specific database when they are
+ # installed (i.e. the majority of OpenERP addons). This is set with the --load
+ # command-line option.
+ server_wide_modules = []
+
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
import openerp.sql_db
import openerp.osv.orm
+import openerp.cron
+import openerp.tools
-
+ import openerp.modules.db
+ import openerp.tools.config
class Registry(object):
""" Model registry for a particular database.
return res
+ def schedule_cron_jobs(self):
+ """ Make the cron thread care about this registry/database jobs.
-
+ This will initiate the cron thread to check for any pending jobs for
+ this registry/database as soon as possible. Then it will continuously
+ monitor the ir.cron model for future jobs. See openerp.cron for
+ details.
-
+ """
+ openerp.cron.schedule_in_advance(1, self.db.dbname)
+
+ def clear_caches(self):
+ """ Clear the caches
-
+ This clears the caches associated to methods decorated with
+ ``tools.ormcache`` or ``tools.ormcache_multi`` for all the models.
+ """
+ for model in self.models.itervalues():
+ model.clear_caches()
class RegistryManager(object):
""" Model registries manager.
registries (essentially database connection/model registry pairs).
"""
# Mapping between db name and model registry.
# Accessed through the methods below.
registries = {}
+ registries_lock = threading.RLock()
-
@classmethod
def get(cls, db_name, force_demo=False, status=None, update_module=False,
pooljobs=True):
""" Return a registry for a given database name."""
-
- if db_name in cls.registries:
- registry = cls.registries[db_name]
- else:
- registry = cls.new(db_name, force_demo, status,
- update_module, pooljobs)
- return registry
+ with cls.registries_lock:
+ if db_name in cls.registries:
+ registry = cls.registries[db_name]
+ else:
+ registry = cls.new(db_name, force_demo, status,
+ update_module, pooljobs)
+ return registry
-
@classmethod
def new(cls, db_name, force_demo=False, status=None,
update_module=False, pooljobs=True):
The (possibly) previous registry for that database name is discarded.
"""
-
import openerp.modules
- registry = Registry(db_name)
-
- # Initializing a registry will call general code which will in turn
- # call registries.get (this object) to obtain the registry being
- # initialized. Make it available in the registries dictionary then
- # remove it if an exception is raised.
- cls.delete(db_name)
- cls.registries[db_name] = registry
- try:
- # This should be a method on Registry
- openerp.modules.load_modules(registry.db, force_demo, status, update_module)
- except Exception:
- del cls.registries[db_name]
- raise
-
- cr = registry.db.cursor()
- try:
- registry.do_parent_store(cr)
- registry.get('ir.actions.report.xml').register_all(cr)
- cr.commit()
- finally:
- cr.close()
-
- if pooljobs:
- registry.schedule_cron_jobs()
-
- return registry
+ with cls.registries_lock:
+ registry = Registry(db_name)
+
+ # Initializing a registry will call general code which will in turn
+ # call registries.get (this object) to obtain the registry being
+ # initialized. Make it available in the registries dictionary then
+ # remove it if an exception is raised.
+ cls.delete(db_name)
+ cls.registries[db_name] = registry
+ try:
+ # This should be a method on Registry
+ openerp.modules.load_modules(registry.db, force_demo, status, update_module)
+ except Exception:
+ del cls.registries[db_name]
+ raise
+
+ cr = registry.db.cursor()
+ try:
+ registry.do_parent_store(cr)
+ registry.get('ir.actions.report.xml').register_all(cr)
+ cr.commit()
+ finally:
+ cr.close()
+
+ if pooljobs:
- registry.get('ir.cron').restart(registry.db.dbname)
++ registry.registry.start_cron_thread()
+
+ return registry
-
@classmethod
def delete(cls, db_name):
- """ Delete the registry linked to a given database.
- """ Delete the registry linked to a given database. """
++ """Delete the registry linked to a given database.
+
+ This also cleans the associated caches. For good measure this also
+ cancels the associated cron job. But please note that the cron job can
+ be running and take some time before ending, and that you should not
+ remove a registry if it can still be used by some thread. So it might
+ be necessary to call yourself openerp.cron.Agent.cancel(db_name) and
+ and join (i.e. wait for) the thread.
-
+ """
- if db_name in cls.registries:
- del cls.registries[db_name]
- openerp.tools.cache.clean_caches_for_db(db_name)
- openerp.cron.cancel(db_name)
+ with cls.registries_lock:
+ if db_name in cls.registries:
++ cls.registries[db_name].clear_caches()
+ del cls.registries[db_name]
++ openerp.cron.cancel(db_name)
++
+ @classmethod
+ def delete_all(cls):
- """ Delete all the registries. """
- for db_name in cls.registries.keys():
- cls.delete(db_name)
++ """Delete all the registries. """
++ with cls.registries_lock:
++ for db_name in cls.registries.keys():
++ cls.delete(db_name)
+
+ @classmethod
+ def clear_caches(cls, db_name):
- """ Clear the caches
++ """Clear caches
+
+ This clears the caches associated to methods decorated with
+ ``tools.ormcache`` or ``tools.ormcache_multi`` for all the models
+ of the given database name.
+
+ This method is given to spare you a ``RegistryManager.get(db_name)``
+ that would loads the given database if it was not already loaded.
+ """
+ with cls.registries_lock:
+ if db_name in cls.registries:
+ cls.registries[db_name].clear_caches()
--# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
++# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
import sys
import threading
import time
++import traceback
import types
from pprint import pformat
logger.addHandler(handler)
logger.setLevel(logging.ERROR)
- import traceback
-
-class Agent(object):
- """ Singleton that keeps track of cancellable tasks to run at a given
- timestamp.
-
- The tasks are characterised by:
-
- * a timestamp
- * the database on which the task run
- * the function to call
- * the arguments and keyword arguments to pass to the function
-
- Implementation details:
-
- - Tasks are stored as list, allowing the cancellation by setting
- the timestamp to 0.
- - A heapq is used to store tasks, so we don't need to sort
- tasks ourself.
- """
- __tasks = []
- __tasks_by_db = {}
- _logger = logging.getLogger('netsvc.agent')
-
- @classmethod
- def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
- task = [timestamp, db_name, function, args, kwargs]
- heapq.heappush(cls.__tasks, task)
- cls.__tasks_by_db.setdefault(db_name, []).append(task)
-
- @classmethod
- def cancel(cls, db_name):
- """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
- cls._logger.debug("Cancel timers for %s db", db_name or 'all')
- if db_name is None:
- cls.__tasks, cls.__tasks_by_db = [], {}
- else:
- if db_name in cls.__tasks_by_db:
- for task in cls.__tasks_by_db[db_name]:
- task[0] = 0
-
- @classmethod
- def quit(cls):
- cls.cancel(None)
-
- @classmethod
- def runner(cls):
- """Neverending function (intended to be ran in a dedicated thread) that
- checks every 60 seconds tasks to run. TODO: make configurable
- """
- current_thread = threading.currentThread()
- while True:
- while cls.__tasks and cls.__tasks[0][0] < time.time():
- task = heapq.heappop(cls.__tasks)
- timestamp, dbname, function, args, kwargs = task
- cls.__tasks_by_db[dbname].remove(task)
- if not timestamp:
- # null timestamp -> cancelled task
- continue
- current_thread.dbname = dbname # hack hack
- cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs)
- delattr(current_thread, 'dbname')
- task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
- # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
- task_thread.setDaemon(False)
- task_thread.start()
- time.sleep(1)
- time.sleep(60)
-
-def start_agent():
- agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
- # the agent runner is a typical daemon thread, that will never quit and must be
- # terminated when the main process exits - with no consequence (the processing
- # threads it spawns are not marked daemon)
- agent_runner.setDaemon(True)
- agent_runner.start()
-
-import traceback
-
class Server:
""" Generic interface for all servers with an event loop etc.
Override this to impement http, net-rpc etc. servers.
import http_server
import netrpc_server
import web_services
+ import websrv_lib
+
++import openerp.cron
++import openerp.modules
+ import openerp.netsvc
+ import openerp.osv
+ import openerp.tools
+ import openerp.wsgi
#.apidoc title: RPC Services
low-level behavior of the wire.
"""
+ def start_services():
+ """ Start all services.
+
+ Services include the different servers and cron threads.
+
+ """
+ # Instantiate local services (this is a legacy design).
+ openerp.osv.osv.start_object_proxy()
+ # Export (for RPC) services.
+ web_services.start_web_services()
+
+ # Initialize the HTTP stack.
+ #http_server.init_servers()
+ #http_server.init_xmlrpc()
+ #http_server.init_static_http()
+ netrpc_server.init_servers()
+
+ # Start the main cron thread.
- openerp.netsvc.start_agent()
++ openerp.cron.start_master_thread()
+
+ # Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC).
+ openerp.netsvc.Server.startAll()
+
+
+ # Start the WSGI server.
+ openerp.wsgi.start_server()
+
+
+ def stop_services():
+ """ Stop all services. """
- openerp.netsvc.Agent.quit()
++ # stop scheduling new jobs; we will have to wait for the jobs to complete below
++ openerp.cron.cancel_all()
++
+ openerp.netsvc.Server.quitAll()
+ openerp.wsgi.stop_server()
+ config = openerp.tools.config
+ logger = logging.getLogger('server')
+ logger.info("Initiating shutdown")
+ logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
+ logging.shutdown()
+
+ # Manually join() all threads before calling sys.exit() to allow a second signal
+ # to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
+ # threading.Thread.join() should not mask signals (at least in python 2.5).
+ for thread in threading.enumerate():
+ if thread != threading.currentThread() and not thread.isDaemon():
+ while thread.isAlive():
+ # Need a busyloop here as thread.join() masks signals
+ # and would prevent the forced shutdown.
+ thread.join(0.05)
+ time.sleep(0.05)
+
++ openerp.modules.registry.RegistryManager.delete_all()
++
+
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
params = params or None
res = self._obj.execute(query, params)
except psycopg2.ProgrammingError, pe:
- if log_exceptions:
- if self._default_log_exceptions or log_exceptions:
++ if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
self.__logger.error("Programming error: %s, in query %s", pe, query)
raise
except Exception:
- if log_exceptions:
- if self._default_log_exceptions or log_exceptions:
++ if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
self.__logger.exception("bad query: %s", self._obj.query or query)
raise
"osv_memory tables. This is a decimal value expressed in hours, "
"and the default is 1 hour.",
type="float")
+ group.add_option("--max-cron-threads", dest="max_cron_threads", my_default=4,
+ help="Maximum number of threads processing concurrently cron jobs.",
+ type="int")
+ group.add_option("--unaccent", dest="unaccent", my_default=False, action="store_true",
+ help="Use the unaccent function provided by the database when available.")
++
parser.add_option_group(group)
# Copy all optparse options (i.e. MyOption) into self.options.
'stop_after_init', 'logrotate', 'without_demo', 'netrpc', 'xmlrpc', 'syslog',
'list_db', 'xmlrpcs',
'test_file', 'test_disable', 'test_commit', 'test_report_directory',
- 'osv_memory_count_limit', 'osv_memory_age_limit', 'max_cron_threads',
- 'osv_memory_count_limit', 'osv_memory_age_limit', 'unaccent',
++ 'osv_memory_count_limit', 'osv_memory_age_limit', 'max_cron_threads', 'unaccent',
]
for arg in keys:
if opt.save:
self.save()
+ openerp.conf.max_cron_threads = self.options['max_cron_threads']
+
+ openerp.conf.addons_paths = self.options['addons_path'].split(',')
+ openerp.conf.server_wide_modules = \
+ map(lambda m: m.strip(), opt.server_wide_modules.split(',')) if \
+ opt.server_wide_modules else []
+ if complete:
+ openerp.modules.module.initialize_sys_path()
+ openerp.modules.loading.open_openerp_namespace()
+ # openerp.addons.__path__.extend(openerp.conf.addons_paths) # This
+ # is not compatible with initialize_sys_path(): import crm and
+ # import openerp.addons.crm load twice the module.
+
def _generate_pgpassfile(self):
"""
Generate the pgpass file with the parameters from the command line (db_host, db_user,