
Initial PoC messaging with key parts in place.

Stefano Cossu, 7 years ago
parent
commit
7931c4c2a8

+ 11 - 8
doc/notes/TODO

@@ -24,24 +24,27 @@
   - [ ] Storage layer (RDF + file)
   - [ ] LDP layer
 - [D] Bootstrap
-- [ ] Optimize queries
-- [ ] Messaging SPI
+- [W] Messaging SPI
 - [ ] Hook up Hyrax
 - [ ] Reformat documentation
 
 # Alpha 2 TODO
 
-- [ ] Migraion tool
+- [ ] Migration tool
 - [ ] Complete header handling
 - [ ] Query API
+
+# Alpha 3 TODO
+
+- [ ] Optimize queries
 - [ ] Separate read-only graph/resource generation from RW graph/res generation
 - [ ] Move server-managed triples to separate graph in simple layout
 - [ ] Fixity checks
 
-# Alpha 3 TODO
+# Alpha 4 TODO
+
 - [ ] Full provenance layout
 - [ ] Versioning
-- [ ] Async coroutines
-  - [ ] Within sync request
-  - [ ] Fully async request
-- [?] Basic UI
+- [ ] AuthN/Z
+  - [ ] Authentication
+  - [ ] WebAC

+ 23 - 0
etc.skeleton/application.yml

@@ -75,3 +75,26 @@ store:
         # Default: 4
         pairtree_branches: 4
 
+# Configuration for messaging.
+messaging:
+    # List of channels to send messages to.
+    # Each channel must define the `endpoint` and the `level` parameters.
+    routes:
+        # The destination of the message. Currently only the STOMP protocol
+        # is supported.
+        - endpoint: stomp://localhost:61613
+
+          # Message format: at the moment only `ActivityStreamsFormatter` is
+          # supported.
+          formatter: ActivityStreamsFormatter
+
+          # Output handler. Currently only `StompHandler` is supported.
+          handler: StompHandler
+
+          # Granularity level of messages. It can be one of:
+          # - `none`: No messages are ever sent.
+          # - `resource`: Messages are sent whenever a resource is created,
+          #   updated or deleted. No details about what changed are included.
+          # - `provenance`: Up to two messages are sent for each individual
+          #   request: one for the added triples and one for the deleted triples.
+          level: resource
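For reference, a minimal sketch of how this `messaging` block might be consumed, assuming the file is parsed with PyYAML and has been copied to an illustrative working path (both are assumptions, not shown in this commit):

    import yaml  # PyYAML; assumed YAML loader

    from lakesuperior.messaging.messenger import Messenger

    # The path is illustrative; the skeleton ships as etc.skeleton/application.yml.
    with open('etc/application.yml') as fh:
        config = yaml.safe_load(fh)

    # config['messaging'] carries the `routes` list defined above.
    messenger = Messenger(config['messaging'])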

+ 70 - 0
lakesuperior/messaging/formatters.py

@@ -0,0 +1,70 @@
+import json
+import logging
+import uuid
+
+from activipy import vocab
+
+
+class ActivityStreamsFormatter:
+    '''
+    Format message as ActivityStreams.
+
+    This is not really a `logging.Formatter` subclass, but a plain string
+    builder.
+    '''
+    ev_names = {
+        'Update' : 'Resource Modification',
+        'Create' : 'Resource Creation',
+        'Delete' : 'Resource Deletion',
+    }
+
+    def __init__(self, uri, ev_type, time, type, data=None,
+                data_fmt='text/turtle', metadata=None):
+        '''
+        Format output according to granularity level.
+
+        NOTE: Granularity level does not refer to the logging levels, i.e.
+        *when* a message gets logged; in fact, all messages from the messaging
+        logger are logged under the same level. It is rather about *what* gets
+        logged in a message.
+
+        @param uri (str) URI of the resource.
+        @param ev_type (str) One of `Create`, `Update` or `Delete`.
+        @param time Timestamp of the event.
+        @param type RDF type(s) of the resource.
+        @param data (rdflib.Graph) If messaging is configured with the
+        `provenance` level, this is a graph containing the triples that have
+        been added or removed.
+        @param data_fmt (str) Serialization format for `data`.
+        @param metadata (dict) Provenance metadata, e.g. actor(s) and action
+        (add/remove). Only present with messaging level set to `provenance`.
+        '''
+        self.uri = uri
+        self.ev_type = ev_type
+        self.time = time
+        self.type = type
+        self.data = data.serialize(format=data_fmt).decode('utf8') \
+                if data else None
+        self.metadata = metadata
+
+
+    def __str__(self):
+        '''
+        Output structured data as string.
+        '''
+        ret = {
+            '@context': 'https://www.w3.org/ns/activitystreams',
+            'id' : 'urn:uuid:{}'.format(uuid.uuid4()),
+            'type' : self.ev_type,
+            'name' : self.ev_names[self.ev_type],
+            'object' : {
+                'id' : self.uri,
+                'updated' : self.time,
+                'type' : self.type,
+            },
+            'actor' : self.metadata.setdefault('actor', None),
+            'data' : self.data or '',
+        }
+
+        return json.dumps(ret)
+
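As a rough illustration of the formatter's output (all argument values below are made up), a `resource`-level message with no `data` graph serializes along these lines:

    from lakesuperior.messaging.formatters import ActivityStreamsFormatter

    msg = ActivityStreamsFormatter(
        uri='http://localhost:8000/ldp/rest/abc123',      # illustrative URI
        ev_type='Create',
        time='2018-01-10T12:00:00Z',                      # illustrative timestamp
        type=['http://www.w3.org/ns/ldp#RDFSource'],
        metadata={'actor': 'admin'},                      # actor is made up
    )
    print(str(msg))
    # {"@context": "https://www.w3.org/ns/activitystreams",
    #  "id": "urn:uuid:<generated uuid4>",
    #  "type": "Create", "name": "Resource Creation",
    #  "object": {"id": "http://localhost:8000/ldp/rest/abc123",
    #             "updated": "2018-01-10T12:00:00Z",
    #             "type": ["http://www.w3.org/ns/ldp#RDFSource"]},
    #  "actor": "admin", "data": ""}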

+ 25 - 0
lakesuperior/messaging/handlers.py

@@ -0,0 +1,25 @@
+import logging
+
+from abc import ABCMeta, abstractmethod
+
+
+class StompHandler(logging.StreamHandler):
+    '''
+    Send messages to a remote queue broker using the STOMP protocol.
+
+    This module is named and configured separately from
+    standard logging for clarity about its scope: while logging has an
+    informational purpose, this module has a functional one.
+    '''
+    def __init__(self, ep):
+        self.ep = ep
+        super().__init__()
+
+
+    def emit(self, record):
+        '''
+        Format the message for the destination endpoint (no delivery yet in this PoC).
+        '''
+        return self.format(record)
+
+
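Note that emit() above only formats the record and returns the string; no message actually leaves the process yet. A possible completion, sketched under the assumption that the stomp.py client library and a `/topic/fcrepo` destination are used (neither is specified in this commit):

    import logging
    from urllib.parse import urlparse

    import stomp  # stomp.py; an assumed dependency for this sketch


    class StompDeliveryHandler(logging.StreamHandler):
        '''
        Sketch of a handler that delivers formatted records over STOMP.
        '''
        def __init__(self, ep):
            # `ep` looks like stomp://localhost:61613 (see application.yml).
            url = urlparse(ep)
            self.conn = stomp.Connection([(url.hostname, url.port)])
            # Older stomp.py releases may also require self.conn.start() here.
            self.conn.connect(wait=True)
            super().__init__()

        def emit(self, record):
            # The destination name is illustrative.
            self.conn.send(destination='/topic/fcrepo', body=self.format(record))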

+ 32 - 0
lakesuperior/messaging/messenger.py

@@ -0,0 +1,32 @@
+import logging
+
+from flask import current_app
+
+from lakesuperior.messaging import formatters, handlers
+
+messenger = logging.getLogger('_messaging')
+
+
+class Messenger:
+    '''
+    Very simple message sender.
+    '''
+    _msg_routes = []
+
+    def __init__(self, config):
+        for route in config['routes']:
+            handler_cls = getattr(handlers, route['handler'])
+            messenger.addHandler(handler_cls(route['endpoint']))
+            messenger.setLevel(logging.INFO)
+            #messenger.formatter = logging.Formatter('%(message)s')
+            formatter = getattr(formatters, route['formatter'])
+
+            self._msg_routes.append((messenger, formatter))
+
+
+    def send(self, *args, **kwargs):
+        '''
+        Send one or more external messages.
+        '''
+        for m, f in self._msg_routes:
+            m.info(f(*args, **kwargs))
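A quick usage sketch, mirroring the call made from the RDF layout below; the config dict and all argument values are illustrative:

    from lakesuperior.messaging.messenger import Messenger

    config = {
        'routes': [{
            'endpoint': 'stomp://localhost:61613',
            'formatter': 'ActivityStreamsFormatter',
            'handler': 'StompHandler',
            'level': 'resource',
        }],
    }

    msg = Messenger(config)
    # Positional arguments map to the formatter's `uri` and `ev_type` parameters;
    # keyword arguments are passed through to the formatter as well.
    msg.send(
        'http://localhost:8000/ldp/rest/abc123', 'Create',
        time='2018-01-10T12:00:00Z',
        type=['http://www.w3.org/ns/ldp#RDFSource'],
        metadata={'actor': 'admin'},
    )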

+ 22 - 4
lakesuperior/store_layouts/rdf/base_rdf_layout.py

@@ -3,6 +3,7 @@ import logging
 from abc import ABCMeta, abstractmethod
 
 from flask import current_app
+from rdflib.namespace import RDF
 from rdflib.query import ResultException
 from rdflib.resource import Resource
 from rdflib.term import URIRef
@@ -10,6 +11,7 @@ from rdflib.term import URIRef
 from lakesuperior.dictionaries.namespaces import ns_collection as nsc
 from lakesuperior.dictionaries.namespaces import ns_mgr as nsm
 from lakesuperior.exceptions import ResourceNotExistsError
+from lakesuperior.messaging.messenger import Messenger
 from lakesuperior.store_layouts.rdf.graph_store_connector import \
         GraphStoreConnector
 from lakesuperior.toolbox import Toolbox
@@ -50,8 +52,9 @@ class BaseRdfLayout(metaclass=ABCMeta):
     # N.B. This is Fuseki-specific.
     UNION_GRAPH_URI = URIRef('urn:x-arq:UnionGraph')
 
-    RES_CREATED = '_created_'
-    RES_UPDATED = '_updated_'
+    RES_CREATED = 'Create'
+    RES_UPDATED = 'Update'
+    RES_DELETED = 'Delete'
 
     _logger = logging.getLogger(__name__)
 
@@ -72,6 +75,8 @@ class BaseRdfLayout(metaclass=ABCMeta):
                 query_ep=self.conf['webroot'] + self.conf['query_ep'],
                 update_ep=self.conf['webroot'] + self.conf['update_ep'])
 
+        self._msg = Messenger(current_app.config['messaging'])
+
 
     @property
     def store(self):
@@ -123,9 +128,22 @@ class BaseRdfLayout(metaclass=ABCMeta):
             self._logger.info(
                     'Resource {} exists. Removing all outbound triples.'
                     .format(imr.identifier))
-            return self.replace_rsrc(imr)
+            ev_type = self.replace_rsrc(imr)
         else:
-            return self.create_rsrc(imr)
+            ev_type = self.create_rsrc(imr)
+
+        self._msg.send(
+            imr.identifier,
+            ev_type,
+            time=imr.value(nsc['fcrepo'].lastModified),
+            type=list(imr.graph.objects(imr.identifier, RDF.type)),
+            data=imr.graph,
+            metadata={
+                'actor' : imr.value(nsc['fcrepo'].lastModifiedBy),
+            }
+        )
+
+        return ev_type
 
 
     def delete_rsrc(self, urn, inbound=True, delete_children=True):