Security News
Fluent Assertions Faces Backlash After Abandoning Open Source Licensing
Fluent Assertions is facing backlash after dropping the Apache license for a commercial model, leaving users blindsided and questioning contributor rights.
nestjs-nats-jetstream-transport
Advanced tools
Build Event Driven Microservices Architecture with Nats JetStream Server and NestJS.
Support for both request-response and event based pattern.
npm i @nestjs/microservices
npm i nats
npm i @nestjs-plugins/nestjs-nats-jetstream-transport
docker run -d --name nats-main -p 4222:4222 -p 6222:6222 -p 8222:8222 nats -js -m 8222
Install cli tool on MacOS
brew install nats-io/nats-tools/nats
For other platforms see alternative installation methods.
To try the code example below, add a stream to the nats server:
nats stream add
Enter a stream name e.g. mystream. Then as subjects use order.*
For the rest of the choices just press enter and use the defaults.
You can also automatically create a stream by defining a streamConfig object to the NatsJestStreamOptions object. This will create a new stream or update existing. The code example bellow has this object defined so there is not really necessary to add this stream through nats cli.
You are now ready to publish and consume events on the stream. See the code example below for a test drive.
All
, ByStartSequence
or ByStartTime
since those deliver policies begin reading the stream at a position other than the end. If the policy is Original
, the messages in the stream will be pushed to the client at the same rate they were originally received, simulating the original timing of messages. If the policy is Instant
(the default), the messages will be pushed to the client as fast as possible while adhering to the Ack Policy, Max Ack Pending and the client’s ability to consume those messages.Nats-Msg-Size
header, no bodies.true
, the client prints protocol interactions to the console. Useful for debugging.true
the client will ignore any cluster updates provided by the server.createInbox(prefix)
()=>number
.connection_timeout
event with a NatsError that provides the hostport of the server where the connection was attempted.+OK
protocol acknowledgements.true
the client will fall back to a reconnect mode if it fails its first connection attempt.File
and Memory
Limits
, Interest
or WorkQueue
// app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { NatsJetStreamTransport } from '@nestjs-plugins/nestjs-nats-jetstream-transport';
@Module({
imports: [
NatsJetStreamTransport.register({
connectionOptions: {
servers: 'localhost:4222',
name: 'myservice-publisher'
}
}),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
// app.service.ts
import {
NatsJetStreamClientProxy,
} from '@nestjs-plugins/nestjs-nats-jetstream-transport';
import { Injectable } from '@nestjs/common';
import { PubAck } from 'nats';
import { Observable } from 'rxjs';
interface OrderCreatedEvent {
id: number;
product: string;
quantity: number;
}
interface OrderUpdatedEvent {
id: number;
quantity: number;
}
interface OrderDeleteEvent {
id: number;
}
const ORDER_CREATED = 'order.created';
const ORDER_UPDATED = 'order.updated';
const ORDER_DELETED = 'order.deleted';
@Injectable()
export class AppService {
constructor(private client: NatsJetStreamClientProxy) {}
createOrder(): string {
this.client
.emit<PubAck, OrderCreatedEvent>(ORDER_CREATED, {
id: 1,
product: 'Socks',
quantity: 1,
})
.subscribe((pubAck) => {
console.log(pubAck);
});
return 'order created.';
}
updateOrder(): string {
this.client
.emit<null, OrderUpdatedEvent>(ORDER_UPDATED, { id: 1, quantity: 10 })
.subscribe();
return 'order updated';
}
deleteOrder(): string {
this.client
.emit<PubAck, OrderDeleteEvent>(ORDER_DELETED, { id: 1 })
.subscribe((pubAck) => {
console.log(pubAck);
});
return 'order deleted';
}
// request - response
accumulate(payload: number[]): Observable<number> {
const pattern = { cmd: 'sum' };
return this.client.send<number>(pattern, payload);
}
}
// app.controller.ts
import { NatsJetStreamContext } from '@nestjs-plugins/nestjs-nats-jetstream-transport';
import { Controller, Get } from '@nestjs/common';
import { Ctx, EventPattern, MessagePattern, Payload } from '@nestjs/microservices';
import { AppService } from './app.service';
@Controller()
export class AppController {
constructor(private readonly appService: AppService) {}
@Get()
home(): string {
return 'Welcome to webshop'
}
@Get('/create')
createOrder(): string {
return this.appService.createOrder();
}
@Get('/update')
updateOrder(): string {
return this.appService.updateOrder();
}
@Get('/delete')
deleteOrder(): string {
return this.appService.deleteOrder();
}
// request - response
@Get('/sum')
calc() {
console.log('sum controller')
return this.appService.accumulate([1,2,3])
}
@EventPattern('order.updated')
public async orderUpdatedHandler(
@Payload() data: string,
@Ctx() context: NatsJetStreamContext,
) {
context.message.ack();
console.log('received: ' + context.message.subject, data);
}
@EventPattern('order.created')
public async orderCreatedHandler(
@Payload() data: { id: number; name: string },
@Ctx() context: NatsJetStreamContext,
) {
context.message.ack();
console.log('received: ' + context.message.subject, data);
}
@EventPattern('order.deleted')
public async orderDeletedHandler(
@Payload() data:any,
@Ctx() context: NatsJetStreamContext,
) {
context.message.ack();
console.log('received: ' + context.message.subject, data);
}
// request - response
@MessagePattern({ cmd: 'sum' })
async accumulate(data: number[]): Promise<number> {
console.log('message conroller', data)
return (data || []).reduce((a, b) => a + b);
}
}
// main.js
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { CustomStrategy } from '@nestjs/microservices';
import { NatsJetStreamServer } from '@nestjs-plugins/nestjs-nats-jetstream-transport';
async function bootstrap() {
const options: CustomStrategy = {
strategy: new NatsJetStreamServer({
connectionOptions: {
servers: 'localhost:4222',
name: 'myservice-listener'
},
consumerOptions: {
deliverGroup: 'myservice-group',
durable: 'myservice-durable',
deliverTo: 'myservice-messages',
manualAck: true,
},
streamConfig: {
name: 'mystream',
subjects: ['order.*']
}
}),
};
// hybrid microservice and web application
const app = await NestFactory.create(AppModule);
const microService = app.connectMicroservice(options);
microService.listen();
app.listen(3000);
}
bootstrap();
FAQs
Nats JetStream Transport for NestJS
The npm package nestjs-nats-jetstream-transport receives a total of 0 weekly downloads. As such, nestjs-nats-jetstream-transport popularity was classified as not popular.
We found that nestjs-nats-jetstream-transport demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Fluent Assertions is facing backlash after dropping the Apache license for a commercial model, leaving users blindsided and questioning contributor rights.
Research
Security News
Socket researchers uncover the risks of a malicious Python package targeting Discord developers.
Security News
The UK is proposing a bold ban on ransomware payments by public entities to disrupt cybercrime, protect critical services, and lead global cybersecurity efforts.