Browse Source

Support transactions in DB, store interface and graph.

Stefano Cossu 2 years ago
parent
commit
259f2f3deb
6 changed files with 306 additions and 148 deletions
  1. 79 10
      include/graph.h
  2. 4 2
      include/store.h
  3. 41 25
      include/store_interface.h
  4. 9 5
      include/store_mdb.h
  5. 48 24
      src/graph.c
  6. 125 82
      src/store_mdb.c

+ 79 - 10
include/graph.h

@@ -49,9 +49,15 @@ LSUP_graph_new (
 /** @brief Copy triples from a source graph into a destination one.
  *
  * The destination graph is not initialized here, so the copy is cumulative.
+ *
+ * @param src[in] Source graph.
+ *
+ * @param dest[in] Destination graph.
+ *
+ * @param[in] txn Transaction handle.
  */
 LSUP_rc
-LSUP_graph_copy_contents (const LSUP_Graph *src, LSUP_Graph *dest);
+LSUP_graph_copy_contents (const LSUP_Graph *src, LSUP_Graph *dest, void *txn);
 
 
 /** Perform a boolean operation between two graphs.
@@ -66,6 +72,8 @@ LSUP_graph_copy_contents (const LSUP_Graph *src, LSUP_Graph *dest);
  *
  * @param gr2[in] Second operand.
  *
+ * @param[in] txn Transaction handle.
+ *
  * @param res[out] Result graph. The handle should be initialized via
  *  #LSUP_graph_new() or equivalent. Any preexisting contents are not removed.
  *  If an unrecoverable error occurs, this graph is freed.
@@ -75,7 +83,7 @@ LSUP_graph_copy_contents (const LSUP_Graph *src, LSUP_Graph *dest);
 LSUP_rc
 LSUP_graph_bool_op(
         const LSUP_bool_op op, const LSUP_Graph *gr1, const LSUP_Graph *gr2,
-        LSUP_Graph *res);
+        void *txn, LSUP_Graph *res);
 
 
 /** @brief Free a graph.
@@ -90,10 +98,12 @@ LSUP_graph_free (LSUP_Graph *gr);
  *
  * @param[in] gr2 Second operand.
  *
+ * @param[in] txn Transaction handle.
+ *
  * @return True if the graphs are topologically equal, false otherwise.
  */
 bool
-LSUP_graph_equals (const LSUP_Graph *gr1, const LSUP_Graph *gr2);
+LSUP_graph_equals (const LSUP_Graph *gr1, const LSUP_Graph *gr2, void *txn);
 
 
 /** @brief Read-only graph URI.
@@ -132,7 +142,7 @@ LSUP_graph_namespace (const LSUP_Graph *gr);
 
 /** @brief Set the namespace map for an in-memory graph.
  *
- * This has no effect on MDB graphs.
+ * This has no effect on graph stores with LSUP_STORE_PERM.
  *
  * @param[in] gr Graph to set the namespace map for.
  *
@@ -157,18 +167,70 @@ LSUP_graph_size (const LSUP_Graph *gr);
  * @return 1 if the triple is found, 0 if not found.
  */
 bool
-LSUP_graph_contains (const LSUP_Graph *gr, const LSUP_Triple *spo);
+LSUP_graph_contains (const LSUP_Graph *gr, const LSUP_Triple *spo, void *txn);
+
+
+/** @brief Begin a transaction.
+ *
+ * If the underlying store supports it, begin a transaction and return
+ * an opaque handle.
+ *
+ * The transaction must be either committed with #LSUP_graph_commit() or
+ * rolled back with #LSUP_graph_abort().
+ *
+ * @param[in] gr Graph handle.
+ *
+ * @param[in] flags Unused for now, use 0. TODO
+ *
+ * @param[out] txn Transaction handle. This may be passed to all subsequent
+ * operations that one wants to perform within that transaction.
+ *
+ * @return LSUP_OK on success; LSUP_VALUE_ERR if the graph store does not
+ *  support transactions; <0 on other errors from the underlying store.
+ */
+LSUP_rc
+LSUP_graph_begin (const LSUP_Graph *gr, int flags, void **txn);
+
+
+/** @brief Commit a transaction.
+ *
+ * If the underlying store supports it, commit an open transaction. In case of
+ * error, the transaction is left open and it is advisable to roll it back with
+ * #LSUP_graph_abort().
+ *
+ * @param[in] gr Graph handle.
+ *
+ * @param[in] txn Transaction handle.
+ *
+ * @return LSUP_OK if the transaction was committed successfully; LSUP_NOACTION
+ *  if NULL was passed; LSUP_TXN_ERR on error.
+ */
+LSUP_rc LSUP_graph_commit (const LSUP_Graph *gr, void *txn);
+
+
+/** @brief Abort (roll back) a transaction.
+ *
+ * If the underlying store supports it, abort an open transaction and abandon
+ * all changes.
+ *
+ * @param[in] gr Graph handle.
+ *
+ * @param[in] txn Transaction handle.
+ */
+void LSUP_graph_abort (const LSUP_Graph *gr, void *txn);
 
 
 /** @brief Initialize an iterator to add triples.
  *
  * @param[in] gr Graph to add to. It is added to the iterator state.
  *
+ * @param[in] txn Transaction handle.
+ *
  * @return Iterator handle. This should be passed to #LSUP_graph_add_iter() and
  * must be freed with #LSUP_graph_add_done().
  */
 LSUP_GraphIterator *
-LSUP_graph_add_init (LSUP_Graph *gr);
+LSUP_graph_add_init (LSUP_Graph *gr, void *txn);
 
 
 
@@ -200,11 +262,14 @@ LSUP_graph_add_done (LSUP_GraphIterator *it);
  *
  * @param[in] strp Array of buffer triples to add. The last one must be NULL.
  *
- * @param[out] inserted This will be filled with the total number of triples
+ * @param[in] txn Transaction handle.
+ *
+ * @param[out] ct This will be filled with the total number of triples
  *  inserted.
  */
 LSUP_rc
-LSUP_graph_add (LSUP_Graph *gr, const LSUP_Triple trp[], size_t *inserted);
+LSUP_graph_add (
+        LSUP_Graph *gr, const LSUP_Triple trp[], void *txn, size_t *ct);
 
 
 /** @brief Delete triples by a matching pattern.
@@ -213,13 +278,15 @@ LSUP_graph_add (LSUP_Graph *gr, const LSUP_Triple trp[], size_t *inserted);
  *
  * @param ptn[in] Matching pattern. Any and all of s, p, o can be NULL.
  *
+ * @param[in] txn Transaction handle.
+ *
  * @param ct[out] If not NULL it is populated with the number of triples
  *  deleted.
  */
 LSUP_rc
 LSUP_graph_remove (
         LSUP_Graph *gr, const LSUP_Term *s, const LSUP_Term *p,
-        const LSUP_Term *o, size_t *ct);
+        const LSUP_Term *o, void *txn, size_t *ct);
 
 
 /** Look up triples by a matching pattern and yield an iterator.
@@ -229,12 +296,14 @@ LSUP_graph_remove (
  * @param spo[in] Triple to look for. Any and all terms can be NULL, which
  *  indicate unbound terms.
  *
+ * @param[in] txn Transaction handle.
+ *
  * @param it[out] Pointer to a #LSUP_GraphIterator to be generated. It must be
  *  freed with #LSUP_graph_iter_free after use.
  */
 LSUP_GraphIterator *
 LSUP_graph_lookup (const LSUP_Graph *gr, const LSUP_Term *s,
-        const LSUP_Term *p, const LSUP_Term *o, size_t *ct);
+        const LSUP_Term *p, const LSUP_Term *o, void *txn, size_t *ct);
 
 
 /** @brief Advance a cursor obtained by a lookup and return a matching triple.

+ 4 - 2
include/store.h

@@ -58,9 +58,11 @@ const LSUP_StoreInt *LSUP_store_int (LSUP_StoreType type);
  *
  * @sa #LSUP_graph_new()
  */
