Explorar o código

Spawn new thread for indexing. Hangs on parallel requests.

Stefano Cossu %!s(int64=7) %!d(string=hai) anos
pai
achega
d59597bbb1
Modificáronse 1 ficheiros con 47 adicións e 36 borrados
  1. 47 36
      lakesuperior/store_layouts/ldp_rs/lmdb_store.py

+ 47 - 36
lakesuperior/store_layouts/ldp_rs/lmdb_store.py

@@ -4,6 +4,7 @@ import logging
 from contextlib import ContextDecorator, ExitStack
 from os import makedirs
 from os.path import exists, abspath
+from threading import Lock, Thread
 from urllib.request import pathname2url
 
 import lmdb
@@ -71,7 +72,9 @@ class TxnManager(ContextDecorator):
         else:
             self.store.commit()
             if len(self.store._idx_queue):
-                self.store._run_indexing()
+                job = Thread(target=self.store._run_indexing)
+                job.start()
+                logger.info('Started indexing job #{}'.format(job.ident))
 
 
 class LmdbStore(Store):
@@ -351,42 +354,50 @@ class LmdbStore(Store):
         This can be provided if already pre-calculated, otherwise it will be
         retrieved from the store using `trp_key`.
         '''
-        with self.data_env.begin(buffers=True) as data_txn:
+        with ExitStack() as stack:
+            data_txn = stack.enter_context(self.data_env.begin(buffers=True))
+            idx_txn = stack.enter_context(
+                    self.idx_env.begin(write=True, buffers=True))
             data_curs = self.get_data_cursors(data_txn)
-            with self.idx_env.begin(write=True, buffers=True) as idx_txn:
-                idx_curs = self.get_idx_cursors(idx_txn)
-                while len(self._idx_queue):
-                    trp_key, pk_ctx, triple = self._idx_queue.pop()
-
-                    if triple is None:
-                        triple = self._key_to_triple(trp_key)
-
-                    s, p, o = triple
-                    term_keys = {
-                        'sk:tk': self._to_key(s),
-                        'pk:tk': self._to_key(p),
-                        'ok:tk': self._to_key(o),
-                        'spk:tk': self._to_key((s, p)),
-                        'sok:tk': self._to_key((s, o)),
-                        'pok:tk': self._to_key((p, o)),
-                    }
-
-                    if data_curs['tk:t'].get(trp_key):
-                        # Add to index.
-                        for ikey in term_keys:
-                            idx_curs[ikey].put(term_keys[ikey], trp_key)
-                    else:
-                        # Delete from index if a match is found.
-                        for ikey in term_keys:
-                            if idx_curs[ikey].set_key_dup(
-                                    term_keys[ikey], trp_key):
-                                idx_curs[ikey].delete()
-
-                    # Add or remove context association index.
-                    if data_curs['tk:c'].set_key_dup(trp_key, pk_ctx):
-                        idx_curs['c:tk'].put(pk_ctx, trp_key)
-                    elif idx_curs['c:tk'].set_key_dup(pk_ctx, trp_key):
-                        idx_curs['c:tk'].delete()
+            idx_curs = self.get_idx_cursors(idx_txn)
+
+            lock = Lock()
+            while len(self._idx_queue):
+                lock.acquire()
+                trp_key, pk_ctx, triple = self._idx_queue.pop()
+
+                if triple is None:
+                    triple = self._key_to_triple(trp_key)
+
+                s, p, o = triple
+                term_keys = {
+                    'sk:tk': self._to_key(s),
+                    'pk:tk': self._to_key(p),
+                    'ok:tk': self._to_key(o),
+                    'spk:tk': self._to_key((s, p)),
+                    'sok:tk': self._to_key((s, o)),
+                    'pok:tk': self._to_key((p, o)),
+                }
+
+                if data_curs['tk:t'].get(trp_key):
+                    # Add to index.
+                    for ikey in term_keys:
+                        idx_curs[ikey].put(term_keys[ikey], trp_key)
+                else:
+                    # Delete from index if a match is found.
+                    for ikey in term_keys:
+                        if idx_curs[ikey].set_key_dup(
+                                term_keys[ikey], trp_key):
+                            idx_curs[ikey].delete()
+
+                # Add or remove context association index.
+                if data_curs['tk:c'].set_key_dup(trp_key, pk_ctx):
+                    idx_curs['c:tk'].put(pk_ctx, trp_key)
+                elif idx_curs['c:tk'].set_key_dup(pk_ctx, trp_key):
+                    idx_curs['c:tk'].delete()
+                lock.release()
+
+        logger.info('Index completed.')
 
 
     def triples(self, triple_pattern, context=None):