# test_3_1_admin.py

from io import BytesIO
from uuid import uuid4

import pytest

from lakesuperior import env
from lakesuperior.api import resource as rsrc_api


  6. @pytest.mark.usefixtures('client_class')
  7. @pytest.mark.usefixtures('db')
  8. class TestAdminApi:
  9. """
  10. Test admin endpoint.
  11. """
  12. def test_fixity_check_ok(self):
  13. """
  14. Verify that fixity check passes for a non-corrupted resource.
  15. """
  16. uid = uuid4()
  17. content = uuid4().bytes
  18. path = f'/ldp/{uid}'
  19. fix_path = f'/admin/{uid}/fixity'
  20. self.client.put(
  21. path, data=content, headers={'content-type': 'text/plain'})
  22. rsp = self.client.get(fix_path)
  23. assert rsp.status_code == 200
  24. assert rsp.json['uid'] == f'/{uid}'
  25. assert rsp.json['pass'] == True
  26. def test_fixity_check_corrupt(self):
  27. """
  28. Verify that fixity check fails for a corrupted resource.
  29. """
  30. uid = uuid4()
  31. content = uuid4().bytes
  32. path = f'/ldp/{uid}'
  33. fix_path = f'/admin/{uid}/fixity'
  34. self.client.put(
  35. path, data=content, headers={'content-type': 'text/plain'})
  36. rsrc = rsrc_api.get(f'/{uid}')
  37. with env.app_globals.rdf_store.txn_ctx():
  38. fname = rsrc.local_path
  39. with open(fname, 'wb') as fh:
  40. fh.write(uuid4().bytes)
  41. rsp = self.client.get(fix_path)
  42. assert rsp.status_code == 200
  43. assert rsp.json['uid'] == f'/{uid}'
  44. assert rsp.json['pass'] == False
  45. def test_fixity_check_missing(self):
  46. """
  47. Verify that fixity check is not performed on a missing resource.
  48. """
  49. uid = uuid4()
  50. content = uuid4().bytes
  51. path = f'/ldp/{uid}'
  52. fix_path = f'/admin/{uid}/fixity'
  53. assert self.client.get(fix_path).status_code == 404
  54. self.client.put(
  55. path, data=content, headers={'content-type': 'text/plain'})
  56. self.client.delete(path)
  57. assert self.client.get(fix_path).status_code == 410