graphile-worker

Comparing version 0.15.1 to 0.15.2-bridge.0

dist/plugins/LoadTaskFromExecutableFilePlugin.d.ts


dist/cli.js
#!/usr/bin/env node
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
-const yargs = require("yargs");
-const config_1 = require("./config");
+const tslib_1 = require("tslib");
+const load_1 = require("graphile-config/load");
+const yargs = tslib_1.__importStar(require("yargs"));
const getCronItems_1 = require("./getCronItems");
const getTasks_1 = require("./getTasks");
const index_1 = require("./index");
const lib_1 = require("./lib");
const preset_1 = require("./preset");
const runner_1 = require("./runner");
-const argv_ = yargs
+const defaults = preset_1.WorkerPreset.worker;
+const argv = yargs
.parserConfiguration({

@@ -22,3 +25,3 @@ "boolean-negation": false,

alias: "s",
-default: config_1.defaults.schema,
+default: defaults.schema,
})

@@ -49,3 +52,3 @@ .string("schema")

alias: "j",
-default: config_1.defaults.concurrentJobs,
+default: defaults.concurrentJobs,
})

@@ -61,3 +64,3 @@ .number("jobs")

description: "how long to wait between polling for jobs in milliseconds (for jobs scheduled in the future/retries)",
-default: config_1.defaults.pollInterval,
+default: defaults.pollInterval,
})

@@ -70,22 +73,37 @@ .number("poll-interval")

.boolean("no-prepared-statements")
.option("config", {
alias: "C",
description: "The path to the config file",
normalize: true,
})
.string("config")
.strict(true).argv;
-function isPromise(val) {
-return typeof val === "object" && val && typeof val.then === "function";
-}
+const integerOrUndefined = (n) => {
+return typeof n === "number" && isFinite(n) && Math.round(n) === n
+? n
+: undefined;
+};
+function stripUndefined(t) {
+return Object.fromEntries(Object.entries(t).filter(([_, value]) => value !== undefined));
+}
-// Hack TypeScript to stop whinging about argv potentially being a promise
-if (isPromise(argv_)) {
-throw new Error("yargs returned a promise");
-}
+function argvToPreset(inArgv) {
+return {
+worker: stripUndefined({
+connectionString: inArgv["connection"],
+maxPoolSize: integerOrUndefined(inArgv["max-pool-size"]),
+pollInterval: integerOrUndefined(inArgv["poll-interval"]),
+preparedStatements: !inArgv["no-prepared-statements"],
+schema: inArgv.schema,
+crontabFile: inArgv["crontab"],
+concurrentJobs: integerOrUndefined(inArgv.jobs),
+}),
+};
+}
-const argv = argv_;
-const isInteger = (n) => {
-return isFinite(n) && Math.round(n) === n;
-};
async function main() {
-const DATABASE_URL = argv.connection || process.env.DATABASE_URL || undefined;
-const SCHEMA = argv.schema || undefined;
+const userPreset = await (0, load_1.loadConfig)(argv.config);
const ONCE = argv.once;
const SCHEMA_ONLY = argv["schema-only"];
const WATCH = argv.watch;
-if (SCHEMA_ONLY && WATCH) {
-throw new Error("Cannot specify both --watch and --schema-only");
+if (WATCH) {
+throw new Error("Watch mode is no longer supported");
}

@@ -95,35 +113,43 @@ if (SCHEMA_ONLY && ONCE) {

}
-if (WATCH && ONCE) {
-throw new Error("Cannot specify both --watch and --once");
-}
+const [compiledOptions, release] = await (0, lib_1.getUtilsAndReleasersFromOptions)({
+preset: {
+extends: [userPreset ?? preset_1.EMPTY_PRESET, argvToPreset(argv)],
+},
+});
+try {
+if (!compiledOptions.resolvedPreset.worker.connectionString &&
+!process.env.PGDATABASE) {
+throw new Error("Please use `--connection` flag, set `DATABASE_URL` or `PGDATABASE` envvars to indicate the PostgreSQL connection to use.");
+}
+if (SCHEMA_ONLY) {
+console.log("Schema updated");
+return;
+}
+const watchedTasks = await (0, getTasks_1.getTasksInternal)(compiledOptions, compiledOptions.resolvedPreset.worker.taskDirectory);
+compiledOptions.releasers.push(() => watchedTasks.release());
+const watchedCronItems = await (0, getCronItems_1.getCronItemsInternal)(compiledOptions, compiledOptions.resolvedPreset.worker.crontabFile);
+compiledOptions.releasers.push(() => watchedCronItems.release());
+if (ONCE) {
+await (0, runner_1.runOnceInternal)(compiledOptions, watchedTasks.tasks, () => {
+/* noop */
+});
+}
+else {
+const { promise } = await (0, runner_1.runInternal)(compiledOptions, watchedTasks.tasks, watchedCronItems.items, () => {
+/*noop*/
+});
+// Continue forever(ish)
+await promise;
+}
+}
-if (!DATABASE_URL && !process.env.PGDATABASE) {
-throw new Error("Please use `--connection` flag, set `DATABASE_URL` or `PGDATABASE` envvars to indicate the PostgreSQL connection to use.");
-}
+finally {
+const timer = setTimeout(() => {
+console.error(`Worker failed to exit naturally after 1 second; terminating manually. This may indicate a bug in Graphile Worker, or it might be that you triggered a forceful shutdown and some of your executing tasks have yet to exit.`);
+process.exit(1);
+}, 1000);
+timer.unref();
+compiledOptions.logger.debug("CLI shutting down...");
+await release();
+compiledOptions.logger.debug("CLI shutdown complete.");
+}
-const options = {
-schema: SCHEMA || config_1.defaults.schema,
-concurrency: isInteger(argv.jobs) ? argv.jobs : config_1.defaults.concurrentJobs,
-maxPoolSize: isInteger(argv["max-pool-size"])
-? argv["max-pool-size"]
-: config_1.defaults.maxPoolSize,
-pollInterval: isInteger(argv["poll-interval"])
-? argv["poll-interval"]
-: config_1.defaults.pollInterval,
-connectionString: DATABASE_URL,
-noPreparedStatements: !!argv["no-prepared-statements"],
-};
-if (SCHEMA_ONLY) {
-await (0, runner_1.runMigrations)(options);
-console.log("Schema updated");
-return;
-}
-const watchedTasks = await (0, getTasks_1.default)(options, `${process.cwd()}/tasks`, WATCH);
-const watchedCronItems = await (0, getCronItems_1.default)(options, argv.crontab || `${process.cwd()}/crontab`, WATCH);
-if (ONCE) {
-await (0, index_1.runOnce)(options, watchedTasks.tasks);
-}
-else {
-const { promise } = await (0, index_1.run)(options, watchedTasks.tasks, watchedCronItems.items);
-// Continue forever(ish)
-await promise;
-}
}

@@ -130,0 +156,0 @@ main().catch((e) => {
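The rewritten CLI above no longer assembles an options object from flags directly; it loads a config file via graphile-config's loadConfig and layers argvToPreset(argv) on top (flags win, since they come last in `extends`). A minimal sketch of a graphile.config.ts that the new --config flag could load; the option values are illustrative, only the shape comes from the WorkerOptions interface declared later in this diff:

// graphile.config.ts — hypothetical example, not part of this diff
import { WorkerPreset } from "graphile-worker";

const preset: GraphileConfig.Preset = {
  extends: [WorkerPreset],
  worker: {
    connectionString: process.env.DATABASE_URL,
    concurrentJobs: 5,
    schema: "graphile_worker",
  },
};

export default preset;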

@@ -5,32 +5,17 @@ /**

*/
-interface WorkerDefaults {
-/**
-* How long to wait between polling for jobs.
-*
-* Note: this does NOT need to be short, because we use LISTEN/NOTIFY to be
-* notified when new jobs are added - this is just used for jobs scheduled in
-* the future, retried jobs, and in the case where LISTEN/NOTIFY fails for
-* whatever reason.
-*/
+export declare const makeWorkerPresetWorkerOptions: () => {
+connectionString: string | undefined;
+schema: string;
pollInterval: number;
-/**
-* Which PostgreSQL schema should Graphile Worker use? Defaults to 'graphile_worker'.
-*/
-schema: string;
-/**
-* How many errors in a row can we get fetching a job before we raise a higher
-* exception?
-*/
maxContiguousErrors: number;
-/**
-* Number of jobs to run concurrently
-*/
concurrentJobs: number;
-/**
-* The maximum size of the PostgreSQL pool. Defaults to the node-postgres
-* default (10). Only useful when `connectionString` is given.
-*/
maxPoolSize: number;
-}
-export declare const defaults: WorkerDefaults;
-export {};
+preparedStatements: boolean;
+crontabFile: string;
+taskDirectory: string;
+fileExtensions: string[];
+logger: import("./logger").Logger;
+minResetLockedInterval: number;
+maxResetLockedInterval: number;
+gracefulShutdownAbortTimeout: number;
+useNodeTime: false;
+};
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
-exports.defaults = void 0;
+exports.makeWorkerPresetWorkerOptions = void 0;
const cosmiconfig_1 = require("cosmiconfig");
+const cronConstants_1 = require("./cronConstants");
+const logger_1 = require("./logger");
const cosmiconfigResult = (0, cosmiconfig_1.cosmiconfigSync)("graphile-worker").search();
const cosmiconfig = cosmiconfigResult?.config;
-exports.defaults = {
/**
 * Defaults to use for various options throughout the codebase, sourced from
 * environmental variables, cosmiconfig, and finally sensible defaults.
 */
+const makeWorkerPresetWorkerOptions = () => ({
+connectionString: process.env.DATABASE_URL,
schema: process.env.GRAPHILE_WORKER_SCHEMA ||
enforceStringOrUndefined("schema", cosmiconfig?.schema) ||
"graphile_worker",
maxContiguousErrors: enforceNumberOrUndefined("maxContiguousErrors", cosmiconfig?.maxContiguousErrors) || 10,
-pollInterval: enforceNumberOrUndefined("pollInterval", cosmiconfig?.pollInterval) || 2000,
+pollInterval: enforceNumberOrUndefined("pollInterval", cosmiconfig?.pollInterval) ||
+2000,
concurrentJobs: enforceNumberOrUndefined("concurrentJobs", cosmiconfig?.concurrentJobs) ||
1,
maxPoolSize: enforceNumberOrUndefined("maxPoolSize", cosmiconfig?.maxPoolSize) || 10,
-};
+preparedStatements: true,
+crontabFile: `${process.cwd()}/crontab`,
+taskDirectory: `${process.cwd()}/tasks`,
+fileExtensions: [".js", ".cjs", ".mjs"],
+logger: logger_1.defaultLogger,
+minResetLockedInterval: 8 * cronConstants_1.MINUTE,
+maxResetLockedInterval: 10 * cronConstants_1.MINUTE,
+gracefulShutdownAbortTimeout: 5 * cronConstants_1.SECOND,
+useNodeTime: false,
+});
+exports.makeWorkerPresetWorkerOptions = makeWorkerPresetWorkerOptions;
function enforceStringOrUndefined(keyName, str) {

@@ -18,0 +35,0 @@ if (typeof str === "string") {

import { Pool } from "pg";
import { Cron, ParsedCronItem, RunnerOptions, WorkerEvents } from "./interfaces";
-import { Releasers } from "./lib";
+import { CompiledOptions, CompiledSharedOptions, Releasers } from "./lib";
interface CronRequirements {

@@ -19,4 +19,5 @@ pgPool: Pool;

*/
-export declare const runCron: (options: RunnerOptions, parsedCronItems: ParsedCronItem[], requirements: CronRequirements) => Cron;
-export declare function getParsedCronItemsFromOptions(options: RunnerOptions, releasers: Releasers): Promise<Array<ParsedCronItem>>;
+export declare const runCron: (compiledSharedOptions: CompiledSharedOptions<RunnerOptions>, parsedCronItems: ParsedCronItem[], requirements: CronRequirements) => Cron;
+/** @internal */
+export declare function getParsedCronItemsFromOptions(compiledOptions: CompiledOptions, releasers: Releasers): Promise<Array<ParsedCronItem>>;
export {};
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.getParsedCronItemsFromOptions = exports.runCron = void 0;
-const assert = require("assert");
+const tslib_1 = require("tslib");
+const assert = tslib_1.__importStar(require("assert"));
const crontab_1 = require("./crontab");
-const deferred_1 = require("./deferred");
+const deferred_1 = tslib_1.__importDefault(require("./deferred"));
const getCronItems_1 = require("./getCronItems");
const interfaces_1 = require("./interfaces");
const lib_1 = require("./lib");
/**

@@ -31,2 +31,3 @@ * This function looks through all the cron items we have (e.g. from our

notBefore,
itemDetails: known,
});

@@ -68,2 +69,4 @@ }

maxAttempts: item.options.maxAttempts,
jobKey: item.options.jobKey,
jobKeyMode: item.options.jobKeyMode,
priority: item.options.priority,

@@ -90,4 +93,6 @@ };

((json->'job')->>'runAt')::timestamptz as run_at,
-((json->'job')->>'maxAttempts')::int as max_attempts,
-((json->'job')->>'priority')::int as priority
+((json->'job')->>'maxAttempts')::smallint as max_attempts,
+((json->'job')->>'priority')::smallint as priority,
+((json->'job')->>'jobKey')::text as job_key,
+((json->'job')->>'jobKeyMode')::text as job_key_mode
from json_array_elements($1::json) with ordinality AS entries (json, index)

@@ -114,4 +119,6 @@ ),

specs.max_attempts,
-null, -- job key
-specs.priority
+specs.job_key,
+specs.priority,
+null, -- flags
+specs.job_key_mode
)

@@ -170,3 +177,3 @@ from specs

// See if anything needs backfilling for this timestamp
-for (const { item, notBefore } of backfillItemsAndDates) {
+for (const { item, notBefore, itemDetails } of backfillItemsAndDates) {
if (item.options.backfillPeriod >= timeAgo &&

@@ -178,2 +185,4 @@ unsafeTs >= notBefore &&

job: makeJobForItem(item, ts, true),
known_since: itemDetails.known_since,
last_execution: itemDetails.last_execution,
});

@@ -213,16 +222,15 @@ }

*/
-const runCron = (options, parsedCronItems, requirements) => {
+const runCron = (compiledSharedOptions, parsedCronItems, requirements) => {
const { pgPool } = requirements;
-const { logger, escapedWorkerSchema, events, useNodeTime } = (0, lib_1.processSharedOptions)(options);
+const { logger, escapedWorkerSchema, events, resolvedPreset: { worker: { useNodeTime }, }, } = compiledSharedOptions;
const promise = (0, deferred_1.default)();
let released = false;
let timeout = null;
let stopCalled = false;
function stop(e) {
-if (timeout) {
-clearTimeout(timeout);
-timeout = null;
-}
if (!stopCalled) {
stopCalled = true;
+if (timeout) {
+clearTimeout(timeout);
+timeout = null;
+}
if (e) {

@@ -240,3 +248,3 @@ promise.reject(e);

async function cronMain() {
-if (released) {
+if (!cron._active) {
return stop();

@@ -250,3 +258,3 @@ }

events.emit("cron:started", { cron: this, start });
-if (released) {
+if (!cron._active) {
return stop();

@@ -257,5 +265,6 @@ }

// count as a backfill.
-let nextTimestamp = unsafeRoundToMinute(new Date(+start), true);
+/** This timestamp will be mutated! */
+const nextTimestamp = unsafeRoundToMinute(new Date(+start), true);
const scheduleNextLoop = () => {
-if (released) {
+if (!cron._active) {
return stop();

@@ -272,3 +281,3 @@ }

try {
-if (released) {
+if (!cron._active) {
return stop();

@@ -339,3 +348,3 @@ }

});
-if (released) {
+if (!cron._active) {
return stop();

@@ -358,7 +367,7 @@ }

}
-cronMain().catch(stop);
-return {
+const cron = {
+_active: true,
release() {
-if (!released) {
-released = true;
+if (cron._active) {
+cron._active = false;
if (timeout) {

@@ -373,6 +382,9 @@ // Next loop is queued; lets cancel it early

};
+cronMain().catch(stop);
+return cron;
};
exports.runCron = runCron;
-async function getParsedCronItemsFromOptions(options, releasers) {
-const { crontabFile, parsedCronItems, crontab } = options;
+/** @internal */
+async function getParsedCronItemsFromOptions(compiledOptions, releasers) {
+const { resolvedPreset: { worker: { crontabFile }, }, _rawOptions: { parsedCronItems, crontab }, } = compiledOptions;
if (!crontabFile && !parsedCronItems && !crontab) {

@@ -382,14 +394,6 @@ return [];

if (crontab) {
assert(!crontabFile, "`crontab` and `crontabFile` must not be set at the same time.");
-assert(!parsedCronItems, "`crontab` and `parsedCronItems` must not be set at the same time.");
+assert.ok(!parsedCronItems, "`crontab` and `parsedCronItems` must not be set at the same time.");
return (0, crontab_1.parseCrontab)(crontab);
}
-else if (crontabFile) {
-assert(!parsedCronItems, "`crontabFile` and `parsedCronItems` must not be set at the same time.");
-const watchedCronItems = await (0, getCronItems_1.default)(options, crontabFile, false);
-releasers.push(() => watchedCronItems.release());
-return watchedCronItems.items;
-}
-else {
-assert(parsedCronItems != null, "Expected `parsedCronItems` to be set.");
+else if (parsedCronItems) {
// Basic check to ensure that users remembered to call

@@ -401,3 +405,3 @@ // `parseCronItems`/`parseCrontab`; not intended to be a full check, just a

// performance reasons.
-assert(Array.isArray(parsedCronItems), "Expected `parsedCronItems` to be an array; you must use a helper e.g. `parseCrontab()` or `parseCronItems()` to produce this value.");
+assert.ok(Array.isArray(parsedCronItems), "Expected `parsedCronItems` to be an array; you must use a helper e.g. `parseCrontab()` or `parseCronItems()` to produce this value.");
const firstItem = parsedCronItems[0];

@@ -411,2 +415,7 @@ if (firstItem) {

}
else {
const watchedCronItems = await (0, getCronItems_1.getCronItemsInternal)(compiledOptions, crontabFile);
releasers.push(() => watchedCronItems.release());
return watchedCronItems.items;
}
}

@@ -413,0 +422,0 @@ exports.getParsedCronItemsFromOptions = getParsedCronItemsFromOptions;
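The jobKey/jobKeyMode plumbing added above also surfaces through the programmatic API. A hedged sketch using the exported parseCronItems and run; the task name and schedule are illustrative:

import { parseCronItems, run } from "graphile-worker";

const parsedCronItems = parseCronItems([
  {
    task: "send_digest", // illustrative task identifier
    match: "0 4 * * *", // every day at 04:00
    options: {
      backfillPeriod: 0,
      jobKey: "send_digest",
      // keep the originally-scheduled run_at when the job is replaced
      jobKeyMode: "preserve_run_at",
    },
  },
]);

const runner = await run({
  connectionString: process.env.DATABASE_URL,
  taskDirectory: `${process.cwd()}/tasks`,
  parsedCronItems,
});
await runner.promise;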

@@ -31,2 +31,6 @@ /** One second in milliseconds */

export declare const CRONTAB_OPTIONS_QUEUE: RegExp;
/** Matches the jobKey=foo option, capturing the unique identifier for the job */
export declare const CRONTAB_OPTIONS_JOB_KEY: RegExp;
/** Matches the jobKeyMode=replace option, defining the replacement strategy for this job */
export declare const CRONTAB_OPTIONS_JOB_KEY_MODE: RegExp;
/** Matches the priority=n option, capturing the priority value */

@@ -33,0 +37,0 @@ export declare const CRONTAB_OPTIONS_PRIORITY: RegExp;

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
-exports.PERIOD_DURATIONS = exports.TIMEPHRASE_PART = exports.CRONTAB_OPTIONS_PRIORITY = exports.CRONTAB_OPTIONS_QUEUE = exports.CRONTAB_OPTIONS_MAX = exports.CRONTAB_OPTIONS_BACKFILL = exports.CRONTAB_OPTIONS_ID = exports.CRONTAB_COMMAND = exports.CRONTAB_WILDCARD = exports.CRONTAB_RANGE = exports.CRONTAB_NUMBER = exports.CRONTAB_TIME_PARTS = exports.CRONTAB_LINE_PARTS = exports.WEEK = exports.DAY = exports.HOUR = exports.MINUTE = exports.SECOND = void 0;
+exports.PERIOD_DURATIONS = exports.TIMEPHRASE_PART = exports.CRONTAB_OPTIONS_PRIORITY = exports.CRONTAB_OPTIONS_JOB_KEY_MODE = exports.CRONTAB_OPTIONS_JOB_KEY = exports.CRONTAB_OPTIONS_QUEUE = exports.CRONTAB_OPTIONS_MAX = exports.CRONTAB_OPTIONS_BACKFILL = exports.CRONTAB_OPTIONS_ID = exports.CRONTAB_COMMAND = exports.CRONTAB_WILDCARD = exports.CRONTAB_RANGE = exports.CRONTAB_NUMBER = exports.CRONTAB_TIME_PARTS = exports.CRONTAB_LINE_PARTS = exports.WEEK = exports.DAY = exports.HOUR = exports.MINUTE = exports.SECOND = void 0;
/** One second in milliseconds */

@@ -38,2 +38,6 @@ exports.SECOND = 1000;

exports.CRONTAB_OPTIONS_QUEUE = /^([-a-zA-Z0-9_:]+)$/;
/** Matches the jobKey=foo option, capturing the unique identifier for the job */
exports.CRONTAB_OPTIONS_JOB_KEY = /^(.*)$/;
/** Matches the jobKeyMode=replace option, defining the replacement strategy for this job */
exports.CRONTAB_OPTIONS_JOB_KEY_MODE = /^(replace|preserve_run_at)$/;
/** Matches the priority=n option, capturing the priority value */

@@ -40,0 +44,0 @@ exports.CRONTAB_OPTIONS_PRIORITY = /^(-?[0-9]+)$/;

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.parseCronItem = exports.parseCronItems = exports.parseCrontab = exports.parseCrontabLine = void 0;
-const JSON5 = require("json5");
+const tslib_1 = require("tslib");
+const JSON5 = tslib_1.__importStar(require("json5"));
const querystring_1 = require("querystring");

@@ -45,2 +46,4 @@ const cronConstants_1 = require("./cronConstants");

let queueName = undefined;
let jobKey = undefined;
let jobKeyMode = undefined;
let priority = undefined;

@@ -72,2 +75,14 @@ const matchers = {

],
jobKey: [
cronConstants_1.CRONTAB_OPTIONS_JOB_KEY,
(matches) => {
jobKey = matches[1];
},
],
jobKeyMode: [
cronConstants_1.CRONTAB_OPTIONS_JOB_KEY_MODE,
(matches) => {
jobKeyMode = matches[1];
},
],
priority: [

@@ -107,4 +122,14 @@ cronConstants_1.CRONTAB_OPTIONS_PRIORITY,

}
if (!jobKeyMode && jobKey) {
jobKeyMode = "replace";
}
return {
-options: { backfillPeriod, maxAttempts, queueName, priority },
+options: {
+backfillPeriod,
+maxAttempts,
+queueName,
+priority,
+jobKey,
+jobKeyMode,
+},
identifier,

@@ -111,0 +136,0 @@ };
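In a crontab file the same options ride in the query-string section of each line; jobKey and jobKeyMode join the existing options (id, backfill, max, queue, priority), and jobKeyMode defaults to "replace" when only jobKey is given, per the parser above. A hedged sketch exercising the exported parser; task name and payload are illustrative:

import { parseCrontab } from "graphile-worker";

// minute hour day month dow  task  ?options  payload
const items = parseCrontab(
  "0 4 * * * send_digest ?jobKey=send_digest&jobKeyMode=preserve_run_at {onboarded:true}",
);
// items[0].options.jobKey === "send_digest"
// items[0].options.jobKeyMode === "preserve_run_at"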

/// <reference types="node" />
-import * as rawFs from "fs";
-export declare const stat: typeof rawFs.stat.__promisify__;
-export declare const readFile: typeof rawFs.readFile.__promisify__;
-export declare const readdir: typeof rawFs.readdir.__promisify__;
-export declare function tryStat(pathToStat: string): Promise<rawFs.Stats | null>;
+export declare function tryStat(pathToStat: string): Promise<import("fs").Stats | null>;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
-exports.tryStat = exports.readdir = exports.readFile = exports.stat = void 0;
-const rawFs = require("fs");
-const util_1 = require("util");
-exports.stat = (0, util_1.promisify)(rawFs.stat);
-exports.readFile = (0, util_1.promisify)(rawFs.readFile);
-exports.readdir = (0, util_1.promisify)(rawFs.readdir);
+exports.tryStat = void 0;
+const promises_1 = require("fs/promises");
async function tryStat(pathToStat) {
try {
-return await (0, exports.stat)(pathToStat);
+return await (0, promises_1.stat)(pathToStat);
}

@@ -13,0 +9,0 @@ catch (e) {

@@ -7,3 +7,4 @@ "use strict";

exports.migrations = {
"000001.sql": String.raw `-- Create the tables
"000001.sql": String.raw `--! breaking-change
-- Create the tables
create table :GRAPHILE_WORKER_SCHEMA.job_queues (

@@ -347,3 +348,4 @@ queue_name text not null primary key,

`,
"000003.sql": String.raw `alter table :GRAPHILE_WORKER_SCHEMA.jobs alter column queue_name drop not null;
"000003.sql": String.raw `--! breaking-change
alter table :GRAPHILE_WORKER_SCHEMA.jobs alter column queue_name drop not null;

@@ -1128,3 +1130,4 @@ create or replace function :GRAPHILE_WORKER_SCHEMA.add_job(

`,
"000011.sql": String.raw `lock table :GRAPHILE_WORKER_SCHEMA.jobs;
"000011.sql": String.raw `--! breaking-change
lock table :GRAPHILE_WORKER_SCHEMA.jobs;
lock table :GRAPHILE_WORKER_SCHEMA.job_queues;

@@ -1640,3 +1643,4 @@

`,
"000013.sql": String.raw `alter type :GRAPHILE_WORKER_SCHEMA.job_spec alter attribute max_attempts type smallint;
"000013.sql": String.raw `--! breaking-change
alter type :GRAPHILE_WORKER_SCHEMA.job_spec alter attribute max_attempts type smallint;
alter type :GRAPHILE_WORKER_SCHEMA.job_spec alter attribute priority type smallint;

@@ -1746,3 +1750,4 @@

`,
"000014.sql": String.raw `-- Go back to exposing 'int' on public interfaces, use smallint internally.
"000014.sql": String.raw `--! breaking-change
-- Go back to exposing 'int' on public interfaces, use smallint internally.

@@ -1749,0 +1754,0 @@ drop function :GRAPHILE_WORKER_SCHEMA.add_job;

import { SharedOptions, WatchedCronItems } from "./interfaces";
-export default function getCronItems(options: SharedOptions, crontabPath: string, watch?: boolean): Promise<WatchedCronItems>;
+import { CompiledSharedOptions } from "./lib";
+export declare function getCronItems(options: SharedOptions, crontabPath: string): Promise<WatchedCronItems>;
+export declare function getCronItemsInternal(compiledSharedOptions: CompiledSharedOptions, crontabPath: string): Promise<WatchedCronItems>;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
-const chokidar = require("chokidar");
+exports.getCronItemsInternal = exports.getCronItems = void 0;
const fs_1 = require("fs");

@@ -35,19 +35,10 @@ const crontab_1 = require("./crontab");

}
-async function getCronItems(options, crontabPath, watch = false) {
-const { logger } = (0, lib_1.processSharedOptions)(options);
-let watcher = null;
+async function getCronItems(options, crontabPath) {
+const compiledSharedOptions = (0, lib_1.processSharedOptions)(options);
+return getCronItemsInternal(compiledSharedOptions, crontabPath);
+}
+exports.getCronItems = getCronItems;
+async function getCronItemsInternal(compiledSharedOptions, crontabPath) {
+const { logger } = compiledSharedOptions;
const items = [];
-if (watch) {
-const watchLogger = logger.scope({ label: "watch" });
-watcher = chokidar
-.watch(crontabPath, { ignoreInitial: true })
-.on("all", () => {
-loadCrontabIntoCronItems(watchLogger, items, crontabPath).catch((error) => {
-watchLogger.error(`Error in ${crontabPath}: ${error.message}`, {
-crontabPath,
-error,
-});
-});
-});
-}
// Try and require it

@@ -63,9 +54,6 @@ await loadCrontabIntoCronItems(logger, items, crontabPath);

released = true;
-if (watcher) {
-watcher.close();
-}
},
};
}
-exports.default = getCronItems;
+exports.getCronItemsInternal = getCronItemsInternal;
//# sourceMappingURL=getCronItems.js.map
import { SharedOptions, WatchedTaskList } from "./interfaces";
-export default function getTasks(options: SharedOptions, taskPath: string, watch?: boolean): Promise<WatchedTaskList>;
+import { CompiledSharedOptions } from "./lib";
+export declare function getTasks(options: SharedOptions, taskPath: string): Promise<WatchedTaskList>;
+export declare function getTasksInternal(compiledSharedOptions: CompiledSharedOptions, taskPath: string): Promise<WatchedTaskList>;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
-const chokidar = require("chokidar");
+exports.getTasksInternal = exports.getTasks = void 0;
+const promises_1 = require("fs/promises");
const path_1 = require("path");

@@ -8,15 +9,16 @@ const fs_1 = require("./fs");

const lib_1 = require("./lib");
const module_1 = require("./module");
const DIRECTORY_REGEXP = /^[A-Za-z0-9_-]+$/;
const FILE_REGEXP = /^([A-Za-z0-9_-]+)((?:\.[A-Za-z0-9_-]+)*)$/;
function validTasks(logger, obj) {
const tasks = {};
-Object.keys(obj).forEach((taskName) => {
-const task = obj[taskName];
+Object.keys(obj).forEach((taskIdentifier) => {
+const task = obj[taskIdentifier];
if ((0, interfaces_1.isValidTask)(task)) {
-tasks[taskName] = task;
+tasks[taskIdentifier] = task;
}
else {
-logger.warn(`Not a valid task '${taskName}' - expected function, received ${task ? typeof task : String(task)}.`, {
+logger.warn(`Not a valid task '${taskIdentifier}' - expected function, received ${task ? typeof task : String(task)}.`, {
invalidTask: true,
task,
-taskName,
+taskIdentifier,
});

@@ -27,9 +29,11 @@ }

}
-async function loadFileIntoTasks(logger, tasks, filename, name = null, watch = false) {
-const replacementModule = watch ? (0, module_1.fauxRequire)(filename) : require(filename);
-if (!replacementModule) {
-throw new Error(`Module '${filename}' doesn't have an export`);
-}
+async function loadFileIntoTasks(logger, tasks, filename, name = null) {
+const rawMod = await import(filename);
+const mod = Object.keys(rawMod).length === 1 &&
+typeof rawMod.default === "object" &&
+rawMod.default !== null
+? rawMod.default
+: rawMod;
if (name) {
-const task = replacementModule.default || replacementModule;
+const task = mod.default || mod;
if ((0, interfaces_1.isValidTask)(task)) {

@@ -43,94 +47,56 @@ tasks[name] = task;

else {
-Object.keys(tasks).forEach((taskName) => {
-delete tasks[taskName];
+Object.keys(tasks).forEach((taskIdentifier) => {
+delete tasks[taskIdentifier];
});
-if (!replacementModule.default ||
-typeof replacementModule.default === "function") {
-Object.assign(tasks, validTasks(logger, replacementModule));
+if (!mod.default || typeof mod.default === "function") {
+Object.assign(tasks, validTasks(logger, mod));
}
else {
-Object.assign(tasks, validTasks(logger, replacementModule.default));
+Object.assign(tasks, validTasks(logger, mod.default));
}
}
}
-async function getTasks(options, taskPath, watch = false) {
-const { logger } = (0, lib_1.processSharedOptions)(options);
+async function getTasks(options, taskPath) {
+const compiledSharedOptions = (0, lib_1.processSharedOptions)(options);
+const result = await getTasksInternal(compiledSharedOptions, taskPath);
+// This assign is used in `__tests__/getTasks.test.ts`
+return Object.assign(result, { compiledSharedOptions });
+}
+exports.getTasks = getTasks;
+async function getTasksInternal(compiledSharedOptions, taskPath) {
+const { logger } = compiledSharedOptions;
const pathStat = await (0, fs_1.tryStat)(taskPath);
if (!pathStat) {
-throw new Error(`Could not find tasks to execute - '${taskPath}' does not exist`);
+throw new Error(`Could not find tasks to execute - taskDirectory '${taskPath}' does not exist`);
}
-const watchers = [];
-let taskNames = [];
-const tasks = {};
-const debugSupported = (debugLogger = logger) => {
-const oldTaskNames = taskNames;
-taskNames = Object.keys(tasks).sort();
-if (oldTaskNames.join(",") !== taskNames.join(",")) {
-debugLogger.debug(`Supported task names: '${taskNames.join("', '")}'`, {
-taskNames,
-});
-}
-};
-const watchLogger = logger.scope({ label: "watch" });
+const tasks = Object.create(null);
if (pathStat.isFile()) {
-if (watch) {
-watchers.push(chokidar.watch(taskPath, { ignoreInitial: true }).on("all", () => {
-loadFileIntoTasks(watchLogger, tasks, taskPath, null, watch)
-.then(() => debugSupported(watchLogger))
-.catch((error) => {
-watchLogger.error(`Error in ${taskPath}: ${error.message}`, {
-taskPath,
-error,
-});
-});
-}));
-}
// Try and require it
-await loadFileIntoTasks(logger, tasks, taskPath, null, watch);
+await loadFileIntoTasks(logger, tasks, taskPath, null);
}
else if (pathStat.isDirectory()) {
-if (watch) {
-watchers.push(chokidar
-.watch(`${taskPath}/*.js`, {
-ignoreInitial: true,
-})
-.on("all", (event, eventFilePath) => {
-const taskName = (0, path_1.basename)(eventFilePath, ".js");
-if (event === "unlink") {
-delete tasks[taskName];
-debugSupported(watchLogger);
-}
-else {
-loadFileIntoTasks(watchLogger, tasks, eventFilePath, taskName, watch)
-.then(() => debugSupported(watchLogger))
-.catch((error) => {
-watchLogger.error(`Error in ${eventFilePath}: ${error.message}`, { eventFilePath, error });
-});
-}
-}));
-}
-// Try and require its contents
-const files = await (0, fs_1.readdir)(taskPath);
-for (const file of files) {
-if (file.endsWith(".js")) {
-const taskName = file.slice(0, -3);
-try {
-await loadFileIntoTasks(logger, tasks, `${taskPath}/${file}`, taskName, watch);
-}
-catch (error) {
-const message = `Error processing '${taskPath}/${file}': ${error.message}`;
-if (watch) {
-watchLogger.error(message, { error });
-}
-else {
-throw new Error(message);
-}
-}
+const collectedTaskPaths = Object.create(null);
+await getTasksFromDirectory(compiledSharedOptions, collectedTaskPaths, taskPath, []);
+const taskIdentifiers = Object.keys(collectedTaskPaths).sort((a, z) => a.localeCompare(z, "en-US"));
+for (const taskIdentifier of taskIdentifiers) {
+const fileDetailsList = collectedTaskPaths[taskIdentifier];
+const event = {
+handler: undefined,
+taskIdentifier,
+fileDetailsList,
+};
+await compiledSharedOptions.hooks.process("loadTaskFromFiles", event);
+const handler = event.handler;
+if (handler) {
+tasks[taskIdentifier] = handler;
+}
+else {
+logger.warn(`Failed to load task '${taskIdentifier}' - no supported handlers found for path${fileDetailsList.length > 1 ? "s" : ""}: '${fileDetailsList.map((d) => d.fullPath).join("', '")}'`);
+}
+}
}
-taskNames = Object.keys(tasks).sort();
let released = false;
return {
tasks,
compiledSharedOptions,
release: () => {

@@ -141,7 +107,58 @@ if (released) {

released = true;
-watchers.forEach((watcher) => watcher.close());
},
};
}
-exports.default = getTasks;
+exports.getTasksInternal = getTasksInternal;
async function getTasksFromDirectory(compiledSharedOptions, collectedTaskPaths, taskPath, subpath) {
const { logger } = compiledSharedOptions;
const folderPath = (0, path_1.join)(taskPath, ...subpath);
// Try and require its contents
const entries = await (0, promises_1.readdir)(folderPath);
await Promise.all(entries.map(async (entry) => {
const fullPath = (0, path_1.join)(taskPath, ...subpath, entry);
const stats = await (0, promises_1.lstat)(fullPath);
if (stats.isDirectory()) {
if (DIRECTORY_REGEXP.test(entry)) {
await getTasksFromDirectory(compiledSharedOptions, collectedTaskPaths, taskPath, [...subpath, entry]);
}
else {
logger.info(`Ignoring directory '${fullPath}' - '${entry}' does not match allowed regexp.`);
}
}
else if (stats.isSymbolicLink()) {
// Must be a symbolic link to a file, otherwise ignore
const symlinkTarget = await (0, promises_1.realpath)(fullPath);
const targetStats = await (0, promises_1.lstat)(symlinkTarget);
if (targetStats.isFile() && !targetStats.isSymbolicLink()) {
maybeAddFile(compiledSharedOptions, collectedTaskPaths, subpath, entry, symlinkTarget, targetStats);
}
}
else if (stats.isFile()) {
maybeAddFile(compiledSharedOptions, collectedTaskPaths, subpath, entry, fullPath, stats);
}
}));
}
function maybeAddFile(compiledSharedOptions, collectedTaskPaths, subpath, entry, fullPath, stats) {
const { logger } = compiledSharedOptions;
const matches = FILE_REGEXP.exec(entry);
if (matches) {
const [, baseName, extension] = matches;
const entry = {
fullPath,
stats,
baseName,
extension,
};
const taskIdentifier = [...subpath, baseName].join("/");
if (!collectedTaskPaths[taskIdentifier]) {
collectedTaskPaths[taskIdentifier] = [entry];
}
else {
collectedTaskPaths[taskIdentifier].push(entry);
}
}
else {
logger.info(`Ignoring file '${fullPath}' - '${entry}' does not match allowed regexp.`);
}
}
//# sourceMappingURL=getTasks.js.map
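Task identifiers are now derived from the directory tree (subpath segments joined with "/"), and each identifier's candidate files are offered to the loadTaskFromFiles hook rather than being require()d directly. A sketch of a task file the default JS loader would pick up, assuming a tasks/emails/send_welcome.js layout; all names are illustrative:

// tasks/emails/send_welcome.ts — compile to .js; the default fileExtensions
// are ".js", ".cjs" and ".mjs". The task identifier becomes "emails/send_welcome".
import type { Task } from "graphile-worker";

const task: Task = async (payload, helpers) => {
  helpers.logger.info("Sending welcome email", { payload });
  // ... perform the send here ...
};

export default task;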
import { Pool, PoolClient } from "pg";
-import { Job, JobHelpers, SharedOptions, TaskSpec, WithPgClient, WorkerSharedOptions } from "./interfaces";
+import { AddJobFunction, Job, JobHelpers, WithPgClient } from "./interfaces";
+import { CompiledSharedOptions } from "./lib";
import { Logger } from "./logger";
-export declare function makeAddJob(options: WorkerSharedOptions, withPgClient: WithPgClient): (identifier: string, payload?: unknown, spec?: TaskSpec) => Promise<Job>;
-export declare function makeJobHelpers(options: SharedOptions, job: Job, { withPgClient, logger: overrideLogger, }: {
+export declare function makeAddJob(compiledSharedOptions: CompiledSharedOptions, withPgClient: WithPgClient): AddJobFunction;
+export declare function makeJobHelpers(compiledSharedOptions: CompiledSharedOptions, job: Job, { withPgClient, abortSignal, logger: overrideLogger, }: {
withPgClient: WithPgClient;
+abortSignal: AbortSignal | undefined;
logger?: Logger;

@@ -8,0 +10,0 @@ }): JobHelpers;

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.makeWithPgClientFromClient = exports.makeWithPgClientFromPool = exports.makeJobHelpers = exports.makeAddJob = void 0;
const lib_1 = require("./lib");
-function makeAddJob(options, withPgClient) {
-const { escapedWorkerSchema, useNodeTime } = (0, lib_1.processSharedOptions)(options);
-return (identifier, payload = {}, spec = {}) => {
+function makeAddJob(compiledSharedOptions, withPgClient) {
+const { escapedWorkerSchema, resolvedPreset: { worker: { useNodeTime }, }, } = compiledSharedOptions;
+return (identifier, payload, spec = {}) => {
return withPgClient(async (pgClient) => {

@@ -23,3 +22,3 @@ const { rows } = await pgClient.query(`

identifier,
-JSON.stringify(payload),
+JSON.stringify(payload ?? {}),
spec.queueName || null,

@@ -47,4 +46,4 @@ // If there's an explicit run at, use that. Otherwise, if we've been

exports.makeAddJob = makeAddJob;
-function makeJobHelpers(options, job, { withPgClient, logger: overrideLogger, }) {
-const baseLogger = overrideLogger || (0, lib_1.processSharedOptions)(options).logger;
+function makeJobHelpers(compiledSharedOptions, job, { withPgClient, abortSignal, logger: overrideLogger, }) {
+const baseLogger = overrideLogger ?? compiledSharedOptions.logger;
const logger = baseLogger.scope({

@@ -56,2 +55,3 @@ label: "job",

const helpers = {
abortSignal,
job,

@@ -61,3 +61,3 @@ logger,

query: (queryText, values) => withPgClient((pgClient) => pgClient.query(queryText, values)),
-addJob: makeAddJob(options, withPgClient),
+addJob: makeAddJob(compiledSharedOptions, withPgClient),
// TODO: add an API for giving workers more helpers

@@ -76,3 +76,3 @@ };

function makeWithPgClientFromPool(pgPool) {
-return async (callback) => {
+return async function withPgClientFromPool(callback) {
const client = await pgPool.connect();

@@ -79,0 +79,0 @@ try {
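makeJobHelpers now threads an abortSignal (experimental) through to each job alongside addJob. A hedged sketch of a task cooperating with both; the URL handling and task names are illustrative:

import type { Task } from "graphile-worker";

const fetchFeed: Task = async (payload, helpers) => {
  const { url } = payload as { url: string };
  // Pass the worker's abortSignal through so a forceful shutdown can
  // cancel the in-flight request (helpers.abortSignal may be undefined).
  const response = await fetch(url, { signal: helpers.abortSignal });
  if (!response.ok) throw new Error(`Upstream returned ${response.status}`);
  // Queue follow-up work via the same helpers.
  await helpers.addJob("process_feed", { body: await response.text() });
};

export default fetchFeed;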

@@ -1,7 +0,13 @@

-import getCronItems from "./getCronItems";
-import getTasks from "./getTasks";
+import { Logger } from "@graphile/logger";
+import { PluginHook } from "graphile-config";
+import type { PoolClient } from "pg";
+import { getCronItems } from "./getCronItems";
+import { getTasks } from "./getTasks";
+import { FileDetails, PromiseOrDirect, Task, TaskList, WithPgClient, Worker, WorkerEvents, WorkerPluginContext } from "./interfaces";
+import { CompiledSharedOptions } from "./lib";
export { parseCronItem, parseCronItems, parseCrontab } from "./crontab";
export * from "./interfaces";
-export { consoleLogFactory, LogFunctionFactory, Logger } from "./logger";
+export { consoleLogFactory, LogFunctionFactory, Logger, LogLevel, } from "./logger";
export { runTaskList, runTaskListOnce } from "./main";
export { WorkerPreset } from "./preset";
export { run, runMigrations, runOnce } from "./runner";

@@ -11,1 +17,173 @@ export { makeWorkerUtils, quickAddJob } from "./workerUtils";

export { getCronItems };
export { CompiledSharedOptions };
declare global {
namespace GraphileWorker {
interface Tasks {
}
interface MigrateEvent {
/**
* The client used to run the migration. Replacing this is not officially
* supported, but...
*/
client: PoolClient;
/**
* The Postgres version number, e.g. 120000 for PostgreSQL 12.0
*/
readonly postgresVersion: number;
/**
* Somewhere to store temporary data from plugins, only used during
* premigrate, postmigrate, prebootstrap and postbootstrap
*/
readonly scratchpad: Record<string, unknown>;
}
}
namespace GraphileConfig {
interface WorkerOptions {
/**
* Database connection string.
*
* @defaultValue `process.env.DATABASE_URL`
*/
connectionString?: string;
/**
* Maximum number of concurrent connections to Postgres
*
* @defaultValue `10`
*/
maxPoolSize?: number;
/**
*
* @defaultValue `2000` */
pollInterval?: number;
/** @defaultValue `true` */
preparedStatements?: boolean;
/**
* The database schema in which Graphile Worker is (to be) located.
*
* @defaultValue `graphile_worker`
*/
schema?: string;
/**
* Override path to find tasks
*
* @defaultValue `process.cwd() + "/tasks"`
*/
taskDirectory?: string;
/**
* Override path to crontab file.
*
* @defaultValue `process.cwd() + "/crontab"`
*/
crontabFile?: string;
/**
* Number of jobs to run concurrently.
*
* @defaultValue `1`
*/
concurrentJobs?: number;
/**
* A list of file extensions (in priority order) that Graphile Worker
* should attempt to import directly when loading tasks. Defaults to
* `[".js", ".cjs", ".mjs"]`.
*/
fileExtensions?: string[];
/**
* How long in milliseconds after a gracefulShutdown is triggered should
* we wait to trigger the AbortController, which should cancel supported
* asynchronous actions?
*
* @defaultValue `5000`
*/
gracefulShutdownAbortTimeout?: number;
/**
* Set `true` to use the time as recorded by Node.js rather than
* PostgreSQL. It's strongly recommended that you ensure the Node.js and
* PostgreSQL times are synchronized, making this setting moot.
*/
useNodeTime?: boolean;
/**
* **Experimental**
*
* How often should we scan for jobs that have been locked too long and
* release them? This is the minimum interval, we'll choose a time between
* this and `maxResetLockedInterval`.
*/
minResetLockedInterval?: number;
/**
* **Experimental**
*
* The upper bound of how long we'll wait between scans for jobs that have
* been locked too long. See `minResetLockedInterval`.
*/
maxResetLockedInterval?: number;
/**
* A Logger instance.
*/
logger?: Logger;
events?: WorkerEvents;
}
interface Preset {
worker?: WorkerOptions;
}
interface Plugin {
worker?: {
hooks?: {
[key in keyof WorkerHooks]?: PluginHook<WorkerHooks[key] extends (...args: infer UArgs) => infer UResult ? (ctx: WorkerPluginContext, ...args: UArgs) => UResult : never>;
};
};
}
interface WorkerHooks {
/**
* Called when Graphile Worker starts up.
*/
init(): void;
/**
* Called before installing the Graphile Worker DB schema (or upgrading it).
*/
prebootstrap(event: GraphileWorker.MigrateEvent): PromiseOrDirect<void>;
/**
* Called after installing the Graphile Worker DB schema (or upgrading it).
*/
postbootstrap(event: GraphileWorker.MigrateEvent): PromiseOrDirect<void>;
/**
* Called before migrating the DB.
*/
premigrate(event: GraphileWorker.MigrateEvent): PromiseOrDirect<void>;
/**
* Called after migrating the DB.
*/
postmigrate(event: GraphileWorker.MigrateEvent): PromiseOrDirect<void>;
/**
* Used to build a given `taskIdentifier`'s handler given a list of files,
* if possible.
*/
loadTaskFromFiles(event: {
/**
* If set, you should not replace this. If unset and you can support
* this task identifier (see `details`), you should set it.
*/
handler?: Task;
/**
* The string that will identify this task (inferred from the file
* path).
*/
readonly taskIdentifier: string;
/**
* A list of the files (and associated metadata) that match this task
* identifier.
*/
readonly fileDetailsList: readonly FileDetails[];
}): PromiseOrDirect<void>;
startWorker(event: {
readonly worker: Worker;
flagsToSkip: null | string[];
readonly tasks: TaskList;
readonly withPgClient: WithPgClient;
}): PromiseOrDirect<void>;
stopWorker(event: {
readonly worker: Worker;
readonly withPgClient: WithPgClient;
}): PromiseOrDirect<void>;
}
}
}
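The loadTaskFromFiles hook declared above is the extension point that lets plugins teach the worker new task file formats (compare the LoadTaskFromExecutableFilePlugin shipped in this release). A hedged sketch of a plugin handling a hypothetical ".txt" format — only the hook shape comes from these declarations, the rest is illustrative:

import { readFile } from "fs/promises";
import "graphile-worker"; // ensures the GraphileConfig/GraphileWorker augmentations are in scope

export const LoadTaskFromTextFilePlugin: GraphileConfig.Plugin = {
  name: "LoadTaskFromTextFilePlugin",
  version: "0.0.0",
  worker: {
    hooks: {
      async loadTaskFromFiles(ctx, event) {
        if (event.handler) return; // another plugin already claimed it
        const txt = event.fileDetailsList.find((d) => d.extension === ".txt");
        if (!txt) return;
        const message = await readFile(txt.fullPath, "utf8");
        event.handler = (_payload, helpers) => {
          helpers.logger.info(message);
        };
      },
    },
  },
};

Registering it would then be a matter of adding plugins: [LoadTaskFromTextFilePlugin] to the preset.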
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
-exports.getCronItems = exports.getTasks = exports.quickAddJob = exports.makeWorkerUtils = exports.runOnce = exports.runMigrations = exports.run = exports.runTaskListOnce = exports.runTaskList = exports.Logger = exports.consoleLogFactory = exports.parseCrontab = exports.parseCronItems = exports.parseCronItem = void 0;
+exports.getCronItems = exports.getTasks = exports.quickAddJob = exports.makeWorkerUtils = exports.runOnce = exports.runMigrations = exports.run = exports.WorkerPreset = exports.runTaskListOnce = exports.runTaskList = exports.Logger = exports.consoleLogFactory = exports.parseCrontab = exports.parseCronItems = exports.parseCronItem = void 0;
const tslib_1 = require("tslib");
const getCronItems_1 = require("./getCronItems");
-exports.getCronItems = getCronItems_1.default;
+Object.defineProperty(exports, "getCronItems", { enumerable: true, get: function () { return getCronItems_1.getCronItems; } });
const getTasks_1 = require("./getTasks");
-exports.getTasks = getTasks_1.default;
+Object.defineProperty(exports, "getTasks", { enumerable: true, get: function () { return getTasks_1.getTasks; } });
var crontab_1 = require("./crontab");

@@ -20,2 +20,4 @@ Object.defineProperty(exports, "parseCronItem", { enumerable: true, get: function () { return crontab_1.parseCronItem; } });

Object.defineProperty(exports, "runTaskListOnce", { enumerable: true, get: function () { return main_1.runTaskListOnce; } });
var preset_1 = require("./preset");
Object.defineProperty(exports, "WorkerPreset", { enumerable: true, get: function () { return preset_1.WorkerPreset; } });
var runner_1 = require("./runner");

@@ -22,0 +24,0 @@ Object.defineProperty(exports, "run", { enumerable: true, get: function () { return runner_1.run; } });

-/// <reference types="node" />
-import { EventEmitter } from "events";
-import { Pool, PoolClient, QueryResult, QueryResultRow } from "pg";
-import { Release } from "./lib";
-import { Logger } from "./logger";
-import { Signal } from "./signals";
-export declare type WithPgClient = <T = void>(callback: (pgClient: PoolClient) => Promise<T>) => Promise<T>;
+/// <reference types="node" />
+import type { EventEmitter } from "events";
+import type { Stats } from "fs";
+import { AsyncHooks } from "graphile-config";
+import type { Notification, Pool, PoolClient, QueryResult, QueryResultRow } from "pg";
+import type { CompiledSharedOptions, Release, ResolvedWorkerPreset } from "./lib";
+import type { Logger } from "./logger";
+import type { Signal } from "./signals";
+export type WithPgClient = <T = void>(callback: (pgClient: PoolClient) => Promise<T>) => Promise<T>;
/**

@@ -12,11 +15,11 @@ * The `addJob` interface is implemented in many places in the library, all

*/
-export declare type AddJobFunction = (
+export type AddJobFunction = <TIdentifier extends keyof GraphileWorker.Tasks | (string & {}) = string>(
/**
* The name of the task that will be executed for this job.
*/
-identifier: string,
+identifier: TIdentifier,
/**
* The payload (typically a JSON object) that will be passed to the task executor.
*/
-payload?: unknown,
+payload: TIdentifier extends keyof GraphileWorker.Tasks ? GraphileWorker.Tasks[TIdentifier] : unknown,
/**

@@ -53,3 +56,9 @@ * Additional details about how the job should be handled.

*/
-query<R extends QueryResultRow>(queryText: string, values?: any[]): Promise<QueryResult<R>>;
+query<R extends QueryResultRow>(queryText: string, values?: unknown[]): Promise<QueryResult<R>>;
/**
* An AbortSignal that will be triggered when the job should exit.
*
* @experimental
*/
abortSignal?: AbortSignal;
}

@@ -106,11 +115,13 @@ /**

}
-export declare type PromiseOrDirect<T> = Promise<T> | T;
-export declare type Task = (payload: unknown, helpers: JobHelpers) => PromiseOrDirect<void | PromiseOrDirect<unknown>[]>;
-export declare function isValidTask(fn: unknown): fn is Task;
-export interface TaskList {
-[name: string]: Task;
-}
+export type PromiseOrDirect<T> = Promise<T> | T;
+export type Task<TName extends keyof GraphileWorker.Tasks | (string & {}) = string & {}> = (payload: TName extends keyof GraphileWorker.Tasks ? GraphileWorker.Tasks[TName] : unknown, helpers: JobHelpers) => PromiseOrDirect<void | PromiseOrDirect<unknown>[]>;
+export declare function isValidTask<T extends string = keyof GraphileWorker.Tasks>(fn: unknown): fn is Task<T>;
+export type TaskList = {
+[Key in keyof GraphileWorker.Tasks | (string & {})]?: Key extends keyof GraphileWorker.Tasks ? Task<Key> : Task<any>;
+};
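Because Task, TaskList and AddJobFunction are now keyed by the global GraphileWorker.Tasks interface, applications can opt into end-to-end payload typing via declaration merging. A sketch; the send_email shape is illustrative:

declare global {
  namespace GraphileWorker {
    interface Tasks {
      send_email: { to: string; subject: string };
    }
  }
}
export {};

// Elsewhere, payloads and identifiers are now checked:
//   await helpers.addJob("send_email", { to: "a@b.c", subject: "Hi" }); // OK
//   await helpers.addJob("send_email", { to: 42 }); // type error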
export interface WatchedTaskList {
tasks: TaskList;
release: () => void;
/** @internal */
compiledSharedOptions: CompiledSharedOptions;
}

@@ -133,2 +144,10 @@ export interface WatchedCronItems {

priority?: number;
/** Optionally prevent duplicate copies of this job from running */
jobKey?: string;
/**
* Modifies the behavior of `jobKey`; when 'replace' all attributes will be
* updated, when 'preserve_run_at' all attributes except 'run_at' will be
* updated. (Default: 'replace')
*/
jobKeyMode?: "replace" | "preserve_run_at";
}

@@ -169,3 +188,3 @@ /**

*/
-export declare type CronMatcher = (digest: TimestampDigest) => boolean;
+export type CronMatcher = (digest: TimestampDigest) => boolean;
/**

@@ -199,4 +218,4 @@ * Symbol to determine that the item was indeed fed through a parser function.

payload: {
-[key: string]: any;
-};
+[key: string]: unknown;
+} | null;
/** An identifier so that we can prevent double-scheduling of a task and determine whether or not to backfill. */

@@ -223,3 +242,3 @@ identifier: string;

payload?: {
-[key: string]: any;
+[key: string]: unknown;
};

@@ -259,2 +278,3 @@ /** An identifier so that we can prevent double-scheduling of a task and determine whether or not to backfill. */

} | null;
is_available: boolean;
}

@@ -272,9 +292,13 @@ export interface Job extends DbJob {

export interface Worker {
workerPool: WorkerPool;
nudge: () => boolean;
workerId: string;
-release: () => void | Promise<void>;
+release: (force?: boolean) => void | Promise<void>;
promise: Promise<void>;
getActiveJob: () => Job | null;
/** @internal */
_start: (() => void) | null;
}
export interface WorkerPool {
id: string;
/** @deprecated Use gracefulShutdown instead */

@@ -285,2 +309,23 @@ release: () => Promise<void>;

promise: Promise<void>;
/** @experimental */
abortSignal: AbortSignal;
/** @internal */
_shuttingDown: boolean;
/** @internal */
_active: boolean;
/** @internal */
_workers: Worker[];
/** @internal */
_withPgClient: WithPgClient;
/** @internal */
_start: (() => void) | null;
/**
* Only works if concurrency === 1!
*
* @internal
*/
worker: Worker | null;
then<TResult1 = void, TResult2 = never>(onfulfilled?: ((value: void) => TResult1 | PromiseLike<TResult1>) | undefined | null, onrejected?: ((reason: unknown) => TResult2 | PromiseLike<TResult2>) | undefined | null): Promise<TResult1 | TResult2>;
catch<TResult = never>(onrejected?: ((reason: unknown) => TResult | PromiseLike<TResult>) | undefined | null): Promise<void | TResult>;
finally(onfinally?: (() => void) | undefined | null): Promise<void>;
}

@@ -296,2 +341,4 @@ export interface Runner {

promise: Promise<void>;
/** @internal */
_active: boolean;
}

@@ -336,3 +383,3 @@ export interface TaskSpec {

}
-export declare type ForbiddenFlagsFn = () => null | string[] | Promise<null | string[]>;
+export type ForbiddenFlagsFn = () => null | string[] | Promise<null | string[]>;
/**

@@ -402,2 +449,11 @@ * These options are common Graphile Worker pools, workers, and utils.

maxResetLockedInterval?: number;
preset?: GraphileConfig.Preset;
/**
* How long in milliseconds after a gracefulShutdown is triggered should
* we wait to trigger the AbortController, which should cancel supported
* asynchronous actions?
*
* @defaultValue `5000`
*/
gracefulShutdownAbortTimeout?: number;
}

@@ -421,4 +477,29 @@ /**

workerId?: string;
abortSignal?: AbortSignal;
workerPool: WorkerPool;
/**
* If set true, we won't install signal handlers and it'll be up to you to
* handle graceful shutdown of the worker if the process receives a signal.
*/
noHandleSignals?: boolean;
/** If false, worker won't start looking for jobs until you call `worker._start()` */
autostart?: boolean;
}
/**
* Options for an individual worker
*/
export interface RunOnceOptions extends SharedOptions {
/**
* An identifier for this specific worker; if unset then a random ID will be assigned. Do not assign multiple workers the same worker ID!
*/
workerId?: string;
/**
* If set true, we won't install signal handlers and it'll be up to you to
* handle graceful shutdown of the worker if the process receives a signal.
*/
noHandleSignals?: boolean;
/** Single worker only! */
concurrency?: 1;
}
/**
* Options for a worker pool.

@@ -442,3 +523,3 @@ */

/**
-* Task names and handler, e.g. from `getTasks` (use this if you need watch mode)
+* Task names and handler, e.g. from `getTasks`. Overrides `taskDirectory`
*/

@@ -451,3 +532,4 @@ taskList?: TaskList;

/**
-* A crontab string to use instead of reading a crontab file
+* A crontab string to use instead of reading a crontab file. Overrides
+* `crontabFile`
*/

@@ -463,3 +545,3 @@ crontab?: string;

* express, and if you don't adhere to them then you'll get unexpected
-* behaviours.
+* behaviours. Overrides `crontabFile`
*/

@@ -480,2 +562,4 @@ parsedCronItems?: Array<ParsedCronItem>;

runAt: string;
jobKey?: string;
jobKeyMode: CronItemOptions["jobKeyMode"];
maxAttempts?: number;

@@ -488,7 +572,11 @@ priority?: number;

}
export interface JobAndCronIdentifierWithDetails extends JobAndCronIdentifier {
known_since: Date;
last_execution: Date | null;
}
export interface WorkerUtilsOptions extends SharedOptions {
}
-declare type BaseEventMap = Record<string, any>;
-declare type EventMapKey<TEventMap extends BaseEventMap> = string & keyof TEventMap;
-declare type EventCallback<TPayload> = (params: TPayload) => void;
+type BaseEventMap = Record<string, unknown>;
+type EventMapKey<TEventMap extends BaseEventMap> = string & keyof TEventMap;
+type EventCallback<TPayload> = (params: TPayload) => void;
interface TypedEventEmitter<TEventMap extends BaseEventMap> extends EventEmitter {

@@ -505,3 +593,3 @@ addListener<TEventName extends EventMapKey<TEventMap>>(eventName: TEventName, callback: EventCallback<TEventMap[TEventName]>): this;

*/
-export declare type WorkerEventMap = {
+export type WorkerEventMap = {
/**

@@ -533,10 +621,28 @@ * When a worker pool is created

workerPool: WorkerPool;
-error: any;
+error: unknown;
client: PoolClient;
};
/**
* When a worker pool receives a notification
*/
"pool:listen:notification": {
workerPool: WorkerPool;
message: Notification;
client: PoolClient;
};
/**
* When a worker pool listening client is no longer available
*/
"pool:listen:release": {
workerPool: WorkerPool;
/** If you use this client, be careful to handle errors - it may be in an invalid state (errored, disconnected, etc). */
client: PoolClient;
};
/**
* When a worker pool is released
*/
"pool:release": {
/** @deprecated Use workerPool for consistency */
pool: WorkerPool;
workerPool: WorkerPool;
};

@@ -547,3 +653,5 @@ /**

"pool:gracefulShutdown": {
/** @deprecated Use workerPool for consistency */
pool: WorkerPool;
workerPool: WorkerPool;
message: string;

@@ -555,4 +663,6 @@ };

"pool:gracefulShutdown:error": {
+/** @deprecated Use workerPool for consistency */
pool: WorkerPool;
-error: any;
+workerPool: WorkerPool;
+error: unknown;
};

@@ -564,4 +674,6 @@ /**

"pool:gracefulShutdown:workerError": {
+/** @deprecated Use workerPool for consistency */
pool: WorkerPool;
-error: any;
+workerPool: WorkerPool;
+error: unknown;
job: Job | null;

@@ -573,3 +685,5 @@ };

"pool:gracefulShutdown:complete": {
/** @deprecated Use workerPool for consistency */
pool: WorkerPool;
workerPool: WorkerPool;
};

@@ -580,3 +694,5 @@ /**

"pool:forcefulShutdown": {
/** @deprecated Use workerPool for consistency */
pool: WorkerPool;
workerPool: WorkerPool;
message: string;

@@ -588,4 +704,6 @@ };

"pool:forcefulShutdown:error": {
+/** @deprecated Use workerPool for consistency */
pool: WorkerPool;
-error: any;
+workerPool: WorkerPool;
+error: unknown;
};

@@ -596,3 +714,5 @@ /**

"pool:forcefulShutdown:complete": {
/** @deprecated Use workerPool for consistency */
pool: WorkerPool;
workerPool: WorkerPool;
};

@@ -617,3 +737,3 @@ /**

worker: Worker;
-error?: any;
+error?: unknown;
};

@@ -631,3 +751,3 @@ /**

worker: Worker;
-error: any;
+error: unknown;
};

@@ -645,4 +765,4 @@ /**

worker: Worker;
-error: any;
-jobError: any | null;
+error: unknown;
+jobError: unknown | null;
};

@@ -669,4 +789,4 @@ /**

job: Job;
-error: any;
-batchJobErrors?: any[];
+error: unknown;
+batchJobErrors?: unknown[];
};

@@ -679,4 +799,4 @@ /**

job: Job;
-error: any;
-batchJobErrors?: any[];
+error: unknown;
+batchJobErrors?: unknown[];
};

@@ -690,3 +810,3 @@ /**

job: Job;
-error: any;
+error: unknown;
};

@@ -706,3 +826,3 @@ /** **Experimental** When the cron starts working (before backfilling) */

cron: Cron;
-itemsToBackfill: JobAndCronIdentifier[];
+itemsToBackfill: JobAndCronIdentifierWithDetails[];
timestamp: string;

@@ -755,3 +875,3 @@ };

/** @internal Not sure this'll stay on pool */
-pool: WorkerPool;
+workerPool: WorkerPool;
};

@@ -769,3 +889,3 @@ /**

/** @internal Not sure this'll stay on pool */
-pool: WorkerPool;
+workerPool: WorkerPool;
};

@@ -783,3 +903,3 @@ /**

/** @internal Not sure this'll stay on pool */
-pool: WorkerPool;
+workerPool: WorkerPool;
};

@@ -801,5 +921,5 @@ /**

*/
-stop: {};
+stop: Record<string, never>;
};
-export declare type WorkerEvents = TypedEventEmitter<WorkerEventMap>;
+export type WorkerEvents = TypedEventEmitter<WorkerEventMap>;
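WorkerEvents is a typed EventEmitter, so the payload shapes above flow through on(); note the error fields are now unknown rather than any. A hedged sketch of wiring one up via the events option:

import { EventEmitter } from "events";
import { run } from "graphile-worker";
import type { WorkerEvents } from "graphile-worker";

const events: WorkerEvents = new EventEmitter();
events.on("job:error", ({ job, error }) => {
  // `error` is `unknown`; narrow it before use
  console.error(`Job ${job.id} errored:`, error);
});

const runner = await run({
  connectionString: process.env.DATABASE_URL,
  taskDirectory: `${process.cwd()}/tasks`,
  events,
});
await runner.promise;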
/**

@@ -815,2 +935,29 @@ * The digest of a timestamp into the component parts that a cron schedule cares about.

}
/** Details of a file (guaranteed not to be a directory, nor a symlink) */
export interface FileDetails {
/** The full path to the file (possibly relative to the current working directory) */
fullPath: string;
/** The stats of the file */
stats: Stats;
/** The name of the file, excluding any extensions */
baseName: string;
/** The extensions of the file, e.g. `""` for no extensions, `".js"` or even `".test.js"`. */
extension: string;
}
export type Writeable<T> = {
-readonly [P in keyof T]: T[P];
};
export interface WorkerPluginContext {
version: string;
maxMigrationNumber: number;
breakingMigrationNumbers: number[];
events: WorkerEvents;
logger: Logger;
workerSchema: string;
escapedWorkerSchema: string;
/** @internal */
_rawOptions: SharedOptions;
hooks: AsyncHooks<GraphileConfig.WorkerHooks>;
resolvedPreset: ResolvedWorkerPreset;
}
export {};

@@ -0,5 +1,14 @@

+import { AsyncHooks } from "graphile-config";
import { Pool } from "pg";
-import { AddJobFunction, RunnerOptions, SharedOptions, WithPgClient, WorkerEvents } from "./interfaces";
+import { makeWorkerPresetWorkerOptions } from "./config";
+import { AddJobFunction, PromiseOrDirect, RunnerOptions, RunOnceOptions, SharedOptions, WithPgClient, WorkerEvents, WorkerOptions, WorkerSharedOptions, WorkerUtilsOptions } from "./interfaces";
import { Logger, LogScope } from "./logger";
-export interface CompiledSharedOptions {
+export declare const BREAKING_MIGRATIONS: number[];
+export type ResolvedWorkerPreset = GraphileConfig.ResolvedPreset & {
+worker: GraphileConfig.WorkerOptions & ReturnType<typeof makeWorkerPresetWorkerOptions>;
+};
+export interface CompiledSharedOptions<T extends SharedOptions = SharedOptions> {
version: string;
maxMigrationNumber: number;
breakingMigrationNumbers: number[];
events: WorkerEvents;

@@ -9,7 +18,10 @@ logger: Logger;

escapedWorkerSchema: string;
-maxContiguousErrors: number;
-useNodeTime: boolean;
-minResetLockedInterval: number;
-maxResetLockedInterval: number;
-options: SharedOptions;
+/**
+* DO NOT USE THIS! As we move over to presets this will be removed.
+*
+* @internal
+*/
+_rawOptions: T;
+resolvedPreset: ResolvedWorkerPreset;
+hooks: AsyncHooks<GraphileConfig.WorkerHooks>;
}

@@ -19,6 +31,6 @@ interface ProcessSharedOptionsSettings {

}
-export declare function processSharedOptions(options: SharedOptions, { scope }?: ProcessSharedOptionsSettings): CompiledSharedOptions;
-export declare type Releasers = Array<() => void | Promise<void>>;
-export declare function assertPool(options: SharedOptions, releasers: Releasers): Promise<Pool>;
-export declare type Release = () => Promise<void>;
+export declare function processSharedOptions<T extends SharedOptions | WorkerSharedOptions | WorkerOptions | RunOnceOptions | WorkerUtilsOptions>(options: T, { scope }?: ProcessSharedOptionsSettings): CompiledSharedOptions<T>;
+export type Releasers = Array<() => void | Promise<void>>;
+export declare function assertPool(compiledSharedOptions: CompiledSharedOptions, releasers: Releasers): Promise<Pool>;
+export type Release = () => PromiseOrDirect<void>;
export declare function withReleasers<T>(callback: (releasers: Releasers, release: Release) => Promise<T>): Promise<T>;

@@ -29,8 +41,12 @@ interface ProcessOptionsExtensions {

addJob: AddJobFunction;
release: Release;
releasers: Releasers;
}
export interface CompiledOptions extends CompiledSharedOptions, ProcessOptionsExtensions {
export interface CompiledOptions extends CompiledSharedOptions<RunnerOptions>, ProcessOptionsExtensions {
}
export declare const getUtilsAndReleasersFromOptions: (options: RunnerOptions, settings?: ProcessSharedOptionsSettings) => Promise<CompiledOptions>;
type CompiledOptionsAndRelease = [
compiledOptions: CompiledOptions,
release: (error?: Error) => PromiseOrDirect<void>
];
export declare const getUtilsAndReleasersFromOptions: (options: RunnerOptions, settings?: ProcessSharedOptionsSettings) => Promise<CompiledOptionsAndRelease>;
export declare function tryParseJson<T = object>(json: string | null | undefined): T | null;
export {};
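// Usage sketch for the new tuple return (the options object and task name
// below are hypothetical): callers are now responsible for calling `release`.
const [compiledOptions, release] = await getUtilsAndReleasersFromOptions({
    connectionString: "postgres:///my_db",
});
try {
    await compiledOptions.addJob("send_email", { to: "alice@example.com" });
}
finally {
    await release(); // runs the accumulated releasers (LIFO), closing the pool
}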
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.getUtilsAndReleasersFromOptions = exports.withReleasers = exports.assertPool = exports.processSharedOptions = void 0;
const assert = require("assert");
exports.tryParseJson = exports.getUtilsAndReleasersFromOptions = exports.withReleasers = exports.assertPool = exports.processSharedOptions = exports.BREAKING_MIGRATIONS = void 0;
const tslib_1 = require("tslib");
const assert = tslib_1.__importStar(require("assert"));
const events_1 = require("events");
const graphile_config_1 = require("graphile-config");
const pg_1 = require("pg");
const config_1 = require("./config");
const cronConstants_1 = require("./cronConstants");
const sql_1 = require("./generated/sql");
const helpers_1 = require("./helpers");
const logger_1 = require("./logger");
const migrate_1 = require("./migrate");
const preset_1 = require("./preset");
const version_1 = require("./version");
const MAX_MIGRATION_NUMBER = Object.keys(sql_1.migrations).reduce((memo, migrationFile) => {
const migrationNumber = parseInt(migrationFile.slice(0, 6), 10);
return Math.max(memo, migrationNumber);
}, 0);
exports.BREAKING_MIGRATIONS = Object.entries(sql_1.migrations)
.filter(([_, text]) => {
return text.startsWith("--! breaking");
})
.map(([migrationFile]) => parseInt(migrationFile.slice(0, 6), 10));
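// Example of the convention above: a compiled migration whose SQL source
// starts with the literal text `--! breaking` is flagged (file names below
// are hypothetical):
//
//   migrations = {
//     "000010_add_index.sql": "create index ...",
//     "000011_drop_column.sql": "--! breaking\nalter table ...",
//   }
//
// would yield BREAKING_MIGRATIONS = [11].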
/**
* Important: ensure you still handle `forbiddenFlags`, `pgPool`, `workerId`,
* `autostart`, `workerPool`, `abortSignal`, `noHandleSignals`, `taskList`,
* `crontab`, `parsedCronItems`!
*/
function legacyOptionsToPreset(options) {
if ("_rawOptions" in options) {
console.trace("GraphileWorkerInternalError: CompiledSharedOptions used where SharedOptions was expected.");
throw new Error("GraphileWorkerInternalError: CompiledSharedOptions used where SharedOptions was expected.");
}
assert.ok(!options.taskList || !options.taskDirectory, "Exactly one of either `taskDirectory` or `taskList` should be set");
const preset = {
extends: [],
worker: {},
};
for (const key of Object.keys(options)) {
if (options[key] == null) {
continue;
}
switch (key) {
case "forbiddenFlags":
case "pgPool":
case "workerId":
case "autostart":
case "workerPool":
case "abortSignal":
case "noHandleSignals":
case "taskList":
case "crontab":
case "parsedCronItems": {
// ignore
break;
}
case "preset": {
preset.extends.push(options[key]);
break;
}
case "logger": {
preset.worker.logger = options[key];
break;
}
case "schema": {
preset.worker.schema = options[key];
break;
}
case "connectionString": {
preset.worker.connectionString = options[key];
break;
}
case "events": {
preset.worker.events = options[key];
break;
}
case "maxPoolSize": {
preset.worker.maxPoolSize = options[key];
break;
}
case "useNodeTime": {
preset.worker.useNodeTime = options[key];
break;
}
case "noPreparedStatements": {
preset.worker.preparedStatements = !options[key];
break;
}
case "minResetLockedInterval": {
preset.worker.minResetLockedInterval = options[key];
break;
}
case "maxResetLockedInterval": {
preset.worker.maxResetLockedInterval = options[key];
break;
}
case "gracefulShutdownAbortTimeout": {
preset.worker.gracefulShutdownAbortTimeout = options[key];
break;
}
case "pollInterval": {
preset.worker.pollInterval = options[key];
break;
}
case "concurrency": {
preset.worker.concurrentJobs = options[key];
break;
}
case "taskDirectory": {
preset.worker.taskDirectory = options[key];
break;
}
case "crontabFile": {
preset.worker.crontabFile = options[key];
break;
}
default: {
const never = key;
console.warn(`Do not know how to convert config option '${never}' into its preset equivalent; ignoring.`);
}
}
}
return preset;
}
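// Illustration of the mapping above (values hypothetical): renamed keys are
// translated and negated flags are inverted, while runtime-only options
// (pgPool, taskList, etc.) are deliberately kept out of the preset.
const legacy = {
    connectionString: "postgres:///my_db",
    concurrency: 4,
    noPreparedStatements: true,
};
// legacyOptionsToPreset(legacy) then returns, approximately:
const equivalentPreset = {
    extends: [],
    worker: {
        connectionString: "postgres:///my_db",
        concurrentJobs: 4, // `concurrency` renamed
        preparedStatements: false, // `noPreparedStatements` inverted
    },
};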
const _sharedOptionsCache = new WeakMap();
function processSharedOptions(options, { scope } = {}) {
if ("_rawOptions" in options) {
throw new Error(`Fed processed options to processSharedOptions; this is invalid.`);
}
let compiled = _sharedOptionsCache.get(options);
if (!compiled) {
const { logger = logger_1.defaultLogger, schema: workerSchema = config_1.defaults.schema, events = new events_1.EventEmitter(), useNodeTime = false, minResetLockedInterval = 8 * cronConstants_1.MINUTE, maxResetLockedInterval = 10 * cronConstants_1.MINUTE, } = options;
const resolvedPreset = (0, graphile_config_1.resolvePresets)([
preset_1.WorkerPreset,
// Explicit options override the preset
legacyOptionsToPreset(options),
]);
const { worker: { minResetLockedInterval, maxResetLockedInterval, schema: workerSchema, logger, events = new events_1.EventEmitter(), }, } = resolvedPreset;
const escapedWorkerSchema = pg_1.Client.prototype.escapeIdentifier(workerSchema);

@@ -24,3 +144,7 @@ if (!Number.isFinite(minResetLockedInterval) ||

}
const hooks = new graphile_config_1.AsyncHooks();
compiled = {
version: version_1.version,
maxMigrationNumber: MAX_MIGRATION_NUMBER,
breakingMigrationNumbers: exports.BREAKING_MIGRATIONS,
events,

@@ -30,9 +154,17 @@ logger,

escapedWorkerSchema,
maxContiguousErrors: config_1.defaults.maxContiguousErrors,
useNodeTime,
minResetLockedInterval,
maxResetLockedInterval,
options,
_rawOptions: options,
hooks,
resolvedPreset,
};
(0, graphile_config_1.applyHooks)(resolvedPreset.plugins, (p) => p.worker?.hooks, (name, fn, plugin) => {
const context = compiled;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const cb = ((...args) => fn(context, ...args));
cb.displayName = `${plugin.name}_hook_${name}`;
hooks.hook(name, cb);
});
_sharedOptionsCache.set(options, compiled);
Promise.resolve(hooks.process("init")).catch((error) => {
logger.error(`One of the plugins you are using raised an error during 'init'; but errors during 'init' are currently ignored. Continuing. Error: ${error}`, { error });
});
}

@@ -50,9 +182,11 @@ if (scope) {

exports.processSharedOptions = processSharedOptions;
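// Note on the WeakMap cache above: compilation is memoized per options
// *object identity*, so repeated calls with the same object are cheap, while
// a structurally identical but distinct object compiles afresh (and its
// cache entry can be garbage-collected along with it). Sketch:
const opts = { schema: "graphile_worker" };
processSharedOptions(opts) === processSharedOptions(opts); // true: cache hit
processSharedOptions({ schema: "graphile_worker" }); // distinct object: recompiled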
async function assertPool(options, releasers) {
const { logger } = processSharedOptions(options);
assert(!options.pgPool || !options.connectionString, "Both `pgPool` and `connectionString` are set, at most one of these options should be provided");
async function assertPool(compiledSharedOptions, releasers) {
const { logger, resolvedPreset: { worker: { maxPoolSize, connectionString }, }, _rawOptions, } = compiledSharedOptions;
assert.ok(
// NOTE: we explicitly want `_rawOptions.connectionString` here - we don't
// mind if `connectionString` is set as part of the preset.
!_rawOptions.pgPool || !_rawOptions.connectionString, "Both `pgPool` and `connectionString` are set, at most one of these options should be provided");
let pgPool;
const connectionString = options.connectionString || process.env.DATABASE_URL;
if (options.pgPool) {
pgPool = options.pgPool;
if (_rawOptions.pgPool) {
pgPool = _rawOptions.pgPool;
}

@@ -62,3 +196,3 @@ else if (connectionString) {

connectionString,
max: options.maxPoolSize,
max: maxPoolSize,
});

@@ -72,3 +206,3 @@ releasers.push(() => {

/* Pool automatically pulls settings from envvars */
max: options.maxPoolSize,
max: maxPoolSize,
});

@@ -124,3 +258,10 @@ releasers.push(() => {

const releasers = [];
let released = false;
const release = async () => {
if (released) {
throw new Error(`Internal error: compiledOptions was released twice.`);
}
else {
released = true;
}
let firstError = null;

@@ -155,26 +296,45 @@ // Call releasers in reverse order - LIFO queue.

const getUtilsAndReleasersFromOptions = async (options, settings = {}) => {
const shared = processSharedOptions(options, settings);
const { concurrency = config_1.defaults.concurrentJobs } = options;
return withReleasers(async (releasers, release) => {
const pgPool = await assertPool(options, releasers);
if ("_rawOptions" in options) {
throw new Error(`Fed processed options to getUtilsAndReleasersFromOptions; this is invalid.`);
}
const compiledSharedOptions = processSharedOptions(options, settings);
const { logger, resolvedPreset: { worker: { concurrentJobs: concurrency }, }, } = compiledSharedOptions;
return withReleasers(async function getUtilsFromOptions(releasers, release) {
const pgPool = await assertPool(compiledSharedOptions, releasers);
// @ts-ignore
const max = pgPool?.options?.max || 10;
if (max < concurrency) {
console.warn(`WARNING: having maxPoolSize (${max}) smaller than concurrency (${concurrency}) may lead to non-optimal performance.`);
logger.warn(`WARNING: having maxPoolSize (${max}) smaller than concurrency (${concurrency}) may lead to non-optimal performance.`, { max, concurrency });
}
const withPgClient = (0, helpers_1.makeWithPgClientFromPool)(pgPool);
// Migrate
await withPgClient((client) => (0, migrate_1.migrate)(options, client));
const addJob = (0, helpers_1.makeAddJob)(options, withPgClient);
return {
...shared,
pgPool,
withPgClient,
addJob,
await withPgClient(function migrateWithPgClient(client) {
return (0, migrate_1.migrate)(compiledSharedOptions, client);
});
const addJob = (0, helpers_1.makeAddJob)(compiledSharedOptions, withPgClient);
return [
{
...compiledSharedOptions,
pgPool,
withPgClient,
addJob,
releasers,
},
release,
releasers,
};
];
});
};
exports.getUtilsAndReleasersFromOptions = getUtilsAndReleasersFromOptions;
function tryParseJson(json) {
if (json == null) {
return null;
}
try {
return JSON.parse(json);
}
catch (e) {
return null;
}
}
exports.tryParseJson = tryParseJson;
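// tryParseJson never throws: malformed or absent input yields null, which
// suits untrusted NOTIFY payloads (see the "jobs:insert" handler later in
// this diff). For instance:
tryParseJson<{ count: number }>('{"count":3}'); // -> { count: 3 }
tryParseJson("not json"); // -> null
tryParseJson(undefined); // -> null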
//# sourceMappingURL=lib.js.map

@@ -1,2 +0,2 @@

import { LogFunctionFactory as GraphileLogFunctionFactory, Logger as GraphileLogger } from "@graphile/logger";
import { LogFunctionFactory as GraphileLogFunctionFactory, Logger as GraphileLogger, LogLevel } from "@graphile/logger";
export interface LogScope {

@@ -8,6 +8,7 @@ label?: string;

}
export { LogLevel };
export declare class Logger extends GraphileLogger<LogScope> {
}
export declare type LogFunctionFactory = GraphileLogFunctionFactory<LogScope>;
export declare const consoleLogFactory: (scope: Partial<LogScope>) => (level: import("@graphile/logger").LogLevel, message: string) => void;
export type LogFunctionFactory = GraphileLogFunctionFactory<LogScope>;
export declare const consoleLogFactory: (scope: Partial<LogScope>) => (level: LogLevel, message: string) => void;
export declare const defaultLogger: Logger;
import { Pool, PoolClient } from "pg";
import { TaskList, WorkerOptions, WorkerPool, WorkerPoolOptions } from "./interfaces";
import { RunOnceOptions, TaskList, WithPgClient, WorkerPool, WorkerPoolOptions } from "./interfaces";
import { CompiledSharedOptions } from "./lib";
declare const allWorkerPools: Array<WorkerPool>;
export { allWorkerPools as _allWorkerPools };
export declare function runTaskList(options: WorkerPoolOptions, tasks: TaskList, pgPool: Pool): WorkerPool;
export declare const runTaskListOnce: (options: WorkerOptions, tasks: TaskList, client: PoolClient) => Promise<void>;
export declare function runTaskList(rawOptions: WorkerPoolOptions, tasks: TaskList, pgPool: Pool): WorkerPool;
export declare function runTaskListInternal(compiledSharedOptions: CompiledSharedOptions<WorkerPoolOptions>, tasks: TaskList, pgPool: Pool): WorkerPool;
export declare function _runTaskList(compiledSharedOptions: CompiledSharedOptions<RunOnceOptions | WorkerPoolOptions>, tasks: TaskList, withPgClient: WithPgClient, options: {
concurrency?: number | undefined;
noHandleSignals?: boolean | undefined;
continuous: boolean;
/** If false, you need to call `pool._start()` to start execution */
autostart?: boolean;
onDeactivate?: () => Promise<void> | void;
onTerminate?: () => Promise<void> | void;
}): WorkerPool;
export declare const runTaskListOnce: (options: RunOnceOptions, tasks: TaskList, client: PoolClient) => WorkerPool;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.runTaskListOnce = exports.runTaskList = exports._allWorkerPools = void 0;
exports.runTaskListOnce = exports._runTaskList = exports.runTaskListInternal = exports.runTaskList = exports._allWorkerPools = void 0;
const tslib_1 = require("tslib");
const crypto_1 = require("crypto");
const events_1 = require("events");
const util_1 = require("util");
const config_1 = require("./config");
const deferred_1 = require("./deferred");
const deferred_1 = tslib_1.__importDefault(require("./deferred"));
const helpers_1 = require("./helpers");
const lib_1 = require("./lib");
const signals_1 = require("./signals");
const signals_1 = tslib_1.__importDefault(require("./signals"));
const failJob_1 = require("./sql/failJob");

@@ -24,3 +25,3 @@ const resetLockedAt_1 = require("./sql/resetLockedAt");

*/
let _signalHandlersEventEmitter = new events_1.EventEmitter();
const _signalHandlersEventEmitter = new events_1.EventEmitter();
/**

@@ -35,2 +36,3 @@ * Only register the signal handlers once _globally_.

let _shuttingDownForcefully = false;
let _registeredSignalHandlersCount = 0;
/**

@@ -46,10 +48,21 @@ * This will register the signal handlers to make sure the worker shuts down

}
_signalHandlersEventEmitter.on("gracefulShutdown", (o) => events.emit("gracefulShutdown", o));
_signalHandlersEventEmitter.on("forcefulShutdown", (o) => events.emit("forcefulShutdown", o));
if (_registeredSignalHandlers) {
return;
const gscb = (o) => events.emit("gracefulShutdown", o);
const fscb = (o) => events.emit("forcefulShutdown", o);
if (!_registeredSignalHandlers) {
_reallyRegisterSignalHandlers(logger);
}
else {
_registeredSignalHandlers = true;
}
_registeredSignalHandlersCount++;
_signalHandlersEventEmitter.on("gracefulShutdown", gscb);
_signalHandlersEventEmitter.on("forcefulShutdown", fscb);
return function release() {
_signalHandlersEventEmitter.off("gracefulShutdown", gscb);
_signalHandlersEventEmitter.off("forcefulShutdown", fscb);
_registeredSignalHandlersCount--;
if (_registeredSignalHandlersCount === 0) {
_releaseSignalHandlers();
}
};
}
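// Sketch of the reference-counting contract above (the logger and event
// emitters are hypothetical): the process-level handlers are installed once,
// each caller gets its own release function, and the handlers are only
// detached when the final caller releases (and no shutdown is in flight).
const releaseA = registerSignalHandlers(logger, poolAEvents);
const releaseB = registerSignalHandlers(logger, poolBEvents);
releaseA(); // count 2 -> 1: handlers stay installed
releaseB(); // count 1 -> 0: _releaseSignalHandlers() detaches them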
let _releaseSignalHandlers = () => void 0;
function _reallyRegisterSignalHandlers(logger) {
const switchToForcefulHandler = () => {

@@ -107,18 +120,45 @@ logger.debug(`Switching to forceful handler for termination signals (${signals_1.default.join(", ")}); another termination signal will force a fast (unsafe) shutdown`, { switchToForcefulHandlers: true });

logger.debug(`Registering termination signal handlers (${signals_1.default.join(", ")})`, { registeringSignalHandlers: signals_1.default });
_registeredSignalHandlers = true;
for (const signal of signals_1.default) {
process.on(signal, gracefulHandler);
}
_releaseSignalHandlers = () => {
if (_shuttingDownGracefully || _shuttingDownForcefully) {
logger.warn(`Not unregistering signal handlers as we're shutting down`);
return;
}
_releaseSignalHandlers = () => void 0;
for (const signal of signals_1.default) {
process.off(signal, gracefulHandler);
}
_registeredSignalHandlers = false;
};
}
function runTaskList(options, tasks, pgPool) {
const { logger, events } = (0, lib_1.processSharedOptions)(options);
if (ENABLE_DANGEROUS_LOGS) {
logger.debug(`Worker pool options are ${(0, util_1.inspect)(options)}`, { options });
}
const { concurrency = config_1.defaults.concurrentJobs, noHandleSignals } = options;
if (!noHandleSignals) {
// Clean up when certain signals occur
registerSignalHandlers(logger, events);
}
const promise = (0, deferred_1.default)();
const workers = [];
function runTaskList(rawOptions, tasks, pgPool) {
const compiledSharedOptions = (0, lib_1.processSharedOptions)(rawOptions);
return runTaskListInternal(compiledSharedOptions, tasks, pgPool);
}
exports.runTaskList = runTaskList;
function runTaskListInternal(compiledSharedOptions, tasks, pgPool) {
const { events, logger, resolvedPreset: { worker: { minResetLockedInterval, maxResetLockedInterval }, }, } = compiledSharedOptions;
const withPgClient = (0, helpers_1.makeWithPgClientFromPool)(pgPool);
const workerPool = _runTaskList(compiledSharedOptions, tasks, withPgClient, {
continuous: true,
onTerminate() {
return resetLockedAtPromise;
},
onDeactivate() {
if (resetLockedTimeout) {
clearTimeout(resetLockedTimeout);
resetLockedTimeout = null;
}
if (reconnectTimeout) {
clearTimeout(reconnectTimeout);
reconnectTimeout = null;
}
return unlistenForChanges();
},
});
let attempts = 0;
let reconnectTimeout = null;
let changeListener = null;

@@ -128,3 +168,3 @@ const unlistenForChanges = async () => {

try {
changeListener.release();
await changeListener.release();
}

@@ -136,19 +176,15 @@ catch (e) {

};
let active = true;
let reconnectTimeout = null;
const compiledSharedOptions = (0, lib_1.processSharedOptions)(options);
const { minResetLockedInterval, maxResetLockedInterval } = compiledSharedOptions;
let resetLockedAtPromise;
const resetLockedDelay = () => Math.ceil(minResetLockedInterval +
Math.random() * (maxResetLockedInterval - minResetLockedInterval));
let resetLockedAtPromise;
const resetLocked = () => {
resetLockedAtPromise = (0, resetLockedAt_1.resetLockedAt)(compiledSharedOptions, withPgClient).then(() => {
resetLockedAtPromise = undefined;
if (active) {
if (workerPool._active) {
const delay = resetLockedDelay();
events.emit("resetLocked:success", { pool: this, delay });
events.emit("resetLocked:success", { workerPool, delay });
resetLockedTimeout = setTimeout(resetLocked, delay);
}
else {
events.emit("resetLocked:success", { pool: this, delay: null });
events.emit("resetLocked:success", { workerPool, delay: null });
}

@@ -158,5 +194,9 @@ }, (e) => {

// TODO: push this error out via an event.
if (active) {
if (workerPool._active) {
const delay = resetLockedDelay();
events.emit("resetLocked:failure", { pool: this, error: e, delay });
events.emit("resetLocked:failure", {
workerPool,
error: e,
delay,
});
resetLockedTimeout = setTimeout(resetLocked, delay);

@@ -169,3 +209,3 @@ logger.error(`Failed to reset locked; we'll try again in ${delay}ms`, {

events.emit("resetLocked:failure", {
pool: this,
workerPool,
error: e,

@@ -179,3 +219,3 @@ delay: null,

});
events.emit("resetLocked:started", { pool: this });
events.emit("resetLocked:started", { workerPool });
};

@@ -185,17 +225,136 @@ // Reset locked in the first 60 seconds, not immediately because we don't

let resetLockedTimeout = setTimeout(resetLocked, Math.random() * Math.min(60000, maxResetLockedInterval));
function deactivate() {
if (active) {
active = false;
if (resetLockedTimeout) {
clearTimeout(resetLockedTimeout);
resetLockedTimeout = null;
}
if (reconnectTimeout) {
clearTimeout(reconnectTimeout);
const listenForChanges = (err, client, releaseClient) => {
if (!workerPool._active) {
// We were released, release this new client and abort
releaseClient?.();
return;
}
const reconnectWithExponentialBackoff = (err) => {
events.emit("pool:listen:error", { workerPool, client, error: err });
attempts++;
// When figuring the next delay we want exponential back-off, but we also
// want to avoid the thundering herd problem. For now, we'll add some
// randomness to it via the `jitter` variable, which is
// deliberately weighted towards the higher end of the duration.
const jitter = 0.5 + Math.sqrt(Math.random()) / 2;
// Backoff (ms): 136, 370, 1005, 2730, 7421, 20172, 54832
const delay = Math.ceil(jitter * Math.min(MAX_DELAY, 50 * Math.exp(attempts)));
logger.error(`Error with notify listener (trying again in ${delay}ms): ${err.message}`, { error: err });
reconnectTimeout = setTimeout(() => {
reconnectTimeout = null;
events.emit("pool:listen:connecting", { workerPool, attempts });
pgPool.connect(listenForChanges);
}, delay);
};
if (err) {
// Try again
reconnectWithExponentialBackoff(err);
return;
}
//----------------------------------------
let errorHandled = false;
function onErrorReleaseClientAndTryAgain(e) {
if (errorHandled) {
return;
}
events.emit("pool:release", { pool: this });
unlistenForChanges();
errorHandled = true;
try {
release();
}
catch (e) {
logger.error(`Error occurred releasing client: ${e.stack}`, {
error: e,
});
}
reconnectWithExponentialBackoff(e);
}
function handleNotification(message) {
if (changeListener?.client === client && !workerPool._shuttingDown) {
events.emit("pool:listen:notification", {
workerPool,
message,
client,
});
switch (message.channel) {
case "jobs:insert": {
const payload = (0, lib_1.tryParseJson)(message.payload);
let n = payload?.count ?? 1;
if (n > 0) {
// Nudge up to `n` workers
workerPool._workers.some((worker) => worker.nudge() && --n <= 0);
}
break;
}
case "worker:migrate": {
const payload = (0, lib_1.tryParseJson)(message.payload);
if (payload?.breaking) {
logger.warn(`Graphile Worker detected breaking migration to database schema revision '${payload?.migrationNumber}'; it would be unsafe to continue, so shutting down...`);
process.exitCode = 57;
workerPool.gracefulShutdown();
}
break;
}
default: {
logger.debug(`Received NOTIFY message on channel '${message.channel}'`);
}
}
}
}
function release() {
// No need to call changeListener.release() because the client errored
changeListener = null;
client.removeListener("notification", handleNotification);
// TODO: ideally we'd only stop handling errors once all pending queries are complete; but either way we shouldn't try again!
client.removeListener("error", onErrorReleaseClientAndTryAgain);
events.emit("pool:listen:release", { workerPool, client });
return client
.query('UNLISTEN "jobs:insert"; UNLISTEN "worker:migrate";')
.catch((error) => {
/* ignore errors */
logger.error(`Error occurred attempting to UNLISTEN: ${error}`, {
error,
});
})
.then(() => releaseClient());
}
// On error, release this client and try again
client.on("error", onErrorReleaseClientAndTryAgain);
//----------------------------------------
changeListener = { client, release };
events.emit("pool:listen:success", { workerPool, client });
client.on("notification", handleNotification);
// Subscribe to jobs:insert message
client.query('LISTEN "jobs:insert"; LISTEN "worker:migrate";').then(() => {
// Successful listen; reset attempts
attempts = 0;
}, onErrorReleaseClientAndTryAgain);
const supportedTaskNames = Object.keys(tasks);
logger.info(`Worker connected and looking for jobs... (task names: '${supportedTaskNames.join("', '")}')`);
};
// Create a client dedicated to listening for new jobs.
events.emit("pool:listen:connecting", { workerPool, attempts });
pgPool.connect(listenForChanges);
return workerPool;
}
exports.runTaskListInternal = runTaskListInternal;
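// The reconnect delay used above, extracted for clarity: exponential in
// `attempts`, capped at MAX_DELAY (whose value isn't shown in this diff;
// the 60s below is an assumed placeholder), with jitter drawn from [0.5, 1]
// and biased towards 1 via the square root.
function sketchReconnectDelay(attempts: number, maxDelay = 60000): number {
    const jitter = 0.5 + Math.sqrt(Math.random()) / 2;
    return Math.ceil(jitter * Math.min(maxDelay, 50 * Math.exp(attempts)));
}
// Pre-jitter base delays for attempts 1..7 (ms): 136, 370, 1005, 2730,
// 7421, 20172, 54832; matching the comment in the source above.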
function _runTaskList(compiledSharedOptions, tasks, withPgClient, options) {
const { resolvedPreset: { worker: { concurrentJobs: baseConcurrency, gracefulShutdownAbortTimeout }, }, _rawOptions: { noHandleSignals = false }, } = compiledSharedOptions;
const { concurrency = baseConcurrency, continuous, autostart: rawAutostart = true, onTerminate, onDeactivate, } = options;
let autostart = rawAutostart;
const { logger, events } = compiledSharedOptions;
if (ENABLE_DANGEROUS_LOGS) {
logger.debug(`Worker pool options are ${(0, util_1.inspect)(compiledSharedOptions._rawOptions)}`, { options: compiledSharedOptions._rawOptions });
}
let unregisterSignalHandlers = undefined;
if (!noHandleSignals) {
// Clean up when certain signals occur
unregisterSignalHandlers = registerSignalHandlers(logger, events);
}
const promise = (0, deferred_1.default)();
function deactivate() {
if (workerPool._active) {
workerPool._active = false;
return onDeactivate?.();
}
}
let terminated = false;

@@ -207,3 +366,6 @@ function terminate() {

allWorkerPools.splice(idx, 1);
promise.resolve(resetLockedAtPromise);
promise.resolve(onTerminate?.());
if (unregisterSignalHandlers) {
unregisterSignalHandlers();
}
}

@@ -214,6 +376,18 @@ else {

}
const abortController = new AbortController();
const abortSignal = abortController.signal;
// This is a representation of us that can be interacted with externally
const workerPool = {
// "otpool" - "one time pool"
id: `${continuous ? "pool" : "otpool"}-${(0, crypto_1.randomBytes)(9).toString("hex")}`,
_active: true,
_shuttingDown: false,
_workers: [],
_withPgClient: withPgClient,
get worker() {
return concurrency === 1 ? this._workers[0] ?? null : null;
},
abortSignal,
release: async () => {
console.trace("DEPRECATED: You are calling `workerPool.release()`; please use `workerPool.gracefulShutdown()` instead.");
logger.error("DEPRECATED: You are calling `workerPool.release()`; please use `workerPool.gracefulShutdown()` instead.");
return this.gracefulShutdown();

@@ -226,10 +400,30 @@ },

async gracefulShutdown(message = "Worker pool is shutting down gracefully") {
events.emit("pool:gracefulShutdown", { pool: this, message });
if (workerPool._shuttingDown) {
logger.error(`gracefulShutdown called when gracefulShutdown is already in progress`);
return;
}
workerPool._shuttingDown = true;
const abortTimer = setTimeout(() => {
abortController.abort();
}, gracefulShutdownAbortTimeout);
abortTimer.unref();
events.emit("pool:gracefulShutdown", {
pool: workerPool,
workerPool,
message,
});
try {
logger.debug(`Attempting graceful shutdown`);
// Stop new jobs being added
deactivate();
const deactivatePromise = deactivate();
// Remove all the workers - we're shutting them down manually
const workers = [...workerPool._workers];
const workerPromises = workers.map((worker) => worker.release());
const workerReleaseResults = await Promise.allSettled(workerPromises);
const [deactivateResult, ...workerReleaseResults] = await Promise.allSettled([deactivatePromise, ...workerPromises]);
if (deactivateResult.status === "rejected") {
// Log but continue regardless
logger.error(`Deactivation failed: ${deactivateResult.reason}`, {
error: deactivateResult.reason,
});
}
const jobsToRelease = [];

@@ -242,3 +436,4 @@ for (let i = 0; i < workerReleaseResults.length; i++) {

events.emit("pool:gracefulShutdown:workerError", {
pool: this,
pool: workerPool,
workerPool,
error: workerReleaseResult.reason,

@@ -270,7 +465,14 @@ job,

}
events.emit("pool:gracefulShutdown:complete", { pool: this });
events.emit("pool:gracefulShutdown:complete", {
pool: workerPool,
workerPool,
});
logger.debug("Graceful shutdown complete");
}
catch (e) {
events.emit("pool:gracefulShutdown:error", { pool: this, error: e });
events.emit("pool:gracefulShutdown:error", {
pool: workerPool,
workerPool,
error: e,
});
logger.error(`Error occurred during graceful shutdown: ${e.message}`, {

@@ -287,8 +489,13 @@ error: e,

async forcefulShutdown(message) {
events.emit("pool:forcefulShutdown", { pool: this, message });
events.emit("pool:forcefulShutdown", {
pool: workerPool,
workerPool,
message,
});
try {
logger.debug(`Attempting forceful shutdown`);
// Stop new jobs being added
deactivate();
const deactivatePromise = deactivate();
// Release all our workers' jobs
const workers = [...workerPool._workers];
const jobsInProgress = workers

@@ -298,5 +505,12 @@ .map((worker) => worker.getActiveJob())

// Remove all the workers - we're shutting them down manually
const workerPromises = workers.map((worker) => worker.release());
const workerPromises = workers.map((worker) => worker.release(true));
// Ignore the results; we're shutting down anyway
Promise.allSettled(workerPromises);
// TODO: add a timeout
const [deactivateResult, ..._ignoreWorkerReleaseResults] = await Promise.allSettled([deactivatePromise, ...workerPromises]);
if (deactivateResult.status === "rejected") {
// Log but continue regardless
logger.error(`Deactivation failed: ${deactivateResult.reason}`, {
error: deactivateResult.reason,
});
}
if (jobsInProgress.length > 0) {

@@ -318,7 +532,14 @@ const workerIds = workers.map((worker) => worker.workerId);

}
events.emit("pool:forcefulShutdown:complete", { pool: this });
events.emit("pool:forcefulShutdown:complete", {
pool: workerPool,
workerPool,
});
logger.debug("Forceful shutdown complete");
}
catch (e) {
events.emit("pool:forcefulShutdown:error", { pool: this, error: e });
events.emit("pool:forcefulShutdown:error", {
pool: workerPool,
workerPool,
error: e,
});
logger.error(`Error occurred during forceful shutdown: ${e.message}`, {

@@ -331,88 +552,65 @@ error: e,

promise,
then(onfulfilled, onrejected) {
return promise.then(onfulfilled, onrejected);
},
catch(onrejected) {
return promise.catch(onrejected);
},
finally(onfinally) {
return promise.finally(onfinally);
},
_start: autostart
? null
: () => {
autostart = true;
workerPool._workers.forEach((worker) => worker._start());
workerPool._start = null;
},
};
promise.finally(() => {
events.emit("pool:release", { pool: workerPool, workerPool });
});
abortSignal.addEventListener("abort", () => {
if (!workerPool._shuttingDown) {
workerPool.gracefulShutdown();
}
});
// Ensure that during a forced shutdown we get cleaned up too
allWorkerPools.push(workerPool);
events.emit("pool:create", { workerPool });
let attempts = 0;
const listenForChanges = (err, client, releaseClient) => {
if (!active) {
// We were released, release this new client and abort
releaseClient?.();
return;
}
const reconnectWithExponentialBackoff = (err) => {
events.emit("pool:listen:error", { workerPool, client, error: err });
attempts++;
// When figuring the next delay we want exponential back-off, but we also
// want to avoid the thundering herd problem. For now, we'll add some
// randomness to it via the `jitter` variable, which is
// deliberately weighted towards the higher end of the duration.
const jitter = 0.5 + Math.sqrt(Math.random()) / 2;
// Backoff (ms): 136, 370, 1005, 2730, 7421, 20172, 54832
const delay = Math.ceil(jitter * Math.min(MAX_DELAY, 50 * Math.exp(attempts)));
logger.error(`Error with notify listener (trying again in ${delay}ms): ${err.message}`, { error: err });
reconnectTimeout = setTimeout(() => {
reconnectTimeout = null;
events.emit("pool:listen:connecting", { workerPool, attempts });
pgPool.connect(listenForChanges);
}, delay);
};
if (err) {
// Try again
reconnectWithExponentialBackoff(err);
return;
}
//----------------------------------------
let errorHandled = false;
function onErrorReleaseClientAndTryAgain(e) {
if (errorHandled) {
return;
// Spawn our workers; they can share clients from the pool.
const workerId = "workerId" in compiledSharedOptions._rawOptions
? compiledSharedOptions._rawOptions.workerId
: undefined;
if (workerId != null && concurrency > 1) {
throw new Error(`You must not set workerId when concurrency > 1; each worker must have a unique identifier`);
}
for (let i = 0; i < concurrency; i++) {
const worker = (0, worker_1.makeNewWorker)(compiledSharedOptions, {
tasks,
withPgClient,
continuous,
abortSignal,
workerPool,
autostart,
workerId,
});
workerPool._workers.push(worker);
const remove = () => {
if (continuous && workerPool._active && !workerPool._shuttingDown) {
logger.error(`Worker exited, but pool is in continuous mode, is active, and is not shutting down... Did something go wrong?`);
}
errorHandled = true;
try {
release();
workerPool._workers.splice(workerPool._workers.indexOf(worker), 1);
if (!continuous && workerPool._workers.length === 0) {
deactivate();
terminate();
}
catch (e) {
logger.error(`Error occurred releasing client: ${e.stack}`, {
error: e,
});
}
reconnectWithExponentialBackoff(e);
}
function handleNotification() {
if (changeListener?.client === client) {
// Find a worker that's available
workers.some((worker) => worker.nudge());
}
}
function release() {
changeListener = null;
client.removeListener("error", onErrorReleaseClientAndTryAgain);
client.removeListener("notification", handleNotification);
client.query('UNLISTEN "jobs:insert"').catch(() => {
/* ignore errors */
});
releaseClient();
}
// On error, release this client and try again
client.on("error", onErrorReleaseClientAndTryAgain);
//----------------------------------------
events.emit("pool:listen:success", { workerPool, client });
changeListener = { client, release };
client.on("notification", handleNotification);
// Subscribe to jobs:insert message
client.query('LISTEN "jobs:insert"').then(() => {
// Successful listen; reset attempts
attempts = 0;
}, onErrorReleaseClientAndTryAgain);
const supportedTaskNames = Object.keys(tasks);
logger.info(`Worker connected and looking for jobs... (task names: '${supportedTaskNames.join("', '")}')`);
};
// Create a client dedicated to listening for new jobs.
events.emit("pool:listen:connecting", { workerPool, attempts });
pgPool.connect(listenForChanges);
// Spawn our workers; they can share clients from the pool.
const withPgClient = (0, helpers_1.makeWithPgClientFromPool)(pgPool);
for (let i = 0; i < concurrency; i++) {
workers.push((0, worker_1.makeNewWorker)(options, tasks, withPgClient));
};
worker.promise.then(() => {
remove();
}, (error) => {
remove();
console.trace(error);
logger.error(`Worker exited with error: ${error}`, { error });
});
}

@@ -422,15 +620,22 @@ // TODO: handle when a worker shuts down (spawn a new one)

}
exports.runTaskList = runTaskList;
exports._runTaskList = _runTaskList;
const runTaskListOnce = (options, tasks, client) => {
const withPgClient = (0, helpers_1.makeWithPgClientFromClient)(client);
const compiledSharedOptions = (0, lib_1.processSharedOptions)(options);
const pool = _runTaskList(compiledSharedOptions, tasks, withPgClient, {
concurrency: 1,
autostart: false,
noHandleSignals: options.noHandleSignals,
continuous: false,
});
const resetPromise = (0, resetLockedAt_1.resetLockedAt)(compiledSharedOptions, withPgClient);
const finalPromise = resetPromise.then(() => {
const worker = (0, worker_1.makeNewWorker)(options, tasks, (0, helpers_1.makeWithPgClientFromClient)(client), false);
finalPromise["worker"] = worker;
return worker.promise;
resetPromise.then(() => {
pool._start();
}, (error) => {
compiledSharedOptions.logger.error(`Error occurred resetting locked at; continuing regardless: ${error}`, { error });
pool._start();
});
return finalPromise;
return pool;
};
exports.runTaskListOnce = runTaskListOnce;
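// Usage sketch for the reshaped runTaskListOnce (options, tasks, and client
// are assumed to be in scope): it now returns a WorkerPool synchronously;
// the pool is started once resetLockedAt has run, and its promise settles
// when the one-shot workers have drained the queue and exited.
const oneShotPool = runTaskListOnce(options, tasks, client);
await oneShotPool.promise;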
//# sourceMappingURL=main.js.map
import { PoolClient } from "pg";
import { WorkerSharedOptions } from "./interfaces";
export declare function migrate(options: WorkerSharedOptions, client: PoolClient): Promise<void>;
import { CompiledSharedOptions } from "./lib";
/** @internal */
export declare function migrate(compiledSharedOptions: CompiledSharedOptions<WorkerSharedOptions>, client: PoolClient): Promise<void>;

@@ -7,68 +7,113 @@ "use strict";

function checkPostgresVersion(versionString) {
const version = parseInt(versionString, 10);
if (version < 120000) {
const postgresVersion = parseInt(versionString, 10);
if (postgresVersion < 120000) {
throw new Error(`This version of Graphile Worker requires PostgreSQL v12.0 or greater (detected \`server_version_num\` = ${versionString})`);
}
return postgresVersion;
}
async function fetchAndCheckPostgresVersion(client) {
const { rows: [row], } = await client.query("select current_setting('server_version_num') as server_version_num");
checkPostgresVersion(row.server_version_num);
return checkPostgresVersion(row.server_version_num);
}
async function installSchema(options, client) {
const { escapedWorkerSchema } = (0, lib_1.processSharedOptions)(options);
await fetchAndCheckPostgresVersion(client);
await client.query(`
create schema ${escapedWorkerSchema};
create table ${escapedWorkerSchema}.migrations(
async function installSchema(compiledSharedOptions, event) {
const { hooks, escapedWorkerSchema } = compiledSharedOptions;
event.postgresVersion =
await fetchAndCheckPostgresVersion(event.client);
await hooks.process("prebootstrap", event);
await event.client.query(`
create schema if not exists ${escapedWorkerSchema};
create table if not exists ${escapedWorkerSchema}.migrations(
id int primary key,
ts timestamptz default now() not null
);
alter table ${escapedWorkerSchema}.migrations add column if not exists breaking boolean not null default false;
`);
await event.client.query(`update ${escapedWorkerSchema}.migrations set breaking = true where id = any($1::int[])`, [lib_1.BREAKING_MIGRATIONS]);
await hooks.process("postbootstrap", event);
}
async function runMigration(options, client, migrationFile, migrationNumber) {
const { escapedWorkerSchema } = (0, lib_1.processSharedOptions)(options);
async function runMigration(compiledSharedOptions, event, migrationFile, migrationNumber) {
const { escapedWorkerSchema, logger } = compiledSharedOptions;
const rawText = sql_1.migrations[migrationFile];
const text = rawText.replace(/:GRAPHILE_WORKER_SCHEMA\b/g, escapedWorkerSchema);
await client.query("begin");
const breaking = lib_1.BREAKING_MIGRATIONS.includes(migrationNumber);
logger.debug(`Running ${breaking ? "breaking" : "backwards-compatible"} migration ${migrationFile}`);
let migrationInsertComplete = false;
await event.client.query("begin");
try {
await client.query({
// Must come first so we can detect concurrent migration
await event.client.query({
text: `insert into ${escapedWorkerSchema}.migrations (id, breaking) values ($1, $2)`,
values: [migrationNumber, breaking],
});
migrationInsertComplete = true;
await event.client.query({
text,
});
await client.query({
text: `insert into ${escapedWorkerSchema}.migrations (id) values ($1)`,
values: [migrationNumber],
});
await client.query("commit");
await event.client.query("select pg_notify($1, $2)", [
"worker:migrate",
JSON.stringify({ migrationNumber, breaking }),
]);
await event.client.query("commit");
}
catch (e) {
await client.query("rollback");
await event.client.query("rollback");
if (!migrationInsertComplete && e.code === "23505") {
// Someone else did this migration! Success!
logger.debug(`Some other worker has performed migration ${migrationFile}; continuing.`);
return;
}
throw e;
}
}
async function migrate(options, client) {
const { escapedWorkerSchema } = (0, lib_1.processSharedOptions)(options);
/** @internal */
async function migrate(compiledSharedOptions, client) {
const { escapedWorkerSchema, hooks, logger } = compiledSharedOptions;
let latestMigration = null;
try {
const { rows: [row], } = await client.query(`select current_setting('server_version_num') as server_version_num,
(select id from ${escapedWorkerSchema}.migrations order by id desc limit 1) as id;`);
latestMigration = row.id;
checkPostgresVersion(row.server_version_num);
}
catch (e) {
if (e.code === "42P01") {
await installSchema(options, client);
let latestBreakingMigration = null;
const event = { client, postgresVersion: 0, scratchpad: Object.create(null) };
for (let attempts = 0; attempts < 2; attempts++) {
try {
const { rows: [row], } = await event.client.query(`select current_setting('server_version_num') as server_version_num,
(select id from ${escapedWorkerSchema}.migrations order by id desc limit 1) as id,
(select id from ${escapedWorkerSchema}.migrations where breaking is true order by id desc limit 1) as biggest_breaking_id;`);
latestMigration = row.id;
latestBreakingMigration = row.biggest_breaking_id;
event.postgresVersion = checkPostgresVersion(row.server_version_num);
}
else {
throw e;
catch (e) {
if (attempts === 0 && (e.code === "42P01" || e.code === "42703")) {
await installSchema(compiledSharedOptions, event);
}
else {
throw e;
}
}
}
await hooks.process("premigrate", event);
const migrationFiles = Object.keys(sql_1.migrations);
let highestMigration = 0;
let migrated = false;
for (const migrationFile of migrationFiles) {
const migrationNumber = parseInt(migrationFile.slice(0, 6), 10);
if (migrationNumber > highestMigration) {
highestMigration = migrationNumber;
}
if (latestMigration == null || migrationNumber > latestMigration) {
await runMigration(options, client, migrationFile, migrationNumber);
migrated = true;
await runMigration(compiledSharedOptions, event, migrationFile, migrationNumber);
}
}
if (migrated) {
logger.debug(`Migrations complete`);
}
if (latestBreakingMigration && highestMigration < latestBreakingMigration) {
process.exitCode = 57;
throw new Error(`Database is using Graphile Worker schema revision ${latestMigration} which includes breaking migration ${latestBreakingMigration}, but the currently running worker only supports up to revision ${highestMigration}. It would be unsafe to continue; please ensure all versions of Graphile Worker are compatible.`);
}
else if (latestMigration && highestMigration < latestMigration) {
logger.warn(`Database is using Graphile Worker schema revision ${latestMigration}, but the currently running worker only supports up to revision ${highestMigration} which may or may not be compatible. Please ensure all versions of Graphile Worker you're running are compatible, or use Worker Pro which will perform this check for you. Attempting to continue regardless.`);
}
await hooks.process("postmigrate", event);
}
exports.migrate = migrate;
//# sourceMappingURL=migrate.js.map

@@ -1,4 +0,7 @@

import { ParsedCronItem, Runner, RunnerOptions, TaskList } from "./interfaces";
import { ParsedCronItem, PromiseOrDirect, Runner, RunnerOptions, TaskList } from "./interfaces";
import { CompiledOptions } from "./lib";
export declare const runMigrations: (options: RunnerOptions) => Promise<void>;
export declare const runOnce: (options: RunnerOptions, overrideTaskList?: TaskList) => Promise<void>;
export declare const run: (options: RunnerOptions, overrideTaskList?: TaskList, overrideParsedCronItems?: Array<ParsedCronItem>) => Promise<Runner>;
export declare const runOnceInternal: (compiledOptions: CompiledOptions, overrideTaskList: TaskList | undefined, release: () => PromiseOrDirect<void>) => Promise<void>;
export declare const run: (rawOptions: RunnerOptions, overrideTaskList?: TaskList, overrideParsedCronItems?: Array<ParsedCronItem>) => Promise<Runner>;
export declare const runInternal: (compiledOptions: CompiledOptions, overrideTaskList: TaskList | undefined, overrideParsedCronItems: Array<ParsedCronItem> | undefined, release: () => PromiseOrDirect<void>) => Promise<Runner>;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.run = exports.runOnce = exports.runMigrations = void 0;
const assert = require("assert");
exports.runInternal = exports.run = exports.runOnceInternal = exports.runOnce = exports.runMigrations = void 0;
const cron_1 = require("./cron");

@@ -9,20 +8,15 @@ const getTasks_1 = require("./getTasks");

const main_1 = require("./main");
const migrate_1 = require("./migrate");
const runMigrations = async (options) => {
const { withPgClient, release } = await (0, lib_1.getUtilsAndReleasersFromOptions)(options);
try {
await withPgClient((client) => (0, migrate_1.migrate)(options, client));
}
finally {
await release();
}
const [, release] = await (0, lib_1.getUtilsAndReleasersFromOptions)(options);
await release();
};
exports.runMigrations = runMigrations;
async function assertTaskList(options, releasers) {
assert(!options.taskDirectory || !options.taskList, "Exactly one of either `taskDirectory` or `taskList` should be set");
if (options.taskList) {
return options.taskList;
/** @internal */
async function assertTaskList(compiledOptions, releasers) {
const { resolvedPreset: { worker: { taskDirectory }, }, _rawOptions: { taskList }, } = compiledOptions;
if (taskList) {
return taskList;
}
else if (options.taskDirectory) {
const watchedTasks = await (0, getTasks_1.default)(options, options.taskDirectory, false);
else if (taskDirectory) {
const watchedTasks = await (0, getTasks_1.getTasksInternal)(compiledOptions, taskDirectory);
releasers.push(() => watchedTasks.release());

@@ -32,15 +26,20 @@ return watchedTasks.tasks;

else {
throw new Error("You must specify either `options.taskList` or `options.taskDirectory`");
throw new Error("You must specify either `taskList` or `taskDirectory`");
}
}
const runOnce = async (options, overrideTaskList) => {
const { concurrency = 1 } = options;
const { withPgClient, release, releasers } = await (0, lib_1.getUtilsAndReleasersFromOptions)(options);
const [compiledOptions, release] = await (0, lib_1.getUtilsAndReleasersFromOptions)(options);
return (0, exports.runOnceInternal)(compiledOptions, overrideTaskList, release);
};
exports.runOnce = runOnce;
const runOnceInternal = async (compiledOptions, overrideTaskList, release) => {
const { withPgClient, releasers, resolvedPreset: { worker: { concurrentJobs: concurrency }, }, _rawOptions: { noHandleSignals }, } = compiledOptions;
try {
const taskList = overrideTaskList || (await assertTaskList(options, releasers));
const promises = [];
for (let i = 0; i < concurrency; i++) {
promises.push(withPgClient((client) => (0, main_1.runTaskListOnce)(options, taskList, client)));
}
await Promise.all(promises);
const taskList = overrideTaskList || (await assertTaskList(compiledOptions, releasers));
const workerPool = (0, main_1._runTaskList)(compiledOptions, taskList, withPgClient, {
concurrency,
noHandleSignals,
continuous: false,
});
return await workerPool.promise;
}

@@ -51,10 +50,14 @@ finally {

};
exports.runOnce = runOnce;
const run = async (options, overrideTaskList, overrideParsedCronItems) => {
const compiledOptions = await (0, lib_1.getUtilsAndReleasersFromOptions)(options);
const { release, releasers } = compiledOptions;
exports.runOnceInternal = runOnceInternal;
const run = async (rawOptions, overrideTaskList, overrideParsedCronItems) => {
const [compiledOptions, release] = await (0, lib_1.getUtilsAndReleasersFromOptions)(rawOptions);
return (0, exports.runInternal)(compiledOptions, overrideTaskList, overrideParsedCronItems, release);
};
exports.run = run;
const runInternal = async (compiledOptions, overrideTaskList, overrideParsedCronItems, release) => {
const { releasers } = compiledOptions;
try {
const taskList = overrideTaskList || (await assertTaskList(options, releasers));
const taskList = overrideTaskList || (await assertTaskList(compiledOptions, releasers));
const parsedCronItems = overrideParsedCronItems ||
(await (0, cron_1.getParsedCronItemsFromOptions)(options, releasers));
(await (0, cron_1.getParsedCronItemsFromOptions)(compiledOptions, releasers));
// The result of 'buildRunner' must be returned immediately, so that the

@@ -65,14 +68,19 @@ // user can await its promise property immediately. If this is broken then

return buildRunner({
options,
compiledOptions,
taskList,
parsedCronItems,
release,
});
}
catch (e) {
await release();
try {
await release();
}
catch (e2) {
compiledOptions.logger.error(`Error occurred whilst attempting to release options after an earlier error`, { error: e, secondError: e2 });
}
throw e;
}
};
exports.run = run;
exports.runInternal = runInternal;
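// End-to-end usage sketch for `run` (the connection string and task are
// hypothetical): the returned runner's promise settles when the cron and
// worker pool both finish, and stopping now performs a graceful shutdown.
const runner = await run({
    connectionString: "postgres:///my_db",
    taskList: {
        async hello(payload, helpers) {
            helpers.logger.info(`Hello ${(payload as { name: string }).name}`);
        },
    },
});
// ...later, when it's time to shut down:
await runner.stop();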
/**

@@ -86,14 +94,31 @@ * This _synchronous_ function exists to ensure that the promises are built and

function buildRunner(input) {
const { options, compiledOptions, taskList, parsedCronItems } = input;
const { events, pgPool, releasers, release, addJob } = compiledOptions;
const cron = (0, cron_1.runCron)(options, parsedCronItems, { pgPool, events });
const { compiledOptions, taskList, parsedCronItems, release } = input;
const { events, pgPool, releasers, addJob, logger } = compiledOptions;
const cron = (0, cron_1.runCron)(compiledOptions, parsedCronItems, { pgPool, events });
releasers.push(() => cron.release());
const workerPool = (0, main_1.runTaskList)(options, taskList, pgPool);
releasers.push(() => workerPool.gracefulShutdown("Runner is shutting down"));
const workerPool = (0, main_1.runTaskListInternal)(compiledOptions, taskList, pgPool);
releasers.push(() => {
if (!workerPool._shuttingDown) {
return workerPool.gracefulShutdown("Runner is shutting down");
}
});
let running = true;
const stop = async () => {
compiledOptions.logger.debug("Runner stopping");
if (running) {
running = false;
events.emit("stop", {});
await release();
try {
const promises = [];
if (cron._active) {
promises.push(cron.release());
}
if (workerPool._active) {
promises.push(workerPool.gracefulShutdown());
}
await Promise.all(promises).then(release);
}
catch (error) {
logger.error(`Error occurred whilst attempting to release runner options: ${error.message}`, { error });
}
}

@@ -104,13 +129,23 @@ else {

};
const promise = Promise.all([cron.promise, workerPool.promise]).then(() => {
/* void */
}, (e) => {
workerPool.promise.finally(() => {
if (running) {
console.error(`Stopping worker due to an error: ${e}`);
stop();
}
});
cron.promise.finally(() => {
if (running) {
stop();
}
});
const promise = Promise.all([cron.promise, workerPool.promise]).then(() => {
/* noop */
}, async (error) => {
if (running) {
logger.error(`Stopping worker due to an error: ${error}`, { error });
await stop();
}
else {
console.error(`Error occurred, but worker is already stopping: ${e}`);
logger.error(`Error occurred, but worker is already stopping: ${error}`, { error });
}
return Promise.reject(e);
return Promise.reject(error);
});

@@ -117,0 +152,0 @@ return {

@@ -1,3 +0,3 @@

export declare type Signal = "SIGUSR2" | "SIGINT" | "SIGTERM" | "SIGHUP" | "SIGABRT";
export type Signal = "SIGUSR2" | "SIGINT" | "SIGTERM" | "SIGHUP" | "SIGABRT";
declare const _default: Signal[];
export default _default;

@@ -5,3 +5,3 @@ "use strict";

async function completeJob(compiledSharedOptions, withPgClient, workerId, job) {
const { escapedWorkerSchema, workerSchema, options: { noPreparedStatements }, } = compiledSharedOptions;
const { escapedWorkerSchema, workerSchema, resolvedPreset: { worker: { preparedStatements }, }, } = compiledSharedOptions;
// TODO: retry logic, in case of server connection interruption

@@ -21,3 +21,3 @@ if (job.job_queue_id != null) {

values: [job.id, workerId],
name: noPreparedStatements
name: !preparedStatements
? undefined

@@ -33,3 +33,3 @@ : `complete_job_q/${workerSchema}`,

values: [job.id],
name: noPreparedStatements ? undefined : `complete_job/${workerSchema}`,
name: !preparedStatements ? undefined : `complete_job/${workerSchema}`,
}));

@@ -36,0 +36,0 @@ }

import { DbJob, WithPgClient } from "../interfaces";
import { CompiledSharedOptions } from "../lib";
export declare function failJob(compiledSharedOptions: CompiledSharedOptions, withPgClient: WithPgClient, workerId: string, job: DbJob, message: string, replacementPayload: undefined | any[]): Promise<void>;
export declare function failJob(compiledSharedOptions: CompiledSharedOptions, withPgClient: WithPgClient, workerId: string, job: DbJob, message: string, replacementPayload: undefined | unknown[]): Promise<void>;
export declare function failJobs(compiledSharedOptions: CompiledSharedOptions, withPgClient: WithPgClient, workerIds: string[], jobs: DbJob[], message: string): Promise<DbJob[]>;

@@ -5,3 +5,3 @@ "use strict";

async function failJob(compiledSharedOptions, withPgClient, workerId, job, message, replacementPayload) {
const { escapedWorkerSchema, workerSchema, options: { noPreparedStatements }, } = compiledSharedOptions;
const { escapedWorkerSchema, workerSchema, resolvedPreset: { worker: { preparedStatements }, }, } = compiledSharedOptions;
// TODO: retry logic, in case of server connection interruption

@@ -34,3 +34,3 @@ if (job.job_queue_id != null) {

],
name: noPreparedStatements ? undefined : `fail_job_q/${workerSchema}`,
name: !preparedStatements ? undefined : `fail_job_q/${workerSchema}`,
}));

@@ -57,3 +57,3 @@ }

],
name: noPreparedStatements ? undefined : `fail_job/${workerSchema}`,
name: !preparedStatements ? undefined : `fail_job/${workerSchema}`,
}));

@@ -64,3 +64,4 @@ }

async function failJobs(compiledSharedOptions, withPgClient, workerIds, jobs, message) {
const { escapedWorkerSchema, workerSchema, options: { noPreparedStatements }, } = compiledSharedOptions;
const { escapedWorkerSchema, workerSchema, resolvedPreset: { worker: { preparedStatements }, }, } = compiledSharedOptions;
// TODO: retry logic, in case of server connection interruption
const { rows: failedJobs } = await withPgClient((client) => client.query({

@@ -85,3 +86,3 @@ text: `\

values: [jobs.map((job) => job.id), message, workerIds],
name: noPreparedStatements ? undefined : `fail_jobs/${workerSchema}`,
name: !preparedStatements ? undefined : `fail_jobs/${workerSchema}`,
}));

@@ -88,0 +89,0 @@ return failedJobs;

import { Job, TaskList, WithPgClient } from "../interfaces";
import { CompiledSharedOptions } from "../lib";
export declare function isPromise<T>(t: T | Promise<T>): t is Promise<T>;
export declare function getJob(compiledSharedOptions: CompiledSharedOptions, withPgClient: WithPgClient, tasks: TaskList, workerId: string, useNodeTime: boolean, flagsToSkip: string[] | null): Promise<Job | undefined>;
export declare function getJob(compiledSharedOptions: CompiledSharedOptions, withPgClient: WithPgClient, tasks: TaskList, workerId: string, flagsToSkip: string[] | null): Promise<Job | undefined>;

@@ -12,4 +12,4 @@ "use strict";

exports.isPromise = isPromise;
async function getJob(compiledSharedOptions, withPgClient, tasks, workerId, useNodeTime, flagsToSkip) {
const { escapedWorkerSchema, workerSchema, options: { noPreparedStatements }, } = compiledSharedOptions;
async function getJob(compiledSharedOptions, withPgClient, tasks, workerId, flagsToSkip) {
const { escapedWorkerSchema, workerSchema, resolvedPreset: { worker: { preparedStatements, useNodeTime }, }, logger, } = compiledSharedOptions;
const taskDetailsPromise = (0, taskIdentifiers_1.getTaskDetails)(compiledSharedOptions, withPgClient, tasks);

@@ -19,2 +19,6 @@ const taskDetails = isPromise(taskDetailsPromise)

: taskDetailsPromise;
if (taskDetails.taskIds.length === 0) {
logger.error("No tasks found; nothing to do!");
return undefined;
}
let i = 2;

@@ -151,3 +155,3 @@ const hasFlags = flagsToSkip && flagsToSkip.length > 0;

];
const name = noPreparedStatements
const name = !preparedStatements
? undefined

@@ -154,0 +158,0 @@ : `get_job${hasFlags ? "F" : ""}${useNodeTime ? "N" : ""}/${workerSchema}`;

@@ -5,3 +5,3 @@ "use strict";

async function resetLockedAt(compiledSharedOptions, withPgClient) {
const { escapedWorkerSchema, workerSchema, options: { noPreparedStatements }, useNodeTime, } = compiledSharedOptions;
const { escapedWorkerSchema, workerSchema, resolvedPreset: { worker: { preparedStatements, useNodeTime }, }, } = compiledSharedOptions;
const now = useNodeTime ? "$1::timestamptz" : "now()";

@@ -12,3 +12,3 @@ await withPgClient((client) => client.query({

update ${escapedWorkerSchema}.jobs
set locked_at = null, locked_by = null, run_at = greatest(run_at, now())
set locked_at = null, locked_by = null, run_at = greatest(run_at, ${now})
where locked_at < ${now} - interval '4 hours'

@@ -20,3 +20,3 @@ )

values: useNodeTime ? [new Date().toISOString()] : [],
name: noPreparedStatements
name: !preparedStatements
? undefined

@@ -23,0 +23,0 @@ : `clear_stale_locks${useNodeTime ? "N" : ""}/${workerSchema}`,

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.getSupportedTaskIds = exports.getSupportedTaskIdentifierByTaskId = exports.getTaskDetails = void 0;
const assert = require("assert");
const tslib_1 = require("tslib");
const assert = tslib_1.__importStar(require("assert"));
const cacheByOptions = new Map();

@@ -22,3 +23,3 @@ function getTaskDetails(compiledSharedOptions, withPgClient, tasks) {

const { escapedWorkerSchema } = compiledSharedOptions;
assert(supportedTaskNames.length, "No runnable tasks!");
assert.ok(supportedTaskNames.length, "No runnable tasks!");
cache.lastStr = str;

@@ -25,0 +26,0 @@ cache.lastDigest = (async () => {

@@ -1,2 +0,11 @@

import { TaskList, WithPgClient, Worker, WorkerOptions } from "./interfaces";
export declare function makeNewWorker(options: WorkerOptions, tasks: TaskList, withPgClient: WithPgClient, continuous?: boolean): Worker;
import { TaskList, WithPgClient, Worker, WorkerPool, WorkerSharedOptions } from "./interfaces";
import { CompiledSharedOptions } from "./lib";
export declare function makeNewWorker(compiledSharedOptions: CompiledSharedOptions<WorkerSharedOptions>, params: {
tasks: TaskList;
withPgClient: WithPgClient;
continuous: boolean;
abortSignal: AbortSignal;
workerPool: WorkerPool;
autostart?: boolean;
workerId?: string;
}): Worker;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.makeNewWorker = void 0;
const assert = require("assert");
const tslib_1 = require("tslib");
const assert = tslib_1.__importStar(require("assert"));
const crypto_1 = require("crypto");
const config_1 = require("./config");
const deferred_1 = require("./deferred");
const deferred_1 = tslib_1.__importDefault(require("./deferred"));
const helpers_1 = require("./helpers");
const lib_1 = require("./lib");
const completeJob_1 = require("./sql/completeJob");
const failJob_1 = require("./sql/failJob");
const getJob_1 = require("./sql/getJob");
function makeNewWorker(options, tasks, withPgClient, continuous = true) {
const { workerId = `worker-${(0, crypto_1.randomBytes)(9).toString("hex")}`, pollInterval = config_1.defaults.pollInterval, forbiddenFlags, } = options;
const compiledSharedOptions = (0, lib_1.processSharedOptions)(options, {
function makeNewWorker(compiledSharedOptions, params) {
const { tasks, withPgClient, continuous, abortSignal, workerPool, autostart = true, workerId = `worker-${(0, crypto_1.randomBytes)(9).toString("hex")}`, } = params;
const { events, resolvedPreset: { worker: { pollInterval }, }, hooks, _rawOptions: { forbiddenFlags }, } = compiledSharedOptions;
const logger = compiledSharedOptions.logger.scope({
scope: {

@@ -21,4 +21,6 @@ label: "worker",

});
const { logger, maxContiguousErrors, events, useNodeTime } = compiledSharedOptions;
const promise = (0, deferred_1.default)();
const workerDeferred = (0, deferred_1.default)();
const promise = workerDeferred.finally(() => {
return hooks.process("stopWorker", { worker, withPgClient });
});
promise.then(() => {

@@ -40,15 +42,21 @@ events.emit("worker:stop", { worker });

let active = true;
const release = () => {
if (!active) {
return promise;
const release = (force = false) => {
if (active) {
active = false;
events.emit("worker:release", { worker });
if (cancelDoNext()) {
workerDeferred.resolve();
}
else if (force) {
// TODO: do `abortController.abort()` instead
workerDeferred.resolve();
}
}
active = false;
events.emit("worker:release", { worker });
if (cancelDoNext()) {
promise.resolve();
else if (force) {
workerDeferred.resolve();
}
return promise;
return Promise.resolve(promise);
};
const nudge = () => {
assert(active, "nudge called after worker terminated");
assert.ok(active, "nudge called after worker terminated");
if (doNextTimer) {

@@ -66,2 +74,3 @@ // Must be idle; call early

const worker = {
workerPool,
nudge,

@@ -72,2 +81,8 @@ workerId,

getActiveJob: () => activeJob,
_start: autostart
? null
: () => {
doNext(true);
worker._start = null;
},
};

@@ -78,7 +93,7 @@ events.emit("worker:create", { worker, tasks });

let again = false;
const doNext = async () => {
const doNext = async (first = false) => {
again = false;
cancelDoNext();
assert(active, "doNext called when active was false");
assert(!activeJob, "There should be no active job");
assert.ok(active, "doNext called when active was false");
assert.ok(!activeJob, "There should be no active job");
// Find us a job

@@ -99,4 +114,14 @@ try {

}
if (first) {
const event = {
worker,
flagsToSkip,
tasks,
withPgClient,
};
await hooks.process("startWorker", event);
flagsToSkip = event.flagsToSkip;
}
events.emit("worker:getJob:start", { worker });
const jobRow = await (0, getJob_1.getJob)(compiledSharedOptions, withPgClient, tasks, workerId, useNodeTime, flagsToSkip);
const jobRow = await (0, getJob_1.getJob)(compiledSharedOptions, withPgClient, tasks, workerId, flagsToSkip);
// `doNext` cannot be executed concurrently, so we know this is safe.

@@ -116,21 +141,14 @@ // eslint-disable-next-line require-atomic-updates

contiguousErrors++;
logger.debug(`Failed to acquire job: ${err.message} (${contiguousErrors}/${maxContiguousErrors})`);
if (contiguousErrors >= maxContiguousErrors) {
promise.reject(new Error(`Failed ${contiguousErrors} times in a row to acquire job; latest error: ${err.message}`));
release();
return;
logger.debug(`Failed to acquire job: ${err.message} (${contiguousErrors} contiguous fails)`);
if (active) {
// Error occurred fetching a job; try again...
doNextTimer = setTimeout(() => doNext(), pollInterval);
}
else {
if (active) {
// Error occurred fetching a job; try again...
doNextTimer = setTimeout(() => doNext(), pollInterval);
}
else {
promise.reject(err);
}
return;
workerDeferred.reject(err);
}
return;
}
else {
promise.reject(err);
workerDeferred.reject(err);
release();

@@ -156,7 +174,7 @@ return;

else {
promise.resolve();
workerDeferred.resolve();
}
}
else {
promise.resolve();
workerDeferred.resolve();
release();

@@ -176,8 +194,12 @@ }

const startTimestamp = process.hrtime();
let result;
let result = undefined;
try {
logger.debug(`Found task ${job.id} (${job.task_identifier})`);
const task = tasks[job.task_identifier];
assert(task, `Unsupported task '${job.task_identifier}'`);
const helpers = (0, helpers_1.makeJobHelpers)(options, job, { withPgClient, logger });
assert.ok(task, `Unsupported task '${job.task_identifier}'`);
const helpers = (0, helpers_1.makeJobHelpers)(compiledSharedOptions, job, {
withPgClient,
logger,
abortSignal,
});
result = await task(job.payload, helpers);

@@ -251,3 +273,3 @@ }

"Non error or error without message thrown.";
logger.error(`Failed task ${job.id} (${job.task_identifier}) with error ${message} (${duration.toFixed(2)}ms)${stack ? `:\n ${String(stack).replace(/\n/g, "\n ").trim()}` : ""}`, { failure: true, job, error: err, duration });
logger.error(`Failed task ${job.id} (${job.task_identifier}, ${duration.toFixed(2)}ms, attempt ${job.attempts} of ${job.max_attempts}) with error '${message}'${stack ? `:\n ${String(stack).replace(/\n/g, "\n ").trim()}` : ""}`, { failure: true, job, error: err, duration });
await (0, failJob_1.failJob)(compiledSharedOptions, withPgClient, workerId, job, message,

@@ -267,3 +289,5 @@ // "Batch jobs": copy through only the unsuccessful parts of the payload

if (!process.env.NO_LOG_SUCCESS) {
logger.info(`Completed task ${job.id} (${job.task_identifier}) with success (${duration.toFixed(2)}ms)`, { job, duration, success: true });
logger.info(`Completed task ${job.id} (${job.task_identifier}, ${duration.toFixed(2)}ms${job.attempts > 1
? `, attempt ${job.attempts} of ${job.max_attempts}`
: ""}) with success`, { job, duration, success: true });
}

@@ -287,3 +311,3 @@ await (0, completeJob_1.completeJob)(compiledSharedOptions, withPgClient, workerId, job);

logger.error(`Failed to release job '${job.id}' ${when}; committing seppuku\n${fatalError.message}`, { fatalError, job });
promise.reject(fatalError);
workerDeferred.reject(fatalError);
release();

@@ -301,9 +325,11 @@ return;

else {
promise.resolve();
workerDeferred.resolve();
}
};
// Start!
doNext();
if (autostart) {
doNext(true);
}
// For tests
promise["worker"] = worker;
promise.worker = worker;
return worker;

@@ -310,0 +336,0 @@ }
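Two behavioural changes stand out in this file: the `maxContiguousErrors` circuit-breaker is gone (job-fetch failures now simply retry on the poll interval while the worker is active), and the worker lifecycle routes through graphile-config hooks, with `startWorker` able to replace `flagsToSkip` before the first fetch and `stopWorker` running as the worker's promise settles. A conceptual sketch of the `startWorker` contract, using only the shapes visible in the diff (not the library's exact types):

// Conceptual sketch of hook processing for "startWorker". Each hook sees
// the mutable event in turn and may replace flagsToSkip.
type StartWorkerEvent = {
  worker: unknown;
  flagsToSkip: null | string[];
  tasks: Record<string, unknown>;
  withPgClient: unknown;
};

async function processStartWorker(
  hooks: Array<(event: StartWorkerEvent) => void | Promise<void>>,
  event: StartWorkerEvent,
): Promise<null | string[]> {
  for (const hook of hooks) {
    await hook(event); // later hooks observe earlier hooks' mutations
  }
  return event.flagsToSkip; // caller adopts the possibly-replaced value
}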

@@ -11,2 +11,2 @@ import { TaskSpec, WorkerUtils, WorkerUtilsOptions } from "./interfaces";

*/
export declare function quickAddJob(options: WorkerUtilsOptions, identifier: string, payload?: unknown, spec?: TaskSpec): Promise<import("./interfaces").Job>;
export declare function quickAddJob<TIdentifier extends keyof GraphileWorker.Tasks | (string & {}) = string>(options: WorkerUtilsOptions, identifier: TIdentifier, payload: TIdentifier extends keyof GraphileWorker.Tasks ? GraphileWorker.Tasks[TIdentifier] : unknown, spec?: TaskSpec): Promise<import("./interfaces").Job>;
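`quickAddJob` is now generic over a `GraphileWorker.Tasks` registry: payloads type-check per identifier, while arbitrary strings still compile with an `unknown` payload thanks to the `(string & {})` fallback. A sketch using declaration merging (the task name and payload shape are illustrative, not part of the library):

import { quickAddJob } from "graphile-worker";

// Illustrative task registry via declaration merging.
declare global {
  namespace GraphileWorker {
    interface Tasks {
      send_email: { to: string; subject: string };
    }
  }
}

async function main() {
  // Type-checked: payload must match Tasks["send_email"].
  await quickAddJob(
    { connectionString: process.env.DATABASE_URL },
    "send_email",
    { to: "alice@example.com", subject: "Hello" },
  );

  // Identifiers outside the registry still compile; their payload is `unknown`.
  await quickAddJob(
    { connectionString: process.env.DATABASE_URL },
    "some_unregistered_task",
    { anything: true },
  );
}

main().catch(console.error);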

@@ -10,3 +10,3 @@ "use strict";

async function makeWorkerUtils(options) {
const { logger, escapedWorkerSchema, release, withPgClient, addJob } = await (0, lib_1.getUtilsAndReleasersFromOptions)(options, {
const [compiledSharedOptions, release] = await (0, lib_1.getUtilsAndReleasersFromOptions)(options, {
scope: {

@@ -16,2 +16,3 @@ label: "WorkerUtils",

});
const { logger, escapedWorkerSchema, withPgClient, addJob } = compiledSharedOptions;
return {

@@ -22,3 +23,3 @@ withPgClient,

addJob,
migrate: () => withPgClient((pgClient) => (0, migrate_1.migrate)(options, pgClient)),
migrate: () => withPgClient((pgClient) => (0, migrate_1.migrate)(compiledSharedOptions, pgClient)),
async completeJobs(ids) {

@@ -56,3 +57,3 @@ const { rows } = await withPgClient((client) => client.query(`select * from ${escapedWorkerSchema}.complete_jobs($1::bigint[])`, [ids]));

*/
async function quickAddJob(options, identifier, payload = {}, spec = {}) {
async function quickAddJob(options, identifier, payload, spec = {}) {
const utils = await makeWorkerUtils(options);

@@ -59,0 +60,0 @@ try {

{
"name": "graphile-worker",
"version": "0.15.1",
"version": "0.15.2-bridge.0",
"type": "commonjs",
"description": "Job queue for PostgreSQL",
"main": "dist/index.js",
"scripts": {
"build:sql": "node scripts/buildSqlModule.js",
"build:sql": "node scripts/buildSqlModule.mjs",
"website:update": "node scripts/options.mjs",
"prepack": "rm -Rf dist && npm run build:sql && tsc && chmod +x dist/cli.js",
"watch": "mkdir -p dist && touch dist/cli.js && chmod +x dist/cli.js && npm run build:sql && tsc --watch",
"lint": "yarn prettier:check && eslint --ext .js,.jsx,.ts,.tsx,.graphql .",
"lint:fix": "eslint --ext .js,.jsx,.ts,.tsx,.graphql . --fix; prettier --ignore-path .eslintignore --write '**/*.{js,jsx,ts,tsx,graphql,md,json}'",
"prettier:check": "prettier --ignore-path .eslintignore --check '**/*.{js,jsx,ts,tsx,graphql,md,json}'",
"test": "yarn prepack && depcheck && createdb graphile_worker_test || true && psql -X -v GRAPHILE_WORKER_SCHEMA=\"${GRAPHILE_WORKER_SCHEMA:-graphile_worker}\" -v ON_ERROR_STOP=1 -f __tests__/reset-db.sql graphile_worker_test && jest -i",
"lint:fix": "eslint --ext .js,.jsx,.ts,.tsx,.graphql . --fix; prettier --cache --ignore-path .eslintignore --write '**/*.{js,jsx,ts,tsx,graphql,md,json}'",
"prettier:check": "prettier --cache --ignore-path .eslintignore --check '**/*.{js,jsx,ts,tsx,graphql,md,json}'",
"test": "yarn prepack && depcheck && createdb graphile_worker_test || true && psql -X -v GRAPHILE_WORKER_SCHEMA=\"${GRAPHILE_WORKER_SCHEMA:-graphile_worker}\" -v ON_ERROR_STOP=1 -f __tests__/reset-db.sql graphile_worker_test && node --experimental-vm-modules node_modules/.bin/jest -i",
"db:dump": "./scripts/dump_db",
"perfTest": "cd perfTest && node ./run.js",
"preversion": "grep '^### Pending' RELEASE_NOTES.md && echo \"⚠️ Cannot publish with 'Pending' in RELEASE_NOTES ⚠️\" && exit 1 || true"
"preversion": "grep '^### Pending' RELEASE_NOTES.md && echo \"⚠️ Cannot publish with 'Pending' in RELEASE_NOTES ⚠️\" && exit 1 || true",
"postversion": "node scripts/postversion.mjs",
"website": "cd website && yarn run"
},

@@ -45,31 +49,44 @@ "bin": {

"@graphile/logger": "^0.2.0",
"@types/debug": "^4.1.2",
"@types/pg": ">=6 <9",
"chokidar": "^3.4.0",
"cosmiconfig": "^7.0.0",
"json5": "^2.1.3",
"pg": ">=6.5 <9",
"tslib": "^2.1.0",
"yargs": "^17.5.1"
"@types/debug": "^4.1.10",
"@types/pg": "^8.10.5",
"cosmiconfig": "^8.3.6",
"graphile-config": "^0.0.1-beta.4",
"json5": "^2.2.3",
"pg": "^8.11.3",
"tslib": "^2.6.2",
"yargs": "^17.7.2"
},
"devDependencies": {
"@types/jest": "^26.0.20",
"@docusaurus/core": "2.4.3",
"@docusaurus/module-type-aliases": "2.4.3",
"@docusaurus/preset-classic": "2.4.3",
"@docusaurus/remark-plugin-npm2yarn": "^2.4.3",
"@mdx-js/react": "^1.6.22",
"@types/jest": "^26.0.0",
"@types/json5": "^2.2.0",
"@types/node": "^18.0.0",
"@typescript-eslint/eslint-plugin": "^5.29.0",
"@typescript-eslint/parser": "^5.29.0",
"depcheck": "^1.3.1",
"eslint": "^8.18.0",
"eslint-config-prettier": "^8.5.0",
"eslint-plugin-import": "^2.22.1",
"eslint-plugin-jest": "^26.5.3",
"eslint-plugin-simple-import-sort": "^7.0.0",
"eslint_d": "^12.2.0",
"jest": "^26.6.3",
"@types/node": "^20.8.7",
"@typescript-eslint/eslint-plugin": "^6.8.0",
"@typescript-eslint/parser": "^6.8.0",
"clsx": "^2.0.0",
"depcheck": "^1.4.7",
"eslint": "^8.51.0",
"eslint-config-prettier": "^9.0.0",
"eslint-plugin-import": "^2.28.1",
"eslint-plugin-jest": "^26.0.0",
"eslint-plugin-simple-import-sort": "^10.0.0",
"eslint_d": "^13.0.0",
"graphile": "^5.0.0-beta.16",
"jest": "^26.0.0",
"jest-time-helpers": "0.1.0",
"pg-connection-string": "^2.4.0",
"prettier": "^2.2.1",
"ts-jest": "^26.4.4",
"ts-node": "^10.8.1",
"typescript": "^4.1.3"
"juice": "5.2.0",
"pg-connection-string": "^2.6.2",
"prettier": "^2.0.0",
"prism-react-renderer": "^2.1.0",
"react": "^18.2.0",
"react-dom": "^18.2.0",
"svgo": "^1.3.0",
"ts-jest": "^26.0.0",
"ts-node": "^10.9.1",
"typescript": "^5.2.2",
"zx": "^7.2.3"
},

@@ -82,3 +99,15 @@ "files": [

"node": ">=14.0.0"
},
"browserslist": {
"production": [
">0.5%",
"not dead",
"not op_mini all"
],
"development": [
"last 1 chrome version",
"last 1 firefox version",
"last 1 safari version"
]
}
}
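The dependency changes mirror the code diffs above: `chokidar` drops out while `graphile-config` arrives, matching the move from ad-hoc options to preset-based configuration. A hypothetical `graphile.config.js` using the worker preset keys seen throughout these diffs (values shown are illustrative, not defaults the package guarantees):

// Hypothetical graphile.config.js sketch; key names mirror the
// resolvedPreset.worker fields destructured in the diffs above.
module.exports = {
  worker: {
    connectionString: process.env.DATABASE_URL,
    schema: "graphile_worker",
    concurrentJobs: 5,
    pollInterval: 2000, // ms
    preparedStatements: true,
  },
};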
