Building an Event-Driven Architecture in Clojure (Part 1)
If you've ever wanted to build the event-processing startup of your dreams and exit for billions of dollars, or you just want to know how to code one, this post is for you. I didn't know how they worked at my first developer job. We had this events pipeline with all the frills: a highly available event bus, definitions, consumers, producers, consumer-producers. I always figured the architecture just started like that, scaling for millions of events per second right out of the gate. Now that I fancy myself a lean startup person (or whatever), I think a lot in terms of Minimum Viable Product (MVP).
You can use this architecture as your blueprint for any event-handling service. I think the ideas here will translate well enough. If you're new to event-driven architecture, the idea is to decouple receiving events from processing events with one hell-of-a-buffer called an event bus. Consumers consume messages from the event bus, and producers produce messages by putting them on the event bus. We segregate the events by attributes or shape with some kind of publish/subscribe technique (or message broker).
Lucky for us, Clojure gives us two publish/subscribe facilities. The first is a watch, quite literally a first-class observer, built into the core language. The second, and the one we'll be using, is the pub/sub support on channels in Clojure's core.async library (a separate dependency, not part of clojure.core). It lets us supply a function that derives the topic from each message. Let's use this as a stand-in until we hit millions of events and exit for billions with our new event-processing startup.
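For contrast, here is a minimal sketch of the first facility, a watch. The atom and the names last-event, seen, and :audit are made up for illustration; only add-watch itself comes from clojure.core.

```clojure
;; Hypothetical names; add-watch is the only piece taken from clojure.core.
(def last-event (atom nil))
(def seen (atom []))

;; The watch fn receives the key, the reference, the old value and the new value.
(add-watch last-event :audit
           (fn [_key _ref _old-value new-value]
             (swap! seen conj new-value)))

(reset! last-event {:event-type :payment :amount 42})
(reset! last-event {:event-type :unsubscribed :user-id 7})

@seen
;; => [{:event-type :payment, :amount 42} {:event-type :unsubscribed, :user-id 7}]
```

A watch observes every state change of a single reference and has no notion of topics, which is one reason a channel-based publication is the more natural fit for an event bus.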
Enter the Webhooks-as-a-Service
An event-driven architecture MVP is relatively straightforward.
If you're a long-time reader, you know where this is heading. We're going to sandwich our event bus in between a few higher-order functions. Not only can we decouple the event bus from the application and swap it out for something more 'cloud', we can also supply a multimethod and dispatch on the incoming source or outgoing destination. Dispatching on a multimethod like that gives us the potential for multiple event buses: a poor man's sharding, or more realistically a way to deprioritize certain events. All from passing a function as an argument!
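To make the multimethod idea concrete, here is a rough sketch of routing events to more than one bus. Everything in it is an assumption for illustration: the two channels, the route-event name, and the :source attribute on the event are not part of the code in this post.

```clojure
(require '[clojure.core.async :as a])

;; Two hypothetical buses: a small one for urgent events, a big one for the rest.
(def priority-bus (a/chan 16))
(def bulk-bus (a/chan 1024))

;; Dispatch on the event's :source (an assumed attribute).
(defmulti route-event :source)

(defmethod route-event :billing
  [event]
  (a/put! priority-bus event))

;; Any source without its own method falls through to the bulk bus.
(defmethod route-event :default
  [event]
  (a/put! bulk-bus event))

(route-event {:source :billing :id 1})
(route-event {:source :newsletter :id 2})
```

Because route-event takes a single event, it has the same shape as a publish function and could be handed to anything that expects one.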
Okay, real simple: let's create an event bus with a core.async channel, then spin up our event bus and message broker.
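(require '[clojure.core.async :as a]
         '[cheshire.core :as json]          ; assumed JSON library (provides parse-string)
         '[clojure.tools.logging :as log])  ; assumed logging library

(defn payment-received-consumer
  [events-chan]
  (a/go-loop []
    ;; <! returns nil once the channel is closed, which ends the loop
    (when-some [event (a/<! events-chan)]
      ;; Do something with event
      (println "Received payment" event)
      (recur))))

(defn user-unsubscribed-consumer
  [events-chan]
  (a/go-loop []
    (when-some [event (a/<! events-chan)]
      ;; Do something with event
      (println "User unsubscribed" event)
      (recur))))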
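(defn ->event-deserializer
  [publish-fn]
  (fn event-deserializer
    [event-chan]
    (a/go-loop []
      (when-some [event (a/<! event-chan)]
        (try
          ;; `true` keywordizes the keys; the :event-type value is also turned
          ;; into a keyword so it matches topics like :payment.
          ;; valid-event? is not shown in this post.
          (let [parsed-event (-> (json/parse-string event true)
                                 (update :event-type keyword))]
            (if (valid-event? parsed-event)
              (publish-fn parsed-event)
              (log/warn "Dropping unsigned event: " event)))
          (catch Exception e
            (log/error "Internal error processing event: "
                       event
                       (.getMessage e))))
        (recur)))))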
(defn setup-consumer
  [consumer-fn publication topic]
  (let [consumer-chan (a/chan)]
    (a/sub publication topic consumer-chan)
    (consumer-fn consumer-chan)))

(def event-bus (a/chan 2048))
(def publication (a/pub event-bus :event-type))

(def payments-consumer
  (setup-consumer payment-received-consumer publication :payment))
(def user-consumer
  (setup-consumer user-unsubscribed-consumer publication :unsubscribed))
;; publish-event is defined in the producers section below
(def serializing-consumer-producer
  (setup-consumer (->event-deserializer publish-event) publication nil))
core.async/pub creates a 'publication' from the event-bus channel and a topic function, :event-type (taking advantage of keywords implementing the IFn interface). For now we'll assume our events all have the same shape, and that we'll only receive events that tell us what they are via the :event-type keyword.
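A small sketch of that routing in isolation may help. The demo-bus, demo-publication, and payments-chan names are made up so the example stands on its own, and the take uses a timeout so it cannot block forever.

```clojure
(require '[clojure.core.async :as a])

;; A keyword is a function of a map, so it can serve as the topic function.
(:event-type {:event-type :payment :amount 10}) ;; => :payment

(def demo-bus (a/chan 8))
(def demo-publication (a/pub demo-bus :event-type))

(def payments-chan (a/chan 8))
(a/sub demo-publication :payment payments-chan)

;; No one subscribes to :refund, so the publication drops this event.
(a/>!! demo-bus {:event-type :refund :amount 5})
(a/>!! demo-bus {:event-type :payment :amount 10})

;; alts!! returns [value port]; keep only the value.
(def received
  (first (a/alts!! [payments-chan (a/timeout 1000)])))
```

Here received should hold the :payment event only, since events whose topic has no subscriber never reach any consumer channel.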
We also have a nil topic. We'll use this topic for a consumer-producer that deserializes JSON to EDN for us: a raw JSON string has no :event-type, so the topic function returns nil for it. If you're wondering why on earth we'd want to do that, most streaming tech (e.g. AWS Kinesis) passes JSON strings through to our consumers. We could also serialize JSON to EDN-encoded strings and put them back on the bus. This also lets us respond to requests faster since we don't have to process the body of the request (yet); we return HTTP status code 202 as soon as we receive an event.
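To see why raw events end up on the nil topic, and what a parsed event has to look like before a :payment subscriber can receive it, here is what the topic function returns for a few message shapes. The normalize-event-type helper is hypothetical, shown only to illustrate the keyword conversion.

```clojure
;; What the topic function :event-type returns for each shape of message.
(def raw-event "{\"event-type\": \"payment\", \"amount\": 10}")

(:event-type raw-event)                 ;; => nil, a string has no keys
(:event-type {"event-type" "payment"})  ;; => nil, string keys don't match the keyword
(:event-type {:event-type "payment"})   ;; => "payment", a string, not :payment
(:event-type {:event-type :payment})    ;; => :payment

;; Hypothetical helper: turn a parsed event's type into a keyword topic.
(defn normalize-event-type
  [parsed-event]
  (update parsed-event :event-type keyword))

(normalize-event-type {:event-type "payment" :amount 10})
;; => {:event-type :payment, :amount 10}
```

In other words, a deserializer needs both keyword keys and a keyword :event-type value if the consumers subscribe to topics such as :payment.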
Of course, putting raw events on the event bus and then putting the deserialized events back onto the same bus will create back pressure. There are techniques for dealing with back pressure that I won't go into here. To manage initial back pressure, I will say we should supply a buffer to our event-bus channel. A channel created without one is unbuffered: a put only completes when a take is waiting for it (and vice versa). Without a buffer, pending put! and take! operations pile up in the channel's internal queues, which core.async caps at 1024 pending operations each, and depending on our context we can hit that limit quickly. We can take back pressure on our buffered channel to be a good sign though: we can start burning capital on AWS infrastructure, signalling our soon-to-be billion-dollar exit.
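As a rough illustration of how the buffer choice changes what happens when nobody is consuming, the sketch below uses offer!, which never blocks. The channel names and the tiny buffer sizes are arbitrary; the post itself only uses a fixed buffer of 2048.

```clojure
(require '[clojure.core.async :as a])

(def unbuffered (a/chan))
(def fixed (a/chan 2))
(def dropping (a/chan (a/dropping-buffer 2)))
(def sliding (a/chan (a/sliding-buffer 2)))

;; offer! returns true if the value was accepted immediately, otherwise nil.
(def unbuffered-result (a/offer! unbuffered :a))             ;; nil, no taker is waiting
(def fixed-results (mapv #(a/offer! fixed %) [:a :b :c]))    ;; [true true nil], buffer is full at :c
(def dropping-results (mapv #(a/offer! dropping %) [:a :b :c])) ;; all true, :c is silently discarded
(def sliding-results (mapv #(a/offer! sliding %) [:a :b :c]))   ;; all true, :a is pushed out
```

A fixed buffer pushes back on producers once it fills up, while dropping and sliding buffers trade lost events for producers that never wait. Which trade-off is acceptable depends on the events.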
Consumers, Producers, and Consumer-Producers
You'll notice that the setup-consumer adapter contains an internal unbuffered channel, consumer-chan, passed to consumer-fn. consumer-fn exhibits our consumer interface: a function that takes a channel. This makes for nice core.async programming in our consumers regardless of what the event bus looks like. If we wanted to get super fancy, we could store descriptions of the event data ("Event Definitions") in a database to further decouple our consumers from the events they handle.
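Since every consumer is just a function of a channel, the looping boilerplate can be factored out. The ->consumer helper below is a hypothetical convenience, not part of the post's code; it wraps a plain one-argument handler in the consumer interface.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: wrap a plain one-argument handler in the consumer interface.
(defn ->consumer
  [handle-event]
  (fn [events-chan]
    (a/go-loop []
      ;; <! yields nil once events-chan is closed, which ends the loop
      (when-some [event (a/<! events-chan)]
        (handle-event event)
        (recur)))))

(def handled (atom []))
(def demo-chan (a/chan 8))
(def done-chan ((->consumer #(swap! handled conj %)) demo-chan))

(a/>!! demo-chan {:event-type :payment :amount 10})
(a/>!! demo-chan {:event-type :payment :amount 20})
(a/close! demo-chan)

;; The go-loop's own channel closes when the loop exits; wait for it (1s at most).
(a/alts!! [done-chan (a/timeout 1000)])
@handled
```

A consumer built this way can be passed to setup-consumer like any other, and the handler itself can be tested without core.async at all.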
On the other hand, producers are pretty simple:
(defn publish-event
  [event]
  (a/go (a/>! event-bus event)))

(defn ->event-handler
  [publish-fn]
  (fn [req]
    (let [{:keys [body]} req]
      (publish-fn body)
      {:status 202})))
The ->event-handler constructor creates a request handler for our system boundary, closing over whatever publish function it is given, here publish-event. Any consumers that want to produce events, like event-deserializer, can close over publish-event the same way, or just call it directly.
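One payoff of passing the publish function in is that the handler can be exercised without any event bus. In this sketch ->event-handler is repeated from above so the example is self-contained, while the published atom and test-handler are hypothetical stand-ins.

```clojure
;; Same shape as the post's ->event-handler, repeated so the sketch is self-contained.
(defn ->event-handler
  [publish-fn]
  (fn [req]
    (let [{:keys [body]} req]
      (publish-fn body)
      {:status 202})))

;; Hypothetical stand-in for the event bus: just remember what was published.
(def published (atom []))
(def test-handler (->event-handler #(swap! published conj %)))

(def response (test-handler {:body "{\"event-type\": \"payment\"}"}))

response   ;; => {:status 202}
@published ;; => ["{\"event-type\": \"payment\"}"]
```

Swapping the atom for publish-event, or for a function that writes to a hosted stream, would not require touching the handler.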
Caveats
We can't talk about event-driven anything without talking about eventual consistency. By introducing an event bus, or even an event buffer, we've decoupled receiving events from processing events. We now have a lead time between an event's generation and its intended effect on the system. That typically makes this architecture a bad fit for certain types of real-time or responsive applications unless the lead time can be kept reliably within milliseconds, a challenging feat without sufficient software engineering resources.
Conclusion?
And that's pretty much it for our MVP. Add consumers as needed. You can find the full source code on GitHub. Since I was dragging my heels on this post, I decided to break it up into a two- (or three-) part series. In part two I'll separate out the event bus into AWS Kinesis and the consumers into AWS Lambdas with Holy Lambda. Remember, billions.