Clojure Transducers: Your Composable Data Pipelines

What is a transducer anyways?

Specifically, from the documentation, 'a transducer is a transformation from one reducing function to another'. We can think of a transducer as a context-independent transformation, often composed from the transducer-returning arities of functions like map and filter. If we think of functional programming as a series of transformations on data, we'd naturally want to decouple the transformations from the contexts they're used in. Decoupling our transformations gives us better building blocks for functional programs.
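For instance, because a transducer says nothing about where its input comes from or where its output goes, the exact same transducer works with into, sequence, and transduce (a quick sketch):

;; One transducer, three contexts
(def xf (comp (filter even?) (map inc)))

(into [] xf (range 10))       ;; => [1 3 5 7 9]
(sequence xf (range 10))      ;; => (1 3 5 7 9)
(transduce xf + 0 (range 10)) ;; => 25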

Independence means composition

If transducers are independent of their context, we can also compose transducers into larger transducers, rather than only composing ordinary functions. Let's look at an example.

;; Consider a transducer that filters out deleted records and
;; packages them into a format that reagent likes to consume.
(def active-indexed-xf
  (comp (filter #(false? (:deleted %)))
        (map-indexed (fn [idx item] (merge item {:idx idx})))))

(defn list-orders
  "Returns all active orders in a format that Reagent components
  like. Expects pagination query params `page` and `limit` to be
  present in `req`."
  [req]
  (let [results (get-all-orders)
        {:keys [page limit year]
         :or {page 1 limit 25}} (:query-params req)
        page (dec page)
        xfv (cond-> [active-indexed-xf]
              ;; For each optional query param we can just
              ;; keep conj'ing different transducers to the vector to
              ;; further refine the result for our users.
              year (conj (filter #(= (:year %) year)))
              ;; month (conj (filter #(= (:month %) month)))
              (and (not (neg? page)) (pos? limit)) (conj (drop (* page limit))
                                                         (take limit)))
        xf (apply comp xfv)]
    ;; Could also use (apply eduction (conj xfv results))
    ;; or (sequence xf results)
    (into [] xf results)))

Here we can see the classic way of creating a transducer: using comp to compose several transducers into a single transform, active-indexed-xf. When the list-orders handler is called, our transducer can be comp'd with other transducer-returning functions as well, allowing our API to return a list of orders according to the query parameters provided.
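A hypothetical invocation, assuming get-all-orders returns maps with :deleted and :year keys and the query params arrive already parsed as numbers:

(list-orders {:query-params {:page 1 :limit 25 :year 2020}})
;; => a vector of active 2020 orders, each tagged with an :idx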

We understand how transducers can be composed, but how is it that transducers are independent of their context? Well, like most things in Clojure, the answer is the higher-order function.

Transducer as a type

As some of you may know, I have a little thing for types. Anything conforming to an interface is a type, and an interface is a bag of functions, or even just one function with multiple arities, each returning something specific. Looking at the source of transducer-returning functions like map, we can see the transducer interface in the 1-arity body of map.

;; Copyright Rich Hickey, used under license.
;; map from clojure.core 
(defn map
  "Returns a lazy sequence consisting of the result of applying f to
  the set of first items of each coll, followed by applying f to the
  set of second items in each coll, until any one of the colls is
  exhausted.  Any remaining items in other colls are ignored. Function
  f should accept number-of-colls arguments. Returns a transducer when
  no collection is provided."
  {:added "1.0"
   :static true}
  ([f]
    (fn [rf]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
           (rf result (f input)))
        ([result input & inputs]
           (rf result (apply f input inputs))))))
  ([f coll]
   (lazy-seq
    (when-let [s (seq coll)]
      (if (chunked-seq? s)
        (let [c (chunk-first s)
              size (int (count c))
              b (chunk-buffer size)]
          (dotimes [i size]
              (chunk-append b (f (.nth c i))))
          (chunk-cons (chunk b) (map f (chunk-rest s))))
        (cons (f (first s)) (map f (rest s)))))))
  ([f c1 c2]
   (lazy-seq
    (let [s1 (seq c1) s2 (seq c2)]
      (when (and s1 s2)
        (cons (f (first s1) (first s2))
              (map f (rest s1) (rest s2)))))))
  ([f c1 c2 c3]
   (lazy-seq
    (let [s1 (seq c1) s2 (seq c2) s3 (seq c3)]
      (when (and  s1 s2 s3)
        (cons (f (first s1) (first s2) (first s3))
              (map f (rest s1) (rest s2) (rest s3)))))))
  ([f c1 c2 c3 & colls]
   (let [step (fn step [cs]
                 (lazy-seq
                  (let [ss (map seq cs)]
                    (when (every? identity ss)
                      (cons (map first ss) (step (map rest ss)))))))]
     (map #(apply f %) (step (conj colls c3 c2 c1))))))

Looking at the 1-arity call to filter, we can see something similar.

;; Copyright Rich Hickey, used under license.
;; filter from clojure.core
(defn filter
  "Returns a lazy sequence of the items in coll for which
  (pred item) returns logical true. pred must be free of side-effects.
  Returns a transducer when no collection is provided."
  {:added "1.0"
   :static true}
  ([pred]
    (fn [rf]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
           (if (pred input)
             (rf result input)
             result)))))
  ([pred coll]
   (lazy-seq
    (when-let [s (seq coll)]
      (if (chunked-seq? s)
        (let [c (chunk-first s)
              size (count c)
              b (chunk-buffer size)]
          (dotimes [i size]
              (let [v (.nth c i)]
                (when (pred v)
                  (chunk-append b v))))
          (chunk-cons (chunk b) (filter pred (chunk-rest s))))
        (let [f (first s) r (rest s)]
          (if (pred f)
            (cons f (filter pred r))
            (filter pred r))))))))

Now for the last piece of the puzzle, look at how comp composes functions.

;; Copyright Rich Hickey, used under license.
;; comp from clojure.core
(defn comp
  "Takes a set of functions and returns a fn that is the composition
  of those fns.  The returned fn takes a variable number of args,
  applies the rightmost of fns to the args, the next
  fn (right-to-left) to the result, etc."
  {:added "1.0"
   :static true}
  ([] identity)
  ([f] f)
  ([f g] 
     (fn 
       ([] (f (g)))
       ([x] (f (g x)))
       ([x y] (f (g x y)))
       ([x y z] (f (g x y z)))
       ([x y z & args] (f (apply g x y z args)))))
  ([f g & fs]
     (reduce1 comp (list* f g fs))))

I'm sure some of you have figured out that calling comp on transducers will give us nested fn calls.

;; Doing this
(comp (filter even?)
      (map inc))

;; Gives us something like this, with even? and inc substituted in
;; for pred and f. comp applies right-to-left, so rf is handed to
;; map's transducer first; the reducing fn it returns is what
;; filter's transducer wraps. If we passed this xf to transduce
;; with +, then + would be the innermost rf.
(fn [rf]
  (let [map-rf (fn                  ;; ((map inc) rf), variadic arity elided
                 ([] (rf))
                 ([result] (rf result))
                 ([result input] (rf result (inc input))))]
    (fn                             ;; ((filter even?) map-rf)
      ([] (map-rf))
      ([result] (map-rf result))
      ([result input]
       (if (even? input)
         (map-rf result input)
         result)))))

Composing one transducer with another "stacks" the operations on top of one another, revealing the interface for transducers. A transducer is a function that takes a reducing function (rf) and returns a new reducing function closed over it, with three arities.

  • Init, or arity zero, calls the init arity on the rest of the stack.
  • Step, or arity two, calls the reducing function on an accumulated result and an input, like a single step of reduce.
  • Completion, or arity one, cleans up stateful transducers and returns the final value. It must call the reducing function's completion arity exactly once.
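To make the interface concrete, here's a minimal hand-rolled version of the transducer that map returns (a sketch; the real clojure.core/map also handles a variadic step arity):

(defn my-map
  "Like the 1-arity of clojure.core/map: returns a transducer."
  [f]
  (fn [rf]
    (fn
      ([] (rf))                                 ;; init: delegate down the stack
      ([result] (rf result))                    ;; completion: delegate exactly once
      ([result input] (rf result (f input)))))) ;; step: transform, then pass along

(into [] (my-map inc) (range 5)) ;; => [1 2 3 4 5]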

Back to our composed example: if even? returns true for a value, the value is incremented before being passed to the reducing function + in transduce.

(def xf (comp (filter even?) (map inc)))
(transduce xf + (range 100))
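;; => 2500 (the sum of 1 + 3 + ... + 99)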

In a sense, transducers 'parallelize' multiple transformations by stacking them on top of one another: every transformation in the stack runs on each input in a single pass.

Gotta Go Fast

Most developers treat this kind of performance as a foregone conclusion, and I don't blame them. Every Computer Science faculty in the world (I hope) teaches computational complexity analysis of algorithms like filter and map. Both have an average time complexity of O(n), or linear time, meaning the algorithm's running time grows as a linear function of its input size. So, according to Big-O notation, even if our input is 5n, our time complexity is still on the order of O(n).

Unfortunately for those of us working outside the classroom, we have cloud bills to worry about. If we're working with massive input, say a collection of five million records, we might not have the computational luxury of naively threading (->>) through a bunch of sequence functions. Crunching one million records and crunching five million have the same time complexity in clojure.core, but in reality the latter executes five times as many instructions, and every threaded step allocates an intermediate lazy sequence on top of that. It would be far more efficient to fuse the whole pipeline into a single pass using transducers.
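To see the difference in shape (a sketch, with coll standing in for any large collection):

;; Threading: each step realizes an intermediate lazy sequence
;; that the next step consumes.
(->> coll (map inc) (filter odd?) (into []))

;; Transducing: the composed xf runs both steps per element,
;; in a single pass, with no intermediate sequences.
(into [] (comp (map inc) (filter odd?)) coll)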

Let's do a benchmark with Criterium to confirm.

user> (criterium.core/with-progress-reporting
        (criterium.core/bench
         (into []
               (comp
                (map inc)
                (filter odd?)
                (map dec)
                (filter even?)
                (map (fn [n] (+ 3 n)))
                (filter odd?)
                (map inc)
                (filter odd?)
                (map dec)
                (filter even?)
                (map (fn [n] (+ 3 n)))
                (filter odd?))
               (range 5000000))))
Warming up for JIT optimisations 10000000000 ...
Estimating execution count ...
Sampling ...
Final GC...
Checking GC...
Finding outliers ...
Bootstrapping ...
Checking outlier significance
Evaluation count : 600 in 60 samples of 10 calls.
             Execution time mean : 105.381854 ms
    Execution time std-deviation : 900.473165 µs
   Execution time lower quantile : 103.346469 ms ( 2.5%)
   Execution time upper quantile : 106.385211 ms (97.5%)
                   Overhead used : 1.598527 ns

Found 4 outliers in 60 samples (6.6667 %)
	low-severe	 3 (5.0000 %)
	low-mild	 1 (1.6667 %)
 Variance from outliers : 1.6389 % Variance is slightly inflated by outliers
nil
user> (criterium.core/with-progress-reporting
        (criterium.core/bench
         (->> (range 5000000)
              (map inc)
              (filter odd?)
              (map dec)
              (filter even?)
              (map (fn [n] (+ 3 n)))
              (filter odd?)
              (map inc)
              (filter odd?)
              (map dec)
              (filter even?)
              (map (fn [n] (+ 3 n)))
              (filter odd?)
              (into []))))
Warming up for JIT optimisations 10000000000 ...
Estimating execution count ...
Sampling ...
Final GC...
Checking GC...
Finding outliers ...
Bootstrapping ...
Checking outlier significance
Evaluation count : 240 in 60 samples of 4 calls.
             Execution time mean : 256.346727 ms
    Execution time std-deviation : 1.908808 ms
   Execution time lower quantile : 254.553894 ms ( 2.5%)
   Execution time upper quantile : 258.981447 ms (97.5%)
                   Overhead used : 1.598527 ns

Found 4 outliers in 60 samples (6.6667 %)
	low-severe	 1 (1.6667 %)
	low-mild	 2 (3.3333 %)
	high-mild	 1 (1.6667 %)
 Variance from outliers : 1.6389 % Variance is slightly inflated by outliers
nil

Benchmark adapted from an example in this discussion.

Quite a reduction in execution time when using transducers on large input: roughly 105 ms versus 256 ms, about 2.4x faster. You won't see that with (range 10) or (range 50), though; there just isn't enough sequential work for the savings to show.

Conclusion

In my opinion, transducers are an underrated feature of Clojure. Not only do we get functional programming building blocks for little cost, we get a significant boost in performance when working with large datasets.

Thanks for reading. Subscribe for the next one or follow me on Twitter @janetacarr, or don't ¯\_(ツ)_/¯. I might write about creating transducers next. You can also join the discussion about this post on Twitter, Hacker News, or Reddit if you think I'm wrong. I'm sure some of you will.
