Browse Source

Merge pull request #74 from scossu/rdf_digest

RDF digest
Stefano Cossu 6 years ago
parent
commit
a688c50eca

+ 31 - 10
lakesuperior/endpoints/ldp.py

@@ -26,6 +26,7 @@ from lakesuperior.model.ldp_nr import LdpNr
 from lakesuperior.model.ldp_rs import LdpRs
 from lakesuperior.model.ldpr import Ldpr
 from lakesuperior.store.ldp_rs.lmdb_store import TxnManager
+from lakesuperior.store.ldp_rs.metadata_store import MetadataStore
 from lakesuperior.toolbox import Toolbox
 
 
@@ -233,7 +234,7 @@ def post_resource(parent_uid):
 
     Add a new resource in a new URI.
     """
-    out_headers = std_headers
+    rsp_headers = std_headers
     try:
         slug = request.headers['Slug']
         logger.debug('Slug: {}'.format(slug))
@@ -273,9 +274,10 @@ def post_resource(parent_uid):
         hdr['Link'] = '<{0}/fcr:metadata>; rel="describedby"; anchor="{0}"'\
                 .format(uri)
 
-    out_headers.update(hdr)
+    rsp_headers.update(hdr)
+    rsp_headers.update(_digest_headers(nsc['fcres'][uid]))
 
-    return uri, 201, out_headers
+    return uri, 201, rsp_headers
 
 
 @ldp.route('/<path:uid>', methods=['PUT'], strict_slashes=False)
@@ -328,6 +330,8 @@ def put_resource(uid):
     else:
         rsp_code = 204
         rsp_body = ''
+    rsp_headers.update(_digest_headers(nsc['fcres'][uid]))
+
     return rsp_body, rsp_code, rsp_headers
 
 
@@ -625,7 +629,7 @@ def _headers_from_metadata(rsrc, out_fmt='text/turtle'):
     :param lakesuperior.model.ldpr.Ldpr rsrc: Resource to extract metadata
         from.
     """
-    out_headers = defaultdict(list)
+    rsp_headers = defaultdict(list)
 
     digest = rsrc.metadata.value(rsrc.uri, nsc['premis'].hasMessageDigest)
     # Only add ETag and digest if output is not RDF.
@@ -638,21 +642,38 @@ def _headers_from_metadata(rsrc, out_fmt='text/turtle'):
                 'W/"{}"'.format(cksum_hex)
                 if nsc['ldp'].RDFSource in rsrc.ldp_types
                 else cksum_hex)
