#
# OpenERP, Open Source Management Solution
# Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
-# Copyright (C) 2010-2012 OpenERP s.a. (<http://openerp.com>).
+# Copyright (C) 2010-2014 OpenERP s.a. (<http://openerp.com>).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
#
##############################################################################
+import functools
import imp
import itertools
+import logging
import os
-from os.path import join as opj
+import re
import sys
-import types
-import zipimport
+import time
+import unittest
+from os.path import join as opj
-import openerp
+import unittest2
-import openerp.osv as osv
+import openerp
import openerp.tools as tools
-import openerp.tools.osutil as osutil
-from openerp.tools.safe_eval import safe_eval as eval
-from openerp.tools.translate import _
-
-import openerp.netsvc as netsvc
-
-import zipfile
import openerp.release as release
+from openerp.tools.safe_eval import safe_eval as eval
-import re
-import base64
-from zipfile import PyZipFile, ZIP_DEFLATED
-from cStringIO import StringIO
-
-import logging
-
-import openerp.modules.db
-import openerp.modules.graph
+MANIFEST = '__openerp__.py'
+README = ['README.rst', 'README.md', 'README.txt']
_logger = logging.getLogger(__name__)
-_ad = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'addons') # default addons path (base)
+# addons path as a list
ad_paths = []
+hooked = False
# Modules already loaded
loaded = []
-_logger = logging.getLogger(__name__)
-
class AddonsImportHook(object):
"""
Import hook to load OpenERP addons from multiple paths.
backward compatibility, `import <module>` is still supported. Now they
are living in `openerp.addons`. The good way to import such modules is
thus `import openerp.addons.module`.
-
- For backward compatibility, loading an addons puts it in `sys.modules`
- under both the legacy (short) name, and the new (longer) name. This
- ensures that
- import hr
- import openerp.addons.hr
- loads the hr addons only once.
-
- When an OpenERP addons name clashes with some other installed Python
- module (for instance this is the case of the `resource` addons),
- obtaining the OpenERP addons is only possible with the long name. The
- short name will give the expected Python module.
-
- Instead of relying on some addons path, an alternative approach would be
- to use pkg_resources entry points from already installed Python libraries
- (and install our addons as such). Even when implemented, we would still
- have to support the addons path approach for backward compatibility.
"""
def find_module(self, module_name, package_path):
if len(module_parts) == 3 and module_name.startswith('openerp.addons.'):
return self # We act as a loader too.
- # TODO list of loadable modules can be cached instead of always
- # calling get_module_path().
- if len(module_parts) == 1 and \
- get_module_path(module_parts[0],
- display_warning=False):
- try:
- # Check if the bare module name clashes with another module.
- f, path, descr = imp.find_module(module_parts[0])
- _logger.warning("""
-Ambiguous import: the OpenERP module `%s` is shadowed by another
-module (available at %s).
-To import it, use `import openerp.addons.<module>.`.""" % (module_name, path))
- return
- except ImportError, e:
- # Using `import <module_name>` instead of
- # `import openerp.addons.<module_name>` is ugly but not harmful
- # and kept for backward compatibility.
- return self # We act as a loader too.
-
def load_module(self, module_name):
+ if module_name in sys.modules:
+ return sys.modules[module_name]
- module_parts = module_name.split('.')
- if len(module_parts) == 3 and module_name.startswith('openerp.addons.'):
- module_part = module_parts[2]
- if module_name in sys.modules:
- return sys.modules[module_name]
-
- if len(module_parts) == 1:
- module_part = module_parts[0]
- if module_part in sys.modules:
- return sys.modules[module_part]
-
- try:
- # Check if the bare module name shadows another module.
- f, path, descr = imp.find_module(module_part)
- is_shadowing = True
- except ImportError, e:
- # Using `import <module_name>` instead of
- # `import openerp.addons.<module_name>` is ugly but not harmful
- # and kept for backward compatibility.
- is_shadowing = False
-
+ _1, _2, module_part = module_name.split('.')
# Note: we don't support circular import.
f, path, descr = imp.find_module(module_part, ad_paths)
mod = imp.load_module('openerp.addons.' + module_part, f, path, descr)
- if not is_shadowing:
- sys.modules[module_part] = mod
- for k in sys.modules.keys():
- if k.startswith('openerp.addons.' + module_part):
- sys.modules[k[len('openerp.addons.'):]] = sys.modules[k]
sys.modules['openerp.addons.' + module_part] = mod
return mod
PYTHONPATH.
"""
global ad_paths
- if ad_paths:
- return
+ global hooked
+
+ dd = tools.config.addons_data_dir
+ if dd not in ad_paths:
+ ad_paths.append(dd)
+
+ for ad in tools.config['addons_path'].split(','):
+ ad = os.path.abspath(tools.ustr(ad.strip()))
+ if ad not in ad_paths:
+ ad_paths.append(ad)
- ad_paths = map(lambda m: os.path.abspath(tools.ustr(m.strip())), tools.config['addons_path'].split(','))
- ad_paths.append(_ad) # for get_module_path
- sys.meta_path.append(AddonsImportHook())
+ # add base module path
+ base_path = os.path.abspath(os.path.join(os.path.dirname(os.path.dirname(__file__)), 'addons'))
+ if base_path not in ad_paths:
+ ad_paths.append(base_path)
+
+ if not hooked:
+ sys.meta_path.append(AddonsImportHook())
+ hooked = True
def get_module_path(module, downloaded=False, display_warning=True):
"""Return the path of the given module.
return opj(adp, module)
if downloaded:
- return opj(_ad, module)
+ return opj(tools.config.addons_data_dir, module)
if display_warning:
_logger.warning('module %s: module not found', module)
return False
-
def get_module_filetree(module, dir='.'):
path = get_module_path(module)
if not path:
if dir.startswith('..') or (dir and dir[0] == '/'):
raise Exception('Cannot access file outside the module')
- if not os.path.isdir(path):
- # zipmodule
- zip = zipfile.ZipFile(path + ".zip")
- files = ['/'.join(f.split('/')[1:]) for f in zip.namelist()]
- else:
- files = osutil.listdir(path, True)
+ files = openerp.tools.osutil.listdir(path, True)
tree = {}
for f in files:
return tree
-def zip_directory(directory, b64enc=True, src=True):
- """Compress a directory
-
- @param directory: The directory to compress
- @param base64enc: if True the function will encode the zip file with base64
- @param src: Integrate the source files
-
- @return: a string containing the zip file
- """
-
- RE_exclude = re.compile('(?:^\..+\.swp$)|(?:\.py[oc]$)|(?:\.bak$)|(?:\.~.~$)', re.I)
+def get_resource_path(module, *args):
+ """Return the full path of a resource of the given module.
- def _zippy(archive, path, src=True):
- path = os.path.abspath(path)
- base = os.path.basename(path)
- for f in osutil.listdir(path, True):
- bf = os.path.basename(f)
- if not RE_exclude.search(bf) and (src or bf in ('__openerp__.py', '__terp__.py') or not bf.endswith('.py')):
- archive.write(os.path.join(path, f), os.path.join(base, f))
+ :param module: module name
+ :param list(str) args: resource path components within module
- archname = StringIO()
- archive = PyZipFile(archname, "w", ZIP_DEFLATED)
+ :rtype: str
+ :return: absolute path to the resource
- # for Python 2.5, ZipFile.write() still expects 8-bit strings (2.6 converts to utf-8)
- directory = tools.ustr(directory).encode('utf-8')
+ TODO make it available inside on osv object (self.get_resource_path)
+ """
+ mod_path = get_module_path(module)
+ if not mod_path: return False
+ resource_path = opj(mod_path, *args)
+ if os.path.isdir(mod_path):
+ # the module is a directory - ignore zip behavior
+ if os.path.exists(resource_path):
+ return resource_path
+ return False
- archive.writepy(directory)
- _zippy(archive, directory, src=src)
- archive.close()
- archive_data = archname.getvalue()
- archname.close()
+# backwards compatibility
+get_module_resource = get_resource_path
- if b64enc:
- return base64.encodestring(archive_data)
+def get_resource_from_path(path):
+ """Tries to extract the module name and the resource's relative path
+ out of an absolute resource path.
- return archive_data
+ If operation is successfull, returns a tuple containing the module name, the relative path
+ to the resource using '/' as filesystem seperator[1] and the same relative path using
+ os.path.sep seperators.
-def get_module_as_zip(modulename, b64enc=True, src=True):
- """Generate a module as zip file with the source or not and can do a base64 encoding
+ [1] same convention as the resource path declaration in manifests
- @param modulename: The module name
- @param b64enc: if True the function will encode the zip file with base64
- @param src: Integrate the source files
+ :param path: absolute resource path
- @return: a stream to store in a file-like object
+ :rtype: tuple
+ :return: tuple(module_name, relative_path, os_relative_path) if possible, else None
"""
+ resource = [path.replace(adpath, '') for adpath in ad_paths if path.startswith(adpath)]
+ if resource:
+ relative = resource[0].split(os.path.sep)
+ if not relative[0]:
+ relative.pop(0)
+ module = relative.pop(0)
+ return (module, '/'.join(relative), os.path.sep.join(relative))
+ return None
- ap = get_module_path(str(modulename))
- if not ap:
- raise Exception('Unable to find path for module %s' % modulename)
+def get_module_icon(module):
+ iconpath = ['static', 'description', 'icon.png']
+ if get_module_resource(module, *iconpath):
+ return ('/' + module + '/') + '/'.join(iconpath)
+ return '/base/' + '/'.join(iconpath)
- ap = ap.encode('utf8')
- if os.path.isfile(ap + '.zip'):
- val = file(ap + '.zip', 'rb').read()
- if b64enc:
- val = base64.encodestring(val)
- else:
- val = zip_directory(ap, b64enc, src)
+def get_module_root(path):
+ """
+ Get closest module's root begining from path
- return val
+ # Given:
+ # /foo/bar/module_dir/static/src/...
+ get_module_root('/foo/bar/module_dir/static/')
+ # returns '/foo/bar/module_dir'
-def get_module_resource(module, *args):
- """Return the full path of a resource of the given module.
+ get_module_root('/foo/bar/module_dir/')
+ # returns '/foo/bar/module_dir'
- @param module: the module
- @param args: the resource path components
+ get_module_root('/foo/bar')
+ # returns None
- @return: absolute path to the resource
+ @param path: Path from which the lookup should start
- TODO name it get_resource_path
- TODO make it available inside on osv object (self.get_resource_path)
+ @return: Module root path or None if not found
"""
- a = get_module_path(module)
- if not a: return False
- resource_path = opj(a, *args)
- if zipfile.is_zipfile( a +'.zip') :
- zip = zipfile.ZipFile( a + ".zip")
- files = ['/'.join(f.split('/')[1:]) for f in zip.namelist()]
- resource_path = '/'.join(args)
- if resource_path in files:
- return opj(a, resource_path)
- elif os.path.exists(resource_path):
- return resource_path
- return False
-
-def get_module_icon(module):
- iconpath = ['static', 'src', 'img', 'icon.png']
- if get_module_resource(module, *iconpath):
- return ('/' + module + '/') + '/'.join(iconpath)
- return '/base/' + '/'.join(iconpath)
-
-def load_information_from_description_file(module):
+ while not os.path.exists(os.path.join(path, MANIFEST)):
+ new_path = os.path.abspath(os.path.join(path, os.pardir))
+ if path == new_path:
+ return None
+ path = new_path
+ return path
+
+def load_information_from_description_file(module, mod_path=None):
"""
:param module: The name of the module (sale, purchase, ...)
+ :param mod_path: Physical path of module, if not providedThe name of the module (sale, purchase, ...)
"""
- terp_file = get_module_resource(module, '__openerp__.py')
- if not terp_file:
- terp_file = get_module_resource(module, '__terp__.py')
- mod_path = get_module_path(module)
+ if not mod_path:
+ mod_path = get_module_path(module)
+ terp_file = mod_path and opj(mod_path, MANIFEST) or False
if terp_file:
info = {}
- if os.path.isfile(terp_file) or zipfile.is_zipfile(mod_path+'.zip'):
+ if os.path.isfile(terp_file):
# default values for descriptor
info = {
'application': False,
'author': '',
'auto_install': False,
'category': 'Uncategorized',
- 'certificate': None,
'depends': [],
'description': '',
'icon': get_module_icon(module),
'installable': True,
- 'auto_install': False,
'license': 'AGPL-3',
- 'name': False,
'post_load': None,
'version': '1.0',
'web': False,
finally:
f.close()
+ if not info.get('description'):
+ readme_path = [opj(mod_path, x) for x in README
+ if os.path.isfile(opj(mod_path, x))]
+ if readme_path:
+ readme_text = tools.file_open(readme_path[0]).read()
+ info['description'] = readme_text
+
if 'active' in info:
# 'active' has been renamed 'auto_install'
info['auto_install'] = info['active']
#TODO: refactor the logger in this file to follow the logging guidelines
# for 6.0
- _logger.debug('module %s: no descriptor file'
- ' found: __openerp__.py or __terp__.py (deprecated)', module)
+ _logger.debug('module %s: no %s file found.', module, MANIFEST)
return {}
-
def init_module_models(cr, module_name, obj_list):
""" Initialize a list of models.
for obj in obj_list:
obj._auto_end(cr, {'module': module_name})
cr.commit()
- todo.sort()
+ todo.sort(key=lambda x: x[0])
for t in todo:
t[1](cr, *t[2])
cr.commit()
initialize_sys_path()
try:
mod_path = get_module_path(module_name)
- zip_mod_path = mod_path + '.zip'
- if not os.path.isfile(zip_mod_path):
- __import__('openerp.addons.' + module_name)
- else:
- zimp = zipimport.zipimporter(zip_mod_path)
- zimp.load_module(module_name)
+ __import__('openerp.addons.' + module_name)
# Call the module's post-load hook. This can done before any model or
# data has been initialized. This is ok as the post-load hook is for
getattr(sys.modules['openerp.addons.' + module_name], info['post_load'])()
except Exception, e:
- mt = isinstance(e, zipimport.ZipImportError) and 'zip ' or ''
- msg = "Couldn't load %smodule %s" % (mt, module_name)
+ msg = "Couldn't load module %s" % (module_name)
_logger.critical(msg)
_logger.critical(e)
raise
return name
def is_really_module(name):
- name = opj(dir, name)
- return os.path.isdir(name) or zipfile.is_zipfile(name)
+ manifest_name = opj(dir, name, MANIFEST)
+ zipfile_name = opj(dir, name)
+ return os.path.isfile(manifest_name)
return map(clean, filter(is_really_module, os.listdir(dir)))
plist = []
plist.extend(listdir(ad))
return list(set(plist))
-
def get_modules_with_version():
modules = get_modules()
res = dict.fromkeys(modules, adapt_version('1.0'))
version = '%s.%s' % (serie, version)
return version
-
-def get_test_modules(module, submodule, explode):
- """
- Return a list of submodules containing tests.
- `submodule` can be:
- - None
- - the name of a submodule
- - '__fast_suite__'
- - '__sanity_checks__'
- """
- # Turn command-line module, submodule into importable names.
- if module is None:
- pass
- elif module == 'openerp':
- module = 'openerp.tests'
- else:
- module = 'openerp.addons.' + module + '.tests'
-
+def get_test_modules(module):
+ """ Return a list of module for the addons potentialy containing tests to
+ feed unittest2.TestLoader.loadTestsFromModule() """
# Try to import the module
+ module = 'openerp.addons.' + module + '.tests'
try:
__import__(module)
except Exception, e:
- if explode:
- print 'Can not `import %s`.' % module
- import logging
- logging.exception('')
- sys.exit(1)
- else:
- if str(e) == 'No module named tests':
- # It seems the module has no `tests` sub-module, no problem.
- pass
- else:
- _logger.exception('Can not `import %s`.', module)
- return []
-
- # Discover available test sub-modules.
- m = sys.modules[module]
- submodule_names = sorted([x for x in dir(m) \
- if x.startswith('test_') and \
- isinstance(getattr(m, x), types.ModuleType)])
- submodules = [getattr(m, x) for x in submodule_names]
-
- def show_submodules_and_exit():
- if submodule_names:
- print 'Available submodules are:'
- for x in submodule_names:
- print ' ', x
- sys.exit(1)
-
- if submodule is None:
- # Use auto-discovered sub-modules.
- ms = submodules
- elif submodule == '__fast_suite__':
- # Obtain the explicit test sub-modules list.
- ms = getattr(sys.modules[module], 'fast_suite', None)
- # `suite` was used before the 6.1 release instead of `fast_suite`.
- ms = ms if ms else getattr(sys.modules[module], 'suite', None)
- if ms is None:
- if explode:
- print 'The module `%s` has no defined test suite.' % (module,)
- show_submodules_and_exit()
- else:
- ms = []
- elif submodule == '__sanity_checks__':
- ms = getattr(sys.modules[module], 'checks', None)
- if ms is None:
- if explode:
- print 'The module `%s` has no defined sanity checks.' % (module,)
- show_submodules_and_exit()
- else:
- ms = []
- else:
- # Pick the command-line-specified test sub-module.
- m = getattr(sys.modules[module], submodule, None)
- ms = [m]
-
- if m is None:
- if explode:
- print 'The module `%s` has no submodule named `%s`.' % \
- (module, submodule)
- show_submodules_and_exit()
- else:
- ms = []
-
- return ms
-
-def run_unit_tests(module_name):
+ # If module has no `tests` sub-module, no problem.
+ if str(e) != 'No module named tests':
+ _logger.exception('Can not `import %s`.', module)
+ return []
+
+ # include submodules too
+ result = [mod_obj for name, mod_obj in sys.modules.iteritems()
+ if mod_obj # mod_obj can be None
+ if name.startswith(module)
+ if re.search(r'test_\w+$', name)]
+ return result
+
+# Use a custom stream object to log the test executions.
+class TestStream(object):
+ def __init__(self, logger_name='openerp.tests'):
+ self.logger = logging.getLogger(logger_name)
+ self.r = re.compile(r'^-*$|^ *... *$|^ok$')
+ def flush(self):
+ pass
+ def write(self, s):
+ if self.r.match(s):
+ return
+ first = True
+ level = logging.ERROR if s.startswith(('ERROR', 'FAIL', 'Traceback')) else logging.INFO
+ for c in s.splitlines():
+ if not first:
+ c = '` ' + c
+ first = False
+ self.logger.log(level, c)
+
+current_test = None
+
+def runs_at(test, hook, default):
+ # by default, tests do not run post install
+ test_runs = getattr(test, hook, default)
+
+ # for a test suite, we're done
+ if not isinstance(test, unittest.TestCase):
+ return test_runs
+
+ # otherwise check the current test method to see it's been set to a
+ # different state
+ method = getattr(test, test._testMethodName)
+ return getattr(method, hook, test_runs)
+
+runs_at_install = functools.partial(runs_at, hook='at_install', default=True)
+runs_post_install = functools.partial(runs_at, hook='post_install', default=False)
+
+def run_unit_tests(module_name, dbname, position=runs_at_install):
+ """
+ :returns: ``True`` if all of ``module_name``'s tests succeeded, ``False``
+ if any of them failed.
+ :rtype: bool
+ """
+ global current_test
+ current_test = module_name
+ mods = get_test_modules(module_name)
+ r = True
+ for m in mods:
+ tests = unwrap_suite(unittest2.TestLoader().loadTestsFromModule(m))
+ suite = unittest2.TestSuite(itertools.ifilter(position, tests))
+
+ if suite.countTestCases():
+ t0 = time.time()
+ t0_sql = openerp.sql_db.sql_counter
+ _logger.info('%s running tests.', m.__name__)
+ result = unittest2.TextTestRunner(verbosity=2, stream=TestStream(m.__name__)).run(suite)
+ if time.time() - t0 > 5:
+ _logger.log(25, "%s tested in %.2fs, %s queries", m.__name__, time.time() - t0, openerp.sql_db.sql_counter - t0_sql)
+ if not result.wasSuccessful():
+ r = False
+ _logger.error("Module %s: %d failures, %d errors", module_name, len(result.failures), len(result.errors))
+
+ current_test = None
+ return r
+
+def unwrap_suite(test):
"""
- Return True or False if some tests were found and succeeded or failed.
- Return None if no test was found.
+ Attempts to unpack testsuites (holding suites or cases) in order to
+ generate a single stream of terminals (either test cases or customized
+ test suites). These can then be checked for run/skip attributes
+ individually.
+
+ An alternative would be to use a variant of @unittest2.skipIf with a state
+ flag of some sort e.g. @unittest2.skipIf(common.runstate != 'at_install'),
+ but then things become weird with post_install as tests should *not* run
+ by default there
"""
- import unittest2
- ms = get_test_modules(module_name, '__fast_suite__', explode=False)
- # TODO: No need to try again if the above call failed because of e.g. a syntax error.
- ms.extend(get_test_modules(module_name, '__sanity_checks__', explode=False))
- suite = unittest2.TestSuite()
- for m in ms:
- suite.addTests(unittest2.TestLoader().loadTestsFromModule(m))
- if ms:
- _logger.info('module %s: executing %s `fast_suite` and/or `checks` sub-modules', module_name, len(ms))
- # Use a custom stream object to log the test executions.
- class MyStream(object):
- def __init__(self):
- self.r = re.compile(r'^-*$|^ *... *$|^ok$')
- def flush(self):
- pass
- def write(self, s):
- if self.r.match(s):
- return
- first = True
- for c in s.split('\n'):
- if not first:
- c = '` ' + c
- first = False
- _logger.log(logging.TEST, c)
- result = unittest2.TextTestRunner(verbosity=2, stream=MyStream()).run(suite)
- if result.wasSuccessful():
- return True
- else:
- _logger.error('module %s: at least one error occurred in a test', module_name)
- return False
+ if isinstance(test, unittest.TestCase):
+ yield test
+ return
+
+ subtests = list(test)
+ # custom test suite (no test cases)
+ if not len(subtests):
+ yield test
+ return
+
+ for item in itertools.chain.from_iterable(
+ itertools.imap(unwrap_suite, subtests)):
+ yield item
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: