@nestjs/microservices
Advanced tools
Comparing version 5.3.5 to 5.3.6
import { Logger } from '@nestjs/common/services/logger.service'; | ||
import { Observable } from 'rxjs'; | ||
import { MqttClient } from '../external/mqtt-client.interface'; | ||
import { PacketId, ReadPacket, WritePacket } from '../interfaces'; | ||
import { ReadPacket, WritePacket } from '../interfaces'; | ||
import { ClientOptions } from '../interfaces/client-metadata.interface'; | ||
@@ -21,4 +21,4 @@ import { ClientProxy } from './client-proxy'; | ||
handleError(client: MqttClient): void; | ||
createResponseCallback(packet: ReadPacket & PacketId, callback: (packet: WritePacket) => any): (channel: string, buffer) => any; | ||
createResponseCallback(): (channel: string, buffer) => any; | ||
protected publish(partialPacket: ReadPacket, callback: (packet: WritePacket) => any): Function; | ||
} |
@@ -39,3 +39,3 @@ "use strict"; | ||
this.connection = this.mergeCloseEvent(this.mqttClient, connect$) | ||
.pipe(operators_1.share()) | ||
.pipe(operators_1.tap(() => this.mqttClient.on(constants_1.MESSAGE_EVENT, this.createResponseCallback())), operators_1.share()) | ||
.toPromise(); | ||
@@ -56,6 +56,7 @@ return this.connection; | ||
} | ||
createResponseCallback(packet, callback) { | ||
createResponseCallback() { | ||
return (channel, buffer) => { | ||
const { err, response, isDisposed, id } = JSON.parse(buffer.toString()); | ||
if (id !== packet.id) { | ||
const callback = this.routingMap.get(id); | ||
if (!callback) { | ||
return undefined; | ||
@@ -81,9 +82,12 @@ } | ||
const responseChannel = this.getResPatternName(pattern); | ||
const responseCallback = this.createResponseCallback(packet, callback); | ||
this.mqttClient.on(constants_1.MESSAGE_EVENT, responseCallback); | ||
this.mqttClient.subscribe(responseChannel); | ||
this.mqttClient.publish(this.getAckPatternName(pattern), JSON.stringify(packet)); | ||
this.mqttClient.subscribe(responseChannel, err => { | ||
if (err) { | ||
return; | ||
} | ||
this.routingMap.set(packet.id, callback); | ||
this.mqttClient.publish(this.getAckPatternName(pattern), JSON.stringify(packet)); | ||
}); | ||
return () => { | ||
this.mqttClient.unsubscribe(responseChannel); | ||
this.mqttClient.removeListener(constants_1.MESSAGE_EVENT, responseCallback); | ||
this.routingMap.delete(packet.id); | ||
}; | ||
@@ -90,0 +94,0 @@ } |
@@ -6,2 +6,3 @@ import { Observable, Observer } from 'rxjs'; | ||
abstract close(): any; | ||
protected routingMap: Map<string, Function>; | ||
send<TResult = any, TInput = any>(pattern: any, data: TInput): Observable<TResult>; | ||
@@ -8,0 +9,0 @@ protected abstract publish(packet: ReadPacket, callback: (packet: WritePacket) => void): Function | void; |
@@ -10,2 +10,5 @@ "use strict"; | ||
class ClientProxy { | ||
constructor() { | ||
this.routingMap = new Map(); | ||
} | ||
send(pattern, data) { | ||
@@ -12,0 +15,0 @@ if (shared_utils_1.isNil(pattern) || shared_utils_1.isNil(data)) { |
import { Logger } from '@nestjs/common/services/logger.service'; | ||
import { Subject } from 'rxjs'; | ||
import { ClientOpts, RedisClient, RetryStrategyOptions } from '../external/redis.interface'; | ||
import { PacketId, ReadPacket, WritePacket } from '../interfaces'; | ||
import { ReadPacket, WritePacket } from '../interfaces'; | ||
import { ClientOptions } from '../interfaces/client-metadata.interface'; | ||
@@ -13,2 +13,3 @@ import { ClientProxy } from './client-proxy'; | ||
protected subClient: RedisClient; | ||
protected connection: Promise<any>; | ||
private isExplicitlyTerminated; | ||
@@ -24,4 +25,4 @@ constructor(options: ClientOptions['options']); | ||
createRetryStrategy(options: RetryStrategyOptions, error$: Subject<Error>): undefined | number | Error; | ||
createResponseCallback(packet: ReadPacket & PacketId, callback: (packet: WritePacket) => any): Function; | ||
createResponseCallback(): Function; | ||
protected publish(partialPacket: ReadPacket, callback: (packet: WritePacket) => any): Function; | ||
} |
@@ -34,16 +34,15 @@ "use strict"; | ||
if (this.pubClient && this.subClient) { | ||
return Promise.resolve(); | ||
return this.connection; | ||
} | ||
return new Promise((resolve, reject) => { | ||
const error$ = new rxjs_1.Subject(); | ||
this.pubClient = this.createClient(error$); | ||
this.subClient = this.createClient(error$); | ||
this.handleError(this.pubClient); | ||
this.handleError(this.subClient); | ||
const pubConnect$ = rxjs_1.fromEvent(this.pubClient, constants_1.CONNECT_EVENT); | ||
const subClient$ = rxjs_1.fromEvent(this.subClient, constants_1.CONNECT_EVENT); | ||
rxjs_1.merge(error$, rxjs_1.zip(pubConnect$, subClient$)) | ||
.pipe(operators_1.take(1)) | ||
.subscribe(resolve, reject); | ||
}); | ||
const error$ = new rxjs_1.Subject(); | ||
this.pubClient = this.createClient(error$); | ||
this.subClient = this.createClient(error$); | ||
this.handleError(this.pubClient); | ||
this.handleError(this.subClient); | ||
const pubConnect$ = rxjs_1.fromEvent(this.pubClient, constants_1.CONNECT_EVENT); | ||
const subClient$ = rxjs_1.fromEvent(this.subClient, constants_1.CONNECT_EVENT); | ||
this.connection = rxjs_1.merge(error$, rxjs_1.zip(pubConnect$, subClient$)) | ||
.pipe(operators_1.take(1), operators_1.tap(() => this.subClient.on(constants_1.MESSAGE_EVENT, this.createResponseCallback())), operators_1.share()) | ||
.toPromise(); | ||
return this.connection; | ||
} | ||
@@ -75,7 +74,8 @@ createClient(error$) { | ||
} | ||
createResponseCallback(packet, callback) { | ||
createResponseCallback() { | ||
return (channel, buffer) => { | ||
const { err, response, isDisposed, id } = JSON.parse(buffer); | ||
if (id !== packet.id) { | ||
return undefined; | ||
const callback = this.routingMap.get(id); | ||
if (!callback) { | ||
return; | ||
} | ||
@@ -100,16 +100,12 @@ if (isDisposed || err) { | ||
const responseChannel = this.getResPatternName(pattern); | ||
const responseCallback = this.createResponseCallback(packet, callback); | ||
this.subClient.on(constants_1.MESSAGE_EVENT, responseCallback); | ||
this.subClient.subscribe(responseChannel); | ||
const handler = channel => { | ||
if (channel && channel !== responseChannel) { | ||
return undefined; | ||
this.routingMap.set(packet.id, callback); | ||
this.subClient.subscribe(responseChannel, err => { | ||
if (err) { | ||
return; | ||
} | ||
this.subClient.removeListener(constants_1.SUBSCRIBE, handler); | ||
}; | ||
this.subClient.on(constants_1.SUBSCRIBE, handler); | ||
this.pubClient.publish(this.getAckPatternName(pattern), JSON.stringify(packet)); | ||
this.pubClient.publish(this.getAckPatternName(pattern), JSON.stringify(packet)); | ||
}); | ||
return () => { | ||
this.subClient.unsubscribe(responseChannel); | ||
this.subClient.removeListener(constants_1.MESSAGE_EVENT, responseCallback); | ||
this.routingMap.delete(packet.id); | ||
}; | ||
@@ -116,0 +112,0 @@ } |
import * as JsonSocket from 'json-socket'; | ||
import { ReadPacket, WritePacket } from '../interfaces'; | ||
import { PacketId, ReadPacket, WritePacket } from '../interfaces'; | ||
import { ClientOptions } from '../interfaces/client-metadata.interface'; | ||
import { ClientProxy } from './client-proxy'; | ||
export declare class ClientTCP extends ClientProxy { | ||
protected connection: Promise<any>; | ||
private readonly logger; | ||
@@ -13,3 +14,3 @@ private readonly port; | ||
connect(): Promise<any>; | ||
handleResponse(callback: (packet: WritePacket) => any, buffer: WritePacket): void; | ||
handleResponse(buffer: WritePacket & PacketId): any; | ||
createSocket(): JsonSocket; | ||
@@ -16,0 +17,0 @@ close(): void; |
@@ -23,16 +23,21 @@ "use strict"; | ||
connect() { | ||
if (this.isConnected) { | ||
return Promise.resolve(); | ||
if (this.isConnected && this.connection) { | ||
return this.connection; | ||
} | ||
this.socket = this.createSocket(); | ||
return new Promise((resolve, reject) => { | ||
this.bindEvents(this.socket); | ||
this.connect$(this.socket._socket) | ||
.pipe(operators_1.tap(() => (this.isConnected = true))) | ||
.subscribe(resolve, reject); | ||
this.socket.connect(this.port, this.host); | ||
}); | ||
this.bindEvents(this.socket); | ||
const source$ = this.connect$(this.socket._socket).pipe(operators_1.tap(() => { | ||
this.isConnected = true; | ||
this.socket.on(constants_1.MESSAGE_EVENT, (buffer) => this.handleResponse(buffer)); | ||
}), operators_1.share()); | ||
this.socket.connect(this.port, this.host); | ||
this.connection = source$.toPromise(); | ||
return this.connection; | ||
} | ||
handleResponse(callback, buffer) { | ||
const { err, response, isDisposed } = buffer; | ||
handleResponse(buffer) { | ||
const { err, response, isDisposed, id } = buffer; | ||
const callback = this.routingMap.get(id); | ||
if (!callback) { | ||
return undefined; | ||
} | ||
if (isDisposed || err) { | ||
@@ -71,11 +76,5 @@ callback({ | ||
const packet = this.assignPacketId(partialPacket); | ||
const listener = (buffer) => { | ||
if (buffer.id !== packet.id) { | ||
return undefined; | ||
} | ||
this.handleResponse(callback, buffer); | ||
}; | ||
this.socket.on(constants_1.MESSAGE_EVENT, listener); | ||
this.routingMap.set(packet.id, callback); | ||
this.socket.sendMessage(packet); | ||
return () => this.socket._socket.removeListener(constants_1.MESSAGE_EVENT, listener); | ||
return () => this.routingMap.delete(packet.id); | ||
} | ||
@@ -82,0 +81,0 @@ catch (err) { |
@@ -0,3 +1,3 @@ | ||
import { Controller } from '@nestjs/common/interfaces/controllers/controller.interface'; | ||
import { InstanceWrapper } from '@nestjs/core/injector/container'; | ||
import { Controller } from '@nestjs/common/interfaces/controllers/controller.interface'; | ||
import { CustomTransportStrategy } from './interfaces'; | ||
@@ -12,4 +12,4 @@ import { Server } from './server/server'; | ||
bindListeners(controllers: Map<string, InstanceWrapper<Controller>>, server: Server & CustomTransportStrategy, module: string): void; | ||
bindClients(controllers: Map<string, InstanceWrapper<Controller>>): void; | ||
bindClients(items: Map<string, InstanceWrapper<Controller>>): void; | ||
close(): void; | ||
} |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const listeners_controller_1 = require("./listeners-controller"); | ||
const runtime_exception_1 = require("@nestjs/core/errors/exceptions/runtime.exception"); | ||
const guards_consumer_1 = require("@nestjs/core/guards/guards-consumer"); | ||
const guards_context_creator_1 = require("@nestjs/core/guards/guards-context-creator"); | ||
const interceptors_consumer_1 = require("@nestjs/core/interceptors/interceptors-consumer"); | ||
const interceptors_context_creator_1 = require("@nestjs/core/interceptors/interceptors-context-creator"); | ||
const pipes_consumer_1 = require("@nestjs/core/pipes/pipes-consumer"); | ||
const pipes_context_creator_1 = require("@nestjs/core/pipes/pipes-context-creator"); | ||
const container_1 = require("./container"); | ||
const exception_filters_context_1 = require("./context/exception-filters-context"); | ||
const rpc_context_creator_1 = require("./context/rpc-context-creator"); | ||
const rpc_proxy_1 = require("./context/rpc-proxy"); | ||
const exception_filters_context_1 = require("./context/exception-filters-context"); | ||
const pipes_context_creator_1 = require("@nestjs/core/pipes/pipes-context-creator"); | ||
const pipes_consumer_1 = require("@nestjs/core/pipes/pipes-consumer"); | ||
const guards_context_creator_1 = require("@nestjs/core/guards/guards-context-creator"); | ||
const runtime_exception_1 = require("@nestjs/core/errors/exceptions/runtime.exception"); | ||
const guards_consumer_1 = require("@nestjs/core/guards/guards-consumer"); | ||
const interceptors_context_creator_1 = require("@nestjs/core/interceptors/interceptors-context-creator"); | ||
const interceptors_consumer_1 = require("@nestjs/core/interceptors/interceptors-consumer"); | ||
const listeners_controller_1 = require("./listeners-controller"); | ||
class MicroservicesModule { | ||
@@ -43,4 +43,4 @@ constructor() { | ||
} | ||
bindClients(controllers) { | ||
controllers.forEach(({ instance, isNotMetatype }) => { | ||
bindClients(items) { | ||
items.forEach(({ instance, isNotMetatype }) => { | ||
!isNotMetatype && | ||
@@ -47,0 +47,0 @@ this.listenersController.bindClientsToProperties(instance); |
{ | ||
"name": "@nestjs/microservices", | ||
"version": "5.3.5", | ||
"version": "5.3.6", | ||
"description": "Nest - modern, fast, powerful node.js web framework (@microservices)", | ||
@@ -5,0 +5,0 @@ "author": "Kamil Mysliwiec", |
3137
138004