import hashlib
import logging
import os

from contextlib import ContextDecorator, ExitStack
from os import makedirs
from os.path import exists, abspath
from shutil import rmtree
from urllib.request import pathname2url

import lmdb

from rdflib import Graph, Namespace, URIRef, Variable
from rdflib.graph import DATASET_DEFAULT_GRAPH_ID as RDFLIB_DEFAULT_GRAPH_URI
from rdflib.store import Store, VALID_STORE, NO_STORE

logger = logging.getLogger(__name__)

def s2b(u, enc='UTF-8'):
    """
    Convert a string into a bytes object.
    """
    return u.encode(enc)


def b2s(u, enc='UTF-8'):
    """
    Convert a bytes or memoryview object into a string.
    """
    return bytes(u).decode(enc)

class TxnManager(ContextDecorator):
    """
    Handle ACID transactions with an LmdbStore.

    Wrap this within a ``with`` statement:

    >>> with TxnManager(store, True):
    ...     # Do something with the database
    >>>

    The transaction will be opened and handled automatically.
    """
    def __init__(self, store, write=False):
        """
        Begin and close a transaction in a store.

        :param LmdbStore store: The store to open a transaction on.
        :param bool write: Whether the transaction is read-write. Default is
            ``False`` (read-only transaction).
        """
        self.store = store
        self.write = write

    def __enter__(self):
        # Only open a R/W transaction if one is not already open.
        if not self.write or not self.store.is_txn_rw:
            self.store.begin(write=self.write)

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type:
            self.store.rollback()
        else:
            self.store.commit()

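# Since TxnManager subclasses ContextDecorator, it can also be used as a
# decorator to wrap a whole function in a transaction. A minimal sketch,
# assuming ``store`` is an opened LmdbStore (the function name is
# illustrative):
#
#     @TxnManager(store, write=True)
#     def insert_batch(triples):
#         for trp in triples:
#             store.add(trp)
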
class LexicalSequence:
    """
    Fixed-length lexicographically ordered byte sequence.

    Useful to generate optimized sequences of keys in LMDB.
    """
    def __init__(self, start=1, max_len=5):
        """
        Create a new lexical sequence.

        :param bytes start: Starting byte value. Bytes below this value are
            never found in this sequence. This is useful to allot special
            bytes to be used e.g. as separators.
        :param int max_len: Maximum number of bytes that a byte string can
            contain. This should be chosen carefully since the number of all
            possible key combinations is determined by this value and the
            ``start`` value. The default args provide 255**5 (~1 Tn) unique
            combinations.
        """
        self.start = start
        self.length = max_len

    def first(self):
        """First possible combination."""
        return bytearray([self.start] * self.length)

    def next(self, n):
        """
        Calculate the next closest byte sequence in lexicographical order.

        This is used to fill the next available slot after the last one in
        LMDB. Keys are byte strings, which is a convenient way to keep key
        lengths as small as possible when they are referenced in several
        indices.

        This function assumes that all the keys are padded with the `start`
        value up to the `max_len` length.

        :param bytes n: Current byte sequence to add to.
        """
        if not n:
            n = self.first()
        elif isinstance(n, (bytes, memoryview)):
            n = bytearray(n)
        elif not isinstance(n, bytearray):
            raise ValueError('Input sequence must be bytes or a bytearray.')

        if not len(n) == self.length:
            raise ValueError('Incorrect sequence length.')

        for i, b in list(enumerate(n))[::-1]:
            try:
                n[i] += 1
            # If the value exceeds 255, i.e. the current value is the last one
            except ValueError:
                if i == 0:
                    raise RuntimeError(
                        'BAD DAY: Sequence exhausted. No more combinations '
                        'are possible.')
                # Move one position up and try to increment that.
                else:
                    n[i] = self.start
                    continue
            else:
                return bytes(n)

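# A brief illustration of the default sequence (start=1, max_len=5); the
# values below follow from the methods above:
#
#     >>> seq = LexicalSequence()
#     >>> bytes(seq.first())
#     b'\x01\x01\x01\x01\x01'
#     >>> seq.next(b'\x01\x01\x01\x01\x01')
#     b'\x01\x01\x01\x01\x02'
#     >>> seq.next(b'\x01\x01\x01\x01\xff')
#     b'\x01\x01\x01\x02\x01'
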
class LmdbStore(Store):
    """
    LMDB-backed store.

    This is an implementation of the RDFLib Store interface:
    https://github.com/RDFLib/rdflib/blob/master/rdflib/store.py

    Handles the interaction with an LMDB store and builds an abstraction
    layer for triples.

    This store class uses two LMDB environments (i.e. two files): one for the
    main (preservation-worthy) data and the other for the index data which
    can be rebuilt from the main database.

    There are 4 main data sets (preservation-worthy data):

    - `t:st` (term key: serialized term; 1:1)
    - `spo:c` (joined S, P, O keys: context key; dupsort, dupfixed)
    - `c:` (context keys only, values are the empty bytestring; 1:1)
    - `pfx:ns` (prefix: pickled namespace; 1:1)

    And 6 indices to optimize lookup for all possible bound/unbound term
    combinations in a triple:

    - `th:t` (term hash: term key; 1:1)
    - `s:po` (S key: joined P, O keys; dupsort, dupfixed)
    - `p:so` (P key: joined S, O keys; dupsort, dupfixed)
    - `o:sp` (O key: joined S, P keys; dupsort, dupfixed)
    - `c:spo` (context key: joined S, P, O keys; dupsort, dupfixed)
    - `ns:pfx` (pickled namespace: prefix; 1:1)

    The default graph is defined in :data:`RDFLIB_DEFAULT_GRAPH_URI`. Adding
    triples without a context will add them to this graph. Looking up triples
    without a context (also in a SPARQL query) will look in the union graph
    instead of in the default graph. Also, removing triples without
    specifying a context will remove triples from all contexts.
    """
    context_aware = True
    # This is a hassle to maintain for no apparent gain. If some use is
    # devised in the future, it may be revised.
    formula_aware = False
    graph_aware = True
    transaction_aware = True

    MAP_SIZE = 1024 ** 4  # 1 TiB
    """
    LMDB map size. See http://lmdb.readthedocs.io/en/release/#environment-class
    """

    TERM_HASH_ALGO = 'sha1'
    """
    Term hashing algorithm. SHA1 is the default.
    """

    KEY_LENGTH = 5
    """
    Fixed length for term keys.

    4 or 5 is a safe range. 4 allows for ~4 billion (255 ** 4) unique terms
    in the store. 5 allows ~1 trillion terms. While these numbers may seem
    huge (the total number of Internet pages indexed by Google as of 2018 is
    45 billion), bear in mind that the keys cannot be reused, so a repository
    that deletes a lot of triples may burn through a lot of terms.

    If a repository runs out of keys it can no longer store new terms and
    must be migrated to a new database, which will regenerate and compact
    the keys.

    For smaller repositories it should be safe to set this value to 4, which
    could improve performance since keys make up the vast majority of record
    exchange between the store and the application. However it is sensible
    not to expose this value as a configuration option.
    """

    KEY_START = 1
    """
    Lexical sequence start. ``\\x01`` is fine since no special characters are
    used, but it's good to leave a spare for potential future use.
    """

    data_keys = (
        # Term key to serialized term content: 1:1
        't:st',
        # Joined triple keys to context key: 1:m, fixed-length values
        'spo:c',
        # This has empty values and is used to keep track of empty contexts.
        'c:',
        # Prefix to namespace: 1:1
        'pfx:ns',
    )
    idx_keys = (
        # Namespace to prefix: 1:1
        'ns:pfx',
        # Term hash to term key: 1:1
        'th:t',
        # Lookups: 1:m, fixed-length values
        's:po', 'p:so', 'o:sp', 'c:spo',
    )

    _lookup_rank = ('s', 'o', 'p')
    """
    Order in which keys are looked up if two terms are bound.

    The indices with the smallest average number of values per key should be
    looked up first.

    If we want to get fancy, this can be rebalanced from time to time by
    looking up the number of keys in (s:po, p:so, o:sp).
    """

    _lookup_ordering = {
        's:po': (0, 1, 2),
        'p:so': (1, 0, 2),
        'o:sp': (2, 0, 1),
    }
    """
    Order of terms in the lookup indices. Used to rebuild a triple from a
    lookup.
    """

    data_env = None
    idx_env = None
    db = None
    dbs = {}
    data_txn = None
    idx_txn = None
    is_txn_rw = None

    def __init__(self, path, identifier=None):
        self.path = path
        self.__open = False

        self.identifier = identifier or URIRef(pathname2url(abspath(path)))
        super().__init__(path)

        self._pickle = self.node_pickler.dumps
        self._unpickle = self.node_pickler.loads

        self._key_seq = LexicalSequence(self.KEY_START, self.KEY_LENGTH)

    def __del__(self):
        """Properly close store for garbage collection."""
        self.close(True)

    def __len__(self, context=None):
        """
        Return the length of the dataset.

        :param context: Context to restrict the count to.
        :type context: rdflib.URIRef or rdflib.Graph
        """
        context = self._normalize_context(context)

        if context is not None:
            with self.cur('c:spo') as cur:
                if cur.set_key(self._to_key(context)):
                    return sum(1 for _ in cur.iternext_dup())
                else:
                    return 0
        else:
            return self.data_txn.stat(self.dbs['spo:c'])['entries']

    @property
    def is_open(self):
        return self.__open

    def open(self, configuration=None, create=True):
        """
        Open the database.

        The database is best left open for the lifespan of the server. Read
        transactions can be opened as needed. Write transactions should be
        opened and closed within a single HTTP request to ensure atomicity
        of the request.

        This method is called outside of the main transaction. All cursors
        are created separately within the transaction.
        """
        self._init_db_environments(create)
        if self.data_env == NO_STORE:
            return NO_STORE
        self.__open = True

        return VALID_STORE

    def begin(self, write=False):
        """
        Begin the main write transaction and create cursors.
        """
        if not self.is_open:
            raise RuntimeError('Store must be opened first.')
        logger.debug('Beginning a {} transaction.'.format(
            'read/write' if write else 'read-only'))
        self.data_txn = self.data_env.begin(buffers=True, write=write)
        self.idx_txn = self.idx_env.begin(buffers=True, write=write)

        self.is_txn_rw = write

    def stats(self):
        """Gather statistics about the database."""
        stats = {
            'data_db_stats': {
                db_label: self.data_txn.stat(self.dbs[db_label])
                for db_label in self.data_keys},
            'idx_db_stats': {
                db_label: self.idx_txn.stat(self.dbs[db_label])
                for db_label in self.idx_keys},
            'data_db_size': os.stat(self.data_env.path()).st_size,
            'idx_db_size': os.stat(self.idx_env.path()).st_size,
            'num_triples': len(self),
        }

        return stats

    @property
    def is_txn_open(self):
        """Whether the main transaction is open."""
        try:
            self.data_txn.id()
            self.idx_txn.id()
        except (lmdb.Error, AttributeError):
            return False
        else:
            return True

    def cur(self, index):
        """Return a new cursor by its index."""
        if index in self.idx_keys:
            txn = self.idx_txn
        elif index in self.data_keys:
            txn = self.data_txn
        else:
            raise ValueError('Cursor key not found.')

        return txn.cursor(self.dbs[index])

    def get_data_cursors(self, txn):
        """
        Build the main data cursors for a transaction.

        :param lmdb.Transaction txn: This can be a read or write transaction.

        :rtype: dict(string, lmdb.Cursor)
        :return: Keys are index labels, values are index cursors.
        """
        # Note: the original referenced 'tk:t' and 'tk:c' labels which do not
        # exist in self.dbs; the existing main data DBs are used here instead.
        return {
            't:st': txn.cursor(self.dbs['t:st']),
            'spo:c': txn.cursor(self.dbs['spo:c']),
            'pfx:ns': txn.cursor(self.dbs['pfx:ns']),
        }

    def get_idx_cursors(self, txn):
        """
        Build the index cursors for a transaction.

        :param lmdb.Transaction txn: This can be a read or write transaction.

        :rtype: dict(string, lmdb.Cursor)
        :return: Dict of index labels, index cursors.
        """
        return {
            key: txn.cursor(self.dbs[key])
            for key in self.idx_keys}

    def close(self, commit_pending_transaction=False):
        """
        Close the database connection.

        Do this at server shutdown.
        """
        self.__open = False
        if self.is_txn_open:
            if commit_pending_transaction:
                self.commit()
            else:
                self.rollback()

        self.data_env.close()
        self.idx_env.close()

    def destroy(self, path):
        """
        Destroy the store.

        https://www.youtube.com/watch?v=lIVq7FJnPwg

        :param str path: Path of the folder containing the database(s).
        """
        if exists(path):
            rmtree(path)

    def add(self, triple, context=None, quoted=False):
        """
        Add a triple and start indexing.

        :param tuple(rdflib.Identifier) triple: Tuple of three identifiers.
        :param context: Context identifier. ``None`` inserts in the default
            graph.
        :type context: rdflib.Identifier or None
        :param bool quoted: Not used.
        """
        context = self._normalize_context(context)
        if context is None:
            context = RDFLIB_DEFAULT_GRAPH_URI

        Store.add(self, triple, context)

        pk_s, pk_p, pk_o = [self._pickle(t) for t in triple]
        pk_c = self._pickle(context)

        # Add new individual terms or gather keys for existing ones.
        keys = [None, None, None, None]
        with self.cur('th:t') as icur:
            for i, pk_t in enumerate((pk_s, pk_p, pk_o, pk_c)):
                thash = self._hash(pk_t)
                if icur.set_key(thash):
                    keys[i] = bytes(icur.value())
                else:
                    # Put new term.
                    with self.cur('t:st') as dcur:
                        keys[i] = self._append(dcur, (pk_t,))[0]
                    # Index the term.
                    icur.put(thash, keys[i])

        # Add the context to the context DB.
        ck = keys[3]
        with self.cur('c:') as cur:
            if not cur.set_key(ck):
                cur.put(ck, b'')

        # Add the triple:context association.
        spok = b''.join(keys[:3])
        with self.cur('spo:c') as dcur:
            if not dcur.set_key_dup(spok, ck):
                dcur.put(spok, ck)
        # Index the spo:c association.
        with self.cur('c:spo') as icur:
            icur.put(ck, spok)

        self._index_triple('add', spok)

    def remove(self, triple_pattern, context=None):
        """
        Remove triples by a pattern.

        :param triple_pattern: 3-tuple of either RDF terms or ``None``,
            indicating the triple(s) to be removed. ``None`` is used as a
            wildcard.
        :type triple_pattern: tuple(rdflib.term.Identifier or None)
        :param context: Context to remove the triples from. If None (the
            default) the matching triples are removed from all contexts.
        :type context: rdflib.term.Identifier or None
        """
        context = self._normalize_context(context)
        if context is not None:
            ck = self._to_key(context)
            # If the context is specified but not found, return to avoid
            # deleting the wrong triples.
            if not ck:
                return
        else:
            ck = None

        with self.cur('spo:c') as dcur:
            with self.cur('c:spo') as icur:
                match_set = {bytes(k) for k in self._triple_keys(
                    triple_pattern, context)}
                # Delete the context association.
                if ck:
                    for spok in match_set:
                        if dcur.set_key_dup(spok, ck):
                            dcur.delete()
                            if icur.set_key_dup(ck, spok):
                                icur.delete()
                            self._index_triple('remove', spok)
                # If no context is specified, remove all associations.
                else:
                    for spok in match_set:
                        if dcur.set_key(spok):
                            for cck in (
                                    bytes(k) for k in dcur.iternext_dup()):
                                # Delete index first while we have the
                                # context reference.
                                if icur.set_key_dup(cck, spok):
                                    icur.delete()
                            # Then delete the main entry.
                            dcur.set_key(spok)
                            dcur.delete(dupdata=True)
                            self._index_triple('remove', spok)

    def triples(self, triple_pattern, context=None):
        """
        Generator over matching triples.

        :param tuple triple_pattern: 3 RDFLib terms
        :param context: Context graph, if available.
        :type context: rdflib.Graph or None

        :rtype: Iterator
        :return: Generator over triples and contexts in which each result has
            the following format::

                (s, p, o), generator(contexts)

            Where the contexts generator lists all contexts that the triple
            appears in.
        """
        # This sounds strange, RDFLib should be passing None at this point,
        # but anyway...
        context = self._normalize_context(context)

        with self.cur('spo:c') as cur:
            for spok in self._triple_keys(triple_pattern, context):
                if context is not None:
                    contexts = (Graph(identifier=context),)
                else:
                    if cur.set_key(spok):
                        contexts = tuple(
                            Graph(
                                identifier=self._from_key(ck)[0], store=self)
                            for ck in cur.iternext_dup())

                yield self._from_key(spok), contexts

    def all_terms(self, term_type):
        """
        Return all terms of a type (``s``, ``p``, or ``o``) in the store.

        :param str term_type: One of ``s``, ``p`` or ``o``.

        :rtype: Iterator(rdflib.term.Identifier)
        :return: Iterator of all terms.

        :raise ValueError: If the term type is not one of the expected
            values.
        """
        if term_type == 's':
            idx_label = 's:po'
        elif term_type == 'p':
            idx_label = 'p:so'
        elif term_type == 'o':
            idx_label = 'o:sp'
        else:
            raise ValueError('Term type must be \'s\', \'p\' or \'o\'.')

        with self.cur(idx_label) as cur:
            for key in cur.iternext_nodup():
                yield self._from_key(key)[0]

    def bind(self, prefix, namespace):
        """
        Bind a prefix to a namespace.

        :param str prefix: Namespace prefix.
        :param rdflib.URIRef namespace: Fully qualified URI of namespace.
        """
        prefix = s2b(prefix)
        namespace = s2b(namespace)
        if self.is_txn_rw:
            with self.data_txn.cursor(self.dbs['pfx:ns']) as cur:
                cur.put(prefix, namespace)
            with self.idx_txn.cursor(self.dbs['ns:pfx']) as cur:
                cur.put(namespace, prefix)
        else:
            with self.data_env.begin(write=True) as wtxn:
                with wtxn.cursor(self.dbs['pfx:ns']) as cur:
                    cur.put(prefix, namespace)
            with self.idx_env.begin(write=True) as wtxn:
                with wtxn.cursor(self.dbs['ns:pfx']) as cur:
                    cur.put(namespace, prefix)

    def namespace(self, prefix):
        """
        Get the namespace for a prefix.

        :param str prefix: Namespace prefix.
        """
        with self.cur('pfx:ns') as cur:
            ns = cur.get(s2b(prefix))
            return Namespace(b2s(ns)) if ns is not None else None

    def prefix(self, namespace):
        """
        Get the prefix associated with a namespace.

        **Note:** A namespace can only be bound to one prefix in this
        implementation.

        :param rdflib.Namespace namespace: Fully qualified namespace.

        :rtype: str or None
        """
        with self.cur('ns:pfx') as cur:
            prefix = cur.get(s2b(namespace))
            return b2s(prefix) if prefix is not None else None

    def namespaces(self):
        """
        Get an iterator of all prefix: namespace bindings.

        :rtype: Iterator(tuple(str, rdflib.Namespace))
        """
        with self.cur('pfx:ns') as cur:
            for pfx, ns in iter(cur):
                yield (b2s(pfx), Namespace(b2s(ns)))

    def contexts(self, triple=None):
        """
        Get an iterator over all contexts.

        :rtype: Iterator(rdflib.Graph)
        """
        if triple and any(triple):
            with self.cur('spo:c') as cur:
                if cur.set_key(self._to_key(triple)):
                    for ck in cur.iternext_dup():
                        yield Graph(
                            identifier=self._from_key(ck)[0], store=self)
        else:
            with self.cur('c:') as cur:
                for ck in cur.iternext(values=False):
                    yield Graph(
                        identifier=self._from_key(ck)[0], store=self)

    def add_graph(self, graph):
        """
        Add a graph to the database.

        This creates an empty graph by associating the graph URI with an
        empty value in the context DB. This prevents the graph from being
        removed when all its triples are removed.

        This may be called by read-only operations:
        https://github.com/RDFLib/rdflib/blob/master/rdflib/graph.py#L1623
        Therefore it may need to open a write transaction. This is not ideal
        but it is the only way to handle datasets in RDFLib.

        :param rdflib.URIRef graph: URI of the named graph to add.
        """
        if isinstance(graph, Graph):
            graph = graph.identifier
        pk_c = self._pickle(graph)
        c_hash = self._hash(pk_c)
        with self.cur('th:t') as cur:
            c_exists = cur.set_key(c_hash)
        if not c_exists:
            # Insert context term if not existing.
            if self.is_txn_rw:
                # Use existing R/W transaction.
                with self.cur('t:st') as cur:
                    ck = self._append(cur, (pk_c,))[0]
                with self.cur('th:t') as cur:
                    cur.put(c_hash, ck)
                with self.cur('c:') as cur:
                    cur.put(ck, b'')
            else:
                # Open new R/W transactions.
                with self.data_env.begin(write=True) as wtxn:
                    with wtxn.cursor(self.dbs['t:st']) as cur:
                        ck = self._append(cur, (pk_c,))[0]
                    with wtxn.cursor(self.dbs['c:']) as cur:
                        cur.put(ck, b'')
                with self.idx_env.begin(write=True) as wtxn:
                    with wtxn.cursor(self.dbs['th:t']) as cur:
                        cur.put(c_hash, ck)

    def remove_graph(self, graph):
        """
        Remove all triples from a graph and the graph itself.

        :param rdflib.URIRef graph: URI of the named graph to remove.
        """
        if isinstance(graph, Graph):
            graph = graph.identifier
        self.remove((None, None, None), graph)
        with self.cur('c:') as cur:
            if cur.set_key(self._to_key(graph)):
                cur.delete()

    def commit(self):
        """Commit the main transaction."""
        logger.debug('Committing transaction.')
        try:
            self.data_txn.commit()
        except (AttributeError, lmdb.Error):
            pass
        try:
            self.idx_txn.commit()
        except (AttributeError, lmdb.Error):
            pass
        self.is_txn_rw = None

    def rollback(self):
        """Roll back the main transaction."""
        logger.debug('Rolling back transaction.')
        try:
            self.data_txn.abort()
        except (AttributeError, lmdb.Error):
            pass
        try:
            self.idx_txn.abort()
        except (AttributeError, lmdb.Error):
            pass
        self.is_txn_rw = None

    ## PRIVATE METHODS ##

    def _triple_keys(self, triple_pattern, context=None):
        """
        Generator over matching triple keys.

        This method is used by `triples` which returns native Python tuples,
        as well as by other methods that need to iterate and filter triple
        keys without incurring the overhead of converting them to triples.

        :param tuple triple_pattern: 3 RDFLib terms
        :param context: Context graph or URI, or None.
        :type context: rdflib.term.Identifier or None
        """
        if context == self:
            context = None

        if context is not None:
            ck = self._to_key(context)

            # Shortcuts
            if not ck:
                # Context not found.
                return iter(())

            with self.cur('c:spo') as cur:
                # s p o c
                if all(triple_pattern):
                    spok = self._to_key(triple_pattern)
                    if not spok:
                        # A term in the triple is not found.
                        return iter(())
                    if cur.set_key_dup(ck, spok):
                        yield spok
                        return
                    else:
                        # Triple not found.
                        return iter(())

                # ? ? ? c
                elif not any(triple_pattern):
                    # Get all triples from the context.
                    if cur.set_key(ck):
                        for spok in cur.iternext_dup():
                            yield spok
                    else:
                        return iter(())

                # Regular lookup.
                else:
                    yield from (
                        spok for spok in self._lookup(triple_pattern)
                        if cur.set_key_dup(ck, spok))
        else:
            yield from self._lookup(triple_pattern)

    def _init_db_environments(self, create=True):
        """
        Initialize the DB environment.

        The main database is kept in one file, the indices in a separate one
        (these may be even further split up depending on performance
        considerations).

        :param bool create: If True, the environment and its databases are
            created.
        """
        path = self.path
        if not exists(path):
            if create is True:
                makedirs(path)
            else:
                return NO_STORE

        self.data_env = lmdb.open(
            path + '/main', subdir=False, create=create,
            map_size=self.MAP_SIZE, max_dbs=4, readahead=False)
        self.idx_env = lmdb.open(
            path + '/index', subdir=False, create=create,
            map_size=self.MAP_SIZE, max_dbs=6, readahead=False)

        # Clear stale readers.
        data_stale_readers = self.data_env.reader_check()
        idx_stale_readers = self.idx_env.reader_check()
        logger.debug(
            'Cleared data stale readers: {}'.format(data_stale_readers))
        logger.debug(
            'Cleared index stale readers: {}'.format(idx_stale_readers))

        # Open and optionally create main databases.
        self.dbs = {
            # Main databases.
            't:st': self.data_env.open_db(b't:st', create=create),
            'spo:c': self.data_env.open_db(
                b'spo:c', create=create, dupsort=True, dupfixed=True),
            'c:': self.data_env.open_db(b'c:', create=create),
            'pfx:ns': self.data_env.open_db(b'pfx:ns', create=create),
            # One-off indices.
            'ns:pfx': self.idx_env.open_db(b'ns:pfx', create=create),
            'th:t': self.idx_env.open_db(b'th:t', create=create),
        }
        # Other index databases.
        for db_key in self.idx_keys:
            if db_key not in ('ns:pfx', 'th:t'):
                self.dbs[db_key] = self.idx_env.open_db(
                    s2b(db_key), dupsort=True, dupfixed=True, create=create)

    def _from_key(self, key):
        """
        Convert a key into one or more terms.

        :param key: The key to be converted. It can be a compound one, in
            which case the function will return multiple terms.
        :type key: bytes or memoryview

        :rtype: tuple(rdflib.term.Identifier)
        :return: The term(s) associated with the key(s). The result is always
            a tuple even for single results.
        """
        with self.cur('t:st') as cur:
            return tuple(
                self._unpickle(cur.get(k))
                for k in self._split_key(key))

    def _to_key(self, obj):
        """
        Convert a triple, quad or term into a key.

        Terms are looked up by the hash of their pickled representation,
        which is unique for each term. The hashing algorithm is specified in
        `TERM_HASH_ALGO`.

        :param Object obj: Anything that can be reduced to terms stored in
            the database. Pairs of terms, as well as triples and quads, are
            expressed as tuples.

            If more than one term is provided, the keys are concatenated.

        :rtype: bytes
        :return: Keys stored for the term(s), or ``None`` if any term is not
            found.
        """
        if not isinstance(obj, (list, tuple)):
            obj = (obj,)
        key = []
        with self.cur('th:t') as cur:
            for term in obj:
                tk = cur.get(self._hash(self._pickle(term)))
                if not tk:
                    # If any of the terms is not found, return None
                    # immediately.
                    return None
                key.append(tk)

        return b''.join(key)

    def _hash(self, s):
        """Get the hash value of a serialized object."""
        return hashlib.new(self.TERM_HASH_ALGO, s).digest()

    def _split_key(self, keys):
        """
        Split a compound key into individual keys.

        This method relies on the fixed length of all term keys.

        :param keys: Concatenated keys.
        :type keys: bytes or memoryview

        :rtype: tuple(memoryview)
        """
        return tuple(
            keys[i:i + self.KEY_LENGTH]
            for i in range(0, len(keys), self.KEY_LENGTH))

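    # For example (illustrative key values), with KEY_LENGTH = 5 a 15-byte
    # compound key splits into its three term keys:
    #
    #     >>> store._split_key(b'AAAAABBBBBCCCCC')
    #     (b'AAAAA', b'BBBBB', b'CCCCC')
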
    def _normalize_context(self, context):
        """
        Normalize a context parameter to conform to the model expectations.

        :param context: Context URI or graph.
        :type context: URIRef or Graph or None
        """
        if isinstance(context, Graph):
            if context == self or isinstance(context.identifier, Variable):
                context = None
            else:
                context = context.identifier

        return context

    def _lookup(self, triple_pattern):
        """
        Look up triples in the indices based on a triple pattern.

        :rtype: Iterator
        :return: Matching triple keys.
        """
        s, p, o = triple_pattern

        if s is not None:
            if p is not None:
                # s p o
                if o is not None:
                    with self.cur('spo:c') as cur:
                        tkey = self._to_key(triple_pattern)
                        if tkey and cur.set_key(tkey):
                            yield tkey
                            return
                        else:
                            return iter(())
                # s p ?
                else:
                    yield from self._lookup_2bound({'s': s, 'p': p})
            else:
                # s ? o
                if o is not None:
                    yield from self._lookup_2bound({'s': s, 'o': o})
                # s ? ?
                else:
                    yield from self._lookup_1bound('s', s)
        else:
            if p is not None:
                # ? p o
                if o is not None:
                    yield from self._lookup_2bound({'p': p, 'o': o})
                # ? p ?
                else:
                    yield from self._lookup_1bound('p', p)
            else:
                # ? ? o
                if o is not None:
                    yield from self._lookup_1bound('o', o)
                # ? ? ?
                else:
                    # Get all triples in the database.
                    with self.cur('spo:c') as cur:
                        yield from cur.iternext_nodup()

    def _lookup_1bound(self, label, term):
        """
        Look up triples for a pattern with one bound term.

        :param str label: Which term is being searched for. One of `s`,
            `p`, or `o`.
        :param rdflib.URIRef term: Bound term to search for.

        :rtype: iterator(bytes)
        :return: SPO keys matching the pattern.
        """
        k = self._to_key(term)
        if not k:
            return iter(())
        idx_name = '{}:{}'.format(label, 'spo'.replace(label, ''))
        term_order = self._lookup_ordering[idx_name]
        with self.cur(idx_name) as cur:
            if cur.set_key(k):
                for match in cur.iternext_dup():
                    subkeys = self._split_key(match)

                    # Compose the result.
                    out = [None, None, None]
                    out[term_order[0]] = k
                    out[term_order[1]] = subkeys[0]
                    out[term_order[2]] = subkeys[1]

                    yield b''.join(out)

    def _lookup_2bound(self, bound_terms):
        """
        Look up triples for a pattern with two bound terms.

        :param dict bound_terms: Triple labels and terms to search for, e.g.
            ``{'s': URIRef('urn:s:1'), 'o': URIRef('urn:o:1')}``.

        :rtype: iterator(bytes)
        :return: SPO keys matching the pattern.
        """
        if len(bound_terms) != 2:
            raise ValueError(
                'Exactly 2 terms need to be bound. Got {}'.format(
                    len(bound_terms)))

        # Establish lookup ranking.
        luc = None
        for k_label in self._lookup_rank:
            if k_label in bound_terms.keys():
                # First match is the lookup term.
                if not luc:
                    v_label = 'spo'.replace(k_label, '')
                    # Lookup database key (cursor) name.
                    luc = k_label + ':' + v_label
                    # Position of keys in the final triple.
                    term_order = self._lookup_ordering[luc]
                    # Term to look up.
                    luk = self._to_key(bound_terms[k_label])
                    if not luk:
                        return iter(())
                # Second match is the filter.
                else:
                    # Filter key (position of sub-key in lookup results).
                    fpos = v_label.index(k_label)
                    # Filter term.
                    ft = self._to_key(bound_terms[k_label])
                    if not ft:
                        return iter(())
                    break

        # Look up in index.
        with self.cur(luc) as cur:
            if cur.set_key(luk):
                # Iterate over matches and filter by the second term.
                for match in cur.iternext_dup():
                    subkeys = self._split_key(match)
                    flt_subkey = subkeys[fpos]
                    if flt_subkey == ft:
                        # Remainder (non-filter) key used to complete the
                        # triple.
                        r_subkey = subkeys[1 - fpos]

                        # Compose the result.
                        out = [None, None, None]
                        out[term_order[0]] = luk
                        out[term_order[fpos + 1]] = flt_subkey
                        out[term_order[2 - fpos]] = r_subkey

                        yield b''.join(out)

    def _append(self, cur, values, **kwargs):
        """
        Append one or more values to the end of a database.

        :param lmdb.Cursor cur: The write cursor to act on.
        :param list(bytes) values: Value(s) to append.

        :rtype: list(memoryview)
        :return: Last key(s) inserted.
        """
        if not isinstance(values, (list, tuple)):
            raise ValueError('Input must be a list or tuple.')
        data = []
        lastkey = cur.key() if cur.last() else None
        for v in values:
            lastkey = self._key_seq.next(lastkey)
            data.append((lastkey, v))

        cur.putmulti(data, **kwargs)

        return [d[0] for d in data]

    def _index_triple(self, action, spok):
        """
        Update indices for a triple and context (add or remove).

        :param str action: 'add' or 'remove'.
        :param bytes spok: Triple key.
        """
        # Split and rearrange-join keys for association and indices.
        triple = self._split_key(spok)
        sk, pk, ok = triple
        spk = b''.join(triple[:2])
        sok = b''.join((triple[0], triple[2]))
        pok = b''.join(triple[1:3])

        # Associate cursor labels with k/v pairs.
        curs = {
            's:po': (sk, pok),
            'p:so': (pk, sok),
            'o:sp': (ok, spk),
        }

        # Add or remove triple lookups.
        for clabel, terms in curs.items():
            with self.cur(clabel) as icur:
                if action == 'remove':
                    if icur.set_key_dup(*terms):
                        icur.delete()
                elif action == 'add':
                    icur.put(*terms)
                else:
                    raise ValueError(
                        'Index action \'{}\' is not supported.'.format(
                            action))

    ## Convenience methods: not necessary for functioning but useful for
    ## debugging.

    def _keys_in_ctx(self, ck):
        """
        Convenience method to list all triple keys in a context.

        :param bytes ck: Context key.

        :rtype: set(tuple)
        :return: Set of triples.
        """
        with self.cur('c:spo') as cur:
            if cur.set_key(ck):
                tkeys = cur.iternext_dup()
                return {self._from_key(tk) for tk in tkeys}
            else:
                return set()

    def _ctx_for_key(self, tkey):
        """
        Convenience method to list all contexts that a triple key is in.

        :param bytes tkey: Triple key.

        :rtype: set(rdflib.URIRef)
        :return: Set of context URIs.
        """
        with self.cur('spo:c') as cur:
            if cur.set_key(tkey):
                cks = cur.iternext_dup()
                return {self._from_key(ck)[0] for ck in cks}
            else:
                return set()
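

# A minimal end-to-end sketch (not part of the module proper). The database
# path and the example terms are illustrative assumptions.
if __name__ == '__main__':
    from tempfile import mkdtemp
    from rdflib import Literal

    store = LmdbStore(mkdtemp())
    store.open(create=True)

    s = URIRef('urn:s:1')
    p = URIRef('urn:p:1')
    o = Literal('hello')

    # Write in a R/W transaction; TxnManager commits on clean exit.
    with TxnManager(store, write=True):
        store.add((s, p, o))

    # Read back in a read-only transaction.
    with TxnManager(store):
        for trp, ctxs in store.triples((None, None, None)):
            print(trp)

    store.close()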