grpc-server-js
Advanced tools
Comparing version 0.1.4 to 0.1.5
@@ -7,2 +7,5 @@ 'use strict'; | ||
const kCall = Symbol('call'); | ||
const kReadableState = Symbol('readableState'); | ||
const kReadablePushOrBufferMessage = Symbol('readablePushOrBufferMessage'); | ||
const kReadablePushMessage = Symbol('readablePushMessage'); | ||
@@ -30,2 +33,15 @@ | ||
_read (size) { | ||
this[kReadableState].canPush = true; | ||
const { messagesToPush } = this[kReadableState]; | ||
while (messagesToPush.length > 0) { | ||
const nextMessage = messagesToPush.shift(); | ||
const canPush = this.push(nextMessage); | ||
if (nextMessage === null || canPush === false) { | ||
this[kReadableState].canPush = false; | ||
return; | ||
} | ||
} | ||
this.call.resume(); | ||
@@ -45,2 +61,5 @@ } | ||
ServerReadableStream.prototype.getPeer = getPeer; | ||
ServerReadableStream.prototype[kReadablePushOrBufferMessage] = | ||
readablePushOrBufferMessage; | ||
ServerReadableStream.prototype[kReadablePushMessage] = readablePushMessage; | ||
@@ -117,2 +136,6 @@ | ||
ServerReadableStream.prototype.deserialize; | ||
ServerDuplexStream.prototype[kReadablePushOrBufferMessage] = | ||
ServerReadableStream.prototype[kReadablePushOrBufferMessage]; | ||
ServerDuplexStream.prototype[kReadablePushMessage] = | ||
ServerReadableStream.prototype[kReadablePushMessage]; | ||
@@ -148,2 +171,9 @@ | ||
stream[kReadableState] = { | ||
canPush: false, // Can data be pushed to the readable stream. | ||
isPushPending: false, // Is an asynchronous push operation in progress. | ||
bufferedMessages: [], // Messages that have not been deserialized yet. | ||
messagesToPush: [] // Deserialized messages not yet pushed to the stream. | ||
}; | ||
stream.once('cancelled', () => { | ||
@@ -153,24 +183,70 @@ stream.destroy(); | ||
stream.call.on('data', async (data) => { | ||
const message = decoder.write(data); | ||
stream.call.on('data', (data) => { | ||
// It's possible that more than one message arrives in a single 'data' | ||
// event. pushOrBufferMessage() ensures that only a single message is | ||
// actually processed at a time, because the deserialization process is | ||
// asynchronous, and can lead to out of order messages. | ||
const messages = decoder.write(data); | ||
if (message === null) { | ||
return; | ||
for (let i = 0; i < messages.length; i++) { | ||
stream[kReadablePushOrBufferMessage](messages[i]); | ||
} | ||
}); | ||
try { | ||
const deserialized = await stream[kCall].deserializeMessage(message); | ||
stream.call.once('end', () => { | ||
stream[kReadablePushOrBufferMessage](null); | ||
}); | ||
} | ||
if (!stream.push(deserialized)) { | ||
stream.call.pause(); | ||
function readablePushOrBufferMessage (messageBytes) { | ||
const { bufferedMessages, isPushPending } = this[kReadableState]; | ||
if (isPushPending === true) { | ||
bufferedMessages.push(messageBytes); | ||
} else { | ||
this[kReadablePushMessage](messageBytes); | ||
} | ||
} | ||
async function readablePushMessage (messageBytes) { | ||
const { bufferedMessages, messagesToPush } = this[kReadableState]; | ||
if (messageBytes === null) { | ||
if (this[kReadableState].canPush === true) { | ||
this.push(null); | ||
} else { | ||
messagesToPush.push(null); | ||
} | ||
return; | ||
} | ||
this[kReadableState].isPushPending = true; | ||
try { | ||
const deserialized = await this[kCall].deserializeMessage(messageBytes); | ||
if (this[kReadableState].canPush === true) { | ||
if (!this.push(deserialized)) { | ||
this[kReadableState].canPush = false; | ||
this.call.pause(); | ||
} | ||
} catch (err) { | ||
err.code = Status.INTERNAL; | ||
stream.emit('error', err); | ||
} else { | ||
messagesToPush.push(deserialized); | ||
} | ||
}); | ||
} catch (err) { | ||
// Ignore any remaining messages when errors occur. | ||
bufferedMessages.length = 0; | ||
stream.call.once('end', () => { | ||
stream.push(null); | ||
}); | ||
err.code = Status.INTERNAL; | ||
this.emit('error', err); | ||
} | ||
this[kReadableState].isPushPending = false; | ||
if (bufferedMessages.length > 0) { | ||
this[kReadablePushMessage](bufferedMessages.shift()); | ||
} | ||
} | ||
@@ -177,0 +253,0 @@ |
@@ -19,2 +19,3 @@ 'use strict'; | ||
write (data) { | ||
const result = []; | ||
let readHead = 0; | ||
@@ -53,3 +54,3 @@ let toRead; | ||
this.readState = kNoData; | ||
return message; | ||
result.push(message); | ||
} | ||
@@ -75,3 +76,3 @@ } | ||
this.readState = kNoData; | ||
return framedMessage; | ||
result.push(framedMessage); | ||
} | ||
@@ -84,3 +85,3 @@ break; | ||
return null; | ||
return result; | ||
} | ||
@@ -87,0 +88,0 @@ } |
{ | ||
"name": "grpc-server-js", | ||
"version": "0.1.4", | ||
"version": "0.1.5", | ||
"description": "Pure JavaScript gRPC Server", | ||
@@ -26,3 +26,3 @@ "author": "Colin J. Ihrig <cjihrig@gmail.com> (http://www.cjihrig.com/)", | ||
"dependencies": { | ||
"@grpc/grpc-js": "0.4.2" | ||
"@grpc/grpc-js": "0.4.3" | ||
}, | ||
@@ -29,0 +29,0 @@ "devDependencies": { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
41483
1138
+ Added@grpc/grpc-js@0.4.3(transitive)
- Removed@grpc/grpc-js@0.4.2(transitive)
Updated@grpc/grpc-js@0.4.3