[FIX] tests: make sure that a failed tests does not leave the environment dirty
[odoo/odoo.git] / openerp / tests / common.py
1 # -*- coding: utf-8 -*-
2 """
3 The module :mod:`openerp.tests.common` provides unittest2 test cases and a few
4 helpers and classes to write tests.
5
6 """
7 import errno
8 import glob
9 import json
10 import logging
11 import os
12 import select
13 import subprocess
14 import threading
15 import time
16 import unittest2
17 import urllib2
18 import xmlrpclib
19 from contextlib import contextmanager
20 from datetime import datetime, timedelta
21
22 import werkzeug
23
24 import openerp
25 from openerp import api
26 from openerp.modules.registry import RegistryManager
27
28 _logger = logging.getLogger(__name__)
29
30 # The openerp library is supposed already configured.
31 ADDONS_PATH = openerp.tools.config['addons_path']
32 HOST = '127.0.0.1'
33 PORT = openerp.tools.config['xmlrpc_port']
34 DB = openerp.tools.config['db_name']
35 # If the database name is not provided on the command-line,
36 # use the one on the thread (which means if it is provided on
37 # the command-line, this will break when installing another
38 # database from XML-RPC).
39 if not DB and hasattr(threading.current_thread(), 'dbname'):
40     DB = threading.current_thread().dbname
41 # Useless constant, tests are aware of the content of demo data
42 ADMIN_USER_ID = openerp.SUPERUSER_ID
43
44 def at_install(flag):
45     """ Sets the at-install state of a test, the flag is a boolean specifying
46     whether the test should (``True``) or should not (``False``) run during
47     module installation.
48
49     By default, tests are run right after installing the module, before
50     starting the installation of the next module.
51     """
52     def decorator(obj):
53         obj.at_install = flag
54         return obj
55     return decorator
56
57 def post_install(flag):
58     """ Sets the post-install state of a test. The flag is a boolean
59     specifying whether the test should or should not run after a set of
60     module installations.
61
62     By default, tests are *not* run after installation of all modules in the
63     current installation set.
64     """
65     def decorator(obj):
66         obj.post_install = flag
67         return obj
68     return decorator
69
70 class BaseCase(unittest2.TestCase):
71     """
72     Subclass of TestCase for common OpenERP-specific code.
73     
74     This class is abstract and expects self.registry, self.cr and self.uid to be
75     initialized by subclasses.
76     """
77
78     def cursor(self):
79         return self.registry.cursor()
80
81     def ref(self, xid):
82         """ Returns database ID for the provided :term:`external identifier`,
83         shortcut for ``get_object_reference``
84
85         :param xid: fully-qualified :term:`external identifier`, in the form
86                     :samp:`{module}.{identifier}`
87         :raise: ValueError if not found
88         :returns: registered id
89         """
90         assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
91         module, xid = xid.split('.')
92         _, id = self.registry('ir.model.data').get_object_reference(self.cr, self.uid, module, xid)
93         return id
94
95     def browse_ref(self, xid):
96         """ Returns a record object for the provided
97         :term:`external identifier`
98
99         :param xid: fully-qualified :term:`external identifier`, in the form
100                     :samp:`{module}.{identifier}`
101         :raise: ValueError if not found
102         :returns: :class:`~openerp.models.BaseModel`
103         """
104         assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
105         module, xid = xid.split('.')
106         return self.registry('ir.model.data').get_object(self.cr, self.uid, module, xid)
107
108     @contextmanager
109     def _assertRaises(self, exception):
110         """ Context manager that clears the environment upon failure. """
111         with super(BaseCase, self).assertRaises(exception):
112             with self.env.clear_upon_failure():
113                 yield
114
115     def assertRaises(self, exception, func=None, *args, **kwargs):
116         if func:
117             with self._assertRaises(exception):
118                 func(*args, **kwargs)
119         else:
120             return self._assertRaises(exception)
121
122
123 class TransactionCase(BaseCase):
124     """ TestCase in which each test method is run in its own transaction,
125     and with its own cursor. The transaction is rolled back and the cursor
126     is closed after each test.
127     """
128
129     def setUp(self):
130         self.registry = RegistryManager.get(DB)
131         #: current transaction's cursor
132         self.cr = self.cursor()
133         self.uid = openerp.SUPERUSER_ID
134         #: :class:`~openerp.api.Environment` for the current test case
135         self.env = api.Environment(self.cr, self.uid, {})
136
137     def tearDown(self):
138         # rollback and close the cursor, and reset the environments
139         self.env.reset()
140         self.cr.rollback()
141         self.cr.close()
142
143
144 class SingleTransactionCase(BaseCase):
145     """ TestCase in which all test methods are run in the same transaction,
146     the transaction is started with the first test method and rolled back at
147     the end of the last.
148     """
149
150     @classmethod
151     def setUpClass(cls):
152         cls.registry = RegistryManager.get(DB)
153         cls.cr = cls.registry.cursor()
154         cls.uid = openerp.SUPERUSER_ID
155         cls.env = api.Environment(cls.cr, cls.uid, {})
156
157     @classmethod
158     def tearDownClass(cls):
159         # rollback and close the cursor, and reset the environments
160         cls.env.reset()
161         cls.cr.rollback()
162         cls.cr.close()
163
164
165 class RedirectHandler(urllib2.HTTPRedirectHandler):
166     """
167     HTTPRedirectHandler is predicated upon HTTPErrorProcessor being used and
168     works by intercepting 3xy "errors".
169
170     Inherit from it to handle 3xy non-error responses instead, as we're not
171     using the error processor
172     """
173
174     def http_response(self, request, response):
175         code, msg, hdrs = response.code, response.msg, response.info()
176
177         if 300 <= code < 400:
178             return self.parent.error(
179                 'http', request, response, code, msg, hdrs)
180
181         return response
182
183     https_response = http_response
184
185 class HttpCase(TransactionCase):
186     """ Transactional HTTP TestCase with url_open and phantomjs helpers.
187     """
188
189     def __init__(self, methodName='runTest'):
190         super(HttpCase, self).__init__(methodName)
191         # v8 api with correct xmlrpc exception handling.
192         self.xmlrpc_url = url_8 = 'http://%s:%d/xmlrpc/2/' % (HOST, PORT)
193         self.xmlrpc_common = xmlrpclib.ServerProxy(url_8 + 'common')
194         self.xmlrpc_db = xmlrpclib.ServerProxy(url_8 + 'db')
195         self.xmlrpc_object = xmlrpclib.ServerProxy(url_8 + 'object')
196
197     def setUp(self):
198         super(HttpCase, self).setUp()
199         self.registry.enter_test_mode()
200         # setup a magic session_id that will be rollbacked
201         self.session = openerp.http.root.session_store.new()
202         self.session_id = self.session.sid
203         self.session.db = DB
204         openerp.http.root.session_store.save(self.session)
205         # setup an url opener helper
206         self.opener = urllib2.OpenerDirector()
207         self.opener.add_handler(urllib2.UnknownHandler())
208         self.opener.add_handler(urllib2.HTTPHandler())
209         self.opener.add_handler(urllib2.HTTPSHandler())
210         self.opener.add_handler(urllib2.HTTPCookieProcessor())
211         self.opener.add_handler(RedirectHandler())
212         self.opener.addheaders.append(('Cookie', 'session_id=%s' % self.session_id))
213
214     def tearDown(self):
215         self.registry.leave_test_mode()
216         super(HttpCase, self).tearDown()
217
218     def url_open(self, url, data=None, timeout=10):
219         if url.startswith('/'):
220             url = "http://localhost:%s%s" % (PORT, url)
221         return self.opener.open(url, data, timeout)
222
223     def authenticate(self, user, password):
224         if user is not None:
225             url = '/login?%s' % werkzeug.urls.url_encode({'db': DB,'login': user, 'key': password})
226             auth = self.url_open(url)
227             assert auth.getcode() < 400, "Auth failure %d" % auth.getcode()
228
229     def phantom_poll(self, phantom, timeout):
230         """ Phantomjs Test protocol.
231
232         Use console.log in phantomjs to output test results:
233
234         - for a success: console.log("ok")
235         - for an error:  console.log("error")
236
237         Other lines are relayed to the test log.
238
239         """
240         t0 = datetime.now()
241         td = timedelta(seconds=timeout)
242         buf = bytearray()
243         while True:
244             # timeout
245             self.assertLess(datetime.now() - t0, td,
246                 "PhantomJS tests should take less than %s seconds" % timeout)
247
248             # read a byte
249             try:
250                 ready, _, _ = select.select([phantom.stdout], [], [], 0.5)
251             except select.error, e:
252                 # In Python 2, select.error has no relation to IOError or
253                 # OSError, and no errno/strerror/filename, only a pair of
254                 # unnamed arguments (matching errno and strerror)
255                 err, _ = e.args
256                 if err == errno.EINTR:
257                     continue
258                 raise
259
260             if ready:
261                 s = phantom.stdout.read(1)
262                 if not s:
263                     break
264                 buf.append(s)
265
266             # process lines
267             if '\n' in buf:
268                 line, buf = buf.split('\n', 1)
269                 line = str(line)
270
271                 # relay everything from console.log, even 'ok' or 'error...' lines
272                 _logger.info("phantomjs: %s", line)
273
274                 if line == "ok":
275                     break
276                 if line.startswith("error"):
277                     line_ = line[6:]
278                     # when error occurs the execution stack may be sent as as JSON
279                     try:
280                         line_ = json.loads(line_)
281                     except ValueError: 
282                         pass
283                     self.fail(line_ or "phantomjs test failed")
284
285     def phantom_run(self, cmd, timeout):
286         _logger.info('phantom_run executing %s', ' '.join(cmd))
287
288         ls_glob = os.path.expanduser('~/.qws/share/data/Ofi Labs/PhantomJS/http_localhost_%s.*'%PORT)
289         for i in glob.glob(ls_glob):
290             _logger.info('phantomjs unlink localstorage %s', i)
291             os.unlink(i)
292         try:
293             phantom = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
294         except OSError:
295             raise unittest2.SkipTest("PhantomJS not found")
296         try:
297             self.phantom_poll(phantom, timeout)
298         finally:
299             # kill phantomjs if phantom.exit() wasn't called in the test
300             if phantom.poll() is None:
301                 phantom.terminate()
302                 phantom.wait()
303             self._wait_remaining_requests()
304             _logger.info("phantom_run execution finished")
305
306     def _wait_remaining_requests(self):
307         t0 = int(time.time())
308         for thread in threading.enumerate():
309             if thread.name.startswith('openerp.service.http.request.'):
310                 while thread.isAlive():
311                     # Need a busyloop here as thread.join() masks signals
312                     # and would prevent the forced shutdown.
313                     thread.join(0.05)
314                     time.sleep(0.05)
315                     t1 = int(time.time())
316                     if t0 != t1:
317                         _logger.info('remaining requests')
318                         openerp.tools.misc.dumpstacks()
319                         t0 = t1
320
321     def phantom_jsfile(self, jsfile, timeout=60, **kw):
322         options = {
323             'timeout' : timeout,
324             'port': PORT,
325             'db': DB,
326             'session_id': self.session_id,
327         }
328         options.update(kw)
329         phantomtest = os.path.join(os.path.dirname(__file__), 'phantomtest.js')
330         # phantom.args[0] == phantomtest path
331         # phantom.args[1] == options
332         cmd = [
333             'phantomjs',
334             jsfile, phantomtest, json.dumps(options)
335         ]
336         self.phantom_run(cmd, timeout)
337
338     def phantom_js(self, url_path, code, ready="window", login=None, timeout=60, **kw):
339         """ Test js code running in the browser
340         - optionnally log as 'login'
341         - load page given by url_path
342         - wait for ready object to be available
343         - eval(code) inside the page
344
345         To signal success test do:
346         console.log('ok')
347
348         To signal failure do:
349         console.log('error')
350
351         If neither are done before timeout test fails.
352         """
353         options = {
354             'port': PORT,
355             'db': DB,
356             'url_path': url_path,
357             'code': code,
358             'ready': ready,
359             'timeout' : timeout,
360             'login' : login,
361             'session_id': self.session_id,
362         }
363         options.update(kw)
364         options.setdefault('password', options.get('login'))
365         phantomtest = os.path.join(os.path.dirname(__file__), 'phantomtest.js')
366         cmd = ['phantomjs', phantomtest, json.dumps(options)]
367         self.phantom_run(cmd, timeout)
368
369
370 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: