Browse Source

Merge development branch.

Stefano Cossu 6 years ago
commit
341a29bc23
4 changed files with 169 additions and 104 deletions
  1. README.md (+2 -2)
  2. doc/notes/indexing_strategy.md (+19 -8)
  3. lakesuperior/store/ldp_rs/lmdb_store.py (+113 -72)
  4. tests/store/test_lmdb_store.py (+35 -22)

+ 2 - 2
README.md

@@ -75,13 +75,13 @@ dependencies and should be automatically installed.
 1. Create a virtualenv in a project folder:
    `virtualenv -p <python 3.5+ exec path> <virtualenv folder>`
 1. Activate the virtualenv: `source <path_to_virtualenv>/bin/activate`
-1. Clone this repo
+1. Clone this repo: `git clone https://github.com/scossu/lakesuperior.git`
 1. `cd` into repo folder
 1. Install dependencies: `pip install -r requirements.txt`
 1. Start your STOMP broker, e.g.: `coilmq &`. If you have another queue manager
    listening to port 61613 you can either configure a different port in the
    application configuration, or use the existing message queue.
-1. Run `./lsup_admin bootstrap` to initialize the binary and graph stores
+1. Run `./lsup-admin bootstrap` to initialize the binary and graph stores
 1. Run `./fcrepo`.
 
 ### Configuration

+ 19 - 8
doc/notes/indexing_strategy.md

@@ -1,6 +1,10 @@
 # LMDB Store design for RDFLib
 
-Spoiler: Strategy #5 is the one currently used.
+This is a log of subsequent strategies employed to store triples in LMDB.
+
+Strategy #5a is the one currently used. The rest is kept for historical reasons
+and academic curiosity (and also because it was too much work to just wipe out
+of memory).
 
 ## Storage approach
 
@@ -182,7 +186,7 @@ Storage total: 143 bytes per triple
   - No easy way to know if a term is used anywhere in a quad
   - Needs some routine cleanup
   - On the other hand, terms are relatively light-weight and can be reused
-  - Almost surely reusable are UUIDs, message digests, timestamps etc.
+  - Almost surely not reusable are UUIDs, message digests, timestamps etc.
 
 
 ## Strategy #5
@@ -231,19 +235,26 @@ Storage total: 95 bytes per triple
 
 ### Further optimization
 
-In order to minimiza traversing and splittig results, the first retrieval
+In order to minimize traversing and splitting results, the first retrieval
 should be made on the term with the fewest average values per key. Search
 order can be balanced by establishing a lookup order for indices.
 
 This can be achieved by calling stats on the index databases and looking up the
 database with *most* keys. Since there is an equal number of entries in each of
-the (s:spo, p:spo, o:spo) indices, the one with most keys will have the least
+the (s:po, p:so, o:sp) indices, the one with most keys will have the least
 average number of values per key. If that lookup is done first, the initial
 data set to traverse and filter will be smaller.
 
-Also, keys can be split into equally size chunks without using a
-separator. This relies on the fixed length of the keys. It also allows to use
-memory views that can be sliced without being copied. The performance gain
-should be estimated, since this changes quite a bit of code in the module.
 
+## Strategy #5a
+
+This is a slightly different implementation of #5 that somewhat simplifies and
+perhaps speeds things up a bit. It is the currently employed solution.
+
+The indexing and lookup strategy is the same, but instead of using a separator
+byte for splitting compound keys, the logic relies on the fact that keys have
+a fixed length and are sliced instead. This *should* result in faster key
+manipulation, also because in most cases `memoryview` buffers can be used
+directly instead of being copied from memory.
 
+Index storage is 90 bytes per triple.
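A minimal sketch of the fixed-length slicing described above, assuming the 5-byte key length used by the module (the helper names here are illustrative, not the store's actual API):

```python
KEY_LEN = 5  # Mirrors LmdbStore.KEY_LENGTH.

def join_keys(keys):
    # Fixed-length term keys are simply concatenated; no separator needed.
    return b''.join(keys)

def split_key(compound):
    # Slicing a memoryview yields the individual term keys without
    # copying the underlying buffer.
    view = memoryview(compound)
    return tuple(
            view[i:i + KEY_LEN]
            for i in range(0, len(view), KEY_LEN))

sk, pk, ok = b'\x01\x00\x00\x00A', b'\x01\x00\x00\x00B', b'\x01\x00\x00\x00C'
spok = join_keys((sk, pk, ok))
assert tuple(bytes(k) for k in split_key(spok)) == (sk, pk, ok)
```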

+ 113 - 72
lakesuperior/store/ldp_rs/lmdb_store.py

@@ -166,6 +166,12 @@ class LmdbStore(Store):
     - o:sp (O key: joined S, P keys; dupsort, dupfixed)
     - c:spo (context → triple association; dupsort, dupfixed)
     - ns:pfx (pickled namespace: prefix; 1:1)
+
+    The default graph is defined in `RDFLIB_DEFAULT_GRAPH_URI`. Adding triples
+    without context will add to this graph. Looking up triples without context
+    (also in a SPARQL query) will look in the union graph instead of in the
+    default graph. Also, removing triples without specifying a context will
+    remove triples from all contexts.
     '''
 
     context_aware = True
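A usage sketch of these semantics; the URIs are made up, and `store` and `TxnManager` are assumed to be set up as in the test suite further down:

```python
from rdflib import URIRef

trp = (URIRef('urn:s:1'), URIRef('urn:p:1'), URIRef('urn:o:1'))
ctx = URIRef('urn:bogus:graph#a')

with TxnManager(store, True) as txn:
    store.add(trp, ctx)   # Added to the named graph.
    store.add(trp, None)  # Added to the default graph.
    # A contextless lookup searches the union graph; the triple shows
    # up once even though it lives in two contexts.
    assert len(set(store.triples((None, None, None)))) == 1
    # A contextless remove deletes the triple from both contexts.
    store.remove((None, None, None))
    assert len(store) == 0
```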
@@ -187,11 +193,30 @@ class LmdbStore(Store):
     '''
     KEY_HASH_ALGO = 'sha1'
 
-    '''Separator byte. Used to join and split individual term keys.'''
-    SEP_BYTE = b'\x00'
+    '''
+    Fixed length for term keys.
+
+    4 or 5 is a safe range. 4 allows for ~4 billion (256 ** 4) unique terms
+    in the store. 5 allows ~1 trillion terms. While these numbers may seem
+    huge (the total number of Internet pages indexed by Google as of 2018 is
+    45 billion), bear in mind that keys cannot be reused, so a repository
+    that deletes a lot of triples may burn through a lot of terms.
 
-    KEY_LENGTH = 5 # Max key length for terms. That allows for A LOT of terms.
-    KEY_START = 2 # \x00 is reserved as a separator. \x01 is spare.
+    If a repository runs out of keys it can no longer store new terms and must
+    be migrated to a new database, which will regenerate and compact the keys.
+
+    For smaller repositories it should be safe to set this value to 4, which
+    could improve performance since keys make up the vast majority of record
+    exchange between the store and the application. However, it is sensible
+    not to expose this value as a configuration option.
+    '''
+    KEY_LENGTH = 5
+
+    '''
+    Lexical sequence start. `\x01` is fine now that no separator byte is used,
+    but it's good to leave `\x00` spare for potential future use.
+    '''
+    KEY_START = 1
 
     data_keys = (
         # Term key to serialized term content: 1:1
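The keyspace arithmetic behind the figures in the docstring above, as a quick check:

```python
# Number of distinct term keys available for each key length.
for key_length in (4, 5):
    print(key_length, 256 ** key_length)
# 4 -> 4294967296 (~4.3 billion)
# 5 -> 1099511627776 (~1.1 trillion)
```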
@@ -218,7 +243,7 @@ class LmdbStore(Store):
     looked up first.
 
     If we want to get fancy, this can be rebalanced from time to time by
-    looking up the number of keys in (s:spo, p:spo, o:spo).
+    looking up the number of keys in (s:po, p:so, o:sp).
     '''
     _lookup_rank = ('s', 'o', 'p')
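A hypothetical rebalancing sketch using LMDB stats; `txn` and `dbs` stand in for the store's index transaction and database handles:

```python
def rank_indices(txn, dbs):
    # The index with the most keys has, on average, the fewest duplicate
    # values per key, so it is the cheapest to look up first.
    entries = {
        label: txn.stat(dbs[label])['entries']
        for label in ('s:po', 'p:so', 'o:sp')}
    return sorted(entries, key=entries.get, reverse=True)
```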
 
@@ -231,7 +256,6 @@ class LmdbStore(Store):
         'o:sp': (2, 0, 1),
     }
 
-
     data_env = None
     idx_env = None
     db = None
@@ -240,23 +264,6 @@ class LmdbStore(Store):
     idx_txn = None
     is_txn_rw = None
 
-    '''
-    List of actions to be performed when a transaction is committed.
-
-    Each element is a tuple of (action name, database index, key, value).
-    '''
-    _data_queue = []
-    '''
-    Set of indices to update. A set has been preferred to a list since the
-    index update don't need to be sequential and there may be duplicate entries
-    that can be eliminated.
-
-    Each element is a tuple of (triple key, pickled context, pre-pickled triple
-    ). The third value can be None, and in that case, it is calculated from
-    the triple key.
-    '''
-    _idx_queue = []
-
 
     def __init__(self, path, identifier=None):
         self.path = path
@@ -271,6 +278,13 @@ class LmdbStore(Store):
         self._key_seq = LexicalSequence(self.KEY_START, self.KEY_LENGTH)
 
 
+    def __del__(self):
+        '''
+        Properly close store for garbage collection.
+        '''
+        self.close(True)
+
+
     def __len__(self, context=None):
         '''
         Return length of the dataset.
@@ -284,8 +298,7 @@ class LmdbStore(Store):
             #dataset = self.triples((None, None, None), context)
             with self.cur('c:spo') as cur:
                 if cur.set_key(self._to_key(context)):
-                    dataset = set(cur.iternext_dup())
-                    return len(dataset)
+                    return sum(1 for _ in cur.iternext_dup())
                 else:
                     return 0
         else:
@@ -327,7 +340,7 @@ class LmdbStore(Store):
             'read/write' if write else 'read-only'))
 
         self.data_txn = self.data_env.begin(buffers=True, write=write)
-        self.idx_txn = self.idx_env.begin(buffers=False, write=write)
+        self.idx_txn = self.idx_env.begin(buffers=True, write=write)
 
         self.is_txn_rw = write
 
@@ -410,11 +423,9 @@ class LmdbStore(Store):
         @return dict(string, lmdb.Cursor) Keys are index labels, values are
         index cursors.
         '''
-        cur = {}
-        for key in self.idx_keys:
-            cur[key] = txn.cursor(self.dbs[key])
-
-        return cur
+        return {
+            key: txn.cursor(self.dbs[key])
+            for key in self.idx_keys}
 
 
     def close(self, commit_pending_transaction=False):
@@ -455,7 +466,6 @@ class LmdbStore(Store):
         'None' inserts in the default graph.
         @param quoted (bool) Not used.
         '''
-        #import pdb; pdb.set_trace()
         context = self._normalize_context(context)
         if context is None:
             context = RDFLIB_DEFAULT_GRAPH_URI
@@ -475,7 +485,7 @@ class LmdbStore(Store):
             for i, pk_t in enumerate((pk_s, pk_p, pk_o, pk_c)):
                 thash = self._hash(pk_t)
                 if icur.set_key(thash):
-                    keys[i] = icur.value()
+                    keys[i] = bytes(icur.value())
                 else:
                     # Put new term.
                     with self.cur('t:st') as dcur:
@@ -490,7 +500,7 @@ class LmdbStore(Store):
                 cur.put(ck, b'')
 
         # Add triple:context association.
-        spok = self.SEP_BYTE.join(keys[:3])
+        spok = b''.join(keys[:3])
         with self.cur('spo:c') as dcur:
             if not dcur.set_key_dup(spok, ck):
                 dcur.put(spok, ck)
@@ -504,6 +514,13 @@ class LmdbStore(Store):
     def remove(self, triple_pattern, context=None):
         '''
         Remove triples by a pattern.
+
+        @param triple_pattern (tuple:rdflib.term.Identifier|None) 3-tuple of
+        either RDF terms or None, indicating the triple(s) to be removed.
+        None is used as a wildcard.
+        @param context (rdflib.term.Identifier|None) Context to remove the
+        triples from. If None (the default) the matching triples are removed
+        from all contexts.
         '''
         #logger.debug('Removing triples by pattern: {} on context: {}'.format(
         #    triple_pattern, context))
@@ -517,28 +534,31 @@ class LmdbStore(Store):
         else:
             ck = None
 
-        for spok in set(self._triple_keys(triple_pattern, context)):
-            # Delete context association.
-            with self.cur('spo:c') as dcur:
-                with self.cur('c:spo') as icur:
-                    if ck:
+        with self.cur('spo:c') as dcur:
+            with self.cur('c:spo') as icur:
+                match_set = {bytes(k) for k in self._triple_keys(
+                        triple_pattern, context)}
+                # Delete context association.
+                if ck:
+                    for spok in match_set:
                         if dcur.set_key_dup(spok, ck):
                             dcur.delete()
                             if icur.set_key_dup(ck, spok):
                                 icur.delete()
-                    else:
-                        # If no context is specified, remove all associations.
+                            self._index_triple('remove', spok)
+                # If no context is specified, remove all associations.
+                else:
+                    for spok in match_set:
                         if dcur.set_key(spok):
-                            for ck in dcur.iternext_dup():
+                            for cck in (bytes(k) for k in dcur.iternext_dup()):
                                 # Delete index first while we have the
                                 # context reference.
-                                if icur.set_key_dup(ck, spok):
+                                if icur.set_key_dup(cck, spok):
                                     icur.delete()
                             # Then delete the main entry.
                             dcur.set_key(spok)
                             dcur.delete(dupdata=True)
-
-            self._index_triple('remove', spok)
+                            self._index_triple('remove', spok)
 
 
     def triples(self, triple_pattern, context=None):
@@ -559,8 +579,6 @@ class LmdbStore(Store):
         # This sounds strange, RDFLib should be passing None at this point,
         # but anyway...
         context = self._normalize_context(context)
-        if context == RDFLIB_DEFAULT_GRAPH_URI:
-            context = None
 
         with self.cur('spo:c') as cur:
             for spok in self._triple_keys(triple_pattern, context):
@@ -568,7 +586,7 @@ class LmdbStore(Store):
                     contexts = (Graph(identifier=context),)
                 else:
                     if cur.set_key(spok):
-                        contexts = (
+                        contexts = tuple(
                             Graph(identifier=self._from_key(ck)[0], store=self)
                             for ck in cur.iternext_dup())
 
@@ -580,6 +598,9 @@ class LmdbStore(Store):
     def bind(self, prefix, namespace):
         '''
         Bind a prefix to a namespace.
+
+        @param prefix (string) Namespace prefix.
+        @param namespace (rdflib.URIRef) Fully qualified URI of namespace.
         '''
         prefix = s2b(prefix)
         namespace = s2b(namespace)
@@ -600,6 +621,7 @@ class LmdbStore(Store):
     def namespace(self, prefix):
         '''
         Get the namespace for a prefix.
+        @param prefix (string) Namespace prefix.
         '''
         with self.cur('pfx:ns') as cur:
             ns = cur.get(s2b(prefix))
@@ -612,6 +634,8 @@ class LmdbStore(Store):
 
         @NOTE A namespace can be only bound to one prefix in this
         implementation.
+
+        @param namespace (rdflib.URIRef) Fully qualified URI of namespace.
         '''
         with self.cur('ns:pfx') as cur:
             prefix = cur.get(s2b(namespace))
@@ -661,7 +685,6 @@ class LmdbStore(Store):
 
         @param graph (URIRef) URI of the named graph to add.
         '''
-        #import pdb; pdb.set_trace()
         if isinstance(graph, Graph):
             graph = graph.identifier
         pk_c = self._pickle(graph)
@@ -848,16 +871,15 @@ class LmdbStore(Store):
         '''
         Convert a key into one or more terms.
 
-        @param key (bytes) The key to be converted. It can be a compound one
-        in which case the function will return multiple terms.
+        @param key (bytes | memoryview) The key to be converted. It can be a
+        compound one in which case the function will return multiple terms.
+
+        @return tuple
         '''
-        terms = []
         with self.cur('t:st') as cur:
-            for k in bytes(key).split(self.SEP_BYTE):
-                pk_t = cur.get(k)
-                terms.append(self._unpickle(pk_t))
-
-        return tuple(terms)
+            return tuple(
+                   self._unpickle(cur.get(k))
+                   for k in self._split_key(key))
 
 
     def _to_key(self, obj):
@@ -871,8 +893,7 @@ class LmdbStore(Store):
         database. Pairs of terms, as well as triples and quads, are expressed
         as tuples.
 
-        If more than one term is provided, the keys are concatenated using the
-        designated separator byte (`\x00`).
+        If more than one term is provided, the keys are concatenated.
 
         @return bytes
         '''
@@ -887,7 +908,7 @@ class LmdbStore(Store):
                     return None
                 key.append(tk)
 
-        return self.SEP_BYTE.join(key)
+        return b''.join(key)
 
 
     def _hash(self, s):
@@ -897,6 +918,21 @@ class LmdbStore(Store):
         return hashlib.new(self.KEY_HASH_ALGO, s).digest()
 
 
+    def _split_key(self, keys):
+        '''
+        Split a compound key into individual keys.
+
+        This method relies on the fixed length of all term keys.
+
+        @param keys (bytes | memoryview) Concatenated keys.
+
+        @return tuple: bytes | memoryview
+        '''
+        return tuple(
+                keys[i:i+self.KEY_LENGTH]
+                for i in range(0, len(keys), self.KEY_LENGTH))
+
+
     def _normalize_context(self, context):
         '''
         Normalize a context parameter to conform to the model expectations.
@@ -965,10 +1001,12 @@ class LmdbStore(Store):
         '''
         Lookup triples for a pattern with one bound term.
 
-        @TODO This can be called millions of times in a larger SPARQL
-        query, so it better be as efficient as it gets.
+        @param label (string) Which term is being searched for. One of `s`,
+        `p`, or `o`.
+        @param term (rdflib.URIRef) Bound term to search for.
+
+        @return iterator(bytes) SPO keys matching the pattern.
         '''
-        #import pdb; pdb.set_trace()
         k = self._to_key(term)
         if not k:
             return iter(())
@@ -977,7 +1015,7 @@ class LmdbStore(Store):
         with self.cur(idx_name) as cur:
             if cur.set_key(k):
                 for match in cur.iternext_dup():
-                    subkeys = bytes(match).split(self.SEP_BYTE)
+                    subkeys = self._split_key(match)
 
                     # Compose result.
                     out = [None, None, None]
@@ -985,7 +1023,7 @@ class LmdbStore(Store):
                     out[term_order[1]] = subkeys[0]
                     out[term_order[2]] = subkeys[1]
 
-                    yield self.SEP_BYTE.join(out)
+                    yield b''.join(out)
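For example, the `'o:sp'` ordering `(2, 0, 1)` from `_lookup_ordering` recomposes a match as follows (the 5-byte keys are fabricated):

```python
term_order = (2, 0, 1)    # _lookup_ordering['o:sp']
k = b'OOOOO'              # The bound term key (the object).
match = b'SSSSSPPPPP'     # One S+P value from the o:sp index.
subkeys = (match[0:5], match[5:10])

out = [None, None, None]
out[term_order[0]] = k           # O lands in SPO position 2.
out[term_order[1]] = subkeys[0]  # S lands in position 0.
out[term_order[2]] = subkeys[1]  # P lands in position 1.
assert b''.join(out) == b'SSSSSPPPPPOOOOO'
```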
 
 
     def _lookup_2bound(self, bound_terms):
@@ -995,8 +1033,9 @@ class LmdbStore(Store):
         @param bound_terms (dict) Triple labels and terms to search for,
         in the format of, e.g. {'s': URIRef('urn:s:1'), 'o':
         URIRef('urn:o:1')}
+
+        @return iterator(bytes) SPO keys matching the pattern.
         '''
-        #import pdb; pdb.set_trace()
         if len(bound_terms) != 2:
             raise ValueError(
                     'Exactly 2 terms need to be bound. Got {}'.format(
@@ -1032,7 +1071,7 @@ class LmdbStore(Store):
             if cur.set_key(luk):
                 # Iterate over matches and filter by second term.
                 for match in cur.iternext_dup():
-                    subkeys = bytes(match).split(self.SEP_BYTE)
+                    subkeys = self._split_key(match)
                     flt_subkey = subkeys[fpos]
                     if flt_subkey == ft:
                         # Remainder (not filter) key used to complete the
@@ -1045,7 +1084,8 @@ class LmdbStore(Store):
                         out[term_order[fpos+1]] = flt_subkey
                         out[term_order[2-fpos]] = r_subkey
 
-                        yield self.SEP_BYTE.join(out)
+                        yield b''.join(out)
+
 
     def _append(self, cur, values, **kwargs):
         '''
@@ -1078,11 +1118,12 @@ class LmdbStore(Store):
         indexed. Context MUST be specified for 'add'.
         '''
         # Split and rearrange-join keys for association and indices.
-        triple = bytes(spok).split(self.SEP_BYTE)
+        triple = self._split_key(spok)
         sk, pk, ok = triple
-        spk = self.SEP_BYTE.join(triple[:2])
-        sok = bytes(triple[0]) + self.SEP_BYTE + bytes(triple[2])
-        pok = self.SEP_BYTE.join(triple[1:3])
+        spk = b''.join(triple[:2])
+        sok = b''.join((triple[0], triple[2]))
+        pok = b''.join(triple[1:3])
 
         # Associate cursor labels with k/v pairs.
         curs = {

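Schematically, the compound keys built above feed three index records per triple; each single-term key maps to the concatenation of the other two, which is why a two-bound lookup scans one index and filters on the second term (fabricated 5-byte keys, labels as in the class docstring):

```python
sk, pk, ok = b'SSSSS', b'PPPPP', b'OOOOO'

index_records = {
    's:po': (sk, pk + ok),  # pok
    'p:so': (pk, sk + ok),  # sok
    'o:sp': (ok, sk + pk),  # spk
}
```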
+ 35 - 22
tests/store/test_lmdb_store.py

@@ -266,48 +266,61 @@ class TestContext:
         trp1 = (URIRef('urn:s:1'), URIRef('urn:p:1'), URIRef('urn:o:1'))
         trp2 = (URIRef('urn:s:2'), URIRef('urn:p:2'), URIRef('urn:o:2'))
         trp3 = (URIRef('urn:s:3'), URIRef('urn:p:3'), URIRef('urn:o:3'))
+        trp4 = (URIRef('urn:s:4'), URIRef('urn:p:4'), URIRef('urn:o:4'))
 
         with TxnManager(store, True) as txn:
             store.add(trp1, gr_uri)
             store.add(trp2, gr_uri)
-            store.add(trp2, None)
+            store.add(trp2, gr_uri) # Duplicate; dropped.
+            store.add(trp2, None) # Goes to the default graph.
             store.add(trp3, gr2_uri)
-            store.add(trp3)
+            store.add(trp3, gr_uri)
+            store.add(trp4) # Goes to the default graph.
 
-            assert len(set(store.triples((None, None, None)))) == 3
+            assert len(set(store.triples((None, None, None)))) == 4
             assert len(set(store.triples((None, None, None),
-                RDFLIB_DEFAULT_GRAPH_URI))) == 3
-            assert len(set(store.triples((None, None, None), gr_uri))) == 2
+                RDFLIB_DEFAULT_GRAPH_URI))) == 2
+            assert len(set(store.triples((None, None, None), gr_uri))) == 3
             assert len(set(store.triples((None, None, None), gr2_uri))) == 1
 
             assert gr2_uri in {gr.identifier for gr in store.contexts()}
             assert trp1 in _clean(store.triples((None, None, None)))
-            assert trp1 in _clean(store.triples((None, None, None),
+            assert trp1 not in _clean(store.triples((None, None, None),
                     RDFLIB_DEFAULT_GRAPH_URI))
             assert trp2 in _clean(store.triples((None, None, None), gr_uri))
             assert trp2 in _clean(store.triples((None, None, None)))
             assert trp3 in _clean(store.triples((None, None, None), gr2_uri))
-            assert trp3 in _clean(store.triples((None, None, None),
+            assert trp3 not in _clean(store.triples((None, None, None),
                     RDFLIB_DEFAULT_GRAPH_URI))
 
 
-    #def test_delete_from_ctx(self, store):
-    #    '''
-    #    Delete triples from a named graph and from the default graph.
-    #    '''
-    #    gr_uri = URIRef('urn:bogus:graph#a')
-    #    gr2_uri = URIRef('urn:bogus:graph#b')
+    def test_delete_from_ctx(self, store):
+        '''
+        Delete triples from a named graph and from the default graph.
+        '''
+        gr_uri = URIRef('urn:bogus:graph#a')
+        gr2_uri = URIRef('urn:bogus:graph#b')
 
-    #    with TxnManager(store, True) as txn:
-    #        store.remove((None, None, None), gr2_uri)
-    #        assert len(set(store.triples((None, None, None), gr2_uri))) == 0
-    #        assert len(set(store.triples((None, None, None), gr_uri))) == 2
+        with TxnManager(store, True) as txn:
+            store.remove((None, None, None), gr2_uri)
+            assert len(set(store.triples((None, None, None), gr2_uri))) == 0
+            assert len(set(store.triples((None, None, None), gr_uri))) == 3
 
-    #    with TxnManager(store, True) as txn:
-    #        store.remove((None, None, None))
-    #        assert len(set(store.triples((None, None, None)))) == 0
-    #        assert len(set(store.triples((None, None, None), gr_uri))) == 0
-    #        assert len(store) == 0
+        with TxnManager(store, True) as txn:
+            store.remove((URIRef('urn:s:1'), None, None))
+            assert len(set(store.triples((None, None, None), gr_uri))) == 2
+            assert len(set(store.triples((None, None, None)))) == 3
+
+        with TxnManager(store, True) as txn:
+            store.remove((URIRef('urn:s:4'), None, None),
+                    RDFLIB_DEFAULT_GRAPH_URI)
+            assert len(set(store.triples((None, None, None)))) == 2
+
+        with TxnManager(store, True) as txn:
+            store.remove((None, None, None))
+            assert len(set(store.triples((None, None, None)))) == 0
+            assert len(set(store.triples((None, None, None), gr_uri))) == 0
+            assert len(store) == 0
 
 
 @pytest.mark.usefixtures('store')