
Move transaction handling up to HTTP request level; fix bootstrap; many test fixes.

Stefano Cossu, 6 years ago (commit e39a076734)

+ 10 - 6
conftest.py

@@ -10,6 +10,7 @@ from PIL import Image
 
 from lakesuperior.app import create_app
 from lakesuperior.config_parser import config
+from lakesuperior.store_layouts.ldp_rs.lmdb_store import TxnManager
 from util.generators import random_image
 from util.bootstrap import bootstrap_binary_store
 
@@ -27,16 +28,19 @@ def db(app):
     Set up and tear down test triplestore.
     '''
     db = app.rdfly
-    db.bootstrap()
+    if hasattr(db.store, 'begin'):
+        with TxnManager(db.store, True) as txn:
+            db.bootstrap()
+    else:
+        db.bootstrap()
     bootstrap_binary_store(app)
 
     yield db
 
-    print('Tearing down fixture graph store.')
-    for g in db.ds.graphs():
-        db.ds.remove_graph(g)
-
-    db.ds.store.commit()
+    #print('Tearing down fixture graph store.')
+    #if hasattr(db.store, 'begin'):
+    #    with TxnManager(db.store, True) as txn:
+    #        for g in db.ds.graphs():
+    #            db.ds.remove_graph(g)
 
 
 @pytest.fixture

+ 79 - 1
lakesuperior/endpoints/ldp.py

@@ -2,6 +2,7 @@ import logging
 
 from collections import defaultdict
 from pprint import pformat
+from functools import wraps
 from uuid import uuid4
 
 import arrow
@@ -22,6 +23,7 @@ from lakesuperior.model.ldp_factory import LdpFactory
 from lakesuperior.model.ldp_nr import LdpNr
 from lakesuperior.model.ldp_rs import LdpRs
 from lakesuperior.model.ldpr import Ldpr
+from lakesuperior.store_layouts.ldp_rs.lmdb_store import LmdbStore, TxnManager
 from lakesuperior.toolbox import Toolbox
 
 
@@ -99,12 +101,77 @@ def log_request_end(rsp):
     return rsp
 
 
+def transaction(write=False):
+    '''
+    Handle atomic operations in a store.
+
+    This wrapper ensures that a write operation is performed atomically. It
+    also takes care of sending a message for each resource changed in the
+    transaction.
+    '''
+    def _transaction_deco(fn):
+        @wraps(fn)
+        def _wrapper(*args, **kwargs):
+            g.changelog = []
+            store = current_app.rdfly.store
+            if isinstance(store, LmdbStore):
+                with TxnManager(store, write=write) as txn:
+                    ret = fn(*args, **kwargs)
+                return ret
+            else:
+                try:
+                    ret = fn(*args, **kwargs)
+                except:
+                    logger.warn('Rolling back transaction.')
+                    store.rollback()
+                    raise
+                else:
+                    logger.info('Committing transaction.')
+                    #if hasattr(store, '_edits'):
+                    #    # @FIXME ugly.
+                    #    self.rdfly._conn.optimize_edits()
+                    store.commit()
+                    return ret
+            # @TODO re-enable, maybe leave out the delta part
+            #for ev in g.changelog:
+            #    #self._logger.info('Message: {}'.format(pformat(ev)))
+            #    send_event_msg(*ev)
+
+        return _wrapper
+    return _transaction_deco
+
+
+def send_msg(rsrc, ev_type, remove_trp=None, add_trp=None):
+    '''
+    Send a message about a changed (created, modified, deleted) resource.
+    '''
+    try:
+        type = rsrc.types
+        actor = rsrc.metadata.value(nsc['fcrepo'].createdBy)
+    except (ResourceNotExistsError, TombstoneError):
+        type = set()
+        actor = None
+        for t in add_trp:
+            if t[1] == RDF.type:
+                type.add(t[2])
+            elif actor is None and t[1] == nsc['fcrepo'].createdBy:
+                actor = t[2]
+
+    g.changelog.append((set(remove_trp), set(add_trp), {
+        'ev_type' : ev_type,
+        'time' : g.timestamp,
+        'type' : type,
+        'actor' : actor,
+    }))
+
+
 ## REST SERVICES ##
 
 @ldp.route('/<path:uid>', methods=['GET'], strict_slashes=False)
 @ldp.route('/', defaults={'uid': ''}, methods=['GET'], strict_slashes=False)
 @ldp.route('/<path:uid>/fcr:metadata', defaults={'force_rdf' : True},
         methods=['GET'])
+@transaction()
 def get_resource(uid, force_rdf=False):
     '''
     Retrieve RDF or binary content.
@@ -151,6 +218,7 @@ def get_resource(uid, force_rdf=False):
 @ldp.route('/<path:parent>', methods=['POST'], strict_slashes=False)
 @ldp.route('/', defaults={'parent': ''}, methods=['POST'],
         strict_slashes=False)
+@transaction(True)
 def post_resource(parent):
     '''
     Add a new resource in a new URI.
@@ -168,7 +236,8 @@ def post_resource(parent):
     try:
         uid = uuid_for_post(parent, slug)
         logger.debug('Generated UID for POST: {}'.format(uid))
-        rsrc = LdpFactory.from_provided(uid, content_length=request.content_length,
+        rsrc = LdpFactory.from_provided(
+                uid, content_length=request.content_length,
                 stream=stream, mimetype=mimetype, handling=handling,
                 disposition=disposition)
     except ResourceNotExistsError as e:
@@ -197,6 +266,7 @@ def post_resource(parent):
 
 
 @ldp.route('/<path:uid>/fcr:versions', methods=['GET'])
+@transaction()
 def get_version_info(uid):
     '''
     Get version info (`fcr:versions`).
@@ -214,6 +284,7 @@ def get_version_info(uid):
 
 
 @ldp.route('/<path:uid>/fcr:versions/<ver_uid>', methods=['GET'])
+@transaction()
 def get_version(uid, ver_uid):
     '''
     Get an individual resource version.
@@ -234,6 +305,7 @@ def get_version(uid, ver_uid):
 
 
 @ldp.route('/<path:uid>/fcr:versions', methods=['POST', 'PUT'])
+@transaction(True)
 def post_version(uid):
     '''
     Create a new resource version.
@@ -254,6 +326,7 @@ def post_version(uid):
 
 
 @ldp.route('/<path:uid>/fcr:versions/<ver_uid>', methods=['PATCH'])
+@transaction(True)
 def patch_version(uid, ver_uid):
     '''
     Revert to a previous version.
@@ -278,6 +351,7 @@ def patch_version(uid, ver_uid):
 @ldp.route('/<path:uid>', methods=['PUT'], strict_slashes=False)
 @ldp.route('/<path:uid>/fcr:metadata', defaults={'force_rdf' : True},
         methods=['PUT'])
+@transaction(True)
 def put_resource(uid):
     '''
     Add a new resource at a specified URI.
@@ -325,6 +399,7 @@ def put_resource(uid):
 
 
 @ldp.route('/<path:uid>', methods=['PATCH'], strict_slashes=False)
+@transaction(True)
 def patch_resource(uid):
     '''
     Update an existing resource with a SPARQL-UPDATE payload.
@@ -349,11 +424,13 @@ def patch_resource(uid):
 
 
 @ldp.route('/<path:uid>/fcr:metadata', methods=['PATCH'])
+@transaction(True)
 def patch_resource_metadata(uid):
     return patch_resource(uid)
 
 
 @ldp.route('/<path:uid>', methods=['DELETE'])
+@transaction(True)
 def delete_resource(uid):
     '''
     Delete a resource and optionally leave a tombstone.
@@ -393,6 +470,7 @@ def delete_resource(uid):
 
 @ldp.route('/<path:uid>/fcr:tombstone', methods=['GET', 'POST', 'PUT',
         'PATCH', 'DELETE'])
+@transaction(True)
 def tombstone(uid):
     '''
     Handle all tombstone operations.

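Aside: the `transaction` decorator sits below the route decorators so that Flask registers the already-wrapped view, giving each HTTP request a single store transaction. A minimal, self-contained sketch of the pattern (the blueprint, view names and store path are hypothetical, not the actual lakesuperior wiring):

from functools import wraps

from flask import Blueprint, g

from lakesuperior.store_layouts.ldp_rs.lmdb_store import LmdbStore, TxnManager

demo = Blueprint('demo', __name__)
store = LmdbStore('/tmp/demo_lmdb')  # example path

def transaction(write=False):
    def _deco(fn):
        @wraps(fn)
        def _wrapper(*args, **kwargs):
            g.changelog = []
            # One transaction per request: commit on success, roll back
            # if the view raises.
            with TxnManager(store, write=write):
                return fn(*args, **kwargs)
        return _wrapper
    return _deco

@demo.route('/<path:uid>', methods=['PUT'])
@transaction(True)  # writes get a read-write transaction
def put_rsrc(uid):
    return '', 204

@demo.route('/<path:uid>', methods=['GET'])
@transaction()      # reads get a read-only transaction
def get_rsrc(uid):
    return '', 200
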
+ 0 - 1
lakesuperior/model/ldp_factory.py

@@ -15,7 +15,6 @@ from lakesuperior.dictionaries.namespaces import ns_collection as nsc
 from lakesuperior.exceptions import (IncompatibleLdpTypeError,
         InvalidResourceError, ResourceNotExistsError)
 
-
 class LdpFactory:
     '''
     Generate LDP instances.

+ 1 - 2
lakesuperior/model/ldp_nr.py

@@ -4,7 +4,7 @@ from rdflib.resource import Resource
 from rdflib.term import URIRef, Literal, Variable
 
 from lakesuperior.dictionaries.namespaces import ns_collection as nsc
-from lakesuperior.model.ldpr import Ldpr, atomic
+from lakesuperior.model.ldpr import Ldpr
 from lakesuperior.model.ldp_rs import LdpRs
 
 class LdpNr(Ldpr):
@@ -52,7 +52,6 @@ class LdpNr(Ldpr):
 
     ## LDP METHODS ##
 
-    @atomic
     def _create_or_replace_rsrc(self, create_only=False):
         '''
         Create a new binary resource with a corresponding RDF representation.

+ 1 - 8
lakesuperior/model/ldp_rs.py

@@ -1,12 +1,10 @@
-#from copy import deepcopy
-
 from flask import current_app, g
 from rdflib import Graph
 from rdflib.plugins.sparql.algebra import translateUpdate
 from rdflib.plugins.sparql.parser import parseUpdate
 
 from lakesuperior.dictionaries.namespaces import ns_collection as nsc
-from lakesuperior.model.ldpr import Ldpr, atomic
+from lakesuperior.model.ldpr import Ldpr
 
 class LdpRs(Ldpr):
     '''LDP-RS (LDP RDF source).
@@ -40,7 +38,6 @@ class LdpRs(Ldpr):
 
     ## LDP METHODS ##
 
-    @atomic
     def patch(self, update_str):
         '''
         https://www.w3.org/TR/ldp/#ldpr-HTTP_PATCH
@@ -181,7 +178,3 @@ class LdpIc(Ldpc):
             nsc['ldp'].IndirectContainer,
         }
 
-
-
-
-

+ 15 - 87
lakesuperior/model/ldpr.py

@@ -29,36 +29,6 @@ ROOT_UID = ''
 ROOT_RSRC_URI = nsc['fcres'][ROOT_UID]
 
 
-def atomic(fn):
-    '''
-    Handle atomic operations in an RDF store.
-
-    This wrapper ensures that a write operation is performed atomically. It
-    also takes care of sending a message for each resource changed in the
-    transaction.
-    '''
-    def wrapper(self, *args, **kwargs):
-        g.changelog = []
-        try:
-            ret = fn(self, *args, **kwargs)
-        except:
-            self._logger.warn('Rolling back transaction.')
-            self.rdfly.store.rollback()
-            raise
-        else:
-            self._logger.info('Committing transaction.')
-            #if hasattr(self.rdfly.store, '_edits'):
-            #    # @FIXME ugly.
-            #    self.rdfly._conn.optimize_edits()
-            self.rdfly.store.commit()
-            for ev in g.changelog:
-                #self._logger.info('Message: {}'.format(pformat(ev)))
-                self._send_event_msg(*ev)
-            return ret
-
-    return wrapper
-
-
 class Ldpr(metaclass=ABCMeta):
     '''LDPR (LDP Resource).
 
@@ -137,10 +107,6 @@ class Ldpr(metaclass=ABCMeta):
         '''Instantiate an in-memory LDP resource that can be loaded from and
         persisted to storage.
 
-        Persistence is done in this class. None of the operations in the store
-        layout should commit an open transaction. Methods are wrapped in a
-        transaction by using the `@atomic` decorator.
-
         @param uid (string) uid of the resource. If None (must be explicitly
         set) it refers to the root node. It can also be the full URI or URN,
         in which case it will be converted.
@@ -306,14 +272,6 @@ class Ldpr(metaclass=ABCMeta):
         return self._version_info
 
 
-    @property
-    def versions(self):
-        '''
-        Return a generator of version URIs.
-        '''
-        return set(self.version_info[self.urn : nsc['fcrepo'].hasVersion :])
-
-
     @property
     def version_uids(self):
         '''
@@ -429,7 +387,6 @@ class Ldpr(metaclass=ABCMeta):
         return gr
 
 
-    @atomic
     def post(self):
         '''
         https://www.w3.org/TR/ldp/#ldpr-HTTP_POST
@@ -439,7 +396,6 @@ class Ldpr(metaclass=ABCMeta):
         return self._create_or_replace_rsrc(create_only=True)
 
 
-    @atomic
     def put(self):
         '''
         https://www.w3.org/TR/ldp/#ldpr-HTTP_PUT
@@ -451,7 +407,6 @@ class Ldpr(metaclass=ABCMeta):
         raise NotImplementedError()
 
 
-    @atomic
     def delete(self, inbound=True, delete_children=True, leave_tstone=True):
         '''
         https://www.w3.org/TR/ldp/#ldpr-HTTP_DELETE
@@ -488,7 +443,6 @@ class Ldpr(metaclass=ABCMeta):
         return ret
 
 
-    @atomic
     def resurrect(self):
         '''
         Resurrect a resource from a tombstone.
@@ -527,7 +481,6 @@ class Ldpr(metaclass=ABCMeta):
 
 
 
-    @atomic
     def purge(self, inbound=True):
         '''
         Delete a tombstone and all historic snapshots.
@@ -540,7 +493,6 @@ class Ldpr(metaclass=ABCMeta):
         return self._purge_rsrc(inbound)
 
 
-    @atomic
     def create_version(self, ver_uid=None):
         '''
         Create a new version of the resource.
@@ -558,7 +510,6 @@ class Ldpr(metaclass=ABCMeta):
         return g.tbox.globalize_term(self.create_rsrc_snapshot(ver_uid))
 
 
-    @atomic
     def revert_to_version(self, ver_uid, backup=True):
         '''
         Revert to a previous version.
@@ -732,36 +683,12 @@ class Ldpr(metaclass=ABCMeta):
 
         ret = self.rdfly.modify_rsrc(self.uid, remove_trp, add_trp)
 
-        if notify and current_app.config.get('messaging'):
-            self._send_msg(ev_type, remove_trp, add_trp)
+        #if notify and current_app.config.get('messaging'):
+        #    self._send_msg(ev_type, remove_trp, add_trp)
 
         return ret
 
 
-    def _send_msg(self, ev_type, remove_trp=None, add_trp=None):
-        '''
-        Sent a message about a changed (created, modified, deleted) resource.
-        '''
-        try:
-            type = self.types
-            actor = self.metadata.value(nsc['fcrepo'].createdBy)
-        except (ResourceNotExistsError, TombstoneError):
-            type = set()
-            actor = None
-            for t in add_trp:
-                if t[1] == RDF.type:
-                    type.add(t[2])
-                elif actor is None and t[1] == nsc['fcrepo'].createdBy:
-                    actor = t[2]
-
-        g.changelog.append((set(remove_trp), set(add_trp), {
-            'ev_type' : ev_type,
-            'time' : g.timestamp,
-            'type' : type,
-            'actor' : actor,
-        }))
-
-
     # Not used. @TODO Deprecate or reimplement depending on requirements.
     #def _ensure_single_subject_rdf(self, gr, add_fragment=True):
     #    '''
@@ -1017,17 +944,18 @@ class Ldpr(metaclass=ABCMeta):
         self._modify_rsrc(self.RES_UPDATED, add_trp=add_trp)
 
 
-    def _send_event_msg(self, remove_trp, add_trp, metadata):
-        '''
-        Break down delta triples, find subjects and send event message.
-        '''
-        remove_grp = groupby(remove_trp, lambda x : x[0])
-        remove_dict = { k[0] : k[1] for k in remove_grp }
+    # @TODO reenable at request level.
+    #def _send_event_msg(self, remove_trp, add_trp, metadata):
+    #    '''
+    #    Break down delta triples, find subjects and send event message.
+    #    '''
+    #    remove_grp = groupby(remove_trp, lambda x : x[0])
+    #    remove_dict = { k[0] : k[1] for k in remove_grp }
 
-        add_grp = groupby(add_trp, lambda x : x[0])
-        add_dict = { k[0] : k[1] for k in add_grp }
+    #    add_grp = groupby(add_trp, lambda x : x[0])
+    #    add_dict = { k[0] : k[1] for k in add_grp }
 
-        subjects = set(remove_dict.keys()) | set(add_dict.keys())
-        for rsrc_uri in subjects:
-            self._logger.info('subject: {}'.format(rsrc_uri))
-            #current_app.messenger.send
+    #    subjects = set(remove_dict.keys()) | set(add_dict.keys())
+    #    for rsrc_uri in subjects:
+    #        self._logger.info('subject: {}'.format(rsrc_uri))
+    #        #current_app.messenger.send

+ 34 - 0
lakesuperior/store_layouts/ldp_rs/lmdb_connector.py

@@ -0,0 +1,34 @@
+import logging
+
+from rdflib import Dataset, plugin
+from rdflib.store import Store
+from rdflib.term import URIRef
+from rdflib.plugins.stores.sparqlstore import SPARQLStore, SPARQLUpdateStore
+from SPARQLWrapper.Wrapper import POST
+
+from lakesuperior.dictionaries.namespaces import ns_collection as nsc
+from lakesuperior.store_layouts.ldp_rs.base_connector import BaseConnector
+
+Lmdb = plugin.register('Lmdb', Store,
+        'lakesuperior.store_layouts.ldp_rs.lmdb_store', 'LmdbStore')
+
+class LmdbConnector(BaseConnector):
+    '''
+    Handles the connection to an LMDB store.
+    '''
+
+    _logger = logging.getLogger(__name__)
+
+    def _init_connection(self, location):
+        '''
+        Initialize the connection to the LMDB store and open it.
+        '''
+        self.store = plugin.get('Lmdb', Store)(location)
+        self.ds = Dataset(self.store)
+
+
+    def __del__(self):
+        '''
+        Close store connection.
+        '''
+        self.ds.close(commit_pending_transaction=False)

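A sketch of how the registration above is consumed: rdflib's plugin registry maps the name 'Lmdb' back to the LmdbStore class, which `plugin.get` returns for instantiation (the store path is an example):

from rdflib import Dataset, plugin
from rdflib.store import Store

# Same registration as in the connector; re-registering simply overwrites
# the existing entry, so this is safe in a fresh process.
plugin.register('Lmdb', Store,
        'lakesuperior.store_layouts.ldp_rs.lmdb_store', 'LmdbStore')

store = plugin.get('Lmdb', Store)('/tmp/demo_lmdb')  # example path
ds = Dataset(store)  # rdflib Dataset backed by the LMDB store
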
+ 142 - 81
lakesuperior/store_layouts/ldp_rs/lmdb_store.py

@@ -9,7 +9,7 @@ from urllib.request import pathname2url
 import lmdb
 
 from rdflib.store import Store, VALID_STORE, NO_STORE
-from rdflib import Namespace, URIRef
+from rdflib import Graph, Namespace, URIRef
 
 
 logger = logging.getLogger(__name__)
@@ -61,14 +61,14 @@ class TxnManager(ContextDecorator):
         self.write = write
 
     def __enter__(self):
-        self.txn = self.store.begin(write=self.write)
+        self._txn = self.store.begin(write=self.write)
+        return self
 
     def __exit__(self, exc_type, exc_value, traceback):
         if exc_type:
             self.store.rollback()
         else:
             self.store.commit()
-        return True
+        #return True
 
 
 class LmdbStore(Store):
@@ -139,11 +139,13 @@ class LmdbStore(Store):
     db = None
     dbs = {}
     txn = None
+    is_txn_rw = None
 
 
     def __init__(self, path, identifier=None):
         self.__open = False
-        self.identifier = identifier
+
+        self.identifier = identifier or URIRef(pathname2url(abspath(path)))
         super(LmdbStore, self).__init__(path)
 
         self._pickle = self.node_pickler.dumps
@@ -167,9 +169,6 @@ class LmdbStore(Store):
         This method is called outside of the main transaction. All cursors
         are created separately within the transaction.
         '''
-        if self.identifier is None:
-            self.identifier = URIRef(pathname2url(abspath(path)))
-
         self._init_db_environment(path, create)
         if self.db_env == NO_STORE:
             return NO_STORE
@@ -185,6 +184,7 @@ class LmdbStore(Store):
         if not self.is_open:
             raise RuntimeError('Store must be opened first.')
         self.txn = self.db_env.begin(write=write, buffers=True)
+        self.is_txn_rw = write
         # Cursors.
         self.curs = self.get_data_cursors(self.txn)
         self.curs.update(self.get_idx_cursors(self.txn))
@@ -197,11 +197,11 @@ class LmdbStore(Store):
         '''
         try:
             self.txn.id()
-        except (lmdb.Error, AttributeError):
-            logger.info('Main transaction does not exist or is closed.')
+        except (lmdb.Error, AttributeError) as e:
+            #logger.info('Main transaction does not exist or is closed.')
             return False
         else:
-            logger.info('Main transaction is open.')
+            #logger.info('Main transaction is open.')
             return True
 
 
@@ -253,13 +253,26 @@ class LmdbStore(Store):
         self.db_env.close()
 
 
-    def add(self, triple, context=None):
+    def destroy(self, path):
+        '''
+        Destroy the store.
+
+        https://www.youtube.com/watch?v=lIVq7FJnPwg
+
+        @param path (string) Path of the folder containing the database(s).
+        '''
+        if exists(path):
+            rmtree(path)
+
+
+    def add(self, triple, context=None, quoted=False):
         '''
         Add a triple and start indexing.
 
         @param triple (tuple:rdflib.Identifier) Tuple of three identifiers.
         @param context (rdflib.Identifier | None) Context identifier.
         'None' inserts in the default graph.
+        @param quoted (bool) Not used.
         '''
         assert context != self, "Can not add triple directly to store"
         Store.add(self, triple, context)
@@ -267,8 +280,8 @@ class LmdbStore(Store):
         if self.DEFAULT_UNION:
             raise NotImplementedError()
             # @TODO
-        else:
-            context = context or self.DEFAULT_GRAPH_URI
+        elif context is None:
+            context = self.DEFAULT_GRAPH_URI
         pk_trp = self._pickle(triple)
         trp_key = hashlib.new(self.KEY_HASH_ALGO, pk_trp).digest()
 
@@ -276,14 +289,16 @@ class LmdbStore(Store):
         if self.curs['tk:t'].put(trp_key, pk_trp, overwrite=False):
             needs_indexing = True
 
-        pk_ctx = self._pickle(context)
+        pk_ctx = self._pickle(context.identifier) \
+                if isinstance(context, Graph) \
+                else self._pickle(context)
         if not self.curs['tk:c'].set_key_dup(trp_key, pk_ctx):
             self.curs['tk:c'].put(trp_key, pk_ctx)
             needs_indexing = True
 
         if needs_indexing:
-            # @TODO make await
-            self._update_indices(triple, trp_key, pk_ctx)
+            # @TODO make await; run outside of this txn
+            self._update_indices(trp_key, pk_ctx, triple=triple)
 
 
     def remove(self, triple_pattern, context=None):
@@ -293,14 +308,17 @@ class LmdbStore(Store):
         if self.DEFAULT_UNION:
             raise NotImplementedError()
             # @TODO
-        else:
-            context = context or self.DEFAULT_GRAPH_URI
-        pk_ctx = self._pickle(context)
-        for trp in self.triples(triple_pattern, context):
-            trp_key = self._to_key(trp)
-
+        elif context is None:
+            context = self.DEFAULT_GRAPH_URI
+
+        pk_ctx = self._pickle(context.identifier) \
+                if isinstance(context, Graph) \
+                else self._pickle(context)
+        for trp_key in self._triple_keys(triple_pattern, context):
             # Delete context association.
             if self.curs['tk:c'].set_key_dup(trp_key, pk_ctx):
+                triple = self._key_to_triple(trp_key)
                 self.curs['tk:c'].delete()
 
                 # If no other contexts are associated w/ the triple, delete it.
@@ -308,21 +326,27 @@ class LmdbStore(Store):
                         self.curs['tk:t'].set_key(trp_key)):
                     self.curs['tk:t'].delete()
 
-                # @TODO make await
-                self._update_indices(trp, trp_key, pk_ctx)
+                # @TODO make await; run outside of this txn
+                self._update_indices(trp_key, pk_ctx, triple)
 
 
     # @TODO Make async
-    def _update_indices(self, triple, trp_key, pk_ctx):
+    def _update_indices(self, trp_key, pk_ctx, triple=None):
         '''
         Update indices for a given triple.
 
         If the triple is found, add indices. If it is not found, delete them.
 
-        @param triple (tuple: rdflib.Identifier) Tuple of 3 RDFLib terms.
         @param trp_key (bytes) Unique key associated with the triple.
         @param pk_ctx (bytes) Pickled context term.
+        @param triple (tuple: rdflib.Identifier) Tuple of 3 RDFLib terms.
+        This can be provided if already pre-calculated, otherwise it will be
+        retrieved from the store using `trp_key`.
         '''
+        if triple is None:
+            triple = self._key_to_triple(trp_key)
+
         s, p, o = triple
         term_keys = {
             'sk:tk': self._to_key(s),
@@ -353,61 +377,25 @@ class LmdbStore(Store):
     def triples(self, triple_pattern, context=None):
         '''
         Generator over matching triples.
-        '''
-        if context == self:
-            context = None
-
-        if self.DEFAULT_UNION:
-            raise NotImplementedError()
-            # In theory, this is what should happen:
-            #if context == self.DEFAULT_GRAPH_URI
-            #    # Any pattern with unbound context
-            #    for tk in self._lookup(triple_pattern, tkey):
-            #        yield self._key_to_triple(tk)
-            #    return
-        else:
-            context = context or self.DEFAULT_GRAPH_URI
-
-        tkey = self._to_key(triple_pattern)
 
-        # Shortcuts
-        pk_ctx = self._pickle(context)
-        if not self.curs['c:tk'].set_key(pk_ctx):
-            # Context not found.
-            return iter(())
-
-        # s p o c
-        if all(triple_pattern):
-            if self.curs['tk:c'].set_key_dup(tkey, pk_ctx):
-                yield self._key_to_triple(tkey)
-                return
-            else:
-                # Triple not found.
-                return iter(())
-
-        # ? ? ? c
-        elif not any(triple_pattern):
-            # Get all triples from the context
-            for tk in self.curs['c:tk'].iternext_dup():
-                yield self._key_to_triple(tk)
-
-        # Regular lookup.
-        else:
-            for tk in self._lookup(triple_pattern, tkey):
-                if self.curs['c:tk'].set_key_dup(pk_ctx, tk):
-                    yield self._key_to_triple(tk)
+        @param triple_pattern (tuple) 3 RDFLib terms
+        @param context (rdflib.Graph | None) Context graph, if available.
+        If a graph is given, only its identifier is stored.
+        '''
+        for tk in self._triple_keys(triple_pattern, context):
+            yield self._key_to_triple(tk), context
 
 
     def __len__(self, context=None):
         '''
         Return length of the dataset.
         '''
-        if context == self:
-            context = None
-        context = context or self.DEFAULT_GRAPH_URI
+        if context == self or context is None:
+            context = Graph(identifier=self.DEFAULT_GRAPH_URI)
 
-        if context is not self.DEFAULT_GRAPH_URI:
-            dataset = self.triples((None, None, None), context)
+        if context.identifier != self.DEFAULT_GRAPH_URI:
+            #dataset = self.triples((None, None, None), context)
+            # Position the cursor on the context before iterating its dups.
+            if not self.curs['c:tk'].set_key(self._pickle(context.identifier)):
+                return 0
+            dataset = (tk for tk in self.curs['c:tk'].iternext_dup())
             return len(set(dataset))
         else:
             return self.txn.stat(self.dbs['tk:t'])['entries']
@@ -474,10 +462,22 @@ class LmdbStore(Store):
         '''
         Add a graph to the database.
 
+        This may be called by supposedly read-only operations:
+        https://github.com/RDFLib/rdflib/blob/master/rdflib/graph.py#L1623
+        Therefore it needs to open a write transaction. This is not ideal
+        but the only way to play well with RDFLib.
+
         @param graph (URIRef) URI of the named graph to add.
         '''
-        self.curs['tk:c'].put(self._pickle(None), self._pickle(graph))
-        self.curs['c:tk'].put(self._pickle(graph), self._pickle(None))
+        if not self.is_txn_rw:
+            with self.db_env.begin(write=True) as txn:
+                with txn.cursor(self.dbs['tk:c']) as tk2c_cur:
+                    tk2c_cur.put(self._pickle(None), self._pickle(graph))
+                with txn.cursor(self.dbs['c:tk']) as c2tk_cur:
+                    c2tk_cur.put(self._pickle(graph), self._pickle(None))
+        else:
+            self.curs['tk:c'].put(self._pickle(None), self._pickle(graph))
+            self.curs['c:tk'].put(self._pickle(graph), self._pickle(None))
 
 
     def remove_graph(self, graph):
@@ -499,18 +499,20 @@ class LmdbStore(Store):
 
     def commit(self):
         '''
-        Commit main write transaction.
+        Commit main transaction.
         '''
-        self.txn.commit()
-        self.txn = None
+        if self.is_txn_open:
+            self.txn.commit()
+        self.txn = self.is_txn_rw = None
 
 
     def rollback(self):
         '''
-        Roll back main write transaction.
+        Roll back main transaction.
         '''
-        self.txn.abort()
-        self.txn = None
+        if self.is_txn_open:
+            self.txn.abort()
+        self.txn = self.is_txn_rw = None
 
 
     #def _next_lex_key(self, db=None):
@@ -542,6 +544,65 @@ class LmdbStore(Store):
 
     ## PRIVATE METHODS ##
 
+    def _triple_keys(self, triple_pattern, context=None):
+        '''
+        Generator over matching triple keys.
+
+        This method is used by `triples`, which returns native Python tuples,
+        as well as by other methods that need to iterate over and filter
+        triple keys without incurring the overhead of converting them to
+        triples.
+
+        @param triple_pattern (tuple) 3 RDFLib terms
+        @param context (rdflib.Graph | None) Context graph, if available.
+        If a graph is given, only its identifier is stored.
+        '''
+        if context == self:
+            context = None
+
+        if self.DEFAULT_UNION:
+            raise NotImplementedError()
+            # In theory, this is what should happen:
+            #if context == self.DEFAULT_GRAPH_URI
+            #    # Any pattern with unbound context
+            #    for tk in self._lookup(triple_pattern, tkey):
+            #        yield self._key_to_triple(tk)
+            #    return
+        elif context is None:
+            context = self.DEFAULT_GRAPH_URI
+
+        tkey = self._to_key(triple_pattern)
+
+        # Shortcuts
+        pk_ctx = self._pickle(context.identifier) \
+                if isinstance(context, Graph) \
+                else self._pickle(context)
+        if not self.curs['c:tk'].set_key(pk_ctx):
+            # Context not found.
+            return iter(())
+
+        # s p o c
+        if all(triple_pattern):
+            if self.curs['tk:c'].set_key_dup(tkey, pk_ctx):
+                yield tkey
+                return
+            else:
+                # Triple not found.
+                return iter(())
+
+        # ? ? ? c
+        elif not any(triple_pattern):
+            # Get all triples from the context
+            for tk in self.curs['c:tk'].iternext_dup():
+                yield tk
+
+        # Regular lookup.
+        else:
+            for tk in self._lookup(triple_pattern, tkey):
+                if self.curs['c:tk'].set_key_dup(pk_ctx, tk):
+                    yield tk
+            return
+
+
     def _init_db_environment(self, path, create=True):
         '''
         Initialize the DB environment.

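Taken together, LmdbStore and TxnManager support direct use along these lines; a minimal sketch (the path is an example, and it assumes the store opens its LMDB environment on instantiation, as the test suite below does):

from rdflib import URIRef

from lakesuperior.store_layouts.ldp_rs.lmdb_store import LmdbStore, TxnManager

store = LmdbStore('/tmp/demo_lmdb')  # example path
trp = (URIRef('urn:s'), URIRef('urn:p'), URIRef('urn:o'))

with TxnManager(store, write=True):  # read-write txn; commits on clean exit
    store.add(trp)

with TxnManager(store):  # read-only txn for lookups
    matches = {t for t, _ in store.triples((None, None, None))}
assert trp in matches
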
+ 0 - 1
lakesuperior/store_layouts/ldp_rs/rsrc_centric_layout.py

@@ -185,7 +185,6 @@ class RsrcCentricLayout:
             self.ds.update(f.read())
 
         self.ds.store.commit()
-        self.ds.store.close()
 
 
     def get_raw(self, uri, ctx):

+ 79 - 0
tests/store/test_lmdb_store.py

@@ -3,6 +3,7 @@ import pytest
 from shutil import rmtree
 
 from rdflib import Namespace, URIRef
+from rdflib.namespace import RDF, RDFS
 
 from lakesuperior.store_layouts.ldp_rs.lmdb_store import LmdbStore, TxnManager
 
@@ -290,3 +291,81 @@ class TestContext:
             assert len(set(store.triples((None, None, None)))) == 0
             assert len(set(store.triples((None, None, None), gr_uri))) == 2
             assert len(set(store.triples((None, None, None), gr2_uri))) == 0
+
+
+@pytest.mark.usefixtures('store')
+class TestTransactions:
+    '''
+    Tests for transaction handling.
+    '''
+    # @TODO Test concurrent reads and writes.
+    pass
+
+
+#@pytest.mark.usefixtures('store')
+#class TestRdflib:
+#    '''
+#    Test case adapted from
+#    http://rdflib.readthedocs.io/en/stable/univrdfstore.html#interface-test-cases
+#    '''
+#
+#    @pytest.fixture
+#    def sample_gr(self):
+#        return Graph().parse('''
+#        @prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
+#        @prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
+#        @prefix : <http://test/> .
+#        {:a :b :c; a :foo} => {:a :d :c} .
+#        _:foo a rdfs:Class .
+#        :a :d :c .
+#        ''', format='n3')
+#
+#    def _test_basic(self, sample_gr):
+#        with TxnManager as txn:
+#            implies = URIRef("http://www.w3.org/2000/10/swap/log#implies")
+#            a = URIRef('http://test/a')
+#            b = URIRef('http://test/b')
+#            c = URIRef('http://test/c')
+#            d = URIRef('http://test/d')
+#            for s,p,o in g.triples((None,implies,None)):
+#                formulaA = s
+#                formulaB = o
+#
+#                #contexts test
+#                assert len(list(g.contexts()))==3
+#
+#                #contexts (with triple) test
+#                assert len(list(g.contexts((a,d,c))))==2
+#
+#                #triples test cases
+#                assert type(list(g.triples(
+#                        (None,RDF.type,RDFS.Class)))[0][0]) == BNode
+#                assert len(list(g.triples((None,implies,None))))==1
+#                assert len(list(g.triples((None,RDF.type,None))))==3
+#                assert len(list(g.triples((None,RDF.type,None),formulaA)))==1
+#                assert len(list(g.triples((None,None,None),formulaA)))==2
+#                assert len(list(g.triples((None,None,None),formulaB)))==1
+#                assert len(list(g.triples((None,None,None))))==5
+#                assert len(list(g.triples(
+#                        (None,URIRef('http://test/d'),None),formulaB)))==1
+#                assert len(list(g.triples(
+#                        (None,URIRef('http://test/d'),None))))==1
+#
+#                #Remove test cases
+#                g.remove((None,implies,None))
+#                assert len(list(g.triples((None,implies,None))))==0
+#                assert len(list(g.triples((None,None,None),formulaA)))==2
+#                assert len(list(g.triples((None,None,None),formulaB)))==1
+#                g.remove((None,b,None),formulaA)
+#                assert len(list(g.triples((None,None,None),formulaA)))==1
+#                g.remove((None,RDF.type,None),formulaA)
+#                assert len(list(g.triples((None,None,None),formulaA)))==0
+#                g.remove((None,RDF.type,RDFS.Class))
+#
+#                #remove_context tests
+#                formulaBContext=Context(g,formulaB)
+#                g.remove_context(formulaB)
+#                assert len(list(g.triples((None,RDF.type,None))))==2
+#                assert len(g)==3
+#                assert len(formulaBContext)==0
+#                g.remove((None,None,None))
+#                assert len(g)==0

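One candidate shape for the TestTransactions @TODO above is a rollback test; a sketch only, reusing this module's `store` fixture and imports:

def test_rollback(store):
    '''
    An exception raised inside TxnManager should abort the write.
    '''
    trp = (URIRef('urn:s:tx'), URIRef('urn:p:tx'), URIRef('urn:o:tx'))
    try:
        with TxnManager(store, True):
            store.add(trp)
            raise RuntimeError('force rollback')  # triggers store.rollback()
    except RuntimeError:
        pass

    with TxnManager(store):
        assert len(set(store.triples(trp))) == 0
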
+ 7 - 3
util/bootstrap.py

@@ -7,8 +7,7 @@ sys.path.append('.')
 
 from lakesuperior.app import create_app
 from lakesuperior.config_parser import config
-from lakesuperior.store_layouts.ldp_rs.bdb_connector import \
-        BdbConnector
+from lakesuperior.store_layouts.ldp_rs.lmdb_store import TxnManager
 from lakesuperior.model.ldpr import Ldpr
 
 __doc__ = '''
@@ -45,5 +44,10 @@ if __name__=='__main__':
         sys.exit()
 
     app = create_app(config['application'], config['logging'])
-    app.rdfly.bootstrap()
+    if hasattr(app.rdfly.store, 'begin'):
+        with TxnManager(app.rdfly.store, write=True) as txn:
+            app.rdfly.bootstrap()
+    else:
+        app.rdfly.bootstrap()
+
     bootstrap_binary_store(app)