Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@nestjs/microservices

Package Overview
Dependencies
Maintainers
1
Versions
366
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@nestjs/microservices - npm Package Compare versions

Comparing version 10.2.2 to 10.2.3

6

package.json
{
"name": "@nestjs/microservices",
"version": "10.2.2",
"version": "10.2.3",
"description": "Nest - modern, fast, powerful node.js web framework (@microservices)",

@@ -25,4 +25,4 @@ "author": "Kamil Mysliwiec",

"devDependencies": {
"@nestjs/common": "10.2.2",
"@nestjs/core": "10.2.2"
"@nestjs/common": "10.2.3",
"@nestjs/core": "10.2.3"
},

@@ -29,0 +29,0 @@ "peerDependencies": {

@@ -0,1 +1,2 @@

import { Observable } from 'rxjs';
import { GrpcMethodStreamingType } from '../decorators';

@@ -13,2 +14,3 @@ import { Transport } from '../enums';

on: Function;
off: Function;
emit: Function;

@@ -61,2 +63,13 @@ }

createStreamServiceMethod(methodHandler: Function): Function;
/**
* Writes an observable to a GRPC call.
*
* This function will ensure that backpressure is managed while writing values
* that come from an observable to a GRPC call.
*
* @param source The observable we want to write out to the GRPC call.
* @param call The GRPC call we want to write to.
* @returns A promise that resolves when we're done writing to the call.
*/
writeObservableToGrpc<T>(source: Observable<T>, call: GrpcCall<T>): Promise<void>;
createRequestStreamMethod(methodHandler: Function, isResponseStream: boolean): (call: GrpcCall, callback: (err: unknown, value: unknown) => void) => Promise<void>;

@@ -63,0 +76,0 @@ createStreamCallMethod(methodHandler: Function, isResponseStream: boolean): (call: GrpcCall, callback: (err: unknown, value: unknown) => void) => Promise<void>;

@@ -158,11 +158,103 @@ "use strict";

const result$ = this.transformToObservable(await handler);
await result$
.pipe((0, operators_1.takeUntil)((0, rxjs_1.fromEvent)(call, constants_1.CANCEL_EVENT)), (0, operators_1.catchError)(err => {
try {
await this.writeObservableToGrpc(result$, call);
}
catch (err) {
call.emit('error', err);
return rxjs_1.EMPTY;
}))
.forEach(data => call.write(data));
call.end();
return;
}
};
}
/**
* Writes an observable to a GRPC call.
*
* This function will ensure that backpressure is managed while writing values
* that come from an observable to a GRPC call.
*
* @param source The observable we want to write out to the GRPC call.
* @param call The GRPC call we want to write to.
* @returns A promise that resolves when we're done writing to the call.
*/
writeObservableToGrpc(source, call) {
return new Promise((resolve, reject) => {
// This buffer is used to house values that arrive
// while the call is in the process of writing and draining.
const buffer = [];
let isComplete = false;
let writing = false;
const cleanups = [];
const cleanup = () => {
for (const cleanup of cleanups) {
cleanup();
}
};
const write = (value) => {
writing = true;
call.write(value);
};
const done = () => {
call.end();
resolve();
cleanup();
};
// Handling backpressure by waiting for drain event
const drainHandler = () => {
if (writing) {
writing = false;
if (buffer.length > 0) {
// Write any queued values we have in
// our buffer. Note that the `writing` boolean
// flips from false to true synchronously in this case
// that will prevent values arriving in our `next` call
// below from being interleaved in the written output.
write(buffer.shift());
}
else if (isComplete) {
// Otherwise, if we're complete, end the call.
done();
}
}
};
call.on('drain', drainHandler);
cleanups.push(() => {
call.off('drain', drainHandler);
});
const subscription = new rxjs_1.Subscription();
// Make sure that a cancel event unsubscribes from
// the source observable.
const cancelHandler = () => {
subscription.unsubscribe();
done();
};
call.on(constants_1.CANCEL_EVENT, cancelHandler);
cleanups.push(() => {
call.off(constants_1.CANCEL_EVENT, cancelHandler);
});
subscription.add(source.subscribe({
next: (value) => {
if (writing) {
// If a value arrives while we're writing
// then we queue it up to be processed FIFO.
buffer.push(value);
}
else {
// If we're not currently writing, then
// we can write the value immediately.
write(value);
}
},
error: (err) => {
call.emit('error', err);
reject(err);
cleanup();
},
complete: () => {
isComplete = true;
if (buffer.length === 0) {
done();
}
},
}));
});
}
createRequestStreamMethod(methodHandler, isResponseStream) {

@@ -186,9 +278,3 @@ return async (call, callback) => {

if (isResponseStream) {
await res
.pipe((0, operators_1.takeUntil)((0, rxjs_1.fromEvent)(call, constants_1.CANCEL_EVENT)), (0, operators_1.catchError)(err => {
call.emit('error', err);
return rxjs_1.EMPTY;
}))
.forEach(m => call.write(m));
call.end();
await this.writeObservableToGrpc(res, call);
}

@@ -195,0 +281,0 @@ else {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc