"""
# some magic to lazy create the cr
if not self._cr:
- # Test cursors
- self._cr = openerp.tests.common.acquire_test_cursor(self.session_id)
- if not self._cr:
- self._cr = self.registry.get_cursor()
+ self._cr = self.registry.get_cursor()
return self._cr
def __enter__(self):
_request_stack.pop()
if self._cr:
- # Dont close test cursors
- if not openerp.tests.common.release_test_cursor(self._cr):
- if exc_type is None and not self._failed:
- self._cr.commit()
- else:
- # just to be explicit - happens at close() anyway
- self._cr.rollback()
- self._cr.close()
+ if exc_type is None and not self._failed:
+ self._cr.commit()
+ self._cr.close()
# just to be sure no one tries to re-use the request
self.disable_db = True
self.uid = None
self.db_name = db_name
self.db = openerp.sql_db.db_connect(db_name)
+ # special cursor for test mode; None means "normal" mode
+ self.test_cr = None
+
# Indicates that the registry is
self.ready = False
r, c)
return r, c
+ def enter_test_mode(self):
+ """ Enter the 'test' mode, where one cursor serves several requests. """
+ assert self.test_cr is None
+ self.test_cr = self.db.test_cursor()
+ RegistryManager.enter_test_mode()
+
+ def leave_test_mode(self):
+ """ Leave the test mode. """
+ assert self.test_cr is not None
+ self.test_cr.close(force=True) # close the cursor for real
+ self.test_cr = None
+ RegistryManager.leave_test_mode()
+
def get_cursor(self):
""" Return a new cursor for the database. """
+ if self.test_cr is not None:
+ # While in test mode, we use one special cursor across requests. The
+ # test cursor uses a reentrant lock to serialize accesses. The lock
+ # is granted here by get_cursor(), and automatically released by the
+ # cursor itself in its method close().
+ self.test_cr.acquire()
+ return self.test_cr
return self.db.cursor()
@contextmanager
def __getattr__(self, name):
return getattr(self._obj, name)
+class TestCursor(Cursor):
+ """ A cursor to be used for tests. It keeps the transaction open across
+ several requests, and simulates committing, rolling back, and closing.
+ """
+ def __init__(self, *args, **kwargs):
+ # in order to simulate commit and rollback, the cursor maintains a
+ # savepoint at its last commit
+ super(TestCursor, self).__init__(*args, **kwargs)
+ super(TestCursor, self).execute("SAVEPOINT test_cursor")
+ self._lock = threading.RLock()
+ self._auto_commit = False
+
+ def acquire(self):
+ self._lock.acquire()
+
+ def release(self):
+ self._lock.release()
+
+ def execute(self, *args, **kwargs):
+ super(TestCursor, self).execute(*args, **kwargs)
+ if self._auto_commit:
+ self.commit()
+
+ def close(self, force=False):
+ self.rollback() # for stuff that has not been committed
+ if force:
+ super(TestCursor, self).close()
+ else:
+ self.release()
+
+ def autocommit(self, on):
+ self._auto_commit = on
+
+ def commit(self):
+ super(TestCursor, self).execute("RELEASE SAVEPOINT test_cursor")
+ super(TestCursor, self).execute("SAVEPOINT test_cursor")
+
+ def rollback(self):
+ super(TestCursor, self).execute("ROLLBACK TO SAVEPOINT test_cursor")
+ super(TestCursor, self).execute("SAVEPOINT test_cursor")
+
class PsycoConnection(psycopg2.extensions.connection):
pass
_logger.debug('create %scursor to %r', cursor_type, self.dbname)
return Cursor(self._pool, self.dbname, serialized=serialized)
+ def test_cursor(self, serialized=True):
+ cursor_type = serialized and 'serialized ' or ''
+ _logger.debug('create test %scursor to %r', cursor_type, self.dbname)
+ return TestCursor(self._pool, self.dbname, serialized=serialized)
+
# serialized_cursor is deprecated - cursors are serialized by default
serialized_cursor = cursor
import werkzeug
import openerp
+from openerp.modules.registry import RegistryManager
_logger = logging.getLogger(__name__)
# Useless constant, tests are aware of the content of demo data
ADMIN_USER_ID = openerp.SUPERUSER_ID
-# Magic session_id, unfortunately we have to serialize access to the cursors to
-# serialize requests. We first tried to duplicate the database for each tests
-# but this proved too slow. Any idea to improve this is welcome.
-HTTP_SESSION = {}
-
-def acquire_test_cursor(session_id):
- if openerp.tools.config['test_enable']:
- cr = HTTP_SESSION.get(session_id)
- if cr:
- cr._test_lock.acquire()
- return cr
-
-def release_test_cursor(cr):
- if openerp.tools.config['test_enable']:
- if hasattr(cr, '_test_lock'):
- cr._test_lock.release()
- return True
- return False
-
def at_install(flag):
""" Sets the at-install state of a test, the flag is a boolean specifying
whether the test should (``True``) or should not (``False``) run during
"""
def setUp(self):
- self.registry = openerp.modules.registry.RegistryManager.get(DB)
+ self.registry = RegistryManager.get(DB)
self.cr = self.cursor()
self.uid = openerp.SUPERUSER_ID
@classmethod
def setUpClass(cls):
- cls.registry = openerp.modules.registry.RegistryManager.get(DB)
+ cls.registry = RegistryManager.get(DB)
cls.cr = cls.registry.get_cursor()
cls.uid = openerp.SUPERUSER_ID
def setUp(self):
super(HttpCase, self).setUp()
- openerp.modules.registry.RegistryManager.enter_test_mode()
+ self.registry.enter_test_mode()
# setup a magic session_id that will be rollbacked
self.session = openerp.http.root.session_store.new()
self.session_id = self.session.sid
self.session.db = DB
openerp.http.root.session_store.save(self.session)
- self.cr._test_lock = threading.RLock()
- HTTP_SESSION[self.session_id] = self.cr
def tearDown(self):
- del HTTP_SESSION[self.session_id]
- openerp.modules.registry.RegistryManager.leave_test_mode()
+ self.registry.leave_test_mode()
super(HttpCase, self).tearDown()
def url_open(self, url, data=None, timeout=10):