123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- import logging
- from functools import wraps
- from multiprocessing import Process
- from threading import Lock, Thread
- from lakesuperior.config_parser import config
- from lakesuperior.exceptions import InvalidResourceError
- from lakesuperior.env import env
- from lakesuperior.model.ldp_factory import LdpFactory
- from lakesuperior.store.ldp_rs.lmdb_store import TxnManager
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Shared application globals taken from the environment. The functions in
# this module read its `rdf_store`, `changelog` and `messenger` attributes.
app_globals = env.app_globals
def transaction(write=False):
    '''
    Decorator factory running the wrapped callable inside a store transaction.

    The callable is executed within a `TxnManager` context opened on
    `app_globals.rdf_store`. Once that context has been exited, if
    `app_globals.changelog` is not empty a thread targeting `process_queue`
    is started; the wrapper does not wait for that thread.

    @param write (bool) Forwarded to `TxnManager` as its `write` argument.
    Default: False.

    @return function A decorator that preserves the metadata of the callable
    it wraps and returns that callable's return value.
    '''
    def _transaction_deco(fn):
        @wraps(fn)
        def _wrapper(*args, **kwargs):
            store = app_globals.rdf_store
            with TxnManager(store, write=write):
                result = fn(*args, **kwargs)

            # Change notifications are handed to a background thread after
            # the transaction context has been left.
            if app_globals.changelog:
                Thread(target=process_queue).start()

            return result

        return _wrapper

    return _transaction_deco
# One lock shared by every `process_queue` call, so that concurrent worker
# threads actually exclude one another while draining the changelog.
_queue_lock = Lock()


def process_queue():
    '''
    Drain the changelog and send an event message for each entry.

    Entries are popped from the left of `app_globals.changelog` until it is
    empty, and each one is unpacked into the positional arguments of
    `send_event_msg`.

    NOTE(review): this assumes every changelog entry is a
    `(remove_trp, add_trp, metadata)` sequence, matching the signature of
    `send_event_msg` -- confirm against the code that fills the changelog.

    Any exception raised by `send_event_msg` propagates to the caller; the
    entry being processed has already been removed from the changelog by
    then.
    '''
    # A lock created inside the function would be private to each call and
    # guard nothing. The `with` statement also guarantees the lock is
    # released if sending a message fails.
    with _queue_lock:
        while app_globals.changelog:
            send_event_msg(*app_globals.changelog.popleft())
def send_event_msg(remove_trp, add_trp, metadata):
    '''
    Break down delta triples, find their subjects and log each subject.

    The subject of a triple is taken to be its first element. Subjects found
    in either iterable are de-duplicated, so each one is handled once; the
    order in which they are handled is not defined.

    @param remove_trp (iterable) Removed triples. Only the first element of
    each triple is read.
    @param add_trp (iterable) Added triples. Only the first element of each
    triple is read.
    @param metadata Not used by this function at present.
    '''
    subjects = {trp[0] for trp in remove_trp} | {trp[0] for trp in add_trp}

    for rsrc_uri in subjects:
        logger.info('subject: {}'.format(rsrc_uri))
        # NOTE(review): the messenger's `send` attribute is looked up but
        # never called, so no message is dispatched here. The call
        # signature is not visible from this module -- TODO complete the
        # call (presumably with the subject and `metadata`) once confirmed.
        app_globals.messenger.send
- ### API METHODS ###
@transaction()
def get(uid, repr_options=None):
    '''
    Get an LDPR resource.

    The resource comes preloaded with user data and metadata as indicated by
    the `repr_options` argument. Any further handling of this resource is done
    outside of a transaction.

    @param uid (string) Resource UID.
    @param repr_options (dict(bool)) Representation options. This is a dict
    that is unpacked downstream in the process. When omitted (or None), an
    empty dict is used, which results in default values. The accepted dict
    keys are:
    - incl_inbound: include inbound references. Default: False.
    - incl_children: include children URIs. Default: True.
    - embed_children: Embed full graph of all child resources. Default: False

    @return The resource built by `LdpFactory.from_stored`.
    '''
    # A literal `{}` default would be a single dict shared by every call, so
    # a mutation made downstream would leak into later calls.
    if repr_options is None:
        repr_options = {}

    rsrc = LdpFactory.from_stored(uid, repr_options)
    # Load graph before leaving the transaction.
    rsrc.imr

    return rsrc
@transaction()
def get_version_info(uid):
    '''
    Retrieve the version metadata (fcr:versions) of a stored resource.

    @param uid (string) Resource UID.

    @return The `version_info` attribute of the stored resource.
    '''
    rsrc = LdpFactory.from_stored(uid)

    return rsrc.version_info
@transaction()
def get_version(uid, ver_uid):
    '''
    Retrieve one specific version of a stored resource.

    @param uid (string) Resource UID.
    @param ver_uid (string) Version UID, handed to the resource's
    `get_version` method.

    @return Whatever the resource's `get_version` method returns for
    `ver_uid`.
    '''
    rsrc = LdpFactory.from_stored(uid)

    return rsrc.get_version(ver_uid)
@transaction(True)
def create(parent, slug, **kwargs):
    '''
    Mint a new UID under a parent and create a resource with it.

    The UID is derived from the parent UID and a "slug", i.e. a suggested
    path relative to the parent. The application tries to honor the
    suggestion but may pick a different path if it clashes with an existing
    resource.

    @param parent (string) UID of the parent resource.
    @param slug (string) Tentative path relative to the parent UID.
    @param **kwargs Remaining parameters are forwarded to
    LdpFactory.from_provided; see that method's documentation for the
    individual parameters.

    @return string UID of the new resource.
    '''
    new_uid = LdpFactory.mint_uid(parent, slug)
    logger.debug('Minted UID for new resource: {}'.format(new_uid))

    new_rsrc = LdpFactory.from_provided(new_uid, **kwargs)
    new_rsrc.create_or_replace_rsrc(create_only=True)

    return new_uid
@transaction(True)
def create_or_replace(uid, stream=None, **kwargs):
    '''
    Create a resource with a given UID, or replace the one already there.

    When the resource already exists, all of its user-provided properties
    are deleted. When it exists and no content is provided, an exception is
    raised (this mirrors how FCREPO4 handles the case).

    @param uid (string) UID of the resource to be created or updated.
    @param stream (BytesIO) Content stream. If empty, an empty container is
    created.
    @param **kwargs Remaining parameters are forwarded to
    LdpFactory.from_provided; see that method's documentation for the
    individual parameters.

    @return string Event type: whether the resource was created or updated.

    @raise InvalidResourceError If no content is provided and the resource
    is already stored.
    '''
    rsrc = LdpFactory.from_provided(uid, stream=stream, **kwargs)

    if not stream:
        # Existence only matters when there is no content to replace the
        # resource with.
        if rsrc.is_stored:
            # NOTE(review): the `{}` placeholder is presumably filled in
            # with the UID by the exception class -- verify.
            raise InvalidResourceError(
                    rsrc.uid,
                    'Resource {} already exists and no data set was provided.')

    return rsrc.create_or_replace_rsrc()
@transaction(True)
def update(uid, update_str):
    '''
    Apply a SPARQL-Update string to a stored resource.

    @param uid (string) Resource UID.
    @param update_str (string) SPARQL-Update statements.

    @return The resource object on which the patch was applied.
    '''
    target = LdpFactory.from_stored(uid)
    target.patch(update_str)

    return target
@transaction(True)
def create_version(uid, ver_uid):
    '''
    Create a new version of a stored resource.

    @param uid (string) Resource UID.
    @param ver_uid (string) Version UID to be appended to the resource URI.
    NOTE: this is a "slug", i.e. the resulting version URI is not guaranteed
    to be the one requested.

    @return string Version UID.
    '''
    rsrc = LdpFactory.from_stored(uid)

    return rsrc.create_version(ver_uid)
@transaction(True)
def delete(uid, leave_tstone=True, inbound=True):
    '''
    Delete a resource and its descendants.

    The resource itself is buried or forgotten first, then each descendant
    is handled the same way. Descendants that can no longer be loaded
    (tombstone or missing) are skipped.

    @param uid (string) Resource UID.
    @param leave_tstone (bool) Whether to perform a soft-delete and leave a
    tombstone resource, or wipe any memory of the resource.
    @param inbound (bool) Whether the inbound relationships should be deleted
    as well. If referential integrity is enforced this is forced to True and
    the option has no effect. Default: True, as for `forget`.

    @return The value returned by burying or forgetting the resource itself.
    '''
    # Imported locally: the module-level import from this package only
    # brings in `InvalidResourceError`.
    from lakesuperior.exceptions import (
            ResourceNotExistsError, TombstoneError)

    # NOTE(review): `rdfly` is not bound anywhere in this module; it is
    # assumed to be exposed by the application globals -- confirm.
    rdfly = app_globals.rdfly

    # If referential integrity is enforced, grab all inbound relationships
    # to break them.
    refint = rdfly.config['referential_integrity']
    if refint:
        inbound = True
    repr_opts = {'incl_inbound': True} if refint else {}

    rsrc = LdpFactory.from_stored(uid, repr_opts)

    # Descendants are listed before the resource itself is buried or
    # forgotten.
    children = rdfly.get_descendants(uid)

    ret = (
            rsrc.bury_rsrc(inbound)
            if leave_tstone
            else rsrc.forget_rsrc(inbound))

    for child_uri in children:
        try:
            child_rsrc = LdpFactory.from_stored(
                    rdfly.uri_to_uid(child_uri),
                    repr_opts={'incl_children': False})
        except (TombstoneError, ResourceNotExistsError):
            # This descendant cannot be loaded any more; move on.
            continue
        if leave_tstone:
            child_rsrc.bury_rsrc(inbound, tstone_pointer=rsrc.uri)
        else:
            child_rsrc.forget_rsrc(inbound)

    return ret
@transaction(True)
def resurrect(uid):
    '''
    Bring back a buried (soft-deleted) resource.

    @param uid (string) Resource UID.

    @return The value returned by the resource's `resurrect_rsrc` method.
    '''
    buried = LdpFactory.from_stored(uid)

    return buried.resurrect_rsrc()
@transaction(True)
def forget(uid, inbound=True):
    '''
    Wipe a resource entirely, leaving no trace of it.

    @param uid (string) Resource UID.
    @param inbound (bool) Whether the inbound relationships should be deleted
    as well. If referential integrity is checked system-wide, inbound
    references are always deleted and this option has no effect.

    @return The value returned by the resource's `forget_rsrc` method.
    '''
    rsrc = LdpFactory.from_stored(uid)

    return rsrc.forget_rsrc(inbound)
|