job-processor
Advanced tools
Comparing version 1.0.0-next.10 to 1.0.0-next.11
@@ -215,6 +215,10 @@ import * as bson from 'bson'; | ||
} | ||
type JobFn = (job: IJobsSimple, signal: AbortSignal) => Promise<unknown>; | ||
type JobDef = { | ||
handler: (job: IJobsSimple, signal: AbortSignal) => Promise<unknown>; | ||
onJobExited?: (job: IJobsSimple) => Promise<unknown>; | ||
}; | ||
type CronDef = { | ||
cron_string: string; | ||
handler: (signal: AbortSignal) => Promise<unknown>; | ||
onJobExited?: (job: IJobsCronTask) => Promise<unknown>; | ||
}; | ||
@@ -224,3 +228,3 @@ type JobProcessorOptions = { | ||
logger: ILogger; | ||
jobs: Record<string, JobFn>; | ||
jobs: Record<string, JobDef>; | ||
crons: Record<string, CronDef>; | ||
@@ -227,0 +231,0 @@ }; |
@@ -311,10 +311,22 @@ // src/crons/crons.ts | ||
import { formatDuration, intervalToDuration } from "date-fns"; | ||
function getJobSimpleFn(job) { | ||
function getJobSimpleDef(job) { | ||
const options2 = getOptions(); | ||
return options2.jobs[job.name] ?? null; | ||
} | ||
function getCronTaskFn(job) { | ||
function getCronTaskDef(job) { | ||
const options2 = getOptions(); | ||
return options2.crons[job.name]?.handler ?? null; | ||
return options2.crons[job.name] ?? null; | ||
} | ||
async function onRunnerExit(startDate, job, error, result) { | ||
const endDate = /* @__PURE__ */ new Date(); | ||
const ts = endDate.getTime() - startDate.getTime(); | ||
const duration = formatDuration(intervalToDuration({ start: startDate, end: endDate })) || `${ts}ms`; | ||
const status = error ? "errored" : "finished"; | ||
await updateJob(job._id, { | ||
status: error ? "errored" : "finished", | ||
output: { duration, result, error }, | ||
ended_at: endDate | ||
}); | ||
return { status, duration }; | ||
} | ||
async function runner(job, signal) { | ||
@@ -333,17 +345,17 @@ const jobLogger = getLogger().child({ | ||
}); | ||
let error = void 0; | ||
let error = null; | ||
let result = void 0; | ||
try { | ||
if (job.type === "simple") { | ||
const jobFn = getJobSimpleFn(job); | ||
if (!jobFn) { | ||
throw new Error("Job function not found"); | ||
const jobDef = getJobSimpleDef(job); | ||
if (!jobDef) { | ||
throw new Error("Job not found"); | ||
} | ||
result = await jobFn(job, signal); | ||
result = await jobDef.handler(job, signal); | ||
} else { | ||
const jobFn = getCronTaskFn(job); | ||
if (!jobFn) { | ||
throw new Error("Job function not found"); | ||
const cronDef = getCronTaskDef(job); | ||
if (!cronDef) { | ||
throw new Error("Cron not found"); | ||
} | ||
result = await jobFn(signal); | ||
result = await cronDef.handler(signal); | ||
} | ||
@@ -356,20 +368,11 @@ } catch (err) { | ||
); | ||
error = err?.stack; | ||
error = err?.stack ?? "Unknown"; | ||
} | ||
const endDate = /* @__PURE__ */ new Date(); | ||
const ts = endDate.getTime() - startDate.getTime(); | ||
const duration = formatDuration(intervalToDuration({ start: startDate, end: endDate })) || `${ts}ms`; | ||
const status = error ? "errored" : "finished"; | ||
await updateJob(job._id, { | ||
status: error ? "errored" : "finished", | ||
output: { duration, result, error }, | ||
ended_at: endDate | ||
}); | ||
const { status, duration } = await onRunnerExit( | ||
startDate, | ||
job, | ||
error, | ||
result | ||
); | ||
jobLogger.info({ status, duration }, "job ended"); | ||
if (error) { | ||
jobLogger.error( | ||
{ error }, | ||
error.constructor.name === "EnvVarError" ? error.message : error | ||
); | ||
} | ||
return error ? 1 : 0; | ||
@@ -376,0 +379,0 @@ } |
{ | ||
"name": "job-processor", | ||
"version": "1.0.0-next.10", | ||
"version": "1.0.0-next.11", | ||
"description": "Job processor service", | ||
@@ -5,0 +5,0 @@ "main": "dist/index.js", |
import { Db } from "mongodb"; | ||
import { IJobsSimple } from "./data/model.ts"; | ||
import { IJobsCronTask, IJobsSimple } from "./data/model.ts"; | ||
import { configureDbSchemaValidation } from "./data/actions.ts"; | ||
interface ILogger { | ||
export interface ILogger { | ||
debug(msg: string): unknown; | ||
@@ -13,3 +13,7 @@ info(data: Record<string, unknown>, msg: string): unknown; | ||
export type JobFn = (job: IJobsSimple, signal: AbortSignal) => Promise<unknown>; | ||
export type JobDef = { | ||
handler: (job: IJobsSimple, signal: AbortSignal) => Promise<unknown>; | ||
// Particularly usefull to handle unexpected errors, crash & interruptions | ||
onJobExited?: (job: IJobsSimple) => Promise<unknown>; | ||
}; | ||
@@ -19,2 +23,4 @@ export type CronDef = { | ||
handler: (signal: AbortSignal) => Promise<unknown>; | ||
// Particularly usefull to handle unexpected errors, crash & interruptions | ||
onJobExited?: (job: IJobsCronTask) => Promise<unknown>; | ||
}; | ||
@@ -25,3 +31,3 @@ | ||
logger: ILogger; | ||
jobs: Record<string, JobFn>; | ||
jobs: Record<string, JobDef>; | ||
crons: Record<string, CronDef>; | ||
@@ -28,0 +34,0 @@ }; |
import { updateJob } from "../data/actions.ts"; | ||
import { IJobsCronTask, IJobsSimple } from "../data/model.ts"; | ||
import { CronDef, JobFn, getLogger, getOptions } from "../setup.ts"; | ||
import { CronDef, JobDef, getLogger, getOptions } from "../setup.ts"; | ||
import { | ||
@@ -11,3 +11,3 @@ captureException, | ||
function getJobSimpleFn(job: IJobsSimple): JobFn | null { | ||
function getJobSimpleDef(job: IJobsSimple): JobDef | null { | ||
const options = getOptions(); | ||
@@ -17,7 +17,29 @@ return options.jobs[job.name] ?? null; | ||
function getCronTaskFn(job: IJobsCronTask): CronDef["handler"] | null { | ||
function getCronTaskDef(job: IJobsCronTask): CronDef | null { | ||
const options = getOptions(); | ||
return options.crons[job.name]?.handler ?? null; | ||
return options.crons[job.name] ?? null; | ||
} | ||
async function onRunnerExit( | ||
startDate: Date, | ||
job: IJobsCronTask | IJobsSimple, | ||
error: string | null, | ||
result: unknown, | ||
) { | ||
const endDate = new Date(); | ||
const ts = endDate.getTime() - startDate.getTime(); | ||
const duration = | ||
formatDuration(intervalToDuration({ start: startDate, end: endDate })) || | ||
`${ts}ms`; | ||
const status = error ? "errored" : "finished"; | ||
await updateJob(job._id, { | ||
status: error ? "errored" : "finished", | ||
output: { duration, result, error }, | ||
ended_at: endDate, | ||
}); | ||
return { status, duration }; | ||
} | ||
async function runner( | ||
@@ -40,3 +62,3 @@ job: IJobsCronTask | IJobsSimple, | ||
}); | ||
let error: Error | undefined = undefined; | ||
let error: string | null = null; | ||
let result: unknown = undefined; | ||
@@ -46,13 +68,13 @@ | ||
if (job.type === "simple") { | ||
const jobFn = getJobSimpleFn(job); | ||
if (!jobFn) { | ||
throw new Error("Job function not found"); | ||
const jobDef = getJobSimpleDef(job); | ||
if (!jobDef) { | ||
throw new Error("Job not found"); | ||
} | ||
result = await jobFn(job, signal); | ||
result = await jobDef.handler(job, signal); | ||
} else { | ||
const jobFn = getCronTaskFn(job); | ||
if (!jobFn) { | ||
throw new Error("Job function not found"); | ||
const cronDef = getCronTaskDef(job); | ||
if (!cronDef) { | ||
throw new Error("Cron not found"); | ||
} | ||
result = await jobFn(signal); | ||
result = await cronDef.handler(signal); | ||
} | ||
@@ -66,26 +88,14 @@ // eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
); | ||
error = err?.stack; | ||
error = (err as Error)?.stack ?? "Unknown"; | ||
} | ||
const endDate = new Date(); | ||
const ts = endDate.getTime() - startDate.getTime(); | ||
const duration = | ||
formatDuration(intervalToDuration({ start: startDate, end: endDate })) || | ||
`${ts}ms`; | ||
const status = error ? "errored" : "finished"; | ||
await updateJob(job._id, { | ||
status: error ? "errored" : "finished", | ||
output: { duration, result, error }, | ||
ended_at: endDate, | ||
}); | ||
const { status, duration } = await onRunnerExit( | ||
startDate, | ||
job, | ||
error, | ||
result, | ||
); | ||
jobLogger.info({ status, duration }, "job ended"); | ||
if (error) { | ||
jobLogger.error( | ||
{ error }, | ||
error.constructor.name === "EnvVarError" ? error.message : error, | ||
); | ||
} | ||
return error ? 1 : 0; | ||
@@ -92,0 +102,0 @@ } |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
69065
1237