#
# OpenERP, Open Source Management Solution
# Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
-# Copyright (C) 2010-2012 OpenERP s.a. (<http://openerp.com>).
+# Copyright (C) 2010-2014 OpenERP s.a. (<http://openerp.com>).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
#
##############################################################################
-import base64
+import functools
import imp
import itertools
import logging
import os
import re
import sys
-import types
-from cStringIO import StringIO
+import time
+import unittest
from os.path import join as opj
import unittest2
import openerp.release as release
from openerp.tools.safe_eval import safe_eval as eval
-_logger = logging.getLogger(__name__)
-_test_logger = logging.getLogger('openerp.tests')
+MANIFEST = '__openerp__.py'
+README = ['README.rst', 'README.md', 'README.txt']
-# addons path ','.joined
-_ad = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'addons') # default addons path (base)
+_logger = logging.getLogger(__name__)
# addons path as a list
ad_paths = []
+hooked = False
# Modules already loaded
loaded = []
PYTHONPATH.
"""
global ad_paths
- if ad_paths:
- return
+ global hooked
+
+ dd = tools.config.addons_data_dir
+ if dd not in ad_paths:
+ ad_paths.append(dd)
+
+ for ad in tools.config['addons_path'].split(','):
+ ad = os.path.abspath(tools.ustr(ad.strip()))
+ if ad not in ad_paths:
+ ad_paths.append(ad)
- ad_paths = map(lambda m: os.path.abspath(tools.ustr(m.strip())), tools.config['addons_path'].split(','))
- ad_paths.append(os.path.abspath(_ad)) # for get_module_path
- sys.meta_path.append(AddonsImportHook())
+ # add base module path
+ base_path = os.path.abspath(os.path.join(os.path.dirname(os.path.dirname(__file__)), 'addons'))
+ if base_path not in ad_paths:
+ ad_paths.append(base_path)
+
+ if not hooked:
+ sys.meta_path.append(AddonsImportHook())
+ hooked = True
def get_module_path(module, downloaded=False, display_warning=True):
"""Return the path of the given module.
return opj(adp, module)
if downloaded:
- return opj(_ad, module)
+ return opj(tools.config.addons_data_dir, module)
if display_warning:
_logger.warning('module %s: module not found', module)
return False
return tree
-def get_module_resource(module, *args):
+def get_resource_path(module, *args):
"""Return the full path of a resource of the given module.
:param module: module name
:rtype: str
:return: absolute path to the resource
- TODO name it get_resource_path
TODO make it available inside on osv object (self.get_resource_path)
"""
mod_path = get_module_path(module)
return resource_path
return False
+# backwards compatibility
+get_module_resource = get_resource_path
+
+def get_resource_from_path(path):
+ """Tries to extract the module name and the resource's relative path
+ out of an absolute resource path.
+
+ If operation is successfull, returns a tuple containing the module name, the relative path
+ to the resource using '/' as filesystem seperator[1] and the same relative path using
+ os.path.sep seperators.
+
+ [1] same convention as the resource path declaration in manifests
+
+ :param path: absolute resource path
+
+ :rtype: tuple
+ :return: tuple(module_name, relative_path, os_relative_path) if possible, else None
+ """
+ resource = [path.replace(adpath, '') for adpath in ad_paths if path.startswith(adpath)]
+ if resource:
+ relative = resource[0].split(os.path.sep)
+ if not relative[0]:
+ relative.pop(0)
+ module = relative.pop(0)
+ return (module, '/'.join(relative), os.path.sep.join(relative))
+ return None
+
def get_module_icon(module):
iconpath = ['static', 'description', 'icon.png']
if get_module_resource(module, *iconpath):
return ('/' + module + '/') + '/'.join(iconpath)
return '/base/' + '/'.join(iconpath)
-def load_information_from_description_file(module):
+def get_module_root(path):
+ """
+ Get closest module's root begining from path
+
+ # Given:
+ # /foo/bar/module_dir/static/src/...
+
+ get_module_root('/foo/bar/module_dir/static/')
+ # returns '/foo/bar/module_dir'
+
+ get_module_root('/foo/bar/module_dir/')
+ # returns '/foo/bar/module_dir'
+
+ get_module_root('/foo/bar')
+ # returns None
+
+ @param path: Path from which the lookup should start
+
+ @return: Module root path or None if not found
+ """
+ while not os.path.exists(os.path.join(path, MANIFEST)):
+ new_path = os.path.abspath(os.path.join(path, os.pardir))
+ if path == new_path:
+ return None
+ path = new_path
+ return path
+
+def load_information_from_description_file(module, mod_path=None):
"""
:param module: The name of the module (sale, purchase, ...)
+ :param mod_path: Physical path of module, if not providedThe name of the module (sale, purchase, ...)
"""
- terp_file = get_module_resource(module, '__openerp__.py')
- mod_path = get_module_path(module)
+ if not mod_path:
+ mod_path = get_module_path(module)
+ terp_file = mod_path and opj(mod_path, MANIFEST) or False
if terp_file:
info = {}
if os.path.isfile(terp_file):
'icon': get_module_icon(module),
'installable': True,
'license': 'AGPL-3',
- 'name': False,
'post_load': None,
'version': '1.0',
'web': False,
finally:
f.close()
+ if not info.get('description'):
+ readme_path = [opj(mod_path, x) for x in README
+ if os.path.isfile(opj(mod_path, x))]
+ if readme_path:
+ readme_text = tools.file_open(readme_path[0]).read()
+ info['description'] = readme_text
+
if 'active' in info:
# 'active' has been renamed 'auto_install'
info['auto_install'] = info['active']
#TODO: refactor the logger in this file to follow the logging guidelines
# for 6.0
- _logger.debug('module %s: no __openerp__.py file found.', module)
+ _logger.debug('module %s: no %s file found.', module, MANIFEST)
return {}
def init_module_models(cr, module_name, obj_list):
for obj in obj_list:
obj._auto_end(cr, {'module': module_name})
cr.commit()
- todo.sort()
+ todo.sort(key=lambda x: x[0])
for t in todo:
t[1](cr, *t[2])
cr.commit()
return name
def is_really_module(name):
- manifest_name = opj(dir, name, '__openerp__.py')
+ manifest_name = opj(dir, name, MANIFEST)
zipfile_name = opj(dir, name)
return os.path.isfile(manifest_name)
return map(clean, filter(is_really_module, os.listdir(dir)))
# Try to import the module
module = 'openerp.addons.' + module + '.tests'
try:
- m = __import__(module)
+ __import__(module)
except Exception, e:
# If module has no `tests` sub-module, no problem.
if str(e) != 'No module named tests':
return []
# include submodules too
- result = []
- for name in sys.modules:
- if name.startswith(module) and sys.modules[name]:
- result.append(sys.modules[name])
+ result = [mod_obj for name, mod_obj in sys.modules.iteritems()
+ if mod_obj # mod_obj can be None
+ if name.startswith(module)
+ if re.search(r'test_\w+$', name)]
return result
# Use a custom stream object to log the test executions.
class TestStream(object):
- def __init__(self):
+ def __init__(self, logger_name='openerp.tests'):
+ self.logger = logging.getLogger(logger_name)
self.r = re.compile(r'^-*$|^ *... *$|^ok$')
def flush(self):
pass
if self.r.match(s):
return
first = True
- for c in s.split('\n'):
+ level = logging.ERROR if s.startswith(('ERROR', 'FAIL', 'Traceback')) else logging.INFO
+ for c in s.splitlines():
if not first:
c = '` ' + c
first = False
- _test_logger.info(c)
+ self.logger.log(level, c)
+
+current_test = None
+
+def runs_at(test, hook, default):
+ # by default, tests do not run post install
+ test_runs = getattr(test, hook, default)
+
+ # for a test suite, we're done
+ if not isinstance(test, unittest.TestCase):
+ return test_runs
+
+ # otherwise check the current test method to see it's been set to a
+ # different state
+ method = getattr(test, test._testMethodName)
+ return getattr(method, hook, test_runs)
-def run_unit_tests(module_name, dbname):
+runs_at_install = functools.partial(runs_at, hook='at_install', default=True)
+runs_post_install = functools.partial(runs_at, hook='post_install', default=False)
+
+def run_unit_tests(module_name, dbname, position=runs_at_install):
"""
- Return True or False if some tests were found and succeeded or failed.
- Return None if no test was found.
+ :returns: ``True`` if all of ``module_name``'s tests succeeded, ``False``
+ if any of them failed.
+ :rtype: bool
"""
+ global current_test
+ current_test = module_name
mods = get_test_modules(module_name)
r = True
for m in mods:
- suite = unittest2.TestSuite()
- for t in unittest2.TestLoader().loadTestsFromModule(m):
- suite.addTest(t)
- _logger.log(logging.INFO, 'module %s: running test %s.', module_name, m.__name__)
- result = unittest2.TextTestRunner(verbosity=2, stream=TestStream()).run(suite)
- if not result.wasSuccessful():
- r = False
- _logger.error('module %s: at least one error occurred in a test', module_name)
+ tests = unwrap_suite(unittest2.TestLoader().loadTestsFromModule(m))
+ suite = unittest2.TestSuite(itertools.ifilter(position, tests))
+
+ if suite.countTestCases():
+ t0 = time.time()
+ t0_sql = openerp.sql_db.sql_counter
+ _logger.info('%s running tests.', m.__name__)
+ result = unittest2.TextTestRunner(verbosity=2, stream=TestStream(m.__name__)).run(suite)
+ if time.time() - t0 > 5:
+ _logger.log(25, "%s tested in %.2fs, %s queries", m.__name__, time.time() - t0, openerp.sql_db.sql_counter - t0_sql)
+ if not result.wasSuccessful():
+ r = False
+ _logger.error("Module %s: %d failures, %d errors", module_name, len(result.failures), len(result.errors))
+
+ current_test = None
return r
+def unwrap_suite(test):
+ """
+ Attempts to unpack testsuites (holding suites or cases) in order to
+ generate a single stream of terminals (either test cases or customized
+ test suites). These can then be checked for run/skip attributes
+ individually.
+
+ An alternative would be to use a variant of @unittest2.skipIf with a state
+ flag of some sort e.g. @unittest2.skipIf(common.runstate != 'at_install'),
+ but then things become weird with post_install as tests should *not* run
+ by default there
+ """
+ if isinstance(test, unittest.TestCase):
+ yield test
+ return
+
+ subtests = list(test)
+ # custom test suite (no test cases)
+ if not len(subtests):
+ yield test
+ return
+
+ for item in itertools.chain.from_iterable(
+ itertools.imap(unwrap_suite, subtests)):
+ yield item
+
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: