Browse Source

Separate main data and index environments.

Stefano Cossu 7 years ago
parent
commit
6cd20b4b4d
1 changed files with 50 additions and 35 deletions
  1. 50 35
      lakesuperior/store_layouts/ldp_rs/lmdb_store.py

+ 50 - 35
lakesuperior/store_layouts/ldp_rs/lmdb_store.py

@@ -75,19 +75,19 @@ class LmdbStore(Store):
     '''
     LMDB-backed store.
 
-    This store class uses two LMDB environment (i.e. two files): one for the
+    This store class uses two LMDB environments (i.e. two files): one for the
     critical (preservation-worthy) data and the other for the index data which
     can be rebuilt from the main database. @TODO For now, data and indices are
     in the same environment due to complications in handling transaction
     contexts.
 
-    There are 2 main data sets (preservation worthy data):
+    There are 3 main data sets (preservation worthy data):
 
     - tk:t (triple key: pickled triple; unique keys)
     - tk:c (Triple key: pickled context; multi-valued keys)
-    - pfx:ns (pickled prefix URI: namespace string; unique)
+    - pfx:ns (prefix: pickled namespace; unique)
 
-    And 7 indices to optimize lookup for all possible bound/unbound term
+    And 8 indices to optimize lookup for all possible bound/unbound term
     combination in a triple:
 
     - c:tk (pickled context URI: triple key)
@@ -97,7 +97,7 @@ class LmdbStore(Store):
     - spk:tk (subject + predicate key: triple key)
     - sok:tk (subject + object key: triple key)
     - pok:tk (predicate + object key: triple key)
-    - ns:pfx (namespace: pickled prefix URI; unique)
+    - ns:pfx (pickled namespace: prefix; unique)
 
     The above indices (except for ns:pfx) are all multi-valued and store
     fixed-length hash values referring to triples for economy's sake.
@@ -130,15 +130,16 @@ class LmdbStore(Store):
 
     DEFAULT_GRAPH_URI = URIRef('urn:fcrepo:default_graph')
 
-    data_keys = ('tk:c', 'tk:t', 'ns:pfx')
+    data_keys = ('tk:c', 'tk:t', 'pfx:ns')
     idx_keys = (
             'c:tk', 'sk:tk', 'pk:tk', 'ok:tk', 'spk:tk', 'sok:tk', 'pok:tk',
-            'pfx:ns')
+            'ns:pfx')
 
     db_env = None
     db = None
     dbs = {}
-    txn = None
+    main_txn = None
+    idx_txn = None
     is_txn_rw = None
 
 
@@ -169,7 +170,7 @@ class LmdbStore(Store):
         This method is called outside of the main transaction. All cursors
         are created separately within the transaction.
         '''
-        self._init_db_environment(path, create)
+        self._init_db_environments(path, create)
         if self.db_env == NO_STORE:
             return NO_STORE
         self.__open = True
@@ -183,11 +184,12 @@ class LmdbStore(Store):
         '''
         if not self.is_open:
             raise RuntimeError('Store must be opened first.')
-        self.txn = self.db_env.begin(write=write, buffers=True)
+        self.main_txn = self.db_env.begin(write=write, buffers=True)
+        self.idx_txn = self.idx_env.begin(write=write, buffers=True)
         self.is_txn_rw = write
         # Cursors.
-        self.curs = self.get_data_cursors(self.txn)
-        self.curs.update(self.get_idx_cursors(self.txn))
+        self.curs = self.get_main_cursors(self.main_txn)
+        self.curs.update(self.get_idx_cursors(self.idx_txn))
 
 
     @property
@@ -196,7 +198,8 @@ class LmdbStore(Store):
         Whether the main transaction is open.
         '''
         try:
-            self.txn.id()
+            self.main_txn.id()
+            self.idx_txn.id()
         except (lmdb.Error, AttributeError) as e:
             #logger.info('Main transaction does not exist or is closed.')
             return False
@@ -205,7 +208,7 @@ class LmdbStore(Store):
             return True
 
 
-    def get_data_cursors(self, txn):
+    def get_main_cursors(self, txn):
         '''
         Build the main data cursors for a transaction.
 
@@ -217,7 +220,7 @@ class LmdbStore(Store):
         return {
             'tk:t': txn.cursor(self.dbs['tk:t']),
             'tk:c': txn.cursor(self.dbs['tk:c']),
-            'pfx:ns': txn.cursor(self.dbs['ns:pfx']),
+            'pfx:ns': txn.cursor(self.dbs['pfx:ns']),
         }
 
 
@@ -232,7 +235,7 @@ class LmdbStore(Store):
         '''
         cur = {}
         for key in self.idx_keys:
-            cur[key] = self.txn.cursor(self.dbs[key])
+            cur[key] = txn.cursor(self.dbs[key])
 
         return cur
 
