base_lmdb_store.pyx 22 KB


  1. import logging
  2. import os
  3. import threading
  4. import multiprocessing
  5. from contextlib import contextmanager
  6. from os import makedirs, path
  7. from shutil import rmtree
  8. from lakesuperior import env, wsgi
  9. from lakesuperior.cy_include cimport cylmdb as lmdb
  10. from libc cimport errno
  11. from cpython.mem cimport PyMem_Malloc, PyMem_Realloc, PyMem_Free
  12. from cython.parallel import parallel, prange
  13. logger = logging.getLogger(__name__)
  cdef void _check(int rc, str message='') except *:
      """
      Raise the Python exception that matches an LMDB return code.

      Nothing happens when ``rc`` is ``MDB_SUCCESS``.

      :param int rc: Return code of an LMDB C call.
      :param str message: Optional text placed in front of LMDB's own error
          description for a generic failure. It is not used by the three
          dedicated exception types listed below.

      :raise KeyNotFoundError: if ``rc`` is ``MDB_NOTFOUND``.
      :raise KeyExistsError: if ``rc`` is ``MDB_KEYEXIST``.
      :raise InvalidParamError: if ``rc`` is ``EINVAL``.
      :raise LmdbError: for any other non-success code.
      """
      if rc == lmdb.MDB_NOTFOUND:
          raise KeyNotFoundError()
      if rc == lmdb.MDB_KEYEXIST:
          raise KeyExistsError()
      if rc == errno.EINVAL:
          raise InvalidParamError(
              'Invalid LMDB parameter error.\n'
              'Please verify that a transaction is open and valid for the '
              'current operation.'
          )
      if rc == lmdb.MDB_SUCCESS:
          return

      # Generic failure: optional caller text first, then LMDB's description.
      if len(message):
          prefix = message + '\nInternal error ({}): '.format(rc)
      else:
          prefix = 'LMDB Error ({}): '.format(rc)
      raise LmdbError(prefix + lmdb.mdb_strerror(rc).decode())
  class LmdbError(Exception):
      """Base class of the LMDB-related exceptions raised by this module."""
  class KeyNotFoundError(LmdbError):
      """Raised by ``_check`` when LMDB returns ``MDB_NOTFOUND``."""
  class KeyExistsError(LmdbError):
      """Raised by ``_check`` when LMDB returns ``MDB_KEYEXIST``."""
  class InvalidParamError(LmdbError):
      """Raised by ``_check`` when LMDB returns ``EINVAL``."""
  cdef class BaseLmdbStore:
      """
      Generic LMDB store abstract class.

      This class contains convenience methods to create an LMDB store for any
      purpose and provides some convenience methods to wrap transactions
      into contexts.

      Example usage::

          >>> class MyStore(BaseLmdbStore):
          ...     dbi_labels = ['db1', 'db2']
          ...
          >>> ms = MyStore('/base/store/path')
          >>> # "with" wraps the operation in a transaction.
          >>> with ms.txn_ctx(write=True):
          ...     ms.put(b'key1', b'val1', 'db1')
      """

      dbi_labels = []
      """
      Labels of the named databases in the environment.

      The databases are opened in this order when the environment is opened.
      If the environment has only one (unnamed) database, do not override this
      value (i.e. leave it empty).

      :rtype: list
      """

      dbi_flags = {}
      """
      Configuration of databases in the environment.

      This is a dict whose keys are database labels (see
      :py:attr:`dbi_labels`) and whose values are LMDB flags for creating and
      opening the databases as per
      `http://www.lmdb.tech/doc/group__mdb.html#gac08cad5b096925642ca359a6d6f0562a`_.
      A label without an entry is opened with no additional flags.

      If the environment has only one database, do not override this value;
      it is not consulted in that case.

      :rtype: dict
      """

      env_flags = 0
      """
      LMDB environment flags.

      See `mdb_env_open
      <http://www.lmdb.tech/doc/group__mdb.html#ga32a193c6bf4d7d5c5d579e71f22e9340>`_
      """

      env_perms = 0o640
      """
      The UNIX permissions to set on created environment files and semaphores.

      See `mdb_env_open
      <http://www.lmdb.tech/doc/group__mdb.html#ga32a193c6bf4d7d5c5d579e71f22e9340>`_
      """

      options = {}
      """
      LMDB environment option overrides. Setting this is not required.

      See `LMDB documentation
      <http://lmdb.readthedocs.io/en/release/#environment-class>`_ for details
      on available options.

      Default values are available for the following options:

      - ``map_size``: 1 GiB
      - ``max_dbs``: the number of labels defined in :py:attr:`dbi_labels`.
        Only override if necessary.
      - ``max_spare_txns``: used as the maximum number of LMDB readers;
        defaults to the number of WSGI workers multiplied by
        :py:attr:`readers_mult`. Only override if necessary.

      :rtype: dict
      """

      readers_mult = 4
      """
      Number to multiply WSGI workers by to set the number of LMDB reader slots.
      """
  101. ### INIT & TEARDOWN ###
  def __init__(self, env_path, open_env=True, create=True):
      """
      Set up the store state and, by default, open the environment.

      :param str env_path: The file path of the store.
      :param bool open_env: Whether to open the store immediately. If
          ``False`` the store can be opened later with :py:meth:`open_env`.
      :param bool create: Whether the file and directory structure should
          be created if the store is opened immediately.
      """
      self.env_path = env_path
      self.is_txn_open = False
      self._open = False

      if not open_env:
          return
      self.open_env(create)
  def __dealloc__(self):
      """Close the environment when the store object is deallocated."""
      self.close_env()
  @property
  def is_open(self):
      """Whether the environment is currently marked as open."""
      return self._open
  @property
  def readers(self):
      """The reader-slot count stored when the environment was opened."""
      return self._readers
  def open_env(self, create):
      """
      Open, and optionally create, store environment.

      If the environment is already open, a warning is logged and nothing
      else happens.

      :param bool create: Whether to create the store directory if it is
          missing. The value is also passed on to the database
          initialization.

      :raise LmdbError: if the store directory cannot be created or any
          LMDB call fails. In the latter case the environment handle is
          closed before the error is re-raised.
      """
      if self.is_open:
          logger.warning('Environment already open.')
          return

      logger.debug('Opening environment at {}.'.format(self.env_path))

      if create:
          # With MDB_NOSUBDIR the environment path is a file, so the
          # directory to create is its parent. Test the same flags that are
          # handed to mdb_env_open() below.
          parent_path = (
              path.dirname(self.env_path)
              if lmdb.MDB_NOSUBDIR & self.env_flags
              else self.env_path)
          # An empty parent means "current directory": nothing to create.
          if parent_path and not path.exists(parent_path):
              try:
                  makedirs(parent_path, mode=0o750, exist_ok=True)
              except Exception as e:
                  raise LmdbError(
                      'Could not create store at {}. Error: {}'.format(
                          self.env_path, e)) from e

      # Create environment handle.
      _check(
          lmdb.mdb_env_create(&self.dbenv),
          'Error creating DB environment handle.')
      logger.debug('Created DBenv @ {:x}'.format(<unsigned long>self.dbenv))

      try:
          # Set map size.
          _check(
              lmdb.mdb_env_set_mapsize(self.dbenv, self.options.get(
                  'map_size', 1024 ** 3)),
              'Error setting map size.')

          # Set max databases.
          max_dbs = self.options.get('max_dbs', len(self.dbi_labels))
          _check(
              lmdb.mdb_env_set_maxdbs(self.dbenv, max_dbs),
              'Error setting max. databases.')

          # Set max readers.
          self._readers = self.options.get(
              'max_spare_txns', wsgi.workers * self.readers_mult)
          _check(
              lmdb.mdb_env_set_maxreaders(self.dbenv, self._readers),
              'Error setting max. readers.')
          logger.debug('Max. readers: {}'.format(self._readers))

          # Open DB environment.
          logger.debug(
              'DBenv address: {:x}'.format(<unsigned long>self.dbenv))
          _check(
              lmdb.mdb_env_open(
                  self.dbenv, self.env_path.encode(),
                  self.env_flags, self.env_perms),
              f'Error opening the database environment: {self.env_path}.')

          # Clear stale readers. This is done after opening because the
          # reader lock table is only available on an open environment.
          self._clear_stale_readers()

          self._init_dbis(create)
      except:
          # A handle obtained from mdb_env_create() must be released with
          # mdb_env_close(), also when mdb_env_open() failed.
          lmdb.mdb_env_close(self.dbenv)
          self.dbenv = NULL
          raise

      self._open = True
  cdef void _clear_stale_readers(self) except *:
      """
      Ask LMDB to check the reader table and log how many stale entries
      it reports as cleared.

      :raise LmdbError: if the reader check returns an error code.
      """
      cdef int dead_count

      _check(lmdb.mdb_reader_check(self.dbenv, &dead_count))
      if dead_count <= 0:
          return
      logger.debug('Cleared {} stale readers.'.format(dead_count))
  cdef void _init_dbis(self, create=True) except *:
      """
      Initialize databases and cursors.

      Allocates the DBI and cursor arrays, then opens every database listed
      in ``dbi_labels`` (or the single unnamed database if there are no
      labels) together with one cursor each, inside a dedicated transaction.

      :param bool create: If ``True``, databases are opened with
          ``MDB_CREATE``. If falsy, the transaction is read-only.

      :raise MemoryError: if one of the arrays cannot be allocated.
      :raise LmdbError: if any LMDB call fails.
      """
      cdef:
          size_t i
          lmdb.MDB_txn *txn
          lmdb.MDB_dbi dbi

      # At least one slot (for environments without a database)
      self.dbis = <lmdb.MDB_dbi *>PyMem_Malloc(
          max(len(self.dbi_labels), 1) * sizeof(lmdb.MDB_dbi))
      if not self.dbis:
          raise MemoryError()

      # DBIs seem to start from 2. We want to map cursor pointers in the
      # array to DBIs, so we need an extra slot.
      self.curs = <lmdb.MDB_cursor **>PyMem_Malloc(
          (len(self.dbi_labels) + 2) * sizeof(lmdb.MDB_cursor*))
      if not self.curs:
          # Do not leak the first array when the second allocation fails.
          PyMem_Free(self.dbis)
          self.dbis = NULL
          raise MemoryError()

      create_flag = lmdb.MDB_CREATE if create is True else 0
      txn_flags = 0 if create else lmdb.MDB_RDONLY

      # Without a valid transaction handle none of the calls below is safe.
      _check(
          lmdb.mdb_txn_begin(self.dbenv, NULL, txn_flags, &txn),
          'Error opening transaction.')

      logger.info('Creating DBs.')
      try:
          if len(self.dbi_labels):
              for i, dblabel in enumerate(self.dbi_labels):
                  flags = self.dbi_flags.get(dblabel, 0) | create_flag
                  _check(lmdb.mdb_dbi_open(
                      txn, dblabel.encode(), flags, self.dbis + i))
                  dbi = self.dbis[i]
                  logger.debug(f'Created DB {dblabel}: {dbi}')
                  # Open a cursor to initialize the memory slot.
                  # NOTE(review): the cursor is not closed here; confirm
                  # this slot is always re-opened before being used.
                  _check(lmdb.mdb_cursor_open(
                      txn, dbi, self.curs + dbi))
          else:
              _check(lmdb.mdb_dbi_open(txn, NULL, 0, self.dbis))
              _check(lmdb.mdb_cursor_open(txn, self.dbis[0], self.curs))
      except:
          lmdb.mdb_txn_abort(txn)
          raise

      # Commit outside the ``try``: mdb_txn_commit() releases the handle
      # even when it fails, so it must not be aborted afterwards.
      _check(lmdb.mdb_txn_commit(txn))
  cpdef void close_env(self, bint commit_pending_transaction=False) except *:
      """
      Close the environment and release the DBI and cursor arrays.

      Does nothing beyond logging if the environment is not open.

      :param bint commit_pending_transaction: What to do with a transaction
          that is still open: commit it if ``True``, abort it otherwise.
      """
      logger.debug('Cleaning up store env.')
      if not self.is_open:
          return

      logger.debug('Closing store env.')

      # Settle a pending transaction before tearing the environment down.
      if self.is_txn_open is True:
          if commit_pending_transaction:
              self._txn_commit()
          else:
              self._txn_abort()

      self._clear_stale_readers()

      PyMem_Free(self.dbis)
      PyMem_Free(self.curs)
      lmdb.mdb_env_close(self.dbenv)

      self._open = False
  cpdef void destroy(self, _path='') except *:
      """
      Destroy the store.

      https://www.youtube.com/watch?v=lIVq7FJnPwg

      Removes the environment at ``env_path`` from the file system: the data
      file and its ``-lock`` companion if the environment flags include
      ``MDB_NOSUBDIR``, the whole directory otherwise. Does nothing if the
      path does not exist.

      :param str _path: unused. Left for RDFLib API compatibility. (actually
          quite dangerous if it were used: it could turn into a
          general-purpose recursive file and folder delete method!)
      """
      if not path.exists(self.env_path):
          return

      # Test the flags the environment is opened with (``env_flags``).
      if lmdb.MDB_NOSUBDIR & self.env_flags:
          for fpath in (self.env_path, self.env_path + '-lock'):
              try:
                  os.unlink(fpath)
              except FileNotFoundError:
                  # Already gone: nothing to remove.
                  pass
      else:
          rmtree(self.env_path)
  266. ### PYTHON-ACCESSIBLE METHODS ###
  @contextmanager
  def txn_ctx(self, write=False):
      """
      Transaction context manager.

      If a transaction is already open it is reused: the block runs inside
      it and nothing is committed or aborted on exit. Otherwise a new
      transaction is begun, committed when the block ends normally and
      aborted when the block raises.

      Nothing is yielded to the ``with`` block.

      :param bool write: Whether a write transaction is to be opened.

      :raise LmdbError: if the store is not open.
      """
      if not self.is_open:
          raise LmdbError('Store is not open.')

      if self.is_txn_open:
          logger.debug(
              'Transaction is already active. Not opening another one.')
          yield
          return

      # Begin outside the ``try``: if this fails there is no transaction
      # to roll back.
      self._txn_begin(write=write)
      try:
          yield
      except BaseException:
          # Roll back only a transaction that is still open.
          if self.is_txn_open:
              self._txn_abort()
          raise

      # Commit outside the ``try``: _txn_commit() cleans up after itself
      # on failure, so a second abort must not follow.
      self._txn_commit()
  def begin(self, write=False):
      """
      Begin a transaction manually if not already in a txn context.

      The :py:meth:`txn_ctx` context manager should be used whenever
      possible rather than this method.

      :param bool write: Whether to open a read/write transaction.

      :raise RuntimeError: if the store has not been opened.
      """
      if self.is_open:
          self._txn_begin(write=write)
          return
      raise RuntimeError('Store must be opened first.')
  def commit(self):
      """Commit the main transaction (Python-facing method)."""
      self._txn_commit()
  def abort(self):
      """Abort the main transaction (Python-facing method)."""
      self._txn_abort()
  def rollback(self):
      """Roll back the main transaction; alias for :py:meth:`abort`."""
      self.abort()
  def key_exists(self, key, dblabel='', new_txn=True):
      """
      Return whether a key exists in a database (Python-facing method).

      :param key: The key to look up.
      :param str dblabel: Label of the database to look into.
      :param bool new_txn: If ``True``, wrap the lookup in
          :py:meth:`txn_ctx`. Pass anything else when a transaction has
          already been opened.

      :rtype: bool
      """
      if new_txn is not True:
          return self._key_exists(key, len(key), dblabel=dblabel.encode())

      with self.txn_ctx():
          return self._key_exists(
              key, len(key), dblabel=dblabel.encode())
  cdef inline bint _key_exists(
          self, unsigned char *key, unsigned char klen,
          unsigned char *dblabel=b'') except -1:
      """
      Return whether a key exists in a database.

      To be used within an existing transaction.

      :param key: Pointer to the key bytes.
      :param klen: Length of the key. NOTE(review): this is declared
          ``unsigned char`` while the other methods use ``size_t``; confirm
          that keys longer than 255 bytes are never checked.
      :param dblabel: Label of the database to look into.
      """
      cdef lmdb.MDB_val key_v, data_v

      key_v.mv_size = klen
      key_v.mv_data = key

      try:
          _check(lmdb.mdb_get(
              self.txn, self.get_dbi(dblabel), &key_v, &data_v))
      except KeyNotFoundError:
          return False
      else:
          return True
  def put(self, key, data, dblabel='', flags=0):
      """
      Put one key/value pair (Python-facing method).

      The pair is written within the store's current transaction.

      :param key: The key to store.
      :param data: The value to store.
      :param str dblabel: Label of the database to write to.
      :param int flags: Flags handed to the LMDB put call.
      """
      key_size = len(key)
      data_size = len(data)
      label = dblabel.encode()

      self._put(
          key, key_size, data, data_size, dblabel=label,
          txn=self.txn, flags=flags)
  cdef void _put(
          self, unsigned char *key, size_t key_size, unsigned char *data,
          size_t data_size, unsigned char *dblabel='',
          lmdb.MDB_txn *txn=NULL, unsigned int flags=0) except *:
      """
      Put one key/value pair.

      :param key: Pointer to the key bytes.
      :param key_size: Length of the key.
      :param data: Pointer to the value bytes.
      :param data_size: Length of the value.
      :param dblabel: Label of the database to write to.
      :param txn: Transaction to use. Defaults to the store's current one.
      :param flags: Flags handed to ``mdb_put``.

      :raise KeyExistsError: if LMDB reports ``MDB_KEYEXIST``.
      :raise LmdbError: if LMDB reports any other error.
      """
      # Function-local buffers, as in ``_key_exists``, so that this method
      # does not depend on state declared elsewhere.
      cdef:
          int rc
          lmdb.MDB_val key_v, data_v

      if txn is NULL:
          txn = self.txn

      key_v.mv_data = key
      key_v.mv_size = key_size
      data_v.mv_data = data
      data_v.mv_size = data_size

      rc = lmdb.mdb_put(txn, self.get_dbi(dblabel), &key_v, &data_v, flags)
      # Build the (copying) error message only when something went wrong.
      if rc != lmdb.MDB_SUCCESS:
          _check(rc, 'Error putting data: {}, {}'.format(
              key[: key_size], data[: data_size]))
  cpdef bytes get_data(self, key, dblabel=''):
      """
      Get a single value (non-dup) for a key (Python-facing method).

      :param key: The key to look up.
      :param str dblabel: Label of the database to read from.

      :return: The stored value, or ``None`` if the key is not found.
      :rtype: bytes or None
      """
      cdef lmdb.MDB_val rv

      try:
          self._get_data(key, len(key), &rv, dblabel=dblabel.encode())
      except KeyNotFoundError:
          return None

      # Copy ``mv_size`` bytes out of the buffer LMDB points to.
      return (<unsigned char *>rv.mv_data)[: rv.mv_size]
  cdef void _get_data(
          self, unsigned char *key, size_t klen, lmdb.MDB_val *rv,
          unsigned char *dblabel='') except *:
      """
      Get a single value (non-dup) for a key.

      To be used within an existing transaction.

      :param key: Pointer to the key bytes.
      :param klen: Length of the key.
      :param rv: Filled in by LMDB with the location and size of the value.
      :param dblabel: Label of the database to read from.

      :raise KeyNotFoundError: if the key is not in the database.
      :raise LmdbError: if LMDB reports any other error.
      """
      cdef:
          int rc
          lmdb.MDB_val key_v

      key_v.mv_data = key
      # Use the length given by the caller: measuring a C pointer stops at
      # the first NUL byte, which truncates binary keys.
      key_v.mv_size = klen

      rc = lmdb.mdb_get(self.txn, self.get_dbi(dblabel), &key_v, rv)
      # Format the key only on failure, and as raw bytes: decoding it as
      # text can itself fail for binary keys.
      if rc != lmdb.MDB_SUCCESS:
          _check(
              rc, 'Error getting data for key \'{}\'.'.format(key[: klen]))
  def delete(self, key, dblabel=''):
      """
      Delete one single value by key (Python-facing method).

      :param key: The key to delete.
      :param str dblabel: Label of the database to delete from.
      """
      self._delete(key, len(key), dblabel.encode())
  cdef void _delete(
          self, unsigned char *key, size_t klen,
          unsigned char *dblabel=b'') except *:
      """
      Delete one single value by key from a non-dup database.

      A key that is not found is silently ignored.

      TODO Allow deleting duplicate keys.

      :param key: Pointer to the key bytes.
      :param klen: Length of the key.
      :param dblabel: Label of the database to delete from.

      :raise LmdbError: if LMDB reports an error other than "not found".
      """
      # Function-local buffer, as in ``_key_exists``, so that this method
      # does not depend on state declared elsewhere.
      cdef lmdb.MDB_val key_v

      key_v.mv_data = key
      key_v.mv_size = klen
      try:
          _check(lmdb.mdb_del(self.txn, self.get_dbi(dblabel), &key_v, NULL))
      except KeyNotFoundError:
          # Deleting a missing key is not an error.
          pass
  cpdef dict stats(self):
      """
      Gather statistics about the database.

      :return: The dictionary built by ``_stats``.
      :rtype: dict
      """
      return self._stats()
  cdef dict _stats(self):
      """
      Gather statistics about the database.

      Cython-only method. It does not open a transaction itself: the
      per-database statistics are read through the store's current one.

      :return: A dict with three keys: ``env_stats`` (environment
          statistics), ``env_size`` (``st_size`` reported by ``os.stat``
          for the environment path) and ``db_stats`` (statistics of each
          database, keyed by label).
      :rtype: dict

      :raise LmdbError: if LMDB reports an error.
      """
      cdef lmdb.MDB_stat stat

      _check(
          lmdb.mdb_env_stat(self.dbenv, &stat),
          'Error getting environment stats.')
      env_stats = <dict>stat

      db_stats = {}
      for i, dblabel in enumerate(self.dbi_labels):
          _check(
              lmdb.mdb_stat(self.txn, self.dbis[i], &stat),
              'Error getting database stats.')
          # ``stat`` is reused on each pass; store a converted copy.
          db_stats[dblabel] = <dict>stat

      return {
          'env_stats': env_stats,
          'env_size': os.stat(self.env_path).st_size,
          'db_stats': db_stats,
      }
  444. # UNFINISHED
  445. #cdef int _reader_list_callback(self, const unsigned char *msg, void *ctx):
  446. # """
  447. # Callback for reader info function.
  448. # Example from py-lmdb:
  449. # static int env_readers_callback(const char *msg, void *str_)
  450. # {
  451. # PyObject **str = str_;
  452. # PyObject *s = PyUnicode_FromString(msg);
  453. # PyObject *new;
  454. # if(! s) {
  455. # return -1;
  456. # }
  457. # new = PyUnicode_Concat(*str, s);
  458. # Py_CLEAR(*str);
  459. # *str = new;
  460. # if(! new) {
  461. # return -1;
  462. # }
  463. # return 0;
  464. # }
  465. # """
  466. # cdef:
  467. # unicode str = ctx[0].decode('utf-8')
  468. # unicode s = msg.decode('utf-8')
  469. # if not len(s):
  470. # return -1
  471. # str += s
  472. # logger.info('message: {}'.format(msg))
  473. # if not len(str):
  474. # return -1
  475. # ctx = &str
  476. #cpdef str reader_list(self):
  477. # """
  478. # Information about the reader lock table.
  479. # """
  480. # cdef unsigned char *ctx
  481. # lmdb.mdb_reader_list(self.dbenv, <lmdb.MDB_msg_func *>self._reader_list_callback, &ctx)
  482. # logger.info('Reader info: {}'.format(ctx))
  483. # return (ctx).decode('ascii')
  484. ### CYTHON METHODS ###
  cdef void _txn_begin(self, write=True, lmdb.MDB_txn *parent=NULL) except *:
      """
      Begin a transaction and store its handle on the instance.

      :param bool write: Whether to open a read/write transaction. A
          read-only one is opened otherwise.
      :param parent: Parent transaction handed to ``mdb_txn_begin``.

      :raise LmdbError: if the store is not open or LMDB reports an error.
      """
      cdef:
          int rc
          unsigned int flags

      if not self.is_open:
          raise LmdbError('Store is not open.')

      flags = 0 if write else lmdb.MDB_RDONLY

      # Skip the process/thread lookups unless the message is wanted.
      if logger.isEnabledFor(logging.DEBUG):
          logger.debug('Opening {} transaction in PID {}, thread {}'.format(
              'RW' if write else 'RO',
              multiprocessing.current_process().pid,
              threading.current_thread().name))

      rc = lmdb.mdb_txn_begin(self.dbenv, parent, flags, &self.txn)
      _check(rc, 'Error opening transaction.')

      logger.debug(
          'Opened transaction @ {:x}'.format(<unsigned long>self.txn))
      self.is_txn_open = True
      self.is_txn_rw = write
      logger.debug('txn is open: {}'.format(self.is_txn_open))
  cdef void _txn_commit(self) except *:
      """
      Commit the current transaction.

      Whatever the outcome, the store no longer has an open transaction
      when this method returns or raises.

      :raise LmdbError: if LMDB reports an error while committing.
      """
      cdef int rc

      txid = '{:x}'.format(<unsigned long>self.txn)

      rc = lmdb.mdb_txn_commit(self.txn)

      # mdb_txn_commit() releases the handle whether it succeeds or not, so
      # the handle is dropped here and must not be aborted afterwards.
      self.txn = NULL
      self.is_txn_open = False
      self.is_txn_rw = False

      if rc != lmdb.MDB_SUCCESS:
          logger.info('Transaction @ {} aborted.'.format(txid))
      _check(rc)
      logger.debug('Transaction @ {} committed.'.format(txid))
  cdef void _txn_abort(self) except *:
      """
      Abort the current transaction and mark the store as having none open.

      Calling this again without a new transaction in between only logs.
      """
      txid = '{:x}'.format(<unsigned long>self.txn)

      if self.txn is not NULL:
          lmdb.mdb_txn_abort(self.txn)
          # The handle is invalid from here on; forget it so that it cannot
          # be handed to LMDB a second time.
          self.txn = NULL

      self.is_txn_open = False
      self.is_txn_rw = False
      logger.info('Transaction @ {} aborted.'.format(txid))
  cpdef int txn_id(self):
      """
      Return the ID of the current transaction.

      NOTE(review): ``_txn_id`` returns a ``size_t`` while this method is
      declared ``int``; confirm that the narrowing is acceptable.
      """
      return self._txn_id()
  cdef size_t _txn_id(self) except -1:
      """Return what ``mdb_txn_id`` reports for the current transaction."""
      return lmdb.mdb_txn_id(self.txn)
  cdef lmdb.MDB_dbi get_dbi(
          self, unsigned char *dblabel=NULL, lmdb.MDB_txn *txn=NULL):
      """
      Return a DB handle by database name.

      :param dblabel: Label of the database, as listed in ``dbi_labels``.
          ``NULL`` or an empty label selects the first handle, which is the
          only one in an environment without named databases.
      :param txn: Not used. Kept so that existing callers keep working.

      :return: The database handle stored for the label.
      """
      cdef size_t dbidx

      # The Python-facing methods pass an empty (non-NULL) label by
      # default, so "no label" must cover both spellings.
      if dblabel is NULL or dblabel[0] == 0:
          logger.debug('Getting DBI without label.')
          dbidx = 0
      else:
          dbidx = self.dbi_labels.index(dblabel.decode())

      return self.dbis[dbidx]
  cdef lmdb.MDB_cursor *_cur_open(
          self, unsigned char *dblabel=NULL, lmdb.MDB_txn *txn=NULL) except *:
      """
      Open a cursor on a database and return it.

      The cursor is also stored in the ``curs`` array, in the slot indexed
      by the database handle.

      :param dblabel: Label of the database. See ``get_dbi`` for how a
          missing label is resolved.
      :param txn: Transaction to open the cursor in. Defaults to the
          store's current one.

      :raise LmdbError: if LMDB cannot open the cursor.
      """
      cdef:
          lmdb.MDB_dbi dbi

      if txn is NULL:
          txn = self.txn

      # Only turn the label into a Python object when there is one: the
      # default is a NULL pointer, which cannot be formatted into a message.
      label = <bytes>dblabel if dblabel is not NULL else None

      dbi = self.get_dbi(dblabel, txn=txn)
      logger.debug(f'Opening cursor for DB {label} (DBI {dbi})...')

      # FIXME Either reuse the stored cursor (mdb_cursor_renew), if that
      # works, or drop the idea. A new cursor is opened on every call.
      _check(
          lmdb.mdb_cursor_open(txn, dbi, self.curs + dbi),
          f'Error opening cursor: {label}')
      logger.debug('...opened @ {:x}.'.format(<unsigned long>self.curs[dbi]))

      return self.curs[dbi]
  cdef void _cur_close(self, lmdb.MDB_cursor *cur) except *:
      """
      Close a cursor.

      :param cur: The cursor handle to close.
      """
      lmdb.mdb_cursor_close(cur)