瀏覽代碼

Stand up messaging.

Stefano Cossu 7 年之前
父節點
當前提交
36b40204d2

+ 4 - 1
etc.defaults/application.yml

@@ -87,11 +87,14 @@ messaging:
           # for this route.
           active: True
 
+          # Protocol version. One of `10`, `11` or `12`.
+          protocol: '11'
           host: 127.0.0.1
           port: 61613
+
+          # Credentials are optional.
           username:
           password:
-          protocol: '12'
           destination: '/topic/fcrepo'
 
           # Message format: at the moment the following are supported:

+ 9 - 3
lakesuperior/api/resource.py

@@ -101,8 +101,14 @@ def process_queue():
 
 def send_event_msg(remove_trp, add_trp, metadata):
     '''
-    Break down delta triples, find subjects and send event message.
+    Send messages about a changed LDPR.
+
+    A single LDPR message packet can contain multiple resource subjects, e.g.
+    if the resource graph contains hash URIs or even other subjects. This
+    method groups triples by subject and sends a message for each of the
+    subjects found.
     '''
+    # Group delta triples by subject.
     remove_grp = groupby(remove_trp, lambda x : x[0])
     remove_dict = {k[0]: k[1] for k in remove_grp}
 
@@ -111,8 +117,8 @@ def send_event_msg(remove_trp, add_trp, metadata):
 
     subjects = set(remove_dict.keys()) | set(add_dict.keys())
     for rsrc_uri in subjects:
-        logger.info('subject: {}'.format(rsrc_uri))
-        app_globals.messenger.send
+        logger.debug('Processing event for subject: {}'.format(rsrc_uri))
+        app_globals.messenger.send(rsrc_uri, **metadata)
 
 
 ### API METHODS ###

+ 1 - 2
lakesuperior/globals.py

@@ -43,12 +43,11 @@ class AppGlobals:
         #logger.info('Non-RDF layout: {}'.format(nonrdfly_mod_name))
 
         # Set up messaging.
-        messenger = Messenger(app_conf['messaging'])
+        self._messenger  = Messenger(app_conf['messaging'])
 
         # Exposed globals.
         self._rdfly = rdfly_cls(app_conf['store']['ldp_rs'])
         self._nonrdfly = nonrdfly_cls(app_conf['store']['ldp_nr'])
-        self._messenger = messenger
         self._changelog = deque()
 
 

+ 25 - 28
lakesuperior/messaging/formatters.py

@@ -21,13 +21,13 @@ class BaseASFormatter(metaclass=ABCMeta):
     }
 
     ev_names = {
-        RES_CREATED : 'Resource Modification',
-        RES_DELETED : 'Resource Creation',
-        RES_UPDATED : 'Resource Deletion',
+        RES_CREATED : 'Resource Creation',
+        RES_DELETED : 'Resource Deletion',
+        RES_UPDATED : 'Resource Modification',
     }
 
-    def __init__(self, uri, ev_type, time, type, data=None,
-                data_fmt='text/turtle', metadata=None):
+    def __init__(
+            self, rsrc_uri, ev_type, timestamp, rsrc_type, actor, data=None):
         '''
         Format output according to granularity level.
 
@@ -36,23 +36,20 @@ class BaseASFormatter(metaclass=ABCMeta):
         are logged under the same level. This it is rather about *what* gets
         logged in a message.
 
-        @param record (dict) This holds a dict with the following keys:
-        - `uri`: URI of the resource.
-        - `ev_type`: one of `create`, `delete` or `update`
-        - `time`: Timestamp of the ev_type.
-        - `data`: if messaging is configured with `provenance` level, this is
-          a `rdflib.Graph` containing the triples that have been removed or
-        added.
-        - `metadata`: provenance metadata as a rdflib.Graph object. This
-        contains properties such as actor(s), action (add/remove), etc. This is
-        only meaningful for `ASDeltaFormatter`.
+        @param rsrc_uri (rdflib.URIRef) URI of the resource.
+        @param ev_type (string) one of `create`, `delete` or `update`
+        @param timestamp (string) Timestamp of the event.
+        @param data (tuple(set)) if messaging is configured with `provenance`
+        level, this is a 2-tuple with one set (as 3-tuples of
+        RDFlib.Identifier instances) for removed triples, and one set for
+        added triples.
         '''
-        self.uri = uri
+        self.rsrc_uri = rsrc_uri
         self.ev_type = ev_type
-        self.time = time
-        self.type = type
-        self.data = data or None
-        self.metadata = metadata
+        self.timestamp = timestamp
+        self.rsrc_type = rsrc_type
+        self.actor = actor
+        self.data = data
 
 
     @abstractmethod
