Kaynağa Gözat

WIP test with stomp.py.

Stefano Cossu 7 yıl önce
ebeveyn
işleme
ff550e1482

+ 8 - 7
etc.skeleton/application.yml

@@ -80,19 +80,20 @@ messaging:
     # List of channels to send messages to.
     # Each channel must define the `endpoint` and the `level` parameters.
     routes:
-        # The destination of the message. Currently only the STOMP protocol
-        # is supported.
-        - endpoint: stomp://localhost:61613
+          # Output handler. Currently only `StompHandler` is supported.
+        - handler: StompHandler
+
+          host: 127.0.0.1
+          port: 61613
+          username:
+          password:
 
           # Message format: at the moment only `ActivityStreamsFormatter` is
           # supported.
           formatter: ActivityStreamsFormatter
 
-          # Output handler. Currently only `StompHandler` is supported.
-          handler: StompHandler
-
           # Granularity level of messages. It can be one of:
-          # - `none`: No message are ever sent.
+          # - `none`: No message is ever sent.
           # - `resource`: Messages are sent whenever a resource is created,
           #   updated or deleted. No details about what changed is included.
           # - `provenance`: up to two messages are sent for each individual

+ 12 - 5
lakesuperior/messaging/handlers.py

@@ -2,8 +2,12 @@ import logging
 
 from abc import ABCMeta, abstractmethod
 
+import stomp
 
-class StompHandler(logging.StreamHandler):
+from flask import current_app
+
+
+class StompHandler(logging.Handler):
     '''
     Send messages to a remote queue broker using the STOMP protocol.
 
@@ -11,15 +15,18 @@ class StompHandler(logging.StreamHandler):
     standard logging for clarity about its scope: while logging has an
     informational purpose, this module has a functional one.
     '''
-    def __init__(self, ep):
-        self.ep = ep
+    def __init__(self, conf):
         super().__init__()
 
+        #import pdb; pdb.set_trace()
+        self.conn = stomp.Connection([(conf['host'], conf['port'])])
+        self.conn.start()
+        self.conn.connect(conf['username'], conf['password'], wait=True)
+
 
     def emit(self, record):
         '''
         Send the message to the destination endpoint.
         '''
-        return self.format(record)
-
+        self.conn.send('/topic/fcrepo', self.format(record))
 

+ 1 - 1
lakesuperior/messaging/messenger.py

@@ -16,7 +16,7 @@ class Messenger:
     def __init__(self, config):
         for route in config['routes']:
             handler_cls = getattr(handlers, route['handler'])
-            messenger.addHandler(handler_cls(route['endpoint']))
+            messenger.addHandler(handler_cls(route))
             messenger.setLevel(logging.INFO)
             #messenger.formatter = logging.Formatter('%(message)s')
             formatter = getattr(formatters, route['formatter'])