test_admin_api.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. import pdb
  2. import pytest
  3. from rdflib import Graph, URIRef
  4. from lakesuperior import env
  5. from lakesuperior.api import resource as rsrc_api
  6. from lakesuperior.api import admin as admin_api
  7. from lakesuperior.dictionaries.namespaces import ns_collection as nsc
  8. from lakesuperior.store.ldp_rs.lmdb_store import TxnManager
  9. @pytest.mark.usefixtures('db')
  10. class TestAdminApi:
  11. """
  12. Test admin operations.
  13. """
  14. def test_check_refint_ok(self):
  15. """
  16. Check that referential integrity is OK.
  17. """
  18. uid1 = '/test_refint1'
  19. uid2 = '/test_refint2'
  20. gr = Graph().parse(
  21. data='<> <http://ex.org/ns#p1> <info:fcres{}> .'.format(uid1),
  22. format='turtle', publicID=nsc['fcres'][uid2])
  23. rsrc_api.create_or_replace(uid1, graph=gr)
  24. assert admin_api.integrity_check() == set()
  25. def test_check_refint_corrupt(self):
  26. """
  27. Corrupt the data store and verify that the missing triple is detected.
  28. """
  29. brk_uid = '/test_refint1'
  30. brk_uri = nsc['fcres'][brk_uid]
  31. store = env.app_globals.rdf_store
  32. with TxnManager(store, True):
  33. store.remove((URIRef('info:fcres/test_refint1'), None, None))
  34. #import pdb; pdb.set_trace()
  35. check_res = admin_api.integrity_check()
  36. assert check_res != set()
  37. assert len(check_res) == 4
  38. check_trp = {trp[0] for trp in check_res}
  39. assert {trp[2] for trp in check_trp} == {brk_uri}
  40. assert (nsc['fcres']['/'], nsc['ldp'].contains, brk_uri) in check_trp
  41. assert (
  42. nsc['fcres']['/test_refint2'],
  43. URIRef('http://ex.org/ns#p1'), brk_uri) in check_trp