1 # -*- coding: utf-8 -*-
3 The module :mod:`openerp.tests.common` provides unittest2 test cases and a few
4 helpers and classes to write tests.
19 from contextlib import contextmanager
20 from datetime import datetime, timedelta
25 from openerp import api
26 from openerp.modules.registry import RegistryManager
_logger = logging.getLogger(__name__)

# The openerp library is expected to be configured before this module is
# imported: the constants below are read straight from its configuration.
ADDONS_PATH = openerp.tools.config['addons_path']
PORT = openerp.tools.config['xmlrpc_port']
DB = openerp.tools.config['db_name']
# Without a database name in the configuration, fall back on the name
# attached to the current thread, if any. A name given on the command-line
# always wins, so installing another database through XML-RPC breaks in
# that case.
if not DB:
    DB = getattr(threading.current_thread(), 'dbname', DB)
# Useless constant, tests are aware of the content of demo data
ADMIN_USER_ID = openerp.SUPERUSER_ID
45 """ Sets the at-install state of a test, the flag is a boolean specifying
46 whether the test should (``True``) or should not (``False``) run during
49 By default, tests are run right after installing the module, before
50 starting the installation of the next module.
def post_install(flag):
    """ Sets the post-install state of a test. The flag is a boolean
    specifying whether the test should (``True``) or should not (``False``)
    run after a set of module installations.

    By default, tests are *not* run after installation of all modules in the
    current installation set.

    :param flag: value stored in the ``post_install`` attribute of the
                 decorated object
    :returns: a decorator setting that attribute on the object it receives
              and returning the very same object
    """
    def decorator(obj):
        # only tag the object, it is handed back unchanged otherwise
        obj.post_install = flag
        return obj
    return decorator
70 class BaseCase(unittest2.TestCase):
72 Subclass of TestCase for common OpenERP-specific code.
74 This class is abstract and expects self.registry, self.cr and self.uid to be
75 initialized by subclasses.
79 return self.registry.cursor()
82 """ Returns database ID for the provided :term:`external identifier`,
83 shortcut for ``get_object_reference``
85 :param xid: fully-qualified :term:`external identifier`, in the form
86 :samp:`{module}.{identifier}`
87 :raise: ValueError if not found
88 :returns: registered id
90 assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
91 module, xid = xid.split('.')
92 _, id = self.registry('ir.model.data').get_object_reference(self.cr, self.uid, module, xid)
def browse_ref(self, xid):
    """ Returns a record object for the provided
    :term:`external identifier`

    The lookup is delegated to ``get_object`` of the ``ir.model.data``
    model, called with the test's cursor and user id.

    :param xid: fully-qualified :term:`external identifier`, in the form
                :samp:`{module}.{identifier}`
    :raise: ValueError if not found
    :returns: :class:`~openerp.models.BaseModel`
    """
    assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
    # Only the first dot separates the module from the identifier: splitting
    # on every dot fails to unpack as soon as the identifier contains one.
    module, name = xid.split('.', 1)
    return self.registry('ir.model.data').get_object(self.cr, self.uid, module, name)
@contextmanager
def _assertRaises(self, exception):
    """ Context manager that clears the environment upon failure.

    The managed block runs inside the standard ``assertRaises`` context of
    the parent class, itself wrapping ``self.env.clear_upon_failure()``.

    :param exception: exception class (or classes) the managed block is
                      expected to raise
    """
    with super(BaseCase, self).assertRaises(exception):
        with self.env.clear_upon_failure():
            # hand control to the caller's ``with`` block
            yield
def assertRaises(self, exception, func=None, *args, **kwargs):
    """ Checks that ``exception`` is raised, going through
    :meth:`_assertRaises`.

    Two calling conventions are supported:

    * with a callable: ``func(*args, **kwargs)`` is invoked right away
      inside the context manager and nothing is returned;
    * without a callable: the context manager itself is returned, to be
      used in a ``with`` statement.

    :param exception: expected exception class (or classes)
    :param func: optional callable to invoke
    :returns: a context manager when ``func`` is not provided, else ``None``
    """
    if func is None:
        return self._assertRaises(exception)
    with self._assertRaises(exception):
        func(*args, **kwargs)
123 class TransactionCase(BaseCase):
124 """ TestCase in which each test method is run in its own transaction,
125 and with its own cursor. The transaction is rolled back and the cursor
126 is closed after each test.
130 self.registry = RegistryManager.get(DB)
131 #: current transaction's cursor
132 self.cr = self.cursor()
133 self.uid = openerp.SUPERUSER_ID
134 #: :class:`~openerp.api.Environment` for the current test case
135 self.env = api.Environment(self.cr, self.uid, {})
138 # rollback and close the cursor, and reset the environments
144 class SingleTransactionCase(BaseCase):
145 """ TestCase in which all test methods are run in the same transaction,
146 the transaction is started with the first test method and rolled back at
152 cls.registry = RegistryManager.get(DB)
153 cls.cr = cls.registry.cursor()
154 cls.uid = openerp.SUPERUSER_ID
155 cls.env = api.Environment(cls.cr, cls.uid, {})
158 def tearDownClass(cls):
159 # rollback and close the cursor, and reset the environments
class RedirectHandler(urllib2.HTTPRedirectHandler):
    """
    HTTPRedirectHandler is predicated upon HTTPErrorProcessor being used and
    works by intercepting 3xy "errors".

    Inherit from it to handle 3xy non-error responses instead, as we're not
    using the error processor
    """

    def http_response(self, request, response):
        """ Routes 3xy responses to the opener's error handling chain and
        lets every other response through untouched.

        :param request: request which produced ``response``
        :param response: response object, providing ``code``, ``msg`` and
                         ``info()``
        :returns: the result of ``self.parent.error`` for a 3xy status code,
                  ``response`` itself otherwise
        """
        code, msg, hdrs = response.code, response.msg, response.info()

        if 300 <= code < 400:
            return self.parent.error(
                'http', request, response, code, msg, hdrs)

        # a response processor must always hand a response back to the
        # opener, anything which is not a redirection is passed along as-is
        return response

    # HTTPS responses are processed exactly like HTTP ones
    https_response = http_response
185 class HttpCase(TransactionCase):
186 """ Transactional HTTP TestCase with url_open and phantomjs helpers.
def __init__(self, methodName='runTest'):
    """ Initializes the test case and its XML-RPC proxies.

    ``xmlrpc_url`` holds the base URL of the ``/xmlrpc/2/`` endpoint, built
    from the module-level ``HOST`` and ``PORT``; ``xmlrpc_common``,
    ``xmlrpc_db`` and ``xmlrpc_object`` are server proxies for the
    corresponding services below that URL.

    :param methodName: name of the test method, forwarded to the parent
                       class
    """
    super(HttpCase, self).__init__(methodName)
    # v8 api with correct xmlrpc exception handling.
    self.xmlrpc_url = 'http://%s:%d/xmlrpc/2/' % (HOST, PORT)
    for service in ('common', 'db', 'object'):
        proxy = xmlrpclib.ServerProxy(self.xmlrpc_url + service)
        setattr(self, 'xmlrpc_' + service, proxy)
198 super(HttpCase, self).setUp()
199 self.registry.enter_test_mode()
200 # set up a magic session_id that will be rolled back
201 self.session = openerp.http.root.session_store.new()
202 self.session_id = self.session.sid
204 openerp.http.root.session_store.save(self.session)
205 # setup an url opener helper
206 self.opener = urllib2.OpenerDirector()
207 self.opener.add_handler(urllib2.UnknownHandler())
208 self.opener.add_handler(urllib2.HTTPHandler())
209 self.opener.add_handler(urllib2.HTTPSHandler())
210 self.opener.add_handler(urllib2.HTTPCookieProcessor())
211 self.opener.add_handler(RedirectHandler())
212 self.opener.addheaders.append(('Cookie', 'session_id=%s' % self.session_id))
215 self.registry.leave_test_mode()
216 super(HttpCase, self).tearDown()
def url_open(self, url, data=None, timeout=10):
    """ Opens ``url`` through the test's opener.

    A ``url`` starting with ``/`` is taken as a path on the local server
    and gets prefixed with ``http://localhost:<PORT>``; any other value is
    used as-is.

    :param url: absolute URL, or path starting with ``/``
    :param data: forwarded to the opener's ``open``
    :param timeout: forwarded to the opener's ``open``
    :returns: whatever ``self.opener.open`` returns
    """
    is_local_path = url.startswith('/')
    target = "http://localhost:%s%s" % (PORT, url) if is_local_path else url
    return self.opener.open(target, data, timeout)
def authenticate(self, user, password):
    """ Logs ``user`` in by requesting ``/login`` on the test database
    (module-level ``DB``) through :meth:`url_open`.

    Nothing is requested when ``user`` is ``None``, as there is nobody to
    log in as.

    :param user: login sent as the ``login`` query parameter, or ``None``
    :param password: sent as the ``key`` query parameter
    :raise: AssertionError if the response status is 400 or above
    """
    if user is None:
        return
    query = werkzeug.urls.url_encode({'db': DB, 'login': user, 'key': password})
    auth = self.url_open('/login?%s' % query)
    status = auth.getcode()
    assert status < 400, "Auth failure %d" % status
229 def phantom_poll(self, phantom, timeout):
230 """ Phantomjs Test protocol.
232 Use console.log in phantomjs to output test results:
234 - for a success: console.log("ok")
235 - for an error: console.log("error")
237 Other lines are relayed to the test log.
241 td = timedelta(seconds=timeout)
245 self.assertLess(datetime.now() - t0, td,
246 "PhantomJS tests should take less than %s seconds" % timeout)
250 ready, _, _ = select.select([phantom.stdout], [], [], 0.5)
251 except select.error, e:
252 # In Python 2, select.error has no relation to IOError or
253 # OSError, and no errno/strerror/filename, only a pair of
254 # unnamed arguments (matching errno and strerror)
256 if err == errno.EINTR:
261 s = phantom.stdout.read(1)
268 line, buf = buf.split('\n', 1)
271 # relay everything from console.log, even 'ok' or 'error...' lines
272 _logger.info("phantomjs: %s", line)
276 if line.startswith("error"):
278 # when an error occurs, the execution stack may be sent as JSON
280 line_ = json.loads(line_)
283 self.fail(line_ or "phantomjs test failed")
285 def phantom_run(self, cmd, timeout):
286 _logger.info('phantom_run executing %s', ' '.join(cmd))
288 ls_glob = os.path.expanduser('~/.qws/share/data/Ofi Labs/PhantomJS/http_localhost_%s.*'%PORT)
289 for i in glob.glob(ls_glob):
290 _logger.info('phantomjs unlink localstorage %s', i)
293 phantom = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
295 raise unittest2.SkipTest("PhantomJS not found")
297 self.phantom_poll(phantom, timeout)
299 # kill phantomjs if phantom.exit() wasn't called in the test
300 if phantom.poll() is None:
303 self._wait_remaining_requests()
304 _logger.info("phantom_run execution finished")
306 def _wait_remaining_requests(self):
307 t0 = int(time.time())
308 for thread in threading.enumerate():
309 if thread.name.startswith('openerp.service.http.request.'):
310 while thread.isAlive():
311 # Need a busyloop here as thread.join() masks signals
312 # and would prevent the forced shutdown.
315 t1 = int(time.time())
317 _logger.info('remaining requests')
318 openerp.tools.misc.dumpstacks()
321 def phantom_jsfile(self, jsfile, timeout=60, **kw):
326 'session_id': self.session_id,
329 phantomtest = os.path.join(os.path.dirname(__file__), 'phantomtest.js')
330 # phantom.args[0] == phantomtest path
331 # phantom.args[1] == options
334 jsfile, phantomtest, json.dumps(options)
336 self.phantom_run(cmd, timeout)
338 def phantom_js(self, url_path, code, ready="window", login=None, timeout=60, **kw):
339 """ Test js code running in the browser
340 - optionally log as 'login'
341 - load page given by url_path
342 - wait for ready object to be available
343 - eval(code) inside the page
345 To signal success test do:
348 To signal failure do:
351 If neither is done before the timeout, the test fails.
356 'url_path': url_path,
361 'session_id': self.session_id,
364 options.setdefault('password', options.get('login'))
365 phantomtest = os.path.join(os.path.dirname(__file__), 'phantomtest.js')
366 cmd = ['phantomjs', phantomtest, json.dumps(options)]
367 self.phantom_run(cmd, timeout)
370 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: