Clojure Transducers: Your Composable Data Pipelines
What is a transducer anyway?
Specifically, from the documentation: 'a transducer is a transformation from one reducing function to another'. We can think of a transducer as a context-independent transformation composed of, say, many reducers (like map and filter). If we think of functional programming as a series of transformations on data, we'd naturally want to decouple the transformations from the contexts they're used in. Decoupling our transformations gives us better building blocks for functional programs.
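As a quick, minimal illustration of that decoupling (the name incs is just for this sketch): the same transducer can be handed to into, sequence, or transduce without changing the transformation itself.
(def incs (map inc))                 ;; a transducer: no collection supplied

(into [] incs (range 5))             ;; => [1 2 3 4 5]
(sequence incs (range 5))            ;; => (1 2 3 4 5)
(transduce incs + (range 5))         ;; => 15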
Independence means composition
If transducers are independent of their context, we can also compose transducers into larger transducers instead of only composing reducers. Let's look at an example.
;; Consider a transducer that filters out deleted records and
;; packages them into a format that reagent likes to consume.
(def active-indexed-xf
(comp (filter #(false? (:deleted %)))
(map-indexed (fn [idx item] (merge item {:idx idx})))))
(defn list-orders
"Returns all active orders in a format that Reagent components
like. Expects pagination query params `page` and `limit` to be
present in `req`."
[req]
(let [results (get-all-orders)
{:keys [page limit year]
:or {page 1 limit 25}} (:query-params req)
page (dec page)
xfv (cond-> [active-indexed-xf]
;; For each optional query param we can just
;; keep conj'ing different transducers to the vector to
;; further refine the result for our users.
year (conj (filter #(= (:year %) year)))
;; month (conj (filter #(= (:month %) month)))
(and (not (neg? page)) (pos? limit)) (conj (drop (* page limit))
(take limit)))
xf (apply comp xfv)]
;; Could also use (apply eduction (conj xfv results))
;; or (sequence xf results)
(into [] xf results)))
Here we can see the classic way of creating a transducer: using comp to compose many reducers into a single transform, active-indexed-xf. When the list-orders handler is called, our transducer can be comp'd with other functions that return transducers as well, allowing our API to return a list of orders according to the query parameters provided.
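Here's a small, self-contained sketch of the same idea; the sample-orders data below is made up purely for illustration.
(def sample-orders
  [{:id 1 :year 2023 :deleted false}
   {:id 2 :year 2024 :deleted true}
   {:id 3 :year 2024 :deleted false}])

(into []
      (comp active-indexed-xf
            (filter #(= (:year %) 2024)))
      sample-orders)
;; => [{:id 3 :year 2024 :deleted false :idx 1}]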
We understand how transducers can be composed, but how is it that transducers are independent of their context? Well, like most things in Clojure, the answer is a higher-order function.
Transducer as a type
As some of you may know, I have a little thing for types. Anything conforming to an interface is a type, and an interface is a bag of functions, or even just one function with multiple arities that returns something specific for each. Looking at the code for functions that return transducers, like map, we can see the transducer interface in the 1-arity map function.
;; Copyright Rich Hickey, used under license.
;; map from clojure.core
(defn map
"Returns a lazy sequence consisting of the result of applying f to
the set of first items of each coll, followed by applying f to the
set of second items in each coll, until any one of the colls is
exhausted. Any remaining items in other colls are ignored. Function
f should accept number-of-colls arguments. Returns a transducer when
no collection is provided."
{:added "1.0"
:static true}
([f]
(fn [rf]
(fn
([] (rf))
([result] (rf result))
([result input]
(rf result (f input)))
([result input & inputs]
(rf result (apply f input inputs))))))
([f coll]
(lazy-seq
(when-let [s (seq coll)]
(if (chunked-seq? s)
(let [c (chunk-first s)
size (int (count c))
b (chunk-buffer size)]
(dotimes [i size]
(chunk-append b (f (.nth c i))))
(chunk-cons (chunk b) (map f (chunk-rest s))))
(cons (f (first s)) (map f (rest s)))))))
([f c1 c2]
(lazy-seq
(let [s1 (seq c1) s2 (seq c2)]
(when (and s1 s2)
(cons (f (first s1) (first s2))
(map f (rest s1) (rest s2)))))))
([f c1 c2 c3]
(lazy-seq
(let [s1 (seq c1) s2 (seq c2) s3 (seq c3)]
(when (and s1 s2 s3)
(cons (f (first s1) (first s2) (first s3))
(map f (rest s1) (rest s2) (rest s3)))))))
([f c1 c2 c3 & colls]
(let [step (fn step [cs]
(lazy-seq
(let [ss (map seq cs)]
(when (every? identity ss)
(cons (map first ss) (step (map rest ss)))))))]
(map #(apply f %) (step (conj colls c3 c2 c1))))))
Looking at the 1-arity version of filter, we can see something similar.
;; Copyright Rich Hickey, used under license.
;; filter from clojure.core
(defn filter
"Returns a lazy sequence of the items in coll for which
(pred item) returns logical true. pred must be free of side-effects.
Returns a transducer when no collection is provided."
{:added "1.0"
:static true}
([pred]
(fn [rf]
(fn
([] (rf))
([result] (rf result))
([result input]
(if (pred input)
(rf result input)
result)))))
([pred coll]
(lazy-seq
(when-let [s (seq coll)]
(if (chunked-seq? s)
(let [c (chunk-first s)
size (count c)
b (chunk-buffer size)]
(dotimes [i size]
(let [v (.nth c i)]
(when (pred v)
(chunk-append b v))))
(chunk-cons (chunk b) (filter pred (chunk-rest s))))
(let [f (first s) r (rest s)]
(if (pred f)
(cons f (filter pred r))
(filter pred r))))))))
Now for the last piece of the puzzle, look at how comp composes functions.
;; Copyright Rich Hickey, used under license.
;; comp from clojure.core
(defn comp
"Takes a set of functions and returns a fn that is the composition
of those fns. The returned fn takes a variable number of args,
applies the rightmost of fns to the args, the next
fn (right-to-left) to the result, etc."
{:added "1.0"
:static true}
([] identity)
([f] f)
([f g]
(fn
([] (f (g)))
([x] (f (g x)))
([x y] (f (g x y)))
([x y z] (f (g x y z)))
([x y z & args] (f (apply g x y z args)))))
([f g & fs]
(reduce1 comp (list* f g fs))))
I'm sure some of you have figured out that calling comp on transducers will give us nested fn calls.
;; Doing this
(comp (filter even?)
(map inc))
;; Gives us something like this:
;; rf is the reducing function here.
;; If we passed this xf to transduce with `+`, + would be our reducing fn.
;; The rf inside filter's body is now bound to the step function that the
;; 1-arity map returns when given our rf, i.e. ((map inc) rf). I've
;; substituted that function inline below.
(fn [rf]
  (fn
    ;; init from filter calls init on the substituted map step fn
    ([] ((fn
           ([] (rf))
           ([result] (rf result))
           ([result input] (rf result (inc input))))))
    ;; completion from filter calls completion on the map step fn
    ([result] ((fn
                 ([] (rf))
                 ([result] (rf result))
                 ([result input] (rf result (inc input))))
               result))
    ;; step from filter: if the input passes even?, hand it to the map
    ;; step fn, which incs it and passes it on to rf
    ([result input]
     (if (even? input)
       ((fn
          ([] (rf))
          ([result] (rf result))
          ([result input] (rf result (inc input))))
        result input)
       result))))
Composing one transducer with another "stacks" the operations on top of one another, revealing the interface for transducers. A transducer is a function that takes a reducing function (rf) and returns a closure over it with three arities.
- Init, or arity zero: calls the init arity on the rest of the stack.
- Step, or arity two: calls the reducing function on an accumulated result and the next input, like a single step of reduce.
- Completion, or arity one: cleans up any state the transducer holds and returns the final value. It must call the reducing function's completion arity exactly once.
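As a sketch of that contract, here is a minimal hand-written transducer; inc-xf is a hypothetical name, and it behaves like (map inc).
(defn inc-xf [rf]
  (fn
    ([] (rf))              ;; init: delegate to the rest of the stack
    ([result] (rf result)) ;; completion: call rf's completion exactly once
    ([result input]        ;; step: transform the input, then hand it to rf
     (rf result (inc input)))))

(into [] inc-xf [1 2 3])
;; => [2 3 4]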
In our example, if even? returns true for a value, that value is incremented before being passed to the reducing function + in transduce.
(def xf (comp (filter even?) (map inc)))
(transduce xf + (range 100))
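;; => 2500 (the fifty even numbers in (range 100), each incremented, then summed)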
In a sense, transducers "fuse" multiple transformations into a single pass by stacking them on top of one another: the collection is traversed once, and no intermediate sequences are built along the way.
Gotta Go Fast
Many developers treat performance here as a foregone conclusion, and I don't blame them. Every Computer Science faculty in the world (I hope) teaches computational complexity analysis of algorithms like filter and map. Both have an average time complexity of O(n), or linear time, meaning the algorithm's running time is a linear function of the size of its input. So, according to Big-O notation, even if our input is 5n items, the time complexity is still on the order of O(n).
Unfortunately for those of us working outside the classroom, we have cloud bills to worry about. If we're working with a massive collection of, say, five million records, we might not have the computational luxury of naively threading (->>) through a bunch of sequence functions. Even though crunching one million records and five million records has the same time complexity, the threaded version walks the data and allocates an intermediate lazy sequence at every step of the pipeline. It would be far more efficient to fuse those steps into a single pass with transducers.
Let's do a benchmark with Criterium to confirm.
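A minimal sketch of the kind of comparison I mean, using Criterium's quick-bench (big-input is just an illustrative name, and the exact timings will depend on your machine):
(require '[criterium.core :refer [quick-bench]])

(def big-input (vec (range 5000000)))

;; Threading through sequence functions: each step realizes an
;; intermediate lazy sequence before the next one consumes it.
(quick-bench
 (->> big-input
      (filter even?)
      (map inc)
      (reduce +)))

;; Transducers: a single pass over the input, no intermediate sequences.
(quick-bench
 (transduce (comp (filter even?) (map inc)) + big-input))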
Quite a reduction in execution time when using transducers on large input! You won't see that if the input were (range 10) or (range 50), though; there's just not enough work being done to see a worthwhile jump in performance.
Conclusion
In my opinion, transducers are an underrated feature of Clojure. Not only do we get functional programming building blocks for little cost, we also get a significant boost in performance when working with large datasets.