Browse Source

Pass all txn tests:

* Resolve MDB txn deadlock.
* Embed txn handle in MDB iterator.
Stefano Cossu 1 year ago
parent
commit
fde5290933
4 changed files with 45 additions and 23 deletions
  1. 3 7
      include/graph.h
  2. 11 8
      src/graph.c
  3. 30 8
      src/store_mdb.c
  4. 1 0
      src/term.c

+ 3 - 7
include/graph.h

@@ -245,7 +245,8 @@ void LSUP_graph_abort (LSUP_Graph *gr, void *txn);
 
 /** @brief Initialize an iterator to add triples.
  *
- * @param[in] txn Transaction handle. It may be NULL.
+ * @param[in] txn Transaction handle. It may be NULL. If not NULL, its handle
+ * will be bound to the iterator handle for its whole lifa cycle.
  *
  * @param[in] gr Graph to add to. It is added to the iterator state.
  *
@@ -267,12 +268,7 @@ LSUP_graph_add_init_txn (void *txn, LSUP_Graph *gr);
  * @param[in] spo Triple to add. Caller retains ownership.
  */
 LSUP_rc
-LSUP_graph_add_iter_txn (
-        void *txn, LSUP_GraphIterator *it, const LSUP_Triple *spo);
-
-
-/// Non-transactional version of #LSUP_graph_add_iter_txn.
-#define LSUP_graph_add_iter(...) LSUP_graph_add_iter_txn (NULL, __VA_ARGS__)
+LSUP_graph_add_iter (LSUP_GraphIterator *it, const LSUP_Triple *spo);
 
 
 /** @brief Finalize an add iteration loop and free the iterator.

+ 11 - 8
src/graph.c

@@ -260,8 +260,7 @@ LSUP_graph_add_init_txn (void *txn, LSUP_Graph *gr)
 
 
 LSUP_rc
-LSUP_graph_add_iter_txn (
-        void *txn, LSUP_GraphIterator *it, const LSUP_Triple *spo)
+LSUP_graph_add_iter (LSUP_GraphIterator *it, const LSUP_Triple *spo)
 {
     log_trace (
             "Adding triple: {%s, %s, %s}",
@@ -270,16 +269,19 @@ LSUP_graph_add_iter_txn (
     if (UNLIKELY (!sspo)) return LSUP_MEM_ERR;
     const LSUP_StoreInt *sif = it->graph->store->sif;
 
-    LSUP_rc rc = sif->add_iter_fn (it->data, sspo);
-    PCHECK (rc, finally);
+    LSUP_rc rc;
+
+    PCHECK (rc = sif->add_iter_fn (it->data, sspo), finally);
 
-    // Store datatype term permanently if the store supports it.
+    // Store datatype term permanently.
     if (rc == LSUP_OK && sif->add_term_fn) {
         for (int i = 0; i < 3; i++) {
             LSUP_Term *term = LSUP_triple_pos (spo, i);
             if (term->type == LSUP_TERM_LITERAL) {
                 LSUP_Buffer *ser_dtype = LSUP_term_serialize (term->datatype);
-                sif->add_term_fn ( it->graph->store->data, ser_dtype, txn);
+                LSUP_rc term_rc = sif->add_term_fn (
+                        it->graph->store->data, ser_dtype, it->data);
+                PCHECK (term_rc, finally);
                 LSUP_buffer_free (ser_dtype);
             }
         }
@@ -315,7 +317,7 @@ LSUP_graph_add_txn (
     for (size_t i = 0; trp[i].s != NULL; i++) {
         log_trace ("Inserting triple #%lu", i);
 
-        LSUP_rc db_rc = LSUP_graph_add_iter_txn (txn, it, trp + i);
+        LSUP_rc db_rc = LSUP_graph_add_iter (it, trp + i);
 
         if (db_rc == LSUP_OK) {
             rc = LSUP_OK;
@@ -327,6 +329,7 @@ LSUP_graph_add_txn (
             rc = db_rc;
             goto finally;
         }
+        log_trace ("Graph size at end of add iter: %lu", LSUP_graph_size (gr));
     }
 
 finally:
@@ -379,7 +382,7 @@ LSUP_graph_copy_contents_txn (
     LSUP_Triple *spo = NULL;
     LSUP_GraphIterator *add_it = LSUP_graph_add_init_txn (txn, dest);
     while (LSUP_graph_iter_next (it, &spo) != LSUP_END) {
-        LSUP_rc add_rc = LSUP_graph_add_iter_txn (txn, add_it, spo);
+        LSUP_rc add_rc = LSUP_graph_add_iter (add_it, spo);
         LSUP_triple_free (spo);
         if (LIKELY (add_rc == LSUP_OK)) rc = LSUP_OK;
         else if (add_rc < 0) {

+ 30 - 8
src/store_mdb.c

@@ -435,7 +435,12 @@ mdbstore_new (const char *id, size_t _unused)
         mdbstore_nsm_put (store, LSUP_default_nsm, txn);
 
         // Index default context.
-        mdbstore_add_term (store, LSUP_default_ctx_buf, txn);
+        // Create a dummy iterator just to use the current txn.
+        MDBIterator *it;
+        CALLOC_GUARD (it, NULL);
+        it->txn = txn;
+        mdbstore_add_term (store, LSUP_default_ctx_buf, it);
+        free (it);
     }
 
     store->flags |= LSSTORE_OPEN;
@@ -544,8 +549,8 @@ mdbiter_txn (void *h)
  * @sa #store_add_init_fn_t
  *
  * @param[in] th Previously opened MDB_txn handle, if the add loop shall be
- *  run within a broader transaction. The transaction must be read-write. The
- *  operation will always open a new transaction that is closed with
+ *  run within an enclosing transaction. The transaction must be read-write.
+ *  The operation will always open a new transaction that is closed with
  *  #mdbstore_add_done() or #mdbstore_add_abort(). If this parameter is not
  *  NULL, the loop transaction will have the passed txn set as its parent.
  */
