A lightweight, configurable job queue backed by PostgreSQL, offering an alternative to the Redis-backed bullmq.
Install the package with:
yarn add cuckmq
N.B. pg is a peer dependency:
yarn add pg
Unlike other offerings, cuckmq requires you to manually provide a connection pool, allowing you to share a single connection pool across the entire app.
We must first create a Config object. This contains a mapping of all jobs, as well as information about the database connection pool:
import { Pool } from "pg"
import { Config } from "cuckmq"
// getPool() stands for your application's own function that returns the shared pg Pool
const pool : Pool = getPool()
const config : Config = new Config({ pool })
Next, we must ensure that cuckmq has the requisite database tables to persist and mutate jobs. This can be done by calling the idempotent function:
await config.prepareSchema()
Now let's define our jobs:
import { JobDefinition } from "cuckmq"
export const pingJob = new JobDefinition({
  config: config,
  name : "ping",
  jobFunction: async (payload : { message : string }) => {
    console.log(payload.message)
  }
})
To add jobs to the job queue, we simply call:
await pingJob.defer({ message: "Hello, World!" })
Before we proceed, we must ensure all defined jobs are "registered" with the config. This can be done by calling:
pingJob.register()
We can now instantiate worker daemons to process queued jobs.
import { Worker } from "cuckmq"
import process from "process"
const worker = new Worker({ config })
worker.start()
process.on("SIGINT", () => worker.stop())
cuckmq supports scheduled jobs. They can be defined by adding the repeatIntervalMs property to job definitions:
import { JobDefinition } from "cuckmq"
export const pingScheduledJob = new JobDefinition({
  config: config,
  name : "ping-scheduled",
  repeatIntervalMs: 60_000, // Run every minute
  jobFunction: async (params : {}) => {
    await pingJob.defer({ message : "Scheduled Hello, World!" })
  }
})
N.B. you are only able to specify a scheduled job (i.e. set repeatIntervalMs) if the type of params in the job function is an empty object.
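A constraint like this can be expressed in the type system. The sketch below shows one possible encoding using a conditional type; `ScheduleOption`, `SketchJobOptions` and `defineJob` are hypothetical names written for this illustration, and cuckmq's actual typings may differ.

```typescript
// Hypothetical encoding, not cuckmq's typings:
// repeatIntervalMs is only accepted when the payload type has no keys.
type ScheduleOption<T extends object> = keyof T extends never
  ? { repeatIntervalMs?: number }
  : { repeatIntervalMs?: never }

type SketchJobOptions<T extends object> = {
  name: string
  jobFunction: (payload: T) => Promise<void>
} & ScheduleOption<T>

// Identity function whose only purpose is to let the compiler check the options.
function defineJob<T extends object>(options: SketchJobOptions<T>): SketchJobOptions<T> {
  return options
}

// Accepted: the payload type is an empty object, so a repeat interval is allowed.
const scheduled = defineJob({
  name: "ping-scheduled",
  repeatIntervalMs: 60_000,
  jobFunction: async (_params: {}) => {},
})

// Accepted: a job with a payload and no repeat interval.
// Adding repeatIntervalMs here would be rejected by the compiler in this sketch.
const oneOff = defineJob({
  name: "ping",
  jobFunction: async (_payload: { message: string }) => {},
})
```

The restriction is consistent with how scheduling works: a scheduler that enqueues a job on a timer has no payload to pass, so only jobs that need none can be scheduled.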
To ensure these jobs are enqueued as scheduled, we must set up a scheduler daemon:
import { Scheduler } from "cuckmq"
const scheduler = new Scheduler({ config })
scheduler.start()
process.on("SIGINT", () => scheduler.stop())
By default, cuckmq doesn't delete jobs from the database. To clean up old jobs, we set up a sweeper daemon:
import { JobSweeper } from "cuckmq"
const sweeper = new JobSweeper({ config })
sweeper.start()
process.on("SIGINT", () => sweeper.stop())
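When the worker, scheduler and sweeper all run in one process, a single SIGINT handler can signal them together instead of registering one listener per daemon. The sketch below is one way to do that; `Stoppable` and `stopAll` are hypothetical names introduced here, not part of cuckmq, and the only thing assumed about a daemon is the `.stop()` method used in the snippets above.

```typescript
// Hypothetical helper type, not part of cuckmq: anything with a stop() method.
interface Stoppable {
  stop(): unknown
}

// Returns a signal handler that asks every daemon to stop, in the order given.
// A synchronous throw from one stop() call does not prevent the remaining
// daemons from being signalled.
function stopAll(daemons: Stoppable[]): () => void {
  return () => {
    for (const daemon of daemons) {
      try {
        daemon.stop()
      } catch (err) {
        console.error("failed to stop daemon", err)
      }
    }
  }
}

// Usage, assuming worker, scheduler and sweeper were created as shown above:
// process.on("SIGINT", stopAll([worker, scheduler, sweeper]))
```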
cuckmq classes expose a number of configuration options, detailed below:

Config

Parameter | Type | Is Required | Default Value | Description |
---|---|---|---|---|
pool | pg.Pool | yes | N/A | a pg connection pool |
schema | string | no | _cuckmq | the DB schema under which the database tables are created |

JobDefinition

Parameter | Type | Is Required | Default Value | Description |
---|---|---|---|---|
config | cuckmq.Config | yes | N/A | the instantiated cuckmq config object |
name | string | yes | N/A | a unique name for the job definition |
channel | string | no | _default | an attribute that jobs are tagged with that workers can filter on |
numRetries | number | no | 0 | the number of times a job can be retried after erroring before being finalized |
repeatIntervalMs | number | no | null | If defined, the interval between jobs being automatically scheduled |
lockIntervalMs | number | no | 60_000 | The amount of time after a job is dequeued that it remains unavailable to other workers to consume. Ensure this value is larger than the longest possible runtime of your job |
jobFunction | <T extends object> (T) => Promise<void> | yes | N/A | The definition of the function to process/perform the job |
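
One way to read numRetries: a job is attempted once and then retried up to numRetries more times, so it runs at most numRetries + 1 times before being finalized. The sketch below models that counting with a hypothetical `runUntilFinalized` function; it illustrates this reading of the table and is not cuckmq's implementation.

```typescript
// Hypothetical model of the retry counting described in the table, not cuckmq code.
// attempt() returns true on success and false when the job errors.
function runUntilFinalized(
  attempt: () => boolean,
  numRetries: number
): { attempts: number; succeeded: boolean } {
  let attempts = 0
  // The first run plus up to numRetries retries.
  while (attempts <= numRetries) {
    attempts += 1
    if (attempt()) {
      return { attempts, succeeded: true }
    }
  }
  return { attempts, succeeded: false }
}

// In this model, a job that always fails with numRetries = 2 is attempted 3 times.
const alwaysFailing = runUntilFinalized(() => false, 2)
console.log(alwaysFailing.attempts, alwaysFailing.succeeded)
```

Under the default of 0, a failing job in this model is attempted once and then finalized.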

Worker

Parameter | Type | Is Required | Default Value | Description |
---|---|---|---|---|
config | cuckmq.Config | yes | N/A | the instantiated cuckmq config object |
name | string | no | anon | A nickname for your worker daemon |
channel | string | no | _default | an attribute that jobs are tagged with that workers can filter on |
concurrency | number | no | 0 | The number of jobs that a worker can process concurrently |
processIntervalMs | number | no | 1000 | The amount of time a worker will sleep after failing to dequeue a job before trying again |
eventHandler | EventHandler | no | N/A | A handler to listen to events emitted by the worker |

Scheduler

Parameter | Type | Is Required | Default Value | Description |
---|---|---|---|---|
config | cuckmq.Config | yes | N/A | the instantiated cuckmq config object |
name | string | no | anon | A nickname for your scheduler daemon |
scheduleIntervalMs | number | no | 30_000 | The amount of time a scheduler will sleep after failing to schedule a job before trying again |
heartbeatIntervalMs | number | no | 60_000 | The interval at which the scheduler will update scheduled jobs with a "heartbeat", to stop them from going stale. The scheduler will also remove stale scheduled jobs at this time |
heartbeatCutoffMs | number | no | 60_000 * 12 | The maximum amount of time after the last "heartbeat" before the scheduler considers a scheduled job as stale |
eventHandler | EventHandler | no | N/A | A handler to listen to events emitted by the scheduler |
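
With the defaults, a scheduled job receives a heartbeat every 60_000 ms and is considered stale once 60_000 * 12 = 720_000 ms (12 minutes) have passed since its last heartbeat. A minimal sketch of that comparison follows; `isStale` is a hypothetical helper, and the strict comparison is an assumption about behavior the table does not spell out.

```typescript
// Default values taken from the table above.
const heartbeatIntervalMs = 60_000
const heartbeatCutoffMs = 60_000 * 12 // 720_000 ms, i.e. 12 minutes

// Hypothetical helper: true when the last heartbeat is older than the cutoff.
// The strict ">" comparison is an assumption.
function isStale(lastHeartbeat: Date, now: Date, cutoffMs: number = heartbeatCutoffMs): boolean {
  return now.getTime() - lastHeartbeat.getTime() > cutoffMs
}

// Number of heartbeat intervals that fit inside the cutoff window.
const intervalsPerCutoff = heartbeatCutoffMs / heartbeatIntervalMs // 12
```

If heartbeatIntervalMs is raised, heartbeatCutoffMs should stay comfortably larger than it, or a healthy scheduler's jobs could be treated as stale between heartbeats.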

JobSweeper

Parameter | Type | Is Required | Default Value | Description |
---|---|---|---|---|
config | cuckmq.Config | yes | N/A | the instantiated cuckmq config object |
name | string | no | anon | A nickname for your sweeper daemon |
sweepIntervalMs | number | no | 10 * 60_000 | The amount of time a sweeper will sleep between sweeps of stale jobs |
jobAgeCutoffMs | number | no | to_ms(1 week) | The maximum amount of time after a job is created before the sweeper considers a job as stale and thus ready for deletion. |
eventHandler | EventHandler | no | N/A | A handler to listen to events emitted by the sweeper |
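
The default `to_ms(1 week)` in the table reads as shorthand rather than a function the package is known to export. Assuming it means one week expressed in milliseconds, the values work out as below; `toMs` and `MS_PER_UNIT` are hypothetical names written for this illustration.

```typescript
// Hypothetical helper mirroring the "to_ms(1 week)" shorthand; not a cuckmq export.
const MS_PER_UNIT = {
  second: 1_000,
  minute: 60_000,
  hour: 3_600_000,
  day: 86_400_000,
  week: 604_800_000,
} as const

function toMs(amount: number, unit: keyof typeof MS_PER_UNIT): number {
  return amount * MS_PER_UNIT[unit]
}

const jobAgeCutoffMs = toMs(1, "week")     // 604_800_000
const sweepIntervalMs = toMs(10, "minute") // 600_000, the same as 10 * 60_000
```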
All daemons can accept an eventHandler, which will receive emitted events. The type of the eventHandler is:
(params : {
  daemonID : number, // A unique daemon ID
  name : string, // The name of the daemon
  event : Event,
  timestamp : Date
}) => void
Event is a union type whose members are differentiated by the eventType field. The members of the union are enumerated below:
Type | Event Type Field | Event |
---|---|---|
DaemonStarted | daemon-started | The daemon starts (via .start()) |
DaemonStopSignalSent | daemon-stop-signal-sent | The daemon receives the signal to stop (via .stop()). N.B. the daemon may continue running beyond this point to facilitate a graceful shutdown |
DaemonStopped | daemon-stopped | The daemon stops |
WorkerJobDequeued | worker-job-dequeued | A worker daemon pulls a job from the database for processing |
WorkerJobFinalized | worker-job-finalized | A job has been marked as completed/finalized. This happens if the job succeeds, or if it fails with no more retries available |
WorkerJobErrored | worker-job-errored | A job that a worker tried to run has thrown an error |
SchedulerJobScheduled | scheduler-job-scheduled | The scheduler has enqueued a scheduled/periodic job to be run |
SchedulerHeartbeat | scheduler-heartbeat | The scheduler has updated the heartbeat of at least one scheduled job |
SweeperJobsSwept | sweeper-jobs-swept | The job sweeper has cleaned/deleted at least one stale job |
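
As an illustration, the handler below narrows on eventType to produce one log line per event. To keep the snippet self-contained it declares local stand-ins (`SketchEvent`, `SketchEventHandler`) that mirror the shape documented above; in real code the corresponding types would come from cuckmq. Any event fields beyond eventType are not documented here, so none are assumed.

```typescript
// Local stand-ins mirroring the documented shapes; only a subset of event types is listed.
type SketchEvent =
  | { eventType: "daemon-started" }
  | { eventType: "daemon-stopped" }
  | { eventType: "worker-job-errored" }
  | { eventType: "worker-job-finalized" }

type SketchEventHandler = (params: {
  daemonID: number
  name: string
  event: SketchEvent
  timestamp: Date
}) => void

// Formats one log line for an event, switching on the discriminating field.
function describeEvent(daemonName: string, daemonID: number, event: SketchEvent, timestamp: Date): string {
  const prefix = `${timestamp.toISOString()} [${daemonName}#${daemonID}]`
  switch (event.eventType) {
    case "daemon-started":
      return `${prefix} started`
    case "daemon-stopped":
      return `${prefix} stopped`
    case "worker-job-errored":
      return `${prefix} job errored`
    case "worker-job-finalized":
      return `${prefix} job finalized`
  }
}

const loggingHandler: SketchEventHandler = ({ daemonID, name, event, timestamp }) => {
  console.log(describeEvent(name, daemonID, event, timestamp))
}
```

A handler of this shape is what the eventHandler option of the worker, scheduler and sweeper is documented to accept.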