Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

cuckmq

Package Overview
Dependencies
Maintainers
1
Versions
13
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cuckmq

A lightweight postgres-backed job queue

  • 0.1.0
  • npm
  • Socket score

Version published
Maintainers
1
Created
Source

A lightweight, configurable job-queue backed by postgresql, offering an alternative to the redis-backed bullmq.

Installation

Install the package with:

yarn add cuckmq

N.B. pg is a peer dependency:

yarn add pg

Usage

Unlike other offerings, cuckmq requires you to manually provide a connection pool, allowing you to share a single connection pool across the entire app.

We must first create a Config object. This contains a mapping of all jobs, as well as information about the database connection pool:

import { Pool } from "pg"
import { Config } from "cuckmq"

const pool : Pool = getPool()
const config : Config = new Config({ pool })

Next, we must ensure that cuckmq has the requisite database tables to persist and mutate jobs. This can be done by calling the idempotent function:

await config.prepareSchema()

Now lets define our jobs:

import { JobDefinition } from "cuckmq"

export const pingJob = new JobDefinition({
    config: config,
    name : "ping",
    jobFunction: async (payload : { message : string }) => {
        console.log(payload.message)
    }
})

To add jobs to the job queue, we simply call:

await pingJob.defer({ message: "Hello, World!" })

Before we proceed, we must ensure all defined jobs are "registered" with the config. This can be done by calling:

    pingJob.register()

We can now instantiate worker daemons to process queued jobs.

import { Worker } from "cuckmq"
import process from "process"

const worker = new Worker({ config })
worker.start()
process.on("SIGINT", () => worker.stop())

Scheduled Jobs

cuckmq supports scheduled jobs. They can be trivially defined by adding the repeatIntervalMs property to job definitions:

import { JobDefinition } from "cuckmq"

export const pingScheduledJob = new JobDefinition({
    config: config,
    name : "ping-scheduled",
    repeatIntervalMs: 60_000, // Run every minute
    jobFunction: async (params : {}) => {
        await pingJob.defer({ message : "Scheduled Hello, World!" })
    }
})

N.B. you are only able to specify a scheduled job/repeatIntervalMs if the type of params in the job function is an empty object.

To ensure these jobs are enqueued as scheduled, we must setup a scheduler daemon:

import { Scheduler } from "cuckmq"

const scheduler = new Scheduler({ config })
scheduler.start()
process.on("SIGINT", () => scheduler.stop())

Sweeping Jobs

By default, cuckmq doesn't delete jobs from the database. To facilitiate, this, we setup a sweeper daemon:

import { JobSweeper } from "cuckmq"

const sweeper = new JobSweeper({ config })
sweeper.start()
process.on("SIGINT", () => sweeper.stop())

Advanced Usage

cuckmq classes are packed full of various configuration options, these are detailed below:

Config#constructor
ParameterTypeIs RequiredDefault ValueDescription
poolpg.PoolyesN/Aa pg connection pool
schemastringno_cuckmqthe DB schema under which the database tables are created
JobDefinition#constructor
ParameterTypeIs RequiredDefault ValueDescription
configcuckmq.ConfigyesN/Athe instantiated cuckmq config object
namestringyesN/Aa unique name for the job definition
channelstringno_defaultan attribute that jobs are tagged with that workers can filter on
numRetriesnumberno0the number of times a job can be retried after erroring before being finalized
repeatIntervalMsnumbernonullIf defined, the interval between jobs being automatically scheduled
lockIntervalMsnumberno60_000The amount of time after a job is dequeued that it remains unavailable to other workers to consume. Ensure this value is larger than the longest possible runtime of your job
jobFunction<T extends object> (T) => Promise<void>yesN/AThe definition of the function to process/perform the job
Worker#constructor
ParameterTypeIs RequiredDefault ValueDescription
configcuckmq.ConfigyesN/Athe instantiated cuckmq config object
namestringnoanonA nickname for your worker daemon
channelstringno_defaultan attribute that jobs are tagged with that workers can filter on
concurrencynumberno0The number of jobs that a worker can process concurrently
processIntervalMsnumberno1000The amount of time a worker will sleep after failing to dequeue a job before trying again
eventHandlerEventHandlernoN/AA handler to listen to events emitted by the worker
Scheduler#constructor
ParameterTypeIs RequiredDefault ValueDescription
configcuckmq.ConfigyesN/Athe instantiated cuckmq config object
namestringnoanonA nickname for your scheduler daemon
scheduleIntervalMsnumberno30_000The amount of time a scheduler will sleep after failing to schedule a job before trying again
heartbeatIntervalMsnumberno60_000The interval at which the scheduler will update scheduled jobs with a "heartbeat", to stop them from going stale. The scheduler will also remove stale scheduled jobs at this time
heartbeatCutoffMsnumberno60_000 * 12The maximum amount of time after the last "heartbeat" before the scheduler considers a scheduled job as stale
eventHandlerEventHandlernoN/AA handler to listen to events emitted by the scheduler
JobSweeper#constructor
ParameterTypeIs RequiredDefault ValueDescription
configcuckmq.ConfigyesN/Athe instantiated cuckmq config object
namestringnoanonA nickname for your sweeper daemon
sweepIntervalMsnumberno10 * 60_000The amount of time a sweeper will sleep between sweeps of stale jobs
jobAgeCutoffMsnumbernoto_ms(1 week)The maximum amount of time after a job is created before the sweeper considers a job as stale and thus ready for deletion.
eventHandlerEventHandlernoN/AA handler to listen to events emitted by the sweeper
Events

All daemons can accept an eventHandler which will receive emitted events. The type of the eventHandler is:

(params : {
    daemonID : number, // A unique daemon ID
    name: string, // The name of the daemon
    event : Event,
    timestamp : Date
}) => void

Event is a union type, with the field: eventType used to differentiate them. The members of the union type are enumerated below:

TypeEvent Type FieldEvent
DaemonStarteddaemon-startedThe daemon starts (via .start())
DaemonStopSignalSentdaemon-stop-signal-sentThe daemon receives the signal to stop (via `.stop()). N.B. the daemon may continue running beyond this point to facilitate a graceful shutdown
DaemonStoppeddaemon-stoppedThe daemon stops
WorkerJobDequeuedworker-job-dequeuedA worker daemon pulls a job from the database for processing
WorkerJobFinalizedworker-job-finalizedA job has been marked as completed/finalized. This happens if the job succeeds, or if it fails with no more retries available
WorkerJobErroredworker-job-erroredA job that a worker tried to run has thrown an error
SchedulerJobScheduledscheduler-job-scheduledThe scheduler has enqueued a scheduled/periodic job to be run
SchedulerHeartbeatscheduler-heartbeatThe scheduler has updated the heartbeat of at least one scheduled job
SweeperJobsSweptsweeper-jobs-sweptThe job sweeper has cleaned/deleted at least one stale job

FAQs

Package last updated on 17 Apr 2024

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc