b3f6dbe81a366b7196a15f2c67c7e7324006c956
[odoo/odoo.git] / openerp / tests / common.py
1 # -*- coding: utf-8 -*-
2 """
3 The module :mod:`openerp.tests.common` provides unittest2 test cases and a few
4 helpers and classes to write tests.
5
6 """
7 import errno
8 import glob
9 import json
10 import logging
11 import os
12 import select
13 import subprocess
14 import threading
15 import time
16 import unittest2
17 import urllib2
18 import xmlrpclib
19 from datetime import datetime, timedelta
20
21 import werkzeug
22
23 import openerp
24 from openerp import api
25 from openerp.modules.registry import RegistryManager
26
27 _logger = logging.getLogger(__name__)
28
29 # The openerp library is supposed already configured.
30 ADDONS_PATH = openerp.tools.config['addons_path']
31 HOST = '127.0.0.1'
32 PORT = openerp.tools.config['xmlrpc_port']
33 DB = openerp.tools.config['db_name']
34 # If the database name is not provided on the command-line,
35 # use the one on the thread (which means if it is provided on
36 # the command-line, this will break when installing another
37 # database from XML-RPC).
38 if not DB and hasattr(threading.current_thread(), 'dbname'):
39     DB = threading.current_thread().dbname
40 # Useless constant, tests are aware of the content of demo data
41 ADMIN_USER_ID = openerp.SUPERUSER_ID
42
43 def at_install(flag):
44     """ Sets the at-install state of a test, the flag is a boolean specifying
45     whether the test should (``True``) or should not (``False``) run during
46     module installation.
47
48     By default, tests are run right after installing the module, before
49     starting the installation of the next module.
50     """
51     def decorator(obj):
52         obj.at_install = flag
53         return obj
54     return decorator
55
56 def post_install(flag):
57     """ Sets the post-install state of a test. The flag is a boolean
58     specifying whether the test should or should not run after a set of
59     module installations.
60
61     By default, tests are *not* run after installation of all modules in the
62     current installation set.
63     """
64     def decorator(obj):
65         obj.post_install = flag
66         return obj
67     return decorator
68
69 class BaseCase(unittest2.TestCase):
70     """
71     Subclass of TestCase for common OpenERP-specific code.
72     
73     This class is abstract and expects self.registry, self.cr and self.uid to be
74     initialized by subclasses.
75     """
76
77     def cursor(self):
78         return self.registry.cursor()
79
80     def ref(self, xid):
81         """ Returns database ID for the provided :term:`external identifier`,
82         shortcut for ``get_object_reference``
83
84         :param xid: fully-qualified :term:`external identifier`, in the form
85                     :samp:`{module}.{identifier}`
86         :raise: ValueError if not found
87         :returns: registered id
88         """
89         assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
90         module, xid = xid.split('.')
91         _, id = self.registry('ir.model.data').get_object_reference(self.cr, self.uid, module, xid)
92         return id
93
94     def browse_ref(self, xid):
95         """ Returns a record object for the provided
96         :term:`external identifier`
97
98         :param xid: fully-qualified :term:`external identifier`, in the form
99                     :samp:`{module}.{identifier}`
100         :raise: ValueError if not found
101         :returns: :class:`~openerp.models.BaseModel`
102         """
103         assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
104         module, xid = xid.split('.')
105         return self.registry('ir.model.data').get_object(self.cr, self.uid, module, xid)
106
107
108 class TransactionCase(BaseCase):
109     """ TestCase in which each test method is run in its own transaction,
110     and with its own cursor. The transaction is rolled back and the cursor
111     is closed after each test.
112     """
113
114     def setUp(self):
115         self.registry = RegistryManager.get(DB)
116         #: current transaction's cursor
117         self.cr = self.cursor()
118         self.uid = openerp.SUPERUSER_ID
119         #: :class:`~openerp.api.Environment` for the current test case
120         self.env = api.Environment(self.cr, self.uid, {})
121
122     def tearDown(self):
123         self.cr.rollback()
124         self.cr.close()
125
126
127 class SingleTransactionCase(BaseCase):
128     """ TestCase in which all test methods are run in the same transaction,
129     the transaction is started with the first test method and rolled back at
130     the end of the last.
131     """
132
133     @classmethod
134     def setUpClass(cls):
135         cls.registry = RegistryManager.get(DB)
136         cls.cr = cls.registry.cursor()
137         cls.uid = openerp.SUPERUSER_ID
138         cls.env = api.Environment(cls.cr, cls.uid, {})
139
140     @classmethod
141     def tearDownClass(cls):
142         cls.cr.rollback()
143         cls.cr.close()
144
145 class RedirectHandler(urllib2.HTTPRedirectHandler):
146     """
147     HTTPRedirectHandler is predicated upon HTTPErrorProcessor being used and
148     works by intercepting 3xy "errors".
149
150     Inherit from it to handle 3xy non-error responses instead, as we're not
151     using the error processor
152     """
153
154     def http_response(self, request, response):
155         code, msg, hdrs = response.code, response.msg, response.info()
156
157         if 300 <= code < 400:
158             return self.parent.error(
159                 'http', request, response, code, msg, hdrs)
160
161         return response
162
163     https_response = http_response
164
165 class HttpCase(TransactionCase):
166     """ Transactional HTTP TestCase with url_open and phantomjs helpers.
167     """
168
169     def __init__(self, methodName='runTest'):
170         super(HttpCase, self).__init__(methodName)
171         # v8 api with correct xmlrpc exception handling.
172         self.xmlrpc_url = url_8 = 'http://%s:%d/xmlrpc/2/' % (HOST, PORT)
173         self.xmlrpc_common = xmlrpclib.ServerProxy(url_8 + 'common')
174         self.xmlrpc_db = xmlrpclib.ServerProxy(url_8 + 'db')
175         self.xmlrpc_object = xmlrpclib.ServerProxy(url_8 + 'object')
176
177     def setUp(self):
178         super(HttpCase, self).setUp()
179         self.registry.enter_test_mode()
180         # setup a magic session_id that will be rollbacked
181         self.session = openerp.http.root.session_store.new()
182         self.session_id = self.session.sid
183         self.session.db = DB
184         openerp.http.root.session_store.save(self.session)
185         # setup an url opener helper
186         self.opener = urllib2.OpenerDirector()
187         self.opener.add_handler(urllib2.UnknownHandler())
188         self.opener.add_handler(urllib2.HTTPHandler())
189         self.opener.add_handler(urllib2.HTTPSHandler())
190         self.opener.add_handler(urllib2.HTTPCookieProcessor())
191         self.opener.add_handler(RedirectHandler())
192         self.opener.addheaders.append(('Cookie', 'session_id=%s' % self.session_id))
193
194     def tearDown(self):
195         self.registry.leave_test_mode()
196         super(HttpCase, self).tearDown()
197
198     def url_open(self, url, data=None, timeout=10):
199         if url.startswith('/'):
200             url = "http://localhost:%s%s" % (PORT, url)
201         return self.opener.open(url, data, timeout)
202
203     def authenticate(self, user, password):
204         if user is not None:
205             url = '/login?%s' % werkzeug.urls.url_encode({'db': DB,'login': user, 'key': password})
206             auth = self.url_open(url)
207             assert auth.getcode() < 400, "Auth failure %d" % auth.getcode()
208
209     def phantom_poll(self, phantom, timeout):
210         """ Phantomjs Test protocol.
211
212         Use console.log in phantomjs to output test results:
213
214         - for a success: console.log("ok")
215         - for an error:  console.log("error")
216
217         Other lines are relayed to the test log.
218
219         """
220         t0 = datetime.now()
221         td = timedelta(seconds=timeout)
222         buf = bytearray()
223         while True:
224             # timeout
225             self.assertLess(datetime.now() - t0, td,
226                 "PhantomJS tests should take less than %s seconds" % timeout)
227
228             # read a byte
229             try:
230                 ready, _, _ = select.select([phantom.stdout], [], [], 0.5)
231             except select.error, e:
232                 # In Python 2, select.error has no relation to IOError or
233                 # OSError, and no errno/strerror/filename, only a pair of
234                 # unnamed arguments (matching errno and strerror)
235                 err, _ = e.args
236                 if err == errno.EINTR:
237                     continue
238                 raise
239
240             if ready:
241                 s = phantom.stdout.read(1)
242                 if not s:
243                     break
244                 buf.append(s)
245
246             # process lines
247             if '\n' in buf:
248                 line, buf = buf.split('\n', 1)
249                 line = str(line)
250
251                 # relay everything from console.log, even 'ok' or 'error...' lines
252                 _logger.info("phantomjs: %s", line)
253
254                 if line == "ok":
255                     break
256                 if line.startswith("error"):
257                     line_ = line[6:]
258                     # when error occurs the execution stack may be sent as as JSON
259                     try:
260                         line_ = json.loads(line_)
261                     except ValueError: 
262                         pass
263                     self.fail(line_ or "phantomjs test failed")
264
265     def phantom_run(self, cmd, timeout):
266         _logger.info('phantom_run executing %s', ' '.join(cmd))
267
268         ls_glob = os.path.expanduser('~/.qws/share/data/Ofi Labs/PhantomJS/http_localhost_%s.*'%PORT)
269         for i in glob.glob(ls_glob):
270             _logger.info('phantomjs unlink localstorage %s', i)
271             os.unlink(i)
272         try:
273             phantom = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
274         except OSError:
275             raise unittest2.SkipTest("PhantomJS not found")
276         try:
277             self.phantom_poll(phantom, timeout)
278         finally:
279             # kill phantomjs if phantom.exit() wasn't called in the test
280             if phantom.poll() is None:
281                 phantom.terminate()
282                 phantom.wait()
283             self._wait_remaining_requests()
284             _logger.info("phantom_run execution finished")
285
286     def _wait_remaining_requests(self):
287         t0 = int(time.time())
288         for thread in threading.enumerate():
289             if thread.name.startswith('openerp.service.http.request.'):
290                 while thread.isAlive():
291                     # Need a busyloop here as thread.join() masks signals
292                     # and would prevent the forced shutdown.
293                     thread.join(0.05)
294                     time.sleep(0.05)
295                     t1 = int(time.time())
296                     if t0 != t1:
297                         _logger.info('remaining requests')
298                         openerp.tools.misc.dumpstacks()
299                         t0 = t1
300
301     def phantom_jsfile(self, jsfile, timeout=60, **kw):
302         options = {
303             'timeout' : timeout,
304             'port': PORT,
305             'db': DB,
306             'session_id': self.session_id,
307         }
308         options.update(kw)
309         phantomtest = os.path.join(os.path.dirname(__file__), 'phantomtest.js')
310         # phantom.args[0] == phantomtest path
311         # phantom.args[1] == options
312         cmd = [
313             'phantomjs',
314             jsfile, phantomtest, json.dumps(options)
315         ]
316         self.phantom_run(cmd, timeout)
317
318     def phantom_js(self, url_path, code, ready="window", login=None, timeout=60, **kw):
319         """ Test js code running in the browser
320         - optionnally log as 'login'
321         - load page given by url_path
322         - wait for ready object to be available
323         - eval(code) inside the page
324
325         To signal success test do:
326         console.log('ok')
327
328         To signal failure do:
329         console.log('error')
330
331         If neither are done before timeout test fails.
332         """
333         options = {
334             'port': PORT,
335             'db': DB,
336             'url_path': url_path,
337             'code': code,
338             'ready': ready,
339             'timeout' : timeout,
340             'login' : login,
341             'session_id': self.session_id,
342         }
343         options.update(kw)
344         options.setdefault('password', options.get('login'))
345         phantomtest = os.path.join(os.path.dirname(__file__), 'phantomtest.js')
346         cmd = ['phantomjs', phantomtest, json.dumps(options)]
347         self.phantom_run(cmd, timeout)
348
349
350 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: