A lightweight, configurable job queue backed by PostgreSQL, offering an alternative to the Redis-backed bullmq.
Installation
Install the package with:
yarn add cuckmq
N.B. pg is a peer dependency:
yarn add pg
Usage
Unlike other offerings, cuckmq requires you to manually provide a connection pool, allowing you to share a single connection pool across the entire app.
We must first create a Config object. This contains a mapping of all jobs, as well as information about the database connection pool:
import { Pool } from "pg"
import { Config } from "cuckmq"
const pool : Pool = getPool()
const config : Config = new Config({ pool })
Next, we must ensure that cuckmq has the requisite database tables to persist and mutate jobs. This can be done by calling the idempotent function:
await config.prepareSchema()
Now let's define our jobs:
import { JobDefinition } from "cuckmq"
export const pingJob = new JobDefinition({
    config: config,
    name : "ping",
    jobFunction: async (payload : { message : string }) => {
        console.log(payload.message)
    }
})
To add jobs to the job queue, we simply call:
await pingJob.defer({ message: "Hello, World!" })
Before we proceed, we must ensure all defined jobs are "registered" with the config. This can be done by calling:
pingJob.register()
We can now instantiate worker daemons to process queued jobs.
import { Worker } from "cuckmq"
import process from "process"
const worker = new Worker({ config })
worker.start()
process.on("SIGINT", () => worker.stop())
Scheduled Jobs
cuckmq supports scheduled jobs. They can be trivially defined by adding the repeatIntervalMs property to job definitions:
import { JobDefinition } from "cuckmq"
export const pingScheduledJob = new JobDefinition({
    config: config,
    name : "ping-scheduled",
    repeatIntervalMs: 60_000,
    jobFunction: async (params : {}) => {
        await pingJob.defer({ message : "Scheduled Hello, World!" })
    }
})
N.B. a job can only be scheduled (i.e. given a repeatIntervalMs) if the type of params in its job function is an empty object.
To ensure these jobs are enqueued as scheduled, we must set up a scheduler daemon:
import { Scheduler } from "cuckmq"
const scheduler = new Scheduler({ config })
scheduler.start()
process.on("SIGINT", () => scheduler.stop())
Sweeping Jobs
By default, cuckmq doesn't delete jobs from the database. To have old jobs removed, we set up a sweeper daemon:
import { JobSweeper } from "cuckmq"
const sweeper = new JobSweeper({ config })
sweeper.start()
process.on("SIGINT", () => sweeper.stop())
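When a worker, a scheduler and a sweeper all run in the same process, a single SIGINT handler can stop them together instead of registering three. The following is a minimal sketch: `Stoppable` and `stopAll` are hypothetical names that are not part of cuckmq, and the only assumption made is that each daemon exposes a `.stop()` method, as shown above.

```typescript
// Hypothetical helper, not part of cuckmq. It assumes only that each daemon
// exposes a stop() method, as the Worker, Scheduler and JobSweeper above do.
interface Stoppable {
    stop(): unknown
}

// Ask every daemon to stop. An error thrown synchronously by one daemon does
// not prevent the remaining daemons from being asked to stop.
// Returns the number of daemons whose stop() call threw.
function stopAll(daemons: Stoppable[]): number {
    let failures = 0
    for (const daemon of daemons) {
        try {
            daemon.stop()
        } catch {
            failures += 1
        }
    }
    return failures
}

// Possible usage, in place of the three separate handlers:
// process.on("SIGINT", () => stopAll([worker, scheduler, sweeper]))
```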
Advanced Usage
cuckmq classes expose a number of configuration options. These are detailed below:
Config#constructor
Parameter | Type | Is Required | Default Value | Description |
---|---|---|---|---|
pool | pg.Pool | yes | N/A | a pg connection pool |
schema | string | no | _cuckmq | the DB schema under which the database tables are created |
JobDefinition#constructor
Parameter | Type | Is Required | Default Value | Description |
---|---|---|---|---|
config | cuckmq.Config | yes | N/A | the instantiated cuckmq config object |
name | string | yes | N/A | a unique name for the job definition |
channel | string | no | _default | an attribute that jobs are tagged with that workers can filter on |
numRetries | number | no | 0 | the number of times a job can be retried after erroring before being finalized |
repeatIntervalMs | number | no | null | If defined, the interval between jobs being automatically scheduled |
lockIntervalMs | number | no | 60_000 | The amount of time after a job is dequeued that it remains unavailable to other workers to consume. Ensure this value is larger than the longest possible runtime of your job |
jobFunction | <T extends object> (T) => Promise<void> | yes | N/A | The definition of the function to process/perform the job |
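To make the numRetries description concrete, here is a small in-memory model of one reading of it: a job gets one initial attempt plus up to numRetries further attempts, and is finalized either on success or once the retries run out. This is only an illustration. `runWithRetries` and `Outcome` are hypothetical names, the loop is not how cuckmq executes jobs (the library persists jobs in PostgreSQL), and the reading that numRetries = 0 means a single attempt is an assumption.

```typescript
// In-memory model of the documented retry semantics. NOT cuckmq's
// implementation; names and behaviour here are illustrative assumptions.
type Outcome = { attempts: number, errors: number, succeeded: boolean }

// Assumes numRetries is a non-negative integer.
async function runWithRetries(
    jobFunction: () => Promise<void>,
    numRetries: number
): Promise<Outcome> {
    let errors = 0
    // One initial attempt, plus up to numRetries further attempts.
    for (let attempt = 1; attempt <= numRetries + 1; attempt++) {
        try {
            await jobFunction()
            // Success: the job is finalized immediately.
            return { attempts: attempt, errors, succeeded: true }
        } catch {
            errors += 1
        }
    }
    // Retries exhausted: the job is finalized as failed.
    return { attempts: numRetries + 1, errors, succeeded: false }
}
```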
Worker#constructor
Parameter | Type | Is Required | Default Value | Description |
---|---|---|---|---|
config | cuckmq.Config | yes | N/A | the instantiated cuckmq config object |
name | string | no | anon | A nickname for your worker daemon |
channel | string | no | _default | an attribute that jobs are tagged with that workers can filter on |
concurrency | number | no | 0 | The number of jobs that a worker can process concurrently |
processIntervalMs | number | no | 1000 | The amount of time a worker will sleep after failing to dequeue a job before trying again |
eventHandler | EventHandler | no | N/A | A handler to listen to events emitted by the worker |
Scheduler#constructor
Parameter | Type | Is Required | Default Value | Description |
---|---|---|---|---|
config | cuckmq.Config | yes | N/A | the instantiated cuckmq config object |
name | string | no | anon | A nickname for your scheduler daemon |
scheduleIntervalMs | number | no | 30_000 | The amount of time a scheduler will sleep after failing to schedule a job before trying again |
heartbeatIntervalMs | number | no | 60_000 | The interval at which the scheduler will update scheduled jobs with a "heartbeat", to stop them from going stale. The scheduler will also remove stale scheduled jobs at this time |
heartbeatCutoffMs | number | no | 60_000 * 12 | The maximum amount of time after the last "heartbeat" before the scheduler considers a scheduled job as stale |
eventHandler | EventHandler | no | N/A | A handler to listen to events emitted by the scheduler |
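The relationship between heartbeatIntervalMs and heartbeatCutoffMs can be sketched as a simple staleness check. `isStale` is a hypothetical helper, not a cuckmq function, and whether the library compares with a strict or non-strict inequality is an assumption here. The constants are the defaults from the table above.

```typescript
// Default values taken from the Scheduler#constructor table.
const HEARTBEAT_INTERVAL_MS = 60_000
const HEARTBEAT_CUTOFF_MS = 60_000 * 12

// Hypothetical helper: a scheduled job counts as stale once more than
// cutoffMs has elapsed since its last heartbeat.
function isStale(
    lastHeartbeat: Date,
    now: Date,
    cutoffMs: number = HEARTBEAT_CUTOFF_MS
): boolean {
    return now.getTime() - lastHeartbeat.getTime() > cutoffMs
}

// Number of heartbeat intervals that fit inside the cutoff with the defaults.
const intervalsPerCutoff = HEARTBEAT_CUTOFF_MS / HEARTBEAT_INTERVAL_MS // 12
```

With the defaults, a scheduled job whose definition is no longer heartbeated by any running scheduler would be treated as stale after roughly twelve minutes.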
JobSweeper#constructor
Parameter | Type | Is Required | Default Value | Description |
---|---|---|---|---|
config | cuckmq.Config | yes | N/A | the instantiated cuckmq config object |
name | string | no | anon | A nickname for your sweeper daemon |
sweepIntervalMs | number | no | 10 * 60_000 | The amount of time a sweeper will sleep between sweeps of stale jobs |
jobAgeCutoffMs | number | no | to_ms(1 week) | The maximum amount of time after a job is created before the sweeper considers a job as stale and thus ready for deletion. |
eventHandler | EventHandler | no | N/A | A handler to listen to events emitted by the sweeper |
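The default values in the table are written as shorthand. Spelled out in milliseconds, they can be computed as below. `daysToMs` is a hypothetical helper used only for this illustration, not something cuckmq exports.

```typescript
// Hypothetical helper: convert a number of days to milliseconds.
function daysToMs(days: number): number {
    return days * 24 * 60 * 60 * 1000
}

// The documented defaults, spelled out in milliseconds.
const defaultJobAgeCutoffMs = daysToMs(7)   // 604_800_000, i.e. one week
const defaultSweepIntervalMs = 10 * 60_000  // 600_000, i.e. ten minutes
```

A shorter retention period, for example `daysToMs(3)`, could be passed as jobAgeCutoffMs when constructing the sweeper.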
Events
All daemons can accept an eventHandler which will receive emitted events. The type of the eventHandler is:
(params : {
    daemonID : number,
    name: string,
    event : Event,
    timestamp : Date
}) => void
Event is a union type whose members are distinguished by the eventType field. The members of the union are enumerated below:
Type | Event Type Field | Event |
---|---|---|
DaemonStarted | daemon-started | The daemon starts (via .start()) |
DaemonStopSignalSent | daemon-stop-signal-sent | The daemon receives the signal to stop (via .stop()). N.B. the daemon may continue running beyond this point to facilitate a graceful shutdown |
DaemonStopped | daemon-stopped | The daemon stops |
WorkerJobDequeued | worker-job-dequeued | A worker daemon pulls a job from the database for processing |
WorkerJobFinalized | worker-job-finalized | A job has been marked as completed/finalized. This happens if the job succeeds, or if it fails with no more retries available |
WorkerJobErrored | worker-job-errored | A job that a worker tried to run has thrown an error |
SchedulerJobScheduled | scheduler-job-scheduled | The scheduler has enqueued a scheduled/periodic job to be run |
SchedulerHeartbeat | scheduler-heartbeat | The scheduler has updated the heartbeat of at least one scheduled job |
SweeperJobsSwept | sweeper-jobs-swept | The job sweeper has cleaned/deleted at least one stale job |
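As an illustration of how a handler might discriminate on eventType, the sketch below tallies events by type and records a line for each job error. The types are local stand-ins written from the description above rather than imported from cuckmq: `DaemonEvent` stands in for the library's Event type, only the eventType field is modelled because no other event fields are documented here, and `createCountingHandler` is a hypothetical name.

```typescript
// Local stand-ins for the documented types (assumptions, not cuckmq imports).
type DaemonEventType =
    | "daemon-started"
    | "daemon-stop-signal-sent"
    | "daemon-stopped"
    | "worker-job-dequeued"
    | "worker-job-finalized"
    | "worker-job-errored"
    | "scheduler-job-scheduled"
    | "scheduler-heartbeat"
    | "sweeper-jobs-swept"

type DaemonEvent = { eventType: DaemonEventType }

type EventHandler = (params: {
    daemonID: number,
    name: string,
    event: DaemonEvent,
    timestamp: Date
}) => void

// Build a handler that tallies events by type and records one line per
// job error.
function createCountingHandler() {
    const counts = new Map<DaemonEventType, number>()
    const errorLines: string[] = []
    const handler: EventHandler = ({ daemonID, name, event, timestamp }) => {
        counts.set(event.eventType, (counts.get(event.eventType) ?? 0) + 1)
        if (event.eventType === "worker-job-errored") {
            errorLines.push(`${timestamp.toISOString()} ${name}#${daemonID} job errored`)
        }
    }
    return { handler, counts, errorLines }
}
```

The returned `handler` has the shape described above, so it could plausibly be supplied as the eventHandler option of a daemon, subject to the real Event type being compatible with this stand-in.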