Browse Source

Run indexing in a sub-thread.

Stefano Cossu 7 years ago
parent
commit
5ffe3af045
1 changed file with 229 additions and 115 deletions
  1. 229 115
      lakesuperior/store_layouts/ldp_rs/lmdb_store.py

+ 229 - 115
lakesuperior/store_layouts/ldp_rs/lmdb_store.py

@@ -71,6 +71,8 @@ class TxnManager(ContextDecorator):
             # jobs left from other requests.
         else:
             self.store.commit()
+            if len(self.store._data_queue):
+                self.store._apply_changes()
             if len(self.store._idx_queue):
                 #self.store._run_indexing()
                 job = Thread(target=self.store._run_indexing)
@@ -113,6 +115,8 @@ class LmdbStore(Store):
     we may want to index term hashes.
     '''
     context_aware = True
+    # This is a hassle to maintain for no apparent gain. If some use is devised
+    # in the future, it may be revised.
     formula_aware = False
     graph_aware = True
     transaction_aware = True
@@ -150,6 +154,21 @@ class LmdbStore(Store):
     idx_txn = None
     is_txn_rw = None
 
+    '''
+    List of actions to be performed when a transaction is committed.
+
+    Each element is a tuple of (action name, database index, key, value).
+    '''
+    _data_queue = []
+    '''
+    Set of indices to update. A set has been preferred to a list since the
+    index updates don't need to be sequential and there may be duplicate
+    entries that can be eliminated.
+
+    Each element is a tuple of (triple key, pickled context, original
+    triple). The third value can be None, and in that case it is calculated
+    from the triple key.
+    '''
     _idx_queue = set()
 
 
@@ -163,6 +182,21 @@ class LmdbStore(Store):
         self._unpickle = self.node_pickler.loads
 
 
+    def __len__(self, context=None):
+        '''
+        Return length of the dataset.
+        '''
+        if context == self or context is None:
+            context = Graph(identifier=self.DEFAULT_GRAPH_URI)
+
+        if context.identifier is not self.DEFAULT_GRAPH_URI:
+            #dataset = self.triples((None, None, None), context)
+            dataset = (tk for tk in self.curs['c:tk'].iternext_dup())
+            return len(set(dataset))
+        else:
+            return self.data_txn.stat(self.dbs['tk:t'])['entries']
+
+
     @property
     def is_open(self):
         return self.__open
@@ -196,8 +230,7 @@ class LmdbStore(Store):
             raise RuntimeError('Store must be opened first.')
         logger.info('Beginning a {} transaction.'.format(
             'read/write' if write else 'read-only'))
-        self.data_txn = self.data_env.begin(write=write, buffers=True)
-        # Index transaction is read-write only for indexing jobs.
+        self.data_txn = self.data_env.begin(buffers=True)
         self.idx_txn = self.idx_env.begin(buffers=True)
         self.is_txn_rw = write
         # Cursors.
@@ -221,6 +254,22 @@ class LmdbStore(Store):
             return True
 
 
+    def cur(self, index):
+        '''
+        Return a new cursor by its index.
+        '''
+        if index in self.idx_keys:
+            txn = self.idx_txn
+            src = self.idx_keys
+        elif index in self.data_keys:
+            txn = self.data_txn
+            src = self.data_keys
+        else:
+            return ValueError('Cursor key not found.')
+
+        return txn.cursor(self.dbs[index])
+
+
     def get_data_cursors(self, txn):
         '''
         Build the main data cursors for a transaction.
@@ -303,15 +352,18 @@ class LmdbStore(Store):
         trp_key = hashlib.new(self.KEY_HASH_ALGO, pk_trp).digest()
 
         needs_indexing = False
-        if self.curs['tk:t'].put(trp_key, pk_trp, overwrite=False):
-            needs_indexing = True
+        with self.cur('tk:t') as cur:
+            if not cur.set_key(trp_key):
+                self._enqueue_action('put', 'tk:t', trp_key, pk_trp)
+                needs_indexing = True
 
         pk_ctx = self._pickle(context.identifier) \
                 if isinstance(context, Graph) \
                 else self._pickle(context)
-        if not self.curs['tk:c'].set_key_dup(trp_key, pk_ctx):
-            self.curs['tk:c'].put(trp_key, pk_ctx)
-            needs_indexing = True
+        with self.cur('tk:c') as cur:
+            if not cur.set_key_dup(trp_key, pk_ctx):
+                self._enqueue_action('put', 'tk:c', trp_key, pk_ctx)
+                needs_indexing = True
 
         if needs_indexing:
             self._idx_queue.add((trp_key, pk_ctx, triple))
@@ -333,74 +385,19 @@ class LmdbStore(Store):
                 else self._pickle(context)
         for trp_key in self._triple_keys(triple_pattern, context):
             # Delete context association.
-            if self.curs['tk:c'].set_key_dup(trp_key, pk_ctx):
-                triple = self._key_to_triple(trp_key)
-                self.curs['tk:c'].delete()
-
-                # If no other contexts are associated w/ the triple, delete it.
-                if not self.curs['tk:c'].set_key(trp_key) and (
-                        self.curs['tk:t'].set_key(trp_key)):
-                    self.curs['tk:t'].delete()
-
-                self._idx_queue.add((trp_key, pk_ctx, triple))
-
-
-    def _run_indexing(self):
-        '''
-        Update indices for a given triple.
-
-        If the triple is found, add indices. if it is not found, delete them.
-
-        @param key (bytes) Unique key associated with the triple.
-        @param pk_ctx (bytes) Pickled context term.
-        @param triple (tuple: rdflib.Identifier) Tuple of 3 RDFLib terms.
-        This can be provided if already pre-calculated, otherwise it will be
-        retrieved from the store using `trp_key`.
-        '''
-        with ExitStack() as stack:
-            data_txn = stack.enter_context(self.data_env.begin(buffers=True))
-            idx_txn = stack.enter_context(
-                    self.idx_env.begin(write=True, buffers=True))
-            data_curs = self.get_data_cursors(data_txn)
-            idx_curs = self.get_idx_cursors(idx_txn)
-
-            lock = Lock()
-            while len(self._idx_queue):
-                lock.acquire()
-                trp_key, pk_ctx, triple = self._idx_queue.pop()
-
-                if triple is None:
+            with self.cur('tk:c') as cur:
+                if cur.set_key_dup(trp_key, pk_ctx):
                     triple = self._key_to_triple(trp_key)
+                    self._enqueue_action('delete', 'tk:c', trp_key, pk_ctx)
 
-                s, p, o = triple
-                term_keys = {
-                    'sk:tk': self._to_key(s),
-                    'pk:tk': self._to_key(p),
-                    'ok:tk': self._to_key(o),
-                    'spk:tk': self._to_key((s, p)),
-                    'sok:tk': self._to_key((s, o)),
-                    'pok:tk': self._to_key((p, o)),
-                }
-
-                if data_curs['tk:t'].get(trp_key):
-                    # Add to index.
-                    for ikey in term_keys:
-                        idx_curs[ikey].put(term_keys[ikey], trp_key)
-                else:
-                    # Delete from index if a match is found.
-                    for ikey in term_keys:
-                        if idx_curs[ikey].set_key_dup(
-                                term_keys[ikey], trp_key):
-                            idx_curs[ikey].delete()
-
-                # Add or remove context association index.
-                if data_curs['tk:c'].set_key_dup(trp_key, pk_ctx):
-                    idx_curs['c:tk'].put(pk_ctx, trp_key)
-                elif idx_curs['c:tk'].set_key_dup(pk_ctx, trp_key):
-                    idx_curs['c:tk'].delete()
-                lock.release()
+                    # If no other contexts are associated with the triple,
+                    # delete it.
+                    with self.cur('tk:t') as trp_cur:
+                        if not cur.set_key(trp_key):
+                            self._enqueue_action(
+                                    'delete', 'tk:c', trp_key, None)
 
-        logger.info('Index completed.')
+                    self._idx_queue.add((trp_key, pk_ctx, triple))
 
 
     def triples(self, triple_pattern, context=None):
@@ -415,21 +412,6 @@ class LmdbStore(Store):
             yield self._key_to_triple(tk), context
 
 
-    def __len__(self, context=None):
-        '''
-        Return length of the dataset.
-        '''
-        if context == self or context is None:
-            context = Graph(identifier=self.DEFAULT_GRAPH_URI)
-
-        if context.identifier is not self.DEFAULT_GRAPH_URI:
-            #dataset = self.triples((None, None, None), context)
-            dataset = (tk for tk in self.curs['c:tk'].iternext_dup())
-            return len(set(dataset))
-        else:
-            return self.data_txn.stat(self.dbs['tk:t'])['entries']
-
-
     def bind(self, prefix, namespace):
         '''
         Bind a prefix to a namespace.
@@ -446,9 +428,9 @@ class LmdbStore(Store):
         '''
         Get the namespace for a prefix.
         '''
-        ns = self.curs['pfx:ns'].get(s2b(prefix))
-
-        return Namespace(b2s(ns)) if ns is not None else None
+        with self.cur('pfx:ns') as cur:
+            ns = cur.get(s2b(prefix))
+            return Namespace(b2s(ns)) if ns is not None else None
 
 
     def prefix(self, namespace):
@@ -458,18 +440,18 @@ class LmdbStore(Store):
         @NOTE A namespace can be only bound to one prefix in this
         implementation.
         '''
-        prefix = self.curs['ns:pfx'].get(s2b(namespace))
-
-        return b2s(prefix) if prefix is not None else None
+        with self.cur('ns:pfx') as cur:
+            prefix = cur.get(s2b(namespace))
+            return b2s(prefix) if prefix is not None else None
 
 
     def namespaces(self):
         '''
         Get an iterator of all prefix: namespace bindings.
         '''
-        bindings = iter(self.curs['pfx:ns'])
-
-        return ((b2s(pfx), Namespace(b2s(ns))) for pfx, ns in bindings)
+        with self.cur('pfx:ns') as cur:
+            bindings = iter(cur)
+            return ((b2s(pfx), Namespace(b2s(ns))) for pfx, ns in bindings)
 
 
     def contexts(self, triple=None):
@@ -479,10 +461,12 @@ class LmdbStore(Store):
         @return generator:URIRef
         '''
         if triple:
-            self.curs['tk:c'].set_key(self._to_key(triple))
-            contexts = self.curs['tk:c'].iternext_dup()
+            with self.cur('tk:c') as cur:
+                cur.set_key(self._to_key(triple))
+                contexts = cur.iternext_dup()
         else:
-            contexts = self.curs['c:tk'].iternext_nodup()
+            with self.cur('c:tk') as cur:
+                contexts = cur.iternext_nodup()
 
         return (self._unpickle(ctx) for ctx in contexts)
 
@@ -491,24 +475,26 @@ class LmdbStore(Store):
         '''
         Add a graph to the database.
 
+        This creates an empty graph by associating the graph URI with the
+        pickled `None` value. This prevents the graph from being removed
+        when all its triples are removed.
+
         This may be called by supposedly read-only operations:
         https://github.com/RDFLib/rdflib/blob/master/rdflib/graph.py#L1623
         Therefore it needs to open a write transaction. This is not ideal
-        but the only way to play well with RDFLib.
+        but the only way to handle datasets in RDFLib.
 
         @param graph (URIRef) URI of the named graph to add.
         '''
-        if not self.is_txn_rw:
-            with self.data_env.begin(write=True).cursor(self.dbs['tk:c']) \
-                    as tk2c_cur:
-                tk2c_cur.put(self._pickle(None), self._pickle(graph))
-        else:
-            self.curs['tk:c'].put(self._pickle(None), self._pickle(graph))
+        pk_none = self._pickle(None)
+        pk_ctx = self._pickle(graph)
+        with self.data_env.begin(write=True).cursor(self.dbs['tk:c']) \
+                as tk2c_cur:
+            tk2c_cur.put(pk_none, pk_ctx)
 
-        # Open a write tx for indices.
         with self.idx_env.begin(write=True)\
                 .cursor(self.dbs['c:tk']) as c2tk_cur:
-            c2tk_cur.put(self._pickle(graph), self._pickle(None))
+            c2tk_cur.put(pk_ctx, pk_none)
 
 
     def remove_graph(self, graph):
@@ -519,18 +505,19 @@ class LmdbStore(Store):
         '''
         self.remove((None, None, None), graph)
 
-        if self.curs['tk:c'].set_key_dup(
-                self._pickle(None), self._pickle(graph)):
-            self.curs['tk:c'].delete()
+        pk_none = self._pickle(None)
+        pk_ctx = self._pickle(graph)
+        self._enqueue_action('delete', 'tk:c', pk_none, pk_ctx)
+        self._idx_queue.add((None, pk_ctx, None))
 
-        if self.curs['c:tk'].set_key_dup(
-                self._pickle(graph), self._pickle(None)):
-            self.curs['tk:c'].delete()
+        with self.cur('c:tk') as cur:
+            if cur.set_key_dup(self._pickle(graph), self._pickle(None)):
+                self.curs['tk:c'].delete()
 
 
     def commit(self):
         '''
-        Commit main transaction.
+        Commit main transaction and push action queue.
         '''
         if self.is_txn_open:
             self.data_txn.commit()
@@ -761,6 +748,133 @@ class LmdbStore(Store):
         else:
             return iter(())
 
