Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@lokalise/background-jobs-common

Package Overview
Dependencies
Maintainers
10
Versions
51
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@lokalise/background-jobs-common - npm Package Compare versions

Comparing version 1.2.2 to 2.0.0

7

dist/background-job-processor/BackgroundJobProcessorLogger.d.ts

@@ -1,5 +0,6 @@

import type { BaseLogger, Bindings, ChildLoggerOptions, Logger } from 'pino';
import type { CommonLogger } from '@lokalise/node-core';
import type { Bindings, ChildLoggerOptions, Logger } from 'pino';
import type pino from 'pino';
import type { SafeJob } from './types';
export declare class BackgroundJobProcessorLogger implements BaseLogger {
export declare class BackgroundJobProcessorLogger implements CommonLogger {
private readonly logger;

@@ -17,4 +18,4 @@ private readonly job;

fatal: pino.LogFn;
child(bindings: Bindings, options?: ChildLoggerOptions): BaseLogger;
child(bindings: Bindings, options?: ChildLoggerOptions): CommonLogger;
private jobLog;
}

@@ -0,12 +1,13 @@

import type { CommonLogger } from '@lokalise/node-core';
import type { Queue, Worker, WorkerOptions, JobsOptions, Job, QueueOptions } from 'bullmq';
import type Redis from 'ioredis';
import type { BaseLogger, Logger } from 'pino';
import type { Logger } from 'pino';
import pino from 'pino';
import type { BackgroundJobProcessorConfig, BackgroundJobProcessorDependencies, BullmqProcessor, SafeJob, SafeQueue } from '../types';
import type { BackgroundJobProcessorConfig, BackgroundJobProcessorDependencies, BaseJobPayload, BullmqProcessor, SafeJob, SafeQueue } from '../types';
import type { BackgroundJobProcessorSpyInterface } from './spy/types';
export interface RequestContext {
logger: BaseLogger;
logger: CommonLogger;
reqId: string;
}
export declare abstract class AbstractBackgroundJobProcessor<JobPayload extends object, JobReturn = void, JobType extends SafeJob<JobPayload, JobReturn> = Job, QueueType extends SafeQueue<JobOptionsType, JobPayload, JobReturn> = Queue<JobPayload, JobReturn>, QueueOptionsType extends QueueOptions = QueueOptions, WorkerType extends Worker<JobPayload, JobReturn> = Worker<JobPayload, JobReturn>, WorkerOptionsType extends WorkerOptions = WorkerOptions, ProcessorType extends BullmqProcessor<JobType, JobPayload, JobReturn> = BullmqProcessor<JobType, JobPayload, JobReturn>, JobOptionsType extends JobsOptions = JobsOptions> {
export declare abstract class AbstractBackgroundJobProcessor<JobPayload extends BaseJobPayload, JobReturn = void, JobType extends SafeJob<JobPayload, JobReturn> = Job, QueueType extends SafeQueue<JobOptionsType, JobPayload, JobReturn> = Queue<JobPayload, JobReturn>, QueueOptionsType extends QueueOptions = QueueOptions, WorkerType extends Worker<JobPayload, JobReturn> = Worker<JobPayload, JobReturn>, WorkerOptionsType extends WorkerOptions = WorkerOptions, ProcessorType extends BullmqProcessor<JobType, JobPayload, JobReturn> = BullmqProcessor<JobType, JobPayload, JobReturn>, JobOptionsType extends JobsOptions = JobsOptions> {
protected readonly logger: Logger;

@@ -32,6 +33,24 @@ private readonly redis;

private handleFailedEvent;
private internalOnSuccess;
private internalOnFailed;
private internalOnHook;
protected resolveExecutionLogger(jobId: string): pino.Logger<never>;
protected abstract process(job: JobType, requestContext: RequestContext): Promise<JobReturn>;
/**
* The hook will be triggered on 'completed' job state.
*
* @param _job
* @param _requestContext
* @protected
*/
protected onSuccess(_job: JobType, _requestContext: RequestContext): Promise<void>;
/**
* Removes all data associated with the job, keeps only correlationId.
* This method only works if the result of the job is not removed right after it is finished.
*
* @param job
* @protected
*/
protected purgeJobData(job: JobType): Promise<void>;
protected abstract onFailed(job: JobType, error: Error, requestContext: RequestContext): Promise<void>;
}

@@ -15,5 +15,5 @@ "use strict";

* Default config
* - Retry config: 3 retries with 30s of total amount of wait time between retries using
* exponential strategy https://docs.bullmq.io/guide/retrying-failing-jobs#built-in-backoff-strategies
* - Job retention: 50 last completed jobs, 7 days for failed jobs
* - Retry config: 3 retries with 30s of total amount of wait time between retries using
* exponential strategy https://docs.bullmq.io/guide/retrying-failing-jobs#built-in-backoff-strategies
* - Job retention: 50 last completed jobs, 7 days for failed jobs
*/

@@ -99,5 +99,9 @@ const DEFAULT_JOB_CONFIG = {

});
if (this.config.isTest) {
this.worker?.on('completed', (job) => this._spy?.addJobProcessingResult(job, 'completed'));
}
this.worker?.on('completed', (job) => {
// @ts-expect-error
this.internalOnSuccess(job, job.requestContext).catch(() => undefined); // nothing to do in case of success
if (this.config.isTest) {
this._spy?.addJobProcessingResult(job, 'completed');
}
});
}

@@ -153,3 +157,5 @@ async dispose() {

preparedOptions.removeOnFail = true;
preparedOptions.removeOnComplete = true;
if (preparedOptions.removeOnComplete === undefined) {
preparedOptions.removeOnComplete = true;
}
}

@@ -161,13 +167,15 @@ return preparedOptions;

let isSuccess = false;
const requestContext = {
logger: new BackgroundJobProcessorLogger_1.BackgroundJobProcessorLogger(this.resolveExecutionLogger(jobId), job),
reqId: jobId,
};
if (!job.requestContext) {
job.requestContext = {
logger: new BackgroundJobProcessorLogger_1.BackgroundJobProcessorLogger(this.resolveExecutionLogger(jobId), job),
reqId: jobId,
};
}
try {
this.newRelicBackgroundTransactionManager.start(job.name);
requestContext.logger.info({
job.requestContext.logger.info({
origin: this.constructor.name,
jobId,
}, `Started job ${job.name}`);
const result = await this.process(job, requestContext);
const result = await this.process(job, job.requestContext);
isSuccess = true;

@@ -178,3 +186,3 @@ await job.updateProgress(100);

finally {
requestContext.logger.info({ isSuccess, jobId }, `Finished job ${job.name}`);
job.requestContext.logger.info({ isSuccess, jobId }, `Finished job ${job.name}`);
this.newRelicBackgroundTransactionManager.stop(job.name);

@@ -185,7 +193,9 @@ }

const jobId = (0, utils_1.resolveJobId)(job);
const requestContext = {
logger: new BackgroundJobProcessorLogger_1.BackgroundJobProcessorLogger(this.resolveExecutionLogger(jobId), job),
reqId: jobId,
};
requestContext.logger.error((0, node_core_1.resolveGlobalErrorLogObject)(error, jobId));
if (!job.requestContext) {
job.requestContext = {
logger: new BackgroundJobProcessorLogger_1.BackgroundJobProcessorLogger(this.resolveExecutionLogger(jobId), job),
reqId: jobId,
};
}
job.requestContext.logger.error((0, node_core_1.resolveGlobalErrorLogObject)(error, jobId));
this.errorReporter.report({

@@ -201,8 +211,26 @@ error,

job.opts.attempts === job.attemptsMade) {
void this.internalOnFailed(job, error, requestContext).catch(() => undefined); // nothing to do in case of error
void this.internalOnFailed(job, error, job.requestContext).catch(() => undefined); // nothing to do in case of error
}
}
async internalOnSuccess(job) {
const jobId = (0, utils_1.resolveJobId)(job);
if (!job.requestContext) {
job.requestContext = {
logger: new BackgroundJobProcessorLogger_1.BackgroundJobProcessorLogger(this.resolveExecutionLogger(jobId), job),
reqId: jobId,
};
}
await this.internalOnHook(job, job.requestContext, async (job, requestContext) => {
await this.onSuccess(job, requestContext);
});
}
async internalOnFailed(job, error, requestContext) {
await this.internalOnHook(job, requestContext, async (job, requestContext) => {
await this.onFailed(job, error, requestContext);
});
this._spy?.addJobProcessingResult(job, 'failed');
}
async internalOnHook(job, requestContext, onHook) {
try {
await this.onFailed(job, error, requestContext);
await onHook(job, requestContext);
}

@@ -222,3 +250,2 @@ catch (error) {

}
this._spy?.addJobProcessingResult(job, 'failed');
}

@@ -228,3 +255,27 @@ resolveExecutionLogger(jobId) {

}
/**
* The hook will be triggered on 'completed' job state.
*
* @param _job
* @param _requestContext
* @protected
*/
onSuccess(_job, _requestContext) {
return Promise.resolve();
}
/**
* Removes all data associated with the job, keeps only correlationId.
* This method only works if the result of the job is not removed right after it is finished.
*
* @param job
* @protected
*/
async purgeJobData(job) {
const jobOptsRemoveOnComplete = job.opts.removeOnComplete;
if (jobOptsRemoveOnComplete === true || jobOptsRemoveOnComplete === 1)
return;
// @ts-ignore
await job.updateData({ metadata: job.data.metadata });
}
}
exports.AbstractBackgroundJobProcessor = AbstractBackgroundJobProcessor;
import type { Job } from 'bullmq';
import type { BackgroundJobProcessorDependencies } from '../types';
import type { BackgroundJobProcessorDependencies, BaseJobPayload } from '../types';
import { AbstractBackgroundJobProcessor } from './AbstractBackgroundJobProcessor';
export declare class FakeBackgroundJobProcessor<JobData extends object> extends AbstractBackgroundJobProcessor<JobData> {
export declare class FakeBackgroundJobProcessor<JobData extends BaseJobPayload> extends AbstractBackgroundJobProcessor<JobData> {
private _processCalls;

@@ -6,0 +6,0 @@ constructor(dependencies: Omit<BackgroundJobProcessorDependencies<JobData>, 'bullmqFactory' | 'transactionObservabilityManager'>, queueName: string, isTest?: boolean);

@@ -5,2 +5,3 @@ import type { ErrorReporter } from '@lokalise/node-core';

import type { Logger } from 'pino';
import type { RequestContext } from './processors/AbstractBackgroundJobProcessor';
import type { AbstractBullmqFactory } from './processors/factories/AbstractBullmqFactory';

@@ -18,3 +19,5 @@ export type JobFinalState = FinishedStatus;

};
export type SafeJob<T = any, R = any, N extends string = string> = Omit<Job<T, R, N>, 'scripts'>;
export type SafeJob<T = any, R = any, N extends string = string> = Omit<Job<T, R, N>, 'scripts'> & {
requestContext?: RequestContext;
};
export type SafeQueue<JobsOptionsType = JobsOptions, DataType = any, ResultType = any, NameType extends string = string> = Omit<Queue<DataType, ResultType, NameType>, 'add'> & {

@@ -31,1 +34,6 @@ add(name: NameType, data: DataType, opts?: JobsOptionsType): Promise<SafeJob<DataType, ResultType, NameType>>;

};
export type BaseJobPayload = {
metadata: {
correlationId: string;
};
};
{
"name": "@lokalise/background-jobs-common",
"version": "1.2.2",
"version": "2.0.0",
"files": [

@@ -38,4 +38,4 @@ "dist",

"@lokalise/id-utils": "1.0.0",
"@lokalise/node-core": "^9.14.0",
"pino": "^8.20.0",
"@lokalise/node-core": "^9.15.0",
"pino": "^8.21.0",
"ts-deepmerge": "^7.0.0"

@@ -42,0 +42,0 @@ },

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc