|
@@ -12,8 +12,9 @@ from urllib.request import pathname2url
|
|
|
|
|
|
import lmdb
|
|
|
|
|
|
-from rdflib.store import Store, VALID_STORE, NO_STORE
|
|
|
from rdflib import Graph, Namespace, URIRef, Variable
|
|
|
+from rdflib.graph import DATASET_DEFAULT_GRAPH_ID as RDFLIB_DEFAULT_GRAPH_URI
|
|
|
+from rdflib.store import Store, VALID_STORE, NO_STORE
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
@@ -175,7 +176,7 @@ class LmdbStore(Store):
|
|
|
- t:st (term key: serialized term; 1:1)
|
|
|
- spo:c (joined S, P, O keys: context key; dupsort, dupfixed)
|
|
|
- c: (context keys only, values are the empty bytestring; 1:1)
|
|
|
- - pfx:ns (prefix: pickled namespace; unique)
|
|
|
+ - pfx:ns (prefix: pickled namespace; 1:1)
|
|
|
|
|
|
And 6 indices to optimize lookup for all possible bound/unbound term
|
|
|
combination in a triple:
|
|
@@ -185,7 +186,7 @@ class LmdbStore(Store):
|
|
|
- p:so (P key: joined S, O keys; dupsort, dupfixed)
|
|
|
- o:sp (O key: joined S, P keys; dupsort, dupfixed)
|
|
|
- c:spo (context → triple association; dupsort, dupfixed)
|
|
|
- - ns:pfx (pickled namespace: prefix; unique)
|
|
|
+ - ns:pfx (pickled namespace: prefix; 1:1)
|
|
|
|
|
|
These two data sets are stored in separate environments, i.e. separate
|
|
|
files in the filesystem. The index could be recreated from the main data
|
|
@@ -213,8 +214,6 @@ class LmdbStore(Store):
|
|
|
'''Separator byte. Used to join and split individual term keys.'''
|
|
|
SEP_BYTE = b'\x00'
|
|
|
|
|
|
- DEFAULT_GRAPH_URI = URIRef('urn:fcrepo:default_graph')
|
|
|
-
|
|
|
KEY_LENGTH = 5 # Max key length for terms. That allows for A LOT of terms.
|
|
|
KEY_START = 2 # \x00 is reserved as a separator. \x01 is spare.
|
|
|
|
|
@@ -299,11 +298,11 @@ class LmdbStore(Store):
|
|
|
def __len__(self, context=None):
|
|
|
'''
|
|
|
Return length of the dataset.
|
|
|
+
|
|
|
+ @param context (rdflib.URIRef | rdflib.Graph) Context to restrict count
|
|
|
+ to.
|
|
|
'''
|
|
|
- if context == self:
|
|
|
- context = None
|
|
|
- if isinstance(context, Graph):
|
|
|
- context = context.identifier
|
|
|
+ context = self._normalize_context(context)
|
|
|
|
|
|
if context is not None:
|
|
|
#dataset = self.triples((None, None, None), context)
|
|
@@ -348,7 +347,7 @@ class LmdbStore(Store):
|
|
|
'''
|
|
|
if not self.is_open:
|
|
|
raise RuntimeError('Store must be opened first.')
|
|
|
- logger.info('Beginning a {} transaction.'.format(
|
|
|
+ logger.debug('Beginning a {} transaction.'.format(
|
|
|
'read/write' if write else 'read-only'))
|
|
|
|
|
|
self.data_txn = self.data_env.begin(buffers=True, write=write)
|
|
@@ -480,17 +479,16 @@ class LmdbStore(Store):
|
|
|
@param quoted (bool) Not used.
|
|
|
'''
|
|
|
#import pdb; pdb.set_trace()
|
|
|
- assert context != self, "Cannot add triple directly to store"
|
|
|
+ context = self._normalize_context(context)
|
|
|
+ if context is None:
|
|
|
+ context = RDFLIB_DEFAULT_GRAPH_URI
|
|
|
+
|
|
|
Store.add(self, triple, context)
|
|
|
|
|
|
#logger.info('Adding triple: {}'.format(triple))
|
|
|
- if context is None:
|
|
|
- context = Graph(identifier=self.DEFAULT_GRAPH_URI)
|
|
|
pk_trp = self._pickle(triple)
|
|
|
|
|
|
pk_s, pk_p, pk_o = [self._pickle(t) for t in triple]
|
|
|
- if isinstance(context, Graph):
|
|
|
- context = context.identifier
|
|
|
#logger.debug('Adding quad: {} {}'.format(triple, context))
|
|
|
pk_c = self._pickle(context)
|
|
|
|
|
@@ -532,9 +530,8 @@ class LmdbStore(Store):
|
|
|
'''
|
|
|
#logger.debug('Removing triples by pattern: {} on context: {}'.format(
|
|
|
# triple_pattern, context))
|
|
|
+ context = self._normalize_context(context)
|
|
|
if context is not None:
|
|
|
- if isinstance(context, Graph):
|
|
|
- context = context.identifier
|
|
|
ck = self._to_key(context)
|
|
|
# If context is specified but not found, return to avoid deleting
|
|
|
# the wrong triples.
|
|
@@ -585,12 +582,8 @@ class LmdbStore(Store):
|
|
|
# triple_pattern, context))
|
|
|
# This sounds strange, RDFLib should be passing None at this point,
|
|
|
# but anyway...
|
|
|
- if isinstance(context, Graph) and isinstance(
|
|
|
- context.identifier, Variable):
|
|
|
- context = None
|
|
|
- if isinstance(context, Graph):
|
|
|
- context = context.identifier
|
|
|
- #logger.debug('Converted graph into URI: {}'.format(context))
|
|
|
+ context = self._normalize_context(context)
|
|
|
+
|
|
|
with self.cur('spo:c') as cur:
|
|
|
for spok in self._triple_keys(triple_pattern, context):
|
|
|
if context is not None:
|
|
@@ -825,10 +818,9 @@ class LmdbStore(Store):
|
|
|
|
|
|
# Regular lookup.
|
|
|
else:
|
|
|
- for spok in self._lookup(triple_pattern):
|
|
|
- if cur.set_key_dup(ck, spok):
|
|
|
- yield spok
|
|
|
- return
|
|
|
+ yield from (
|
|
|
+ spok for spok in self._lookup(triple_pattern)
|
|
|
+ if cur.set_key_dup(ck, spok))
|
|
|
else:
|
|
|
yield from self._lookup(triple_pattern)
|
|
|
|
|
@@ -928,95 +920,28 @@ class LmdbStore(Store):
|
|
|
return hashlib.new(self.KEY_HASH_ALGO, s).digest()
|
|
|
|
|
|
|
|
|
- def _lookup(self, triple_pattern):
|
|
|
+ def _normalize_context(self, context):
|
|
|
'''
|
|
|
- Look up triples in the indices based on a triple pattern.
|
|
|
+ Normalize a context parameter to conform to the model expectations.
|
|
|
|
|
|
- @return iterator of matching triple keys.
|
|
|
+ @param context (URIRef | Graph | None) Context URI or graph.
|
|
|
'''
|
|
|
- def lookup_1bound(label, term):
|
|
|
- '''
|
|
|
- Lookup triples for a pattern with one bound term.
|
|
|
- '''
|
|
|
- #import pdb; pdb.set_trace()
|
|
|
- k = self._to_key(term)
|
|
|
- if not k:
|
|
|
- return iter(())
|
|
|
- idx_name = label + ':' + 'spo'.replace(label, '')
|
|
|
- term_order = self._lookup_ordering[idx_name]
|
|
|
- with self.cur(idx_name) as cur:
|
|
|
- if cur.set_key(k):
|
|
|
- for match in cur.iternext_dup():
|
|
|
- subkeys = bytes(match).split(self.SEP_BYTE)
|
|
|
-
|
|
|
- # Compose result.
|
|
|
- out = [None, None, None]
|
|
|
- out[term_order[0]] = k
|
|
|
- out[term_order[1]] = subkeys[0]
|
|
|
- out[term_order[2]] = subkeys[1]
|
|
|
+ if isinstance(context, Graph):
|
|
|
+ if context == self or isinstance(context.identifier, Variable):
|
|
|
+ context = None
|
|
|
+ else:
|
|
|
+ context = context.identifier
|
|
|
+ #logger.debug('Converted graph into URI: {}'.format(context))
|
|
|
|
|
|
- yield self.SEP_BYTE.join(out)
|
|
|
+ return context
|
|
|
|
|
|
|
|
|
- def lookup_2bound(bound_terms):
|
|
|
- '''
|
|
|
- Look up triples for a pattern with two bound terms.
|
|
|
-
|
|
|
- @param bound terms (dict) Triple labels and terms to search for,
|
|
|
- in the format of, e.g. {'s': URIRef('urn:s:1'), 'o':
|
|
|
- URIRef('urn:o:1')}
|
|
|
- '''
|
|
|
- #import pdb; pdb.set_trace()
|
|
|
- if len(bound_terms) != 2:
|
|
|
- raise ValueError(
|
|
|
- 'Exactly 2 terms need to be bound. Got {}'.format(
|
|
|
- len(bound_terms)))
|
|
|
-
|
|
|
- # Establish lookup ranking.
|
|
|
- luc = None
|
|
|
- for k_label in self._lookup_rank:
|
|
|
- if k_label in bound_terms.keys():
|
|
|
- # First match is lookup term.
|
|
|
- if not luc:
|
|
|
- v_label = 'spo'.replace(k_label, '')
|
|
|
- # Lookup database key (cursor) name
|
|
|
- luc = k_label + ':' + v_label
|
|
|
- term_order = self._lookup_ordering[luc]
|
|
|
- # Term to look up
|
|
|
- luk = self._to_key(bound_terms[k_label])
|
|
|
- if not luk:
|
|
|
- return iter(())
|
|
|
- # Position of key in final triple.
|
|
|
- # Second match is the filter.
|
|
|
- else:
|
|
|
- # Filter key (position of sub-key in lookup results)
|
|
|
- fpos = v_label.index(k_label)
|
|
|
- # Fliter term
|
|
|
- ft = self._to_key(bound_terms[k_label])
|
|
|
- if not ft:
|
|
|
- return iter(())
|
|
|
- break
|
|
|
-
|
|
|
- # Look up in index.
|
|
|
- with self.cur(luc) as cur:
|
|
|
- if cur.set_key(luk):
|
|
|
- # Iterate over matches and filter by second term.
|
|
|
- for match in cur.iternext_dup():
|
|
|
- subkeys = bytes(match).split(self.SEP_BYTE)
|
|
|
- flt_subkey = subkeys[fpos]
|
|
|
- if flt_subkey == ft:
|
|
|
- # Remainder (not filter) key used to complete the
|
|
|
- # triple.
|
|
|
- r_subkey = subkeys[1-fpos]
|
|
|
-
|
|
|
- # Compose result.
|
|
|
- out = [None, None, None]
|
|
|
- out[term_order[0]] = luk
|
|
|
- out[term_order[fpos+1]] = flt_subkey
|
|
|
- out[term_order[2-fpos]] = r_subkey
|
|
|
-
|
|
|
- yield self.SEP_BYTE.join(out)
|
|
|
+ def _lookup(self, triple_pattern):
|
|
|
+ '''
|
|
|
+ Look up triples in the indices based on a triple pattern.
|
|
|
|
|
|
+ @return iterator of matching triple keys.
|
|
|
+ '''
|
|
|
s, p, o = triple_pattern
|
|
|
|
|
|
if s is not None:
|
|
@@ -1032,26 +957,26 @@ class LmdbStore(Store):
|
|
|
return iter(())
|
|
|
# s p ?
|
|
|
else:
|
|
|
- yield from lookup_2bound({'s': s, 'p': p})
|
|
|
+ yield from self._lookup_2bound({'s': s, 'p': p})
|
|
|
else:
|
|
|
# s ? o
|
|
|
if o is not None:
|
|
|
- yield from lookup_2bound({'s': s, 'o': o})
|
|
|
+ yield from self._lookup_2bound({'s': s, 'o': o})
|
|
|
# s ? ?
|
|
|
else:
|
|
|
- yield from lookup_1bound('s', s)
|
|
|
+ yield from self._lookup_1bound('s', s)
|
|
|
else:
|
|
|
if p is not None:
|
|
|
# ? p o
|
|
|
if o is not None:
|
|
|
- yield from lookup_2bound({'p': p, 'o': o})
|
|
|
+ yield from self._lookup_2bound({'p': p, 'o': o})
|
|
|
# ? p ?
|
|
|
else:
|
|
|
- yield from lookup_1bound('p', p)
|
|
|
+ yield from self._lookup_1bound('p', p)
|
|
|
else:
|
|
|
# ? ? o
|
|
|
if o is not None:
|
|
|
- yield from lookup_1bound('o', o)
|
|
|
+ yield from self._lookup_1bound('o', o)
|
|
|
# ? ? ?
|
|
|
else:
|
|
|
# Get all triples in the database.
|
|
@@ -1059,6 +984,92 @@ class LmdbStore(Store):
|
|
|
yield from cur.iternext_nodup()
|
|
|
|
|
|
|
|
|
+ def _lookup_1bound(self, label, term):
|
|
|
+ '''
|
|
|
+ Look up triples for a pattern with one bound term.
|
|
|
+
|
|
|
+ @TODO This can be called millions of times in a larger SPARQL
|
|
|
+ query, so it had better be as efficient as possible.
|
|
|
+ '''
|
|
|
+ #import pdb; pdb.set_trace()
|
|
|
+ k = self._to_key(term)
|
|
|
+ if not k:
|
|
|
+ return iter(())
|
|
|
+ idx_name = '{}:{}'.format(label, 'spo'.replace(label, ''))
|
|
|
+ term_order = self._lookup_ordering[idx_name]
|
|
|
+ with self.cur(idx_name) as cur:
|
|
|
+ if cur.set_key(k):
|
|
|
+ for match in cur.iternext_dup():
|
|
|
+ subkeys = bytes(match).split(self.SEP_BYTE)
|
|
|
+
|
|
|
+ # Compose result.
|
|
|
+ out = [None, None, None]
|
|
|
+ out[term_order[0]] = k
|
|
|
+ out[term_order[1]] = subkeys[0]
|
|
|
+ out[term_order[2]] = subkeys[1]
|
|
|
+
|
|
|
+ yield self.SEP_BYTE.join(out)
|
|
|
+
|
|
|
+
|
|
|
+ def _lookup_2bound(self, bound_terms):
|
|
|
+ '''
|
|
|
+ Look up triples for a pattern with two bound terms.
|
|
|
+
|
|
|
+ @param bound_terms (dict) Triple labels and terms to search for,
|
|
|
+ in the format of, e.g. {'s': URIRef('urn:s:1'), 'o':
|
|
|
+ URIRef('urn:o:1')}
|
|
|
+ '''
|
|
|
+ #import pdb; pdb.set_trace()
|
|
|
+ if len(bound_terms) != 2:
|
|
|
+ raise ValueError(
|
|
|
+ 'Exactly 2 terms need to be bound. Got {}'.format(
|
|
|
+ len(bound_terms)))
|
|
|
+
|
|
|
+ # Establish lookup ranking.
|
|
|
+ luc = None
|
|
|
+ for k_label in self._lookup_rank:
|
|
|
+ if k_label in bound_terms.keys():
|
|
|
+ # First match is lookup term.
|
|
|
+ if not luc:
|
|
|
+ v_label = 'spo'.replace(k_label, '')
|
|
|
+ # Lookup database key (cursor) name
|
|
|
+ luc = k_label + ':' + v_label
|
|
|
+ term_order = self._lookup_ordering[luc]
|
|
|
+ # Term to look up
|
|
|
+ luk = self._to_key(bound_terms[k_label])
|
|
|
+ if not luk:
|
|
|
+ return iter(())
|
|
|
+ # Position of key in final triple.
|
|
|
+ # Second match is the filter.
|
|
|
+ else:
|
|
|
+ # Filter key (position of sub-key in lookup results)
|
|
|
+ fpos = v_label.index(k_label)
|
|
|
+ # Filter term
|
|
|
+ ft = self._to_key(bound_terms[k_label])
|
|
|
+ if not ft:
|
|
|
+ return iter(())
|
|
|
+ break
|
|
|
+
|
|
|
+ # Look up in index.
|
|
|
+ with self.cur(luc) as cur:
|
|
|
+ if cur.set_key(luk):
|
|
|
+ # Iterate over matches and filter by second term.
|
|
|
+ for match in cur.iternext_dup():
|
|
|
+ subkeys = bytes(match).split(self.SEP_BYTE)
|
|
|
+ flt_subkey = subkeys[fpos]
|
|
|
+ if flt_subkey == ft:
|
|
|
+ # Remainder (not filter) key used to complete the
|
|
|
+ # triple.
|
|
|
+ r_subkey = subkeys[1-fpos]
|
|
|
+
|
|
|
+ # Compose result.
|
|
|
+ out = [None, None, None]
|
|
|
+ out[term_order[0]] = luk
|
|
|
+ out[term_order[fpos+1]] = flt_subkey
|
|
|
+ out[term_order[2-fpos]] = r_subkey
|
|
|
+
|
|
|
+ yield self.SEP_BYTE.join(out)
|
|
|
+
|
|
|
def _append(self, cur, values, **kwargs):
|
|
|
'''
|
|
|
Append one or more values to the end of a database.
|