UMPS
The University of Utah Seismograph Stations Message Passing System.
Extended Publisher-Subscriber

A fully asynchronous publish-subscribe mechanism whereby a forwarder (proxy) relays messages from multiple publishers to multiple subscribers.

Classes

class  UMPS::Messaging::XPublisherXSubscriber::Proxy
 A ZeroMQ proxy to be used in the XPUB/XSUB pattern.
 
class  UMPS::Messaging::XPublisherXSubscriber::ProxyOptions
 Options for initializing the proxy.
 
class  UMPS::Messaging::XPublisherXSubscriber::Publisher
 A ZeroMQ publisher.
 
class  UMPS::Messaging::XPublisherXSubscriber::PublisherOptions
 Options for initializing the publisher in the XPUB/XSUB pattern.
 
class  UMPS::Messaging::XPublisherXSubscriber::Subscriber
 The subscriber in an extended publisher/subscriber messaging pattern.
 
class  UMPS::Messaging::XPublisherXSubscriber::SubscriberOptions
 Options for initializing the subscriber in the extended PUB/SUB pattern.
 

Detailed Description

A fully asynchronous publish-subscribe mechanism whereby a forwarder (proxy) relays messages from multiple publishers to multiple subscribers.

Despite its simplicity, the Publisher-Subscriber pattern is not sufficient in practice. Real applications have multiple data sources (e.g., UUSS field instruments telemetered to import boxes and data streams from other networks served through IRIS), so we will clearly have multiple data publishers. Of course, it is possible to keep adding a subscriber for each data feed in our applications, but this becomes cumbersome and difficult to maintain, particularly when a data feed is dropped. What we really want is one-stop shopping: if a subscriber connects to a particular endpoint then it will be able to receive every message in the data feed, i.e., the subscriber need not know the details of every publisher. For this reason, we introduce the extended publisher-subscriber or xPub-xSub. As a rule, even if I were considering a single-producer, single-consumer application, I would still implement it as an xPub-xSub pattern because experience dictates that even in well-planned applications more publishers always manage to materialize.


The extended publisher-subscriber (xPub-xSub) pattern. Here four subscribers receive messages from three publishers.


The technology required to make this happen is a middleman, or proxy. The proxy provides a stable endpoint to which publishers send data and from which subscribers retrieve data. In the parlance of ZeroMQ, data goes into the proxy's frontend and comes out of the proxy's backend.
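To make the fan-in and fan-out concrete, here is a minimal in-process sketch. It is a toy model only: ToyProxy, connectSubscriber, send, inbox, and runToyExample are hypothetical names invented for this illustration and are not part of UMPS or ZeroMQ. No sockets are involved; the point is simply that every message entering the frontend is copied to every subscriber attached to the backend.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Toy, in-process stand-in for the proxy.  This is NOT the UMPS or ZeroMQ API.
class ToyProxy
{
public:
    // A subscriber attaches to the backend and gets a handle to its inbox.
    std::size_t connectSubscriber()
    {
        mInboxes.emplace_back();
        return mInboxes.size() - 1;
    }
    // A publisher hands a message to the frontend.  The proxy copies it to
    // every subscriber currently attached to the backend.
    void send(const std::string &message)
    {
        for (auto &inbox : mInboxes){inbox.push_back(message);}
    }
    // The messages delivered so far to the given subscriber.
    const std::vector<std::string> &inbox(const std::size_t subscriber) const
    {
        return mInboxes.at(subscriber);
    }
private:
    std::vector<std::vector<std::string>> mInboxes;
};

// Three publishers each send nMessages through one proxy to four subscribers.
// Returns the number of messages that reached each subscriber.
std::vector<std::size_t> runToyExample(const int nMessages)
{
    assert(nMessages >= 0);
    ToyProxy proxy;
    std::vector<std::size_t> subscribers;
    for (int s = 0; s < 4; ++s)
    {
        subscribers.push_back(proxy.connectSubscriber());
    }
    for (int publisherID = 1; publisherID <= 3; ++publisherID)
    {
        for (int i = 0; i < nMessages; ++i)
        {
            proxy.send("Message number " + std::to_string(i + 1)
                     + " from publisher " + std::to_string(publisherID));
        }
    }
    std::vector<std::size_t> counts;
    for (const auto subscriber : subscribers)
    {
        counts.push_back(proxy.inbox(subscriber).size());
    }
    return counts;
}
```

Neither side of this toy knows how many peers sit on the other side of the proxy, which is the property the real pattern is after.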

An Example

There really is not much more to this pattern than what you have already seen in the publisher-subscriber example. In principle, data is again being sent from publishers to subscribers. The main differences are that instead of one publisher we will have three, the number of subscribers is increased from three to four, and there is a proxy.

The Publisher

The publisher is like the previous pub-sub publisher. However, it connects to the proxy. In the verbiage of ZeroMQ, one binds to stable endpoints and connects from ephemeral ones. UMPS views content producers and subscribers as ephemeral and proxies as long-lived, stable endpoints. Consequently, your modules will always connect to the uOperator.

#include <chrono>
#include <string>
#include <thread>
#include <umps/messaging/xPublisherXSubscriber/publisher.hpp>
#include <umps/messaging/xPublisherXSubscriber/publisherOptions.hpp>
#include <umps/messageFormats/text.hpp>
#include "xpubxsub.hpp"
using namespace UMPS::Messaging::XPublisherXSubscriber;
void publisher(const int publisherID)
{
    PublisherOptions publisherOptions;
    publisherOptions.setAddress("tcp://127.0.0.1:5555"); // Connect to this address
    Publisher publisher;
    publisher.initialize(publisherOptions);
    // Deal with slow joiner problem.
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    // Send messages
    for (int i = 0; i < N_MESSAGES; ++i)
    {
        // The text message to send
        UMPS::MessageFormats::Text textMessage;
        textMessage.setContents("Message number "
                              + std::to_string(i + 1)
                              + " from publisher "
                              + std::to_string(publisherID));
        publisher.send(textMessage);
    }
}
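The sleep in the publisher deserves a word. A ZeroMQ publish socket keeps no history, so anything sent before a subscription has taken effect is silently discarded. The sketch below models only that behavior, with no sockets and no timing; ToyPublisher and receivedCount are hypothetical names made up for this illustration, and the point at which the subscription takes effect is passed in as a plain parameter.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Toy model of a publish socket.  It keeps no history: a message sent while
// no subscriber is attached is dropped.  This is NOT the UMPS or ZeroMQ API.
struct ToyPublisher
{
    std::vector<std::string> *subscriberInbox{nullptr}; // No subscriber yet
    void send(const std::string &message)
    {
        if (subscriberInbox == nullptr){return;} // Nobody listening: dropped
        subscriberInbox->push_back(message);
    }
};

// Sends nMessages.  The subscription only takes effect after nSentBeforeJoin
// messages have already gone out.  Returns the number of messages received.
int receivedCount(const int nMessages, const int nSentBeforeJoin)
{
    assert(nMessages >= 0 && nSentBeforeJoin >= 0);
    ToyPublisher publisher;
    std::vector<std::string> inbox;
    for (int i = 0; i < nMessages; ++i)
    {
        // The slow joiner finally shows up
        if (i == nSentBeforeJoin){publisher.subscriberInbox = &inbox;}
        publisher.send("Message number " + std::to_string(i + 1));
    }
    return static_cast<int>(inbox.size());
}
```

Here receivedCount(10, 0) yields all ten messages, while receivedCount(10, 3) yields only seven. A subscriber that is waiting for a fixed number of messages would never see the missing three, which is why the publisher above pauses before sending.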

The Subscriber

The subscriber is exactly like the pub-sub subscriber. In fact, you can use a pub-sub subscriber to connect to the proxy's backend and things will work.

#include <iostream>
#include <memory>
#include <string>
#include <umps/messaging/xPublisherXSubscriber/subscriber.hpp>
#include <umps/messaging/xPublisherXSubscriber/subscriberOptions.hpp>
#include <umps/messageFormats/text.hpp>
#include <umps/messageFormats/messages.hpp>
#include <umps/messageFormats/staticUniquePointerCast.hpp>
#include "xpubxsub.hpp"
using namespace UMPS::Messaging::XPublisherXSubscriber;
void subscriber(int subscriberID)
{
    // Define the message types that the subscriber will receive
    UMPS::MessageFormats::Messages messageTypes;
    std::unique_ptr<UMPS::MessageFormats::IMessage> textMessageType
        = std::make_unique<UMPS::MessageFormats::Text> ();
    messageTypes.add(textMessageType);
    // Define the subscriber options
    SubscriberOptions subscriberOptions;
    subscriberOptions.setAddress("tcp://127.0.0.1:5556"); // Connect to this address
    subscriberOptions.setMessageTypes(messageTypes); // Types of messages to get
    // Initialize the subscriber
    Subscriber subscriber;
    subscriber.initialize(subscriberOptions);
    // Now retrieve messages
    for (int i = 0; i < N_PUBLISHERS*N_MESSAGES; ++i)
    {
        // The message is read off the wire by ZeroMQ and deserialized
        // by the Text class.
        auto message = subscriber.receive(); // Blocks until message is received
        // Convert the message so we can look at the contents
        auto textMessage
            = static_unique_pointer_cast<UMPS::MessageFormats::Text>
              (std::move(message));
        std::cout << "SubscriberID: " << subscriberID << " received: "
                  << textMessage->getContents() << std::endl;
    }
}

The Proxy

The proxy is a new concept. All this thread does is take data from the input port (frontend) and put it on the output port (backend). It's a worker whose job is simply to take items from one conveyor belt and put them on an adjacent conveyor belt. Surprisingly, this remarkably simple mechanism seems sufficient for many broadcast applications.

#include <thread>
#include <chrono>
#include <umps/messaging/xPublisherXSubscriber/proxy.hpp>
#include <umps/messaging/xPublisherXSubscriber/proxyOptions.hpp>
using namespace UMPS::Messaging::XPublisherXSubscriber;
void proxy()
{
    ProxyOptions proxyOptions;
    // Data flows from frontend to backend
    proxyOptions.setFrontendAddress("tcp://127.0.0.1:5555"); // Publishers connect here
    proxyOptions.setBackendAddress("tcp://127.0.0.1:5556"); // Subscribers connect here
    // Create the proxy
    Proxy proxy;
    proxy.initialize(proxyOptions);
    // Run the proxy in a separate thread
    auto proxyThread = std::thread(&Proxy::start, &proxy);
    // Main thread sleeps a bit
    std::this_thread::sleep_for(std::chrono::seconds {3});
    // The main thread tells the proxy to shut down
    proxy.stop();
    proxyThread.join();
}

Notice that the proxy is running as another thread that must be started and stopped. This is because the proxy is supposed to stay up indefinitely. In general, you will not have to explicitly think about this since the uOperator will be responsible for keeping these proxies alive and open. All you will have to do is connect via the extended publisher or extended subscriber.
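For intuition about why the proxy wants its own thread, here is a rough standard-library sketch of the conveyor-belt worker. It assumes nothing about how UMPS or ZeroMQ implement the proxy internally: ToyRelay, push, backend, and runRelay are hypothetical names, and only the start/stop shape is borrowed from the listing above. The start function blocks while it moves items from one belt to the other, so someone else has to call stop.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for the proxy worker.  This is NOT the UMPS Proxy.
class ToyRelay
{
public:
    // A publisher places an item on the frontend belt.
    void push(const std::string &message)
    {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mFrontend.push(message);
        }
        mCondition.notify_one();
    }
    // Blocks, moving items from the frontend belt to the backend belt,
    // until stop() is called.  Pending items are drained before returning.
    void start()
    {
        std::unique_lock<std::mutex> lock(mMutex);
        while (true)
        {
            mCondition.wait(lock,
                            [this]{return mStop || !mFrontend.empty();});
            while (!mFrontend.empty())
            {
                mBackend.push_back(mFrontend.front());
                mFrontend.pop();
            }
            if (mStop){break;}
        }
    }
    // Called from another thread to make start() return.
    void stop()
    {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mStop = true;
        }
        mCondition.notify_one();
    }
    // A copy of everything that has reached the backend belt.
    std::vector<std::string> backend() const
    {
        std::lock_guard<std::mutex> lock(mMutex);
        return mBackend;
    }
private:
    mutable std::mutex mMutex;
    std::condition_variable mCondition;
    std::queue<std::string> mFrontend;
    std::vector<std::string> mBackend;
    bool mStop{false};
};

// Runs the relay in its own thread, feeds it nMessages, then shuts it down.
std::vector<std::string> runRelay(const int nMessages)
{
    assert(nMessages >= 0);
    ToyRelay relay;
    auto relayThread = std::thread(&ToyRelay::start, &relay);
    for (int i = 0; i < nMessages; ++i)
    {
        relay.push("Message number " + std::to_string(i + 1));
    }
    relay.stop();        // The calling thread tells the relay to shut down
    relayThread.join();
    return relay.backend();
}
```

The calling thread stays free to do other work, or simply to decide when the relay should go away, which mirrors how the example proxy is stopped after a few seconds.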

The Driver

Lastly, here is the driver code that launches this example. In addition to the proxy thread, we create seven threads: three publishers and four subscribers. The subscribers connect first and then block until a message is received. Then the publishers are started. Again, this is brittle code since we specify the number of messages a priori: a subscriber that misses even one message will block forever. Therefore, the publisher code must contend with the slow joiner problem.

#include <chrono>
#include <cstdlib>
#include <thread>
#include "xpubxsub.hpp"
int main()
{
    // Create a proxy thread
    auto proxyThread = std::thread(proxy);
    // Give proxy a moment to get up and running
    std::this_thread::sleep_for(std::chrono::milliseconds {500});
    // Subscribers connect first - so as not to miss messages
    auto subscriberThread1 = std::thread(subscriber, 1);
    auto subscriberThread2 = std::thread(subscriber, 2);
    auto subscriberThread3 = std::thread(subscriber, 3);
    auto subscriberThread4 = std::thread(subscriber, 4);
    // Publishers connect and let it rip
    auto publisherThread1 = std::thread(publisher, 1);
    auto publisherThread2 = std::thread(publisher, 2);
    auto publisherThread3 = std::thread(publisher, 3);
    // Wait for threads to finish
    proxyThread.join();
    subscriberThread1.join();
    subscriberThread2.join();
    subscriberThread3.join();
    subscriberThread4.join();
    publisherThread1.join();
    publisherThread2.join();
    publisherThread3.join();
    return EXIT_SUCCESS;
}
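The listings all include a local header, xpubxsub.hpp, that is not shown on this page. A plausible sketch of it follows. This is a guess at its shape rather than the actual file: the function declarations and the value of N_PUBLISHERS follow from the listings above, while the value of N_MESSAGES and the names N_SUBSCRIBERS and N_EXPECTED_PER_SUBSCRIBER are assumptions made for illustration.

```cpp
#ifndef XPUBXSUB_HPP
#define XPUBXSUB_HPP
// Hypothetical reconstruction of the shared example header.

// The driver launches three publishers and four subscribers
constexpr int N_PUBLISHERS = 3;
constexpr int N_SUBSCRIBERS = 4;  // Assumed name; the listings do not use it
// Number of messages each publisher sends (assumed value)
constexpr int N_MESSAGES = 5;
// Each subscriber loops until it has received this many messages (assumed name)
constexpr int N_EXPECTED_PER_SUBSCRIBER = N_PUBLISHERS*N_MESSAGES;

// The thread entry points used by the driver
void publisher(int publisherID);
void subscriber(int subscriberID);
void proxy();

#endif
```

With these assumed values each subscriber waits for 15 messages, so a single dropped message would leave that subscriber blocked in receive.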

Summary

In closing, it should be clear that the xPub-xSub paradigm requires marginally more effort than its naive publisher-subscriber counterpart. However, do not underestimate the xPub-xSub. At UUSS this is the mechanism by which we move all of our data packets, probability packets, and STA/LTA packets, as well as all of our picks, events, and module heartbeats, and the computational cost on the uOperator hub is small. Long story short, when it comes to solving the broadcast problem, you will be hard-pressed to find a more flexible and scalable approach than the xPub-xSub.