1 # -*- coding: utf-8 -*-
3 The module :mod:`openerp.tests.common` provides unittest2 test cases and a few
4 helpers and classes to write tests.
19 from datetime import datetime, timedelta
20 from shutil import rmtree
21 from tempfile import mkdtemp
26 from openerp.modules.registry import RegistryManager
28 _logger = logging.getLogger(__name__)
30 # The openerp library is supposed already configured.
31 ADDONS_PATH = openerp.tools.config['addons_path']
33 PORT = openerp.tools.config['xmlrpc_port']
34 DB = openerp.tools.config['db_name']
35 # If the database name is not provided on the command-line,
36 # use the one on the thread (which means if it is provided on
37 # the command-line, this will break when installing another
38 # database from XML-RPC).
39 if not DB and hasattr(threading.current_thread(), 'dbname'):
40 DB = threading.current_thread().dbname
41 # Useless constant, tests are aware of the content of demo data
42 ADMIN_USER_ID = openerp.SUPERUSER_ID
45 """ Sets the at-install state of a test, the flag is a boolean specifying
46 whether the test should (``True``) or should not (``False``) run during
49 By default, tests are run at install.
56 def post_install(flag):
57 """ Sets the post-install state of a test. The flag is a boolean
58 specifying whether the test should or should not run after a set of
61 By default, tests are *not* run after installation.
64 obj.post_install = flag
68 class BaseCase(unittest2.TestCase):
70 Subclass of TestCase for common OpenERP-specific code.
72 This class is abstract and expects self.registry, self.cr and self.uid to be
73 initialized by subclasses.
77 return self.registry.cursor()
80 """ Returns database ID corresponding to a given identifier.
82 :param xid: fully-qualified record identifier, in the form ``module.identifier``
83 :raise: ValueError if not found
85 assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
86 module, xid = xid.split('.')
87 _, id = self.registry('ir.model.data').get_object_reference(self.cr, self.uid, module, xid)
90 def browse_ref(self, xid):
91 """ Returns a browsable record for the given identifier.
93 :param xid: fully-qualified record identifier, in the form ``module.identifier``
94 :raise: ValueError if not found
96 assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
97 module, xid = xid.split('.')
98 return self.registry('ir.model.data').get_object(self.cr, self.uid, module, xid)
101 class TransactionCase(BaseCase):
103 Subclass of BaseCase with a single transaction, rolled-back at the end of
108 self.registry = RegistryManager.get(DB)
109 self.cr = self.cursor()
110 self.uid = openerp.SUPERUSER_ID
117 class SingleTransactionCase(BaseCase):
119 Subclass of BaseCase with a single transaction for the whole class,
120 rolled-back after all the tests.
125 cls.registry = RegistryManager.get(DB)
126 cls.cr = cls.registry.cursor()
127 cls.uid = openerp.SUPERUSER_ID
130 def tearDownClass(cls):
134 class RedirectHandler(urllib2.HTTPRedirectHandler):
136 HTTPRedirectHandler is predicated upon HTTPErrorProcessor being used and
137 works by intercepting 3xy "errors".
139 Inherit from it to handle 3xy non-error responses instead, as we're not
140 using the error processor
143 def http_response(self, request, response):
144 code, msg, hdrs = response.code, response.msg, response.info()
146 if 300 <= code < 400:
147 return self.parent.error(
148 'http', request, response, code, msg, hdrs)
152 https_response = http_response
154 class HttpCase(TransactionCase):
155 """ Transactionnal HTTP TestCase with url_open and phantomjs helpers.
158 def __init__(self, methodName='runTest'):
159 super(HttpCase, self).__init__(methodName)
160 # v8 api with correct xmlrpc exception handling.
161 self.xmlrpc_url = url_8 = 'http://%s:%d/xmlrpc/2/' % (HOST, PORT)
162 self.xmlrpc_common = xmlrpclib.ServerProxy(url_8 + 'common')
163 self.xmlrpc_db = xmlrpclib.ServerProxy(url_8 + 'db')
164 self.xmlrpc_object = xmlrpclib.ServerProxy(url_8 + 'object')
167 super(HttpCase, self).setUp()
168 self.registry.enter_test_mode()
169 # setup a magic session_id that will be rollbacked
170 self.session = openerp.http.root.session_store.new()
171 self.session_id = self.session.sid
173 openerp.http.root.session_store.save(self.session)
174 # setup an url opener helper
175 self.opener = urllib2.OpenerDirector()
176 self.opener.add_handler(urllib2.UnknownHandler())
177 self.opener.add_handler(urllib2.HTTPHandler())
178 self.opener.add_handler(urllib2.HTTPSHandler())
179 self.opener.add_handler(urllib2.HTTPCookieProcessor())
180 self.opener.add_handler(RedirectHandler())
181 self.opener.addheaders.append(('Cookie', 'session_id=%s' % self.session_id))
184 self.registry.leave_test_mode()
185 super(HttpCase, self).tearDown()
187 def url_open(self, url, data=None, timeout=10):
188 if url.startswith('/'):
189 url = "http://localhost:%s%s" % (PORT, url)
190 return self.opener.open(url, data, timeout)
192 def authenticate(self, user, password):
194 url = '/login?%s' % werkzeug.urls.url_encode({'db': DB,'login': user, 'key': password})
195 auth = self.url_open(url)
196 assert auth.getcode() < 400, "Auth failure %d" % auth.getcode()
198 def phantom_poll(self, phantom, timeout):
199 """ Phantomjs Test protocol.
201 Use console.log in phantomjs to output test results:
203 - for a success: console.log("ok")
204 - for an error: console.log("error")
206 Other lines are relayed to the test log.
210 td = timedelta(seconds=timeout)
214 self.assertLess(datetime.now() - t0, td,
215 "PhantomJS tests should take less than %s seconds" % timeout)
219 ready, _, _ = select.select([phantom.stdout], [], [], 0.5)
220 except select.error, e:
221 # In Python 2, select.error has no relation to IOError or
222 # OSError, and no errno/strerror/filename, only a pair of
223 # unnamed arguments (matching errno and strerror)
225 if err == errno.EINTR:
230 s = phantom.stdout.read(1)
237 line, buf = buf.split('\n', 1)
240 # relay everything from console.log, even 'ok' or 'error...' lines
241 _logger.info("phantomjs: %s", line)
245 if line.startswith("error"):
247 # when error occurs the execution stack may be sent as as JSON
249 line_ = json.loads(line_)
252 self.fail(line_ or "phantomjs test failed")
254 def phantom_run(self, cmd, timeout):
255 _logger.info('phantom_run executing %s', ' '.join(cmd))
257 ls_glob = os.path.expanduser('~/.qws/share/data/Ofi Labs/PhantomJS/http_localhost_%s.*'%PORT)
258 for i in glob.glob(ls_glob):
259 _logger.info('phantomjs unlink localstorage %s', i)
262 phantom = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
264 raise unittest2.SkipTest("PhantomJS not found")
266 self.phantom_poll(phantom, timeout)
268 # kill phantomjs if phantom.exit() wasn't called in the test
269 if phantom.poll() is None:
272 self._wait_remaining_requests()
273 _logger.info("phantom_run execution finished")
275 def _wait_remaining_requests(self):
276 t0 = int(time.time())
277 for thread in threading.enumerate():
278 if thread.name.startswith('openerp.service.http.request.'):
279 while thread.isAlive():
280 # Need a busyloop here as thread.join() masks signals
281 # and would prevent the forced shutdown.
284 t1 = int(time.time())
286 _logger.info('remaining requests')
287 openerp.tools.misc.dumpstacks()
290 def phantom_jsfile(self, jsfile, timeout=60, **kw):
295 'session_id': self.session_id,
298 phantomtest = os.path.join(os.path.dirname(__file__), 'phantomtest.js')
299 # phantom.args[0] == phantomtest path
300 # phantom.args[1] == options
303 jsfile, phantomtest, json.dumps(options)
305 self.phantom_run(cmd, timeout)
307 def phantom_js(self, url_path, code, ready="window", login=None, timeout=60, **kw):
308 """ Test js code running in the browser
309 - optionnally log as 'login'
310 - load page given by url_path
311 - wait for ready object to be available
312 - eval(code) inside the page
314 To signal success test do:
317 To signal failure do:
320 If neither are done before timeout test fails.
325 'url_path': url_path,
330 'session_id': self.session_id,
333 options.setdefault('password', options.get('login'))
334 phantomtest = os.path.join(os.path.dirname(__file__), 'phantomtest.js')
335 cmd = ['phantomjs', phantomtest, json.dumps(options)]
336 self.phantom_run(cmd, timeout)
339 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: