
LMDB strategy #4.

Stefano Cossu, 7 years ago
Commit 3b6a0717ae
1 file changed, 441 insertions and 340 deletions

lakesuperior/store_layouts/ldp_rs/lmdb_store.py

@@ -72,19 +72,88 @@ class TxnManager(ContextDecorator):
             # jobs left from other requests.
         else:
             self.store.commit()
-            if len(self.store._data_queue):
-                self.store._apply_changes()
-            if len(self.store._idx_queue):
-                # Ditch index data. For testing data entry only.
-                #self.store._idx_queue = []
-                # Synchronous.
-                self.store._run_indexing()
-                # Threading.
-                #job = Thread(target=self.store._run_indexing)
-                # Multiprocess.
-                #job = Process(target=self.store._run_indexing)
-                #job.start()
-                #logger.info('Started indexing job #{}'.format(job.ident))
+            if self.write:
+                if len(self.store._data_queue):
+                    self.store._apply_changes()
+                if len(self.store._idx_queue):
+                    # Ditch index data. For testing data entry only.
+                    #self.store._idx_queue = []
+                    # Synchronous.
+                    self.store._run_indexing()
+                    # Threading.
+                    #job = Thread(target=self.store._run_indexing)
+                    # Multiprocess.
+                    #job = Process(target=self.store._run_indexing)
+                    #job.start()
+                    #logger.info('Started indexing job #{}'.format(job.ident))
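
A minimal usage sketch of the manager above (hypothetical, not part of the commit; it assumes a store already instantiated and opened elsewhere, and that TxnManager is constructed as `TxnManager(store, write=...)`):

    from rdflib import URIRef

    with TxnManager(store, write=True):
        store.add((URIRef('urn:ex:s'), URIRef('urn:ex:p'),
                URIRef('urn:ex:o')))
    # On a clean exit the manager commits; with write=True it also applies
    # queued data changes and runs the indexing step shown above.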
+
+
+class LexicalSequence:
+    '''
+    Fixed-length lexicographically ordered byte sequence.
+
+    Useful to generate optimized sequences of keys in LMDB.
+    '''
+    def __init__(self, start=1, max_len=5):
+        '''
+        @param start (bytes) Starting byte value. Bytes below this value are
+        never found in this sequence. This is useful to allot special bytes
+        to be used e.g. as separators.
+        @param max_len (int) Maximum number of bytes that a byte string can
+        contain. This should be chosen carefully since the number of all
+        possible key combinations is determined by this value and the `start`
+        value. The default args provide 255**5 (~1 Tn) unique combinations.
+        '''
+        self.start = start
+        self.length = max_len
+
+
+    def first(self):
+        '''
+        First possible combination.
+        '''
+        return bytearray([self.start] * self.length)
+
+
+    def next(self, n):
+        '''
+        Calculate the next closest byte sequence in lexicographical order.
+
+        This is used to fill the next available slot after the last one in
+        LMDB. Keys are byte strings, which is a convenient way to keep key
+        lengths as small as possible when they are referenced in several
+        indices.
+
+        This function assumes that all the keys are padded with the `start`
+        value up to the `max_len` length.
+
+        @param n (bytes) Current byte sequence to add to.
+        '''
+        if not n:
+            n = self.first()
+        elif isinstance(n, (bytes, memoryview)):
+            n = bytearray(n)
+        elif not isinstance(n, bytearray):
+            raise ValueError('Input sequence must be bytes or a bytearray.')
+
+        if len(n) != self.length:
+            raise ValueError('Incorrect sequence length.')
+
+        for i, b in list(enumerate(n))[::-1]:
+            try:
+                n[i] += 1
+            # If the value exceeds 255, i.e. the current value is the last one
+            except ValueError:
+                if i == 0:
+                    raise RuntimeError('BAD DAY: Sequence exhausted. No more '
+                            'combinations are possible.')
+                # Move one position up and try to increment that.
+                else:
+                    n[i] = self.start
+                    continue
+            else:
+                return bytes(n)
+
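
A quick sanity check of the sequence logic above (illustrative only, not part of the commit):

    seq = LexicalSequence(start=1, max_len=5)
    assert seq.first() == bytearray(b'\x01\x01\x01\x01\x01')
    # The last byte is incremented first...
    assert seq.next(b'\x01\x01\x01\x01\x01') == b'\x01\x01\x01\x01\x02'
    # ...and wraps back to `start` past 255, carrying one position to the left.
    assert seq.next(b'\x01\x01\x01\x01\xff') == b'\x01\x01\x01\x02\x01'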


 class LmdbStore(Store):
@@ -140,18 +209,36 @@ class LmdbStore(Store):
     '''
     KEY_HASH_ALGO = 'sha1'

+    '''Separator byte. Used to join and split individual term keys.'''
+    SEP_BYTE = b'\x00'
+
     '''
-    Whether the default graph is the union graph. At the moment only False
-    is supported.
+    Dummy bytestring to associate with a "no triple" statement in the c:spo
+    index. Used to keep track of empty graphs.
     '''
-    DEFAULT_UNION = False
+    NO_TRIPLE = b'\x01' * 5

     DEFAULT_GRAPH_URI = URIRef('urn:fcrepo:default_graph')

-    data_keys = ('tk:c', 'tk:t', 'pfx:ns')
+    KEY_LENGTH = 5 # Max key length for terms. That allows for A LOT of terms.
+    KEY_START = 2 # \x00 is reserved as a separator. \x01 is the NO_TRIPLE sentinel.
+
+    data_keys = (
+        # Term key to serialized term content: 1:1
+        't:st',
+        # Joined triple keys to context key: 1:m
+        'spo:c',
+    )
     idx_keys = (
-            'c:tk', 'sk:tk', 'pk:tk', 'ok:tk', 'spk:tk', 'sok:tk', 'pok:tk',
-            'ns:pfx')
+        # Namespace to prefix: 1:1
+        'ns:pfx',
+        # Term hash to term key: 1:1
+        'th:t',
+        # Lookups for one known term: 1:m
+        's:po', 'p:so', 'o:sp', 'c:spo',
+        # Lookups for two known terms: 1:m
+        'sp:o', 'so:p', 'po:s',
+    )

     data_env = None
     idx_env = None
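
To make the key scheme above concrete, here is a sketch of how a triple key (`spok`) is composed from three 5-byte term keys (illustrative values; real keys are assigned sequentially by `LexicalSequence`):

    SEP_BYTE = b'\x00'
    # Three hypothetical 5-byte term keys for s, p and o.
    sk = b'\x02\x02\x02\x02\x02'
    pk = b'\x02\x02\x02\x02\x03'
    ok = b'\x02\x02\x02\x02\x04'
    spok = SEP_BYTE.join((sk, pk, ok))
    assert len(spok) == 17   # 3 keys of 5 bytes each, plus 2 separators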
@@ -188,6 +275,8 @@ class LmdbStore(Store):
         self._pickle = self.node_pickler.dumps
         self._unpickle = self.node_pickler.loads

+        self._key_seq = LexicalSequence(self.KEY_START, self.KEY_LENGTH)
+

     def __len__(self, context=None):
         '''
@@ -198,8 +287,9 @@ class LmdbStore(Store):

         if context.identifier is not self.DEFAULT_GRAPH_URI:
             #dataset = self.triples((None, None, None), context)
-            dataset = (tk for tk in self.curs['c:tk'].iternext_dup())
-            return len(set(dataset))
+            with self.cur('c:spo') as cur:
+                ck = self._to_key(context.identifier)
+                if not ck or not cur.set_key(ck):
+                    return 0
+                return sum(1 for _ in cur.iternext_dup())
         else:
             return self.data_txn.stat(self.dbs['tk:t'])['entries']

@@ -237,12 +327,10 @@ class LmdbStore(Store):
             raise RuntimeError('Store must be opened first.')
         logger.info('Beginning a {} transaction.'.format(
             'read/write' if write else 'read-only'))
-        self.data_txn = self.data_env.begin(buffers=True)
-        self.idx_txn = self.idx_env.begin(buffers=True)
-        self.is_txn_rw = write
-        # Cursors.
-        self.curs = self.get_data_cursors(self.data_txn)
-        self.curs.update(self.get_idx_cursors(self.idx_txn))
+        self.data_txn = self.data_env.begin(buffers=True, write=write)
+        self.idx_txn = self.idx_env.begin(buffers=True, write=write)
+
+        self.is_txn_rw = bool(write)


     @property
@@ -350,61 +438,121 @@ class LmdbStore(Store):
         Store.add(self, triple, context)

         #logger.info('Adding triple: {}'.format(triple))
-        if self.DEFAULT_UNION:
-            raise NotImplementedError()
-            # @TODO
-        elif context is None:
+        if context is None:
             context = self.DEFAULT_GRAPH_URI
         pk_trp = self._pickle(triple)
-        trp_key = hashlib.new(self.KEY_HASH_ALGO, pk_trp).digest()
-
-        needs_indexing = False
-        with self.cur('tk:t') as cur:
-            if not cur.set_key(trp_key):
-                self._enqueue_action('put', 'tk:t', trp_key, pk_trp)
-                needs_indexing = True
-        pk_ctx = self._pickle(context.identifier) \
+        pk_s, pk_p, pk_o = [self._pickle(t) for t in triple]
+        pk_c = self._pickle(context.identifier) \
                 if isinstance(context, Graph) \
                 else self._pickle(context)
-        with self.cur('tk:c') as cur:
-            if not cur.set_key_dup(trp_key, pk_ctx):
-                self._enqueue_action('put', 'tk:c', trp_key, pk_ctx)
-                needs_indexing = True
 
-            self._idx_queue.append((trp_key, pk_ctx, triple))
+        # Add new individual terms or gather keys for existing ones.
+        keys = [None, None, None, None]
+        with self.cur('th:t') as idx_cur:
+            for i, pk_t in enumerate((pk_s, pk_p, pk_o, pk_c)):
+                thash = self._hash(pk_t)
+                if idx_cur.set_key(thash):
+                    keys[i] = idx_cur.value()
+                else:
+                    # Put new term.
+                    with self.cur('t:st') as cur:
+                        keys[i] = self._append(cur, (pk_t,))[0]
+                    # Index.
+                    idx_cur.put(thash, keys[i])
+
+        # Add triple:context association.
+        ck = keys[3]
+        spok = self.SEP_BYTE.join(keys[:3])
+        with self.cur('spo:c') as cur:
+            triple_exists = cur.set_key_dup(spok, ck)
+            if not triple_exists:
+                cur.put(spok, ck)
+
+        self._index('add', spok, ck)
+
+
+    def _index(self, action, spok, ck=None):
+        '''
+        Update index for a triple and context (add or remove).
+
+        @param action (string) 'add' or 'remove'.
+        @param spok (bytes) Triple key.
+        @param ck (bytes|None) Context key. If None, all contexts found are
+        indexed. Context MUST be specified for 'add'.
+        '''
+        # Split and rearrange-join keys for association and indices.
+        triple = spok.split(self.SEP_BYTE)
+        sk, pk, ok = triple[:3]
+        spk = self.SEP_BYTE.join(triple[:2])
+        sok = bytes(triple[0]) + self.SEP_BYTE + bytes(triple[2])
+        pok = self.SEP_BYTE.join(triple[1:3])
+        spok = self.SEP_BYTE.join(triple[:3])
+
+        # Associate cursor labels with k/v pairs.
+        curs = {
+            's:po': (sk, pok),
+            'p:so': (pk, sok),
+            'o:sp': (ok, spk),
+            'sp:o': (spk, ok),
+            'so:p': (sok, pk),
+            'po:s': (pok, sk),
+        }
+
+        # Index context association.
+        if ck:
+            cks = (ck,)
+        elif action == 'remove':
+            # Delete all contexts if none is specified.
+            with self.cur('spo:c') as spo_cur:
+                cks = [bytes(k) for k in spo_cur.iternext_dup()] \
+                        if spo_cur.set_key(spok) else []
+            with self.cur('c:spo') as c_cur:
+                for ctk in cks:
+                    if c_cur.set_key_dup(ctk, spok):
+                        c_cur.delete()
+        else:
+            raise ValueError('Cannot run an \'add\' index without context.')
+
+        # Loop over contexts.
+        for ck in cks:
+            for clabel, terms in curs.items():
+                with self.cur(clabel) as cur:
+                    if action == 'remove':
+                        if cur.set_key_dup(*terms):
+                            cur.delete()
+                    elif action == 'add':
+                        cur.put(*terms)
+                    else:
+                        raise ValueError(
+                                'Index action \'{}\' not supported.'
+                                .format(action))
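
The six lookup indices updated above cover every combination of one or two bound terms. A sketch of the k/v pairs derived from one triple key (stand-in 5-byte term keys, not part of the commit):

    SEP = b'\x00'
    sk, pk, ok = b'S' * 5, b'P' * 5, b'O' * 5
    pairs = {
        's:po': (sk, SEP.join((pk, ok))),
        'p:so': (pk, SEP.join((sk, ok))),
        'o:sp': (ok, SEP.join((sk, pk))),
        'sp:o': (SEP.join((sk, pk)), ok),
        'so:p': (SEP.join((sk, ok)), pk),
        'po:s': (SEP.join((pk, ok)), sk),
    }
    # Each pair is put into, or deleted from, the database of the same label.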


     def remove(self, triple_pattern, context=None):
         '''
         Remove a triple and start indexing.
         '''
-        if self.DEFAULT_UNION:
-            raise NotImplementedError()
-            # @TODO
-        elif context is None:
-            context = self.DEFAULT_GRAPH_URI
-
         #import pdb; pdb.set_trace()
-        pk_ctx = self._pickle(context.identifier) \
-                if isinstance(context, Graph) \
-                else self._pickle(context)
+        if context is not None:
+            if isinstance(context, Graph):
+                context = context.identifier
+            ck = self._to_key(context)
+        else:
+            ck = None
+
         for trp_key in self._triple_keys(triple_pattern, context):
             # Delete context association.
-            with self.cur('tk:c') as cur:
-                if cur.set_key_dup(trp_key, pk_ctx):
-                    triple = self._key_to_triple(trp_key)
-                    self._enqueue_action('delete', 'tk:c', trp_key, pk_ctx)
-
-                    # If no other contexts are associated with the triple,
-                    # delete it.
-                    with self.cur('tk:t') as trp_cur:
-                        if not cur.set_key(trp_key):
-                            self._enqueue_action(
-                                    'delete', 'tk:c', trp_key, None)
+            # Update indices first, while the spo:c association can still be
+            # looked up to find all contexts the triple belongs to.
+            self._index('remove', trp_key, ck)
+            with self.cur('spo:c') as cur:
+                if ck:
+                    if cur.set_key_dup(trp_key, ck):
+                        cur.delete()
+                else:
+                    # If no context is specified, remove all associations.
+                    if cur.set_key(trp_key):
+                        cur.delete(dupdata=True)

-                    self._idx_queue.append((trp_key, pk_ctx, triple))


     def triples(self, triple_pattern, context=None):
@@ -413,10 +561,23 @@ class LmdbStore(Store):

         @param triple_pattern (tuple) 3 RDFLib terms
         @param context (rdflib.Graph | None) Context graph, if available.
-        If a graph is given, only its identifier is stored.
+
+        @return Generator over triples and contexts in which each result has
+        the following format:
+        > (s, p, o), generator(contexts)
+        Where the context generator lists all contexts that the triple
+        appears in.
         '''
-        for tk in self._triple_keys(triple_pattern, context):
-            yield self._key_to_triple(tk), context
+        #import pdb; pdb.set_trace()
+        with self.cur('spo:c') as cur:
+            for spok in self._triple_keys(triple_pattern, context):
+                triple = tuple(self._from_key(spok))
+                if context is not None:
+                    yield triple, (context,)
+                elif cur.set_key(spok):
+                    contexts = (self._from_key(ck)[0]
+                            for ck in cur.iternext_dup())
+                    yield triple, contexts
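
Hypothetical usage of the return format described above (assumes a store opened and populated elsewhere):

    for (s, p, o), contexts in store.triples((None, None, None)):
        print(s, p, o, list(contexts))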


     def bind(self, prefix, namespace):
@@ -468,14 +629,14 @@ class LmdbStore(Store):
         @return generator:URIRef
         '''
         if triple:
-            with self.cur('tk:c') as cur:
+            with self.cur('spo:c') as cur:
                 cur.set_key(self._to_key(triple))
                 contexts = cur.iternext_dup()
         else:
-            with self.cur('c:tk') as cur:
+            with self.cur('c:spo') as cur:
                 contexts = cur.iternext_nodup()

-        return (self._unpickle(ctx) for ctx in contexts)
+        return (self._from_key(ctx)[0] for ctx in contexts)


     def add_graph(self, graph):
@@ -486,22 +647,39 @@ class LmdbStore(Store):
         pickled `None` value. This prevents the graph from being removed when
         all triples are removed.

-        This may be called by supposedly read-only operations:
+        This may be called by read-only operations:
         https://github.com/RDFLib/rdflib/blob/master/rdflib/graph.py#L1623
         Therefore it needs to open a write transaction. This is not ideal
         but the only way to handle datasets in RDFLib.

         @param graph (URIRef) URI of the named graph to add.
         '''
-        pk_none = self._pickle(None)
-        pk_ctx = self._pickle(graph)
-        with self.data_env.begin(write=True).cursor(self.dbs['tk:c']) \
-                as tk2c_cur:
-            tk2c_cur.put(pk_none, pk_ctx)
-
-        with self.idx_env.begin(write=True)\
-                .cursor(self.dbs['c:tk']) as c2tk_cur:
-            c2tk_cur.put(pk_ctx, pk_none)
+        if isinstance(graph, Graph):
+            graph = graph.identifier
+        pk_c = self._pickle(graph)
+        c_hash = self._hash(pk_c)
+        with self.cur('th:t') as cur:
+            c_exists = cur.set_key(c_hash)
+        if not c_exists:
+            # Insert context term if not existing.
+            if self.is_txn_rw:
+                # Use existing R/W transaction.
+                with self.cur('t:st') as cur:
+                    ck = self._append(cur, (pk_c,))[0]
+                with self.cur('th:t') as cur:
+                    cur.put(c_hash, ck)
+                with self.cur('c:spo') as cur:
+                    cur.put(ck, self.NO_TRIPLE)
+            else:
+                # Open new R/W transactions.
+                with self.data_env.begin(write=True) as wtxn:
+                    with wtxn.cursor(self.dbs['t:st']) as cur:
+                        ck = self._append(cur, (pk_c,))[0]
+                with self.idx_env.begin(write=True) as wtxn:
+                    with wtxn.cursor(self.dbs['th:t']) as cur:
+                        cur.put(c_hash, ck)
+                    with wtxn.cursor(self.dbs['c:spo']) as cur:
+                        cur.put(ck, self.NO_TRIPLE)


     def remove_graph(self, graph):
@@ -510,16 +688,15 @@ class LmdbStore(Store):

         @param graph (URIRef) URI of the named graph to remove.

+        if isinstance(graph, Graph):
+            graph = graph.identifier
+
         self.remove((None, None, None), graph)

-        pk_none = self._pickle(None)
-        pk_ctx = self._pickle(graph)
-        self._enqueue_action('delete', 'tk:c', pk_none, pk_ctx)
-        self._idx_queue.append((None, pk_ctx, None))
-
-        with self.cur('c:tk') as cur:
-            if cur.set_key_dup(self._pickle(graph), self._pickle(None)):
-                self.curs['tk:c'].delete()
+        ck = self._to_key(graph)
+        with self.cur('c:spo') as cur:
+            if ck and cur.set_key_dup(ck, self.NO_TRIPLE):
+                cur.delete()


     def commit(self):
@@ -529,7 +706,7 @@ class LmdbStore(Store):
         if self.is_txn_open:
             self.data_txn.commit()
             self.idx_txn.commit()
-        self.data_txn = self.idx_txn = self.is_txn_rw = None
+        self.data_txn = self.idx_txn = None


     def rollback(self):
@@ -539,34 +716,28 @@ class LmdbStore(Store):
         if self.is_txn_open:
             self.data_txn.abort()
             self.idx_txn.abort()
-        self.data_txn = self.idx_txn = self.is_txn_rw = None
-
-
-    #def _next_lex_key(self, db=None):
-    #    '''
-    #    Calculate the next closest byte sequence in lexicographical order.
-
-    #    This is needed to fill the next available slot after the last one in
-    #    LMDB. Keys are byte strings. This is convenient to keep key
-    #    lengths as small as possible because they are referenced in several
-    #    indices.
-    #    '''
-    #    with self.env.begin(buffers=True) as txn:
-    #        with txn.cursor(db) as cur:
-    #            has_entries = cur.last()
-    #            if has_entries:
-    #                next = bytearray(cur.key())
-    #            else:
-    #                # First key in db.
-    #                return b'\x00'
-    #    try:
-    #        next[-1] += 1
-    #    # If the value exceeds 256, i.e. the current value is the last one,
-    #    # append a new \x00 and the next iteration will start incrementing that
-    #    except ValueError:
-    #        next.append(0)
-
-    #    return next
+        self.data_txn = self.idx_txn = None
+
+
+    def rebase(self, n, start=1):
+        '''
+        Create a bytearray translating an integer to an arbitrary base.
+
+        The base is between the `start` value and 255, so that each digit
+        fits in a one-byte chunk.
+
+        @param n (int) Number to rebase.
+        @param start (int) Starting byte. This is useful to leave out "special"
+        bytes for purposes such as separators.
+
+        @return bytearray
+        '''
+        digits = list(range(start, 256))
+        base = len(digits)
+        if n < base:
+            return bytearray([digits[n]])
+        else:
+            return (self.rebase(n // base, start)
+                    + bytearray([digits[n % base]]))
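
A worked example (illustrative only; with `start=1` the digits run from `\x01` to `\xff`, i.e. base 255):

    store.rebase(0)     # -> bytearray(b'\x01')
    store.rebase(254)   # -> bytearray(b'\xff')
    store.rebase(255)   # -> bytearray(b'\x02\x01'), i.e. 1 * 255 + 0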


     ## PRIVATE METHODS ##
@@ -580,54 +751,50 @@ class LmdbStore(Store):
         keys without incurring the overhead of converting them to triples.

         @param triple_pattern (tuple) 3 RDFLib terms
-        @param context (rdflib.Graph | None) Context graph, if available.
-        If a graph is given, only its identifier is stored.
+        @param context (rdflib.Graph | None) Context graph or URI, or None.
         '''
         '''
         if context == self:
             context = None

-            raise NotImplementedError()
-            # In theory, this is what should happen:
-            #if context == self.DEFAULT_GRAPH_URI
-            #    # Any pattern with unbound context
-            #    for tk in self._lookup(triple_pattern, tkey):
-            #        yield self._key_to_triple(tk)
-            #    return
-        elif context is None:
-            context = self.DEFAULT_GRAPH_URI
-
-        tkey = self._to_key(triple_pattern)
+        if context:
+            if isinstance(context, Graph):
+                context = context.identifier
+            ck = self._to_key(context)
-        # Shortcuts
-        pk_ctx = self._pickle(context.identifier) \
-                if isinstance(context, Graph) \
-                else self._pickle(context)
-        if not self.curs['c:tk'].set_key(pk_ctx):
-            # Context not found.
-            return iter(())
-
-        # s p o c
-        if all(triple_pattern):
-            if self.curs['tk:c'].set_key_dup(tkey, pk_ctx):
-                yield tkey
-                return
-            else:
-                # Triple not found.
+            # Shortcuts
+            if not ck:
+                # Context not found.
                 return iter(())

-        # ? ? ? c
-        elif not any(triple_pattern):
-            # Get all triples from the context
-            for tk in self.curs['c:tk'].iternext_dup():
-                yield tk
+            with self.cur('c:spo') as cur:
+                # s p o c
+                if all(triple_pattern):
+                    spok = self._to_key(triple_pattern)
+                    if not spok:
+                        # A term in the triple is not found.
+                        return iter(())
+                    if cur.set_key_dup(ck, spok):
+                        yield spok
+                        return
+                    else:
+                        # Triple not found.
+                        return iter(())
+
+                # ? ? ? c
+                elif not any(triple_pattern):
+                    # Get all triples from the context.
+                    if cur.set_key(ck):
+                        for spok in cur.iternext_dup():
+                            yield spok
-        # Regular lookup.
+                # Regular lookup.
+                else:
+                    for spok in self._lookup(triple_pattern):
+                        if cur.set_key_dup(ck, spok):
+                            yield spok
+                    return
         else:
-            for tk in self._lookup(triple_pattern, tkey):
-                if self.curs['c:tk'].set_key_dup(pk_ctx, tk):
-                    yield tk
-            return
+            yield from self._lookup(triple_pattern)


     def _init_db_environments(self, path, create=True):
@@ -655,16 +822,35 @@ class LmdbStore(Store):
         # Open and optionally create main databases.
         self.dbs = {
             # Main databases.
-            'tk:t': self.data_env.open_db(b'tk:t', create=create),
-            'tk:c': self.data_env.open_db(b'tk:c', create=create, dupsort=True),
+            't:st': self.data_env.open_db(b't:st', create=create),
+            'spo:c': self.data_env.open_db(
+                    b'spo:c', create=create, dupsort=True, dupfixed=True),
             'pfx:ns': self.data_env.open_db(b'pfx:ns', create=create),
-            # Index.
+            # One-off indices.
             'ns:pfx': self.idx_env.open_db(b'ns:pfx', create=create),
+            'th:t': self.idx_env.open_db(b'th:t', create=create),
         }
         # Other index databases.
         for db_key in self.idx_keys:
-            self.dbs[db_key] = self.idx_env.open_db(s2b(db_key),
-                    dupsort=True, dupfixed=True, create=create)
+            if db_key not in ('ns:pfx', 'th:t'):
+                self.dbs[db_key] = self.idx_env.open_db(s2b(db_key),
+                        dupsort=True, dupfixed=True, create=create)
+
+
+    def _from_key(self, key):
+        '''
+        Convert a key into one or more terms.
+
+        @param key (bytes) The key to be converted. It can be a compound one
+        in which case the function will return multiple terms.
+        '''
+        terms = []
+        with self.cur('t:st') as cur:
+            for k in bytes(key).split(self.SEP_BYTE):
+                pk_t = cur.get(k)
+                terms.append(self._unpickle(pk_t))
+
+        return terms
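
A sketch of the round trip between a stored term and its key, using this method together with `_to_key()` below (hypothetical usage; assumes an open transaction and a term already stored):

    from rdflib import URIRef

    s = URIRef('urn:ex:subject')
    sk = store._to_key(s)    # 5-byte key, or None if the term is unknown
    if sk is not None:
        assert store._from_key(sk) == [s]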


     def _to_key(self, obj):
@@ -674,217 +860,132 @@ class LmdbStore(Store):
         The key is the checksum of the pickled object, therefore unique for
         that object. The hashing algorithm is specified in `KEY_HASH_ALGO`.

-        @param obj (Object) Anything that can be pickled. Pairs of terms, as
-        well as triples and quads, are expressed as tuples within the scope of
-        this application.
+        @param obj (Object) Anything that can be reduced to terms stored in the
+        database. Pairs of terms, as well as triples and quads, are expressed
+        as tuples.
+
+        If more than one term is provided, the keys are concatenated using the
+        designated separator byte (`\x00`).

         @return bytes
         '''
-        return hashlib.new(self.KEY_HASH_ALGO, self._pickle(obj)).digest()
-
+        if not isinstance(obj, (list, tuple)):
+            obj = (obj,)
+        key = []
+        with self.cur('th:t') as cur:
+            for term in obj:
+                tk = cur.get(self._hash(self._pickle(term)))
+                if not tk:
+                    # If any of the terms is not found, return None immediately
+                    return None
+                key.append(tk)
-    def _key_to_triple(self, key):
-        '''
-        Look up for the hash key of a triple and return the triple as a tuple.
+        return self.SEP_BYTE.join(key)

-        @param key (bytes) Hash key of triple.

-        @return Tuple with triple elements or None if key is not found.
+    def _hash(self, s):
         '''
-        pk_trp = self.curs['tk:t'].get(key)
-
-        return self._unpickle(pk_trp) if pk_trp else None
+        Get the hash value of a serialized object.
+        '''
+        return hashlib.new(self.KEY_HASH_ALGO, s).digest()


-    def _lookup(self, triple_pattern, tkey=None):
+    def _lookup(self, triple_pattern):
         '''
-        Look up triples based on a triple pattern.
+        Look up triples in the indices based on a triple pattern.

         @return iterator of matching triple keys.
         '''
+        #import pdb; pdb.set_trace()
         s, p, o = triple_pattern

         if s is not None:
             if p is not None:
                 # s p o
                 if o is not None:
-                    if self.curs['tk:t'].set_key(tkey):
-                        yield tkey
-                        return
-                    else:
-                        return iter(())
+                    with self.cur('spo:c') as cur:
+                        tkey = self._to_key(triple_pattern)
+                        if tkey and cur.set_key(tkey):
+                            yield tkey
+                            return
+                        else:
+                            return iter(())
                 # s p ?
                 else:
-                    cur = self.curs['spk:tk']
-                    term = self._pickle((s, p))
+                    bound_terms = [s, p]
+                    cur_label = 'sp:o'
+                    order = (0, 1, 2)
             else:
                 # s ? o
                 if o is not None:
-                    cur = self.curs['sok:tk']
-                    term = self._pickle((s, o))
+                    bound_terms = [s, o]
+                    cur_label = 'so:p'
+                    order = (0, 2, 1)
                 # s ? ?
                 else:
-                    cur = self.curs['sk:tk']
-                    term = self._pickle(s)
+                    bound_terms = [s]
+                    cur_label = 's:po'
+                    order = (0, 1, 2)
         else:
             if p is not None:
                 # ? p o
                 if o is not None:
-                    cur = self.curs['pok:tk']
-                    term = self._pickle((p, o))
+                    bound_terms = [p, o]
+                    cur_label = 'po:s'
+                    order = (2, 0, 1)
                 # ? p ?
                 else:
-                    cur = self.curs['pk:tk']
-                    term = self._pickle(p)
+                    bound_terms = [p]
+                    cur_label = 'p:so'
+                    order = (1, 0, 2)
             else:
                 # ? ? o
                 if o is not None:
-                    cur = self.curs['ok:tk']
-                    term = self._pickle(o)
+                    bound_terms = [o]
+                    cur_label = 'o:sp'
+                    order = (1, 2, 0)
                 # ? ? ?
                 else:
-                    # Get all triples in the database
-                    for c in self.curs['tk:t'].iternext(values=False):
-                        yield c
+                    # Get all triples in the database.
+                    with self.cur('spo:c') as cur:
+                        yield from cur.iternext_nodup()
                     return

-        key = hashlib.new(self.KEY_HASH_ALGO, term).digest()
-        if cur.set_key(key):
-            for match in cur.iternext_dup():
-                yield match
-        else:
-            return iter(())
-
-
-    def _enqueue_action(self, action, db, k, v):
-        '''
-        Enqueue an action to be performed in a write transaction.
+        tkey = self._to_key(bound_terms)
+        with self.cur(cur_label) as cur:
+            #import pdb; pdb.set_trace()
+            if cur.set_key(tkey):
+                for match in cur.iternext_dup():
+                    # Combine bound and found in search order.
+                    comb_keys = (
+                            bytes(tkey).split(self.SEP_BYTE)
+                            + bytes(match).split(self.SEP_BYTE))
+                    # Rearrange term keys according to given order.
+                    yield self.SEP_BYTE.join([comb_keys[i] for i in order])
+            else:
+                return iter(())
 
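A sketch of the key rearrangement above for a `? p o` pattern (stand-in keys, not part of the commit): the bound lookup key holds `(p, o)`, each match is an `s`, and `order = (2, 0, 1)` restores `(s, p, o)`:

    SEP = b'\x00'
    sk, pk, ok = b'S' * 5, b'P' * 5, b'O' * 5   # stand-in 5-byte term keys
    tkey = SEP.join((pk, ok))                   # bound lookup key in 'po:s'
    match = sk                                  # a value found under tkey
    comb_keys = tkey.split(SEP) + match.split(SEP)
    assert SEP.join(comb_keys[i] for i in (2, 0, 1)) == SEP.join((sk, pk, ok))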
-        `_run_update` method is called. This is usually done by the
-        TxnManager class.
 
 
-        @param action (string) One of 'put', 'putmulti' or 'delete'.
-        @param db (string) Label of the database to perform the action.
-        @param k (bytes) Key to update.
-        @param v (bytes) Value to insert or delete.
+    def _append(self, cur, values, **kwargs):
         '''
-        if not action in ('put', 'putmulti', 'delete'):
-            raise NameError('No action with name {}.'.format(action))
-
-        self._data_queue.append((action, db, k, v))
+        Append one or more values to the end of a database.
+        @param cur (lmdb.Cursor) The write cursor to act on.
+        @param values (list(bytes)) Value(s) to append.
-    def _apply_changes(self):
-        '''
-        Apply changes in `_data_queue`.
+        @return list(bytes) Last key(s) inserted.
         '''
-        with ExitStack() as stack:
-            data_txn = stack.enter_context(
-                    self.data_env.begin(write=True, buffers=True))
-            logger.info('Beginning data insert. Data write lock acquired.')
+        if not isinstance(values, (list, tuple)):
+            raise ValueError('Input must be a list or tuple.')
+        data = []
+        lastkey = cur.key() if cur.last() else None
+        for v in values:
+            lastkey = self._key_seq.next(lastkey)
+            data.append((lastkey, v))
-            curs = {
-                task[1]: stack.enter_context(
-                        data_txn.cursor(self.dbs[task[1]]))
-                for task in self._data_queue
-            }
-            #logger.debug('Data queue: {}'.format(self._data_queue))
-            #import pdb; pdb.set_trace()
-            logger.debug('Data queue: {} triples.'.format(len(self._data_queue)))
-            while len(self._data_queue):
-                action, db, k, v = self._data_queue.pop()
-                if action == 'put':
-                    curs[db].put(k, v)
-                elif action == 'putmulti':
-                    # With 'putmulti', `k` is a series of 2-tuples and `v` is
-                    # ignored.
-                    data = k
-                    curs[db].putmulti(data)
-                elif action == 'delete':
-                    if v is None:
-                        # Delete all values for the key.
-                        if curs[db].set_key(k):
-                            curs[db].delete(dupdata=True)
-                    else:
-                        # Delete only a specific k:v pair.
-                        if curs[db].set_key_dup(k, v):
-                            curs[db].delete(dupdata=False)
-                else:
-                    raise ValueError(
-                        'Action type \'{}\' is not supported.' .format(action))
-        logger.info('Data insert completed. Data write lock released.')
-
-
-    def _run_indexing(self):
-        '''
-        Update indices for a given triple.
-
-        If the triple is found, add indices. if it is not found, delete them.
-        This method is run asynchronously and may outlive the HTTP request.
-
-        @param key (bytes) Unique key associated with the triple.
-        @param pk_ctx (bytes) Pickled context term.
-        @param triple (tuple: rdflib.Identifier) Tuple of 3 RDFLib terms.
-        This can be provided if already pre-calculated, otherwise it will be
-        retrieved from the store using `trp_key`.
-        '''
-        with ExitStack() as stack:
-            data_txn = stack.enter_context(self.data_env.begin(buffers=True))
-            idx_txn = stack.enter_context(
-                    self.idx_env.begin(write=True, buffers=True))
-            logger.info('Index started. Index write lock acquired.')
-            data_curs = self.get_data_cursors(data_txn)
-            idx_curs = self.get_idx_cursors(idx_txn)
-
-            lock = Lock()
-            #logger.debug('Index queue: {}'.format(self._idx_queue))
-            logger.debug('Index queue: {}'.format(len(self._idx_queue)))
-            while len(self._idx_queue):
-                lock.acquire()
-                trp_key, pk_ctx, triple = self._idx_queue.pop()
-
-                if trp_key is None and triple is None:
-                    # This is when a graph is deleted.
-                    if not data_curs['tk:c'].set_key(pk_ctx):
-                        pk_none = self._pickle(None)
-                        if idx_curs['c:tk'].set_key_dup(pk_none, pk_ctx):
-                            idx_curs['c:tk'].delete()
-                    lock.release()
-                    continue
-
-                if triple is None:
-                    triple = self._key_to_triple(trp_key)
-
-                s, p, o = triple
-                term_keys = {
-                    'sk:tk': self._to_key(s),
-                    'pk:tk': self._to_key(p),
-                    'ok:tk': self._to_key(o),
-                    'spk:tk': self._to_key((s, p)),
-                    'sok:tk': self._to_key((s, o)),
-                    'pok:tk': self._to_key((p, o)),
-                }
-
-                if data_curs['tk:t'].get(trp_key):
-                    # Add to index.
-                    for ikey in term_keys:
-                        idx_curs[ikey].put(term_keys[ikey], trp_key)
-                else:
-                    # Delete from index if a match is found.
-                    for ikey in term_keys:
-                        if idx_curs[ikey].set_key_dup(
-                                term_keys[ikey], trp_key):
-                            idx_curs[ikey].delete()
-
-                # Add or remove context association index.
-                if data_curs['tk:c'].set_key_dup(trp_key, pk_ctx):
-                    idx_curs['c:tk'].put(pk_ctx, trp_key)
-                elif idx_curs['c:tk'].set_key_dup(pk_ctx, trp_key):
-                    idx_curs['c:tk'].delete()
-                lock.release()
+        cur.putmulti(data, **kwargs)
-        logger.info('Index completed. Index write lock released.')
+        return [d[0] for d in data]


     ## Convenience methods—not necessary for functioning but useful for
@@ -898,12 +999,12 @@ class LmdbStore(Store):

         @return Iterator:tuple Generator of triples.
         '''
-        cur = self.curs['c:tk']
-        if cur.set_key(pk_ctx):
-            tkeys = cur.iternext_dup()
-            return {self._key_to_triple(tk) for tk in tkeys}
-        else:
-            return set()
+        with self.cur('c:spo') as cur:
+            if cur.set_key(pk_ctx):
+                tkeys = cur.iternext_dup()
+                return {tuple(self._from_key(tk)) for tk in tkeys}
+            else:
+                return set()


     def _ctx_for_key(self, tkey):
@@ -914,9 +1015,9 @@ class LmdbStore(Store):

         @return Iterator:URIRef Generator of context URIs.
         '''
-        cur = self.curs['tk:c']
-        if cur.set_key(tkey):
-            ctx = cur.iternext_dup()
-            return {self._unpickle(c) for c in ctx}
-        else:
-            return set()
+        with self.cur('spo:c') as cur:
+            if cur.set_key(tkey):
+                ctx = cur.iternext_dup()
+                return {self._from_key(c)[0] for c in ctx}
+            else:
+                return set()