New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@nestjs/microservices

Package Overview
Dependencies
Maintainers
1
Versions
376
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@nestjs/microservices - npm Package Compare versions

Comparing version 6.2.4 to 6.3.0-next.1

interfaces/request-context.interface.d.ts

7

client/client-grpc.d.ts
import { Logger } from '@nestjs/common/services/logger.service';
import { Observable } from 'rxjs';
import { ClientGrpc } from '../interfaces';
import { ClientOptions } from '../interfaces/client-metadata.interface';
import { ClientGrpc, GrpcOptions } from '../interfaces';
import { ClientProxy } from './client-proxy';
export declare class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
protected readonly options: ClientOptions['options'];
protected readonly options: GrpcOptions['options'];
protected readonly logger: Logger;
protected readonly url: string;
protected grpcClient: any;
constructor(options: ClientOptions['options']);
constructor(options: GrpcOptions['options']);
getService<T extends {}>(name: string): T;

@@ -13,0 +12,0 @@ createServiceMethod(client: any, methodName: string): (...args: any[]) => Observable<any>;

@@ -20,6 +20,4 @@ "use strict";

this.logger = new logger_service_1.Logger(client_proxy_1.ClientProxy.name);
this.url =
this.getOptionsProp(options, 'url') || constants_1.GRPC_DEFAULT_URL;
const protoLoader = this.getOptionsProp(options, 'protoLoader') ||
constants_1.GRPC_DEFAULT_PROTO_LOADER;
this.url = this.getOptionsProp(options, 'url') || constants_1.GRPC_DEFAULT_URL;
const protoLoader = this.getOptionsProp(options, 'protoLoader') || constants_1.GRPC_DEFAULT_PROTO_LOADER;
grpcPackage = load_package_util_1.loadPackage('grpc', ClientGrpcProxy.name, () => require('grpc'));

@@ -30,4 +28,10 @@ grpcProtoLoaderPackage = load_package_util_1.loadPackage(protoLoader, ClientGrpcProxy.name);

getService(name) {
const maxSendMessageLengthKey = 'grpc.max_send_message_length';
const maxReceiveMessageLengthKey = 'grpc.max_receive_message_length';
const maxMessageLengthOptions = {
[maxSendMessageLengthKey]: this.getOptionsProp(this.options, 'maxSendMessageLength', constants_1.GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH),
[maxReceiveMessageLengthKey]: this.getOptionsProp(this.options, 'maxReceiveMessageLength', constants_1.GRPC_DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH),
};
const options = shared_utils_1.isObject(this.options)
? Object.assign({}, this.options, { loader: '' }) : {};
? Object.assign({}, this.options, maxMessageLengthOptions, { loader: '' }) : Object.assign({}, maxMessageLengthOptions);
if (!this.grpcClient[name]) {

@@ -34,0 +38,0 @@ throw new invalid_grpc_service_exception_1.InvalidGrpcServiceException();

@@ -5,7 +5,6 @@ /// <reference types="node" />

import { MqttClient } from '../external/mqtt-client.interface';
import { ReadPacket, WritePacket } from '../interfaces';
import { ClientOptions } from '../interfaces/client-metadata.interface';
import { MqttOptions, ReadPacket, WritePacket } from '../interfaces';
import { ClientProxy } from './client-proxy';
export declare class ClientMqtt extends ClientProxy {
protected readonly options: ClientOptions['options'];
protected readonly options: MqttOptions['options'];
protected readonly logger: Logger;

@@ -15,3 +14,3 @@ protected readonly url: string;

protected connection: Promise<any>;
constructor(options: ClientOptions['options']);
constructor(options: MqttOptions['options']);
getAckPatternName(pattern: string): string;

@@ -18,0 +17,0 @@ getResPatternName(pattern: string): string;

@@ -16,4 +16,3 @@ "use strict";

this.logger = new logger_service_1.Logger(client_proxy_1.ClientProxy.name);
this.url =
this.getOptionsProp(this.options, 'url') || constants_1.MQTT_DEFAULT_URL;
this.url = this.getOptionsProp(this.options, 'url') || constants_1.MQTT_DEFAULT_URL;
mqttPackage = load_package_util_1.loadPackage('mqtt', ClientMqtt.name, () => require('mqtt'));

@@ -20,0 +19,0 @@ }

import { Logger } from '@nestjs/common/services/logger.service';
import { Client } from '../external/nats-client.interface';
import { PacketId, ReadPacket, WritePacket } from '../interfaces';
import { ClientOptions } from '../interfaces/client-metadata.interface';
import { NatsOptions, PacketId, ReadPacket, WritePacket } from '../interfaces';
import { ClientProxy } from './client-proxy';
export declare class ClientNats extends ClientProxy {
protected readonly options: ClientOptions['options'];
protected readonly options: NatsOptions['options'];
protected readonly logger: Logger;

@@ -12,3 +11,3 @@ protected readonly url: string;

protected connection: Promise<any>;
constructor(options: ClientOptions['options']);
constructor(options: NatsOptions['options']);
close(): void;

@@ -15,0 +14,0 @@ connect(): Promise<any>;

@@ -15,4 +15,3 @@ "use strict";

this.logger = new logger_service_1.Logger(client_proxy_1.ClientProxy.name);
this.url =
this.getOptionsProp(this.options, 'url') || constants_1.NATS_DEFAULT_URL;
this.url = this.getOptionsProp(this.options, 'url') || constants_1.NATS_DEFAULT_URL;
natsPackage = load_package_util_1.loadPackage('nats', ClientNats.name, () => require('nats'));

@@ -76,5 +75,5 @@ }

const pattern = this.normalizePattern(packet.pattern);
return new Promise((resolve, reject) => this.natsClient.publish(pattern, packet, err => (err ? reject(err) : resolve())));
return new Promise((resolve, reject) => this.natsClient.publish(pattern, packet, err => err ? reject(err) : resolve()));
}
}
exports.ClientNats = ClientNats;

@@ -0,3 +1,5 @@

import { Transport } from '../enums/transport.enum';
import { ClientOptions } from '../interfaces/client-metadata.interface';
import { Closeable } from '../interfaces/closeable.interface';
import { ClientGrpcProxy } from './client-grpc';
import { ClientProxy } from './client-proxy';

@@ -8,3 +10,6 @@ export interface IClientProxyFactory {

export declare class ClientProxyFactory {
static create(clientOptions: {
transport: Transport.GRPC;
} & ClientOptions): ClientGrpcProxy;
static create(clientOptions: ClientOptions): ClientProxy & Closeable;
}

@@ -14,6 +14,4 @@ import { Observable, Observer } from 'rxjs';

protected connect$(instance: any, errorEvent?: string, connectEvent?: string): Observable<any>;
protected getOptionsProp<T extends {
options?: any;
}>(obj: ClientOptions['options'], prop: keyof T['options'], defaultValue?: any): any;
protected getOptionsProp<T extends ClientOptions['options'], K extends keyof T>(obj: T, prop: K, defaultValue?: T[K]): T[K];
protected normalizePattern<T = any>(pattern: T): string;
}

@@ -51,8 +51,8 @@ "use strict";

