Browse Source

Separate indexing process from data storage; do not depend on indices for
uncommitted data.

Stefano Cossu 7 years ago
parent
commit
4d85204b33

+ 2 - 1
lakesuperior/endpoints/ldp.py

@@ -365,7 +365,8 @@ def put_resource(uid):
     stream, mimetype = bitstream_from_req()
     stream, mimetype = bitstream_from_req()
 
 
     try:
     try:
-        rsrc = LdpFactory.from_provided(uid, content_length=request.content_length,
+        rsrc = LdpFactory.from_provided(
+                uid, content_length=request.content_length,
                 stream=stream, mimetype=mimetype, handling=handling,
                 stream=stream, mimetype=mimetype, handling=handling,
                 disposition=disposition)
                 disposition=disposition)
         if not request.content_length and rsrc.is_stored:
         if not request.content_length and rsrc.is_stored:

+ 12 - 1
lakesuperior/model/ldpr.py

@@ -153,6 +153,8 @@ class Ldpr(metaclass=ABCMeta):
         '''
         '''
         if not hasattr(self, '_imr'):
         if not hasattr(self, '_imr'):
             if hasattr(self, '_imr_options'):
             if hasattr(self, '_imr_options'):
+                self._logger.info('Getting RDF representation for resource /{}'
+                        .format(self.uid))
                 #self._logger.debug('IMR options: {}'.format(self._imr_options))
                 #self._logger.debug('IMR options: {}'.format(self._imr_options))
                 imr_options = self._imr_options
                 imr_options = self._imr_options
             else:
             else:
@@ -192,7 +194,13 @@ class Ldpr(metaclass=ABCMeta):
         Get resource metadata.
         Get resource metadata.
         '''
         '''
         if not hasattr(self, '_metadata'):
         if not hasattr(self, '_metadata'):
-            self._metadata = self.rdfly.get_metadata(self.uid)
+            if hasattr(self, '_imr'):
+                self._logger.info('Metadata is IMR.')
+                self._metadata = self._imr
+            else:
+                self._logger.info('Getting metadata for resource /{}'
+                        .format(self.uid))
+                self._metadata = self.rdfly.get_metadata(self.uid)
 
 
         return self._metadata
         return self._metadata
 
 
@@ -351,6 +359,7 @@ class Ldpr(metaclass=ABCMeta):
         return out_headers
         return out_headers
 
 
 
 
+
     def get(self):
     def get(self):
         '''
         '''
         Get an RDF representation of the resource.
         Get an RDF representation of the resource.
@@ -563,10 +572,12 @@ class Ldpr(metaclass=ABCMeta):
             self._check_ref_int(ref_int)
             self._check_ref_int(ref_int)
 
 
         self.rdfly.create_or_replace_rsrc(self.uid, self.provided_imr.graph)
         self.rdfly.create_or_replace_rsrc(self.uid, self.provided_imr.graph)
+        self.imr = self.provided_imr
 
 
         self._set_containment_rel()
         self._set_containment_rel()
 
 
         return self.RES_CREATED if create else self.RES_UPDATED
         return self.RES_CREATED if create else self.RES_UPDATED
+        #return self._head(self.provided_imr.graph)
 
 
 
 
     def _bury_rsrc(self, inbound, tstone_pointer=None):
     def _bury_rsrc(self, inbound, tstone_pointer=None):

+ 46 - 36
lakesuperior/store_layouts/ldp_rs/lmdb_store.py

@@ -1,7 +1,7 @@
 import hashlib
 import hashlib
 import logging
 import logging
 
 
-from contextlib import ContextDecorator
+from contextlib import ContextDecorator, ExitStack
 from os import makedirs
 from os import makedirs
 from os.path import exists, abspath
 from os.path import exists, abspath
 from urllib.request import pathname2url
 from urllib.request import pathname2url
@@ -66,8 +66,12 @@ class TxnManager(ContextDecorator):
     def __exit__(self, exc_type, exc_value, traceback):
     def __exit__(self, exc_type, exc_value, traceback):
         if exc_type:
         if exc_type:
             self.store.rollback()
             self.store.rollback()
+            # If the tx fails, leave the index queue alone. There may still be
+            # jobs left from other requests.
         else:
         else:
             self.store.commit()
             self.store.commit()
+            if len(self.store._idx_queue):
+                self.store._run_indexing()
 
 
 
 
 class LmdbStore(Store):
 class LmdbStore(Store):
@@ -142,6 +146,8 @@ class LmdbStore(Store):
     idx_txn = None
     idx_txn = None
     is_txn_rw = None
     is_txn_rw = None
 
 
+    _idx_queue = set()
+
 
 
     def __init__(self, path, identifier=None):
     def __init__(self, path, identifier=None):
         self.__open = False
         self.__open = False
@@ -302,8 +308,7 @@ class LmdbStore(Store):
             needs_indexing = True
             needs_indexing = True
 
 
         if needs_indexing:
         if needs_indexing:
-            # @TODO make await; run outside of this txn
-            self._update_indices(trp_key, pk_ctx, triple=triple)
+            self._idx_queue.add((trp_key, pk_ctx, triple))
 
 
 
 
     def remove(self, triple_pattern, context=None):
     def remove(self, triple_pattern, context=None):
@@ -331,13 +336,10 @@ class LmdbStore(Store):
                         self.curs['tk:t'].set_key(trp_key)):
                         self.curs['tk:t'].set_key(trp_key)):
                     self.curs['tk:t'].delete()
                     self.curs['tk:t'].delete()
 
 
-                # @TODO make await; run outside of this txn
-                #import pdb; pdb.set_trace()
-                self._update_indices(trp_key, pk_ctx, triple)
+                self._idx_queue.add((trp_key, pk_ctx, triple))
 
 
 
 
-    # @TODO Make async
-    def _update_indices(self, trp_key, pk_ctx, triple=None):
+    def _run_indexing(self):
         '''
         '''
         Run all index updates queued since the last data commit.
         Run all index updates queued since the last data commit.
 
 
@@ -349,34 +351,42 @@ class LmdbStore(Store):
         Each queued entry may carry a pre-calculated triple; otherwise it is
         Each queued entry may carry a pre-calculated triple; otherwise it is
         retrieved from the store using `trp_key`.
         retrieved from the store using `trp_key`.
         '''
         '''
-        if triple is None:
-            triple = self._key_to_triple(trp_key)
-
-        s, p, o = triple
-        term_keys = {
-            'sk:tk': self._to_key(s),
-            'pk:tk': self._to_key(p),
-            'ok:tk': self._to_key(o),
-            'spk:tk': self._to_key((s, p)),
-            'sok:tk': self._to_key((s, o)),
-            'pok:tk': self._to_key((p, o)),
-        }
-
-        if self.curs['tk:t'].get(trp_key):
-            # Add to index.
-            for ikey in term_keys:
-                self.curs[ikey].put(term_keys[ikey], trp_key)
-        else:
-            # Delete from index if a match is found.
-            for ikey in term_keys:
-                if self.curs[ikey].set_key_dup(term_keys[ikey], trp_key):
-                    self.curs[ikey].delete()
-
-        # Add or remove context association index.
-        if self.curs['tk:c'].set_key_dup(trp_key, pk_ctx):
-            self.curs['c:tk'].put(pk_ctx, trp_key)
-        elif self.curs['c:tk'].set_key_dup(pk_ctx, trp_key):
-            self.curs['c:tk'].delete()
+        with self.data_env.begin(buffers=True) as data_txn:
+            data_curs = self.get_data_cursors(data_txn)
+            with self.idx_env.begin(write=True, buffers=True) as idx_txn:
+                idx_curs = self.get_idx_cursors(idx_txn)
+                while len(self._idx_queue):
+                    trp_key, pk_ctx, triple = self._idx_queue.pop()
+
+                    if triple is None:
+                        triple = self._key_to_triple(trp_key)
+
+                    s, p, o = triple
+                    term_keys = {
+                        'sk:tk': self._to_key(s),
+                        'pk:tk': self._to_key(p),
+                        'ok:tk': self._to_key(o),
+                        'spk:tk': self._to_key((s, p)),
+                        'sok:tk': self._to_key((s, o)),
+                        'pok:tk': self._to_key((p, o)),
+                    }
+
+                    if data_curs['tk:t'].get(trp_key):
+                        # Add to index.
+                        for ikey in term_keys:
+                            idx_curs[ikey].put(term_keys[ikey], trp_key)
+                    else:
+                        # Delete from index if a match is found.
+                        for ikey in term_keys:
+                            if idx_curs[ikey].set_key_dup(
+                                    term_keys[ikey], trp_key):
+                                idx_curs[ikey].delete()
+
+                    # Add or remove context association index.
+                    if data_curs['tk:c'].set_key_dup(trp_key, pk_ctx):
+                        idx_curs['c:tk'].put(pk_ctx, trp_key)
+                    elif idx_curs['c:tk'].set_key_dup(pk_ctx, trp_key):
+                        idx_curs['c:tk'].delete()
 
 
 
 
     def triples(self, triple_pattern, context=None):
     def triples(self, triple_pattern, context=None):

+ 1 - 1
lakesuperior/store_layouts/ldp_rs/rsrc_centric_layout.py

@@ -423,7 +423,7 @@ class RsrcCentricLayout:
         Modify triples about a subject.
         Modify triples about a subject.
 
 
         This method adds and removes triple sets from specific graphs,
         This method adds and removes triple sets from specific graphs,
-        indicated by the term rotuer. It also adds metadata about the changed
+        indicated by the term router. It also adds metadata about the changed
         graphs.
         graphs.
         '''
         '''
         remove_routes = defaultdict(set)
         remove_routes = defaultdict(set)