12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221 |
import hashlib
import logging
import os
from contextlib import ContextDecorator, ExitStack
from os import makedirs
from os.path import abspath, exists
from shutil import rmtree
from urllib.request import pathname2url

import lmdb
from rdflib import Graph, Namespace, URIRef, Variable
from rdflib.graph import DATASET_DEFAULT_GRAPH_ID as RDFLIB_DEFAULT_GRAPH_URI
from rdflib.store import NO_STORE, VALID_STORE, Store

from lakesuperior import env
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def s2b(u, enc='UTF-8'):
    """
    Encode a text string into a bytes object.

    :param str u: Text to encode.
    :param str enc: Codec to encode with (UTF-8 by default).
    :rtype: bytes
    """
    raw = u.encode(enc)
    return raw
def b2s(u, enc='UTF-8'):
    """
    Decode a bytes-like object (bytes, bytearray or memoryview) into text.

    :param u: Buffer to decode.
    :param str enc: Codec to decode with (UTF-8 by default).
    :rtype: str
    """
    return str(bytes(u), enc)
class TxnManager(ContextDecorator):
    """
    Handle ACID transactions with an LmdbStore.

    Wrap this within a ``with`` statement::

        >>> with TxnManager(store, True):
        ...     # Do something with the database
        >>>

    The transaction will be opened and handled automatically: it is
    committed when the block exits normally and rolled back when it exits
    with an exception.

    If a suitable transaction is already open on the store when the block
    is entered (a read/write one, or any open one when only reading is
    requested), it is reused and left untouched on exit, so that nested
    managers never commit or roll back the enclosing transaction.
    """
    def __init__(self, store, write=False):
        """
        Begin and close a transaction in a store.

        :param LmdbStore store: The store to open a transaction on.
        :param bool write: Whether the transaction is read-write. Default is
            ``False`` (read-only transaction).
        """
        self.store = store
        self.write = write
        # One flag per active ``with`` level: whether that level opened
        # (and therefore must close) the transaction. A stack is needed
        # because a ContextDecorator instance is re-entered when the
        # decorated function recurses.
        self._owned = []

    def __enter__(self):
        store = self.store
        # Reuse an open transaction if it can serve this request: a R/W one
        # serves everything, any open one serves a read-only request.
        reuse = store.is_txn_open and (store.is_txn_rw or not self.write)
        if not reuse:
            store.begin(write=self.write)
        self._owned.append(not reuse)

    def __exit__(self, exc_type, exc_value, traceback):
        if not self._owned.pop():
            # The transaction belongs to an enclosing scope, which closes it.
            return
        if exc_type:
            self.store.rollback()
        else:
            self.store.commit()
class LexicalSequence:
    """
    Fixed-length lexicographically ordered byte sequence.

    Useful to generate optimized sequences of keys in LMDB.
    """
    def __init__(self, start=1, max_len=5):
        """
        Create a new lexical sequence.

        :param int start: Starting byte value (0-255). Byte values below this
            one are never found in this sequence. This is useful to allot
            special bytes to be used e.g. as separators.
        :param int max_len: Maximum number of bytes that a byte string can
            contain. This should be chosen carefully since the number of all
            possible key combinations is determined by this value and the
            ``start`` value. The default args provide 255**5 (~1 Tn) unique
            combinations.
        """
        self.start = start
        self.length = max_len

    def first(self):
        """First possible combination."""
        return bytearray([self.start] * self.length)

    def next(self, n):
        """
        Calculate the next closest byte sequence in lexicographical order.

        This is used to fill the next available slot after the last one in
        LMDB. Keys are byte strings, which is a convenient way to keep key
        lengths as small as possible when they are referenced in several
        indices.

        This function assumes that all the keys are padded with the `start`
        value up to the `max_len` length.

        :param n: Current byte sequence to add to. If empty or ``None``, the
            sequence following :meth:`first` is returned. The input is never
            modified.
        :type n: bytes, bytearray, memoryview or None
        :rtype: bytes
        :raise ValueError: if ``n`` is not a byte sequence or its length is
            not ``max_len``.
        :raise RuntimeError: if ``n`` is the last possible combination.
        """
        if not n:
            seq = self.first()
        elif isinstance(n, (bytes, bytearray, memoryview)):
            # Always work on a copy: a caller's bytearray must not change.
            seq = bytearray(n)
        else:
            raise ValueError('Input sequence must be bytes or a bytearray.')
        if len(seq) != self.length:
            raise ValueError('Incorrect sequence length.')

        # Odometer-style increment: bump the rightmost byte that has not
        # reached 0xff yet, resetting every byte to its right to ``start``.
        for i in reversed(range(self.length)):
            if seq[i] < 0xff:
                seq[i] += 1
                return bytes(seq)
            seq[i] = self.start

        raise RuntimeError('BAD DAY: Sequence exhausted. No more '
                'combinations are possible.')
class LmdbStore(Store):
    """
    LMDB-backed store.

    This is an implementation of the RDFLib Store interface:
    https://github.com/RDFLib/rdflib/blob/master/rdflib/store.py

    Handles the interaction with a LMDB store and builds an abstraction layer
    for triples.

    This store class uses two LMDB environments (i.e. two files): one for the
    main (preservation-worthy) data and the other for the index data which
    can be rebuilt from the main database.

    There are 4 main data sets (preservation worthy data):

    - ``t:st`` (term key: serialized term; 1:1)
    - ``spo:c`` (joined S, P, O keys: context key; dupsort, dupfixed)
    - ``c:`` (context keys only, values are the empty bytestring; 1:1)
    - ``pfx:ns`` (UTF-8 encoded prefix: UTF-8 encoded namespace; 1:1)

    And 6 indices to optimize lookup for all possible bound/unbound term
    combination in a triple:

    - ``th:t`` (term hash: term key; 1:1)
    - ``s:po`` (S key: joined P, O keys; dupsort, dupfixed)
    - ``p:so`` (P key: joined S, O keys; dupsort, dupfixed)
    - ``o:sp`` (O key: joined S, P keys; dupsort, dupfixed)
    - ``c:spo`` (context key: joined S, P, O keys; dupsort, dupfixed)
    - ``ns:pfx`` (UTF-8 encoded namespace: UTF-8 encoded prefix; 1:1)

    The default graph is defined in
    :data:`rdflib.graph.DATASET_DEFAULT_GRAPH_ID` (imported in this module as
    ``RDFLIB_DEFAULT_GRAPH_URI``). Adding triples without context will add to
    this graph. Looking up triples without context (also in a SPARQL query)
    will look in the union graph instead of in the default graph. Also,
    removing triples without specifying a context will remove triples from
    all contexts.
    """

    context_aware = True
    # This is a hassle to maintain for no apparent gain. If some use is devised
    # in the future, it may be revised.
    formula_aware = False
    graph_aware = True
    transaction_aware = True

    MAP_SIZE = 1024 ** 4 # 1Tb
    """
    LMDB map size. See http://lmdb.readthedocs.io/en/release/#environment-class
    """

    TERM_HASH_ALGO = 'sha1'
    """
    Term hashing algorithm. SHA1 is the default.
    """

    KEY_LENGTH = 5
    """
    Fixed length for term keys.

    4 or 5 is a safe range. With the default ``KEY_START`` each byte can take
    255 values, so 4 allows for ~4 billion (255 ** 4) unique terms in the
    store and 5 allows ~1 trillion terms. While these numbers may seem huge
    (the total number of Internet pages indexed by Google as of 2018 is 45
    billions), it must be reminded that the keys cannot be reused, so a
    repository that deletes a lot of triples may burn through a lot of terms.

    If a repository runs out of keys it can no longer store new terms and
    must be migrated to a new database, which will regenerate and compact the
    keys.

    For smaller repositories it should be safe to set this value to 4, which
    could improve performance since keys make up the vast majority of record
    exchange between the store and the application. However it is sensible
    not to expose this value as a configuration option.
    """

    KEY_START = 1
    """
    Lexical sequence start. ``\\x01`` is fine since no special characters are
    used, but it's good to leave a spare for potential future use.
    """

    data_keys = (
        # Term key to serialized term content: 1:1
        't:st',
        # Joined triple keys to context key: 1:m, fixed-length values
        'spo:c',
        # This has empty values and is used to keep track of empty contexts.
        'c:',
        # Prefix to namespace: 1:1
        'pfx:ns',
    )
    idx_keys = (
        # Namespace to prefix: 1:1
        'ns:pfx',
        # Term hash to term key: 1:1
        'th:t',
        # Lookups: 1:m, fixed-length values
        's:po', 'p:so', 'o:sp', 'c:spo',
    )

    _lookup_rank = ('s', 'o', 'p')
    """
    Order in which keys are looked up if two terms are bound.
    The indices with the smallest average number of values per key should be
    looked up first.

    If we want to get fancy, this can be rebalanced from time to time by
    looking up the number of keys in (s:po, p:so, o:sp).
    """

    _lookup_ordering = {
        's:po': (0, 1, 2),
        'p:so': (1, 0, 2),
        'o:sp': (2, 0, 1),
    }
    """
    Order of terms in the lookup indices. Used to rebuild a triple from lookup.
    """

    data_env = None
    idx_env = None
    db = None
    dbs = {}
    data_txn = None
    idx_txn = None
    # Set to the ``write`` flag by ``begin()``; reset to None by ``commit()``
    # and ``rollback()``.
    is_txn_rw = None


    def __init__(self, path, identifier=None):
        self.path = path
        self.__open = False

        self.identifier = identifier or URIRef(pathname2url(abspath(path)))
        super().__init__(path)

        self._pickle = self.node_pickler.dumps
        self._unpickle = self.node_pickler.loads

        self._key_seq = LexicalSequence(self.KEY_START, self.KEY_LENGTH)


    def __del__(self):
        """Properly close store for garbage collection."""
        self.close(True)


    def __len__(self, context=None):
        """
        Return length of the dataset.

        :param context: Context to restrict count to.
        :type context: rdflib.URIRef or rdflib.Graph
        """
        context = self._normalize_context(context)

        if context is None:
            # NOTE(review): ``spo:c`` is a dupsort database, for which LMDB's
            # ``entries`` statistic counts every key/value pair, so a triple
            # stored in several contexts may be counted once per context.
            # Confirm this is the intended semantics.
            return self.data_txn.stat(self.dbs['spo:c'])['entries']

        ck = self._to_key(context)
        # A context that was never stored holds no triples (and ``set_key``
        # cannot be called with ``None``).
        if not ck:
            return 0
        with self.cur('c:spo') as cur:
            if cur.set_key(ck):
                return sum(1 for _ in cur.iternext_dup())
        return 0


    @property
    def is_open(self):
        return self.__open


    def open(self, configuration=None, create=True):
        """
        Open the database.

        The database is best left open for the lifespan of the server. Read
        transactions can be opened as needed. Write transaction should be
        opened and closed within a single HTTP request to ensure atomicity of
        the request.

        This method is called outside of the main transaction. All cursors
        are created separately within the transaction.

        :param configuration: Not used.
        :param bool create: Whether to create the store if it does not exist.
        :return: ``NO_STORE`` if the store path does not exist and ``create``
            is false, ``VALID_STORE`` otherwise.
        """
        # The helper signals a missing store through its return value; the
        # environments themselves are left unset in that case.
        if self._init_db_environments(create) == NO_STORE:
            return NO_STORE
        self.__open = True

        return VALID_STORE


    def begin(self, write=False):
        """
        Begin the main transaction (read-only or read/write).

        One transaction is opened on each LMDB environment (data and index).

        :param bool write: Whether to open a read/write transaction.
        :raise RuntimeError: if the store is not open.
        """
        if not self.is_open:
            raise RuntimeError('Store must be opened first.')
        logger.debug('Beginning a {} transaction.'.format(
            'read/write' if write else 'read-only'))

        self.data_txn = self.data_env.begin(buffers=True, write=write)
        self.idx_txn = self.idx_env.begin(buffers=True, write=write)

        self.is_txn_rw = write


    def stats(self):
        """Gather statistics about the database."""
        stats = {
            'data_db_stats': {
                db_label: self.data_txn.stat(self.dbs[db_label])
                for db_label in self.data_keys},

            'idx_db_stats': {
                db_label: self.idx_txn.stat(self.dbs[db_label])
                for db_label in self.idx_keys},

            'data_db_size': os.stat(self.data_env.path()).st_size,
            'idx_db_size': os.stat(self.idx_env.path()).st_size,
            'num_triples': len(self),
        }

        return stats


    @property
    def is_txn_open(self):
        """Whether the main transaction is open."""
        try:
            self.data_txn.id()
            self.idx_txn.id()
        except (lmdb.Error, AttributeError):
            return False
        else:
            return True


    def cur(self, index):
        """
        Return a new cursor by its index label.

        :param str index: One of ``data_keys`` or ``idx_keys``.
        :rtype: lmdb.Cursor
        :raise ValueError: if the label is unknown.
        """
        if index in self.idx_keys:
            txn = self.idx_txn
        elif index in self.data_keys:
            txn = self.data_txn
        else:
            raise ValueError('Cursor key not found.')

        return txn.cursor(self.dbs[index])


    def get_data_cursors(self, txn):
        """
        Build the main data cursors for a transaction.

        :param lmdb.Transaction txn: This can be a read or write transaction.

        :rtype: dict(string, lmdb.Cursor)
        :return: Keys are data DB labels (``data_keys``), values are cursors.
        """
        return {
            key: txn.cursor(self.dbs[key])
            for key in self.data_keys}


    def get_idx_cursors(self, txn):
        """
        Build the index cursors for a transaction.

        :param lmdb.Transaction txn: This can be a read or write transaction.

        :rtype: dict(string, lmdb.Cursor)
        :return: dict of index labels, index cursors.
        """
        return {
            key: txn.cursor(self.dbs[key])
            for key in self.idx_keys}


    def close(self, commit_pending_transaction=False):
        """
        Close the database connection.

        Do this at server shutdown.

        :param bool commit_pending_transaction: If a main transaction is
            open, commit it instead of rolling it back.
        """
        self.__open = False
        if self.is_txn_open:
            if commit_pending_transaction:
                self.commit()
            else:
                self.rollback()

        # The environments are unset if the store was never (successfully)
        # opened, e.g. when called from ``__del__``.
        if self.data_env is not None:
            self.data_env.close()
        if self.idx_env is not None:
            self.idx_env.close()


    def destroy(self, path):
        """
        Destroy the store.

        https://www.youtube.com/watch?v=lIVq7FJnPwg

        :param str path: Path of the folder containing the database(s).
        """
        if exists(path):
            rmtree(path)


    def add(self, triple, context=None, quoted=False):
        """
        Add a triple and start indexing.

        :param tuple(rdflib.Identifier) triple: Tuple of three identifiers.
        :param context: Context identifier. ``None`` inserts in the default
            graph.
        :type context: rdflib.Identifier or None
        :param bool quoted: Not used.
        """
        context = self._normalize_context(context)
        if context is None:
            context = RDFLIB_DEFAULT_GRAPH_URI

        Store.add(self, triple, context)

        pk_s, pk_p, pk_o = [self._pickle(t) for t in triple]
        pk_c = self._pickle(context)

        # Add new individual terms or gather keys for existing ones.
        keys = [None] * 4
        with self.cur('th:t') as icur:
            for i, pk_t in enumerate((pk_s, pk_p, pk_o, pk_c)):
                thash = self._hash(pk_t)
                if icur.set_key(thash):
                    # Copy the key out of the zero-copy LMDB buffer.
                    keys[i] = bytes(icur.value())
                else:
                    # Put new term.
                    with self.cur('t:st') as dcur:
                        keys[i] = self._append(dcur, (pk_t,))[0]
                    # Index.
                    icur.put(thash, keys[i])

        # Add context in context DB.
        ck = keys[3]
        with self.cur('c:') as cur:
            if not cur.set_key(ck):
                cur.put(ck, b'')

        # Add triple:context association.
        spok = b''.join(keys[:3])
        with self.cur('spo:c') as dcur:
            if not dcur.set_key_dup(spok, ck):
                dcur.put(spok, ck)
        # Index spo:c association.
        with self.cur('c:spo') as icur:
            icur.put(ck, spok)

        self._index_triple('add', spok)


    def remove(self, triple_pattern, context=None):
        """
        Remove triples by a pattern.

        :param tuple:rdflib.term.Identifier|None triple_pattern: 3-tuple of
            either RDF terms or None, indicating the triple(s) to be removed.
            None is used as a wildcard.
        :param context: Context to remove the triples from. If None (the
            default) the matching triples are removed from all contexts.
        :type context: rdflib.term.Identifier or None
        """
        context = self._normalize_context(context)
        if context is not None:
            ck = self._to_key(context)
            # If context is specified but not found, return to avoid deleting
            # the wrong triples.
            if not ck:
                return
        else:
            ck = None

        with self.cur('spo:c') as dcur, self.cur('c:spo') as icur:
            # Materialize (and copy out of LMDB buffers) all matches before
            # deleting anything.
            match_set = {bytes(k) for k in self._triple_keys(
                triple_pattern, context)}

            if ck:
                # Delete only the association with the given context.
                for spok in match_set:
                    if dcur.set_key_dup(spok, ck):
                        dcur.delete()
                    if icur.set_key_dup(ck, spok):
                        icur.delete()
                    # Drop the lookup indices only once the triple is gone
                    # from every context; otherwise it would become
                    # unreachable by pattern lookup in the other contexts.
                    if not dcur.set_key(spok):
                        self._index_triple('remove', spok)
            else:
                # No context specified: remove all associations.
                for spok in match_set:
                    if dcur.set_key(spok):
                        for cck in (bytes(k) for k in dcur.iternext_dup()):
                            # Delete index first while we have the
                            # context reference.
                            if icur.set_key_dup(cck, spok):
                                icur.delete()
                        # Then delete the main entry.
                        dcur.set_key(spok)
                        dcur.delete(dupdata=True)
                        self._index_triple('remove', spok)


    def triples(self, triple_pattern, context=None):
        """
        Generator over matching triples.

        :param tuple triple_pattern: 3 RDFLib terms
        :param context: Context graph, if available.
        :type context: rdflib.Graph or None

        :rtype: Iterator
        :return: Generator over triples and contexts in which each result has
            the following format::

                (s, p, o), tuple(contexts)

            Where the contexts tuple lists all contexts (as graphs bound to
            this store) that the triple appears in.
        """
        # This sounds strange, RDFLib should be passing None at this point,
        # but anyway...
        context = self._normalize_context(context)

        if context is not None:
            # Every match lies in the requested context. Bind the graph to
            # this store rather than letting rdflib create a default one.
            ctx_graphs = (Graph(identifier=context, store=self),)

        with self.cur('spo:c') as cur:
            for spok in self._triple_keys(triple_pattern, context):
                if context is not None:
                    contexts = ctx_graphs
                elif cur.set_key(spok):
                    contexts = tuple(
                        Graph(identifier=self._from_key(ck)[0], store=self)
                        for ck in cur.iternext_dup())
                else:
                    contexts = ()

                yield self._from_key(spok), contexts


    def all_terms(self, term_type):
        """
        Return all terms of a type (``s``, ``p``, or ``o``) in the store.

        :param str term_type: one of ``s``, ``p`` or ``o``.

        :rtype: Iterator(rdflib.term.Identifier)
        :return: Iterator of all terms.
        :raise ValueError: if the term type is not one of the expected values.
        """
        if term_type == 's':
            idx_label = 's:po'
        elif term_type == 'p':
            idx_label = 'p:so'
        elif term_type == 'o':
            idx_label = 'o:sp'
        else:
            raise ValueError('Term type must be \'s\', \'p\' or \'o\'.')

        with self.cur(idx_label) as cur:
            for key in cur.iternext_nodup():
                yield self._from_key(key)[0]


    def bind(self, prefix, namespace):
        """
        Bind a prefix to a namespace.

        Inside an open read/write main transaction the binding joins that
        transaction; otherwise it is written in dedicated write transactions.

        :param str prefix: Namespace prefix.
        :param rdflib.URIRef namespace: Fully qualified URI of namespace.
        """
        prefix = s2b(prefix)
        namespace = s2b(namespace)
        if self.is_txn_rw:
            with self.data_txn.cursor(self.dbs['pfx:ns']) as cur:
                cur.put(prefix, namespace)
            with self.idx_txn.cursor(self.dbs['ns:pfx']) as cur:
                cur.put(namespace, prefix)
        else:
            with self.data_env.begin(write=True) as wtxn:
                with wtxn.cursor(self.dbs['pfx:ns']) as cur:
                    cur.put(prefix, namespace)
            with self.idx_env.begin(write=True) as wtxn:
                with wtxn.cursor(self.dbs['ns:pfx']) as cur:
                    cur.put(namespace, prefix)


    def namespace(self, prefix):
        """
        Get the namespace for a prefix.

        :param str prefix: Namespace prefix.
        :rtype: rdflib.Namespace or None
        """
        with self.cur('pfx:ns') as cur:
            ns = cur.get(s2b(prefix))
            return Namespace(b2s(ns)) if ns is not None else None


    def prefix(self, namespace):
        """
        Get the prefix associated with a namespace.

        **Note:** A namespace can be only bound to one prefix in this
        implementation.

        :param rdflib.Namespace namespace: Fully qualified namespace.

        :rtype: str or None
        """
        with self.cur('ns:pfx') as cur:
            prefix = cur.get(s2b(namespace))
            return b2s(prefix) if prefix is not None else None


    def namespaces(self):
        """Get an iterator of all prefix: namespace bindings.

        :rtype: Iterator(tuple(str, rdflib.Namespace))
        """
        with self.cur('pfx:ns') as cur:
            for pfx, ns in iter(cur):
                yield (b2s(pfx), Namespace(b2s(ns)))


    def contexts(self, triple=None):
        """
        Get all contexts, or the contexts a triple appears in.

        :param tuple triple: Triple to look up. If ``None`` or fully unbound,
            all contexts in the store are returned.

        :rtype: Iterator(rdflib.Graph)
        """
        if triple and any(t is not None for t in triple):
            tkey = self._to_key(triple)
            # Unknown terms: the triple is in no context.
            if not tkey:
                return
            with self.cur('spo:c') as cur:
                if cur.set_key(tkey):
                    for ck in cur.iternext_dup():
                        yield Graph(
                            identifier=self._from_key(ck)[0], store=self)
        else:
            with self.cur('c:') as cur:
                for ck in cur.iternext(values=False):
                    yield Graph(
                        identifier=self._from_key(ck)[0], store=self)


    def add_graph(self, graph):
        """
        Add a graph to the database.

        This registers the graph key in the ``c:`` database (with an empty
        value), storing the graph URI as a new term first if needed. This
        prevents from removing the graph when all triples are removed. A URI
        already stored as a term (e.g. as a subject) is registered as a
        context as well.

        This may be called by read-only operations:
        https://github.com/RDFLib/rdflib/blob/master/rdflib/graph.py#L1623
        Therefore it needs to open a write transaction. This is not ideal
        but the only way to handle datasets in RDFLib.

        :param rdflib.URIRef graph: URI of the named graph to add.
        """
        if isinstance(graph, Graph):
            graph = graph.identifier
        pk_c = self._pickle(graph)
        c_hash = self._hash(pk_c)

        with self.cur('th:t') as cur:
            ck = cur.get(c_hash)
        # Copy the key out of the zero-copy LMDB buffer.
        ck = bytes(ck) if ck is not None else None

        if ck is not None:
            with self.cur('c:') as cur:
                if cur.set_key(ck):
                    # Already registered as a context.
                    return
        new_term = ck is None

        if self.is_txn_rw:
            # Use existing R/W transaction.
            if new_term:
                with self.cur('t:st') as cur:
                    ck = self._append(cur, (pk_c,))[0]
                with self.cur('th:t') as cur:
                    cur.put(c_hash, ck)
            with self.cur('c:') as cur:
                cur.put(ck, b'')
        else:
            # Open new R/W transactions.
            with self.data_env.begin(write=True) as wtxn:
                if new_term:
                    with wtxn.cursor(self.dbs['t:st']) as cur:
                        ck = self._append(cur, (pk_c,))[0]
                with wtxn.cursor(self.dbs['c:']) as cur:
                    cur.put(ck, b'')
            if new_term:
                with self.idx_env.begin(write=True) as wtxn:
                    with wtxn.cursor(self.dbs['th:t']) as cur:
                        cur.put(c_hash, ck)


    def remove_graph(self, graph):
        """
        Remove all triples from graph and the graph itself.

        :param rdflib.URIRef graph: URI of the named graph to remove.
        """
        if isinstance(graph, Graph):
            graph = graph.identifier

        self.remove((None, None, None), graph)

        ck = self._to_key(graph)
        # An unknown graph has nothing to unregister.
        if not ck:
            return
        with self.cur('c:') as cur:
            if cur.set_key(ck):
                cur.delete()


    def commit(self):
        """Commit main transaction."""
        logger.debug('Committing transaction.')
        # Best effort: the transactions may be missing or already closed.
        try:
            self.data_txn.commit()
        except (AttributeError, lmdb.Error):
            pass
        try:
            self.idx_txn.commit()
        except (AttributeError, lmdb.Error):
            pass
        self.is_txn_rw = None


    def rollback(self):
        """Roll back main transaction."""
        logger.debug('Rolling back transaction.')
        # Best effort: the transactions may be missing or already closed.
        try:
            self.data_txn.abort()
        except (AttributeError, lmdb.Error):
            pass
        try:
            self.idx_txn.abort()
        except (AttributeError, lmdb.Error):
            pass
        self.is_txn_rw = None


    ## PRIVATE METHODS ##

    def _triple_keys(self, triple_pattern, context=None):
        """
        Generator over matching triple keys.

        This method is used by `triples` which returns native Python tuples,
        as well as by other methods that need to iterate and filter triple
        keys without incurring in the overhead of converting them to triples.

        A term is considered unbound only when it is ``None`` (falsy terms
        such as ``Literal(0)`` are bound).

        :param tuple triple_pattern: 3 RDFLib terms
        :param context: Context graph or URI, or None.
        :type context: rdflib.term.Identifier or None
        """
        if context == self:
            context = None

        if context is None:
            yield from self._lookup(triple_pattern)
            return

        ck = self._to_key(context)
        if not ck:
            # Context not found.
            return

        with self.cur('c:spo') as cur:
            if all(t is not None for t in triple_pattern):
                # s p o c
                spok = self._to_key(triple_pattern)
                # Either a term is unknown or the triple is not in the
                # context.
                if spok and cur.set_key_dup(ck, spok):
                    yield spok
            elif all(t is None for t in triple_pattern):
                # ? ? ? c: get all triples from the context.
                if cur.set_key(ck):
                    yield from cur.iternext_dup()
            else:
                # Regular lookup, filtered by context.
                yield from (
                    spok for spok in self._lookup(triple_pattern)
                    if cur.set_key_dup(ck, spok))


    def _init_db_environments(self, create=True):
        """
        Initialize the DB environment.

        The main database is kept in one file, the indices in a separate one
        (these may be even further split up depending on performance
        considerations).

        :param bool create: If True, the environment and its databases are
            created.
        :return: ``NO_STORE`` if the store path does not exist and ``create``
            is false; ``None`` otherwise.
        """
        path = self.path

        if not exists(path):
            if create is True:
                makedirs(path)
            else:
                return NO_STORE

        if getattr(env, 'wsgi_options', False):
            self._workers = env.wsgi_options['workers']
        else:
            self._workers = 1
        logger.info('Max LMDB readers: {}'.format(self._workers))

        self.data_env = lmdb.open(
                path + '/main', subdir=False, create=create,
                map_size=self.MAP_SIZE, max_dbs=4,
                max_spare_txns=self._workers, readahead=False)
        self.idx_env = lmdb.open(
                path + '/index', subdir=False, create=create,
                map_size=self.MAP_SIZE, max_dbs=6,
                max_spare_txns=self._workers, readahead=False)

        # Clear stale readers.
        data_stale_readers = self.data_env.reader_check()
        idx_stale_readers = self.idx_env.reader_check()
        logger.debug(
                'Cleared data stale readers: {}'.format(data_stale_readers))
        logger.debug(
                'Cleared index stale readers: {}'.format(idx_stale_readers))

        # Open and optionally create main databases.
        self.dbs = {
            # Main databases.
            't:st': self.data_env.open_db(b't:st', create=create),
            'spo:c': self.data_env.open_db(
                    b'spo:c', create=create, dupsort=True, dupfixed=True),
            'c:': self.data_env.open_db(b'c:', create=create),
            'pfx:ns': self.data_env.open_db(b'pfx:ns', create=create),
            # One-off indices.
            'ns:pfx': self.idx_env.open_db(b'ns:pfx', create=create),
            'th:t': self.idx_env.open_db(b'th:t', create=create),
        }
        # Other index databases.
        for db_key in self.idx_keys:
            if db_key not in ('ns:pfx', 'th:t'):
                self.dbs[db_key] = self.idx_env.open_db(s2b(db_key),
                        dupsort=True, dupfixed=True, create=create)


    def _from_key(self, key):
        """
        Convert a key into one or more terms.

        :param key: The key to be converted. It can be a compound one, in
            which case the function will return multiple terms.
        :type key: bytes or memoryview

        :rtype: tuple(rdflib.term.Identifier)
        :return: The term(s) associated with the key(s). The result is always
            a tuple even for single results.
        """
        with self.cur('t:st') as cur:
            return tuple(
                   self._unpickle(cur.get(k))
                   for k in self._split_key(key))


    def _to_key(self, obj):
        """
        Convert a triple, quad or term into a key.

        Each term is pickled and hashed with `TERM_HASH_ALGO`; the hash is
        looked up in the ``th:t`` index to find the stored term key.

        :param Object obj: Anything that can be reduced to terms stored in the
            database. Pairs of terms, as well as triples and quads, are
            expressed as tuples.

        If more than one term is provided, the keys are concatenated.

        :rtype: bytes or None
        :return: Keys stored for the term(s) or None if any term is not found.
        """
        if not isinstance(obj, list) and not isinstance(obj, tuple):
            obj = (obj,)
        key = []

        with self.cur('th:t') as cur:
            for term in obj:
                tk = cur.get(self._hash(self._pickle(term)))
                if not tk:
                    # If any of the terms is not found, return None immediately
                    return None
                key.append(tk)

        return b''.join(key)


    def _hash(self, s):
        """Get the hash value of a serialized object."""
        return hashlib.new(self.TERM_HASH_ALGO, s).digest()


    def _split_key(self, keys):
        """
        Split a compound key into individual keys.

        This method relies on the fixed length of all term keys.

        :param keys: Concatenated keys.
        :type keys: bytes or memoryview

        :rtype: tuple(bytes or memoryview)
        :return: Slices of the same type as ``keys``.
        """
        return tuple(
                keys[i:i+self.KEY_LENGTH]
                for i in range(0, len(keys), self.KEY_LENGTH))


    def _normalize_context(self, context):
        """
        Normalize a context parameter to conform to the model expectations.

        :param context: Context URI or graph.
        :type context: URIRef or Graph or None
        """
        if isinstance(context, Graph):
            if context == self or isinstance(context.identifier, Variable):
                context = None
            else:
                context = context.identifier
        return context


    def _lookup(self, triple_pattern):
        """
        Look up triples in the indices based on a triple pattern.

        :rtype: Iterator
        :return: Matching triple keys.
        """
        s, p, o = triple_pattern

        if s is not None:
            if p is not None:
                if o is not None:
                    # s p o: all three terms may exist on their own without
                    # this exact triple being stored, so confirm it.
                    tkey = self._to_key(triple_pattern)
                    if tkey:
                        with self.cur('spo:c') as cur:
                            if cur.set_key(tkey):
                                yield tkey
                    return
                # s p ?
                yield from self._lookup_2bound({'s': s, 'p': p})
            elif o is not None:
                # s ? o
                yield from self._lookup_2bound({'s': s, 'o': o})
            else:
                # s ? ?
                yield from self._lookup_1bound('s:po', s)
        elif p is not None:
            if o is not None:
                # ? p o
                yield from self._lookup_2bound({'p': p, 'o': o})
            else:
                # ? p ?
                yield from self._lookup_1bound('p:so', p)
        elif o is not None:
            # ? ? o
            yield from self._lookup_1bound('o:sp', o)
        else:
            # ? ? ?: get all triples in the database.
            with self.cur('spo:c') as cur:
                yield from cur.iternext_nodup()


    def _lookup_1bound(self, idx_name, term):
        """
        Lookup triples for a pattern with one bound term.

        :param str idx_name: The index to look up as one of the keys of
            ``_lookup_ordering``.
        :param rdflib.URIRef term: Bound term to search for.

        :rtype: Iterator(bytes)
        :return: SPO keys matching the pattern.
        """
        k = self._to_key(term)
        if not k:
            return
        term_order = self._lookup_ordering[idx_name]
        with self.cur(idx_name) as cur:
            if cur.set_key(k):
                for match in cur.iternext_dup():
                    subkeys = self._split_key(match)

                    # Compose result.
                    out = [None] * 3
                    out[term_order[0]] = k
                    out[term_order[1]] = subkeys[0]
                    out[term_order[2]] = subkeys[1]

                    yield b''.join(out)


    def _lookup_2bound(self, bound_terms):
        """
        Look up triples for a pattern with two bound terms.

        :param dict bound_terms: Triple labels and terms to search for, in
            the format of, e.g. ``{'s': URIRef('urn:s:1'), 'o':
            URIRef('urn:o:1')}``

        :rtype: iterator(bytes)
        :return: SPO keys matching the pattern.
        :raise ValueError: if not exactly two terms are bound.
        """
        if len(bound_terms) != 2:
            raise ValueError(
                    'Exactly 2 terms need to be bound. Got {}'.format(
                        len(bound_terms)))

        # Establish lookup ranking.
        luc = None
        for k_label in self._lookup_rank:
            if k_label in bound_terms.keys():
                # First match is lookup term.
                if not luc:
                    v_label = 'spo'.replace(k_label, '')
                    # Lookup database key (cursor) name
                    luc = k_label + ':' + v_label
                    term_order = self._lookup_ordering[luc]
                    # Term to look up
                    luk = self._to_key(bound_terms[k_label])
                    if not luk:
                        return
                # Second match is the filter.
                else:
                    # Filter key (position of sub-key in lookup results)
                    fpos = v_label.index(k_label)
                    # Filter term
                    ft = self._to_key(bound_terms[k_label])
                    if not ft:
                        return
                    break

        # Look up in index.
        with self.cur(luc) as cur:
            if cur.set_key(luk):
                # Iterate over matches and filter by second term.
                for match in cur.iternext_dup():
                    subkeys = self._split_key(match)
                    flt_subkey = subkeys[fpos]
                    if flt_subkey == ft:
                        # Remainder (not filter) key used to complete the
                        # triple.
                        r_subkey = subkeys[1-fpos]

                        # Compose result.
                        out = [None, None, None]
                        out[term_order[0]] = luk
                        out[term_order[fpos+1]] = flt_subkey
                        out[term_order[2-fpos]] = r_subkey

                        yield b''.join(out)


    def _append(self, cur, values, **kwargs):
        """
        Append one or more values to the end of a database.

        :param lmdb.Cursor cur: The write cursor to act on.
        :param list(bytes) values: Value(s) to append.

        :rtype: list(bytes)
        :return: Key(s) generated for the inserted value(s).
        :raise ValueError: if ``values`` is not a list or tuple.
        """
        if not isinstance(values, list) and not isinstance(values, tuple):
            raise ValueError('Input must be a list or tuple.')
        data = []
        lastkey = cur.key() if cur.last() else None
        for v in values:
            lastkey = self._key_seq.next(lastkey)
            data.append((lastkey, v))

        cur.putmulti(data, **kwargs)

        return [d[0] for d in data]


    def _index_triple(self, action, spok):
        """
        Update the s:po, p:so and o:sp lookup indices for a triple.

        :param str action: 'add' or 'remove'.
        :param bytes spok: Triple key.
        :raise ValueError: if the action is not supported.
        """
        if action not in ('add', 'remove'):
            raise ValueError(
                    'Index action \'{}\' is not supported.'.format(action))

        sk, pk, ok = self._split_key(spok)
        # Each lookup index maps one term key to the other two, joined in
        # triple order.
        curs = {
            's:po': (sk, b''.join((pk, ok))),
            'p:so': (pk, b''.join((sk, ok))),
            'o:sp': (ok, b''.join((sk, pk))),
        }

        for clabel, terms in curs.items():
            with self.cur(clabel) as icur:
                if action == 'remove':
                    if icur.set_key_dup(*terms):
                        icur.delete()
                else:
                    icur.put(*terms)


    ## Convenience methods—not necessary for functioning but useful for
    ## debugging.

    def _keys_in_ctx(self, pk_ctx):
        """
        Convenience method to list all triples in a context.

        :param bytes pk_ctx: Context key as stored in ``c:spo`` (i.e. the
            term key of the context, not its pickled form).

        :rtype: set(tuple(rdflib.term.Identifier))
        :return: Set of triples in the context.
        """
        with self.cur('c:spo') as cur:
            if cur.set_key(pk_ctx):
                return {self._from_key(tk) for tk in cur.iternext_dup()}
            return set()


    def _ctx_for_key(self, tkey):
        """
        Convenience method to list all contexts that a key is in.

        :param bytes tkey: Triple key.

        :rtype: set(rdflib.term.Identifier)
        :return: Set of context URIs.
        """
        with self.cur('spo:c') as cur:
            if cur.set_key(tkey):
                return {self._from_key(ck)[0] for ck in cur.iternext_dup()}
            return set()
|