Переглянути джерело

Merge read and write transactions & cursors; wrap transactions in a context manager.

Stefano Cossu 7 роки тому
батько
коміт
ad05dd4de9
2 змінених файлів з 216 додано та 252 видалено
  1. 89 136
      lakesuperior/store_layouts/ldp_rs/lmdb_store.py
  2. 127 116
      tests/store/test_lmdb_store.py

+ 89 - 136
lakesuperior/store_layouts/ldp_rs/lmdb_store.py

@@ -1,7 +1,7 @@
 import hashlib
 import logging
 
-from contextlib import ExitStack
+from contextlib import ContextDecorator
 from os import makedirs
 from os.path import exists, abspath
 from urllib.request import pathname2url
@@ -37,30 +37,38 @@ class NoTxnError(Exception):
         return 'No transaction active in the store.'
 
 
-def read_tx(dbs=(), buffers=True):
+class TxnManager(ContextDecorator):
     '''
-    Decorator to wrap a method into a read transaction.
+    Handle ACID transactions with an LmdStore.
 
-    This method creates the necessary cursors indicated in the `db` parameter.
+    To use this, wrap t within a `with` statement:
 
-    @param dbs (tuple|list:string) Database label(s) to open cursors. No
-    cursors are automatically opened by default.
+    >>> with TxnManager(store, True):
+    ...     # Do something with the database
+    >>>
+
+    The transaction will be opened and handled automatically.
     '''
-    def read_tx_deco(fn):
-        def wrapper(self, *args, **kwargs):
-            with ExitStack() as stack:
-                self.rtxn = stack.enter_context(
-                        self.db_env.begin(buffers=buffers))
-                self.rcurs = {}
-                for db_label in dbs:
-                    self.rcurs[db_label] = stack.enter_context(
-                            self.rtxn.cursor(self.dbs[db_label]))
-                stack.pop_all()
-                ret = fn(self, *args, **kwargs)
-                stack.close()
-                return ret
-        return wrapper
-    return read_tx_deco
+    def __init__(self, store, write=False):
+        '''
+        Begin and close a transaction in a store.
+
+        @param store (LmdbStore) The store to open a transaction on.
+        @param write (bool) Whether the transaction is read-write. Default is
+        False (read-only transaction).
+        '''
+        self.store = store
+        self.write = write
+
+    def __enter__(self):
+        self.txn = self.store.begin(write=self.write)
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        if exc_type:
+            self.store.rollback()
+        else:
+            self.store.commit()
+        return True
 
 
 class LmdbStore(Store):
@@ -130,7 +138,7 @@ class LmdbStore(Store):
     db_env = None
     db = None
     dbs = {}
-    wtxn = None
+    txn = None
 
 
     def __init__(self, path, identifier=None):
@@ -170,80 +178,33 @@ class LmdbStore(Store):
         return VALID_STORE
 
 
-    def begin(self):
+    def begin(self, write=False):
         '''
         Begin the main write transaction and create cursors.
         '''
         if not self.is_open:
             raise RuntimeError('Store must be opened first.')
-        self.wtxn = self.db_env.begin(write=True, buffers=True)
+        self.txn = self.db_env.begin(write=write, buffers=True)
         # Cursors.
-        self.wcurs = self.get_data_cursors(self.wtxn)
-        self.wcurs.update(self.get_idx_cursors(self.wtxn))
+        self.curs = self.get_data_cursors(self.txn)
+        self.curs.update(self.get_idx_cursors(self.txn))
 
 
     @property
-    def is_rtxn_open(self):
+    def is_txn_open(self):
         '''
-        Whether the main read transaction is open.
+        Whether the main transaction is open.
         '''
         try:
-            self.rtxn.id()
+            self.txn.id()
         except (lmdb.Error, AttributeError):
-            logger.info('Read transaction does not exist or is closed.')
+            logger.info('Main transaction does not exist or is closed.')
             return False
         else:
-            logger.info('Read transaction is open.')
+            logger.info('Main transaction is open.')
             return True
 
 
