Socket
Socket
Sign inDemoInstall

cuckmq

Package Overview
Dependencies
0
Maintainers
1
Versions
8
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.1.3 to 0.2.0

dist/orchestrator.d.ts

4

dist/config.d.ts

@@ -13,7 +13,7 @@ import { Pool } from "pg";

getJobDefinitions: () => JobDefinition<any>[];
getJobDefinition: (name: string) => JobDefinition<any> | null;
getJobDefinition: (name: string) => JobDefinition<any> | undefined;
registerJobDefinition: (jobDefinition: JobDefinition<any>) => void;
getSchema: () => string;
getPool: () => Pool;
getSchema: () => string;
prepareSchema: () => Promise<void>;
}
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Config = void 0;
const defaultSchemaName = "_cuckmq";
const prepare_schema_1 = require("./query/prepare-schema");
const defaultSchema = "_cuckmq";
class Config {

@@ -9,3 +10,3 @@ pool;

jobDefinitions;
constructor({ pool, schema = defaultSchemaName }) {
constructor({ pool, schema = defaultSchema }) {
this.pool = pool;

@@ -19,3 +20,3 @@ this.schema = schema;

getJobDefinition = (name) => {
return this.jobDefinitions[name] || null;
return this.jobDefinitions[name];
};

@@ -25,43 +26,15 @@ registerJobDefinition = (jobDefinition) => {

};
getSchema = () => {
return this.schema;
};
getPool = () => {
return this.pool;
};
getSchema = () => {
return this.schema;
};
prepareSchema = async () => {
await this.pool.query(`
CREATE SCHEMA IF NOT EXISTS "${this.schema}"
`);
await this.pool.query(`
CREATE TABLE IF NOT EXISTS "${this.schema}"."scheduled_job" (
"id" SERIAL,
"name" TEXT NOT NULL,
"repeat_interval_ms" INTEGER NOT NULL,
"last_repeated_at" TIMESTAMP NULL,
"last_heartbeat_at" TIMESTAMP NULL,
PRIMARY KEY ("id")
)
`);
await this.pool.query(`
CREATE UNIQUE INDEX IF NOT EXISTS "scheduled_job_name_idx" ON
"${this.schema}"."scheduled_job" ("name")
`);
await this.pool.query(`
CREATE TABLE IF NOT EXISTS "${this.schema}"."job" (
"id" SERIAL,
"name" TEXT NOT NULL,
"channel" TEXT NOT NULL,
"payload" JSONB NOT NULL,
"num_retries" INTEGER NOT NULL,
"lock_interval_ms" INTEGER NOT NULL,
"created_at" TIMESTAMP NOT NULL DEFAULT NOW(),
"locked_at" TIMESTAMP NULL,
"finalized_at" TIMESTAMP NULL,
"is_success" BOOLEAN NULL,
PRIMARY KEY ("id")
)
`);
await (0, prepare_schema_1.prepareSchema)({
schema: this.schema,
handle: this.pool
})();
};
}
exports.Config = Config;

@@ -8,2 +8,3 @@ import { Event } from "./event";

}) => void;
export type StopCallback = () => void;
export declare class Daemon {

@@ -16,7 +17,8 @@ private shouldStop;

constructor(name: string, eventHandler?: DaemonEventHandler);
protected getShouldStop: () => boolean;
protected onEvent: (event: Event) => void;
protected run: () => Promise<void>;
start: () => Promise<void>;
stop: () => Promise<void>;
protected getShouldStop(): boolean;
protected onEvent(event: Event): void;
protected run(): Promise<void>;
protected snooze(): Promise<void>;
join(): Promise<void>;
setShouldStop(): Promise<void>;
}
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Daemon = void 0;
const utils_1 = require("./utils");
let daemonID = 0;
const snoozeMs = 50;
class Daemon {

@@ -18,7 +20,11 @@ shouldStop;

daemonID += 1;
this.onEvent({ eventType: "daemon-start" });
this.runPromise = Promise.resolve()
.then(() => this.run())
.then(() => this.onEvent({ eventType: "daemon-stop" }));
}
getShouldStop = () => {
getShouldStop() {
return this.shouldStop;
};
onEvent = (event) => {
}
onEvent(event) {
if (this.eventHandler === undefined) {

@@ -33,20 +39,17 @@ return;

});
};
run = () => {
}
run() {
throw new Error("Not implemented");
};
start = () => {
if (this.runPromise === null) {
this.onEvent({ eventType: "daemon-started" });
this.runPromise = this.run()
.then(() => this.onEvent({ eventType: "daemon-stopped" }));
}
return this.runPromise;
};
stop = async () => {
}
async snooze() {
await (0, utils_1.sleep)(snoozeMs);
}
async join() {
await this.runPromise;
}
async setShouldStop() {
this.shouldStop = true;
this.onEvent({ eventType: "daemon-stop-signal-sent" });
await this.runPromise;
};
this.onEvent({ eventType: "daemon-stop-signal-send" });
}
}
exports.Daemon = Daemon;

@@ -1,38 +0,68 @@

export type DaemonStarted = {
eventType: "daemon-started";
export type DaemonStart = {
eventType: "daemon-start";
};
export type DaemonStopSignalSent = {
eventType: "daemon-stop-signal-sent";
export type DaemonStopSignalSend = {
eventType: "daemon-stop-signal-send";
};
export type DaemonStopped = {
eventType: "daemon-stopped";
export type DaemonStop = {
eventType: "daemon-stop";
};
export type WorkerJobDequeued = {
eventType: "worker-job-dequeued";
export type WorkerJobDequeue = {
eventType: "worker-job-dequeue";
jobID: number;
jobName: string;
jobDefinitionName: string;
};
export type WorkerJobFinalized = {
eventType: "worker-job-finalized";
export type WorkerJobFinalizeSuccess = {
eventType: "worker-job-finalize-success";
jobID: number;
jobName: string;
isSuccess: boolean;
jobDefinitionName: string;
};
export type WorkerJobErrored = {
eventType: "worker-job-errored";
export type WorkerJobFinalizeFailureOrphaned = {
eventType: "worker-job-finalize-failure-orphaned";
jobID: number;
jobName: string;
jobDefinitionName: string;
};
export type WorkerJobError = {
eventType: "worker-job-error";
jobID: number;
jobDefinitionName: string;
error: any;
};
export type SchedulerJobScheduled = {
eventType: "scheduler-job-scheduled";
export type OrchestratorJobRepeat = {
eventType: "orchestrator-job-repeat";
jobID: number;
jobName: string;
jobDefinitionName: string;
};
export type SchedulerHeartbeat = {
eventType: "scheduler-heartbeat";
export type OrchestratorJobDefinitionInitialize = {
eventType: "orchestrator-job-definition-initialize";
jobDefinitionName: string;
};
export type SweeperJobsSwept = {
eventType: "sweeper-jobs-swept";
export type OrchestratorJobDefinitionHeartbeat = {
eventType: "orchestrator-job-definition-heartbeat";
jobDefinitionName: string;
};
export type Event = DaemonStarted | DaemonStopSignalSent | DaemonStopped | WorkerJobDequeued | WorkerJobFinalized | WorkerJobErrored | SchedulerJobScheduled | SchedulerHeartbeat | SweeperJobsSwept;
export type OrchestratorJobFinalizeFailureTimeout = {
eventType: "orchestrator-job-finalize-failure-timeout";
jobID: number;
jobDefinitionName: string;
};
export type OrchestratorJobFinalizeFailureUnmetDependencies = {
eventType: "orchestrator-job-finalize-failure-unmet-dependencies";
jobID: number;
jobDefinitionName: string;
};
export type OrchestratorJobFinalizeFailureNoAttemptsRemaining = {
eventType: "orchestrator-job-finalize-failure-no-attempts-remaining";
jobID: number;
jobDefinitionName: string;
};
export type OrchestratorFinalizedJobSweep = {
eventType: "orchestrator-finalized-job-sweep";
jobID: number;
jobDefinitionName: string;
};
export type OrchestratorStaleJobDefinitionSweep = {
eventType: "orchestrator-stale-job-definition-sweep";
jobDefinitionName: string;
};
export type Event = DaemonStart | DaemonStopSignalSend | DaemonStop | WorkerJobDequeue | WorkerJobError | WorkerJobFinalizeSuccess | WorkerJobFinalizeFailureOrphaned | OrchestratorJobDefinitionInitialize | OrchestratorJobDefinitionHeartbeat | OrchestratorJobRepeat | OrchestratorJobFinalizeFailureTimeout | OrchestratorJobFinalizeFailureUnmetDependencies | OrchestratorJobFinalizeFailureNoAttemptsRemaining | OrchestratorFinalizedJobSweep | OrchestratorStaleJobDefinitionSweep;
export { Config } from "./config";
export { JobSweeper } from "./job-sweeper";
export { Scheduler } from "./scheduler";
export { Orchestrator } from "./orchestrator";
export { Worker } from "./worker";
export { JobDefinition } from "./job-definition";
export { Job } from "./job";
export { DaemonStarted, DaemonStopSignalSent, DaemonStopped, WorkerJobDequeued, WorkerJobFinalized, WorkerJobErrored, SchedulerJobScheduled, SchedulerHeartbeat, SweeperJobsSwept, Event } from "./event";
export { DaemonEventHandler } from "./daemon";
export * as event from "./event";
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Job = exports.JobDefinition = exports.Worker = exports.Scheduler = exports.JobSweeper = exports.Config = void 0;
exports.event = exports.JobDefinition = exports.Worker = exports.Orchestrator = exports.Config = void 0;
var config_1 = require("./config");
Object.defineProperty(exports, "Config", { enumerable: true, get: function () { return config_1.Config; } });
var job_sweeper_1 = require("./job-sweeper");
Object.defineProperty(exports, "JobSweeper", { enumerable: true, get: function () { return job_sweeper_1.JobSweeper; } });
var scheduler_1 = require("./scheduler");
Object.defineProperty(exports, "Scheduler", { enumerable: true, get: function () { return scheduler_1.Scheduler; } });
var orchestrator_1 = require("./orchestrator");
Object.defineProperty(exports, "Orchestrator", { enumerable: true, get: function () { return orchestrator_1.Orchestrator; } });
var worker_1 = require("./worker");

@@ -14,3 +12,2 @@ Object.defineProperty(exports, "Worker", { enumerable: true, get: function () { return worker_1.Worker; } });

Object.defineProperty(exports, "JobDefinition", { enumerable: true, get: function () { return job_definition_1.JobDefinition; } });
var job_1 = require("./job");
Object.defineProperty(exports, "Job", { enumerable: true, get: function () { return job_1.Job; } });
exports.event = require("./event");
import { Config } from "./config";
import { Job } from "./job";
type EmptyObject = {
[key: string]: never;
};
export type JobID = number;
export type JobDefinitionDeferConfigParams<T extends object> = {
payload: T;
dependencies?: JobID[];
delayMs?: number;
};
export type JobDefinitionConstructorParams<T extends object> = {

@@ -11,5 +16,7 @@ config: Config;

jobFunction: (params: T) => Promise<void>;
numRetries?: number;
numAttempts?: number;
repeatIntervalMs?: T extends EmptyObject ? (number | null) : null;
releaseIntervalMs?: number;
lockIntervalMs?: number;
timeoutIntervalMs?: number;
};

@@ -21,15 +28,19 @@ export declare class JobDefinition<T extends object = EmptyObject> {

private jobFunction;
private numRetries;
private numAttempts;
private repeatIntervalMs;
private releaseIntervalMs;
private lockIntervalMs;
constructor({ config, name, channel, jobFunction, numRetries, repeatIntervalMs, lockIntervalMs }: JobDefinitionConstructorParams<T>);
getName: () => string;
getChannel: () => string;
getNumRetries: () => number;
getRepeatIntervalMs: () => number | null;
getTimeoutIntervalMs: () => number;
register: () => void;
defer: (payload: T) => Promise<Job>;
private timeoutIntervalMs;
private jobDefinitionID?;
constructor({ config, name, channel, jobFunction, numAttempts, repeatIntervalMs, releaseIntervalMs, lockIntervalMs, timeoutIntervalMs }: JobDefinitionConstructorParams<T>);
getName(): string;
getChannel(): string;
getNumAttempts(): number;
getRepeatIntervalMs(): number | null;
getTimeoutIntervalMs(): number;
initialize(): Promise<number>;
register(): void;
defer({ payload, dependencies, delayMs }: JobDefinitionDeferConfigParams<T>): Promise<JobID>;
run: (payload: T) => Promise<void>;
}
export {};
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.JobDefinition = void 0;
const job_1 = require("./job");
const insert_job_1 = require("./query/insert-job");
const insert_job_dependency_1 = require("./query/insert-job-dependency");
const upsert_job_definition_1 = require("./query/upsert-job-definition");
const utils_1 = require("./utils");
const defaultChannel = "_default";
const defaultNumRetries = 0;
const defaultTimeoutIntervalMs = 60_000;
const defaultNumAttempts = 1;
const defaultDelayMs = 0;
const defaultReleaseIntervalMs = 0;
const defaultLockIntervalMs = 60 * 1000;
const defaultTimeoutIntervalMs = 12 * 60 * 60 * 1000;
class JobDefinition {

@@ -13,6 +19,9 @@ config;

jobFunction;
numRetries;
numAttempts;
repeatIntervalMs;
releaseIntervalMs;
lockIntervalMs;
constructor({ config, name, channel = defaultChannel, jobFunction, numRetries = defaultNumRetries, repeatIntervalMs = null, lockIntervalMs = defaultTimeoutIntervalMs }) {
timeoutIntervalMs;
jobDefinitionID;
constructor({ config, name, channel = defaultChannel, jobFunction, numAttempts = defaultNumAttempts, repeatIntervalMs = null, releaseIntervalMs = defaultReleaseIntervalMs, lockIntervalMs = defaultLockIntervalMs, timeoutIntervalMs = defaultTimeoutIntervalMs }) {
this.config = config;

@@ -22,48 +31,68 @@ this.name = name;

this.jobFunction = jobFunction;
this.numRetries = numRetries;
this.numAttempts = numAttempts;
this.repeatIntervalMs = repeatIntervalMs;
this.releaseIntervalMs = releaseIntervalMs;
this.lockIntervalMs = lockIntervalMs;
this.timeoutIntervalMs = timeoutIntervalMs;
}
getName = () => {
getName() {
return this.name;
};
getChannel = () => {
}
getChannel() {
return this.channel;
};
getNumRetries = () => {
return this.numRetries;
};
getRepeatIntervalMs = () => {
}
getNumAttempts() {
return this.numAttempts;
}
getRepeatIntervalMs() {
return this.repeatIntervalMs;
};
getTimeoutIntervalMs = () => {
}
getTimeoutIntervalMs() {
return this.lockIntervalMs;
};
register = () => {
}
async initialize() {
const queryConfig = {
schema: this.config.getSchema(),
handle: this.config.getPool()
};
if (this.jobDefinitionID === undefined) {
this.jobDefinitionID = await (0, upsert_job_definition_1.upsertJobDefinition)(queryConfig)({
name: this.name,
channel: this.channel,
repeatIntervalMs: this.repeatIntervalMs ? (0, utils_1.max)(this.repeatIntervalMs, 0) : null,
releaseIntervalMs: (0, utils_1.max)(this.releaseIntervalMs, 0),
lockIntervalMs: (0, utils_1.max)(this.lockIntervalMs, 0),
timeoutIntervalMs: (0, utils_1.max)(this.timeoutIntervalMs, 0),
numAttempts: (0, utils_1.max)(this.numAttempts, 1)
}).then(result => result.jobDefinitionID);
}
return this.jobDefinitionID;
}
register() {
this.config.registerJobDefinition(this);
};
defer = async (payload) => {
const rows = await this.config.getPool().query(`
INSERT INTO "${this.config.getSchema()}"."job" (
"name",
"channel",
"payload",
"num_retries",
"lock_interval_ms",
"created_at"
) VALUES (
$1, $2, $3, $4, $5, NOW()
) RETURNING "id"
`, [
this.name,
this.channel,
JSON.stringify(payload),
this.numRetries,
this.lockIntervalMs
]);
return new job_1.Job({
jobID: rows.rows[0].id,
config: this.config
}
async defer({ payload, dependencies = [], delayMs = defaultDelayMs }) {
const jobDefinitionID = await this.initialize();
const pool = this.config.getPool();
const jobID = await (0, utils_1.transaction)(pool)(async (client) => {
const queryConfig = {
schema: this.config.getSchema(),
handle: client
};
const jobID = await (0, insert_job_1.insertJob)(queryConfig)({
jobDefinitionID,
payload,
numAttempts: this.numAttempts,
delayMs: (0, utils_1.max)(delayMs, 0)
}).then(result => result.jobID);
for (const dependency of dependencies) {
await (0, insert_job_dependency_1.insertJobDependency)(queryConfig)({
jobID,
jobIDDependency: dependency
});
}
return jobID;
});
};
return jobID;
}
run = (payload) => {

@@ -70,0 +99,0 @@ return this.jobFunction(payload);

@@ -0,1 +1,3 @@

import { Pool, PoolClient } from "pg";
type TransactionCallback<T> = (client: PoolClient) => Promise<T>;
export declare class Semaphore {

@@ -19,1 +21,3 @@ private count;

export declare const max: (num1: number, num2: number) => number;
export declare const transaction: (pool: Pool) => <T>(callback: TransactionCallback<T>) => Promise<T>;
export {};
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.max = exports.sleep = exports.Timer = exports.Semaphore = void 0;
exports.transaction = exports.max = exports.sleep = exports.Timer = exports.Semaphore = void 0;
class Semaphore {

@@ -54,1 +54,18 @@ count;

exports.max = max;
const transaction = (pool) => async (callback) => {
const client = await pool.connect();
await client.query("BEGIN TRANSACTION");
try {
const result = await callback(client);
await client.query("COMMIT TRANSACTION");
return result;
}
catch (err) {
await client.query("ROLLBACK TRANSACTION");
throw err;
}
finally {
await client.release();
}
};
exports.transaction = transaction;

@@ -17,7 +17,4 @@ import { Config } from "./config";

constructor({ config, channel, concurrency, processIntervalMs, name, eventHandler }: WorkerConstructorParams);
private dequeueJob;
private finalizeJob;
private decrementRetries;
private processJobs;
protected run: () => Promise<void>;
protected run(): Promise<void>;
}

@@ -5,2 +5,7 @@ "use strict";

const daemon_1 = require("./daemon");
const get_available_job_1 = require("./query/get-available-job");
const update_job_definition_release_state_1 = require("./query/update-job-definition-release-state");
const update_job_finalize_state_1 = require("./query/update-job-finalize-state");
const update_job_num_attempts_state_1 = require("./query/update-job-num-attempts-state");
const update_job_unlock_state_1 = require("./query/update-job-unlock-state");
const utils_1 = require("./utils");

@@ -23,93 +28,58 @@ const defaultName = "anon";

}
dequeueJob = async () => {
const client = await this.config.getPool().connect();
try {
await client.query("BEGIN TRANSACTION");
const rows = await client.query(`
SELECT
"id",
"name",
"payload",
"num_retries"
FROM "${this.config.getSchema()}"."job"
WHERE
"finalized_at" IS NULL AND
(
"locked_at" IS NULL OR
"locked_at" + ("lock_interval_ms" / 1000.0) * INTERVAL '1 second' < NOW()
) AND
"channel" = $1
FOR UPDATE SKIP LOCKED
LIMIT 1
`, [this.channel]);
if (rows.rows.length === 0) {
await client.query("ROLLBACK TRANSACTION");
return null;
}
const row = rows.rows[0];
const rowData = {
id: row.id,
name: row.name,
payload: row.payload,
numRetries: row.num_retries
};
await client.query(`
UPDATE "${this.config.getSchema()}"."job"
SET "locked_at" = NOW()
WHERE "id" = $1
`, [rowData.id]);
await client.query("COMMIT TRANSACTION");
return rowData;
}
catch (e) {
await client.query("ROLLBACK TRANSACTION");
throw e;
}
finally {
await client.release();
}
};
finalizeJob = async (jobID, isSuccess) => {
await this.config.getPool().query(`
UPDATE "${this.config.getSchema()}"."job" SET
"finalized_at" = NOW(),
"is_success" = $1
WHERE "id" = $2
`, [isSuccess, jobID]);
};
decrementRetries = async (jobID) => {
await this.config.getPool().query(`
UPDATE "${this.config.getSchema()}"."job" SET
"num_retries" = "num_retries" - 1
WHERE "id" = $1
`, [jobID]);
};
processJobs = async () => {
async processJobs() {
if (!this.processTimer.hasElapsed()) {
return;
}
this.processTimer.reset();
const pool = this.config.getPool();
while (!this.getShouldStop()) {
const rowData = await this.dequeueJob();
if (!rowData) {
const job = await (0, utils_1.transaction)(pool)(async (client) => {
const queryConfig = {
schema: this.config.getSchema(),
handle: client
};
const job = await (0, get_available_job_1.getAvailableJob)(queryConfig)({ channel: this.channel });
if (job) {
await (0, update_job_unlock_state_1.updateJobUnlockState)(queryConfig)({ jobID: job.jobID });
await (0, update_job_definition_release_state_1.updateJobDefinitionReleaseState)(queryConfig)({ jobDefinitionID: job.jobDefinitionID });
}
return job;
});
if (!job) {
break;
}
this.onEvent({
eventType: "worker-job-dequeued",
jobID: rowData.id,
jobName: rowData.name,
eventType: "worker-job-dequeue",
jobID: job?.jobID,
jobDefinitionName: job?.jobDefinitionName,
});
await this.semaphore.acquire();
const processFn = async () => {
const queryConfig = {
schema: this.config.getSchema(),
handle: pool
};
try {
const jobDefinition = await this.config.getJobDefinition(rowData.name);
if (jobDefinition === null) {
throw new Error(`Job definition not found for job name: ${rowData.name}`);
const jobDefinition = await this.config.getJobDefinition(job.jobDefinitionName);
if (!jobDefinition) {
await (0, update_job_finalize_state_1.updateJobFinalizeState)(queryConfig)({
isSuccess: false,
jobID: job.jobID
});
this.onEvent({
eventType: "worker-job-finalize-failure-orphaned",
jobID: job.jobID,
jobDefinitionName: job.jobDefinitionName,
});
return;
}
await jobDefinition.run(rowData.payload);
await this.finalizeJob(rowData.id, true);
await jobDefinition.run(job.payload);
await (0, update_job_finalize_state_1.updateJobFinalizeState)(queryConfig)({
isSuccess: true,
jobID: job.jobID
});
this.onEvent({
eventType: "worker-job-finalized",
jobID: rowData.id,
jobName: rowData.name,
isSuccess: true
eventType: "worker-job-finalize-success",
jobID: job.jobID,
jobDefinitionName: job.jobDefinitionName,
});

@@ -119,19 +89,8 @@ }

this.onEvent({
eventType: "worker-job-errored",
jobID: rowData.id,
jobName: rowData.name,
eventType: "worker-job-error",
jobID: job.jobID,
jobDefinitionName: job.jobDefinitionName,
error: e,
});
if (rowData.numRetries > 0) {
await this.decrementRetries(rowData.id);
}
else {
await this.finalizeJob(rowData.id, false);
this.onEvent({
eventType: "worker-job-finalized",
jobID: rowData.id,
jobName: rowData.name,
isSuccess: false
});
}
await (0, update_job_num_attempts_state_1.updateJobNumAttemptsState)(queryConfig)({ jobID: job.jobID });
}

@@ -145,4 +104,4 @@ finally {

this.processTimer.reset();
};
run = async () => {
}
async run() {
while (!this.getShouldStop()) {

@@ -155,4 +114,4 @@ await this.processJobs();

}
};
}
}
exports.Worker = Worker;
{
"name": "cuckmq",
"version": "0.1.3",
"version": "0.2.0",
"description": "A lightweight postgres-backed job queue",

@@ -5,0 +5,0 @@ "main": "dist/index.js",

@@ -7,2 +7,12 @@ <div align="center">

### Core Features:
- built-in type safety.
- repeating/scheduled jobs.
- rate-limited jobs.
- job dependencies.
- delayed jobs.
- retryable jobs.
- self cleaning.
## Installation

@@ -24,6 +34,4 @@

Unlike other offerings, `cuckmq` requires you to manually provide a connection pool, allowing you to share a single connection pool across the entire app.
To start, we must first create a `Config` object. This contains a mapping of all jobs, a reference to a `pg` connection pool and the name of the schema in which `cuckmq` will work...
We must first create a `Config` object. This contains a mapping of all jobs, as well as information about the database connection pool:
```typescript

@@ -43,2 +51,4 @@ import { Pool } from "pg"

### Deferring Jobs
Now lets define our jobs:

@@ -60,25 +70,35 @@

```typescript
await pingJob.defer({ message: "Hello, World!" })
await pingJob.defer({ payload: { message: "Hello, World!" }})
```
Before we proceed, we must ensure all defined jobs are "registered" with the config. This can be done by calling:
### Running Jobs
```typescript
pingJob.register()
```
We must instantiate "daemons" in order to ensure oustanding jobs are processed. N.B. make sure job definitions are "registered" by calling `#register()` on each `JobDefinition` prior to constructing any daemons.
We can now instantiate worker daemons to process queued jobs.
```typescript
import { Worker } from "cuckmq"
import { Worker, Orchestrator } from "cuckmq"
import process from "process"
import { pingJob } from "./jobs"
const worker = new Worker({ config })
worker.start()
process.on("SIGINT", () => worker.stop())
pingJob.register()
// Create Worker daemon(s) and an Orchestrator daemon
// N.B. Daemons will start automatically once created.
const daemons = [
new Worker({config}),
new Orchestrator({config})
]
// Request all daemons to gracefully shutdown on SIGINT signal
process.on("SIGINT", () => {
daemons.forEach(d => d.setShouldStop())
})
// Wait until all daemons have gracefully shutdown.
await Promise.all(daemons.map(d => d.join()))
```
### Scheduled Jobs
### Repeatable Jobs
`cuckmq` supports scheduled jobs. They can be trivially defined by adding the `repeatIntervalMs` property to job definitions:
`cuckmq` supports repeatable jobs. They can be trivially defined by adding the `repeatIntervalMs` property to job definitions:

@@ -98,26 +118,4 @@ ```typescript

N.B. you are only able to specify a scheduled job/`repeatIntervalMs` if the type of `params` in the job function is an empty object.
N.B. you are only able to specify a repeatable job/`repeatIntervalMs` if the type of `params` in the job function is an empty object.
To ensure these jobs are enqueued as scheduled, we must setup a scheduler daemon:
```typescript
import { Scheduler } from "cuckmq"
const scheduler = new Scheduler({ config })
scheduler.start()
process.on("SIGINT", () => scheduler.stop())
```
### Sweeping Jobs
By default, `cuckmq` doesn't delete jobs from the database. To facilitiate, this, we setup a sweeper daemon:
```typescript
import { JobSweeper } from "cuckmq"
const sweeper = new JobSweeper({ config })
sweeper.start()
process.on("SIGINT", () => sweeper.stop())
```
### Advanced Usage

@@ -142,4 +140,6 @@

| `channel` | `string` | no | `_default` | an attribute that jobs are tagged with that workers can filter on |
| `numRetries` | `number` | no | `0` | the number of times a job can be retried after erroring before being finalized |
| `numAttempts` | `number` | no | `0` | the number of times a job can be attempted after erroring before being finalized |
| `repeatIntervalMs` | `number` | no | `null` | If defined, the interval between jobs being automatically scheduled |
| `releaseIntervalMs` | `number` | no | `0` | This defines the minimum amount of time that must elapse between jobs being released from the queue. Use this to perform rate limiting for certain jobs.
| `timeoutIntervalMs` | `number` | no | `12 * 60 * 60_000` | This defines the maximum amount of time that a job can exist before it "times out", resulting in the job being "finalized".
| `lockIntervalMs` | `number` | no | `60_000` | The amount of time after a job is dequeued that it remains unavailable to other workers to consume. Ensure this value is larger than the longest possible runtime of your job |

@@ -159,3 +159,3 @@ | `jobFunction` | `<T extends object> (T) => Promise<void>` | yes | N/A | The definition of the function to process/perform the job |

#### **Scheduler#constructor**
#### **Orchestrator#constructor**

@@ -166,17 +166,10 @@ | Parameter | Type | Is Required | Default Value | Description |

| `name` | `string` | no | `anon` | A nickname for your scheduler daemon |
| `scheduleIntervalMs` | `number` | no | `30_000` | The amount of time a scheduler will sleep after failing to schedule a job before trying again |
| `heartbeatIntervalMs` | `number` | no | `60_000` | The interval at which the scheduler will update scheduled jobs with a "heartbeat", to stop them from going stale. The scheduler will also remove stale scheduled jobs at this time |
| `heartbeatCutoffMs` | `number` | no | `60_000 * 12` | The maximum amount of time after the last "heartbeat" before the scheduler considers a scheduled job as stale |
| `repeatIntervalMs` | `number` | no | `30_000` | The amount of time the orchestrator will wait after not finding a repeatable job to schedule before trying again |
| `heartbeatIntervalMs` | `number` | no | `60_000` | The amount of time the orchestrator will wait between updating the `heartbeat` state of all registered `JobDefinitions`. |
| `cleanIntervalMs` | `number` | no | `5 * 60_000` | The amount of time the orchestrator will wait between performing a database clean |
| `staleJobDefinitionSweepThresholdMs` | `number` | no | `60_000 * 60` | The maximum amount of time after the last "heartbeat" before the orchestrator considers a job definition as stale and tries to remove it |
| `finalizedJobSweepThresholdMs` | `number` | no | `12 * 60 * 60_000` | The maximum amount of time a finalized job will exist before the orchestrator attempts to remove it |
| `eventHandler` | `EventHandler` | no | N/A | A handler to listen to events emitted by the scheduler |
#### **JobSweeper#constructor**
| Parameter | Type | Is Required | Default Value | Description |
| --------- | ---- | ----------- | ------------- | ----------- |
| `config` | `cuckmq.Config` | yes | N/A | the instantiated `cuckmq` config object |
| `name` | `string` | no | `anon` | A nickname for your sweeper daemon |
| `sweepIntervalMs` | `number` | no | `10 * 60_000` | The amount of time a sweeper will sleep between sweeps of stale jobs |
| `jobAgeCutoffMs` | `number` | no | `to_ms(1 week)` | The maximum amount of time after a job is created before the sweeper considers a job as stale and thus ready for deletion.
| `eventHandler` | `EventHandler` | no | N/A | A handler to listen to events emitted by the sweeper |
#### **Events**

@@ -199,11 +192,11 @@

| ---- | ---------------- | ----------- |
| `DaemonStarted` | `daemon-started` | The daemon starts (via `.start()`)
| `DaemonStopSignalSent` | `daemon-stop-signal-sent` | The daemon receives the signal to stop (via `.stop()). N.B. the daemon may continue running beyond this point to facilitate a graceful shutdown |
| `DaemonStopped` | `daemon-stopped` | The daemon stops |
| `WorkerJobDequeued` | `worker-job-dequeued` | A worker daemon pulls a job from the database for processing |
| `WorkerJobFinalized` | `worker-job-finalized` | A job has been marked as completed/finalized. This happens if the job succeeds, or if it fails with no more retries available |
| `WorkerJobErrored` | `worker-job-errored` | A job that a worker tried to run has thrown an error |
| `SchedulerJobScheduled` | `scheduler-job-scheduled` | The scheduler has enqueued a scheduled/periodic job to be run |
| `SchedulerHeartbeat` | `scheduler-heartbeat` | The scheduler has updated the heartbeat of at least one scheduled job |
| `SweeperJobsSwept` | `sweeper-jobs-swept` | The job sweeper has cleaned/deleted at least one stale job |
| `DaemonStart` | `daemon-start` | The daemon starts (via `.start()`)
| `DaemonStopSignalSend` | `daemon-stop-signal-send` | The daemon receives the signal to stop (via `.stop()). N.B. the daemon may continue running beyond this point to facilitate a graceful shutdown |
| `DaemonStop` | `daemon-stop` | The daemon stops |
| `WorkerJobDequeue` | `worker-job-dequeue` | A worker daemon pulls a job from the database for processing |
| `WorkerJobFinalizeSuccess` | `worker-job-finalize-success` | A job has been successfully run by the worker |
| `WorkerJobFinalizeFailureOrphaned` | `worker-job-finalize-failure-orphaned` | A job has been finalized because the worker is unable to find an associated `JobDefinition` |
| `WorkerJobError` | `worker-job-error` | A job that a worker tried to run has thrown an error |
| `OrchestratorJobSchedule` | `orchestrator-job-schedule` | The orchestrator has enqueued a scheduled/periodic job to be run |
| `OrchestratorHeartbeat` | `orchestrator-heartbeat` | The orchestrator has updated the heartbeat of at least one scheduled job |

@@ -210,0 +203,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc