Quellcode durchsuchen

[WIP] Fix some segfaults when looking at data.

Stefano Cossu vor 5 Jahren
Ursprung
Commit
f2c4089180

+ 1 - 0
lakesuperior/model/base.pxd

@@ -2,3 +2,4 @@ from lakesuperior.cy_include cimport cytpl as tpl
 
 ctypedef tpl.tpl_bin Buffer
 
+cdef bytes buffer_dump(const Buffer* buf)

+ 11 - 0
lakesuperior/model/base.pyx

@@ -0,0 +1,11 @@
+cdef bytes buffer_dump(const Buffer* buf):
+    """
+    Return a buffer's content as a bytes object.
+
+    :param const Buffer* buf: Pointer to the buffer to be read; ``buf.sz``
+        bytes starting at ``buf.addr`` are returned.
+    :rtype: bytes
+    """
+    cdef unsigned char* buf_stream = (<unsigned char*>buf.addr)
+    return buf_stream[:buf.sz]
+

+ 11 - 7
lakesuperior/model/graph/graph.pyx

@@ -16,14 +16,14 @@ from cymem.cymem cimport Pool
 from lakesuperior.cy_include cimport cylmdb as lmdb
 from lakesuperior.cy_include cimport collections as cc
 from lakesuperior.cy_include cimport spookyhash as sph
+from lakesuperior.model.base cimport Buffer, buffer_dump
 from lakesuperior.model.graph cimport term
-from lakesuperior.store.ldp_rs.lmdb_triplestore cimport (
-        KLEN, DBL_KLEN, TRP_KLEN, TripleKey)
-from lakesuperior.model.structures.hash cimport term_hash_seed32
-from lakesuperior.model.structures.keyset cimport Keyset
-from lakesuperior.model.base cimport Buffer
 from lakesuperior.model.graph.triple cimport BufferTriple
 from lakesuperior.model.structures.hash cimport hash64
+from lakesuperior.model.structures.hash cimport term_hash_seed32
+from lakesuperior.model.structures.keyset cimport Keyset
+from lakesuperior.store.ldp_rs.lmdb_triplestore cimport (
+        KLEN, DBL_KLEN, TRP_KLEN, TripleKey)
 
 cdef extern from 'spookyhash_api.h':
     uint64_t spookyhash_64(const void *input, size_t input_size, uint64_t seed)
@@ -303,17 +303,21 @@ cdef class SimpleGraph:
         cdef:
             void *void_p
             cc.HashSetIter ti
-            term.Term s, p, o
+            Buffer* ss, sp, so
 
         graph_set = set()
 
         cc.hashset_iter_init(&ti, self._triples)
         while cc.hashset_iter_next(&ti, &void_p) != cc.CC_ITER_END:
+            logger.info(f'Data loop.')
             if void_p == NULL:
                 logger.warn('Triple is NULL!')
                 break
 
             trp = <BufferTriple *>void_p
+            print(f'trp.s: {buffer_dump(trp.s)}')
+            print(f'trp.p: {buffer_dump(trp.p)}')
+            print(f'trp.o: {buffer_dump(trp.o)}')
             graph_set.add((
                 term.deserialize_to_rdflib(trp.s),
                 term.deserialize_to_rdflib(trp.p),
@@ -761,7 +765,7 @@ cdef class SimpleGraph:
         while cc.hashset_iter_next(&it, &cur) != cc.CC_ITER_END:
             bt = <BufferTriple*>cur
             if other.trp_contains(bt):
-                print(self.remove_triple(bt))
+                self.remove_triple(bt)
 
         self |= tmp
 

+ 7 - 2
lakesuperior/model/graph/term.pyx

@@ -8,7 +8,7 @@ from libc.string cimport memcpy
 from cymem.cymem cimport Pool
 
 from lakesuperior.cy_include cimport cytpl as tpl
-from lakesuperior.model.base cimport Buffer
+from lakesuperior.model.base cimport Buffer, buffer_dump
 
 
 DEF LSUP_TERM_TYPE_URIREF = 1
@@ -44,6 +44,7 @@ cdef int deserialize(const Buffer *data, Term *term) except -1:
     """
     Return a term from serialized binary data.
     """
+    #print(f'Deserializing: {buffer_dump(data)}')
     _pk = tpl.tpl_peek(
             tpl.TPL_MEM | tpl.TPL_DATAPEEK, data[0].addr, data[0].sz,
             LSUP_TERM_PK_FMT, &(term[0].type), &(term[0].data),
@@ -120,7 +121,11 @@ cdef object to_rdflib(const Term *term):
     """
     cdef str data = (<bytes>term[0].data).decode()
     if term[0].type == LSUP_TERM_TYPE_LITERAL:
-        return Literal(data, datatype=term[0].datatype, lang=term[0].lang)
+        return Literal(
+            data,
+            datatype=term[0].datatype if not term[0].lang else None,
+            lang=term[0].lang or None
+        )
     else:
         if term[0].type == LSUP_TERM_TYPE_URIREF:
             return URIRef(data)

+ 1 - 1
lakesuperior/model/structures/keyset.pxd

@@ -7,6 +7,6 @@ cdef class Keyset:
 
         void resize(self, size_t ct) except *
         unsigned char *get_item(self, i)
-        bint next(self, void *val)
+        bint iter_next(self, unsigned char** val)
         bint contains(self, const void *val)
 

+ 6 - 6
lakesuperior/model/structures/keyset.pyx

@@ -112,7 +112,7 @@ cdef class Keyset:
         return self.get_item(i)[: self.itemsize]
 
 
-    def reset(self):
+    def iter_init(self):
         """
         Reset the cursor to the initial position.
         """
@@ -138,7 +138,7 @@ cdef class Keyset:
         return self.data + self.itemsize * i
 
 
-    cdef bint next(self, void *val):
+    cdef bint iter_next(self, unsigned char** val):
         """
         Populate the current value and advance the cursor by 1.
 
@@ -153,7 +153,7 @@ cdef class Keyset:
             val = NULL
             return False
 
-        val = self.data + self.itemsize * self._cur
+        val[0] = self.data + self.itemsize * self._cur
         self._cur += 1
 
         return True
@@ -163,10 +163,10 @@ cdef class Keyset:
         """
         Whether a value exists in the set.
         """
-        cdef void *stored_val
+        cdef unsigned char* stored_val
 
-        self.reset()
-        while self.next(stored_val):
+        self.iter_init()
+        while self.iter_next(&stored_val):
             if memcmp(val, stored_val, self.itemsize) == 0:
                 return True
         return False

+ 3 - 1
lakesuperior/store/ldp_rs/lmdb_triplestore.pxd

@@ -2,6 +2,7 @@ cimport lakesuperior.cy_include.cylmdb as lmdb
 cimport lakesuperior.cy_include.cytpl as tpl
 
 from lakesuperior.model.base cimport Buffer
+from lakesuperior.model.graph.graph cimport SimpleGraph
 from lakesuperior.model.structures.keyset cimport Keyset
 from lakesuperior.store.base_lmdb_store cimport BaseLmdbStore
 
@@ -62,13 +63,14 @@ cdef class LmdbTriplestore(BaseLmdbStore):
     cpdef void _remove_graph(self, object gr_uri) except *
     cpdef tuple all_namespaces(self)
     cpdef tuple all_contexts(self, triple=*)
+    cpdef SimpleGraph graph_lookup(self, triple_pattern, context=*)
 
     cdef:
         void _add_graph(self, Buffer *pk_gr) except *
         void _index_triple(self, str op, TripleKey spok) except *
         Keyset triple_keys(self, tuple triple_pattern, context=*)
         Keyset _all_term_keys(self, term_type)
-        inline int lookup_term(self, const Key key, Buffer *data) except -1
+        inline void lookup_term(self, const Key key, Buffer* data) except *
         Keyset _lookup(self, tuple triple_pattern)
         Keyset _lookup_1bound(self, unsigned char idx, term)
         Keyset _lookup_2bound(

+ 39 - 17
lakesuperior/store/ldp_rs/lmdb_triplestore.pyx

@@ -13,7 +13,10 @@ from libc.stdlib cimport free
 from libc.string cimport memcpy
 
 cimport lakesuperior.cy_include.cylmdb as lmdb
+from lakesuperior.model.base cimport buffer_dump
+from lakesuperior.model.graph.graph cimport SimpleGraph, Imr
 from lakesuperior.model.graph.term cimport Term
+from lakesuperior.model.graph.triple cimport BufferTriple
 
 from lakesuperior.store.base_lmdb_store cimport (
         BaseLmdbStore, data_v, dbi, key_v)
@@ -693,7 +696,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
             self._cur_close(cur)
 
 
-    cpdef SimpleGraph graph(self, triple_pattern, context=None):
+    cpdef SimpleGraph graph_lookup(self, triple_pattern, context=None):
         """
         Create a SimpleGraph instance from "borrowed" buffers from the store.
 
@@ -713,22 +716,35 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         in.
         """
         cdef:
-            unsigned char spok[TRP_KLEN]
+            unsigned char* spok
             size_t cur = 0
-            lmdb.MDB_val key_v, data_v
             SimpleGraph gr = SimpleGraph()
-            BufferTriple* trp_buffer
 
         logger.debug(
                 'Getting triples for: {}, {}'.format(triple_pattern, context))
 
-        match = self.triple_keys(triple_pattern, context)
-        trp_buffer = gr.pool.alloc(len(match), sizeof(BufferTriple))
-        for spok in match:
-            (trp_buffer + cur).s = self.lookup_term(spok[: KLEN])
-            (trp_buffer + cur).p = self.lookup_term(spok[KLEN: DBL_KLEN])
-            (trp_buffer + cur).o = self.lookup_term(spok[DBL_KLEN: TRP_KLEN])
-            gr.add_triple(trp_buffer + cur)
+        spok_a = self.triple_keys(triple_pattern, context)
+        btrp = <BufferTriple*>gr.pool.alloc(spok_a.ct, sizeof(BufferTriple))
+        buffers = <Buffer*>gr.pool.alloc(3 * spok_a.ct, sizeof(Buffer))
+
+        spok_a.iter_init()
+        while spok_a.iter_next(&spok):
+            btrp[cur].s = buffers + cur * 3
+            btrp[cur].p = buffers + cur * 3 + 1
+            btrp[cur].o = buffers + cur * 3 + 2
+
+            #logger.info('Looking up key: {}'.format(spok[:KLEN]))
+            self.lookup_term(spok, buffers + cur * 3)
+            #logger.info(f'Found triple s: {buffer_dump(btrp[cur].s)}')
+            #logger.info('Looking up key: {}'.format(spok[KLEN:DBL_KLEN]))
+            self.lookup_term(spok + KLEN, buffers + cur * 3 + 1)
+            #logger.info(f'Found triple p: {buffer_dump(btrp[cur].p)}')
+            #logger.info('Looking up key: {}'.format(spok[DBL_KLEN:TRP_KLEN]))
+            self.lookup_term(spok + DBL_KLEN, buffers + cur * 3 + 2)
+            #logger.info(f'Found triple o: {buffer_dump(btrp[cur].o)}')
+
+            gr.add_triple(btrp + cur)
+            cur += 1
 
         return gr
 
@@ -1367,12 +1383,12 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         """
         cdef Buffer pk_t
 
-        #self.lookup_term(key, &pk_t)
+        self.lookup_term(key, &pk_t)
 
-        return deserialize_to_rdflib(self.lookup_term(key))
+        return deserialize_to_rdflib(&pk_t)
 
 
-    cdef inline Buffer lookup_term(self, const Key key, Buffer *data):
+    cdef inline void lookup_term(self, const Key key, Buffer* data) except *:
         """
         look up a term by key.
 
@@ -1385,11 +1401,17 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         key_v.mv_data = key
         key_v.mv_size = KLEN
 
+        logger.info('So far so good[0].')
+        logger.info(f'Size of mdb_val: {sizeof(lmdb.MDB_val)}; size of buffer: {sizeof(Buffer)}')
         _check(
-                lmdb.mdb_get(self.txn, self.get_dbi('t:st'), &key_v, &data_v),
+                lmdb.mdb_get(
+                    self.txn, self.get_dbi('t:st'), &key_v, &data_v
+                ),
                 f'Error getting data for key \'{key}\'.')
-
-        return <Buffer>data
+        logger.info('So far so good[1].')
+        data.addr = data_v.mv_data
+        data.sz = data_v.mv_size
+        logger.info('Found term: {}'.format(buffer_dump(data)))
 
 
     cdef tuple from_trp_key(self, TripleKey key):

+ 20 - 22
lakesuperior/store/ldp_rs/rsrc_centric_layout.py

@@ -220,7 +220,7 @@ class RsrcCentricLayout:
             with open(fname, 'r') as f:
                 data = Template(f.read())
                 self.ds.update(data.substitute(timestamp=arrow.utcnow()))
-            import pdb; pdb.set_trace()
+            #import pdb; pdb.set_trace()
             imr = self.get_imr('/', incl_inbound=False, incl_children=True)
 
         gr = Graph(identifier=imr.uri)
@@ -252,8 +252,7 @@ class RsrcCentricLayout:
 
         :rtype: SimpleGraph
         """
-        return SimpleGraph(
-                store=self.store, lookup=((subject, None, None), ctx))
+        return self.store.graph_lookup((subject, None, None), ctx)
 
 
     def count_rsrc(self):
@@ -295,8 +294,7 @@ class RsrcCentricLayout:
         imr = Imr(uri=nsc['fcres'][uid])
 
         for ctx in contexts:
-            imr |= SimpleGraph(
-                    lookup=((None, None, None), ctx), store=self.store)
+            imr |= self.store.graph_lookup((None, None, None), ctx)
 
         # Include inbound relationships.
         if incl_inbound and len(imr):
@@ -372,25 +370,23 @@ class RsrcCentricLayout:
         # URI with the subject URI. But the concepts of data and metadata in
         # Fedora are quite fluid anyways...
 
-        # Result graph.
-        imr = SimpleGraph(lookup=(
-            (nsc['fcres'][uid], nsc['fcrepo'].hasVersion, None),
-                nsc['fcadmin'][uid]), store=self.store)
-
         vmeta = Imr(uri=nsc['fcres'][uid])
 
         #Get version graphs proper.
-        for vtrp in imr:
+        for vtrp in self.store.graph_lookup(
+            (nsc['fcres'][uid], nsc['fcrepo'].hasVersion, None),
+            nsc['fcadmin'][uid]
+        ):
             # Add the hasVersion triple to the result graph.
             vmeta.add((vtrp,))
-            vmeta_gr = SimpleGraph(
-                lookup=((
-                    None, nsc['foaf'].primaryTopic, vtrp[2]), HIST_GR_URI),
-                store=self.store)
+            vmeta_gr = self.store.graph_lookup(
+                (None, nsc['foaf'].primaryTopic, vtrp[2]), HIST_GR_URI
+            )
             # Get triples in the meta graph filtering out undesired triples.
             for vmtrp in vmeta_gr:
-                for trp in SimpleGraph(lookup=((
-                        vmtrp[0], None, None), HIST_GR_URI), store=self.store):
+                for trp in self.store.grep_lookup(
+                    (vmtrp[0], None, None), HIST_GR_URI
+                ):
                     if (
                             (trp[1] != nsc['rdf'].type
                             or trp[2] not in self.ignore_vmeta_types)
@@ -415,6 +411,7 @@ class RsrcCentricLayout:
         :return: Inbound triples or subjects.
         """
         # Only return non-historic graphs.
+        # TODO self.store.graph_lookup?
         meta_gr = self.ds.graph(META_GR_URI)
         ptopic_uri = nsc['foaf'].primaryTopic
 
@@ -440,8 +437,9 @@ class RsrcCentricLayout:
         ctx_uri = nsc['fcstruct'][uid]
         cont_p = nsc['ldp'].contains
         def _recurse(dset, s, c):
-            new_dset = SimpleGraph(
-                    lookup=((s, cont_p, None), c), store=self.store)[s : cont_p]
+            new_dset = self.store.graph_lookup(
+                (s, cont_p, None), c
+            )[s : cont_p]
             #new_dset = set(ds.graph(c)[s : cont_p])
             for ss in new_dset:
                 dset.add((ss,))
@@ -460,9 +458,9 @@ class RsrcCentricLayout:
             return _recurse(set(), subj_uri, ctx_uri)
         else:
             #return ds.graph(ctx_uri)[subj_uri : cont_p : ])
-            return SimpleGraph(
-                    lookup=((subj_uri, cont_p, None), ctx_uri),
-                    store=self.store)[subj_uri : cont_p]
+            return self.store.graph_lookup(
+                (subj_uri, cont_p, None), ctx_uri
+            )[subj_uri : cont_p]
 
 
     def get_last_version_uid(self, uid):

+ 7 - 7
tests/1_store/test_lmdb_store.py

@@ -648,7 +648,7 @@ class TestContext:
 
         with store.txn_ctx(True):
             store.add_graph(gr_uri)
-            assert gr_uri in {gr.identifier for gr in store.contexts()}
+            assert gr_uri in {gr.uri for gr in store.contexts()}
 
 
     def test_add_graph_with_triple(self, store):
@@ -663,7 +663,7 @@ class TestContext:
             store.add(trp, ctx_uri)
 
         with store.txn_ctx():
-            assert ctx_uri in {gr.identifier for gr in store.contexts(trp)}
+            assert ctx_uri in {gr.uri for gr in store.contexts(trp)}
 
 
     def test_empty_context(self, store):
@@ -674,10 +674,10 @@ class TestContext:
 
         with store.txn_ctx(True):
             store.add_graph(gr_uri)
-            assert gr_uri in {gr.identifier for gr in store.contexts()}
+            assert gr_uri in {gr.uri for gr in store.contexts()}
         with store.txn_ctx(True):
             store.remove_graph(gr_uri)
-            assert gr_uri not in {gr.identifier for gr in store.contexts()}
+            assert gr_uri not in {gr.uri for gr in store.contexts()}
 
 
     def test_context_ro_txn(self, store):
@@ -697,10 +697,10 @@ class TestContext:
         # allow a lookup in the same transaction, but this does not seem to be
         # possible.
         with store.txn_ctx():
-            assert gr_uri in {gr.identifier for gr in store.contexts()}
+            assert gr_uri in {gr.uri for gr in store.contexts()}
         with store.txn_ctx(True):
             store.remove_graph(gr_uri)
-            assert gr_uri not in {gr.identifier for gr in store.contexts()}
+            assert gr_uri not in {gr.uri for gr in store.contexts()}
 
 
     def test_add_trp_to_ctx(self, store):
@@ -731,7 +731,7 @@ class TestContext:
             assert len(set(store.triples((None, None, None), gr_uri))) == 3
             assert len(set(store.triples((None, None, None), gr2_uri))) == 1
 
-            assert gr2_uri in {gr.identifier for gr in store.contexts()}
+            assert gr2_uri in {gr.uri for gr in store.contexts()}
             assert trp1 in _clean(store.triples((None, None, None)))
             assert trp1 not in _clean(store.triples((None, None, None),
                     RDFLIB_DEFAULT_GRAPH_URI))