-    @property
-    def is_wtxn_open(self):
-        '''
-        Whether the main write transaction is open.
-        '''
-        try:
-            self.wtxn.id()
-        except (lmdb.Error, AttributeError):
-            logger.info('Write transaction does not exist or is closed.')
-            return False
-        else:
-            logger.info('Write transaction is open.')
-            return True
-
-
-    @property
-    def txn(self):
-        '''
-        Get current active transaction for read-only use.
-
-        @return lmdb.Transaction|None Return the main read transaction or the
-        main write transaction, whichever is open, or None if neither is
-        open.
-        '''
-        if self.is_rtxn_open:
-            return self.rtxn
-        elif self.is_wtxn_open:
-            return self.wtxn
-        else:
-            return None
-
-
-    @property
-    def curs(self):
-        '''
-        Get cursor list for the current active transaction. See txn.
-
-        @return dict:lmdb.Cursor
-        '''
-        if self.is_rtxn_open:
-            return self.rcurs
-        if self.is_wtxn_open:
-            return self.wcurs
-        else:
-            return None
-
-
     def get_data_cursors(self, txn):
         '''
         Build the main data cursors for a transaction.
@@ -271,7 +232,7 @@ class LmdbStore(Store):
         '''
         cur = {}
         for key in self.idx_keys:
-            cur[key] = self.wtxn.cursor(self.dbs[key])
+            cur[key] = self.txn.cursor(self.dbs[key])
 
         return cur
 
@@ -283,12 +244,11 @@ class LmdbStore(Store):
         Do this at server shutdown.
         '''
         self.__open = False
-        if self.is_wtxn_open:
+        if self.is_txn_open:
             if commit_pending_transaction:
                 self.commit()
             else:
                 self.rollback()
-            self.wtxn = None
 
         self.db_env.close()
 
@@ -313,12 +273,12 @@ class LmdbStore(Store):
         trp_key = hashlib.new(self.KEY_HASH_ALGO, pk_trp).digest()
 
         needs_indexing = False
-        if self.wcurs['tk:t'].put(trp_key, pk_trp, overwrite=False):
+        if self.curs['tk:t'].put(trp_key, pk_trp, overwrite=False):
             needs_indexing = True
 
         pk_ctx = self._pickle(context)
-        if not self.wcurs['tk:c'].set_key_dup(trp_key, pk_ctx):
-            self.wcurs['tk:c'].put(trp_key, pk_ctx)
+        if not self.curs['tk:c'].set_key_dup(trp_key, pk_ctx):
+            self.curs['tk:c'].put(trp_key, pk_ctx)
             needs_indexing = True
 
         if needs_indexing:
@@ -339,15 +299,14 @@ class LmdbStore(Store):
         for trp in self.triples(triple_pattern, context):
             trp_key = self._to_key(trp)
 
-            import pdb; pdb.set_trace()
             # Delete context association.
-            if self.wcurs['tk:c'].set_key_dup(trp_key, pk_ctx):
-                self.wcurs['tk:c'].delete()
+            if self.curs['tk:c'].set_key_dup(trp_key, pk_ctx):
+                self.curs['tk:c'].delete()
 
                 # If no other contexts are associated w/ the triple, delete it.
-                if not self.wcurs['tk:c'].set_key(trp_key) and (
-                        self.wcurs['tk:t'].set_key(trp_key)):
-                    self.wcurs['tk:t'].delete()
+                if not self.curs['tk:c'].set_key(trp_key) and (
+                        self.curs['tk:t'].set_key(trp_key)):
+                    self.curs['tk:t'].delete()
 
                 # @TODO make await
                 self._update_indices(trp, trp_key, pk_ctx)
@@ -374,26 +333,23 @@ class LmdbStore(Store):
             'pok:tk': self._to_key((p, o)),
         }
 
-        if self.wcurs['tk:t'].get(trp_key):
+        if self.curs['tk:t'].get(trp_key):
             # Add to index.
             for ikey in term_keys:
-                self.wcurs[ikey].put(term_keys[ikey], trp_key)
+                self.curs[ikey].put(term_keys[ikey], trp_key)
         else:
             # Delete from index if a match is found.
             for ikey in term_keys:
-                if self.wcurs[ikey].set_key_dup(term_keys[ikey], trp_key):
-                    self.wcurs[ikey].delete()
+                if self.curs[ikey].set_key_dup(term_keys[ikey], trp_key):
+                    self.curs[ikey].delete()
 
         # Add or remove context association index.
-        if self.wcurs['tk:c'].set_key_dup(trp_key, pk_ctx):
-            self.wcurs['c:tk'].put(pk_ctx, trp_key)
-        elif self.wcurs['c:tk'].set_key_dup(pk_ctx, trp_key):
-            self.wcurs['c:tk'].delete()
+        if self.curs['tk:c'].set_key_dup(trp_key, pk_ctx):
+            self.curs['c:tk'].put(pk_ctx, trp_key)
+        elif self.curs['c:tk'].set_key_dup(pk_ctx, trp_key):
+            self.curs['c:tk'].delete()
 
 
-    @read_tx((
-        'sk:tk', 'pk:tk', 'ok:tk', 'spk:tk', 'sok:tk', 'pok:tk',
-        'c:tk', 'tk:c', 'tk:t'))
     def triples(self, triple_pattern, context=None):
         '''
         Generator over matching triples.
@@ -416,13 +372,13 @@ class LmdbStore(Store):
 
         # Shortcuts
         pk_ctx = self._pickle(context)
-        if not self.rcurs['c:tk'].set_key(pk_ctx):
+        if not self.curs['c:tk'].set_key(pk_ctx):
             # Context not found.
             return iter(())
 
         # s p o c
         if all(triple_pattern):
-            if self.rcurs['tk:c'].set_key_dup(tkey, pk_ctx):
+            if self.curs['tk:c'].set_key_dup(tkey, pk_ctx):
                 yield self._key_to_triple(tkey)
                 return
             else:
@@ -432,17 +388,16 @@ class LmdbStore(Store):
         # ? ? ? c
         elif not any(triple_pattern):
             # Get all triples from the context
-            for tk in self.rcurs['c:tk'].iternext_dup():
+            for tk in self.curs['c:tk'].iternext_dup():
                 yield self._key_to_triple(tk)
 
         # Regular lookup.
         else:
             for tk in self._lookup(triple_pattern, tkey):
-                if self.rcurs['c:tk'].set_key_dup(pk_ctx, tk):
+                if self.curs['c:tk'].set_key_dup(pk_ctx, tk):
                     yield self._key_to_triple(tk)
 
 
-    @read_tx()
     def __len__(self, context=None):
         '''
         Return length of the dataset.
@@ -455,7 +410,7 @@ class LmdbStore(Store):
             dataset = self.triples((None, None, None), context)
             return len(set(dataset))
         else:
-            return self.rtxn.stat(self.dbs['tk:t'])['entries']
+            return self.txn.stat(self.dbs['tk:t'])['entries']
 
 
     def bind(self, prefix, namespace):
@@ -464,23 +419,21 @@ class LmdbStore(Store):
         '''
         prefix = s2b(prefix)
         namespace = s2b(namespace)
-        with self.wtxn.cursor(self.dbs['ns:pfx']) as cur:
+        with self.txn.cursor(self.dbs['ns:pfx']) as cur:
             cur.put(namespace, prefix)
-        with self.wtxn.cursor(self.dbs['pfx:ns']) as cur:
+        with self.txn.cursor(self.dbs['pfx:ns']) as cur:
             cur.put(prefix, namespace)
 
 
-    @read_tx(('pfx:ns',))
     def namespace(self, prefix):
         '''
         Get the namespace for a prefix.
         '''
-        ns = self.rcurs['pfx:ns'].get(s2b(prefix))
+        ns = self.curs['pfx:ns'].get(s2b(prefix))
 
         return Namespace(b2s(ns)) if ns is not None else None
 
 
-    @read_tx(('ns:pfx',))
     def prefix(self, namespace):
         '''
         Get the prefix associated with a namespace.
@@ -488,22 +441,20 @@ class LmdbStore(Store):
         @NOTE A namespace can be only bound to one prefix in this
         implementation.
         '''
-        prefix = self.rcurs['ns:pfx'].get(s2b(namespace))
+        prefix = self.curs['ns:pfx'].get(s2b(namespace))
 
         return b2s(prefix) if prefix is not None else None
 
 
-    @read_tx(('pfx:ns',))
     def namespaces(self):
         '''
         Get an iterator of all prefix: namespace bindings.
         '''
-        bindings = iter(self.rcurs['pfx:ns'])
+        bindings = iter(self.curs['pfx:ns'])
 
         return ((b2s(pfx), Namespace(b2s(ns))) for pfx, ns in bindings)
 
 
-    @read_tx(('tk:c','c:tk'))
     def contexts(self, triple=None):
         '''
         Get a list of all contexts.
@@ -511,10 +462,10 @@ class LmdbStore(Store):
         @return generator:URIRef
         '''
         if triple:
