nats-jetstream-transport
Breaking changes in v1.3.x
Replace NatsJetStreamClientProxy with NatsJetStreamClient
Build an event-driven microservices architecture with NATS JetStream Server and NestJS.
Supports both the request-response and the event-based pattern.
npm i @nestjs/microservices
npm i nats
npm i @nestjs-plugins/nestjs-nats-jetstream-transport
docker run -d --name nats-main -p 4222:4222 -p 6222:6222 -p 8222:8222 nats -js -m 8222
Install the NATS CLI tool on macOS
brew install nats-io/nats-tools/nats
For other platforms see alternative installation methods.
To try the code example below, add a stream to the nats server:
nats stream add
Enter a stream name, e.g. mystream, then use order.* as the subjects.
For the rest of the choices just press enter and use the defaults.
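The order.* subject filter uses NATS wildcard rules: * matches exactly one dot-separated token, and > matches one or more trailing tokens. The following is a minimal sketch of that matching logic for illustration only; subjectMatches is a hypothetical helper, not part of nats or this package.

```typescript
// Sketch of NATS subject matching: "*" matches exactly one token,
// ">" matches one or more trailing tokens.
// subjectMatches is a hypothetical helper written for this example.
function subjectMatches(pattern: string, subject: string): boolean {
  const patternTokens = pattern.split('.');
  const subjectTokens = subject.split('.');

  for (let i = 0; i < patternTokens.length; i++) {
    const token = patternTokens[i];
    if (token === '>') {
      // ">" must be the last pattern token and needs at least one remaining subject token.
      return i === patternTokens.length - 1 && subjectTokens.length > i;
    }
    if (i >= subjectTokens.length) {
      return false; // the subject is shorter than the pattern
    }
    if (token !== '*' && token !== subjectTokens[i]) {
      return false; // literal token mismatch
    }
  }
  // Without ">", the pattern and subject must have the same number of tokens.
  return patternTokens.length === subjectTokens.length;
}
```

Under these rules a stream with the subject order.* captures order.created, order.updated and order.deleted, but not a deeper subject such as order.created.eu.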
You can also create a stream automatically by defining a streamConfig object in the options passed to NatsJetStreamServer. This creates a new stream or updates an existing one. The code example below defines this object, so it is not strictly necessary to add the stream through the nats CLI.
You are now ready to publish and consume events on the stream. See the code example below for a test drive.
Replay policy: applies when the deliver policy is All, ByStartSequence or ByStartTime, since those deliver policies begin reading the stream at a position other than the end. If the policy is Original, the messages in the stream will be pushed to the client at the same rate they were originally received, simulating the original timing of messages. If the policy is Instant (the default), the messages will be pushed to the client as fast as possible while adhering to the Ack Policy, Max Ack Pending and the client’s ability to consume those messages.
Headers only: messages are delivered with a Nats-Msg-Size header, no bodies.
Debug: if true, the client prints protocol interactions to the console. Useful for debugging.
Ignore cluster updates: if true, the client will ignore any cluster updates provided by the server.
Inbox prefix: the prefix used by createInbox(prefix).
Reconnect delay handler: a function of the form ()=>number.
Connection timeout: on failure, the client emits a connection_timeout event with a NatsError that provides the hostport of the server where the connection was attempted.
Verbose: turns on +OK protocol acknowledgements.
Wait on first connect: if true, the client will fall back to a reconnect mode if it fails its first connection attempt.
Storage: File and Memory.
Retention: Limits, Interest or WorkQueue.
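The connection-level descriptions above (debug output, cluster updates, inbox prefix, reconnect delay, connection timeout, +OK acknowledgements, first-connect fallback) appear to correspond to nats.js connection options. The following is a minimal sketch, assuming the option names debug, verbose, ignoreClusterUpdates, inboxPrefix, timeout, waitOnFirstConnect and reconnectDelayHandler from the nats.js ConnectionOptions type. The local interface is only a stand-in so the snippet type-checks on its own, and every value is illustrative.

```typescript
// Local stand-in for a subset of the nats.js connection options.
// Assumption: the field names mirror nats.js ConnectionOptions; check the
// installed nats version before relying on them.
interface ConnectionOptionsSketch {
  servers: string;
  name?: string;
  debug?: boolean; // print protocol interactions to the console
  verbose?: boolean; // request +OK protocol acknowledgements
  ignoreClusterUpdates?: boolean; // ignore cluster updates from the server
  inboxPrefix?: string; // prefix for automatically created inboxes
  timeout?: number; // connection timeout in milliseconds
  waitOnFirstConnect?: boolean; // fall back to reconnect mode on first failure
  reconnectDelayHandler?: () => number; // milliseconds to wait before reconnecting
}

// Illustrative values only; none of them come from the package documentation.
const connectionOptions: ConnectionOptionsSketch = {
  servers: 'localhost:4222',
  name: 'myservice-listener',
  debug: true,
  verbose: false,
  ignoreClusterUpdates: false,
  inboxPrefix: '_INBOX.myservice', // hypothetical prefix
  timeout: 5000,
  waitOnFirstConnect: true,
  reconnectDelayHandler: () => 2000,
};
```

An object of this shape would be passed as connectionOptions, in the same place the examples below set servers and name.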
// app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { NatsJetStreamTransport } from '@nestjs-plugins/nestjs-nats-jetstream-transport';

@Module({
  imports: [
    NatsJetStreamTransport.register({
      connectionOptions: {
        servers: 'localhost:4222',
        name: 'myservice-publisher',
      },
    }),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
// app.service.ts
import { NatsJetStreamClient } from '@nestjs-plugins/nestjs-nats-jetstream-transport';
import { Injectable } from '@nestjs/common';
import { PubAck } from 'nats';
import { Observable } from 'rxjs';

interface OrderCreatedEvent {
  id: number;
  product: string;
  quantity: number;
}

interface OrderUpdatedEvent {
  id: number;
  quantity: number;
}

interface OrderDeleteEvent {
  id: number;
}

const ORDER_CREATED = 'order.created';
const ORDER_UPDATED = 'order.updated';
const ORDER_DELETED = 'order.deleted';

@Injectable()
export class AppService {
  constructor(private client: NatsJetStreamClient) {}

  createOrder(): string {
    this.client
      .emit<OrderCreatedEvent>(ORDER_CREATED, {
        id: 1,
        product: 'Socks',
        quantity: 1,
      })
      .subscribe((pubAck) => {
        console.log(pubAck);
      });
    return 'order created.';
  }

  updateOrder(): string {
    this.client
      .emit<OrderUpdatedEvent>(ORDER_UPDATED, { id: 1, quantity: 10 })
      .subscribe();
    return 'order updated';
  }

  deleteOrder(): string {
    this.client
      .emit<OrderDeleteEvent>(ORDER_DELETED, { id: 1 })
      .subscribe((pubAck) => {
        console.log(pubAck);
      });
    return 'order deleted';
  }

  // request - response
  accumulate(payload: number[]): Observable<PubAck> {
    const pattern = { cmd: 'sum' };
    return this.client.send<number[]>(pattern, payload);
  }
}
// app.controller.ts
import { NatsJetStreamContext } from '@nestjs-plugins/nestjs-nats-jetstream-transport';
import { Controller, Get } from '@nestjs/common';
import {
  Ctx,
  EventPattern,
  MessagePattern,
  Payload,
} from '@nestjs/microservices';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @Get()
  home(): string {
    return 'Welcome to webshop';
  }

  @Get('/create')
  createOrder(): string {
    return this.appService.createOrder();
  }

  @Get('/update')
  updateOrder(): string {
    return this.appService.updateOrder();
  }

  @Get('/delete')
  deleteOrder(): string {
    return this.appService.deleteOrder();
  }

  // request - response
  @Get('/sum')
  calc() {
    console.log('sum controller');
    return this.appService.accumulate([1, 2, 3]);
  }

  // Payload shape matches the order.updated event emitted by AppService.
  @EventPattern('order.updated')
  public async orderUpdatedHandler(
    @Payload() data: { id: number; quantity: number },
    @Ctx() context: NatsJetStreamContext,
  ) {
    context.message.ack();
    console.log('received: ' + context.message.subject, data);
  }

  // Payload shape matches the order.created event emitted by AppService.
  @EventPattern('order.created')
  public async orderCreatedHandler(
    @Payload() data: { id: number; product: string; quantity: number },
    @Ctx() context: NatsJetStreamContext,
  ) {
    context.message.ack();
    console.log('received: ' + context.message.subject, data);
  }

  @EventPattern('order.deleted')
  public async orderDeletedHandler(
    @Payload() data: any,
    @Ctx() context: NatsJetStreamContext,
  ) {
    context.message.ack();
    console.log('received: ' + context.message.subject, data);
  }

  // request - response
  @MessagePattern({ cmd: 'sum' })
  async accumulate(data: number[]): Promise<number> {
    console.log('message controller', data);
    // The initial value 0 keeps reduce from throwing on an empty array.
    return (data || []).reduce((a, b) => a + b, 0);
  }
}
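The sum handler relies on Array.prototype.reduce. The following sketch shows the same reduction in isolation, with an explicit initial value so that an empty or missing array yields 0 instead of throwing a TypeError. The sum function is a hypothetical standalone helper, not part of the package.

```typescript
// Hypothetical standalone version of the reduction used by the sum handler.
function sum(data?: number[]): number {
  // With the initial value 0, an empty or missing array returns 0;
  // without it, reduce throws a TypeError on an empty array.
  return (data || []).reduce((a, b) => a + b, 0);
}
```

Called with [1, 2, 3], as in the /sum route, the reduction adds the three numbers and returns 6.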
// main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { CustomStrategy } from '@nestjs/microservices';
import { NatsJetStreamServer } from '@nestjs-plugins/nestjs-nats-jetstream-transport';

async function bootstrap() {
  const options: CustomStrategy = {
    strategy: new NatsJetStreamServer({
      connectionOptions: {
        servers: 'localhost:4222',
        name: 'myservice-listener',
      },
      consumerOptions: {
        deliverGroup: 'myservice-group',
        durable: 'myservice-durable',
        deliverTo: 'myservice-messages',
        manualAck: true,
      },
      // Creates the stream if it does not exist, or updates it if it does.
      streamConfig: {
        name: 'mystream',
        subjects: ['order.*'],
      },
    }),
  };

  // hybrid microservice and web application
  const app = await NestFactory.create(AppModule);
  const microService = app.connectMicroservice(options);
  // Await both listeners so startup errors surface in bootstrap().
  await microService.listen();
  await app.listen(3000);
}
bootstrap();
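With manualAck: true in the consumer options, each handler has to acknowledge its messages itself. The controller handlers call ack() before doing any work. An alternative is to acknowledge only after the work succeeds, so that a failure leads to redelivery. The following is a minimal sketch of that pattern; AckableMessage and processWithAck are hypothetical names, and the interface only stands in for the ack() and nak() methods of a JetStream message. It is synchronous for brevity, whereas an async handler would await its work before acknowledging.

```typescript
// Minimal stand-in for the parts of a JetStream message used here.
// Assumption: the real message object exposes ack() and nak() like this.
interface AckableMessage {
  subject: string;
  ack(): void;
  nak(): void;
}

// Runs the handler, then acks on success or naks on failure.
// Returns true when the message was acknowledged.
function processWithAck(message: AckableMessage, handler: () => void): boolean {
  try {
    handler();
    message.ack(); // processing succeeded, so the server can drop the message
    return true;
  } catch {
    message.nak(); // processing failed, so ask the server to redeliver
    return false;
  }
}
```

Inside an event handler, a call could look like processWithAck(context.message, () => saveOrder(data)), where saveOrder is a placeholder for the application's own logic.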
FAQs
Nats JetStream Transport for NestJS
We found that nats-jetstream-transport has an unhealthy release cadence and low project activity: the last version was released a year ago. It has 1 open source maintainer collaborating on the project.