Examples¶
Start RabbitMQ¶
In order for these examples to work you need a RabbitMQ server:
# From https://hub.docker.com/_/rabbitmq
docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management
Pub/Sub¶
This code snippet will subscribe to an ActivityStarted in order to publish an ActivityFinished
- Set up a subscriber for the ‘EiffelActivityStartedEvent’.
- Publish an ‘EiffelActivityTriggeredEvent’ and ‘EiffelActivityStartedEvent’.
- Callback for ‘EiffelActivityStartedEvent’ is called.
- Publish an ‘EiffelActivityFinishedEvent’
import time
from eiffellib.subscribers import RabbitMQSubscriber
from eiffellib.publishers import RabbitMQPublisher
from eiffellib.events import (EiffelActivityTriggeredEvent,
EiffelActivityStartedEvent,
EiffelActivityFinishedEvent)
def callback(event, context):
"""Fetch EiffelActivityTriggeredEvent ID from links and send EiffelActivityFinishedEvent."""
activity_triggered = None
for link in event.links.links:
if link.get("type") == "ACTIVITY_EXECUTION":
activity_triggered = link.get("target")
break
else:
print(event.pretty)
raise Exception("No ACTIVITY_EXECUTION link on EiffelActivityStartedEvent")
# https://github.com/eiffel-community/eiffel/blob/master/eiffel-vocabulary/EiffelActivityFinishedEvent.md
activity_finished = EiffelActivityFinishedEvent()
activity_finished.data.add("outcome", {"conclusion": "SUCCESSFUL"})
activity_finished.links.add("ACTIVITY_EXECUTION", activity_triggered)
PUBLISHER.send_event(activity_finished)
SUBSCRIBER = RabbitMQSubscriber(host="127.0.0.1", queue="pubsub", exchange="amq.fanout",
ssl=False, port=5672)
PUBLISHER = RabbitMQPublisher(host="127.0.0.1", exchange="amq.fanout", port=5672, ssl=False,
routing_key=None)
SUBSCRIBER.subscribe("EiffelActivityStartedEvent", callback)
SUBSCRIBER.start()
PUBLISHER.start()
# https://github.com/eiffel-community/eiffel/blob/master/eiffel-vocabulary/EiffelActivityTriggeredEvent.md
ACTIVITY_TRIGGERED = EiffelActivityTriggeredEvent()
ACTIVITY_TRIGGERED.data.add("name", "Pubsub activity")
PUBLISHER.send_event(ACTIVITY_TRIGGERED)
# https://github.com/eiffel-community/eiffel/blob/master/eiffel-vocabulary/EiffelActivityStartedEvent.md
ACTIVITY_STARTED = EiffelActivityStartedEvent()
ACTIVITY_STARTED.links.add("ACTIVITY_EXECUTION", ACTIVITY_TRIGGERED)
PUBLISHER.send_event(ACTIVITY_STARTED)
# Wait for event to be received by 'callback'.
time.sleep(1)
Activity¶
How to utilize an eiffellib.activity.Activity
An activity is just a callable which will send ActivityTriggered, Started and Finished.
import os
import time
from eiffellib.subscribers import RabbitMQSubscriber
from eiffellib.publishers import RabbitMQPublisher
from eiffellib.events import EiffelAnnouncementPublishedEvent
from eiffellib.activity import Activity
class MyActivity(Activity):
def pre_call(self, event, context):
print("Activity has triggered.")
def call(self, event, context):
print("Activity has started. Let's do stuff.")
def post_call(self, event, context):
print("Activity has finished.")
SUBSCRIBER = RabbitMQSubscriber(host="127.0.0.1", queue="activity", exchange="amq.fanout",
ssl=False, port=5672)
PUBLISHER = RabbitMQPublisher(host="127.0.0.1", exchange="amq.fanout", port=5672, ssl=False,
routing_key=None)
SOURCE = {"host": os.getenv("HOSTNAME", "hostname"), "name": "MyActivity",
"domainId": "example"}
MY_ACTIVITY = MyActivity("Name of activity", PUBLISHER, SOURCE)
SUBSCRIBER.subscribe("EiffelAnnouncementPublishedEvent", MY_ACTIVITY)
SUBSCRIBER.start()
PUBLISHER.start()
# https://github.com/eiffel-community/eiffel/blob/master/eiffel-vocabulary/EiffelAnnouncementPublishedEvent.md
ANNOUNCEMENT = EiffelAnnouncementPublishedEvent()
ANNOUNCEMENT.data.add("heading", "My activity will now trigger")
ANNOUNCEMENT.data.add("body", "This is just a quick trigger for my activity")
ANNOUNCEMENT.data.add("severity", "MINOR")
PUBLISHER.send_event(ANNOUNCEMENT)
# Wait for event to be received by 'callback'.
time.sleep(1)