@nestjs-plugins/nestjs-nats-jetstream-transport
Advanced tools
Comparing version 2.0.0 to 2.1.3
@@ -9,3 +9,3 @@ import { JetStreamOptions } from 'nats'; | ||
jetStreamOptions?: JetStreamOptions; | ||
streamConfig?: NatsStreamConfig; | ||
streamConfig?: NatsStreamConfig | NatsStreamConfig[]; | ||
} |
import { StreamConfig } from 'nats'; | ||
type $StreamConfig = Pick<StreamConfig, 'storage' | 'retention' | 'discard' | 'max_msgs' | 'max_msgs_per_subject' | 'max_msg_size' | 'max_age' | 'duplicate_window' | 'num_replicas'>; | ||
type $StreamConfig = Pick<StreamConfig, 'storage' | 'retention' | 'discard' | 'max_msgs' | 'max_msgs_per_subject' | 'max_msg_size' | 'max_age' | 'duplicate_window' | 'num_replicas' | 'max_bytes'>; | ||
export interface NatsStreamConfig extends Partial<$StreamConfig> { | ||
@@ -4,0 +4,0 @@ name: string; |
@@ -48,3 +48,10 @@ "use strict"; | ||
const context = new nats_jetstream_context_1.NatsJetStreamContext([msg]); | ||
this.send((0, rxjs_1.from)(eventHandler(data, context)), () => null); | ||
const resultOrStream = await eventHandler(data, context); | ||
if ((0, rxjs_1.isObservable)(resultOrStream)) { | ||
const connectableSource = (0, rxjs_1.connectable)(resultOrStream, { | ||
connector: () => new rxjs_1.Subject(), | ||
resetOnDisconnect: false, | ||
}); | ||
connectableSource.connect(); | ||
} | ||
} | ||
@@ -88,19 +95,24 @@ catch (err) { | ||
const streams = await this.jsm.streams.list().next(); | ||
const stream = streams.find((stream) => stream.config.name === streamConfig.name); | ||
if (stream) { | ||
const streamSubjects = new Set([ | ||
...stream.config.subjects, | ||
...streamConfig.subjects, | ||
]); | ||
const streamInfo = await this.jsm.streams.update(stream.config.name, { | ||
...stream.config, | ||
...streamConfig, | ||
subjects: [...streamSubjects.keys()], | ||
}); | ||
this.logger.log(`Stream ${streamInfo.config.name} updated`); | ||
const reqStreamConfigs = !Array.isArray(streamConfig) | ||
? [streamConfig] | ||
: streamConfig; | ||
for (const requiredStreamConfig of reqStreamConfigs) { | ||
const stream = streams.find((stream) => stream.config.name === requiredStreamConfig.name); | ||
if (stream) { | ||
const streamSubjects = new Set([ | ||
...stream.config.subjects, | ||
...requiredStreamConfig.subjects, | ||
]); | ||
const streamInfo = await this.jsm.streams.update(stream.config.name, { | ||
...stream.config, | ||
...requiredStreamConfig, | ||
subjects: [...streamSubjects.keys()], | ||
}); | ||
this.logger.log(`Stream ${streamInfo.config.name} updated`); | ||
} | ||
else { | ||
const streamInfo = await this.jsm.streams.add(requiredStreamConfig); | ||
this.logger.log(`Stream ${streamInfo.config.name} created`); | ||
} | ||
} | ||
else { | ||
const streamInfo = await this.jsm.streams.add(streamConfig); | ||
this.logger.log(`Stream ${streamInfo.config.name} created`); | ||
} | ||
} | ||
@@ -107,0 +119,0 @@ } |
@@ -22,3 +22,3 @@ "use strict"; | ||
durable && | ||
opts.durable(`${durable}-${subject.replaceAll('.', '_').replaceAll('*', '_ALL')}`); | ||
opts.durable(`${durable}-${subject.replaceAll('.', '_').replaceAll('*', '_ALL').replaceAll('>', '_ALL')}`); | ||
filterSubject && opts.filterSubject(filterSubject); | ||
@@ -25,0 +25,0 @@ flowControl && opts.flowControl(); |
{ | ||
"name": "@nestjs-plugins/nestjs-nats-jetstream-transport", | ||
"version": "2.0.0", | ||
"version": "2.1.3", | ||
"description": "Nats JetStream Transport for NestJS", | ||
@@ -35,3 +35,2 @@ "main": "dist/index.js", | ||
"dependencies": { | ||
"@nestjs-plugins/nestjs-nats-jetstream-transport": "^1.3.15", | ||
"nats": "^2.6.1", | ||
@@ -52,3 +51,3 @@ "reflect-metadata": "^0.1.13", | ||
}, | ||
"gitHead": "9436ea4bcb1c1645ed723607696e4a4a746f4dca" | ||
"gitHead": "9e421b9dc56d345d607c19f74570164e08679224" | ||
} |
@@ -350,2 +350,9 @@ # ๐ Nats JetStream Transport Module for NestJS | ||
}, | ||
// streamConfig: [{ | ||
// name: 'mystream', | ||
// subjects: ['order.*'], | ||
// },{ | ||
// name: 'myOtherStream', | ||
// subjects: ['other.*'], | ||
// }], | ||
}), | ||
@@ -352,0 +359,0 @@ }; |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
181610
3
584
368
- Removed@nestjs-plugins/nestjs-nats-jetstream-transport@^1.3.15
- Removed@nestjs-plugins/nestjs-nats-jetstream-transport@1.4.4(transitive)