resource.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. import logging
  2. from functools import wraps
  3. from multiprocessing import Process
  4. from threading import Lock, Thread
  5. from lakesuperior.config_parser import config
  6. from lakesuperior.exceptions import InvalidResourceError
  7. from lakesuperior.env import env
  8. from lakesuperior.model.ldp_factory import LdpFactory
  9. from lakesuperior.store.ldp_rs.lmdb_store import TxnManager
  10. logger = logging.getLogger(__name__)
  11. app_globals = env.app_globals
  12. def transaction(write=False):
  13. '''
  14. Handle atomic operations in a store.
  15. This wrapper ensures that a write operation is performed atomically. It
  16. also takes care of sending a message for each resource changed in the
  17. transaction.
  18. '''
  19. def _transaction_deco(fn):
  20. @wraps(fn)
  21. def _wrapper(*args, **kwargs):
  22. with TxnManager(app_globals.rdf_store, write=write) as txn:
  23. ret = fn(*args, **kwargs)
  24. if len(app_globals.changelog):
  25. job = Thread(target=process_queue)
  26. job.start()
  27. return ret
  28. return _wrapper
  29. return _transaction_deco
  30. def process_queue():
  31. '''
  32. Process the message queue on a separate thread.
  33. '''
  34. lock = Lock()
  35. lock.acquire()
  36. while len(app_globals.changelog):
  37. send_event_msg(app_globals.changelog.popleft())
  38. lock.release()
  39. def send_event_msg(remove_trp, add_trp, metadata):
  40. '''
  41. Break down delta triples, find subjects and send event message.
  42. '''
  43. remove_grp = groupby(remove_trp, lambda x : x[0])
  44. remove_dict = { k[0] : k[1] for k in remove_grp }
  45. add_grp = groupby(add_trp, lambda x : x[0])
  46. add_dict = { k[0] : k[1] for k in add_grp }
  47. subjects = set(remove_dict.keys()) | set(add_dict.keys())
  48. for rsrc_uri in subjects:
  49. self._logger.info('subject: {}'.format(rsrc_uri))
  50. app_globals.messenger.send
  51. ### API METHODS ###
  52. @transaction()
  53. def get(uid, repr_options={}):
  54. '''
  55. Get an LDPR resource.
  56. The resource comes preloaded with user data and metadata as indicated by
  57. the `repr_options` argument. Any further handling of this resource is done
  58. outside of a transaction.
  59. @param uid (string) Resource UID.
  60. @param repr_options (dict(bool)) Representation options. This is a dict
  61. that is unpacked downstream in the process. The default empty dict results
  62. in default values. The accepted dict keys are:
  63. - incl_inbound: include inbound references. Default: False.
  64. - incl_children: include children URIs. Default: True.
  65. - embed_children: Embed full graph of all child resources. Default: False
  66. '''
  67. rsrc = LdpFactory.from_stored(uid, repr_options)
  68. # Load graph before leaving the transaction.
  69. rsrc.imr
  70. return rsrc
  71. @transaction()
  72. def get_version_info(uid):
  73. '''
  74. Get version metadata (fcr:versions).
  75. '''
  76. return LdpFactory.from_stored(uid).version_info
  77. @transaction()
  78. def get_version(uid, ver_uid):
  79. '''
  80. Get version metadata (fcr:versions).
  81. '''
  82. return LdpFactory.from_stored(uid).get_version(ver_uid)
  83. @transaction(True)
  84. def create(parent, slug, **kwargs):
  85. '''
  86. Mint a new UID and create a resource.
  87. The UID is computed from a given parent UID and a "slug", a proposed path
  88. relative to the parent. The application will attempt to use the suggested
  89. path but it may use a different one if a conflict with an existing resource
  90. arises.
  91. @param parent (string) UID of the parent resource.
  92. @param slug (string) Tentative path relative to the parent UID.
  93. @param **kwargs Other parameters are passed to the
  94. LdpFactory.from_provided method. Please see the documentation for that
  95. method for explanation of individual parameters.
  96. @return string UID of the new resource.
  97. '''
  98. uid = LdpFactory.mint_uid(parent, slug)
  99. logger.debug('Minted UID for new resource: {}'.format(uid))
  100. rsrc = LdpFactory.from_provided(uid, **kwargs)
  101. rsrc.create_or_replace_rsrc(create_only=True)
  102. return uid
  103. @transaction(True)
  104. def create_or_replace(uid, stream=None, **kwargs):
  105. '''
  106. Create or replace a resource with a specified UID.
  107. If the resource already exists, all user-provided properties of the
  108. existing resource are deleted. If the resource exists and the provided
  109. content is empty, an exception is raised (not sure why, but that's how
  110. FCREPO4 handles it).
  111. @param uid (string) UID of the resource to be created or updated.
  112. @param stream (BytesIO) Content stream. If empty, an empty container is
  113. created.
  114. @param **kwargs Other parameters are passed to the
  115. LdpFactory.from_provided method. Please see the documentation for that
  116. method for explanation of individual parameters.
  117. @return string Event type: whether the resource was created or updated.
  118. '''
  119. rsrc = LdpFactory.from_provided(uid, stream=stream, **kwargs)
  120. if not stream and rsrc.is_stored:
  121. raise InvalidResourceError(rsrc.uid,
  122. 'Resource {} already exists and no data set was provided.')
  123. return rsrc.create_or_replace_rsrc()
  124. @transaction(True)
  125. def update(uid, update_str):
  126. '''
  127. Update a resource with a SPARQL-Update string.
  128. @param uid (string) Resource UID.
  129. @param update_str (string) SPARQL-Update statements.
  130. '''
  131. rsrc = LdpFactory.from_stored(uid)
  132. rsrc.patch(update_str)
  133. return rsrc
  134. @transaction(True)
  135. def create_version(uid, ver_uid):
  136. '''
  137. Create a resource version.
  138. @param uid (string) Resource UID.
  139. @param ver_uid (string) Version UID to be appended to the resource URI.
  140. NOTE: this is a "slug", i.e. the version URI is not guaranteed to be the
  141. one indicated.
  142. @return string Version UID.
  143. '''
  144. return LdpFactory.from_stored(uid).create_version(ver_uid)
  145. @transaction(True)
  146. def delete(uid, leave_tstone=True):
  147. '''
  148. Delete a resource.
  149. @param uid (string) Resource UID.
  150. @param leave_tstone (bool) Whether to perform a soft-delete and leave a
  151. tombstone resource, or wipe any memory of the resource.
  152. '''
  153. # If referential integrity is enforced, grab all inbound relationships
  154. # to break them.
  155. refint = rdfly.config['referential_integrity']
  156. inbound = True if refint else inbound
  157. repr_opts = {'incl_inbound' : True} if refint else {}
  158. rsrc = LdpFactory.from_stored(uid, repr_opts)
  159. children = rdfly.get_descendants(uid)
  160. ret = (
  161. rsrc.bury_rsrc(inbound)
  162. if leave_tstone
  163. else rsrc.forget_rsrc(inbound))
  164. for child_uri in children:
  165. try:
  166. child_rsrc = LdpFactory.from_stored(
  167. rdfly.uri_to_uid(child_uri),
  168. repr_opts={'incl_children' : False})
  169. except (TombstoneError, ResourceNotExistsError):
  170. continue
  171. if leave_tstone:
  172. child_rsrc.bury_rsrc(inbound, tstone_pointer=rsrc.uri)
  173. else:
  174. child_rsrc.forget_rsrc(inbound)
  175. return ret
  176. @transaction(True)
  177. def resurrect(uid):
  178. '''
  179. Reinstate a buried (soft-deleted) resource.
  180. @param uid (string) Resource UID.
  181. '''
  182. return LdpFactory.from_stored(uid).resurrect_rsrc()
  183. @transaction(True)
  184. def forget(uid, inbound=True):
  185. '''
  186. Delete a resource completely, removing all its traces.
  187. @param uid (string) Resource UID.
  188. @param inbound (bool) Whether the inbound relationships should be deleted
  189. as well. If referential integrity is checked system-wide inbound references
  190. are always deleted and this option has no effect.
  191. '''
  192. return LdpFactory.from_stored(uid).forget_rsrc(inbound)