Security News
The Risks of Misguided Research in Supply Chain Security
Snyk's use of malicious npm packages for research raises ethical concerns, highlighting risks in public deployment, data exfiltration, and unauthorized testing.
@supy-io/nestjs-jetstream
Advanced tools
Breakable changes in v1.3..x
Replace NatsJetStreamClientProxy with NatsJetStreamClient
Build Event Driven Microservices Architecture with Nats JetStream Server and NestJS.
Support for both request-response and event based pattern.
npm i @nestjs/microservices
npm i nats
npm i @supy-io/nestjs-jetstream
docker run -d --name nats-main -p 4222:4222 -p 6222:6222 -p 8222:8222 nats -js -m 8222
Install cli tool on MacOS
brew install nats-io/nats-tools/nats
For other platforms see alternative installation methods.
To try the code example below, add a stream to the nats server:
nats stream add
Enter a stream name e.g. mystream. Then as subjects use order.*
For the rest of the choices just press enter and use the defaults.
You can also automatically create a stream by defining a streamConfig object to the NatsJestStreamOptions object. This will create a new stream or update existing. The code example bellow has this object defined so there is not really necessary to add this stream through nats cli.
You are now ready to publish and consume events on the stream. See the code example below for a test drive.
All
, ByStartSequence
or ByStartTime
since those deliver policies begin reading the stream at a position other than the end. If the policy is Original
, the messages in the stream will be pushed to the client at the same rate they were originally received, simulating the original timing of messages. If the policy is Instant
(the default), the messages will be pushed to the client as fast as possible while adhering to the Ack Policy, Max Ack Pending and the client’s ability to consume those messages.Nats-Msg-Size
header, no bodies.true
, the client prints protocol interactions to the console. Useful for debugging.true
the client will ignore any cluster updates provided by the server.createInbox(prefix)
()=>number
.connection_timeout
event with a NatsError that provides the hostport of the server where the connection was attempted.+OK
protocol acknowledgements.true
the client will fall back to a reconnect mode if it fails its first connection attempt.File
and Memory
Limits
, Interest
or WorkQueue
// app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { NatsJetStreamTransport } from '@supy-io/nestjs-jetstream';
@Module({
imports: [
NatsJetStreamTransport.register({
connectionOptions: {
servers: 'localhost:4222',
name: 'myservice-publisher',
},
}),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
// app.service.ts
import { NatsJetStreamClient } from '@supy-io/nestjs-jetstream';
import { Injectable } from '@nestjs/common';
import { PubAck } from 'nats';
import { Observable } from 'rxjs';
interface OrderCreatedEvent {
id: number;
product: string;
quantity: number;
}
interface OrderUpdatedEvent {
id: number;
quantity: number;
}
interface OrderDeleteEvent {
id: number;
}
const ORDER_CREATED = 'order.created';
const ORDER_UPDATED = 'order.updated';
const ORDER_DELETED = 'order.deleted';
@Injectable()
export class AppService {
constructor(private client: NatsJetStreamClient) {}
createOrder(): string {
this.client
.emit<OrderCreatedEvent>(ORDER_CREATED, {
id: 1,
product: 'Socks',
quantity: 1,
})
.subscribe((pubAck) => {
console.log(pubAck);
});
return 'order created.';
}
updateOrder(): string {
this.client
.emit<OrderUpdatedEvent>(ORDER_UPDATED, { id: 1, quantity: 10 })
.subscribe();
return 'order updated';
}
deleteOrder(): string {
this.client
.emit<OrderDeleteEvent>(ORDER_DELETED, { id: 1 })
.subscribe((pubAck) => {
console.log(pubAck);
});
return 'order deleted';
}
// request - response
accumulate(payload: number[]): Observable<number> {
const pattern = { cmd: 'sum' };
return this.client.send<number>(pattern, payload);
}
}
// app.controller.ts
import { NatsJetStreamContext } from '@supy-io/nestjs-jetstream';
import { Controller, Get } from '@nestjs/common';
import {
Ctx,
EventPattern,
MessagePattern,
Payload,
} from '@nestjs/microservices';
import { AppService } from './app.service';
@Controller()
export class AppController {
constructor(private readonly appService: AppService) {}
@Get()
home(): string {
return 'Welcome to webshop';
}
@Get('/create')
createOrder(): string {
return this.appService.createOrder();
}
@Get('/update')
updateOrder(): string {
return this.appService.updateOrder();
}
@Get('/delete')
deleteOrder(): string {
return this.appService.deleteOrder();
}
// request - response
@Get('/sum')
calc() {
console.log('sum controller');
return this.appService.accumulate([1, 2, 3]);
}
@EventPattern('order.updated')
public async orderUpdatedHandler(
@Payload() data: string,
@Ctx() context: NatsJetStreamContext,
) {
context.message.ack();
console.log('received: ' + context.message.subject, data);
}
@EventPattern('order.created')
public async orderCreatedHandler(
@Payload() data: { id: number; name: string },
@Ctx() context: NatsJetStreamContext,
) {
context.message.ack();
console.log('received: ' + context.message.subject, data);
}
@EventPattern('order.deleted')
public async orderDeletedHandler(
@Payload() data: any,
@Ctx() context: NatsJetStreamContext,
) {
context.message.ack();
console.log('received: ' + context.message.subject, data);
}
// request - response
@MessagePattern({ cmd: 'sum' })
async accumulate(data: number[]): Promise<number> {
console.log('message conroller', data);
return (data || []).reduce((a, b) => a + b);
}
}
// main.js
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { CustomStrategy } from '@nestjs/microservices';
import { NatsJetStreamServer } from '@supy-io/nestjs-jetstream';
async function bootstrap() {
const options: CustomStrategy = {
strategy: new NatsJetStreamServer({
connectionOptions: {
servers: 'localhost:4222',
name: 'myservice-listener',
},
consumerOptions: {
deliverGroup: 'myservice-group',
durable: 'myservice-durable',
deliverTo: 'myservice-messages',
manualAck: true,
},
streamConfig: {
name: 'mystream',
subjects: ['order.*'],
},
}),
};
// hybrid microservice and web application
const app = await NestFactory.create(AppModule);
const microService = app.connectMicroservice(options);
microService.listen();
app.listen(3000);
}
bootstrap();
FAQs
Nats JetStream Transport for NestJS
The npm package @supy-io/nestjs-jetstream receives a total of 1 weekly downloads. As such, @supy-io/nestjs-jetstream popularity was classified as not popular.
We found that @supy-io/nestjs-jetstream demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Snyk's use of malicious npm packages for research raises ethical concerns, highlighting risks in public deployment, data exfiltration, and unauthorized testing.
Research
Security News
Socket researchers found several malicious npm packages typosquatting Chalk and Chokidar, targeting Node.js developers with kill switches and data theft.
Security News
pnpm 10 blocks lifecycle scripts by default to improve security, addressing supply chain attack risks but sparking debate over compatibility and workflow changes.