-typedef struct store_it {
+typedef struct store_t {
     LSUP_StoreType                  type;   ///< Store type.
-    char *                          id;     /**< Store ID. NOTE: This is
+    char *                          id;     /**< Store ID.
+                                             *
+                                             *   NOTE: This is
                                              *   NULL for volatile stores.
                                              */
     const LSUP_StoreInt *           sif;    ///< Store interface.

+ 41 - 25
include/store_interface.h

@@ -140,8 +140,8 @@ typedef LSUP_rc (*store_stat_fn_t)(void *store, void *stat);
  *
  * @param[in] flags Transaction flags. These vary with each implementation.
  *
- * @param[out] txn Will be populated with the new open transaction on success,
- *  or left undefined on failure.
+ * @param[out] txn Will point to the new open transaction on success, or to
+ * undefined content on failure.
  *
  * @return LSUP_OK if the transaction started successfully, <0 on error.
  */
@@ -167,7 +167,7 @@ typedef LSUP_rc (*store_txn_commit_fn_t)(void *store);
  *
  * @param[in] store Store handle.
  *
- * @param[in] txn Transaction to abort.
+ * @param[in] txn Transaction handle generated by #store_txn_begin_fn_t.
  */
 typedef void (*store_txn_abort_fn_t)(void *store);
 
@@ -178,10 +178,6 @@ typedef void (*store_txn_abort_fn_t)(void *store);
  * need to be pre-processed, which can be done in the same loop as the next
  * step to keep memory usage low.
  *
- * For stores with the #LSUP_STORE_TXN feature, this function should open a
- * transaction that can be committed with #store_add_done_fn_t or rolled back
- * with #store_add_abort_fn_t.
- *
  * @param store[in] The store to add to.
  *
  * @param sc[in] Context as a serialized term. If this is NULL, and the
@@ -190,9 +186,13 @@ typedef void (*store_txn_abort_fn_t)(void *store);
  *  the value of sc, triples will be added with no context. Only meaningful
  *  for stores with the LSUP_STORE_CTX feature.
  *
+ *  @param[in] udata User data. Consult individual store implementations for
+ *   how this is interpreted.
+ *
  * @return Iterator handle to be passed to the following load steps.
  */
-typedef void * (*store_add_init_fn_t)(void *store, const LSUP_Buffer * sc);
+typedef void * (*store_add_init_fn_t)(
+        void *store, const LSUP_Buffer *sc, void *udata);
 
 
 /** @brief Add one triple into the store.
@@ -239,7 +239,8 @@ typedef LSUP_rc (*store_add_done_fn_t)(void *it);
  *
  * @param[in] sterm Serialized term to store.
  */
-typedef LSUP_rc (*store_add_term_fn_t)(void *store, const LSUP_Buffer *sterm);
+typedef LSUP_rc (*store_add_term_fn_t)(
+        void *store, const LSUP_Buffer *sterm, void *udata);
 
 
 /** @brief Prototype: look up triples by pattern matching.
@@ -247,9 +248,12 @@ typedef LSUP_rc (*store_add_term_fn_t)(void *store, const LSUP_Buffer *sterm);
  * This function may return a count of matches and/or an iterator of results as
  * serialized triples.
  *
+ * For stores with #LSUP_STORE_TXN, this opens a read-only transaction. The
+ * transaction handle is held in the iterator structure and is closed when the
+ * iterator is freed with #iter_free_fn_t().
+ *
  * Any and all of the terms may be NULL, which indicates an unbound query
- * term. Stores with context not set or witout context support will always
- * ignore the fourth term.
+ * term. Stores witout context support will always ignore sc.
  *
  * @param[in] store The store to be queried.
  *
@@ -268,6 +272,9 @@ typedef LSUP_rc (*store_add_term_fn_t)(void *store, const LSUP_Buffer *sterm);
  *  much less so for 1-bound and 2-bound context lookups, in which cases it
  *  should be set only if needed.
  *
+ *  @param[in] udata User data. Consult individual store implementations for
+ *   how this is interpreted.
+ *
  * @return Iterator handle that will be populated with a result iterator. This
  * is always created even if no matches are found and must be freed with
  * #LSUP_mdbiter_free() after use. If matches are found, the iterator points to
@@ -276,7 +283,7 @@ typedef LSUP_rc (*store_add_term_fn_t)(void *store, const LSUP_Buffer *sterm);
 typedef void * (*store_lookup_fn_t)(
         void *store,
         const LSUP_Buffer *ss, const LSUP_Buffer *sp, const LSUP_Buffer *so,
-        const LSUP_Buffer *sc, size_t *ct);
+        const LSUP_Buffer *sc, void *udata, size_t *ct);
 
 
 /** @brief Prototype: check for existence of a triple (T/F).
@@ -297,13 +304,16 @@ typedef bool (*store_trp_exist_fn_t)(
 /** @brief Prototype: delete triples by pattern matching.
  *
  * The ss, sp, so, sc terms act as a matching pattern as documented in
- * #store_lookup_fn. if not NULL, ct yields the number of triples actually
+ * @sa #store_lookup_fn. if not NULL, ct yields the number of triples actually
  * deleted.
+ *
+ *  @param[in] udata User data. Consult individual store implementations for
+ *   how this is interpreted.
  */
 typedef LSUP_rc (*store_remove_fn_t)(
         void *store,
         const LSUP_Buffer *ss, const LSUP_Buffer *sp, const LSUP_Buffer *so,
-        const LSUP_Buffer *sc, size_t *ct);
+        const LSUP_Buffer *sc, size_t *ct, void *udata);
 
 
 /** @brief Put an in-memory namespace map into a permanent back end.
@@ -325,11 +335,15 @@ typedef LSUP_rc (*store_remove_fn_t)(
  *
  * @param[out] nsm Namespace map handle to store.
  *
+ * @param[in] udata User-defined data. Consult individual implementations for
+ *  details.
+ *
  * @return LSUP_OK if all terms were updated; LSUP_CONFLICT if one or more
  *  namespaces or terms were not updated because they already existed; <0 if
  *  an error occurred.
  */
-typedef LSUP_rc (*store_nsm_put_fn_t)(void *store, const LSUP_NSMap * nsm);
+typedef LSUP_rc (*store_nsm_put_fn_t)(
+        void *store, const LSUP_NSMap * nsm, void *udata);
 
 
 /** @brief Get the store's namespace prefix map.
@@ -431,15 +445,17 @@ typedef struct store_if_t {
     // Addition.
     store_add_init_fn_t add_init_fn;    ///< Initialize add iteration.
     store_add_iter_fn_t add_iter_fn;    ///< Add one triple.
-    store_add_abort_fn_t add_abort_fn;  /**< Abort (roll back) the add
-                                         *  process.  Only available in
-                                         *  stores with #LSUP_STORE_TXN
-                                         *  feature. Optional.
+    store_add_abort_fn_t add_abort_fn;  /**< Abort (roll back) the add process.
+                                         *
+                                         *   Only available in
+                                         *   stores with #LSUP_STORE_TXN
+                                         *   feature. Optional.
                                          */
     store_add_done_fn_t add_done_fn;    ///< Complete the add process.
     store_add_term_fn_t add_term_fn;    /**< Add (index) a term to the store.
-                                         *  Only available in stores with
-                                         *  #LSUP_STORE_IDX feature. Optional.
+                                         *
+                                         *   Only available in stores with
+                                         *   #LSUP_STORE_IDX feature. Optional.
                                          */
 
     // Look up.
@@ -452,14 +468,14 @@ typedef struct store_if_t {
     store_remove_fn_t   remove_fn;      ///< Remove triples by pattern.
 
     // Namespace prefix mapping.
-    store_nsm_put_fn_t  nsm_put_fn;     /**< Add a namespace/prefix pair to
-                                         *  the prefix map.
+    store_nsm_put_fn_t  nsm_put_fn;     /**< Add a ns/pfx pair to the map.
+                                         *
                                          *  Only available (and mandatory)
                                          *  in stores with the
                                          *  #LSUP_STORE_IDX feature.
                                          */
-    store_nsm_get_fn_t  nsm_get_fn;     /**< Get a namespace/prefix from
-                                         *  the prefix map.
+    store_nsm_get_fn_t  nsm_get_fn;     /**< Get a namespace from the map.
+                                         *
                                          *  Only available (and mandatory)
                                          *  in stores with the
                                          *  #LSUP_STORE_IDX feature.

+ 9 - 5
include/store_mdb.h

@@ -11,11 +11,15 @@
  * per session. Within that session multiple R/W operations can be performed
  * using transactions.
  *
- * Note that, even though the terms "graph", "context", etc. are used, no code
- * in this module checks for valid RDF data. In theory any term can be any
- * binary data. This allows using the store for non-RDF graph data.
- *
- * TODO more doc
+ * This store supports transactions. Under the hood, LMDB supports nested RW
+ * transactions, which are used here, but not exposed to the caller. Some
+ * functions have a transaction handle parameter that may be NULL. In that
+ * case, a new transaction is opened and closed within the scope of the
+ * function (or, in cases such as #mdbstore_lookup(), within the life cycle of
+ * the iterator); if not, the transaction handle may either be used as the
+ * parent for a new transaction (which is closed as in the previous case), or
+ * the function uses the same transaction (i.e. changes are only committed 
+ * after the parent transaction is committed).
  */
 
 

+ 48 - 24
src/graph.c

@@ -15,6 +15,7 @@ struct graph_iter_t {
     LSUP_Store *            store;          ///< Store tied to the iterator.
     void *                  data;           ///< Iterator state.
     size_t                  ct;             ///< Total lookup matches.
+    void *                  txn;            ///< Store transaction.
 };
 
 
@@ -80,7 +81,7 @@ BACKEND_TBL
 LSUP_rc
 LSUP_graph_bool_op(
         const LSUP_bool_op op, const LSUP_Graph *gr1, const LSUP_Graph *gr2,
-        LSUP_Graph *res)
+        void *txn, LSUP_Graph *res)
 {
     LSUP_rc rc = LSUP_NOACTION;
     if (UNLIKELY (
@@ -94,9 +95,9 @@ LSUP_graph_bool_op(
     }
 
     if (op == LSUP_BOOL_UNION) {
-        rc = LSUP_graph_copy_contents (gr1, res);
+        rc = LSUP_graph_copy_contents (gr1, res, txn);
         PCHECK (rc, fail);
-        rc = LSUP_graph_copy_contents (gr2, res);
+        rc = LSUP_graph_copy_contents (gr2, res, txn);
         PCHECK (rc, fail);
 
         return LSUP_OK;
@@ -110,15 +111,16 @@ LSUP_graph_bool_op(
     LSUP_BufferTriple *sspo = BTRP_DUMMY;
     size_t ct;
 
-    add_it = res->store->sif->add_init_fn (res->store->data, res_sc);
+    add_it = res->store->sif->add_init_fn (res->store->data, res_sc, txn);
 
     if (op == LSUP_BOOL_XOR) {
         // Add triples from gr2 if not found in gr1.
         lu2_it = gr2->store->sif->lookup_fn (
-                gr2->store->data, NULL, NULL, NULL, gr2_sc, NULL);
+                gr2->store->data, NULL, NULL, NULL, gr2_sc, NULL, txn);
         while (gr2->store->sif->lu_next_fn (lu2_it, sspo, NULL) == LSUP_OK) {
             lu1_it = gr1->store->sif->lookup_fn (
-                    gr1->store->data, sspo->s, sspo->p, sspo->o, gr1_sc, &ct);
+                    gr1->store->data, sspo->s, sspo->p, sspo->o, gr1_sc,
+                    txn, &ct);
             if (ct > 0)
                 res->store->sif->add_iter_fn (add_it, sspo);
             gr1->store->sif->lu_free_fn (lu1_it);
@@ -127,10 +129,10 @@ LSUP_graph_bool_op(
     }
 
     lu1_it = gr1->store->sif->lookup_fn (
-            gr1->store->data, NULL, NULL, NULL, gr1_sc, NULL);
+            gr1->store->data, NULL, NULL, NULL, gr1_sc, txn, NULL);
     while (gr1->store->sif->lu_next_fn (lu1_it, sspo, NULL) == LSUP_OK) {
         lu2_it = gr2->store->sif->lookup_fn (
-                gr2->store->data, sspo->s, sspo->p, sspo->o, gr2_sc, &ct);
+                gr2->store->data, sspo->s, sspo->p, sspo->o, gr2_sc, txn, &ct);
         // For XOR and subtraction, add if not found.
         // For intersection, add if found.
         if ((ct == 0) ^ (op == LSUP_BOOL_INTERSECTION))
@@ -209,10 +211,10 @@ LSUP_graph_size (const LSUP_Graph *gr)
 
 
 bool
-LSUP_graph_equals (const LSUP_Graph *gr1, const LSUP_Graph *gr2)
+LSUP_graph_equals (const LSUP_Graph *gr1, const LSUP_Graph *gr2, void *txn)
 {
     LSUP_Graph *res = LSUP_graph_new (NULL, LSUP_STORE_HTABLE, NULL, NULL, 0);
-    LSUP_graph_bool_op (LSUP_BOOL_XOR, gr1, gr2, res);
+    LSUP_graph_bool_op (LSUP_BOOL_XOR, gr1, gr2, txn, res);
     bool ret = (LSUP_graph_size (res) == 0);
 
     LSUP_graph_free (res);
@@ -222,14 +224,14 @@ LSUP_graph_equals (const LSUP_Graph *gr1, const LSUP_Graph *gr2)
 
 
 LSUP_GraphIterator *
-LSUP_graph_add_init (LSUP_Graph *gr)
+LSUP_graph_add_init (LSUP_Graph *gr, void *txn)
 {
     LSUP_GraphIterator *it;
     CALLOC_GUARD (it, NULL);
 
     LSUP_Buffer *sc = LSUP_term_serialize (gr->uri);
 
-    it->data = gr->store->sif->add_init_fn (gr->store->data, sc);
+    it->data = gr->store->sif->add_init_fn (gr->store->data, sc, txn);
     LSUP_buffer_free (sc);
 
     it->store = gr->store;
@@ -254,7 +256,8 @@ LSUP_graph_add_iter (LSUP_GraphIterator *it, const LSUP_Triple *spo)
             LSUP_Term *term = LSUP_triple_pos (spo, i);
             if (term->type == LSUP_TERM_LITERAL) {
                 LSUP_Buffer *ser_dtype = LSUP_term_serialize (term->datatype);
-                it->store->sif->add_term_fn (it->store->data, ser_dtype);
+                it->store->sif->add_term_fn (
+                        it->store->data, ser_dtype, it->txn);
                 LSUP_buffer_free (ser_dtype);
             }
         }
@@ -277,12 +280,12 @@ LSUP_graph_add_done (LSUP_GraphIterator *it)
 
 
 LSUP_rc
-LSUP_graph_add (LSUP_Graph *gr, const LSUP_Triple trp[], size_t *ct)
+LSUP_graph_add (LSUP_Graph *gr, const LSUP_Triple trp[], void *txn, size_t *ct)
 {
     LSUP_rc rc = LSUP_NOACTION;
 
     // Initialize iterator.
-    LSUP_GraphIterator *it = LSUP_graph_add_init (gr);
+    LSUP_GraphIterator *it = LSUP_graph_add_init (gr, txn);
 
     if (ct) *ct = 0;
     // Serialize and insert RDF triples.
@@ -313,7 +316,7 @@ finally:
 LSUP_rc
 LSUP_graph_remove (
         LSUP_Graph *gr, const LSUP_Term *s, const LSUP_Term *p,
-        const LSUP_Term *o, size_t *ct)
+        const LSUP_Term *o, void *txn, size_t *ct)
 {
     LSUP_rc rc;
 
@@ -323,7 +326,7 @@ LSUP_graph_remove (
         *so = LSUP_term_serialize (o),
         *sc = LSUP_term_serialize (gr->uri);
 
-    rc = gr->store->sif->remove_fn (gr->store->data, ss, sp, so, sc, ct);
+    rc = gr->store->sif->remove_fn (gr->store->data, ss, sp, so, sc, txn, ct);
 
     LSUP_buffer_free (ss);
     LSUP_buffer_free (sp);
@@ -340,15 +343,16 @@ LSUP_graph_remove (
  * The destination graph is not initialized here, so the copy is cumulative.
  */
 LSUP_rc
-LSUP_graph_copy_contents (const LSUP_Graph *src, LSUP_Graph *dest)
+LSUP_graph_copy_contents (const LSUP_Graph *src, LSUP_Graph *dest, void *txn)
 {
     LSUP_rc rc = LSUP_NOACTION;
 
-    LSUP_GraphIterator *it = LSUP_graph_lookup (src, NULL, NULL, NULL, NULL);
+    LSUP_GraphIterator *it = LSUP_graph_lookup (
+            src, NULL, NULL, NULL, txn, NULL);
 
     LSUP_Triple spo;
 
-    LSUP_GraphIterator *add_it = LSUP_graph_add_init (dest);
+    LSUP_GraphIterator *add_it = LSUP_graph_add_init (dest, txn);
     while (LSUP_graph_iter_next (it, &spo) != LSUP_END) {
         LSUP_rc add_rc = LSUP_graph_add_iter (add_it, &spo);
         LSUP_triple_done (&spo);
@@ -369,7 +373,7 @@ LSUP_graph_copy_contents (const LSUP_Graph *src, LSUP_Graph *dest)
 LSUP_GraphIterator *
 LSUP_graph_lookup (
         const LSUP_Graph *gr, const LSUP_Term *s, const LSUP_Term *p,
-        const LSUP_Term *o, size_t *ct)
+        const LSUP_Term *o, void *txn, size_t *ct)
 {
     LSUP_GraphIterator *it;
     MALLOC_GUARD (it, NULL);
@@ -382,7 +386,9 @@ LSUP_graph_lookup (
         *so = LSUP_term_serialize (o),
         *sc = LSUP_term_serialize (gr->uri);
 
-    it->data = it->store->sif->lookup_fn (it->store->data, ss, sp, so, sc, ct);
+    it->txn = txn;
+    it->data = it->store->sif->lookup_fn (
+            it->store->data, ss, sp, so, sc, it->txn, ct);
     if (UNLIKELY (!it->data)) {
         free (it);
         it = NULL;
@@ -441,10 +447,10 @@ LSUP_graph_iter_free (LSUP_GraphIterator *it)
 
 
 bool
-LSUP_graph_contains (const LSUP_Graph *gr, const LSUP_Triple *spo)
+LSUP_graph_contains (const LSUP_Graph *gr, const LSUP_Triple *spo, void *txn)
 {
     LSUP_GraphIterator *it = LSUP_graph_lookup (
-            gr, spo->s, spo->p, spo->o, NULL);
+            gr, spo->s, spo->p, spo->o, txn, NULL);
     LSUP_Triple *tmp_spo = TRP_DUMMY;
     bool rc = LSUP_graph_iter_next (it, tmp_spo) != LSUP_END;
 
@@ -455,6 +461,24 @@ LSUP_graph_contains (const LSUP_Graph *gr, const LSUP_Triple *spo)
 }
 
 
+LSUP_rc
+LSUP_graph_begin (const LSUP_Graph *gr, int flags, void **txn) {
+    if (!(gr->store->sif->features & LSUP_STORE_TXN)) return LSUP_VALUE_ERR;
+
+    return gr->store->sif->txn_begin_fn(gr->store->data, flags, txn);
+}
+
+
+LSUP_rc
+LSUP_graph_commit (const LSUP_Graph *gr, void *txn)
+{ return gr->store->sif->txn_commit_fn (txn); }
+
+
+void
+LSUP_graph_abort (const LSUP_Graph *gr, void *txn)
+{ gr->store->sif->txn_abort_fn (txn); }
+
+
 /*
  * Static functions.
  */

+ 125 - 82
src/store_mdb.c

@@ -28,10 +28,24 @@
 typedef char DbLabel[8];
 typedef struct mdbstore_iter_t MDBIterator;
 
+/// Store state flags.
 typedef enum {
     LSSTORE_OPEN         = 1<<0,                    ///< Env is open.
-    LSSTORE_DIRTY_TXN    = LSSTORE_OPEN + (1<<1),   ///< Main txn is open.
-} StoreState;
+} StoreFlags;
+
+/// Iterator state flags.
+typedef enum {
+    ITER_OPEN_TXN       = 1<<0,         /**< A transaction is open.
+                                          *
+                                          *  The iterator has begun a new
+                                          *  transaction on initialization
+                                          *  which needs to be closed.  If
+                                          *  false, the iterator is using an
+                                          *  existing transaction which will
+                                          *  not be closed with
+                                          *  #mdbiter_free().
+                                          */
+} IterFlags;
 
 typedef enum {
     OP_ADD,
@@ -40,10 +54,8 @@ typedef enum {
 
 typedef struct mdbstore_t {
     MDB_env *           env;            ///< Environment handle.
-    MDB_txn *           txn;            ///< Current transaction.
     MDB_dbi             dbi[N_DB];      ///< DB handles. Refer to DbIdx enum.
-    StoreState          state;          ///< Store state.
-    int                 features;       ///< Store feature flags.
+    StoreFlags          flags;          ///< Store state flags.
 } MDBStore;
 
 /** @brief Iterator operation.
@@ -62,13 +74,17 @@ typedef void (*iter_op_fn_t)(MDBIterator *it);
 /// Triple iterator.
 typedef struct mdbstore_iter_t {
     MDBStore *          store;      ///< MDB store handle.
+    IterFlags           flags;      ///< Iterator flags.
     MDB_txn *           txn;        ///< MDB transaction.
     MDB_cursor *        cur;        ///< MDB cursor.
     MDB_cursor *        ctx_cur;    ///< MDB c:spo index cursor.
-    MDB_val             key, data;  ///< Internal data handlers.
+    MDB_val             key;        ///< Internal data handler.
+    MDB_val             data;       ///< Internal data handler.
     LSUP_TripleKey      spok;       ///< Triple to be populated with match.
-    LSUP_Key *          ck;         ///< Context array to be populated for each
-                                    ///< matching triple if requested.
+    LSUP_Key *          ck;         /**< Context array.
+                                      *
+                                      *  This shall be populated for each
+                                      * matching triple if requested. */
     iter_op_fn_t        iter_op_fn; ///< Function used to look up next match.
     const uint8_t *     term_order; ///< Term order used in 1-2bound look-ups.
     LSUP_Key            luk[3];     ///< 0÷3 lookup keys.
@@ -184,8 +200,9 @@ static const uint8_t lookup_ordering_2bound[3][3] = {
  * Static prototypes.
  */
 static int index_triple(
-        MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck);
-static LSUP_rc mdbstore_add_term (void *h, const LSUP_Buffer *sterm);
+        MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck,
+        MDB_txn *txn);
+static LSUP_rc mdbstore_add_term (void *h, const LSUP_Buffer *sterm, void *th);
 
 inline static LSUP_rc lookup_0bound (MDBIterator *it, size_t *ct);
 inline static LSUP_rc lookup_1bound (
@@ -240,11 +257,11 @@ finally:
 
 
 static LSUP_rc
-mdbstore_nsm_put (void *h, const LSUP_NSMap *nsm)
+mdbstore_nsm_put (void *h, const LSUP_NSMap *nsm, void *th)
 {
     MDBStore *store = h;
     MDB_txn *txn;
-    RCCK (mdb_txn_begin (store->env, store->txn, 0, &txn));
+    RCCK (mdb_txn_begin (store->env, (MDB_txn *) th, 0, &txn));
 
     LSUP_rc rc = LSUP_NOACTION;
     int db_rc;
@@ -403,31 +420,32 @@ mdbstore_new (const char *id, size_t _unused)
     CHECK (mdb_env_open (store->env, path, 0, ENV_FILE_MODE), fail);
 
     // Assign DB handles to store->dbi.
-    mdb_txn_begin (store->env, NULL, 0, &store->txn);
+    MDB_txn *txn = NULL;
+    CHECK (mdb_txn_begin (store->env, NULL, 0, &txn), fail);
     for (int i = 0; i < N_DB; i++)
         CHECK (mdb_dbi_open (
-                store->txn, db_labels[i], db_flags[i], store->dbi + i), fail);
+                txn, db_labels[i], db_flags[i], store->dbi + i), fail);
 
     // Bootstrap the permanent store with initial data.
     MDB_stat stat;
-    CHECK (mdb_stat (store->txn, store->dbi[IDX_PFX_NS], &stat), fail);
+    CHECK (mdb_stat (txn, store->dbi[IDX_PFX_NS], &stat), fail);
     if (stat.ms_entries == 0) {
         log_debug ("Loading initial data into %s", path);
         // Load initial NS map.
-        mdbstore_nsm_put (store, LSUP_default_nsm);
+        mdbstore_nsm_put (store, LSUP_default_nsm, txn);
 
         // Index default context.
-        mdbstore_add_term (store, LSUP_default_ctx_buf);
+        mdbstore_add_term (store, LSUP_default_ctx_buf, txn);
     }
 
-    store->state |= LSSTORE_OPEN;
-    mdb_txn_commit (store->txn);
-    store->txn = NULL;
+    store->flags |= LSSTORE_OPEN;
+    mdb_txn_commit (txn);
+    txn = NULL;
 
     return store;
 
 fail:
-    if (store->txn) mdb_txn_abort (store->txn);
+    if (txn) mdb_txn_abort (txn);
     mdb_env_close (store->env);
 
     return NULL;
@@ -438,7 +456,7 @@ static void
 mdbstore_free (void *h)
 {
     MDBStore *store = h;
-    if (store->state & LSSTORE_OPEN) {
+    if (store->flags & LSSTORE_OPEN) {
         const char *path;
         mdb_env_get_path (store->env, &path);
         log_info ("Closing MDB env at %s.", path);
@@ -463,7 +481,7 @@ mdbstore_id (const void *h)
 static LSUP_rc
 mdbstore_stat (const MDBStore *store, MDB_stat *stat)
 {
-    if (!(store->state & LSSTORE_OPEN)) return 0;
+    if (!(store->flags & LSSTORE_OPEN)) return 0;
 
     MDB_txn *txn;
     mdb_txn_begin (store->env, NULL, MDB_RDONLY, &txn);
@@ -489,8 +507,43 @@ mdbstore_size (const void *h)
 }
 
 
+static LSUP_rc
+mdbstore_txn_begin (void *h, int flags, void **th)
+{
+    MDBStore *store = h;
+
+    RCCK (mdb_txn_begin (store->env, NULL, flags, (MDB_txn **) th));
+
+    return LSUP_OK;
+}
+
+
+static LSUP_rc
+mdbstore_txn_commit (void *th)
+{
+    RCCK (mdb_txn_commit ((MDB_txn *) th));
+
+    return LSUP_OK;
+}
+
+
+static void
+mdbstore_txn_abort (void *th)
+{ mdb_txn_abort ((MDB_txn *) th); }
+
+
+/** @brief Begin an add loop.
+ *
+ * @sa #store_add_init_fn_t
+ *
+ * @param[in] th Previously opened MDB_txn handle, if the add loop shall be
+ *  run within a broader transaction. The transaction must be read-write. The
+ *  operation will always open a new transaction that is closed with
+ *  #mdbstore_add_done() or #mdbstore_add_abort(). If this parameter is not
+ *  NULL, the loop transaction will have the passed txn set as its parent.
+ */
 static void *
-mdbstore_add_init (void *h, const LSUP_Buffer *sc)
+mdbstore_add_init (void *h, const LSUP_Buffer *sc, void *th)
 {
     MDBStore *store = h;
     /* An iterator is used here. Some members are a bit misused but it does
@@ -502,12 +555,7 @@ mdbstore_add_init (void *h, const LSUP_Buffer *sc)
     it->store = store;
     it->i = 0;
 
-    // No other write transaction may be open.
-    if (UNLIKELY (it->store->txn)) {
-        log_error ("A write transaction is already open.");
-        return NULL;
-    }
-    mdb_txn_begin (store->env, NULL, 0, &it->store->txn);
+    mdb_txn_begin (store->env, (MDB_txn *) th, 0, &it->txn);
 
     if (sc) {
         // Store context if it's not the default one.
@@ -521,11 +569,11 @@ mdbstore_add_init (void *h, const LSUP_Buffer *sc)
         it->data.mv_size = sc->size;
 
         int db_rc = mdb_put(
-                it->store->txn, it->store->dbi[IDX_T_ST],
+                it->txn, it->store->dbi[IDX_T_ST],
                 &it->key, &it->data, MDB_NOOVERWRITE);
         if (db_rc != MDB_SUCCESS && db_rc != MDB_KEYEXIST) {
             LOG_RC (db_rc);
-            mdb_txn_abort (it->store->txn);
+            mdb_txn_abort (it->txn);
             return NULL;
         }
     } else {
@@ -564,7 +612,7 @@ mdbstore_add_iter (void *h, const LSUP_BufferTriple *sspo)
         it->data.mv_size = st->size;
 
         db_rc = mdb_put(
-                it->store->txn, it->store->dbi[IDX_T_ST],
+                it->txn, it->store->dbi[IDX_T_ST],
                 &it->key, &it->data, MDB_NOOVERWRITE);
         if (db_rc != MDB_SUCCESS && db_rc != MDB_KEYEXIST) {
             LOG_RC (db_rc);
@@ -584,7 +632,7 @@ mdbstore_add_iter (void *h, const LSUP_BufferTriple *sspo)
     it->data.mv_size = it->luc == NULL_KEY ? 0 : KLEN;
 
     db_rc = mdb_put(
-            it->store->txn, it->store->dbi[IDX_SPO_C],
+            it->txn, it->store->dbi[IDX_SPO_C],
             &it->key, &it->data, MDB_NODUPDATA);
 
     if (db_rc == MDB_KEYEXIST) return LSUP_NOACTION;
@@ -595,7 +643,7 @@ mdbstore_add_iter (void *h, const LSUP_BufferTriple *sspo)
     }
 
     // Index.
-    LSUP_rc rc = index_triple (it->store, OP_ADD, spok, it->luc);
+    LSUP_rc rc = index_triple (it->store, OP_ADD, spok, it->luc, it->txn);
     if (rc == LSUP_OK) it->i++;
 
     return rc;
@@ -608,13 +656,11 @@ mdbstore_add_done (void *h)
     MDBIterator *it = h;
     LSUP_rc rc = LSUP_OK;
 
-    if (mdb_txn_commit (it->store->txn) != MDB_SUCCESS) {
-        mdb_txn_abort (it->store->txn);
-        rc = LSUP_DB_ERR;
+    if (mdb_txn_commit (it->txn) != MDB_SUCCESS) {
+        mdb_txn_abort (it->txn);
+        rc = LSUP_TXN_ERR;
     }
 
-    it->store->txn = NULL;
-
     free (it);
 
     return rc;
@@ -625,9 +671,7 @@ static void
 mdbstore_add_abort (void *h)
 {
     MDBIterator *it = h;
-    mdb_txn_abort (it->store->txn);
-
-    it->store->txn = NULL;
+    mdb_txn_abort (it->txn);
 
     free (it);
 }
@@ -686,9 +730,8 @@ key_to_sterm (MDBIterator *it, const LSUP_Key key, LSUP_Buffer *sterm)
 static void *
 mdbstore_lookup (
         void *h, const LSUP_Buffer *ss, const LSUP_Buffer *sp,
-        const LSUP_Buffer *so, const LSUP_Buffer *sc, size_t *ct)
+        const LSUP_Buffer *so, const LSUP_Buffer *sc, void *th, size_t *ct)
 {
-    MDBStore *store = h;
     LSUP_TripleKey spok = {
         LSUP_buffer_hash (ss),
         LSUP_buffer_hash (sp),
@@ -698,7 +741,7 @@ mdbstore_lookup (
     MDBIterator *it;
     CALLOC_GUARD (it, NULL);
 
-    it->store = store;
+    it->store = h;
     it->luc = LSUP_buffer_hash (sc);
     log_debug ("Lookup context: %lx", it->luc);
 
@@ -707,13 +750,14 @@ mdbstore_lookup (
     uint8_t idx0, idx1;
 
     // Start RO transaction if not in a write txn already.
-    if (it->store->txn) it->txn = it->store->txn;
+    if (th) it->txn = th;
     else {
         it->rc = mdb_txn_begin (it->store->env, NULL, MDB_RDONLY, &it->txn);
         if (it->rc != MDB_SUCCESS) {
             log_error ("Database error: %s", LSUP_strerror (it->rc));
             return NULL;
         }
+        it->flags |= ITER_OPEN_TXN;
     }
 
     // Context index loop.
@@ -920,7 +964,7 @@ mdbiter_free (void *h)
 
     if (it->cur) mdb_cursor_close (it->cur);
     if (it->ctx_cur) mdb_cursor_close (it->ctx_cur);
-    if (it->store->txn != it->txn) mdb_txn_abort (it->txn);
+    if (it->flags & ITER_OPEN_TXN) mdb_txn_abort (it->txn);
     free (it->ck);
 
     free (it);
@@ -928,9 +972,9 @@ mdbiter_free (void *h)
 
 
 static LSUP_rc
-mdbstore_remove(
+mdbstore_remove (
         void *h, const LSUP_Buffer *ss, const LSUP_Buffer *sp,
-        const LSUP_Buffer *so, const LSUP_Buffer *sc, size_t *ct)
+        const LSUP_Buffer *so, const LSUP_Buffer *sc, size_t *ct, void *th)
 {
     MDBStore *store = h;
     LSUP_rc rc = LSUP_NOACTION, db_rc;
@@ -940,13 +984,12 @@ mdbstore_remove(
     if (sc == NULL) sc = LSUP_default_ctx_buf;
     ck = LSUP_buffer_hash (sc);
 
-    // No other write transaction may be open.
-    if (UNLIKELY (store->txn)) return LSUP_TXN_ERR;
-    mdb_txn_begin (store->env, NULL, 0, &store->txn);
+    MDB_txn *txn;
+    mdb_txn_begin (store->env, (MDB_txn *) th, 0, &txn);
 
     MDB_cursor *dcur, *icur;
-    mdb_cursor_open (store->txn, store->dbi[IDX_SPO_C], &dcur);
-    mdb_cursor_open (store->txn, store->dbi[IDX_C_SPO], &icur);
+    mdb_cursor_open (txn, store->dbi[IDX_SPO_C], &dcur);
+    mdb_cursor_open (txn, store->dbi[IDX_C_SPO], &icur);
 
     MDB_val spok_v, ck_v;
 
@@ -954,7 +997,8 @@ mdbstore_remove(
     ck_v.mv_size = KLEN;
     ck_v.mv_data = &ck;
 
-    MDBIterator *it = mdbstore_lookup (store, ss, sp, so, sc, ct);
+    // The lookup operates within the current (bottom) write transaction.
+    MDBIterator *it = mdbstore_lookup (store, ss, sp, so, sc, txn, ct);
     if (UNLIKELY (!it)) return LSUP_DB_ERR;
     if (ct) log_debug ("Found %lu triples to remove.", *ct);
 
@@ -994,22 +1038,20 @@ mdbstore_remove(
         if (db_rc == MDB_SUCCESS) continue;
         if (UNLIKELY (db_rc != MDB_NOTFOUND)) goto fail;
 
-        rc = index_triple (store, OP_REMOVE, it->spok, ck);
+        rc = index_triple (store, OP_REMOVE, it->spok, ck, txn);
     }
 
     mdbiter_free (it);
 
-    if (UNLIKELY (mdb_txn_commit (store->txn) != MDB_SUCCESS)) {
+    if (UNLIKELY (mdb_txn_commit (txn) != MDB_SUCCESS)) {
         rc = LSUP_TXN_ERR;
         goto fail;
     }
-    store->txn = NULL;
 
     return rc;
 
 fail:
-    mdb_txn_abort (store->txn);
-    store->txn = NULL;
+    mdb_txn_abort (txn);
 
     log_error ("Database error: %s", LSUP_strerror (db_rc));
 
@@ -1048,19 +1090,19 @@ mdbstore_tkey_exists (MDBStore *store, LSUP_Key tkey)
 
 
 static LSUP_rc
-mdbstore_add_term (void *h, const LSUP_Buffer *sterm)
+mdbstore_add_term (void *h, const LSUP_Buffer *sterm, void *th)
 {
     MDBStore *store = h;
     int db_rc;
     MDB_val key, data;
 
     MDB_txn *txn;
-    // If store->txn exists, open a child txn, otherwise parent should be NULL.
-    RCCK (mdb_txn_begin (store->env, store->txn, 0, &txn));
+    // If store->txn exists, open a child txn, otherwise reuse the same txn.
+    if (th) txn = th;
+    else RCCK (mdb_txn_begin (store->env, NULL, 0, &txn));
 
     MDB_cursor *cur;
-    db_rc = mdb_cursor_open (txn, store->dbi[IDX_T_ST], &cur);
-    if (UNLIKELY (db_rc != MDB_SUCCESS)) goto fail;
+    CHECK (mdb_cursor_open (txn, store->dbi[IDX_T_ST], &cur), fail);
 
     LSUP_Key k = LSUP_buffer_hash (sterm);
     key.mv_data = &k;
@@ -1072,17 +1114,12 @@ mdbstore_add_term (void *h, const LSUP_Buffer *sterm)
     db_rc = mdb_cursor_put (cur, &key, &data, MDB_NOOVERWRITE);
     if (db_rc != MDB_KEYEXIST) CHECK (db_rc, fail);
 
-    if (txn != store->txn) {
-        db_rc = mdb_txn_commit (txn);
-        txn = NULL;
-        CHECK (db_rc, fail);
-    }
+    if (!th) CHECK (db_rc = mdb_txn_commit (txn), fail);
 
     return LSUP_OK;
 
 fail:
-    log_error (mdb_strerror (db_rc));
-    if (txn) mdb_txn_abort (txn);
+    if (!th) mdb_txn_abort (txn);
     return LSUP_DB_ERR;
 }
 
@@ -1098,6 +1135,10 @@ const LSUP_StoreInt mdbstore_int = {
 
     .size_fn        = mdbstore_size,
 
+    .txn_begin_fn   = mdbstore_txn_begin,
+    .txn_commit_fn  = mdbstore_txn_commit,
+    .txn_abort_fn   = mdbstore_txn_abort,
+
     .add_init_fn    = mdbstore_add_init,
     .add_iter_fn    = mdbstore_add_iter,
     .add_abort_fn   = mdbstore_add_abort,
@@ -1123,9 +1164,13 @@ const LSUP_StoreInt mdbstore_int = {
  * @param op[in] Store operation. One of OP_ADD or OP_REMOVE.
  * @param spok[in] Triple key to index.
  * @param ck[in] Context to index, may be NULL.
+ * @param[in] th Transaction handle. This MUST be a valid pointer to an open
+ *  RW transaction.
  */
 static LSUP_rc
-index_triple(MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck)
+index_triple(
+        MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck,
+        MDB_txn *txn)
 {
     int db_rc;
     LSUP_rc rc = LSUP_NOACTION;
@@ -1144,7 +1189,7 @@ index_triple(MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck)
             v2.mv_data = spok;
             v2.mv_size = TRP_KLEN;
 
-            mdb_cursor_open (store->txn, store->dbi[IDX_C_SPO], &cur);
+            mdb_cursor_open (txn, store->dbi[IDX_C_SPO], &cur);
             if (mdb_cursor_get (cur, &v1, &v2, MDB_GET_BOTH) == MDB_SUCCESS) {
                 db_rc = mdb_cursor_del (cur, 0);
                 if (db_rc != MDB_SUCCESS) return LSUP_DB_ERR;
@@ -1164,7 +1209,7 @@ index_triple(MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck)
             v2.mv_size = TRP_KLEN;
 
             db_rc = mdb_put(
-                    store->txn, store->dbi[IDX_C_SPO],
+                    txn, store->dbi[IDX_C_SPO],
                     &v1, &v2, MDB_NODUPDATA);
             if (db_rc != MDB_SUCCESS) return LSUP_DB_ERR;
             if (db_rc != MDB_KEYEXIST) rc = LSUP_OK;
@@ -1191,8 +1236,7 @@ index_triple(MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck)
 
         if (op == OP_REMOVE) {
             MDB_cursor *cur1, *cur2;
-            mdb_cursor_open(
-                    store->txn, store->dbi[lookup_indices[i]], &cur1);
+            mdb_cursor_open(txn, store->dbi[lookup_indices[i]], &cur1);
 
             db_rc = mdb_cursor_get (cur1, &v1, &v2, MDB_GET_BOTH);
             if (db_rc == MDB_SUCCESS) mdb_cursor_del (cur1, 0);
@@ -1203,8 +1247,7 @@ index_triple(MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck)
             v1.mv_data = spok + i;
             v2.mv_data = dbl_keys[i];
 
-            mdb_cursor_open(
-                    store->txn, store->dbi[lookup_indices[i + 3]], &cur2);
+            mdb_cursor_open(txn, store->dbi[lookup_indices[i + 3]], &cur2);
 
             db_rc = mdb_cursor_get (cur2, &v2, &v1, MDB_GET_BOTH);
             if (db_rc == MDB_SUCCESS) mdb_cursor_del (cur2, 0);
@@ -1220,7 +1263,7 @@ index_triple(MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck)
                     "%lx: %lx %lx", *(size_t*)(v1.mv_data),
                     *(size_t*)(v2.mv_data), *(size_t*)(v2.mv_data) + 1);
 
-            db_rc = mdb_put (store->txn, db1, &v1, &v2, MDB_NODUPDATA);
+            db_rc = mdb_put (txn, db1, &v1, &v2, MDB_NODUPDATA);
 
             if (db_rc == MDB_SUCCESS) rc = LSUP_OK;
             else if (db_rc != MDB_KEYEXIST) return LSUP_DB_ERR;
@@ -1231,7 +1274,7 @@ index_triple(MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck)
                     "%lx %lx: %lx", *(size_t*)(v2.mv_data),
                     *(size_t*)(v2.mv_data) + 1, *(size_t*)(v1.mv_data));
 
-            db_rc = mdb_put (store->txn, db2, &v2, &v1, MDB_NODUPDATA);
+            db_rc = mdb_put (txn, db2, &v2, &v1, MDB_NODUPDATA);
 
             if (db_rc == MDB_SUCCESS) rc = LSUP_OK;
             else if (db_rc != MDB_KEYEXIST) return LSUP_DB_ERR;