@lokalise/background-jobs-common
Advanced tools
Comparing version 1.2.2 to 2.0.0
@@ -1,5 +0,6 @@ | ||
import type { BaseLogger, Bindings, ChildLoggerOptions, Logger } from 'pino'; | ||
import type { CommonLogger } from '@lokalise/node-core'; | ||
import type { Bindings, ChildLoggerOptions, Logger } from 'pino'; | ||
import type pino from 'pino'; | ||
import type { SafeJob } from './types'; | ||
export declare class BackgroundJobProcessorLogger implements BaseLogger { | ||
export declare class BackgroundJobProcessorLogger implements CommonLogger { | ||
private readonly logger; | ||
@@ -17,4 +18,4 @@ private readonly job; | ||
fatal: pino.LogFn; | ||
child(bindings: Bindings, options?: ChildLoggerOptions): BaseLogger; | ||
child(bindings: Bindings, options?: ChildLoggerOptions): CommonLogger; | ||
private jobLog; | ||
} |
@@ -0,12 +1,13 @@ | ||
import type { CommonLogger } from '@lokalise/node-core'; | ||
import type { Queue, Worker, WorkerOptions, JobsOptions, Job, QueueOptions } from 'bullmq'; | ||
import type Redis from 'ioredis'; | ||
import type { BaseLogger, Logger } from 'pino'; | ||
import type { Logger } from 'pino'; | ||
import pino from 'pino'; | ||
import type { BackgroundJobProcessorConfig, BackgroundJobProcessorDependencies, BullmqProcessor, SafeJob, SafeQueue } from '../types'; | ||
import type { BackgroundJobProcessorConfig, BackgroundJobProcessorDependencies, BaseJobPayload, BullmqProcessor, SafeJob, SafeQueue } from '../types'; | ||
import type { BackgroundJobProcessorSpyInterface } from './spy/types'; | ||
export interface RequestContext { | ||
logger: BaseLogger; | ||
logger: CommonLogger; | ||
reqId: string; | ||
} | ||
export declare abstract class AbstractBackgroundJobProcessor<JobPayload extends object, JobReturn = void, JobType extends SafeJob<JobPayload, JobReturn> = Job, QueueType extends SafeQueue<JobOptionsType, JobPayload, JobReturn> = Queue<JobPayload, JobReturn>, QueueOptionsType extends QueueOptions = QueueOptions, WorkerType extends Worker<JobPayload, JobReturn> = Worker<JobPayload, JobReturn>, WorkerOptionsType extends WorkerOptions = WorkerOptions, ProcessorType extends BullmqProcessor<JobType, JobPayload, JobReturn> = BullmqProcessor<JobType, JobPayload, JobReturn>, JobOptionsType extends JobsOptions = JobsOptions> { | ||
export declare abstract class AbstractBackgroundJobProcessor<JobPayload extends BaseJobPayload, JobReturn = void, JobType extends SafeJob<JobPayload, JobReturn> = Job, QueueType extends SafeQueue<JobOptionsType, JobPayload, JobReturn> = Queue<JobPayload, JobReturn>, QueueOptionsType extends QueueOptions = QueueOptions, WorkerType extends Worker<JobPayload, JobReturn> = Worker<JobPayload, JobReturn>, WorkerOptionsType extends WorkerOptions = WorkerOptions, ProcessorType extends BullmqProcessor<JobType, JobPayload, JobReturn> = BullmqProcessor<JobType, JobPayload, JobReturn>, JobOptionsType extends JobsOptions = JobsOptions> { | ||
protected readonly logger: Logger; | ||
@@ -32,6 +33,24 @@ private readonly redis; | ||
private handleFailedEvent; | ||
private internalOnSuccess; | ||
private internalOnFailed; | ||
private internalOnHook; | ||
protected resolveExecutionLogger(jobId: string): pino.Logger<never>; | ||
protected abstract process(job: JobType, requestContext: RequestContext): Promise<JobReturn>; | ||
/** | ||
* The hook will be triggered on 'completed' job state. | ||
* | ||
* @param _job | ||
* @param _requestContext | ||
* @protected | ||
*/ | ||
protected onSuccess(_job: JobType, _requestContext: RequestContext): Promise<void>; | ||
/** | ||
* Removes all data associated with the job, keeps only correlationId. | ||
* This method only works if the result of the job is not removed right after it is finished. | ||
* | ||
* @param job | ||
* @protected | ||
*/ | ||
protected purgeJobData(job: JobType): Promise<void>; | ||
protected abstract onFailed(job: JobType, error: Error, requestContext: RequestContext): Promise<void>; | ||
} |
@@ -15,5 +15,5 @@ "use strict"; | ||
* Default config | ||
* - Retry config: 3 retries with 30s of total amount of wait time between retries using | ||
* exponential strategy https://docs.bullmq.io/guide/retrying-failing-jobs#built-in-backoff-strategies | ||
* - Job retention: 50 last completed jobs, 7 days for failed jobs | ||
* - Retry config: 3 retries with 30s of total amount of wait time between retries using | ||
* exponential strategy https://docs.bullmq.io/guide/retrying-failing-jobs#built-in-backoff-strategies | ||
* - Job retention: 50 last completed jobs, 7 days for failed jobs | ||
*/ | ||
@@ -99,5 +99,9 @@ const DEFAULT_JOB_CONFIG = { | ||
}); | ||
if (this.config.isTest) { | ||
this.worker?.on('completed', (job) => this._spy?.addJobProcessingResult(job, 'completed')); | ||
} | ||
this.worker?.on('completed', (job) => { | ||
// @ts-expect-error | ||
this.internalOnSuccess(job, job.requestContext).catch(() => undefined); // nothing to do in case of success | ||
if (this.config.isTest) { | ||
this._spy?.addJobProcessingResult(job, 'completed'); | ||
} | ||
}); | ||
} | ||
@@ -153,3 +157,5 @@ async dispose() { | ||
preparedOptions.removeOnFail = true; | ||
preparedOptions.removeOnComplete = true; | ||
if (preparedOptions.removeOnComplete === undefined) { | ||
preparedOptions.removeOnComplete = true; | ||
} | ||
} | ||
@@ -161,13 +167,15 @@ return preparedOptions; | ||
let isSuccess = false; | ||
const requestContext = { | ||
logger: new BackgroundJobProcessorLogger_1.BackgroundJobProcessorLogger(this.resolveExecutionLogger(jobId), job), | ||
reqId: jobId, | ||
}; | ||
if (!job.requestContext) { | ||
job.requestContext = { | ||
logger: new BackgroundJobProcessorLogger_1.BackgroundJobProcessorLogger(this.resolveExecutionLogger(jobId), job), | ||
reqId: jobId, | ||
}; | ||
} | ||
try { | ||
this.newRelicBackgroundTransactionManager.start(job.name); | ||
requestContext.logger.info({ | ||
job.requestContext.logger.info({ | ||
origin: this.constructor.name, | ||
jobId, | ||
}, `Started job ${job.name}`); | ||
const result = await this.process(job, requestContext); | ||
const result = await this.process(job, job.requestContext); | ||
isSuccess = true; | ||
@@ -178,3 +186,3 @@ await job.updateProgress(100); | ||
finally { | ||
requestContext.logger.info({ isSuccess, jobId }, `Finished job ${job.name}`); | ||
job.requestContext.logger.info({ isSuccess, jobId }, `Finished job ${job.name}`); | ||
this.newRelicBackgroundTransactionManager.stop(job.name); | ||
@@ -185,7 +193,9 @@ } | ||
const jobId = (0, utils_1.resolveJobId)(job); | ||
const requestContext = { | ||
logger: new BackgroundJobProcessorLogger_1.BackgroundJobProcessorLogger(this.resolveExecutionLogger(jobId), job), | ||
reqId: jobId, | ||
}; | ||
requestContext.logger.error((0, node_core_1.resolveGlobalErrorLogObject)(error, jobId)); | ||
if (!job.requestContext) { | ||
job.requestContext = { | ||
logger: new BackgroundJobProcessorLogger_1.BackgroundJobProcessorLogger(this.resolveExecutionLogger(jobId), job), | ||
reqId: jobId, | ||
}; | ||
} | ||
job.requestContext.logger.error((0, node_core_1.resolveGlobalErrorLogObject)(error, jobId)); | ||
this.errorReporter.report({ | ||
@@ -201,8 +211,26 @@ error, | ||
job.opts.attempts === job.attemptsMade) { | ||
void this.internalOnFailed(job, error, requestContext).catch(() => undefined); // nothing to do in case of error | ||
void this.internalOnFailed(job, error, job.requestContext).catch(() => undefined); // nothing to do in case of error | ||
} | ||
} | ||
async internalOnSuccess(job) { | ||
const jobId = (0, utils_1.resolveJobId)(job); | ||
if (!job.requestContext) { | ||
job.requestContext = { | ||
logger: new BackgroundJobProcessorLogger_1.BackgroundJobProcessorLogger(this.resolveExecutionLogger(jobId), job), | ||
reqId: jobId, | ||
}; | ||
} | ||
await this.internalOnHook(job, job.requestContext, async (job, requestContext) => { | ||
await this.onSuccess(job, requestContext); | ||
}); | ||
} | ||
async internalOnFailed(job, error, requestContext) { | ||
await this.internalOnHook(job, requestContext, async (job, requestContext) => { | ||
await this.onFailed(job, error, requestContext); | ||
}); | ||
this._spy?.addJobProcessingResult(job, 'failed'); | ||
} | ||
async internalOnHook(job, requestContext, onHook) { | ||
try { | ||
await this.onFailed(job, error, requestContext); | ||
await onHook(job, requestContext); | ||
} | ||
@@ -222,3 +250,2 @@ catch (error) { | ||
} | ||
this._spy?.addJobProcessingResult(job, 'failed'); | ||
} | ||
@@ -228,3 +255,27 @@ resolveExecutionLogger(jobId) { | ||
} | ||
/** | ||
* The hook will be triggered on 'completed' job state. | ||
* | ||
* @param _job | ||
* @param _requestContext | ||
* @protected | ||
*/ | ||
onSuccess(_job, _requestContext) { | ||
return Promise.resolve(); | ||
} | ||
/** | ||
* Removes all data associated with the job, keeps only correlationId. | ||
* This method only works if the result of the job is not removed right after it is finished. | ||
* | ||
* @param job | ||
* @protected | ||
*/ | ||
async purgeJobData(job) { | ||
const jobOptsRemoveOnComplete = job.opts.removeOnComplete; | ||
if (jobOptsRemoveOnComplete === true || jobOptsRemoveOnComplete === 1) | ||
return; | ||
// @ts-ignore | ||
await job.updateData({ metadata: job.data.metadata }); | ||
} | ||
} | ||
exports.AbstractBackgroundJobProcessor = AbstractBackgroundJobProcessor; |
import type { Job } from 'bullmq'; | ||
import type { BackgroundJobProcessorDependencies } from '../types'; | ||
import type { BackgroundJobProcessorDependencies, BaseJobPayload } from '../types'; | ||
import { AbstractBackgroundJobProcessor } from './AbstractBackgroundJobProcessor'; | ||
export declare class FakeBackgroundJobProcessor<JobData extends object> extends AbstractBackgroundJobProcessor<JobData> { | ||
export declare class FakeBackgroundJobProcessor<JobData extends BaseJobPayload> extends AbstractBackgroundJobProcessor<JobData> { | ||
private _processCalls; | ||
@@ -6,0 +6,0 @@ constructor(dependencies: Omit<BackgroundJobProcessorDependencies<JobData>, 'bullmqFactory' | 'transactionObservabilityManager'>, queueName: string, isTest?: boolean); |
@@ -5,2 +5,3 @@ import type { ErrorReporter } from '@lokalise/node-core'; | ||
import type { Logger } from 'pino'; | ||
import type { RequestContext } from './processors/AbstractBackgroundJobProcessor'; | ||
import type { AbstractBullmqFactory } from './processors/factories/AbstractBullmqFactory'; | ||
@@ -18,3 +19,5 @@ export type JobFinalState = FinishedStatus; | ||
}; | ||
export type SafeJob<T = any, R = any, N extends string = string> = Omit<Job<T, R, N>, 'scripts'>; | ||
export type SafeJob<T = any, R = any, N extends string = string> = Omit<Job<T, R, N>, 'scripts'> & { | ||
requestContext?: RequestContext; | ||
}; | ||
export type SafeQueue<JobsOptionsType = JobsOptions, DataType = any, ResultType = any, NameType extends string = string> = Omit<Queue<DataType, ResultType, NameType>, 'add'> & { | ||
@@ -31,1 +34,6 @@ add(name: NameType, data: DataType, opts?: JobsOptionsType): Promise<SafeJob<DataType, ResultType, NameType>>; | ||
}; | ||
export type BaseJobPayload = { | ||
metadata: { | ||
correlationId: string; | ||
}; | ||
}; |
{ | ||
"name": "@lokalise/background-jobs-common", | ||
"version": "1.2.2", | ||
"version": "2.0.0", | ||
"files": [ | ||
@@ -38,4 +38,4 @@ "dist", | ||
"@lokalise/id-utils": "1.0.0", | ||
"@lokalise/node-core": "^9.14.0", | ||
"pino": "^8.20.0", | ||
"@lokalise/node-core": "^9.15.0", | ||
"pino": "^8.21.0", | ||
"ts-deepmerge": "^7.0.0" | ||
@@ -42,0 +42,0 @@ }, |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
47681
723
Updated@lokalise/node-core@^9.15.0
Updatedpino@^8.21.0