# lmdb_store.py
import hashlib
import logging
import os

from contextlib import ContextDecorator, ExitStack
from os import makedirs
from os.path import exists, abspath
from shutil import rmtree
from urllib.request import pathname2url

import lmdb

from rdflib import Graph, Namespace, URIRef, Variable
from rdflib.graph import DATASET_DEFAULT_GRAPH_ID as RDFLIB_DEFAULT_GRAPH_URI
from rdflib.store import Store, VALID_STORE, NO_STORE


logger = logging.getLogger(__name__)

def s2b(u, enc='UTF-8'):
    '''
    Convert a string into a bytes object.
    '''
    return u.encode(enc)


def b2s(u, enc='UTF-8'):
    '''
    Convert a bytes or memoryview object into a string.
    '''
    return bytes(u).decode(enc)

class TxnManager(ContextDecorator):
    '''
    Handle ACID transactions with an LmdbStore.

    Wrap this within a `with` statement:

    >>> with TxnManager(store, True):
    ...     # Do something with the database
    >>>

    The transaction will be opened and handled automatically.
    '''
    def __init__(self, store, write=False):
        '''
        Begin and close a transaction in a store.

        @param store (LmdbStore) The store to open a transaction on.
        @param write (bool) Whether the transaction is read-write. Default is
        False (read-only transaction).
        '''
        self.store = store
        self.write = write

    def __enter__(self):
        # Only open a R/W transaction if one is not already open.
        if not self.write or not self.store.is_txn_rw:
            self.store.begin(write=self.write)

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type:
            self.store.rollback()
        else:
            self.store.commit()

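# Illustrative usage sketch (not part of the module; `store` and the terms
# are hypothetical). An exception raised inside the `with` block rolls the
# transaction back; otherwise it is committed on exit:
#
# >>> store = LmdbStore('/tmp/db')
# >>> store.open()
# >>> with TxnManager(store, write=True):
# ...     store.add((s, p, o))
# >>> with TxnManager(store):
# ...     results = list(store.triples((None, None, None)))
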
class LexicalSequence:
    '''
    Fixed-length lexicographically ordered byte sequence.

    Useful to generate optimized sequences of keys in LMDB.
    '''
    def __init__(self, start=1, max_len=5):
        '''
        @param start (int) Starting byte value. Bytes below this value are
        never found in this sequence. This is useful to allot special bytes
        to be used e.g. as separators.
        @param max_len (int) Maximum number of bytes that a byte string can
        contain. This should be chosen carefully since the number of all
        possible key combinations is determined by this value and the `start`
        value. The default args provide 255**5 (~1 Tn) unique combinations.
        '''
        self.start = start
        self.length = max_len

    def first(self):
        '''
        First possible combination.
        '''
        return bytearray([self.start] * self.length)

    def next(self, n):
        '''
        Calculate the next closest byte sequence in lexicographical order.

        This is used to fill the next available slot after the last one in
        LMDB. Keys are byte strings, which is a convenient way to keep key
        lengths as small as possible when they are referenced in several
        indices.

        This function assumes that all the keys are padded with the `start`
        value up to the `max_len` length.

        @param n (bytes) Current byte sequence to add to.
        '''
        if not n:
            n = self.first()
        elif isinstance(n, (bytes, memoryview)):
            n = bytearray(n)
        elif not isinstance(n, bytearray):
            raise ValueError('Input sequence must be bytes or a bytearray.')

        if not len(n) == self.length:
            raise ValueError('Incorrect sequence length.')

        for i, b in list(enumerate(n))[::-1]:
            try:
                n[i] += 1
            # A ValueError means the byte overflowed past 255, i.e. this
            # position is exhausted.
            except ValueError:
                if i == 0:
                    raise RuntimeError('BAD DAY: Sequence exhausted. No more '
                            'combinations are possible.')
                # Reset this byte and carry over to the next position up.
                else:
                    n[i] = self.start
                    continue
            else:
                return bytes(n)

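# Illustrative behavior (doctest-style; values shown for the defaults
# start=1, max_len=5):
#
# >>> seq = LexicalSequence()
# >>> seq.first()
# bytearray(b'\x01\x01\x01\x01\x01')
# >>> seq.next(b'\x01\x01\x01\x01\x01')
# b'\x01\x01\x01\x01\x02'
# >>> seq.next(b'\x01\x01\x01\x01\xff')  # Last byte overflows and carries.
# b'\x01\x01\x01\x02\x01'
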
class LmdbStore(Store):
    '''
    LMDB-backed store.

    This is an implementation of the RDFLib Store interface:
    https://github.com/RDFLib/rdflib/blob/master/rdflib/store.py

    Handles the interaction with a LMDB store and builds an abstraction layer
    for triples.

    This store class uses two LMDB environments (i.e. two files): one for the
    main (preservation-worthy) data and the other for the index data which
    can be rebuilt from the main database.

    There are 4 main data sets (preservation-worthy data):

    - t:st (term key: serialized term; 1:1)
    - spo:c (joined S, P, O keys: context key; dupsort, dupfixed)
    - c: (context keys only, values are the empty bytestring; 1:1)
    - pfx:ns (prefix: pickled namespace; 1:1)

    And 6 indices to optimize lookup for all possible bound/unbound term
    combinations in a triple:

    - th:t (term hash: term key; 1:1)
    - s:po (S key: joined P, O keys; dupsort, dupfixed)
    - p:so (P key: joined S, O keys; dupsort, dupfixed)
    - o:sp (O key: joined S, P keys; dupsort, dupfixed)
    - c:spo (context key: joined S, P, O keys; dupsort, dupfixed)
    - ns:pfx (pickled namespace: prefix; 1:1)

    The default graph is defined in `RDFLIB_DEFAULT_GRAPH_URI`. Adding triples
    without a context will add to this graph. Looking up triples without a
    context (also in a SPARQL query) will look in the union graph instead of
    in the default graph. Also, removing triples without specifying a context
    will remove triples from all contexts.
    '''
    context_aware = True
    # This is a hassle to maintain for no apparent gain. If some use is
    # devised in the future, it may be revised.
    formula_aware = False
    graph_aware = True
    transaction_aware = True

    '''
    LMDB map size. See http://lmdb.readthedocs.io/en/release/#environment-class
    '''
    MAP_SIZE = 1024 ** 4  # 1 TiB

    '''
    Key hashing algorithm. If you are paranoid, use SHA1. Otherwise, MD5 is
    faster and takes up less space (16 bytes vs. 20 bytes). This may make a
    visible difference because keys are generated and parsed very often.
    '''
    KEY_HASH_ALGO = 'sha1'

    '''
    Fixed length for term keys.

    4 or 5 is a safe range. 4 allows for ~4 billion (256 ** 4) unique terms
    in the store; 5 allows ~1 trillion. While these numbers may seem huge (the
    total number of Internet pages indexed by Google as of 2018 is 45
    billion), keep in mind that keys cannot be reused, so a repository that
    deletes a lot of triples may burn through a lot of terms.

    If a repository runs out of keys it can no longer store new terms and must
    be migrated to a new database, which will regenerate and compact the keys.

    For smaller repositories it should be safe to set this value to 4, which
    could improve performance since keys make up the vast majority of record
    exchange between the store and the application. However it is sensible not
    to expose this value as a configuration option.
    '''
    KEY_LENGTH = 5

    '''
    Lexical sequence start. `\x01` is fine since no special characters are
    used, but it's good to leave a spare for potential future use.
    '''
    KEY_START = 1

    data_keys = (
        # Term key to serialized term content: 1:1
        't:st',
        # Joined triple keys to context key: 1:m, fixed-length values
        'spo:c',
        # This has empty values and is used to keep track of empty contexts.
        'c:',
        # Prefix to namespace: 1:1
        'pfx:ns',
    )
    idx_keys = (
        # Namespace to prefix: 1:1
        'ns:pfx',
        # Term hash to term key: 1:1
        'th:t',
        # Lookups: 1:m, fixed-length values
        's:po', 'p:so', 'o:sp', 'c:spo',
    )

    '''
    Order in which keys are looked up if two terms are bound.

    The indices with the smallest average number of values per key should be
    looked up first.

    If we want to get fancy, this can be rebalanced from time to time by
    looking up the number of keys in (s:po, p:so, o:sp).
    '''
    _lookup_rank = ('s', 'o', 'p')

    '''
    Order of terms in the lookup indices. Used to rebuild a triple from a
    lookup.
    '''
    _lookup_ordering = {
        's:po': (0, 1, 2),
        'p:so': (1, 0, 2),
        'o:sp': (2, 0, 1),
    }

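    # How to read `_lookup_ordering` (illustrative): for 'o:sp', the lookup
    # key is the O term key and each dup value is the joined S and P subkeys.
    # The tuple (2, 0, 1) means: the lookup key goes to position 2 (object)
    # of the rebuilt SPO key, the first subkey to position 0 (subject) and
    # the second subkey to position 1 (predicate).
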
    data_env = None
    idx_env = None
    db = None
    dbs = {}
    data_txn = None
    idx_txn = None
    is_txn_rw = None

    def __init__(self, path, identifier=None):
        self.path = path
        self.__open = False
        self.identifier = identifier or URIRef(pathname2url(abspath(path)))
        super().__init__(path)

        self._pickle = self.node_pickler.dumps
        self._unpickle = self.node_pickler.loads

        self._key_seq = LexicalSequence(self.KEY_START, self.KEY_LENGTH)

    def __del__(self):
        '''
        Properly close store for garbage collection.
        '''
        self.close(True)

    def __len__(self, context=None):
        '''
        Return the length of the dataset.

        @param context (rdflib.URIRef | rdflib.Graph) Context to restrict the
        count to.
        '''
        context = self._normalize_context(context)

        if context is not None:
            ck = self._to_key(context)
            with self.cur('c:spo') as cur:
                # An unknown context contains no triples.
                if ck and cur.set_key(ck):
                    return sum(1 for _ in cur.iternext_dup())
                else:
                    return 0
        else:
            return self.data_txn.stat(self.dbs['spo:c'])['entries']

    @property
    def is_open(self):
        return self.__open

    def open(self, configuration=None, create=True):
        '''
        Open the database.

        The database is best left open for the lifespan of the server. Read
        transactions can be opened as needed. Write transactions should be
        opened and closed within a single HTTP request to ensure atomicity of
        the request.

        This method is called outside of the main transaction. All cursors
        are created separately within the transaction.
        '''
        self._init_db_environments(create)
        if self.data_env == NO_STORE:
            return NO_STORE
        self.__open = True

        return VALID_STORE

    def begin(self, write=False):
        '''
        Begin the main transaction.
        '''
        if not self.is_open:
            raise RuntimeError('Store must be opened first.')
        logger.debug('Beginning a {} transaction.'.format(
            'read/write' if write else 'read-only'))
        self.data_txn = self.data_env.begin(buffers=True, write=write)
        self.idx_txn = self.idx_env.begin(buffers=True, write=write)

        self.is_txn_rw = write

    def stats(self):
        '''
        Gather statistics about the database.
        '''
        stats = {
            'data_db_stats': {
                db_label: self.data_txn.stat(self.dbs[db_label])
                for db_label in self.data_keys},
            'idx_db_stats': {
                db_label: self.idx_txn.stat(self.dbs[db_label])
                for db_label in self.idx_keys},
            'data_db_size': os.stat(self.data_env.path()).st_size,
            'idx_db_size': os.stat(self.idx_env.path()).st_size,
            'num_triples': len(self),
        }

        return stats

    @property
    def is_txn_open(self):
        '''
        Whether the main transaction is open.
        '''
        try:
            self.data_txn.id()
            self.idx_txn.id()
        except (lmdb.Error, AttributeError):
            #logger.info('Main transaction does not exist or is closed.')
            return False
        else:
            #logger.info('Main transaction is open.')
            return True

    def cur(self, index):
        '''
        Return a new cursor by its index.
        '''
        if index in self.idx_keys:
            txn = self.idx_txn
        elif index in self.data_keys:
            txn = self.data_txn
        else:
            raise ValueError('Cursor key not found.')

        return txn.cursor(self.dbs[index])

    def get_data_cursors(self, txn):
        '''
        Build the main data cursors for a transaction.

        @param txn (lmdb.Transaction) This can be a read or write transaction.

        @return dict(string, lmdb.Cursor) Keys are database labels, values are
        cursors.
        '''
        return {
            key: txn.cursor(self.dbs[key])
            for key in self.data_keys}

    def get_idx_cursors(self, txn):
        '''
        Build the index cursors for a transaction.

        @param txn (lmdb.Transaction) This can be a read or write transaction.

        @return dict(string, lmdb.Cursor) Keys are index labels, values are
        index cursors.
        '''
        return {
            key: txn.cursor(self.dbs[key])
            for key in self.idx_keys}

    def close(self, commit_pending_transaction=False):
        '''
        Close the database connection.

        Do this at server shutdown.
        '''
        self.__open = False
        if self.is_txn_open:
            if commit_pending_transaction:
                self.commit()
            else:
                self.rollback()

        self.data_env.close()
        self.idx_env.close()

    def destroy(self, path):
        '''
        Destroy the store.

        https://www.youtube.com/watch?v=lIVq7FJnPwg

        @param path (string) Path of the folder containing the database(s).
        '''
        if exists(path):
            rmtree(path)

    def add(self, triple, context=None, quoted=False):
        '''
        Add a triple and start indexing.

        @param triple (tuple:rdflib.Identifier) Tuple of three identifiers.
        @param context (rdflib.Identifier | None) Context identifier.
        'None' inserts in the default graph.
        @param quoted (bool) Not used.
        '''
        context = self._normalize_context(context)
        if context is None:
            context = RDFLIB_DEFAULT_GRAPH_URI

        Store.add(self, triple, context)

        #logger.info('Adding triple: {}'.format(triple))
        pk_s, pk_p, pk_o = [self._pickle(t) for t in triple]
        #logger.debug('Adding quad: {} {}'.format(triple, context))
        pk_c = self._pickle(context)

        # Add new individual terms or gather keys for existing ones.
        keys = [None, None, None, None]
        with self.cur('th:t') as icur:
            for i, pk_t in enumerate((pk_s, pk_p, pk_o, pk_c)):
                thash = self._hash(pk_t)
                if icur.set_key(thash):
                    keys[i] = bytes(icur.value())
                else:
                    # Put new term.
                    with self.cur('t:st') as dcur:
                        keys[i] = self._append(dcur, (pk_t,))[0]
                    # Index the new term.
                    icur.put(thash, keys[i])

        # Add context in context DB.
        ck = keys[3]
        with self.cur('c:') as cur:
            if not cur.set_key(ck):
                cur.put(ck, b'')

        # Add triple:context association.
        spok = b''.join(keys[:3])
        with self.cur('spo:c') as dcur:
            if not dcur.set_key_dup(spok, ck):
                dcur.put(spok, ck)
        # Index the spo:c association.
        with self.cur('c:spo') as icur:
            icur.put(ck, spok)

        self._index_triple('add', spok)

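    # Key layout sketch for one quad (illustrative values, assuming the
    # default KEY_LENGTH of 5): each of S, P, O and C gets a 5-byte term key,
    # e.g. b'\x01\x01\x01\x01\x02'. The triple key `spok` is the 15-byte
    # concatenation of the S, P and O keys; `spo:c` maps it to the 5-byte
    # context key, and `c:spo` maps the context key back to `spok`.
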
    def remove(self, triple_pattern, context=None):
        '''
        Remove triples by a pattern.

        @param triple_pattern (tuple:rdflib.term.Identifier|None) 3-tuple of
        either RDF terms or None, indicating the triple(s) to be removed.
        None is used as a wildcard.
        @param context (rdflib.term.Identifier|None) Context to remove the
        triples from. If None (the default) the matching triples are removed
        from all contexts.
        '''
        #logger.debug('Removing triples by pattern: {} on context: {}'.format(
        #    triple_pattern, context))
        context = self._normalize_context(context)
        if context is not None:
            ck = self._to_key(context)
            # If context is specified but not found, return to avoid deleting
            # the wrong triples.
            if not ck:
                return
        else:
            ck = None

        with self.cur('spo:c') as dcur:
            with self.cur('c:spo') as icur:
                match_set = {bytes(k) for k in self._triple_keys(
                    triple_pattern, context)}
                # Delete only the association with the given context.
                if ck:
                    for spok in match_set:
                        if dcur.set_key_dup(spok, ck):
                            dcur.delete()
                        if icur.set_key_dup(ck, spok):
                            icur.delete()
                        self._index_triple('remove', spok)
                # If no context is specified, remove all associations.
                else:
                    for spok in match_set:
                        if dcur.set_key(spok):
                            for cck in (bytes(k)
                                    for k in dcur.iternext_dup()):
                                # Delete the index first while we have the
                                # context reference.
                                if icur.set_key_dup(cck, spok):
                                    icur.delete()
                            # Then delete the main entry.
                            dcur.set_key(spok)
                            dcur.delete(dupdata=True)
                            self._index_triple('remove', spok)

    def triples(self, triple_pattern, context=None):
        '''
        Generator over matching triples.

        @param triple_pattern (tuple) 3 RDFLib terms
        @param context (rdflib.Graph | None) Context graph, if available.

        @return Generator over triples and contexts in which each result has
        the following format:
        > (s, p, o), generator(contexts)
        Where the contexts generator lists all contexts that the triple
        appears in.
        '''
        #logger.debug('Getting triples for pattern: {} and context: {}'.format(
        #    triple_pattern, context))
        # This sounds strange, RDFLib should be passing None at this point,
        # but anyway...
        context = self._normalize_context(context)

        with self.cur('spo:c') as cur:
            for spok in self._triple_keys(triple_pattern, context):
                if context is not None:
                    contexts = (Graph(identifier=context),)
                else:
                    if cur.set_key(spok):
                        contexts = tuple(
                            Graph(identifier=self._from_key(ck)[0], store=self)
                            for ck in cur.iternext_dup())

                yield self._from_key(spok), contexts

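    # Illustrative query sketch (not part of the module; `store` and the
    # terms are hypothetical):
    #
    # >>> pattern = (URIRef('urn:s:1'), None, None)
    # >>> for (s, p, o), ctxs in store.triples(pattern):
    # ...     print(s, p, o, [c.identifier for c in ctxs])
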
    def bind(self, prefix, namespace):
        '''
        Bind a prefix to a namespace.

        @param prefix (string) Namespace prefix.
        @param namespace (rdflib.URIRef) Fully qualified URI of namespace.
        '''
        prefix = s2b(prefix)
        namespace = s2b(namespace)
        if self.is_txn_rw:
            with self.data_txn.cursor(self.dbs['pfx:ns']) as cur:
                cur.put(prefix, namespace)
            with self.idx_txn.cursor(self.dbs['ns:pfx']) as cur:
                cur.put(namespace, prefix)
        else:
            with self.data_env.begin(write=True) as wtxn:
                with wtxn.cursor(self.dbs['pfx:ns']) as cur:
                    cur.put(prefix, namespace)
            with self.idx_env.begin(write=True) as wtxn:
                with wtxn.cursor(self.dbs['ns:pfx']) as cur:
                    cur.put(namespace, prefix)

    def namespace(self, prefix):
        '''
        Get the namespace for a prefix.

        @param prefix (string) Namespace prefix.
        '''
        with self.cur('pfx:ns') as cur:
            ns = cur.get(s2b(prefix))
            return Namespace(b2s(ns)) if ns is not None else None

    def prefix(self, namespace):
        '''
        Get the prefix associated with a namespace.

        @NOTE A namespace can only be bound to one prefix in this
        implementation.

        @param namespace (rdflib.URIRef) Fully qualified URI of namespace.
        '''
        with self.cur('ns:pfx') as cur:
            prefix = cur.get(s2b(namespace))
            return b2s(prefix) if prefix is not None else None

    def namespaces(self):
        '''
        Get an iterator of all prefix: namespace bindings.
        '''
        with self.cur('pfx:ns') as cur:
            for pfx, ns in iter(cur):
                yield (b2s(pfx), Namespace(b2s(ns)))

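    # Illustrative binding sketch (hypothetical values), inside a R/W
    # transaction:
    #
    # >>> store.bind('foaf', 'http://xmlns.com/foaf/0.1/')
    # >>> print(store.namespace('foaf'))
    # http://xmlns.com/foaf/0.1/
    # >>> store.prefix('http://xmlns.com/foaf/0.1/')
    # 'foaf'
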
    def contexts(self, triple=None):
        '''
        Generator over all contexts.

        @param triple (tuple | None) If given, only yield contexts that
        contain this triple.

        @return generator(Graph)
        '''
        if triple and any(triple):
            tkey = self._to_key(triple)
            with self.cur('spo:c') as cur:
                if tkey and cur.set_key(tkey):
                    for ck in cur.iternext_dup():
                        yield Graph(
                            identifier=self._from_key(ck)[0], store=self)
        else:
            with self.cur('c:') as cur:
                for ck in cur.iternext(values=False):
                    yield Graph(
                        identifier=self._from_key(ck)[0], store=self)

    def add_graph(self, graph):
        '''
        Add a graph to the database.

        This creates an empty graph by associating the graph's context key
        with an empty entry in the context DB. This prevents the graph from
        being removed when all its triples are removed.

        This may be called by read-only operations:
        https://github.com/RDFLib/rdflib/blob/master/rdflib/graph.py#L1623
        Therefore it needs to open a write transaction. This is not ideal
        but it is the only way to handle datasets in RDFLib.

        @param graph (URIRef) URI of the named graph to add.
        '''
        if isinstance(graph, Graph):
            graph = graph.identifier
        pk_c = self._pickle(graph)
        c_hash = self._hash(pk_c)
        with self.cur('th:t') as cur:
            c_exists = cur.set_key(c_hash)
        if not c_exists:
            # Insert context term if not existing.
            if self.is_txn_rw:
                # Use the existing R/W transaction.
                with self.cur('t:st') as cur:
                    ck = self._append(cur, (pk_c,))[0]
                with self.cur('th:t') as cur:
                    cur.put(c_hash, ck)
                with self.cur('c:') as cur:
                    cur.put(ck, b'')
            else:
                # Open new R/W transactions.
                with self.data_env.begin(write=True) as wtxn:
                    with wtxn.cursor(self.dbs['t:st']) as cur:
                        ck = self._append(cur, (pk_c,))[0]
                    with wtxn.cursor(self.dbs['c:']) as cur:
                        cur.put(ck, b'')
                with self.idx_env.begin(write=True) as wtxn:
                    with wtxn.cursor(self.dbs['th:t']) as cur:
                        cur.put(c_hash, ck)

    def remove_graph(self, graph):
        '''
        Remove all triples from graph and the graph itself.

        @param graph (URIRef) URI of the named graph to remove.
        '''
        if isinstance(graph, Graph):
            graph = graph.identifier
        self.remove((None, None, None), graph)

        ck = self._to_key(graph)
        with self.cur('c:') as cur:
            if ck and cur.set_key(ck):
                cur.delete()

    def commit(self):
        '''
        Commit the main transaction.
        '''
        logger.debug('Committing transaction.')
        try:
            self.data_txn.commit()
        except (AttributeError, lmdb.Error):
            pass
        try:
            self.idx_txn.commit()
        except (AttributeError, lmdb.Error):
            pass
        self.is_txn_rw = None

    def rollback(self):
        '''
        Roll back the main transaction.
        '''
        logger.debug('Rolling back transaction.')
        try:
            self.data_txn.abort()
        except (AttributeError, lmdb.Error):
            pass
        try:
            self.idx_txn.abort()
        except (AttributeError, lmdb.Error):
            pass
        self.is_txn_rw = None

    ## PRIVATE METHODS ##

    def _triple_keys(self, triple_pattern, context=None):
        '''
        Generator over matching triple keys.

        This method is used by `triples` which returns native Python tuples,
        as well as by other methods that need to iterate and filter triple
        keys without incurring the overhead of converting them to triples.

        @param triple_pattern (tuple) 3 RDFLib terms
        @param context (rdflib.Graph | None) Context graph or URI, or None.
        '''
        if context == self:
            context = None

        if context is not None:
            ck = self._to_key(context)
            # Shortcuts
            if not ck:
                # Context not found.
                return

            with self.cur('c:spo') as cur:
                # s p o c
                if all(triple_pattern):
                    spok = self._to_key(triple_pattern)
                    if not spok:
                        # A term in the triple is not found.
                        return
                    if cur.set_key_dup(ck, spok):
                        yield spok
                    # Else: triple not found in this context.
                    return
                # ? ? ? c
                elif not any(triple_pattern):
                    # Get all triples from the context.
                    if cur.set_key(ck):
                        for spok in cur.iternext_dup():
                            yield spok
                # Regular lookup.
                else:
                    yield from (
                        spok for spok in self._lookup(triple_pattern)
                        if cur.set_key_dup(ck, spok))
        else:
            yield from self._lookup(triple_pattern)

    def _init_db_environments(self, create=True):
        '''
        Initialize the DB environments.

        The main database is kept in one file, the indices in a separate one
        (these may be even further split up depending on performance
        considerations).

        @param create (bool) If True, the environment and its databases are
        created.
        '''
        path = self.path
        if not exists(path):
            if create is True:
                makedirs(path)
            else:
                # Signal to `open` that the store does not exist.
                self.data_env = NO_STORE
                return

        self.data_env = lmdb.open(path + '/main', subdir=False, create=create,
                map_size=self.MAP_SIZE, max_dbs=4, readahead=False)
        self.idx_env = lmdb.open(path + '/index', subdir=False, create=create,
                map_size=self.MAP_SIZE, max_dbs=6, readahead=False)

        # Clear stale readers.
        data_stale_readers = self.data_env.reader_check()
        idx_stale_readers = self.idx_env.reader_check()
        logger.debug(
            'Cleared data stale readers: {}'.format(data_stale_readers))
        logger.debug(
            'Cleared index stale readers: {}'.format(idx_stale_readers))

        # Open and optionally create main databases.
        self.dbs = {
            # Main databases.
            't:st': self.data_env.open_db(b't:st', create=create),
            'spo:c': self.data_env.open_db(
                b'spo:c', create=create, dupsort=True, dupfixed=True),
            'c:': self.data_env.open_db(b'c:', create=create),
            'pfx:ns': self.data_env.open_db(b'pfx:ns', create=create),
            # One-off indices.
            'ns:pfx': self.idx_env.open_db(b'ns:pfx', create=create),
            'th:t': self.idx_env.open_db(b'th:t', create=create),
        }
        # Other index databases.
        for db_key in self.idx_keys:
            if db_key not in ('ns:pfx', 'th:t'):
                self.dbs[db_key] = self.idx_env.open_db(s2b(db_key),
                        dupsort=True, dupfixed=True, create=create)

    def _from_key(self, key):
        '''
        Convert a key into one or more terms.

        @param key (bytes | memoryview) The key to be converted. It can be a
        compound one, in which case the function will return multiple terms.

        @return tuple
        '''
        with self.cur('t:st') as cur:
            return tuple(
                self._unpickle(cur.get(k))
                for k in self._split_key(key))

    def _to_key(self, obj):
        '''
        Convert a triple, quad or term into a key.

        Each term key is looked up by the checksum of the pickled term, which
        is unique for that term. The hashing algorithm is specified in
        `KEY_HASH_ALGO`.

        @param obj (Object) Anything that can be reduced to terms stored in
        the database. Pairs of terms, as well as triples and quads, are
        expressed as tuples.

        If more than one term is provided, the keys are concatenated.

        @return bytes
        '''
        if not isinstance(obj, (list, tuple)):
            obj = (obj,)
        key = []
        with self.cur('th:t') as cur:
            for term in obj:
                tk = cur.get(self._hash(self._pickle(term)))
                if not tk:
                    # If any of the terms is not found, return None
                    # immediately.
                    return None
                key.append(tk)

        return b''.join(key)

    def _hash(self, s):
        '''
        Get the hash value of a serialized object.
        '''
        return hashlib.new(self.KEY_HASH_ALGO, s).digest()

    def _split_key(self, keys):
        '''
        Split a compound key into individual keys.

        This method relies on the fixed length of all term keys.

        @param keys (bytes | memoryview) Concatenated keys.

        @return tuple: bytes | memoryview
        '''
        return tuple(
            keys[i:i + self.KEY_LENGTH]
            for i in range(0, len(keys), self.KEY_LENGTH))

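    # Illustrative behavior, with the default KEY_LENGTH of 5:
    #
    # >>> store._split_key(b'aaaaabbbbbccccc')
    # (b'aaaaa', b'bbbbb', b'ccccc')
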
    def _normalize_context(self, context):
        '''
        Normalize a context parameter to conform to the model expectations.

        @param context (URIRef | Graph | None) Context URI or graph.
        '''
        if isinstance(context, Graph):
            if context == self or isinstance(context.identifier, Variable):
                context = None
            else:
                context = context.identifier
                #logger.debug('Converted graph into URI: {}'.format(context))

        return context

    def _lookup(self, triple_pattern):
        '''
        Look up triples in the indices based on a triple pattern.

        @return iterator of matching triple keys.
        '''
        s, p, o = triple_pattern

        if s is not None:
            if p is not None:
                # s p o
                if o is not None:
                    tkey = self._to_key(triple_pattern)
                    # A term not in the store means no matches.
                    if not tkey:
                        return
                    with self.cur('spo:c') as cur:
                        if cur.set_key(tkey):
                            yield tkey
                        return
                # s p ?
                else:
                    yield from self._lookup_2bound({'s': s, 'p': p})
            else:
                # s ? o
                if o is not None:
                    yield from self._lookup_2bound({'s': s, 'o': o})
                # s ? ?
                else:
                    yield from self._lookup_1bound('s', s)
        else:
            if p is not None:
                # ? p o
                if o is not None:
                    yield from self._lookup_2bound({'p': p, 'o': o})
                # ? p ?
                else:
                    yield from self._lookup_1bound('p', p)
            else:
                # ? ? o
                if o is not None:
                    yield from self._lookup_1bound('o', o)
                # ? ? ?
                else:
                    # Get all triples in the database.
                    with self.cur('spo:c') as cur:
                        yield from cur.iternext_nodup()

    def _lookup_1bound(self, label, term):
        '''
        Look up triples for a pattern with one bound term.

        @param label (string) Which term is being searched for. One of `s`,
        `p`, or `o`.
        @param term (rdflib.URIRef) Bound term to search for.

        @return iterator(bytes) SPO keys matching the pattern.
        '''
        k = self._to_key(term)
        if not k:
            return
        idx_name = '{}:{}'.format(label, 'spo'.replace(label, ''))
        term_order = self._lookup_ordering[idx_name]
        with self.cur(idx_name) as cur:
            if cur.set_key(k):
                for match in cur.iternext_dup():
                    subkeys = self._split_key(match)

                    # Compose the result triple key.
                    out = [None, None, None]
                    out[term_order[0]] = k
                    out[term_order[1]] = subkeys[0]
                    out[term_order[2]] = subkeys[1]

                    yield b''.join(out)

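    # Worked example (illustrative): for label 'p', `idx_name` is 'p:so' and
    # `term_order` is (1, 0, 2). Each dup value under the P key splits into
    # the S and O subkeys, which are recomposed with the P key into an SPO
    # key: out[1] = P, out[0] = S, out[2] = O.
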
    def _lookup_2bound(self, bound_terms):
        '''
        Look up triples for a pattern with two bound terms.

        @param bound_terms (dict) Triple labels and terms to search for,
        in the format of, e.g. {'s': URIRef('urn:s:1'), 'o':
        URIRef('urn:o:1')}

        @return iterator(bytes) SPO keys matching the pattern.
        '''
        if len(bound_terms) != 2:
            raise ValueError(
                    'Exactly 2 terms need to be bound. Got {}'.format(
                        len(bound_terms)))

        # Establish lookup ranking.
        luc = None
        for k_label in self._lookup_rank:
            if k_label in bound_terms.keys():
                # First match is the lookup term.
                if not luc:
                    v_label = 'spo'.replace(k_label, '')
                    # Lookup database key (cursor) name.
                    luc = k_label + ':' + v_label
                    term_order = self._lookup_ordering[luc]
                    # Term to look up.
                    luk = self._to_key(bound_terms[k_label])
                    if not luk:
                        return
                # Second match is the filter.
                else:
                    # Filter key (position of the sub-key in lookup results).
                    fpos = v_label.index(k_label)
                    # Filter term.
                    ft = self._to_key(bound_terms[k_label])
                    if not ft:
                        return
                    break

        # Look up in the index.
        with self.cur(luc) as cur:
            if cur.set_key(luk):
                # Iterate over matches and filter by the second term.
                for match in cur.iternext_dup():
                    subkeys = self._split_key(match)
                    flt_subkey = subkeys[fpos]
                    if flt_subkey == ft:
                        # The remainder (non-filter) key completes the triple.
                        r_subkey = subkeys[1 - fpos]

                        # Compose the result triple key.
                        out = [None, None, None]
                        out[term_order[0]] = luk
                        out[term_order[fpos + 1]] = flt_subkey
                        out[term_order[2 - fpos]] = r_subkey

                        yield b''.join(out)

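    # Worked example (illustrative): with {'s': ..., 'o': ...} bound,
    # `_lookup_rank` selects 's' first, so the lookup index (luc) is 's:po'
    # and the S key is the lookup key (luk). The second match, 'o', sits at
    # position 1 of 'po', so fpos = 1: each dup value's second subkey is
    # compared against the O key (ft), and matches are recomposed into SPO
    # keys via term_order (0, 1, 2).
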
    def _append(self, cur, values, **kwargs):
        '''
        Append one or more values to the end of a database.

        @param cur (lmdb.Cursor) The write cursor to act on.
        @param values (list(bytes)) Value(s) to append.

        @return list(bytes) Key(s) inserted, in order.
        '''
        if not isinstance(values, (list, tuple)):
            raise ValueError('Input must be a list or tuple.')
        data = []
        lastkey = cur.key() if cur.last() else None
        for v in values:
            lastkey = self._key_seq.next(lastkey)
            data.append((lastkey, v))

        cur.putmulti(data, **kwargs)

        return [d[0] for d in data]

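    # Illustrative behavior: if the last key in the database is
    # b'\x01\x01\x01\x01\x02', appending two values stores them under
    # b'\x01\x01\x01\x01\x03' and b'\x01\x01\x01\x01\x04' and returns those
    # two keys.
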
    def _index_triple(self, action, spok):
        '''
        Update lookup indices for a triple (add or remove).

        @param action (string) 'add' or 'remove'.
        @param spok (bytes) Triple key to be indexed.
        '''
        # Split and rearrange-join keys for association and indices.
        triple = self._split_key(spok)
        sk, pk, ok = triple
        spk = b''.join(triple[:2])
        sok = b''.join((triple[0], triple[2]))
        pok = b''.join(triple[1:3])

        # Associate cursor labels with k/v pairs.
        curs = {
            's:po': (sk, pok),
            'p:so': (pk, sok),
            'o:sp': (ok, spk),
        }

        # Add or remove triple lookups.
        for clabel, terms in curs.items():
            with self.cur(clabel) as icur:
                if action == 'remove':
                    if icur.set_key_dup(*terms):
                        icur.delete()
                elif action == 'add':
                    icur.put(*terms)
                else:
                    raise ValueError(
                            'Index action \'{}\' is not supported.'.format(
                                action))

    ## Convenience methods, not necessary for functioning but useful for
    ## debugging.

    def _keys_in_ctx(self, ck):
        '''
        Convenience method to list all triples in a context.

        @param ck (bytes) Context key.

        @return set(tuple) Triples in the context.
        '''
        with self.cur('c:spo') as cur:
            if cur.set_key(ck):
                tkeys = cur.iternext_dup()
                return {self._from_key(tk) for tk in tkeys}
            else:
                return set()

    def _ctx_for_key(self, tkey):
        '''
        Convenience method to list all contexts that a key is in.

        @param tkey (bytes) Triple key.

        @return set(URIRef) Context URIs.
        '''
        with self.cur('spo:c') as cur:
            if cur.set_key(tkey):
                ctx = cur.iternext_dup()
                return {self._from_key(c)[0] for c in ctx}
            else:
                return set()
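

# Minimal, self-contained smoke test (an illustrative sketch, not part of
# the original module). It assumes `lmdb` and `rdflib` are installed and
# that a temporary directory can be created.
if __name__ == '__main__':
    from tempfile import mkdtemp

    db_path = mkdtemp()
    store = LmdbStore(db_path)
    store.open()

    trp = (URIRef('urn:s:1'), URIRef('urn:p:1'), URIRef('urn:o:1'))
    # Write in a R/W transaction; committed on exiting the block.
    with TxnManager(store, write=True):
        store.add(trp)

    # Read back in a read-only transaction.
    with TxnManager(store):
        print(len(store))  # Expected: 1
        for spo, ctxs in store.triples((None, None, None)):
            print(spo, [c.identifier for c in ctxs])

    store.close()
    store.destroy(db_path)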