Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

job-processor

Package Overview
Dependencies
Maintainers
2
Versions
55
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

job-processor - npm Package Compare versions

Comparing version 1.0.0-next.10 to 1.0.0-next.11

8

dist/index.d.ts

@@ -215,6 +215,10 @@ import * as bson from 'bson';

}
type JobFn = (job: IJobsSimple, signal: AbortSignal) => Promise<unknown>;
type JobDef = {
handler: (job: IJobsSimple, signal: AbortSignal) => Promise<unknown>;
onJobExited?: (job: IJobsSimple) => Promise<unknown>;
};
type CronDef = {
cron_string: string;
handler: (signal: AbortSignal) => Promise<unknown>;
onJobExited?: (job: IJobsCronTask) => Promise<unknown>;
};

@@ -224,3 +228,3 @@ type JobProcessorOptions = {

logger: ILogger;
jobs: Record<string, JobFn>;
jobs: Record<string, JobDef>;
crons: Record<string, CronDef>;

@@ -227,0 +231,0 @@ };

@@ -311,10 +311,22 @@ // src/crons/crons.ts

import { formatDuration, intervalToDuration } from "date-fns";
function getJobSimpleFn(job) {
function getJobSimpleDef(job) {
const options2 = getOptions();
return options2.jobs[job.name] ?? null;
}
function getCronTaskFn(job) {
function getCronTaskDef(job) {
const options2 = getOptions();
return options2.crons[job.name]?.handler ?? null;
return options2.crons[job.name] ?? null;
}
async function onRunnerExit(startDate, job, error, result) {
const endDate = /* @__PURE__ */ new Date();
const ts = endDate.getTime() - startDate.getTime();
const duration = formatDuration(intervalToDuration({ start: startDate, end: endDate })) || `${ts}ms`;
const status = error ? "errored" : "finished";
await updateJob(job._id, {
status: error ? "errored" : "finished",
output: { duration, result, error },
ended_at: endDate
});
return { status, duration };
}
async function runner(job, signal) {

@@ -333,17 +345,17 @@ const jobLogger = getLogger().child({

});
let error = void 0;
let error = null;
let result = void 0;
try {
if (job.type === "simple") {
const jobFn = getJobSimpleFn(job);
if (!jobFn) {
throw new Error("Job function not found");
const jobDef = getJobSimpleDef(job);
if (!jobDef) {
throw new Error("Job not found");
}
result = await jobFn(job, signal);
result = await jobDef.handler(job, signal);
} else {
const jobFn = getCronTaskFn(job);
if (!jobFn) {
throw new Error("Job function not found");
const cronDef = getCronTaskDef(job);
if (!cronDef) {
throw new Error("Cron not found");
}
result = await jobFn(signal);
result = await cronDef.handler(signal);
}

@@ -356,20 +368,11 @@ } catch (err) {

);
error = err?.stack;
error = err?.stack ?? "Unknown";
}
const endDate = /* @__PURE__ */ new Date();
const ts = endDate.getTime() - startDate.getTime();
const duration = formatDuration(intervalToDuration({ start: startDate, end: endDate })) || `${ts}ms`;
const status = error ? "errored" : "finished";
await updateJob(job._id, {
status: error ? "errored" : "finished",
output: { duration, result, error },
ended_at: endDate
});
const { status, duration } = await onRunnerExit(
startDate,
job,
error,
result
);
jobLogger.info({ status, duration }, "job ended");
if (error) {
jobLogger.error(
{ error },
error.constructor.name === "EnvVarError" ? error.message : error
);
}
return error ? 1 : 0;

@@ -376,0 +379,0 @@ }

{
"name": "job-processor",
"version": "1.0.0-next.10",
"version": "1.0.0-next.11",
"description": "Job processor service",

@@ -5,0 +5,0 @@ "main": "dist/index.js",

import { Db } from "mongodb";
import { IJobsSimple } from "./data/model.ts";
import { IJobsCronTask, IJobsSimple } from "./data/model.ts";
import { configureDbSchemaValidation } from "./data/actions.ts";
interface ILogger {
export interface ILogger {
debug(msg: string): unknown;

@@ -13,3 +13,7 @@ info(data: Record<string, unknown>, msg: string): unknown;

export type JobFn = (job: IJobsSimple, signal: AbortSignal) => Promise<unknown>;
export type JobDef = {
handler: (job: IJobsSimple, signal: AbortSignal) => Promise<unknown>;
// Particularly usefull to handle unexpected errors, crash & interruptions
onJobExited?: (job: IJobsSimple) => Promise<unknown>;
};

@@ -19,2 +23,4 @@ export type CronDef = {

handler: (signal: AbortSignal) => Promise<unknown>;
// Particularly usefull to handle unexpected errors, crash & interruptions
onJobExited?: (job: IJobsCronTask) => Promise<unknown>;
};

@@ -25,3 +31,3 @@

logger: ILogger;
jobs: Record<string, JobFn>;
jobs: Record<string, JobDef>;
crons: Record<string, CronDef>;

@@ -28,0 +34,0 @@ };

import { updateJob } from "../data/actions.ts";
import { IJobsCronTask, IJobsSimple } from "../data/model.ts";
import { CronDef, JobFn, getLogger, getOptions } from "../setup.ts";
import { CronDef, JobDef, getLogger, getOptions } from "../setup.ts";
import {

@@ -11,3 +11,3 @@ captureException,

function getJobSimpleFn(job: IJobsSimple): JobFn | null {
function getJobSimpleDef(job: IJobsSimple): JobDef | null {
const options = getOptions();

@@ -17,7 +17,29 @@ return options.jobs[job.name] ?? null;

function getCronTaskFn(job: IJobsCronTask): CronDef["handler"] | null {
function getCronTaskDef(job: IJobsCronTask): CronDef | null {
const options = getOptions();
return options.crons[job.name]?.handler ?? null;
return options.crons[job.name] ?? null;
}
async function onRunnerExit(
startDate: Date,
job: IJobsCronTask | IJobsSimple,
error: string | null,
result: unknown,
) {
const endDate = new Date();
const ts = endDate.getTime() - startDate.getTime();
const duration =
formatDuration(intervalToDuration({ start: startDate, end: endDate })) ||
`${ts}ms`;
const status = error ? "errored" : "finished";
await updateJob(job._id, {
status: error ? "errored" : "finished",
output: { duration, result, error },
ended_at: endDate,
});
return { status, duration };
}
async function runner(

@@ -40,3 +62,3 @@ job: IJobsCronTask | IJobsSimple,

});
let error: Error | undefined = undefined;
let error: string | null = null;
let result: unknown = undefined;

@@ -46,13 +68,13 @@

if (job.type === "simple") {
const jobFn = getJobSimpleFn(job);
if (!jobFn) {
throw new Error("Job function not found");
const jobDef = getJobSimpleDef(job);
if (!jobDef) {
throw new Error("Job not found");
}
result = await jobFn(job, signal);
result = await jobDef.handler(job, signal);
} else {
const jobFn = getCronTaskFn(job);
if (!jobFn) {
throw new Error("Job function not found");
const cronDef = getCronTaskDef(job);
if (!cronDef) {
throw new Error("Cron not found");
}
result = await jobFn(signal);
result = await cronDef.handler(signal);
}

@@ -66,26 +88,14 @@ // eslint-disable-next-line @typescript-eslint/no-explicit-any

);
error = err?.stack;
error = (err as Error)?.stack ?? "Unknown";
}
const endDate = new Date();
const ts = endDate.getTime() - startDate.getTime();
const duration =
formatDuration(intervalToDuration({ start: startDate, end: endDate })) ||
`${ts}ms`;
const status = error ? "errored" : "finished";
await updateJob(job._id, {
status: error ? "errored" : "finished",
output: { duration, result, error },
ended_at: endDate,
});
const { status, duration } = await onRunnerExit(
startDate,
job,
error,
result,
);
jobLogger.info({ status, duration }, "job ended");
if (error) {
jobLogger.error(
{ error },
error.constructor.name === "EnvVarError" ? error.message : error,
);
}
return error ? 1 : 0;

@@ -92,0 +102,0 @@ }

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc