Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

graphile-worker

Package Overview
Dependencies
Maintainers
1
Versions
53
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

graphile-worker - npm Package Compare versions

Comparing version 0.16.6 to 0.17.0-canary.6c2c85c

dist/localQueue.d.ts

9

dist/cli.js

@@ -13,3 +13,2 @@ #!/usr/bin/env node

const runner_1 = require("./runner");
const defaults = preset_1.WorkerPreset.worker;
const argv = yargs

@@ -27,3 +26,2 @@ .parserConfiguration({

alias: "s",
default: defaults.schema,
})

@@ -48,3 +46,2 @@ .string("schema")

alias: "j",
default: defaults.concurrentJobs,
})

@@ -55,3 +52,2 @@ .number("jobs")

alias: "m",
default: 10,
})

@@ -61,3 +57,2 @@ .number("max-pool-size")

description: "how long to wait between polling for jobs in milliseconds (for jobs scheduled in the future/retries)",
default: defaults.pollInterval,
})

@@ -67,3 +62,2 @@ .number("poll-interval")

description: "set this flag if you want to disable prepared statements, e.g. for compatibility with pgBouncer",
default: false,
})

@@ -111,5 +105,6 @@ .boolean("no-prepared-statements")

}
const argvPreset = argvToPreset(argv);
const [compiledOptions, release] = await (0, lib_1.getUtilsAndReleasersFromOptions)({
preset: {
extends: [userPreset ?? preset_1.EMPTY_PRESET, argvToPreset(argv)],
extends: [preset_1.WorkerPreset, userPreset ?? preset_1.EMPTY_PRESET, argvPreset],
},

@@ -116,0 +111,0 @@ });

@@ -131,2 +131,49 @@ import { Logger } from "@graphile/logger";

events?: WorkerEvents;
/**
* To enable processing jobs in batches, set this to an integer larger
* than 1. This will result in jobs being fetched by the pool rather than
* the worker, the pool will fetch (and lock!) `localQueueSize` jobs up
* front, and each time a worker requests a job it will be served from
* this list until the list is exhausted, at which point a new set of
* jobs will be fetched (and locked).
*
* This setting can help reduce the load on your database from looking
* for jobs, but is only really effective when there are often many jobs
* queued and ready to go, and can increase the latency of job execution
* because a single worker may lock jobs into its queue leaving other
* workers idle.
*
* @default `-1`
*/
localQueueSize?: number;
/**
* How long should jobs sit in the local queue before they are returned
* to the database? Defaults to 5 minutes.
*
* @default `300000`
*/
localQueueTtl?: number;
/**
* The time in milliseconds to wait after a `completeJob` call to see if
* there are any other completeJob calls that can be batched together. A
* setting of `-1` disables this.
*
* Enabling this feature increases the time for which jobs are locked
* past completion, thus increasing the risk of catastrophic failure
* resulting in the jobs being executed again once they expire.
*
* @default `-1`
*/
completeJobBatchDelay?: number;
/**
* The time in milliseconds to wait after a `failJob` call to see if
* there are any other failJob calls that can be batched together. A
* setting of `-1` disables this.
*
* Enabling this feature increases the time for which jobs are locked
* past failure.
*
* @default `-1`
*/
failJobBatchDelay?: number;
}

@@ -133,0 +180,0 @@ interface Preset {

@@ -329,2 +329,4 @@ /// <reference types="node" />

id: string;
/** Encourage `n` workers to look for jobs _right now_, cancelling the delay timers. */
nudge(n: number): void;
/** @deprecated Use gracefulShutdown instead */

@@ -340,2 +342,4 @@ release: () => Promise<void>;

/** @internal */
_forcefulShuttingDown: boolean;
/** @internal */
_active: boolean;

@@ -668,2 +672,10 @@ /** @internal */

/**
* When a worker pool fails to complete/fail a job
*/
"pool:fatalError": {
workerPool: WorkerPool;
error: unknown;
action: string;
};
/**
* When a worker pool is released

@@ -971,2 +983,9 @@ */

}
export type GetJobFunction = (workerId: string, flagsToSkip: string[] | null) => PromiseOrDirect<Job | undefined>;
export type CompleteJobFunction = (job: DbJob) => void;
export type FailJobFunction = (spec: {
job: DbJob;
message: string;
replacementPayload: undefined | unknown[];
}) => void;
export {};

