A lightweight, configurable job queue backed by PostgreSQL, offering an alternative to the Redis-backed BullMQ.
## Core Features

- Built-in type safety
- Repeating/scheduled jobs
- Retryable jobs
## Job Lifecycle

To make sense of the documentation below, it is important to first understand the job lifecycle. A job can be in one of the following states:
- **Active** - active jobs can be dequeued by workers for processing.
- **Locked** - locked jobs cannot be dequeued by workers for processing, but will eventually return to an active state.
- **Finalized** - finalized jobs also cannot be dequeued by workers for processing. Unlike locked jobs, a finalized job is in its terminal state and will never return to being active.
Jobs transition between these states in the following ways:
- Jobs start in an active state when they are initially enqueued.
- Upon dequeuing, jobs move into a locked state, preventing other workers from also dequeuing them.
- Jobs that are processed successfully by workers move into the finalized state.
Depending on job-specific configuration:
- Locked jobs move back to an active state after a timeout (`retryMs`).
- Active jobs are finalized, regardless of whether they have been successfully processed or not (`reapMs`).
Depending on global configuration:
- Finalized jobs are eventually deleted (`sweepMs`).
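The lifecycle above can be sketched as a small state machine. This is purely an illustration of the transitions just described, not cuckmq's internal implementation:

```typescript
// Hypothetical sketch of the job lifecycle - not cuckmq's actual code.
type JobState = "active" | "locked" | "finalized";

// enqueue starts a job as "active"; dequeue locks it; success finalizes it;
// the liberator unlocks it after retryMs; the reaper finalizes it after reapMs.
const transitions: Record<string, { from: JobState; to: JobState }> = {
  dequeue:  { from: "active", to: "locked" },
  success:  { from: "locked", to: "finalized" },
  liberate: { from: "locked", to: "active" },    // after retryMs
  reap:     { from: "active", to: "finalized" }, // after reapMs
};

function transition(state: JobState, event: keyof typeof transitions): JobState {
  const t = transitions[event];
  if (t.from !== state) throw new Error(`cannot ${event} a ${state} job`);
  return t.to;
}
```

Note that there is no transition out of `finalized`: once a job is finalized, it only leaves the system when it is swept.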
## Installation

Install the package with:

```shell
yarn add cuckmq
```
## Setup

We must ensure the database is ready to persist job state. `cuckmq` creates its tables and indices, and stores its data, under the `_cuckmq` schema to avoid namespace collisions with application tables.

This is achieved with the `Migrator`, which runs an idempotent migration function. We can call it as part of a build step, or during app initialization:
```typescript
import { Migrator } from "cuckmq"
import { Pool } from "pg"

const pool = new Pool({ connectionString: process.env.DATABASE_URL as string })

await new Migrator(pool).migrate()
```
## Usage

To begin using `cuckmq`, we must first create the core `Context` object:

```typescript
import { Context } from "cuckmq"

const context = new Context({ pool })
```
### Context#constructor

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `pool` | `pg.Pool` | yes | N/A | A `pg` connection pool |
| `eventHandler` | `(event: CuckEvent) => void` | no | `null` | A handler that receives typed events (see below) from `cuckmq` |
| `sweepMs` | `number` | no | 7 days | The amount of time finalized jobs exist in the database before being deleted |
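The optional parameters can be supplied like so. This is a sketch based on the table above; `CuckEvent` being an exported type, and events carrying a `key` field, are assumptions drawn from this document rather than verified API details:

```typescript
import { Context, CuckEvent } from "cuckmq"

// `pool` is the pg connection pool created during setup.
const context = new Context({
  pool,
  // Forward every cuckmq event to a handler (see "Event Handling" below).
  eventHandler: (event: CuckEvent) => console.log(event.key),
  // Delete finalized jobs after 1 day instead of the 7 day default.
  sweepMs: 24 * 60 * 60 * 1000,
})
```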
Next, let's define the various jobs that we would like `cuckmq` to run for us. This is achieved by creating `JobDefinition` objects and registering them with the `context`:
```typescript
import { JobDefinition } from "cuckmq"

export const pingJob = new JobDefinition({
  name: "ping",
  context: context,
  jobFunction: async (payload: { message: string }) => {
    console.log(payload.message)
  }
}).register()
```
### JobDefinition#constructor

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `name` | `string` | yes | N/A | A globally unique name for the job definition |
| `context` | `cuckmq.Context` | yes | N/A | A reference to the previously created `Context` object |
| `channel` | `string` | no | `_default` | The channel that jobs are published to. Workers will only process jobs from channels they are subscribed to |
| `repeatMs` | `number` | no | `null` | If set, the job will be scheduled to run repeatedly at the specified interval. N.B. if this value is set, `jobFunction` can only receive an empty payload |
| `reapMs` | `number` | yes | N/A | The amount of time after creation before a job is finalized, regardless of whether it has been successfully processed or not |
| `retryMs` | `number` | no | `reapMs` | The amount of time a locked job must wait before transitioning back to an active state. By default this matches `reapMs`, ensuring a job is only attempted once before finalization |
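Building on the table above, a repeating job might be defined like this. The job name and intervals are illustrative only; note that because `repeatMs` is set, the `jobFunction` must accept an empty payload:

```typescript
import { JobDefinition } from "cuckmq"

// A job scheduled to run every 5 minutes.
export const heartbeatJob = new JobDefinition({
  name: "heartbeat",
  context: context,
  repeatMs: 5 * 60 * 1000,
  // Finalize each run 1 minute after creation, processed or not.
  reapMs: 60 * 1000,
  jobFunction: async (_payload: {}) => {
    console.log("still alive")
  },
}).register()
```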
## Deploying

Once our jobs are defined, we must deploy the context, persisting information about each registered job definition. We achieve this by simply calling:

```typescript
await context.deploy()
```

N.B. Enqueueing, dequeueing, job processing and the various maintenance tasks are all implicitly deferred until the context deployment has completed.
## Processing Jobs

We can create `Worker` objects to process enqueued jobs:

```typescript
import { Worker } from "cuckmq"

const worker = new Worker({ context })
```
### Worker#constructor

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `context` | `cuckmq.Context` | yes | N/A | A reference to the previously created `Context` object |
| `pollMs` | `number` | no | 2 secs | The amount of time a worker waits before checking for new jobs |
| `channels` | `string[]` | no | `["_default"]` | The channels that the worker is subscribed to |
| `concurrency` | `number` | no | 1 | The number of jobs the worker will process concurrently |
N.B. Concurrency can be achieved in multiple ways:

1. Multiple processes can each run a `Worker` instance.
2. A single process can run multiple `Worker` instances.
3. A single `Worker` instance can process multiple jobs concurrently via the `concurrency` constructor parameter.

`cuckmq` workers poll the database for new work, so for tasks requiring a large degree of concurrency it is advised to use option (3), so as to prevent unnecessary hammering of the database.
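Following that advice, a single worker processing many jobs concurrently might be configured as below. The `"emails"` channel is a hypothetical example; only channels your job definitions actually publish to are meaningful here:

```typescript
import { Worker } from "cuckmq"

// One worker, polling at the default 2 second interval, subscribed to
// two channels and processing up to 10 jobs at a time.
const worker = new Worker({
  context,
  channels: ["_default", "emails"],
  concurrency: 10,
})
```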
## Maintenance

We also need at least one `Orchestrator` running, which takes care of all maintenance tasks:

- Scheduling - enqueuing jobs that have been marked as repeating.
- Liberating - moving locked jobs back into an active state after `JobDefinition#retryMs`.
- Reaping - moving active jobs into a finalized state after `JobDefinition#reapMs`.
- Sweeping - deleting finalized jobs after `Context#sweepMs`.
```typescript
import { Orchestrator } from "cuckmq"

const orchestrator = new Orchestrator({ context })
```
### Orchestrator#constructor

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `context` | `cuckmq.Context` | yes | N/A | A reference to the previously created `Context` object |
| `reaperPollMs` | `number` | no | 5 mins | The time interval between checks for old active jobs that are due to be finalized |
| `schedulerPollMs` | `number` | no | 5 secs | The time interval between checks for repeating jobs that are due to be scheduled |
| `sweeperPollMs` | `number` | no | 5 mins | The time interval between deletions of jobs that have been finalized for longer than `Context#sweepMs` |
| `liberatorPollMs` | `number` | no | 30 secs | The time interval between checks for locked jobs that are due to be unlocked and retried (according to `JobDefinition#retryMs`) |
N.B. If a job is due to be "reaped", workers will not dequeue it. This prevents scenarios where a job that should've been finalized manages to be processed before the Reaper can get to it.
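As a sketch, the poll intervals can be tightened from their defaults, which may be useful in tests or when low-latency retries matter. The specific values here are illustrative, not recommendations:

```typescript
import { Orchestrator } from "cuckmq"

// All intervals are in milliseconds.
const orchestrator = new Orchestrator({
  context,
  schedulerPollMs: 1_000,  // check for due repeating jobs every second
  liberatorPollMs: 5_000,  // retry locked jobs more aggressively
  reaperPollMs: 60_000,
  sweeperPollMs: 60_000,
})
```

Shorter intervals mean more frequent database polling, so the same caution about hammering the database applies here as it does to workers.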
## Shutting Down

Once we are ready to shut down, we can call `await worker.stop()`. The worker will finish the job it is currently processing (should one exist) and then immediately stop.
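A typical place to call this is a process signal handler. This is a sketch; `worker` is the worker created earlier and `pool` is the `pg` pool created during setup:

```typescript
import process from "process"

// On SIGTERM, let the worker drain its current job, then release
// the database connections before exiting.
process.on("SIGTERM", async () => {
  await worker.stop()
  await pool.end()
  process.exit(0)
})
```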
## Event Handling

`cuckmq` produces a large number of events that can be consumed via the optional `eventHandler` passed into the `Context` constructor. All event types are described below:
| Key | Description |
|---|---|
| `jobDefinitionDeploy` | A `JobDefinition` has been persisted to the database |
| `jobEnqueue` | A job has been enqueued and persisted in the database |
| `jobDequeue` | A worker has dequeued a job for processing |
| `jobOrphan` | A dequeued job had no corresponding `JobDefinition` registered with the `Context` |
| `jobError` | A job threw an error during processing |
| `jobRun` | A job has started to run on a worker |
| `jobSuccess` | A job was successfully processed and subsequently finalized |
| `jobReap` | A job was finalized after existing for too long |
| `jobLiberate` | A job was unlocked after being locked for `JobDefinition#retryMs` |
| `jobSweep` | A finalized job was deleted after being finalized for longer than `Context#sweepMs` |
| `jobSchedule` | A repeating job is due to be enqueued |
| `jobScheduleOrphan` | A repeating job was due to be enqueued but had no corresponding `JobDefinition` registered with the `Context` |
| `engineStart` | An "engine" (`Worker`, `Liberator`, `Sweeper`, `Reaper`, `Scheduler`) has started |
| `enginePoll` | An engine has polled the database for new outstanding tasks |
| `engineStop` | An engine has completed any current tasks and stopped polling for additional work |
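As a sketch, an event handler can filter on the event key to surface only the events worth alerting on. The event shape here is an assumption based on the key column above; consult the library's exported `CuckEvent` type for the exact fields:

```typescript
// Hypothetical event shape - cuckmq's actual CuckEvent type may carry
// additional fields (job name, payload, timestamps, etc.).
type Event = { key: string; [k: string]: unknown };

// Collect only the events that indicate something went wrong.
const errors: Event[] = [];

function eventHandler(event: Event): void {
  if (event.key === "jobError" || event.key === "jobOrphan") {
    errors.push(event);
  }
}
```

A handler like this could then be passed as the `eventHandler` option when constructing the `Context`.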