UMPS
The University of Utah Seismograph Stations Message Passing System.

The publisher-subscriber pattern is the simplest communication mechanism. It is included for pedagogical purposes only. Here, multiple consumers receive messages from one producer.


Classes

class UMPS::Messaging::PublisherSubscriber::Publisher
 A ZeroMQ publisher.

class UMPS::Messaging::PublisherSubscriber::PublisherOptions
 Options for initializing the publisher in the PUB/SUB pattern.

class UMPS::Messaging::PublisherSubscriber::Subscriber
 The subscriber in a publisher/subscriber messaging pattern.

class UMPS::Messaging::PublisherSubscriber::SubscriberOptions
 Options for initializing the subscriber in the PUB/SUB pattern.
 

Detailed Description

Pub-Sub Messaging Overview

This is the most naive messaging strategy available in UMPS. All that happens is that messages are sent from one producer to its consumers. This pattern exists for educational and testing purposes and should not be used in production code.


The publisher-subscriber pattern. Here, three subscribers receive messages from one publisher.


Despite its simplicity, the pub-sub pattern is important. From an application standpoint, it is the mechanism that allows UMPS to distribute data packets, heartbeat messages, pick messages, earthquake messages, etc. This pattern also lets me introduce preliminary concepts such as the distinction between connecting and binding: in the example that follows, the publisher binds to an endpoint and the subscribers connect to it.
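To make the fan-out concrete before turning to the UMPS classes, here is a minimal, single-threaded sketch of one producer copying each message to every consumer. ToyPublisher and Inbox are hypothetical names invented for this illustration; they are not UMPS or ZeroMQ types, and no sockets are involved.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <string>
#include <vector>

// Hypothetical toy types for illustration only; not UMPS or ZeroMQ classes.
using Inbox = std::deque<std::string>;

class ToyPublisher
{
public:
    // A subscriber "connects" and is handed its own inbox.
    std::shared_ptr<Inbox> connect()
    {
        auto inbox = std::make_shared<Inbox> ();
        mInboxes.push_back(inbox);
        return inbox;
    }
    // Every connected subscriber gets its own copy of the message.
    // Returns the number of copies that were delivered.
    std::size_t send(const std::string &message)
    {
        for (auto &inbox : mInboxes){inbox->push_back(message);}
        return mInboxes.size();
    }
private:
    std::vector<std::shared_ptr<Inbox>> mInboxes;
};

// One publisher, three subscribers, two messages.
std::vector<std::shared_ptr<Inbox>> fanOutDemo()
{
    ToyPublisher publisher;
    std::vector<std::shared_ptr<Inbox>> inboxes;
    for (int i = 0; i < 3; ++i){inboxes.push_back(publisher.connect());}
    publisher.send("Message number 1");
    publisher.send("Message number 2");
    return inboxes;
}
```

After calling fanOutDemo(), each of the three inboxes holds both messages in the order they were sent. The subscribers never talk to each other or back to the publisher, which is the essence of the pattern.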

An Example

In this example, a publisher will send a handful of text messages to three subscribers.

The Publisher

The important points in this code snippet are that the publisher binds to an endpoint, defines a message to send, and then sends the message in a non-blocking way. Notice that the publisher does not care whether the subscribers receive the messages.

#include <chrono>
#include <string>
#include <thread>
#include <umps/messaging/publisherSubscriber/publisher.hpp>
#include <umps/messaging/publisherSubscriber/publisherOptions.hpp>
#include <umps/messageFormats/text.hpp>
#include "pubsub.hpp" // Presumably defines N_MESSAGES and declares publisher()
using namespace UMPS::Messaging::PublisherSubscriber;
void publisher()
{
    PublisherOptions publisherOptions;
    publisherOptions.setAddress("tcp://127.0.0.1:5555"); // Bind on this address
    Publisher publisher;
    publisher.initialize(publisherOptions);
    // Wait a bit for the subscribers to connect
    std::this_thread::sleep_for(std::chrono::milliseconds(750));
    for (int i = 0; i < N_MESSAGES; ++i)
    {
        // Define a message
        UMPS::MessageFormats::Text textMessage;
        textMessage.setContents("Message number " + std::to_string(i + 1));
        // The message is serialized by the Text class and put on the wire
        // by ZeroMQ
        publisher.send(textMessage);
    }
}
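The fire-and-forget behavior described above has a consequence worth seeing in isolation: a message sent while nobody is connected is simply lost. The following is a toy, in-process sketch of that behavior; FireAndForgetPublisher and lateJoinerDemo are hypothetical names, and the model only mimics the drop semantics rather than using UMPS or ZeroMQ.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical toy types for illustration only; not UMPS or ZeroMQ classes.
using Inbox = std::vector<std::string>;

class FireAndForgetPublisher
{
public:
    // A subscriber "connects" and is handed its own inbox.
    std::shared_ptr<Inbox> connect()
    {
        mInboxes.push_back(std::make_shared<Inbox> ());
        return mInboxes.back();
    }
    // Never waits and never reports failure.  If no subscriber is
    // connected, the loop body does not run and the message is discarded.
    void send(const std::string &message)
    {
        for (auto &inbox : mInboxes){inbox->push_back(message);}
    }
private:
    std::vector<std::shared_ptr<Inbox>> mInboxes;
};

// Returns what a subscriber that joins late actually receives.
Inbox lateJoinerDemo()
{
    FireAndForgetPublisher publisher;
    publisher.send("Message number 1"); // Nobody is connected: dropped
    auto inbox = publisher.connect();   // The subscriber joins late
    publisher.send("Message number 2"); // Delivered
    return *inbox;
}
```

Here the late subscriber ends up with only the second message. This is why the publisher above pauses for 750 milliseconds before sending: it gives the subscribers a chance to be connected before the first message goes out.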

The Subscriber

The important points are that the subscriber connects to an endpoint, receives a pre-agreed number of messages, and returns. It is important to consider the blocking (waiting) behavior of the subscriber. If the subscriber blocks indefinitely until a message is received, as it does here, then the program may hang should a subscriber miss one of the expected messages. For this reason, we actually connect the subscribers before the publisher is created. You may want to pause for a moment to let that sink in: ZeroMQ allows the subscriber to connect before the publisher binds to the socket.

#include <iostream>
#include <memory>
#include <string>
#include <umps/messaging/publisherSubscriber/subscriber.hpp>
#include <umps/messaging/publisherSubscriber/subscriberOptions.hpp>
#include <umps/messageFormats/text.hpp>
#include <umps/messageFormats/messages.hpp>
#include <umps/messageFormats/staticUniquePointerCast.hpp>
#include "pubsub.hpp" // Presumably defines N_MESSAGES and declares subscriber()
using namespace UMPS::Messaging::PublisherSubscriber;
void subscriber(const int subscriberID)
{
    // Define the message types that the subscriber will receive
    UMPS::MessageFormats::Messages messageTypes; // Container from messages.hpp
    std::unique_ptr<UMPS::MessageFormats::IMessage> textMessageType
        = std::make_unique<UMPS::MessageFormats::Text> ();
    messageTypes.add(textMessageType);
    // Define the subscriber options
    SubscriberOptions subscriberOptions;
    subscriberOptions.setAddress("tcp://127.0.0.1:5555"); // Connect to this address
    subscriberOptions.setMessageTypes(messageTypes); // Types of messages to get
    // Initialize the subscriber
    Subscriber subscriber;
    subscriber.initialize(subscriberOptions);
    // Now retrieve messages
    for (int i = 0; i < N_MESSAGES; ++i)
    {
        // The message is read off the wire by ZeroMQ and deserialized
        // by the Text class.
        auto message = subscriber.receive(); // Blocks until message is received
        // Convert the message so we can look at the contents
        auto textMessage
            = static_unique_pointer_cast<UMPS::MessageFormats::Text>
              (std::move(message));
        std::cout << "SubscriberID: " << subscriberID
                  << " received: " << textMessage->getContents() << std::endl;
    }
}
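One common way to avoid hanging on a receive that never completes is to wait with a time-out and let the caller decide what to do when nothing arrives. The sketch below shows that idea with a small in-process queue. TimedInbox is a hypothetical class written for this illustration; it is not the UMPS Subscriber, and whether and how UMPS exposes a receive time-out is not shown here.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <utility>

// Hypothetical in-process inbox for illustration; not a UMPS or ZeroMQ class.
class TimedInbox
{
public:
    // Called by the producer side to deliver a message.
    void push(const std::string &message)
    {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mQueue.push(message);
        }
        mCondition.notify_one();
    }
    // Waits at most timeOut for a message.  Returns std::nullopt if the
    // wait expires with the queue still empty.
    std::optional<std::string> receive(const std::chrono::milliseconds &timeOut)
    {
        std::unique_lock<std::mutex> lock(mMutex);
        if (!mCondition.wait_for(lock, timeOut,
                                 [this]{return !mQueue.empty();}))
        {
            return std::nullopt; // Timed out
        }
        auto message = std::move(mQueue.front());
        mQueue.pop();
        return message;
    }
private:
    std::mutex mMutex;
    std::condition_variable mCondition;
    std::queue<std::string> mQueue;
};
```

A receive loop built on this would treat an empty result as "nothing yet" and could then retry, log, or give up, instead of blocking forever when one of the expected messages was lost.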

The Driver

Here is the driver code that launches this example. We create four threads: a publisher and three subscribers. The subscribers connect first and then block until a message is received. Then the publisher is started. Since this code is brittle, the publisher pauses after initialization to deal with the slow-joiner problem: ZeroMQ establishes connections asynchronously behind the scenes, so messages sent immediately after binding can be lost before the subscribers are ready to receive them. In practice, we do not pause since we never send a predefined number of messages in a pub-sub pattern.

#include <cstdlib>
#include <thread>
#include "pubsub.hpp"
int main()
{
    // Actually let the subscribers `connect' first
    auto subscriberThread1 = std::thread(subscriber, 1);
    auto subscriberThread2 = std::thread(subscriber, 2);
    auto subscriberThread3 = std::thread(subscriber, 3);
    // Now create the publisher
    auto publisherThread = std::thread(publisher);
    // Wait for threads to finish
    subscriberThread1.join();
    subscriberThread2.join();
    subscriberThread3.join();
    publisherThread.join();
    return EXIT_SUCCESS;
}
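A fixed pause only makes the slow-joiner problem unlikely; it does not rule it out. A sturdier idea is to have each subscriber announce that it is ready and to start publishing only after all announcements are in. The sketch below shows that idea with threads in a single process. ReadyGate and coordinatedStart are hypothetical names and are not part of UMPS; across real ZeroMQ sockets the ready signal would have to travel over a separate channel (the ZeroMQ guide uses a request-reply pair for this), which is not modeled here.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical coordination helper for illustration; not a UMPS class.
class ReadyGate
{
public:
    explicit ReadyGate(const int nExpected) : mExpected(nExpected) {}
    // Called by each subscriber once it is ready to receive.
    void arrive()
    {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            ++mArrived;
        }
        mCondition.notify_all();
    }
    // Called by the publisher.  Returns once every subscriber has arrived.
    void waitForAll()
    {
        std::unique_lock<std::mutex> lock(mMutex);
        mCondition.wait(lock, [this]{return mArrived >= mExpected;});
    }
    // Number of subscribers that have checked in so far.
    int arrived()
    {
        std::lock_guard<std::mutex> lock(mMutex);
        return mArrived;
    }
private:
    std::mutex mMutex;
    std::condition_variable mCondition;
    const int mExpected;
    int mArrived{0};
};

// Starts nSubscribers threads that check in, waits for all of them, and
// returns how many were ready at the moment publishing could begin.
int coordinatedStart(const int nSubscribers)
{
    ReadyGate gate(nSubscribers);
    std::vector<std::thread> subscribers;
    for (int i = 0; i < nSubscribers; ++i)
    {
        subscribers.emplace_back([&gate]{gate.arrive();});
    }
    gate.waitForAll(); // Takes the place of the fixed 750 ms pause
    const int nReady = gate.arrived();
    for (auto &thread : subscribers){thread.join();}
    return nReady;
}
```

With three subscribers, coordinatedStart(3) returns only after all three have checked in, however long that takes. The cost is extra machinery, which is reasonable for an example that sends a fixed number of messages and unnecessary for an open-ended stream.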

Summary

Pub-sub is a foundational messaging pattern that provides a natural solution for handling streaming data. Additionally, the concepts of binding and connecting were introduced. Even so, the alert reader is likely asking: what if I have multiple producers? The answer to that question is the Extended Publisher-Subscriber pattern.