cuckmq
Advanced tools
Comparing version 0.1.3 to 0.2.0
@@ -13,7 +13,7 @@ import { Pool } from "pg"; | ||
getJobDefinitions: () => JobDefinition<any>[]; | ||
getJobDefinition: (name: string) => JobDefinition<any> | null; | ||
getJobDefinition: (name: string) => JobDefinition<any> | undefined; | ||
registerJobDefinition: (jobDefinition: JobDefinition<any>) => void; | ||
getSchema: () => string; | ||
getPool: () => Pool; | ||
getSchema: () => string; | ||
prepareSchema: () => Promise<void>; | ||
} |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.Config = void 0; | ||
const defaultSchemaName = "_cuckmq"; | ||
const prepare_schema_1 = require("./query/prepare-schema"); | ||
const defaultSchema = "_cuckmq"; | ||
class Config { | ||
@@ -9,3 +10,3 @@ pool; | ||
jobDefinitions; | ||
constructor({ pool, schema = defaultSchemaName }) { | ||
constructor({ pool, schema = defaultSchema }) { | ||
this.pool = pool; | ||
@@ -19,3 +20,3 @@ this.schema = schema; | ||
getJobDefinition = (name) => { | ||
return this.jobDefinitions[name] || null; | ||
return this.jobDefinitions[name]; | ||
}; | ||
@@ -25,43 +26,15 @@ registerJobDefinition = (jobDefinition) => { | ||
}; | ||
getSchema = () => { | ||
return this.schema; | ||
}; | ||
getPool = () => { | ||
return this.pool; | ||
}; | ||
getSchema = () => { | ||
return this.schema; | ||
}; | ||
prepareSchema = async () => { | ||
await this.pool.query(` | ||
CREATE SCHEMA IF NOT EXISTS "${this.schema}" | ||
`); | ||
await this.pool.query(` | ||
CREATE TABLE IF NOT EXISTS "${this.schema}"."scheduled_job" ( | ||
"id" SERIAL, | ||
"name" TEXT NOT NULL, | ||
"repeat_interval_ms" INTEGER NOT NULL, | ||
"last_repeated_at" TIMESTAMP NULL, | ||
"last_heartbeat_at" TIMESTAMP NULL, | ||
PRIMARY KEY ("id") | ||
) | ||
`); | ||
await this.pool.query(` | ||
CREATE UNIQUE INDEX IF NOT EXISTS "scheduled_job_name_idx" ON | ||
"${this.schema}"."scheduled_job" ("name") | ||
`); | ||
await this.pool.query(` | ||
CREATE TABLE IF NOT EXISTS "${this.schema}"."job" ( | ||
"id" SERIAL, | ||
"name" TEXT NOT NULL, | ||
"channel" TEXT NOT NULL, | ||
"payload" JSONB NOT NULL, | ||
"num_retries" INTEGER NOT NULL, | ||
"lock_interval_ms" INTEGER NOT NULL, | ||
"created_at" TIMESTAMP NOT NULL DEFAULT NOW(), | ||
"locked_at" TIMESTAMP NULL, | ||
"finalized_at" TIMESTAMP NULL, | ||
"is_success" BOOLEAN NULL, | ||
PRIMARY KEY ("id") | ||
) | ||
`); | ||
await (0, prepare_schema_1.prepareSchema)({ | ||
schema: this.schema, | ||
handle: this.pool | ||
})(); | ||
}; | ||
} | ||
exports.Config = Config; |
@@ -8,2 +8,3 @@ import { Event } from "./event"; | ||
}) => void; | ||
export type StopCallback = () => void; | ||
export declare class Daemon { | ||
@@ -16,7 +17,8 @@ private shouldStop; | ||
constructor(name: string, eventHandler?: DaemonEventHandler); | ||
protected getShouldStop: () => boolean; | ||
protected onEvent: (event: Event) => void; | ||
protected run: () => Promise<void>; | ||
start: () => Promise<void>; | ||
stop: () => Promise<void>; | ||
protected getShouldStop(): boolean; | ||
protected onEvent(event: Event): void; | ||
protected run(): Promise<void>; | ||
protected snooze(): Promise<void>; | ||
join(): Promise<void>; | ||
setShouldStop(): Promise<void>; | ||
} |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.Daemon = void 0; | ||
const utils_1 = require("./utils"); | ||
let daemonID = 0; | ||
const snoozeMs = 50; | ||
class Daemon { | ||
@@ -18,7 +20,11 @@ shouldStop; | ||
daemonID += 1; | ||
this.onEvent({ eventType: "daemon-start" }); | ||
this.runPromise = Promise.resolve() | ||
.then(() => this.run()) | ||
.then(() => this.onEvent({ eventType: "daemon-stop" })); | ||
} | ||
getShouldStop = () => { | ||
getShouldStop() { | ||
return this.shouldStop; | ||
}; | ||
onEvent = (event) => { | ||
} | ||
onEvent(event) { | ||
if (this.eventHandler === undefined) { | ||
@@ -33,20 +39,17 @@ return; | ||
}); | ||
}; | ||
run = () => { | ||
} | ||
run() { | ||
throw new Error("Not implemented"); | ||
}; | ||
start = () => { | ||
if (this.runPromise === null) { | ||
this.onEvent({ eventType: "daemon-started" }); | ||
this.runPromise = this.run() | ||
.then(() => this.onEvent({ eventType: "daemon-stopped" })); | ||
} | ||
return this.runPromise; | ||
}; | ||
stop = async () => { | ||
} | ||
async snooze() { | ||
await (0, utils_1.sleep)(snoozeMs); | ||
} | ||
async join() { | ||
await this.runPromise; | ||
} | ||
async setShouldStop() { | ||
this.shouldStop = true; | ||
this.onEvent({ eventType: "daemon-stop-signal-sent" }); | ||
await this.runPromise; | ||
}; | ||
this.onEvent({ eventType: "daemon-stop-signal-send" }); | ||
} | ||
} | ||
exports.Daemon = Daemon; |
@@ -1,38 +0,68 @@ | ||
export type DaemonStarted = { | ||
eventType: "daemon-started"; | ||
export type DaemonStart = { | ||
eventType: "daemon-start"; | ||
}; | ||
export type DaemonStopSignalSent = { | ||
eventType: "daemon-stop-signal-sent"; | ||
export type DaemonStopSignalSend = { | ||
eventType: "daemon-stop-signal-send"; | ||
}; | ||
export type DaemonStopped = { | ||
eventType: "daemon-stopped"; | ||
export type DaemonStop = { | ||
eventType: "daemon-stop"; | ||
}; | ||
export type WorkerJobDequeued = { | ||
eventType: "worker-job-dequeued"; | ||
export type WorkerJobDequeue = { | ||
eventType: "worker-job-dequeue"; | ||
jobID: number; | ||
jobName: string; | ||
jobDefinitionName: string; | ||
}; | ||
export type WorkerJobFinalized = { | ||
eventType: "worker-job-finalized"; | ||
export type WorkerJobFinalizeSuccess = { | ||
eventType: "worker-job-finalize-success"; | ||
jobID: number; | ||
jobName: string; | ||
isSuccess: boolean; | ||
jobDefinitionName: string; | ||
}; | ||
export type WorkerJobErrored = { | ||
eventType: "worker-job-errored"; | ||
export type WorkerJobFinalizeFailureOrphaned = { | ||
eventType: "worker-job-finalize-failure-orphaned"; | ||
jobID: number; | ||
jobName: string; | ||
jobDefinitionName: string; | ||
}; | ||
export type WorkerJobError = { | ||
eventType: "worker-job-error"; | ||
jobID: number; | ||
jobDefinitionName: string; | ||
error: any; | ||
}; | ||
export type SchedulerJobScheduled = { | ||
eventType: "scheduler-job-scheduled"; | ||
export type OrchestratorJobRepeat = { | ||
eventType: "orchestrator-job-repeat"; | ||
jobID: number; | ||
jobName: string; | ||
jobDefinitionName: string; | ||
}; | ||
export type SchedulerHeartbeat = { | ||
eventType: "scheduler-heartbeat"; | ||
export type OrchestratorJobDefinitionInitialize = { | ||
eventType: "orchestrator-job-definition-initialize"; | ||
jobDefinitionName: string; | ||
}; | ||
export type SweeperJobsSwept = { | ||
eventType: "sweeper-jobs-swept"; | ||
export type OrchestratorJobDefinitionHeartbeat = { | ||
eventType: "orchestrator-job-definition-heartbeat"; | ||
jobDefinitionName: string; | ||
}; | ||
export type Event = DaemonStarted | DaemonStopSignalSent | DaemonStopped | WorkerJobDequeued | WorkerJobFinalized | WorkerJobErrored | SchedulerJobScheduled | SchedulerHeartbeat | SweeperJobsSwept; | ||
export type OrchestratorJobFinalizeFailureTimeout = { | ||
eventType: "orchestrator-job-finalize-failure-timeout"; | ||
jobID: number; | ||
jobDefinitionName: string; | ||
}; | ||
export type OrchestratorJobFinalizeFailureUnmetDependencies = { | ||
eventType: "orchestrator-job-finalize-failure-unmet-dependencies"; | ||
jobID: number; | ||
jobDefinitionName: string; | ||
}; | ||
export type OrchestratorJobFinalizeFailureNoAttemptsRemaining = { | ||
eventType: "orchestrator-job-finalize-failure-no-attempts-remaining"; | ||
jobID: number; | ||
jobDefinitionName: string; | ||
}; | ||
export type OrchestratorFinalizedJobSweep = { | ||
eventType: "orchestrator-finalized-job-sweep"; | ||
jobID: number; | ||
jobDefinitionName: string; | ||
}; | ||
export type OrchestratorStaleJobDefinitionSweep = { | ||
eventType: "orchestrator-stale-job-definition-sweep"; | ||
jobDefinitionName: string; | ||
}; | ||
export type Event = DaemonStart | DaemonStopSignalSend | DaemonStop | WorkerJobDequeue | WorkerJobError | WorkerJobFinalizeSuccess | WorkerJobFinalizeFailureOrphaned | OrchestratorJobDefinitionInitialize | OrchestratorJobDefinitionHeartbeat | OrchestratorJobRepeat | OrchestratorJobFinalizeFailureTimeout | OrchestratorJobFinalizeFailureUnmetDependencies | OrchestratorJobFinalizeFailureNoAttemptsRemaining | OrchestratorFinalizedJobSweep | OrchestratorStaleJobDefinitionSweep; |
export { Config } from "./config"; | ||
export { JobSweeper } from "./job-sweeper"; | ||
export { Scheduler } from "./scheduler"; | ||
export { Orchestrator } from "./orchestrator"; | ||
export { Worker } from "./worker"; | ||
export { JobDefinition } from "./job-definition"; | ||
export { Job } from "./job"; | ||
export { DaemonStarted, DaemonStopSignalSent, DaemonStopped, WorkerJobDequeued, WorkerJobFinalized, WorkerJobErrored, SchedulerJobScheduled, SchedulerHeartbeat, SweeperJobsSwept, Event } from "./event"; | ||
export { DaemonEventHandler } from "./daemon"; | ||
export * as event from "./event"; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.Job = exports.JobDefinition = exports.Worker = exports.Scheduler = exports.JobSweeper = exports.Config = void 0; | ||
exports.event = exports.JobDefinition = exports.Worker = exports.Orchestrator = exports.Config = void 0; | ||
var config_1 = require("./config"); | ||
Object.defineProperty(exports, "Config", { enumerable: true, get: function () { return config_1.Config; } }); | ||
var job_sweeper_1 = require("./job-sweeper"); | ||
Object.defineProperty(exports, "JobSweeper", { enumerable: true, get: function () { return job_sweeper_1.JobSweeper; } }); | ||
var scheduler_1 = require("./scheduler"); | ||
Object.defineProperty(exports, "Scheduler", { enumerable: true, get: function () { return scheduler_1.Scheduler; } }); | ||
var orchestrator_1 = require("./orchestrator"); | ||
Object.defineProperty(exports, "Orchestrator", { enumerable: true, get: function () { return orchestrator_1.Orchestrator; } }); | ||
var worker_1 = require("./worker"); | ||
@@ -14,3 +12,2 @@ Object.defineProperty(exports, "Worker", { enumerable: true, get: function () { return worker_1.Worker; } }); | ||
Object.defineProperty(exports, "JobDefinition", { enumerable: true, get: function () { return job_definition_1.JobDefinition; } }); | ||
var job_1 = require("./job"); | ||
Object.defineProperty(exports, "Job", { enumerable: true, get: function () { return job_1.Job; } }); | ||
exports.event = require("./event"); |
import { Config } from "./config"; | ||
import { Job } from "./job"; | ||
type EmptyObject = { | ||
[key: string]: never; | ||
}; | ||
export type JobID = number; | ||
export type JobDefinitionDeferConfigParams<T extends object> = { | ||
payload: T; | ||
dependencies?: JobID[]; | ||
delayMs?: number; | ||
}; | ||
export type JobDefinitionConstructorParams<T extends object> = { | ||
@@ -11,5 +16,7 @@ config: Config; | ||
jobFunction: (params: T) => Promise<void>; | ||
numRetries?: number; | ||
numAttempts?: number; | ||
repeatIntervalMs?: T extends EmptyObject ? (number | null) : null; | ||
releaseIntervalMs?: number; | ||
lockIntervalMs?: number; | ||
timeoutIntervalMs?: number; | ||
}; | ||
@@ -21,15 +28,19 @@ export declare class JobDefinition<T extends object = EmptyObject> { | ||
private jobFunction; | ||
private numRetries; | ||
private numAttempts; | ||
private repeatIntervalMs; | ||
private releaseIntervalMs; | ||
private lockIntervalMs; | ||
constructor({ config, name, channel, jobFunction, numRetries, repeatIntervalMs, lockIntervalMs }: JobDefinitionConstructorParams<T>); | ||
getName: () => string; | ||
getChannel: () => string; | ||
getNumRetries: () => number; | ||
getRepeatIntervalMs: () => number | null; | ||
getTimeoutIntervalMs: () => number; | ||
register: () => void; | ||
defer: (payload: T) => Promise<Job>; | ||
private timeoutIntervalMs; | ||
private jobDefinitionID?; | ||
constructor({ config, name, channel, jobFunction, numAttempts, repeatIntervalMs, releaseIntervalMs, lockIntervalMs, timeoutIntervalMs }: JobDefinitionConstructorParams<T>); | ||
getName(): string; | ||
getChannel(): string; | ||
getNumAttempts(): number; | ||
getRepeatIntervalMs(): number | null; | ||
getTimeoutIntervalMs(): number; | ||
initialize(): Promise<number>; | ||
register(): void; | ||
defer({ payload, dependencies, delayMs }: JobDefinitionDeferConfigParams<T>): Promise<JobID>; | ||
run: (payload: T) => Promise<void>; | ||
} | ||
export {}; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.JobDefinition = void 0; | ||
const job_1 = require("./job"); | ||
const insert_job_1 = require("./query/insert-job"); | ||
const insert_job_dependency_1 = require("./query/insert-job-dependency"); | ||
const upsert_job_definition_1 = require("./query/upsert-job-definition"); | ||
const utils_1 = require("./utils"); | ||
const defaultChannel = "_default"; | ||
const defaultNumRetries = 0; | ||
const defaultTimeoutIntervalMs = 60_000; | ||
const defaultNumAttempts = 1; | ||
const defaultDelayMs = 0; | ||
const defaultReleaseIntervalMs = 0; | ||
const defaultLockIntervalMs = 60 * 1000; | ||
const defaultTimeoutIntervalMs = 12 * 60 * 60 * 1000; | ||
class JobDefinition { | ||
@@ -13,6 +19,9 @@ config; | ||
jobFunction; | ||
numRetries; | ||
numAttempts; | ||
repeatIntervalMs; | ||
releaseIntervalMs; | ||
lockIntervalMs; | ||
constructor({ config, name, channel = defaultChannel, jobFunction, numRetries = defaultNumRetries, repeatIntervalMs = null, lockIntervalMs = defaultTimeoutIntervalMs }) { | ||
timeoutIntervalMs; | ||
jobDefinitionID; | ||
constructor({ config, name, channel = defaultChannel, jobFunction, numAttempts = defaultNumAttempts, repeatIntervalMs = null, releaseIntervalMs = defaultReleaseIntervalMs, lockIntervalMs = defaultLockIntervalMs, timeoutIntervalMs = defaultTimeoutIntervalMs }) { | ||
this.config = config; | ||
@@ -22,48 +31,68 @@ this.name = name; | ||
this.jobFunction = jobFunction; | ||
this.numRetries = numRetries; | ||
this.numAttempts = numAttempts; | ||
this.repeatIntervalMs = repeatIntervalMs; | ||
this.releaseIntervalMs = releaseIntervalMs; | ||
this.lockIntervalMs = lockIntervalMs; | ||
this.timeoutIntervalMs = timeoutIntervalMs; | ||
} | ||
getName = () => { | ||
getName() { | ||
return this.name; | ||
}; | ||
getChannel = () => { | ||
} | ||
getChannel() { | ||
return this.channel; | ||
}; | ||
getNumRetries = () => { | ||
return this.numRetries; | ||
}; | ||
getRepeatIntervalMs = () => { | ||
} | ||
getNumAttempts() { | ||
return this.numAttempts; | ||
} | ||
getRepeatIntervalMs() { | ||
return this.repeatIntervalMs; | ||
}; | ||
getTimeoutIntervalMs = () => { | ||
} | ||
getTimeoutIntervalMs() { | ||
return this.lockIntervalMs; | ||
}; | ||
register = () => { | ||
} | ||
async initialize() { | ||
const queryConfig = { | ||
schema: this.config.getSchema(), | ||
handle: this.config.getPool() | ||
}; | ||
if (this.jobDefinitionID === undefined) { | ||
this.jobDefinitionID = await (0, upsert_job_definition_1.upsertJobDefinition)(queryConfig)({ | ||
name: this.name, | ||
channel: this.channel, | ||
repeatIntervalMs: this.repeatIntervalMs ? (0, utils_1.max)(this.repeatIntervalMs, 0) : null, | ||
releaseIntervalMs: (0, utils_1.max)(this.releaseIntervalMs, 0), | ||
lockIntervalMs: (0, utils_1.max)(this.lockIntervalMs, 0), | ||
timeoutIntervalMs: (0, utils_1.max)(this.timeoutIntervalMs, 0), | ||
numAttempts: (0, utils_1.max)(this.numAttempts, 1) | ||
}).then(result => result.jobDefinitionID); | ||
} | ||
return this.jobDefinitionID; | ||
} | ||
register() { | ||
this.config.registerJobDefinition(this); | ||
}; | ||
defer = async (payload) => { | ||
const rows = await this.config.getPool().query(` | ||
INSERT INTO "${this.config.getSchema()}"."job" ( | ||
"name", | ||
"channel", | ||
"payload", | ||
"num_retries", | ||
"lock_interval_ms", | ||
"created_at" | ||
) VALUES ( | ||
$1, $2, $3, $4, $5, NOW() | ||
) RETURNING "id" | ||
`, [ | ||
this.name, | ||
this.channel, | ||
JSON.stringify(payload), | ||
this.numRetries, | ||
this.lockIntervalMs | ||
]); | ||
return new job_1.Job({ | ||
jobID: rows.rows[0].id, | ||
config: this.config | ||
} | ||
async defer({ payload, dependencies = [], delayMs = defaultDelayMs }) { | ||
const jobDefinitionID = await this.initialize(); | ||
const pool = this.config.getPool(); | ||
const jobID = await (0, utils_1.transaction)(pool)(async (client) => { | ||
const queryConfig = { | ||
schema: this.config.getSchema(), | ||
handle: client | ||
}; | ||
const jobID = await (0, insert_job_1.insertJob)(queryConfig)({ | ||
jobDefinitionID, | ||
payload, | ||
numAttempts: this.numAttempts, | ||
delayMs: (0, utils_1.max)(delayMs, 0) | ||
}).then(result => result.jobID); | ||
for (const dependency of dependencies) { | ||
await (0, insert_job_dependency_1.insertJobDependency)(queryConfig)({ | ||
jobID, | ||
jobIDDependency: dependency | ||
}); | ||
} | ||
return jobID; | ||
}); | ||
}; | ||
return jobID; | ||
} | ||
run = (payload) => { | ||
@@ -70,0 +99,0 @@ return this.jobFunction(payload); |
@@ -0,1 +1,3 @@ | ||
import { Pool, PoolClient } from "pg"; | ||
type TransactionCallback<T> = (client: PoolClient) => Promise<T>; | ||
export declare class Semaphore { | ||
@@ -19,1 +21,3 @@ private count; | ||
export declare const max: (num1: number, num2: number) => number; | ||
export declare const transaction: (pool: Pool) => <T>(callback: TransactionCallback<T>) => Promise<T>; | ||
export {}; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.max = exports.sleep = exports.Timer = exports.Semaphore = void 0; | ||
exports.transaction = exports.max = exports.sleep = exports.Timer = exports.Semaphore = void 0; | ||
class Semaphore { | ||
@@ -54,1 +54,18 @@ count; | ||
exports.max = max; | ||
const transaction = (pool) => async (callback) => { | ||
const client = await pool.connect(); | ||
await client.query("BEGIN TRANSACTION"); | ||
try { | ||
const result = await callback(client); | ||
await client.query("COMMIT TRANSACTION"); | ||
return result; | ||
} | ||
catch (err) { | ||
await client.query("ROLLBACK TRANSACTION"); | ||
throw err; | ||
} | ||
finally { | ||
await client.release(); | ||
} | ||
}; | ||
exports.transaction = transaction; |
@@ -17,7 +17,4 @@ import { Config } from "./config"; | ||
constructor({ config, channel, concurrency, processIntervalMs, name, eventHandler }: WorkerConstructorParams); | ||
private dequeueJob; | ||
private finalizeJob; | ||
private decrementRetries; | ||
private processJobs; | ||
protected run: () => Promise<void>; | ||
protected run(): Promise<void>; | ||
} |
@@ -5,2 +5,7 @@ "use strict"; | ||
const daemon_1 = require("./daemon"); | ||
const get_available_job_1 = require("./query/get-available-job"); | ||
const update_job_definition_release_state_1 = require("./query/update-job-definition-release-state"); | ||
const update_job_finalize_state_1 = require("./query/update-job-finalize-state"); | ||
const update_job_num_attempts_state_1 = require("./query/update-job-num-attempts-state"); | ||
const update_job_unlock_state_1 = require("./query/update-job-unlock-state"); | ||
const utils_1 = require("./utils"); | ||
@@ -23,93 +28,58 @@ const defaultName = "anon"; | ||
} | ||
dequeueJob = async () => { | ||
const client = await this.config.getPool().connect(); | ||
try { | ||
await client.query("BEGIN TRANSACTION"); | ||
const rows = await client.query(` | ||
SELECT | ||
"id", | ||
"name", | ||
"payload", | ||
"num_retries" | ||
FROM "${this.config.getSchema()}"."job" | ||
WHERE | ||
"finalized_at" IS NULL AND | ||
( | ||
"locked_at" IS NULL OR | ||
"locked_at" + ("lock_interval_ms" / 1000.0) * INTERVAL '1 second' < NOW() | ||
) AND | ||
"channel" = $1 | ||
FOR UPDATE SKIP LOCKED | ||
LIMIT 1 | ||
`, [this.channel]); | ||
if (rows.rows.length === 0) { | ||
await client.query("ROLLBACK TRANSACTION"); | ||
return null; | ||
} | ||
const row = rows.rows[0]; | ||
const rowData = { | ||
id: row.id, | ||
name: row.name, | ||
payload: row.payload, | ||
numRetries: row.num_retries | ||
}; | ||
await client.query(` | ||
UPDATE "${this.config.getSchema()}"."job" | ||
SET "locked_at" = NOW() | ||
WHERE "id" = $1 | ||
`, [rowData.id]); | ||
await client.query("COMMIT TRANSACTION"); | ||
return rowData; | ||
} | ||
catch (e) { | ||
await client.query("ROLLBACK TRANSACTION"); | ||
throw e; | ||
} | ||
finally { | ||
await client.release(); | ||
} | ||
}; | ||
finalizeJob = async (jobID, isSuccess) => { | ||
await this.config.getPool().query(` | ||
UPDATE "${this.config.getSchema()}"."job" SET | ||
"finalized_at" = NOW(), | ||
"is_success" = $1 | ||
WHERE "id" = $2 | ||
`, [isSuccess, jobID]); | ||
}; | ||
decrementRetries = async (jobID) => { | ||
await this.config.getPool().query(` | ||
UPDATE "${this.config.getSchema()}"."job" SET | ||
"num_retries" = "num_retries" - 1 | ||
WHERE "id" = $1 | ||
`, [jobID]); | ||
}; | ||
processJobs = async () => { | ||
async processJobs() { | ||
if (!this.processTimer.hasElapsed()) { | ||
return; | ||
} | ||
this.processTimer.reset(); | ||
const pool = this.config.getPool(); | ||
while (!this.getShouldStop()) { | ||
const rowData = await this.dequeueJob(); | ||
if (!rowData) { | ||
const job = await (0, utils_1.transaction)(pool)(async (client) => { | ||
const queryConfig = { | ||
schema: this.config.getSchema(), | ||
handle: client | ||
}; | ||
const job = await (0, get_available_job_1.getAvailableJob)(queryConfig)({ channel: this.channel }); | ||
if (job) { | ||
await (0, update_job_unlock_state_1.updateJobUnlockState)(queryConfig)({ jobID: job.jobID }); | ||
await (0, update_job_definition_release_state_1.updateJobDefinitionReleaseState)(queryConfig)({ jobDefinitionID: job.jobDefinitionID }); | ||
} | ||
return job; | ||
}); | ||
if (!job) { | ||
break; | ||
} | ||
this.onEvent({ | ||
eventType: "worker-job-dequeued", | ||
jobID: rowData.id, | ||
jobName: rowData.name, | ||
eventType: "worker-job-dequeue", | ||
jobID: job?.jobID, | ||
jobDefinitionName: job?.jobDefinitionName, | ||
}); | ||
await this.semaphore.acquire(); | ||
const processFn = async () => { | ||
const queryConfig = { | ||
schema: this.config.getSchema(), | ||
handle: pool | ||
}; | ||
try { | ||
const jobDefinition = await this.config.getJobDefinition(rowData.name); | ||
if (jobDefinition === null) { | ||
throw new Error(`Job definition not found for job name: ${rowData.name}`); | ||
const jobDefinition = await this.config.getJobDefinition(job.jobDefinitionName); | ||
if (!jobDefinition) { | ||
await (0, update_job_finalize_state_1.updateJobFinalizeState)(queryConfig)({ | ||
isSuccess: false, | ||
jobID: job.jobID | ||
}); | ||
this.onEvent({ | ||
eventType: "worker-job-finalize-failure-orphaned", | ||
jobID: job.jobID, | ||
jobDefinitionName: job.jobDefinitionName, | ||
}); | ||
return; | ||
} | ||
await jobDefinition.run(rowData.payload); | ||
await this.finalizeJob(rowData.id, true); | ||
await jobDefinition.run(job.payload); | ||
await (0, update_job_finalize_state_1.updateJobFinalizeState)(queryConfig)({ | ||
isSuccess: true, | ||
jobID: job.jobID | ||
}); | ||
this.onEvent({ | ||
eventType: "worker-job-finalized", | ||
jobID: rowData.id, | ||
jobName: rowData.name, | ||
isSuccess: true | ||
eventType: "worker-job-finalize-success", | ||
jobID: job.jobID, | ||
jobDefinitionName: job.jobDefinitionName, | ||
}); | ||
@@ -119,19 +89,8 @@ } | ||
this.onEvent({ | ||
eventType: "worker-job-errored", | ||
jobID: rowData.id, | ||
jobName: rowData.name, | ||
eventType: "worker-job-error", | ||
jobID: job.jobID, | ||
jobDefinitionName: job.jobDefinitionName, | ||
error: e, | ||
}); | ||
if (rowData.numRetries > 0) { | ||
await this.decrementRetries(rowData.id); | ||
} | ||
else { | ||
await this.finalizeJob(rowData.id, false); | ||
this.onEvent({ | ||
eventType: "worker-job-finalized", | ||
jobID: rowData.id, | ||
jobName: rowData.name, | ||
isSuccess: false | ||
}); | ||
} | ||
await (0, update_job_num_attempts_state_1.updateJobNumAttemptsState)(queryConfig)({ jobID: job.jobID }); | ||
} | ||
@@ -145,4 +104,4 @@ finally { | ||
this.processTimer.reset(); | ||
}; | ||
run = async () => { | ||
} | ||
async run() { | ||
while (!this.getShouldStop()) { | ||
@@ -155,4 +114,4 @@ await this.processJobs(); | ||
} | ||
}; | ||
} | ||
} | ||
exports.Worker = Worker; |
{ | ||
"name": "cuckmq", | ||
"version": "0.1.3", | ||
"version": "0.2.0", | ||
"description": "A lightweight postgres-backed job queue", | ||
@@ -5,0 +5,0 @@ "main": "dist/index.js", |
117
README.md
@@ -7,2 +7,12 @@ <div align="center"> | ||
### Core Features: | ||
- built-in type safety. | ||
- repeating/scheduled jobs. | ||
- rate-limited jobs. | ||
- job dependencies. | ||
- delayed jobs. | ||
- retryable jobs. | ||
- self cleaning. | ||
## Installation | ||
@@ -24,6 +34,4 @@ | ||
Unlike other offerings, `cuckmq` requires you to manually provide a connection pool, allowing you to share a single connection pool across the entire app. | ||
To start, we must first create a `Config` object. This contains a mapping of all jobs, a reference to a `pg` connection pool and the name of the schema in which `cuckmq` will work... | ||
We must first create a `Config` object. This contains a mapping of all jobs, as well as information about the database connection pool: | ||
```typescript | ||
@@ -43,2 +51,4 @@ import { Pool } from "pg" | ||
### Deferring Jobs | ||
Now lets define our jobs: | ||
@@ -60,25 +70,35 @@ | ||
```typescript | ||
await pingJob.defer({ message: "Hello, World!" }) | ||
await pingJob.defer({ payload: { message: "Hello, World!" }}) | ||
``` | ||
Before we proceed, we must ensure all defined jobs are "registered" with the config. This can be done by calling: | ||
### Running Jobs | ||
```typescript | ||
pingJob.register() | ||
``` | ||
We must instantiate "daemons" in order to ensure oustanding jobs are processed. N.B. make sure job definitions are "registered" by calling `#register()` on each `JobDefinition` prior to constructing any daemons. | ||
We can now instantiate worker daemons to process queued jobs. | ||
```typescript | ||
import { Worker } from "cuckmq" | ||
import { Worker, Orchestrator } from "cuckmq" | ||
import process from "process" | ||
import { pingJob } from "./jobs" | ||
const worker = new Worker({ config }) | ||
worker.start() | ||
process.on("SIGINT", () => worker.stop()) | ||
pingJob.register() | ||
// Create Worker daemon(s) and an Orchestrator daemon | ||
// N.B. Daemons will start automatically once created. | ||
const daemons = [ | ||
new Worker({config}), | ||
new Orchestrator({config}) | ||
] | ||
// Request all daemons to gracefully shutdown on SIGINT signal | ||
process.on("SIGINT", () => { | ||
daemons.forEach(d => d.setShouldStop()) | ||
}) | ||
// Wait until all daemons have gracefully shutdown. | ||
await Promise.all(daemons.map(d => d.join())) | ||
``` | ||
### Scheduled Jobs | ||
### Repeatable Jobs | ||
`cuckmq` supports scheduled jobs. They can be trivially defined by adding the `repeatIntervalMs` property to job definitions: | ||
`cuckmq` supports repeatable jobs. They can be trivially defined by adding the `repeatIntervalMs` property to job definitions: | ||
@@ -98,26 +118,4 @@ ```typescript | ||
N.B. you are only able to specify a scheduled job/`repeatIntervalMs` if the type of `params` in the job function is an empty object. | ||
N.B. you are only able to specify a repeatable job/`repeatIntervalMs` if the type of `params` in the job function is an empty object. | ||
To ensure these jobs are enqueued as scheduled, we must setup a scheduler daemon: | ||
```typescript | ||
import { Scheduler } from "cuckmq" | ||
const scheduler = new Scheduler({ config }) | ||
scheduler.start() | ||
process.on("SIGINT", () => scheduler.stop()) | ||
``` | ||
### Sweeping Jobs | ||
By default, `cuckmq` doesn't delete jobs from the database. To facilitiate, this, we setup a sweeper daemon: | ||
```typescript | ||
import { JobSweeper } from "cuckmq" | ||
const sweeper = new JobSweeper({ config }) | ||
sweeper.start() | ||
process.on("SIGINT", () => sweeper.stop()) | ||
``` | ||
### Advanced Usage | ||
@@ -142,4 +140,6 @@ | ||
| `channel` | `string` | no | `_default` | an attribute that jobs are tagged with that workers can filter on | | ||
| `numRetries` | `number` | no | `0` | the number of times a job can be retried after erroring before being finalized | | ||
| `numAttempts` | `number` | no | `0` | the number of times a job can be attempted after erroring before being finalized | | ||
| `repeatIntervalMs` | `number` | no | `null` | If defined, the interval between jobs being automatically scheduled | | ||
| `releaseIntervalMs` | `number` | no | `0` | This defines the minimum amount of time that must elapse between jobs being released from the queue. Use this to perform rate limiting for certain jobs. | ||
| `timeoutIntervalMs` | `number` | no | `12 * 60 * 60_000` | This defines the maximum amount of time that a job can exist before it "times out", resulting in the job being "finalized". | ||
| `lockIntervalMs` | `number` | no | `60_000` | The amount of time after a job is dequeued that it remains unavailable to other workers to consume. Ensure this value is larger than the longest possible runtime of your job | | ||
@@ -159,3 +159,3 @@ | `jobFunction` | `<T extends object> (T) => Promise<void>` | yes | N/A | The definition of the function to process/perform the job | | ||
#### **Scheduler#constructor** | ||
#### **Orchestrator#constructor** | ||
@@ -166,17 +166,10 @@ | Parameter | Type | Is Required | Default Value | Description | | ||
| `name` | `string` | no | `anon` | A nickname for your scheduler daemon | | ||
| `scheduleIntervalMs` | `number` | no | `30_000` | The amount of time a scheduler will sleep after failing to schedule a job before trying again | | ||
| `heartbeatIntervalMs` | `number` | no | `60_000` | The interval at which the scheduler will update scheduled jobs with a "heartbeat", to stop them from going stale. The scheduler will also remove stale scheduled jobs at this time | | ||
| `heartbeatCutoffMs` | `number` | no | `60_000 * 12` | The maximum amount of time after the last "heartbeat" before the scheduler considers a scheduled job as stale | | ||
| `repeatIntervalMs` | `number` | no | `30_000` | The amount of time the orchestrator will wait after not finding a repeatable job to schedule before trying again | | ||
| `heartbeatIntervalMs` | `number` | no | `60_000` | The amount of time the orchestrator will wait between updating the `heartbeat` state of all registered `JobDefinitions`. | | ||
| `cleanIntervalMs` | `number` | no | `5 * 60_000` | The amount of time the orchestrator will wait between performing a database clean | | ||
| `staleJobDefinitionSweepThresholdMs` | `number` | no | `60_000 * 60` | The maximum amount of time after the last "heartbeat" before the orchestrator considers a job definition as stale and tries to remove it | | ||
| `finalizedJobSweepThresholdMs` | `number` | no | `12 * 60 * 60_000` | The maximum amount of time a finalized job will exist before the orchestrator attempts to remove it | | ||
| `eventHandler` | `EventHandler` | no | N/A | A handler to listen to events emitted by the scheduler | | ||
#### **JobSweeper#constructor** | ||
| Parameter | Type | Is Required | Default Value | Description | | ||
| --------- | ---- | ----------- | ------------- | ----------- | | ||
| `config` | `cuckmq.Config` | yes | N/A | the instantiated `cuckmq` config object | | ||
| `name` | `string` | no | `anon` | A nickname for your sweeper daemon | | ||
| `sweepIntervalMs` | `number` | no | `10 * 60_000` | The amount of time a sweeper will sleep between sweeps of stale jobs | | ||
| `jobAgeCutoffMs` | `number` | no | `to_ms(1 week)` | The maximum amount of time after a job is created before the sweeper considers a job as stale and thus ready for deletion. | ||
| `eventHandler` | `EventHandler` | no | N/A | A handler to listen to events emitted by the sweeper | | ||
#### **Events** | ||
@@ -199,11 +192,11 @@ | ||
| ---- | ---------------- | ----------- | | ||
| `DaemonStarted` | `daemon-started` | The daemon starts (via `.start()`) | ||
| `DaemonStopSignalSent` | `daemon-stop-signal-sent` | The daemon receives the signal to stop (via `.stop()). N.B. the daemon may continue running beyond this point to facilitate a graceful shutdown | | ||
| `DaemonStopped` | `daemon-stopped` | The daemon stops | | ||
| `WorkerJobDequeued` | `worker-job-dequeued` | A worker daemon pulls a job from the database for processing | | ||
| `WorkerJobFinalized` | `worker-job-finalized` | A job has been marked as completed/finalized. This happens if the job succeeds, or if it fails with no more retries available | | ||
| `WorkerJobErrored` | `worker-job-errored` | A job that a worker tried to run has thrown an error | | ||
| `SchedulerJobScheduled` | `scheduler-job-scheduled` | The scheduler has enqueued a scheduled/periodic job to be run | | ||
| `SchedulerHeartbeat` | `scheduler-heartbeat` | The scheduler has updated the heartbeat of at least one scheduled job | | ||
| `SweeperJobsSwept` | `sweeper-jobs-swept` | The job sweeper has cleaned/deleted at least one stale job | | ||
| `DaemonStart` | `daemon-start` | The daemon starts (via `.start()`) | ||
| `DaemonStopSignalSend` | `daemon-stop-signal-send` | The daemon receives the signal to stop (via `.stop()). N.B. the daemon may continue running beyond this point to facilitate a graceful shutdown | | ||
| `DaemonStop` | `daemon-stop` | The daemon stops | | ||
| `WorkerJobDequeue` | `worker-job-dequeue` | A worker daemon pulls a job from the database for processing | | ||
| `WorkerJobFinalizeSuccess` | `worker-job-finalize-success` | A job has been successfully run by the worker | | ||
| `WorkerJobFinalizeFailureOrphaned` | `worker-job-finalize-failure-orphaned` | A job has been finalized because the worker is unable to find an associated `JobDefinition` | | ||
| `WorkerJobError` | `worker-job-error` | A job that a worker tried to run has thrown an error | | ||
| `OrchestratorJobSchedule` | `orchestrator-job-schedule` | The orchestrator has enqueued a scheduled/periodic job to be run | | ||
| `OrchestratorHeartbeat` | `orchestrator-heartbeat` | The orchestrator has updated the heartbeat of at least one scheduled job | | ||
@@ -210,0 +203,0 @@ |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
70005
60
1632
199
1