import hashlib
import logging
import os

from contextlib import ContextDecorator, ExitStack
from os import makedirs
from os.path import exists, abspath
from shutil import rmtree
from urllib.request import pathname2url

import lmdb

from rdflib import Graph, Namespace, URIRef, Variable
from rdflib.graph import DATASET_DEFAULT_GRAPH_ID as RDFLIB_DEFAULT_GRAPH_URI
from rdflib.store import Store, VALID_STORE, NO_STORE


logger = logging.getLogger(__name__)


def s2b(u, enc='UTF-8'):
    '''
    Convert a string into a bytes object.
    '''
    return u.encode(enc)


def b2s(u, enc='UTF-8'):
    '''
    Convert a bytes or memoryview object into a string.
    '''
    return bytes(u).decode(enc)


class TxnManager(ContextDecorator):
    '''
    Handle ACID transactions with an LmdbStore.

    Wrap this within a `with` statement:

    >>> with TxnManager(store, True):
    ...     # Do something with the database
    >>>

    The transaction will be opened and handled automatically.
    '''
    def __init__(self, store, write=False):
        '''
        Begin and close a transaction in a store.

        @param store (LmdbStore) The store to open a transaction on.
        @param write (bool) Whether the transaction is read-write. Default is
        False (read-only transaction).
        '''
        self.store = store
        self.write = write

    def __enter__(self):
        # Only open a R/W transaction if one is not already open.
        if not self.write or not self.store.is_txn_rw:
            self.store.begin(write=self.write)

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type:
            self.store.rollback()
        else:
            self.store.commit()


class LexicalSequence:
    '''
    Fixed-length lexicographically ordered byte sequence.

    Useful to generate optimized sequences of keys in LMDB.
    '''
    def __init__(self, start=1, max_len=5):
        '''
        @param start (bytes) Starting byte value. Bytes below this value are
        never found in this sequence. This is useful to allot special bytes
        to be used e.g. as separators.
        @param max_len (int) Maximum number of bytes that a byte string can
        contain. This should be chosen carefully since the number of all
        possible key combinations is determined by this value and the `start`
        value. The default args provide 255**5 (~1 Tn) unique combinations.
        '''
        self.start = start
        self.length = max_len

    def first(self):
        '''
        First possible combination.
        '''
        return bytearray([self.start] * self.length)

    def next(self, n):
        '''
        Calculate the next closest byte sequence in lexicographical order.

        This is used to fill the next available slot after the last one in
        LMDB. Keys are byte strings, which is a convenient way to keep key
        lengths as small as possible when they are referenced in several
        indices.

        This function assumes that all the keys are padded with the `start`
        value up to the `max_len` length.

        @param n (bytes) Current byte sequence to add to.
        '''
        if not n:
            n = self.first()
        elif isinstance(n, (bytes, memoryview)):
            n = bytearray(n)
        elif not isinstance(n, bytearray):
            raise ValueError('Input sequence must be bytes or a bytearray.')

        if len(n) != self.length:
            raise ValueError('Incorrect sequence length.')

        for i, b in list(enumerate(n))[::-1]:
            try:
                n[i] += 1
            # If the value exceeds 255, i.e. the current value is the last one
            except ValueError:
                if i == 0:
                    raise RuntimeError('BAD DAY: Sequence exhausted. No more '
                            'combinations are possible.')
                # Move one position up and try to increment that.
                else:
                    n[i] = self.start
                    continue
            else:
                return bytes(n)
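

# An illustrative sketch of LexicalSequence (values follow from the
# implementation above, assuming the default start=1, max_len=5). The
# sequence behaves like an odometer whose lowest digit is `start`:
#
#     >>> seq = LexicalSequence()
#     >>> seq.next(seq.first())
#     b'\x01\x01\x01\x01\x02'
#     >>> seq.next(b'\x01\x01\x01\x01\xff')
#     b'\x01\x01\x01\x02\x01'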


class LmdbStore(Store):
    '''
    LMDB-backed store.

    This is an implementation of the RDFLib Store interface:
    https://github.com/RDFLib/rdflib/blob/master/rdflib/store.py

    Handles the interaction with an LMDB store and builds an abstraction
    layer for triples.

    This store class uses two LMDB environments (i.e. two files): one for the
    main (preservation-worthy) data and the other for the index data which
    can be rebuilt from the main database.

    There are 4 main data sets (preservation-worthy data):

    - t:st (term key: serialized term; 1:1)
    - spo:c (joined S, P, O keys: context key; dupsort, dupfixed)
    - c: (context keys only, values are the empty bytestring; 1:1)
    - pfx:ns (prefix: pickled namespace; 1:1)

    And 6 indices to optimize lookup for all possible bound/unbound term
    combinations in a triple:

    - th:t (term hash: term key; 1:1)
    - s:po (S key: joined P, O keys; dupsort, dupfixed)
    - p:so (P key: joined S, O keys; dupsort, dupfixed)
    - o:sp (O key: joined S, P keys; dupsort, dupfixed)
    - c:spo (context key: joined S, P, O keys; dupsort, dupfixed)
    - ns:pfx (pickled namespace: prefix; 1:1)
    '''
    context_aware = True
    # This is a hassle to maintain for no apparent gain. If some use is
    # devised in the future, it may be revised.
    formula_aware = False
    graph_aware = True
    transaction_aware = True

    '''
    LMDB map size. See http://lmdb.readthedocs.io/en/release/#environment-class
    '''
    MAP_SIZE = 1024 ** 4  # 1Tb

    '''
    Key hashing algorithm. If you are paranoid, use SHA1. Otherwise, MD5 is
    faster and takes up less space (16 bytes vs. 20 bytes). This may make a
    visible difference because keys are generated and parsed very often.
    '''
    KEY_HASH_ALGO = 'sha1'

    '''Separator byte. Used to join and split individual term keys.'''
    SEP_BYTE = b'\x00'

    KEY_LENGTH = 5  # Max key length for terms. That allows for A LOT of terms.
    KEY_START = 2  # \x00 is reserved as a separator. \x01 is spare.

    data_keys = (
        # Term key to serialized term content: 1:1
        't:st',
        # Joined triple keys to context key: 1:m, fixed-length values
        'spo:c',
        # This has empty values and is used to keep track of empty contexts.
        'c:',
        # Prefix to namespace: 1:1
        'pfx:ns',
    )
    idx_keys = (
        # Namespace to prefix: 1:1
        'ns:pfx',
        # Term hash to term key: 1:1
        'th:t',
        # Lookups: 1:m, fixed-length values
        's:po', 'p:so', 'o:sp', 'c:spo',
    )

    '''
    Order in which keys are looked up if two terms are bound.
    The indices with the smallest average number of values per key should be
    looked up first.

    If we want to get fancy, this can be rebalanced from time to time by
    looking up the number of keys in (s:po, p:so, o:sp).
    '''
    _lookup_rank = ('s', 'o', 'p')

    '''
    Order of terms in the lookup indices. Used to rebuild a triple from a
    lookup result.
    '''
    _lookup_ordering = {
        's:po': (0, 1, 2),
        'p:so': (1, 0, 2),
        'o:sp': (2, 0, 1),
    }

    data_env = None
    idx_env = None
    db = None
    dbs = {}
    data_txn = None
    idx_txn = None
    is_txn_rw = None

    '''
    List of actions to be performed when a transaction is committed.

    Each element is a tuple of (action name, database index, key, value).
    '''
    _data_queue = []

    '''
    Set of indices to update. A set has been preferred to a list since the
    index updates don't need to be sequential and there may be duplicate
    entries that can be eliminated.

    Each element is a tuple of (triple key, pickled context, pre-pickled
    triple). The third value can be None, and in that case, it is calculated
    from the triple key.
    '''
    _idx_queue = []

    def __init__(self, path, identifier=None):
        self.path = path
        self.__open = False

        self.identifier = identifier or URIRef(pathname2url(abspath(path)))
        super().__init__(path)

        self._pickle = self.node_pickler.dumps
        self._unpickle = self.node_pickler.loads

        self._key_seq = LexicalSequence(self.KEY_START, self.KEY_LENGTH)

    def __len__(self, context=None):
        '''
        Return length of the dataset.

        @param context (rdflib.URIRef | rdflib.Graph) Context to restrict
        the count to.
        '''
        context = self._normalize_context(context)

        if context is not None:
            #dataset = self.triples((None, None, None), context)
            with self.cur('c:spo') as cur:
                if cur.set_key(self._to_key(context)):
                    dataset = set(cur.iternext_dup())
                    return len(dataset)
                else:
                    return 0
        else:
            return self.data_txn.stat(self.dbs['spo:c'])['entries']

    @property
    def is_open(self):
        return self.__open

    def open(self, configuration=None, create=True):
        '''
        Open the database.

        The database is best left open for the lifespan of the server. Read
        transactions can be opened as needed. Write transactions should be
        opened and closed within a single HTTP request to ensure atomicity of
        the request.

        This method is called outside of the main transaction. All cursors
        are created separately within the transaction.
        '''
        self._init_db_environments(create)
        if self.data_env == NO_STORE:
            return NO_STORE
        self.__open = True

        return VALID_STORE
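
    # An illustrative end-to-end sketch (assuming a writable './data' path;
    # `TxnManager` is the context manager defined in this module):
    #
    #     >>> from rdflib import URIRef
    #     >>> store = LmdbStore('./data')
    #     >>> store.open()                 # VALID_STORE == 1
    #     1
    #     >>> with TxnManager(store, write=True):
    #     ...     store.add((URIRef('urn:s:1'), URIRef('urn:p:1'),
    #     ...             URIRef('urn:o:1')))
    #     >>> store.close()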

    def begin(self, write=False):
        '''
        Begin the main write transaction and create cursors.
        '''
        if not self.is_open:
            raise RuntimeError('Store must be opened first.')
        logger.debug('Beginning a {} transaction.'.format(
            'read/write' if write else 'read-only'))
        self.data_txn = self.data_env.begin(buffers=True, write=write)
        self.idx_txn = self.idx_env.begin(buffers=False, write=write)

        self.is_txn_rw = write

    def stats(self):
        '''
        Gather statistics about the database.
        '''
        stats = {
            'data_db_stats': {
                db_label: self.data_txn.stat(self.dbs[db_label])
                for db_label in self.data_keys},
            'idx_db_stats': {
                db_label: self.idx_txn.stat(self.dbs[db_label])
                for db_label in self.idx_keys},
            'data_db_size': os.stat(self.data_env.path()).st_size,
            'idx_db_size': os.stat(self.idx_env.path()).st_size,
            'num_triples': len(self),
        }

        return stats

    @property
    def is_txn_open(self):
        '''
        Whether the main transaction is open.
        '''
        try:
            self.data_txn.id()
            self.idx_txn.id()
        except (lmdb.Error, AttributeError):
            #logger.info('Main transaction does not exist or is closed.')
            return False
        else:
            #logger.info('Main transaction is open.')
            return True

    def cur(self, index):
        '''
        Return a new cursor by its index.
        '''
        if index in self.idx_keys:
            txn = self.idx_txn
        elif index in self.data_keys:
            txn = self.data_txn
        else:
            raise ValueError('Cursor key not found.')

        return txn.cursor(self.dbs[index])

    def get_data_cursors(self, txn):
        '''
        Build the main data cursors for a transaction.

        @param txn (lmdb.Transaction) This can be a read or write transaction.

        @return dict(string, lmdb.Cursor) Keys are index labels, values are
        index cursors.
        '''
        return {
            't:st': txn.cursor(self.dbs['t:st']),
            'spo:c': txn.cursor(self.dbs['spo:c']),
            'pfx:ns': txn.cursor(self.dbs['pfx:ns']),
        }

    def get_idx_cursors(self, txn):
        '''
        Build the index cursors for a transaction.

        @param txn (lmdb.Transaction) This can be a read or write transaction.

        @return dict(string, lmdb.Cursor) Keys are index labels, values are
        index cursors.
        '''
        cur = {}
        for key in self.idx_keys:
            cur[key] = txn.cursor(self.dbs[key])

        return cur

    def close(self, commit_pending_transaction=False):
        '''
        Close the database connection.

        Do this at server shutdown.
        '''
        self.__open = False
        if self.is_txn_open:
            if commit_pending_transaction:
                self.commit()
            else:
                self.rollback()

        self.data_env.close()
        self.idx_env.close()

    def destroy(self, path):
        '''
        Destroy the store.

        https://www.youtube.com/watch?v=lIVq7FJnPwg

        @param path (string) Path of the folder containing the database(s).
        '''
        if exists(path):
            rmtree(path)

    def add(self, triple, context=None, quoted=False):
        '''
        Add a triple and start indexing.

        @param triple (tuple:rdflib.Identifier) Tuple of three identifiers.
        @param context (rdflib.Identifier | None) Context identifier.
        'None' inserts in the default graph.
        @param quoted (bool) Not used.
        '''
        context = self._normalize_context(context)
        if context is None:
            context = RDFLIB_DEFAULT_GRAPH_URI

        Store.add(self, triple, context)

        #logger.info('Adding triple: {}'.format(triple))
        pk_trp = self._pickle(triple)

        pk_s, pk_p, pk_o = [self._pickle(t) for t in triple]
        #logger.debug('Adding quad: {} {}'.format(triple, context))
        pk_c = self._pickle(context)

        # Add new individual terms or gather keys for existing ones.
        keys = [None, None, None, None]
        with self.cur('th:t') as icur:
            for i, pk_t in enumerate((pk_s, pk_p, pk_o, pk_c)):
                thash = self._hash(pk_t)
                if icur.set_key(thash):
                    keys[i] = icur.value()
                else:
                    # Put new term.
                    with self.cur('t:st') as dcur:
                        keys[i] = self._append(dcur, (pk_t,))[0]
                    # Index.
                    icur.put(thash, keys[i])

        # Add context in context DB.
        ck = keys[3]
        with self.cur('c:') as cur:
            if not cur.set_key(ck):
                cur.put(ck, b'')

        # Add triple:context association.
        spok = self.SEP_BYTE.join(keys[:3])
        with self.cur('spo:c') as dcur:
            if not dcur.set_key_dup(spok, ck):
                dcur.put(spok, ck)
        # Index spo:c association.
        with self.cur('c:spo') as icur:
            icur.put(ck, spok)

        self._index_triple('add', spok)
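
    # An illustrative sketch of the key flow in `add` (assuming the defaults
    # above: 5-byte term keys, b'\x00' separator, SHA1 hashing):
    #
    #     term --pickle--> pk_t --sha1--> thash (looked up in th:t)
    #     term --_append--> 5-byte key in t:st, e.g. b'\x02\x02\x02\x02\x03'
    #     spok = s_key + b'\x00' + p_key + b'\x00' + o_key  (17 bytes)
    #
    # `spok` is then stored against the context key in spo:c and c:spo, and
    # fanned out to the s:po, p:so and o:sp indices by `_index_triple`.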

    def remove(self, triple_pattern, context=None):
        '''
        Remove triples by a pattern.
        '''
        #logger.debug('Removing triples by pattern: {} on context: {}'.format(
        #    triple_pattern, context))
        context = self._normalize_context(context)
        if context is not None:
            ck = self._to_key(context)
            # If context is specified but not found, return to avoid deleting
            # the wrong triples.
            if not ck:
                return
        else:
            ck = None

        for spok in set(self._triple_keys(triple_pattern, context)):
            # Delete context association.
            with self.cur('spo:c') as dcur:
                with self.cur('c:spo') as icur:
                    if ck:
                        if dcur.set_key_dup(spok, ck):
                            dcur.delete()
                        if icur.set_key_dup(ck, spok):
                            icur.delete()
                    else:
                        # If no context is specified, remove all associations.
                        if dcur.set_key(spok):
                            for ck in dcur.iternext_dup():
                                # Delete index first while we have the
                                # context reference.
                                if icur.set_key_dup(ck, spok):
                                    icur.delete()
                            # Then delete the main entry.
                            dcur.set_key(spok)
                            dcur.delete(dupdata=True)

            self._index_triple('remove', spok)

    def triples(self, triple_pattern, context=None):
        '''
        Generator over matching triples.

        @param triple_pattern (tuple) 3 RDFLib terms
        @param context (rdflib.Graph | None) Context graph, if available.

        @return Generator over triples and contexts in which each result has
        the following format:
        > (s, p, o), generator(contexts)
        Where the contexts generator lists all contexts that the triple
        appears in.
        '''
        #logger.debug('Getting triples for pattern: {} and context: {}'.format(
        #    triple_pattern, context))
        # This sounds strange, RDFLib should be passing None at this point,
        # but anyway...
        context = self._normalize_context(context)
        if context == RDFLIB_DEFAULT_GRAPH_URI:
            context = None

        with self.cur('spo:c') as cur:
            for spok in self._triple_keys(triple_pattern, context):
                if context is not None:
                    contexts = (Graph(identifier=context),)
                else:
                    if cur.set_key(spok):
                        contexts = (
                            Graph(identifier=self._from_key(ck)[0], store=self)
                            for ck in cur.iternext_dup())

                yield self._from_key(spok), contexts

    def bind(self, prefix, namespace):
        '''
        Bind a prefix to a namespace.
        '''
        prefix = s2b(prefix)
        namespace = s2b(namespace)
        if self.is_txn_rw:
            with self.data_txn.cursor(self.dbs['pfx:ns']) as cur:
                cur.put(prefix, namespace)
            with self.idx_txn.cursor(self.dbs['ns:pfx']) as cur:
                cur.put(namespace, prefix)
        else:
            with self.data_env.begin(write=True) as wtxn:
                with wtxn.cursor(self.dbs['pfx:ns']) as cur:
                    cur.put(prefix, namespace)
            with self.idx_env.begin(write=True) as wtxn:
                with wtxn.cursor(self.dbs['ns:pfx']) as cur:
                    cur.put(namespace, prefix)

    def namespace(self, prefix):
        '''
        Get the namespace for a prefix.
        '''
        with self.cur('pfx:ns') as cur:
            ns = cur.get(s2b(prefix))
            return Namespace(b2s(ns)) if ns is not None else None

    def prefix(self, namespace):
        '''
        Get the prefix associated with a namespace.

        @NOTE A namespace can only be bound to one prefix in this
        implementation.
        '''
        with self.cur('ns:pfx') as cur:
            prefix = cur.get(s2b(namespace))
            return b2s(prefix) if prefix is not None else None

    def namespaces(self):
        '''
        Get an iterator of all prefix: namespace bindings.
        '''
        with self.cur('pfx:ns') as cur:
            for pfx, ns in iter(cur):
                yield (b2s(pfx), Namespace(b2s(ns)))
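
    # An illustrative sketch of the prefix API (assuming an open store with
    # a R/W transaction, as in the example after `open` above):
    #
    #     >>> store.bind('foaf', 'http://xmlns.com/foaf/0.1/')
    #     >>> str(store.namespace('foaf'))
    #     'http://xmlns.com/foaf/0.1/'
    #     >>> store.prefix('http://xmlns.com/foaf/0.1/')
    #     'foaf'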

    def contexts(self, triple=None):
        '''
        Get a list of all contexts.

        @return generator(Graph)
        '''
        if triple and any(triple):
            with self.cur('spo:c') as cur:
                if cur.set_key(self._to_key(triple)):
                    for ctx_uri in cur.iternext_dup():
                        yield Graph(
                            identifier=self._from_key(ctx_uri)[0], store=self)
        else:
            with self.cur('c:') as cur:
                for ctx_uri in cur.iternext(values=False):
                    yield Graph(
                        identifier=self._from_key(ctx_uri)[0], store=self)

    def add_graph(self, graph):
        '''
        Add a graph to the database.

        This creates an empty graph by associating the graph URI with the
        pickled `None` value. This prevents the graph from being removed when
        all its triples are removed.

        This may be called by read-only operations:
        https://github.com/RDFLib/rdflib/blob/master/rdflib/graph.py#L1623
        Therefore it needs to open a write transaction. This is not ideal
        but it is the only way to handle datasets in RDFLib.

        @param graph (URIRef) URI of the named graph to add.
        '''
        if isinstance(graph, Graph):
            graph = graph.identifier
        pk_c = self._pickle(graph)
        c_hash = self._hash(pk_c)
        with self.cur('th:t') as cur:
            c_exists = cur.set_key(c_hash)
        if not c_exists:
            # Insert context term if not existing.
            if self.is_txn_rw:
                # Use existing R/W transaction.
                with self.cur('t:st') as cur:
                    ck = self._append(cur, (pk_c,))[0]
                with self.cur('th:t') as cur:
                    cur.put(c_hash, ck)
                with self.cur('c:') as cur:
                    cur.put(ck, b'')
            else:
                # Open new R/W transactions.
                with self.data_env.begin(write=True) as wtxn:
                    with wtxn.cursor(self.dbs['t:st']) as cur:
                        ck = self._append(cur, (pk_c,))[0]
                    with wtxn.cursor(self.dbs['c:']) as cur:
                        cur.put(ck, b'')
                with self.idx_env.begin(write=True) as wtxn:
                    with wtxn.cursor(self.dbs['th:t']) as cur:
                        cur.put(c_hash, ck)

    def remove_graph(self, graph):
        '''
        Remove all triples from graph and the graph itself.

        @param graph (URIRef) URI of the named graph to remove.
        '''
        if isinstance(graph, Graph):
            graph = graph.identifier
        self.remove((None, None, None), graph)
        with self.cur('c:') as cur:
            if cur.set_key(self._to_key(graph)):
                cur.delete()

    def commit(self):
        '''
        Commit the main transaction and push the action queue.
        '''
        if self.is_txn_open:
            self.data_txn.commit()
            self.idx_txn.commit()
        self.data_txn = self.idx_txn = self.is_txn_rw = None

    def rollback(self):
        '''
        Roll back the main transaction.
        '''
        if self.is_txn_open:
            self.data_txn.abort()
            self.idx_txn.abort()
        self.data_txn = self.idx_txn = self.is_txn_rw = None

    ## PRIVATE METHODS ##

    def _triple_keys(self, triple_pattern, context=None):
        '''
        Generator over matching triple keys.

        This method is used by `triples` which returns native Python tuples,
        as well as by other methods that need to iterate and filter triple
        keys without incurring the overhead of converting them to triples.

        @param triple_pattern (tuple) 3 RDFLib terms
        @param context (rdflib.Graph | None) Context graph or URI, or None.
        '''
        if context == self:
            context = None

        if context is not None:
            pk_c = self._pickle(context)
            ck = self._to_key(context)

            # Shortcuts
            if not ck:
                # Context not found.
                return iter(())

            with self.cur('c:spo') as cur:
                # s p o c
                if all(triple_pattern):
                    spok = self._to_key(triple_pattern)
                    if not spok:
                        # A term in the triple is not found.
                        return iter(())
                    if cur.set_key_dup(ck, spok):
                        yield spok
                        return
                    else:
                        # Triple not found.
                        return iter(())

                # ? ? ? c
                elif not any(triple_pattern):
                    # Get all triples from the context.
                    if cur.set_key(ck):
                        for spok in cur.iternext_dup():
                            yield spok
                    else:
                        return iter(())

                # Regular lookup.
                else:
                    yield from (
                        spok for spok in self._lookup(triple_pattern)
                        if cur.set_key_dup(ck, spok))
        else:
            yield from self._lookup(triple_pattern)

    def _init_db_environments(self, create=True):
        '''
        Initialize the DB environment.

        The main database is kept in one file, the indices in a separate one
        (these may be even further split up depending on performance
        considerations).

        @param create (bool) If True, the environment and its databases are
        created.
        '''
        path = self.path
        if not exists(path):
            if create is True:
                makedirs(path)
            else:
                return NO_STORE
        self.data_env = lmdb.open(path + '/main', subdir=False, create=create,
                map_size=self.MAP_SIZE, max_dbs=4, readahead=False)
        self.idx_env = lmdb.open(path + '/index', subdir=False, create=create,
                map_size=self.MAP_SIZE, max_dbs=6, readahead=False)

        # Open and optionally create main databases.
        self.dbs = {
            # Main databases.
            't:st': self.data_env.open_db(b't:st', create=create),
            'spo:c': self.data_env.open_db(
                b'spo:c', create=create, dupsort=True, dupfixed=True),
            'c:': self.data_env.open_db(b'c:', create=create),
            'pfx:ns': self.data_env.open_db(b'pfx:ns', create=create),
            # One-off indices.
            'ns:pfx': self.idx_env.open_db(b'ns:pfx', create=create),
            'th:t': self.idx_env.open_db(b'th:t', create=create),
        }
        # Other index databases.
        for db_key in self.idx_keys:
            if db_key not in ('ns:pfx', 'th:t'):
                self.dbs[db_key] = self.idx_env.open_db(s2b(db_key),
                        dupsort=True, dupfixed=True, create=create)

    def _from_key(self, key):
        '''
        Convert a key into one or more terms.

        @param key (bytes) The key to be converted. It can be a compound one
        in which case the function will return multiple terms.
        '''
        terms = []
        with self.cur('t:st') as cur:
            for k in bytes(key).split(self.SEP_BYTE):
                pk_t = cur.get(k)
                terms.append(self._unpickle(pk_t))

        return tuple(terms)

    def _to_key(self, obj):
        '''
        Convert a triple, quad or term into a key.

        Each term key is retrieved by looking up the checksum of the pickled
        term in the `th:t` index; the checksum is therefore unique for that
        term. The hashing algorithm is specified in `KEY_HASH_ALGO`.

        @param obj (Object) Anything that can be reduced to terms stored in
        the database. Pairs of terms, as well as triples and quads, are
        expressed as tuples.

        If more than one term is provided, the keys are concatenated using the
        designated separator byte (`\\x00`).

        @return bytes
        '''
        if not isinstance(obj, (list, tuple)):
            obj = (obj,)
        key = []
        with self.cur('th:t') as cur:
            for term in obj:
                tk = cur.get(self._hash(self._pickle(term)))
                if not tk:
                    # If any of the terms is not found, return None
                    # immediately.
                    return None
                key.append(tk)

        return self.SEP_BYTE.join(key)
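
    # An illustrative sketch of key composition (assuming the 5-byte term
    # keys and b'\x00' separator defined above): for a triple whose terms map
    # to keys k_s, k_p and k_o, `_to_key` returns
    #
    #     k_s + b'\x00' + k_p + b'\x00' + k_o    # 17 bytes total
    #
    # and returns None as soon as any single term is not yet in the store.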

    def _hash(self, s):
        '''
        Get the hash value of a serialized object.
        '''
        return hashlib.new(self.KEY_HASH_ALGO, s).digest()

    def _normalize_context(self, context):
        '''
        Normalize a context parameter to conform to the model expectations.

        @param context (URIRef | Graph | None) Context URI or graph.
        '''
        if isinstance(context, Graph):
            if context == self or isinstance(context.identifier, Variable):
                context = None
            else:
                context = context.identifier
                #logger.debug('Converted graph into URI: {}'.format(context))

        return context

    def _lookup(self, triple_pattern):
        '''
        Look up triples in the indices based on a triple pattern.

        @return iterator of matching triple keys.
        '''
        s, p, o = triple_pattern

        if s is not None:
            if p is not None:
                # s p o
                if o is not None:
                    with self.cur('spo:c') as cur:
                        tkey = self._to_key(triple_pattern)
                        # _to_key returns None if any term is unknown.
                        if tkey and cur.set_key(tkey):
                            yield tkey
                            return
                        else:
                            return iter(())
                # s p ?
                else:
                    yield from self._lookup_2bound({'s': s, 'p': p})
            else:
                # s ? o
                if o is not None:
                    yield from self._lookup_2bound({'s': s, 'o': o})
                # s ? ?
                else:
                    yield from self._lookup_1bound('s', s)
        else:
            if p is not None:
                # ? p o
                if o is not None:
                    yield from self._lookup_2bound({'p': p, 'o': o})
                # ? p ?
                else:
                    yield from self._lookup_1bound('p', p)
            else:
                # ? ? o
                if o is not None:
                    yield from self._lookup_1bound('o', o)
                # ? ? ?
                else:
                    # Get all triples in the database.
                    with self.cur('spo:c') as cur:
                        yield from cur.iternext_nodup()

    def _lookup_1bound(self, label, term):
        '''
        Look up triples for a pattern with one bound term.

        @TODO This can be called millions of times in a larger SPARQL
        query, so it better be as efficient as it gets.
        '''
        k = self._to_key(term)
        if not k:
            return iter(())
        # E.g. for label 's', the index name is 's:po'.
        idx_name = '{}:{}'.format(label, 'spo'.replace(label, ''))
        term_order = self._lookup_ordering[idx_name]
        with self.cur(idx_name) as cur:
            if cur.set_key(k):
                for match in cur.iternext_dup():
                    subkeys = bytes(match).split(self.SEP_BYTE)

                    # Compose result.
                    out = [None, None, None]
                    out[term_order[0]] = k
                    out[term_order[1]] = subkeys[0]
                    out[term_order[2]] = subkeys[1]

                    yield self.SEP_BYTE.join(out)
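
    # An illustrative walk-through (derived from the code above): for the
    # pattern (None, URIRef('urn:p:1'), None), label is 'p', so idx_name is
    # 'p:so' and term_order is (1, 0, 2). Each match in the index holds the
    # joined S and O keys; the bound P key is re-inserted at position 1 to
    # rebuild the full spok key.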

    def _lookup_2bound(self, bound_terms):
        '''
        Look up triples for a pattern with two bound terms.

        @param bound_terms (dict) Triple labels and terms to search for,
        in the format of, e.g. {'s': URIRef('urn:s:1'), 'o':
        URIRef('urn:o:1')}
        '''
        if len(bound_terms) != 2:
            raise ValueError(
                    'Exactly 2 terms need to be bound. Got {}'.format(
                        len(bound_terms)))

        # Establish lookup ranking.
        luc = None
        for k_label in self._lookup_rank:
            if k_label in bound_terms.keys():
                # First match is the lookup term.
                if not luc:
                    v_label = 'spo'.replace(k_label, '')
                    # Lookup database key (cursor) name
                    luc = k_label + ':' + v_label
                    # Position of each key in the final triple.
                    term_order = self._lookup_ordering[luc]
                    # Term to look up
                    luk = self._to_key(bound_terms[k_label])
                    if not luk:
                        return iter(())
                # Second match is the filter.
                else:
                    # Filter key (position of sub-key in lookup results)
                    fpos = v_label.index(k_label)
                    # Filter term
                    ft = self._to_key(bound_terms[k_label])
                    if not ft:
                        return iter(())
                    break

        # Look up in index.
        with self.cur(luc) as cur:
            if cur.set_key(luk):
                # Iterate over matches and filter by second term.
                for match in cur.iternext_dup():
                    subkeys = bytes(match).split(self.SEP_BYTE)
                    flt_subkey = subkeys[fpos]
                    if flt_subkey == ft:
                        # Remainder (non-filter) key used to complete the
                        # triple.
                        r_subkey = subkeys[1 - fpos]

                        # Compose result.
                        out = [None, None, None]
                        out[term_order[0]] = luk
                        out[term_order[fpos + 1]] = flt_subkey
                        out[term_order[2 - fpos]] = r_subkey

                        yield self.SEP_BYTE.join(out)
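
    # An illustrative walk-through (derived from the ranking logic above):
    # with bound_terms {'s': ..., 'o': ...} and _lookup_rank ('s', 'o', 'p'),
    # 's' wins as the lookup term (luc = 's:po', term_order = (0, 1, 2)) and
    # 'o' becomes the filter, at position fpos = 'po'.index('o') = 1 within
    # each joined P, O value.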

    def _append(self, cur, values, **kwargs):
        '''
        Append one or more values to the end of a database.

        @param cur (lmdb.Cursor) The write cursor to act on.
        @param values (list(bytes)) Value(s) to append.

        @return list(bytes) Last key(s) inserted.
        '''
        if not isinstance(values, (list, tuple)):
            raise ValueError('Input must be a list or tuple.')
        data = []
        lastkey = cur.key() if cur.last() else None
        for v in values:
            lastkey = self._key_seq.next(lastkey)
            data.append((lastkey, v))
        cur.putmulti(data, **kwargs)

        return [d[0] for d in data]
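
    # An illustrative note (following `LexicalSequence.next` above, with
    # KEY_START=2): on an empty database the first appended key is
    # b'\x02\x02\x02\x02\x03', i.e. the first sequence value incremented
    # once; subsequent keys continue in lexicographic order.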

    def _index_triple(self, action, spok):
        '''
        Update index for a triple and context (add or remove).

        @param action (string) 'add' or 'remove'.
        @param spok (bytes) Triple key.
        '''
        # Split and rearrange-join keys for association and indices.
        triple = bytes(spok).split(self.SEP_BYTE)
        sk, pk, ok = triple
        spk = self.SEP_BYTE.join(triple[:2])
        sok = bytes(triple[0]) + self.SEP_BYTE + bytes(triple[2])
        pok = self.SEP_BYTE.join(triple[1:3])

        # Associate cursor labels with k/v pairs.
        curs = {
            's:po': (sk, pok),
            'p:so': (pk, sok),
            'o:sp': (ok, spk),
        }

        # Add or remove triple lookups.
        for clabel, terms in curs.items():
            with self.cur(clabel) as icur:
                if action == 'remove':
                    if icur.set_key_dup(*terms):
                        icur.delete()
                elif action == 'add':
                    icur.put(*terms)
                else:
                    raise ValueError(
                            'Index action \'{}\' is not supported.'.format(
                                action))

    ## Convenience methods. Not necessary for functioning but useful for
    ## debugging.

    def _keys_in_ctx(self, ck):
        '''
        Convenience method to list all keys in a context.

        @param ck (bytes) Context key.

        @return Iterator:tuple Generator of triples.
        '''
        with self.cur('c:spo') as cur:
            if cur.set_key(ck):
                tkeys = cur.iternext_dup()
                return {self._from_key(tk) for tk in tkeys}
            else:
                return set()

    def _ctx_for_key(self, tkey):
        '''
        Convenience method to list all contexts that a key is in.

        @param tkey (bytes) Triple key.

        @return Iterator:URIRef Generator of context URIs.
        '''
        with self.cur('spo:c') as cur:
            if cur.set_key(tkey):
                ctx = cur.iternext_dup()
                return {self._from_key(c)[0] for c in ctx}
            else:
                return set()