Forráskód Böngészése

Merge branch 'txn' of scossu/lsup_rdf into master

scossu 2 éve
szülő
commit
f11f14cc94
11 módosított fájl, 641 hozzáadás és 214 törlés
  1. 2 2
      TODO.md
  2. 55 3
      include/graph.h
  3. 4 2
      include/store.h
  4. 1 1
      include/store_htable.h
  5. 137 36
      include/store_interface.h
  6. 11 6
      include/store_mdb.h
  7. 3 4
      profile.c
  8. 68 26
      src/graph.c
  9. 28 21
      src/store_htable.c
  10. 160 106
      src/store_mdb.c
  11. 172 7
      test/test_store.c

+ 2 - 2
TODO.md

@@ -22,8 +22,8 @@
     - *D* Subclass term types
 - *D* Namespaced IRIs
 - *D* Relative IRIs
-- D Flexible store interface
-- *P* Transaction control
+- *D* Flexible store interface
+- *D* Transaction control
 - *P* Turtle serialization / deserialization
 - *P* Full UTF-8 support
 - *P* Extended tests

+ 55 - 3
include/graph.h

@@ -49,6 +49,10 @@ LSUP_graph_new (
 /** @brief Copy triples from a source graph into a destination one.
  *
  * The destination graph is not initialized here, so the copy is cumulative.
+ *
+ * @param src[in] Source graph.
+ *
+ * @param dest[in] Destination graph.
  */
 LSUP_rc
 LSUP_graph_copy_contents (const LSUP_Graph *src, LSUP_Graph *dest);
@@ -85,6 +89,9 @@ LSUP_graph_free (LSUP_Graph *gr);
 
 
 /** @brief Compare two graphs.
+ *
+ * Note that if any of the two graphs has an open transaction, the function
+ * is performed in the first graph's transaction.
  *
  * @param[in] gr1 First operand.
  *
@@ -132,7 +139,7 @@ LSUP_graph_namespace (const LSUP_Graph *gr);
 
 /** @brief Set the namespace map for an in-memory graph.
  *
- * This has no effect on MDB graphs.
+ * This has no effect on graph stores with LSUP_STORE_PERM.
  *
  * @param[in] gr Graph to set the namespace map for.
  *
@@ -160,6 +167,50 @@ bool
 LSUP_graph_contains (const LSUP_Graph *gr, const LSUP_Triple *spo);
 
 
+/** @brief Begin a transaction.
+ *
+ * If the underlying store supports it, begin a transaction for the given
+ * graph. Note that each graph can only have one open transaction at a time.
+ *
+ * The transaction must be either committed with #LSUP_graph_commit() or
+ * rolled back with #LSUP_graph_abort().
+ *
+ * @param[in] gr Graph handle.
+ *
+ * @param[in] flags Unused for now, use 0. TODO
+ *
+ * @return LSUP_OK on success; LSUP_VALUE_ERR if the graph store does not
+ *  support transactions; LSUP_TXN_ERR if the store has already an uncommitted
+ *  transaction; <0 on other errors from the underlying store.
+ */
+LSUP_rc
+LSUP_graph_begin (LSUP_Graph *gr, int flags);
+
+
+/** @brief Commit a transaction.
+ *
+ * If the underlying store supports it, commit an open transaction. In case of
+ * error, the transaction is left open and it is advisable to roll it back with
+ * #LSUP_graph_abort().
+ *
+ * @param[in] gr Graph handle.
+ *
+ * @return LSUP_OK if the transaction was committed successfully; LSUP_NOACTION
+ *  if NULL was passed; LSUP_TXN_ERR on error.
+ */
+LSUP_rc LSUP_graph_commit (LSUP_Graph *gr);
+
+
+/** @brief Abort (roll back) a transaction.
+ *
+ * If the underlying store supports it, abort an open transaction and abandon
+ * all changes.
+ *
+ * @param[in] gr Graph handle.
+ */
+void LSUP_graph_abort (LSUP_Graph *gr);
+
+
 /** @brief Initialize an iterator to add triples.
  *
  * @param[in] gr Graph to add to. It is added to the iterator state.
@@ -200,11 +251,12 @@ LSUP_graph_add_done (LSUP_GraphIterator *it);
  *
  * @param[in] strp Array of buffer triples to add. The last one must be NULL.
  *
- * @param[out] inserted This will be filled with the total number of triples
+ * @param[out] ct This will be filled with the total number of triples
  *  inserted.
  */
 LSUP_rc
-LSUP_graph_add (LSUP_Graph *gr, const LSUP_Triple trp[], size_t *inserted);
+LSUP_graph_add (
+        LSUP_Graph *gr, const LSUP_Triple trp[], size_t *ct);
 
 
 /** @brief Delete triples by a matching pattern.

+ 4 - 2
include/store.h

@@ -58,9 +58,11 @@ const LSUP_StoreInt *LSUP_store_int (LSUP_StoreType type);
  *
  * @sa #LSUP_graph_new()
  */
-typedef struct store_it {
+typedef struct store_t {
     LSUP_StoreType                  type;   ///< Store type.
-    char *                          id;     /**< Store ID. NOTE: This is
+    char *                          id;     /**< Store ID.
+                                             *
+                                             *   NOTE: This is
                                              *   NULL for volatile stores.
                                              */
     const LSUP_StoreInt *           sif;    ///< Store interface.

+ 1 - 1
include/store_htable.h

@@ -21,7 +21,7 @@
 #define _LSUP_STORE_HTABLE_H
 
 #include "buffer.h"
-#include "store_base.h"
+#include "store_interface.h"
 
 
 extern const LSUP_StoreInt htstore_int;

+ 137 - 36
include/store_base.h → include/store_interface.h

@@ -1,4 +1,4 @@
-/** @file store.h
+/** @file store_interface.h
  *
  * @brief Common store back end interfaces.
  *
@@ -32,8 +32,8 @@
  */
 
 
-#ifndef _LSUP_STORE_BASE_H
-#define _LSUP_STORE_BASE_H
+#ifndef _LSUP_STORE_INTERFACE_H
+#define _LSUP_STORE_INTERFACE_H
 
 #include "environment.h"
 
@@ -102,7 +102,7 @@ typedef void (*store_free_fn_t)(void *store);
 
 /** @brief Prototype: get the store ID.
  *
- * @param store[in] Store handle.
+ * @param[in] store  Store handle.
  *
  * @return store ID string. This is a copy and should be freed after use.
  */
@@ -111,22 +111,65 @@ typedef char * (*store_id_fn_t)(const void *store);
 
 /** @brief Prototype: get store size.
  *
- * @param store[in] The store to calculate size of.
+ * @param[in] store  The store to calculate size of.
  *
  * @return Number of stored SPO triples (across all contexts if supported).
  */
 typedef size_t (*store_size_fn_t)(const void *store);
 
 
+#if 0
 /** @brief Print stats about a store.
  *
  * TODO
  *
- * @param store[in] The store to get stats for.
+ * @param[in] store The store to get stats for.
  */
-/* TODO
 typedef LSUP_rc (*store_stat_fn_t)(void *store, void *stat);
-*/
+#endif
+
+
+/** @brief Begin a transaction.
+ *
+ * Only for LSUP_STORE_TXN stores.
+ *
+ * The transaction handle is managed by the store implementation and can be any
+ * data type.
+ *
+ * @param[in] store Store handle.
+ *
+ * @param[in] flags Transaction flags. These vary with each implementation.
+ *
+ * @param[out] txn Will point to the new open transaction on success, or to
+ * undefined content on failure.
+ *
+ * @return LSUP_OK if the transaction started successfully, <0 on error.
+ */
+typedef LSUP_rc (*store_txn_begin_fn_t)(void *store, int flags, void **txn);
+
+
+/** @brief Commit a transaction.
+ *
+ * Only for LSUP_STORE_TXN stores.
+ *
+ * @param[in] store Store handle.
+ *
+ * @param[in] txn Transaction handle generated by #store_txn_begin_fn_t.
+ *
+ * @return LSUP_OK if the transaction was committed successfully, <0 on error.
+ */
+typedef LSUP_rc (*store_txn_commit_fn_t)(void *store);
+
+
+/** @brief Abort a transaction.
+ *
+ * Only for LSUP_STORE_TXN stores.
+ *
+ * @param[in] store Store handle.
+ *
+ * @param[in] txn Transaction handle generated by #store_txn_begin_fn_t.
+ */
+typedef void (*store_txn_abort_fn_t)(void *store);
 
 
 /** @brief Initialize bulk triple load.
@@ -143,9 +186,13 @@ typedef LSUP_rc (*store_stat_fn_t)(void *store, void *stat);
  *  the value of sc, triples will be added with no context. Only meaningful
  *  for stores with the LSUP_STORE_CTX feature.
  *
+ *  @param[in] udata User data. Consult individual store implementations for
+ *   how this is interpreted.
+ *
  * @return Iterator handle to be passed to the following load steps.
  */
-typedef void * (*store_add_init_fn_t)(void *store, const LSUP_Buffer * sc);
+typedef void * (*store_add_init_fn_t)(
+        void *store, const LSUP_Buffer *sc, void *udata);
 
 
 /** @brief Add one triple into the store.
@@ -154,7 +201,7 @@ typedef void * (*store_add_init_fn_t)(void *store, const LSUP_Buffer * sc);
  * yielded by that function. It may be called multiple times and must be
  * followed by #add_done_fn or #add_abort_fn (if supported).
  *
- * @param it[in] Iterator obtained by #LSUP_mdbstore_add_init.
+ * @param it[in] Iterator obtained by #store_add_init_fn_t.
  *  The following members are of interest:
  *  it->i stores the total number of records inserted.
  *
@@ -172,11 +219,30 @@ typedef LSUP_rc (*store_add_iter_fn_t)(
  * Usually called on an irrecoverable error from #add_iter_fn. None of the
  * successful inserts in the same loop is retained.
  *
- * @param it[in] Iterator obtained by #LSUP_mdbstore_add_init.
+ * @param it[in] Iterator obtained by #store_add_init_fn_t.
  */
 typedef void (*store_add_abort_fn_t)(void *it);
 
 
+/*
+ * Iterator function types.
+ */
+
+/** @brief Get iterator active transaction handle.
+ *
+ * This function is used to get an active transaction during an iteration loop
+ * in order to perform an action using the store state within that loop. Some
+ * stores (e.g. MDB) only support one R/W open transaction per thread, so this
+ * is also the only way to perform anything else than iterating or committing
+ * while a loop is open.
+ *
+ * @param[in] it Iterator handle to get the transaction from.
+ *
+ * @return Transaction handle. DO NOT close this transaction directly.
+ */
+typedef void * (*iter_txn_fn_t)(void *it);
+
+
 /** @brief Finalize an add loop and free iterator.
  *
  * This must be called after #add_iter_fn.
@@ -192,7 +258,8 @@ typedef LSUP_rc (*store_add_done_fn_t)(void *it);
  *
  * @param[in] sterm Serialized term to store.
  */
-typedef LSUP_rc (*store_add_term_fn_t)(void *store, const LSUP_Buffer *sterm);
+typedef LSUP_rc (*store_add_term_fn_t)(
+        void *store, const LSUP_Buffer *sterm, void *udata);
 
 
 /** @brief Prototype: look up triples by pattern matching.
@@ -200,17 +267,20 @@ typedef LSUP_rc (*store_add_term_fn_t)(void *store, const LSUP_Buffer *sterm);
  * This function may return a count of matches and/or an iterator of results as
  * serialized triples.
  *
+ * For stores with #LSUP_STORE_TXN, this opens a read-only transaction. The
+ * transaction handle is held in the iterator structure and is closed when the
+ * iterator is freed with #iter_free_fn_t().
+ *
  * Any and all of the terms may be NULL, which indicates an unbound query
- * term. Stores with context not set or witout context support will always
- * ignore the fourth term.
+ * term. Stores witout context support will always ignore sc.
  *
  * @param[in] store The store to be queried.
  *
- * @param[in] ss Buffer representing the serialized s term.
+ * @param[in] ss Serialized s term.
  *
- * @param[in] sp Buffer representing the serialized p term.
+ * @param[in] sp Serialized p term.
  *
- * @param[in] so Buffer representing the serialized o term.
+ * @param[in] so Serialized o term.
  *
  * @param[in] sc Serialized context to limit search to. It may be NULL, in
  * which case search is done in all contexts. Note that triples inserted
@@ -221,6 +291,9 @@ typedef LSUP_rc (*store_add_term_fn_t)(void *store, const LSUP_Buffer *sterm);
  *  much less so for 1-bound and 2-bound context lookups, in which cases it
  *  should be set only if needed.
  *
+ *  @param[in] udata User data. Consult individual store implementations for
+ *   how this is interpreted.
+ *
  * @return Iterator handle that will be populated with a result iterator. This
  * is always created even if no matches are found and must be freed with
  * #LSUP_mdbiter_free() after use. If matches are found, the iterator points to
@@ -229,7 +302,7 @@ typedef LSUP_rc (*store_add_term_fn_t)(void *store, const LSUP_Buffer *sterm);
 typedef void * (*store_lookup_fn_t)(
         void *store,
         const LSUP_Buffer *ss, const LSUP_Buffer *sp, const LSUP_Buffer *so,
-        const LSUP_Buffer *sc, size_t *ct);
+        const LSUP_Buffer *sc, void *udata, size_t *ct);
 
 
 /** @brief Prototype: check for existence of a triple (T/F).
@@ -250,13 +323,16 @@ typedef bool (*store_trp_exist_fn_t)(
 /** @brief Prototype: delete triples by pattern matching.
  *
  * The ss, sp, so, sc terms act as a matching pattern as documented in
- * #store_lookup_fn. if not NULL, ct yields the number of triples actually
+ * @sa #store_lookup_fn. if not NULL, ct yields the number of triples actually
  * deleted.
+ *
+ *  @param[in] udata User data. Consult individual store implementations for
+ *   how this is interpreted.
  */
 typedef LSUP_rc (*store_remove_fn_t)(
         void *store,
         const LSUP_Buffer *ss, const LSUP_Buffer *sp, const LSUP_Buffer *so,
-        const LSUP_Buffer *sc, size_t *ct);
+        const LSUP_Buffer *sc, void *udata, size_t *ct);
 
 
 /** @brief Put an in-memory namespace map into a permanent back end.
@@ -278,11 +354,15 @@ typedef LSUP_rc (*store_remove_fn_t)(
  *
  * @param[out] nsm Namespace map handle to store.
  *
+ * @param[in] udata User-defined data. Consult individual implementations for
+ *  details.
+ *
  * @return LSUP_OK if all terms were updated; LSUP_CONFLICT if one or more
  *  namespaces or terms were not updated because they already existed; <0 if
  *  an error occurred.
  */
-typedef LSUP_rc (*store_nsm_put_fn_t)(void *store, const LSUP_NSMap * nsm);
+typedef LSUP_rc (*store_nsm_put_fn_t)(
+        void *store, const LSUP_NSMap * nsm, void *udata);
 
 
 /** @brief Get the store's namespace prefix map.
@@ -294,10 +374,6 @@ typedef LSUP_rc (*store_nsm_put_fn_t)(void *store, const LSUP_NSMap * nsm);
 typedef LSUP_NSMap * (*store_nsm_get_fn_t)(void *store);
 
 
-/*
- * Iterator function types.
- */
-
 /** @brief Prototype: yield the matching triples and advance the iterator.
  *
  * NOTE: Iterators keep transactions open. Don't hold on to them longer than
@@ -349,6 +425,18 @@ typedef void (*iter_free_fn_t)(void * it);
  */
 
 /** @brief Store interface.
+ *
+ * New store implementations should define a static structure with the relevant
+ * members filled in. Some members are only relevant to certain types of stores
+ * and may be set to NULL.
+ *
+ * #setup_fn may be optionally defined and MUST cause an idempotent action,
+ * unless the `clear` argument is set to `true`. Callers should check if this
+ * member is NULL and if it is not, call it at the beginning of the
+ * interaction with the store.
+ *
+ * Transaction control members are only applicable to stores with the
+ * #LSUP_STORE_TXN feature.
  */
 typedef struct store_if_t {
     // Basic properties.
@@ -364,18 +452,26 @@ typedef struct store_if_t {
     store_size_fn_t     size_fn;        ///< Number of triples in the store.
     store_id_fn_t       id_fn;          ///< Get store ID.
 
+    // Transaction control.
+    store_txn_begin_fn_t txn_begin_fn;  ///< Begin transaction.
+    store_txn_commit_fn_t txn_commit_fn; ///< Commit transaction.
+    store_txn_abort_fn_t txn_abort_fn;  ///< Abort transaction.
+    iter_txn_fn_t       iter_txn_fn;    ///< Get iterator's transaction.
+
     // Addition.
     store_add_init_fn_t add_init_fn;    ///< Initialize add iteration.
     store_add_iter_fn_t add_iter_fn;    ///< Add one triple.
-    store_add_abort_fn_t add_abort_fn;  /**< Abort (roll back) the add
-                                         *  process.  Only available in
-                                         *  stores with #LSUP_STORE_TXN
-                                         *  feature. Optional.
+    store_add_abort_fn_t add_abort_fn;  /**< Abort (roll back) the add process.
+                                         *
+                                         *   Only available in
+                                         *   stores with #LSUP_STORE_TXN
+                                         *   feature. Optional.
                                          */
     store_add_done_fn_t add_done_fn;    ///< Complete the add process.
     store_add_term_fn_t add_term_fn;    /**< Add (index) a term to the store.
-                                         *  Only available in stores with
-                                         *  #LSUP_STORE_IDX feature. Optional.
+                                         *
+                                         *   Only available in stores with
+                                         *   #LSUP_STORE_IDX feature. Optional.
                                          */
 
     // Look up.
@@ -388,14 +484,14 @@ typedef struct store_if_t {
     store_remove_fn_t   remove_fn;      ///< Remove triples by pattern.
 
     // Namespace prefix mapping.
-    store_nsm_put_fn_t  nsm_put_fn;     /**< Add a namespace/prefix pair to
-                                         *  the prefix map.
+    store_nsm_put_fn_t  nsm_put_fn;     /**< Add a ns/pfx pair to the map.
+                                         *
                                          *  Only available (and mandatory)
                                          *  in stores with the
                                          *  #LSUP_STORE_IDX feature.
                                          */
-    store_nsm_get_fn_t  nsm_get_fn;     /**< Get a namespace/prefix from
-                                         *  the prefix map.
+    store_nsm_get_fn_t  nsm_get_fn;     /**< Get a namespace from the map.
+                                         *
                                          *  Only available (and mandatory)
                                          *  in stores with the
                                          *  #LSUP_STORE_IDX feature.
@@ -417,6 +513,11 @@ const LSUP_StoreInt my_store_int = {
     .free_fn        = my_free_fn,
 
     .size_fn        = my_size_fn,
+    .id_fn          = my_id_fn,
+
+    .txn_begin_fn   = my_txn_begin_fn,
+    .txn_commit_fn  = my_txn_commit_fn,
+    .txn_abort_fn   = my_txn_abort_fn,
 
     .add_init_fn    = my_init_fn,
     .add_iter_fn    = my_iter_fn,
@@ -435,4 +536,4 @@ const LSUP_StoreInt my_store_int = {
 };
 */
 
-#endif  /* _LSUP_STORE_BASE_H */
+#endif  /* _LSUP_STORE_INTERFACE_H */

+ 11 - 6
include/store_mdb.h

@@ -11,11 +11,15 @@
  * per session. Within that session multiple R/W operations can be performed
  * using transactions.
  *
- * Note that, even though the terms "graph", "context", etc. are used, no code
- * in this module checks for valid RDF data. In theory any term can be any
- * binary data. This allows using the store for non-RDF graph data.
- *
- * TODO more doc
+ * This store supports transactions. Under the hood, LMDB supports nested RW
+ * transactions, which are used here, but not exposed to the caller. Some
+ * functions have a transaction handle parameter that may be NULL. In that
+ * case, a new transaction is opened and closed within the scope of the
+ * function (or, in cases such as #mdbstore_lookup(), within the life cycle of
+ * the iterator); if not, the transaction handle may either be used as the
+ * parent for a new transaction (which is closed as in the previous case), or
+ * the function uses the same transaction (i.e. changes are only committed 
+ * after the parent transaction is committed).
  */
 
 
@@ -25,10 +29,11 @@
 #include "lmdb.h"
 
 #include "buffer.h"
-#include "store_base.h"
+#include "store_interface.h"
 
 
 // FIXME find a better cross-platform path.
+/// Default MDB store identifier and location.
 #define LSUP_MDB_STORE_URN "file://" TMPDIR "/mdb_store"
 
 /// MDB store interface.

+ 3 - 4
profile.c

@@ -31,14 +31,13 @@ int main(int argc, char *argv[])
 {
     size_t nt = (argc > 1) ? atoi (argv[1]) : NT;
     // Set env variable to test path.
-    putenv ("LSUP_MDB_STORE_PATH=" TMPDIR "/lsup_profile_mdb");
-    // Clear out database from previous test.
-    rm_r (getenv ("LSUP_MDB_STORE_PATH"));
+    putenv ("LSUP_MDB_STORE_URN=file://" TMPDIR "/lsup_profile_mdb");
 
     if (LSUP_init() != LSUP_OK) {
         log_fatal ("Failed to initialize LSUP environment.");
         exit (-1);
     }
+    LSUP_store_int (LSUP_STORE_MDB)->setup_fn (NULL, true);
 
     int rc;
     clock_t start, tc1, tc2, end;
@@ -54,7 +53,7 @@ int main(int argc, char *argv[])
 
     log_info ("Inserting triples.");
     LSUP_Graph *gr = LSUP_graph_new (
-            LSUP_iriref_new (NULL, NULL), LSUP_STORE_MDB);
+            LSUP_iriref_new (NULL, NULL), LSUP_STORE_MDB, NULL, NULL, nt);
     if (!gr) {
         log_error ("Error creating graph!");
         return -1;

+ 68 - 26
src/graph.c

@@ -7,12 +7,16 @@
 struct graph_t {
     LSUP_Term               *uri;           ///< Graph "name" (URI).
     LSUP_Store *            store;          ///< Store handle.
-    LSUP_NSMap *            nsm;            /**< Namespace map. NOTE: This is
-                                               * NULL for permanent stores. */
+    LSUP_NSMap *            nsm;            /**< Namespace map.
+                                              *
+                                              * NOTE: This is
+                                              * NULL for permanent stores.
+                                              */
+    void *                  txn;            ///< Store transaction.
 };
 
 struct graph_iter_t {
-    LSUP_Store *            store;          ///< Store tied to the iterator.
+    const LSUP_Graph *      graph;          ///< Parent graph.
     void *                  data;           ///< Iterator state.
     size_t                  ct;             ///< Total lookup matches.
 };
@@ -72,6 +76,8 @@ BACKEND_TBL
     if (gr->store->sif->features & LSUP_STORE_PERM) gr->nsm = NULL;
     else gr->nsm = nsm ? nsm : LSUP_default_nsm;
 
+    gr->txn = NULL;
+
     log_debug ("Graph created.");
     return gr;
 }
@@ -110,15 +116,16 @@ LSUP_graph_bool_op(
     LSUP_BufferTriple *sspo = BTRP_DUMMY;
     size_t ct;
 
-    add_it = res->store->sif->add_init_fn (res->store->data, res_sc);
+    add_it = res->store->sif->add_init_fn (res->store->data, res_sc, gr1->txn);
 
     if (op == LSUP_BOOL_XOR) {
         // Add triples from gr2 if not found in gr1.
         lu2_it = gr2->store->sif->lookup_fn (
-                gr2->store->data, NULL, NULL, NULL, gr2_sc, NULL);
+                gr2->store->data, NULL, NULL, NULL, gr2_sc, NULL, gr1->txn);
         while (gr2->store->sif->lu_next_fn (lu2_it, sspo, NULL) == LSUP_OK) {
             lu1_it = gr1->store->sif->lookup_fn (
-                    gr1->store->data, sspo->s, sspo->p, sspo->o, gr1_sc, &ct);
+                    gr1->store->data, sspo->s, sspo->p, sspo->o, gr1_sc,
+                    gr1->txn, &ct);
             if (ct > 0)
                 res->store->sif->add_iter_fn (add_it, sspo);
             gr1->store->sif->lu_free_fn (lu1_it);
@@ -127,10 +134,11 @@ LSUP_graph_bool_op(
     }
 
     lu1_it = gr1->store->sif->lookup_fn (
-            gr1->store->data, NULL, NULL, NULL, gr1_sc, NULL);
+            gr1->store->data, NULL, NULL, NULL, gr1_sc, gr1->txn, NULL);
     while (gr1->store->sif->lu_next_fn (lu1_it, sspo, NULL) == LSUP_OK) {
         lu2_it = gr2->store->sif->lookup_fn (
-                gr2->store->data, sspo->s, sspo->p, sspo->o, gr2_sc, &ct);
+                gr2->store->data, sspo->s, sspo->p, sspo->o, gr2_sc,
+                gr1->txn, &ct);
         // For XOR and subtraction, add if not found.
         // For intersection, add if found.
         if ((ct == 0) ^ (op == LSUP_BOOL_INTERSECTION))
@@ -229,10 +237,10 @@ LSUP_graph_add_init (LSUP_Graph *gr)
 
     LSUP_Buffer *sc = LSUP_term_serialize (gr->uri);
 
-    it->data = gr->store->sif->add_init_fn (gr->store->data, sc);
+    it->data = gr->store->sif->add_init_fn (gr->store->data, sc, gr->txn);
     LSUP_buffer_free (sc);
 
-    it->store = gr->store;
+    it->graph = gr;
 
     return it;
 }
@@ -244,17 +252,21 @@ LSUP_graph_add_iter (LSUP_GraphIterator *it, const LSUP_Triple *spo)
 
     LSUP_BufferTriple *sspo = LSUP_triple_serialize (spo);
     if (UNLIKELY (!sspo)) return LSUP_MEM_ERR;
+    const LSUP_StoreInt *sif = it->graph->store->sif;
 
-    LSUP_rc rc = it->store->sif->add_iter_fn (it->data, sspo);
+    LSUP_rc rc = sif->add_iter_fn (it->data, sspo);
     PCHECK (rc, finally);
 
     // Store datatype term permanently if the store supports it.
-    if (rc == LSUP_OK && it->store->sif->add_term_fn) {
+    if (rc == LSUP_OK && sif->add_term_fn) {
+        void *txn;
         for (int i = 0; i < 3; i++) {
             LSUP_Term *term = LSUP_triple_pos (spo, i);
             if (term->type == LSUP_TERM_LITERAL) {
                 LSUP_Buffer *ser_dtype = LSUP_term_serialize (term->datatype);
-                it->store->sif->add_term_fn (it->store->data, ser_dtype);
+                // Run add_term in the iterator's txn.
+                txn = sif->iter_txn_fn ? sif->iter_txn_fn (it->data) : NULL;
+                sif->add_term_fn ( it->graph->store->data, ser_dtype, txn);
                 LSUP_buffer_free (ser_dtype);
             }
         }
@@ -271,7 +283,7 @@ finally:
 void
 LSUP_graph_add_done (LSUP_GraphIterator *it)
 {
-    it->store->sif->add_done_fn (it->data);
+    it->graph->store->sif->add_done_fn (it->data);
     free (it);
 }
 
@@ -323,7 +335,8 @@ LSUP_graph_remove (
         *so = LSUP_term_serialize (o),
         *sc = LSUP_term_serialize (gr->uri);
 
-    rc = gr->store->sif->remove_fn (gr->store->data, ss, sp, so, sc, ct);
+    rc = gr->store->sif->remove_fn (
+            gr->store->data, ss, sp, so, sc, gr->txn, ct);
 
     LSUP_buffer_free (ss);
     LSUP_buffer_free (sp);
@@ -374,25 +387,27 @@ LSUP_graph_lookup (
     LSUP_GraphIterator *it;
     MALLOC_GUARD (it, NULL);
 
-    it->store = gr->store;
-
     LSUP_Buffer
         *ss = LSUP_term_serialize (s),
         *sp = LSUP_term_serialize (p),
         *so = LSUP_term_serialize (o),
         *sc = LSUP_term_serialize (gr->uri);
 
-    it->data = it->store->sif->lookup_fn (it->store->data, ss, sp, so, sc, ct);
-    if (UNLIKELY (!it->data)) {
-        free (it);
-        it = NULL;
-    }
+    it->data = gr->store->sif->lookup_fn (
+            gr->store->data, ss, sp, so, sc, gr->txn, ct);
 
     LSUP_buffer_free (ss);
     LSUP_buffer_free (sp);
     LSUP_buffer_free (so);
     LSUP_buffer_free (sc);
 
+    if (UNLIKELY (!it->data)) {
+        free (it);
+        return NULL;
+    }
+
+    it->graph = gr;
+
     return it;
 }
 
@@ -402,7 +417,7 @@ LSUP_graph_iter_next (LSUP_GraphIterator *it, LSUP_Triple *spo)
 {
     LSUP_Buffer *ss, *sp, *so;
     LSUP_BufferTriple *sspo;
-    if (it->store->sif->features & LSUP_STORE_COW) {
+    if (it->graph->store->sif->features & LSUP_STORE_COW) {
         CALLOC_GUARD (ss, LSUP_MEM_ERR);
         CALLOC_GUARD (sp, LSUP_MEM_ERR);
         CALLOC_GUARD (so, LSUP_MEM_ERR);
@@ -422,7 +437,7 @@ LSUP_graph_iter_next (LSUP_GraphIterator *it, LSUP_Triple *spo)
         if (!spo->o) return LSUP_ERROR;
     }
 
-    if (it->store->sif->features & LSUP_STORE_COW) {
+    if (it->graph->store->sif->features & LSUP_STORE_COW) {
         LSUP_btriple_free_shallow (sspo);
     } else {
         // TODO copy-on-retrieval stores. None yet.
@@ -435,7 +450,7 @@ LSUP_graph_iter_next (LSUP_GraphIterator *it, LSUP_Triple *spo)
 void
 LSUP_graph_iter_free (LSUP_GraphIterator *it)
 {
-    it->store->sif->lu_free_fn (it->data);
+    it->graph->store->sif->lu_free_fn (it->data);
     free (it);
 }
 
@@ -455,6 +470,33 @@ LSUP_graph_contains (const LSUP_Graph *gr, const LSUP_Triple *spo)
 }
 
 
+LSUP_rc
+LSUP_graph_begin (LSUP_Graph *gr, int flags) {
+    if (!(gr->store->sif->features & LSUP_STORE_TXN)) return LSUP_VALUE_ERR;
+
+    return gr->store->sif->txn_begin_fn(gr->store->data, flags, &gr->txn);
+}
+
+
+LSUP_rc
+LSUP_graph_commit (LSUP_Graph *gr)
+{
+    LSUP_rc rc = gr->store->sif->txn_commit_fn (gr->txn);
+
+    if (rc == LSUP_OK) gr->txn = NULL;
+
+    return rc;
+}
+
+
+void
+LSUP_graph_abort (LSUP_Graph *gr)
+{
+    gr->store->sif->txn_abort_fn (gr->txn);
+    gr->txn = NULL;
+}
+
+
 /*
  * Static functions.
  */
@@ -466,7 +508,7 @@ LSUP_graph_contains (const LSUP_Graph *gr, const LSUP_Triple *spo)
  */
 inline static LSUP_rc
 graph_iter_next_buffer (LSUP_GraphIterator *it, LSUP_BufferTriple *sspo)
-{ return it->store->sif->lu_next_fn (it->data, sspo, NULL); }
+{ return it->graph->store->sif->lu_next_fn (it->data, sspo, NULL); }
 
 
 /**

+ 28 - 21
src/store_htable.c

@@ -151,12 +151,12 @@ htiter_next_key (HTIterator *it);
  * @param[in] id Graph identifier. This may or may not be set. The store does
  *  not use this value internally, and does not check for duplicates.
  *
- * @param[in] size Initial size of the store (in number of triples). It may be
- *  0.
+ * @param[in] size Initial size of the store (in number of triples to
+ * preallocate). It may be 0.
  *
  * @return New graph store.
  */
-void *
+static void *
 htstore_new (const char *id, size_t size)
 {
     HTStore *ht;
@@ -177,7 +177,7 @@ htstore_new (const char *id, size_t size)
 
 
 #if 0
-LSUP_rc
+static LSUP_rc
 htstore_copy_contents (HTStore *dest, const HTStore *src)
 {
     size_t i = 0;
@@ -198,7 +198,7 @@ htstore_copy_contents (HTStore *dest, const HTStore *src)
 #endif
 
 
-void
+static void
 htstore_free (void *h)
 {
     HTStore *store = h;
@@ -208,7 +208,7 @@ htstore_free (void *h)
 }
 
 
-size_t
+static size_t
 htstore_size (const void *h)
 {
     const HTStore *store = h;
@@ -216,9 +216,10 @@ htstore_size (const void *h)
 }
 
 
-LSUP_rc
-htstore_add_term (void *h, const LSUP_Buffer *sterm)
+static LSUP_rc
+htstore_add_term (void *h, const LSUP_Buffer *sterm, void *_unused)
 {
+    (void) _unused;
     HTStore *store = h;
     IndexEntry entry_s = {
         .key = LSUP_buffer_hash (sterm),
@@ -237,10 +238,11 @@ htstore_add_term (void *h, const LSUP_Buffer *sterm)
 }
 
 
-void *
-htstore_add_init (void *h, const LSUP_Buffer *_unused)
+static void *
+htstore_add_init (void *h, const LSUP_Buffer *_unused, void *_unused2)
 {
     (void) _unused;
+    (void) _unused2;
     HTIterator *it;
     MALLOC_GUARD (it, NULL);
 
@@ -250,7 +252,7 @@ htstore_add_init (void *h, const LSUP_Buffer *_unused)
 }
 
 
-LSUP_rc
+static LSUP_rc
 htstore_add_iter (void *h, const LSUP_BufferTriple *sspo)
 {
     HTIterator *it = h;
@@ -265,13 +267,13 @@ htstore_add_iter (void *h, const LSUP_BufferTriple *sspo)
     if (rc != LSUP_OK) return rc;
 
     for (int i = 0; i < 3; i++)
-        htstore_add_term (it->store, LSUP_btriple_pos (sspo, i));
+        htstore_add_term (it->store, LSUP_btriple_pos (sspo, i), NULL);
 
     return rc;
 }
 
 
-LSUP_rc
+static LSUP_rc
 htstore_add_done (void *h)
 {
     free (h);
@@ -279,12 +281,13 @@ htstore_add_done (void *h)
 }
 
 
-void *
+static void *
 htstore_lookup (
         void *h,
         const LSUP_Buffer *ss, const LSUP_Buffer *sp, const LSUP_Buffer *so,
-        const LSUP_Buffer *sc, size_t *ct)
+        const LSUP_Buffer *sc, void *_unused, size_t *ct)
 {
+    (void) _unused;
     HTStore *store = h;
     HTIterator *it;
     CALLOC_GUARD (it, NULL);
@@ -348,16 +351,18 @@ htstore_lookup (
 }
 
 
-LSUP_rc
+static LSUP_rc
 htstore_remove(
         void *h, const LSUP_Buffer *ss, const LSUP_Buffer *sp,
-        const LSUP_Buffer *so,  const LSUP_Buffer *_unused, size_t *ct_p)
+        const LSUP_Buffer *so,  const LSUP_Buffer *_unused,
+        void *_unused2, size_t *ct_p)
 {
     (void) _unused;
+    (void) _unused2;
     HTStore *store = h;
     size_t ct;
 
-    HTIterator *it = htstore_lookup (store, ss, sp, so, NULL, &ct);
+    HTIterator *it = htstore_lookup (store, ss, sp, so, NULL, NULL, &ct);
     if (UNLIKELY (!it)) return LSUP_DB_ERR;
 
     LSUP_rc rc;
@@ -383,7 +388,7 @@ finally:
 }
 
 
-LSUP_rc
+static LSUP_rc
 htiter_next_key (HTIterator *it)
 {
     if (UNLIKELY (!it)) return LSUP_VALUE_ERR;
@@ -412,7 +417,7 @@ htiter_next_key (HTIterator *it)
 }
 
 
-LSUP_rc
+static LSUP_rc
 htiter_next (void *h, LSUP_BufferTriple *sspo, LSUP_Buffer **_unused)
 {
     (void) _unused;
@@ -447,7 +452,9 @@ const LSUP_StoreInt htstore_int = {
 };
 
 
-/* * * Statics * * */
+/*
+ * Other statics.
+ */
 
 inline static LSUP_rc
 tkey_to_strp (

+ 160 - 106
src/store_mdb.c

@@ -1,5 +1,3 @@
-#include <ftw.h>
-
 #include "store_mdb.h"
 
 /**
@@ -11,12 +9,12 @@
  * Memory map size.
  */
 #if (defined DEBUG || defined TESTING)
-    #define DEFAULT_MAPSIZE 1<<24 // 16Mb (limit for Valgrind)
+#define DEFAULT_MAPSIZE 1<<24 // 16Mb (limit for Valgrind)
 #elif !(defined __LP64__ || defined __LLP64__) || \
         defined _WIN32 && !defined _WIN64
-    #define DEFAULT_MAPSIZE 1<<31 // 2Gb (limit for 32-bit systems)
+#define DEFAULT_MAPSIZE 1<<31 // 2Gb (limit for 32-bit systems)
 #else
-    #define DEFAULT_MAPSIZE 1UL<<40 // 1Tb
+#define DEFAULT_MAPSIZE 1UL<<40 // 1Tb
 #endif
 
 #define ENV_DIR_MODE 0750
@@ -30,10 +28,24 @@
 typedef char DbLabel[8];
 typedef struct mdbstore_iter_t MDBIterator;
 
+/// Store state flags.
 typedef enum {
     LSSTORE_OPEN         = 1<<0,                    ///< Env is open.
-    LSSTORE_DIRTY_TXN    = LSSTORE_OPEN + (1<<1),   ///< Main txn is open.
-} StoreState;
+} StoreFlags;
+
+/// Iterator state flags.
+typedef enum {
+    ITER_OPEN_TXN       = 1<<0,         /**< A transaction is open.
+                                          *
+                                          *  The iterator has begun a new
+                                          *  transaction on initialization
+                                          *  which needs to be closed.  If
+                                          *  false, the iterator is using an
+                                          *  existing transaction which will
+                                          *  not be closed with
+                                          *  #mdbiter_free().
+                                          */
+} IterFlags;
 
 typedef enum {
     OP_ADD,
@@ -42,10 +54,8 @@ typedef enum {
 
 typedef struct mdbstore_t {
     MDB_env *           env;            ///< Environment handle.
-    MDB_txn *           txn;            ///< Current transaction.
     MDB_dbi             dbi[N_DB];      ///< DB handles. Refer to DbIdx enum.
-    StoreState          state;          ///< Store state.
-    int                 features;       ///< Store feature flags.
+    StoreFlags          flags;          ///< Store state flags.
 } MDBStore;
 
 /** @brief Iterator operation.
@@ -64,13 +74,17 @@ typedef void (*iter_op_fn_t)(MDBIterator *it);
 /// Triple iterator.
 typedef struct mdbstore_iter_t {
     MDBStore *          store;      ///< MDB store handle.
+    IterFlags           flags;      ///< Iterator flags.
     MDB_txn *           txn;        ///< MDB transaction.
     MDB_cursor *        cur;        ///< MDB cursor.
     MDB_cursor *        ctx_cur;    ///< MDB c:spo index cursor.
-    MDB_val             key, data;  ///< Internal data handlers.
+    MDB_val             key;        ///< Internal data handler.
+    MDB_val             data;       ///< Internal data handler.
     LSUP_TripleKey      spok;       ///< Triple to be populated with match.
-    LSUP_Key *          ck;         ///< Context array to be populated for each
-                                    ///< matching triple if requested.
+    LSUP_Key *          ck;         /**< Context array.
+                                      *
+                                      *  This shall be populated for each
+                                      * matching triple if requested. */
     iter_op_fn_t        iter_op_fn; ///< Function used to look up next match.
     const uint8_t *     term_order; ///< Term order used in 1-2bound look-ups.
     LSUP_Key            luk[3];     ///< 0÷3 lookup keys.
@@ -186,8 +200,9 @@ static const uint8_t lookup_ordering_2bound[3][3] = {
  * Static prototypes.
  */
 static int index_triple(
-        MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck);
-static LSUP_rc mdbstore_add_term (void *h, const LSUP_Buffer *sterm);
+        MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck,
+        MDB_txn *txn);
+static LSUP_rc mdbstore_add_term (void *h, const LSUP_Buffer *sterm, void *th);
 
 inline static LSUP_rc lookup_0bound (MDBIterator *it, size_t *ct);
 inline static LSUP_rc lookup_1bound (
@@ -201,7 +216,7 @@ inline static LSUP_rc lookup_3bound(MDBIterator *it, size_t *ct);
  * Store interface.
  */
 
-LSUP_NSMap *
+static LSUP_NSMap *
 mdbstore_nsm_get (void *h)
 {
     MDBStore *store = h;
@@ -241,12 +256,12 @@ finally:
 }
 
 
-LSUP_rc
-mdbstore_nsm_put (void *h, const LSUP_NSMap *nsm)
+static LSUP_rc
+mdbstore_nsm_put (void *h, const LSUP_NSMap *nsm, void *th)
 {
     MDBStore *store = h;
     MDB_txn *txn;
-    RCCK (mdb_txn_begin (store->env, store->txn, 0, &txn));
+    RCCK (mdb_txn_begin (store->env, (MDB_txn *) th, 0, &txn));
 
     LSUP_rc rc = LSUP_NOACTION;
     int db_rc;
@@ -332,7 +347,7 @@ mdbstore_path_from_id (const char *id)
  * and checking that it's a writable directory. If the path is not specified
  * in the LSUP_MDB_STORE_URN environment variable, a default directory is used.
  */
-LSUP_rc
+static LSUP_rc
 mdbstore_setup (const char *id, bool clear)
 {
     const char *path = mdbstore_path_from_id (id);
@@ -380,7 +395,7 @@ mdbstore_setup (const char *id, bool clear)
  *   it is not necessary to modify this, unless one is operating under memory
  *   and disk constraints. The default map size is 1Tb.
  */
-void *
+static void *
 mdbstore_new (const char *id, size_t _unused)
 {
     (void) _unused;
@@ -405,42 +420,43 @@ mdbstore_new (const char *id, size_t _unused)
     CHECK (mdb_env_open (store->env, path, 0, ENV_FILE_MODE), fail);
 
     // Assign DB handles to store->dbi.
-    mdb_txn_begin (store->env, NULL, 0, &store->txn);
+    MDB_txn *txn = NULL;
+    CHECK (mdb_txn_begin (store->env, NULL, 0, &txn), fail);
     for (int i = 0; i < N_DB; i++)
         CHECK (mdb_dbi_open (
-                store->txn, db_labels[i], db_flags[i], store->dbi + i), fail);
+                txn, db_labels[i], db_flags[i], store->dbi + i), fail);
 
     // Bootstrap the permanent store with initial data.
     MDB_stat stat;
-    CHECK (mdb_stat (store->txn, store->dbi[IDX_PFX_NS], &stat), fail);
+    CHECK (mdb_stat (txn, store->dbi[IDX_PFX_NS], &stat), fail);
     if (stat.ms_entries == 0) {
         log_debug ("Loading initial data into %s", path);
         // Load initial NS map.
-        mdbstore_nsm_put (store, LSUP_default_nsm);
+        mdbstore_nsm_put (store, LSUP_default_nsm, txn);
 
         // Index default context.
-        mdbstore_add_term (store, LSUP_default_ctx_buf);
+        mdbstore_add_term (store, LSUP_default_ctx_buf, txn);
     }
 
-    store->state |= LSSTORE_OPEN;
-    mdb_txn_commit (store->txn);
-    store->txn = NULL;
+    store->flags |= LSSTORE_OPEN;
+    mdb_txn_commit (txn);
+    txn = NULL;
 
     return store;
 
 fail:
-    if (store->txn) mdb_txn_abort (store->txn);
+    if (txn) mdb_txn_abort (txn);
     mdb_env_close (store->env);
 
     return NULL;
 }
 
 
-void
+static void
 mdbstore_free (void *h)
 {
     MDBStore *store = h;
-    if (store->state & LSSTORE_OPEN) {
+    if (store->flags & LSSTORE_OPEN) {
         const char *path;
         mdb_env_get_path (store->env, &path);
         log_info ("Closing MDB env at %s.", path);
@@ -451,7 +467,9 @@ mdbstore_free (void *h)
 }
 
 
-char *mdbstore_id (const void *h)
+#if 0
+static char *
+mdbstore_id (const void *h)
 {
     const MDBStore *store = h;
     const char *path;
@@ -459,12 +477,13 @@ char *mdbstore_id (const void *h)
 
     return strcat ("file://", path);
 }
+#endif
 
 
-LSUP_rc
+static LSUP_rc
 mdbstore_stat (const MDBStore *store, MDB_stat *stat)
 {
-    if (!(store->state & LSSTORE_OPEN)) return 0;
+    if (!(store->flags & LSSTORE_OPEN)) return 0;
 
     MDB_txn *txn;
     mdb_txn_begin (store->env, NULL, MDB_RDONLY, &txn);
@@ -477,7 +496,7 @@ mdbstore_stat (const MDBStore *store, MDB_stat *stat)
 }
 
 
-size_t
+static size_t
 mdbstore_size (const void *h)
 {
     const MDBStore *store = h;
@@ -490,8 +509,48 @@ mdbstore_size (const void *h)
 }
 
 
-void *
-mdbstore_add_init (void *h, const LSUP_Buffer *sc)
+static LSUP_rc
+mdbstore_txn_begin (void *h, int flags, void **th)
+{
+    MDBStore *store = h;
+
+    RCCK (mdb_txn_begin (store->env, NULL, flags, (MDB_txn **) th));
+
+    return LSUP_OK;
+}
+
+
+static LSUP_rc
+mdbstore_txn_commit (void *th)
+{
+    RCCK (mdb_txn_commit ((MDB_txn *) th));
+
+    return LSUP_OK;
+}
+
+
+static void
+mdbstore_txn_abort (void *th)
+{ mdb_txn_abort ((MDB_txn *) th); }
+
+
+static void *
+mdbiter_txn (void *h)
+{ return ((MDBIterator *) h)->txn; }
+
+
+/** @brief Begin an add loop.
+ *
+ * @sa #store_add_init_fn_t
+ *
+ * @param[in] th Previously opened MDB_txn handle, if the add loop shall be
+ *  run within a broader transaction. The transaction must be read-write. The
+ *  operation will always open a new transaction that is closed with
+ *  #mdbstore_add_done() or #mdbstore_add_abort(). If this parameter is not
+ *  NULL, the loop transaction will have the passed txn set as its parent.
+ */
+static void *
+mdbstore_add_init (void *h, const LSUP_Buffer *sc, void *th)
 {
     MDBStore *store = h;
     /* An iterator is used here. Some members are a bit misused but it does
@@ -503,12 +562,7 @@ mdbstore_add_init (void *h, const LSUP_Buffer *sc)
     it->store = store;
     it->i = 0;
 
-    // No other write transaction may be open.
-    if (UNLIKELY (it->store->txn)) {
-        log_error ("A write transaction is already open.");
-        return NULL;
-    }
-    mdb_txn_begin (store->env, NULL, 0, &it->store->txn);
+    mdb_txn_begin (store->env, (MDB_txn *) th, 0, &it->txn);
 
     if (sc) {
         // Store context if it's not the default one.
@@ -522,11 +576,11 @@ mdbstore_add_init (void *h, const LSUP_Buffer *sc)
         it->data.mv_size = sc->size;
 
         int db_rc = mdb_put(
-                it->store->txn, it->store->dbi[IDX_T_ST],
+                it->txn, it->store->dbi[IDX_T_ST],
                 &it->key, &it->data, MDB_NOOVERWRITE);
         if (db_rc != MDB_SUCCESS && db_rc != MDB_KEYEXIST) {
             LOG_RC (db_rc);
-            mdb_txn_abort (it->store->txn);
+            mdb_txn_abort (it->txn);
             return NULL;
         }
     } else {
@@ -544,7 +598,7 @@ mdbstore_add_init (void *h, const LSUP_Buffer *sc)
  * #mdbstore_add_abort or #mdbstore_add_done. FIXME
  *
  */
-LSUP_rc
+static LSUP_rc
 mdbstore_add_iter (void *h, const LSUP_BufferTriple *sspo)
 {
     if (UNLIKELY (!h)) return LSUP_VALUE_ERR;
@@ -565,7 +619,7 @@ mdbstore_add_iter (void *h, const LSUP_BufferTriple *sspo)
         it->data.mv_size = st->size;
 
         db_rc = mdb_put(
-                it->store->txn, it->store->dbi[IDX_T_ST],
+                it->txn, it->store->dbi[IDX_T_ST],
                 &it->key, &it->data, MDB_NOOVERWRITE);
         if (db_rc != MDB_SUCCESS && db_rc != MDB_KEYEXIST) {
             LOG_RC (db_rc);
@@ -585,7 +639,7 @@ mdbstore_add_iter (void *h, const LSUP_BufferTriple *sspo)
     it->data.mv_size = it->luc == NULL_KEY ? 0 : KLEN;
 
     db_rc = mdb_put(
-            it->store->txn, it->store->dbi[IDX_SPO_C],
+            it->txn, it->store->dbi[IDX_SPO_C],
             &it->key, &it->data, MDB_NODUPDATA);
 
     if (db_rc == MDB_KEYEXIST) return LSUP_NOACTION;
@@ -596,46 +650,43 @@ mdbstore_add_iter (void *h, const LSUP_BufferTriple *sspo)
     }
 
     // Index.
-    LSUP_rc rc = index_triple (it->store, OP_ADD, spok, it->luc);
+    LSUP_rc rc = index_triple (it->store, OP_ADD, spok, it->luc, it->txn);
     if (rc == LSUP_OK) it->i++;
 
     return rc;
 }
 
 
-LSUP_rc
+static LSUP_rc
 mdbstore_add_done (void *h)
 {
     MDBIterator *it = h;
     LSUP_rc rc = LSUP_OK;
 
-    if (mdb_txn_commit (it->store->txn) != MDB_SUCCESS) {
-        mdb_txn_abort (it->store->txn);
-        rc = LSUP_DB_ERR;
+    if (mdb_txn_commit (it->txn) != MDB_SUCCESS) {
+        mdb_txn_abort (it->txn);
+        rc = LSUP_TXN_ERR;
     }
 
-    it->store->txn = NULL;
-
     free (it);
 
     return rc;
 }
 
 
-void
+static void
 mdbstore_add_abort (void *h)
 {
     MDBIterator *it = h;
-    mdb_txn_abort (it->store->txn);
-
-    it->store->txn = NULL;
+    mdb_txn_abort (it->txn);
 
     free (it);
 }
 
 
+#if 0
 /* TODO deprecate. Use low-level instead and abstract at graph level. */
-LSUP_rc
+static LSUP_rc
 mdbstore_add (
         void *h, const LSUP_Buffer *sc,
         const LSUP_BufferTriple strp[], const size_t ct, size_t *inserted)
@@ -655,6 +706,7 @@ mdbstore_add (
 
     return mdbstore_add_done (it);
 }
+#endif
 
 
 static LSUP_rc
@@ -682,12 +734,11 @@ key_to_sterm (MDBIterator *it, const LSUP_Key key, LSUP_Buffer *sterm)
 }
 
 
-void *
+static void *
 mdbstore_lookup (
         void *h, const LSUP_Buffer *ss, const LSUP_Buffer *sp,
-        const LSUP_Buffer *so, const LSUP_Buffer *sc, size_t *ct)
+        const LSUP_Buffer *so, const LSUP_Buffer *sc, void *th, size_t *ct)
 {
-    MDBStore *store = h;
     LSUP_TripleKey spok = {
         LSUP_buffer_hash (ss),
         LSUP_buffer_hash (sp),
@@ -697,7 +748,7 @@ mdbstore_lookup (
     MDBIterator *it;
     CALLOC_GUARD (it, NULL);
 
-    it->store = store;
+    it->store = h;
     it->luc = LSUP_buffer_hash (sc);
     log_debug ("Lookup context: %lx", it->luc);
 
@@ -706,13 +757,14 @@ mdbstore_lookup (
     uint8_t idx0, idx1;
 
     // Start RO transaction if not in a write txn already.
-    if (it->store->txn) it->txn = it->store->txn;
+    if (th) it->txn = th;
     else {
         it->rc = mdb_txn_begin (it->store->env, NULL, MDB_RDONLY, &it->txn);
         if (it->rc != MDB_SUCCESS) {
             log_error ("Database error: %s", LSUP_strerror (it->rc));
             return NULL;
         }
+        it->flags |= ITER_OPEN_TXN;
     }
 
     // Context index loop.
@@ -780,7 +832,7 @@ mdbstore_lookup (
  * ckset is filled with an array of contexts that the triple appears
  * in, if not NULL.
  */
-inline static LSUP_rc
+static LSUP_rc
 mdbiter_next_key (MDBIterator *it)
 {
     if (UNLIKELY (!it)) return LSUP_VALUE_ERR;
@@ -872,7 +924,7 @@ mdbiter_next_key (MDBIterator *it)
 }
 
 
-LSUP_rc
+static LSUP_rc
 mdbiter_next (
         void *h, LSUP_BufferTriple *sspo, LSUP_Buffer **ctx_p)
 {
@@ -911,7 +963,7 @@ mdbiter_next (
 }
 
 
-void
+static void
 mdbiter_free (void *h)
 {
     MDBIterator *it = h;
@@ -919,17 +971,17 @@ mdbiter_free (void *h)
 
     if (it->cur) mdb_cursor_close (it->cur);
     if (it->ctx_cur) mdb_cursor_close (it->ctx_cur);
-    if (it->store->txn != it->txn) mdb_txn_abort (it->txn);
+    if (it->flags & ITER_OPEN_TXN) mdb_txn_abort (it->txn);
     free (it->ck);
 
     free (it);
 }
 
 
-LSUP_rc
-mdbstore_remove(
+static LSUP_rc
+mdbstore_remove (
         void *h, const LSUP_Buffer *ss, const LSUP_Buffer *sp,
-        const LSUP_Buffer *so, const LSUP_Buffer *sc, size_t *ct)
+        const LSUP_Buffer *so, const LSUP_Buffer *sc, void *th, size_t *ct)
 {
     MDBStore *store = h;
     LSUP_rc rc = LSUP_NOACTION, db_rc;
@@ -939,13 +991,12 @@ mdbstore_remove(
     if (sc == NULL) sc = LSUP_default_ctx_buf;
     ck = LSUP_buffer_hash (sc);
 
-    // No other write transaction may be open.
-    if (UNLIKELY (store->txn)) return LSUP_TXN_ERR;
-    mdb_txn_begin (store->env, NULL, 0, &store->txn);
+    MDB_txn *txn;
+    mdb_txn_begin (store->env, (MDB_txn *) th, 0, &txn);
 
     MDB_cursor *dcur, *icur;
-    mdb_cursor_open (store->txn, store->dbi[IDX_SPO_C], &dcur);
-    mdb_cursor_open (store->txn, store->dbi[IDX_C_SPO], &icur);
+    mdb_cursor_open (txn, store->dbi[IDX_SPO_C], &dcur);
+    mdb_cursor_open (txn, store->dbi[IDX_C_SPO], &icur);
 
     MDB_val spok_v, ck_v;
 
@@ -953,7 +1004,8 @@ mdbstore_remove(
     ck_v.mv_size = KLEN;
     ck_v.mv_data = &ck;
 
-    MDBIterator *it = mdbstore_lookup (store, ss, sp, so, sc, ct);
+    // The lookup operates within the current (bottom) write transaction.
+    MDBIterator *it = mdbstore_lookup (store, ss, sp, so, sc, txn, ct);
     if (UNLIKELY (!it)) return LSUP_DB_ERR;
     if (ct) log_debug ("Found %lu triples to remove.", *ct);
 
@@ -993,22 +1045,20 @@ mdbstore_remove(
         if (db_rc == MDB_SUCCESS) continue;
         if (UNLIKELY (db_rc != MDB_NOTFOUND)) goto fail;
 
-        rc = index_triple (store, OP_REMOVE, it->spok, ck);
+        rc = index_triple (store, OP_REMOVE, it->spok, ck, txn);
     }
 
     mdbiter_free (it);
 
-    if (UNLIKELY (mdb_txn_commit (store->txn) != MDB_SUCCESS)) {
+    if (UNLIKELY (mdb_txn_commit (txn) != MDB_SUCCESS)) {
         rc = LSUP_TXN_ERR;
         goto fail;
     }
-    store->txn = NULL;
 
     return rc;
 
 fail:
-    mdb_txn_abort (store->txn);
-    store->txn = NULL;
+    mdb_txn_abort (txn);
 
     log_error ("Database error: %s", LSUP_strerror (db_rc));
 
@@ -1016,7 +1066,8 @@ fail:
 }
 
 
-int
+#if 0
+static int
 mdbstore_tkey_exists (MDBStore *store, LSUP_Key tkey)
 {
     int db_rc, rc;
@@ -1044,22 +1095,23 @@ mdbstore_tkey_exists (MDBStore *store, LSUP_Key tkey)
 
     return rc;
 }
+#endif
 
 
 static LSUP_rc
-mdbstore_add_term (void *h, const LSUP_Buffer *sterm)
+mdbstore_add_term (void *h, const LSUP_Buffer *sterm, void *th)
 {
     MDBStore *store = h;
     int db_rc;
     MDB_val key, data;
 
     MDB_txn *txn;
-    // If store->txn exists, open a child txn, otherwise parent should be NULL.
-    RCCK (mdb_txn_begin (store->env, store->txn, 0, &txn));
+    // If store->txn exists, open a child txn, otherwise reuse the same txn.
+    if (th) txn = th;
+    else RCCK (mdb_txn_begin (store->env, NULL, 0, &txn));
 
     MDB_cursor *cur;
-    db_rc = mdb_cursor_open (txn, store->dbi[IDX_T_ST], &cur);
-    if (UNLIKELY (db_rc != MDB_SUCCESS)) goto fail;
+    CHECK (mdb_cursor_open (txn, store->dbi[IDX_T_ST], &cur), fail);
 
     LSUP_Key k = LSUP_buffer_hash (sterm);
     key.mv_data = &k;
@@ -1071,17 +1123,12 @@ mdbstore_add_term (void *h, const LSUP_Buffer *sterm)
     db_rc = mdb_cursor_put (cur, &key, &data, MDB_NOOVERWRITE);
     if (db_rc != MDB_KEYEXIST) CHECK (db_rc, fail);
 
-    if (txn != store->txn) {
-        db_rc = mdb_txn_commit (txn);
-        txn = NULL;
-        CHECK (db_rc, fail);
-    }
+    if (!th) CHECK (db_rc = mdb_txn_commit (txn), fail);
 
     return LSUP_OK;
 
 fail:
-    log_error (mdb_strerror (db_rc));
-    if (txn) mdb_txn_abort (txn);
+    if (!th) mdb_txn_abort (txn);
     return LSUP_DB_ERR;
 }
 
@@ -1097,6 +1144,11 @@ const LSUP_StoreInt mdbstore_int = {
 
     .size_fn        = mdbstore_size,
 
+    .txn_begin_fn   = mdbstore_txn_begin,
+    .txn_commit_fn  = mdbstore_txn_commit,
+    .txn_abort_fn   = mdbstore_txn_abort,
+    .iter_txn_fn    = mdbiter_txn,
+
     .add_init_fn    = mdbstore_add_init,
     .add_iter_fn    = mdbstore_add_iter,
     .add_abort_fn   = mdbstore_add_abort,
@@ -1122,9 +1174,13 @@ const LSUP_StoreInt mdbstore_int = {
  * @param op[in] Store operation. One of OP_ADD or OP_REMOVE.
  * @param spok[in] Triple key to index.
  * @param ck[in] Context to index, may be NULL.
+ * @param[in] th Transaction handle. This MUST be a valid pointer to an open
+ *  RW transaction.
  */
 static LSUP_rc
-index_triple(MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck)
+index_triple(
+        MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck,
+        MDB_txn *txn)
 {
     int db_rc;
     LSUP_rc rc = LSUP_NOACTION;
@@ -1143,7 +1199,7 @@ index_triple(MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck)
             v2.mv_data = spok;
             v2.mv_size = TRP_KLEN;
 
-            mdb_cursor_open (store->txn, store->dbi[IDX_C_SPO], &cur);
+            mdb_cursor_open (txn, store->dbi[IDX_C_SPO], &cur);
             if (mdb_cursor_get (cur, &v1, &v2, MDB_GET_BOTH) == MDB_SUCCESS) {
                 db_rc = mdb_cursor_del (cur, 0);
                 if (db_rc != MDB_SUCCESS) return LSUP_DB_ERR;
@@ -1163,7 +1219,7 @@ index_triple(MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck)
             v2.mv_size = TRP_KLEN;
 
             db_rc = mdb_put(
-                    store->txn, store->dbi[IDX_C_SPO],
+                    txn, store->dbi[IDX_C_SPO],
                     &v1, &v2, MDB_NODUPDATA);
             if (db_rc != MDB_SUCCESS) return LSUP_DB_ERR;
             if (db_rc != MDB_KEYEXIST) rc = LSUP_OK;
@@ -1190,8 +1246,7 @@ index_triple(MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck)
 
         if (op == OP_REMOVE) {
             MDB_cursor *cur1, *cur2;
-            mdb_cursor_open(
-                    store->txn, store->dbi[lookup_indices[i]], &cur1);
+            mdb_cursor_open(txn, store->dbi[lookup_indices[i]], &cur1);
 
             db_rc = mdb_cursor_get (cur1, &v1, &v2, MDB_GET_BOTH);
             if (db_rc == MDB_SUCCESS) mdb_cursor_del (cur1, 0);
@@ -1202,8 +1257,7 @@ index_triple(MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck)
             v1.mv_data = spok + i;
             v2.mv_data = dbl_keys[i];
 
-            mdb_cursor_open(
-                    store->txn, store->dbi[lookup_indices[i + 3]], &cur2);
+            mdb_cursor_open(txn, store->dbi[lookup_indices[i + 3]], &cur2);
 
             db_rc = mdb_cursor_get (cur2, &v2, &v1, MDB_GET_BOTH);
             if (db_rc == MDB_SUCCESS) mdb_cursor_del (cur2, 0);
@@ -1219,7 +1273,7 @@ index_triple(MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck)
                     "%lx: %lx %lx", *(size_t*)(v1.mv_data),
                     *(size_t*)(v2.mv_data), *(size_t*)(v2.mv_data) + 1);
 
-            db_rc = mdb_put (store->txn, db1, &v1, &v2, MDB_NODUPDATA);
+            db_rc = mdb_put (txn, db1, &v1, &v2, MDB_NODUPDATA);
 
             if (db_rc == MDB_SUCCESS) rc = LSUP_OK;
             else if (db_rc != MDB_KEYEXIST) return LSUP_DB_ERR;
@@ -1230,7 +1284,7 @@ index_triple(MDBStore *store, StoreOp op, LSUP_TripleKey spok, LSUP_Key ck)
                     "%lx %lx: %lx", *(size_t*)(v2.mv_data),
                     *(size_t*)(v2.mv_data) + 1, *(size_t*)(v1.mv_data));
 
-            db_rc = mdb_put (store->txn, db2, &v2, &v1, MDB_NODUPDATA);
+            db_rc = mdb_put (txn, db2, &v2, &v1, MDB_NODUPDATA);
 
             if (db_rc == MDB_SUCCESS) rc = LSUP_OK;
             else if (db_rc != MDB_KEYEXIST) return LSUP_DB_ERR;

+ 172 - 7
test/test_store.c

@@ -34,7 +34,7 @@ static int test_triple_store()
     }
 
     // Test adding.
-    void *it = store->sif->add_init_fn (store->data, NULL);
+    void *it = store->sif->add_init_fn (store->data, NULL, NULL);
     size_t ins = 0;
     for (size_t i = 0; i < NUM_TRP; i++) {
         LSUP_rc rc = store->sif->add_iter_fn (it, ser_trp + i);
@@ -102,7 +102,8 @@ static int test_triple_store()
         log_info ("Testing triple lookup #%d.", i);
 
         void *it = store->sif->lookup_fn (
-                store->data, lut[i][0], lut[i][1], lut[i][2], luc[i], &ct);
+                store->data, lut[i][0], lut[i][1], lut[i][2], luc[i],
+                NULL, &ct);
         ASSERT (it != NULL, "Error creating iterator!");
         EXPECT_INT_EQ (ct, results[i]);
 
@@ -153,7 +154,7 @@ static int test_quad_store()
     size_t ins;
 
     // Only triples 0÷5 in default context.
-    it = store->sif->add_init_fn (store->data, NULL);
+    it = store->sif->add_init_fn (store->data, NULL, NULL);
     ins = 0;
     for (size_t i = 0; i < 6; i++) {
         log_info ("Inserting triple #%d in default context.", i);
@@ -170,7 +171,7 @@ static int test_quad_store()
     LSUP_Buffer *sc2 = LSUP_term_serialize (ctx2);
 
     // Only triples 4÷9 in context 2 (effectively 4 non-duplicates).
-    it = store->sif->add_init_fn (store->data, sc2);
+    it = store->sif->add_init_fn (store->data, sc2, NULL);
     ASSERT (it != NULL, "Error creating iterator!");
     ins = 0;
     for (size_t i = 4; i < 10; i++) {
@@ -322,7 +323,8 @@ static int test_quad_store()
 
         log_info ("Checking triple #%d.", i);
         void *it = store->sif->lookup_fn (
-                store->data, lut[i][0], lut[i][1], lut[i][2], luc[i], &ct);
+                store->data, lut[i][0], lut[i][1], lut[i][2], luc[i],
+                NULL, &ct);
         ASSERT (it != NULL, "Lookup error!");
         EXPECT_INT_EQ (ct, results[i]);
 
@@ -333,7 +335,7 @@ static int test_quad_store()
     for (int i = 0; i < 10; i++) {
         void *it = store->sif->lookup_fn (
                 store->data, ser_trp[i].s, ser_trp[i].p, ser_trp[i].o,
-                NULL, NULL);
+                NULL, NULL, NULL);
         log_info ("Checking contexts for triple %d.", i);
         LSUP_Buffer *ctx_a;
         EXPECT_PASS (store->sif->lu_next_fn (it, NULL, &ctx_a));
@@ -366,6 +368,167 @@ static int test_quad_store()
 }
 
 
+static int test_txn_commit (void)
+{
+    if (!(store->sif->features & LSUP_STORE_TXN)) return 0;
+
+    ASSERT (
+            store->sif->txn_begin_fn != NULL,
+            "Transaction begin function is NULL!");
+    ASSERT (
+            store->sif->txn_commit_fn != NULL,
+            "Transaction commit function is NULL!");
+    ASSERT (
+            store->sif->txn_abort_fn != NULL,
+            "Transaction abort function is NULL!");
+    ASSERT (
+            store->sif->add_abort_fn != NULL,
+            "Add abort function is NULL!");
+    ASSERT (
+            store->sif->iter_txn_fn != NULL,
+            "Iterator transaction function is NULL!");
+
+    if (store->sif->setup_fn)
+        EXPECT_PASS (store->sif->setup_fn (store->id, true));
+
+    log_info ("Testing transaction control for %s", store->id);
+    store->data = store->sif->new_fn (store->id, 0);
+    ASSERT (store->data != NULL, "Error creating store data back end!");
+
+    LSUP_Triple *trp = create_triples();
+    LSUP_BufferTriple ser_trp[NUM_TRP];
+
+    for (int i = 0; i < NUM_TRP; i++) {
+        LSUP_BufferTriple *tmp = LSUP_triple_serialize (trp + i);
+        ser_trp[i] = *tmp;
+        free (tmp);
+    }
+
+    void *it;
+    // Start adding then commit.
+    it = store->sif->add_init_fn (store->data, NULL, NULL);
+    for (size_t i = 0; i < NUM_TRP; i++) {
+        LSUP_rc rc = store->sif->add_iter_fn (it, ser_trp + i);
+        ASSERT (rc >= 0, "Error inserting triples!");
+    }
+    store->sif->add_abort_fn (it);
+
+    EXPECT_INT_EQ (store->sif->size_fn (store->data), 0);
+
+    // Add within a transaction, commit, then commit parent transaction.
+    void *txn;
+    EXPECT_PASS (store->sif->txn_begin_fn (store->data, 0, &txn));
+    it = store->sif->add_init_fn (store->data, NULL, txn);
+    for (size_t i = 0; i < NUM_TRP; i++) {
+        LSUP_rc rc = store->sif->add_iter_fn (it, ser_trp + i);
+        ASSERT (rc >= 0, "Error inserting triples!");
+    }
+    store->sif->add_done_fn (it);
+
+    // Triples are added in child txn but parent is still open.
+    // Size function always calculates outside of all transactions.
+    EXPECT_INT_EQ (store->sif->size_fn (store->data), 0);
+
+    size_t ct;
+    it = store->sif->lookup_fn (
+            store->data, NULL, NULL, NULL, NULL, txn, &ct);
+    store->sif->lu_free_fn (it);
+    // Should show triples added within the parent txn.
+    EXPECT_INT_EQ (ct, 8);
+
+    // commit child txn operations.
+    EXPECT_PASS (store->sif->txn_commit_fn (txn));
+    it = store->sif->lookup_fn (
+            store->data, NULL, NULL, NULL, NULL, NULL, &ct);
+    store->sif->lu_free_fn (it);
+    EXPECT_INT_EQ (ct, 8);
+    EXPECT_INT_EQ (store->sif->size_fn (store->data), 8);
+
+    for (int i = 0; i < NUM_TRP; i++) {
+        LSUP_buffer_free (ser_trp[i].s);
+        LSUP_buffer_free (ser_trp[i].p);
+        LSUP_buffer_free (ser_trp[i].o);
+    }
+
+    store->sif->free_fn (store->data);
+    free_triples (trp);
+
+    return 0;
+}
+
+
+static int test_txn_abort (void)
+{
+    if (!(store->sif->features & LSUP_STORE_TXN)) return 0;
+
+    if (store->sif->setup_fn)
+        EXPECT_PASS (store->sif->setup_fn (store->id, true));
+
+    log_info ("Testing transaction control for %s", store->id);
+    store->data = store->sif->new_fn (store->id, 0);
+    ASSERT (store->data != NULL, "Error creating store data back end!");
+
+    LSUP_Triple *trp = create_triples();
+    LSUP_BufferTriple ser_trp[NUM_TRP];
+
+    for (int i = 0; i < NUM_TRP; i++) {
+        LSUP_BufferTriple *tmp = LSUP_triple_serialize (trp + i);
+        ser_trp[i] = *tmp;
+        free (tmp);
+    }
+
+    void *it;
+    // Start adding then abort.
+    it = store->sif->add_init_fn (store->data, NULL, NULL);
+    for (size_t i = 0; i < NUM_TRP; i++) {
+        LSUP_rc rc = store->sif->add_iter_fn (it, ser_trp + i);
+        ASSERT (rc >= 0, "Error inserting triples!");
+    }
+    store->sif->add_abort_fn (it);
+
+    EXPECT_INT_EQ (store->sif->size_fn (store->data), 0);
+
+    // Add within a transaction, commit, then abort parent transaction.
+    void *txn;
+    EXPECT_PASS (store->sif->txn_begin_fn (store->data, 0, &txn));
+    it = store->sif->add_init_fn (store->data, NULL, txn);
+    for (size_t i = 0; i < NUM_TRP; i++) {
+        LSUP_rc rc = store->sif->add_iter_fn (it, ser_trp + i);
+        ASSERT (rc >= 0, "Error inserting triples!");
+    }
+    store->sif->add_done_fn (it);
+
+    // Triples are added in child txn but parent is still open.
+    // Size function always calculates outside of all transactions.
+    EXPECT_INT_EQ (store->sif->size_fn (store->data), 0);
+
+    size_t ct;
+    it = store->sif->lookup_fn (
+            store->data, NULL, NULL, NULL, NULL, txn, &ct);
+    store->sif->lu_free_fn (it);
+    // Should show triples added within the parent txn.
+    EXPECT_INT_EQ (ct, 8);
+
+    // Discard child txn operations.
+    store->sif->txn_abort_fn (txn);
+    it = store->sif->lookup_fn (
+            store->data, NULL, NULL, NULL, NULL, NULL, &ct);
+    store->sif->lu_free_fn (it);
+    EXPECT_INT_EQ (ct, 0);
+
+    for (int i = 0; i < NUM_TRP; i++) {
+        LSUP_buffer_free (ser_trp[i].s);
+        LSUP_buffer_free (ser_trp[i].p);
+        LSUP_buffer_free (ser_trp[i].o);
+    }
+
+    store->sif->free_fn (store->data);
+    free_triples (trp);
+
+    return 0;
+}
+
+
 int store_tests()
 {
 #define ENTRY(a, b) \
@@ -373,7 +536,9 @@ int store_tests()
     store->id = STORE_ID_##a; \
     store->sif = &b; \
     RUN (test_triple_store); \
-    RUN (test_quad_store);
+    RUN (test_quad_store); \
+    RUN (test_txn_commit); \
+    RUN (test_txn_abort);
 BACKEND_TBL
 #undef ENTRY
     return 0;