getOptionsProp(obj, prop, defaultValue = undefined) {
return obj ? obj[prop] : defaultValue;
return (obj && obj[prop]) || defaultValue;
}
normalizePattern(pattern) {
return shared_utils_1.isString(pattern) ? pattern : JSON.stringify(pattern);
return (shared_utils_1.isString(pattern) ? pattern : JSON.stringify(pattern));
}
}
exports.ClientProxy = ClientProxy;
import { Logger } from '@nestjs/common/services/logger.service';
import { Subject } from 'rxjs';
import { ClientOpts, RedisClient, RetryStrategyOptions } from '../external/redis.interface';
import { ReadPacket, WritePacket } from '../interfaces';
import { ClientOptions } from '../interfaces/client-metadata.interface';
import { ReadPacket, RedisOptions, WritePacket } from '../interfaces';
import { ClientProxy } from './client-proxy';
export declare class ClientRedis extends ClientProxy {
protected readonly options: ClientOptions['options'];
protected readonly options: RedisOptions['options'];
protected readonly logger: Logger;

@@ -15,3 +14,3 @@ protected readonly url: string;

protected isExplicitlyTerminated: boolean;
constructor(options: ClientOptions['options']);
constructor(options: RedisOptions['options']);
getAckPatternName(pattern: string): string;

@@ -18,0 +17,0 @@ getResPatternName(pattern: string): string;

@@ -17,4 +17,3 @@ "use strict";

this.isExplicitlyTerminated = false;
this.url =
this.getOptionsProp(options, 'url') || constants_1.REDIS_DEFAULT_URL;
this.url = this.getOptionsProp(options, 'url') || constants_1.REDIS_DEFAULT_URL;
redisPackage = load_package_util_1.loadPackage('redis', ClientRedis.name, () => require('redis'));

@@ -69,4 +68,3 @@ }

!this.getOptionsProp(this.options, 'retryAttempts') ||
options.attempt >
this.getOptionsProp(this.options, 'retryAttempts')) {
options.attempt > this.getOptionsProp(this.options, 'retryAttempts')) {
return undefined;

@@ -119,5 +117,5 @@ }

const pattern = this.normalizePattern(packet.pattern);
return new Promise((resolve, reject) => this.pubClient.publish(pattern, JSON.stringify(packet), err => (err ? reject(err) : resolve())));
return new Promise((resolve, reject) => this.pubClient.publish(pattern, JSON.stringify(packet), err => err ? reject(err) : resolve()));
}
}
exports.ClientRedis = ClientRedis;

@@ -5,7 +5,7 @@ /// <reference types="node" />

import { Observable } from 'rxjs';
import { ClientOptions, ReadPacket } from '../interfaces';
import { ReadPacket, RmqOptions } from '../interfaces';
import { WritePacket } from './../interfaces';
import { ClientProxy } from './client-proxy';
export declare class ClientRMQ extends ClientProxy {
protected readonly options: ClientOptions['options'];
protected readonly options: RmqOptions['options'];
protected readonly logger: Logger;

@@ -19,3 +19,3 @@ protected connection: Promise<any>;

protected responseEmitter: EventEmitter;
constructor(options: ClientOptions['options']);
constructor(options: RmqOptions['options']);
close(): void;

@@ -22,0 +22,0 @@ consumeChannel(): void;

@@ -20,8 +20,5 @@ "use strict";

this.channel = null;
this.urls = this.getOptionsProp(this.options, 'urls') || [
constants_1.RQM_DEFAULT_URL,
];
this.urls = this.getOptionsProp(this.options, 'urls') || [constants_1.RQM_DEFAULT_URL];
this.queue =
this.getOptionsProp(this.options, 'queue') ||
constants_1.RQM_DEFAULT_QUEUE;
this.getOptionsProp(this.options, 'queue') || constants_1.RQM_DEFAULT_QUEUE;
this.queueOptions =

@@ -38,3 +35,6 @@ this.getOptionsProp(this.options, 'queueOptions') ||

consumeChannel() {
this.channel.addSetup((channel) => channel.consume(REPLY_QUEUE, (msg) => this.responseEmitter.emit(msg.properties.correlationId, msg), { noAck: true }));
const noAck = this.getOptionsProp(this.options, 'noAck') || constants_1.RQM_DEFAULT_NOACK;
this.channel.addSetup((channel) => channel.consume(REPLY_QUEUE, (msg) => this.responseEmitter.emit(msg.properties.correlationId, msg), {
noAck,
}));
}

@@ -41,0 +41,0 @@ connect() {

@@ -0,4 +1,4 @@

import { JsonSocket } from '../helpers/json-socket';
import { PacketId, ReadPacket, WritePacket } from '../interfaces';
import { ClientOptions } from '../interfaces/client-metadata.interface';
import { JsonSocket } from '../helpers/json-socket';
import { TcpClientOptions } from '../interfaces/client-metadata.interface';
import { ClientProxy } from './client-proxy';

@@ -12,3 +12,3 @@ export declare class ClientTCP extends ClientProxy {

private socket;
constructor(options: ClientOptions['options']);
constructor(options: TcpClientOptions['options']);
connect(): Promise<any>;

@@ -15,0 +15,0 @@ handleResponse(buffer: WritePacket & PacketId): void;

@@ -15,8 +15,4 @@ "use strict";

this.isConnected = false;
this.port =
this.getOptionsProp(options, 'port') ||
constants_1.TCP_DEFAULT_PORT;
this.host =
this.getOptionsProp(options, 'host') ||
constants_1.TCP_DEFAULT_HOST;
this.port = this.getOptionsProp(options, 'port') || constants_1.TCP_DEFAULT_PORT;
this.host = this.getOptionsProp(options, 'host') || constants_1.TCP_DEFAULT_HOST;
}

@@ -23,0 +19,0 @@ connect() {

@@ -24,2 +24,3 @@ export declare const TCP_DEFAULT_PORT = 3000;

export declare const RQM_DEFAULT_QUEUE_OPTIONS: {};
export declare const RQM_DEFAULT_NOACK = true;
export declare const GRPC_DEFAULT_PROTO_LOADER = "@grpc/proto-loader";

@@ -26,0 +27,0 @@ export declare const NO_MESSAGE_HANDLER = "There is no matching message handler defined in the remote service.";

@@ -26,2 +26,3 @@ "use strict";

exports.RQM_DEFAULT_QUEUE_OPTIONS = {};
exports.RQM_DEFAULT_NOACK = true;
exports.GRPC_DEFAULT_PROTO_LOADER = '@grpc/proto-loader';

@@ -28,0 +29,0 @@ exports.NO_MESSAGE_HANDLER = `There is no matching message handler defined in the remote service.`;

import { PatternMetadata } from '../interfaces/pattern-metadata.interface';
export declare enum GrpcMethodStreamingType {
NO_STREAMING = "no_stream",
RX_STREAMING = "rx_stream",
PT_STREAMING = "pt_stream"
}
/**

@@ -11,5 +16,28 @@ * Subscribes to incoming messages which fulfils chosen pattern.

export declare function GrpcMethod(service: string, method?: string): MethodDecorator;
export declare function createMethodMetadata(target: any, key: string | symbol, service: string | undefined, method: string | undefined): {
/**
* Registers gRPC call through RX handler for service and method
*
* @param service String parameter reflecting the name of service definition from proto file
*/
export declare function GrpcStreamMethod(service?: string): any;
/**
* @param service String parameter reflecting the name of service definition from proto file
* @param method Optional string parameter reflecting the name of method inside of a service definition coming after rpc keyword
*/
export declare function GrpcStreamMethod(service: string, method?: string): any;
/**
* Registers gRPC call pass through handler for service and method
*
* @param service String parameter reflecting the name of service definition from proto file
*/
export declare function GrpcStreamCall(service?: string): any;
/**
* @param service String parameter reflecting the name of service definition from proto file
* @param method Optional string parameter reflecting the name of method inside of a service definition coming after rpc keyword
*/
export declare function GrpcStreamCall(service: string, method?: string): any;
export declare function createMethodMetadata(target: any, key: string | symbol, service: string | undefined, method: string | undefined, streaming?: GrpcMethodStreamingType): {
service: any;
rpc: string;
streaming: GrpcMethodStreamingType;
};

@@ -5,2 +5,8 @@ "use strict";

const pattern_handler_enum_1 = require("../enums/pattern-handler.enum");
var GrpcMethodStreamingType;
(function (GrpcMethodStreamingType) {
GrpcMethodStreamingType["NO_STREAMING"] = "no_stream";
GrpcMethodStreamingType["RX_STREAMING"] = "rx_stream";
GrpcMethodStreamingType["PT_STREAMING"] = "pt_stream";
})(GrpcMethodStreamingType = exports.GrpcMethodStreamingType || (exports.GrpcMethodStreamingType = {}));
/**

@@ -23,13 +29,31 @@ * Subscribes to incoming messages which fulfils chosen pattern.

exports.GrpcMethod = GrpcMethod;
function createMethodMetadata(target, key, service, method) {
function GrpcStreamMethod(service, method) {
return (target, key, descriptor) => {
const metadata = createMethodMetadata(target, key, service, method, GrpcMethodStreamingType.RX_STREAMING);
return exports.MessagePattern(metadata)(target, key, descriptor);
};
}
exports.GrpcStreamMethod = GrpcStreamMethod;
function GrpcStreamCall(service, method) {
return (target, key, descriptor) => {
const metadata = createMethodMetadata(target, key, service, method, GrpcMethodStreamingType.PT_STREAMING);
return exports.MessagePattern(metadata)(target, key, descriptor);
};
}
exports.GrpcStreamCall = GrpcStreamCall;
function createMethodMetadata(target, key, service, method, streaming = GrpcMethodStreamingType.NO_STREAMING) {
const capitalizeFirstLetter = (str) => str.charAt(0).toUpperCase() + str.slice(1);
if (!service) {
const { name } = target.constructor;
return { service: name, rpc: capitalizeFirstLetter(key) };
return {
service: name,
rpc: capitalizeFirstLetter(key),
streaming,
};
}
if (service && !method) {
return { service, rpc: capitalizeFirstLetter(key) };
return { service, rpc: capitalizeFirstLetter(key), streaming };
}
return { service, rpc: method };
return { service, rpc: method, streaming };
}
exports.createMethodMetadata = createMethodMetadata;

@@ -10,1 +10,2 @@ import 'reflect-metadata';

export * from './server';
export * from './tokens';

@@ -20,1 +20,2 @@ "use strict";

__export(require("./server"));
__export(require("./tokens"));

@@ -9,1 +9,2 @@ export * from './client-grpc.interface';

export * from './pattern-metadata.interface';
export * from './request-context.interface';

@@ -82,3 +82,4 @@ import { MqttClientOptions } from '@nestjs/common/interfaces/external/mqtt-options.interface';

socketOptions?: any;
noAck?: boolean;
};
}

@@ -21,2 +21,3 @@ import { Controller } from '@nestjs/common/interfaces/controllers/controller.interface';

assignClientToInstance<T = any>(instance: Controller, property: string, client: T): void;
private registerRequestProvider;
}

@@ -5,2 +5,3 @@ "use strict";

const metadata_scanner_1 = require("@nestjs/core/metadata-scanner");
const request_constants_1 = require("@nestjs/core/router/request/request-constants");
const listener_metadata_explorer_1 = require("./listener-metadata-explorer");

@@ -29,2 +30,3 @@ class ListenersController {

const contextId = context_id_factory_1.createContextId();
this.registerRequestProvider({ pattern, data }, contextId);
const contextInstance = await this.injector.loadPerContext(instance, module, collection, contextId);

@@ -46,3 +48,11 @@ const proxy = this.contextCreator.create(contextInstance, contextInstance[methodKey], moduleKey);

}
registerRequestProvider(request, contextId) {
const coreModuleRef = this.container.getInternalCoreModuleRef();
const wrapper = coreModuleRef.getProviderByKey(request_constants_1.REQUEST);
wrapper.setInstanceByContextId(contextId, {
instance: request,
isResolved: true,
});
}
}
exports.ListenersController = ListenersController;

@@ -14,3 +14,2 @@ import { CanActivate, ExceptionFilter, INestMicroservice, NestInterceptor, PipeTransform, WebSocketAdapter } from '@nestjs/common';

private isTerminated;
private isInitialized;
private isInitHookCalled;

@@ -17,0 +16,0 @@ constructor(container: NestContainer, config: MicroserviceOptions, applicationConfig: ApplicationConfig);

@@ -19,3 +19,2 @@ "use strict";

this.isTerminated = false;
this.isInitialized = false;
this.isInitHookCalled = false;

@@ -22,0 +21,0 @@ this.microservicesModule.register(container, this.applicationConfig);

{
"name": "@nestjs/microservices",
"version": "6.2.4",
"version": "6.3.0-next.1",
"description": "Nest - modern, fast, powerful node.js web framework (@microservices)",

@@ -5,0 +5,0 @@ "author": "Kamil Mysliwiec",

@@ -0,4 +1,13 @@

import { GrpcMethodStreamingType } from '../decorators';
import { CustomTransportStrategy } from '../interfaces';
import { GrpcOptions } from '../interfaces/microservice-configuration.interface';
import { Server } from './server';
interface GrpcCall<TRequest = any, TMetadata = any> {
request: TRequest;
metadata: TMetadata;
end: Function;
write: Function;
on: Function;
emit: Function;
}
export declare class ServerGrpc extends Server implements CustomTransportStrategy {

@@ -21,7 +30,31 @@ private readonly options;

}[];
/**
* Will create service mapping from gRPC generated Object to handlers
* defined with @GrpcMethod or @GrpcStreamMethod annotations
*
* @param grpcService
* @param name
*/
createService(grpcService: any, name: string): Promise<{}>;
createPattern(service: string, methodName: string): string;
createServiceMethod(methodHandler: Function, protoNativeHandler: any): Function;
/**
* Will create a string of a JSON serialized format
*
* @param service name of the service which should be a match to gRPC service definition name
* @param methodName name of the method which is coming after rpc keyword
* @param streaming GrpcMethodStreamingType parameter which should correspond to
* stream keyword in gRPC service request part
*/
createPattern(service: string, methodName: string, streaming: GrpcMethodStreamingType): string;
/**
* Will return async function which will handle gRPC call
* with Rx streams or as a direct call passthrough
*
* @param methodHandler
* @param protoNativeHandler
*/
createServiceMethod(methodHandler: Function, protoNativeHandler: any, streamType: GrpcMethodStreamingType): Function;
createUnaryServiceMethod(methodHandler: Function): Function;
createStreamServiceMethod(methodHandler: Function): Function;
createStreamDuplexMethod(methodHandler: Function): (call: GrpcCall<any, any>) => Promise<void>;
createStreamCallMethod(methodHandler: Function): (call: GrpcCall<any, any>) => Promise<void>;
close(): void;

@@ -48,1 +81,2 @@ deserialize(obj: any): any;

}
export {};

@@ -7,2 +7,3 @@ "use strict";

const constants_1 = require("../constants");
const decorators_1 = require("../decorators");
const invalid_grpc_package_exception_1 = require("../errors/invalid-grpc-package.exception");

@@ -62,2 +63,9 @@ const invalid_proto_definition_exception_1 = require("../errors/invalid-proto-definition.exception");

}
/**
* Will create service mapping from gRPC generated Object to handlers
* defined with @GrpcMethod or @GrpcStreamMethod annotations
*
* @param grpcService
* @param name
*/
async createService(grpcService, name) {

@@ -67,17 +75,67 @@ const service = {};

for (const methodName in grpcService.prototype) {
const methodHandler = this.getHandlerByPattern(this.createPattern(name, methodName));
let pattern = '';
let methodHandler = null;
let streamingType = decorators_1.GrpcMethodStreamingType.NO_STREAMING;
const methodFunction = grpcService.prototype[methodName];
const methodReqStreaming = methodFunction.requestStream;
if (!shared_utils_1.isUndefined(methodReqStreaming) && methodReqStreaming) {
// Try first pattern to be presented, RX streaming pattern would be
// a preferable pattern to select among a few defined
pattern = this.createPattern(name, methodName, decorators_1.GrpcMethodStreamingType.RX_STREAMING);
methodHandler = this.messageHandlers.get(pattern);
streamingType = decorators_1.GrpcMethodStreamingType.RX_STREAMING;
// If first pattern didn't match to any of handlers then try
// pass-through handler to be presented
if (!methodHandler) {
pattern = this.createPattern(name, methodName, decorators_1.GrpcMethodStreamingType.PT_STREAMING);
methodHandler = this.messageHandlers.get(pattern);
streamingType = decorators_1.GrpcMethodStreamingType.PT_STREAMING;
}
}
else {
pattern = this.createPattern(name, methodName, decorators_1.GrpcMethodStreamingType.NO_STREAMING);
// Select handler if any presented for No-Streaming pattern
methodHandler = this.messageHandlers.get(pattern);
streamingType = decorators_1.GrpcMethodStreamingType.NO_STREAMING;
}
if (!methodHandler) {
continue;
}
service[methodName] = await this.createServiceMethod(methodHandler, grpcService.prototype[methodName]);
service[methodName] = await this.createServiceMethod(methodHandler, grpcService.prototype[methodName], streamingType);
}
return service;
}
createPattern(service, methodName) {
/**
* Will create a string of a JSON serialized format
*
* @param service name of the service which should be a match to gRPC service definition name
* @param methodName name of the method which is coming after rpc keyword
* @param streaming GrpcMethodStreamingType parameter which should correspond to
* stream keyword in gRPC service request part
*/
createPattern(service, methodName, streaming) {
return JSON.stringify({
service,
rpc: methodName,
streaming,
});
}
createServiceMethod(methodHandler, protoNativeHandler) {
/**
* Will return async function which will handle gRPC call
* with Rx streams or as a direct call passthrough
*
* @param methodHandler
* @param protoNativeHandler
*/
createServiceMethod(methodHandler, protoNativeHandler, streamType) {
// If proto handler has request stream as "true" then we expect it to have
// streaming from the side of requester
if (protoNativeHandler.requestStream) {
// If any handlers were defined with GrpcStreamMethod annotation use RX
if (streamType === decorators_1.GrpcMethodStreamingType.RX_STREAMING)
return this.createStreamDuplexMethod(methodHandler);
// If any handlers were defined with GrpcStreamCall annotation
else if (streamType === decorators_1.GrpcMethodStreamingType.PT_STREAMING)
return this.createStreamCallMethod(methodHandler);
}
return protoNativeHandler.responseStream

@@ -98,3 +156,6 @@ ? this.createStreamServiceMethod(methodHandler)

await result$
.pipe(operators_1.takeUntil(rxjs_1.fromEvent(call, constants_1.CANCEL_EVENT)))
.pipe(operators_1.takeUntil(rxjs_1.fromEvent(call, constants_1.CANCEL_EVENT)), operators_1.catchError(err => {
call.emit('error', err);
return rxjs_1.EMPTY;
}))
.forEach(data => call.write(data));

@@ -104,2 +165,34 @@ call.end();

}
createStreamDuplexMethod(methodHandler) {
return async (call) => {
const req = new rxjs_1.Subject();
call.on('data', (m) => req.next(m));
call.on('error', (e) => {
// Check if error means that stream ended on other end
if (String(e)
.toLowerCase()
.indexOf('cancelled') > -1) {
call.end();
return;
}
// If another error then just pass it along
req.error(e);
});
call.on('end', () => req.complete());
const handler = methodHandler(req.asObservable());
const res = this.transformToObservable(await handler);
await res
.pipe(operators_1.takeUntil(rxjs_1.fromEvent(call, constants_1.CANCEL_EVENT)), operators_1.catchError(err => {
call.emit('error', err);
return rxjs_1.EMPTY;
}))
.forEach(m => call.write(m));
call.end();
};
}
createStreamCallMethod(methodHandler) {
return async (call) => {
methodHandler(call);
};
}
close() {

@@ -106,0 +199,0 @@ this.grpcClient && this.grpcClient.forceShutdown();

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc