What is @nestjs/bull?
@nestjs/bull is a package for the NestJS framework that provides integration with Bull, a popular library for handling distributed job queues. It allows you to create, manage, and process jobs in a queue, making it useful for background processing, task scheduling, and more.
What are @nestjs/bull's main functionalities?
Creating a Queue
This code demonstrates how to set up a queue named 'my-queue' using the @nestjs/bull package. It configures the connection to a Redis server and registers the queue within a NestJS module.
```typescript
import { BullModule } from '@nestjs/bull';
@Module({
imports: [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379,
},
}),
BullModule.registerQueue({
name: 'my-queue',
}),
],
})
export class AppModule {}
```
Adding Jobs to a Queue
This code shows how to add jobs to a queue. The `MyService` class injects the 'my-queue' queue and uses the `addJob` method to add a job with the provided data.
```typescript
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { Injectable } from '@nestjs/common';
@Injectable()
export class MyService {
constructor(@InjectQueue('my-queue') private readonly myQueue: Queue) {}
async addJob(data: any) {
await this.myQueue.add(data);
}
}
```
Processing Jobs
This code demonstrates how to process jobs in a queue. The `MyQueueProcessor` class is decorated with `@Processor('my-queue')` to indicate it will handle jobs from 'my-queue'. The `handleJob` method processes each job.
```typescript
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('my-queue')
export class MyQueueProcessor {
@Process()
async handleJob(job: Job) {
console.log('Processing job:', job.data);
// Add your job processing logic here
}
}
```
Other packages similar to @nestjs/bull
bull
Bull is a Node.js library for creating and managing job queues. It is the underlying library used by @nestjs/bull. While @nestjs/bull provides integration with the NestJS framework, Bull can be used independently in any Node.js application.
agenda
Agenda is a lightweight job scheduling library for Node.js. It provides similar functionality to Bull but focuses more on job scheduling and recurring tasks. It uses MongoDB as its storage backend, whereas Bull uses Redis.
kue
Kue is another job queue library for Node.js that uses Redis. It provides a simple API for creating and processing jobs, along with a built-in UI for monitoring job status. Kue is less actively maintained compared to Bull.
A progressive Node.js framework for building efficient and scalable server-side applications.
Description
Bull module for Nest.
Installation
$ npm i --save @nestjs/bull bull
$ npm i --save-dev @types/bull
Quick Start
import { Body, Controller, Get, Module, Param, Post } from '@nestjs/common';
import { DoneCallback, Job, Queue } from 'bull';
import { BullModule, InjectQueue } from 'nest-bull';
@Controller()
export class AppController {
constructor(@InjectQueue('store') readonly queue: Queue) {}
@Post()
async addJob(@Body() value: any) {
const job: Job = await this.queue.add(value);
return job.id;
}
@Get(':id')
async getJob(@Param('id') id: string) {
return await this.queue.getJob(id);
}
}
@Module({
imports: [
BullModule.register({
name: 'store',
options: {
redis: {
port: 6379,
},
},
processors: [
(job: Job, done: DoneCallback) => {
done(null, job.data);
},
],
}),
],
controllers: [AppController],
})
export class ApplicationModule {}
Decorators
This module provides some decorators that will help you to set up your queue listeners.
@Processor()
The @Processor()
class decorator is mandatory if you plan to use this package's decorators.
It accepts an optional QueueDecoratorOptions
argument:
export interface QueueDecoratorOptions {
name?: string;
}
@Process()
The @Process()
method decorator flags a method as a processing function for the queued jobs.
It accepts an optional QueueProcessDecoratorOptions
argument:
export interface QueueProcessDecoratorOptions {
name?: string;
concurrency?: number;
}
Whenever a job matching the configured name
(if any) is queued, it will be processed by the decorated method.
Such method is expected to have the following signature (job: Job, done?: DoneCallback): any
;
@OnQueueEvent()
The OnQueueEvent()
method decorator flags a method as an event listener for the related queue.
It requires a BullQueueEvent
argument:
export type BullQueueEvent =
| 'error'
| 'waiting'
| 'active'
| 'stalled'
| 'progress'
| 'completed'
| 'failed'
| 'paused'
| 'resumed'
| 'cleaned'
| 'drained'
| 'removed'
| 'global:error'
| 'global:waiting'
| 'global:active'
| 'global:stalled'
| 'global:progress'
| 'global:completed'
| 'global:failed'
| 'global:paused'
| 'global:resumed'
| 'global:cleaned'
| 'global:drained'
| 'global:removed';
You can also use the BullQueueEvents
and BullQueueGlobalEvents
enums.
Fortunately, there is a shorthand decorator for each of the Bull events:
@OnQueueError()
@OnQueueWaiting()
@OnQueueActive()
@OnQueueStalled()
@OnQueueProgress()
@OnQueueCompleted()
@OnQueueFailed()
@OnQueuePaused()
@OnQueueResumed()
@OnQueueCleaned()
@OnQueueDrained()
@OnQueueRemoved()
@OnGlobalQueueError()
@OnGlobalQueueWaiting()
@OnGlobalQueueActive()
@OnGlobalQueueStalled()
@OnGlobalQueueProgress()
@OnGlobalQueueCompleted()
@OnGlobalQueueFailed()
@OnGlobalQueuePaused()
@OnGlobalQueueResumed()
@OnGlobalQueueCleaned()
@OnGlobalQueueDrained()
@OnGlobalQueueRemoved()
If you need more details about those events, head straight to Bull's reference doc.
Example
Here is a pretty self-explanatory example on how this package's decorators should be used.
import {
Processor,
Process,
OnQueueActive,
OnQueueEvent,
BullQueueEvents,
} from '../../lib';
import { NumberService } from './number.service';
import { Job, DoneCallback } from 'bull';
@Processor()
export class MyQueue {
private readonly logger = new Logger('MyQueue');
constructor(private readonly service: NumberService) {}
@Process({ name: 'twice' })
processTwice(job: Job<number>) {
return this.service.twice(job.data);
}
@Process({ name: 'thrice' })
processThrice(job: Job<number>, callback: DoneCallback) {
callback(null, this.service.thrice(job.data));
}
@OnQueueActive()
onActive(job: Job) {
this.logger.log(
`Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
);
}
@OnQueueEvent(BullQueueEvents.COMPLETED)
onCompleted(job: Job) {
this.logger.log(
`Completed job ${job.id} of type ${job.name} with result ${job.returnvalue}`,
);
}
}
Separate processes
This module allows you to run your job handlers in fork processes.
To do so, add the filesystem path to a file (or more) exporting your processor function to the processors
property of the BullModule options.
You can read more on this subject in Bull's documentation.
Please note that, your function being executed in a fork, Nestjs' DI won't be available.
Example
import { Module } from '@nestjs/common';
import { BullModule } from 'nest-bull';
import { join } from 'path';
@Module({
imports: [
BullModule.register({
processors: [join(__dirname, 'processor.ts')],
}),
],
})
export class AppModule {}
import { Job, DoneCallback } from 'bull';
export default function(job: Job, cb: DoneCallback) {
cb(null, 'It works');
}
People