Kaynağa Gözat

Add add_iter_abort; use child txn; pass mdbstore tests.

Stefano Cossu 3 yıl önce
ebeveyn
işleme
a2cc2755ef
4 değiştirilmiş dosya ile 162 ekleme ve 86 silme
  1. 1 1
      include/core.h
  2. 36 7
      include/store_mdb.h
  3. 9 6
      src/graph.c
  4. 116 72
      src/store_mdb.c

+ 1 - 1
include/core.h

@@ -32,7 +32,7 @@
 #define CRITICAL(exp)   if (UNLIKELY ((exp) == NULL)) abort()
 
 // TODO Cross-platform ramdisk path.
-#define TMPDIR /tmp
+#define TMPDIR "/tmp"
 
 #define KLEN sizeof(LSUP_Key)
 #define DBL_KLEN sizeof(LSUP_DoubleKey)

+ 36 - 7
include/store_mdb.h

@@ -129,24 +129,40 @@ LSUP_mdbstore_add_init(LSUP_MDBStore *store, const LSUP_Buffer *sc);
  * followed by #LSUP_mdbstore_add_done.
  *
  * @param it[in] Iterator obtained by #LSUP_mdbstore_add_init.
+ *  The following members are of interest:
+ *  it->i stores the total number of records inserted.
  *
  * @param sspo[in] Serialized triple to be added.
+ *
+ * @return LSUP_OK if the triple was inserted; LSUP_NOACTION if the triple
+ *  already existed; LSUP_DB_ERR if an MDB error occurred.
  */
 LSUP_rc
 LSUP_mdbstore_add_iter(struct MDBIterator *it, const LSUP_SerTriple *sspo);
 
 
-/** @brief Finalize an add loop.
+/** @brief Finalize an add loop and free iterator.
+ *
+ * If a count of inserted records is needed, #LSUP_mdbiter_i must be called
+ * before this function.
  *
  * This must be called after #LSUP_mdbstore_add_iter.
  *
  * @param it[in] Iterator obtained by #LSUP_mdbstore_add_init.
- *
- * @param inserted[out] If not NULL this is populated with the number of
- *  triples effectively inserted.
  */
 LSUP_rc
-LSUP_mdbstore_add_done(LSUP_MDBIterator *it, size_t *inserted);
+LSUP_mdbstore_add_done (LSUP_MDBIterator *it);
+
+
+/** @brief Abort an add loop and free iterator.
+ *
+ * Usually called on an irrecoverable error from LSUP_mdb_add_iter. None of the
+ * successful inserts in the same loop is retained.
+ *
+ * @param it[in] Iterator obtained by #LSUP_mdbstore_add_init.
+ */
+void
+LSUP_mdbstore_add_abort (LSUP_MDBIterator *it);
 
 
 /** @brief Add a batch of triples with optional context to the store.
@@ -232,8 +248,8 @@ LSUP_mdbstore_lookup(
  * @param it[in] Opaque iterator handle obtained with #LSUP_mdbstore_lookup.
  *
  * @param sspo[out] #LSUP_SerTriple to be populated with three serialized terms
- * if found, NULL if not found. Internal callers may pass NULL if they don't
- * need the serialized triples.
+ * if found, NULL if not found. Internal callers (e.g. counters) may pass NULL
+ * if they don't need the serialized triples.
  *
  * @return LSUP_OK if results were found; LSUP_END if no (more) results were
  * found; LSUP_DB_ERR if a MDB_* error occurred.
@@ -241,6 +257,19 @@ LSUP_mdbstore_lookup(
 LSUP_rc LSUP_mdbiter_next(LSUP_MDBIterator *it, LSUP_SerTriple *sspo);
 
 
+/** @brief Iterator's internal counter.
+ *
+ * This is only useful with #LSUP_mdbstore_add_iter to count inserted records.
+ *
+ * @param it[in] An iterator primed with LSUP_mdbstore_add_init.
+ *
+ * @return The value of the #i member. For an add iterator, this is the number
+ *  of succcessfully inserted records.
+ */
+size_t
+LSUP_mdbiter_i (LSUP_MDBIterator *it);
+
+
 /** @brief Free an iterator allocated by a lookup.
  *
  * @param it[in] Iterator pointer. It will be set to NULL after freeing.

+ 9 - 6
src/graph.c

@@ -101,6 +101,7 @@ LSUP_graph_new(const LSUP_store_type store_type)
 
     LSUP_Graph *gr;
     CRITICAL(gr = malloc(sizeof(LSUP_Graph)));
+    gr->uri = LSUP_uri_new(NULL);
 
     default_ctx_init();
 
@@ -111,7 +112,10 @@ LSUP_graph_new(const LSUP_store_type store_type)
         gr->mdb_store = LSUP_mdbstore_new(
                 getenv("LSUP_MDB_STORE_PATH"), default_ctx);
 
-    } else return NULL;
+    } else {
+        gr->mdb_store = LSUP_mdbstore_new(
+                TMPDIR "/lsup_mem_graph", default_ctx);
+    }
 
     return gr;
 }
@@ -255,8 +259,9 @@ LSUP_graph_add(
      * NOTE It is possible to pass both sets of RDF triples and buffer triples.
      */
 
-    /* TODO uncomment
     if (gr->store_type == LSUP_STORE_MEM) {
+        return LSUP_NOT_IMPL_ERR;
+    /* TODO
         // Resize all at once if needed.
         htsize_t prealloc = LSUP_htstore_size(gr->ht_store) + trp_ct + strp_ct;
         if (LSUP_htstore_capacity(gr->ht_store) < prealloc) {
@@ -299,10 +304,8 @@ LSUP_graph_add(
         }
 
         return rc;
-    }
     */
-
-    if (gr->store_type == LSUP_STORE_MDB) {
+    } else {
         rc = LSUP_NOACTION;
 
         LSUP_Buffer sc;
@@ -343,7 +346,7 @@ LSUP_graph_add(
             if (UNLIKELY (db_rc < 0)) return db_rc;
         }
 
-        LSUP_mdbstore_add_done(it, NULL);
+        LSUP_mdbstore_add_done(it);
 
         return rc;
     }

+ 116 - 72
src/store_mdb.c

@@ -199,7 +199,7 @@ static const uint8_t lookup_ordering_2bound[3][3] = {
  */
 static int index_triple(
         LSUP_MDBStore *store, StoreOp op,
-        LSUP_TripleKey spok, LSUP_Key ck);
+        LSUP_TripleKey spok, LSUP_Key ck, MDB_txn *txn);
 
 inline static LSUP_rc lookup_0bound(
         MDBStore *store, MDBIterator *it, size_t *ct);
@@ -380,15 +380,19 @@ LSUP_mdbstore_add_init(LSUP_MDBStore *store, const LSUP_Buffer *sc)
     /* An iterator is used here. Some members are a bit misused but it does
      * its job without having to define a very similar struct.
      */
-    MDBIterator *it;
-    CRITICAL (it = malloc (sizeof (*it)));
+    MDBIterator *it = malloc (sizeof (*it));
+    if (!it) return NULL;
+
     it->store = store;
     it->i = 0;
 
-    if (!store->txn) {
-        mdb_txn_begin(store->env, NULL, 0, &store->txn);
-        // We are starting the main DB txn and we need to close it afterwards.
-        it->state |= LSSTORE_DIRTY_TXN;
+    // If the main store transaction is open, use that, otherwise open a new
+    // RW child transaction owned by the iterator.
+    if (!it->store->txn) {
+        mdb_txn_begin(store->env, NULL, 0, &it->store->txn);
+        it->txn = it->store->txn;
+    } else {
+        mdb_txn_begin(store->env, it->store->txn, 0, &it->txn);
     }
 
     // Take care of context first.
@@ -410,7 +414,7 @@ LSUP_mdbstore_add_init(LSUP_MDBStore *store, const LSUP_Buffer *sc)
         if (mdb_put(
                 store->txn, store->dbi[IDX_T_ST],
                 &it->key, &it->data, MDB_NOOVERWRITE) != MDB_SUCCESS)
-            it->rc = LSUP_DB_ERR;
+            return NULL;
     }
 
     return it;
@@ -421,30 +425,29 @@ LSUP_rc
 LSUP_mdbstore_add_iter(MDBIterator *it, const LSUP_SerTriple *sspo)
 {
     int db_rc;
+    LSUP_rc rc;
     LSUP_TripleKey spok = NULL_TRP;
 
     // Add triple.
-    for (int j = 0; j < 3; j++) {
-        LSUP_Buffer *st = LSUP_striple_pos(sspo, j);
+    for (int i = 0; i < 3; i++) {
+        LSUP_Buffer *st = LSUP_striple_pos(sspo, i);
 
         printf("Inserting term: ");
         LSUP_buffer_print(st);
         printf("\n");
 
-        spok[j] = LSUP_sterm_to_key(st);
+        spok[i] = LSUP_sterm_to_key(st);
 
-        it->key.mv_data = spok + j;
+        it->key.mv_data = spok + i;
         it->key.mv_size = KLEN;
         it->data.mv_data = st->addr;
         it->data.mv_size = st->size;
 
         db_rc = mdb_put(
-                it->store->txn, it->store->dbi[IDX_T_ST],
+                it->txn, it->store->dbi[IDX_T_ST],
                 &it->key, &it->data, MDB_NOOVERWRITE);
-        if (db_rc == MDB_SUCCESS) it->rc = LSUP_OK;
-        else if (db_rc != MDB_KEYEXIST) {
-            it->rc = LSUP_DB_ERR;
-            return it->rc;
+        if (db_rc != MDB_SUCCESS && db_rc != MDB_KEYEXIST) {
+            return LSUP_DB_ERR;
         }
     }
 
@@ -459,54 +462,70 @@ LSUP_mdbstore_add_iter(MDBIterator *it, const LSUP_SerTriple *sspo)
     it->data.mv_size = it->ck == NULL_KEY ? 0 : KLEN;
 
     db_rc = mdb_put(
-            it->store->txn, it->store->dbi[IDX_SPO_C],
+            it->txn, it->store->dbi[IDX_SPO_C],
             &it->key, &it->data, MDB_NODUPDATA);
-    if (db_rc == MDB_SUCCESS) it->rc = LSUP_OK;
-    else if (db_rc != MDB_KEYEXIST) {
-        it->rc = LSUP_DB_ERR;
-        return it->rc;
-    }
 
-    // Index.
-    it->rc = index_triple(it->store, OP_ADD, spok, it->ck);
+    if (db_rc == MDB_KEYEXIST) return LSUP_NOACTION;
+    if (db_rc != MDB_SUCCESS) return LSUP_DB_ERR;
 
-    if(it->rc == LSUP_OK) it->i++;
+    // Index.
+    rc = index_triple (it->store, OP_ADD, spok, it->ck, it->txn);
+    if (rc == LSUP_OK) it->i++;
 
-    return it->rc;
+    return rc;
 }
 
 
 LSUP_rc
-LSUP_mdbstore_add_done(MDBIterator *it, size_t *inserted)
+LSUP_mdbstore_add_done (MDBIterator *it)
 {
-    // Only return commit rc if it fails.
-    if (it->state & LSSTORE_DIRTY_TXN) {
-        if (it->rc == LSUP_OK) {
-            if(mdb_txn_commit(it->store->txn) != MDB_SUCCESS) {
-                mdb_txn_abort(it->store->txn);
-                it->rc = LSUP_DB_ERR;
-            }
-        } else mdb_txn_abort(it->store->txn);
+    LSUP_rc rc = LSUP_OK;
 
-        it->store->txn = NULL;
+    if (mdb_txn_commit (it->txn) != MDB_SUCCESS) {
+        mdb_txn_abort (it->txn);
+        rc = LSUP_DB_ERR;
     }
 
-    return it->rc;
+    if (it->txn == it->store->txn) it->store->txn = NULL;
+    it->txn = NULL;
+
+    free (it);
+
+    return rc;
+}
+
+
+void
+LSUP_mdbstore_add_abort (MDBIterator *it)
+{
+    mdb_txn_abort (it->txn);
+
+    if (it->txn == it->store->txn) it->store->txn = NULL;
+    it->txn = NULL;
+
+    free (it);
 }
 
 
 LSUP_rc
-LSUP_mdbstore_add(
+LSUP_mdbstore_add (
         LSUP_MDBStore *store, const LSUP_Buffer *sc,
         const LSUP_SerTriple strp[], const size_t ct, size_t *inserted)
 {
     MDBIterator *it = LSUP_mdbstore_add_init(store, sc);
-    if (it->rc < 0) {
-        for (size_t i = 0; i < ct; i++)
-            if (LSUP_mdbstore_add_iter(it, strp + i) < 0) break;
+    if (UNLIKELY (!it)) return LSUP_DB_ERR;
+
+    for (size_t i = 0; i < ct; i++) {
+        LSUP_rc rc = LSUP_mdbstore_add_iter (it, strp + i);
+        if (UNLIKELY (rc < 0)) {
+            LSUP_mdbstore_add_abort (it);
+            return rc;
+        }
     }
 
-    return LSUP_mdbstore_add_done(it, inserted);
+    *inserted = it->i;
+
+    return LSUP_mdbstore_add_done(it);
 }
 
 
@@ -704,6 +723,11 @@ LSUP_mdbiter_next(LSUP_MDBIterator *it, LSUP_SerTriple *sspo)
 }
 
 
+size_t
+LSUP_mdbiter_i (LSUP_MDBIterator *it)
+{ return it->i; }
+
+
 void
 LSUP_mdbiter_free(MDBIterator *it)
 {
@@ -774,7 +798,7 @@ LSUP_mdbstore_remove(
         if (rc == MDB_SUCCESS) continue;
         if (UNLIKELY(rc != MDB_NOTFOUND)) goto _remove_abort;
 
-        index_triple(store, OP_REMOVE, it->spok, ck);
+        index_triple(store, OP_REMOVE, it->spok, ck, NULL);
     }
 
     if(UNLIKELY(mdb_txn_commit(txn) != MDB_SUCCESS)) {
@@ -812,16 +836,27 @@ static int rmrf(char *path)
 */
 
 
+/** @brief Index an added or removed triple.
+ *
+ * @param store[in] MDB store to index.
+ * @param op[in] Store operation. One of OP_ADD or OP_REMOVE.
+ * @param spok[in] Triple key to index.
+ * @param ck[in] Context to index, may be NULL.
+ * @param txn[in] If not NULL, use this transaction instead of the store one.
+ */
 static LSUP_rc
 index_triple(
         LSUP_MDBStore *store, StoreOp op,
-        LSUP_TripleKey spok, LSUP_Key ck)
+        LSUP_TripleKey spok, LSUP_Key ck, MDB_txn *txn)
 {
-    int rc = LSUP_NOACTION;
+    int db_rc;
+    LSUP_rc rc = LSUP_NOACTION;
     MDB_val v1, v2;
 
     printf("Indexing triple: %lx %lx %lx\n", spok[0], spok[1], spok[2]);
 
+    if (!txn) txn = store->txn;
+
     // Index c:spo.
     if (op == OP_REMOVE) {
         if (ck != NULL_KEY) {
@@ -832,9 +867,13 @@ index_triple(
             v2.mv_data = spok;
             v2.mv_size = TRP_KLEN;
 
-            mdb_cursor_open(store->txn, store->dbi[IDX_C_SPO], &cur);
-            rc = mdb_cursor_get(cur, &v1, &v2, MDB_GET_BOTH);
-            if(rc == MDB_SUCCESS) mdb_cursor_del(cur, 0);
+            mdb_cursor_open(txn, store->dbi[IDX_C_SPO], &cur);
+            if (mdb_cursor_get(cur, &v1, &v2, MDB_GET_BOTH) == MDB_SUCCESS) {
+                db_rc = mdb_cursor_del (cur, 0);
+                if (db_rc != MDB_SUCCESS) return LSUP_DB_ERR;
+
+                rc = LSUP_OK;
+            }
 
             mdb_cursor_close(cur);
         }
@@ -846,9 +885,10 @@ index_triple(
             v2.mv_data = spok;
             v2.mv_size = TRP_KLEN;
 
-            mdb_put(
-                    store->txn, store->dbi[IDX_C_SPO],
-                    &v1, &v2, MDB_NODUPDATA);
+            db_rc = mdb_put(
+                    txn, store->dbi[IDX_C_SPO], &v1, &v2, MDB_NODUPDATA);
+            if (db_rc != MDB_SUCCESS) return LSUP_DB_ERR;
+            if (db_rc != MDB_KEYEXIST) rc = LSUP_OK;
         }
 
     } else return LSUP_VALUE_ERR;
@@ -863,7 +903,6 @@ index_triple(
     v1.mv_size = KLEN;
     v2.mv_size = DBL_KLEN;
 
-    int db_rc;
     for (int i = 0; i < 3; i++) {
         MDB_dbi db1 = store->dbi[lookup_indices[i]];      // s:po, p:so, o:sp
         MDB_dbi db2 = store->dbi[lookup_indices[i + 3]];  // po:s, so:p, sp:o
@@ -873,10 +912,10 @@ index_triple(
 
         if (op == OP_REMOVE) {
             MDB_cursor *cur1, *cur2;
-            mdb_cursor_open(store->txn, store->dbi[lookup_indices[i]], &cur1);
+            mdb_cursor_open(txn, store->dbi[lookup_indices[i]], &cur1);
 
-            rc = mdb_cursor_get(cur1, &v1, &v2, MDB_GET_BOTH);
-            if (rc == MDB_SUCCESS) mdb_cursor_del(cur1, 0);
+            db_rc = mdb_cursor_get(cur1, &v1, &v2, MDB_GET_BOTH);
+            if (db_rc == MDB_SUCCESS) mdb_cursor_del(cur1, 0);
 
             mdb_cursor_close(cur1);
 
@@ -884,32 +923,37 @@ index_triple(
             v1.mv_data = spok + i;
             v2.mv_data = dbl_keys[i];
 
-            mdb_cursor_open(
-                    store->txn, store->dbi[lookup_indices[i + 3]], &cur2);
+            mdb_cursor_open(txn, store->dbi[lookup_indices[i + 3]], &cur2);
 
-            rc = mdb_cursor_get(cur2, &v2, &v1, MDB_GET_BOTH);
-            if (rc == MDB_SUCCESS) mdb_cursor_del(cur2, 0);
+            db_rc = mdb_cursor_get(cur2, &v2, &v1, MDB_GET_BOTH);
+            if (db_rc == MDB_SUCCESS) mdb_cursor_del(cur2, 0);
+            // TODO error handling.
+            rc = LSUP_OK;
 
             mdb_cursor_close(cur2);
 
         } else { // OP_ADD is guaranteed.
-            printf("Indexing in %s: ", db_labels[lookup_indices[i]]);
-            printf(
+            // 1-bound index.
+            TRACE("Indexing in %s: ", db_labels[lookup_indices[i]]);
+            TRACE(
                     "%lx: %lx %lx\n", *(size_t*)(v1.mv_data),
                     *(size_t*)(v2.mv_data), *(size_t*)(v2.mv_data) + 1);
 
-            db_rc = mdb_put(store->txn, db1, &v1, &v2, MDB_NODUPDATA);
-            if (db_rc != MDB_SUCCESS && db_rc != MDB_KEYEXIST)
-                return LSUP_DB_ERR;
+            db_rc = mdb_put(txn, db1, &v1, &v2, MDB_NODUPDATA);
+
+            if (db_rc == MDB_SUCCESS) rc = LSUP_OK;
+            else if (db_rc != MDB_KEYEXIST) return LSUP_DB_ERR;
 
-            printf("Indexing in %s: ", db_labels[lookup_indices[i + 3]]);
-            printf(
+            // 2-bound index.
+            TRACE("Indexing in %s: ", db_labels[lookup_indices[i + 3]]);
+            TRACE(
                     "%lx %lx: %lx\n", *(size_t*)(v2.mv_data),
                     *(size_t*)(v2.mv_data) + 1, *(size_t*)(v1.mv_data));
 
-            db_rc = mdb_put(store->txn, db2, &v2, &v1, MDB_NODUPDATA);
-            if (db_rc != MDB_SUCCESS && db_rc != MDB_KEYEXIST)
-                return LSUP_DB_ERR;
+            db_rc = mdb_put(txn, db2, &v2, &v1, MDB_NODUPDATA);
+
+            if (db_rc == MDB_SUCCESS) rc = LSUP_OK;
+            else if (db_rc != MDB_KEYEXIST) return LSUP_DB_ERR;
         }
     }
 
@@ -1013,10 +1057,10 @@ it_next_3bound(MDBIterator *it)
 inline static LSUP_rc
 lookup_0bound(MDBStore *store, MDBIterator *it, size_t *ct)
 {
-    if(store->txn) it->txn = store->txn;
+    if (store->txn) it->txn = store->txn;
     else {
         it->rc = mdb_txn_begin(store->env, NULL, MDB_RDONLY, &it->txn);
-        if (it->rc != MDB_SUCCESS) abort();
+        if (it->rc != MDB_SUCCESS) abort(); // TODO handle error
     }
 
     if(ct) {