@nestjs/microservices
Advanced tools
Comparing version 10.2.2 to 10.2.3
{ | ||
"name": "@nestjs/microservices", | ||
"version": "10.2.2", | ||
"version": "10.2.3", | ||
"description": "Nest - modern, fast, powerful node.js web framework (@microservices)", | ||
@@ -25,4 +25,4 @@ "author": "Kamil Mysliwiec", | ||
"devDependencies": { | ||
"@nestjs/common": "10.2.2", | ||
"@nestjs/core": "10.2.2" | ||
"@nestjs/common": "10.2.3", | ||
"@nestjs/core": "10.2.3" | ||
}, | ||
@@ -29,0 +29,0 @@ "peerDependencies": { |
@@ -0,1 +1,2 @@ | ||
import { Observable } from 'rxjs'; | ||
import { GrpcMethodStreamingType } from '../decorators'; | ||
@@ -13,2 +14,3 @@ import { Transport } from '../enums'; | ||
on: Function; | ||
off: Function; | ||
emit: Function; | ||
@@ -61,2 +63,13 @@ } | ||
createStreamServiceMethod(methodHandler: Function): Function; | ||
/** | ||
* Writes an observable to a GRPC call. | ||
* | ||
* This function will ensure that backpressure is managed while writing values | ||
* that come from an observable to a GRPC call. | ||
* | ||
* @param source The observable we want to write out to the GRPC call. | ||
* @param call The GRPC call we want to write to. | ||
* @returns A promise that resolves when we're done writing to the call. | ||
*/ | ||
writeObservableToGrpc<T>(source: Observable<T>, call: GrpcCall<T>): Promise<void>; | ||
createRequestStreamMethod(methodHandler: Function, isResponseStream: boolean): (call: GrpcCall, callback: (err: unknown, value: unknown) => void) => Promise<void>; | ||
@@ -63,0 +76,0 @@ createStreamCallMethod(methodHandler: Function, isResponseStream: boolean): (call: GrpcCall, callback: (err: unknown, value: unknown) => void) => Promise<void>; |
@@ -158,11 +158,103 @@ "use strict"; | ||
const result$ = this.transformToObservable(await handler); | ||
await result$ | ||
.pipe((0, operators_1.takeUntil)((0, rxjs_1.fromEvent)(call, constants_1.CANCEL_EVENT)), (0, operators_1.catchError)(err => { | ||
try { | ||
await this.writeObservableToGrpc(result$, call); | ||
} | ||
catch (err) { | ||
call.emit('error', err); | ||
return rxjs_1.EMPTY; | ||
})) | ||
.forEach(data => call.write(data)); | ||
call.end(); | ||
return; | ||
} | ||
}; | ||
} | ||
/** | ||
* Writes an observable to a GRPC call. | ||
* | ||
* This function will ensure that backpressure is managed while writing values | ||
* that come from an observable to a GRPC call. | ||
* | ||
* @param source The observable we want to write out to the GRPC call. | ||
* @param call The GRPC call we want to write to. | ||
* @returns A promise that resolves when we're done writing to the call. | ||
*/ | ||
writeObservableToGrpc(source, call) { | ||
return new Promise((resolve, reject) => { | ||
// This buffer is used to house values that arrive | ||
// while the call is in the process of writing and draining. | ||
const buffer = []; | ||
let isComplete = false; | ||
let writing = false; | ||
const cleanups = []; | ||
const cleanup = () => { | ||
for (const cleanup of cleanups) { | ||
cleanup(); | ||
} | ||
}; | ||
const write = (value) => { | ||
writing = true; | ||
call.write(value); | ||
}; | ||
const done = () => { | ||
call.end(); | ||
resolve(); | ||
cleanup(); | ||
}; | ||
// Handling backpressure by waiting for drain event | ||
const drainHandler = () => { | ||
if (writing) { | ||
writing = false; | ||
if (buffer.length > 0) { | ||
// Write any queued values we have in | ||
// our buffer. Note that the `writing` boolean | ||
// flips from false to true synchronously in this case | ||
// that will prevent values arriving in our `next` call | ||
// below from being interleaved in the written output. | ||
write(buffer.shift()); | ||
} | ||
else if (isComplete) { | ||
// Otherwise, if we're complete, end the call. | ||
done(); | ||
} | ||
} | ||
}; | ||
call.on('drain', drainHandler); | ||
cleanups.push(() => { | ||
call.off('drain', drainHandler); | ||
}); | ||
const subscription = new rxjs_1.Subscription(); | ||
// Make sure that a cancel event unsubscribes from | ||
// the source observable. | ||
const cancelHandler = () => { | ||
subscription.unsubscribe(); | ||
done(); | ||
}; | ||
call.on(constants_1.CANCEL_EVENT, cancelHandler); | ||
cleanups.push(() => { | ||
call.off(constants_1.CANCEL_EVENT, cancelHandler); | ||
}); | ||
subscription.add(source.subscribe({ | ||
next: (value) => { | ||
if (writing) { | ||
// If a value arrives while we're writing | ||
// then we queue it up to be processed FIFO. | ||
buffer.push(value); | ||
} | ||
else { | ||
// If we're not currently writing, then | ||
// we can write the value immediately. | ||
write(value); | ||
} | ||
}, | ||
error: (err) => { | ||
call.emit('error', err); | ||
reject(err); | ||
cleanup(); | ||
}, | ||
complete: () => { | ||
isComplete = true; | ||
if (buffer.length === 0) { | ||
done(); | ||
} | ||
}, | ||
})); | ||
}); | ||
} | ||
createRequestStreamMethod(methodHandler, isResponseStream) { | ||
@@ -186,9 +278,3 @@ return async (call, callback) => { | ||
if (isResponseStream) { | ||
await res | ||
.pipe((0, operators_1.takeUntil)((0, rxjs_1.fromEvent)(call, constants_1.CANCEL_EVENT)), (0, operators_1.catchError)(err => { | ||
call.emit('error', err); | ||
return rxjs_1.EMPTY; | ||
})) | ||
.forEach(m => call.write(m)); | ||
call.end(); | ||
await this.writeObservableToGrpc(res, call); | ||
} | ||
@@ -195,0 +281,0 @@ else { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
382307
9262