-        out_headers['ETag'] = etag_str,
-        out_headers['Digest'] = '{}={}'.format(
+        rsp_headers['ETag'] = etag_str,
+        rsp_headers['Digest'] = '{}={}'.format(
                 digest_algo.upper(), b64encode(cksum).decode('ascii'))
+    else:
+        rsp_headers.update(_digest_headers(rsrc.uri))
+
 
     last_updated_term = rsrc.metadata.value(nsc['fcrepo'].lastModified)
     if last_updated_term:
-        out_headers['Last-Modified'] = arrow.get(last_updated_term)\
+        rsp_headers['Last-Modified'] = arrow.get(last_updated_term)\
             .format('ddd, D MMM YYYY HH:mm:ss Z')
 
     for t in rsrc.ldp_types:
-        out_headers['Link'].append('{};rel="type"'.format(t.n3()))
+        rsp_headers['Link'].append('{};rel="type"'.format(t.n3()))
 
     mimetype = rsrc.metadata.value(nsc['ebucore'].hasMimeType)
     if mimetype:
-        out_headers['Content-Type'] = mimetype
+        rsp_headers['Content-Type'] = mimetype
+
+    return rsp_headers
+
+
+def _digest_headers(uri):
+    """
+    Get an LDP-RS resource digest and create header tags.
+
+    The ``Digest`` and ``ETag`` headers are created.
+    """
+    headers = {}
+    digest = MetadataStore().get_checksum(uri)
+    headers['Digest'] = 'SHA256={}'.format(b64encode(digest).decode('ascii'))
+    headers['ETag'] = 'W/{}'.format(digest.hex())
 
-    return out_headers
+    return headers
 

+ 70 - 0
lakesuperior/model/ldpr.py

@@ -3,12 +3,15 @@ import re
 
 from abc import ABCMeta
 from collections import defaultdict
+from hashlib import sha256
+from threading import Thread
 from urllib.parse import urldefrag
 from uuid import uuid4
 
 import arrow
 
 from rdflib import Graph, URIRef, Literal
+from rdflib.compare import to_isomorphic
 from rdflib.namespace import RDF
 
 from lakesuperior import env, thread_env
@@ -21,6 +24,7 @@ from lakesuperior.exceptions import (
     InvalidResourceError, RefIntViolationError, ResourceNotExistsError,
     ServerManagedTermError, TombstoneError)
 from lakesuperior.store.ldp_rs.rsrc_centric_layout import VERS_CONT_LABEL
+from lakesuperior.store.ldp_rs.metadata_store import MetadataStore
 from lakesuperior.toolbox import Toolbox
 
 
@@ -275,6 +279,50 @@ class Ldpr(metaclass=ABCMeta):
         return out_gr
 
 
+    @property
+    def canonical_graph(self):
+        """
+        "Canonical" representation of a resource.
+
+        TODO: There is no agreement yet on what a "canonical" representation
+        of an LDP resource should be. This is a PoC method that assumes such
+        representation to include all triples that would be retrieved with a
+        GET request to the resource, including the ones with a different
+        subject than the resource URI.
+
+        :rtype: rdflib.compare.IsomorphicGraph
+        """
+        # First verify that the instance IMR options correspond to the
+        # "canonical" representation.
+        if (
+                hasattr(self, '_imr_options')
+                and self._imr_options.get('incl_srv_mgd')
+                and not self._imr_options.get('incl_inbound')
+                and self._imr_options.get('incl_children')):
+            gr = self.imr
+        else:
+            gr = rdfly.get_imr(
+                    self.uid, incl_inbound=False, incl_children=True)
+        return to_isomorphic(gr)
+
+
+    @property
+    def rsrc_digest(self):
+        """
+        Cryptographic digest (SHA256) of a resource.
+
+        :rtype: bytes
+        """
+        # This RDFLib function seems to be based on an in-depth study of the
+        # topic of graph checksums; however the output is odd because it
+        # returns an arbitrarily long int that cannot be converted to bytes.
+        # The output is being converted to a proper # SHA256 checksum. This is
+        # a temporary fix. See https://github.com/RDFLib/rdflib/issues/825
+        checksum = self.canonical_graph.graph_digest()
+
+        return sha256(str(checksum).encode('ascii')).digest()
+
+
     @property
     def version_info(self):
         """
@@ -706,9 +754,17 @@ class Ldpr(metaclass=ABCMeta):
             delete) or None. In the latter case, no notification is sent.
         :type ev_type: str or None
         :param set remove_trp: Triples to be removed.
+            # Add metadata.
         :param set add_trp: Triples to be added.
         """
         rdfly.modify_rsrc(self.uid, remove_trp, add_trp)
+
+        # Calculate checksum (asynchronously).
+        cksum_action = (
+                self._delete_checksum if ev_type == RES_DELETED
+                else self._update_checksum)
+        Thread(target=cksum_action).run()
+
         # Clear IMR buffer.
         if hasattr(self, '_imr'):
             delattr(self, '_imr')
@@ -724,6 +780,20 @@ class Ldpr(metaclass=ABCMeta):
             self._enqueue_msg(ev_type, remove_trp, add_trp)
 
 
+    def _update_checksum(self):
+        """
+        Save the resource checksum in a dedicated metadata store.
+        """
+        MetadataStore().update_checksum(self.uri, self.rsrc_digest)
+
+
+    def _delete_checksum(self):
+        """
+        Delete the resource checksum from the metadata store.
+        """
+        MetadataStore().delete_checksum(self.uri)
+
+
     def _enqueue_msg(self, ev_type, remove_trp=None, add_trp=None):
         """
         Compose a message about a resource change.

+ 184 - 0
lakesuperior/store/base_lmdb_store.py

@@ -0,0 +1,184 @@
+import hashlib
+
+from abc import ABCMeta, abstractmethod
+from contextlib import contextmanager
+from os import makedirs, path
+
+import lmdb
+
+from lakesuperior import env
+
+
+class BaseLmdbStore(metaclass=ABCMeta):
+    """
+    Generic LMDB store abstract class.
+
+    This class contains convenience method to create an LMDB store for any
+    purpose and provides some convenience methods to wrap cursors and
+    transactions into contexts.
+
+    This interface can be subclassed for specific storage back ends. It is
+    *not* used for :py:class:`~lakesuperior.store.ldp_rs.lmdb_store.LmdbStore`
+    which has a more complex lifecycle and setup.
+
+    Example usage::
+
+        >>> class MyStore(BaseLmdbStore):
+        ...     path = '/base/store/path'
+        ...     db_labels = ('db1', 'db2')
+        ...
+        >>> ms = MyStore()
+        >>> # "with" wraps the operation in a transaction.
+        >>> with ms.cur(index='db1', write=True):
+        ...     cur.put(b'key1', b'val1')
+        True
+
+    """
+
+    path = None
+    """
+    Filesystem path where the database environment is stored.
+
+    This is a mandatory value for implementations.
+
+    :rtype: str
+    """
+
+    db_labels = None
+    """
+    List of databases in the DB environment by label.
+
+    If the environment has only one database, do not override this value (i.e.
+    leave it to ``None``).
+
+    :rtype: tuple(str)
+    """
+
+
+    options = {}
+    """
+    LMDB environment option overrides. Setting this is not required.
+
+    See `LMDB documentation
+    <http://lmdb.readthedocs.io/en/release/#environment-class`_ for details
+    on available options.
+
+    Default values are available for the following options:
+
+    - ``map_size``: 1 Gib
+    - ``max_dbs``: dependent on the number of DBs defined in
+      :py:meth:``db_labels``. Only override if necessary.
+    - ``max_spare_txns``: dependent on the number of threads, if accessed via
+      WSGI, or ``1`` otherwise. Only override if necessary.
+
+    :rtype: dict
+    """
+
+    def __init__(self, create=True):
+        """
+        Initialize DB environment and databases.
+        """
+        if not path.exists(self.path) and create is True:
+            try:
+                makedirs(self.path)
+            except Exception as e:
+                raise IOError(
+                    'Could not create the database at {}. Error: {}'.format(
+                        self.path, e))
+
+        options = self.options
+
+        if not options.get('max_dbs'):
+            options['max_dbs'] = len(self.db_labels)
+
+        if options.get('max_spare_txns', False):
+            options['max_spare_txns'] = (
+                    env.wsgi_options['workers']
+                    if getattr(env, 'wsgi_options', False)
+                    else 1)
+            logger.info('Max LMDB readers: {}'.format(
+                    options['max_spare_txns']))
+
+        self._dbenv = lmdb.open(self.path, **options)
+
+        if self.db_labels is not None:
+            self._dbs = {
+                label: self._dbenv.open_db(
+                    label.encode('ascii'), create=create)
+                for label in self.db_labels}
+
+
+    @property
+    def dbenv(self):
+        """
+        LMDB environment handler.
+
+        :rtype: :py:class:`lmdb.Environment`
+        """
+        return self._dbenv
+
+
+    @property
+    def dbs(self):
+        """
+        List of databases in the environment, as LMDB handles.
+
+        These handles can be used to begin transactions.
+
+        :rtype: tuple
+        """
+        return self._dbs
+
+
+    @contextmanager
+    def txn(self, write=False):
+        """
+        Transaction context manager.
+
+        :param bool write: Whether a write transaction is to be opened.
+
+        :rtype: lmdb.Transaction
+        """
+        try:
+            txn = self.dbenv.begin(write=write)
+            yield txn
+            txn.commit()
+        except:
+            txn.abort()
+            raise
+        finally:
+            txn = None
+
+
+    @contextmanager
+    def cur(self, index=None, txn=None, write=False):
+        """
+        Handle a cursor on a database by its index as a context manager.
+
+        An existing transaction can be used, otherwise a new one will be
+        automatically opened and closed within the cursor context.
+
+        :param str index: The database index. If not specified, a cursor is
+            opened for the main database environment.
+        :param lmdb.Transaction txn: Existing transaction to use. If not
+            specified, a new transaction will be opened.
+        :param bool write: Whether a write transaction is to be opened. Only
+            meaningful if ``txn`` is ``None``.
+
+        :rtype: lmdb.Cursor
+        """
+        db = None if index is None else self.dbs[index]
+
+        if txn is None:
+            with self.txn(write=write) as _txn:
+                cur = _txn.cursor(db)
+                yield cur
+                cur.close()
+        else:
+            try:
+                cur = txn.cursor(db)
+                yield cur
+            finally:
+                if cur:
+                    cur.close()
+                    cur = None

+ 5 - 4
lakesuperior/store/ldp_rs/lmdb_store.py

@@ -37,14 +37,15 @@ class TxnManager(ContextDecorator):
     """
     Handle ACID transactions with an LmdbStore.
 
-    Wrap this within a ``with`` statement:
+    Wrap this within a ``with`` statement::
 
-    >>> with TxnManager(store, True):
-    ...     # Do something with the database
-    >>>
+        >>> with TxnManager(store, True):
+        ...     # Do something with the database
+        >>>
 
     The transaction will be opened and handled automatically.
     """
+
     def __init__(self, store, write=False):
         """
         Begin and close a transaction in a store.

+ 62 - 0
lakesuperior/store/ldp_rs/metadata_store.py

@@ -0,0 +1,62 @@
+from os import path
+
+from lakesuperior.store.base_lmdb_store import BaseLmdbStore
+
+from lakesuperior import env
+
+
+class MetadataStore(BaseLmdbStore):
+    """
+    LMDB store for RDF metadata.
+
+    Note that even though this store connector uses LMDB as the
+    :py::class:`LmdbStore` class, it is separate because it is not part of the
+    RDFLib store implementation and carries higher-level concepts such as LDP
+    resource URIs.
+    """
+
+    db_labels = ('checksums',)
+    """
+    At the moment only ``checksums`` is implemented. It is a registry of
+    LDP resource graphs, indicated in the key by their UID, and their
+    cryptographic hashes.
+    """
+
+    path = path.join(
+        env.app_globals.config['application']['store']['ldp_rs']['location'],
+        'metadata')
+
+
+    def get_checksum(self, uri):
+        """
+        Get the checksum of a resource.
+
+        :param str uri: Resource URI (``info:fcres...``).
+        :rtype: bytes
+        """
+        with self.cur(index='checksums') as cur:
+            cksum = cur.get(uri.encode('utf-8'))
+
+        return cksum
+
+
+    def update_checksum(self, uri, cksum):
+        """
+        Update the stored checksum of a resource.
+
+        :param str uri: Resource URI (``info:fcres...``).
+        :param bytes cksum: Checksum bytestring.
+        """
+        with self.cur(index='checksums', write=True) as cur:
+            cur.put(uri.encode('utf-8'), cksum)
+
+
+    def delete_checksum(self, uri):
+        """
+        Delete the stored checksum of a resource.
+
+        :param str uri: Resource URI (``info:fcres...``).
+        """
+        with self.cur(index='checksums', write=True) as cur:
+            if cur.set_key(uri.encode('utf-8')):
+                cur.delete()

+ 11 - 1
lakesuperior/store/ldp_rs/rsrc_centric_layout.py

@@ -1,6 +1,7 @@
 import logging
 
 from collections import defaultdict
+from hashlib import sha256
 from itertools import chain
 from os import path
 from string import Template
@@ -9,6 +10,7 @@ from urllib.parse import urldefrag
 import arrow
 
 from rdflib import Dataset, Graph, Literal, URIRef, plugin
+from rdflib.compare import to_isomorphic
 from rdflib.namespace import RDF
 from rdflib.query import ResultException
 from rdflib.resource import Resource
@@ -19,6 +21,7 @@ from lakesuperior.dictionaries.namespaces import ns_collection as nsc
 from lakesuperior.dictionaries.namespaces import ns_mgr as nsm
 from lakesuperior.dictionaries.srv_mgd_terms import  srv_mgd_subjects, \
         srv_mgd_predicates, srv_mgd_types
+from lakesuperior.globals import ROOT_RSRC_URI
 from lakesuperior.exceptions import (InvalidResourceError,
         ResourceNotExistsError, TombstoneError, PathSegmentError)
 from lakesuperior.store.ldp_rs.lmdb_store import TxnManager
@@ -197,6 +200,8 @@ class RsrcCentricLayout:
         """
         Delete all graphs and insert the basic triples.
         """
+        from lakesuperior.store.ldp_rs.metadata_store import MetadataStore
+
         logger.info('Deleting all data from the graph store.')
         store = self.ds.store
         if getattr(store, 'is_txn_open', False):
@@ -211,6 +216,11 @@ class RsrcCentricLayout:
             with open(fname, 'r') as f:
                 data = Template(f.read())
                 self.ds.update(data.substitute(timestamp=arrow.utcnow()))
+            gr = self.get_imr('/', incl_inbound=False, incl_children=True)
+
+        checksum = to_isomorphic(gr).graph_digest()
+        digest = sha256(str(checksum).encode('ascii')).digest()
+        MetadataStore().update_checksum(ROOT_RSRC_URI, digest)
 
 
     def get_raw(self, uri, ctx=None):
@@ -251,7 +261,7 @@ class RsrcCentricLayout:
 
     def get_imr(
                 self, uid, ver_uid=None, strict=True, incl_inbound=False,
-                incl_children=True, embed_children=False, **kwargs):
+                incl_children=True, **kwargs):
         """
         See base_rdf_layout.get_imr.
         """

+ 34 - 0
tests/api/test_resource_api.py

@@ -13,6 +13,7 @@ from lakesuperior.exceptions import (
         TombstoneError)
 from lakesuperior.globals import RES_CREATED, RES_UPDATED
 from lakesuperior.model.ldpr import Ldpr
+from lakesuperior.store.ldp_rs.metadata_store import MetadataStore
 
 
 @pytest.fixture(scope='module')
@@ -456,6 +457,39 @@ class TestResourceCRUD:
                 rsrc_api.resurrect('{}/child{}'.format(uid, i))
 
 
+    def test_checksum(self):
+        """
+        Verify that a checksum is created and updated appropriately.
+        """
+        mds = MetadataStore()
+        root_cksum1 = mds.get_checksum(nsc['fcres']['/'])
+        uid = '/test_checksum'
+        rsrc_api.create_or_replace(uid)
+
+        mds = MetadataStore()
+        root_cksum2 = mds.get_checksum(nsc['fcres']['/'])
+        cksum1 = mds.get_checksum(nsc['fcres'][uid])
+
+        assert len(cksum1)
+        assert root_cksum1 != root_cksum2
+
+        rsrc_api.update(
+                uid,
+                'DELETE {} INSERT {<> a <http://ex.org/ns#Hello> .} WHERE {}')
+
+        mds = MetadataStore()
+        cksum2 = mds.get_checksum(nsc['fcres'][uid])
+
+        assert cksum1 != cksum2
+
+        rsrc_api.delete(uid)
+
+        mds = MetadataStore()
+        cksum3 = mds.get_checksum(nsc['fcres'][uid])
+
+        assert cksum3 is None
+
+
 
 @pytest.mark.usefixtures('db')
 class TestResourceVersioning:

