"""
t0 = time.time()
- buf = ''
+ buf = bytearray()
while 1:
# timeout
- if time.time() > t0 + timeout:
- raise Exception("phantomjs test timeout (%ss)" % timeout)
+ self.assertLess(time.time(), t0 + timeout,
+ "PhantomJS tests should take less than %s seconds" % timeout)
# read a byte
ready, _, _ = select.select([phantom.stdout], [], [], 0.5)
if ready:
s = phantom.stdout.read(1)
if s:
- buf += s
+ buf.append(s)
else:
break
# process lines
if '\n' in buf:
line, buf = buf.split('\n', 1)
- _logger.info("phantomjs: %s", line)
if line == "ok":
- _logger.info("phantomjs test successful")
return
- if line == "error":
- raise Exception("phantomjs test failed")
+ if line.startswith("error"):
+ # 'error $message' or use generic message
+ self.fail(line[6:] or "phantomjs test failed")
+ _logger.info("phantomjs: %s", line)
def phantom_run(self, cmd, timeout):
- _logger.info('executing %s', cmd)
+ _logger.debug('executing `%s`', ' '.join(cmd))
try:
phantom = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except OSError:
- _logger.info("phantomjs not found, test skipped")
- return
+ raise unittest2.SkipTest("PhantomJS not found")
try:
self.phantom_poll(phantom, timeout)
finally: