Forráskód Böngészése

Rename environment and transaction varaibles.

Stefano Cossu 7 éve
szülő
commit
0aa144ca65
1 módosított fájl, 31 hozzáadás és 28 törlés
  1. 31 28
      lakesuperior/store_layouts/ldp_rs/lmdb_store.py

+ 31 - 28
lakesuperior/store_layouts/ldp_rs/lmdb_store.py

@@ -39,9 +39,9 @@ class NoTxnError(Exception):
 
 class TxnManager(ContextDecorator):
     '''
-    Handle ACID transactions with an LmdStore.
+    Handle ACID transactions with an LmdbStore.
 
-    To use this, wrap t within a `with` statement:
+    Wrap this within a `with` statement:
 
     >>> with TxnManager(store, True):
     ...     # Do something with the database
@@ -61,14 +61,13 @@ class TxnManager(ContextDecorator):
         self.write = write
 
     def __enter__(self):
-        self._txn = self.store.begin(write=self.write)
+        self.store.begin(write=self.write)
 
     def __exit__(self, exc_type, exc_value, traceback):
         if exc_type:
             self.store.rollback()
         else:
             self.store.commit()
-        #return True
 
 
 class LmdbStore(Store):
@@ -135,10 +134,11 @@ class LmdbStore(Store):
             'c:tk', 'sk:tk', 'pk:tk', 'ok:tk', 'spk:tk', 'sok:tk', 'pok:tk',
             'ns:pfx')
 
-    db_env = None
+    data_env = None
+    idx_env = None
     db = None
     dbs = {}
-    main_txn = None
+    data_txn = None
     idx_txn = None
     is_txn_rw = None
 
@@ -171,7 +171,7 @@ class LmdbStore(Store):
         are created separately within the transaction.
         '''
         self._init_db_environments(path, create)
-        if self.db_env == NO_STORE:
+        if self.data_env == NO_STORE:
             return NO_STORE
         self.__open = True
 
@@ -184,11 +184,12 @@ class LmdbStore(Store):
         '''
         if not self.is_open:
             raise RuntimeError('Store must be opened first.')
-        self.main_txn = self.db_env.begin(write=write, buffers=True)
-        self.idx_txn = self.idx_env.begin(write=write, buffers=True)
+        self.data_txn = self.data_env.begin(write=write, buffers=True)
+        # Index transaction is read-write only for indexing jobs.
+        self.idx_txn = self.idx_env.begin(buffers=True)
         self.is_txn_rw = write
         # Cursors.
-        self.curs = self.get_main_cursors(self.main_txn)
+        self.curs = self.get_data_cursors(self.data_txn)
         self.curs.update(self.get_idx_cursors(self.idx_txn))
 
 
@@ -198,7 +199,7 @@ class LmdbStore(Store):
         Whether the main transaction is open.
         '''
         try:
-            self.main_txn.id()
+            self.data_txn.id()
             self.idx_txn.id()
         except (lmdb.Error, AttributeError) as e:
             #logger.info('Main transaction does not exist or is closed.')
@@ -208,7 +209,7 @@ class LmdbStore(Store):
             return True
 
 
-    def get_main_cursors(self, txn):
+    def get_data_cursors(self, txn):
         '''
         Build the main data cursors for a transaction.
 
@@ -253,7 +254,7 @@ class LmdbStore(Store):
             else:
                 self.rollback()
 
-        self.db_env.close()
+        self.data_env.close()
 
 
     def destroy(self, path):
@@ -280,6 +281,7 @@ class LmdbStore(Store):
         assert context != self, "Can not add triple directly to store"
         Store.add(self, triple, context)
 
+        logger.info('Adding triple: {}'.format(triple))
         if self.DEFAULT_UNION:
             raise NotImplementedError()
             # @TODO
@@ -401,7 +403,7 @@ class LmdbStore(Store):
             dataset = (tk for tk in self.curs['c:tk'].iternext_dup())
             return len(set(dataset))
         else:
-            return self.main_txn.stat(self.dbs['tk:t'])['entries']
+            return self.data_txn.stat(self.dbs['tk:t'])['entries']
 
 
     def bind(self, prefix, namespace):
@@ -410,7 +412,7 @@ class LmdbStore(Store):
         '''
         prefix = s2b(prefix)
         namespace = s2b(namespace)
-        with self.main_txn.cursor(self.dbs['pfx:ns']) as cur:
+        with self.data_txn.cursor(self.dbs['pfx:ns']) as cur:
             cur.put(prefix, namespace)
         with self.idx_txn.cursor(self.dbs['ns:pfx']) as cur:
             cur.put(namespace, prefix)
@@ -473,15 +475,16 @@ class LmdbStore(Store):
         @param graph (URIRef) URI of the named graph to add.
         '''
         if not self.is_txn_rw:
-            with self.db_env.begin(write=True).cursor(self.dbs['tk:c']) \
+            with self.data_env.begin(write=True).cursor(self.dbs['tk:c']) \
                     as tk2c_cur:
                 tk2c_cur.put(self._pickle(None), self._pickle(graph))
-            with self.idx_env.begin(write=True).cursor(self.dbs['c:tk']) \
-                    as c2tk_cur:
-                c2tk_cur.put(self._pickle(graph), self._pickle(None))
         else:
             self.curs['tk:c'].put(self._pickle(None), self._pickle(graph))
-            self.curs['c:tk'].put(self._pickle(graph), self._pickle(None))
+
+        # Open a write tx for indices.
+        with self.idx_env.begin(write=True)\
+                .cursor(self.dbs['c:tk']) as c2tk_cur:
+            c2tk_cur.put(self._pickle(graph), self._pickle(None))
 
 
     def remove_graph(self, graph):
@@ -506,9 +509,9 @@ class LmdbStore(Store):
         Commit main transaction.
         '''
         if self.is_txn_open:
-            self.main_txn.commit()
+            self.data_txn.commit()
             self.idx_txn.commit()
-        self.main_txn = self.idx_txn = self.is_txn_rw = None
+        self.data_txn = self.idx_txn = self.is_txn_rw = None
 
 
     def rollback(self):
@@ -516,9 +519,9 @@ class LmdbStore(Store):
         Roll back main transaction.
         '''
         if self.is_txn_open:
-            self.main_txn.abort()
+            self.data_txn.abort()
             self.idx_txn.abort()
-        self.main_txn = self.idx_txn = self.is_txn_rw = None
+        self.data_txn = self.idx_txn = self.is_txn_rw = None
 
 
     #def _next_lex_key(self, db=None):
@@ -626,7 +629,7 @@ class LmdbStore(Store):
                 makedirs(path)
             else:
                 return NO_STORE
-        self.db_env = lmdb.open(path + '/main', subdir=False, create=create,
+        self.data_env = lmdb.open(path + '/main', subdir=False, create=create,
                 map_size=self.MAP_SIZE, max_dbs=4, readahead=False)
         self.idx_env = lmdb.open(path + '/index', subdir=False, create=create,
                 map_size=self.MAP_SIZE, max_dbs=10, readahead=False)
@@ -634,9 +637,9 @@ class LmdbStore(Store):
         # Open and optionally create main databases.
         self.dbs = {
             # Main databases.
-            'tk:t': self.db_env.open_db(b'tk:t', create=create),
-            'tk:c': self.db_env.open_db(b'tk:c', create=create, dupsort=True),
-            'pfx:ns': self.db_env.open_db(b'pfx:ns', create=create),
+            'tk:t': self.data_env.open_db(b'tk:t', create=create),
+            'tk:c': self.data_env.open_db(b'tk:c', create=create, dupsort=True),
+            'pfx:ns': self.data_env.open_db(b'pfx:ns', create=create),
             # Index.
             'ns:pfx': self.idx_env.open_db(b'ns:pfx', create=create),
         }