Explorar o código

Rename `transactional` to `atomic`; reroute modify methods.

Stefano Cossu %!s(int64=7) %!d(string=hai) anos
pai
achega
d53a464342

+ 4 - 1
lakesuperior/app.py

@@ -7,6 +7,7 @@ from logging.config import dictConfig
 from flask import Flask
 
 from lakesuperior.endpoints.ldp import ldp
+from lakesuperior.messaging.messenger import Messenger
 from lakesuperior.endpoints.query import query
 from lakesuperior.toolbox import Toolbox
 
@@ -53,6 +54,9 @@ def create_app(app_conf, logging_conf):
     app.rdfly = load_layout('ldp_rs')
     app.nonrdfly = load_layout('ldp_nr')
 
+    # Set up messaging.
+    app.messenger = Messenger(app_conf['messaging'])
+
     return app
 
 
@@ -65,4 +69,3 @@ def camelcase(word):
     return ''.join(x.capitalize() or '_' for x in word.split('_'))
 
 
-

+ 15 - 0
lakesuperior/exceptions.py

@@ -72,6 +72,21 @@ class ServerManagedTermError(RuntimeError):
 
 
 
+class InvalidTripleError(RuntimeError):
+    '''
+    Raised when a triple in a delta is not valid.
+
+    This does not necessarily that it is not valid RDF, but rather that it may
+    not be valid for the context it is meant to be utilized.
+    '''
+    def __init__(self, t):
+        self.t = t
+
+    def __str__(self):
+        return '{} is not a valid triple.'.format(self.t)
+
+
+
 class RefIntViolationError(RuntimeError):
     '''
     Raised when a provided data set has a link to a non-existing repository

+ 15 - 8
lakesuperior/messaging/formatters.py

@@ -4,6 +4,8 @@ import uuid
 
 from abc import ABCMeta, abstractmethod
 
+from lakesuperior.model.ldpr import Ldpr
+
 
 class BaseASFormatter(metaclass=ABCMeta):
     '''
@@ -12,10 +14,16 @@ class BaseASFormatter(metaclass=ABCMeta):
     This is not really a `logging.Formatter` subclass, but a plain string
     builder.
     '''
+    ev_types = {
+        Ldpr.RES_CREATED : 'Create',
+        Ldpr.RES_DELETED : 'Delete',
+        Ldpr.RES_UPDATED : 'Update',
+    }
+
     ev_names = {
-        'Update' : 'Resource Modification',
-        'Create' : 'Resource Creation',
-        'Delete' : 'Resource Deletion',
+        Ldpr.RES_CREATED : 'Resource Modification',
+        Ldpr.RES_DELETED : 'Resource Creation',
+        Ldpr.RES_UPDATED : 'Resource Deletion',
     }
 
     def __init__(self, uri, ev_type, time, type, data=None,
@@ -33,7 +41,7 @@ class BaseASFormatter(metaclass=ABCMeta):
         - `ev_type`: one of `create`, `delete` or `update`
         - `time`: Timestamp of the ev_type.
         - `data`: if messaging is configured with `provenance` level, this is
-        a `rdflib.Graph` containing the triples that have been removed or
+          a `rdflib.Graph` containing the triples that have been removed or
         added.
         - `metadata`: provenance metadata as a rdflib.Graph object. This
         contains properties such as actor(s), action (add/remove), etc. This is
@@ -43,8 +51,7 @@ class BaseASFormatter(metaclass=ABCMeta):
         self.ev_type = ev_type
         self.time = time
         self.type = type
-        self.data = data.serialize(format=data_fmt).decode('utf8') \
-                if data else None
+        self.data = data or None
         self.metadata = metadata
 
 
@@ -67,7 +74,7 @@ class ASResourceFormatter(BaseASFormatter):
         ret = {
             '@context': 'https://www.w3.org/ns/activitystreams',
             'id' : 'urn:uuid:{}'.format(uuid.uuid4()),
-            'type' : self.ev_type,
+            'type' : self.ev_types[self.ev_type],
             'name' : self.ev_names[self.ev_type],
             'object' : {
                 'id' : self.uri,
@@ -94,7 +101,7 @@ class ASDeltaFormatter(BaseASFormatter):
         ret = {
             '@context': 'https://www.w3.org/ns/activitystreams',
             'id' : 'urn:uuid:{}'.format(uuid.uuid4()),
-            'type' : self.ev_type,
+            'type' : self.ev_types[self.ev_type],
             'name' : self.ev_names[self.ev_type],
             'object' : {
                 'id' : self.uri,

+ 0 - 1
lakesuperior/messaging/handlers.py

@@ -25,7 +25,6 @@ class StompHandler(logging.Handler):
         else:
             protocol_v = StompSpec.VERSION_1_0
 
-        self.conf
         client_config = StompConfig(
             'tcp://{}:{}'.format(self.conf['host'], self.conf['port']),
             login=self.conf['username'],

+ 7 - 10
lakesuperior/messaging/messenger.py

@@ -1,27 +1,24 @@
 import logging
 
-from flask import current_app
-
 from lakesuperior.messaging import formatters, handlers
 
-messenger = logging.getLogger('_messaging')
+messenger = logging.getLogger('_messenger')
 
 
 class Messenger:
     '''
-    Very simple message sender.
+    Very simple message sender using the standard Python logging facility.
     '''
     _msg_routes = []
 
     def __init__(self, config):
         for route in config['routes']:
-            if route['active']:
-                handler_cls = getattr(handlers, route['handler'])
-                messenger.addHandler(handler_cls(route))
-                messenger.setLevel(logging.INFO)
-                formatter = getattr(formatters, route['formatter'])
+            handler_cls = getattr(handlers, route['handler'])
+            messenger.addHandler(handler_cls(route))
+            messenger.setLevel(logging.INFO)
+            formatter = getattr(formatters, route['formatter'])
 
-                self._msg_routes.append((messenger, formatter))
+            self._msg_routes.append((messenger, formatter))
 
 
     def send(self, *args, **kwargs):

+ 0 - 1
lakesuperior/model/fcrepo/README.md

@@ -1 +0,0 @@
-Implementation of Fedora specs here. Extend LDP classes.

+ 0 - 1
lakesuperior/model/lakesuperior/README.md

@@ -1 +0,0 @@
-All classes that provide extensions of the standard FCREPO specs go here.

+ 2 - 2
lakesuperior/model/ldp_nr.py

@@ -4,7 +4,7 @@ from rdflib.resource import Resource
 from rdflib.term import URIRef, Literal, Variable
 
 from lakesuperior.dictionaries.namespaces import ns_collection as nsc
-from lakesuperior.model.ldpr import Ldpr, transactional
+from lakesuperior.model.ldpr import Ldpr, atomic
 from lakesuperior.model.ldp_rs import LdpRs
 
 class LdpNr(Ldpr):
@@ -38,7 +38,7 @@ class LdpNr(Ldpr):
         return LdpRs(self.uuid).get(**kwargs)
 
 
-    @transactional
+    @atomic
     def post(self, stream, mimetype=None, disposition=None):
         '''
         Create a new binary resource with a corresponding RDF representation.

+ 4 - 4
lakesuperior/model/ldp_rs.py

@@ -12,7 +12,7 @@ from rdflib.term import URIRef, Literal, Variable
 from lakesuperior.dictionaries.namespaces import ns_collection as nsc
 from lakesuperior.dictionaries.srv_mgd_terms import  srv_mgd_subjects, \
         srv_mgd_predicates, srv_mgd_types
-from lakesuperior.model.ldpr import Ldpr, transactional
+from lakesuperior.model.ldpr import Ldpr, atomic
 from lakesuperior.exceptions import ResourceNotExistsError, \
         ServerManagedTermError, SingleSubjectError
 from lakesuperior.toolbox import Toolbox
@@ -41,7 +41,7 @@ class LdpRs(Ldpr):
         return Toolbox().globalize_rsrc(self.imr)
 
 
-    @transactional
+    @atomic
     def post(self, data, format='text/turtle', handling=None):
         '''
         https://www.w3.org/TR/ldp/#ldpr-HTTP_POST
@@ -52,7 +52,7 @@ class LdpRs(Ldpr):
                 create_only=True)
 
 
-    @transactional
+    @atomic
     def put(self, data, format='text/turtle', handling=None):
         '''
         https://www.w3.org/TR/ldp/#ldpr-HTTP_PUT
@@ -60,7 +60,7 @@ class LdpRs(Ldpr):
         return self._create_or_replace_rsrc(data, format, handling)
 
 
-    @transactional
+    @atomic
     def patch(self, update_str):
         '''
         https://www.w3.org/TR/ldp/#ldpr-HTTP_PATCH

+ 74 - 31
lakesuperior/model/ldpr.py

@@ -2,12 +2,12 @@ import logging
 
 from abc import ABCMeta
 from collections import defaultdict
-from itertools import accumulate
+from itertools import accumulate, groupby
 from uuid import uuid4
 
 import arrow
 
-from flask import current_app
+from flask import current_app, request
 from rdflib import Graph
 from rdflib.resource import Resource
 from rdflib.namespace import RDF, XSD
@@ -22,21 +22,29 @@ from lakesuperior.store_layouts.ldp_rs.base_rdf_layout import BaseRdfLayout
 from lakesuperior.toolbox import Toolbox
 
 
-def transactional(fn):
+def atomic(fn):
     '''
-    Decorator for methods of the Ldpr class to handle transactions in an RDF
-    store.
+    Handle atomic operations in an RDF store.
+
+    This wrapper ensures that a write operation is performed atomically. It
+    also takes care of sending a message for each resource changed in the
+    transaction.
     '''
     def wrapper(self, *args, **kwargs):
+        request.changelog = []
         try:
             ret = fn(self, *args, **kwargs)
-            self._logger.info('Committing transaction.')
-            self.rdfly.store.commit()
-            return ret
         except:
             self._logger.warn('Rolling back transaction.')
             self.rdfly.store.rollback()
             raise
+        else:
+            self._logger.info('Committing transaction.')
+            self.rdfly.store.commit()
+            for ev in request.changelog:
+                self._logger.info('Message: {}'.format(ev))
+                self._send_event_msg(*ev)
+            return ret
 
     return wrapper
 
@@ -80,9 +88,9 @@ class Ldpr(metaclass=ABCMeta):
     RETURN_SRV_MGD_RES_URI = nsc['fcrepo'].ServerManaged
     ROOT_NODE_URN = nsc['fcsystem'].root
 
-    RES_CREATED = 'Create'
-    RES_DELETED = 'Delete'
-    RES_UPDATED = 'Update'
+    RES_CREATED = '_create_'
+    RES_DELETED = '_delete_'
+    RES_UPDATED = '_update_'
 
     protected_pred = (
         nsc['fcrepo'].created,
@@ -230,13 +238,16 @@ class Ldpr(metaclass=ABCMeta):
 
         Persistence is done in this class. None of the operations in the store
         layout should commit an open transaction. Methods are wrapped in a
-        transaction by using the `@transactional` decorator.
+        transaction by using the `@atomic` decorator.
 
         @param uuid (string) UUID of the resource. If None (must be explicitly
-        set) it refers to the root node.
+        set) it refers to the root node. It can also be the full URI or URN,
+        in which case it will be converted.
         '''
-        self.uuid = uuid
-        self.urn = nsc['fcres'][uuid] if self.uuid else self.ROOT_NODE_URN
+        self.uuid = Toolbox().uri_to_uuid(uuid) \
+                if isinstance(uuid, URIRef) else uuid
+        self.urn = nsc['fcres'][uuid] \
+                if self.uuid else self.ROOT_NODE_URN
         self.uri = Toolbox().uuid_to_uri(self.uuid)
 
         self.repr_opts = repr_opts
@@ -405,7 +416,7 @@ class Ldpr(metaclass=ABCMeta):
         raise NotImplementedError()
 
 
-    @transactional
+    @atomic
     def delete(self, inbound=True, delete_children=True, leave_tstone=True):
         '''
         https://www.w3.org/TR/ldp/#ldpr-HTTP_DELETE
@@ -433,7 +444,7 @@ class Ldpr(metaclass=ABCMeta):
         return ret
 
 
-    @transactional
+    @atomic
     def delete_tombstone(self):
         '''
         Delete a tombstone.
@@ -455,7 +466,7 @@ class Ldpr(metaclass=ABCMeta):
         Create a new resource by comparing an empty graph with the provided
         IMR graph.
         '''
-        self.rdfly.modify_dataset(add_trp=self.provided_imr.graph)
+        self._modify_rsrc(self.RES_CREATED, add_trp=self.provided_imr.graph)
 
         return self.RES_CREATED
 
@@ -472,7 +483,7 @@ class Ldpr(metaclass=ABCMeta):
             self.imr.remove(p)
 
         delta = self._dedup_deltas(self.imr.graph, self.provided_imr.graph)
-        self.rdfly.modify_dataset(*delta)
+        self._modify_rsrc(self.RES_UPDATED, *delta)
 
         # Reset the IMR because it has changed.
         delattr(self, 'imr')
@@ -505,15 +516,32 @@ class Ldpr(metaclass=ABCMeta):
         else:
             self._logger.info('NOT leaving tombstone.')
 
+        self._modify_rsrc(self.RES_DELETED, remove_trp, add_trp)
+
         if inbound:
+            remove_trp = set()
             for ib_rsrc_uri in self.imr.graph.subjects(None, self.urn):
-                remove_trp.add((ib_rsrc_uri, None, self.urn))
-
-        self.rdfly.modify_dataset(remove_trp, add_trp)
+                remove_trp = {(ib_rsrc_uri, None, self.urn)}
+                Ldpr(ib_rsrc_uri)._modify_rsrc(self.RES_UPDATED, remove_trp)
 
         return self.RES_DELETED
 
 
+    def _modify_rsrc(self, ev_type, remove_trp={}, add_trp={}):
+        '''
+        Low-level method to modify a graph for a single resource.
+
+        @param remove_trp (Iterable) Triples to be removed. This can be a graph
+        @param add_trp (Iterable) Triples to be added. This can be a graph.
+        '''
+        return self.rdfly.modify_dataset(remove_trp, add_trp, metadata={
+            'ev_type' : ev_type,
+            'time' : arrow.utcnow(),
+            'type' : list(self.imr.graph.objects(self.urn, RDF.type)),
+            'actor' : self.imr.value(nsc['fcrepo'].lastModifiedBy),
+        })
+
+
     def _set_containment_rel(self):
         '''Find the closest parent in the path indicated by the UUID and
         establish a containment triple.
@@ -612,12 +640,13 @@ class Ldpr(metaclass=ABCMeta):
 
     def _add_ldp_dc_ic_rel(self, cont_uri):
         '''
-        Add relationship triples from a direct or indirect container parent.
+        Add relationship triples from a parent direct or indirect container.
 
         @param cont_uri (rdflib.term.URIRef)  The container URI.
         '''
-        cont_imr = self.rdfly.extract_imr(cont_uri, incl_children=False)
-        cont_p = set(cont_imr.graph.predicates())
+        repr_opts = {'parameters' : {'omit' : Ldpr.RETURN_CHILD_RES_URI }}
+        cont_rsrc = Ldpr.inst(cont_uri, repr_opts=repr_opts)
+        cont_p = set(cont_rsrc.imr.graph.predicates())
         add_g = Graph()
 
         self._logger.info('Checking direct or indirect containment.')
@@ -625,19 +654,19 @@ class Ldpr(metaclass=ABCMeta):
 
         if self.MBR_RSRC_URI in cont_p and self.MBR_REL_URI in cont_p:
             s = Toolbox().localize_term(
-                    cont_imr.value(self.MBR_RSRC_URI).identifier)
-            p = cont_imr.value(self.MBR_REL_URI).identifier
+                    cont_rsrc.imr.value(self.MBR_RSRC_URI).identifier)
+            p = cont_rsrc.imr.value(self.MBR_REL_URI).identifier
 
-            if cont_imr[RDF.type : nsc['ldp'].DirectContainer]:
+            if cont_rsrc.imr[RDF.type : nsc['ldp'].DirectContainer]:
                 self._logger.info('Parent is a direct container.')
 
                 self._logger.debug('Creating DC triples.')
                 add_g.add((s, p, self.urn))
 
-            elif cont_imr[RDF.type : nsc['ldp'].IndirectContainer] \
+            elif cont_rsrc.imr[RDF.type : nsc['ldp'].IndirectContainer] \
                    and self.INS_CNT_REL_URI in cont_p:
                 self._logger.info('Parent is an indirect container.')
-                cont_rel_uri = cont_imr.value(self.INS_CNT_REL_URI).identifier
+                cont_rel_uri = cont_rsrc.imr.value(self.INS_CNT_REL_URI).identifier
                 target_uri = self.provided_imr.value(cont_rel_uri).identifier
                 self._logger.debug('Target URI: {}'.format(target_uri))
                 if target_uri:
@@ -648,6 +677,20 @@ class Ldpr(metaclass=ABCMeta):
             add_g = self._check_mgd_terms(add_g)
             self._logger.debug('Adding DC/IC triples: {}'.format(
                 add_g.serialize(format='turtle').decode('utf-8')))
-            self.rdfly.modify_dataset(Graph(), add_g)
+            rsrc._modify_rsrc(self.RES_UPDATED, attr_trp=add_g)
+
+
+    def _send_event_msg(self, remove_trp, add_trp, metadata):
+        '''
+        Break down delta triples, find subjects and send event message.
+        '''
+        remove_grp = groupby(remove_trp, lambda x : x[0])
+        remove_dict = { k[0] : k[1] for k in remove_grp }
 
+        add_grp = groupby(add_trp, lambda x : x[0])
+        add_dict = { k[0] : k[1] for k in add_grp }
 
+        subjects = set(remove_dict.keys()) | set(add_dict.keys())
+        for rsrc_uri in subjects:
+            self._logger.info('subject: {}'.format(rsrc_uri))
+            #current_app.messenger.send

+ 34 - 6
lakesuperior/store_layouts/ldp_rs/base_rdf_layout.py

@@ -2,7 +2,6 @@ import logging
 
 from abc import ABCMeta, abstractmethod
 
-from flask import current_app
 from rdflib.namespace import RDF
 from rdflib.query import ResultException
 from rdflib.resource import Resource
@@ -11,7 +10,6 @@ from rdflib.term import URIRef
 from lakesuperior.dictionaries.namespaces import ns_collection as nsc
 from lakesuperior.dictionaries.namespaces import ns_mgr as nsm
 from lakesuperior.exceptions import ResourceNotExistsError
-from lakesuperior.messaging.messenger import Messenger
 from lakesuperior.store_layouts.ldp_rs.graph_store_connector import \
         GraphStoreConnector
 from lakesuperior.toolbox import Toolbox
@@ -121,7 +119,7 @@ class BaseRdfLayout(metaclass=ABCMeta):
 
 
     @abstractmethod
-    def modify_dataset(self, remove_trp, add_trp):
+    def modify_dataset(self, remove_trp=[], add_trp=[], metadata={}):
         '''
         Adds and/or removes triples from the graph.
 
@@ -129,13 +127,43 @@ class BaseRdfLayout(metaclass=ABCMeta):
         store that needs to be notified must be performed by invoking this
         method.
 
-        NOTE: This is not specific to a resource. The LDP layer is responsible
-        for checking that all the +/- triples are referring to the intended
-        subject(s).
+        NOTE: This method can apply to multiple resources. However, if
+        distinct resources are undergoing different operations (e.g. resource A
+        is being deleted and resource B is being updated) this method must be
+        called once for each operation.
 
         @param remove_trp (Iterable) Triples to be removed. This can be a graph
         @param add_trp (Iterable) Triples to be added. This can be a graph.
+        @param metadata (dict) Metadata related to the operation. At a minimum,
+        it should contain the name of the operation (create, update, delete).
+        If no metadata are passed, no messages are enqueued.
         '''
         pass
 
 
+    def _enqueue_event(self, remove_trp, add_trp):
+        '''
+        Group delta triples by subject and send out to event queue.
+
+        The event queue is stored in the request context and is processed
+        after `store.commit()` is called by the `atomic` decorator.
+        '''
+        remove_grp = groupby(remove_trp, lambda x : x[0])
+        remove_dict = { k[0] : k[1] for k in remove_grp }
+
+        add_grp = groupby(add_trp, lambda x : x[0])
+        add_dict = { k[0] : k[1] for k in add_grp }
+
+        subjects = set(remove_dict.keys()) | set(add_dict.keys())
+        for rsrc_uri in subjects:
+            request.changelog.append(
+                uri=rsrc_uri,
+                ev_type=None,
+                time=arrow.utcnow(),
+                type=list(imr.graph.subjects(imr.identifier, RDF.type)),
+                data=imr.graph,
+                metadata={
+                    'actor' : imr.value(nsc['fcrepo'].lastModifiedBy),
+                }
+            )
+

+ 11 - 7
lakesuperior/store_layouts/ldp_rs/simple_layout.py

@@ -2,6 +2,7 @@ from copy import deepcopy
 
 import arrow
 
+from flask import current_app, request
 from rdflib import Graph
 from rdflib.namespace import RDF, XSD
 from rdflib.query import ResultException
@@ -10,10 +11,10 @@ from rdflib.term import Literal, URIRef, Variable
 
 from lakesuperior.dictionaries.namespaces import ns_collection as nsc
 from lakesuperior.dictionaries.namespaces import ns_mgr as nsm
-from lakesuperior.dictionaries.srv_mgd_terms import  srv_mgd_subjects, \
-        srv_mgd_predicates, srv_mgd_types
-from lakesuperior.exceptions import InvalidResourceError, \
-        ResourceNotExistsError, TombstoneError
+from lakesuperior.dictionaries.srv_mgd_terms import (srv_mgd_subjects,
+        srv_mgd_predicates, srv_mgd_types)
+from lakesuperior.exceptions import (InvalidResourceError, InvalidTripleError,
+        ResourceNotExistsError, TombstoneError)
 from lakesuperior.store_layouts.ldp_rs.base_rdf_layout import BaseRdfLayout
 from lakesuperior.toolbox import Toolbox
 
@@ -105,14 +106,17 @@ class SimpleLayout(BaseRdfLayout):
             's' : urn})
 
 
-    def modify_dataset(self, remove_trp=[], add_trp=[]):
+    def modify_dataset(self, remove_trp=[], add_trp=[], metadata=None):
         '''
         See base_rdf_layout.update_rsrc.
         '''
-        self._logger.debug('Remove graph: {}'.format(set(remove_trp)))
-        self._logger.debug('Add graph: {}'.format(set(add_trp)))
+        self._logger.debug('Remove triples: {}'.format(set(remove_trp)))
+        self._logger.debug('Add triples: {}'.format(set(add_trp)))
 
         for t in remove_trp:
             self.ds.remove(t)
         for t in add_trp:
             self.ds.add(t)
+
+        if current_app.config.setdefault('messaging') and metadata:
+            request.changelog.append((remove_trp, add_trp, metadata))