+ 62 - 0
tests/endpoints/test_ldp.py

@@ -2,6 +2,7 @@ import pdb
 import pytest
 import uuid
 
+from base64 import b64encode
 from hashlib import sha1
 
 from flask import g
@@ -874,6 +875,67 @@ class TestPrefHeader:
 
 
 
+@pytest.mark.usefixtures('client_class')
+@pytest.mark.usefixtures('db')
+class TestDigest:
+    """
+    Test digest and ETag handling.
+    """
+    def test_digest_post(self):
+        """
+        Test ``Digest`` and ``ETag`` headers on resource POST.
+        """
+        resp = self.client.post('/ldp/')
+        assert 'Digest' in resp.headers
+        assert 'ETag' in resp.headers
+        assert (
+                b64encode(bytes.fromhex(
+                    resp.headers['ETag'].replace('W/', '')
+                    )).decode('ascii') ==
+                resp.headers['Digest'].replace('SHA256=', ''))
+
+
+    def test_digest_put(self):
+        """
+        Test ``Digest`` and ``ETag`` headers on resource PUT.
+        """
+        resp_put = self.client.put('/ldp/test_digest_put')
+        assert 'Digest' in resp_put.headers
+        assert 'ETag' in resp_put.headers
+        assert (
+                b64encode(bytes.fromhex(
+                    resp_put.headers['ETag'].replace('W/', '')
+                    )).decode('ascii') ==
+                resp_put.headers['Digest'].replace('SHA256=', ''))
+
+        resp_get = self.client.get('/ldp/test_digest_put')
+        assert 'Digest' in resp_get.headers
+        assert 'ETag' in resp_get.headers
+        assert (
+                b64encode(bytes.fromhex(
+                    resp_get.headers['ETag'].replace('W/', '')
+                    )).decode('ascii') ==
+                resp_get.headers['Digest'].replace('SHA256=', ''))
+
+
+    def test_digest_patch(self):
+        """
+        Verify that the digest and ETag change on resource change.
+        """
+        path = '/ldp/test_digest_patch'
+        self.client.put(path)
+        rsp1 = self.client.get(path)
+
+        self.client.patch(
+                path, data=b'DELETE {} INSERT {<> a <http://ex.org/Test> .} '
+                b'WHERE {}',
+                headers={'Content-Type': 'application/sparql-update'})
+        rsp2 = self.client.get(path)
+
+        assert rsp1.headers['ETag'] != rsp2.headers['ETag']
+        assert rsp1.headers['Digest'] != rsp2.headers['Digest']
+
+
 @pytest.mark.usefixtures('client_class')
 @pytest.mark.usefixtures('db')
 class TestVersion:

+ 81 - 0
tests/store/test_metadata_store.py

@@ -0,0 +1,81 @@
+import pytest
+
+from hashlib import sha256
+
+from lakesuperior.store.ldp_rs.metadata_store import MetadataStore
+
+
+class TestMetadataStore:
+    """
+    Tests for the LMDB Metadata store.
+    """
+    def test_put_checksum(self):
+        """
+        Put a new checksum.
+        """
+        uri = 'info:fcres/test_checksum'
+        cksum = sha256(b'Bogus content')
+        mds = MetadataStore()
+        with mds.cur(index='checksums', write=True) as cur:
+            cur.put(uri.encode('utf-8'), cksum.digest())
+
+        with mds.cur(index='checksums') as cur:
+            assert cur.get(uri.encode('utf-8')) == cksum.digest()
+
+
+    def test_separate_txn(self):
+        """
+        Open a transaction and put a new checksum.
+
+        Same as test_put_checksum but wrapping the cursor in a separate
+        transaction. This is really to test the base store which is an abstract
+        class.
+        """
+        uri = 'info:fcres/test_checksum_separate'
+        cksum = sha256(b'More bogus content.')
+        mds = MetadataStore()
+        with mds.txn(True) as txn:
+            with mds.cur(index='checksums', txn=txn) as cur:
+                cur.put(uri.encode('utf-8'), cksum.digest())
+
+        with mds.txn() as txn:
+            with mds.cur(index='checksums', txn=txn) as cur:
+                assert cur.get(uri.encode('utf-8')) == cksum.digest()
+
+
+    def test_exception(self):
+        """
+        Test exceptions within cursor and transaction contexts.
+        """
+        uri = 'info:fcres/test_checksum_exception'
+        cksum = sha256(b'More bogus content.')
+        mds = MetadataStore()
+
+        class CustomError(Exception):
+            pass
+
+        with pytest.raises(CustomError):
+            with mds.txn() as txn:
+                raise CustomError()
+
+        with pytest.raises(CustomError):
+            with mds.txn() as txn:
+                with mds.cur(index='checksums', txn=txn) as cur:
+                    raise CustomError()
+
+        with pytest.raises(CustomError):
+            with mds.cur(index='checksums') as cur:
+                raise CustomError()
+
+        with pytest.raises(CustomError):
+            with mds.txn(write=True) as txn:
+                raise CustomError()
+
+        with pytest.raises(CustomError):
+            with mds.txn(write=True) as txn:
+                with mds.cur(index='checksums', txn=txn) as cur:
+                    raise CustomError()
+
+        with pytest.raises(CustomError):
+            with mds.cur(index='checksums', write=True) as cur:
+                raise CustomError()