@nestjs-plugins/nestjs-nats-jetstream-transport
Advanced tools
Comparing version 1.0.12 to 1.1.0
import { ClientProxy, ReadPacket, WritePacket } from "@nestjs/microservices"; | ||
import { NatsConnection } from "nats"; | ||
import { NatsJetStreamClientOptions } from "./interfaces"; | ||
import { NatsJetStreamClientOptions } from "./interfaces/nats-jetstream-client-options.interface"; | ||
export declare class NatsJetStreamClientProxy extends ClientProxy { | ||
@@ -5,0 +5,0 @@ private options; |
export * from "./server"; | ||
export * from "./client"; | ||
export * from "./nats-jetstream-transport.module"; | ||
export * from "./interfaces"; | ||
export * from "./interfaces/nats-jetstream-client-options.interface"; | ||
export * from "./interfaces/nats-jetstream-server-options.interface"; | ||
export * from "./interfaces/server-consumer-options.interface"; | ||
export * from "./utils/server-consumer-options-builder"; | ||
export * from "./nats-jetstream.context"; |
@@ -16,5 +16,7 @@ "use strict"; | ||
__exportStar(require("./nats-jetstream-transport.module"), exports); | ||
__exportStar(require("./interfaces"), exports); | ||
__exportStar(require("./interfaces/nats-jetstream-client-options.interface"), exports); | ||
__exportStar(require("./interfaces/nats-jetstream-server-options.interface"), exports); | ||
__exportStar(require("./interfaces/server-consumer-options.interface"), exports); | ||
__exportStar(require("./utils/server-consumer-options-builder"), exports); | ||
__exportStar(require("./nats-jetstream.context"), exports); | ||
//# sourceMappingURL=index.js.map |
import { DynamicModule } from "@nestjs/common"; | ||
import { NatsJetStreamClientOptions } from "./interfaces"; | ||
import { NatsJetStreamClientOptions } from "./interfaces/nats-jetstream-client-options.interface"; | ||
export declare class NatsJetStreamTransport { | ||
@@ -4,0 +4,0 @@ static register(options: NatsJetStreamClientOptions): DynamicModule; |
import { CustomTransportStrategy, Server } from "@nestjs/microservices"; | ||
import { NatsJetStreamServerOptions } from "./interfaces"; | ||
import { NatsJetStreamServerOptions } from "./interfaces/nats-jetstream-server-options.interface"; | ||
export declare class NatsJetStreamServer extends Server implements CustomTransportStrategy { | ||
@@ -10,7 +10,4 @@ private options; | ||
close(): Promise<void>; | ||
private createConsumerOptions; | ||
private bindEventHandlers; | ||
private filterEventHandlers; | ||
private bindMessageHandlers; | ||
private filterMessageHandlers; | ||
} |
@@ -31,17 +31,12 @@ "use strict"; | ||
async close() { | ||
await this.nc.close(); | ||
await this.nc.drain(); | ||
this.nc.close(); | ||
} | ||
createConsumerOptions(subject) { | ||
const opts = (0, server_consumer_options_builder_1.serverConsumerOptionsBuilder)(this.options.consumerOptions); | ||
if (this.options.consumerOptions.durable) { | ||
opts.durable(`${this.options.id}-${subject.replace(".", "_").replace("*", "_ALL")}`); | ||
} | ||
return opts; | ||
} | ||
async bindEventHandlers() { | ||
const eventHandlers = this.filterEventHandlers(); | ||
const eventHandlers = [...this.messageHandlers.entries()].filter(([, handler]) => handler.isEventHandler); | ||
const js = this.nc.jetstream(this.options.jetStreamOptions); | ||
eventHandlers.forEach(async ([subject, eventHandler]) => { | ||
var e_1, _a; | ||
const consumerOptions = this.createConsumerOptions(subject); | ||
const consumerOptions = (0, server_consumer_options_builder_1.serverConsumerOptionsBuilder)(this.options.consumerOptions, subject); | ||
console.log(consumerOptions); | ||
const subscription = await js.subscribe(subject, consumerOptions); | ||
@@ -72,11 +67,6 @@ this.logger.log(`Subscribed to ${subject} events`); | ||
} | ||
filterEventHandlers() { | ||
const eventHandlers = [...this.messageHandlers.entries()].filter(([, handler]) => handler.isEventHandler); | ||
return eventHandlers; | ||
} | ||
bindMessageHandlers() { | ||
const messageHandlers = this.filterMessageHandlers(); | ||
const messageHandlers = [...this.messageHandlers.entries()].filter(([, handler]) => !handler.isEventHandler); | ||
messageHandlers.forEach(async ([subject, messageHandler]) => { | ||
const subscriptionOptions = { | ||
queue: this.options.consumerOptions.deliverTo, | ||
callback: async (err, msg) => { | ||
@@ -96,8 +86,4 @@ if (err) { | ||
} | ||
filterMessageHandlers() { | ||
const eventHandlers = [...this.messageHandlers.entries()].filter(([, handler]) => !handler.isEventHandler); | ||
return eventHandlers; | ||
} | ||
} | ||
exports.NatsJetStreamServer = NatsJetStreamServer; | ||
//# sourceMappingURL=server.js.map |
@@ -1,2 +0,2 @@ | ||
import { ServerConsumerOptions } from "src/interfaces"; | ||
export declare function serverConsumerOptionsBuilder(serverConsumerOptions: ServerConsumerOptions): import("nats").ConsumerOptsBuilder; | ||
import { ServerConsumerOptions } from "src/interfaces/server-consumer-options.interface"; | ||
export declare function serverConsumerOptionsBuilder(serverConsumerOptions: ServerConsumerOptions, subject: string): import("nats").ConsumerOptsBuilder; |
@@ -5,4 +5,4 @@ "use strict"; | ||
const nats_1 = require("nats"); | ||
function serverConsumerOptionsBuilder(serverConsumerOptions) { | ||
const { deliverGroup, deliverToSubject, deliverTo, manualAck, ackPolicy, deliverPolicy, description, filterSubject, flowControl, headersOnly, idleHeartbeat, limit, maxAckPending, maxDeliver, maxMessages, maxWaiting, orderedConsumer, replayPolicy, sample, startAtTimeDelta, startSequence, startTime, } = serverConsumerOptions; | ||
function serverConsumerOptionsBuilder(serverConsumerOptions, subject) { | ||
const { deliverGroup, deliverToSubject, deliverTo, manualAck, ackPolicy, deliverPolicy, description, durable, filterSubject, flowControl, headersOnly, idleHeartbeat, limit, maxAckPending, maxDeliver, maxMessages, maxWaiting, orderedConsumer, replayPolicy, sample, startAtTimeDelta, startSequence, startTime, } = serverConsumerOptions; | ||
const opts = (0, nats_1.consumerOpts)(); | ||
@@ -21,2 +21,3 @@ deliverGroup && opts.deliverGroup(deliverGroup); | ||
description && opts.description(description); | ||
durable && opts.durable(`${durable}-${subject.replace(".", "_").replace("*", "_ALL")}`); | ||
filterSubject && opts.filterSubject(filterSubject); | ||
@@ -23,0 +24,0 @@ flowControl && opts.flowControl(); |
{ | ||
"name": "@nestjs-plugins/nestjs-nats-jetstream-transport", | ||
"version": "1.0.12", | ||
"version": "1.1.0", | ||
"description": "Nats JetStream Transport for NestJS", | ||
@@ -44,3 +44,3 @@ "main": "dist/index.js", | ||
}, | ||
"gitHead": "7bbfda99dd0193a1f007f5e882b891382e20f362" | ||
"gitHead": "6d901b84b42e304c45e00e60713886d117960d65" | ||
} |
# 🚀 Nats JetStream Transport Module for NestJS | ||
Build Event Driven Microservices Architecture with Nats JetSteam Server and NestJS. | ||
Build Event Driven Microservices Architecture with Nats JetStream Server and NestJS. | ||
@@ -58,3 +58,2 @@ - At-least-once delivery; exactly once within a window | ||
- **id**: string | ||
- **connectionOptions**: ConnectionOptions | ||
@@ -72,3 +71,3 @@ - **serverConsumerOptions**: ServerConsumerOptions | ||
- **durable**: boolean (default: false) - Durable subscriptions remember their position even if the client is disconnected. | ||
- **durable**: string - Durable subscriptions remember their position even if the client is disconnected. | ||
- **deliveryPolicy**: All | Last | New | ByStartSequence | ByStartTime | last_per_subject (default: All) - Specify where in the stream it wants to start receiving messages. | ||
@@ -78,3 +77,3 @@ - **startSequence**: number - If deliveryPolicy is set to ByStartSequence this will specify the sequence number to start on. | ||
- **startTime**: Date - If deliveryPolicy is set to ByStartTime this will specify the time in the stream at which to start. It will receive the closest available message on or after that time. | ||
- **deliverTo**: string - Queue group, a balanced message delivery across a group of subscribers. | ||
- **deliverTo**: string - Creates a unique delivery_subject prefix with this. | ||
- **deliverToSubject**: string - The subject to deliver observed messages. Not allowed for pull subscriptions. Deliver subject is required for queue subscribing as it configures a subject that all the queue consumers should listen on. | ||
@@ -155,3 +154,4 @@ - **ackPolicy**: Explicit | All | None (default: Excplicit ) - How messages should be acknowledged. If an ack is required but is not received within the AckWait window, the message will be redelivered. Excplicit is the only allowed option for pull consumers. | ||
connectionOptions: { | ||
servers: '127.0.0.1' | ||
servers: 'localhost:4222', | ||
name: 'myservice-publisher' | ||
} | ||
@@ -224,2 +224,8 @@ }), | ||
} | ||
// request - response | ||
accumulate(payload: number[]): Observable<number> { | ||
const pattern = { cmd: 'sum' }; | ||
return this.client.send<number>(pattern, payload); | ||
} | ||
} | ||
@@ -260,2 +266,9 @@ ``` | ||
// request - response | ||
@Get('/sum') | ||
calc() { | ||
console.log('sum controller') | ||
return this.appService.accumulate([1,2,3]) | ||
} | ||
@EventPattern('order.updated') | ||
@@ -287,2 +300,9 @@ public async orderUpdatedHandler( | ||
} | ||
// request - response | ||
@MessagePattern({ cmd: 'sum' }) | ||
async accumulate(data: number[]): Promise<number> { | ||
console.log('message conroller', data) | ||
return (data || []).reduce((a, b) => a + b); | ||
} | ||
} | ||
@@ -302,8 +322,10 @@ ``` | ||
strategy: new NatsJetStreamServer({ | ||
id: 'my-service', | ||
connectionOptions: {}, | ||
connectionOptions: { | ||
servers: 'localhost' | ||
name: 'myservice-listener' | ||
}, | ||
consumerOptions: { | ||
deliverGroup: 'test-service-group', | ||
deliverGroup: 'myservice-group', | ||
durable: true, | ||
deliverTo: 'my-service', | ||
deliverTo: 'myservice', | ||
manualAck: true, | ||
@@ -310,0 +332,0 @@ }, |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
136557
33
336
372