@@ -1100,16 +1105,32 @@ mdbstore_tkey_exists (MDBStore *store, LSUP_Key tkey)
 #endif
 
 
+/** @brief Add a term to the store.
+ *
+ * @param[in] h #MDBStore handle.
+ *
+ * @param[in] sterm Serialized term to store.
+ *
+ * @param[in] ith #MDBIterator handle. Only the transaction handle inside this
+ * is used. It may be NULL, in which case a new transaction is opened and
+ * closed for the operation.
+ *
+ * @return LSUP_OK on success; <0 on error.
+ */
 static LSUP_rc
-mdbstore_add_term (void *h, const LSUP_Buffer *sterm, void *th)
+mdbstore_add_term (void *h, const LSUP_Buffer *sterm, void *ith)
 {
+    //log_trace ("Adding term to MDB store: %s", sterm->addr);
     MDBStore *store = h;
     int db_rc;
     MDB_val key, data;
 
+    MDBIterator *it = ith;
     MDB_txn *txn;
-    // If store->txn exists, open a child txn, otherwise reuse the same txn.
-    if (th) txn = th;
+    // If a transaction is active in the iterator, use it, otherwise open and
+    // close a new one.
+    bool borrowed_txn = (it && it->txn);
+    if (borrowed_txn) txn = it->txn;
     else RCCK (mdb_txn_begin (store->env, NULL, 0, &txn));
 
     MDB_cursor *cur;
@@ -1125,12 +1146,13 @@ mdbstore_add_term (void *h, const LSUP_Buffer *sterm, void *th)
     db_rc = mdb_cursor_put (cur, &key, &data, MDB_NOOVERWRITE);
     if (db_rc != MDB_KEYEXIST) CHECK (db_rc, fail);
 
-    if (!th) CHECK (db_rc = mdb_txn_commit (txn), fail);
+    if (!borrowed_txn) CHECK (db_rc = mdb_txn_commit (txn), fail);
 
     return LSUP_OK;
 
 fail:
-    if (!th) mdb_txn_abort (txn);
+    if (!borrowed_txn) mdb_txn_abort (txn);
+    log_trace ("Aborted txn for adding term.");
     return LSUP_DB_ERR;
 }
 

+ 1 - 0
src/term.c

@@ -330,6 +330,7 @@ LSUP_term_serialize (const LSUP_Term *term)
     LSUP_Buffer *sterm;
     MALLOC_GUARD (sterm, NULL);
 
+    //log_trace ("Effective term being serialized: %s", tmp_term->data);
     int rc = tpl_jot (
             TPL_MEM, &sterm->addr, &sterm->size, TERM_PACK_FMT,
             &tmp_term->type, &tmp_term->data, &metadata);