소스 검색

Small LMDB optimizations.

* Set max. parallel readers depending on WSGI threads
* Improve lookup_1bound logic
Stefano Cossu 6년 전
부모
커밋
0031063393
2개의 변경된 파일에 27줄 추가 및 15줄 삭제
  1. 24 14
      lakesuperior/store/ldp_rs/lmdb_store.py
  2. 3 1
      lakesuperior/wsgi.py

+ 24 - 14
lakesuperior/store/ldp_rs/lmdb_store.py

@@ -14,6 +14,7 @@ from rdflib import Graph, Namespace, URIRef, Variable
 from rdflib.graph import DATASET_DEFAULT_GRAPH_ID as RDFLIB_DEFAULT_GRAPH_URI
 from rdflib.store import Store, VALID_STORE, NO_STORE
 
+from lakesuperior.env import env
 
 logger = logging.getLogger(__name__)
 
@@ -473,7 +474,7 @@ class LmdbStore(Store):
         pk_c = self._pickle(context)
 
         # Add new individual terms or gather keys for existing ones.
-        keys = [None, None, None, None]
+        keys = [None] * 4
         with self.cur('th:t') as icur:
             for i, pk_t in enumerate((pk_s, pk_p, pk_o, pk_c)):
                 thash = self._hash(pk_t)
@@ -856,10 +857,20 @@ class LmdbStore(Store):
             else:
                 return NO_STORE
 
-        self.data_env = lmdb.open(path + '/main', subdir=False, create=create,
-                map_size=self.MAP_SIZE, max_dbs=4, readahead=False)
-        self.idx_env = lmdb.open(path + '/index', subdir=False, create=create,
-                map_size=self.MAP_SIZE, max_dbs=6, readahead=False)
+        if getattr(env, 'wsgi_options', False):
+            self._workers = env.wsgi_options['workers']
+        else:
+            self._workers = 1
+        logger.info('Max LMDB readers: {}'.format(self._workers))
+
+        self.data_env = lmdb.open(
+                path + '/main', subdir=False, create=create,
+                map_size=self.MAP_SIZE, max_dbs=4,
+                max_spare_txns=self._workers, readahead=False)
+        self.idx_env = lmdb.open(
+                path + '/index', subdir=False, create=create,
+                map_size=self.MAP_SIZE, max_dbs=6,
+                max_spare_txns=self._workers, readahead=False)
 
         # Clear stale readers.
         data_stale_readers = self.data_env.reader_check()
@@ -1003,7 +1014,7 @@ class LmdbStore(Store):
                     yield from self._lookup_2bound({'s': s, 'o': o})
                 # s ? ?
                 else:
-                    yield from self._lookup_1bound('s', s)
+                    yield from self._lookup_1bound('s:po', s)
         else:
             if p is not None:
                 # ? p o
@@ -1011,11 +1022,11 @@ class LmdbStore(Store):
                     yield from self._lookup_2bound({'p': p, 'o': o})
                 # ? p ?
                 else:
-                    yield from self._lookup_1bound('p', p)
+                    yield from self._lookup_1bound('p:so', p)
             else:
                 # ? ? o
                 if o is not None:
-                    yield from self._lookup_1bound('o', o)
+                    yield from self._lookup_1bound('o:sp', o)
                 # ? ? ?
                 else:
                     # Get all triples in the database.
@@ -1023,21 +1034,20 @@ class LmdbStore(Store):
                         yield from cur.iternext_nodup()
 
 
-    def _lookup_1bound(self, label, term):
+    def _lookup_1bound(self, idx_name, term):
         """
         Lookup triples for a pattern with one bound term.
 
-        :param str label: Which term is being searched for. One of `s`,
-        `p`, or `o`.
+        :param str idx_name: The index to look up as one of the keys of
+            ``_lookup_ordering``.
         :param rdflib.URIRef term: Bound term to search for.
 
-        :rtype: iterator(bytes)
+        :rtype: Iterator(bytes)
         :return: SPO keys matching the pattern.
         """
         k = self._to_key(term)
         if not k:
             return iter(())
-        idx_name = '{}:{}'.format(label, 'spo'.replace(label, ''))
         term_order = self._lookup_ordering[idx_name]
         with self.cur(idx_name) as cur:
             if cur.set_key(k):
@@ -1045,7 +1055,7 @@ class LmdbStore(Store):
                     subkeys = self._split_key(match)
 
                     # Compose result.
-                    out = [None, None, None]
+                    out = [None] * 3
                     out[term_order[0]] = k
                     out[term_order[1]] = subkeys[0]
                     out[term_order[2]] = subkeys[1]

+ 3 - 1
lakesuperior/wsgi.py

@@ -5,8 +5,8 @@ from os import environ, makedirs, path
 
 import gunicorn.app.base
 
-from lakesuperior.server import fcrepo
 from lakesuperior.config_parser import default_config_dir
+from lakesuperior.env import env
 
 
 config_file = '{}/gunicorn.yml'.format(default_config_dir)
@@ -47,6 +47,7 @@ options = {
     'accesslog': '{}/gunicorn-access.log'.format(log_dir),
     'errorlog': '{}/gunicorn-error.log'.format(log_dir),
 }
+env.wsgi_options = options
 
 class WsgiApp(gunicorn.app.base.BaseApplication):
 
@@ -64,6 +65,7 @@ class WsgiApp(gunicorn.app.base.BaseApplication):
 
 
 def run():
+    from lakesuperior.server import fcrepo
     WsgiApp(fcrepo, options).run()