Event Stream Processing Using Clojure and Esper

I work at a stock exchange. Last winter, we were hit with new requirements from regulators. In short, we needed to step up our market surveillance. We had to start monitoring our order activity in real-time and flag any patterns which are considered potentially abusive.

To address this, I built a system using Clojure (backend) and Rails (frontend). Part of the backend leveraged Esper, an open-source event stream processing engine.

In this post, I’ll show you how to use Esper through Clojure to process event streams in real-time or near real-time to generate statistics and detect events and patterns.

What’s Esper?
Esper is an event stream processing engine with a DSL called Event Processing Language (EPL). Using the EPL, you write statements combining windows, aggregation rules, rate limiters, joins, and more. Streaming data is then run through these statements. When a statement’s conditions are matched, a listener you supply is called with the event(s) discovered.

Tutorial Overview
What we’ll cover

What we won’t cover:

  • The details of EPL (There’s 700+ pages of documentation on this)

We’ll learn by writing a simple wrapper around Esper which we’ll then use to generate the examples.

Project Setup
Start by creating a new Clojure project called clj-esp. I use Leiningen:

Next, open project.clj in your project’s root folder and add this to your dependencies:

and set main to clj-esp.core:

This is what my project.clj file looks like:

Save it off, switch to your project’s root directory, and run:

That should set everything in place.

Event Registration
The first step is to describe and register each type of event we intend to send into Esper. These event types can be described in multiple forms. For this tutorial, we’ll describe event types as Clojure maps, with each key being an event attribute and its value being the attribute’s data type. For example:

Esper requires maps to have string keys and values to be Java types, so we’ll need a function to translate those keywords in the map to the correct type. This is where we’ll begin writing our wrapper.

Open a new file and save it as esper.clj in ./src/clj_esp. In the namespace declaration, import the required Esper classes:

Here’s a quick description of each class:

  • Configuration: Allows you to specify properties of the Esper service. This is also where we register our events.
  • UpdateListener: This is an interface with a single method called update which we implement. Esper will report to us through this callback handler.
  • EPServiceProviderManager: This takes a configuration(optional) and returns an Esper service.

Next, we’ll create a mapping between our data type keywords and actual data types:

Then a function to transform our simple description into something Esper can use:

Calling the function above with a name for the event type (string) and the map describing the event type:

We get:

Now we’ll write a function to generate an Esper configuration given a variable number of event types:

To add an event type, we call .addEventType on the configuration object and supply it with the event type name and it’s attributes map.

Finally, we need a function to create an Esper service:

The function above takes a string for the service’s name (whatever you want) and a configuration object. It returns an Esper service provider.

Statement Creation
Esper statements are strings in an SQL-like syntax. For example, if you were listening to stock order events and you wanted the average order size on IBM over a sliding window of 30 seconds, the statement would be something like this:

We’ll see more of this in the examples. For now, we need a function to create the statement within Esper and another to destroy it:

The function above takes an Esper service provider along with your statement, and creates the EPL equivalent within Esper.

The function to destroy a statement is straight-forward:

Destroying a statement releases all the resources and listeners associated with it.

Attaching Listeners to Statements
When an Esper statement detects an event or a set of events matching conditions we’re interested in, it lets us know by invoking listeners we create and attach to the statement. Listeners are invoked when new events arrive or when old events are removed (eg. when an event exits a sliding window). A listener takes two parameters: a list of new events, and a list of old events.

A listener is created by implementing the UpdateListener interface. Specifically, its single update method. To help create listeners, we’ll write a function which takes a callback function, and implements update using proxy.

Next, we’ll need functions to add listeners to statements and remove them as needed.

And that’s our barebones Esper wrapper. Here’s the complete listing for esper.clj:

Detecting a Single Event
For our examples, we’ll use a basic form of market data we’ll generate and send into Esper. Open core.clj and pull in esper.clj in the namespace declaration:

Next, we’ll create a market data event type using the new-event function:

To help generate market data events, we’ll constrain the possible sides and symbols each event can have:

Now that we have our event type, we’ll create a configuration with it along with an Esper service in one step:

We’ll need a function to send market data events into our Esper service. That’s simply a matter of getting an interface to Esper’s runtime services, and sending it our event along with the name we gave to the event’s type.

The last step before we get to our example is to create a demo function which generates a statement, attaches a listener to it, sends a set amount of order data into Esper, and finally destroys the statement:

Now we’re ready to implement our first example where we detect a single event. Suppose we wanted to detect every order event where the order size was over 1000 shares. Our statement would be pretty straight-forward:

For our listener, recall that Esper sends lists. Once we get that list, we can do whatever we want. In this case, we are interested in every order event with an order size over 1000 shares, so Esper will send a list of one event each time a matching order event goes through. We’ll pull the information we want out of that event and print it:

Let’s give it a try. Start the REPL (you should be in the clj-esp.core namespace) and run the following:

This will generate 100 orders and send each to Esper for processing. Your listener should be invoked each time an order of over 1000 shares is detected, and you should see output similar to this:

Generating Statistics
For our next example, we’ll calculate the average order value for each symbol over a five second moving window, and output the last calculation every second. As in our previous example, we’ll need to create the statement and the listener we’ll attach to the statement.

Here’s the statement:

Every second, our listener will be invoked with a list containing stats for each symbol, so this time we’ll need to go over the list to process each symbol:

Reload the namespace and try it out:

You should get output each second similar to the following:

Detecting a Pattern
For our last example, we’ll detect a pattern composed of multiple events. Specifically, each time there are three bids at successively higher prices on the same symbol, we want to know the prices. The statement for this is a bit more involved than before but still fairly clear:

Notice how each event in the sequence is assigned to a label (e1, e2, e3). When Esper invokes our listener, it will return a map with these labels as keys, with each corresponding to an order event. Our listener looks like this:

Try it in the REPL:

You should see output similar to the following:

There you have it. Here’s a complete listing for core.clj:

Further Reading
There’s a lot more to Esper than this simple tutorial. I recommend visiting their website and taking a look at their tutorial, quick start, and documentation.

The code for this tutorial is available on github.

If you have any questions, thoughts, suggestions, or corrections, please leave a comment or email me.

1 Comment Event Stream Processing Using Clojure and Esper

  1. Jae-Jun Hwang

    Hi, Nitin:

    Thank you for your great article on Clojure and ZeroMQ and Clojure and Esper.

    I was looking for easy command line tool to test Esper and I found your blog and the tutorial where I start extending your code to do a little more, and ended up creating my own simple Esper command line tool.

    Although I am newbie on Clojure, ZeroMQ, and Esper, your code enlightened me to understand the key

    I would like to share my work built on your example with readers of your blog. I drop in my first, naive version in http://github.com/jaejunh

    Again I would like to thank you for great work and
    looking forward to reading your next article.


Comments are closed.