Using ZeroMQ sockets in your Perl application.

ØMQ Sockets and Perl

ØMQ finally clicked for me while I was building out an API service and service clients for all of our mail systems. When I started the project, and I was just beginning to use ØMQ, I had a lot of questions on how to use it best, and what type of patterns I could use, or should use, when working with these sockets.

While part of deciding on ØMQ was delving into everything it could do, the larger focus was what it did well. There will always be project-specific constraints that guide choices, so it’s never possible to say that any one project is the answer. But, ØMQ was able to provide a very flexible, and generalized solution – where all I needed was socket communication, it gave me a better socket.

Why ØMQ?

I originally decided on ØMQ because this project required something more flexible than a vanilla job queue. It could act like a message queue, but still provide streaming sockets. It supported multiple languages, and the socket patterns for building concurrent applications went far beyond what redundancy I could accomplish with a RESTful API.

ØMQ gave me exactly what I needed: it provided an advanced socket interface with the features of a message queue. ØMQ takes your typical BSD or POSIX sockets, and builds on top of them, providing one that caches, queues messages, and is connection-aware. All this boilerplate is abstracted with ØMQ, including load balancing and pub/sub patterns for concurrency. It gives you a socket that is minimalist as much it is powerful.

There is much more to ØMQ than I could describe here. For anyone looking to create sockets with ØMQ, the guide is an excellent, thorough resource. If you are deciding if ØMQ is a fit for your application, here are some more specific parts of the guide you should look at:

  • Multipart messages - Multipart message is an important concept for ØMQ handle large messages very well, at the cost of latency. It would be a bad fit for any sockets where latency was a sizable concern.
  • Using push/pull sockets - The push/pull socket pair provides streaming sockets, useful when sending data one-way.
  • Handling multiple sockets - Polling sockets in ØMQ is similar to using the select function, which can’t be used on a ZMQ::Socket.
  • Request/reply mechanisms - ØMQ routes messages by adding frames through router and dealer socket types.

ØMQ and Perl

For starters, the best distribution to use when working in Perl is ZMQ, not the now defunct ZeroMQ or AnyEvent::ZeroMQ. Both version 2 and 3 of ØMQ developments headers can be used, through the package ZMQ::LibZMQ2 and ZMQ::LibZMQ3, respectively. The ØMQ development packages must be installed on your system for these packages to work – most modern distributions still use version 2. ZMQ::LibZMQ* exposes the functions that nearly resemble the ØMQ C functions, with some changes for type checking. The full library is exposed, which is good, but it’s exposed as a Perl-ish C interface, but that’s fine.

This bit might be necessary, when ZMQ defaults to version 3, this forces version 2. This will use version 2, which you’ll find in most Linux distros still.

BEGIN {
    $ENV{ PERL_ZMQ_BACKEND } = 'ZMQ::LibZMQ2';
}

Here is what a full example would look like. There is a sender piece and a listener piece operating in a request/reply pattern. This is what an API implementation would look like to start – the sending socket sends a message and waits for a response.

ØMQ Usage and Patterns

Because sent messages are only strings, serialization of the packets must happen on send and receive. Working between Python and Perl, the Python package zmq includes a wrapper for JSON serialization, which can also be done in Perl’s ZMQ.

use ZMQ;
use ZMQ::Constants qw/ZMQ_REQ/;
use ZMQ::Serializer;

ZMQ::Serializer::register_read_type(json => \&JSON::decode_json);
ZMQ::Serializer::register_write_type(json => \&JSON::encode_json);

my $ctx = ZMQ::Context->new(8);
my $socket = $ctx->socket(ZMQ_REQ);
$socket->connect("tcp://localhost:8888");

$socket->sendmsg_as(json => {
    command => 'do_foobar',
    foobar => 'foobar',
    something => JSON::true
});

This is a slightly naive approach, assuming everything coming over the wire is valid JSON. The registered handlers for the serializer could also wrap the message decoding in an eval or try block at very least.

Socket reliability is an important piece that isn’t built into the sockets. While waiting for messages to arrive, especially waiting for a push or pull stream to end or hang up, the socket should be polled for new messages, instead of blocking on the receiving socket. Blocking here would tie up the receiving end, hanging indefinitely should the sending side break transmission or crash.

my $tries = 5;
while (1) {
    my $poller = zmq_poll([
        {
            socket => $socket->{_socket},
            events => ZMQ_POLLIN,
            callback => sub {
                my $data = $socket->recvmsg_as('json');
                print $data->{foobar};
            }
        }
    ], (5000000));

    if ($poller == 0) {
        last
          if ($tries-- == 0);

        # Disconnect, reconnect
        $socket->setsockopt(ZMQ_LINGER, 0);
        $socket->close();
        $socket = $ctx->socket(ZMQ_PULL);
        $socket->connect($host);
    }
}

The zmq_poll function takes an array of socket configurations to poll on and a timeout in microseconds. Individual sockets are configured with either a file descriptor – using fd instead of socket in the hash – or a socket and a callback. The socket part here is pointing to $socket->{_socket} because zmq_poll expects a ZMQ::LibZMQ2::Socket instance, not ZMQ::Socket.

Also important is the disconnect and reconnect code. On a timeout, zmq_poll will return 0, signaling either a chance to hangup and quit, or reconnect. The beauty of ØMQ here is that if a network event caused a timeout, the sending side has simply queued up messages. On a reconnect the receiving side will start pulling those messages again, the socket context instance – $ctx – hasn’t changed.

The hangup bit is important here. Setting the ZMQ_LINGER option to 0 seconds is required to disregard any pending connections or sends and allow the socket’s context instance to be destroyed. If not, the underlying call to zmq_ctx_destroy will wait here forever when cleaning up or going out of scope.

When building an application that will scale out, it’s important to scale ahead of time. An important consideration here is service discovery. An example of what wouldn’t scale easily is a star topology – subscribers simply given a common publisher socket to connect to. This doesn’t scale beacuse it doesn’t allow for multiple publishers.

To simplify service discovery, a broker should be used early on in your application development. In the case of a pub/sub pattern, instead of using bound publisher sockets, both publishers and subscribers would connect to the broker, which is bound and listening for both connections. The broker would act as a simple switch in this case.

../../_images/fig13.png
use ZMQ::LibZMQ2;
use ZMQ::Constants qw(ZMQ_DEALER ZMQ_ROUTER ZMQ_QUEUE);

my $context = zmq_init();

# Socket facing clients
my $frontend = zmq_socket($context, ZMQ_ROUTER);
zmq_bind($frontend, 'tcp://*:5559');

# Socket facing services
my $backend = zmq_socket($context, ZMQ_DEALER);
zmq_bind($backend, 'tcp://*:5560');

# Routing device connecting both sockets
zmq_device(ZMQ_QUEUE, $frontend, $backend);

Hopefully this all offers some direction. ØMQ has worked beautifully in our project, but I would definitely suggest reading through the guide and looking at a few projects that use ØMQ to see if it would be the best fit for your project.