Messaging Using Clojure and ZeroMQ

Messaging has multiple uses in system design. It can aid communication between application components, enable asynchronous processing, and form the basis for a distributed architecture.

My last project involved receiving financial data in multiple formats, transforming it into a single canonical form, analyzing it for patterns, and forwarding findings to a web app. This mapped naturally to a pipeline design.

By implementing each step as its own subsystem and having them all communicate through a messaging layer, the system gained several benefits. New formats were easy to support. Scaling was a matter of adding more processes and machines. Any application that needed transformed data, such as a dashboard, could get it simply by subscribing to the right component.

This messaging was implemented using ZeroMQ.

In this post, I’ll give you a quick introduction to ZeroMQ through Clojure and help you get up and running with it. By the end, I hope to show you how simple it can be to add basic messaging capabilities to your own projects.

Tutorial Overview
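What we’ll cover:

  • Setting up a Clojure project with JeroMQ.
  • Contexts and sockets.
  • The three core patterns: request-reply, publish-subscribe, and pipeline.

What we won’t cover: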

  • High-level patterns. These address reliability, persistence, slow joiners, and load-balancing, among other concerns. See Further Reading for more information.
  • Advanced features: the High Water Mark, message envelopes, etc.
  • The different varieties of transport outside of TCP.

ZeroMQ Overview
ZeroMQ is an embeddable networking library. It gives you an enhanced socket which encapsulates and handles much of the complexity that typically comes with connecting programs. Specifically, these sockets:

  • Handle I/O asynchronously.
  • Queue messages automatically.
  • Enable components to come and go dynamically. ZeroMQ reconnects automatically.
  • Don’t impose a message format.

One of ZeroMQ’s goals is to make building distributed software akin to building with Lego. In that vein, connecting two application components is simply a matter of instantiating sockets on both ends with a configuration of your choice, and letting ZeroMQ handle the rest. Much like Clojure, creating a ZeroMQ-based topology means starting with basic communication patterns, and then composing them into higher-level systems suited to your goal.

ZeroMQ Sockets vs TCP Sockets
ZeroMQ sockets exhibit a few differences from typical TCP sockets:

  • ZeroMQ sockets carry messages rather than a stream of bytes. A ZeroMQ message is length-specified binary data.
  • ZeroMQ I/O is automatically handled in a background thread, where output is enqueued and input is dequeued from local queues as your application goes about its business.
  • ZeroMQ sockets come in a variety of types, a few of which have 1-N routing built-in.

In addition, ZeroMQ messages can travel over multiple transports (inproc, ipc, tcp, pgm, epgm), and bound endpoints accept connections automatically. Rather than working directly with connections, which are encapsulated under the socket, you work with the socket itself.

ZeroMQ vs Other Messaging Tools
From a user’s perspective, the biggest difference between ZeroMQ and other popular messaging solutions (such as RabbitMQ) is that ZeroMQ has no external broker component. RabbitMQ’s broker architecture makes routing, persistence, and load-balancing easy to get.

In contrast, ZeroMQ is designed for high-throughput, low-latency applications. It works at a lower level, as a socket library, and is designed to be embedded in your application. Its brokerless nature is also the origin of ‘Zero’ in the library’s name. Advanced messaging schemes are then built by combining pieces of the framework.

Setup
ZeroMQ is written in C++ and offers bindings for a number of languages, including Java through JNI. In the past, I used the Java bindings (JZMQ), but there is now a pure Java implementation called JeroMQ. For simplicity, that’s what we’ll use here. The examples we’ll cover are simple enough that we’ll use Java interop directly rather than wrap the calls in Clojure functions.

Start by creating a new Clojure project called clj-zmq. I use Leiningen:

Next, open project.clj in your project’s root folder and add these to your dependencies:

Cheshire is a JSON encoding library which we’ll use in our examples.

Set main to clj-zmq.core:

This is what my project.clj file looks like:

Save the file, switch to your project’s root directory, and run:

That should set your dependencies.
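For reference, the setup steps above might come together as follows. With Leiningen, `lein new clj-zmq` creates the project and `lein deps` fetches the dependencies. The project.clj below is only a sketch: the version numbers and the description string are assumptions chosen for illustration, so substitute whichever releases are current.

```clojure
;; project.clj (sketch; the version numbers are assumptions, not the author's)
(defproject clj-zmq "0.1.0-SNAPSHOT"
  :description "Messaging with Clojure and ZeroMQ"
  :dependencies [[org.clojure/clojure "1.10.1"]
                 [org.zeromq/jeromq "0.5.2"]   ; pure Java ZeroMQ implementation
                 [cheshire "5.10.0"]]          ; JSON encoding and decoding
  :main clj-zmq.core)                          ; namespace holding the entry point
```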

Open core.clj and set up the namespace:

Contexts
The first thing we need to do is create a context, which is what we’ll use to create our sockets. A context is a container for all our sockets. In general, you should create one context per process. Creating multiple contexts in a single process is akin to creating separate ZeroMQ instances.

Continuing in core.clj, let’s create a context:

The argument in the context creation call above is the number of threads to use. The ZeroMQ FAQ recommends one thread per gigabyte of data in or out per second. Now that we have our context, we can go over sockets before moving on to our examples.
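As a minimal sketch of the namespace and context setup described above, assuming JeroMQ’s `org.zeromq.ZMQ` class and the `c` alias for Cheshire that the later examples refer to:

```clojure
(ns clj-zmq.core
  (:require [cheshire.core :as c])   ; JSON helpers, aliased as c
  (:import [org.zeromq ZMQ]))        ; JeroMQ's main entry point

;; One context per process. The argument is the number of I/O threads.
;; defonce (a choice made for this sketch) keeps a REPL reload from
;; creating a second context.
(defonce context (ZMQ/context 1))
```

Using `defonce` rather than `def` is a small design choice here: since a context behaves like a separate ZeroMQ instance, reloading the namespace should not silently create another one.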

Sockets
When using ZeroMQ, your application communicates by dealing with the socket, rather than the connection the socket handles. The socket, depending on its type, encapsulates policies for inward and outward routing, queuing, and what other types of sockets it can connect to. These relationships form messaging patterns, which give rise to ZeroMQ’s power and ability to layer additional patterns. The three basic socket pairings are:

  • REQ/REP [Request-Reply]
  • PUB/SUB [Publish-Subscribe]
  • PUSH/PULL [Pipeline]

To connect two sockets, you perform a bind on one and a connect on the other. In general, the socket with the known network address (such as the server) should bind, and the client(s) with unknown addresses should connect. Let’s see this in action with our first core ZeroMQ pattern.

The Request-Reply Pattern
For the request-reply pattern, we’ll create a simple echo server for our client to connect to. Let’s take a look at the code and go over the salient lines:

Here’s what’s happening:

  • On line 3, we use our context to create a REP socket.
  • On line 4, we bind our socket, describing the transport mechanism (tcp), the IP, and the port number.
  • On line 5, we wait for input using our socket’s recv command.
  • On line 6, upon receiving a message from the client, we send it right back using our socket’s send command.

All in all, pretty straightforward.
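The steps above can be sketched roughly as follows. This is a minimal sketch rather than the original listing, so the line numbers in the walkthrough will not line up with it. It assumes JeroMQ’s `org.zeromq.ZMQ` class; the function name `echo-server` and the endpoint `tcp://127.0.0.1:5555` are placeholders, and the `import` and `defonce` lines are there only to keep the snippet self-contained.

```clojure
(import '[org.zeromq ZMQ])

(defonce context (ZMQ/context 1))   ; shared per-process context

(defn echo-server
  "Binds a REP socket and echoes every request back to its sender.
  Never returns, so run it on its own thread."
  []
  (let [s (.socket context ZMQ/REP)]        ; create a REP socket
    (.bind s "tcp://127.0.0.1:5555")        ; assumed endpoint: loopback, port 5555
    (loop [msg (.recv s 0)]                 ; block until a request arrives (byte array)
      (.send s msg 0)                       ; reply with the same bytes
      (recur (.recv s 0)))))                ; wait for the next request
```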

One thing to note: ZeroMQ doesn’t know anything about the data we send except its size in bytes. That means we’re responsible for formatting it safely.

In our examples, we’ll be sending strings back and forth. JeroMQ’s send command is overloaded: called with a single string argument, it converts the string to bytes for us. For other types of data, we would need to take additional care and perhaps use a specialized serialization library such as Protocol Buffers.

Let’s move on to our client:

This code looks similar to our server with three differences:

  • On line 3, we create a REQ socket.
  • On line 4, we use connect rather than bind.
  • On line 7, we close our socket once the server replies.
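A client matching that description might look like the sketch below. It is not the original listing, so the line numbers above will not line up with it. The function name `echo`, the printed prefix, and the endpoint are assumptions for illustration, and the endpoint has to match whatever the server binds to.

```clojure
(import '[org.zeromq ZMQ])

(defonce context (ZMQ/context 1))   ; shared per-process context

(defn echo
  "Sends msg (a string) to the echo server, prints the reply, and returns it."
  [msg]
  (let [s (.socket context ZMQ/REQ)]               ; create a REQ socket
    (.connect s "tcp://127.0.0.1:5555")            ; connect rather than bind (assumed endpoint)
    (.send s msg)                                  ; single-argument send takes a string
    (let [reply (String. ^bytes (.recv s 0))]      ; block until the server answers
      (println "Server replied:" reply)
      (.close s)                                   ; done with this socket
      reply)))
```

With an echo server listening on that endpoint, calling `(echo "Hello")` at the REPL prints `Server replied: Hello`.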

Let’s give it a try. Start your REPL (you should be in the clj-zmq.core namespace). In our examples, rather than starting multiple REPLs, we’ll run certain servers in separate threads. Start the echo server with the following command:

And now send a message to it using the client:

You should get back the following response:

This is the request-reply pattern, probably the simplest way to use ZeroMQ. The client must alternate strictly between a send and a receive. Any other sequence, such as sending multiple messages in a row, fails with an error (a -1 return code in the underlying C API).

The Publish-Subscribe Pattern
The second pattern we’ll look at is used for one-way data distribution. For our example, we’ll create a server which publishes random stock trades and a client which subscribes to it. Here’s the server:

By now, the code probably looks familiar to you. Once again, we use the context to create our PUB socket. We then bind it to port 6666 and begin generating market data events. c/generate-string is a Cheshire command to turn a Clojure map into a JSON string. When no clients are connected, the server silently drops its messages.
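A publisher along these lines could be sketched as follows. Port 6666 and `c/generate-string` come from the description above; the function names, the shape of a trade (`:symbol`, `:price`, `:size`), the ticker list, and the one-millisecond pause are all assumptions made for this sketch.

```clojure
(import '[org.zeromq ZMQ])
(require '[cheshire.core :as c])

(defonce context (ZMQ/context 1))   ; shared per-process context

(defn random-trade
  "Builds a made-up trade. The symbols and field names are illustrative only."
  []
  {:symbol (rand-nth ["AAPL" "GOOG" "MSFT" "IBM"])
   :price  (+ 10 (rand-int 490))
   :size   (* 100 (inc (rand-int 10)))})

(defn market-data-publisher
  "Binds a PUB socket on port 6666 and publishes random trades as JSON.
  Never returns, so run it on its own thread."
  []
  (let [s (.socket context ZMQ/PUB)]
    (.bind s "tcp://127.0.0.1:6666")
    (while true
      (.send s (c/generate-string (random-trade)))   ; map -> JSON string
      (Thread/sleep 1))))                            ; light throttle, added for this sketch
```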

Here’s the client:

Our client connects to the publisher, processes a certain number of market events, and ends. Notice that in addition to creating the SUB socket and connecting it, we issue a subscribe command on line 4. In this case, subscribing with an empty string means we wish to receive everything from the publisher (your client can instead specify filters on what to receive). Not issuing this command is a common oversight. If you use this pattern and find yourself not receiving anything, check for this.

The c/parse-string command on line 7 converts a JSON string into a Clojure map.
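The subscriber described above might be sketched like this. It is not the original listing, so the line numbers above will not match it. The function name `get-market-data` is hypothetical, and the empty subscription is passed as an empty byte array, which is one way to express "everything" in JeroMQ.

```clojure
(import '[org.zeromq ZMQ])
(require '[cheshire.core :as c])

(defonce context (ZMQ/context 1))   ; shared per-process context

(defn get-market-data
  "Connects to the publisher on port 6666, prints n events as Clojure maps,
  then closes the socket."
  [n]
  (let [s (.socket context ZMQ/SUB)]
    (.connect s "tcp://127.0.0.1:6666")
    (.subscribe s (.getBytes ""))          ; empty prefix: receive every message
    (dotimes [_ n]
      ;; JSON string -> map; the trailing true asks Cheshire for keyword keys
      (println (c/parse-string (String. ^bytes (.recv s 0)) true)))
    (.close s)))
```

To filter instead, pass a non-empty prefix to `subscribe`; only messages whose bytes start with that prefix are delivered.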

Reload your REPL and give it a try. Start the publisher in a separate thread:

Now listen to 100 market data events:

You should get output similar to the following:

Pub-Sub is a one-way, asynchronous line from the server to the client. An error results if either the server tries to receive or the client tries to send.

One issue this example doesn’t address is slow joiner syndrome. When your client initiates a connection, it takes time to step through the connection routine. This may take milliseconds, but during that time, the publisher may have already sent a few hundred or thousand messages which will be missed by the client. This may not matter if the publisher sends data endlessly and the client doesn’t care what transpired before. But if you want to ensure the client is well and ready before the publisher begins sending, then there’s an advanced pattern in the documentation for reliable pub-sub.

The Pipeline Pattern
Our final example covers the most involved of the three core patterns. The pipeline pattern is suited for task distribution and collection. For our example, we’ll set up a dispatcher which generates tasks and sends them to workers in a round-robin fashion. These workers will be connected upstream to the dispatcher and downstream to a collector. The collector will receive output from each worker as they complete.

Here’s the code for the dispatcher:

Similar to the previous examples, we create a PUSH socket and bind it to port 7777. But we add a step: we wait for one second before dispatching. Why?

We want all the workers to be connected before dispatch begins. This way, the first worker to connect won’t be inundated with an outsized share of the tasks. Workers are still free to come and go as they please afterward, and the dispatcher will automatically adjust the routing accordingly.

Originally, I had the dispatcher prompt the user to hit enter when ready. But because certain versions of nREPL have a bug where the read-line command causes an I/O exception in the REPL, I went with an arbitrary timer for this example to avoid confusion. For a production scenario, the ZeroMQ guide covers more sophisticated means of signaling when dispatch is ready.
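A dispatcher of this kind could be sketched as follows. Port 7777 and the one-second wait come from the text; the function name `dispatcher` and the choice to use the job number as the whole payload are assumptions. The sketch also relies on the socket’s default linger setting to flush queued jobs when the socket is closed.

```clojure
(import '[org.zeromq ZMQ])

(defonce context (ZMQ/context 1))   ; shared per-process context

(defn dispatcher
  "Binds a PUSH socket on port 7777 and sends n numbered jobs, which ZeroMQ
  distributes round-robin among the connected workers."
  [n]
  (let [s (.socket context ZMQ/PUSH)]
    (.bind s "tcp://127.0.0.1:7777")
    (Thread/sleep 1000)            ; arbitrary pause so workers can connect first
    (dotimes [i n]
      (.send s (str i)))           ; the payload is just the job number here
    (.close s)))
```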

Here’s the worker:

The worker has two sockets, one pulling from the dispatcher and another pushing to the collector (on a different port). Each worker has a unique ID and simulates performing a task for a random duration. It then forwards information about itself, the job, and how long it took to the collector.
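As a rough sketch of such a worker: port 7777 is the dispatcher’s port from the text, and 8888 is assumed here for the collector. The function name, the 0 to 99 ms simulated duration, and the result keys (`:worker`, `:job`, `:ms`) are illustrative choices.

```clojure
(import '[org.zeromq ZMQ])
(require '[cheshire.core :as c])

(defonce context (ZMQ/context 1))   ; shared per-process context

(defn worker
  "Pulls jobs from the dispatcher, simulates work, and pushes a JSON result
  to the collector. Never returns, so run it on its own thread."
  [id]
  (let [in  (.socket context ZMQ/PULL)
        out (.socket context ZMQ/PUSH)]
    (.connect in  "tcp://127.0.0.1:7777")   ; upstream: the dispatcher
    (.connect out "tcp://127.0.0.1:8888")   ; downstream: the collector (assumed port)
    (while true
      (let [job      (String. ^bytes (.recv in 0))   ; block until a job arrives
            duration (rand-int 100)]                 ; simulated task time in ms
        (Thread/sleep (long duration))
        (.send out (c/generate-string {:worker id :job job :ms duration}))))))
```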

Here’s the code for the collector:
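As a minimal sketch (not necessarily the original listing), a collector could look like this. It assumes workers push JSON strings to port 8888; the function name `collector` and the choice to simply print each result are assumptions.

```clojure
(import '[org.zeromq ZMQ])
(require '[cheshire.core :as c])

(defonce context (ZMQ/context 1))   ; shared per-process context

(defn collector
  "Binds a PULL socket on port 8888 and prints each completed task as a map.
  Never returns, so run it on its own thread."
  []
  (let [s (.socket context ZMQ/PULL)]
    (.bind s "tcp://127.0.0.1:8888")        ; workers connect to this endpoint
    (while true
      ;; JSON string -> map with keyword keys, then print it
      (println (c/parse-string (String. ^bytes (.recv s 0)) true)))))
```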

The collector offers an endpoint where it pulls completed tasks from all the workers in a fair-queuing fashion. Let’s give it a try. Reload your REPL, and start your collector:

Then start three workers:

Finally, dispatch 100 jobs:

You should see output similar to the following:

As you can see, all three workers are processing tasks in parallel and completing them in variable time.

Here’s a complete listing for core.clj:

Further Reading
ZeroMQ removes a lot of complexity from creating high-performance, scalable, distributed architectures, and I hope I’ve shown you a glimmer of it in this tutorial. If you want to learn more about advanced patterns, implementing persistence, and other ways to work with ZeroMQ, the first stop would be the excellent ZeroMQ Guide. Much of it is also reflected in a recent (2013) O’Reilly book.

JeroMQ is a fairly new project, so if you prefer using the C++ version with Java bindings, this is a good installation guide.

The code for this tutorial is available on github.

If you have any questions, thoughts, suggestions, or corrections, please leave a comment or email me.

1 Comment

  1. Matthew

    For the pipeline pattern is it possible to make it more data driven i.e. use some kind of data file to specify the socket port numbers 7777 and 8888 rather than code them directly in the source code? If you have a pipeline processing application with 20+ processes running, maintaining all the socket port numbers will become quite “challenging” without using the data file.

    More info/examples on pipeline processing (some call it flow based programming) can be found in the following sites:

    http://www.jpaulmorrison.com/fbp/
    https://plus.google.com/communities/109985462312550244212
    http://www.nodally.com/
