@grpc/grpc-js
Advanced tools
Comparing version 0.4.3 to 0.5.0
@@ -11,2 +11,3 @@ /// <reference types="node" /> | ||
import { Metadata } from './metadata'; | ||
import { Server } from './server'; | ||
import { KeyCertPair, ServerCredentials } from './server-credentials'; | ||
@@ -53,3 +54,3 @@ import { StatusBuilder } from './status-builder'; | ||
export declare const setLogVerbosity: (verbosity: LogVerbosity) => void; | ||
export declare const Server: (options: any) => never; | ||
export { Server }; | ||
export { ServerCredentials }; | ||
@@ -56,0 +57,0 @@ export { KeyCertPair }; |
@@ -39,2 +39,4 @@ "use strict"; | ||
exports.Metadata = metadata_1.Metadata; | ||
const server_1 = require("./server"); | ||
exports.Server = server_1.Server; | ||
const server_credentials_1 = require("./server-credentials"); | ||
@@ -137,5 +139,2 @@ exports.ServerCredentials = server_credentials_1.ServerCredentials; | ||
}; | ||
exports.Server = (options) => { | ||
throw new Error('Not yet implemented'); | ||
}; | ||
exports.getClientChannel = (client) => { | ||
@@ -142,0 +141,0 @@ return client_1.Client.prototype.getChannel.call(client); |
@@ -105,2 +105,6 @@ /// <reference types="node" /> | ||
private metadataSent; | ||
private canPush; | ||
private isPushPending; | ||
private bufferedMessages; | ||
private messagesToPush; | ||
constructor(stream: http2.ServerHttp2Stream, handler: Handler<RequestType, ResponseType>); | ||
@@ -119,2 +123,5 @@ sendMetadata(customMetadata?: Metadata): void; | ||
setupReadable(readable: ServerReadableStream<RequestType, ResponseType> | ServerDuplexStream<RequestType, ResponseType>): void; | ||
consumeUnpushedMessages(readable: ServerReadableStream<RequestType, ResponseType> | ServerDuplexStream<RequestType, ResponseType>): boolean; | ||
private pushOrBufferMessage; | ||
private pushMessage; | ||
} |
@@ -78,2 +78,5 @@ "use strict"; | ||
_read(size) { | ||
if (!this.call.consumeUnpushedMessages(this)) { | ||
return; | ||
} | ||
this.call.resume(); | ||
@@ -185,2 +188,6 @@ } | ||
this.metadataSent = false; | ||
this.canPush = false; | ||
this.isPushPending = false; | ||
this.bufferedMessages = []; | ||
this.messagesToPush = []; | ||
this.stream.once('error', (err) => { | ||
@@ -338,32 +345,66 @@ err.code = constants_1.Status.INTERNAL; | ||
const decoder = new stream_decoder_1.StreamDecoder(); | ||
/* This code here is wrong but getting the client working is the priority | ||
* right now and I'm not going to block that on fixing what is currently | ||
* unused code. If multiple messages come in with a single frame, this will | ||
* keep calling readable.push after it has returned false, which should not | ||
* happen. Independent of that, deserializeMessage is asynchronous, which | ||
* means that incoming messages could be reordered when emitted to the | ||
* application. That effect will become more pronounced when compression | ||
* support is added and deserializeMessage takes longer by an amount of | ||
* time dependent on the size of the message. A system like the one in | ||
* call-stream.ts should be added to buffer incoming messages and | ||
* preserve ordering more strongly */ | ||
this.stream.on('data', async (data) => { | ||
const messages = decoder.write(data); | ||
for (const message of messages) { | ||
try { | ||
const deserialized = await this.deserializeMessage(message); | ||
if (!readable.push(deserialized)) { | ||
this.stream.pause(); | ||
} | ||
} | ||
catch (err) { | ||
err.code = constants_1.Status.INTERNAL; | ||
readable.emit('error', err); | ||
} | ||
this.pushOrBufferMessage(readable, message); | ||
} | ||
}); | ||
this.stream.once('end', () => { | ||
readable.push(null); | ||
this.pushOrBufferMessage(readable, null); | ||
}); | ||
} | ||
consumeUnpushedMessages(readable) { | ||
this.canPush = true; | ||
while (this.messagesToPush.length > 0) { | ||
const nextMessage = this.messagesToPush.shift(); | ||
const canPush = readable.push(nextMessage); | ||
if (nextMessage === null || canPush === false) { | ||
this.canPush = false; | ||
break; | ||
} | ||
} | ||
return this.canPush; | ||
} | ||
pushOrBufferMessage(readable, messageBytes) { | ||
if (this.isPushPending) { | ||
this.bufferedMessages.push(messageBytes); | ||
} | ||
else { | ||
this.pushMessage(readable, messageBytes); | ||
} | ||
} | ||
async pushMessage(readable, messageBytes) { | ||
if (messageBytes === null) { | ||
if (this.canPush) { | ||
readable.push(null); | ||
} | ||
else { | ||
this.messagesToPush.push(null); | ||
} | ||
return; | ||
} | ||
this.isPushPending = true; | ||
try { | ||
const deserialized = await this.deserializeMessage(messageBytes); | ||
if (this.canPush) { | ||
if (!readable.push(deserialized)) { | ||
this.canPush = false; | ||
this.stream.pause(); | ||
} | ||
} | ||
else { | ||
this.messagesToPush.push(deserialized); | ||
} | ||
} | ||
catch (err) { | ||
// Ignore any remaining messages when errors occur. | ||
this.bufferedMessages.length = 0; | ||
err.code = constants_1.Status.INTERNAL; | ||
readable.emit('error', err); | ||
} | ||
this.isPushPending = false; | ||
if (this.bufferedMessages.length > 0) { | ||
this.pushMessage(readable, this.bufferedMessages.shift()); | ||
} | ||
} | ||
} | ||
@@ -370,0 +411,0 @@ exports.Http2ServerCallStream = Http2ServerCallStream; |
@@ -7,2 +7,3 @@ import { Deserialize, Serialize, ServiceDefinition } from './make-client'; | ||
private handlers; | ||
private sessions; | ||
private started; | ||
@@ -9,0 +10,0 @@ constructor(options?: object); |
@@ -49,2 +49,3 @@ "use strict"; | ||
this.handlers = new Map(); | ||
this.sessions = new Set(); | ||
this.started = false; | ||
@@ -144,3 +145,16 @@ } | ||
forceShutdown() { | ||
throw new Error('Not yet implemented'); | ||
// Close the server if it is still running. | ||
if (this.http2Server && this.http2Server.listening) { | ||
this.http2Server.close(); | ||
} | ||
this.started = false; | ||
// Always destroy any available sessions. It's possible that one or more | ||
// tryShutdown() calls are in progress. Don't wait on them to finish. | ||
this.sessions.forEach(session => { | ||
// Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to | ||
// recognize destroy(code) as a valid signature. | ||
// tslint:disable-next-line:no-any | ||
session.destroy(http2.constants.NGHTTP2_CANCEL); | ||
}); | ||
this.sessions.clear(); | ||
} | ||
@@ -169,11 +183,24 @@ register(name, handler, serialize, deserialize, type) { | ||
tryShutdown(callback) { | ||
callback = typeof callback === 'function' ? callback : noop; | ||
if (this.http2Server === null) { | ||
callback(new Error('server is not running')); | ||
return; | ||
let pendingChecks = 0; | ||
function maybeCallback() { | ||
pendingChecks--; | ||
if (pendingChecks === 0) { | ||
callback(); | ||
} | ||
} | ||
this.http2Server.close((err) => { | ||
this.started = false; | ||
callback(err); | ||
// Close the server if necessary. | ||
this.started = false; | ||
if (this.http2Server && this.http2Server.listening) { | ||
pendingChecks++; | ||
this.http2Server.close(maybeCallback); | ||
} | ||
// If any sessions are active, close them gracefully. | ||
pendingChecks += this.sessions.size; | ||
this.sessions.forEach(session => { | ||
session.close(maybeCallback); | ||
}); | ||
// If the server is closed and there are no active sessions, just call back. | ||
if (pendingChecks === 0) { | ||
callback(); | ||
} | ||
} | ||
@@ -223,3 +250,5 @@ addHttp2Port() { | ||
const call = new server_call_1.Http2ServerCallStream(stream, null); | ||
err.code = constants_1.Status.INTERNAL; | ||
if (err.code === undefined) { | ||
err.code = constants_1.Status.INTERNAL; | ||
} | ||
call.sendError(err); | ||
@@ -233,2 +262,3 @@ } | ||
} | ||
this.sessions.add(session); | ||
}); | ||
@@ -235,0 +265,0 @@ } |
{ | ||
"name": "@grpc/grpc-js", | ||
"version": "0.4.3", | ||
"version": "0.5.0", | ||
"description": "gRPC Library for Node - pure JS implementation", | ||
@@ -5,0 +5,0 @@ "homepage": "https://grpc.io/", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
191639
4516