# test_resource_api.py
  1. import pdb
  2. import pytest
  3. from io import BytesIO
  4. from uuid import uuid4
  5. from rdflib import Graph, Literal, URIRef
  6. from lakesuperior.api import resource as rsrc_api
  7. from lakesuperior.dictionaries.namespaces import ns_collection as nsc
  8. from lakesuperior.exceptions import (
  9. InvalidResourceError, ResourceNotExistsError, TombstoneError)
  10. from lakesuperior.globals import RES_CREATED, RES_UPDATED
  11. from lakesuperior.model.ldpr import Ldpr
  12. @pytest.fixture(scope='module')
  13. def random_uuid():
  14. return str(uuid.uuid4())
  15. @pytest.mark.usefixtures('db')
  16. class TestResourceApi:
  17. '''
  18. Test interaction with the Resource API.
  19. '''
  20. def test_nodes_exist(self):
  21. """
  22. Verify whether nodes exist or not.
  23. """
  24. assert rsrc_api.exists('/') is True
  25. assert rsrc_api.exists('/{}'.format(uuid4())) is False
  26. def test_get_root_node_metadata(self):
  27. """
  28. Get the root node metadata.
  29. The ``dcterms:title`` property should NOT be included.
  30. """
  31. gr = rsrc_api.get_metadata('/')
  32. assert isinstance(gr, Graph)
  33. assert len(gr) == 9
  34. assert gr[gr.identifier : nsc['rdf'].type : nsc['ldp'].Resource ]
  35. assert not gr[gr.identifier : nsc['dcterms'].title : "Repository Root"]
  36. def test_get_root_node(self):
  37. """
  38. Get the root node.
  39. The ``dcterms:title`` property should be included.
  40. """
  41. rsrc = rsrc_api.get('/')
  42. assert isinstance(rsrc, Ldpr)
  43. gr = rsrc.imr
  44. assert len(gr) == 10
  45. assert gr[gr.identifier : nsc['rdf'].type : nsc['ldp'].Resource ]
  46. assert gr[
  47. gr.identifier : nsc['dcterms'].title : Literal('Repository Root')]
  48. def test_get_nonexisting_node(self):
  49. """
  50. Get a non-existing node.
  51. """
  52. with pytest.raises(ResourceNotExistsError):
  53. gr = rsrc_api.get('/{}'.format(uuid4()))
  54. def test_create_from_graph(self):
  55. """
  56. Create a resource from a provided graph.
  57. """
  58. uid = '/rsrc_from_graph'
  59. uri = nsc['fcres'][uid]
  60. gr = Graph().parse(
  61. data='<> a <http://ex.org/type#A> .', format='turtle',
  62. publicID=uri)
  63. #pdb.set_trace()
  64. evt = rsrc_api.create_or_replace(uid, init_gr=gr)
  65. rsrc = rsrc_api.get(uid)
  66. assert rsrc.imr[
  67. rsrc.uri : nsc['rdf'].type : URIRef('http://ex.org/type#A')]
  68. assert rsrc.imr[
  69. rsrc.uri : nsc['rdf'].type : nsc['ldp'].RDFSource]
  70. def test_create_from_rdf_stream(self):
  71. """
  72. Create a resource from a RDF stream (Turtle).
  73. This is the same method used by the LDP endpoint.
  74. """
  75. uid = '/rsrc_from_stream'
  76. uri = nsc['fcres'][uid]
  77. stream = BytesIO(b'<> a <http://ex.org/type#B> .')
  78. #pdb.set_trace()
  79. evt = rsrc_api.create_or_replace(
  80. uid, stream=stream, mimetype='text/turtle')
  81. rsrc = rsrc_api.get(uid)
  82. assert rsrc.imr[
  83. rsrc.uri : nsc['rdf'].type : URIRef('http://ex.org/type#B')]
  84. assert rsrc.imr[
  85. rsrc.uri : nsc['rdf'].type : nsc['ldp'].RDFSource]
  86. def test_replace_rsrc(self):
  87. uid = '/test_replace'
  88. uri = nsc['fcres'][uid]
  89. gr1 = Graph().parse(
  90. data='<> a <http://ex.org/type#A> .', format='turtle',
  91. publicID=uri)
  92. evt = rsrc_api.create_or_replace(uid, init_gr=gr1)
  93. assert evt == RES_CREATED
  94. rsrc = rsrc_api.get(uid)
  95. assert rsrc.imr[
  96. rsrc.uri : nsc['rdf'].type : URIRef('http://ex.org/type#A')]
  97. assert rsrc.imr[
  98. rsrc.uri : nsc['rdf'].type : nsc['ldp'].RDFSource]
  99. gr2 = Graph().parse(
  100. data='<> a <http://ex.org/type#B> .', format='turtle',
  101. publicID=uri)
  102. #pdb.set_trace()
  103. evt = rsrc_api.create_or_replace(uid, init_gr=gr2)
  104. assert evt == RES_UPDATED
  105. rsrc = rsrc_api.get(uid)
  106. assert not rsrc.imr[
  107. rsrc.uri : nsc['rdf'].type : URIRef('http://ex.org/type#A')]
  108. assert rsrc.imr[
  109. rsrc.uri : nsc['rdf'].type : URIRef('http://ex.org/type#B')]
  110. assert rsrc.imr[
  111. rsrc.uri : nsc['rdf'].type : nsc['ldp'].RDFSource]