Kaynağa Gözat

[WIP] more debugging.

Stefano Cossu 5 yıl önce
ebeveyn
işleme
d8125c750d

+ 8 - 0
lakesuperior/cy_include/collections.pxd

@@ -19,6 +19,10 @@ cdef extern from "common.h":
         CC_ERR_VALUE_NOT_FOUND
         CC_ERR_OUT_OF_RANGE
         CC_ITER_END
+
+cdef:
+    key_compare_ft CC_CMP_STRING
+    key_compare_ft CC_CMP_POINTER
 #
 #    int cc_common_cmp_str(const void* key1, const void* key2)
 #
@@ -55,6 +59,10 @@ cdef extern from "array.h":
 
     void array_destroy(Array* ar)
 
+    hash_ft CC_GENERAL_HASH
+    hash_ft CC_STRING_HASH
+    hash_ft CC_POINTER_HASH
+
 #    ctypedef void (*_array_destroy_cb_cb_ft)(void*)
 #
 #    void array_destroy_cb(Array* ar, _array_destroy_cb_cb_ft cb)

+ 1 - 1
lakesuperior/store/ldp_rs/lmdb_triplestore.pxd

@@ -44,7 +44,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         void _add_graph(self, Buffer* pk_gr) except *
         void _index_triple(self, int op, TripleKey spok) except *
         Keyset triple_keys(self, tuple triple_pattern, context=*)
-        void _all_term_keys(self, term_type, cc.HashSet* tkeys) except *
+        void _all_term_keys(self, term_type, cc.HashSet** tkeys) except *
         void lookup_term(self, const Key tk, Buffer* data) except *
         Keyset _lookup(self, tuple triple_pattern)
         Keyset _lookup_1bound(self, unsigned char idx, term)

+ 46 - 34
lakesuperior/store/ldp_rs/lmdb_triplestore.pyx

@@ -8,8 +8,9 @@ from lakesuperior.store.base_lmdb_store import (
         KeyExistsError, KeyNotFoundError, LmdbError)
 from lakesuperior.store.base_lmdb_store cimport _check
 
+from cpython.mem import PyMem_Malloc, PyMem_Free
 from cython.parallel import prange
-from libc.stdlib cimport free
+from libc.stdlib cimport malloc, free
 from libc.string cimport memcpy
 
 cimport lakesuperior.cy_include.collections as cc
@@ -759,13 +760,13 @@ cdef class LmdbTriplestore(BaseLmdbStore):
             btrp[cur].o = buffers + cur * 3 + 2
 
             #logger.info('Looking up key: {}'.format(spok[:KLEN]))
-            self.lookup_term(spok, buffers + cur * 3)
+            self.lookup_term(<KeyIdx*>spok, buffers + cur * 3)
             #logger.info(f'Found triple s: {buffer_dump(btrp[cur].s)}')
             #logger.info('Looking up key: {}'.format(spok[KLEN:DBL_KLEN]))
-            self.lookup_term(spok + KLEN, buffers + cur * 3 + 1)
+            self.lookup_term(<KeyIdx*>(spok + KLEN), buffers + cur * 3 + 1)
             #logger.info(f'Found triple p: {buffer_dump(btrp[cur].p)}')
             #logger.info('Looking up key: {}'.format(spok[DBL_KLEN:TRP_KLEN]))
-            self.lookup_term(spok + DBL_KLEN, buffers + cur * 3 + 2)
+            self.lookup_term(<KeyIdx*>(spok + DBL_KLEN), buffers + cur * 3 + 2)
             #logger.info(f'Found triple o: {buffer_dump(btrp[cur].o)}')
 
             gr.add_triple(btrp + cur, copy)
@@ -886,7 +887,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                     data_v.mv_size = TRP_KLEN
 
                     flt_res = Keyset(res.ct)
-                    cc.hashset_iter_init(&it, res.data)
+                    cc.array_iter_init(&it, res.data)
                     while cc.array_iter_next(&it, &cur) != cc.CC_ITER_END:
                         #logger.debug('Checking row #{}'.format(flt_j))
                         data_v.mv_data = cur
@@ -902,7 +903,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                             #logger.debug('Discarding source[{}].'.format(j))
                             continue
                         else:
-                            cc.hashset_add(flt_res_data, cur)
+                            cc.array_add(flt_res.data, cur)
 
                     return flt_res
             finally:
@@ -1030,7 +1031,9 @@ cdef class LmdbTriplestore(BaseLmdbStore):
             size_t ct, ret_offset = 0, src_pos, ret_pos
             size_t j # Must be signed for older OpenMP versions
             lmdb.MDB_cursor *icur
-            #Key luk
+            lmdb.MDB_val key_v, data_v
+            Key luk
+            TripleKey spok
 
         #logger.debug(f'lookup 1bound: {idx}, {term}')
         try:
@@ -1079,18 +1082,24 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                 #            (<unsigned char *>data_v.mv_data)[: data_v.mv_size]))
                 for j in prange(data_v.mv_size // DBL_KLEN, nogil=True):
                     src_pos = DBL_KLEN * j
-                    ret_pos = (ret_offset + ret.itemsize * j)
-                    # TODO Try to fit this in the Key / TripleKey schema.
-                    memcpy(ret.data + ret_pos + asm_rng[0], luk, KLEN)
-                    memcpy(ret.data + ret_pos + asm_rng[1],
-                            data_v.mv_data + src_pos, KLEN)
-                    memcpy(ret.data + ret_pos + asm_rng[2],
-                            data_v.mv_data + src_pos + KLEN, KLEN)
+                    spok[0] = luk[0]
+                    spok[1] = <KeyIdx>(data_v.mv_data + src_pos)
+                    spok[2] = <KeyIdx>(data_v.mv_data + src_pos + KLEN)
+
+                    cc.array_add(ret.data, spok)
+                    #ret_pos = ret_offset + ret.itemsize * j
+                    ## TODO Try to fit this in the Key / TripleKey schema.
+                    #memcpy(ret.data + ret_pos + asm_rng[0], luk, KLEN)
+                    #memcpy(ret.data + ret_pos + asm_rng[1],
+                    #        data_v.mv_data + src_pos, KLEN)
+                    #memcpy(ret.data + ret_pos + asm_rng[2],
+                    #        data_v.mv_data + src_pos + KLEN, KLEN)
+
 
                 # Increment MUST be done before MDB_NEXT_MULTIPLE otherwise
                 # data_v.mv_size will be overwritten with the *next* page size
                 # and cause corruption in the output data.
-                ret_offset += data_v.mv_size // DBL_KLEN * ret.itemsize
+                #ret_offset += data_v.mv_size // DBL_KLEN * ret.itemsize
 
                 try:
                     # Get results by the page.
@@ -1233,7 +1242,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
             self._cur_close(icur)
 
 
-    cdef void _all_term_keys(self, term_type, cc.HashSet* tkeys) except *:
+    cdef void _all_term_keys(self, term_type, cc.HashSet** tkeys) except *:
         """
         Return all keys of a (``s:po``, ``p:so``, ``o:sp``) index.
         """
@@ -1248,14 +1257,14 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         try:
             _check(lmdb.mdb_stat(self.txn, lmdb.mdb_cursor_dbi(icur), &stat))
 
-            cc.hash_conf_init(&tkeys_conf)
+            cc.hashset_conf_init(&tkeys_conf)
             tkeys_conf.initial_capacity = 1024
             tkeys_conf.load_factor = .75
             tkeys_conf.key_length = KLEN
-            tkeys_conf.key_compare = cc.CMP_POINTER
-            tkeys_conf.hash = cc.POINTER_HASH
+            tkeys_conf.key_compare = cc.CC_CMP_POINTER
+            tkeys_conf.hash = cc.CC_POINTER_HASH
 
-            cc.hashset_new_conf(&tkeys_conf, &tkeys)
+            cc.hashset_new_conf(&tkeys_conf, tkeys)
 
             try:
                 _check(lmdb.mdb_cursor_get(
@@ -1264,7 +1273,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                 return
 
             while True:
-                cc.hashset_add(tkeys, key_v.mv_data)
+                cc.hashset_add(tkeys[0], key_v.mv_data)
 
                 rc = lmdb.mdb_cursor_get(
                     icur, &key_v, NULL, lmdb.MDB_NEXT_NODUP)
@@ -1284,19 +1293,20 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         """
         cdef:
             void* cur
-            cc.HashSet tkeys
+            cc.HashSet* tkeys
             cc.HashSetIter it
 
         ret = set()
 
         try:
             self._all_term_keys(term_type, &tkeys)
-            cc.hashset_iter(&it, &tkeys)
-            while hashset_iter_next(&it, &cur):
+            cc.hashset_iter_init(&it, tkeys)
+            while cc.hashset_iter_next(&it, &cur):
                 #logger.debug('Yielding: {}'.format(key))
                 ret.add(self.from_key(<Key>cur))
         finally:
-            free(tkeys)
+            if tkeys:
+                free(tkeys)
 
         return ret
 
@@ -1344,6 +1354,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         :rtype: Iterator(lakesuperior.model.graph.graph.Imr)
         """
         cdef:
+            size_t ct
             lmdb.MDB_cursor_op seek_op, scan_op
             lmdb.MDB_stat stat
             lmdb.MDB_val key_v
@@ -1353,6 +1364,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                 self._cur_open('spo:c') if triple and all(triple)
                 else self._cur_open('c:'))
 
+        key_v.mv_data = &spok
         if triple and all(triple):
             lmdb_seek_op = lmdb.MDB_SET_KEY
             lmdb_scan_op = lmdb.MDB_NEXT_DUP
@@ -1361,12 +1373,11 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                 self._to_key_idx(triple[1]),
                 self._to_key_idx(triple[2]),
             ]
-            key_v.mv_data = &spok
             key_v.mv_size = TRP_KLEN
         else:
-            lmdb_seek_op = MDB_FIRST
-            lmdb_scan_op = MDB_NEXT
-            key_v = NULL
+            lmdb_seek_op = lmdb.MDB_FIRST
+            lmdb_scan_op = lmdb.MDB_NEXT
+            key_v.mv_size = 0
 
         try:
             _check(lmdb.mdb_stat(
@@ -1376,13 +1387,14 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                 _check(lmdb.mdb_cursor_get(
                         cur, &key_v, &data_v, seek_op))
             except KeyNotFoundError:
-                return tuple()
+                ctx[0] = NULL
+                return
 
-            ctx[0] = <KeyIdx*>PyMem_Malloc(stat.ms_entries * KLEN)
+            ctx[0] = <KeyIdx*>malloc(stat.ms_entries * KLEN)
             sz[0] = 0
 
             while True:
-                ctx[0][sz[0]] = data_v.mv_data
+                ctx[0][sz[0]] = <KeyIdx>data_v.mv_data[0]
                 try:
                     _check(lmdb.mdb_cursor_get(
                         cur, &key_v, &data_v, scan_op))
@@ -1415,7 +1427,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                 lmdb.mdb_get(
                     self.txn, self.get_dbi('t:st'), &key_v, &data_v
                 ),
-                f'Error getting data for key \'{key}\'.')
+                f'Error getting data for key \'{tk[0]}\'.')
         data.addr = data_v.mv_data
         data.sz = data_v.mv_size
         #logger.info('Found term: {}'.format(buffer_dump(data)))
@@ -1429,7 +1441,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         """
         cdef Buffer pk_t
 
-        self.lookup_term(key, &pk_t)
+        self.lookup_term(tk, &pk_t)
 
         # TODO Make Term a class and return that.
         return deserialize_to_rdflib(&pk_t)