@@ -11,4 +11,7 @@ "use strict";

const lib_1 = require("./lib");
const localQueue_1 = require("./localQueue");
const signals_1 = tslib_1.__importDefault(require("./signals"));
const completeJob_1 = require("./sql/completeJob");
const failJob_1 = require("./sql/failJob");
const getJob_1 = require("./sql/getJob");
const resetLockedAt_1 = require("./sql/resetLockedAt");

@@ -288,6 +291,5 @@ const worker_1 = require("./worker");

const payload = (0, lib_1.tryParseJson)(message.payload);
let n = payload?.count ?? 1;
const n = payload?.count ?? 1;
if (n > 0) {
// Nudge up to `n` workers
workerPool._workers.some((worker) => worker.nudge() && --n <= 0);
workerPool.nudge(n);
}

@@ -349,3 +351,3 @@ break;

function _runTaskList(compiledSharedOptions, tasks, withPgClient, options) {
const { resolvedPreset: { worker: { concurrentJobs: baseConcurrency, gracefulShutdownAbortTimeout }, }, _rawOptions: { noHandleSignals = false }, } = compiledSharedOptions;
const { resolvedPreset: { worker: { concurrentJobs: baseConcurrency, gracefulShutdownAbortTimeout, localQueueSize = -1, completeJobBatchDelay = -1, failJobBatchDelay = -1, }, }, _rawOptions: { noHandleSignals = false }, } = compiledSharedOptions;
const { concurrency = baseConcurrency, continuous, autostart: rawAutostart = true, onTerminate, onDeactivate, } = options;

@@ -357,2 +359,5 @@ let autostart = rawAutostart;

}
if (localQueueSize > 0 && localQueueSize < concurrency) {
logger.warn(`Your job batch size (${localQueueSize}) is smaller than your concurrency setting (${concurrency}); this may result in drastically lower performance if your jobs can complete quickly. Please update to \`localQueueSize: ${concurrency}\` to improve performance, or \`localQueueSize: -1\` to disable batching.`);
}
let unregisterSignalHandlers = undefined;

@@ -364,5 +369,32 @@ if (!noHandleSignals) {

const promise = (0, deferred_1.default)();
function deactivate() {
async function deactivate() {
if (workerPool._active) {
workerPool._active = false;
// TODO: stop the batch()es and await the promises here
const releaseCompleteJobPromise = releaseCompleteJob?.();
const releaseFailJobPromise = releaseFailJob?.();
const releaseLocalQueue = localQueue?.release();
const [releaseCompleteJobResult, releaseFailJobResult, releaseLocalQueueResult,] = await Promise.allSettled([
releaseCompleteJobPromise,
releaseFailJobPromise,
releaseLocalQueue,
]);
if (releaseCompleteJobResult.status === "rejected") {
// Log but continue regardless
logger.error(`Releasing complete job batcher failed: ${releaseCompleteJobResult.reason}`, {
error: releaseCompleteJobResult.reason,
});
}
if (releaseFailJobResult.status === "rejected") {
// Log but continue regardless
logger.error(`Releasing failed job batcher failed: ${releaseFailJobResult.reason}`, {
error: releaseFailJobResult.reason,
});
}
if (releaseLocalQueueResult.status === "rejected") {
// Log but continue regardless
logger.error(`Releasing local queue failed: ${releaseLocalQueueResult.reason}`, {
error: releaseLocalQueueResult.reason,
});
}
return onDeactivate?.();

@@ -383,3 +415,8 @@ }

else {
logger.error(`Graphile Worker internal error: terminate() was called twice for worker pool. Ignoring second call; but this indicates a bug - please file an issue.`);
try {
throw new Error(`Graphile Worker internal error: terminate() was called twice for worker pool. Ignoring second call; but this indicates a bug - please file an issue.`);
}
catch (e) {
logger.error(String(e.stack));
}
}

@@ -395,2 +432,3 @@ }

_shuttingDown: false,
_forcefulShuttingDown: false,
_workers: [],

@@ -401,2 +439,12 @@ _withPgClient: withPgClient,

},
nudge(count) {
if (localQueue) {
localQueue.pulse();
}
else {
let n = count;
// Nudge up to `n` workers
this._workers.some((worker) => worker.nudge() && --n <= 0);
}
},
abortSignal,

@@ -412,2 +460,6 @@ release: async () => {

async gracefulShutdown(message = "Worker pool is shutting down gracefully") {
if (workerPool._forcefulShuttingDown) {
logger.error(`gracefulShutdown called when forcefulShutdown is already in progress`);
return;
}
if (workerPool._shuttingDown) {

@@ -471,3 +523,3 @@ logger.error(`gracefulShutdown called when gracefulShutdown is already in progress`);

});
const cancelledJobs = await (0, failJob_1.failJobs)(compiledSharedOptions, withPgClient, workerIds, jobsToRelease, message);
const cancelledJobs = await (0, failJob_1.failJobs)(compiledSharedOptions, withPgClient, workerPool.id, jobsToRelease, message);
logger.debug(`Cancelled ${cancelledJobs.length} jobs`, {

@@ -494,3 +546,5 @@ cancelledJobs,

}
terminate();
if (!terminated) {
terminate();
}
},

@@ -501,2 +555,7 @@ /**

async forcefulShutdown(message) {
if (workerPool._forcefulShuttingDown) {
logger.error(`forcefulShutdown called when forcefulShutdown is already in progress`);
return;
}
workerPool._forcefulShuttingDown = true;
events.emit("pool:forcefulShutdown", {

@@ -535,3 +594,3 @@ pool: workerPool,

});
const cancelledJobs = await (0, failJob_1.failJobs)(compiledSharedOptions, withPgClient, workerIds, jobsInProgress, message);
const cancelledJobs = await (0, failJob_1.failJobs)(compiledSharedOptions, withPgClient, workerPool.id, jobsInProgress, message);
logger.debug(`Cancelled ${cancelledJobs.length} jobs`, {

@@ -560,3 +619,5 @@ cancelledJobs,

}
terminate();
if (!terminated) {
terminate();
}
},

@@ -599,2 +660,45 @@ promise,

}
const localQueue = localQueueSize >= 1
? new localQueue_1.LocalQueue(compiledSharedOptions, tasks, withPgClient, workerPool, localQueueSize, continuous)
: null;
const getJob = localQueue
? localQueue.getJob // Already bound
: async (_workerId, flagsToSkip) => {
const jobs = await (0, getJob_1.getJob)(compiledSharedOptions, withPgClient, tasks, workerPool.id, flagsToSkip, 1);
return jobs[0];
};
const { release: releaseCompleteJob, fn: completeJob } = (completeJobBatchDelay >= 0
? batch(completeJobBatchDelay, (jobs) => (0, completeJob_1.completeJob)(compiledSharedOptions, withPgClient, workerPool.id, jobs), (error, jobs) => {
events.emit("pool:fatalError", {
error,
workerPool,
action: "completeJob",
});
logger.error(`Failed to complete jobs '${jobs
.map((j) => j.id)
.join("', '")}':\n${String(error)}`, { fatalError: error, jobs });
workerPool.gracefulShutdown();
})
: {
release: null,
fn: (job) => (0, completeJob_1.completeJob)(compiledSharedOptions, withPgClient, workerPool.id, [job]),
});
const { release: releaseFailJob, fn: failJob } = (failJobBatchDelay >= 0
? batch(failJobBatchDelay, (specs) => (0, failJob_1.failJob)(compiledSharedOptions, withPgClient, workerPool.id, specs), (error, specs) => {
events.emit("pool:fatalError", {
error,
workerPool,
action: "failJob",
});
logger.error(`Failed to fail jobs '${specs
.map((spec) => spec.job.id)
.join("', '")}':\n${String(error)}`, { fatalError: error, specs });
workerPool.gracefulShutdown();
})
: {
release: null,
fn: (spec) => (0, failJob_1.failJob)(compiledSharedOptions, withPgClient, workerPool.id, [
spec,
]),
});
for (let i = 0; i < concurrency; i++) {

@@ -609,2 +713,5 @@ const worker = (0, worker_1.makeNewWorker)(compiledSharedOptions, {

workerId,
getJob,
completeJob,
failJob,
});

@@ -618,4 +725,3 @@ workerPool._workers.push(worker);

if (!continuous && workerPool._workers.length === 0) {
deactivate();
terminate();
deactivate().then(terminate, terminate);
}

@@ -654,2 +760,53 @@ };

exports.runTaskListOnce = runTaskListOnce;
function batch(delay, callback, errorHandler) {
let pending = 0;
let releasing = false;
let released = false;
const incrementPending = () => {
pending++;
};
const decrementPending = () => {
pending--;
if (releasing === true && pending === 0) {
released = true;
promise.resolve();
}
};
const promise = (0, deferred_1.default)();
let currentBatch = null;
return {
async release() {
if (releasing) {
return;
}
releasing = true;
if (pending === 0) {
released = true;
promise.resolve();
}
await promise;
},
fn(spec) {
if (released) {
throw new Error("This batcher has been released, and so no more calls can be made.");
}
if (currentBatch !== null) {
currentBatch.push(spec);
}
else {
const specs = [spec];
currentBatch = specs;
incrementPending();
setTimeout(() => {
currentBatch = null;
callback(specs).then(decrementPending, (error) => {
decrementPending();
errorHandler(error, specs);
});
}, delay);
}
return;
},
};
}
//# sourceMappingURL=main.js.map

@@ -23,2 +23,3 @@ "use strict";

await hooks.process("prebootstrap", event);
// Change to this query should be reflected in website/docs/schema.md
await event.client.query(`

@@ -25,0 +26,0 @@ create schema if not exists ${escapedWorkerSchema};

@@ -72,6 +72,6 @@ "use strict";

});
child.on("stdout", (data) => {
child.stdout.on("data", (data) => {
helpers.logger.info(data.toString("utf8"));
});
child.on("stderr", (data) => {
child.stderr.on("data", (data) => {
helpers.logger.error(data.toString("utf8"));

@@ -78,0 +78,0 @@ });

import { DbJob, EnhancedWithPgClient } from "../interfaces";
import { CompiledSharedOptions } from "../lib";
export declare function completeJob(compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, workerId: string, job: DbJob): Promise<void>;
export declare function completeJob(compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, poolId: string, jobs: ReadonlyArray<DbJob>): Promise<void>;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.completeJob = void 0;
async function completeJob(compiledSharedOptions, withPgClient, workerId, job) {
const manualPrepare = false;
async function completeJob(compiledSharedOptions, withPgClient, poolId, jobs) {
const { escapedWorkerSchema, workerSchema, resolvedPreset: { worker: { preparedStatements }, }, } = compiledSharedOptions;
const jobIdsWithoutQueue = [];
const jobIdsWithQueue = [];
for (const job of jobs) {
if (job.job_queue_id != null) {
jobIdsWithQueue.push(job.id);
}
else {
jobIdsWithoutQueue.push(job.id);
}
}
// TODO: retry logic, in case of server connection interruption
if (job.job_queue_id != null) {
if (jobIdsWithQueue.length > 0) {
await withPgClient.withRetries((client) => client.query({

@@ -12,3 +23,4 @@ text: `\

delete from ${escapedWorkerSchema}._private_jobs as jobs
where id = $1::bigint
using unnest($1::bigint[]) n(n)
where id = n
returning *

@@ -20,3 +32,3 @@ )

where job_queues.id = j.job_queue_id and job_queues.locked_by = $2::text;`,
values: [job.id, workerId],
values: [jobIdsWithQueue, poolId],
name: !preparedStatements

@@ -27,3 +39,3 @@ ? undefined

}
else {
if (jobIdsWithoutQueue.length === 1) {
await withPgClient.withRetries((client) => client.query({

@@ -33,8 +45,30 @@ text: `\

where id = $1::bigint`,
values: [job.id],
values: [jobIdsWithoutQueue[0]],
name: !preparedStatements ? undefined : `complete_job/${workerSchema}`,
}));
}
else if (jobIdsWithoutQueue.length > 1) {
if (manualPrepare) {
await withPgClient.withRetries((client) => client.query({
text: `\
prepare gwcj (bigint) as delete from ${escapedWorkerSchema}._private_jobs where id = $1;
${jobIdsWithoutQueue.map((id) => `execute gwcj(${id});`).join("\n")}
deallocate gwcj;`,
}));
}
else {
await withPgClient.withRetries((client) => client.query({
text: `\
delete from ${escapedWorkerSchema}._private_jobs as jobs
using unnest($1::bigint[]) n(n)
where id = n`,
values: [jobIdsWithoutQueue],
name: !preparedStatements
? undefined
: `complete_jobs/${workerSchema}`,
}));
}
}
}
exports.completeJob = completeJob;
//# sourceMappingURL=completeJob.js.map
import { DbJob, EnhancedWithPgClient } from "../interfaces";
import { CompiledSharedOptions } from "../lib";
export declare function failJob(compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, workerId: string, job: DbJob, message: string, replacementPayload: undefined | unknown[]): Promise<void>;
export declare function failJobs(compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, workerIds: string[], jobs: DbJob[], message: string): Promise<DbJob[]>;
interface Spec {
job: DbJob;
message: string;
replacementPayload: undefined | unknown[];
}
export declare function failJob(compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, poolId: string, specs: ReadonlyArray<Spec>): Promise<void>;
export declare function failJobs(compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, poolId: string, jobs: DbJob[], message: string): Promise<DbJob[]>;
export {};
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.failJobs = exports.failJob = void 0;
async function failJob(compiledSharedOptions, withPgClient, workerId, job, message, replacementPayload) {
async function failJob(compiledSharedOptions, withPgClient, poolId, specs) {
const { escapedWorkerSchema, workerSchema, resolvedPreset: { worker: { preparedStatements }, }, } = compiledSharedOptions;
const specsWithQueues = [];
const specsWithoutQueues = [];
for (const spec of specs) {
if (spec.job.job_queue_id != null) {
specsWithQueues.push(spec);
}
else {
specsWithoutQueues.push(spec);
}
}
// TODO: retry logic, in case of server connection interruption
if (job.job_queue_id != null) {
if (specsWithQueues.length > 0) {
await withPgClient.withRetries((client) => client.query({

@@ -13,8 +23,9 @@ text: `\

set
last_error = $2::text,
last_error = (el->>'message'),
run_at = greatest(now(), run_at) + (exp(least(attempts, 10)) * interval '1 second'),
locked_by = null,
locked_at = null,
payload = coalesce($4::json, jobs.payload)
where id = $1::bigint and locked_by = $3::text
payload = coalesce(el->'payload', jobs.payload)
from json_array_elements($2::json) as els(el)
where id = (el->>'jobId')::bigint and locked_by = $1::text
returning *

@@ -25,10 +36,10 @@ )

from j
where job_queues.id = j.job_queue_id and job_queues.locked_by = $3::text;`,
where job_queues.id = j.job_queue_id and job_queues.locked_by = $1::text;`,
values: [
job.id,
message,
workerId,
replacementPayload != null
? JSON.stringify(replacementPayload)
: null,
poolId,
JSON.stringify(specsWithQueues.map(({ job, message, replacementPayload }) => ({
jobId: job.id,
message,
payload: replacementPayload,
}))),
],

@@ -38,3 +49,3 @@ name: !preparedStatements ? undefined : `fail_job_q/${workerSchema}`,

}
else {
if (specsWithoutQueues.length > 0) {
await withPgClient.withRetries((client) => client.query({

@@ -44,15 +55,16 @@ text: `\

set
last_error = $2::text,
last_error = (el->>'message'),
run_at = greatest(now(), run_at) + (exp(least(attempts, 10)) * interval '1 second'),
locked_by = null,
locked_at = null,
payload = coalesce($4::json, jobs.payload)
where id = $1::bigint and locked_by = $3::text;`,
payload = coalesce(el->'payload', jobs.payload)
from json_array_elements($2::json) as els(el)
where id = (el->>'jobId')::bigint and locked_by = $1::text;`,
values: [
job.id,
message,
workerId,
replacementPayload != null
? JSON.stringify(replacementPayload)
: null,
poolId,
JSON.stringify(specsWithoutQueues.map(({ job, message, replacementPayload }) => ({
jobId: job.id,
message,
payload: replacementPayload,
}))),
],

@@ -64,3 +76,3 @@ name: !preparedStatements ? undefined : `fail_job/${workerSchema}`,

exports.failJob = failJob;
async function failJobs(compiledSharedOptions, withPgClient, workerIds, jobs, message) {
async function failJobs(compiledSharedOptions, withPgClient, poolId, jobs, message) {
const { escapedWorkerSchema, workerSchema, resolvedPreset: { worker: { preparedStatements }, }, } = compiledSharedOptions;

@@ -77,3 +89,3 @@ // TODO: retry logic, in case of server connection interruption

locked_at = null
where id = any($1::int[]) and locked_by = any($3::text[])
where id = any($1::int[]) and locked_by = $3::text
returning *

@@ -84,6 +96,6 @@ ), queues as (

from j
where job_queues.id = j.job_queue_id and job_queues.locked_by = any($3::text[])
where job_queues.id = j.job_queue_id and job_queues.locked_by = $3::text
)
select * from j;`,
values: [jobs.map((job) => job.id), message, workerIds],
values: [jobs.map((job) => job.id), message, poolId],
name: !preparedStatements ? undefined : `fail_jobs/${workerSchema}`,

@@ -90,0 +102,0 @@ }));

import { EnhancedWithPgClient, Job, TaskList } from "../interfaces";
import { CompiledSharedOptions } from "../lib";
export declare function isPromise<T>(t: T | Promise<T>): t is Promise<T>;
export declare function getJob(compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, tasks: TaskList, workerId: string, flagsToSkip: string[] | null): Promise<Job | undefined>;
export declare function getJob(compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, tasks: TaskList, poolId: string, flagsToSkip: string[] | null, rawBatchSize: number): Promise<Job[]>;

@@ -12,3 +12,4 @@ "use strict";

exports.isPromise = isPromise;
async function getJob(compiledSharedOptions, withPgClient, tasks, workerId, flagsToSkip) {
async function getJob(compiledSharedOptions, withPgClient, tasks, poolId, flagsToSkip, rawBatchSize) {
const batchSize = parseInt(String(rawBatchSize), 10) || 1;
const { escapedWorkerSchema, workerSchema, resolvedPreset: { worker: { preparedStatements, useNodeTime }, }, logger, } = compiledSharedOptions;

@@ -21,3 +22,3 @@ const taskDetailsPromise = (0, taskIdentifiers_1.getTaskDetails)(compiledSharedOptions, withPgClient, tasks);

logger.error("No tasks found; nothing to do!");
return undefined;
return [];
}

@@ -135,3 +136,3 @@ let i = 2;

order by priority asc, run_at asc
limit 1
limit ${batchSize}
for update

@@ -151,3 +152,3 @@ skip locked

const values = [
workerId,
poolId,
taskDetails.taskIds,

@@ -159,4 +160,4 @@ ...(hasFlags ? [flagsToSkip] : []),

? undefined
: `get_job${hasFlags ? "F" : ""}${useNodeTime ? "N" : ""}/${workerSchema}`;
const { rows: [jobRow], } = await withPgClient.withRetries((client) => client.query({
: `get_job${batchSize === 1 ? "" : batchSize}${hasFlags ? "F" : ""}${useNodeTime ? "N" : ""}/${workerSchema}`;
const { rows } = await withPgClient.withRetries((client) => client.query({
text,

@@ -166,12 +167,7 @@ values,

}));
if (jobRow) {
return Object.assign(jobRow, {
task_identifier: taskDetails.supportedTaskIdentifierByTaskId[jobRow.task_id],
});
}
else {
return undefined;
}
return rows.map((jobRow) => Object.assign(jobRow, {
task_identifier: taskDetails.supportedTaskIdentifierByTaskId[jobRow.task_id],
}));
}
exports.getJob = getJob;
//# sourceMappingURL=getJob.js.map

@@ -1,1 +0,1 @@

export declare const version = "0.16.6";
export declare const version = "0.17.0-canary.6c2c85c";

@@ -5,3 +5,3 @@ "use strict";

// This file is autogenerated by /scripts/postversion.mjs
exports.version = "0.16.6";
exports.version = "0.17.0-canary.6c2c85c";
//# sourceMappingURL=version.js.map

@@ -1,2 +0,2 @@

import { EnhancedWithPgClient, TaskList, Worker, WorkerPool, WorkerSharedOptions } from "./interfaces";
import { CompleteJobFunction, EnhancedWithPgClient, FailJobFunction, GetJobFunction, TaskList, Worker, WorkerPool, WorkerSharedOptions } from "./interfaces";
import { CompiledSharedOptions } from "./lib";

@@ -11,2 +11,5 @@ export declare function makeNewWorker(compiledSharedOptions: CompiledSharedOptions<WorkerSharedOptions>, params: {

workerId?: string;
getJob: GetJobFunction;
completeJob: CompleteJobFunction;
failJob: FailJobFunction;
}): Worker;

@@ -9,7 +9,5 @@ "use strict";

const helpers_1 = require("./helpers");
const completeJob_1 = require("./sql/completeJob");
const failJob_1 = require("./sql/failJob");
const getJob_1 = require("./sql/getJob");
const NO_LOG_SUCCESS = !!process.env.NO_LOG_SUCCESS;
function makeNewWorker(compiledSharedOptions, params) {
const { tasks, withPgClient, continuous, abortSignal, workerPool, autostart = true, workerId = `worker-${(0, crypto_1.randomBytes)(9).toString("hex")}`, } = params;
const { tasks, withPgClient, continuous, abortSignal, workerPool, autostart = true, workerId = `worker-${(0, crypto_1.randomBytes)(9).toString("hex")}`, getJob, completeJob, failJob, } = params;
const { events, resolvedPreset: { worker: { pollInterval }, }, hooks, _rawOptions: { forbiddenFlags }, } = compiledSharedOptions;

@@ -119,3 +117,3 @@ const logger = compiledSharedOptions.logger.scope({

events.emit("worker:getJob:start", { worker });
const jobRow = await (0, getJob_1.getJob)(compiledSharedOptions, withPgClient, tasks, workerId, flagsToSkip);
const jobRow = await getJob(workerPool.id, flagsToSkip);
// `doNext` cannot be executed concurrently, so we know this is safe.

@@ -265,7 +263,10 @@ // eslint-disable-next-line require-atomic-updates

logger.error(`Failed task ${job.id} (${job.task_identifier}, ${duration.toFixed(2)}ms, attempt ${job.attempts} of ${job.max_attempts}) with error '${message}'${stack ? `:\n ${String(stack).replace(/\n/g, "\n ").trim()}` : ""}`, { failure: true, job, error: err, duration });
await (0, failJob_1.failJob)(compiledSharedOptions, withPgClient, workerId, job, message,
// "Batch jobs": copy through only the unsuccessful parts of the payload
batchJobFailedPayloads.length > 0
? batchJobFailedPayloads
: undefined);
failJob({
job,
message,
// "Batch jobs": copy through only the unsuccessful parts of the payload
replacementPayload: batchJobFailedPayloads.length > 0
? batchJobFailedPayloads
: undefined,
});
}

@@ -279,3 +280,3 @@ else {

}
if (!process.env.NO_LOG_SUCCESS) {
if (!NO_LOG_SUCCESS) {
logger.info(`Completed task ${job.id} (${job.task_identifier}, ${duration.toFixed(2)}ms${job.attempts > 1

@@ -285,3 +286,3 @@ ? `, attempt ${job.attempts} of ${job.max_attempts}`

}
await (0, completeJob_1.completeJob)(compiledSharedOptions, withPgClient, workerId, job);
completeJob(job);
}

@@ -288,0 +289,0 @@ events.emit("job:complete", { worker, job, error: err });

{
"name": "graphile-worker",
"version": "0.16.6",
"version": "0.17.0-canary.6c2c85c",
"type": "commonjs",

@@ -15,3 +15,4 @@ "description": "Job queue for PostgreSQL",

"prettier:check": "prettier --cache --ignore-path .eslintignore --check '**/*.{js,jsx,ts,tsx,graphql,md,json}'",
"test": "yarn prepack && depcheck && createdb graphile_worker_test || true && psql -X -v GRAPHILE_WORKER_SCHEMA=\"${GRAPHILE_WORKER_SCHEMA:-graphile_worker}\" -v ON_ERROR_STOP=1 -f __tests__/reset-db.sql graphile_worker_test && node --experimental-vm-modules node_modules/.bin/jest -i",
"test": "yarn prepack && yarn depcheck && ( createdb graphile_worker_test || true ) && psql -X -v GRAPHILE_WORKER_SCHEMA=\"${GRAPHILE_WORKER_SCHEMA:-graphile_worker}\" -v ON_ERROR_STOP=1 -f __tests__/reset-db.sql graphile_worker_test && node --experimental-vm-modules node_modules/.bin/jest -i",
"depcheck": "depcheck --ignores='graphile-worker,faktory-worker,@google-cloud/tasks,bullmq,jest-environment-node,@docusaurus/*,@fortawesome/*,@mdx-js/*,@types/jest,clsx,eslint_d,graphile,juice,postcss-nested,prism-react-renderer,react,react-dom,svgo,ts-node,@types/debug,tslib'",
"db:dump": "./scripts/dump_db",

@@ -18,0 +19,0 @@ "perfTest": "cd perfTest && node ./run.js",

@@ -35,7 +35,5 @@ <img width="120" height="120" title="Graphile Worker logo" src="https://cdn.rawgit.com/graphile/worker/2c8091e0bcce5d39e46a1b3833daf20b59097d1e/website/static/img/logo.optimized.svg" />

<td align="center"><a href="https://dovetailapp.com/"><img src="https://graphile.org/images/sponsors/dovetail.png" width="90" height="90" alt="Dovetail" /><br />Dovetail</a> *</td>
<td align="center"><a href="https://www.netflix.com/"><img src="https://graphile.org/images/sponsors/Netflix.png" width="90" height="90" alt="Netflix" /><br />Netflix</a> *</td>
<td align="center"><a href="https://stellate.co/"><img src="https://graphile.org/images/sponsors/Stellate.png" width="90" height="90" alt="Stellate" /><br />Stellate</a> *</td>
<td align="center"><a href="https://gosteelhead.com/"><img src="https://graphile.org/images/sponsors/steelhead.svg" width="90" height="90" alt="Steelhead" /><br />Steelhead</a> *</td>
</tr><tr>
<td align="center"><a href="https://gosteelhead.com/"><img src="https://graphile.org/images/sponsors/steelhead.svg" width="90" height="90" alt="Steelhead" /><br />Steelhead</a> *</td>
<td align="center"><a href="https://www.sylvera.com/"><img src="https://graphile.org/images/sponsors/sylvera.svg" width="90" height="90" alt="Sylvera" /><br />Sylvera</a> *</td>
<td align="center"><a href=""><img src="https://graphile.org/images/sponsors/triggerdev.png" width="90" height="90" alt="Trigger.dev" /><br />Trigger.dev</a></td>

@@ -42,0 +40,0 @@ </tr></table>

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc