lmdb_store.py 40 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221
  1. import hashlib
  2. import logging
  3. import os
  4. from contextlib import ContextDecorator, ExitStack
  5. from os import makedirs
  6. from os.path import exists, abspath
  7. from shutil import rmtree
  8. from urllib.request import pathname2url
  9. import lmdb
  10. from rdflib import Graph, Namespace, URIRef, Variable
  11. from rdflib.graph import DATASET_DEFAULT_GRAPH_ID as RDFLIB_DEFAULT_GRAPH_URI
  12. from rdflib.store import Store, VALID_STORE, NO_STORE
  13. from lakesuperior import env
  14. logger = logging.getLogger(__name__)
  15. def s2b(u, enc='UTF-8'):
  16. """
  17. Convert a string into a bytes object.
  18. """
  19. return u.encode(enc)
  20. def b2s(u, enc='UTF-8'):
  21. """
  22. Convert a bytes or memoryview object into a string.
  23. """
  24. return bytes(u).decode(enc)
  25. class TxnManager(ContextDecorator):
  26. """
  27. Handle ACID transactions with an LmdbStore.
  28. Wrap this within a ``with`` statement::
  29. >>> with TxnManager(store, True):
  30. ... # Do something with the database
  31. >>>
  32. The transaction will be opened and handled automatically.
  33. """
  34. def __init__(self, store, write=False):
  35. """
  36. Begin and close a transaction in a store.
  37. :param LmdbStore store: The store to open a transaction on.
  38. :param bool write: Whether the transaction is read-write. Default is
  39. ``False`` (read-only transaction).
  40. """
  41. self.store = store
  42. self.write = write
  43. def __enter__(self):
  44. # Only open a R/W transaction if one is not already open.
  45. if not self.write or not self.store.is_txn_rw:
  46. self.store.begin(write=self.write)
  47. def __exit__(self, exc_type, exc_value, traceback):
  48. if exc_type:
  49. self.store.rollback()
  50. else:
  51. self.store.commit()
  52. class LexicalSequence:
  53. """
  54. Fixed-length lexicographically ordered byte sequence.
  55. Useful to generate optimized sequences of keys in LMDB.
  56. """
  57. def __init__(self, start=1, max_len=5):
  58. """
  59. Create a new lexical sequence.
  60. :param bytes start: Starting byte value. Bytes below this value are
  61. never found in this sequence. This is useful to allot special bytes
  62. to be used e.g. as separators.
  63. :param int max_len: Maximum number of bytes that a byte string can
  64. contain. This should be chosen carefully since the number of all
  65. possible key combinations is determined by this value and the
  66. ``start`` value. The default args provide 255**5 (~1 Tn) unique
  67. combinations.
  68. """
  69. self.start = start
  70. self.length = max_len
  71. def first(self):
  72. """First possible combination."""
  73. return bytearray([self.start] * self.length)
  74. def next(self, n):
  75. """
  76. Calculate the next closest byte sequence in lexicographical order.
  77. This is used to fill the next available slot after the last one in
  78. LMDB. Keys are byte strings, which is a convenient way to keep key
  79. lengths as small as possible when they are referenced in several
  80. indices.
  81. This function assumes that all the keys are padded with the `start`
  82. value up to the `max_len` length.
  83. :param bytes n: Current byte sequence to add to.
  84. """
  85. if not n:
  86. n = self.first()
  87. elif isinstance(n, bytes) or isinstance(n, memoryview):
  88. n = bytearray(n)
  89. elif not isinstance(n, bytearray):
  90. raise ValueError('Input sequence must be bytes or a bytearray.')
  91. if not len(n) == self.length:
  92. raise ValueError('Incorrect sequence length.')
  93. for i, b in list(enumerate(n))[::-1]:
  94. try:
  95. n[i] += 1
  96. # If the value exceeds 255, i.e. the current value is the last one
  97. except ValueError:
  98. if i == 0:
  99. raise RuntimeError('BAD DAY: Sequence exhausted. No more '
  100. 'combinations are possible.')
  101. # Move one position up and try to increment that.
  102. else:
  103. n[i] = self.start
  104. continue
  105. else:
  106. return bytes(n)
  107. class LmdbStore(Store):
  108. """
  109. LMDB-backed store.
  110. This is an implementation of the RDFLib Store interface:
  111. https://github.com/RDFLib/rdflib/blob/master/rdflib/store.py
  112. Handles the interaction with a LMDB store and builds an abstraction layer
  113. for triples.
  114. This store class uses two LMDB environments (i.e. two files): one for the
  115. main (preservation-worthy) data and the other for the index data which
  116. can be rebuilt from the main database.
  117. There are 4 main data sets (preservation worthy data):
  118. - ``t:st`` (term key: serialized term; 1:1)
  119. - ``spo:c`` (joined S, P, O keys: context key; dupsort, dupfixed)
  120. - ``c:`` (context keys only, values are the empty bytestring; 1:1)
  121. - ``pfx:ns`` (prefix: pickled namespace; 1:1)
  122. And 6 indices to optimize lookup for all possible bound/unbound term
  123. combination in a triple:
  124. - ``th:t`` (term hash: term key; 1:1)
  125. - ``s:po`` (S key: joined P, O keys; dupsort, dupfixed)
  126. - ``p:so`` (P key: joined S, O keys; dupsort, dupfixed)
  127. - ``o:sp`` (O key: joined S, P keys; dupsort, dupfixed)
  128. - ``c:spo`` (context → triple association; dupsort, dupfixed)
  129. - ``ns:pfx`` (pickled namespace: prefix; 1:1)
  130. The default graph is defined in
  131. :data:`rdflib.graph.RDFLIB_DEFAULT_GRAPH_URI`. Adding
  132. triples without context will add to this graph. Looking up triples without
  133. context (also in a SPARQL query) will look in the union graph instead of
  134. in the default graph. Also, removing triples without specifying a context
  135. will remove triples from all contexts.
  136. """
  137. context_aware = True
  138. # This is a hassle to maintain for no apparent gain. If some use is devised
  139. # in the future, it may be revised.
  140. formula_aware = False
  141. graph_aware = True
  142. transaction_aware = True
  143. MAP_SIZE = 1024 ** 4 # 1Tb
  144. """
  145. LMDB map size. See http://lmdb.readthedocs.io/en/release/#environment-class
  146. """
  147. TERM_HASH_ALGO = 'sha1'
  148. """
  149. Term hashing algorithm. SHA1 is the default.
  150. """
  151. KEY_LENGTH = 5
  152. """
  153. Fixed length for term keys.
  154. 4 or 5 is a safe range. 4 allows for ~4 billion (256 ** 4) unique terms
  155. in the store. 5 allows ~1 trillion terms. While these numbers may seem
  156. huge (the total number of Internet pages indexed by Google as of 2018 is 45
  157. billions), it must be reminded that the keys cannot be reused, so a
  158. repository that deletes a lot of triples may burn through a lot of terms.
  159. If a repository runs ot of keys it can no longer store new terms and must
  160. be migrated to a new database, which will regenerate and compact the keys.
  161. For smaller repositories it should be safe to set this value to 4, which
  162. could improve performance since keys make up the vast majority of record
  163. exchange between the store and the application. However it is sensible not
  164. to expose this value as a configuration option.
  165. """
  166. KEY_START = 1
  167. """
  168. Lexical sequence start. ``\\x01`` is fine since no special characters are
  169. used, but it's good to leave a spare for potential future use.
  170. """
  171. data_keys = (
  172. # Term key to serialized term content: 1:1
  173. 't:st',
  174. # Joined triple keys to context key: 1:m, fixed-length values
  175. 'spo:c',
  176. # This has empty values and is used to keep track of empty contexts.
  177. 'c:',
  178. # Prefix to namespace: 1:1
  179. 'pfx:ns',
  180. )
  181. idx_keys = (
  182. # Namespace to prefix: 1:1
  183. 'ns:pfx',
  184. # Term hash to triple key: 1:1
  185. 'th:t',
  186. # Lookups: 1:m, fixed-length values
  187. 's:po', 'p:so', 'o:sp', 'c:spo',
  188. )
  189. _lookup_rank = ('s', 'o', 'p')
  190. """
  191. Order in which keys are looked up if two terms are bound.
  192. The indices with the smallest average number of values per key should be
  193. looked up first.
  194. If we want to get fancy, this can be rebalanced from time to time by
  195. looking up the number of keys in (s:po, p:so, o:sp).
  196. """
  197. _lookup_ordering = {
  198. 's:po': (0, 1, 2),
  199. 'p:so': (1, 0, 2),
  200. 'o:sp': (2, 0, 1),
  201. }
  202. """
  203. Order of terms in the lookup indices. Used to rebuild a triple from lookup.
  204. """
  205. data_env = None
  206. idx_env = None
  207. db = None
  208. dbs = {}
  209. data_txn = None
  210. idx_txn = None
  211. is_txn_rw = None
  212. def __init__(self, path, identifier=None):
  213. self.path = path
  214. self.__open = False
  215. self.identifier = identifier or URIRef(pathname2url(abspath(path)))
  216. super().__init__(path)
  217. self._pickle = self.node_pickler.dumps
  218. self._unpickle = self.node_pickler.loads
  219. self._key_seq = LexicalSequence(self.KEY_START, self.KEY_LENGTH)
  220. def __del__(self):
  221. """Properly close store for garbage collection."""
  222. self.close(True)
  223. def __len__(self, context=None):
  224. """
  225. Return length of the dataset.
  226. :param context: Context to restrict count to.
  227. :type context: rdflib.URIRef or rdflib.Graph
  228. """
  229. context = self._normalize_context(context)
  230. if context is not None:
  231. #dataset = self.triples((None, None, None), context)
  232. with self.cur('c:spo') as cur:
  233. if cur.set_key(self._to_key(context)):
  234. return sum(1 for _ in cur.iternext_dup())
  235. else:
  236. return 0
  237. else:
  238. return self.data_txn.stat(self.dbs['spo:c'])['entries']
  239. @property
  240. def is_open(self):
  241. return self.__open
  242. def open(self, configuration=None, create=True):
  243. """
  244. Open the database.
  245. The database is best left open for the lifespan of the server. Read
  246. transactions can be opened as needed. Write transaction should be
  247. opened and closed within a single HTTP request to ensure atomicity of
  248. the request.
  249. This method is called outside of the main transaction. All cursors
  250. are created separately within the transaction.
  251. """
  252. self._init_db_environments(create)
  253. if self.data_env == NO_STORE:
  254. return NO_STORE
  255. self.__open = True
  256. return VALID_STORE
  257. def begin(self, write=False):
  258. """
  259. Begin the main write transaction and create cursors.
  260. """
  261. if not self.is_open:
  262. raise RuntimeError('Store must be opened first.')
  263. logger.debug('Beginning a {} transaction.'.format(
  264. 'read/write' if write else 'read-only'))
  265. self.data_txn = self.data_env.begin(buffers=True, write=write)
  266. self.idx_txn = self.idx_env.begin(buffers=True, write=write)
  267. self.is_txn_rw = write
  268. def stats(self):
  269. """Gather statistics about the database."""
  270. stats = {
  271. 'data_db_stats': {
  272. db_label: self.data_txn.stat(self.dbs[db_label])
  273. for db_label in self.data_keys},
  274. 'idx_db_stats': {
  275. db_label: self.idx_txn.stat(self.dbs[db_label])
  276. for db_label in self.idx_keys},
  277. 'data_db_size': os.stat(self.data_env.path()).st_size,
  278. 'idx_db_size': os.stat(self.idx_env.path()).st_size,
  279. 'num_triples': len(self),
  280. }
  281. return stats
  282. @property
  283. def is_txn_open(self):
  284. """Whether the main transaction is open."""
  285. try:
  286. self.data_txn.id()
  287. self.idx_txn.id()
  288. except (lmdb.Error, AttributeError) as e:
  289. #logger.info('Main transaction does not exist or is closed.')
  290. return False
  291. else:
  292. #logger.info('Main transaction is open.')
  293. return True
  294. def cur(self, index):
  295. """Return a new cursor by its index."""
  296. if index in self.idx_keys:
  297. txn = self.idx_txn
  298. src = self.idx_keys
  299. elif index in self.data_keys:
  300. txn = self.data_txn
  301. src = self.data_keys
  302. else:
  303. return ValueError('Cursor key not found.')
  304. return txn.cursor(self.dbs[index])
  305. def get_data_cursors(self, txn):
  306. """
  307. Build the main data cursors for a transaction.
  308. :param lmdb.Transaction txn: This can be a read or write transaction.
  309. :rtype: dict(string, lmdb.Cursor)
  310. :return: Keys are index labels, values are index cursors.
  311. """
  312. return {
  313. 'tk:t': txn.cursor(self.dbs['tk:t']),
  314. 'tk:c': txn.cursor(self.dbs['tk:c']),
  315. 'pfx:ns': txn.cursor(self.dbs['pfx:ns']),
  316. }
  317. def get_idx_cursors(self, txn):
  318. """
  319. Build the index cursors for a transaction.
  320. :param lmdb.Transaction txn: This can be a read or write transaction.
  321. :rtype: dict(string, lmdb.Cursor)
  322. :return: dict of index labels, index cursors.
  323. """
  324. return {
  325. key: txn.cursor(self.dbs[key])
  326. for key in self.idx_keys}
  327. def close(self, commit_pending_transaction=False):
  328. """
  329. Close the database connection.
  330. Do this at server shutdown.
  331. """
  332. self.__open = False
  333. if self.is_txn_open:
  334. if commit_pending_transaction:
  335. self.commit()
  336. else:
  337. self.rollback()
  338. self.data_env.close()
  339. self.idx_env.close()
  340. def destroy(self, path):
  341. """
  342. Destroy the store.
  343. https://www.youtube.com/watch?v=lIVq7FJnPwg
  344. :param str path: Path of the folder containing the database(s).
  345. """
  346. if exists(path):
  347. rmtree(path)
  348. def add(self, triple, context=None, quoted=False):
  349. """
  350. Add a triple and start indexing.
  351. :param tuple(rdflib.Identifier) triple: Tuple of three identifiers.
  352. :param context: Context identifier. ``None`` inserts in the default
  353. graph.
  354. :type context: rdflib.Identifier or None
  355. :param bool quoted: Not used.
  356. """
  357. context = self._normalize_context(context)
  358. if context is None:
  359. context = RDFLIB_DEFAULT_GRAPH_URI
  360. Store.add(self, triple, context)
  361. #logger.info('Adding triple: {}'.format(triple))
  362. pk_trp = self._pickle(triple)
  363. pk_s, pk_p, pk_o = [self._pickle(t) for t in triple]
  364. #logger.debug('Adding quad: {} {}'.format(triple, context))
  365. pk_c = self._pickle(context)
  366. # Add new individual terms or gather keys for existing ones.
  367. keys = [None] * 4
  368. with self.cur('th:t') as icur:
  369. for i, pk_t in enumerate((pk_s, pk_p, pk_o, pk_c)):
  370. thash = self._hash(pk_t)
  371. if icur.set_key(thash):
  372. keys[i] = bytes(icur.value())
  373. else:
  374. # Put new term.
  375. with self.cur('t:st') as dcur:
  376. keys[i] = self._append(dcur, (pk_t,))[0]
  377. # Index.
  378. icur.put(thash, keys[i])
  379. # Add context in context DB.
  380. ck = keys[3]
  381. with self.cur('c:') as cur:
  382. if not cur.set_key(ck):
  383. cur.put(ck, b'')
  384. # Add triple:context association.
  385. spok = b''.join(keys[:3])
  386. with self.cur('spo:c') as dcur:
  387. if not dcur.set_key_dup(spok, ck):
  388. dcur.put(spok, ck)
  389. # Index spo:c association.
  390. with self.cur('c:spo') as icur:
  391. icur.put(ck, spok)
  392. self._index_triple('add', spok)
  393. def remove(self, triple_pattern, context=None):
  394. """
  395. Remove triples by a pattern.
  396. :param tuple:rdflib.term.Identifier|None triple_pattern: 3-tuple of
  397. either RDF terms or None, indicating the triple(s) to be removed.
  398. None is used as a wildcard.
  399. :param context: Context to remove the triples from. If None (the
  400. default) the matching triples are removed from all contexts.
  401. :type context: rdflib.term.Identifier or None
  402. """
  403. #logger.debug('Removing triples by pattern: {} on context: {}'.format(
  404. # triple_pattern, context))
  405. context = self._normalize_context(context)
  406. if context is not None:
  407. ck = self._to_key(context)
  408. # If context is specified but not found, return to avoid deleting
  409. # the wrong triples.
  410. if not ck:
  411. return
  412. else:
  413. ck = None
  414. with self.cur('spo:c') as dcur:
  415. with self.cur('c:spo') as icur:
  416. match_set = {bytes(k) for k in self._triple_keys(
  417. triple_pattern, context)}
  418. # Delete context association.
  419. if ck:
  420. for spok in match_set:
  421. if dcur.set_key_dup(spok, ck):
  422. dcur.delete()
  423. if icur.set_key_dup(ck, spok):
  424. icur.delete()
  425. self._index_triple('remove', spok)
  426. # If no context is specified, remove all associations.
  427. else:
  428. for spok in match_set:
  429. if dcur.set_key(spok):
  430. for cck in (bytes(k) for k in dcur.iternext_dup()):
  431. # Delete index first while we have the
  432. # context reference.
  433. if icur.set_key_dup(cck, spok):
  434. icur.delete()
  435. # Then delete the main entry.
  436. dcur.set_key(spok)
  437. dcur.delete(dupdata=True)
  438. self._index_triple('remove', spok)
  439. def triples(self, triple_pattern, context=None):
  440. """
  441. Generator over matching triples.
  442. :param tuple triple_pattern: 3 RDFLib terms
  443. :param context: Context graph, if available.
  444. :type context: rdflib.Graph or None
  445. :rtype: Iterator
  446. :return: Generator over triples and contexts in which each result has
  447. the following format::
  448. (s, p, o), generator(contexts)
  449. Where the contexts generator lists all context that the triple appears
  450. in.
  451. """
  452. #logger.debug('Getting triples for pattern: {} and context: {}'.format(
  453. # triple_pattern, context))
  454. # This sounds strange, RDFLib should be passing None at this point,
  455. # but anyway...
  456. context = self._normalize_context(context)
  457. with self.cur('spo:c') as cur:
  458. for spok in self._triple_keys(triple_pattern, context):
  459. if context is not None:
  460. contexts = (Graph(identifier=context),)
  461. else:
  462. if cur.set_key(spok):
  463. contexts = tuple(
  464. Graph(identifier=self._from_key(ck)[0], store=self)
  465. for ck in cur.iternext_dup())
  466. #print('Found triples: {} In contexts: {}'.format(
  467. # self._from_key(spok), contexts))
  468. yield self._from_key(spok), contexts
  469. def all_terms(self, term_type):
  470. """
  471. Return all terms of a type (``s``, ``p``, or ``o``) in the store.
  472. :param str term_type: one of ``s``, ``p`` or ``o``.
  473. :rtype: Iterator(rdflib.term.Identifier)
  474. :return: Iterator of all terms.
  475. :raise ValueError: if the term type is not one of the expected values.
  476. """
  477. if term_type == 's':
  478. idx_label = 's:po'
  479. elif term_type == 'p':
  480. idx_label = 'p:so'
  481. elif term_type == 'o':
  482. idx_label = 'o:sp'
  483. else:
  484. raise ValueError('Term type must be \'s\', \'p\' or \'o\'.')
  485. with self.cur(idx_label) as cur:
  486. for key in cur.iternext_nodup():
  487. yield self._from_key(key)[0]
  488. def bind(self, prefix, namespace):
  489. """
  490. Bind a prefix to a namespace.
  491. :param str prefix: Namespace prefix.
  492. :param rdflib.URIRef namespace: Fully qualified URI of namespace.
  493. """
  494. prefix = s2b(prefix)
  495. namespace = s2b(namespace)
  496. if self.is_txn_rw:
  497. with self.data_txn.cursor(self.dbs['pfx:ns']) as cur:
  498. cur.put(prefix, namespace)
  499. with self.idx_txn.cursor(self.dbs['ns:pfx']) as cur:
  500. cur.put(namespace, prefix)
  501. else:
  502. with self.data_env.begin(write=True) as wtxn:
  503. with wtxn.cursor(self.dbs['pfx:ns']) as cur:
  504. cur.put(prefix, namespace)
  505. with self.idx_env.begin(write=True) as wtxn:
  506. with wtxn.cursor(self.dbs['ns:pfx']) as cur:
  507. cur.put(namespace, prefix)
  508. def namespace(self, prefix):
  509. """
  510. Get the namespace for a prefix.
  511. :param str prefix: Namespace prefix.
  512. """
  513. with self.cur('pfx:ns') as cur:
  514. ns = cur.get(s2b(prefix))
  515. return Namespace(b2s(ns)) if ns is not None else None
  516. def prefix(self, namespace):
  517. """
  518. Get the prefix associated with a namespace.
  519. **Note:** A namespace can be only bound to one prefix in this
  520. implementation.
  521. :param rdflib.Namespace namespace: Fully qualified namespace.
  522. :rtype: str or None
  523. """
  524. with self.cur('ns:pfx') as cur:
  525. prefix = cur.get(s2b(namespace))
  526. return b2s(prefix) if prefix is not None else None
  527. def namespaces(self):
  528. """Get an iterator of all prefix: namespace bindings.
  529. :rtype: Iterator(tuple(str, rdflib.Namespace))
  530. """
  531. with self.cur('pfx:ns') as cur:
  532. for pfx, ns in iter(cur):
  533. yield (b2s(pfx), Namespace(b2s(ns)))
  534. def contexts(self, triple=None):
  535. """
  536. Get a list of all contexts.
  537. :rtype: Iterator(rdflib.Graph)
  538. """
  539. if triple and any(triple):
  540. with self.cur('spo:c') as cur:
  541. if cur.set_key(self._to_key(triple)):
  542. for ctx_uri in cur.iternext_dup():
  543. yield Graph(
  544. identifier=self._from_key(ctx_uri)[0], store=self)
  545. else:
  546. with self.cur('c:') as cur:
  547. for ctx_uri in cur.iternext(values=False):
  548. yield Graph(
  549. identifier=self._from_key(ctx_uri)[0], store=self)
  550. def add_graph(self, graph):
  551. """
  552. Add a graph to the database.
  553. This creates an empty graph by associating the graph URI with the
  554. pickled `None` value. This prevents from removing the graph when all
  555. triples are removed.
  556. This may be called by read-only operations:
  557. https://github.com/RDFLib/rdflib/blob/master/rdflib/graph.py#L1623
  558. Therefore it needs to open a write transaction. This is not ideal
  559. but the only way to handle datasets in RDFLib.
  560. :param rdflib.URIRef graph: URI of the named graph to add.
  561. """
  562. if isinstance(graph, Graph):
  563. graph = graph.identifier
  564. pk_c = self._pickle(graph)
  565. c_hash = self._hash(pk_c)
  566. with self.cur('th:t') as cur:
  567. c_exists = cur.set_key(c_hash)
  568. if not c_exists:
  569. # Insert context term if not existing.
  570. if self.is_txn_rw:
  571. # Use existing R/W transaction.
  572. with self.cur('t:st') as cur:
  573. ck = self._append(cur, (pk_c,))[0]
  574. with self.cur('th:t') as cur:
  575. cur.put(c_hash, ck)
  576. with self.cur('c:') as cur:
  577. cur.put(ck, b'')
  578. else:
  579. # Open new R/W transactions.
  580. with self.data_env.begin(write=True) as wtxn:
  581. with wtxn.cursor(self.dbs['t:st']) as cur:
  582. ck = self._append(cur, (pk_c,))[0]
  583. with wtxn.cursor(self.dbs['c:']) as cur:
  584. cur.put(ck, b'')
  585. with self.idx_env.begin(write=True) as wtxn:
  586. with wtxn.cursor(self.dbs['th:t']) as cur:
  587. cur.put(c_hash, ck)
  588. def remove_graph(self, graph):
  589. """
  590. Remove all triples from graph and the graph itself.
  591. :param rdflib.URIRef graph: URI of the named graph to remove.
  592. """
  593. if isinstance(graph, Graph):
  594. graph = graph.identifier
  595. self.remove((None, None, None), graph)
  596. with self.cur('c:') as cur:
  597. if cur.set_key(self._to_key(graph)):
  598. cur.delete()
  599. def commit(self):
  600. """Commit main transaction."""
  601. logger.debug('Committing transaction.')
  602. try:
  603. self.data_txn.commit()
  604. except (AttributeError, lmdb.Error):
  605. pass
  606. try:
  607. self.idx_txn.commit()
  608. except (AttributeError, lmdb.Error):
  609. pass
  610. self.is_txn_rw = None
  611. def rollback(self):
  612. """Roll back main transaction."""
  613. logger.debug('Rolling back transaction.')
  614. try:
  615. self.data_txn.abort()
  616. except (AttributeError, lmdb.Error):
  617. pass
  618. try:
  619. self.idx_txn.abort()
  620. except (AttributeError, lmdb.Error):
  621. pass
  622. self.is_txn_rw = None
  623. ## PRIVATE METHODS ##
  624. def _triple_keys(self, triple_pattern, context=None):
  625. """
  626. Generator over matching triple keys.
  627. This method is used by `triples` which returns native Python tuples,
  628. as well as by other methods that need to iterate and filter triple
  629. keys without incurring in the overhead of converting them to triples.
  630. :param tuple triple_pattern: 3 RDFLib terms
  631. :param context: Context graph or URI, or None.
  632. :type context: rdflib.term.Identifier or None
  633. """
  634. if context == self:
  635. context = None
  636. if context is not None:
  637. pk_c = self._pickle(context)
  638. ck = self._to_key(context)
  639. # Shortcuts
  640. if not ck:
  641. # Context not found.
  642. return iter(())
  643. with self.cur('c:spo') as cur:
  644. # s p o c
  645. if all(triple_pattern):
  646. spok = self._to_key(triple_pattern)
  647. if not spok:
  648. # A term in the triple is not found.
  649. return iter(())
  650. if cur.set_key_dup(ck, spok):
  651. yield spok
  652. return
  653. else:
  654. # Triple not found.
  655. return iter(())
  656. # ? ? ? c
  657. elif not any(triple_pattern):
  658. # Get all triples from the context
  659. if cur.set_key(ck):
  660. for spok in cur.iternext_dup():
  661. yield spok
  662. else:
  663. return iter(())
  664. # Regular lookup.
  665. else:
  666. yield from (
  667. spok for spok in self._lookup(triple_pattern)
  668. if cur.set_key_dup(ck, spok))
  669. else:
  670. yield from self._lookup(triple_pattern)
  671. def _init_db_environments(self, create=True):
  672. """
  673. Initialize the DB environment.
  674. The main database is kept in one file, the indices in a separate one
  675. (these may be even further split up depending on performance
  676. considerations).
  677. :param bool create: If True, the environment and its databases are
  678. created.
  679. """
  680. path = self.path
  681. if not exists(path):
  682. if create is True:
  683. makedirs(path)
  684. else:
  685. return NO_STORE
  686. if getattr(env, 'wsgi_options', False):
  687. self._workers = env.wsgi_options['workers']
  688. else:
  689. self._workers = 1
  690. logger.info('Max LMDB readers: {}'.format(self._workers))
  691. self.data_env = lmdb.open(
  692. path + '/main', subdir=False, create=create,
  693. map_size=self.MAP_SIZE, max_dbs=4,
  694. max_spare_txns=self._workers, readahead=False)
  695. self.idx_env = lmdb.open(
  696. path + '/index', subdir=False, create=create,
  697. map_size=self.MAP_SIZE, max_dbs=6,
  698. max_spare_txns=self._workers, readahead=False)
  699. # Clear stale readers.
  700. data_stale_readers = self.data_env.reader_check()
  701. idx_stale_readers = self.idx_env.reader_check()
  702. logger.debug(
  703. 'Cleared data stale readers: {}'.format(data_stale_readers))
  704. logger.debug(
  705. 'Cleared index stale readers: {}'.format(idx_stale_readers))
  706. # Open and optionally create main databases.
  707. self.dbs = {
  708. # Main databases.
  709. 't:st': self.data_env.open_db(b't:st', create=create),
  710. 'spo:c': self.data_env.open_db(
  711. b'spo:c', create=create, dupsort=True, dupfixed=True),
  712. 'c:': self.data_env.open_db(b'c:', create=create),
  713. 'pfx:ns': self.data_env.open_db(b'pfx:ns', create=create),
  714. # One-off indices.
  715. 'ns:pfx': self.idx_env.open_db(b'ns:pfx', create=create),
  716. 'th:t': self.idx_env.open_db(b'th:t', create=create),
  717. }
  718. # Other index databases.
  719. for db_key in self.idx_keys:
  720. if db_key not in ('ns:pfx', 'th:t'):
  721. self.dbs[db_key] = self.idx_env.open_db(s2b(db_key),
  722. dupsort=True, dupfixed=True, create=create)
  723. def _from_key(self, key):
  724. """
  725. Convert a key into one or more terms.
  726. :param key: The key to be converted. It can be a
  727. :type key: bytes or memoryview
  728. compound one in which case the function will return multiple terms.
  729. :rtype: tuple(rdflib.term.Identifier)
  730. :return: The term(s) associated with the key(s). The result is always
  731. a tuple even for single results.
  732. """
  733. with self.cur('t:st') as cur:
  734. return tuple(
  735. self._unpickle(cur.get(k))
  736. for k in self._split_key(key))
  737. def _to_key(self, obj):
  738. """
  739. Convert a triple, quad or term into a key.
  740. The key is the checksum of the pickled object, therefore unique for
  741. that object. The hashing algorithm is specified in `TERM_HASH_ALGO`.
  742. :param Object obj: Anything that can be reduced to terms stored in the
  743. database. Pairs of terms, as well as triples and quads, are expressed
  744. as tuples.
  745. If more than one term is provided, the keys are concatenated.
  746. :rtype: memoryview or None
  747. :return: Keys stored for the term(s) or None if not found.
  748. """
  749. if not isinstance(obj, list) and not isinstance(obj, tuple):
  750. obj = (obj,)
  751. key = []
  752. with self.cur('th:t') as cur:
  753. for term in obj:
  754. tk = cur.get(self._hash(self._pickle(term)))
  755. if not tk:
  756. # If any of the terms is not found, return None immediately
  757. return None
  758. key.append(tk)
  759. return b''.join(key)
  760. def _hash(self, s):
  761. """Get the hash value of a serialized object."""
  762. return hashlib.new(self.TERM_HASH_ALGO, s).digest()
  763. def _split_key(self, keys):
  764. """
  765. Split a compound key into individual keys.
  766. This method relies on the fixed length of all term keys.
  767. :param keys: Concatenated keys.
  768. :type keys: bytes or memoryview
  769. :rtype: tuple(memoryview)
  770. """
  771. return tuple(
  772. keys[i:i+self.KEY_LENGTH]
  773. for i in range(0, len(keys), self.KEY_LENGTH))
  774. def _normalize_context(self, context):
  775. """
  776. Normalize a context parameter to conform to the model expectations.
  777. :param context: Context URI or graph.
  778. :type context: URIRef or Graph or None
  779. """
  780. if isinstance(context, Graph):
  781. if context == self or isinstance(context.identifier, Variable):
  782. context = None
  783. else:
  784. context = context.identifier
  785. #logger.debug('Converted graph into URI: {}'.format(context))
  786. return context
  787. def _lookup(self, triple_pattern):
  788. """
  789. Look up triples in the indices based on a triple pattern.
  790. :rtype: Iterator
  791. :return: Matching triple keys.
  792. """
  793. s, p, o = triple_pattern
  794. if s is not None:
  795. if p is not None:
  796. # s p o
  797. if o is not None:
  798. with self.cur('spo:c') as cur:
  799. tkey = self._to_key(triple_pattern)
  800. if tkey:
  801. yield tkey
  802. return
  803. else:
  804. return iter(())
  805. # s p ?
  806. else:
  807. yield from self._lookup_2bound({'s': s, 'p': p})
  808. else:
  809. # s ? o
  810. if o is not None:
  811. yield from self._lookup_2bound({'s': s, 'o': o})
  812. # s ? ?
  813. else:
  814. yield from self._lookup_1bound('s:po', s)
  815. else:
  816. if p is not None:
  817. # ? p o
  818. if o is not None:
  819. yield from self._lookup_2bound({'p': p, 'o': o})
  820. # ? p ?
  821. else:
  822. yield from self._lookup_1bound('p:so', p)
  823. else:
  824. # ? ? o
  825. if o is not None:
  826. yield from self._lookup_1bound('o:sp', o)
  827. # ? ? ?
  828. else:
  829. # Get all triples in the database.
  830. with self.cur('spo:c') as cur:
  831. yield from cur.iternext_nodup()
  832. def _lookup_1bound(self, idx_name, term):
  833. """
  834. Lookup triples for a pattern with one bound term.
  835. :param str idx_name: The index to look up as one of the keys of
  836. ``_lookup_ordering``.
  837. :param rdflib.URIRef term: Bound term to search for.
  838. :rtype: Iterator(bytes)
  839. :return: SPO keys matching the pattern.
  840. """
  841. k = self._to_key(term)
  842. if not k:
  843. return iter(())
  844. term_order = self._lookup_ordering[idx_name]
  845. with self.cur(idx_name) as cur:
  846. if cur.set_key(k):
  847. for match in cur.iternext_dup():
  848. subkeys = self._split_key(match)
  849. # Compose result.
  850. out = [None] * 3
  851. out[term_order[0]] = k
  852. out[term_order[1]] = subkeys[0]
  853. out[term_order[2]] = subkeys[1]
  854. yield b''.join(out)
  855. def _lookup_2bound(self, bound_terms):
  856. """
  857. Look up triples for a pattern with two bound terms.
  858. :param bound: terms (dict) Triple labels and terms to search for,
  859. in the format of, e.g. {'s': URIRef('urn:s:1'), 'o':
  860. URIRef('urn:o:1')}
  861. :rtype: iterator(bytes)
  862. :return: SPO keys matching the pattern.
  863. """
  864. if len(bound_terms) != 2:
  865. raise ValueError(
  866. 'Exactly 2 terms need to be bound. Got {}'.format(
  867. len(bound_terms)))
  868. # Establish lookup ranking.
  869. luc = None
  870. for k_label in self._lookup_rank:
  871. if k_label in bound_terms.keys():
  872. # First match is lookup term.
  873. if not luc:
  874. v_label = 'spo'.replace(k_label, '')
  875. # Lookup database key (cursor) name
  876. luc = k_label + ':' + v_label
  877. term_order = self._lookup_ordering[luc]
  878. # Term to look up
  879. luk = self._to_key(bound_terms[k_label])
  880. if not luk:
  881. return iter(())
  882. # Position of key in final triple.
  883. # Second match is the filter.
  884. else:
  885. # Filter key (position of sub-key in lookup results)
  886. fpos = v_label.index(k_label)
  887. # Fliter term
  888. ft = self._to_key(bound_terms[k_label])
  889. if not ft:
  890. return iter(())
  891. break
  892. # Look up in index.
  893. with self.cur(luc) as cur:
  894. if cur.set_key(luk):
  895. # Iterate over matches and filter by second term.
  896. for match in cur.iternext_dup():
  897. subkeys = self._split_key(match)
  898. flt_subkey = subkeys[fpos]
  899. if flt_subkey == ft:
  900. # Remainder (not filter) key used to complete the
  901. # triple.
  902. r_subkey = subkeys[1-fpos]
  903. # Compose result.
  904. out = [None, None, None]
  905. out[term_order[0]] = luk
  906. out[term_order[fpos+1]] = flt_subkey
  907. out[term_order[2-fpos]] = r_subkey
  908. yield b''.join(out)
  909. def _append(self, cur, values, **kwargs):
  910. """
  911. Append one or more values to the end of a database.
  912. :param lmdb.Cursor cur: The write cursor to act on.
  913. :param list(bytes) values: Value(s) to append.
  914. :rtype: list(memoryview)
  915. :return: Last key(s) inserted.
  916. """
  917. if not isinstance(values, list) and not isinstance(values, tuple):
  918. raise ValueError('Input must be a list or tuple.')
  919. data = []
  920. lastkey = cur.key() if cur.last() else None
  921. for v in values:
  922. lastkey = self._key_seq.next(lastkey)
  923. data.append((lastkey, v))
  924. cur.putmulti(data, **kwargs)
  925. return [d[0] for d in data]
  926. def _index_triple(self, action, spok):
  927. """
  928. Update index for a triple and context (add or remove).
  929. :param str action: 'add' or 'remove'.
  930. :param bytes spok: Triple key.
  931. """
  932. # Split and rearrange-join keys for association and indices.
  933. triple = self._split_key(spok)
  934. sk, pk, ok = triple
  935. spk = b''.join(triple[:2])
  936. spk = b''.join(triple[:2])
  937. sok = b''.join((triple[0], triple[2]))
  938. pok = b''.join(triple[1:3])
  939. # Associate cursor labels with k/v pairs.
  940. curs = {
  941. 's:po': (sk, pok),
  942. 'p:so': (pk, sok),
  943. 'o:sp': (ok, spk),
  944. }
  945. # Add or remove triple lookups.
  946. for clabel, terms in curs.items():
  947. with self.cur(clabel) as icur:
  948. if action == 'remove':
  949. if icur.set_key_dup(*terms):
  950. icur.delete()
  951. elif action == 'add':
  952. icur.put(*terms)
  953. else:
  954. raise ValueError(
  955. 'Index action \'{}\' is not supported.'.format(action))
  956. ## Convenience methods—not necessary for functioning but useful for
  957. ## debugging.
  958. def _keys_in_ctx(self, pk_ctx):
  959. """
  960. Convenience method to list all keys in a context.
  961. :param bytes pk_ctx: Pickled context URI.
  962. :rtype: Iterator(tuple)
  963. :return: Generator of triples.
  964. """
  965. with self.cur('c:spo') as cur:
  966. if cur.set_key(pk_ctx):
  967. tkeys = cur.iternext_dup()
  968. return {self._key_to_triple(tk) for tk in tkeys}
  969. else:
  970. return set()
  971. def _ctx_for_key(self, tkey):
  972. """
  973. Convenience method to list all contexts that a key is in.
  974. :param bytes tkey: Triple key.
  975. :rtype: Iterator(rdflib.URIRef)
  976. :return: Generator of context URIs.
  977. """
  978. with self.cur('spo:c') as cur:
  979. if cur.set_key(tkey):
  980. ctx = cur.iternext_dup()
  981. return {self._unpickle(c) for c in ctx}
  982. else:
  983. return set()