Socket
Socket
Sign inDemoInstall

cuckmq

Package Overview
Dependencies
35
Maintainers
1
Versions
8
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

    cuckmq

A lightweight postgres-backed job queue


Version published
Weekly downloads
20
decreased by-74.68%
Maintainers
1
Install size
68.4 kB
Created
Weekly downloads
 

Readme

Source

A lightweight, configurable job-queue backed by postgresql, offering an alternative to the redis-backed bullmq.

Core Features:

  • built-in type safety.
  • repeating/scheduled jobs.
  • rate-limited jobs.
  • job dependencies.
  • delayed jobs.
  • retryable jobs.
  • self cleaning.

Installation

Install the package with:

yarn add cuckmq

N.B. pg is a peer dependency:

yarn add pg

Usage

To start, we must first create a Config object. This contains a mapping of all jobs, a reference to a pg connection pool and the name of the schema in which cuckmq will work...

import { Pool } from "pg"
import { Config } from "cuckmq"

const pool : Pool = getPool()
const config : Config = new Config({ pool })

Next, we must ensure that cuckmq has the requisite database tables to persist and mutate jobs. This can be done by calling the idempotent function:

await config.prepareSchema()

Deferring Jobs

Now lets define our jobs:

import { JobDefinition } from "cuckmq"

export const pingJob = new JobDefinition({
    config: config,
    name : "ping",
    jobFunction: async (payload : { message : string }) => {
        console.log(payload.message)
    }
})

To add jobs to the job queue, we simply call:

await pingJob.defer({ payload: { message: "Hello, World!" }})

Running Jobs

We must instantiate "daemons" in order to ensure oustanding jobs are processed. N.B. make sure job definitions are "registered" by calling #register() on each JobDefinition prior to constructing any daemons.

import { Worker, Orchestrator } from "cuckmq"
import process from "process"
import { pingJob } from "./jobs"

pingJob.register()

// Create Worker daemon(s) and an Orchestrator daemon
// N.B. Daemons will start automatically once created.
const daemons = [
    new Worker({config}),
    new Orchestrator({config})
]

// Request all daemons to gracefully shutdown on SIGINT signal
process.on("SIGINT", () => {
    daemons.forEach(d => d.setShouldStop())
})

// Wait until all daemons have gracefully shutdown.
await Promise.all(daemons.map(d => d.join()))

Repeatable Jobs

cuckmq supports repeatable jobs. They can be trivially defined by adding the repeatIntervalMs property to job definitions:

import { JobDefinition } from "cuckmq"

export const pingScheduledJob = new JobDefinition({
    config: config,
    name : "ping-scheduled",
    repeatIntervalMs: 60_000, // Run every minute
    jobFunction: async (params : {}) => {
        await pingJob.defer({ message : "Scheduled Hello, World!" })
    }
})

N.B. you are only able to specify a repeatable job/repeatIntervalMs if the type of params in the job function is an empty object.

Advanced Usage

cuckmq classes are packed full of various configuration options, these are detailed below:

Config#constructor
ParameterTypeIs RequiredDefault ValueDescription
poolpg.PoolyesN/Aa pg connection pool
schemastringno_cuckmqthe DB schema under which the database tables are created
JobDefinition#constructor
ParameterTypeIs RequiredDefault ValueDescription
configcuckmq.ConfigyesN/Athe instantiated cuckmq config object
namestringyesN/Aa unique name for the job definition
channelstringno_defaultan attribute that jobs are tagged with that workers can filter on
numAttemptsnumberno0the number of times a job can be attempted after erroring before being finalized
repeatIntervalMsnumbernonullIf defined, the interval between jobs being automatically scheduled
releaseIntervalMsnumberno0This defines the minimum amount of time that must elapse between jobs being released from the queue. Use this to perform rate limiting for certain jobs.
timeoutIntervalMsnumberno12 * 60 * 60_000This defines the maximum amount of time that a job can exist before it "times out", resulting in the job being "finalized".
lockIntervalMsnumberno60_000The amount of time after a job is dequeued that it remains unavailable to other workers to consume. Ensure this value is larger than the longest possible runtime of your job
jobFunction<T extends object> (T) => Promise<void>yesN/AThe definition of the function to process/perform the job
Worker#constructor
ParameterTypeIs RequiredDefault ValueDescription
configcuckmq.ConfigyesN/Athe instantiated cuckmq config object
namestringnoanonA nickname for your worker daemon
channelstringno_defaultan attribute that jobs are tagged with that workers can filter on
concurrencynumberno0The number of jobs that a worker can process concurrently
processIntervalMsnumberno1000The amount of time a worker will sleep after failing to dequeue a job before trying again
eventHandlerEventHandlernoN/AA handler to listen to events emitted by the worker
Orchestrator#constructor
ParameterTypeIs RequiredDefault ValueDescription
configcuckmq.ConfigyesN/Athe instantiated cuckmq config object
namestringnoanonA nickname for your scheduler daemon
repeatIntervalMsnumberno30_000The amount of time the orchestrator will wait after not finding a repeatable job to schedule before trying again
heartbeatIntervalMsnumberno60_000The amount of time the orchestrator will wait between updating the heartbeat state of all registered JobDefinitions.
cleanIntervalMsnumberno5 * 60_000The amount of time the orchestrator will wait between performing a database clean
staleJobDefinitionSweepThresholdMsnumberno60_000 * 60The maximum amount of time after the last "heartbeat" before the orchestrator considers a job definition as stale and tries to remove it
finalizedJobSweepThresholdMsnumberno12 * 60 * 60_000The maximum amount of time a finalized job will exist before the orchestrator attempts to remove it
eventHandlerEventHandlernoN/AA handler to listen to events emitted by the scheduler
Events

All daemons can accept an eventHandler which will receive emitted events. The type of the eventHandler is:

(params : {
    daemonID : number, // A unique daemon ID
    name: string, // The name of the daemon
    event : Event,
    timestamp : Date
}) => void

Event is a union type, with the field: eventType used to differentiate them. The members of the union type are enumerated below:

TypeEvent Type FieldEvent
DaemonStartdaemon-startThe daemon starts (via .start())
DaemonStopSignalSenddaemon-stop-signal-sendThe daemon receives the signal to stop (via `.stop()). N.B. the daemon may continue running beyond this point to facilitate a graceful shutdown
DaemonStopdaemon-stopThe daemon stops
WorkerJobDequeueworker-job-dequeueA worker daemon pulls a job from the database for processing
WorkerJobFinalizeSuccessworker-job-finalize-successA job has been successfully run by the worker
WorkerJobFinalizeFailureOrphanedworker-job-finalize-failure-orphanedA job has been finalized because the worker is unable to find an associated JobDefinition
WorkerJobErrorworker-job-errorA job that a worker tried to run has thrown an error
OrchestratorJobScheduleorchestrator-job-scheduleThe orchestrator has enqueued a scheduled/periodic job to be run
OrchestratorHeartbeatorchestrator-heartbeatThe orchestrator has updated the heartbeat of at least one scheduled job

FAQs

Last updated on 25 Apr 2024

Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc