#include "graph.h"

// Initial size of lookup graph. It will double each time capacity is reached.
#define LOOKUP_GR_INIT_SIZE 64

// Expand hash table memory by this factor to keep a good load factor.
#define PREALLOC_FACTOR 1.4

// Assume VERY coarsely that the number of unique terms will be in general
// 1.7 times the number of triples. This is conservative to maintain load
// factor low.
#define IDX_SIZE_RATIO 1.7

// RAMdisk path for MDB volatile store.
#define MDB_RAMDISK_PATH TMPDIR "/lsup_mem_graph"

/** @brief Bit flags for key set operations.
 *
 * NOTE(review): not referenced anywhere in the visible part of this file.
 */
typedef enum KSetFlag {
    LSUP_KS_NONE        = 0,
    LSUP_KS_CHECK_CAP   = 1 << 0,
    LSUP_KS_CHECK_DUP   = 1 << 1,
} KSetFlag;

/**
 * Static handles.
 */
// Label used to build the default context URI in mdbstore_init().
static const char *default_ctx_label = "urn:lsup:default";
// Serialized default context. Created once by mdbstore_init() and released
// at exit by ctx_cleanup().
static LSUP_Buffer *default_ctx = NULL;

/** @brief Graph handle.
 *
 * The hash table store member is currently commented out, so the union only
 * carries the MDB store handle.
 */
typedef struct Graph {
    LSUP_store_type         store_type;     // In-memory or MDB-backed
    LSUP_Term               *uri;           // Graph "name" (URI)
    union {
        //LSUP_HTStore *      ht_store;
        LSUP_MDBStore *     mdb_store;
    };
} Graph;

/** @brief Iterator over a graph, wrapping the back end store iterator.
 */
typedef struct GraphIterator {
    const Graph *           graph;          // Parent graph.
    union {                                 // Internal store iterator.
        //LSUP_HTIterator *   ht_iter;
        LSUP_MDBIterator *  mdb_iter;
    };
    size_t                  ct;             // Total matches.
    size_t                  i;              // Cursor.
} GraphIterator;

/**
 * Extern inline functions.
 */
size_t LSUP_graph_size (const LSUP_Graph *gr);

/* * * Static prototypes. * * */

// Declared with an explicit (void) parameter list: an empty list is an
// old-style declaration that does not let the compiler check the arguments.
static inline LSUP_rc mdbstore_init (void);

/* * * Post-lookup callback prototypes * * */

int match_add_fn(
        LSUP_Graph *src, LSUP_Graph *dest, const LSUP_TripleKey *spok,
        void *ctx);

int match_rm_fn(
        LSUP_Graph *src, LSUP_Graph *dest, const LSUP_TripleKey *spok,
        void *ctx);

static LSUP_rc
graph_iter_next_buffer (GraphIterator *it, LSUP_SerTriple *sspo);

/* Atexit functions.
*/ void ctx_cleanup() { LSUP_buffer_free (default_ctx); } static inline bool is_null_trp (const LSUP_TripleKey *trp) { return ( *trp[0] == NULL_KEY && *trp[1] == NULL_KEY && *trp[2] == NULL_KEY); } #define ENTRY(a, b) (be) == (LSUP_STORE_##a) || static inline bool check_backend (LSUP_store_type be) { return (BACKEND_TBL false); } #undef ENTRY /* * * GRAPH * * */ Graph * LSUP_graph_new (const LSUP_store_type store_type) { if (!check_backend (store_type)) return NULL; LSUP_Graph *gr; CRITICAL (gr = malloc (sizeof (LSUP_Graph))); gr->uri = LSUP_uri_new (NULL); gr->store_type = store_type; if (mdbstore_init() != LSUP_OK) return NULL; if (gr->store_type == LSUP_STORE_MEM) { // TODO uncomment gr->ht_store = LSUP_htstore_new (0); // if (!gr->ht_store) return NULL; } else if (gr->store_type == LSUP_STORE_MDB) { gr->mdb_store = LSUP_mdbstore_new( getenv ("LSUP_MDB_STORE_PATH"), default_ctx); if (!gr->mdb_store) return NULL; } else { gr->mdb_store = LSUP_mdbstore_new( MDB_RAMDISK_PATH, default_ctx); if (!gr->mdb_store) return NULL; } return gr; } /** * Copy triples from a source graph into a destination one. * * The destination graph is not initialized here, so the copy is cumulative. 
*/ static LSUP_rc graph_copy_contents (const LSUP_Graph *src, LSUP_Graph *dest) { LSUP_rc rc = LSUP_NOACTION; const LSUP_Triple trp = {NULL, NULL, NULL}; GraphIterator *it = LSUP_graph_lookup (src, &trp); LSUP_SerTriple sspo; while (graph_iter_next_buffer (it, &sspo) != LSUP_END) { TRACE ("Inserting triple #%lu\n", it->i); LSUP_rc add_rc = LSUP_graph_add (dest, NULL, 0, &sspo, 1, NULL); if (LIKELY (add_rc == LSUP_OK)) rc = LSUP_OK; else if (add_rc < 0) return add_rc; } return rc; } LSUP_Graph * LSUP_graph_copy (const Graph *src) { LSUP_Graph *dest = LSUP_graph_new (src->store_type); if (UNLIKELY (!dest)) return NULL; LSUP_rc rc = graph_copy_contents (src, dest); if (UNLIKELY (rc != LSUP_OK)) return NULL; return dest; } /* TODO uncomment Graph * LSUP_graph_bool_op( const LSUP_bool_op op, const Graph *gr1, const Graph *gr2) { if (UNLIKELY (gr1->store_type != LSUP_STORE_MEM)) { fprintf( stderr, "First operand %s is not an in-memory graph. " "Cannot perform boolean operation.", gr1->uri->data); return NULL; } if (UNLIKELY (gr2->store_type != LSUP_STORE_MEM)) { fprintf( stderr, "Second operand %s is not an in-memory graph. " "Cannot perform boolean operation.", gr2->uri->data); return NULL; } LSUP_Graph *res = LSUP_graph_new (LSUP_STORE_MEM); res->ht_store = LSUP_htstore_bool_op (op, gr1->ht_store, gr2->ht_store); return res; } */ void LSUP_graph_free (LSUP_Graph *gr) { if (LIKELY (gr != NULL)) { LSUP_term_free (gr->uri); if (gr->store_type == LSUP_STORE_MEM) NULL;// TODO uncomment LSUP_htstore_free (gr->ht_store); else LSUP_mdbstore_free (gr->mdb_store); free (gr); } } LSUP_Term * LSUP_graph_uri (const LSUP_Graph *gr) { return gr->uri; } LSUP_rc LSUP_graph_set_uri (LSUP_Graph *gr, const char *uri) { return LSUP_uri_init (gr->uri, uri); } LSUP_rc LSUP_graph_resize (LSUP_Graph *gr, size_t size) { return LSUP_OK; // TODO remove line. 
if (gr->store_type == LSUP_STORE_MEM) return 0;// TODO uncomment LSUP_htstore_resize (gr->ht_store, size); return LSUP_VALUE_ERR; } size_t LSUP_graph_capacity (const Graph *gr) { if (gr->store_type == LSUP_STORE_MEM) return 0;// TODO uncomment LSUP_htstore_capacity (gr->ht_store); return LSUP_NOT_IMPL_ERR; } size_t LSUP_graph_size (const Graph *gr) { TRACE ("Store type: %d\n", gr->store_type); if (gr->store_type == LSUP_STORE_MEM) return 0;// TODO uncomment LSUP_htstore_size (gr->ht_store); return LSUP_mdbstore_size (gr->mdb_store); } // TODO Add add_stream_init, add_stream_iter and add_stream_done for streaming // functions. GraphIterator * LSUP_graph_add_stream_init (LSUP_Graph *gr) { GraphIterator *it = malloc (sizeof (*it)); if (!it) return NULL; LSUP_Buffer *sc = LSUP_buffer_new_from_term (gr->uri); it->mdb_iter = LSUP_mdbstore_add_init (gr->mdb_store, sc); LSUP_buffer_free (sc); it->graph = gr; it->i = 0; return it; } LSUP_rc LSUP_graph_add_stream_iter (LSUP_GraphIterator *it, const LSUP_SerTriple *sspo) { return LSUP_mdbstore_add_iter (it->mdb_iter, sspo); } void LSUP_graph_add_stream_done (LSUP_GraphIterator *it) { LSUP_mdbstore_add_done(it->mdb_iter); } LSUP_rc LSUP_graph_add( LSUP_Graph *gr, const LSUP_Triple trp[], size_t trp_ct, const LSUP_SerTriple strp[], size_t strp_ct, size_t *inserted) { LSUP_rc rc; if (inserted) *inserted = 0; /* * NOTE It is possible to pass both sets of RDF triples and buffer triples. */ if (gr->store_type == LSUP_STORE_MEM) { return LSUP_NOT_IMPL_ERR; /* TODO // Resize all at once if needed. htsize_t prealloc = LSUP_htstore_size (gr->ht_store) + trp_ct + strp_ct; if (LSUP_htstore_capacity (gr->ht_store) < prealloc) { rc = LSUP_htstore_resize (gr->ht_store, prealloc * PREALLOC_FACTOR); if (UNLIKELY (rc != LSUP_OK)) return rc; } rc = LSUP_NOACTION; // Serialize and insert RDF triples. 
if (trp_ct > 0) { for (size_t i = 0; i < trp_ct; i++) { LSUP_SerTriple sspo; LSUP_triple_serialize (trp + i, &sspo); TRACE ("Inserting triple #%lu\n", i); LSUP_rc db_rc = LSUP_htstore_add (gr->ht_store, &sspo); if (LIKELY (db_rc == LSUP_OK)) { rc = LSUP_OK; if (inserted) (*inserted) ++; } LSUP_striple_done (&sspo); if (UNLIKELY (db_rc < 0)) return db_rc; } } // Insert serialized triples. for (size_t i = 0; i < strp_ct; i++) { TRACE ("Inserting triple #%lu\n", i); LSUP_rc db_rc = LSUP_htstore_add (gr->ht_store, strp + i); if (LIKELY (db_rc == LSUP_OK)) { rc = LSUP_OK; if (inserted) (*inserted) ++; } if (UNLIKELY (db_rc < 0)) return db_rc; } return rc; */ } else { rc = LSUP_NOACTION; // Initialize iterator. LSUP_GraphIterator *it = LSUP_graph_add_stream_init (gr); // Serialize and insert RDF triples. if (trp_ct > 0) { LSUP_SerTriple *sspo = STRP_DUMMY; for (size_t i = 0; i < trp_ct; i++) { TRACE ("Inserting triple #%lu\n", i); LSUP_triple_serialize (trp + i, sspo); LSUP_rc db_rc = LSUP_graph_add_stream_iter (it, sspo); if (db_rc == LSUP_OK) rc = LSUP_OK; if (UNLIKELY (db_rc < 0)) return db_rc; } LSUP_striple_free (sspo); } // Insert serialized triples. 
for (size_t i = 0; i < strp_ct; i++) { TRACE ("Inserting triple #%lu\n", i); LSUP_rc db_rc = LSUP_graph_add_stream_iter (it, strp + i); if (db_rc == LSUP_OK) rc = LSUP_OK; if (UNLIKELY (db_rc < 0)) return db_rc; } LSUP_graph_add_stream_done (it); if (inserted) *inserted = it->ct; return rc; } return LSUP_VALUE_ERR; } LSUP_rc LSUP_graph_remove (Graph *gr, const LSUP_Triple *spo, size_t *ct) { LSUP_rc rc; LSUP_SerTriple *sspo = LSUP_striple_new_from_triple (spo); LSUP_Buffer *sc = LSUP_buffer_new_from_term (gr->uri); if (gr->store_type == LSUP_STORE_MEM) rc = 0;// TODO uncomment LSUP_htstore_remove (gr->ht_store, &sspo, ct); else rc = LSUP_mdbstore_remove (gr->mdb_store, sspo, sc, ct); LSUP_striple_free (sspo); LSUP_buffer_free (sc); return rc; } GraphIterator * LSUP_graph_lookup (const Graph *gr, const LSUP_Triple *spo) { GraphIterator *it; CRITICAL (it = malloc (sizeof (GraphIterator))); it->graph = gr; LSUP_SerTriple *sspo = LSUP_striple_new_from_triple (spo); LSUP_Buffer *sc = LSUP_buffer_new_from_term (gr->uri); if (gr->store_type == LSUP_STORE_MEM) { // TODO uncomment it->ht_iter = LSUP_htstore_lookup (gr->ht_store, sspo, &it->ct); } else { it->mdb_iter = LSUP_mdbstore_lookup (gr->mdb_store, sspo, sc, &it->ct); } LSUP_striple_free (sspo); LSUP_buffer_free (sc); return it; } /** @brief Advance iterator and return serialized triple. * * This is an internal function to pass raw buffers between higher-level * functions without serializing and deserializing triples. 
*/ inline static LSUP_rc graph_iter_next_buffer (GraphIterator *it, LSUP_SerTriple *sspo) { LSUP_rc rc; if (it->graph->store_type == LSUP_STORE_MEM) rc = 0;// TODO uncomment LSUP_htiter_next (it->ht_iter, sspo); else rc = LSUP_mdbiter_next (it->mdb_iter, sspo); return rc; } LSUP_rc LSUP_graph_iter_next (GraphIterator *it, LSUP_Triple *spo) { LSUP_SerTriple *sspo = NULL; if (spo) sspo = STRP_DUMMY; LSUP_rc rc = graph_iter_next_buffer (it, sspo); if (rc == LSUP_OK) { if (spo) { LSUP_term_deserialize (sspo->s, spo->s); LSUP_term_deserialize (sspo->p, spo->p); LSUP_term_deserialize (sspo->o, spo->o); } it->i++; } // Addresses of triple buffers are owned by the DB. Do not free. if (sspo) { free (sspo->s); free (sspo->p); free (sspo->o); free (sspo); } return rc; } void LSUP_graph_iter_free (GraphIterator *it) { if (it->graph->store_type == LSUP_STORE_MEM) NULL;// TODO uncomment LSUP_htiter_free (it->ht_iter); else LSUP_mdbiter_free (it->mdb_iter); free (it); } bool LSUP_graph_contains (const LSUP_Graph *gr, const LSUP_Triple *spo) { GraphIterator *it = LSUP_graph_lookup (gr, spo); bool rc = LSUP_graph_iter_next (it, NULL) != LSUP_NORESULT; LSUP_graph_iter_free (it); return rc; } /* * * Static functions * * */ /** @brief Initialize default context and MDB environments. * * This is done only once per process. * * The ramdisk store persists after the application is closed, but will be * wiped clean the next time this function is called. */ static inline LSUP_rc mdbstore_init() { if (UNLIKELY (!default_ctx)) { // RAM disk store. char *path = MDB_RAMDISK_PATH; if (LSUP_mdbstore_setup (&path, true) != LSUP_OK) return LSUP_DB_ERR; // Permanent store. path = getenv ("LSUP_MDB_STORE_PATH"); if (LSUP_mdbstore_setup (&path, false) != LSUP_OK) return LSUP_DB_ERR; LSUP_Term *default_ctx_uri = LSUP_uri_new (default_ctx_label); default_ctx = LSUP_buffer_new_from_term (default_ctx_uri); LSUP_term_free (default_ctx_uri); atexit (ctx_cleanup); } return LSUP_OK; }