Security News
Opengrep Emerges as Open Source Alternative Amid Semgrep Licensing Controversy
Opengrep forks Semgrep to preserve open source SAST in response to controversial licensing changes.
@supy-io/nestjs-jetstream
Advanced tools
Breakable changes in v1.3..x
Replace NatsJetStreamClientProxy with NatsJetStreamClient
Build Event Driven Microservices Architecture with Nats JetStream Server and NestJS.
Support for both request-response and event based pattern.
npm i @nestjs/microservices
npm i nats
npm i @supy-io/nestjs-jetstream
docker run -d --name nats-main -p 4222:4222 -p 6222:6222 -p 8222:8222 nats -js -m 8222
Install cli tool on MacOS
brew install nats-io/nats-tools/nats
For other platforms see alternative installation methods.
To try the code example below, add a stream to the nats server:
nats stream add
Enter a stream name e.g. mystream. Then as subjects use order.*
For the rest of the choices just press enter and use the defaults.
You can also automatically create a stream by defining a streamConfig object to the NatsJestStreamOptions object. This will create a new stream or update existing. The code example bellow has this object defined so there is not really necessary to add this stream through nats cli.
You are now ready to publish and consume events on the stream. See the code example below for a test drive.
All
, ByStartSequence
or ByStartTime
since those deliver policies begin reading the stream at a position other than the end. If the policy is Original
, the messages in the stream will be pushed to the client at the same rate they were originally received, simulating the original timing of messages. If the policy is Instant
(the default), the messages will be pushed to the client as fast as possible while adhering to the Ack Policy, Max Ack Pending and the client’s ability to consume those messages.Nats-Msg-Size
header, no bodies.true
, the client prints protocol interactions to the console. Useful for debugging.true
the client will ignore any cluster updates provided by the server.createInbox(prefix)
()=>number
.connection_timeout
event with a NatsError that provides the hostport of the server where the connection was attempted.+OK
protocol acknowledgements.true
the client will fall back to a reconnect mode if it fails its first connection attempt.File
and Memory
Limits
, Interest
or WorkQueue
// app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { NatsJetStreamTransport } from '@supy-io/nestjs-jetstream';
@Module({
imports: [
NatsJetStreamTransport.register({
connectionOptions: {
servers: 'localhost:4222',
name: 'myservice-publisher',
},
}),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
// app.service.ts
import { NatsJetStreamClient } from '@supy-io/nestjs-jetstream';
import { Injectable } from '@nestjs/common';
import { PubAck } from 'nats';
import { Observable } from 'rxjs';
interface OrderCreatedEvent {
id: number;
product: string;
quantity: number;
}
interface OrderUpdatedEvent {
id: number;
quantity: number;
}
interface OrderDeleteEvent {
id: number;
}
const ORDER_CREATED = 'order.created';
const ORDER_UPDATED = 'order.updated';
const ORDER_DELETED = 'order.deleted';
@Injectable()
export class AppService {
constructor(private client: NatsJetStreamClient) {}
createOrder(): string {
this.client
.emit<OrderCreatedEvent>(ORDER_CREATED, {
id: 1,
product: 'Socks',
quantity: 1,
})
.subscribe((pubAck) => {
console.log(pubAck);
});
return 'order created.';
}
updateOrder(): string {
this.client
.emit<OrderUpdatedEvent>(ORDER_UPDATED, { id: 1, quantity: 10 })
.subscribe();
return 'order updated';
}
deleteOrder(): string {
this.client
.emit<OrderDeleteEvent>(ORDER_DELETED, { id: 1 })
.subscribe((pubAck) => {
console.log(pubAck);
});
return 'order deleted';
}
// request - response
accumulate(payload: number[]): Observable<number> {
const pattern = { cmd: 'sum' };
return this.client.send<number>(pattern, payload);
}
}
// app.controller.ts
import { NatsJetStreamContext } from '@supy-io/nestjs-jetstream';
import { Controller, Get } from '@nestjs/common';
import {
Ctx,
EventPattern,
MessagePattern,
Payload,
} from '@nestjs/microservices';
import { AppService } from './app.service';
@Controller()
export class AppController {
constructor(private readonly appService: AppService) {}
@Get()
home(): string {
return 'Welcome to webshop';
}
@Get('/create')
createOrder(): string {
return this.appService.createOrder();
}
@Get('/update')
updateOrder(): string {
return this.appService.updateOrder();
}
@Get('/delete')
deleteOrder(): string {
return this.appService.deleteOrder();
}
// request - response
@Get('/sum')
calc() {
console.log('sum controller');
return this.appService.accumulate([1, 2, 3]);
}
@EventPattern('order.updated')
public async orderUpdatedHandler(
@Payload() data: string,
@Ctx() context: NatsJetStreamContext,
) {
context.message.ack();
console.log('received: ' + context.message.subject, data);
}
@EventPattern('order.created')
public async orderCreatedHandler(
@Payload() data: { id: number; name: string },
@Ctx() context: NatsJetStreamContext,
) {
context.message.ack();
console.log('received: ' + context.message.subject, data);
}
@EventPattern('order.deleted')
public async orderDeletedHandler(
@Payload() data: any,
@Ctx() context: NatsJetStreamContext,
) {
context.message.ack();
console.log('received: ' + context.message.subject, data);
}
// request - response
@MessagePattern({ cmd: 'sum' })
async accumulate(data: number[]): Promise<number> {
console.log('message conroller', data);
return (data || []).reduce((a, b) => a + b);
}
}
// main.js
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { CustomStrategy } from '@nestjs/microservices';
import { NatsJetStreamServer } from '@supy-io/nestjs-jetstream';
async function bootstrap() {
const options: CustomStrategy = {
strategy: new NatsJetStreamServer({
connectionOptions: {
servers: 'localhost:4222',
name: 'myservice-listener',
},
consumerOptions: {
deliverGroup: 'myservice-group',
durable: 'myservice-durable',
deliverTo: 'myservice-messages',
manualAck: true,
},
streamConfig: {
name: 'mystream',
subjects: ['order.*'],
},
}),
};
// hybrid microservice and web application
const app = await NestFactory.create(AppModule);
const microService = app.connectMicroservice(options);
microService.listen();
app.listen(3000);
}
bootstrap();
FAQs
Nats JetStream Transport for NestJS
The npm package @supy-io/nestjs-jetstream receives a total of 9 weekly downloads. As such, @supy-io/nestjs-jetstream popularity was classified as not popular.
We found that @supy-io/nestjs-jetstream demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Opengrep forks Semgrep to preserve open source SAST in response to controversial licensing changes.
Security News
Critics call the Node.js EOL CVE a misuse of the system, sparking debate over CVE standards and the growing noise in vulnerability databases.
Security News
cURL and Go security teams are publicly rejecting CVSS as flawed for assessing vulnerabilities and are calling for more accurate, context-aware approaches.