Преглед изворни кода

Several fixes:
* Test and fix opening & closing items
* add & remove from context
* Add convenience methods for inspection
* Wrap read transaction and cursors in ctx manager (not yet working)

Stefano Cossu пре 7 година
родитељ
комит
fe126de710
2 измењених фајлова са 389 додато и 113 уклоњено
  1. 221 110
      lakesuperior/store_layouts/ldp_rs/lmdb_store.py
  2. 168 3
      tests/store/test_lmdb_store.py

+ 221 - 110
lakesuperior/store_layouts/ldp_rs/lmdb_store.py

@@ -9,17 +9,24 @@ from urllib.request import pathname2url
 import lmdb
 
 from rdflib.store import Store, VALID_STORE, NO_STORE
-from rdflib.term import URIRef
+from rdflib import Namespace, URIRef
 
 
 logger = logging.getLogger(__name__)
 
 
-def s2b(u):
-    return u.encode('utf-8')
+def s2b(u, enc='UTF-8'):
+    '''
+    Convert a string into a bytes object.
+    '''
+    return u.encode(enc)
+
 
-def b2s(u):
-    return bytes(u).decode('utf-8')
+def b2s(u, enc='UTF-8'):
+    '''
+    Convert a bytes or memoryview object into a string.
+    '''
+    return bytes(u).decode(enc)
 
 
 class NoTxnError(Exception):
@@ -50,7 +57,7 @@ def read_tx(dbs=(), buffers=True):
                             self.rtxn.cursor(self.dbs[db_label]))
                 stack.pop_all()
                 ret = fn(self, *args, **kwargs)
-
+                stack.close()
                 return ret
         return wrapper
     return read_tx_deco
@@ -70,6 +77,7 @@ class LmdbStore(Store):
 
     - tk:t (triple key: pickled triple; unique keys)
     - tk:c (Triple key: pickled context; multi-valued keys)
+    - pfx:ns (pickled prefix URI: namespace string; unique)
 
     And 7 indices to optimize lookup for all possible bound/unbound term
     combination in a triple:
@@ -81,9 +89,10 @@ class LmdbStore(Store):
     - spk:tk (subject + predicate key: triple key)
     - sok:tk (subject + object key: triple key)
     - pok:tk (predicate + object key: triple key)
+    - ns:pfx (namespace: pickled prefix URI; unique)
 
-    The above indices are all multi-valued and store fixed-length hash values
-    referring to triples for economy's sake.
+    The above indices (except for ns:pfx) are all multi-valued and store
+    fixed-length hash values referring to triples for economy's sake.
 
     The search keys for terms are hashed on lookup. @TODO If this is too slow,
     we may want to index term hashes.
@@ -121,7 +130,7 @@ class LmdbStore(Store):
     db_env = None
     db = None
     dbs = {}
-    txn = None
+    wtxn = None
 
 
     def __init__(self, path, identifier=None):
@@ -133,34 +142,6 @@ class LmdbStore(Store):
         self._unpickle = self.node_pickler.loads
 
 
-    def _init_db_environment(self, path, create=True):
-        '''
-        Initialize the DB environment.
-        If `create` is True, the environment and its databases are created.
-        '''
-        if not exists(path):
-            if create is True:
-                makedirs(path)
-            else:
-                return NO_STORE
-        self.db_env = lmdb.open(path, create=create, map_size=self.MAP_SIZE,
-                max_dbs=12, readahead=False)
-
-        # Open and optionally create main databases.
-        self.dbs = {
-            # Main databases.
-            'tk:t': self.db_env.open_db(b'tk:t', create=create),
-            'tk:c': self.db_env.open_db(b'tk:c', create=create, dupsort=True),
-            'pfx:ns': self.db_env.open_db(b'pfx:ns', create=create),
-            # Index.
-            'ns:pfx': self.db_env.open_db(b'ns:pfx', create=create),
-        }
-        # Other index databases.
-        for db_key in self.idx_keys:
-            self.dbs[db_key] = self.db_env.open_db(s2b(db_key),
-                    dupsort=True, dupfixed=True, create=create)
-
-
     @property
     def is_open(self):
         return self.__open
@@ -188,6 +169,7 @@ class LmdbStore(Store):
 
         return VALID_STORE
 
+
     def begin(self):
         '''
         Begin the main write transaction and create cursors.
@@ -200,6 +182,68 @@ class LmdbStore(Store):
         self.wcurs.update(self.get_idx_cursors(self.wtxn))
 
 
+    @property
+    def is_rtxn_open(self):
+        '''
+        Whether the main read transaction is open.
+        '''
+        try:
+            self.rtxn.id()
+        except (lmdb.Error, AttributeError):
+            logger.info('Read transaction does not exist or is closed.')
+            return False
+        else:
+            logger.info('Read transaction is open.')
+            return True
+
+
+    @property
+    def is_wtxn_open(self):
+        '''
+        Whether the main write transaction is open.
+        '''
+        try:
+            self.wtxn.id()
+        except (lmdb.Error, AttributeError):
+            logger.info('Write transaction does not exist or is closed.')
+            return False
+        else:
+            logger.info('Write transaction is open.')
+            return True
+
+
+    @property
+    def txn(self):
+        '''
+        Get current active transaction for read-only use.
+
+        @return lmdb.Transaction|None Return the main read transaction or the
+        main write transaction, whichever is open, or None if neither is
+        open.
+        '''
+        if self.is_rtxn_open:
+            return self.rtxn
+        elif self.is_wtxn_open:
+            return self.wtxn
+        else:
+            return None
+
+
+    @property
+    def curs(self):
+        '''
+        Get cursor list for the current active transaction. See txn.
+
+        @return dict:lmdb.Cursor
+        '''
+        if self.is_rtxn_open:
+            return self.rcurs
+        if self.is_wtxn_open:
+            return self.wcurs
+        else:
+            return None
+
+
     def get_data_cursors(self, txn):
         '''
         Build the main data cursors for a transaction.
@@ -232,19 +276,6 @@ class LmdbStore(Store):
         return cur
 
 
-    @property
-    def is_txn_open(self):
-        '''
-        Whether the main write transaction is open.
-        '''
-        try:
-            self.wtxn.id()
-        except lmdb.Error:
-            return False
-        else:
-            return True
-
-
     def close(self, commit_pending_transaction=False):
         '''
         Close the database connection.
@@ -252,12 +283,12 @@ class LmdbStore(Store):
         Do this at server shutdown.
         '''
         self.__open = False
-        if self.is_txn_open:
+        if self.is_wtxn_open:
             if commit_pending_transaction:
-                self.tx.commit()
+                self.commit()
             else:
-                self.tx.abort()
-            self.tx = None
+                self.rollback()
+            self.wtxn = None
 
         self.db_env.close()
 
@@ -270,38 +301,48 @@ class LmdbStore(Store):
         @param context (rdflib.Identifier | None) Context identifier.
         'None' inserts in the default graph.
         '''
-        assert self.is_txn_open, "The Store must be open."
         assert context != self, "Can not add triple directly to store"
         Store.add(self, triple, context)
 
-        context = context or self.DEFAULT_GRAPH_URI
+        if self.DEFAULT_UNION:
+            raise NotImplementedError()
+            # @TODO
+        else:
+            context = context or self.DEFAULT_GRAPH_URI
         pk_trp = self._pickle(triple)
         trp_key = hashlib.new(self.KEY_HASH_ALGO, pk_trp).digest()
-        # If it returns False, the triple had already been added.
-        trp_added = self.wcurs['tk:t'].put(trp_key, pk_trp, overwrite=False)
+
+        needs_indexing = False
+        if self.wcurs['tk:t'].put(trp_key, pk_trp, overwrite=False):
+            needs_indexing = True
 
         pk_ctx = self._pickle(context)
-        ctx_added = self.wcurs['tk:c'].put(trp_key, pk_ctx, overwrite=False)
+        if not self.wcurs['tk:c'].set_key_dup(trp_key, pk_ctx):
+            self.wcurs['tk:c'].put(trp_key, pk_ctx)
+            needs_indexing = True
 
-        if ctx_added or trp_added:
+        if needs_indexing:
             # @TODO make await
-            self._do_index(triple, trp_key, pk_ctx)
+            self._update_indices(triple, trp_key, pk_ctx)
 
 
     def remove(self, triple_pattern, context=None):
         '''
         Remove a triple and start indexing.
         '''
-        context = context or self.DEFAULT_GRAPH_URI
+        if self.DEFAULT_UNION:
+            raise NotImplementedError()
+            # @TODO
+        else:
+            context = context or self.DEFAULT_GRAPH_URI
         pk_ctx = self._pickle(context)
         for trp in self.triples(triple_pattern, context):
             trp_key = self._to_key(trp)
-            need_indexing = False
 
+            import pdb; pdb.set_trace()
             # Delete context association.
             if self.wcurs['tk:c'].set_key_dup(trp_key, pk_ctx):
                 self.wcurs['tk:c'].delete()
-                need_indexing = True
 
                 # If no other contexts are associated w/ the triple, delete it.
                 if not self.wcurs['tk:c'].set_key(trp_key) and (
@@ -309,13 +350,13 @@ class LmdbStore(Store):
                     self.wcurs['tk:t'].delete()
 
                 # @TODO make await
-                self._do_index(trp, trp_key, pk_ctx)
+                self._update_indices(trp, trp_key, pk_ctx)
 
 
     # @TODO Make async
-    def _do_index(self, triple, trp_key, pk_ctx):
+    def _update_indices(self, triple, trp_key, pk_ctx):
         '''
-        Create indices for a given triple.
+        Update indices for a given triple.
 
         If the triple is found, add indices. if it is not found, delete them.
 
@@ -344,11 +385,10 @@ class LmdbStore(Store):
                     self.wcurs[ikey].delete()
 
         # Add or remove context association index.
-        if self.wcurs['tk:c'].get(trp_key, pk_ctx):
+        if self.wcurs['tk:c'].set_key_dup(trp_key, pk_ctx):
             self.wcurs['c:tk'].put(pk_ctx, trp_key)
-        else:
-            if self.wcurs['c:tk'].set_key_dup(pk_ctx, trp_key):
-                self.wcurs['c:tk'].delete()
+        elif self.wcurs['c:tk'].set_key_dup(pk_ctx, trp_key):
+            self.wcurs['c:tk'].delete()
 
 
     @read_tx((
@@ -360,44 +400,47 @@ class LmdbStore(Store):
         '''
         if context == self:
             context = None
-        context = context or self.DEFAULT_GRAPH_URI
 
-        tkey = self._to_key(triple_pattern)
+        if self.DEFAULT_UNION:
+            raise NotImplementedError()
+            # In theory, this is what should happen:
+            #if context == self.DEFAULT_GRAPH_URI
+            #    # Any pattern with unbound context
+            #    for tk in self._lookup(triple_pattern, tkey):
+            #        yield self._key_to_triple(tk)
+            #    return
+        else:
+            context = context or self.DEFAULT_GRAPH_URI
 
-        # Any pattern with unbound context
-        if context == self.DEFAULT_GRAPH_URI:
-            for tk in self._lookup(triple_pattern, tkey):
-                yield self._key_to_triple(tk)
+        tkey = self._to_key(triple_pattern)
 
         # Shortcuts
-        else:
-            pk_ctx = self._pickle(context)
-            if not self.rcurs['c:tk'].set_key(pk_ctx):
-                # Context not found.
+        pk_ctx = self._pickle(context)
+        if not self.rcurs['c:tk'].set_key(pk_ctx):
+            # Context not found.
+            return iter(())
+
+        # s p o c
+        if all(triple_pattern):
+            if self.rcurs['tk:c'].set_key_dup(tkey, pk_ctx):
+                yield self._key_to_triple(tkey)
+                return
+            else:
+                # Triple not found.
                 return iter(())
 
-            # s p o c
-            if all(triple_pattern):
-                if self.rcurs['tk:c'].set_key_dup(tkey, pk_ctx):
-                    yield self._key_to_triple(tkey)
-                    return
-                else:
-                    # Triple not found.
-                    return iter(())
+        # ? ? ? c
+        elif not any(triple_pattern):
+            # Get all triples from the context
+            for tk in self.rcurs['c:tk'].iternext_dup():
+                yield self._key_to_triple(tk)
 
-            # ? ? ? c
-            elif not any(triple_pattern):
-                # Get all triples from the context
-                for tk in self.rcurs['c:tk'].iternext_dup():
+        # Regular lookup.
+        else:
+            for tk in self._lookup(triple_pattern, tkey):
+                if self.rcurs['c:tk'].set_key_dup(pk_ctx, tk):
                     yield self._key_to_triple(tk)
 
-            else:
-                # Regular lookup.
-                for tk in self._lookup(triple_pattern, tkey):
-                    if self.rcurs['c:tk'].set_key_dup(pk_ctx, tk):
-                        yield self._key_to_triple(tk)
-
-
 
     @read_tx()
     def __len__(self, context=None):
@@ -406,8 +449,9 @@ class LmdbStore(Store):
         '''
         if context == self:
             context = None
+        context = context or self.DEFAULT_GRAPH_URI
 
-        if context is not None:
+        if context is not self.DEFAULT_GRAPH_URI:
             dataset = self.triples((None, None, None), context)
             return len(set(dataset))
         else:
@@ -420,9 +464,9 @@ class LmdbStore(Store):
         '''
         prefix = s2b(prefix)
         namespace = s2b(namespace)
-        with self.wtxn.cursor(self.dbs(b'ns:pfx')) as cur:
+        with self.wtxn.cursor(self.dbs['ns:pfx']) as cur:
             cur.put(namespace, prefix)
-        with self.wtxn.cursor(self.dbs(b'pfx:ns')) as cur:
+        with self.wtxn.cursor(self.dbs['pfx:ns']) as cur:
             cur.put(prefix, namespace)
 
 
@@ -433,7 +477,7 @@ class LmdbStore(Store):
         '''
         ns = self.rcurs['pfx:ns'].get(s2b(prefix))
 
-        return URIRef(b2s(ns)) if ns is not None else None
+        return Namespace(b2s(ns)) if ns is not None else None
 
 
     @read_tx(('ns:pfx',))
@@ -444,25 +488,27 @@ class LmdbStore(Store):
         @NOTE A namespace can be only bound to one prefix in this
         implementation.
         '''
-        prefix = self.rcurs['pfx:ns'].get(s2b(namespace))
+        prefix = self.rcurs['ns:pfx'].get(s2b(namespace))
 
         return b2s(prefix) if prefix is not None else None
 
 
-    @read_tx(('ns:pfx',))
+    @read_tx(('pfx:ns',))
     def namespaces(self):
         '''
-        Get a dict of all prefix: namespace bindings.
+        Get an iterator of all prefix: namespace bindings.
         '''
-        bindings = iter(self.rcurs['ns:pfx'])
+        bindings = iter(self.rcurs['pfx:ns'])
 
-        return ((b2s(pfx), b2s(ns)) for pfx, ns in bindings)
+        return ((b2s(pfx), Namespace(b2s(ns))) for pfx, ns in bindings)
 
 
     @read_tx(('tk:c','c:tk'))
     def contexts(self, triple=None):
         '''
         Get a list of all contexts.
+
+        @return generator:URIRef
         '''
         if triple:
             self.rcurs['tk:c'].set_key(self._to_key(triple))
@@ -470,12 +516,14 @@ class LmdbStore(Store):
         else:
             contexts = self.rcurs['c:tk'].iternext_nodup()
 
-        return (b2s(ctx) for ctx in contexts)
+        return (self._unpickle(ctx) for ctx in contexts)
 
 
     def add_graph(self, graph):
         '''
         Add a graph to the database.
+
+        @param graph (URIRef) URI of the named graph to add.
         '''
         self.wcurs['tk:c'].put(self._pickle(None), self._pickle(graph))
         self.wcurs['c:tk'].put(self._pickle(graph), self._pickle(None))
@@ -484,6 +532,8 @@ class LmdbStore(Store):
     def remove_graph(self, graph):
         '''
         Remove all triples from graph and the graph itself.
+
+        @param graph (URIRef) URI of the named graph to remove.
         '''
         self.remove((None, None, None), graph)
 
@@ -539,6 +589,34 @@ class LmdbStore(Store):
 
     ## PRIVATE METHODS ##
 
+    def _init_db_environment(self, path, create=True):
+        '''
+        Initialize the DB environment.
+        If `create` is True, the environment and its databases are created.
+        '''
+        if not exists(path):
+            if create is True:
+                makedirs(path)
+            else:
+                return NO_STORE
+        self.db_env = lmdb.open(path, create=create, map_size=self.MAP_SIZE,
+                max_dbs=12, readahead=False)
+
+        # Open and optionally create main databases.
+        self.dbs = {
+            # Main databases.
+            'tk:t': self.db_env.open_db(b'tk:t', create=create),
+            'tk:c': self.db_env.open_db(b'tk:c', create=create, dupsort=True),
+            'pfx:ns': self.db_env.open_db(b'pfx:ns', create=create),
+            # Index.
+            'ns:pfx': self.db_env.open_db(b'ns:pfx', create=create),
+        }
+        # Other index databases.
+        for db_key in self.idx_keys:
+            self.dbs[db_key] = self.db_env.open_db(s2b(db_key),
+                    dupsort=True, dupfixed=True, create=create)
+
+
     def _to_key(self, obj):
         '''
         Convert a triple, quad or term into a key.
@@ -627,3 +705,36 @@ class LmdbStore(Store):
         else:
             return iter(())
 
+    ## Convenience methods—not necessary for functioning but useful for
+    ## debugging.
+
+    def _keys_in_ctx(self, pk_ctx):
+        '''
+        Convenience method to list all keys in a context.
+
+        @param pk_ctx (bytes) Pickled context URI.
+
+        @return Iterator:tuple Generator of triples.
+        '''
+        cur = self.curs['c:tk']
+        if cur.set_key(pk_ctx):
+            tkeys = cur.iternext_dup()
+            return {self._key_to_triple(tk) for tk in tkeys}
+        else:
+            return set()
+
+
+    def _ctx_for_key(self, tkey):
+        '''
+        Convenience method to list all contexts that a key is in.
+
+        @param tkey (bytes) Triple key.
+
+        @return Iterator:URIRef Generator of context URIs.
+        '''
+        cur = self.curs['tk:c']
+        if cur.set_key(tkey):
+            ctx = cur.iternext_dup()
+            return {self._unpickle(c) for c in ctx}
+        else:
+            return set()

+ 168 - 3
tests/store/test_lmdb_store.py

@@ -2,7 +2,7 @@ import pytest
 
 from shutil import rmtree
 
-from rdflib import URIRef
+from rdflib import Namespace, URIRef
 
 from lakesuperior.store_layouts.ldp_rs.lmdb_store import LmdbStore
 
@@ -15,9 +15,50 @@ def store():
 
 
 @pytest.mark.usefixtures('store')
-class TestLmdbStore:
+class TestStoreInit:
     '''
-    Unit tests for LMDB store.
+    Tests for intializing and shutting down store and transactions.
+    '''
+    def test_open_close(self, store):
+        '''
+        Test opening and closing a store.
+        '''
+        tmpstore = LmdbStore('/tmp/test_lmdbstore_init')
+        assert tmpstore.is_open
+        tmpstore.close()
+        assert not tmpstore.is_open
+
+
+    def test_wtxn(self, store):
+        '''
+        Test opening and closing the main write transaction.
+        '''
+        store.begin()
+        assert store.is_wtxn_open
+        store.commit()
+        assert not store.is_wtxn_open
+        store.begin()
+        store.rollback()
+        assert not store.is_wtxn_open
+
+
+    def test_rollback(self, store):
+        '''
+        Test rolling back a transaction.
+        '''
+        store.begin()
+        store.add((
+            URIRef('urn:nogo:s'), URIRef('urn:nogo:p'), URIRef('urn:nogo:o')))
+        store.rollback()
+
+        res = set(store.triples((None, None, None)))
+        assert len(res) == 0
+
+
+@pytest.mark.usefixtures('store')
+class TestBasicOps:
+    '''
+    High-level tests for basic operations.
     '''
     def test_create_triple(self, store):
         '''
@@ -28,6 +69,7 @@ class TestLmdbStore:
             URIRef('urn:test:s'), URIRef('urn:test:p'), URIRef('urn:test:o')))
         store.commit()
 
+        import pdb; pdb.set_trace()
         res1 = set(store.triples((None, None, None)))
         res2 = set(store.triples((
             URIRef('urn:test:s'), URIRef('urn:test:p'), URIRef('urn:test:o'))))
@@ -111,3 +153,126 @@ class TestLmdbStore:
         assert len(res2) == 0
 
 
+@pytest.mark.usefixtures('store')
+class TestBindings:
+    '''
+    Tests for namespace bindings.
+    '''
+    @pytest.fixture
+    def bindings(self):
+        return (
+            ('ns1', Namespace('http://test.org/ns#')),
+            ('ns2', Namespace('http://my_org.net/ns#')),
+            ('ns3', Namespace('urn:test:')),
+            ('ns4', Namespace('info:myinst/graph#')),
+        )
+
+
+    def test_ns(self, store, bindings):
+        '''
+        Test namespace bindings.
+        '''
+        store.begin()
+        for b in bindings:
+            store.bind(*b)
+        store.commit()
+
+        nslist = list(store.namespaces())
+        assert len(nslist) == len(bindings)
+
+        for i in range(len(bindings)):
+            assert nslist[i] == bindings[i]
+
+
+    def test_ns2pfx(self, store, bindings):
+        '''
+        Test namespace to prefix conversion.
+        '''
+        for b in bindings:
+            pfx, ns = b
+            assert store.namespace(pfx) == ns
+
+
+    def test_pfx2ns(self, store, bindings):
+        '''
+        Test namespace to prefix conversion.
+        '''
+        for b in bindings:
+            pfx, ns = b
+            assert store.prefix(ns) == pfx
+
+
+@pytest.mark.usefixtures('store')
+class TestContext:
+    '''
+    Tests for context handling.
+    '''
+    # Add empty graph
+    # Query graph → return graph
+    # Add triples to graph
+    # Query triples in graph → triples
+    # Query triples in default graph → no results
+    # Add another empty graph
+    # Delete graph with triples
+    # Query graph → no results
+    # Query triples → no results
+    # Delete empty graph
+    # Query graph → no results
+    def test_add_graph(self, store):
+        '''
+        Test creating an empty and a non-empty graph.
+        '''
+        gr_uri = URIRef('urn:bogus:graph#a')
+        store.begin()
+        store.add_graph(gr_uri)
+        store.commit()
+
+        assert gr_uri in store.contexts()
+
+    def test_add_trp_to_ctx(self, store):
+        '''
+        Test adding triples to a graph.
+        '''
+        gr_uri = URIRef('urn:bogus:graph#a') # From previous test
+        gr2_uri = URIRef('urn:bogus:graph#b') # Never created before
+        trp1 = (URIRef('urn:s:1'), URIRef('urn:p:1'), URIRef('urn:o:1'))
+        trp2 = (URIRef('urn:s:2'), URIRef('urn:p:2'), URIRef('urn:o:2'))
+        trp3 = (URIRef('urn:s:3'), URIRef('urn:p:3'), URIRef('urn:o:3'))
+        store.begin()
+        store.add(trp1, gr_uri)
+        store.add(trp2, gr_uri)
+        store.add(trp2, store.DEFAULT_GRAPH_URI)
+        store.add(trp3, gr2_uri)
+        store.add(trp3)
+        store.commit()
+
+        assert len(set(store.triples((None, None, None)))) == 2
+        assert len(set(store.triples((None, None, None), gr_uri))) == 2
+        assert len(set(store.triples((None, None, None), gr2_uri))) == 1
+
+        assert gr2_uri in store.contexts()
+        assert trp1 not in store.triples((None, None, None))
+        assert trp1 not in store.triples((None, None, None),
+                store.DEFAULT_GRAPH_URI)
+        assert trp2 in store.triples((None, None, None), gr_uri)
+        assert trp2 in store.triples((None, None, None))
+        assert trp3 in store.triples((None, None, None), gr2_uri)
+        assert trp3 in store.triples((None, None, None),
+                store.DEFAULT_GRAPH_URI)
+
+
+    def test_delete_from_ctx(self, store):
+        '''
+        Delete triples from a named graph and from the default graph.
+        '''
+        gr_uri = URIRef('urn:bogus:graph#a') # From previous test
+        gr2_uri = URIRef('urn:bogus:graph#b') # Never created before
+
+        store.begin()
+        store.remove((None, None, None))
+        store.remove((None, None, None), gr2_uri)
+        store.commit()
+
+        assert len(set(store.triples((None, None, None)))) == 0
+        assert len(set(store.triples((None, None, None), gr_uri))) == 2
+        assert len(set(store.triples((None, None, None), gr2_uri))) == 0