resource.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. import logging
  2. from functools import wraps
  3. from itertools import groupby
  4. from multiprocessing import Process
  5. from threading import Lock, Thread
  6. import arrow
  7. from rdflib import Literal
  8. from rdflib.namespace import XSD
  9. from lakesuperior.config_parser import config
  10. from lakesuperior.exceptions import (
  11. InvalidResourceError, ResourceNotExistsError, TombstoneError)
  12. from lakesuperior.env import env
  13. from lakesuperior.globals import RES_DELETED
  14. from lakesuperior.model.ldp_factory import LDP_NR_TYPE, LdpFactory
  15. from lakesuperior.store.ldp_rs.lmdb_store import TxnManager
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Shortcut to the application globals exposed by the environment object; the
# functions below read the store, changelog, messenger and RDF layout from it.
app_globals = env.app_globals
# Assigned explicitly: at this position (after the imports) a bare string
# literal would not become the module docstring.
__doc__ = '''
Primary API for resource manipulation.
Quickstart:
>>> # First import default configuration and globals—only done once.
>>> import lakesuperior.default_env
>>> from lakesuperior.api import resource
>>> # Get root resource.
>>> rsrc = resource.get('/')
>>> # Dump graph.
>>> set(rsrc.imr)
{(rdflib.term.URIRef('info:fcres/'),
rdflib.term.URIRef('http://purl.org/dc/terms/title'),
rdflib.term.Literal('Repository Root')),
(rdflib.term.URIRef('info:fcres/'),
rdflib.term.URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#type'),
rdflib.term.URIRef('http://fedora.info/definitions/v4/repository#Container')),
(rdflib.term.URIRef('info:fcres/'),
rdflib.term.URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#type'),
rdflib.term.URIRef('http://fedora.info/definitions/v4/repository#RepositoryRoot')),
(rdflib.term.URIRef('info:fcres/'),
rdflib.term.URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#type'),
rdflib.term.URIRef('http://fedora.info/definitions/v4/repository#Resource')),
(rdflib.term.URIRef('info:fcres/'),
rdflib.term.URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#type'),
rdflib.term.URIRef('http://www.w3.org/ns/ldp#BasicContainer')),
(rdflib.term.URIRef('info:fcres/'),
rdflib.term.URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#type'),
rdflib.term.URIRef('http://www.w3.org/ns/ldp#Container')),
(rdflib.term.URIRef('info:fcres/'),
rdflib.term.URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#type'),
rdflib.term.URIRef('http://www.w3.org/ns/ldp#RDFSource'))}
'''
  50. def transaction(write=False):
  51. '''
  52. Handle atomic operations in a store.
  53. This wrapper ensures that a write operation is performed atomically. It
  54. also takes care of sending a message for each resource changed in the
  55. transaction.
  56. ALL write operations on the LDP-RS and LDP-NR stores go through this
  57. wrapper.
  58. '''
  59. def _transaction_deco(fn):
  60. @wraps(fn)
  61. def _wrapper(*args, **kwargs):
  62. # Mark transaction begin timestamp. This is used for create and
  63. # update timestamps on resources.
  64. env.timestamp = arrow.utcnow()
  65. env.timestamp_term = Literal(env.timestamp, datatype=XSD.dateTime)
  66. with TxnManager(app_globals.rdf_store, write=write) as txn:
  67. ret = fn(*args, **kwargs)
  68. if len(app_globals.changelog):
  69. job = Thread(target=process_queue)
  70. job.start()
  71. logger.debug('Deleting timestamp: {}'.format(getattr(env, 'timestamp')))
  72. delattr(env, 'timestamp')
  73. delattr(env, 'timestamp_term')
  74. return ret
  75. return _wrapper
  76. return _transaction_deco
  77. def process_queue():
  78. '''
  79. Process the message queue on a separate thread.
  80. '''
  81. lock = Lock()
  82. lock.acquire()
  83. while len(app_globals.changelog):
  84. send_event_msg(*app_globals.changelog.popleft())
  85. lock.release()
  86. def send_event_msg(remove_trp, add_trp, metadata):
  87. '''
  88. Send messages about a changed LDPR.
  89. A single LDPR message packet can contain multiple resource subjects, e.g.
  90. if the resource graph contains hash URIs or even other subjects. This
  91. method groups triples by subject and sends a message for each of the
  92. subjects found.
  93. '''
  94. # Group delta triples by subject.
  95. remove_grp = groupby(remove_trp, lambda x : x[0])
  96. remove_dict = {k[0]: k[1] for k in remove_grp}
  97. add_grp = groupby(add_trp, lambda x : x[0])
  98. add_dict = {k[0]: k[1] for k in add_grp}
  99. subjects = set(remove_dict.keys()) | set(add_dict.keys())
  100. for rsrc_uri in subjects:
  101. logger.debug('Processing event for subject: {}'.format(rsrc_uri))
  102. app_globals.messenger.send(rsrc_uri, **metadata)
  103. ### API METHODS ###
  104. @transaction()
  105. def get(uid, repr_options={}):
  106. '''
  107. Get an LDPR resource.
  108. The resource comes preloaded with user data and metadata as indicated by
  109. the `repr_options` argument. Any further handling of this resource is done
  110. outside of a transaction.
  111. @param uid (string) Resource UID.
  112. @param repr_options (dict(bool)) Representation options. This is a dict
  113. that is unpacked downstream in the process. The default empty dict results
  114. in default values. The accepted dict keys are:
  115. - incl_inbound: include inbound references. Default: False.
  116. - incl_children: include children URIs. Default: True.
  117. - embed_children: Embed full graph of all child resources. Default: False
  118. '''
  119. rsrc = LdpFactory.from_stored(uid, repr_options)
  120. # Load graph before leaving the transaction.
  121. rsrc.imr
  122. return rsrc
  123. @transaction()
  124. def get_version_info(uid):
  125. '''
  126. Get version metadata (fcr:versions).
  127. '''
  128. return LdpFactory.from_stored(uid).version_info
  129. @transaction()
  130. def get_version(uid, ver_uid):
  131. '''
  132. Get version metadata (fcr:versions).
  133. '''
  134. return LdpFactory.from_stored(uid).get_version(ver_uid)
  135. @transaction(True)
  136. def create(parent, slug, **kwargs):
  137. '''
  138. Mint a new UID and create a resource.
  139. The UID is computed from a given parent UID and a "slug", a proposed path
  140. relative to the parent. The application will attempt to use the suggested
  141. path but it may use a different one if a conflict with an existing resource
  142. arises.
  143. @param parent (string) UID of the parent resource.
  144. @param slug (string) Tentative path relative to the parent UID.
  145. @param **kwargs Other parameters are passed to the
  146. LdpFactory.from_provided method. Please see the documentation for that
  147. method for explanation of individual parameters.
  148. @return string UID of the new resource.
  149. '''
  150. uid = LdpFactory.mint_uid(parent, slug)
  151. logger.debug('Minted UID for new resource: {}'.format(uid))
  152. rsrc = LdpFactory.from_provided(uid, **kwargs)
  153. rsrc.create_or_replace(create_only=True)
  154. return uid
  155. @transaction(True)
  156. def create_or_replace(uid, stream=None, **kwargs):
  157. '''
  158. Create or replace a resource with a specified UID.
  159. If the resource already exists, all user-provided properties of the
  160. existing resource are deleted. If the resource exists and the provided
  161. content is empty, an exception is raised (not sure why, but that's how
  162. FCREPO4 handles it).
  163. @param uid (string) UID of the resource to be created or updated.
  164. @param stream (BytesIO) Content stream. If empty, an empty container is
  165. created.
  166. @param **kwargs Other parameters are passed to the
  167. LdpFactory.from_provided method. Please see the documentation for that
  168. method for explanation of individual parameters.
  169. @return string Event type: whether the resource was created or updated.
  170. '''
  171. rsrc = LdpFactory.from_provided(uid, stream=stream, **kwargs)
  172. if not stream and rsrc.is_stored:
  173. raise InvalidResourceError(rsrc.uid,
  174. 'Resource {} already exists and no data set was provided.')
  175. return rsrc.create_or_replace()
  176. @transaction(True)
  177. def update(uid, update_str, is_metadata=False):
  178. '''
  179. Update a resource with a SPARQL-Update string.
  180. @param uid (string) Resource UID.
  181. @param update_str (string) SPARQL-Update statements.
  182. @param is_metadata (bool) Whether the resource metadata is being updated.
  183. If False, and the resource being updated is a LDP-NR, an error is raised.
  184. '''
  185. rsrc = LdpFactory.from_stored(uid)
  186. if LDP_NR_TYPE in rsrc.ldp_types:
  187. if is_metadata:
  188. rsrc.patch_metadata(update_str)
  189. else:
  190. raise InvalidResourceError(uid)
  191. else:
  192. rsrc.patch(update_str)
  193. return rsrc
  194. @transaction(True)
  195. def create_version(uid, ver_uid):
  196. '''
  197. Create a resource version.
  198. @param uid (string) Resource UID.
  199. @param ver_uid (string) Version UID to be appended to the resource URI.
  200. NOTE: this is a "slug", i.e. the version URI is not guaranteed to be the
  201. one indicated.
  202. @return string Version UID.
  203. '''
  204. return LdpFactory.from_stored(uid).create_version(ver_uid)
  205. @transaction(True)
  206. def delete(uid, soft=True):
  207. '''
  208. Delete a resource.
  209. @param uid (string) Resource UID.
  210. @param soft (bool) Whether to perform a soft-delete and leave a
  211. tombstone resource, or wipe any memory of the resource.
  212. '''
  213. # If referential integrity is enforced, grab all inbound relationships
  214. # to break them.
  215. refint = app_globals.rdfly.config['referential_integrity']
  216. inbound = True if refint else inbound
  217. repr_opts = {'incl_inbound' : True} if refint else {}
  218. children = app_globals.rdfly.get_descendants(uid)
  219. if soft:
  220. rsrc = LdpFactory.from_stored(uid, repr_opts)
  221. ret = rsrc.bury_rsrc(inbound)
  222. for child_uri in children:
  223. try:
  224. child_rsrc = LdpFactory.from_stored(
  225. app_globals.rdfly.uri_to_uid(child_uri),
  226. repr_opts={'incl_children' : False})
  227. except (TombstoneError, ResourceNotExistsError):
  228. continue
  229. child_rsrc.bury_rsrc(inbound, tstone_pointer=rsrc.uri)
  230. else:
  231. ret = app_globals.rdfly.forget_rsrc(uid, inbound)
  232. for child_uri in children:
  233. child_uid = app_globals.rdfly.uri_to_uid(child_uri)
  234. ret = app_globals.rdfly.forget_rsrc(child_uid, inbound)
  235. return ret
  236. @transaction(True)
  237. def resurrect(uid):
  238. '''
  239. Reinstate a buried (soft-deleted) resource.
  240. @param uid (string) Resource UID.
  241. '''
  242. return LdpFactory.from_stored(uid).resurrect_rsrc()