|
@@ -1,12 +1,16 @@
|
|
import pdb
|
|
import pdb
|
|
import pytest
|
|
import pytest
|
|
|
|
|
|
|
|
+from io import BytesIO
|
|
|
|
+from uuid import uuid4
|
|
|
|
+
|
|
from rdflib import Graph, URIRef
|
|
from rdflib import Graph, URIRef
|
|
|
|
|
|
from lakesuperior import env
|
|
from lakesuperior import env
|
|
from lakesuperior.api import resource as rsrc_api
|
|
from lakesuperior.api import resource as rsrc_api
|
|
from lakesuperior.api import admin as admin_api
|
|
from lakesuperior.api import admin as admin_api
|
|
from lakesuperior.dictionaries.namespaces import ns_collection as nsc
|
|
from lakesuperior.dictionaries.namespaces import ns_collection as nsc
|
|
|
|
+from lakesuperior.exceptions import ChecksumValidationError
|
|
|
|
|
|
|
|
|
|
@pytest.mark.usefixtures('db')
|
|
@pytest.mark.usefixtures('db')
|
|
@@ -48,7 +52,34 @@ class TestAdminApi:
|
|
assert {trp[2] for trp in check_trp} == {brk_uri}
|
|
assert {trp[2] for trp in check_trp} == {brk_uri}
|
|
assert (nsc['fcres']['/'], nsc['ldp'].contains, brk_uri) in check_trp
|
|
assert (nsc['fcres']['/'], nsc['ldp'].contains, brk_uri) in check_trp
|
|
assert (
|
|
assert (
|
|
- nsc['fcres']['/test_refint2'],
|
|
|
|
|
|
+ nsc['fcres']['/test_refint2'],
|
|
URIRef('http://ex.org/ns#p1'), brk_uri) in check_trp
|
|
URIRef('http://ex.org/ns#p1'), brk_uri) in check_trp
|
|
|
|
|
|
|
|
|
|
|
|
+ def test_fixity_check_ok(self):
|
|
|
|
+ """
|
|
|
|
+ Verify that fixity check passes for a non-corrupted resource.
|
|
|
|
+ """
|
|
|
|
+ content = BytesIO(uuid4().bytes)
|
|
|
|
+ uid = f'/{uuid4()}'
|
|
|
|
+
|
|
|
|
+ rsrc_api.create_or_replace(uid, stream=content)
|
|
|
|
+ admin_api.fixity_check(uid)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ def test_fixity_check_corrupt(self):
|
|
|
|
+ """
|
|
|
|
+ Verify that fixity check fails for a corrupted resource.
|
|
|
|
+ """
|
|
|
|
+ content = BytesIO(uuid4().bytes)
|
|
|
|
+ uid = f'/{uuid4()}'
|
|
|
|
+
|
|
|
|
+ _, rsrc = rsrc_api.create_or_replace(uid, stream=content)
|
|
|
|
+
|
|
|
|
+ with open(rsrc.local_path, 'wb') as fh:
|
|
|
|
+ fh.write(uuid4().bytes)
|
|
|
|
+
|
|
|
|
+ with pytest.raises(ChecksumValidationError):
|
|
|
|
+ admin_api.fixity_check(uid)
|
|
|
|
+
|
|
|
|
+
|