NATS JetStream Custom Transporter for NestJS microservice
Goals of this library
Install
Library
npm i @nestjs/microservices
npm i nats
npm i @chance/nestjs-nats-jetstream-microservice
Server
A NATS server can run locally with Docker:
docker run -d --name nats -p 4222:4222 -p 6222:6222 -p 8222:8222 nats --jetstream -m 8222
Alternatively, use Synadia NGS for a quick setup.
CLI
The NATS CLI covers all needs from the command line:
brew tap nats-io/nats-tools
brew install nats-io/nats-tools/nats
Or download the official release.
Configuration
Streams
Streams are 'message stores'. Each stream defines how messages are stored and what the retention limits (duration, size, interest) are. Streams consume normal NATS subjects: any message published on those subjects is captured in the defined storage system. You can do a normal publish to the subject for unacknowledged delivery, though it is better to use the JetStream publish calls, because the JetStream server replies with an acknowledgement that the message was successfully stored.
Subjects can be queried using NATS subject syntax, including wildcards.
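To make the subject syntax concrete, here is a minimal sketch of how NATS wildcards match subjects: `*` matches exactly one token and `>` matches one or more trailing tokens. The helper name `subjectMatches` is hypothetical and only for illustration; it is not part of this library or of the NATS client, and the real matching happens on the server.

```typescript
// Illustrative only: `subjectMatches` is a hypothetical helper, not a library API.
// Tokens are separated by "."; "*" matches one token, ">" matches one or more trailing tokens.
function subjectMatches(pattern: string, subject: string): boolean {
  const p = pattern.split(".");
  const s = subject.split(".");
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") {
      // ">" must be the last pattern token and needs at least one subject token left.
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) return false; // subject is shorter than the pattern
    if (p[i] !== "*" && p[i] !== s[i]) return false; // literal token mismatch
  }
  // Without a trailing ">", both sides must have the same number of tokens.
  return p.length === s.length;
}

console.log(subjectMatches("booking.>", "booking.42.room-booked-event.v1")); // true
console.log(subjectMatches("booking.*", "booking.42.room-booked-event.v1")); // false
```

A stream declared with the subject "booking.>" would therefore capture every subject that starts with the "booking" token and has at least one more token after it.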
Configuration options can be set using the library:
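import * as os from "os";
import { NestFactory } from "@nestjs/core";
import { CustomStrategy } from "@nestjs/microservices";
import { NestFastifyApplication } from "@nestjs/platform-fastify";
import { StreamConfig } from "nats";
import { NatsJetStreamServer } from "@chance/nestjs-nats-jetstream-microservice";
// HotelBookingModule is the application module shown under "Publishing events".

const bootstrap = async () => {
  const options: CustomStrategy = {
    strategy: new NatsJetStreamServer({
      connectionOptions: {
        servers: "127.0.0.1:4222",
        name: `nats-connection.${os.hostname()}`,
      },
      // Streams listed here are asserted when the server starts.
      assertStreams: [
        {
          name: "booking",
          description: "Booking domain with all its events",
          subjects: ["booking.>"],
        } as Partial<StreamConfig>,
      ],
    }),
  };
  const app = await NestFactory.create<NestFastifyApplication>(HotelBookingModule);
  const microService = app.connectMicroservice(options);
  await microService.listen();
  return app;
};
bootstrap();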
Consumer options
A consumer is a stateful view of a stream. It acts as an interface for clients to consume a subset of the messages stored in a stream, and it keeps track of which messages were delivered to and acknowledged by clients.
Unlike core NATS, which provides an at-most-once delivery guarantee, a consumer can provide an at-least-once delivery guarantee.
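The at-least-once guarantee can be illustrated with a toy in-memory model: a message stays pending until the handler acknowledges it, so a handler that fails before acking sees the same message again. This is only a sketch of the idea. `ToyConsumer`, `deliverPending`, and the message names are hypothetical, and the real JetStream server tracks acknowledgements and redelivery timing itself.

```typescript
// Toy model of at-least-once delivery; not the JetStream implementation.
type Ack = () => void;
type Handler = (data: string, ack: Ack) => void;

class ToyConsumer {
  // Sequence number -> payload still waiting for an acknowledgement.
  private pending = new Map<number, string>();

  constructor(messages: string[]) {
    messages.forEach((m, i) => this.pending.set(i + 1, m));
  }

  // Deliver every unacknowledged message once; returns how many are still unacked.
  deliverPending(handler: Handler): number {
    for (const [seq, data] of Array.from(this.pending.entries())) {
      handler(data, () => {
        this.pending.delete(seq);
      });
    }
    return this.pending.size;
  }
}

const deliveries: string[] = [];
let failOnce = true;
const toy = new ToyConsumer(["room-booked-1", "room-booked-2"]);
const onMessage: Handler = (data, ack) => {
  deliveries.push(data);
  if (data === "room-booked-1" && failOnce) {
    failOnce = false; // simulate a failure before the ack is sent
    return;
  }
  ack();
};

toy.deliverPending(onMessage); // one message is left unacknowledged
toy.deliverPending(onMessage); // the unacknowledged message is delivered again
console.log(deliveries); // ["room-booked-1", "room-booked-2", "room-booked-1"]
```

Because a message can arrive more than once, handlers are usually written to be idempotent.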
Configuration options can be set using the library and the decorator. Alternatively, create the consumer with the CLI and reference the named consumer in the decorator.
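import { Controller } from "@nestjs/common";
import { Ctx, EventPattern, Payload } from "@nestjs/microservices";
// NatsJetStreamContext and ConsumeOptions are the types used by this library.

@Controller()
export class BotNatsController {
  constructor(private scheduleCleaning: ScheduleCleaningCommandHandler) {}

  @EventPattern("ConsumerName", {
    description: "Trigger cleaning side effect when room is booked",
    filter_subject: "booking.*.room-booked-event.>",
    deliver_to: "cleanupInbox",
    durable: "cleanupStack",
    manual_ack: true,
  } as ConsumeOptions)
  async cleanup(
    @Payload() event: RoomBookedEvent,
    @Ctx() context: NatsJetStreamContext
  ) {
    // manual_ack is enabled, so the message must be acknowledged explicitly.
    context.message.ack();
  }
}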
Publishing events
@Module({
  imports: [
    NatsJetStreamTransport.register({
      connectionOptions: {
        servers: "127.0.0.1:4222",
        name: "hotel-booking-publisher",
      },
    }),
  ],
  controllers: [BotNatsController],
  providers: [],
})
export class HotelBookingModule {}

@Injectable()
export class BookRoomCommandHandler {
  constructor(private client: NatsJetStreamClient) {}

  async handle(command: BookRoomCommand) {
    // isoDate, source and correlationId are not defined in this excerpt.
    const event = new RoomBookedEvent(
      { ...command.data, date: isoDate.toISOString() },
      source,
      correlationId
    );
    // The message ID lets JetStream detect duplicate publishes.
    const uniqueBookingSlug = `booked-${correlationId}`;
    // Return the promise so callers can await the ack and receive the ConflictException.
    return this.client
      .publish('my.super.subject', event, { msgID: uniqueBookingSlug })
      .then((res: PubAck) => {
        if (!res.duplicate) {
          return res;
        }
        throw new ConflictException('MsgID already exists error');
      });
  }
}
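The duplicate check above relies on the server remembering message IDs. As a rough illustration of that behaviour, the following toy model returns `duplicate: true` when the same message ID is published twice. `ToyStream` and `ToyPubAck` are hypothetical names for this sketch, and it ignores that a real JetStream stream only remembers IDs for a configurable duplicate window.

```typescript
// Toy model of message-ID deduplication; not the JetStream implementation.
interface ToyPubAck {
  seq: number; // sequence number assigned to the stored message
  duplicate: boolean; // true when the message ID was already seen
}

class ToyStream {
  private seenIds = new Map<string, number>(); // message ID -> stored sequence
  private lastSeq = 0;

  publish(msgID: string): ToyPubAck {
    const existing = this.seenIds.get(msgID);
    if (existing !== undefined) {
      // Nothing new is stored; the ack points at the original message.
      return { seq: existing, duplicate: true };
    }
    this.lastSeq += 1;
    this.seenIds.set(msgID, this.lastSeq);
    return { seq: this.lastSeq, duplicate: false };
  }
}

const bookingStream = new ToyStream();
console.log(bookingStream.publish("booked-abc")); // { seq: 1, duplicate: false }
console.log(bookingStream.publish("booked-abc")); // { seq: 1, duplicate: true }
console.log(bookingStream.publish("booked-def")); // { seq: 2, duplicate: false }
```

Deriving the message ID from a business key, as the handler does with the correlation ID, makes a retried publish safe: the second attempt is flagged as a duplicate instead of being stored again.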