self.sql_log_count = 0
self._closed = True # avoid the call of close() (by __del__) if an exception
# is raised by any of the following initialisations
- self._pool = pool
+ self.__pool = pool
self.dbname = dbname
# Whether to enable snapshot isolation level for this cursor.
chosen_template = tools.config['db_template']
templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
keep_in_pool = self.dbname not in templates_list
- self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
+ self.__pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
@check
def autocommit(self, on):
"""
return self._cnx.rollback()
+ def __enter__(self):
+ """ Using the cursor as a contextmanager automatically commits and
+ closes it::
+
+ with cr:
+ cr.execute(...)
+
+ # cr is committed if no failure occurred
+ # cr is closed in any case
+ """
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ if exc_type is None:
+ self.commit()
+ self.close()
+
@contextmanager
@check
def savepoint(self):
def __getattr__(self, name):
return getattr(self._obj, name)
+class TestCursor(Cursor):
+ """ A cursor to be used for tests. It keeps the transaction open across
+ several requests, and simulates committing, rolling back, and closing.
+ """
+ def __init__(self, *args, **kwargs):
+ super(TestCursor, self).__init__(*args, **kwargs)
+ # in order to simulate commit and rollback, the cursor maintains a
+ # savepoint at its last commit
+ self.execute("SAVEPOINT test_cursor")
+ # we use a lock to serialize concurrent requests
+ self._lock = threading.RLock()
+
+ def acquire(self):
+ self._lock.acquire()
+
+ def release(self):
+ self._lock.release()
+
+ def force_close(self):
+ super(TestCursor, self).close()
+
+ def close(self):
+ if not self._closed:
+ self.rollback() # for stuff that has not been committed
+ self.release()
+
+ def autocommit(self, on):
+ _logger.debug("TestCursor.autocommit(%r) does nothing", on)
+
+ def commit(self):
+ self.execute("RELEASE SAVEPOINT test_cursor")
+ self.execute("SAVEPOINT test_cursor")
+
+ def rollback(self):
+ self.execute("ROLLBACK TO SAVEPOINT test_cursor")
+ self.execute("SAVEPOINT test_cursor")
+
class PsycoConnection(psycopg2.extensions.connection):
pass
def __init__(self, pool, dbname):
self.dbname = dbname
- self._pool = pool
+ self.__pool = pool
def cursor(self, serialized=True):
cursor_type = serialized and 'serialized ' or ''
_logger.debug('create %scursor to %r', cursor_type, self.dbname)
- return Cursor(self._pool, self.dbname, serialized=serialized)
+ return Cursor(self.__pool, self.dbname, serialized=serialized)
+
+ def test_cursor(self, serialized=True):
+ cursor_type = serialized and 'serialized ' or ''
+ _logger.debug('create test %scursor to %r', cursor_type, self.dbname)
+ return TestCursor(self.__pool, self.dbname, serialized=serialized)
# serialized_cursor is deprecated - cursors are serialized by default
serialized_cursor = cursor