@@ -77,11 +74,11 @@ class ASResourceFormatter(BaseASFormatter):
             'type' : self.ev_types[self.ev_type],
             'name' : self.ev_names[self.ev_type],
             'object' : {
-                'id' : self.uri,
-                'updated' : self.time,
-                'type' : self.type,
+                'id' : self.rsrc_uri,
+                'updated' : self.timestamp,
+                'type' : self.rsrc_type,
             },
-            'actor' : self.metadata.get('actor', None),
+            'actor' : self.actor,
         }
 
         return json.dumps(ret)
@@ -104,11 +101,11 @@ class ASDeltaFormatter(BaseASFormatter):
             'type' : self.ev_types[self.ev_type],
             'name' : self.ev_names[self.ev_type],
             'object' : {
-                'id' : self.uri,
-                'updated' : self.time,
-                'type' : self.type,
+                'id' : self.rsrc_uri,
+                'updated' : self.timestamp,
+                'type' : self.rsrc_type,
             },
-            'actor' : self.metadata.get('actor', None),
+            'actor' : self.actor,
             'data' : self.data,
         }
 

+ 0 - 4
lakesuperior/messaging/handlers.py

@@ -1,11 +1,7 @@
 import logging
 
-from abc import ABCMeta, abstractmethod
-
 import stomp
 
-from flask import current_app
-
 
 class StompHandler(logging.Handler):
     '''

+ 19 - 10
lakesuperior/messaging/messenger.py

@@ -2,6 +2,7 @@ import logging
 
 from lakesuperior.messaging import formatters, handlers
 
+logger = logging.getLogger(__name__)
 messenger = logging.getLogger('_messenger')
 
 
@@ -9,21 +10,29 @@ class Messenger:
     '''
     Very simple message sender using the standard Python logging facility.
     '''
-    _msg_routes = []
-
     def __init__(self, config):
-        for route in config['routes']:
-            handler_cls = getattr(handlers, route['handler'])
-            messenger.addHandler(handler_cls(route))
-            messenger.setLevel(logging.INFO)
-            formatter = getattr(formatters, route['formatter'])
+        '''
+        Set up the messenger.
+
+        @param config (dict) Messenger configuration.
+        '''
+        def msg_routes():
+            for route in config['routes']:
+                handler_cls = getattr(handlers, route['handler'])
+                messenger.addHandler(handler_cls(route))
+                messenger.setLevel(logging.INFO)
+                formatter = getattr(formatters, route['formatter'])
+
+                yield messenger, formatter
 
-            self._msg_routes.append((messenger, formatter))
+        self.config = config
+        self.msg_routes = tuple(r for r in msg_routes())
+        logger.info('Active messaging routes: {}'.format(self.msg_routes))
 
 
     def send(self, *args, **kwargs):
         '''
         Send one or more external messages.
         '''
-        for m, f in self._msg_routes:
-            m.info(f(*args, **kwargs))
+        for msg, fn in self.msg_routes:
+            msg.info(fn(*args, **kwargs))

+ 15 - 10
lakesuperior/model/ldpr.py

@@ -610,34 +610,39 @@ class Ldpr(metaclass=ABCMeta):
         @param add_trp (set) Triples to be added.
         @param notify (boolean) Whether to send a message about the change.
         '''
-        ret = rdfly.modify_rsrc(self.uid, remove_trp, add_trp)
+        pdb.set_trace()
+        rdfly.modify_rsrc(self.uid, remove_trp, add_trp)
 
-        if notify and env.config.get('messaging'):
+        if notify and env.config['application'].get('messaging'):
+            logger.debug('Enqueuing message for {}'.format(self.uid))
             self._enqueue_msg(ev_type, remove_trp, add_trp)
 
-        return ret
-
 
     def _enqueue_msg(self, ev_type, remove_trp=None, add_trp=None):
         '''
-        Sent a message about a changed (created, modified, deleted) resource.
+        Compose a message about a resource change.
+
+        The message is enqueued for asynchronous processing.
+
+        @param ev_type (string) The event type. See global constants.
+        @param remove_trp (set) Triples removed. Only used if the 
         '''
         try:
-            type = self.types
+            rsrc_type = tuple(str(t) for t in self.types)
             actor = self.metadata.value(nsc['fcrepo'].createdBy)
         except (ResourceNotExistsError, TombstoneError):
-            type = set()
+            rsrc_type = ()
             actor = None
             for t in add_trp:
                 if t[1] == RDF.type:
-                    type.add(t[2])
+                    rsrc_type.add(t[2])
                 elif actor is None and t[1] == nsc['fcrepo'].createdBy:
                     actor = t[2]
 
         env.app_globals.changelog.append((set(remove_trp), set(add_trp), {
             'ev_type': ev_type,
-            'time': env.timestamp,
-            'type': type,
+            'timestamp': env.timestamp.format(),
+            'rsrc_type': rsrc_type,
             'actor': actor,
         }))