Socket
Socket
Sign inDemoInstall

grpc-server-js

Package Overview
Dependencies
Maintainers
1
Versions
34
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

grpc-server-js - npm Package Compare versions

Comparing version 0.0.9 to 0.0.10

11

lib/compression-filter.js

@@ -200,9 +200,2 @@ 'use strict';

sendHeaders () {
return {
[kGrpcEncodingHeader]: this.send.name,
[kGrpcAcceptEncodingHeader]: this.accepts.join(',')
};
}
receiveMetadata (metadata) {

@@ -244,3 +237,3 @@ const receiveEncoding = metadata.get(kGrpcEncodingHeader);

async serializeMessage (message) { // eslint-disable-line require-await
serializeMessage (message) {
// TODO: Add support for flags (compression) later.

@@ -250,3 +243,3 @@ return this.send.writeMessage(message, false);

async deserializeMessage (message) { // eslint-disable-line require-await
deserializeMessage (message) {
return this.receive.readMessage(message);

@@ -253,0 +246,0 @@ }

35

lib/handler.js

@@ -31,9 +31,6 @@ 'use strict';

this.push(null);
return;
}
this.reading = true;
}
terminate () {
_terminate () {
this.finished = true;

@@ -125,4 +122,4 @@ this.on('data', noop);

ServerReadableStream.prototype.deserialize;
ServerDuplexStream.prototype.terminate =
ServerReadableStream.prototype.terminate;
ServerDuplexStream.prototype._terminate =
ServerReadableStream.prototype._terminate;

@@ -139,3 +136,2 @@

function getPeer () {
// TODO: Implement this. See grpc-native-core/src/server.js
throw new Error('not implemented');

@@ -162,25 +158,20 @@ }

stream.finished = false;
stream.reading = false;
stream.once('cancelled', () => {
stream.terminate();
stream._terminate();
});
//
stream.decoder = new StreamDecoder();
stream.call.on('data', (data) => {
stream.decoder.write(data);
});
stream.call.on('data', async (data) => {
const message = stream.decoder.write(data);
stream.call.once('end', () => {
stream.reading = false;
stream.push(null);
});
if (message === null) {
return;
}
stream.decoder.on('message', async (bytes) => {
try {
const message = await stream[kCall].deserializeMessage(bytes);
const deserialized = await stream[kCall].deserializeMessage(message);
stream.push(message);
stream.push(deserialized);
} catch (err) {

@@ -191,2 +182,6 @@ err.code = Status.INTERNAL;

});
stream.call.once('end', () => {
stream.push(null);
});
}

@@ -193,0 +188,0 @@

@@ -9,2 +9,4 @@ 'use strict';

const kGrpcTimeoutHeader = 'grpc-timeout';
const kGrpcEncodingHeader = 'grpc-encoding';
const kGrpcAcceptEncodingHeader = 'grpc-accept-encoding';
const kDeadlineRegex = /(\d{1,8})\s*([HMSmun])/;

@@ -19,7 +21,9 @@ const deadlineUnitsToMs = {

};
const defaultResponseHeaders = {
[Http2.constants.HTTP2_HEADER_STATUS]: Http2.constants.HTTP_STATUS_OK,
[Http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto'
};
const defaultResponseOptions = { waitForTrailers: true };
const {
HTTP2_HEADER_CONTENT_TYPE,
HTTP2_HEADER_STATUS,
HTTP_STATUS_OK,
NGHTTP2_CANCEL
} = Http2.constants;

@@ -45,8 +49,19 @@

const custom = customMetadata ? customMetadata.toHttp2Headers() : null;
const headers = Object.assign(this.compression.sendHeaders(),
defaultResponseHeaders, custom);
const headers = {
[kGrpcEncodingHeader]: this.compression.send.name,
[kGrpcAcceptEncodingHeader]: this.compression.accepts.join(','),
[HTTP2_HEADER_STATUS]: HTTP_STATUS_OK,
[HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto'
};
this.stream.once('wantTrailers', onWantTrailers.bind(this));
this.stream.respond(headers, defaultResponseOptions);
if (customMetadata === undefined || customMetadata === null) {
this.stream.respond(headers, defaultResponseOptions);
} else {
this.stream.respond({
...headers,
...customMetadata.toHttp2Headers()
}, defaultResponseOptions);
}
}

@@ -78,27 +93,25 @@

async receiveUnaryMessage () { // eslint-disable-line require-await
return new Promise((resolve, reject) => {
const stream = this.stream;
const chunks = [];
let totalLength = 0;
receiveUnaryMessage (callback) {
const stream = this.stream;
const chunks = [];
let totalLength = 0;
stream.on('data', (data) => {
chunks.push(data);
totalLength += data.byteLength;
});
stream.on('data', (data) => {
chunks.push(data);
totalLength += data.byteLength;
});
stream.once('end', async () => {
try {
const requestBytes = Buffer.concat(chunks, totalLength);
stream.once('end', async () => {
try {
const requestBytes = Buffer.concat(chunks, totalLength);
resolve(await this.deserializeMessage(requestBytes));
} catch (err) {
this.sendError(err, Status.INTERNAL);
resolve();
}
});
callback(null, await this.deserializeMessage(requestBytes));
} catch (err) {
this.sendError(err, Status.INTERNAL);
callback(err, null);
}
});
}
async serializeMessage (value) { // eslint-disable-line require-await
serializeMessage (value) {
const messageBuffer = this.handler.serialize(value);

@@ -199,3 +212,3 @@

// `this` is bound to the Call instance, not the stream itself.
if (this.stream.rstCode === Http2.constants.NGHTTP2_CANCEL) {
if (this.stream.rstCode === NGHTTP2_CANCEL) {
this.cancelled = true;

@@ -215,4 +228,4 @@ this.emit('cancelled', 'cancelled');

if (this.status.metadata) {
trailersToSend = Object.assign(trailersToSend, metadata.toHttp2Headers());
if (this.status.metadata !== null) {
trailersToSend = { ...trailersToSend, ...metadata.toHttp2Headers() };
}

@@ -229,9 +242,1 @@

}
// TODO: Make sure everything is cleaned up afterwards.
// function cleanup (call) {
// call.deadline = null;
// remove onStreamError
// remote stream wantTrailers handler
// }

@@ -16,2 +16,7 @@ 'use strict';

const kStarted = Symbol('started');
const kUnaryHandlerType = 0;
const kClientStreamHandlerType = 1;
const kServerStreamHandlerType = 2;
const kBidiHandlerType = 3;
const { HTTP2_HEADER_PATH } = Http2.constants;

@@ -25,16 +30,16 @@

const defaultHandler = {
unary: function (call, callback) {
const defaultHandler = [
function unary (call, callback) {
callback(unimplementedStatusResponse);
},
clientStream: function (call, callback) {
function clientStream (call, callback) {
callback(unimplementedStatusResponse);
},
serverStream: function (call) {
function serverStream (call) {
call.emit('error', unimplementedStatusResponse);
},
bidi: function (call) {
function bidi (call) {
call.emit('error', unimplementedStatusResponse);
}
};
];

@@ -52,3 +57,3 @@

async bind (port, creds) { // eslint-disable-line require-await
bind (port, creds) {
if (this[kStarted] === true) {

@@ -123,11 +128,11 @@ throw new Error('server is already started');

if (attrs.responseStream) {
methodType = 'bidi';
methodType = kBidiHandlerType;
} else {
methodType = 'clientStream';
methodType = kClientStreamHandlerType;
}
} else {
if (attrs.responseStream) {
methodType = 'serverStream';
methodType = kServerStreamHandlerType;
} else {
methodType = 'unary';
methodType = kUnaryHandlerType;
}

@@ -178,3 +183,2 @@ }

forceShutdown () { // eslint-disable-line class-methods-use-this
// TODO: Implement this.
throw new Error('not implemented');

@@ -187,8 +191,8 @@ }

const handlerTypes = {
unary: handleUnary,
clientStream: handleClientStreaming,
serverStream: handleServerStreaming,
bidi: handleBidiStreaming
};
const handlerTypes = [
handleUnary,
handleClientStreaming,
handleServerStreaming,
handleBidiStreaming
];

@@ -206,3 +210,3 @@

try {
const path = headers[Http2.constants.HTTP2_HEADER_PATH];
const path = headers[HTTP2_HEADER_PATH];
const handler = grpcServer[kHandlers].get(path);

@@ -225,13 +229,12 @@

async function handleUnary (call, handler, metadata) {
const emitter = new ServerUnaryCall(call, metadata);
const request = await call.receiveUnaryMessage();
function handleUnary (call, handler, metadata) {
call.receiveUnaryMessage((err, request) => {
if (err !== null || call.cancelled === true) {
return;
}
if (request === undefined || call.cancelled === true) {
return;
}
const emitter = new ServerUnaryCall(call, metadata);
emitter.request = request;
handler.func(emitter, (err, value, trailer, flags) => {
call.sendUnaryMessage(err, value, trailer, flags);
emitter.request = request;
handler.func(emitter, call.sendUnaryMessage.bind(call));
});

@@ -245,3 +248,3 @@ }

function respond (err, value, trailer, flags) {
stream.terminate();
stream._terminate();
call.sendUnaryMessage(err, value, trailer, flags);

@@ -259,13 +262,13 @@ }

async function handleServerStreaming (call, handler, metadata) {
const request = await call.receiveUnaryMessage();
function handleServerStreaming (call, handler, metadata) {
call.receiveUnaryMessage((err, request) => {
if (err !== null || call.cancelled === true) {
return;
}
if (request === undefined || call.cancelled === true) {
return;
}
const stream = new ServerWritableStream(call, metadata);
const stream = new ServerWritableStream(call, metadata);
stream.request = request;
handler.func(stream);
stream.request = request;
handler.func(stream);
});
}

@@ -272,0 +275,0 @@

'use strict';
const EventEmitter = require('events');
const kNoData = 1;
const kReadingSize = 2;
const kReadingMessage = 3;
const ReadState = {
NO_DATA: 1,
READING_SIZE: 2,
READING_MESSAGE: 3
};
class StreamDecoder extends EventEmitter {
class StreamDecoder {
constructor () {
super();
this.readState = ReadState.NO_DATA;
this.readState = kNoData;
this.readCompressFlag = Buffer.alloc(1);

@@ -30,6 +24,6 @@ this.readPartialSize = Buffer.alloc(4);

switch (this.readState) {
case ReadState.NO_DATA :
case kNoData :
this.readCompressFlag = data.slice(readHead, readHead + 1);
readHead += 1;
this.readState = ReadState.READING_SIZE;
this.readState = kReadingSize;
this.readPartialSize.fill(0);

@@ -41,3 +35,3 @@ this.readSizeRemaining = 4;

break;
case ReadState.READING_SIZE :
case kReadingSize :
toRead = Math.min(data.length - readHead, this.readSizeRemaining);

@@ -54,11 +48,13 @@ data.copy(

if (this.readMessageRemaining > 0) {
this.readState = ReadState.READING_MESSAGE;
this.readState = kReadingMessage;
} else {
this.emit('message', Buffer.concat(
[this.readCompressFlag, this.readPartialSize]));
this.readState = ReadState.NO_DATA;
const message = Buffer.concat(
[this.readCompressFlag, this.readPartialSize], 5);
this.readState = kNoData;
return message;
}
}
break;
case ReadState.READING_MESSAGE :
case kReadingMessage :
toRead =

@@ -78,4 +74,5 @@ Math.min(data.length - readHead, this.readMessageRemaining);

framedMessageBuffers, this.readMessageSize + 5);
this.emit('message', framedMessage);
this.readState = ReadState.NO_DATA;
this.readState = kNoData;
return framedMessage;
}

@@ -87,2 +84,4 @@ break;

}
return null;
}

@@ -89,0 +88,0 @@ }

{
"name": "grpc-server-js",
"version": "0.0.9",
"version": "0.0.10",
"description": "Pure JavaScript gRPC Server",

@@ -28,3 +28,3 @@ "author": "Colin J. Ihrig <cjihrig@gmail.com> (http://www.cjihrig.com/)",

"devDependencies": {
"@grpc/proto-loader": "0.4.0",
"@grpc/proto-loader": "0.5.0",
"belly-button": "5.x.x",

@@ -31,0 +31,0 @@ "cb-barrier": "1.x.x",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc