
Messaging (#27)

* Replace stompest library with stomp.py.

* Merge development branch.

* Stand up messaging.

* Only send messages once per resource request.

* Fix missing slash in UID when POSTing to root. (#26)

* Add minimal messaging doc.
Stefano Cossu committed 7 years ago · commit fd3e2d0cc5

+ 2 - 0
README.md

@@ -154,6 +154,8 @@ meant to live as a community project.
 
 [Content Model](doc/notes/model.md)
 
+[Messaging](doc/notes/messaging.md)
+
 [Command-Line Reference](doc/notes/cli.md)
 
 [Storage Implementation](doc/notes/storage.md)

+ 0 - 2
doc/notes/architecture.md

@@ -1,7 +1,5 @@
 # LAKEsuperior Architecture
 
-**DOCUMENTATION AND IMPLEMENTATION OF THIS SECTION ARE WORK-IN-PROGRESS!**
-
 LAKEsuperior is written in Python. Parts of the code may eventually be
 rewritten in [Cython](http://cython.readthedocs.io/) for performance.
 

+ 27 - 0
doc/notes/messaging.md

@@ -0,0 +1,27 @@
+# LAKEsuperior Messaging
+
+LAKEsuperior implements a messaging system based on ActivityStreams, as
+indicated by the
+[Fedora API specs](https://fedora.info/2017/06/30/spec/#notifications).
+The metadata set provided is currently quite minimal but can be easily
+enriched by extending the
+[default formatter class](https://github.com/scossu/lakesuperior/blob/master/lakesuperior/messaging/messenger.py).
+
+STOMP is the only supported protocol at the moment. More protocols may be made
+available at a later time.
+
+LAKEsuperior can send messages to any number of destinations: see
+[configuration](https://github.com/scossu/lakesuperior/blob/master/etc.defaults/application.yml#L79).
+By default, CoilMQ is provided for testing purposes and listens on
+`localhost:61613`. The default route sends messages to `/topic/fcrepo`.
+
+A small command-line utility, installed with the Python dependencies, lets
+you watch incoming messages. To monitor them, enter the following *after
+activating your virtualenv*:
+
+```
+stomp -H localhost -P 61613 -L /topic/fcrepo
+```
+
+See the [stomp.py library reference page](https://github.com/jasonrbriggs/stomp.py/wiki/Command-Line-Access)
+for details.
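
For programmatic consumption, a small stomp.py listener can subscribe to the
same topic. This is a minimal sketch, assuming the default CoilMQ broker on
`localhost:61613` and the stomp.py 4.1.x API pinned in `requirements.txt`:

```
import time

import stomp


class FcrepoListener(stomp.ConnectionListener):
    '''Print every message body received on the LAKEsuperior topic.'''
    def on_message(self, headers, message):
        print(message)


conn = stomp.Connection([('localhost', 61613)])
conn.set_listener('', FcrepoListener())
conn.start()
conn.connect(wait=True)
conn.subscribe(destination='/topic/fcrepo', id=1, ack='auto')

time.sleep(60)  # Listen for a minute, then clean up.
conn.disconnect()
```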

+ 4 - 1
etc.defaults/application.yml

@@ -87,11 +87,14 @@ messaging:
           # for this route.
           active: True
 
+          # Protocol version. One of `10`, `11` or `12`.
+          protocol: '11'
           host: 127.0.0.1
           port: 61613
+
+          # Credentials are optional.
           username:
           password:
-          protocol: '12'
           destination: '/topic/fcrepo'
 
           # Message format: at the moment the following are supported:

+ 14 - 7
lakesuperior/api/resource.py

@@ -101,18 +101,24 @@ def process_queue():
 
 def send_event_msg(remove_trp, add_trp, metadata):
     '''
-    Break down delta triples, find subjects and send event message.
+    Send messages about a changed LDPR.
+
+    A single LDPR message packet can contain multiple resource subjects, e.g.
+    if the resource graph contains hash URIs or even other subjects. This
+    method groups triples by subject and sends a message for each of the
+    subjects found.
     '''
+    # Group delta triples by subject.
     remove_grp = groupby(remove_trp, lambda x : x[0])
-    remove_dict = { k[0] : k[1] for k in remove_grp }
+    remove_dict = {k[0]: k[1] for k in remove_grp}
 
     add_grp = groupby(add_trp, lambda x : x[0])
-    add_dict = { k[0] : k[1] for k in add_grp }
+    add_dict = {k[0]: k[1] for k in add_grp}
 
     subjects = set(remove_dict.keys()) | set(add_dict.keys())
     for rsrc_uri in subjects:
-        logger.info('subject: {}'.format(rsrc_uri))
-        app_globals.messenger.send
+        logger.debug('Processing event for subject: {}'.format(rsrc_uri))
+        app_globals.messenger.send(rsrc_uri, **metadata)
 
 
 ### API METHODS ###
@@ -179,7 +185,7 @@ def create(parent, slug, **kwargs):
     logger.debug('Minted UID for new resource: {}'.format(uid))
     rsrc = LdpFactory.from_provided(uid, **kwargs)
 
-    rsrc.create_or_replace_rsrc(create_only=True)
+    rsrc.create_or_replace(create_only=True)
 
     return uid
 
@@ -204,12 +210,13 @@ def create_or_replace(uid, stream=None, **kwargs):
     @return string Event type: whether the resource was created or updated.
     '''
     rsrc = LdpFactory.from_provided(uid, stream=stream, **kwargs)
+    create = not rsrc.is_stored
 
     if not stream and rsrc.is_stored:
         raise InvalidResourceError(rsrc.uid,
                 'Resource {} already exists and no data set was provided.')
 
-    return rsrc.create_or_replace_rsrc()
+    return rsrc.create_or_replace(create_only=create)
 
 
 @transaction(True)
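
A note on the grouping above: `itertools.groupby` only merges *consecutive*
items with equal keys, so the delta triples must arrive grouped (or be sorted)
by subject to yield exactly one message per subject. A standalone sketch of
the idea, with plain strings standing in for rdflib terms:

```
from itertools import groupby

# Illustrative delta triples; in the API these are rdflib term 3-tuples.
add_trp = [
    ('<s1>', '<p1>', '"a"'),
    ('<s2>', '<p1>', '"b"'),
    ('<s1>', '<p2>', '"c"'),
]

# Sort by subject so that groupby yields one group per subject.
add_dict = {
    subj: list(triples)
    for subj, triples in groupby(
        sorted(add_trp, key=lambda trp: trp[0]), key=lambda trp: trp[0])
}
# {'<s1>': [('<s1>', '<p1>', '"a"'), ('<s1>', '<p2>', '"c"')],
#  '<s2>': [('<s2>', '<p1>', '"b"')]}
```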

+ 1 - 2
lakesuperior/globals.py

@@ -43,12 +43,11 @@ class AppGlobals:
         #logger.info('Non-RDF layout: {}'.format(nonrdfly_mod_name))
 
         # Set up messaging.
-        messenger = Messenger(app_conf['messaging'])
+        self._messenger = Messenger(app_conf['messaging'])
 
         # Exposed globals.
         self._rdfly = rdfly_cls(app_conf['store']['ldp_rs'])
         self._nonrdfly = nonrdfly_cls(app_conf['store']['ldp_nr'])
-        self._messenger = messenger
         self._changelog = deque()
 
 

+ 25 - 28
lakesuperior/messaging/formatters.py

@@ -21,13 +21,13 @@ class BaseASFormatter(metaclass=ABCMeta):
     }
 
     ev_names = {
-        RES_CREATED : 'Resource Modification',
-        RES_DELETED : 'Resource Creation',
-        RES_UPDATED : 'Resource Deletion',
+        RES_CREATED : 'Resource Creation',
+        RES_DELETED : 'Resource Deletion',
+        RES_UPDATED : 'Resource Modification',
     }
 
-    def __init__(self, uri, ev_type, time, type, data=None,
-                data_fmt='text/turtle', metadata=None):
+    def __init__(
+            self, rsrc_uri, ev_type, timestamp, rsrc_type, actor, data=None):
         '''
         Format output according to granularity level.
 
@@ -36,23 +36,20 @@ class BaseASFormatter(metaclass=ABCMeta):
         are logged under the same level. This it is rather about *what* gets
         logged in a message.
 
-        @param record (dict) This holds a dict with the following keys:
-        - `uri`: URI of the resource.
-        - `ev_type`: one of `create`, `delete` or `update`
-        - `time`: Timestamp of the ev_type.
-        - `data`: if messaging is configured with `provenance` level, this is
-          a `rdflib.Graph` containing the triples that have been removed or
-        added.
-        - `metadata`: provenance metadata as a rdflib.Graph object. This
-        contains properties such as actor(s), action (add/remove), etc. This is
-        only meaningful for `ASDeltaFormatter`.
+        @param rsrc_uri (rdflib.URIRef) URI of the resource.
+        @param ev_type (string) one of `create`, `delete` or `update`
+        @param timestamp (string) Timestamp of the event.
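+        @param rsrc_type (tuple | set) RDF types (class URIs) of the resource.
+        @param actor (rdflib term) Actor of the event, e.g. the value of
+        `fcrepo:createdBy`, if available.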
+        @param data (tuple(set)) if messaging is configured with `provenance`
+        level, this is a 2-tuple with one set (as 3-tuples of
+        RDFlib.Identifier instances) for removed triples, and one set for
+        added triples.
         '''
-        self.uri = uri
+        self.rsrc_uri = rsrc_uri
         self.ev_type = ev_type
-        self.time = time
-        self.type = type
-        self.data = data or None
-        self.metadata = metadata
+        self.timestamp = timestamp
+        self.rsrc_type = rsrc_type
+        self.actor = actor
+        self.data = data
 
 
     @abstractmethod
@@ -77,11 +74,11 @@ class ASResourceFormatter(BaseASFormatter):
             'type' : self.ev_types[self.ev_type],
             'name' : self.ev_names[self.ev_type],
             'object' : {
-                'id' : self.uri,
-                'updated' : self.time,
-                'type' : self.type,
+                'id' : self.rsrc_uri,
+                'updated' : self.timestamp,
+                'type' : self.rsrc_type,
             },
-            'actor' : self.metadata.get('actor', None),
+            'actor' : self.actor,
         }
 
         return json.dumps(ret)
@@ -104,11 +101,11 @@ class ASDeltaFormatter(BaseASFormatter):
             'type' : self.ev_types[self.ev_type],
             'name' : self.ev_names[self.ev_type],
             'object' : {
-                'id' : self.uri,
-                'updated' : self.time,
-                'type' : self.type,
+                'id' : self.rsrc_uri,
+                'updated' : self.timestamp,
+                'type' : self.rsrc_type,
             },
-            'actor' : self.metadata.get('actor', None),
+            'actor' : self.actor,
             'data' : self.data,
         }
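
For orientation, the fields visible in these hunks give the serialized
ActivityStreams message body roughly the following shape once `json.dumps()`
is applied (values here are illustrative, the hunks start mid-dictionary so
additional fields may precede these, and `ASDeltaFormatter` adds a `data`
field with the triple delta):

```
# Illustrative message body for an update event.
body = {
    'type': 'Update',                   # ev_types[RES_UPDATED]; mapping not shown in this hunk
    'name': 'Resource Modification',    # ev_names[RES_UPDATED]
    'object': {
        'id': 'http://localhost:8000/ldp/abc',
        'updated': '2018-02-20T12:00:00+00:00',
        'type': ['http://www.w3.org/ns/ldp#RDFSource'],
    },
    'actor': 'admin',
}
```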
 

+ 15 - 15
lakesuperior/messaging/handlers.py

@@ -1,11 +1,6 @@
 import logging
 
-from abc import ABCMeta, abstractmethod
-
-from flask import current_app
-from stompest.config import StompConfig
-from stompest.protocol import StompSpec
-from stompest.sync import Stomp
+import stomp
 
 
 class StompHandler(logging.Handler):
@@ -19,24 +14,29 @@ class StompHandler(logging.Handler):
     def __init__(self, conf):
         self.conf = conf
         if self.conf['protocol'] == '11':
-            protocol_v = StompSpec.VERSION_1_1
+            conn_cls = stomp.Connection11
         elif self.conf['protocol'] == '12':
-            protocol_v = StompSpec.VERSION_1_2
+            conn_cls = stomp.Connection12
         else:
-            protocol_v = StompSpec.VERSION_1_0
+            conn_cls = stomp.Connection10
 
-        client_config = StompConfig(
-            'tcp://{}:{}'.format(self.conf['host'], self.conf['port']),
-            login=self.conf['username'],
+        self.conn = conn_cls([(self.conf['host'], self.conf['port'])])
+        self.conn.start()
+        self.conn.connect(
+            username=self.conf['username'],
             passcode=self.conf['password'],
-            version=protocol_v
+            wait=True
         )
-        self.conn = Stomp(client_config)
-        self.conn.connect()
 
         return super().__init__()
 
 
+    def __del__(self):
+        '''
+        Disconnect the client.
+        '''
+        self.conn.disconnect()
+
     def emit(self, record):
         '''
         Send the message to the destination endpoint.
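
The hunk cuts off before the body of `emit()`; with stomp.py the forwarding
step reduces to something along these lines (a sketch, not necessarily the
committed implementation):

```
    def emit(self, record):
        '''
        Send the message to the destination endpoint.
        '''
        self.conn.send(
                destination=self.conf['destination'],
                body=bytes(self.format(record), 'utf-8'))
```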

+ 19 - 10
lakesuperior/messaging/messenger.py

@@ -2,6 +2,7 @@ import logging
 
 from lakesuperior.messaging import formatters, handlers
 
+logger = logging.getLogger(__name__)
 messenger = logging.getLogger('_messenger')
 
 
@@ -9,21 +10,29 @@ class Messenger:
     '''
     Very simple message sender using the standard Python logging facility.
     '''
-    _msg_routes = []
-
     def __init__(self, config):
-        for route in config['routes']:
-            handler_cls = getattr(handlers, route['handler'])
-            messenger.addHandler(handler_cls(route))
-            messenger.setLevel(logging.INFO)
-            formatter = getattr(formatters, route['formatter'])
+        '''
+        Set up the messenger.
+
+        @param config (dict) Messenger configuration.
+        '''
+        def msg_routes():
+            for route in config['routes']:
+                handler_cls = getattr(handlers, route['handler'])
+                messenger.addHandler(handler_cls(route))
+                messenger.setLevel(logging.INFO)
+                formatter = getattr(formatters, route['formatter'])
+
+                yield messenger, formatter
 
-            self._msg_routes.append((messenger, formatter))
+        self.config = config
+        self.msg_routes = tuple(r for r in msg_routes())
+        logger.info('Active messaging routes: {}'.format(self.msg_routes))
 
 
     def send(self, *args, **kwargs):
         '''
         Send one or more external messages.
         '''
-        for m, f in self._msg_routes:
-            m.info(f(*args, **kwargs))
+        for msg, fn in self.msg_routes:
+            msg.info(fn(*args, **kwargs))
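
As a usage sketch, the `Messenger` can be driven directly with a dict
mirroring one route from `application.yml`. This assumes a broker such as the
bundled CoilMQ is listening, since `StompHandler` connects on instantiation:

```
from lakesuperior.messaging.messenger import Messenger

config = {
    'routes': [{
        'active': True,
        'handler': 'StompHandler',           # class name in .handlers
        'formatter': 'ASResourceFormatter',  # class name in .formatters
        'protocol': '11',
        'host': '127.0.0.1',
        'port': 61613,
        'username': None,
        'password': None,
        'destination': '/topic/fcrepo',
    }],
}
messenger = Messenger(config)

# Arguments are passed straight through to the configured formatter class;
# compare send_event_msg() in lakesuperior/api/resource.py, which calls
# messenger.send(rsrc_uri, **metadata) with the metadata dict assembled in
# Ldpr._enqueue_msg().
```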

+ 1 - 1
lakesuperior/model/ldp_factory.py

@@ -201,7 +201,7 @@ class LdpFactory:
             raise ValueError('Slug cannot start with a slash.')
         # Shortcut!
         if not path and parent_uid == '/':
-            return split_if_legacy(str(uuid4()))
+            return '/' + split_if_legacy(str(uuid4()))
 
         if not parent_uid.startswith('/'):
             raise ValueError('Invalid parent UID: {}'.format(parent_uid))

+ 2 - 2
lakesuperior/model/ldp_nr.py

@@ -66,7 +66,7 @@ class LdpNr(Ldpr):
         return nonrdfly.local_path(cksum)
 
 
-    def create_or_replace_rsrc(self, create_only=False):
+    def create_or_replace(self, create_only=False):
         '''
         Create a new binary resource with a corresponding RDF representation.
 
@@ -78,7 +78,7 @@ class LdpNr(Ldpr):
         # Try to persist metadata. If it fails, delete the file.
         logger.debug('Persisting LDP-NR triples in {}'.format(self.uri))
         try:
-            ev_type = super().create_or_replace_rsrc(create_only)
+            ev_type = super().create_or_replace(create_only)
         except:
             # self.digest is also the file UID.
             nonrdfly.delete(self.digest)

+ 49 - 28
lakesuperior/model/ldpr.py

@@ -364,7 +364,7 @@ class Ldpr(metaclass=ABCMeta):
         return rdfly.extract_imr(self.uid, ver_uid, **kwargs).graph
 
 
-    def create_or_replace_rsrc(self, create_only=False):
+    def create_or_replace(self, create_only=False):
         '''
         Create or update a resource. PUT and POST methods, which are almost
         identical, are wrappers for this method.
@@ -372,27 +372,34 @@ class Ldpr(metaclass=ABCMeta):
         @param create_only (boolean) Whether this is a create-only operation.
         '''
         create = create_only or not self.is_stored
+        ev_type = RES_CREATED if create else RES_UPDATED
 
         self._add_srv_mgd_triples(create)
-        #self._ensure_single_subject_rdf(self.provided_imr.graph)
         ref_int = rdfly.config['referential_integrity']
         if ref_int:
             self._check_ref_int(ref_int)
 
-        rdfly.create_or_replace_rsrc(self.uid, self.provided_imr.graph)
-        self.imr = self.provided_imr
+        # Delete existing triples if replacing.
+        if not create:
+            rdfly.truncate_rsrc(self.uid)
 
-        self._set_containment_rel()
+        add_trp = set(self.provided_imr.graph) | self._containment_rel(create)
 
-        return RES_CREATED if create else RES_UPDATED
-        #return self._head(self.provided_imr.graph)
+        self._modify_rsrc(ev_type, add_trp=add_trp)
+        new_gr = Graph()
+        for trp in add_trp:
+            new_gr.add(trp)
+
+        self.imr = new_gr.resource(self.uri)
+
+        return ev_type
 
 
     def put(self):
         '''
         https://www.w3.org/TR/ldp/#ldpr-HTTP_PUT
         '''
-        return self.create_or_replace_rsrc()
+        return self.create_or_replace()
 
 
     def patch(self, update_str):
@@ -535,8 +542,8 @@ class Ldpr(metaclass=ABCMeta):
         elif nsc['ldp'].Container in laz_gr[: RDF.type :]:
             laz_gr.add((self.uri, RDF.type, nsc['fcrepo'].Container))
 
-        self._modify_rsrc(RES_CREATED, tstone_trp, set(laz_gr))
-        self._set_containment_rel()
+        laz_set = set(laz_gr) | self._containment_rel()
+        self._modify_rsrc(RES_CREATED, tstone_trp, laz_set)
 
         return self.uri
 
@@ -581,7 +588,7 @@ class Ldpr(metaclass=ABCMeta):
             # @TODO Check individual objects: if they are repo-managed URIs
             # and not existing or tombstones, they are not added.
 
-        return self.create_or_replace_rsrc(create_only=False)
+        return self.create_or_replace(create_only=False)
 
 
     ## PROTECTED METHODS ##
@@ -610,34 +617,38 @@ class Ldpr(metaclass=ABCMeta):
         @param add_trp (set) Triples to be added.
         @param notify (boolean) Whether to send a message about the change.
         '''
-        ret = rdfly.modify_rsrc(self.uid, remove_trp, add_trp)
+        rdfly.modify_rsrc(self.uid, remove_trp, add_trp)
 
-        if notify and env.config.get('messaging'):
+        if notify and env.config['application'].get('messaging'):
+            logger.debug('Enqueuing message for {}'.format(self.uid))
             self._enqueue_msg(ev_type, remove_trp, add_trp)
 
-        return ret
-
 
     def _enqueue_msg(self, ev_type, remove_trp=None, add_trp=None):
         '''
-        Sent a message about a changed (created, modified, deleted) resource.
+        Compose a message about a resource change.
+
+        The message is enqueued for asynchronous processing.
+
+        @param ev_type (string) The event type. See global constants.
+        @param remove_trp (set) Triples removed.
+        @param add_trp (set) Triples added. Also used to derive the resource
+        types and actor when the resource metadata cannot be retrieved.
         '''
         try:
-            type = self.types
+            rsrc_type = tuple(str(t) for t in self.types)
             actor = self.metadata.value(nsc['fcrepo'].createdBy)
         except (ResourceNotExistsError, TombstoneError):
-            type = set()
+            rsrc_type = set()
             actor = None
             for t in add_trp:
                 if t[1] == RDF.type:
-                    type.add(t[2])
+                    rsrc_type.add(t[2])
                 elif actor is None and t[1] == nsc['fcrepo'].createdBy:
                     actor = t[2]
 
         env.app_globals.changelog.append((set(remove_trp), set(add_trp), {
             'ev_type': ev_type,
-            'time': env.timestamp,
-            'type': type,
+            'timestamp': env.timestamp.format(),
+            'rsrc_type': rsrc_type,
             'actor': actor,
         }))
 
@@ -731,7 +742,7 @@ class Ldpr(metaclass=ABCMeta):
         self.provided_imr.set(nsc['fcrepo'].lastModifiedBy, self.DEFAULT_USER)
 
 
-    def _set_containment_rel(self):
+    def _containment_rel(self, create):
         '''Find the closest parent in the path indicated by the uid and
         establish a containment triple.
 
@@ -747,6 +758,9 @@ class Ldpr(metaclass=ABCMeta):
           fcres:/a/b/c.
         - If fcres:/e is being created, the root node becomes container of
           fcres:/e.
+
+        @param create (bool) Whether the resource is being created. If false,
+        the parent container is not updated.
         '''
         from lakesuperior.model.ldp_factory import LdpFactory
 
@@ -765,25 +779,32 @@ class Ldpr(metaclass=ABCMeta):
                 parent_rsrc = LdpFactory.new_container(cnd_parent_uid)
                 # This will trigger this method again and recurse until an
                 # existing container or the root node is reached.
-                parent_rsrc.create_or_replace_rsrc()
+                parent_rsrc.create_or_replace()
                 parent_uid = parent_rsrc.uid
         else:
             parent_uid = ROOT_UID
 
-        add_gr = Graph()
-        add_gr.add((nsc['fcres'][parent_uid], nsc['ldp'].contains, self.uri))
         parent_rsrc = LdpFactory.from_stored(
             parent_uid, repr_opts={'incl_children' : False}, handling='none')
-        parent_rsrc._modify_rsrc(RES_UPDATED, add_trp=add_gr)
+
+        # Only update parent if the resource is new.
+        if create:
+            add_gr = Graph()
+            add_gr.add(
+                    (nsc['fcres'][parent_uid], nsc['ldp'].contains, self.uri))
+            parent_rsrc._modify_rsrc(RES_UPDATED, add_trp=add_gr)
 
         # Direct or indirect container relationship.
-        self._add_ldp_dc_ic_rel(parent_rsrc)
+        return self._add_ldp_dc_ic_rel(parent_rsrc)
 
 
     def _dedup_deltas(self, remove_gr, add_gr):
         '''
         Remove duplicate triples from add and remove delta graphs, which would
         otherwise contain unnecessary statements that annul each other.
+
+        @return tuple Two "clean" sets: statements to remove and statements
+        to add, respectively.
         '''
         return (
             remove_gr - add_gr,
@@ -829,7 +850,7 @@ class Ldpr(metaclass=ABCMeta):
             target_rsrc = LdpFactory.from_stored(rdfly.uri_to_uid(s))
             target_rsrc._modify_rsrc(RES_UPDATED, add_trp={(s, p, o)})
 
-        self._modify_rsrc(RES_UPDATED, add_trp=add_trp)
+        return add_trp
 
 
     def _sparql_update(self, update_str, notify=True):
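
The parent lookup documented in `_containment_rel()` walks up the UID path
until an existing container (or the root node) is found. A hypothetical
standalone helper, just to illustrate the candidate order:

```
def candidate_parents(uid):
    '''
    Yield ancestor UIDs of `uid` from closest to farthest, e.g. '/a/b/c'
    yields '/a/b', then '/a'. The repository root is the fallback when none
    of these exists as a container.
    '''
    segments = uid.strip('/').split('/')
    for i in range(len(segments) - 1, 0, -1):
        yield '/' + '/'.join(segments[:i])


print(list(candidate_parents('/a/b/c')))  # ['/a/b', '/a']
```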

+ 22 - 8
lakesuperior/store/ldp_rs/rsrc_centric_layout.py

@@ -282,6 +282,20 @@ class RsrcCentricLayout:
         return rsrc
 
 
+    def get_user_data(self, uid):
+        '''
+        Get all the user-provided data.
+
+        @param uid (string) Resource UID.
+        '''
+        # @TODO This only works as long as there is only one user-provided
+        # graph. If multiple user-provided graphs will be supported, this
+        # should use another query to get all of them.
+        userdata_gr = self.ds.graph(nsc['fcmain'][uid])
+
+        return userdata_gr | Graph()
+
+
     def get_version_info(self, uid, strict=True):
         '''
         Get all metadata about a resource's versions.
@@ -422,20 +436,20 @@ class RsrcCentricLayout:
         # Remove versions.
         for ver_uri in self.ds.graph(nsc['fcadmin'][uid])[
                 uri : nsc['fcrepo'].hasVersion : None]:
-            self._delete_rsrc(uid_fn(ver_uri), True)
+            self.delete_rsrc(uid_fn(ver_uri), True)
 
         # Remove resource itself.
-        self._delete_rsrc(uid)
+        self.delete_rsrc(uid)
 
 
-    def create_or_replace_rsrc(self, uid, trp):
+    def truncate_rsrc(self, uid):
         '''
-        Create a new resource or replace an existing one.
+        Remove all user-provided data from a resource and only leave admin and
+        structure data.
         '''
-        if self.ask_rsrc_exists(uid):
-            self._delete_rsrc(uid)
+        userdata = set(self.get_user_data(uid))
 
-        return self.modify_rsrc(uid, add_trp=trp)
+        return self.modify_rsrc(uid, remove_trp=userdata)
 
 
     def modify_rsrc(self, uid, remove_trp=set(), add_trp=set()):
@@ -490,7 +504,7 @@ class RsrcCentricLayout:
             meta_gr.add((gr_uri, RDF.type, gr_type))
 
 
-    def _delete_rsrc(self, uid, historic=False):
+    def delete_rsrc(self, uid, historic=False):
         '''
         Delete all aspect graphs of an individual resource.
 

+ 1 - 1
requirements.txt

@@ -13,5 +13,5 @@ pytest==3.2.2
 rdflib==4.2.2
 requests-toolbelt==0.8.0
 requests==2.18.4
-stompest==2.3.0
+stomp.py==4.1.20
 wheel==0.30.0a0