The router-dealer combination allows for asynchronous clients interacting with asynchronous servers.
The router-dealer pattern overcomes the shortcomings of the Request-Reply pattern. As will be a recurring theme, this is accomplished by introducing a proxy.
The router-dealer pattern. Here, three clients submit requests, which are shepherded from the router frontend to the dealer backend. Each request is sent to a server, which responds. The router-dealer proxy then unwinds the request routing and delivers each response to the client that issued the corresponding request.
The proxy now consists of a router frontend to which clients connect and a dealer backend to which servers connect. The router socket can accept an arbitrary number of client connections and the dealer socket can accept an arbitrary number of server connections. Moreover, the messaging is asynchronous, which means the framework can handle multiple non-blocking requests from a client and multiple non-blocking responses from a server. Of course, whether your application's logic can handle that is another matter, but the option is on the table if you want to explore it.
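To illustrate how a reply finds its way back to the right client, the following is a toy, standard-library-only sketch. It is not the UMPS or ZeroMQ implementation: `Envelope`, `ToyProxy`, and `serve` are hypothetical names introduced here. The only idea borrowed from ZeroMQ is that the router tags each incoming request with the identity of the sending client and uses that tag to route the reply.

```cpp
#include <map>
#include <string>
#include <vector>

// A message body tagged with the identity of the client that sent the request.
struct Envelope
{
    std::string clientIdentity;
    std::string body;
};

// Hypothetical toy proxy: the frontend tags each request with the client's
// identity and each reply is delivered to the inbox matching that tag.
class ToyProxy
{
public:
    Envelope acceptRequest(const std::string &clientIdentity,
                           const std::string &body) const
    {
        return Envelope{clientIdentity, body};
    }
    void deliverReply(const Envelope &reply)
    {
        mInboxes[reply.clientIdentity].push_back(reply.body);
    }
    std::vector<std::string> inbox(const std::string &clientIdentity) const
    {
        auto it = mInboxes.find(clientIdentity);
        if (it == mInboxes.end()){return {};}
        return it->second;
    }
private:
    std::map<std::string, std::vector<std::string>> mInboxes;
};

// Hypothetical server: preserves the envelope's identity, replaces the body.
Envelope serve(const Envelope &request, int serverIdentifier)
{
    return Envelope{request.clientIdentity,
                    "Server " + std::to_string(serverIdentifier)
                  + " handled: " + request.body};
}
```

Because the identity travels with the message, replies may arrive in any order and from any server and still reach the client that asked; this is what lets both sides operate asynchronously.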
In more detail, the clients' requests are handled using a fair-queuing strategy. The proxy distributes work via the dealer, which uses a round-robin load-balancing strategy. When the computational work per request and the backend hardware are approximately uniform, this is a very good strategy. It is, however, possible to implement a custom proxy that performs whatever type of load balancing you desire. The grand irony is that, even though the name ZeroMQ would imply no message queues, with only about 20 lines of code UMPS provides a light-weight, highly-scalable solution that is leveraged by all of our machine-learning inference algorithms to support real-time and post-processing.
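The two strategies named above can be sketched with the standard library alone. This is an illustration of the general techniques, not the code UMPS or ZeroMQ runs; `fairQueue` and `roundRobin` are hypothetical helper names.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for fair queuing: take one pending request from each
// client in turn so that no single client can starve the others.
std::vector<std::string>
fairQueue(std::vector<std::deque<std::string>> clientQueues)
{
    std::vector<std::string> ordered;
    bool anyPending = true;
    while (anyPending)
    {
        anyPending = false;
        for (auto &queue : clientQueues)
        {
            if (!queue.empty())
            {
                ordered.push_back(queue.front());
                queue.pop_front();
                anyPending = true;
            }
        }
    }
    return ordered;
}

// Hypothetical stand-in for round-robin load balancing: request i is assigned
// to server i % nServers regardless of how expensive the request is.
std::vector<std::pair<std::string, std::size_t>>
roundRobin(const std::vector<std::string> &requests, std::size_t nServers)
{
    std::vector<std::pair<std::string, std::size_t>> assignments;
    if (nServers == 0){return assignments;}
    for (std::size_t i = 0; i < requests.size(); ++i)
    {
        assignments.emplace_back(requests[i], i % nServers);
    }
    return assignments;
}
```

Note that `roundRobin` never looks at the cost of a request or the speed of a server, which is why the strategy works best when both are roughly uniform.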
An Example
In this example, three clients will interact with two servers by way of the router-dealer proxy.
The Server
The server behaves very much like a simple request-reply backend. A server connects to the backend, provides UMPS with a processing function, and responds to requests. This particular server has been hardened to better deal with invalid messages and processing errors by way of the generic UMPS failure message and try-catch statements. This matters because the server logic must be robust and resilient to errors so as not to crash the backend.
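#include <chrono>
#include <exception>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <umps/messageFormats/failure.hpp>
#include <umps/messaging/routerDealer/reply.hpp>
#include <umps/messaging/routerDealer/replyOptions.hpp>
#include "routerDealer.hpp"
#include "messageTypes.hpp"
namespace
{
class MessageProcessor
{
public:
    explicit MessageProcessor(int identifier) :
        mIdentifier(identifier)
    {
    }
    /// Deserializes a request and builds the reply.  Problems are reported
    /// to the client as a failure message rather than as an exception.
    [[nodiscard]]
    std::unique_ptr<UMPS::MessageFormats::IMessage>
        process(const std::string &messageType,
                const void *messageContents, const size_t length)
    {
        ::RequestMessage request;
        if (messageType == request.getMessageType())
        {
            ::ReplyMessage response;
            try
            {
                request.fromMessage(
                    reinterpret_cast<const char *> (messageContents), length);
            }
            catch (const std::exception &e)
            {
                UMPS::MessageFormats::Failure failureMessage;
                failureMessage.setDetails("Server "
                                        + std::to_string(mIdentifier)
                                        + " failed to deserialize message");
                return failureMessage.clone();
            }
            ++mResponses;
            response.setContents("Server " + std::to_string(mIdentifier)
                               + " handling message: " + request.getContents());
            return response.clone();
        }
        // The message type is not one this server knows how to handle
        UMPS::MessageFormats::Failure failureMessage;
        failureMessage.setDetails("Server " + std::to_string(mIdentifier)
                                + " encountered unhandled message type "
                                + messageType);
        return failureMessage.clone();
    }
    [[nodiscard]] int getNumberOfResponses() const
    {
        return mResponses;
    }
private:
    int mIdentifier{0};
    int mResponses{0};
};
}
void server(int instance)
{
    ::MessageProcessor messageProcessor(instance);
    // Connect to the proxy's dealer (backend); the address matches the
    // backend address used in the proxy listing
    UMPS::Messaging::RouterDealer::ReplyOptions options;
    options.setAddress("tcp://127.0.0.1:5556");
    options.setCallback(std::bind(&MessageProcessor::process,
                                  &messageProcessor,
                                  std::placeholders::_1,
                                  std::placeholders::_2,
                                  std::placeholders::_3));
    UMPS::Messaging::RouterDealer::Reply replier;
    replier.initialize(options);
    // Service requests on a background thread
    auto replierThread
        = std::thread(&UMPS::Messaging::RouterDealer::Reply::start, &replier);
    std::this_thread::sleep_for(std::chrono::seconds {2});
    replier.stop();
    // Join before reading the counter so the callback can no longer modify it
    if (replierThread.joinable()){replierThread.join();}
    std::cout << "Server instance " << instance << " handled "
              << messageProcessor.getNumberOfResponses()
              << " requests" << std::endl;
}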
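The hardening idea in the server, converting any exception into a failure reply so the messaging layer never sees it, can also be factored into a reusable wrapper. The sketch below uses only the standard library; `Message`, `Callback`, `harden`, and `echo` are hypothetical stand-ins and are not UMPS types. The callback shape (message type, raw bytes, length) simply mirrors the one used above, and a lambda is used in place of `std::bind`.

```cpp
#include <cstddef>
#include <exception>
#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for a message; this is NOT the UMPS interface.
struct Message
{
    std::string type;
    std::string contents;
};

// Same shape as the callback used above: message type, raw bytes, length.
using Callback = std::function<std::unique_ptr<Message>
                               (const std::string &, const void *, std::size_t)>;

// Wraps a callback so that an exception becomes a failure reply instead of
// propagating into the messaging layer.
Callback harden(Callback inner)
{
    return [inner = std::move(inner)](const std::string &messageType,
                                      const void *data,
                                      std::size_t length)
           -> std::unique_ptr<Message>
    {
        try
        {
            return inner(messageType, data, length);
        }
        catch (const std::exception &e)
        {
            return std::make_unique<Message>(
                Message{"Failure",
                        std::string{"Processing failed: "} + e.what()});
        }
    };
}

// Example processor: echoes the payload and rejects empty requests.
std::unique_ptr<Message> echo(const std::string &messageType,
                              const void *data, std::size_t length)
{
    if (data == nullptr || length == 0)
    {
        throw std::invalid_argument("empty request");
    }
    std::string payload(static_cast<const char *>(data), length);
    return std::make_unique<Message>(Message{messageType, "Echo: " + payload});
}
```

Calling `harden(echo)` yields a callback that always returns a message: either the echoed payload or a `"Failure"` message carrying the exception text. Whether to wrap the callback or to place the try-catch inside the processor, as the listing above does, is a matter of taste.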
The Client
Similarly, the client behaves very much like a simple request-reply client. It submits a request and waits for a reply. This particular client has been hardened to deal with different types of failure (invalid request messages, server processing errors, and request time-outs). Typically, this logic would be embedded in a custom, application-specific client class.
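#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <umps/messageFormats/failure.hpp>
#include <umps/messageFormats/messages.hpp>
#include <umps/messaging/routerDealer/request.hpp>
#include <umps/messaging/routerDealer/requestOptions.hpp>
#include <umps/messageFormats/staticUniquePointerCast.hpp>
#include "routerDealer.hpp"
#include "messageTypes.hpp"
void client(int instance)
{
    // Register the message types that the client may receive
    UMPS::MessageFormats::Messages messageTypes;
    std::unique_ptr<UMPS::MessageFormats::IMessage> replyMessageType
        = std::make_unique<::ReplyMessage> ();
    std::unique_ptr<UMPS::MessageFormats::IMessage> failureMessageType
        = std::make_unique<UMPS::MessageFormats::Failure> ();
    messageTypes.add(replyMessageType);
    messageTypes.add(failureMessageType);
    // Connect to the proxy's router (frontend); the address matches the
    // frontend address used in the proxy listing
    UMPS::Messaging::RouterDealer::RequestOptions options;
    options.setAddress("tcp://127.0.0.1:5555");
    options.setMessageFormats(messageTypes);
    // Illustrative time-out; tune this for your application
    options.setReceiveTimeOut(std::chrono::milliseconds {1000});
    UMPS::Messaging::RouterDealer::Request client;
    client.initialize(options);
    for (int i = 0; i < N_REQUESTS; ++i)
    {
        ::RequestMessage request;
        request.setContents("Request from instance "
                          + std::to_string(instance));
        auto reply = client.request(request);
        if (reply != nullptr)
        {
            ::ReplyMessage replyMessage;
            if (reply->getMessageType() == replyMessage.getMessageType())
            {
                // The namespace is inferred from the header's location
                auto typedReply
                    = UMPS::MessageFormats::static_unique_pointer_cast
                      <::ReplyMessage> (std::move(reply));
                std::cout << "Reply thread " << instance << " received reply: "
                          << typedReply->getContents() << std::endl;
            }
            else
            {
                // Anything else is a failure message from the server
                std::cout << "Server reply to client " << instance
                          << " encountered problems." << std::endl;
            }
        }
        else
        {
            // A null reply indicates the request timed out
            std::cout << "Reply to " << instance << " timed out." << std::endl;
        }
    }
}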
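As noted above, failure handling of this kind is usually embedded in an application-specific client class. The following standard-library-only sketch shows one possible shape for such a class. Everything in it is hypothetical: `Reply`, `Transport`, and `RetryingClient` are not UMPS types, and retrying after a time-out is a policy assumed here for illustration, not something the example above does.

```cpp
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for a reply; this is NOT the UMPS type.
struct Reply
{
    std::string type;
    std::string contents;
};

// Hypothetical transport: returns nullptr when the request times out.
using Transport = std::function<std::unique_ptr<Reply> (const std::string &)>;

class RetryingClient
{
public:
    RetryingClient(Transport transport, int maxAttempts) :
        mTransport(std::move(transport)),
        mMaxAttempts(maxAttempts)
    {
    }
    // Returns the reply contents, or std::nullopt when every attempt timed
    // out or the server reported a failure.
    std::optional<std::string> request(const std::string &contents)
    {
        for (int attempt = 0; attempt < mMaxAttempts; ++attempt)
        {
            ++mAttempts;
            auto reply = mTransport(contents);
            if (reply == nullptr){continue;} // Timed out: try again
            if (reply->type != "Reply"){return std::nullopt;} // Server failure
            return reply->contents;
        }
        return std::nullopt;
    }
    [[nodiscard]] int getNumberOfAttempts() const noexcept
    {
        return mAttempts;
    }
private:
    Transport mTransport;
    int mMaxAttempts{1};
    int mAttempts{0};
};
```

In a real application the `Transport` would wrap the blocking request call, so the calling code only has to distinguish between a usable answer and `std::nullopt`.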
The Proxy
The API of the router-dealer proxy behaves very much like the proxy you encountered in the xPub-xSub section. However, the behind-the-scenes logic of this proxy is very different from that of the xPub-xSub proxy.
#include <chrono>
#include <thread>
#include <umps/messaging/routerDealer/proxy.hpp>
#include <umps/messaging/routerDealer/proxyOptions.hpp>
using namespace UMPS::Messaging::RouterDealer;
void proxy()
{
    // Clients connect to the frontend (router); servers to the backend (dealer)
    ProxyOptions proxyOptions;
    proxyOptions.setFrontendAddress("tcp://127.0.0.1:5555");
    proxyOptions.setBackendAddress("tcp://127.0.0.1:5556");
    Proxy proxy;
    proxy.initialize(proxyOptions);
    // Run the proxy on a background thread
    auto proxyThread = std::thread(&Proxy::start, &proxy);
    std::this_thread::sleep_for(std::chrono::seconds {3});
    proxy.stop();
    proxyThread.join();
}