# test_2_1_admin_api.py
  1. import pdb
  2. import pytest
  3. from io import BytesIO
  4. from uuid import uuid4
  5. from rdflib import URIRef
  6. from lakesuperior import env
  7. from lakesuperior.api import resource as rsrc_api
  8. from lakesuperior.api import admin as admin_api
  9. from lakesuperior.dictionaries.namespaces import ns_collection as nsc
  10. from lakesuperior.exceptions import ChecksumValidationError
  11. from lakesuperior.model.rdf.graph import Graph, from_rdf
  12. @pytest.mark.usefixtures('db')
  13. class TestAdminApi:
  14. """
  15. Test admin operations.
  16. """
  17. def test_check_refint_ok(self):
  18. """
  19. Check that referential integrity is OK.
  20. """
  21. uid1 = '/test_refint1'
  22. uid2 = '/test_refint2'
  23. with env.app_globals.rdf_store.txn_ctx():
  24. gr = from_rdf(
  25. store=env.app_globals.rdf_store,
  26. data=f'<> <http://ex.org/ns#p1> <info:fcres{uid1}> .',
  27. format='turtle', publicID=nsc['fcres'][uid2]
  28. )
  29. rsrc_api.create_or_replace(uid1, graph=gr)
  30. assert admin_api.integrity_check() == set()
  31. def test_check_refint_corrupt(self):
  32. """
  33. Corrupt the data store and verify that the missing triple is detected.
  34. """
  35. brk_uid = '/test_refint1'
  36. brk_uri = nsc['fcres'][brk_uid]
  37. store = env.app_globals.rdf_store
  38. with store.txn_ctx(True):
  39. store.remove((URIRef('info:fcres/test_refint1'), None, None))
  40. check_res = admin_api.integrity_check()
  41. assert check_res != set()
  42. assert len(check_res) == 4
  43. check_trp = {trp[0] for trp in check_res}
  44. assert {trp[2] for trp in check_trp} == {brk_uri}
  45. assert (nsc['fcres']['/'], nsc['ldp'].contains, brk_uri) in check_trp
  46. assert (
  47. nsc['fcres']['/test_refint2'],
  48. URIRef('http://ex.org/ns#p1'), brk_uri) in check_trp
  49. def test_fixity_check_ok(self):
  50. """
  51. Verify that fixity check passes for a non-corrupted resource.
  52. """
  53. content = BytesIO(uuid4().bytes)
  54. uid = f'/{uuid4()}'
  55. rsrc_api.create_or_replace(uid, stream=content)
  56. admin_api.fixity_check(uid)
  57. def test_fixity_check_corrupt(self):
  58. """
  59. Verify that fixity check fails for a corrupted resource.
  60. """
  61. content = BytesIO(uuid4().bytes)
  62. uid = f'/{uuid4()}'
  63. _, rsrc = rsrc_api.create_or_replace(uid, stream=content)
  64. with env.app_globals.rdf_store.txn_ctx():
  65. with open(rsrc.local_path, 'wb') as fh:
  66. fh.write(uuid4().bytes)
  67. with pytest.raises(ChecksumValidationError):
  68. admin_api.fixity_check(uid)