+
+    def _enqueue_action(self, action, db, k, v):
+        '''
+        Enqueue an action to be performed in a write transaction.
+
+        Actions are accumulated sequentially and then executed once the
+        `_run_update` method is called. This is usually done by the
+        TxnManager class.
+
+        @param action (string) One of 'put', 'putmulti' or 'delete'.
+        @param db (string) Label of the database to perform the action.
+        @param k (bytes) Key to update.
+        @param v (bytes) Value to insert or delete.
+        '''
+        if not action in ('put', 'putmulti', 'delete'):
+            raise NameError('No action with name {}.'.format(action))
+
+        self._data_queue.append((action, db, k, v))
+
+
+    def _apply_changes(self):
+        '''
+        Apply changes in `_data_queue`.
+        '''
+        with ExitStack() as stack:
+            data_txn = stack.enter_context(
+                    self.data_env.begin(write=True, buffers=True))
+            logger.info('Beginning data insert. Data write lock acquired.')
+
+            curs = {
+                task[1]: stack.enter_context(
+                        data_txn.cursor(self.dbs[task[1]]))
+                for task in self._data_queue
+            }
+            while len(self._data_queue):
+                action, db, k, v = self._data_queue.pop()
+                if action == 'put':
+                    curs[db].put(k, v)
+                elif action == 'putmulti':
+                    # With 'putmulti', `k` is a series of 2-tuples and `v` is
+                    # ignored.
+                    data = k
+                    curs[db].putmulti(data)
+                elif action == 'delete':
+                    if v is None:
+                        # Delete all values for the key.
+                        if curs[db].set_key(k):
+                            curs[db].delete(dupdata=True)
+                    else:
+                        # Delete only a specific k:v pair.
+                        if curs[db].set_key_dup(k, v):
+                            curs[db].delete(dupdata=False)
+                else:
+                    raise ValueError(
+                        'Action type \'{}\' is not supported.' .format(action))
+        logger.info('Data insert completed. Data write lock released.')
+
+
+    def _run_indexing(self):
+        '''
+        Update indices for a given triple.
+
+        If the triple is found, add indices. If it is not found, delete them.
+        This method is run asynchronously and may outlive the HTTP request.
+
+        @param key (bytes) Unique key associated with the triple.
+        @param pk_ctx (bytes) Pickled context term.
+        @param triple (tuple: rdflib.Identifier) Tuple of 3 RDFLib terms.
+        This can be provided if already pre-calculated, otherwise it will be
+        retrieved from the store using `trp_key`.
+        '''
+        with ExitStack() as stack:
+            data_txn = stack.enter_context(self.data_env.begin(buffers=True))
+            idx_txn = stack.enter_context(
+                    self.idx_env.begin(write=True, buffers=True))
+            logger.info('Index started. Index write lock acquired.')
+            data_curs = self.get_data_cursors(data_txn)
+            idx_curs = self.get_idx_cursors(idx_txn)
+
+            lock = Lock()
+            while len(self._idx_queue):
+                lock.acquire()
+                trp_key, pk_ctx, triple = self._idx_queue.pop()
+
+                if trp_key is None and triple is None:
+                    # This is when a graph is deleted.
+                    if not data_curs['tk:c'].set_key(pk_ctx):
+                        pk_none = self._pickle(None)
+                        if idx_curs['c:tk'].set_key_dup(pk_none, pk_ctx):
+                            idx_curs['c:tk'].delete()
+                    lock.release()
+                    continue
+
+                if triple is None:
+                    triple = self._key_to_triple(trp_key)
+
+                s, p, o = triple
+                term_keys = {
+                    'sk:tk': self._to_key(s),
+                    'pk:tk': self._to_key(p),
+                    'ok:tk': self._to_key(o),
+                    'spk:tk': self._to_key((s, p)),
+                    'sok:tk': self._to_key((s, o)),
+                    'pok:tk': self._to_key((p, o)),
+                }
+
+                if data_curs['tk:t'].get(trp_key):
+                    # Add to index.
+                    for ikey in term_keys:
+                        idx_curs[ikey].put(term_keys[ikey], trp_key)
+                else:
+                    # Delete from index if a match is found.
+                    for ikey in term_keys:
+                        if idx_curs[ikey].set_key_dup(
+                                term_keys[ikey], trp_key):
+                            idx_curs[ikey].delete()
+
+                # Add or remove context association index.
+                if data_curs['tk:c'].set_key_dup(trp_key, pk_ctx):
+                    idx_curs['c:tk'].put(pk_ctx, trp_key)
+                elif idx_curs['c:tk'].set_key_dup(pk_ctx, trp_key):
+                    idx_curs['c:tk'].delete()
+                lock.release()
+
+        logger.info('Index completed. Index write lock released.')
+
+
     ## Convenience methods—not necessary for functioning but useful for
     ## debugging.