graphile-worker
Comparing version 0.16.6 to 0.17.0-canary.6c2c85c
@@ -13,3 +13,2 @@ #!/usr/bin/env node
const runner_1 = require("./runner");
const defaults = preset_1.WorkerPreset.worker;
const argv = yargs
@@ -27,3 +26,2 @@ .parserConfiguration({
alias: "s",
default: defaults.schema,
})
@@ -48,3 +46,2 @@ .string("schema")
alias: "j",
default: defaults.concurrentJobs,
})
@@ -55,3 +52,2 @@ .number("jobs")
alias: "m",
default: 10,
})
@@ -61,3 +57,2 @@ .number("max-pool-size")
description: "how long to wait between polling for jobs in milliseconds (for jobs scheduled in the future/retries)",
default: defaults.pollInterval,
})
@@ -67,3 +62,2 @@ .number("poll-interval")
description: "set this flag if you want to disable prepared statements, e.g. for compatibility with pgBouncer",
default: false,
})
@@ -111,5 +105,6 @@ .boolean("no-prepared-statements")
}
const argvPreset = argvToPreset(argv);
const [compiledOptions, release] = await (0, lib_1.getUtilsAndReleasersFromOptions)({
preset: {
extends: [userPreset ?? preset_1.EMPTY_PRESET, argvToPreset(argv)],
extends: [preset_1.WorkerPreset, userPreset ?? preset_1.EMPTY_PRESET, argvPreset],
},
@@ -116,0 +111,0 @@ });
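With this change the CLI no longer copies defaults onto individual yargs flags; instead it layers presets, and later entries in `extends` win: built-in defaults first, the user's preset second, explicit CLI flags last. A minimal sketch of the same layering in library code, assuming the documented graphile-config preset shape (the option values are illustrative only):

// Hypothetical illustration of the CLI's preset layering
import { WorkerPreset } from "graphile-worker";

const userPreset: GraphileConfig.Preset = {
  worker: { concurrentJobs: 5 },
};
const argvPreset: GraphileConfig.Preset = {
  worker: { concurrentJobs: 10 }, // e.g. derived from `--jobs 10`
};
const preset: GraphileConfig.Preset = {
  // Same order as the compiled cli.js above: defaults, then user, then argv
  extends: [WorkerPreset, userPreset, argvPreset],
};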
@@ -131,2 +131,49 @@ import { Logger } from "@graphile/logger";
events?: WorkerEvents;
/**
* To enable processing jobs in batches, set this to an integer larger
* than 1. This will result in jobs being fetched by the pool rather than
* the worker, the pool will fetch (and lock!) `localQueueSize` jobs up
* front, and each time a worker requests a job it will be served from
* this list until the list is exhausted, at which point a new set of
* jobs will be fetched (and locked).
*
* This setting can help reduce the load on your database from looking
* for jobs, but is only really effective when there are often many jobs
* queued and ready to go, and can increase the latency of job execution
* because a single worker may lock jobs into its queue leaving other
* workers idle.
*
* @default `-1`
*/
localQueueSize?: number;
/**
* How long should jobs sit in the local queue before they are returned
* to the database? Defaults to 5 minutes.
*
* @default `300000`
*/
localQueueTtl?: number;
/**
* The time in milliseconds to wait after a `completeJob` call to see if
* there are any other completeJob calls that can be batched together. A
* setting of `-1` disables this.
*
* Enabling this feature increases the time for which jobs are locked
* past completion, thus increasing the risk of catastrophic failure
* resulting in the jobs being executed again once they expire.
*
* @default `-1`
*/
completeJobBatchDelay?: number;
/**
* The time in milliseconds to wait after a `failJob` call to see if
* there are any other failJob calls that can be batched together. A
* setting of `-1` disables this.
*
* Enabling this feature increases the time for which jobs are locked
* past failure.
*
* @default `-1`
*/
failJobBatchDelay?: number;
}
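Together these four new options trade a little per-job latency for far fewer database round trips. A hedged configuration sketch using the option names declared above (file name follows the graphile-config convention; the values are illustrative, not recommendations):

// graphile.config.ts — illustrative values only
import { WorkerPreset } from "graphile-worker";

const preset: GraphileConfig.Preset = {
  extends: [WorkerPreset],
  worker: {
    localQueueSize: 20, // pool prefetches (and locks!) up to 20 jobs at a time
    localQueueTtl: 300000, // return unclaimed prefetched jobs after 5 minutes
    completeJobBatchDelay: 50, // merge completeJob calls arriving within 50ms
    failJobBatchDelay: 50, // likewise for failJob calls
  },
};

export default preset;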
@@ -133,0 +180,0 @@ interface Preset {
@@ -329,2 +329,4 @@ /// <reference types="node" />
id: string;
/** Encourage `n` workers to look for jobs _right now_, cancelling the delay timers. */
nudge(n: number): void;
/** @deprecated Use gracefulShutdown instead */
@@ -340,2 +342,4 @@ release: () => Promise<void>;
/** @internal */
_forcefulShuttingDown: boolean;
/** @internal */
_active: boolean;
@@ -668,2 +672,10 @@ /** @internal */
/**
* When a worker pool fails to complete/fail a job
*/
"pool:fatalError": {
workerPool: WorkerPool;
error: unknown;
action: string;
};
/**
* When a worker pool is released
@@ -971,2 +983,9 @@ */
}
export type GetJobFunction = (workerId: string, flagsToSkip: string[] | null) => PromiseOrDirect<Job | undefined>;
export type CompleteJobFunction = (job: DbJob) => void;
export type FailJobFunction = (spec: {
job: DbJob;
message: string;
replacementPayload: undefined | unknown[];
}) => void;
export {};
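The new `pool:fatalError` event declared above fires when a batched complete/fail flush throws (see the wiring in dist/main.js below), just before the pool begins a graceful shutdown. A sketch of subscribing to it, assuming the existing pattern of passing a `WorkerEvents` emitter via the `events` option shown in this same interface:

import { EventEmitter } from "node:events";
import { run, WorkerEvents } from "graphile-worker";

const events = new EventEmitter() as WorkerEvents;
events.on("pool:fatalError", ({ workerPool, error, action }) => {
  // `action` is "completeJob" or "failJob" per the emit sites in dist/main.js
  console.error(`Pool ${workerPool.id}: fatal ${action} error`, error);
});

// `taskList` contents elided; `events` is an existing runner option
const runner = await run({ taskList: { /* ... */ }, events });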
dist/main.js
@@ -11,4 +11,7 @@ "use strict";
const lib_1 = require("./lib");
const localQueue_1 = require("./localQueue");
const signals_1 = tslib_1.__importDefault(require("./signals"));
const completeJob_1 = require("./sql/completeJob");
const failJob_1 = require("./sql/failJob");
const getJob_1 = require("./sql/getJob");
const resetLockedAt_1 = require("./sql/resetLockedAt");
@@ -288,6 +291,5 @@ const worker_1 = require("./worker");
const payload = (0, lib_1.tryParseJson)(message.payload);
let n = payload?.count ?? 1;
const n = payload?.count ?? 1;
if (n > 0) {
// Nudge up to `n` workers
workerPool._workers.some((worker) => worker.nudge() && --n <= 0);
workerPool.nudge(n);
}
@@ -349,3 +351,3 @@ break;
function _runTaskList(compiledSharedOptions, tasks, withPgClient, options) {
const { resolvedPreset: { worker: { concurrentJobs: baseConcurrency, gracefulShutdownAbortTimeout }, }, _rawOptions: { noHandleSignals = false }, } = compiledSharedOptions;
const { resolvedPreset: { worker: { concurrentJobs: baseConcurrency, gracefulShutdownAbortTimeout, localQueueSize = -1, completeJobBatchDelay = -1, failJobBatchDelay = -1, }, }, _rawOptions: { noHandleSignals = false }, } = compiledSharedOptions;
const { concurrency = baseConcurrency, continuous, autostart: rawAutostart = true, onTerminate, onDeactivate, } = options;
@@ -357,2 +359,5 @@ let autostart = rawAutostart;
}
if (localQueueSize > 0 && localQueueSize < concurrency) {
logger.warn(`Your job batch size (${localQueueSize}) is smaller than your concurrency setting (${concurrency}); this may result in drastically lower performance if your jobs can complete quickly. Please update to \`localQueueSize: ${concurrency}\` to improve performance, or \`localQueueSize: -1\` to disable batching.`);
}
let unregisterSignalHandlers = undefined;
@@ -364,5 +369,32 @@ if (!noHandleSignals) {
const promise = (0, deferred_1.default)();
function deactivate() {
async function deactivate() {
if (workerPool._active) {
workerPool._active = false;
// TODO: stop the batch()es and await the promises here
const releaseCompleteJobPromise = releaseCompleteJob?.();
const releaseFailJobPromise = releaseFailJob?.();
const releaseLocalQueue = localQueue?.release();
const [releaseCompleteJobResult, releaseFailJobResult, releaseLocalQueueResult,] = await Promise.allSettled([
releaseCompleteJobPromise,
releaseFailJobPromise,
releaseLocalQueue,
]);
if (releaseCompleteJobResult.status === "rejected") {
// Log but continue regardless
logger.error(`Releasing complete job batcher failed: ${releaseCompleteJobResult.reason}`, {
error: releaseCompleteJobResult.reason,
});
}
if (releaseFailJobResult.status === "rejected") {
// Log but continue regardless
logger.error(`Releasing failed job batcher failed: ${releaseFailJobResult.reason}`, {
error: releaseFailJobResult.reason,
});
}
if (releaseLocalQueueResult.status === "rejected") {
// Log but continue regardless
logger.error(`Releasing local queue failed: ${releaseLocalQueueResult.reason}`, {
error: releaseLocalQueueResult.reason,
});
}
return onDeactivate?.();
@@ -383,3 +415,8 @@ }
else {
logger.error(`Graphile Worker internal error: terminate() was called twice for worker pool. Ignoring second call; but this indicates a bug - please file an issue.`);
try {
throw new Error(`Graphile Worker internal error: terminate() was called twice for worker pool. Ignoring second call; but this indicates a bug - please file an issue.`);
}
catch (e) {
logger.error(String(e.stack));
}
}
@@ -395,2 +432,3 @@ }
_shuttingDown: false,
_forcefulShuttingDown: false,
_workers: [],
@@ -401,2 +439,12 @@ _withPgClient: withPgClient,
},
nudge(count) {
if (localQueue) {
localQueue.pulse();
}
else {
let n = count;
// Nudge up to `n` workers
this._workers.some((worker) => worker.nudge() && --n <= 0);
}
},
abortSignal,
@@ -412,2 +460,6 @@ release: async () => {
async gracefulShutdown(message = "Worker pool is shutting down gracefully") {
if (workerPool._forcefulShuttingDown) {
logger.error(`gracefulShutdown called when forcefulShutdown is already in progress`);
return;
}
if (workerPool._shuttingDown) {
@@ -471,3 +523,3 @@ logger.error(`gracefulShutdown called when gracefulShutdown is already in progress`);
});
const cancelledJobs = await (0, failJob_1.failJobs)(compiledSharedOptions, withPgClient, workerIds, jobsToRelease, message);
const cancelledJobs = await (0, failJob_1.failJobs)(compiledSharedOptions, withPgClient, workerPool.id, jobsToRelease, message);
logger.debug(`Cancelled ${cancelledJobs.length} jobs`, {
@@ -494,3 +546,5 @@ cancelledJobs,
}
terminate();
if (!terminated) {
terminate();
}
},
@@ -501,2 +555,7 @@ /**
async forcefulShutdown(message) {
if (workerPool._forcefulShuttingDown) {
logger.error(`forcefulShutdown called when forcefulShutdown is already in progress`);
return;
}
workerPool._forcefulShuttingDown = true;
events.emit("pool:forcefulShutdown", {
@@ -535,3 +594,3 @@ pool: workerPool,
});
const cancelledJobs = await (0, failJob_1.failJobs)(compiledSharedOptions, withPgClient, workerIds, jobsInProgress, message);
const cancelledJobs = await (0, failJob_1.failJobs)(compiledSharedOptions, withPgClient, workerPool.id, jobsInProgress, message);
logger.debug(`Cancelled ${cancelledJobs.length} jobs`, {
@@ -560,3 +619,5 @@ cancelledJobs,
}
terminate();
if (!terminated) {
terminate();
}
},
@@ -599,2 +660,45 @@ promise,
}
const localQueue = localQueueSize >= 1
? new localQueue_1.LocalQueue(compiledSharedOptions, tasks, withPgClient, workerPool, localQueueSize, continuous)
: null;
const getJob = localQueue
? localQueue.getJob // Already bound
: async (_workerId, flagsToSkip) => {
const jobs = await (0, getJob_1.getJob)(compiledSharedOptions, withPgClient, tasks, workerPool.id, flagsToSkip, 1);
return jobs[0];
};
const { release: releaseCompleteJob, fn: completeJob } = (completeJobBatchDelay >= 0
? batch(completeJobBatchDelay, (jobs) => (0, completeJob_1.completeJob)(compiledSharedOptions, withPgClient, workerPool.id, jobs), (error, jobs) => {
events.emit("pool:fatalError", {
error,
workerPool,
action: "completeJob",
});
logger.error(`Failed to complete jobs '${jobs
.map((j) => j.id)
.join("', '")}':\n${String(error)}`, { fatalError: error, jobs });
workerPool.gracefulShutdown();
})
: {
release: null,
fn: (job) => (0, completeJob_1.completeJob)(compiledSharedOptions, withPgClient, workerPool.id, [job]),
});
const { release: releaseFailJob, fn: failJob } = (failJobBatchDelay >= 0
? batch(failJobBatchDelay, (specs) => (0, failJob_1.failJob)(compiledSharedOptions, withPgClient, workerPool.id, specs), (error, specs) => {
events.emit("pool:fatalError", {
error,
workerPool,
action: "failJob",
});
logger.error(`Failed to fail jobs '${specs
.map((spec) => spec.job.id)
.join("', '")}':\n${String(error)}`, { fatalError: error, specs });
workerPool.gracefulShutdown();
})
: {
release: null,
fn: (spec) => (0, failJob_1.failJob)(compiledSharedOptions, withPgClient, workerPool.id, [
spec,
]),
});
for (let i = 0; i < concurrency; i++) {
@@ -609,2 +713,5 @@ const worker = (0, worker_1.makeNewWorker)(compiledSharedOptions, {
workerId,
getJob,
completeJob,
failJob,
});
@@ -618,4 +725,3 @@ workerPool._workers.push(worker);
if (!continuous && workerPool._workers.length === 0) {
deactivate();
terminate();
deactivate().then(terminate, terminate);
}
@@ -654,2 +760,53 @@ };
exports.runTaskListOnce = runTaskListOnce;
function batch(delay, callback, errorHandler) {
let pending = 0;
let releasing = false;
let released = false;
const incrementPending = () => {
pending++;
};
const decrementPending = () => {
pending--;
if (releasing === true && pending === 0) {
released = true;
promise.resolve();
}
};
const promise = (0, deferred_1.default)();
let currentBatch = null;
return {
async release() {
if (releasing) {
return;
}
releasing = true;
if (pending === 0) {
released = true;
promise.resolve();
}
await promise;
},
fn(spec) {
if (released) {
throw new Error("This batcher has been released, and so no more calls can be made.");
}
if (currentBatch !== null) {
currentBatch.push(spec);
}
else {
const specs = [spec];
currentBatch = specs;
incrementPending();
setTimeout(() => {
currentBatch = null;
callback(specs).then(decrementPending, (error) => {
decrementPending();
errorHandler(error, specs);
});
}, delay);
}
return;
},
};
}
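`batch()` above is a small generic helper: the first `fn(spec)` call opens a batch and starts a timer, calls landing within `delay` ms join that batch, and when the timer fires `callback` receives the whole array, with failures routed to `errorHandler`. `release()` resolves once all in-flight flushes settle and then rejects further use. A standalone usage sketch (assumes `batch` and its `deferred` dependency are in scope; the logging callbacks are illustrative):

const { fn: enqueue, release } = batch(
  100, // flush window in milliseconds
  async (names) => {
    console.log(`flushing ${names.length} names: ${names.join(", ")}`);
  },
  (error, names) => {
    console.error(`flush of ${names.length} names failed`, error);
  },
);
enqueue("a");
enqueue("b"); // joins the same 100ms batch as "a"
await release(); // waits for the pending flush; later enqueue() calls throw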
//# sourceMappingURL=main.js.map
@@ -23,2 +23,3 @@ "use strict";
await hooks.process("prebootstrap", event);
// Change to this query should be reflected in website/docs/schema.md
await event.client.query(`
@@ -25,0 +26,0 @@ create schema if not exists ${escapedWorkerSchema};
@@ -72,6 +72,6 @@ "use strict";
});
child.on("stdout", (data) => {
child.stdout.on("data", (data) => {
helpers.logger.info(data.toString("utf8"));
});
child.on("stderr", (data) => {
child.stderr.on("data", (data) => {
helpers.logger.error(data.toString("utf8"));
@@ -78,0 +78,0 @@ });
import { DbJob, EnhancedWithPgClient } from "../interfaces";
import { CompiledSharedOptions } from "../lib";
export declare function completeJob(compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, workerId: string, job: DbJob): Promise<void>;
export declare function completeJob(compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, poolId: string, jobs: ReadonlyArray<DbJob>): Promise<void>;
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.completeJob = void 0; | ||
async function completeJob(compiledSharedOptions, withPgClient, workerId, job) { | ||
const manualPrepare = false; | ||
async function completeJob(compiledSharedOptions, withPgClient, poolId, jobs) { | ||
const { escapedWorkerSchema, workerSchema, resolvedPreset: { worker: { preparedStatements }, }, } = compiledSharedOptions; | ||
const jobIdsWithoutQueue = []; | ||
const jobIdsWithQueue = []; | ||
for (const job of jobs) { | ||
if (job.job_queue_id != null) { | ||
jobIdsWithQueue.push(job.id); | ||
} | ||
else { | ||
jobIdsWithoutQueue.push(job.id); | ||
} | ||
} | ||
// TODO: retry logic, in case of server connection interruption | ||
if (job.job_queue_id != null) { | ||
if (jobIdsWithQueue.length > 0) { | ||
await withPgClient.withRetries((client) => client.query({ | ||
@@ -12,3 +23,4 @@ text: `\ | ||
delete from ${escapedWorkerSchema}._private_jobs as jobs | ||
where id = $1::bigint | ||
using unnest($1::bigint[]) n(n) | ||
where id = n | ||
returning * | ||
@@ -20,3 +32,3 @@ ) | ||
where job_queues.id = j.job_queue_id and job_queues.locked_by = $2::text;`, | ||
values: [job.id, workerId], | ||
values: [jobIdsWithQueue, poolId], | ||
name: !preparedStatements | ||
@@ -27,3 +39,3 @@ ? undefined | ||
} | ||
else { | ||
if (jobIdsWithoutQueue.length === 1) { | ||
await withPgClient.withRetries((client) => client.query({ | ||
@@ -33,8 +45,30 @@ text: `\ | ||
where id = $1::bigint`, | ||
values: [job.id], | ||
values: [jobIdsWithoutQueue[0]], | ||
name: !preparedStatements ? undefined : `complete_job/${workerSchema}`, | ||
})); | ||
} | ||
else if (jobIdsWithoutQueue.length > 1) { | ||
if (manualPrepare) { | ||
await withPgClient.withRetries((client) => client.query({ | ||
text: `\ | ||
prepare gwcj (bigint) as delete from ${escapedWorkerSchema}._private_jobs where id = $1; | ||
${jobIdsWithoutQueue.map((id) => `execute gwcj(${id});`).join("\n")} | ||
deallocate gwcj;`, | ||
})); | ||
} | ||
else { | ||
await withPgClient.withRetries((client) => client.query({ | ||
text: `\ | ||
delete from ${escapedWorkerSchema}._private_jobs as jobs | ||
using unnest($1::bigint[]) n(n) | ||
where id = n`, | ||
values: [jobIdsWithoutQueue], | ||
name: !preparedStatements | ||
? undefined | ||
: `complete_jobs/${workerSchema}`, | ||
})); | ||
} | ||
} | ||
} | ||
exports.completeJob = completeJob; | ||
//# sourceMappingURL=completeJob.js.map |
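The multi-job paths above push all ids through a single statement by joining against `unnest($1::bigint[])` (the disabled `manualPrepare` branch is an alternative that replays a prepared single-row delete per id). A minimal node-postgres illustration of the `unnest` pattern, with a hypothetical table and ids, run inside an async context:

import { Pool } from "pg";

const pgPool = new Pool(); // reads PG* environment variables

// One round trip deletes every id in the array parameter
await pgPool.query({
  text: `delete from my_schema.jobs as jobs using unnest($1::bigint[]) n(n) where jobs.id = n`,
  values: [[101, 102, 103]],
});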
import { DbJob, EnhancedWithPgClient } from "../interfaces";
import { CompiledSharedOptions } from "../lib";
export declare function failJob(compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, workerId: string, job: DbJob, message: string, replacementPayload: undefined | unknown[]): Promise<void>;
export declare function failJobs(compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, workerIds: string[], jobs: DbJob[], message: string): Promise<DbJob[]>;
interface Spec {
job: DbJob;
message: string;
replacementPayload: undefined | unknown[];
}
export declare function failJob(compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, poolId: string, specs: ReadonlyArray<Spec>): Promise<void>;
export declare function failJobs(compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, poolId: string, jobs: DbJob[], message: string): Promise<DbJob[]>;
export {};
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.failJobs = exports.failJob = void 0; | ||
async function failJob(compiledSharedOptions, withPgClient, workerId, job, message, replacementPayload) { | ||
async function failJob(compiledSharedOptions, withPgClient, poolId, specs) { | ||
const { escapedWorkerSchema, workerSchema, resolvedPreset: { worker: { preparedStatements }, }, } = compiledSharedOptions; | ||
const specsWithQueues = []; | ||
const specsWithoutQueues = []; | ||
for (const spec of specs) { | ||
if (spec.job.job_queue_id != null) { | ||
specsWithQueues.push(spec); | ||
} | ||
else { | ||
specsWithoutQueues.push(spec); | ||
} | ||
} | ||
// TODO: retry logic, in case of server connection interruption | ||
if (job.job_queue_id != null) { | ||
if (specsWithQueues.length > 0) { | ||
await withPgClient.withRetries((client) => client.query({ | ||
@@ -13,8 +23,9 @@ text: `\ | ||
set | ||
last_error = $2::text, | ||
last_error = (el->>'message'), | ||
run_at = greatest(now(), run_at) + (exp(least(attempts, 10)) * interval '1 second'), | ||
locked_by = null, | ||
locked_at = null, | ||
payload = coalesce($4::json, jobs.payload) | ||
where id = $1::bigint and locked_by = $3::text | ||
payload = coalesce(el->'payload', jobs.payload) | ||
from json_array_elements($2::json) as els(el) | ||
where id = (el->>'jobId')::bigint and locked_by = $1::text | ||
returning * | ||
@@ -25,10 +36,10 @@ ) | ||
from j | ||
where job_queues.id = j.job_queue_id and job_queues.locked_by = $3::text;`, | ||
where job_queues.id = j.job_queue_id and job_queues.locked_by = $1::text;`, | ||
values: [ | ||
job.id, | ||
message, | ||
workerId, | ||
replacementPayload != null | ||
? JSON.stringify(replacementPayload) | ||
: null, | ||
poolId, | ||
JSON.stringify(specsWithQueues.map(({ job, message, replacementPayload }) => ({ | ||
jobId: job.id, | ||
message, | ||
payload: replacementPayload, | ||
}))), | ||
], | ||
@@ -38,3 +49,3 @@ name: !preparedStatements ? undefined : `fail_job_q/${workerSchema}`, | ||
} | ||
else { | ||
if (specsWithoutQueues.length > 0) { | ||
await withPgClient.withRetries((client) => client.query({ | ||
@@ -44,15 +55,16 @@ text: `\ | ||
set | ||
last_error = $2::text, | ||
last_error = (el->>'message'), | ||
run_at = greatest(now(), run_at) + (exp(least(attempts, 10)) * interval '1 second'), | ||
locked_by = null, | ||
locked_at = null, | ||
payload = coalesce($4::json, jobs.payload) | ||
where id = $1::bigint and locked_by = $3::text;`, | ||
payload = coalesce(el->'payload', jobs.payload) | ||
from json_array_elements($2::json) as els(el) | ||
where id = (el->>'jobId')::bigint and locked_by = $1::text;`, | ||
values: [ | ||
job.id, | ||
message, | ||
workerId, | ||
replacementPayload != null | ||
? JSON.stringify(replacementPayload) | ||
: null, | ||
poolId, | ||
JSON.stringify(specsWithoutQueues.map(({ job, message, replacementPayload }) => ({ | ||
jobId: job.id, | ||
message, | ||
payload: replacementPayload, | ||
}))), | ||
], | ||
@@ -64,3 +76,3 @@ name: !preparedStatements ? undefined : `fail_job/${workerSchema}`, | ||
exports.failJob = failJob; | ||
async function failJobs(compiledSharedOptions, withPgClient, workerIds, jobs, message) { | ||
async function failJobs(compiledSharedOptions, withPgClient, poolId, jobs, message) { | ||
const { escapedWorkerSchema, workerSchema, resolvedPreset: { worker: { preparedStatements }, }, } = compiledSharedOptions; | ||
@@ -77,3 +89,3 @@ // TODO: retry logic, in case of server connection interruption | ||
locked_at = null | ||
where id = any($1::int[]) and locked_by = any($3::text[]) | ||
where id = any($1::int[]) and locked_by = $3::text | ||
returning * | ||
@@ -84,6 +96,6 @@ ), queues as ( | ||
from j | ||
where job_queues.id = j.job_queue_id and job_queues.locked_by = any($3::text[]) | ||
where job_queues.id = j.job_queue_id and job_queues.locked_by = $3::text | ||
) | ||
select * from j;`, | ||
values: [jobs.map((job) => job.id), message, workerIds], | ||
values: [jobs.map((job) => job.id), message, poolId], | ||
name: !preparedStatements ? undefined : `fail_jobs/${workerSchema}`, | ||
@@ -90,0 +102,0 @@ })); |
import { EnhancedWithPgClient, Job, TaskList } from "../interfaces";
import { CompiledSharedOptions } from "../lib";
export declare function isPromise<T>(t: T | Promise<T>): t is Promise<T>;
export declare function getJob(compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, tasks: TaskList, workerId: string, flagsToSkip: string[] | null): Promise<Job | undefined>;
export declare function getJob(compiledSharedOptions: CompiledSharedOptions, withPgClient: EnhancedWithPgClient, tasks: TaskList, poolId: string, flagsToSkip: string[] | null, rawBatchSize: number): Promise<Job[]>;
@@ -12,3 +12,4 @@ "use strict";
exports.isPromise = isPromise;
async function getJob(compiledSharedOptions, withPgClient, tasks, workerId, flagsToSkip) {
async function getJob(compiledSharedOptions, withPgClient, tasks, poolId, flagsToSkip, rawBatchSize) {
const batchSize = parseInt(String(rawBatchSize), 10) || 1;
const { escapedWorkerSchema, workerSchema, resolvedPreset: { worker: { preparedStatements, useNodeTime }, }, logger, } = compiledSharedOptions;
@@ -21,3 +22,3 @@ const taskDetailsPromise = (0, taskIdentifiers_1.getTaskDetails)(compiledSharedOptions, withPgClient, tasks);
logger.error("No tasks found; nothing to do!");
return undefined;
return [];
}
@@ -135,3 +136,3 @@ let i = 2;
order by priority asc, run_at asc
limit 1
limit ${batchSize}
for update
@@ -151,3 +152,3 @@ skip locked
const values = [
workerId,
poolId,
taskDetails.taskIds,
@@ -159,4 +160,4 @@ ...(hasFlags ? [flagsToSkip] : []),
? undefined
: `get_job${hasFlags ? "F" : ""}${useNodeTime ? "N" : ""}/${workerSchema}`;
const { rows: [jobRow], } = await withPgClient.withRetries((client) => client.query({
: `get_job${batchSize === 1 ? "" : batchSize}${hasFlags ? "F" : ""}${useNodeTime ? "N" : ""}/${workerSchema}`;
const { rows } = await withPgClient.withRetries((client) => client.query({
text,
@@ -166,12 +167,7 @@ values,
}));
if (jobRow) {
return Object.assign(jobRow, {
task_identifier: taskDetails.supportedTaskIdentifierByTaskId[jobRow.task_id],
});
}
else {
return undefined;
}
return rows.map((jobRow) => Object.assign(jobRow, {
task_identifier: taskDetails.supportedTaskIdentifierByTaskId[jobRow.task_id],
}));
}
exports.getJob = getJob;
//# sourceMappingURL=getJob.js.map
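`getJob` now takes a pool id plus a batch size and returns an array; note that the prepared-statement name gains the batch size (e.g. `get_job5F...`) so differently-sized `limit` clauses don't collide in the statement cache. A sketch of the two internal call patterns matching the signatures above (the `localQueueSize` argument for the local-queue path is an assumption, since that module's source isn't shown in this diff):

// Classic per-worker fetch, as wired up in dist/main.js
const [job] = await getJob(compiledSharedOptions, withPgClient, tasks, workerPool.id, flagsToSkip, 1);

// Local-queue prefetch: grab (and lock) a whole batch for the pool
const jobs = await getJob(compiledSharedOptions, withPgClient, tasks, workerPool.id, null, localQueueSize);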
@@ -1,1 +0,1 @@
export declare const version = "0.16.6";
export declare const version = "0.17.0-canary.6c2c85c";
@@ -5,3 +5,3 @@ "use strict";
// This file is autogenerated by /scripts/postversion.mjs
exports.version = "0.16.6";
exports.version = "0.17.0-canary.6c2c85c";
//# sourceMappingURL=version.js.map
@@ -1,2 +0,2 @@
import { EnhancedWithPgClient, TaskList, Worker, WorkerPool, WorkerSharedOptions } from "./interfaces";
import { CompleteJobFunction, EnhancedWithPgClient, FailJobFunction, GetJobFunction, TaskList, Worker, WorkerPool, WorkerSharedOptions } from "./interfaces";
import { CompiledSharedOptions } from "./lib";
@@ -11,2 +11,5 @@ export declare function makeNewWorker(compiledSharedOptions: CompiledSharedOptions<WorkerSharedOptions>, params: {
workerId?: string;
getJob: GetJobFunction;
completeJob: CompleteJobFunction;
failJob: FailJobFunction;
}): Worker;
@@ -9,7 +9,5 @@ "use strict";
const helpers_1 = require("./helpers");
const completeJob_1 = require("./sql/completeJob");
const failJob_1 = require("./sql/failJob");
const getJob_1 = require("./sql/getJob");
const NO_LOG_SUCCESS = !!process.env.NO_LOG_SUCCESS;
function makeNewWorker(compiledSharedOptions, params) {
const { tasks, withPgClient, continuous, abortSignal, workerPool, autostart = true, workerId = `worker-${(0, crypto_1.randomBytes)(9).toString("hex")}`, } = params;
const { tasks, withPgClient, continuous, abortSignal, workerPool, autostart = true, workerId = `worker-${(0, crypto_1.randomBytes)(9).toString("hex")}`, getJob, completeJob, failJob, } = params;
const { events, resolvedPreset: { worker: { pollInterval }, }, hooks, _rawOptions: { forbiddenFlags }, } = compiledSharedOptions;
@@ -119,3 +117,3 @@ const logger = compiledSharedOptions.logger.scope({
events.emit("worker:getJob:start", { worker });
const jobRow = await (0, getJob_1.getJob)(compiledSharedOptions, withPgClient, tasks, workerId, flagsToSkip);
const jobRow = await getJob(workerPool.id, flagsToSkip);
// `doNext` cannot be executed concurrently, so we know this is safe.
@@ -265,7 +263,10 @@ // eslint-disable-next-line require-atomic-updates
logger.error(`Failed task ${job.id} (${job.task_identifier}, ${duration.toFixed(2)}ms, attempt ${job.attempts} of ${job.max_attempts}) with error '${message}'${stack ? `:\n  ${String(stack).replace(/\n/g, "\n  ").trim()}` : ""}`, { failure: true, job, error: err, duration });
await (0, failJob_1.failJob)(compiledSharedOptions, withPgClient, workerId, job, message,
// "Batch jobs": copy through only the unsuccessful parts of the payload
batchJobFailedPayloads.length > 0
? batchJobFailedPayloads
: undefined);
failJob({
job,
message,
// "Batch jobs": copy through only the unsuccessful parts of the payload
replacementPayload: batchJobFailedPayloads.length > 0
? batchJobFailedPayloads
: undefined,
});
}
@@ -279,3 +280,3 @@ else {
}
if (!process.env.NO_LOG_SUCCESS) {
if (!NO_LOG_SUCCESS) {
logger.info(`Completed task ${job.id} (${job.task_identifier}, ${duration.toFixed(2)}ms${job.attempts > 1
@@ -285,3 +286,3 @@ ? `, attempt ${job.attempts} of ${job.max_attempts}`
}
await (0, completeJob_1.completeJob)(compiledSharedOptions, withPgClient, workerId, job);
completeJob(job);
}
@@ -288,0 +289,0 @@ events.emit("job:complete", { worker, job, error: err });
{
"name": "graphile-worker",
"version": "0.16.6",
"version": "0.17.0-canary.6c2c85c",
"type": "commonjs",
@@ -15,3 +15,4 @@ "description": "Job queue for PostgreSQL",
"prettier:check": "prettier --cache --ignore-path .eslintignore --check '**/*.{js,jsx,ts,tsx,graphql,md,json}'",
"test": "yarn prepack && depcheck && createdb graphile_worker_test || true && psql -X -v GRAPHILE_WORKER_SCHEMA=\"${GRAPHILE_WORKER_SCHEMA:-graphile_worker}\" -v ON_ERROR_STOP=1 -f __tests__/reset-db.sql graphile_worker_test && node --experimental-vm-modules node_modules/.bin/jest -i",
"test": "yarn prepack && yarn depcheck && ( createdb graphile_worker_test || true ) && psql -X -v GRAPHILE_WORKER_SCHEMA=\"${GRAPHILE_WORKER_SCHEMA:-graphile_worker}\" -v ON_ERROR_STOP=1 -f __tests__/reset-db.sql graphile_worker_test && node --experimental-vm-modules node_modules/.bin/jest -i",
"depcheck": "depcheck --ignores='graphile-worker,faktory-worker,@google-cloud/tasks,bullmq,jest-environment-node,@docusaurus/*,@fortawesome/*,@mdx-js/*,@types/jest,clsx,eslint_d,graphile,juice,postcss-nested,prism-react-renderer,react,react-dom,svgo,ts-node,@types/debug,tslib'",
"db:dump": "./scripts/dump_db",
@@ -18,0 +19,0 @@ "perfTest": "cd perfTest && node ./run.js",
@@ -35,7 +35,5 @@ <img width="120" height="120" title="Graphile Worker logo" src="https://cdn.rawgit.com/graphile/worker/2c8091e0bcce5d39e46a1b3833daf20b59097d1e/website/static/img/logo.optimized.svg" />
<td align="center"><a href="https://dovetailapp.com/"><img src="https://graphile.org/images/sponsors/dovetail.png" width="90" height="90" alt="Dovetail" /><br />Dovetail</a> *</td>
<td align="center"><a href="https://www.netflix.com/"><img src="https://graphile.org/images/sponsors/Netflix.png" width="90" height="90" alt="Netflix" /><br />Netflix</a> *</td>
<td align="center"><a href="https://stellate.co/"><img src="https://graphile.org/images/sponsors/Stellate.png" width="90" height="90" alt="Stellate" /><br />Stellate</a> *</td>
<td align="center"><a href="https://gosteelhead.com/"><img src="https://graphile.org/images/sponsors/steelhead.svg" width="90" height="90" alt="Steelhead" /><br />Steelhead</a> *</td>
</tr><tr>
<td align="center"><a href="https://gosteelhead.com/"><img src="https://graphile.org/images/sponsors/steelhead.svg" width="90" height="90" alt="Steelhead" /><br />Steelhead</a> *</td>
<td align="center"><a href="https://www.sylvera.com/"><img src="https://graphile.org/images/sponsors/sylvera.svg" width="90" height="90" alt="Sylvera" /><br />Sylvera</a> *</td>
<td align="center"><a href=""><img src="https://graphile.org/images/sponsors/triggerdev.png" width="90" height="90" alt="Trigger.dev" /><br />Trigger.dev</a></td>
@@ -42,0 +40,0 @@ </tr></table>
Sorry, the diff of this file is not supported yet