瀏覽代碼

[WIP] More adjusting to new structures.

Stefano Cossu 6 年之前
父節點
當前提交
f5c86b66ca
共有 2 個文件被更改,包括 142 次插入138 次删除
  1. 6 8
      lakesuperior/store/ldp_rs/lmdb_triplestore.pxd
  2. 136 130
      lakesuperior/store/ldp_rs/lmdb_triplestore.pyx

+ 6 - 8
lakesuperior/store/ldp_rs/lmdb_triplestore.pxd

@@ -17,7 +17,7 @@ cdef:
     unsigned char lookup_rank[3]
     unsigned char lookup_ordering[3][3]
     unsigned char lookup_ordering_2bound[3][3]
-    unsigned int MDB_INT_KEY_MASK = (
+    unsigned int INT_KEY_MASK = (
         lmdb.MDB_DUPSORT | lmdb.MDB_DUPFIXED | lmdb.MDB_INTEGERKEY
         | lmdb.MDB_REVERSEKEY # TODO Check endianness.
     )
@@ -42,19 +42,17 @@ cdef class LmdbTriplestore(BaseLmdbStore):
 
     cdef:
         void _add_graph(self, Buffer* pk_gr) except *
-        void _index_triple(self, str op, TripleKey spok) except *
+        void _index_triple(self, int op, TripleKey spok) except *
         Keyset triple_keys(self, tuple triple_pattern, context=*)
         void _all_term_keys(self, term_type, cc.HashSet* tkeys) except *
-        inline void lookup_term(self, const Key key, Buffer* data) except *
+        void lookup_term(self, const Key tk, Buffer* data) except *
         Keyset _lookup(self, tuple triple_pattern)
         Keyset _lookup_1bound(self, unsigned char idx, term)
         Keyset _lookup_2bound(
                 self, unsigned char idx1, term1, unsigned char idx2, term2)
-        object from_key(self, const Key key)
-        tuple from_trp_key(self, TripleKey key)
-        Key _to_key(self, term)
-        void _to_triple_key(
-                self, tuple terms, TripleKey* tkey) except *
+        object from_key(self, const Key tk)
+        tuple from_trp_key(self, const TripleKey spok)
+        KeyIdx _to_key_idx(self, term)
         void all_contexts(self, KeyIdx** ctx, size_t* sz, triple=*) except *
         KeyIdx _append(
                 self, Buffer *value,

+ 136 - 130
lakesuperior/store/ldp_rs/lmdb_triplestore.pyx

@@ -17,7 +17,7 @@ cimport lakesuperior.cy_include.cylmdb as lmdb
 
 from lakesuperior.model.base cimport (
     KLEN, DBL_KLEN, TRP_KLEN, QUAD_KLEN,
-    KeyIdx, Key, DoubleKey, TripleKey,
+    KeyIdx, Key, DoubleKey, TripleKey, QuadKey,
     Buffer, buffer_dump
 )
 from lakesuperior.model.graph.graph cimport SimpleGraph, Imr
@@ -149,9 +149,11 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         """
         cdef:
             size_t ct
+            Key ck
 
         if context is not None:
-            key_v.mv_data = <Key>self._to_key(context)
+            ck = [self._to_key_idx(context)]
+            key_v.mv_data = &ck
             key_v.mv_size = KLEN
 
             cur = self._cur_open('c:spo')
@@ -212,7 +214,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                     key_v.mv_size = HLEN
                     _check(lmdb.mdb_get(
                             self.txn, self.get_dbi('th:t'), &key_v, &data_v))
-                    spock[i] = (<Key>data_v.mv_data)[0]
+                    spock[i] = <KeyIdx>(data_v.mv_data[0])
                     #logger.debug('Hash {} found. Not adding.'.format(thash[: HLEN]))
                 except KeyNotFoundError:
                     # If term_obj is not found, add it...
@@ -225,7 +227,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                     #        thash[: HLEN], spock[i])
                     key_v.mv_data = thash
                     key_v.mv_size = HLEN
-                    data_v.mv_data = spock[i]
+                    data_v.mv_data = spock + i * KLEN
                     data_v.mv_size = KLEN
                     _check(
                         lmdb.mdb_cursor_put(icur, &key_v, &data_v, 0),
@@ -308,51 +310,61 @@ cdef class LmdbTriplestore(BaseLmdbStore):
             Hash128 chash
             Key ck
             lmdb.MDB_txn *tmp_txn
-            lmdb.MDB_cursor *th_cur
-            lmdb.MDB_cursor *pk_cur
-            lmdb.MDB_cursor *ck_cur
 
         hash128(pk_gr, &chash)
         #logger.debug('Adding a graph.')
         if not self._key_exists(chash, HLEN, b'th:t'):
             # Insert context term if not existing.
             if self.is_txn_rw:
+                tmp_txn = self.txn
+            else:
+                _check(lmdb.mdb_txn_begin(self.dbenv, NULL, 0, &tmp_txn))
+                # Open new R/W transactions.
+                #logger.debug('Opening a temporary RW transaction.')
+
+            try:
                 #logger.debug('Working in existing RW transaction.')
                 # Use existing R/W transaction.
                 # Main entry.
-                ck[0] = self._append(pk_gr, b't:st')
+                ck[0] = self._append(pk_gr, b't:st', txn=tmp_txn)
                 # Index.
-                self._put(chash, HLEN, ck, KLEN, b'th:t')
+
+                key_v.mv_data = chash
+                key_v.mv_size = HLEN
+                data_v.mv_data = ck
+                data_v.mv_size = KLEN
+                _check(lmdb.mdb_put(
+                    self.txn, self.get_dbi(b'th:t'), &key_v, &data_v, 0
+                ))
+
                 # Add to list of contexts.
-                self._put(ck, KLEN, b'', 0, 'c:')
-            else:
-                # Open new R/W transactions.
-                #logger.debug('Opening a temporary RW transaction.')
-                _check(lmdb.mdb_txn_begin(self.dbenv, NULL, 0, &tmp_txn))
-                try:
-                    ck[0] = self._append(pk_gr, b't:st', txn=tmp_txn)
-                    # Index.
-                    self._put(chash, HLEN, ck, KLEN, b'th:t', txn=tmp_txn)
-                    # Add to list of contexts.
-                    self._put(ck, KLEN, b'', 0, b'c:', txn=tmp_txn)
+                key_v.mv_data = ck
+                key_v.mv_size = KLEN
+                data_v.mv_data = ck # Whatever, length is zero anyways
+                data_v.mv_size = 0
+                _check(lmdb.mdb_put(
+                    self.txn, self.get_dbi(b'c:'), &key_v, &data_v, 0
+                ))
+                if not self.is_txn_rw:
                     _check(lmdb.mdb_txn_commit(tmp_txn))
-                    #logger.debug('Temp RW transaction closed.')
-                except:
+            except:
+                if not self.is_txn_rw:
                     lmdb.mdb_txn_abort(tmp_txn)
-                    raise
+                raise
 
 
     cpdef void _remove(self, tuple triple_pattern, context=None) except *:
         cdef:
             unsigned char spok[TRP_KLEN]
-            size_t i = 0
+            void* cur
+            cc.ArrayIter it
             Key ck
             lmdb.MDB_val spok_v, ck_v
 
         #logger.debug('Removing triple: {}'.format(triple_pattern))
         if context is not None:
             try:
-                ck = self._to_key(context)
+                ck = [self._to_key_idx(context)]
             except KeyNotFoundError:
                 # If context is specified but not found, return to avoid
                 # deleting the wrong triples.
@@ -367,13 +379,13 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         try:
             spok_v.mv_size = TRP_KLEN
             # If context was specified, remove only associations with that context.
+            cc.array_iter_init(&it, match_set.data)
             if context is not None:
                 #logger.debug('Removing triples in matching context.')
                 ck_v.mv_data = ck
                 ck_v.mv_size = KLEN
-                while i < match_set.ct:
-                    spok = match_set.data[i]
-                    spok_v.mv_data = spok
+                while cc.array_iter_next(&it, &cur) != cc.CC_ITER_END:
+                    spok_v.mv_data = cur
                     # Delete spo:c entry.
                     try:
                         _check(lmdb.mdb_cursor_get(
@@ -398,22 +410,21 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                         # Delete lookup indices, only if no other context
                         # association is present.
 
-                        # spok has changed on mdb_cursor_del. Restore.
-                        spok_v.mv_data = spok
+                        # spok_v has changed on mdb_cursor_del. Restore.
+                        spok_v.mv_data = cur
                         try:
                             _check(lmdb.mdb_cursor_get(
                                 dcur, &spok_v, NULL, lmdb.MDB_SET))
                         except KeyNotFoundError:
-                            self._index_triple(IDX_OP_REMOVE, spok)
+                            self._index_triple(IDX_OP_REMOVE, <TripleKey>cur)
                     i += 1
 
             # If no context is specified, remove all associations.
             else:
                 #logger.debug('Removing triples in all contexts.')
                 # Loop over all SPO matching the triple pattern.
-                while i < match_set.ct:
-                    spok = match_set.data[i]
-                    spok_v.mv_data = spok
+                while cc.array_iter_next(&it, &cur) != cc.CC_ITER_END:
+                    spok_v.mv_data = cur
                     # Loop over all context associations for this SPO.
                     try:
                         _check(lmdb.mdb_cursor_get(
@@ -423,7 +434,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                         continue
                     else:
                         ck = <Key>ck_v.mv_data
-                        logger.debug(f'Removing {spok[: TRP_KLEN]} from main.')
+                        logger.debug(f'Removing {<TripleKey>cur} from main.')
                         while True:
 
                             # Delete c:spo association.
@@ -435,7 +446,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                             else:
                                 lmdb.mdb_cursor_del(icur, 0)
                                 # Restore the pointer to the deleted SPO.
-                                spok_v.mv_data = spok
+                                spok_v.mv_data = cur
                             # Move on to next associated context.
                             try:
                                 _check(lmdb.mdb_cursor_get(
@@ -450,7 +461,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                             pass
                         else:
                             lmdb.mdb_cursor_del(dcur, lmdb.MDB_NODUPDATA)
-                            self._index_triple(IDX_OP_REMOVE, spok)
+                            self._index_triple(IDX_OP_REMOVE, <TripleKey>cur)
                             #ck_v.mv_data = ck # Unnecessary?
                     finally:
                         i += 1
@@ -475,12 +486,12 @@ cdef class LmdbTriplestore(BaseLmdbStore):
             size_t i = 0
             lmdb.MDB_val key_v, dbl_key_v
 
-        keys[0] = spok[0] # sk
-        keys[1] = spok[1] # pk
-        keys[2] = spok[2] # ok
+        keys[0] = spok[:1] # sk
+        keys[1] = spok[1:2] # pk
+        keys[2] = spok[2:3] # ok
 
         dbl_keys[0] = spok[1:3] # pok
-        dbl_keys[1] = [spok[0], spok[2]] # pok
+        dbl_keys[1] = [spok[0], spok[2]] # sok
         dbl_keys[2] = spok[:2] # spk
 
         #logger.debug('''Indices:
@@ -500,9 +511,9 @@ cdef class LmdbTriplestore(BaseLmdbStore):
 
         #logger.debug('Start indexing: {}.'.format(spok[: TRP_KLEN]))
         if op == IDX_OP_REMOVE:
-            logger.debug(f'Remove {spok[ : TRP_KLEN]} from indices.')
+            logger.debug(f'Remove {spok[0]} from indices.')
         else:
-            logger.debug(f'Add {spok[ : TRP_KLEN]} to indices.')
+            logger.debug(f'Add {spok[0]} to indices.')
 
         while i < 3:
             cur1 = self._cur_open(self.lookup_indices[i]) # s:po, p:so, o:sp
@@ -590,7 +601,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
 
         # Gather information on the graph prior to deletion.
         try:
-            ck = self._to_key(gr_uri)
+            ck = [self._to_key_idx(gr_uri)]
         except KeyNotFoundError:
             return
 
@@ -623,16 +634,22 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         """
         Get a list of all contexts.
 
-        :rtype: Iterator(lakesuperior.model.graph.graph.Imr)
+        :rtype: set(URIRef)
         """
         cdef:
-            size_t sz, i = 0
+            size_t sz, i
             KeyIdx* match
 
-        self.all_contexts(&match, &sz, triple)
-        while i < sz:
-            yield URIRef(self.from_key(match[i]))
-            cur += 1
+        try:
+            self.all_contexts(&match, &sz, triple)
+            ret = set()
+
+            for i in range(sz):
+                ret.add(self.from_key(match + i))
+        finally:
+            free(match)
+
+        return ret
 
 
     def triples(self, triple_pattern, context=None):
@@ -719,23 +736,24 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         in.
         """
         cdef:
-            TripleKey spok
+            void* spok
             size_t cur = 0
             Buffer* buffers
             BufferTriple* btrp
             SimpleGraph gr
+            cc.ArrayIter it
 
         gr = Imr(uri=uri) if uri else SimpleGraph()
 
         #logger.debug(
         #        'Getting triples for: {}, {}'.format(triple_pattern, context))
 
-        spok_a = self.triple_keys(triple_pattern, context)
-        btrp = <BufferTriple*>gr.pool.alloc(spok_a.ct, sizeof(BufferTriple))
-        buffers = <Buffer*>gr.pool.alloc(3 * spok_a.ct, sizeof(Buffer))
+        match = self.triple_keys(triple_pattern, context)
+        btrp = <BufferTriple*>gr.pool.alloc(match.ct, sizeof(BufferTriple))
+        buffers = <Buffer*>gr.pool.alloc(3 * match.ct, sizeof(Buffer))
 
-        spok_a.iter_init()
-        while spok_a.iter_next(&spok):
+        cc.array_iter_init(&it, match.data)
+        while cc.array_iter_next(&it, &spok):
             btrp[cur].s = buffers + cur * 3
             btrp[cur].p = buffers + cur * 3 + 1
             btrp[cur].o = buffers + cur * 3 + 2
@@ -772,7 +790,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         cdef:
             size_t ct = 0, flt_j = 0, i = 0, j = 0, c_size
             void* cur
-            cc.ArrayIterator it
+            cc.ArrayIter it
             lmdb.MDB_cursor *icur
             lmdb.MDB_val key_v, data_v
             Key tk, ck
@@ -782,7 +800,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         if context is not None:
             #serialize(context, &pk_c, &c_size)
             try:
-                ck = self._to_key(context)
+                ck = [self._to_key_idx(context)]
             except KeyNotFoundError:
                 # Context not found.
                 return Keyset()
@@ -798,7 +816,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                     #logger.debug('Lookup: s p o c')
                     for i, term in enumerate(triple_pattern):
                         try:
-                            tk = self._to_key(term)
+                            tk = [self._to_key_idx(term)]
                         except KeyNotFoundError:
                             # Context not found.
                             return Keyset()
@@ -924,7 +942,11 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                     spok_v.mv_data = spok
                     spok_v.mv_size = TRP_KLEN
                     try:
-                        self._to_triple_key(triple_pattern, &spok)
+                        spok = [
+                            self._to_key_idx(triple_pattern[0]),
+                            self._to_key_idx(triple_pattern[1]),
+                            self._to_key_idx(triple_pattern[2]),
+                        ]
                         _check(lmdb.mdb_get(
                             self.txn, self.get_dbi('spo:c'), &spok_v, &ck_v))
                     except KeyNotFoundError:
@@ -1012,7 +1034,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
 
         #logger.debug(f'lookup 1bound: {idx}, {term}')
         try:
-            luk = self._to_key(term)
+            luk = [self._to_key_idx(term)]
         except KeyNotFoundError:
             return Keyset()
         logging.debug('luk: {}'.format(luk))
@@ -1116,8 +1138,8 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                 f'2bound lookup for term {term1} at position {idx1} '
                 f'and term {term2} at position {idx2}.')
         try:
-            luk1 = self._to_key(term1)
-            luk2 = self._to_key(term2)
+            luk1 = [self._to_key_idx(term1)]
+            luk2 = [self._to_key_idx(term2)]
         except KeyNotFoundError:
             return Keyset()
         logging.debug('luk1: {}'.format(luk1))
@@ -1263,7 +1285,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         cdef:
             void* cur
             cc.HashSet tkeys
-            cc.HashSetIterator it
+            cc.HashSetIter it
 
         ret = set()
 
@@ -1272,7 +1294,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
             cc.hashset_iter(&it, &tkeys)
             while hashset_iter_next(&it, &cur):
                 #logger.debug('Yielding: {}'.format(key))
-                ret.add(self.from_key(<Key*>cur))
+                ret.add(self.from_key(<Key>cur))
         finally:
             free(tkeys)
 
@@ -1324,6 +1346,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         cdef:
             lmdb.MDB_cursor_op seek_op, scan_op
             lmdb.MDB_stat stat
+            lmdb.MDB_val key_v
             TripleKey spok
 
         cur = (
@@ -1331,10 +1354,14 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                 else self._cur_open('c:'))
 
         if triple and all(triple):
-            lmdb_seek_op = MDB_SET_KEY
-            lmdb_scan_op = MDB_NEXT_DUP
-            self._to_triple_key(triple, &spok)
-            key_v.mv_data = spok
+            lmdb_seek_op = lmdb.MDB_SET_KEY
+            lmdb_scan_op = lmdb.MDB_NEXT_DUP
+            spok = [
+                self._to_key_idx(triple[0]),
+                self._to_key_idx(triple[1]),
+                self._to_key_idx(triple[2]),
+            ]
+            key_v.mv_data = &spok
             key_v.mv_size = TRP_KLEN
         else:
             lmdb_seek_op = MDB_FIRST
@@ -1364,9 +1391,6 @@ cdef class LmdbTriplestore(BaseLmdbStore):
 
                 sz[0] += 1
 
-            # FIXME This needs to get the triples and convert them.
-            return ret
-
         finally:
             #pass
             self._cur_close(cur)
@@ -1374,20 +1398,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
 
     # Key conversion methods.
 
-    cdef object from_key(self, const Key* key):
-        """
-        Convert a single key into one term.
-
-        :param Key key: The key to be converted.
-        """
-        cdef Buffer pk_t
-
-        self.lookup_term(key, &pk_t)
-
-        return deserialize_to_rdflib(&pk_t)
-
-
-    cdef inline void lookup_term(self, const Key* key, Buffer* data) except *:
+    cdef inline void lookup_term(self, const Key tk, Buffer* data) except *:
         """
         look up a term by key.
 
@@ -1397,10 +1408,9 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         cdef:
             lmdb.MDB_val key_v, data_v
 
-        key_v.mv_data = key
+        key_v.mv_data = tk
         key_v.mv_size = KLEN
 
-        #logger.info(f'Size of mdb_val: {sizeof(lmdb.MDB_val)}; size of buffer: {sizeof(Buffer)}')
         _check(
                 lmdb.mdb_get(
                     self.txn, self.get_dbi('t:st'), &key_v, &data_v
@@ -1411,22 +1421,36 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         #logger.info('Found term: {}'.format(buffer_dump(data)))
 
 
-    cdef tuple from_trp_key(self, const TripleKey* key):
+    cdef object from_key(self, const Key tk):
+        """
+        Convert a single key into one term.
+
+        :param Key key: The key to be converted.
+        """
+        cdef Buffer pk_t
+
+        self.lookup_term(key, &pk_t)
+
+        # TODO Make Term a class and return that.
+        return deserialize_to_rdflib(&pk_t)
+
+
+    cdef tuple from_trp_key(self, const TripleKey spok):
         """
         Convert a triple key into a tuple of 3 terms.
 
-        :param TripleKey key: The triple key to be converted.
+        :param TripleKey spok: The triple key to be converted.
         """
         #logger.debug(f'From triple key: {key[: TRP_KLEN]}')
         return (
-                self.from_key(key),
-                self.from_key(key + KLEN),
-                self.from_key(key + DBL_KLEN))
+                self.from_key(spok),
+                self.from_key(spok + KLEN),
+                self.from_key(spok + DBL_KLEN))
 
 
-    cdef inline Key _to_key(self, term):
+    cdef inline KeyIdx _to_key_idx(self, term):
         """
-        Convert a triple, quad or term into a key.
+        Convert a triple, quad or term into a key index (bare number).
 
         The key is the checksum of the serialized object, therefore unique for
         that object.
@@ -1451,25 +1475,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
         _check(lmdb.mdb_get(self.txn, dbi, &key_v, &data_v))
         #logger.debug('Found key: {}'.format((<Key>data_v.mv_data)[: KLEN]))
 
-        return <Key>data_v.mv_data
-
-
-    cdef inline void _to_triple_key(
-            self, tuple terms, TripleKey *tkey) except *:
-        """
-        Convert a tuple of 3 terms into a triple key.
-        """
-        cdef:
-            unsigned char i = 0
-
-        while  i < 3:
-            tkey[0][i] = self._to_key(terms[i])
-            if tkey[0][i] is NULL:
-                # A term in the triple is not found.
-                # TODO Probably unnecessary, because _to_key will have already
-                # raised a LmdbError.
-                raise KeyNotFoundError(f'Term key {tkey[0][i]} not found.')
-            i += 1
+        return <KeyIdx>data_v.mv_data[0]
 
 
     cdef KeyIdx _append(
@@ -1520,27 +1526,27 @@ cdef class LmdbTriplestore(BaseLmdbStore):
                 flags | lmdb.MDB_APPEND)
 
 
-    cdef inline KeyIdx bytes_to_idx(const unsigned char* bs):
-        """
-        Convert a byte string as stored in LMDB to a size_t key index.
+    #cdef inline KeyIdx bytes_to_idx(self, const unsigned char* bs):
+    #    """
+    #    Convert a byte string as stored in LMDB to a size_t key index.
 
-        TODO Force big endian?
-        """
-        cdef KeyIdx ret
+    #    TODO Force big endian?
+    #    """
+    #    cdef KeyIdx ret
 
-        memcpy(&ret, bs, KLEN)
+    #    memcpy(&ret, bs, KLEN)
 
-        return ret
+    #    return ret
 
 
-    cdef inline unsigned char* idx_to_bytes(KeyIdx idx):
-        """
-        Convert a size_t key index to bytes.
+    #cdef inline unsigned char* idx_to_bytes(KeyIdx idx):
+    #    """
+    #    Convert a size_t key index to bytes.
 
-        TODO Force big endian?
-        """
-        cdef unsigned char* ret
+    #    TODO Force big endian?
+    #    """
+    #    cdef unsigned char* ret
 
-        memcpy(&ret, idx, KLEN)
+    #    memcpy(&ret, idx, KLEN)
 
-        return ret
+    #    return ret