Security News
Input Validation Vulnerabilities Dominate MITRE's 2024 CWE Top 25 List
MITRE's 2024 CWE Top 25 highlights critical software vulnerabilities like XSS, SQL Injection, and CSRF, reflecting shifts due to a refined ranking methodology.
@lokalise/background-jobs-common
Advanced tools
This library provides a basic abstraction over BullMQ-powered background jobs. There are two types available:
This library provides a basic abstraction over BullMQ-powered background jobs. There are two types available:
Install all dependencies:
npm install
Start Docker containers:
docker compose up -d
Run all tests:
npm run test
See test implementations in ./test/processors
folder. Extend AbstractBackgroundJobProcessor
and implement required methods.
For that type of jobs, you will need to extend AbstractBackgroundJobProcessor
and implement a processInternal
method.
It will be called when a job is dequeued. Processing logic is automatically wrapped into NewRelic and basic logger calls,
so you only need to add your domain logic.
Both queue and worker is automatically started when you instantiate the processor. There is a default configuration which
you can override by passing queueConfig.queueOptions
and workerOptions
params to the constructor.
Use dispose()
to correctly stop processing any new messages and wait for the current ones to finish.
Testing asynchronous code can be challenging. To address this, we've implemented a built-in spy functionality for jobs.
const scheduledJobIds = await processor.scheduleBulk([
{
id: randomUUID(),
value: 'first',
metadata: { correlationId: generateMonotonicUuid() },
},
{
id: randomUUID(),
value: 'second',
metadata: { correlationId: randomUUID() },
},
]);
const firstJob = await processor.spy.waitForJobWithId(scheduledJobIds[0], 'completed');
const secondJob = await processor.spy.waitForJob(
(data) => data.value === 'second',
'completed'
);
expect(firstJob.data.value).toBe('first');
expect(secondJob.data.value).toBe('second');
processor.spy.waitForJobWithId(jobId, status)
:
processor.spy.waitForJob(predicate, status)
:
Spies can await jobs in the following states:
scheduled
: The job is scheduled but not yet processed.failed
: The job is processed but failed.completed
: The job is processed successfully.await processor.spy.waitForJobWithId(scheduledJobId[], {state})
after the job has already been scheduled or processed, the spy can still resolve the job state for you.isTest
option of BackgroundJobProcessorConfig
to true
in your processor configuration.By utilizing these spy functions, you can more effectively manage and test the behavior of asynchronous jobs within your system.
In case you want to conditionally delay execution of the job (e. g. until some data necessary for processing the job arrives, or until amount of jobs in the subsequent step go below the threshold), you can use the barrier
parameter, which delays the execution of the job until a specified condition passes.
Barrier looks like this:
const barrier = async(_job: Job<JobData>) => {
if (barrierConditionIsPassing) {
return {
isPassing: true,
}
}
return {
isPassing: false,
delayAmountInMs: 30000, // retry in 30 seconds
}
}
You pass it as a part of AbstractBackgroundJobProcessor config
.
You can also pass over some dependencies from the processor to the barrier:
class myJobProcessor extends AbstractBackgroundJobProcessor<Generics> {
protected override resolveExecutionContext(): ExecutionContext {
return {
userService: this.userService
}
}
}
This will be passed to the barrier:
const barrier = async(_job: Job<JobData>, context: ExecutionContext) => {
if (await context.userService.userExists(job.data.userId)) {
return {
isPassing: true,
}
}
return {
isPassing: false,
delayAmountInMs: 30000, // retry in 30 seconds
}
}
@lokalise/background-jobs-common
provides one barrier out-of-the-box - a JobQueueSizeThrottlingBarrier, which is used to control amount of jobs that are being spawned by a job processor (in a different queue).
Here is an example usage:
import { createJobQueueSizeThrottlingBarrier } from '@lokalise/background-jobs-common'
const processor = new MyJobProcessor(
dependencies, {
// ... the rest of the config
barrier: createJobQueueSizeThrottlingBarrier({
maxQueueJobsInclusive: 2, // optimistic limit, if exceeded, job with the barrier will be delayed
retryPeriodInMsecs: 30000, // job with the barrier will be retried in 30 seconds if there are too many jobs in the throttled queue
})
})
await processor.start()
Note that throttling is based on an optimistic check (checking the count and executing the job that uses the barrier is not an atomic operation), so potentially it is possible to go over the limit in a highly concurrent system. For this reason it is recommended to set the limits with a buffer for the possible overflow.
This barrier depends on defining the following ExecutionContext:
import type { JobQueueSizeThrottlingBarrierContext } from '@lokalise/background-jobs-common'
class myJobProcessor extends AbstractBackgroundJobProcessor<Generics> {
protected override resolveExecutionContext(): JobQueueSizeThrottlingBarrierContext {
return {
throttledQueueJobProcessor: this.throttledQueueJobProcessor, // AbstractBackgroundJobProcessor
}
}
}
The library optimized the default event stream settings to save memory. Specifically, the library sets the default
maximum length of the BullMQ queue events stream to 0
(doc). This means the
event stream will not store any events by default, greatly reducing memory usage.
If you need to store more events in the stream, you can easily configure the maximum length via the queueOptions
parameter during the processor creation.
export class Processor extends AbstractBackgroundJobProcessor<Data> {
constructor(dependencies: BackgroundJobProcessorDependencies<Data>) {
super(dependencies, {
queueId: 'queue',
ownerName: 'example owner',
queueOptions: {
streams: {events:{maxLen: 1000}},
}
})
}
// ...
}
FAQs
This library provides a basic abstraction over BullMQ-powered background jobs. There are two types available:
We found that @lokalise/background-jobs-common demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 0 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
MITRE's 2024 CWE Top 25 highlights critical software vulnerabilities like XSS, SQL Injection, and CSRF, reflecting shifts due to a refined ranking methodology.
Security News
In this segment of the Risky Business podcast, Feross Aboukhadijeh and Patrick Gray discuss the challenges of tracking malware discovered in open source softare.
Research
Security News
A threat actor's playbook for exploiting the npm ecosystem was exposed on the dark web, detailing how to build a blockchain-powered botnet.