cuckmq


A lightweight postgres-backed job queue



A lightweight, configurable job-queue backed by postgresql, offering an alternative to the redis-backed bullmq.

Core Features:

  • built-in type safety
  • repeating/scheduled jobs
  • retryable jobs

Job Lifecycle

The rest of this documentation assumes familiarity with the job lifecycle. A job is always in one of the following states:

  • Active - Active jobs can be dequeued by workers for processing.
  • Locked - Locked jobs cannot be dequeued by workers for processing, but will eventually return to an active state.
  • Finalized - Finalized jobs also cannot be dequeued by workers for processing. Unlike a locked job, a finalized job is in its terminal state and will never return to being active.

Jobs transition between these states in the following ways:

  • Jobs start in an active state when they are initially enqueued.
  • Upon dequeuing, jobs move into a locked state, preventing other workers from also dequeuing them.
  • Jobs that are processed successfully by workers move into the finalized state.

Depending on job-specific configuration:

  • Locked jobs move back to an active state after a timeout (retrySecs).
  • Active jobs are finalized, regardless of whether they have been successfully processed or not (reapSecs).

Depending on global configuration:

  • Finalized jobs are eventually deleted (sweepSecs).
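
The lifecycle above can be read as a small state machine. The sketch below is purely illustrative: JobState, JobEvent and transition are hypothetical names that are not part of cuckmq's API, and the extra "deleted" state is added here only to model sweeping.

```typescript
// Hypothetical model of the job lifecycle described above; not cuckmq's API.
type JobState = "active" | "locked" | "finalized" | "deleted"

type JobEvent =
    | "dequeue"   // a worker picks the job up
    | "success"   // the worker processed the job without error
    | "liberate"  // retrySecs elapsed while the job was locked
    | "reap"      // reapSecs elapsed since the job was created
    | "sweep"     // sweepSecs elapsed since the job was finalized

// Returns the next state, or the current state if the event does not apply.
function transition(state: JobState, event: JobEvent): JobState {
    switch (state) {
        case "active":
            if (event === "dequeue") return "locked"
            if (event === "reap") return "finalized"
            return state
        case "locked":
            if (event === "success") return "finalized"
            if (event === "liberate") return "active"
            return state
        case "finalized":
            return event === "sweep" ? "deleted" : state
        default:
            return state // deleted jobs no longer exist
    }
}
```

For example, a job that is dequeued, times out and is then retried successfully moves through active, locked, active, locked and finally finalized.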

Installation

Install the package with:

yarn add cuckmq

Setup

Usage

To begin using cuckmq, we must first create the core Context object:

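import { Context } from "cuckmq"
import { Pool } from "pg"
import process from "process"

// DATABASE_URL is an assumed environment variable name; substitute your own connection settings.
const pool = new Pool({ connectionString: process.env.DATABASE_URL })
const context = new Context({ pool })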

Context#constructor

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| pool | pg.Pool | yes | N/A | A pg connection pool |
| eventHandler | (event : CuckEvent) => void | no | null | A handler that receives typed events (see below) from cuckmq |
| sweepSecs | number | no | 7 days | The amount of time finalized jobs exist in the database before being deleted |

Next, let's define the jobs that we would like cuckmq to run for us. This is achieved by creating JobDefinition objects and registering them with the context:

import { JobDefinition } from "cuckmq"

export const pingJob = new JobDefinition({
    name: "ping",
    context: context,
    jobFunction: async (payload : { message : string }) => {
        console.log(payload.message)
    }
}).register()

JobDefinition#constructor

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| name | string | yes | N/A | A globally unique name for the job definition |
| context | cuckmq.Context | yes | N/A | A reference to the previously created Context object |
| channel | string | no | _default | The channel that jobs are published to. Workers will only process jobs from channels they are subscribed to |
| repeatSecs | number | no | null | The presence of this value specifies the job will be scheduled to run repeatedly with the interval specified - N.B. if this value is set, jobFunction can only receive an empty payload |
| reapSecs | number | yes | N/A | The amount of time since creation before jobs are finalized, regardless of whether they successfully processed or not |
| retrySecs | number | no | null | The amount of time a locked job has to wait before transitioning back to an active state. By default, this value is set to match reapSecs - ensuring a job is only attempted once before finalization |
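
The interaction between reapSecs and retrySecs determines how many times a job can be attempted. The helper below is a rough sketch rather than anything cuckmq provides: maxAttempts is a hypothetical name, and the calculation assumes attempts begin at 0, retrySecs, 2 * retrySecs and so on, with no polling delay or processing time.

```typescript
// Idealized upper bound on how many times a job can be attempted.
// Assumes attempts start at t = 0, retrySecs, 2 * retrySecs, ... and that a job
// is no longer dequeued once reapSecs have passed since creation. Polling
// delays and processing time are ignored, so real counts may be lower.
function maxAttempts(reapSecs: number, retrySecs: number = reapSecs): number {
    if (reapSecs <= 0 || retrySecs <= 0) {
        throw new RangeError("reapSecs and retrySecs must be positive")
    }
    return Math.ceil(reapSecs / retrySecs)
}

const defaultAttempts = maxAttempts(3600)      // retrySecs defaults to reapSecs: 1 attempt
const withRetries = maxAttempts(3600, 600)     // retry every 10 minutes for an hour: 6 attempts
```

Under these assumptions, leaving retrySecs unset gives exactly one attempt, which matches the default behaviour described in the table.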

Migrating

We must ensure the database is ready to persist job state. cuckmq creates its tables and indices, and stores its data, under the _cuckmq schema to avoid namespace collisions with app tables.

This is achieved by calling the idempotent Context#migrate function. We can call this as part of our build step, or during app initialization:

await context.migrate()

Deploying

Once our jobs are defined, we must deploy the context, which persists information about each registered job definition. We achieve this by calling:

await context.deploy()

N.B. The enqueueing, dequeueing and processing of jobs, as well as the various maintenance tasks, are all implicitly deferred until after the context deployment has completed.

Processing Jobs

We can create Worker objects to process enqueued jobs:

import { Worker } from "cuckmq"

const worker = new Worker({ context })

Worker#constructor

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| context | cuckmq.Context | yes | N/A | A reference to the previously created Context object |
| pollSecs | number | no | 2 secs | The amount of time a worker waits before checking for new jobs |
| channels | string[] | no | ["_default"] | The channels that the worker is subscribed to |
| concurrency | number | no | 1 | The number of jobs the worker will process concurrently |

N.B. Concurrency can be achieved in multiple ways:

  1. Multiple processes can run a Worker instance.
  2. A single process can run multiple Worker instances.
  3. A single Worker instance can process multiple jobs concurrently via the concurrency constructor parameter.

cuckmq workers poll the database for new work. For tasks requiring a high degree of concurrency, option (3) is therefore advised, as it avoids unnecessary hammering of the database.
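
To make the polling cost concrete, the back-of-the-envelope sketch below compares the options. It rests on an assumption the documentation does not confirm: that each Worker instance issues one polling query per pollSecs regardless of its concurrency setting. The pollQueriesPerSec helper is hypothetical.

```typescript
// Rough estimate of poll queries per second hitting the database.
// Assumption: each Worker instance issues one poll query every pollSecs,
// regardless of its concurrency setting.
function pollQueriesPerSec(workerCount: number, pollSecs: number = 2): number {
    return workerCount / pollSecs
}

// 16 jobs in flight via 16 single-job workers vs. one worker with concurrency 16.
const manyWorkers = pollQueriesPerSec(16) // 8 queries per second
const oneWorker = pollQueriesPerSec(1)    // 0.5 queries per second
```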

Maintenance

We also need to have at least one Orchestrator running, which takes care of all maintenance tasks:

  1. Scheduling - enqueuing jobs that have been marked as repeating.
  2. Liberating - moving locked jobs back into an active state after JobDefinition#retrySecs.
  3. Reaping - moving active jobs into a finalized state after JobDefinition#reapSecs.
  4. Sweeping - deleting finalized jobs after Context#sweepSecs.

import { Orchestrator } from "cuckmq"

const orchestrator = new Orchestrator({ context })

Orchestrator#constructor

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| context | cuckmq.Context | yes | N/A | A reference to the previously created Context object |
| reaperPollSecs | number | no | 5 mins | The time interval between checks for old active jobs that are due to be finalized |
| schedulerPollSecs | number | no | 5 secs | The time interval between checks for repeating jobs that are due to be scheduled |
| sweeperPollSecs | number | no | 5 mins | The time interval between deletions of jobs that have been finalized for longer than Context#sweepSecs |
| liberatorPollSecs | number | no | 30 secs | The time interval between checks for locked jobs that are due to be unlocked and retried (according to JobDefinition#retrySecs) |

N.B. If a job is due to be "reaped", workers will not dequeue it. This prevents scenarios where a job that should've been finalized manages to be processed before the Reaper can get to it.
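
The timing rules for liberating, reaping and sweeping, together with the dequeue guard in the note above, can be sketched as pure functions. Everything here is hypothetical: the JobRecord fields, the Timeouts bundle and the function names are illustrative and do not reflect cuckmq's schema or internals. Scheduling of repeating jobs is left out.

```typescript
// Hypothetical job record; field names are illustrative, not cuckmq's schema.
interface JobRecord {
    state: "active" | "locked" | "finalized"
    createdAt: number     // seconds since epoch
    lockedAt?: number     // set while the job is locked
    finalizedAt?: number  // set once the job is finalized
}

interface Timeouts { retrySecs: number; reapSecs: number; sweepSecs: number }

type MaintenanceTask = "liberate" | "reap" | "sweep" | null

// Which maintenance task, if any, is due for this job at time `now`.
function dueTask(job: JobRecord, t: Timeouts, now: number): MaintenanceTask {
    if (job.state === "locked" && job.lockedAt !== undefined
        && now - job.lockedAt >= t.retrySecs) return "liberate"
    if (job.state === "active" && now - job.createdAt >= t.reapSecs) return "reap"
    if (job.state === "finalized" && job.finalizedAt !== undefined
        && now - job.finalizedAt >= t.sweepSecs) return "sweep"
    return null
}

// Workers skip active jobs that are already due to be reaped.
function isDequeuable(job: JobRecord, t: Timeouts, now: number): boolean {
    return job.state === "active" && now - job.createdAt < t.reapSecs
}
```

Keeping the reap check inside isDequeuable mirrors the note above: a job past its reapSecs is never handed to a worker, even if the Reaper has not polled yet.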

Shutting down

Once we are ready to shut down, we can call await worker.stop(). The worker will finish the job it is currently processing (should one exist) and then immediately stop.

Event Handling

cuckmq produces a large number of events, which can be consumed via an optional eventHandler passed into the Context constructor. All event types are described below:

| Key | Description |
| --- | --- |
| jobDefinitionDeploy | A JobDefinition has been persisted to the database |
| jobEnqueue | A job has been enqueued and persisted in the database |
| jobDequeue | A worker has dequeued a job for processing |
| jobOrphan | A dequeued job did not find a corresponding JobDefinition registered with the Context |
| jobError | A job threw an error during processing |
| jobRun | A job has started to run on a worker |
| jobSuccess | A job was successfully processed and was subsequently finalized |
| jobReap | A job was finalized after existing for too long |
| jobLiberate | A job was unlocked after being locked for JobDefinition#retrySecs |
| jobSweep | A finalized job was deleted after being finalized for longer than Context#sweepSecs |
| jobSchedule | A repeating job is due to be enqueued |
| jobScheduleOrphan | A repeating job was due to be enqueued but there was no corresponding JobDefinition registered with the Context |
| engineStart | An "engine" [Worker, Liberator, Sweeper, Reaper, Scheduler] has started |
| enginePoll | An engine has polled the database for new outstanding tasks |
| engineStop | An engine has completed any current tasks and has stopped polling for additional work |
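
As one possible use of these events, the sketch below tallies them by key, for example to feed a metrics dashboard. The real CuckEvent type is not shown in this documentation, so the EventLike shape (a single string key field, named after the table column) is an assumption, and createEventCounter is a hypothetical helper.

```typescript
// Assumed minimal event shape: the real CuckEvent type may differ.
// Only a string `key` (as listed in the table above) is assumed here.
interface EventLike { key: string }

// Builds a handler that tallies events per key, plus a way to read the tallies.
function createEventCounter() {
    const counts = new Map<string, number>()
    const handler = (event: EventLike): void => {
        counts.set(event.key, (counts.get(event.key) ?? 0) + 1)
    }
    const count = (key: string): number => counts.get(key) ?? 0
    return { handler, count }
}
```

A handler built this way could be passed as eventHandler when constructing the Context, provided the real event type exposes its key under the same field name.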


Package last updated on 01 Sep 2024
