Dead simple core.async job system in Clojure
In my off time (and my on time, for that matter) I've been working on this quirky thing I've called Scinamalink (sounds like 'skinamarink'). Scinamalink lets customers send magic login links to their users with a single REST API call. If you don't know what a magic login link is, it's basically a password reset email on steroids: a link that simply authenticates the user's session, skipping the usual password reset flow.
My motivations for working on Scinamalink range from having something to show off my Clojure skills on Twitch to seeing whether it's worthwhile for people, since services like Auth0 and Clerk already support magic login links. I'm 'unbundling', as the cool kids say.
An early problem I was concerned with was curtailing spam. I figure the best approach for this is verifying ownership of the customer's domain. Makes sense. Time for an asynchronous job and some DNS queries.
Avoiding a RabbitMQ hole
The first thing someone suggested was introducing some kind of message broker, like RabbitMQ, and going from there. I said hell no. I'm trying to avoid complexity. Yet, I understand that building an async worker from scratch doesn't seem like the simplest approach.
My line of thinking is this: infrastructure is part of your Software Architecture. Every component in an architecture adds exponentially more complexity, whether it's a software component or another process on the same network. By using something like RabbitMQ and writing the jobs for it, I'm essentially adding three more components to the architecture: the RabbitMQ process, its deployment configuration, and the code to manage jobs from my main application server.
Such complexity may be worth it for some developers building a solution, but, as someone who wants to get to market faster, cutting the ops work looks like the better approach. The obvious trade-off is that my single-process worker system may be less reliable than RabbitMQ.
Simple Yet Reliable Enough
I have a single process with multiple threads, thanks to core.async. It's not lost on me that if the process fails, the jobs will be lost, so my approach to reliability starts at the data model and database. Let's take a look at the PostgreSQL data definition for a job:
CREATE TYPE worker_job_state AS ENUM ('created', 'starting', 'working', 'finished', 'crashed');
CREATE TABLE IF NOT EXISTS worker_jobs (
  id serial primary key,
  current_state worker_job_state not null default 'created',
  timeout timestamp default now() + interval '24 hours',
  attempt_count integer not null default 0,
  priority integer not null default 1,
  context jsonb not null,
  created_at timestamp not null default now(),
  updated_at timestamp not null default now()
);
I originally intended for timeouts and priorities to be a thing, but they're 'reserved for future use' (a waste of time). Since each job can differ in implementation, it's necessary to store some of the context in a free-form JSON blob column. For example, when verifying domain ownership, it makes sense to store the customer and domain associated with the verification job.
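For instance, the context for a domain verification job might serialize from a Clojure map like this before landing in the jsonb column (the keys here are illustrative):

{:job-type "verify-domain"
 :customer-id 42
 :domain-id 7}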
However, we can see each job shares the same states describing its lifecycle: created, starting, working, finished, and crashed. I think each state is self-explanatory here, and these states imply that each job function is a Finite-State-Machine (FSM).
All work, no play
So, I need to implement a Finite-State-Machine in Clojure. If you've read any of my previous works, you know what's coming next: functions returning functions. In short, we can implement a finite state machine by having a function represent each state. When a state needs to transition to another state, it returns that state's function; otherwise it returns itself. We can use the recurring lexical scope of the recursion to propel the state machine forward. It might be easier to start with the loop:
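Here's a minimal sketch of what that worker loop could look like; start-worker! is an illustrative name, not the exact production code:

(ns core.worker
  (:require [clojure.core.async :as a]))

(defn start-worker!
  "Pulls job fns off `queue`, runs each on a real thread (jobs do
  blocking I/O), and requeues the returned next-state fn so one
  long-running job can't starve the others."
  [queue]
  (a/go-loop []
    (when-let [job-fn (a/<! queue)]
      ;; a/thread runs the job off the go-block thread pool and
      ;; returns a channel that yields the job's return value.
      (let [next-state (a/<! (a/thread (job-fn)))]
        ;; Only requeue if the job handed back another state fn;
        ;; anything else is treated as terminal.
        (when (fn? next-state)
          (a/>! queue next-state)))
      (recur))))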
This is a bit more complex than my past examples, but the same idea. We pull a job function off a core.async channel, queue, and execute it. The job function is executed in a core.async thread because jobs must perform blocking network operations. The result, the next-state function, comes off the channel returned by thread. Instead of passing next-state to the next recursive call, next-state returns back to the queue, provided it's a function, so it doesn't starve other job functions waiting in the queue.
Jobs-as-functions also makes for easy synchronous testing: I just wrote a regular (non-go) loop to test each job's FSM, sketched below. Of course, the jobs themselves require a bit of forethought.
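Something along these lines, where ->job and the test ids are placeholders for the real constructor and fixtures (see the next section):

;; Assumes (:require [clojure.test :refer [deftest is]]) plus the
;; namespace defining the job under test.
(deftest domain-verification-fsm
  ;; Drive the state machine synchronously: keep calling the current
  ;; state fn until it returns something that isn't a function.
  (loop [state (->job test-customer-id test-domain-id)]
    (if (fn? state)
      (recur (state))
      (is (nil? state)))))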
They Took Our Jobs
So what does a job function look like? It's pretty simple:
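What follows is a stripped-down sketch of the domain verification FSM; the db/ and dns/ helpers are stand-ins for my actual database and DNS code, and the real job carries more context:

(defn- job-finish
  "Terminal state: persist the outcome and return nil so the
  worker drops the job instead of requeueing it."
  [job-id verified?]
  (fn []
    (db/update-job! job-id {:current-state (if verified? "finished" "crashed")})
    nil))

(defn- job-work
  "Working state: attempt the (blocking) DNS query. Transitions to
  job-finish on an answer; otherwise returns a fresh job-work fn to
  retry, without re-writing :working to the database."
  [job-id customer-id domain-id]
  (fn []
    ;; dns/lookup-txt is a hypothetical helper that resolves the
    ;; domain row and queries its TXT records.
    (if-let [txt-records (dns/lookup-txt domain-id)]
      (job-finish job-id (dns/token-matches? customer-id txt-records))
      (job-work job-id customer-id domain-id))))

(defn- do-job
  "Starting state: writes :working to the database exactly once,
  then hands off to job-work."
  [job-id customer-id domain-id]
  (fn []
    (db/update-job! job-id {:current-state "working"})
    (job-work job-id customer-id domain-id)))

(defn ->job
  "Creates the job row and returns the first state fn to queue."
  [customer-id domain-id]
  (let [job-id (db/insert-job! {:context {:job-type "verify-domain"
                                          :customer-id customer-id
                                          :domain-id domain-id}})]
    (do-job job-id customer-id domain-id)))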
The domain verification job gets created with ->job, which creates the job in the database and returns the first function to place on the queue, with something like (dispatch-work queue (->job customer-id domain-id)).
Since the workers themselves are so thin, each job is responsible for everything related to its function. Each state needs to clean up after itself if something goes wrong, and each job updates the database with its serialized context regardless of failure.
I'm not bound by the rigidity of the job data model, though. You'll notice that job-work does most of the work for this task, yet do-job sets the state to :working in the database. I did this because I didn't want to unnecessarily write the state :working to the database each time the job attempts the DNS query. The worker doesn't care as long as it gets a function. However, when the process starts and loads the jobs from the database, the job will start at do-job again.
Starting, Restarting, and Unstarting Processes
At some point, jobs will need to be loaded from the database into the worker system, whether due to failures or restarts. This is a pretty simple process: read from the database, dispatch to the appropriate job constructor, and put the resulting jobs on the queue for the workers.
(defmulti ->job-fn
  "Multimethod to dispatch on job creation function"
  (fn [job]
    (let [{:keys [current-state context]} job]
      (vector (csk/->kebab-case-keyword (:job-type context))
              (keyword current-state)))))

;; Currently in core.worker, but should be in core.loader:
(defn- start-existing-helper!
  "Recursively puts each `job` fn on the port `queue`,
  presumably to be processed by a worker (see above)."
  [queue jobs]
  ;; Stop when the seq is exhausted; putting nil on a channel throws.
  (when-let [job (first jobs)]
    (try
      (a/put! queue job (fn [all-good]
                          (if all-good
                            (start-existing-helper! queue (rest jobs))
                            (log/errorf "didn't put job onto queue, exploding: %s" job))))
      (catch Exception e
        (log/errorf "didn't put job onto queue, exploding: %s" job)))))

(defn start-existing-jobs!
  "Starts existing jobs from the DB"
  []
  (try
    (let [jobs (db-jobs/get-all-pending-jobs ds-opts buffer-limit)]
      (->> jobs
           (mapv ->job-fn)
           (start-existing-helper! queue)))
    (catch Exception e
      (log/errorf "Unexpected error while starting existing jobs: %s" (.getMessage e)))))
In the actual job definition, we can extend ->job-fn with dispatch values that map to our database record's context column.
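For the domain verification job, that might look like the following, reusing the hypothetical do-job from the sketch above:

;; The context keys mirror what ->job stored when the row was created.
(defmethod ->job-fn [:verify-domain :created]
  [{:keys [id context]}]
  (do-job id (:customer-id context) (:domain-id context)))

;; Jobs interrupted mid-flight resume at do-job as well (see above).
(defmethod ->job-fn [:verify-domain :working]
  [{:keys [id context]}]
  (do-job id (:customer-id context) (:domain-id context)))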
This start-existing-jobs! function gets called when the process starts, but we need a way to periodically load jobs while the process is running. Ideally, our job loader would be aware of each running job so that the same jobs aren't loaded over and over.

Lucky for us, core.async violates the thread-local nature of Clojure Vars, meaning I can have a Var pointing to an atom where information about all the running jobs is stored (jobs take on the responsibility of registering themselves). A loader functions very similarly to a worker: it runs the start-existing-jobs! functionality, filtering out whatever is already in the registry atom on each iteration. After each iteration, the loaders wait on a core.async/timeout for a specified amount of time.
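Under those assumptions (a registry atom keyed by job id, which the jobs themselves maintain, plus the helpers above), a loader might be sketched like this:

;; job-id -> running job info; jobs register and deregister themselves.
(defonce registry (atom {}))

(defn start-loader!
  "Every `interval-ms`, loads pending jobs that aren't already
  registered as running and puts them on `queue`."
  [queue interval-ms]
  (a/go-loop []
    ;; Do the blocking DB read on a real thread, not in the go block.
    (let [pending (a/<! (a/thread (db-jobs/get-all-pending-jobs ds-opts buffer-limit)))
          fresh   (remove #(contains? @registry (:id %)) pending)]
      (start-existing-helper! queue (mapv ->job-fn fresh)))
    (a/<! (a/timeout interval-ms))
    (recur)))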
Big Picture
Finally, the big picture of the sub-system emerges.