-            self.rcurs['tk:c'].set_key(self._to_key(triple))
-            contexts = self.rcurs['tk:c'].iternext_dup()
+            self.curs['tk:c'].set_key(self._to_key(triple))
+            contexts = self.curs['tk:c'].iternext_dup()
         else:
-            contexts = self.rcurs['c:tk'].iternext_nodup()
+            contexts = self.curs['c:tk'].iternext_nodup()
 
         return (self._unpickle(ctx) for ctx in contexts)
 
@@ -525,8 +476,8 @@ class LmdbStore(Store):
 
         @param graph (URIRef) URI of the named graph to add.
         '''
-        self.wcurs['tk:c'].put(self._pickle(None), self._pickle(graph))
-        self.wcurs['c:tk'].put(self._pickle(graph), self._pickle(None))
+        self.curs['tk:c'].put(self._pickle(None), self._pickle(graph))
+        self.curs['c:tk'].put(self._pickle(graph), self._pickle(None))
 
 
     def remove_graph(self, graph):
@@ -537,27 +488,29 @@ class LmdbStore(Store):
         '''
         self.remove((None, None, None), graph)
 
-        if self.wcurs['tk:c'].set_key_dup(
+        if self.curs['tk:c'].set_key_dup(
                 self._pickle(None), self._pickle(graph)):
-            self.wcurs['tk:c'].delete()
+            self.curs['tk:c'].delete()
 
-        if self.wcurs['c:tk'].set_key_dup(
+        if self.curs['c:tk'].set_key_dup(
                 self._pickle(graph), self._pickle(None)):
-            self.wcurs['tk:c'].delete()
+            self.curs['tk:c'].delete()
 
 
     def commit(self):
         '''
         Commit main write transaction.
         '''
-        self.wtxn.commit()
+        self.txn.commit()
+        self.txn = None
 
 
     def rollback(self):
         '''
         Roll back main write transaction.
         '''
-        self.wtxn.abort()
+        self.txn.abort()
+        self.txn = None
 
 
     #def _next_lex_key(self, db=None):
@@ -641,7 +594,7 @@ class LmdbStore(Store):
 
         @return Tuple with triple elements or None if key is not found.
         '''
-        pk_trp = self.rcurs['tk:t'].get(key)
+        pk_trp = self.curs['tk:t'].get(key)
 
         return self._unpickle(pk_trp) if pk_trp else None
 
@@ -658,43 +611,43 @@ class LmdbStore(Store):
             if p is not None:
                 # s p o
                 if o is not None:
-                    if self.rcurs['tk:t'].set_key(tkey):
+                    if self.curs['tk:t'].set_key(tkey):
                         yield tkey
                         return
                     else:
                         return iter(())
                 # s p ?
                 else:
-                    cur = self.rcurs['spk:tk']
+                    cur = self.curs['spk:tk']
                     term = self._pickle((s, p))
             else:
                 # s ? o
                 if o is not None:
-                    cur = self.rcurs['sok:tk']
+                    cur = self.curs['sok:tk']
                     term = self._pickle((s, o))
                 # s ? ?
                 else:
-                    cur = self.rcurs['sk:tk']
+                    cur = self.curs['sk:tk']
                     term = self._pickle(s)
         else:
             if p is not None:
                 # ? p o
                 if o is not None:
-                    cur = self.rcurs['pok:tk']
+                    cur = self.curs['pok:tk']
                     term = self._pickle((p, o))
                 # ? p ?
                 else:
-                    cur = self.rcurs['pk:tk']
+                    cur = self.curs['pk:tk']
                     term = self._pickle(p)
             else:
                 # ? ? o
                 if o is not None:
-                    cur = self.rcurs['ok:tk']
+                    cur = self.curs['ok:tk']
                     term = self._pickle(o)
                 # ? ? ?
                 else:
                     # Get all triples in the database
-                    for c in self.rcurs['tk:t'].iternext(values=False):
+                    for c in self.curs['tk:t'].iternext(values=False):
                         yield c
                     return
 

+ 127 - 116
tests/store/test_lmdb_store.py

@@ -4,7 +4,8 @@ from shutil import rmtree
 
 from rdflib import Namespace, URIRef
 
-from lakesuperior.store_layouts.ldp_rs.lmdb_store import LmdbStore
+from lakesuperior.store_layouts.ldp_rs.lmdb_store import LmdbStore, TxnManager
+
 
 @pytest.fixture(scope='class')
 def store():
@@ -29,29 +30,44 @@ class TestStoreInit:
         assert not tmpstore.is_open
 
 
-    def test_wtxn(self, store):
+    def test_txn(self, store):
         '''
-        Test opening and closing the main write transaction.
+        Test opening and closing the main transaction.
         '''
-        store.begin()
-        assert store.is_wtxn_open
+        store.begin(True)
+        assert store.is_txn_open
         store.commit()
-        assert not store.is_wtxn_open
-        store.begin()
+        assert not store.is_txn_open
+        store.begin(True)
         store.rollback()
-        assert not store.is_wtxn_open
+        assert not store.is_txn_open
+
+
+    def test_ctx_mgr(self, store):
+        '''
+        Test enclosing a transaction in a context.
+        '''
+        with TxnManager(store) as txn:
+            pass
+        assert not store.is_txn_open
+
+        with TxnManager(store) as txn:
+            raise RuntimeError
+        assert not store.is_txn_open
 
 
     def test_rollback(self, store):
         '''
         Test rolling back a transaction.
         '''
-        store.begin()
-        store.add((
-            URIRef('urn:nogo:s'), URIRef('urn:nogo:p'), URIRef('urn:nogo:o')))
-        store.rollback()
-
-        res = set(store.triples((None, None, None)))
+        with TxnManager(store, True) as txn:
+            store.add((
+                URIRef('urn:nogo:s'), URIRef('urn:nogo:p'),
+                URIRef('urn:nogo:o')))
+            raise RuntimeError # This should roll back the transaction.
+
+        with TxnManager(store) as txn:
+            res = set(store.triples((None, None, None)))
         assert len(res) == 0
 
 
@@ -64,93 +80,89 @@ class TestBasicOps:
         '''
         Test creation of a single triple.
         '''
-        store.begin()
-        store.add((
-            URIRef('urn:test:s'), URIRef('urn:test:p'), URIRef('urn:test:o')))
-        store.commit()
+        trp = (
+            URIRef('urn:test:s'), URIRef('urn:test:p'), URIRef('urn:test:o'))
+        with TxnManager(store, True) as txn:
+            store.add(trp)
 
-        import pdb; pdb.set_trace()
-        res1 = set(store.triples((None, None, None)))
-        res2 = set(store.triples((
-            URIRef('urn:test:s'), URIRef('urn:test:p'), URIRef('urn:test:o'))))
-        assert len(res1) == 1
-        assert len(res2) == 1
-        assert (
-            URIRef('urn:test:s'), URIRef('urn:test:p'),
-            URIRef('urn:test:o')) in res1 & res2
+            #import pdb; pdb.set_trace()
+            res1 = set(store.triples((None, None, None)))
+            res2 = set(store.triples(trp))
+            assert len(res1) == 1
+            assert len(res2) == 1
+            assert trp in res1 & res2
 
 
     def test_triple_match_1bound(self, store):
         '''
         Test triple patterns matching one bound term.
         '''
-        res1 = set(store.triples((URIRef('urn:test:s'), None, None)))
-        res2 = set(store.triples((None, URIRef('urn:test:p'), None)))
-        res3 = set(store.triples((None, None, URIRef('urn:test:o'))))
-        assert res1 == {(
-            URIRef('urn:test:s'), URIRef('urn:test:p'), URIRef('urn:test:o'))}
-        assert res2 == res1
-        assert res3 == res2
+        with TxnManager(store) as txn:
+            res1 = set(store.triples((URIRef('urn:test:s'), None, None)))
+            res2 = set(store.triples((None, URIRef('urn:test:p'), None)))
+            res3 = set(store.triples((None, None, URIRef('urn:test:o'))))
+            assert res1 == {(
+                URIRef('urn:test:s'), URIRef('urn:test:p'),
+                URIRef('urn:test:o'))}
+            assert res2 == res1
+            assert res3 == res2
 
 
     def test_triple_match_2bound(self, store):
         '''
         Test triple patterns matching two bound terms.
         '''
-        res1 = set(store.triples(
-            (URIRef('urn:test:s'), URIRef('urn:test:p'), None)))
-        res2 = set(store.triples(
-            (URIRef('urn:test:s'), None, URIRef('urn:test:o'))))
-        res3 = set(store.triples(
-            (None, URIRef('urn:test:p'), URIRef('urn:test:o'))))
-        assert res1 == {(
-            URIRef('urn:test:s'), URIRef('urn:test:p'), URIRef('urn:test:o'))}
-        assert res2 == res1
-        assert res3 == res2
+        with TxnManager(store) as txn:
+            res1 = set(store.triples(
+                (URIRef('urn:test:s'), URIRef('urn:test:p'), None)))
+            res2 = set(store.triples(
+                (URIRef('urn:test:s'), None, URIRef('urn:test:o'))))
+            res3 = set(store.triples(
+                (None, URIRef('urn:test:p'), URIRef('urn:test:o'))))
+            assert res1 == {(
+                URIRef('urn:test:s'), URIRef('urn:test:p'), URIRef('urn:test:o'))}
+            assert res2 == res1
+            assert res3 == res2
 
 
     def test_triple_no_match(self, store):
         '''
         Test various mismatches.
         '''
-        store.begin()
-        store.add((
-            URIRef('urn:test:s'),
-            URIRef('urn:test:p2'), URIRef('urn:test:o2')))
-        store.add((
-            URIRef('urn:test:s3'),
-            URIRef('urn:test:p3'), URIRef('urn:test:o3')))
-        store.commit()
-        res1 = set(store.triples((None, None, None)))
-        assert len(res1) == 3
+        with TxnManager(store, True) as txn:
+            store.add((
+                URIRef('urn:test:s'),
+                URIRef('urn:test:p2'), URIRef('urn:test:o2')))
+            store.add((
+                URIRef('urn:test:s3'),
+                URIRef('urn:test:p3'), URIRef('urn:test:o3')))
+            res1 = set(store.triples((None, None, None)))
+            assert len(res1) == 3
 
-        res1 = set(store.triples(
-            (URIRef('urn:test:s2'), URIRef('urn:test:p'), None)))
-        res2 = set(store.triples(
-            (URIRef('urn:test:s3'), None, URIRef('urn:test:o'))))
-        res3 = set(store.triples(
-            (None, URIRef('urn:test:p3'), URIRef('urn:test:o2'))))
+            res1 = set(store.triples(
+                (URIRef('urn:test:s2'), URIRef('urn:test:p'), None)))
+            res2 = set(store.triples(
+                (URIRef('urn:test:s3'), None, URIRef('urn:test:o'))))
+            res3 = set(store.triples(
+                (None, URIRef('urn:test:p3'), URIRef('urn:test:o2'))))
 
-        assert len(res1) == len(res2) == len(res3) == 0
+            assert len(res1) == len(res2) == len(res3) == 0
 
 
     def test_remove(self, store):
         '''
         Test removing one or more triples.
         '''
-        store.begin()
-        store.remove((URIRef('urn:test:s3'),
-                URIRef('urn:test:p3'), URIRef('urn:test:o3')))
-        store.commit()
+        with TxnManager(store, True) as txn:
+            store.remove((URIRef('urn:test:s3'),
+                    URIRef('urn:test:p3'), URIRef('urn:test:o3')))
 
-        res1 = set(store.triples((None, None, None)))
-        assert len(res1) == 2
+            res1 = set(store.triples((None, None, None)))
+            assert len(res1) == 2
 
-        store.begin()
-        store.remove((URIRef('urn:test:s'), None, None))
-        store.commit()
-        res2 = set(store.triples((None, None, None)))
-        assert len(res2) == 0
+            store.remove((URIRef('urn:test:s'), None, None))
+            res2 = set(store.triples((None, None, None)))
+            assert len(res2) == 0
 
 
 @pytest.mark.usefixtures('store')
@@ -172,34 +184,35 @@ class TestBindings:
         '''
         Test namespace bindings.
         '''
-        store.begin()
-        for b in bindings:
-            store.bind(*b)
-        store.commit()
+        with TxnManager(store, True) as txn:
+            for b in bindings:
+                store.bind(*b)
 
-        nslist = list(store.namespaces())
-        assert len(nslist) == len(bindings)
+            nslist = list(store.namespaces())
+            assert len(nslist) == len(bindings)
 
-        for i in range(len(bindings)):
-            assert nslist[i] == bindings[i]
+            for i in range(len(bindings)):
+                assert nslist[i] == bindings[i]
 
 
     def test_ns2pfx(self, store, bindings):
         '''
         Test namespace to prefix conversion.
         '''
-        for b in bindings:
-            pfx, ns = b
-            assert store.namespace(pfx) == ns
+        with TxnManager(store, True) as txn:
+            for b in bindings:
+                pfx, ns = b
+                assert store.namespace(pfx) == ns
 
 
     def test_pfx2ns(self, store, bindings):
         '''
         Test namespace to prefix conversion.
         '''
-        for b in bindings:
-            pfx, ns = b
-            assert store.prefix(ns) == pfx
+        with TxnManager(store, True) as txn:
+            for b in bindings:
+                pfx, ns = b
+                assert store.prefix(ns) == pfx
 
 
 @pytest.mark.usefixtures('store')
@@ -223,11 +236,10 @@ class TestContext:
         Test creating an empty and a non-empty graph.
         '''
         gr_uri = URIRef('urn:bogus:graph#a')
-        store.begin()
-        store.add_graph(gr_uri)
-        store.commit()
 
-        assert gr_uri in store.contexts()
+        with TxnManager(store, True) as txn:
+            store.add_graph(gr_uri)
+            assert gr_uri in store.contexts()
 
     def test_add_trp_to_ctx(self, store):
         '''
@@ -238,27 +250,27 @@ class TestContext:
         trp1 = (URIRef('urn:s:1'), URIRef('urn:p:1'), URIRef('urn:o:1'))
         trp2 = (URIRef('urn:s:2'), URIRef('urn:p:2'), URIRef('urn:o:2'))
         trp3 = (URIRef('urn:s:3'), URIRef('urn:p:3'), URIRef('urn:o:3'))
-        store.begin()
-        store.add(trp1, gr_uri)
-        store.add(trp2, gr_uri)
-        store.add(trp2, store.DEFAULT_GRAPH_URI)
-        store.add(trp3, gr2_uri)
-        store.add(trp3)
-        store.commit()
 
-        assert len(set(store.triples((None, None, None)))) == 2
-        assert len(set(store.triples((None, None, None), gr_uri))) == 2
-        assert len(set(store.triples((None, None, None), gr2_uri))) == 1
+        with TxnManager(store, True) as txn:
+            store.add(trp1, gr_uri)
+            store.add(trp2, gr_uri)
+            store.add(trp2, store.DEFAULT_GRAPH_URI)
+            store.add(trp3, gr2_uri)
+            store.add(trp3)
 
-        assert gr2_uri in store.contexts()
-        assert trp1 not in store.triples((None, None, None))
-        assert trp1 not in store.triples((None, None, None),
-                store.DEFAULT_GRAPH_URI)
-        assert trp2 in store.triples((None, None, None), gr_uri)
-        assert trp2 in store.triples((None, None, None))
-        assert trp3 in store.triples((None, None, None), gr2_uri)
-        assert trp3 in store.triples((None, None, None),
-                store.DEFAULT_GRAPH_URI)
+            assert len(set(store.triples((None, None, None)))) == 2
+            assert len(set(store.triples((None, None, None), gr_uri))) == 2
+            assert len(set(store.triples((None, None, None), gr2_uri))) == 1
+
+            assert gr2_uri in store.contexts()
+            assert trp1 not in store.triples((None, None, None))
+            assert trp1 not in store.triples((None, None, None),
+                    store.DEFAULT_GRAPH_URI)
+            assert trp2 in store.triples((None, None, None), gr_uri)
+            assert trp2 in store.triples((None, None, None))
+            assert trp3 in store.triples((None, None, None), gr2_uri)
+            assert trp3 in store.triples((None, None, None),
+                    store.DEFAULT_GRAPH_URI)
 
 
     def test_delete_from_ctx(self, store):
@@ -268,11 +280,10 @@ class TestContext:
         gr_uri = URIRef('urn:bogus:graph#a') # From previous test
         gr2_uri = URIRef('urn:bogus:graph#b') # Never created before
 
-        store.begin()
-        store.remove((None, None, None))
-        store.remove((None, None, None), gr2_uri)
-        store.commit()
+        with TxnManager(store, True) as txn:
+            store.remove((None, None, None))
+            store.remove((None, None, None), gr2_uri)
 
-        assert len(set(store.triples((None, None, None)))) == 0
-        assert len(set(store.triples((None, None, None), gr_uri))) == 2
-        assert len(set(store.triples((None, None, None), gr2_uri))) == 0
+            assert len(set(store.triples((None, None, None)))) == 0
+            assert len(set(store.triples((None, None, None), gr_uri))) == 2
+            assert len(set(store.triples((None, None, None), gr2_uri))) == 0