|
@@ -75,6 +75,8 @@ class TxnManager(ContextDecorator):
|
|
|
if len(self.store._data_queue):
|
|
|
self.store._apply_changes()
|
|
|
if len(self.store._idx_queue):
|
|
|
+ # Ditch index data. For testing data entry only.
|
|
|
+ #self.store._idx_queue = []
|
|
|
# Synchronous.
|
|
|
self.store._run_indexing()
|
|
|
# Threading.
|
|
@@ -174,7 +176,7 @@ class LmdbStore(Store):
|
|
|
). The third value can be None, and in that case, it is calculated from
|
|
|
the triple key.
|
|
|
'''
|
|
|
- _idx_queue = set()
|
|
|
+ _idx_queue = []
|
|
|
|
|
|
|
|
|
def __init__(self, path, identifier=None):
|
|
@@ -371,7 +373,7 @@ class LmdbStore(Store):
|
|
|
needs_indexing = True
|
|
|
|
|
|
if needs_indexing:
|
|
|
- self._idx_queue.add((trp_key, pk_ctx, triple))
|
|
|
+ self._idx_queue.append((trp_key, pk_ctx, triple))
|
|
|
|
|
|
|
|
|
def remove(self, triple_pattern, context=None):
|
|
@@ -402,7 +404,7 @@ class LmdbStore(Store):
|
|
|
self._enqueue_action(
|
|
|
'delete', 'tk:c', trp_key, None)
|
|
|
|
|
|
- self._idx_queue.add((trp_key, pk_ctx, triple))
|
|
|
+ self._idx_queue.append((trp_key, pk_ctx, triple))
|
|
|
|
|
|
|
|
|
def triples(self, triple_pattern, context=None):
|
|
@@ -513,7 +515,7 @@ class LmdbStore(Store):
|
|
|
pk_none = self._pickle(None)
|
|
|
pk_ctx = self._pickle(graph)
|
|
|
self._enqueue_action('delete', 'tk:c', pk_none, pk_ctx)
|
|
|
- self._idx_queue.add((None, pk_ctx, None))
|
|
|
+ self._idx_queue.append((None, pk_ctx, None))
|
|
|
|
|
|
with self.cur('c:tk') as cur:
|
|
|
if cur.set_key_dup(self._pickle(graph), self._pickle(None)):
|
|
@@ -787,6 +789,9 @@ class LmdbStore(Store):
|
|
|
data_txn.cursor(self.dbs[task[1]]))
|
|
|
for task in self._data_queue
|
|
|
}
|
|
|
+ #logger.debug('Data queue: {}'.format(self._data_queue))
|
|
|
+ #import pdb; pdb.set_trace()
|
|
|
+ logger.debug('Data queue: {} triples.'.format(len(self._data_queue)))
|
|
|
while len(self._data_queue):
|
|
|
action, db, k, v = self._data_queue.pop()
|
|
|
if action == 'put':
|
|
@@ -833,6 +838,8 @@ class LmdbStore(Store):
|
|
|
idx_curs = self.get_idx_cursors(idx_txn)
|
|
|
|
|
|
lock = Lock()
|
|
|
+ #logger.debug('Index queue: {}'.format(self._idx_queue))
|
|
|
+ logger.debug('Index queue: {}'.format(len(self._idx_queue)))
|
|
|
while len(self._idx_queue):
|
|
|
lock.acquire()
|
|
|
trp_key, pk_ctx, triple = self._idx_queue.pop()
|