[FIX] tour.js: tour shop
[odoo/odoo.git] / openerp / tests / common.py
1 # -*- coding: utf-8 -*-
2 """
3 The module :mod:`openerp.tests.common` provides unittest2 test cases and a few
4 helpers and classes to write tests.
5
6 """
7 import errno
8 import glob
9 import json
10 import logging
11 import os
12 import select
13 import subprocess
14 import threading
15 import time
16 import unittest2
17 import urllib2
18 import xmlrpclib
19 from datetime import datetime, timedelta
20 from shutil import rmtree
21 from tempfile import mkdtemp
22
23 import werkzeug
24
25 import openerp
26 from openerp.modules.registry import RegistryManager
27
28 _logger = logging.getLogger(__name__)
29
30 # The openerp library is supposed already configured.
31 ADDONS_PATH = openerp.tools.config['addons_path']
32 HOST = '127.0.0.1'
33 PORT = openerp.tools.config['xmlrpc_port']
34 DB = openerp.tools.config['db_name']
35 # If the database name is not provided on the command-line,
36 # use the one on the thread (which means if it is provided on
37 # the command-line, this will break when installing another
38 # database from XML-RPC).
39 if not DB and hasattr(threading.current_thread(), 'dbname'):
40     DB = threading.current_thread().dbname
41 # Useless constant, tests are aware of the content of demo data
42 ADMIN_USER_ID = openerp.SUPERUSER_ID
43
44 def at_install(flag):
45     """ Sets the at-install state of a test, the flag is a boolean specifying
46     whether the test should (``True``) or should not (``False``) run during
47     module installation.
48
49     By default, tests are run at install.
50     """
51     def decorator(obj):
52         obj.at_install = flag
53         return obj
54     return decorator
55
56 def post_install(flag):
57     """ Sets the post-install state of a test. The flag is a boolean
58     specifying whether the test should or should not run after a set of
59     module installations.
60
61     By default, tests are *not* run after installation.
62     """
63     def decorator(obj):
64         obj.post_install = flag
65         return obj
66     return decorator
67
68 class BaseCase(unittest2.TestCase):
69     """
70     Subclass of TestCase for common OpenERP-specific code.
71     
72     This class is abstract and expects self.registry, self.cr and self.uid to be
73     initialized by subclasses.
74     """
75
76     def cursor(self):
77         return self.registry.cursor()
78
79     def ref(self, xid):
80         """ Returns database ID corresponding to a given identifier.
81
82             :param xid: fully-qualified record identifier, in the form ``module.identifier``
83             :raise: ValueError if not found
84         """
85         assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
86         module, xid = xid.split('.')
87         _, id = self.registry('ir.model.data').get_object_reference(self.cr, self.uid, module, xid)
88         return id
89
90     def browse_ref(self, xid):
91         """ Returns a browsable record for the given identifier.
92
93             :param xid: fully-qualified record identifier, in the form ``module.identifier``
94             :raise: ValueError if not found
95         """
96         assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
97         module, xid = xid.split('.')
98         return self.registry('ir.model.data').get_object(self.cr, self.uid, module, xid)
99
100
101 class TransactionCase(BaseCase):
102     """
103     Subclass of BaseCase with a single transaction, rolled-back at the end of
104     each test (method).
105     """
106
107     def setUp(self):
108         self.registry = RegistryManager.get(DB)
109         self.cr = self.cursor()
110         self.uid = openerp.SUPERUSER_ID
111
112     def tearDown(self):
113         self.cr.rollback()
114         self.cr.close()
115
116
117 class SingleTransactionCase(BaseCase):
118     """
119     Subclass of BaseCase with a single transaction for the whole class,
120     rolled-back after all the tests.
121     """
122
123     @classmethod
124     def setUpClass(cls):
125         cls.registry = RegistryManager.get(DB)
126         cls.cr = cls.registry.cursor()
127         cls.uid = openerp.SUPERUSER_ID
128
129     @classmethod
130     def tearDownClass(cls):
131         cls.cr.rollback()
132         cls.cr.close()
133
134 class RedirectHandler(urllib2.HTTPRedirectHandler):
135     """
136     HTTPRedirectHandler is predicated upon HTTPErrorProcessor being used and
137     works by intercepting 3xy "errors".
138
139     Inherit from it to handle 3xy non-error responses instead, as we're not
140     using the error processor
141     """
142
143     def http_response(self, request, response):
144         code, msg, hdrs = response.code, response.msg, response.info()
145
146         if 300 <= code < 400:
147             return self.parent.error(
148                 'http', request, response, code, msg, hdrs)
149
150         return response
151
152     https_response = http_response
153
154 class HttpCase(TransactionCase):
155     """ Transactionnal HTTP TestCase with url_open and phantomjs helpers.
156     """
157
158     def __init__(self, methodName='runTest'):
159         super(HttpCase, self).__init__(methodName)
160         # v8 api with correct xmlrpc exception handling.
161         self.xmlrpc_url = url_8 = 'http://%s:%d/xmlrpc/2/' % (HOST, PORT)
162         self.xmlrpc_common = xmlrpclib.ServerProxy(url_8 + 'common')
163         self.xmlrpc_db = xmlrpclib.ServerProxy(url_8 + 'db')
164         self.xmlrpc_object = xmlrpclib.ServerProxy(url_8 + 'object')
165
166     def setUp(self):
167         super(HttpCase, self).setUp()
168         self.registry.enter_test_mode()
169         # setup a magic session_id that will be rollbacked
170         self.session = openerp.http.root.session_store.new()
171         self.session_id = self.session.sid
172         self.session.db = DB
173         openerp.http.root.session_store.save(self.session)
174         # setup an url opener helper
175         self.opener = urllib2.OpenerDirector()
176         self.opener.add_handler(urllib2.UnknownHandler())
177         self.opener.add_handler(urllib2.HTTPHandler())
178         self.opener.add_handler(urllib2.HTTPSHandler())
179         self.opener.add_handler(urllib2.HTTPCookieProcessor())
180         self.opener.add_handler(RedirectHandler())
181         self.opener.addheaders.append(('Cookie', 'session_id=%s' % self.session_id))
182
183     def tearDown(self):
184         self.registry.leave_test_mode()
185         super(HttpCase, self).tearDown()
186
187     def url_open(self, url, data=None, timeout=10):
188         if url.startswith('/'):
189             url = "http://localhost:%s%s" % (PORT, url)
190         return self.opener.open(url, data, timeout)
191
192     def authenticate(self, user, password):
193         if user is not None:
194             url = '/login?%s' % werkzeug.urls.url_encode({'db': DB,'login': user, 'key': password})
195             auth = self.url_open(url)
196             assert auth.getcode() < 400, "Auth failure %d" % auth.getcode()
197
198     def phantom_poll(self, phantom, timeout):
199         """ Phantomjs Test protocol.
200
201         Use console.log in phantomjs to output test results:
202
203         - for a success: console.log("ok")
204         - for an error:  console.log("error")
205
206         Other lines are relayed to the test log.
207
208         """
209         t0 = datetime.now()
210         td = timedelta(seconds=timeout)
211         buf = bytearray()
212         while True:
213             # timeout
214             self.assertLess(datetime.now() - t0, td,
215                 "PhantomJS tests should take less than %s seconds" % timeout)
216
217             # read a byte
218             try:
219                 ready, _, _ = select.select([phantom.stdout], [], [], 0.5)
220             except select.error, e:
221                 # In Python 2, select.error has no relation to IOError or
222                 # OSError, and no errno/strerror/filename, only a pair of
223                 # unnamed arguments (matching errno and strerror)
224                 err, _ = e.args
225                 if err == errno.EINTR:
226                     continue
227                 raise
228
229             if ready:
230                 s = phantom.stdout.read(1)
231                 if not s:
232                     break
233                 buf.append(s)
234
235             # process lines
236             if '\n' in buf:
237                 line, buf = buf.split('\n', 1)
238                 line = str(line)
239
240                 # relay everything from console.log, even 'ok' or 'error...' lines
241                 _logger.info("phantomjs: %s", line)
242
243                 if line == "ok":
244                     break
245                 if line.startswith("error"):
246                     line_ = line[6:]
247                     # when error occurs the execution stack may be sent as as JSON
248                     try:
249                         line_ = json.loads(line_)
250                     except ValueError: 
251                         pass
252                     self.fail(line_ or "phantomjs test failed")
253
254     def phantom_run(self, cmd, timeout):
255         _logger.info('phantom_run executing %s', ' '.join(cmd))
256
257         ls_glob = os.path.expanduser('~/.qws/share/data/Ofi Labs/PhantomJS/http_localhost_%s.*'%PORT)
258         for i in glob.glob(ls_glob):
259             _logger.info('phantomjs unlink localstorage %s', i)
260             os.unlink(i)
261         try:
262             phantom = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
263         except OSError:
264             raise unittest2.SkipTest("PhantomJS not found")
265         try:
266             self.phantom_poll(phantom, timeout)
267         finally:
268             # kill phantomjs if phantom.exit() wasn't called in the test
269             if phantom.poll() is None:
270                 phantom.terminate()
271                 phantom.wait()
272             self._wait_remaining_requests()
273             _logger.info("phantom_run execution finished")
274
275     def _wait_remaining_requests(self):
276         t0 = int(time.time())
277         for thread in threading.enumerate():
278             if thread.name.startswith('openerp.service.http.request.'):
279                 while thread.isAlive():
280                     # Need a busyloop here as thread.join() masks signals
281                     # and would prevent the forced shutdown.
282                     thread.join(0.05)
283                     time.sleep(0.05)
284                     t1 = int(time.time())
285                     if t0 != t1:
286                         _logger.info('remaining requests')
287                         openerp.tools.misc.dumpstacks()
288                         t0 = t1
289
290     def phantom_jsfile(self, jsfile, timeout=60, **kw):
291         options = {
292             'timeout' : timeout,
293             'port': PORT,
294             'db': DB,
295             'session_id': self.session_id,
296         }
297         options.update(kw)
298         phantomtest = os.path.join(os.path.dirname(__file__), 'phantomtest.js')
299         # phantom.args[0] == phantomtest path
300         # phantom.args[1] == options
301         cmd = [
302             'phantomjs',
303             jsfile, phantomtest, json.dumps(options)
304         ]
305         self.phantom_run(cmd, timeout)
306
307     def phantom_js(self, url_path, code, ready="window", login=None, timeout=60, **kw):
308         """ Test js code running in the browser
309         - optionnally log as 'login'
310         - load page given by url_path
311         - wait for ready object to be available
312         - eval(code) inside the page
313
314         To signal success test do:
315         console.log('ok')
316
317         To signal failure do:
318         console.log('error')
319
320         If neither are done before timeout test fails.
321         """
322         options = {
323             'port': PORT,
324             'db': DB,
325             'url_path': url_path,
326             'code': code,
327             'ready': ready,
328             'timeout' : timeout,
329             'login' : login,
330             'session_id': self.session_id,
331         }
332         options.update(kw)
333         options.setdefault('password', options.get('login'))
334         phantomtest = os.path.join(os.path.dirname(__file__), 'phantomtest.js')
335         cmd = ['phantomjs', phantomtest, json.dumps(options)]
336         self.phantom_run(cmd, timeout)
337
338
339 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: