@nestjs/microservices
Advanced tools
Comparing version 6.2.4 to 6.3.0-next.1
import { Logger } from '@nestjs/common/services/logger.service'; | ||
import { Observable } from 'rxjs'; | ||
import { ClientGrpc } from '../interfaces'; | ||
import { ClientOptions } from '../interfaces/client-metadata.interface'; | ||
import { ClientGrpc, GrpcOptions } from '../interfaces'; | ||
import { ClientProxy } from './client-proxy'; | ||
export declare class ClientGrpcProxy extends ClientProxy implements ClientGrpc { | ||
protected readonly options: ClientOptions['options']; | ||
protected readonly options: GrpcOptions['options']; | ||
protected readonly logger: Logger; | ||
protected readonly url: string; | ||
protected grpcClient: any; | ||
constructor(options: ClientOptions['options']); | ||
constructor(options: GrpcOptions['options']); | ||
getService<T extends {}>(name: string): T; | ||
@@ -13,0 +12,0 @@ createServiceMethod(client: any, methodName: string): (...args: any[]) => Observable<any>; |
@@ -20,6 +20,4 @@ "use strict"; | ||
this.logger = new logger_service_1.Logger(client_proxy_1.ClientProxy.name); | ||
this.url = | ||
this.getOptionsProp(options, 'url') || constants_1.GRPC_DEFAULT_URL; | ||
const protoLoader = this.getOptionsProp(options, 'protoLoader') || | ||
constants_1.GRPC_DEFAULT_PROTO_LOADER; | ||
this.url = this.getOptionsProp(options, 'url') || constants_1.GRPC_DEFAULT_URL; | ||
const protoLoader = this.getOptionsProp(options, 'protoLoader') || constants_1.GRPC_DEFAULT_PROTO_LOADER; | ||
grpcPackage = load_package_util_1.loadPackage('grpc', ClientGrpcProxy.name, () => require('grpc')); | ||
@@ -30,4 +28,10 @@ grpcProtoLoaderPackage = load_package_util_1.loadPackage(protoLoader, ClientGrpcProxy.name); | ||
getService(name) { | ||
const maxSendMessageLengthKey = 'grpc.max_send_message_length'; | ||
const maxReceiveMessageLengthKey = 'grpc.max_receive_message_length'; | ||
const maxMessageLengthOptions = { | ||
[maxSendMessageLengthKey]: this.getOptionsProp(this.options, 'maxSendMessageLength', constants_1.GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH), | ||
[maxReceiveMessageLengthKey]: this.getOptionsProp(this.options, 'maxReceiveMessageLength', constants_1.GRPC_DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH), | ||
}; | ||
const options = shared_utils_1.isObject(this.options) | ||
? Object.assign({}, this.options, { loader: '' }) : {}; | ||
? Object.assign({}, this.options, maxMessageLengthOptions, { loader: '' }) : Object.assign({}, maxMessageLengthOptions); | ||
if (!this.grpcClient[name]) { | ||
@@ -34,0 +38,0 @@ throw new invalid_grpc_service_exception_1.InvalidGrpcServiceException(); |
@@ -5,7 +5,6 @@ /// <reference types="node" /> | ||
import { MqttClient } from '../external/mqtt-client.interface'; | ||
import { ReadPacket, WritePacket } from '../interfaces'; | ||
import { ClientOptions } from '../interfaces/client-metadata.interface'; | ||
import { MqttOptions, ReadPacket, WritePacket } from '../interfaces'; | ||
import { ClientProxy } from './client-proxy'; | ||
export declare class ClientMqtt extends ClientProxy { | ||
protected readonly options: ClientOptions['options']; | ||
protected readonly options: MqttOptions['options']; | ||
protected readonly logger: Logger; | ||
@@ -15,3 +14,3 @@ protected readonly url: string; | ||
protected connection: Promise<any>; | ||
constructor(options: ClientOptions['options']); | ||
constructor(options: MqttOptions['options']); | ||
getAckPatternName(pattern: string): string; | ||
@@ -18,0 +17,0 @@ getResPatternName(pattern: string): string; |
@@ -16,4 +16,3 @@ "use strict"; | ||
this.logger = new logger_service_1.Logger(client_proxy_1.ClientProxy.name); | ||
this.url = | ||
this.getOptionsProp(this.options, 'url') || constants_1.MQTT_DEFAULT_URL; | ||
this.url = this.getOptionsProp(this.options, 'url') || constants_1.MQTT_DEFAULT_URL; | ||
mqttPackage = load_package_util_1.loadPackage('mqtt', ClientMqtt.name, () => require('mqtt')); | ||
@@ -20,0 +19,0 @@ } |
import { Logger } from '@nestjs/common/services/logger.service'; | ||
import { Client } from '../external/nats-client.interface'; | ||
import { PacketId, ReadPacket, WritePacket } from '../interfaces'; | ||
import { ClientOptions } from '../interfaces/client-metadata.interface'; | ||
import { NatsOptions, PacketId, ReadPacket, WritePacket } from '../interfaces'; | ||
import { ClientProxy } from './client-proxy'; | ||
export declare class ClientNats extends ClientProxy { | ||
protected readonly options: ClientOptions['options']; | ||
protected readonly options: NatsOptions['options']; | ||
protected readonly logger: Logger; | ||
@@ -12,3 +11,3 @@ protected readonly url: string; | ||
protected connection: Promise<any>; | ||
constructor(options: ClientOptions['options']); | ||
constructor(options: NatsOptions['options']); | ||
close(): void; | ||
@@ -15,0 +14,0 @@ connect(): Promise<any>; |
@@ -15,4 +15,3 @@ "use strict"; | ||
this.logger = new logger_service_1.Logger(client_proxy_1.ClientProxy.name); | ||
this.url = | ||
this.getOptionsProp(this.options, 'url') || constants_1.NATS_DEFAULT_URL; | ||
this.url = this.getOptionsProp(this.options, 'url') || constants_1.NATS_DEFAULT_URL; | ||
natsPackage = load_package_util_1.loadPackage('nats', ClientNats.name, () => require('nats')); | ||
@@ -76,5 +75,5 @@ } | ||
const pattern = this.normalizePattern(packet.pattern); | ||
return new Promise((resolve, reject) => this.natsClient.publish(pattern, packet, err => (err ? reject(err) : resolve()))); | ||
return new Promise((resolve, reject) => this.natsClient.publish(pattern, packet, err => err ? reject(err) : resolve())); | ||
} | ||
} | ||
exports.ClientNats = ClientNats; |
@@ -0,3 +1,5 @@ | ||
import { Transport } from '../enums/transport.enum'; | ||
import { ClientOptions } from '../interfaces/client-metadata.interface'; | ||
import { Closeable } from '../interfaces/closeable.interface'; | ||
import { ClientGrpcProxy } from './client-grpc'; | ||
import { ClientProxy } from './client-proxy'; | ||
@@ -8,3 +10,6 @@ export interface IClientProxyFactory { | ||
export declare class ClientProxyFactory { | ||
static create(clientOptions: { | ||
transport: Transport.GRPC; | ||
} & ClientOptions): ClientGrpcProxy; | ||
static create(clientOptions: ClientOptions): ClientProxy & Closeable; | ||
} |
@@ -14,6 +14,4 @@ import { Observable, Observer } from 'rxjs'; | ||
protected connect$(instance: any, errorEvent?: string, connectEvent?: string): Observable<any>; | ||
protected getOptionsProp<T extends { | ||
options?: any; | ||
}>(obj: ClientOptions['options'], prop: keyof T['options'], defaultValue?: any): any; | ||
protected getOptionsProp<T extends ClientOptions['options'], K extends keyof T>(obj: T, prop: K, defaultValue?: T[K]): T[K]; | ||
protected normalizePattern<T = any>(pattern: T): string; | ||
} |
@@ -51,8 +51,8 @@ "use strict"; | ||
getOptionsProp(obj, prop, defaultValue = undefined) { | ||
return obj ? obj[prop] : defaultValue; | ||
return (obj && obj[prop]) || defaultValue; | ||
} | ||
normalizePattern(pattern) { | ||
return shared_utils_1.isString(pattern) ? pattern : JSON.stringify(pattern); | ||
return (shared_utils_1.isString(pattern) ? pattern : JSON.stringify(pattern)); | ||
} | ||
} | ||
exports.ClientProxy = ClientProxy; |
import { Logger } from '@nestjs/common/services/logger.service'; | ||
import { Subject } from 'rxjs'; | ||
import { ClientOpts, RedisClient, RetryStrategyOptions } from '../external/redis.interface'; | ||
import { ReadPacket, WritePacket } from '../interfaces'; | ||
import { ClientOptions } from '../interfaces/client-metadata.interface'; | ||
import { ReadPacket, RedisOptions, WritePacket } from '../interfaces'; | ||
import { ClientProxy } from './client-proxy'; | ||
export declare class ClientRedis extends ClientProxy { | ||
protected readonly options: ClientOptions['options']; | ||
protected readonly options: RedisOptions['options']; | ||
protected readonly logger: Logger; | ||
@@ -15,3 +14,3 @@ protected readonly url: string; | ||
protected isExplicitlyTerminated: boolean; | ||
constructor(options: ClientOptions['options']); | ||
constructor(options: RedisOptions['options']); | ||
getAckPatternName(pattern: string): string; | ||
@@ -18,0 +17,0 @@ getResPatternName(pattern: string): string; |
@@ -17,4 +17,3 @@ "use strict"; | ||
this.isExplicitlyTerminated = false; | ||
this.url = | ||
this.getOptionsProp(options, 'url') || constants_1.REDIS_DEFAULT_URL; | ||
this.url = this.getOptionsProp(options, 'url') || constants_1.REDIS_DEFAULT_URL; | ||
redisPackage = load_package_util_1.loadPackage('redis', ClientRedis.name, () => require('redis')); | ||
@@ -69,4 +68,3 @@ } | ||
!this.getOptionsProp(this.options, 'retryAttempts') || | ||
options.attempt > | ||
this.getOptionsProp(this.options, 'retryAttempts')) { | ||
options.attempt > this.getOptionsProp(this.options, 'retryAttempts')) { | ||
return undefined; | ||
@@ -119,5 +117,5 @@ } | ||
const pattern = this.normalizePattern(packet.pattern); | ||
return new Promise((resolve, reject) => this.pubClient.publish(pattern, JSON.stringify(packet), err => (err ? reject(err) : resolve()))); | ||
return new Promise((resolve, reject) => this.pubClient.publish(pattern, JSON.stringify(packet), err => err ? reject(err) : resolve())); | ||
} | ||
} | ||
exports.ClientRedis = ClientRedis; |
@@ -5,7 +5,7 @@ /// <reference types="node" /> | ||
import { Observable } from 'rxjs'; | ||
import { ClientOptions, ReadPacket } from '../interfaces'; | ||
import { ReadPacket, RmqOptions } from '../interfaces'; | ||
import { WritePacket } from './../interfaces'; | ||
import { ClientProxy } from './client-proxy'; | ||
export declare class ClientRMQ extends ClientProxy { | ||
protected readonly options: ClientOptions['options']; | ||
protected readonly options: RmqOptions['options']; | ||
protected readonly logger: Logger; | ||
@@ -19,3 +19,3 @@ protected connection: Promise<any>; | ||
protected responseEmitter: EventEmitter; | ||
constructor(options: ClientOptions['options']); | ||
constructor(options: RmqOptions['options']); | ||
close(): void; | ||
@@ -22,0 +22,0 @@ consumeChannel(): void; |
@@ -20,8 +20,5 @@ "use strict"; | ||
this.channel = null; | ||
this.urls = this.getOptionsProp(this.options, 'urls') || [ | ||
constants_1.RQM_DEFAULT_URL, | ||
]; | ||
this.urls = this.getOptionsProp(this.options, 'urls') || [constants_1.RQM_DEFAULT_URL]; | ||
this.queue = | ||
this.getOptionsProp(this.options, 'queue') || | ||
constants_1.RQM_DEFAULT_QUEUE; | ||
this.getOptionsProp(this.options, 'queue') || constants_1.RQM_DEFAULT_QUEUE; | ||
this.queueOptions = | ||
@@ -38,3 +35,6 @@ this.getOptionsProp(this.options, 'queueOptions') || | ||
consumeChannel() { | ||
this.channel.addSetup((channel) => channel.consume(REPLY_QUEUE, (msg) => this.responseEmitter.emit(msg.properties.correlationId, msg), { noAck: true })); | ||
const noAck = this.getOptionsProp(this.options, 'noAck') || constants_1.RQM_DEFAULT_NOACK; | ||
this.channel.addSetup((channel) => channel.consume(REPLY_QUEUE, (msg) => this.responseEmitter.emit(msg.properties.correlationId, msg), { | ||
noAck, | ||
})); | ||
} | ||
@@ -41,0 +41,0 @@ connect() { |
@@ -0,4 +1,4 @@ | ||
import { JsonSocket } from '../helpers/json-socket'; | ||
import { PacketId, ReadPacket, WritePacket } from '../interfaces'; | ||
import { ClientOptions } from '../interfaces/client-metadata.interface'; | ||
import { JsonSocket } from '../helpers/json-socket'; | ||
import { TcpClientOptions } from '../interfaces/client-metadata.interface'; | ||
import { ClientProxy } from './client-proxy'; | ||
@@ -12,3 +12,3 @@ export declare class ClientTCP extends ClientProxy { | ||
private socket; | ||
constructor(options: ClientOptions['options']); | ||
constructor(options: TcpClientOptions['options']); | ||
connect(): Promise<any>; | ||
@@ -15,0 +15,0 @@ handleResponse(buffer: WritePacket & PacketId): void; |
@@ -15,8 +15,4 @@ "use strict"; | ||
this.isConnected = false; | ||
this.port = | ||
this.getOptionsProp(options, 'port') || | ||
constants_1.TCP_DEFAULT_PORT; | ||
this.host = | ||
this.getOptionsProp(options, 'host') || | ||
constants_1.TCP_DEFAULT_HOST; | ||
this.port = this.getOptionsProp(options, 'port') || constants_1.TCP_DEFAULT_PORT; | ||
this.host = this.getOptionsProp(options, 'host') || constants_1.TCP_DEFAULT_HOST; | ||
} | ||
@@ -23,0 +19,0 @@ connect() { |
@@ -24,2 +24,3 @@ export declare const TCP_DEFAULT_PORT = 3000; | ||
export declare const RQM_DEFAULT_QUEUE_OPTIONS: {}; | ||
export declare const RQM_DEFAULT_NOACK = true; | ||
export declare const GRPC_DEFAULT_PROTO_LOADER = "@grpc/proto-loader"; | ||
@@ -26,0 +27,0 @@ export declare const NO_MESSAGE_HANDLER = "There is no matching message handler defined in the remote service."; |
@@ -26,2 +26,3 @@ "use strict"; | ||
exports.RQM_DEFAULT_QUEUE_OPTIONS = {}; | ||
exports.RQM_DEFAULT_NOACK = true; | ||
exports.GRPC_DEFAULT_PROTO_LOADER = '@grpc/proto-loader'; | ||
@@ -28,0 +29,0 @@ exports.NO_MESSAGE_HANDLER = `There is no matching message handler defined in the remote service.`; |
import { PatternMetadata } from '../interfaces/pattern-metadata.interface'; | ||
export declare enum GrpcMethodStreamingType { | ||
NO_STREAMING = "no_stream", | ||
RX_STREAMING = "rx_stream", | ||
PT_STREAMING = "pt_stream" | ||
} | ||
/** | ||
@@ -11,5 +16,28 @@ * Subscribes to incoming messages which fulfils chosen pattern. | ||
export declare function GrpcMethod(service: string, method?: string): MethodDecorator; | ||
export declare function createMethodMetadata(target: any, key: string | symbol, service: string | undefined, method: string | undefined): { | ||
/** | ||
* Registers gRPC call through RX handler for service and method | ||
* | ||
* @param service String parameter reflecting the name of service definition from proto file | ||
*/ | ||
export declare function GrpcStreamMethod(service?: string): any; | ||
/** | ||
* @param service String parameter reflecting the name of service definition from proto file | ||
* @param method Optional string parameter reflecting the name of method inside of a service definition coming after rpc keyword | ||
*/ | ||
export declare function GrpcStreamMethod(service: string, method?: string): any; | ||
/** | ||
* Registers gRPC call pass through handler for service and method | ||
* | ||
* @param service String parameter reflecting the name of service definition from proto file | ||
*/ | ||
export declare function GrpcStreamCall(service?: string): any; | ||
/** | ||
* @param service String parameter reflecting the name of service definition from proto file | ||
* @param method Optional string parameter reflecting the name of method inside of a service definition coming after rpc keyword | ||
*/ | ||
export declare function GrpcStreamCall(service: string, method?: string): any; | ||
export declare function createMethodMetadata(target: any, key: string | symbol, service: string | undefined, method: string | undefined, streaming?: GrpcMethodStreamingType): { | ||
service: any; | ||
rpc: string; | ||
streaming: GrpcMethodStreamingType; | ||
}; |
@@ -5,2 +5,8 @@ "use strict"; | ||
const pattern_handler_enum_1 = require("../enums/pattern-handler.enum"); | ||
var GrpcMethodStreamingType; | ||
(function (GrpcMethodStreamingType) { | ||
GrpcMethodStreamingType["NO_STREAMING"] = "no_stream"; | ||
GrpcMethodStreamingType["RX_STREAMING"] = "rx_stream"; | ||
GrpcMethodStreamingType["PT_STREAMING"] = "pt_stream"; | ||
})(GrpcMethodStreamingType = exports.GrpcMethodStreamingType || (exports.GrpcMethodStreamingType = {})); | ||
/** | ||
@@ -23,13 +29,31 @@ * Subscribes to incoming messages which fulfils chosen pattern. | ||
exports.GrpcMethod = GrpcMethod; | ||
function createMethodMetadata(target, key, service, method) { | ||
function GrpcStreamMethod(service, method) { | ||
return (target, key, descriptor) => { | ||
const metadata = createMethodMetadata(target, key, service, method, GrpcMethodStreamingType.RX_STREAMING); | ||
return exports.MessagePattern(metadata)(target, key, descriptor); | ||
}; | ||
} | ||
exports.GrpcStreamMethod = GrpcStreamMethod; | ||
function GrpcStreamCall(service, method) { | ||
return (target, key, descriptor) => { | ||
const metadata = createMethodMetadata(target, key, service, method, GrpcMethodStreamingType.PT_STREAMING); | ||
return exports.MessagePattern(metadata)(target, key, descriptor); | ||
}; | ||
} | ||
exports.GrpcStreamCall = GrpcStreamCall; | ||
function createMethodMetadata(target, key, service, method, streaming = GrpcMethodStreamingType.NO_STREAMING) { | ||
const capitalizeFirstLetter = (str) => str.charAt(0).toUpperCase() + str.slice(1); | ||
if (!service) { | ||
const { name } = target.constructor; | ||
return { service: name, rpc: capitalizeFirstLetter(key) }; | ||
return { | ||
service: name, | ||
rpc: capitalizeFirstLetter(key), | ||
streaming, | ||
}; | ||
} | ||
if (service && !method) { | ||
return { service, rpc: capitalizeFirstLetter(key) }; | ||
return { service, rpc: capitalizeFirstLetter(key), streaming }; | ||
} | ||
return { service, rpc: method }; | ||
return { service, rpc: method, streaming }; | ||
} | ||
exports.createMethodMetadata = createMethodMetadata; |
@@ -10,1 +10,2 @@ import 'reflect-metadata'; | ||
export * from './server'; | ||
export * from './tokens'; |
@@ -20,1 +20,2 @@ "use strict"; | ||
__export(require("./server")); | ||
__export(require("./tokens")); |
@@ -9,1 +9,2 @@ export * from './client-grpc.interface'; | ||
export * from './pattern-metadata.interface'; | ||
export * from './request-context.interface'; |
@@ -82,3 +82,4 @@ import { MqttClientOptions } from '@nestjs/common/interfaces/external/mqtt-options.interface'; | ||
socketOptions?: any; | ||
noAck?: boolean; | ||
}; | ||
} |
@@ -21,2 +21,3 @@ import { Controller } from '@nestjs/common/interfaces/controllers/controller.interface'; | ||
assignClientToInstance<T = any>(instance: Controller, property: string, client: T): void; | ||
private registerRequestProvider; | ||
} |
@@ -5,2 +5,3 @@ "use strict"; | ||
const metadata_scanner_1 = require("@nestjs/core/metadata-scanner"); | ||
const request_constants_1 = require("@nestjs/core/router/request/request-constants"); | ||
const listener_metadata_explorer_1 = require("./listener-metadata-explorer"); | ||
@@ -29,2 +30,3 @@ class ListenersController { | ||
const contextId = context_id_factory_1.createContextId(); | ||
this.registerRequestProvider({ pattern, data }, contextId); | ||
const contextInstance = await this.injector.loadPerContext(instance, module, collection, contextId); | ||
@@ -46,3 +48,11 @@ const proxy = this.contextCreator.create(contextInstance, contextInstance[methodKey], moduleKey); | ||
} | ||
registerRequestProvider(request, contextId) { | ||
const coreModuleRef = this.container.getInternalCoreModuleRef(); | ||
const wrapper = coreModuleRef.getProviderByKey(request_constants_1.REQUEST); | ||
wrapper.setInstanceByContextId(contextId, { | ||
instance: request, | ||
isResolved: true, | ||
}); | ||
} | ||
} | ||
exports.ListenersController = ListenersController; |
@@ -14,3 +14,2 @@ import { CanActivate, ExceptionFilter, INestMicroservice, NestInterceptor, PipeTransform, WebSocketAdapter } from '@nestjs/common'; | ||
private isTerminated; | ||
private isInitialized; | ||
private isInitHookCalled; | ||
@@ -17,0 +16,0 @@ constructor(container: NestContainer, config: MicroserviceOptions, applicationConfig: ApplicationConfig); |
@@ -19,3 +19,2 @@ "use strict"; | ||
this.isTerminated = false; | ||
this.isInitialized = false; | ||
this.isInitHookCalled = false; | ||
@@ -22,0 +21,0 @@ this.microservicesModule.register(container, this.applicationConfig); |
{ | ||
"name": "@nestjs/microservices", | ||
"version": "6.2.4", | ||
"version": "6.3.0-next.1", | ||
"description": "Nest - modern, fast, powerful node.js web framework (@microservices)", | ||
@@ -5,0 +5,0 @@ "author": "Kamil Mysliwiec", |
@@ -0,4 +1,13 @@ | ||
import { GrpcMethodStreamingType } from '../decorators'; | ||
import { CustomTransportStrategy } from '../interfaces'; | ||
import { GrpcOptions } from '../interfaces/microservice-configuration.interface'; | ||
import { Server } from './server'; | ||
interface GrpcCall<TRequest = any, TMetadata = any> { | ||
request: TRequest; | ||
metadata: TMetadata; | ||
end: Function; | ||
write: Function; | ||
on: Function; | ||
emit: Function; | ||
} | ||
export declare class ServerGrpc extends Server implements CustomTransportStrategy { | ||
@@ -21,7 +30,31 @@ private readonly options; | ||
}[]; | ||
/** | ||
* Will create service mapping from gRPC generated Object to handlers | ||
* defined with @GrpcMethod or @GrpcStreamMethod annotations | ||
* | ||
* @param grpcService | ||
* @param name | ||
*/ | ||
createService(grpcService: any, name: string): Promise<{}>; | ||
createPattern(service: string, methodName: string): string; | ||
createServiceMethod(methodHandler: Function, protoNativeHandler: any): Function; | ||
/** | ||
* Will create a string of a JSON serialized format | ||
* | ||
* @param service name of the service which should be a match to gRPC service definition name | ||
* @param methodName name of the method which is coming after rpc keyword | ||
* @param streaming GrpcMethodStreamingType parameter which should correspond to | ||
* stream keyword in gRPC service request part | ||
*/ | ||
createPattern(service: string, methodName: string, streaming: GrpcMethodStreamingType): string; | ||
/** | ||
* Will return async function which will handle gRPC call | ||
* with Rx streams or as a direct call passthrough | ||
* | ||
* @param methodHandler | ||
* @param protoNativeHandler | ||
*/ | ||
createServiceMethod(methodHandler: Function, protoNativeHandler: any, streamType: GrpcMethodStreamingType): Function; | ||
createUnaryServiceMethod(methodHandler: Function): Function; | ||
createStreamServiceMethod(methodHandler: Function): Function; | ||
createStreamDuplexMethod(methodHandler: Function): (call: GrpcCall<any, any>) => Promise<void>; | ||
createStreamCallMethod(methodHandler: Function): (call: GrpcCall<any, any>) => Promise<void>; | ||
close(): void; | ||
@@ -48,1 +81,2 @@ deserialize(obj: any): any; | ||
} | ||
export {}; |
@@ -7,2 +7,3 @@ "use strict"; | ||
const constants_1 = require("../constants"); | ||
const decorators_1 = require("../decorators"); | ||
const invalid_grpc_package_exception_1 = require("../errors/invalid-grpc-package.exception"); | ||
@@ -62,2 +63,9 @@ const invalid_proto_definition_exception_1 = require("../errors/invalid-proto-definition.exception"); | ||
} | ||
/** | ||
* Will create service mapping from gRPC generated Object to handlers | ||
* defined with @GrpcMethod or @GrpcStreamMethod annotations | ||
* | ||
* @param grpcService | ||
* @param name | ||
*/ | ||
async createService(grpcService, name) { | ||
@@ -67,17 +75,67 @@ const service = {}; | ||
for (const methodName in grpcService.prototype) { | ||
const methodHandler = this.getHandlerByPattern(this.createPattern(name, methodName)); | ||
let pattern = ''; | ||
let methodHandler = null; | ||
let streamingType = decorators_1.GrpcMethodStreamingType.NO_STREAMING; | ||
const methodFunction = grpcService.prototype[methodName]; | ||
const methodReqStreaming = methodFunction.requestStream; | ||
if (!shared_utils_1.isUndefined(methodReqStreaming) && methodReqStreaming) { | ||
// Try first pattern to be presented, RX streaming pattern would be | ||
// a preferable pattern to select among a few defined | ||
pattern = this.createPattern(name, methodName, decorators_1.GrpcMethodStreamingType.RX_STREAMING); | ||
methodHandler = this.messageHandlers.get(pattern); | ||
streamingType = decorators_1.GrpcMethodStreamingType.RX_STREAMING; | ||
// If first pattern didn't match to any of handlers then try | ||
// pass-through handler to be presented | ||
if (!methodHandler) { | ||
pattern = this.createPattern(name, methodName, decorators_1.GrpcMethodStreamingType.PT_STREAMING); | ||
methodHandler = this.messageHandlers.get(pattern); | ||
streamingType = decorators_1.GrpcMethodStreamingType.PT_STREAMING; | ||
} | ||
} | ||
else { | ||
pattern = this.createPattern(name, methodName, decorators_1.GrpcMethodStreamingType.NO_STREAMING); | ||
// Select handler if any presented for No-Streaming pattern | ||
methodHandler = this.messageHandlers.get(pattern); | ||
streamingType = decorators_1.GrpcMethodStreamingType.NO_STREAMING; | ||
} | ||
if (!methodHandler) { | ||
continue; | ||
} | ||
service[methodName] = await this.createServiceMethod(methodHandler, grpcService.prototype[methodName]); | ||
service[methodName] = await this.createServiceMethod(methodHandler, grpcService.prototype[methodName], streamingType); | ||
} | ||
return service; | ||
} | ||
createPattern(service, methodName) { | ||
/** | ||
* Will create a string of a JSON serialized format | ||
* | ||
* @param service name of the service which should be a match to gRPC service definition name | ||
* @param methodName name of the method which is coming after rpc keyword | ||
* @param streaming GrpcMethodStreamingType parameter which should correspond to | ||
* stream keyword in gRPC service request part | ||
*/ | ||
createPattern(service, methodName, streaming) { | ||
return JSON.stringify({ | ||
service, | ||
rpc: methodName, | ||
streaming, | ||
}); | ||
} | ||
createServiceMethod(methodHandler, protoNativeHandler) { | ||
/** | ||
* Will return async function which will handle gRPC call | ||
* with Rx streams or as a direct call passthrough | ||
* | ||
* @param methodHandler | ||
* @param protoNativeHandler | ||
*/ | ||
createServiceMethod(methodHandler, protoNativeHandler, streamType) { | ||
// If proto handler has request stream as "true" then we expect it to have | ||
// streaming from the side of requester | ||
if (protoNativeHandler.requestStream) { | ||
// If any handlers were defined with GrpcStreamMethod annotation use RX | ||
if (streamType === decorators_1.GrpcMethodStreamingType.RX_STREAMING) | ||
return this.createStreamDuplexMethod(methodHandler); | ||
// If any handlers were defined with GrpcStreamCall annotation | ||
else if (streamType === decorators_1.GrpcMethodStreamingType.PT_STREAMING) | ||
return this.createStreamCallMethod(methodHandler); | ||
} | ||
return protoNativeHandler.responseStream | ||
@@ -98,3 +156,6 @@ ? this.createStreamServiceMethod(methodHandler) | ||
await result$ | ||
.pipe(operators_1.takeUntil(rxjs_1.fromEvent(call, constants_1.CANCEL_EVENT))) | ||
.pipe(operators_1.takeUntil(rxjs_1.fromEvent(call, constants_1.CANCEL_EVENT)), operators_1.catchError(err => { | ||
call.emit('error', err); | ||
return rxjs_1.EMPTY; | ||
})) | ||
.forEach(data => call.write(data)); | ||
@@ -104,2 +165,34 @@ call.end(); | ||
} | ||
createStreamDuplexMethod(methodHandler) { | ||
return async (call) => { | ||
const req = new rxjs_1.Subject(); | ||
call.on('data', (m) => req.next(m)); | ||
call.on('error', (e) => { | ||
// Check if error means that stream ended on other end | ||
if (String(e) | ||
.toLowerCase() | ||
.indexOf('cancelled') > -1) { | ||
call.end(); | ||
return; | ||
} | ||
// If another error then just pass it along | ||
req.error(e); | ||
}); | ||
call.on('end', () => req.complete()); | ||
const handler = methodHandler(req.asObservable()); | ||
const res = this.transformToObservable(await handler); | ||
await res | ||
.pipe(operators_1.takeUntil(rxjs_1.fromEvent(call, constants_1.CANCEL_EVENT)), operators_1.catchError(err => { | ||
call.emit('error', err); | ||
return rxjs_1.EMPTY; | ||
})) | ||
.forEach(m => call.write(m)); | ||
call.end(); | ||
}; | ||
} | ||
createStreamCallMethod(methodHandler) { | ||
return async (call) => { | ||
methodHandler(call); | ||
}; | ||
} | ||
close() { | ||
@@ -106,0 +199,0 @@ this.grpcClient && this.grpcClient.forceShutdown(); |
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
186114
143
4184
2