@@ -398,7 +401,7 @@ class LmdbStore(Store):
             dataset = (tk for tk in self.curs['c:tk'].iternext_dup())
             return len(set(dataset))
         else:
-            return self.txn.stat(self.dbs['tk:t'])['entries']
+            return self.main_txn.stat(self.dbs['tk:t'])['entries']
 
 
     def bind(self, prefix, namespace):
@@ -407,10 +410,10 @@ class LmdbStore(Store):
         '''
         prefix = s2b(prefix)
         namespace = s2b(namespace)
-        with self.txn.cursor(self.dbs['ns:pfx']) as cur:
-            cur.put(namespace, prefix)
-        with self.txn.cursor(self.dbs['pfx:ns']) as cur:
+        with self.main_txn.cursor(self.dbs['pfx:ns']) as cur:
             cur.put(prefix, namespace)
+        with self.idx_txn.cursor(self.dbs['ns:pfx']) as cur:
+            cur.put(namespace, prefix)
 
 
     def namespace(self, prefix):
@@ -470,11 +473,12 @@ class LmdbStore(Store):
         @param graph (URIRef) URI of the named graph to add.
         '''
         if not self.is_txn_rw:
-            with self.db_env.begin(write=True) as txn:
-                with txn.cursor(self.dbs['tk:c']) as tk2c_cur:
-                    tk2c_cur.put(self._pickle(None), self._pickle(graph))
-                with txn.cursor(self.dbs['c:tk']) as c2tk_cur:
-                    c2tk_cur.put(self._pickle(graph), self._pickle(None))
+            with self.db_env.begin(write=True).cursor(self.dbs['tk:c']) \
+                    as tk2c_cur:
+                tk2c_cur.put(self._pickle(None), self._pickle(graph))
+            with self.idx_env.begin(write=True).cursor(self.dbs['c:tk']) \
+                    as c2tk_cur:
+                c2tk_cur.put(self._pickle(graph), self._pickle(None))
         else:
             self.curs['tk:c'].put(self._pickle(None), self._pickle(graph))
             self.curs['c:tk'].put(self._pickle(graph), self._pickle(None))
@@ -502,8 +506,9 @@ class LmdbStore(Store):
         Commit main transaction.
         '''
         if self.is_txn_open:
-            self.txn.commit()
-        self.txn = self.is_txn_rw = None
+            self.main_txn.commit()
+            self.idx_txn.commit()
+        self.main_txn = self.idx_txn = self.is_txn_rw = None
 
 
     def rollback(self):
@@ -511,8 +516,9 @@ class LmdbStore(Store):
         Roll back main transaction.
         '''
         if self.is_txn_open:
-            self.txn.abort()
-        self.txn = self.is_txn_rw = None
+            self.main_txn.abort()
+            self.idx_txn.abort()
+        self.main_txn = self.idx_txn = self.is_txn_rw = None
 
 
     #def _next_lex_key(self, db=None):
@@ -603,18 +609,27 @@ class LmdbStore(Store):
             return
 
 
-    def _init_db_environment(self, path, create=True):
+    def _init_db_environments(self, path, create=True):
         '''
         Initialize the DB environment.
-        If `create` is True, the environment and its databases are created.
+
+        The main database is kept in one file, the indices in a separate one
+        (these may be even further split up depending on performance
+        considerations).
+
+        @param path The base path to contain the databases.
+        @param create (bool) If True, the environment and its databases are
+        created.
         '''
         if not exists(path):
             if create is True:
                 makedirs(path)
             else:
                 return NO_STORE
-        self.db_env = lmdb.open(path, create=create, map_size=self.MAP_SIZE,
-                max_dbs=12, readahead=False)
+        self.db_env = lmdb.open(path + '/main', subdir=False, create=create,
+                map_size=self.MAP_SIZE, max_dbs=4, readahead=False)
+        self.idx_env = lmdb.open(path + '/index', subdir=False, create=create,
+                map_size=self.MAP_SIZE, max_dbs=10, readahead=False)
 
         # Open and optionally create main databases.
         self.dbs = {
@@ -623,11 +638,11 @@ class LmdbStore(Store):
             'tk:c': self.db_env.open_db(b'tk:c', create=create, dupsort=True),
             'pfx:ns': self.db_env.open_db(b'pfx:ns', create=create),
             # Index.
-            'ns:pfx': self.db_env.open_db(b'ns:pfx', create=create),
+            'ns:pfx': self.idx_env.open_db(b'ns:pfx', create=create),
         }
         # Other index databases.
         for db_key in self.idx_keys:
-            self.dbs[db_key] = self.db_env.open_db(s2b(db_key),
+            self.dbs[db_key] = self.idx_env.open_db(s2b(db_key),
                     dupsort=True, dupfixed=True, create=create)