Socket
Socket
Sign inDemoInstall

@grpc/grpc-js

Package Overview
Dependencies
Maintainers
3
Versions
178
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@grpc/grpc-js - npm Package Compare versions

Comparing version 1.4.6 to 1.5.0

src/compression-algorithms.ts

2

build/src/call-stream.js

@@ -456,3 +456,3 @@ "use strict";

code = constants_1.Status.RESOURCE_EXHAUSTED;
details = 'Bandwidth exhausted';
details = 'Bandwidth exhausted or memory limit exceeded';
break;

@@ -459,0 +459,0 @@ case http2.constants.NGHTTP2_INADEQUATE_SECURITY:

/// <reference types="node" />
import { ConnectionOptions } from 'tls';
import { ConnectionOptions, PeerCertificate } from 'tls';
import { CallCredentials } from './call-credentials';
/**
* A certificate as received by the checkServerIdentity callback.
*/
export interface Certificate {
/**
* The raw certificate in DER form.
*/
raw: Buffer;
}
/**
* A callback that will receive the expected hostname and presented peer

@@ -19,3 +10,3 @@ * certificate as parameters. The callback should return an error to

*/
export declare type CheckServerIdentityCallback = (hostname: string, cert: Certificate) => Error | undefined;
export declare type CheckServerIdentityCallback = (hostname: string, cert: PeerCertificate) => Error | undefined;
/**

@@ -22,0 +13,0 @@ * Additional peer verification options that can be set when creating

@@ -110,7 +110,8 @@ "use strict";

});
this.connectionOptions = { secureContext };
if (verifyOptions && verifyOptions.checkServerIdentity) {
this.connectionOptions.checkServerIdentity = (host, cert) => {
return verifyOptions.checkServerIdentity(host, { raw: cert.raw });
};
this.connectionOptions = {
secureContext
};
// Node asserts that this option is a function, so we cannot pass undefined
if (verifyOptions === null || verifyOptions === void 0 ? void 0 : verifyOptions.checkServerIdentity) {
this.connectionOptions.checkServerIdentity = verifyOptions.checkServerIdentity;
}

@@ -117,0 +118,0 @@ }

@@ -0,1 +1,2 @@

import { CompressionAlgorithms } from './compression-algorithms';
/**

@@ -22,2 +23,3 @@ * An interface that contains options used when initializing a Channel instance.

'grpc.http_connect_creds'?: string;
'grpc.default_compression_algorithm'?: CompressionAlgorithms;
'grpc.enable_channelz'?: number;

@@ -24,0 +26,0 @@ 'grpc-node.max_session_memory'?: number;

@@ -194,3 +194,3 @@ "use strict";

new max_message_size_filter_1.MaxMessageSizeFilterFactory(this.options),
new compression_filter_1.CompressionFilterFactory(this),
new compression_filter_1.CompressionFilterFactory(this, this.options),
]);

@@ -197,0 +197,0 @@ this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2));

/// <reference types="node" />
import { Call, WriteObject } from './call-stream';
import { Channel } from './channel';
import { ChannelOptions } from './channel-options';
import { BaseFilter, Filter, FilterFactory } from './filter';
import { Metadata } from './metadata';
declare type SharedCompressionFilterConfig = {
serverSupportedEncodingHeader?: string;
};
export declare class CompressionFilter extends BaseFilter implements Filter {
private sharedFilterConfig;
private sendCompression;
private receiveCompression;
private currentCompressionAlgorithm;
constructor(channelOptions: ChannelOptions, sharedFilterConfig: SharedCompressionFilterConfig);
sendMetadata(metadata: Promise<Metadata>): Promise<Metadata>;

@@ -16,4 +23,7 @@ receiveMetadata(metadata: Metadata): Metadata;

private readonly channel;
constructor(channel: Channel);
private readonly options;
private sharedFilterConfig;
constructor(channel: Channel, options: ChannelOptions);
createFilter(callStream: Call): CompressionFilter;
}
export {};

@@ -21,3 +21,9 @@ "use strict";

const zlib = require("zlib");
const compression_algorithms_1 = require("./compression-algorithms");
const constants_1 = require("./constants");
const filter_1 = require("./filter");
const logging = require("./logging");
const isCompressionAlgorithmKey = (key) => {
return typeof key === 'number' && typeof compression_algorithms_1.CompressionAlgorithms[key] === 'string';
};
class CompressionHandler {

@@ -148,6 +154,30 @@ /**

class CompressionFilter extends filter_1.BaseFilter {
constructor() {
super(...arguments);
constructor(channelOptions, sharedFilterConfig) {
var _a;
super();
this.sharedFilterConfig = sharedFilterConfig;
this.sendCompression = new IdentityHandler();
this.receiveCompression = new IdentityHandler();
this.currentCompressionAlgorithm = 'identity';
const compressionAlgorithmKey = channelOptions['grpc.default_compression_algorithm'];
if (compressionAlgorithmKey !== undefined) {
if (isCompressionAlgorithmKey(compressionAlgorithmKey)) {
const clientSelectedEncoding = compression_algorithms_1.CompressionAlgorithms[compressionAlgorithmKey];
const serverSupportedEncodings = (_a = sharedFilterConfig.serverSupportedEncodingHeader) === null || _a === void 0 ? void 0 : _a.split(',');
/**
* There are two possible situations here:
* 1) We don't have any info yet from the server about what compression it supports
* In that case we should just use what the client tells us to use
* 2) We've previously received a response from the server including a grpc-accept-encoding header
* In that case we only want to use the encoding chosen by the client if the server supports it
*/
if (!serverSupportedEncodings || serverSupportedEncodings.includes(clientSelectedEncoding)) {
this.currentCompressionAlgorithm = clientSelectedEncoding;
this.sendCompression = getCompressionHandler(this.currentCompressionAlgorithm);
}
}
else {
logging.log(constants_1.LogVerbosity.ERROR, `Invalid value provided for grpc.default_compression_algorithm option: ${compressionAlgorithmKey}`);
}
}
}

@@ -158,2 +188,9 @@ async sendMetadata(metadata) {

headers.set('accept-encoding', 'identity');
// No need to send the header if it's "identity" - behavior is identical; save the bandwidth
if (this.currentCompressionAlgorithm === 'identity') {
headers.remove('grpc-encoding');
}
else {
headers.set('grpc-encoding', this.currentCompressionAlgorithm);
}
return headers;

@@ -170,2 +207,13 @@ }

metadata.remove('grpc-encoding');
/* Check to see if the compression we're using to send messages is supported by the server
* If not, reset the sendCompression filter and have it use the default IdentityHandler */
const serverSupportedEncodingsHeader = metadata.get('grpc-accept-encoding')[0];
if (serverSupportedEncodingsHeader) {
this.sharedFilterConfig.serverSupportedEncodingHeader = serverSupportedEncodingsHeader;
const serverSupportedEncodings = serverSupportedEncodingsHeader.split(',');
if (!serverSupportedEncodings.includes(this.currentCompressionAlgorithm)) {
this.sendCompression = new IdentityHandler();
this.currentCompressionAlgorithm = 'identity';
}
}
metadata.remove('grpc-accept-encoding');

@@ -175,2 +223,3 @@ return metadata;

async sendMessage(message) {
var _a;
/* This filter is special. The input message is the bare message bytes,

@@ -180,5 +229,9 @@ * and the output is a framed and possibly compressed message. For this

const resolvedMessage = await message;
const compress = resolvedMessage.flags === undefined
? false
: (resolvedMessage.flags & 2 /* NoCompress */) === 0;
let compress;
if (this.sendCompression instanceof IdentityHandler) {
compress = false;
}
else {
compress = (((_a = resolvedMessage.flags) !== null && _a !== void 0 ? _a : 0) & 2 /* NoCompress */) === 0;
}
return {

@@ -199,7 +252,9 @@ message: await this.sendCompression.writeMessage(resolvedMessage.message, compress),

class CompressionFilterFactory {
constructor(channel) {
constructor(channel, options) {
this.channel = channel;
this.options = options;
this.sharedFilterConfig = {};
}
createFilter(callStream) {
return new CompressionFilter();
return new CompressionFilter(this.options, this.sharedFilterConfig);
}

@@ -206,0 +261,0 @@ }

@@ -6,2 +6,3 @@ /// <reference types="node" />

import { Channel, ChannelImplementation } from './channel';
import { CompressionAlgorithms } from './compression-algorithms';
import { ConnectivityState } from './connectivity-state';

@@ -45,3 +46,3 @@ import { ChannelCredentials } from './channel-credentials';

/**** Constants ****/
export { LogVerbosity as logVerbosity, Status as status, ConnectivityState as connectivityState, Propagate as propagate, };
export { LogVerbosity as logVerbosity, Status as status, ConnectivityState as connectivityState, Propagate as propagate, CompressionAlgorithms as compressionAlgorithms };
/**** Client ****/

@@ -71,3 +72,3 @@ export { Client, ClientOptions, loadPackageDefinition, makeClientConstructor, makeClientConstructor as makeGenericClientConstructor, CallProperties, CallInvocationTransformer, ChannelImplementation as Channel, Channel as ChannelInterface, UnaryCallback as requestCallback, };

export { Requester, ListenerBuilder, RequesterBuilder, Interceptor, InterceptorOptions, InterceptorProvider, InterceptingCall, InterceptorConfigurationError, } from './client-interceptors';
export { GrpcObject } from './make-client';
export { GrpcObject, ServiceClientConstructor, ProtobufTypeDefinition } from './make-client';
export { ChannelOptions } from './channel-options';

@@ -74,0 +75,0 @@ export { getChannelzServiceDefinition, getChannelzHandlers } from './channelz';

@@ -19,3 +19,3 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.experimental = exports.StatusBuilder = exports.getClientChannel = exports.ServerCredentials = exports.Server = exports.setLogVerbosity = exports.setLogger = exports.load = exports.loadObject = exports.CallCredentials = exports.ChannelCredentials = exports.waitForClientReady = exports.closeClient = exports.Channel = exports.makeGenericClientConstructor = exports.makeClientConstructor = exports.loadPackageDefinition = exports.Client = exports.propagate = exports.connectivityState = exports.status = exports.logVerbosity = exports.Metadata = exports.credentials = void 0;
exports.experimental = exports.StatusBuilder = exports.getClientChannel = exports.ServerCredentials = exports.Server = exports.setLogVerbosity = exports.setLogger = exports.load = exports.loadObject = exports.CallCredentials = exports.ChannelCredentials = exports.waitForClientReady = exports.closeClient = exports.Channel = exports.makeGenericClientConstructor = exports.makeClientConstructor = exports.loadPackageDefinition = exports.Client = exports.compressionAlgorithms = exports.propagate = exports.connectivityState = exports.status = exports.logVerbosity = exports.Metadata = exports.credentials = void 0;
const call_credentials_1 = require("./call-credentials");

@@ -25,2 +25,4 @@ Object.defineProperty(exports, "CallCredentials", { enumerable: true, get: function () { return call_credentials_1.CallCredentials; } });

Object.defineProperty(exports, "Channel", { enumerable: true, get: function () { return channel_1.ChannelImplementation; } });
const compression_algorithms_1 = require("./compression-algorithms");
Object.defineProperty(exports, "compressionAlgorithms", { enumerable: true, get: function () { return compression_algorithms_1.CompressionAlgorithms; } });
const connectivity_state_1 = require("./connectivity-state");

@@ -27,0 +29,0 @@ Object.defineProperty(exports, "connectivityState", { enumerable: true, get: function () { return connectivity_state_1.ConnectivityState; } });

@@ -45,3 +45,3 @@ /// <reference types="node" />

cancelled: boolean;
constructor(call: Http2ServerCallStream<RequestType, ResponseType>, metadata: Metadata, deserialize: Deserialize<RequestType>);
constructor(call: Http2ServerCallStream<RequestType, ResponseType>, metadata: Metadata, deserialize: Deserialize<RequestType>, encoding: string);
_read(size: number): void;

@@ -74,3 +74,3 @@ getPeer(): string;

private trailingMetadata;
constructor(call: Http2ServerCallStream<RequestType, ResponseType>, metadata: Metadata, serialize: Serialize<ResponseType>, deserialize: Deserialize<RequestType>);
constructor(call: Http2ServerCallStream<RequestType, ResponseType>, metadata: Metadata, serialize: Serialize<ResponseType>, deserialize: Deserialize<RequestType>, encoding: string);
getPeer(): string;

@@ -134,5 +134,6 @@ sendMetadata(responseMetadata: Metadata): void;

private checkCancelled;
private getDecompressedMessage;
sendMetadata(customMetadata?: Metadata): void;
receiveMetadata(headers: http2.IncomingHttpHeaders): Metadata | undefined;
receiveUnaryMessage(): Promise<RequestType>;
receiveMetadata(headers: http2.IncomingHttpHeaders): Metadata;
receiveUnaryMessage(encoding: string): Promise<RequestType>;
serializeMessage(value: ResponseType): Buffer;

@@ -146,3 +147,3 @@ deserializeMessage(bytes: Buffer): RequestType;

setupSurfaceCall(call: ServerSurfaceCall): void;
setupReadable(readable: ServerReadableStream<RequestType, ResponseType> | ServerDuplexStream<RequestType, ResponseType>): void;
setupReadable(readable: ServerReadableStream<RequestType, ResponseType> | ServerDuplexStream<RequestType, ResponseType>, encoding: string): void;
consumeUnpushedMessages(readable: ServerReadableStream<RequestType, ResponseType> | ServerDuplexStream<RequestType, ResponseType>): boolean;

@@ -149,0 +150,0 @@ private pushOrBufferMessage;

@@ -23,2 +23,3 @@ "use strict";

const stream_1 = require("stream");
const zlib = require("zlib");
const constants_1 = require("./constants");

@@ -49,3 +50,3 @@ const metadata_1 = require("./metadata");

// once compression is integrated.
[GRPC_ACCEPT_ENCODING_HEADER]: 'identity',
[GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip',
[GRPC_ENCODING_HEADER]: 'identity',

@@ -79,3 +80,3 @@ [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,

class ServerReadableStreamImpl extends stream_1.Readable {
constructor(call, metadata, deserialize) {
constructor(call, metadata, deserialize, encoding) {
super({ objectMode: true });

@@ -87,3 +88,3 @@ this.call = call;

this.call.setupSurfaceCall(this);
this.call.setupReadable(this);
this.call.setupReadable(this, encoding);
}

@@ -165,3 +166,3 @@ _read(size) {

class ServerDuplexStreamImpl extends stream_1.Duplex {
constructor(call, metadata, serialize, deserialize) {
constructor(call, metadata, serialize, deserialize, encoding) {
super({ objectMode: true });

@@ -175,3 +176,3 @@ this.call = call;

this.call.setupSurfaceCall(this);
this.call.setupReadable(this);
this.call.setupReadable(this, encoding);
this.on('error', (err) => {

@@ -261,2 +262,48 @@ this.call.sendError(err);

}
getDecompressedMessage(message, encoding) {
switch (encoding) {
case 'deflate': {
return new Promise((resolve, reject) => {
zlib.inflate(message.slice(5), (err, output) => {
if (err) {
this.sendError({
code: constants_1.Status.INTERNAL,
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
});
resolve();
}
else {
resolve(output);
}
});
});
}
case 'gzip': {
return new Promise((resolve, reject) => {
zlib.unzip(message.slice(5), (err, output) => {
if (err) {
this.sendError({
code: constants_1.Status.INTERNAL,
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
});
resolve();
}
else {
resolve(output);
}
});
});
}
case 'identity': {
return Promise.resolve(message.slice(5));
}
default: {
this.sendError({
code: constants_1.Status.UNIMPLEMENTED,
details: `Received message compressed with unsupported encoding "${encoding}"`,
});
return Promise.resolve();
}
}
}
sendMetadata(customMetadata) {

@@ -285,3 +332,3 @@ if (this.checkCancelled()) {

this.sendError(err);
return;
return metadata;
}

@@ -298,7 +345,6 @@ const timeout = (+match[1] * deadlineUnitsToMs[match[2]]) | 0;

metadata.remove(http2.constants.HTTP2_HEADER_CONTENT_TYPE);
metadata.remove('grpc-encoding');
metadata.remove('grpc-accept-encoding');
return metadata;
}
receiveUnaryMessage() {
receiveUnaryMessage(encoding) {
return new Promise((resolve, reject) => {

@@ -324,3 +370,13 @@ const stream = this.stream;

this.emit('receiveMessage');
resolve(this.deserializeMessage(requestBytes));
const compressed = requestBytes.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = await this.getDecompressedMessage(requestBytes, compressedMessageEncoding);
// Encountered an error with decompression; it'll already have been propogated back
// Just return early
if (!decompressedMessage) {
resolve();
}
else {
resolve(this.deserializeMessage(decompressedMessage));
}
}

@@ -346,5 +402,3 @@ catch (err) {

deserializeMessage(bytes) {
// TODO(cjihrig): Call compression aware deserializeMessage().
const receivedMessage = bytes.slice(5);
return this.handler.deserialize(receivedMessage);
return this.handler.deserialize(bytes);
}

@@ -444,6 +498,17 @@ async sendUnaryMessage(err, value, metadata, flags) {

}
setupReadable(readable) {
setupReadable(readable, encoding) {
const decoder = new stream_decoder_1.StreamDecoder();
let readsDone = false;
let pendingMessageProcessing = false;
let pushedEnd = false;
const maybePushEnd = () => {
if (!pushedEnd && readsDone && !pendingMessageProcessing) {
pushedEnd = true;
this.pushOrBufferMessage(readable, null);
}
};
this.stream.on('data', async (data) => {
const messages = decoder.write(data);
pendingMessageProcessing = true;
this.stream.pause();
for (const message of messages) {

@@ -459,7 +524,18 @@ if (this.maxReceiveMessageSize !== -1 &&

this.emit('receiveMessage');
this.pushOrBufferMessage(readable, message);
const compressed = message.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = await this.getDecompressedMessage(message, compressedMessageEncoding);
// Encountered an error with decompression; it'll already have been propogated back
// Just return early
if (!decompressedMessage)
return;
this.pushOrBufferMessage(readable, decompressedMessage);
}
pendingMessageProcessing = false;
this.stream.resume();
maybePushEnd();
});
this.stream.once('end', () => {
this.pushOrBufferMessage(readable, null);
readsDone = true;
maybePushEnd();
});

@@ -489,2 +565,3 @@ }

if (messageBytes === null) {
trace('Received end of stream');
if (this.canPush) {

@@ -498,2 +575,3 @@ readable.push(null);

}
trace('Received message of length ' + messageBytes.length);
this.isPushPending = true;

@@ -500,0 +578,0 @@ try {

@@ -530,2 +530,3 @@ "use strict";

http2Server.on('stream', (stream, headers) => {
var _a;
const channelzSessionInfo = this.sessions.get(stream.session);

@@ -597,14 +598,16 @@ this.callTracker.addCallStarted();

const metadata = call.receiveMetadata(headers);
const encoding = (_a = metadata.get('grpc-encoding')[0]) !== null && _a !== void 0 ? _a : 'identity';
metadata.remove('grpc-encoding');
switch (handler.type) {
case 'unary':
handleUnary(call, handler, metadata);
handleUnary(call, handler, metadata, encoding);
break;
case 'clientStream':
handleClientStreaming(call, handler, metadata);
handleClientStreaming(call, handler, metadata, encoding);
break;
case 'serverStream':
handleServerStreaming(call, handler, metadata);
handleServerStreaming(call, handler, metadata, encoding);
break;
case 'bidi':
handleBidiStreaming(call, handler, metadata);
handleBidiStreaming(call, handler, metadata, encoding);
break;

@@ -660,4 +663,4 @@ default:

exports.Server = Server;
async function handleUnary(call, handler, metadata) {
const request = await call.receiveUnaryMessage();
async function handleUnary(call, handler, metadata, encoding) {
const request = await call.receiveUnaryMessage(encoding);
if (request === undefined || call.cancelled) {

@@ -671,4 +674,4 @@ return;

}
function handleClientStreaming(call, handler, metadata) {
const stream = new server_call_1.ServerReadableStreamImpl(call, metadata, handler.deserialize);
function handleClientStreaming(call, handler, metadata, encoding) {
const stream = new server_call_1.ServerReadableStreamImpl(call, metadata, handler.deserialize, encoding);
function respond(err, value, trailer, flags) {

@@ -684,4 +687,4 @@ stream.destroy();

}
async function handleServerStreaming(call, handler, metadata) {
const request = await call.receiveUnaryMessage();
async function handleServerStreaming(call, handler, metadata, encoding) {
const request = await call.receiveUnaryMessage(encoding);
if (request === undefined || call.cancelled) {

@@ -693,4 +696,4 @@ return;

}
function handleBidiStreaming(call, handler, metadata) {
const stream = new server_call_1.ServerDuplexStreamImpl(call, metadata, handler.serialize, handler.deserialize);
function handleBidiStreaming(call, handler, metadata, encoding) {
const stream = new server_call_1.ServerDuplexStreamImpl(call, metadata, handler.serialize, handler.deserialize, encoding);
if (call.cancelled) {

@@ -697,0 +700,0 @@ return;

{
"name": "@grpc/grpc-js",
"version": "1.4.6",
"version": "1.5.0",
"description": "gRPC Library for Node - pure JS implementation",

@@ -24,3 +24,3 @@ "homepage": "https://grpc.io/",

"@types/pify": "^3.0.2",
"@types/yargs": "^15.0.5",
"@types/semver": "^7.3.9",
"clang-format": "^1.0.55",

@@ -37,5 +37,5 @@ "execa": "^2.0.3",

"rimraf": "^3.0.2",
"semver": "^7.3.5",
"ts-node": "^8.3.0",
"typescript": "^3.7.2",
"yargs": "^15.4.1"
"typescript": "^3.7.2"
},

@@ -49,3 +49,3 @@ "contributors": [

"build": "npm run compile",
"clean": "node -e 'require(\"rimraf\")(\"./build\", () => {})'",
"clean": "rimraf ./build",
"compile": "tsc -p .",

@@ -58,5 +58,6 @@ "format": "clang-format -i -style=\"{Language: JavaScript, BasedOnStyle: Google, ColumnLimit: 80}\" src/*.ts test/*.ts",

"fix": "gts fix src/*.ts",
"pretest": "npm run generate-types && npm run compile",
"pretest": "npm run generate-types && npm run generate-test-types && npm run compile",
"posttest": "npm run check && madge -c ./build/src",
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs proto/ -O src/generated/ --grpcLib ../index channelz.proto"
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs proto/ --include-dirs test/fixtures/ -O src/generated/ --grpcLib ../index channelz.proto",
"generate-test-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --include-dirs test/fixtures/ -O test/generated/ --grpcLib ../../src/index test_service.proto"
},

@@ -63,0 +64,0 @@ "dependencies": {

@@ -39,3 +39,25 @@ # Pure JavaScript gRPC Client

- If you have a server and you are using `Server#bind` to bind ports, you will need to use `Server#bindAsync` instead.
- If you are using any channel options supported in `grpc` but not supported in `@grpc/grpc-js`, you may need to adjust your code to handle the different behavior. Refer to [the list of supported options](#supported-channel-options) below.
- Refer to the [detailed package comparison](https://github.com/grpc/grpc-node/blob/master/PACKAGE-COMPARISON.md) for more details on the differences between `grpc` and `@grpc/grpc-js`.
## Supported Channel Options
Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`. The channel arguments supported by `@grpc/grpc-js` are:
- `grpc.ssl_target_name_override`
- `grpc.primary_user_agent`
- `grpc.secondary_user_agent`
- `grpc.default_authority`
- `grpc.keepalive_time_ms`
- `grpc.keepalive_timeout_ms`
- `grpc.keepalive_permit_without_calls`
- `grpc.service_config`
- `grpc.max_concurrent_streams`
- `grpc.initial_reconnect_backoff_ms`
- `grpc.max_reconnect_backoff_ms`
- `grpc.use_local_subchannel_pool`
- `grpc.max_send_message_length`
- `grpc.max_receive_message_length`
- `grpc.enable_http_proxy`
- `channelOverride`
- `channelFactoryOverride`
## Some Notes on API Guarantees

@@ -42,0 +64,0 @@

@@ -627,3 +627,3 @@ /*

code = Status.RESOURCE_EXHAUSTED;
details = 'Bandwidth exhausted';
details = 'Bandwidth exhausted or memory limit exceeded';
break;

@@ -630,0 +630,0 @@ case http2.constants.NGHTTP2_INADEQUATE_SECURITY:

@@ -31,12 +31,2 @@ /*

/**
* A certificate as received by the checkServerIdentity callback.
*/
export interface Certificate {
/**
* The raw certificate in DER form.
*/
raw: Buffer;
}
/**
* A callback that will receive the expected hostname and presented peer

@@ -49,3 +39,3 @@ * certificate as parameters. The callback should return an error to

hostname: string,
cert: Certificate
cert: PeerCertificate
) => Error | undefined;

@@ -197,10 +187,8 @@

});
this.connectionOptions = { secureContext };
if (verifyOptions && verifyOptions.checkServerIdentity) {
this.connectionOptions.checkServerIdentity = (
host: string,
cert: PeerCertificate
) => {
return verifyOptions.checkServerIdentity!(host, { raw: cert.raw });
};
this.connectionOptions = {
secureContext
};
// Node asserts that this option is a function, so we cannot pass undefined
if (verifyOptions?.checkServerIdentity) {
this.connectionOptions.checkServerIdentity = verifyOptions.checkServerIdentity;
}

@@ -207,0 +195,0 @@ }

@@ -18,2 +18,4 @@ /*

import { CompressionAlgorithms } from './compression-algorithms';
/**

@@ -40,2 +42,3 @@ * An interface that contains options used when initializing a Channel instance.

'grpc.http_connect_creds'?: string;
'grpc.default_compression_algorithm'?: CompressionAlgorithms;
'grpc.enable_channelz'?: number;

@@ -42,0 +45,0 @@ 'grpc-node.max_session_memory'?: number;

@@ -48,3 +48,2 @@ /*

import { ServerSurfaceCall } from './server-call';
import { SurfaceCall } from './call';
import { Filter } from './filter';

@@ -337,3 +336,3 @@

new MaxMessageSizeFilterFactory(this.options),
new CompressionFilterFactory(this),
new CompressionFilterFactory(this, this.options),
]);

@@ -340,0 +339,0 @@ this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2));

@@ -20,7 +20,21 @@ /*

import { Call, WriteFlags, WriteObject } from './call-stream';
import { Call, WriteObject, WriteFlags } from './call-stream';
import { Channel } from './channel';
import { ChannelOptions } from './channel-options';
import { CompressionAlgorithms } from './compression-algorithms';
import { LogVerbosity } from './constants';
import { BaseFilter, Filter, FilterFactory } from './filter';
import * as logging from './logging';
import { Metadata, MetadataValue } from './metadata';
const isCompressionAlgorithmKey = (key: number): key is CompressionAlgorithms => {
return typeof key === 'number' && typeof CompressionAlgorithms[key] === 'string';
}
type CompressionAlgorithm = keyof typeof CompressionAlgorithms;
type SharedCompressionFilterConfig = {
serverSupportedEncodingHeader?: string;
};
abstract class CompressionHandler {

@@ -171,2 +185,29 @@ protected abstract compressMessage(message: Buffer): Promise<Buffer>;

private receiveCompression: CompressionHandler = new IdentityHandler();
private currentCompressionAlgorithm: CompressionAlgorithm = 'identity';
constructor(channelOptions: ChannelOptions, private sharedFilterConfig: SharedCompressionFilterConfig) {
super();
const compressionAlgorithmKey = channelOptions['grpc.default_compression_algorithm'];
if (compressionAlgorithmKey !== undefined) {
if (isCompressionAlgorithmKey(compressionAlgorithmKey)) {
const clientSelectedEncoding = CompressionAlgorithms[compressionAlgorithmKey] as CompressionAlgorithm;
const serverSupportedEncodings = sharedFilterConfig.serverSupportedEncodingHeader?.split(',');
/**
* There are two possible situations here:
* 1) We don't have any info yet from the server about what compression it supports
* In that case we should just use what the client tells us to use
* 2) We've previously received a response from the server including a grpc-accept-encoding header
* In that case we only want to use the encoding chosen by the client if the server supports it
*/
if (!serverSupportedEncodings || serverSupportedEncodings.includes(clientSelectedEncoding)) {
this.currentCompressionAlgorithm = clientSelectedEncoding;
this.sendCompression = getCompressionHandler(this.currentCompressionAlgorithm);
}
} else {
logging.log(LogVerbosity.ERROR, `Invalid value provided for grpc.default_compression_algorithm option: ${compressionAlgorithmKey}`);
}
}
}
async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {

@@ -176,2 +217,10 @@ const headers: Metadata = await metadata;

headers.set('accept-encoding', 'identity');
// No need to send the header if it's "identity" - behavior is identical; save the bandwidth
if (this.currentCompressionAlgorithm === 'identity') {
headers.remove('grpc-encoding');
} else {
headers.set('grpc-encoding', this.currentCompressionAlgorithm);
}
return headers;

@@ -189,2 +238,15 @@ }

metadata.remove('grpc-encoding');
/* Check to see if the compression we're using to send messages is supported by the server
* If not, reset the sendCompression filter and have it use the default IdentityHandler */
const serverSupportedEncodingsHeader = metadata.get('grpc-accept-encoding')[0] as string | undefined;
if (serverSupportedEncodingsHeader) {
this.sharedFilterConfig.serverSupportedEncodingHeader = serverSupportedEncodingsHeader;
const serverSupportedEncodings = serverSupportedEncodingsHeader.split(',');
if (!serverSupportedEncodings.includes(this.currentCompressionAlgorithm)) {
this.sendCompression = new IdentityHandler();
this.currentCompressionAlgorithm = 'identity';
}
}
metadata.remove('grpc-accept-encoding');

@@ -199,6 +261,9 @@ return metadata;

const resolvedMessage: WriteObject = await message;
const compress =
resolvedMessage.flags === undefined
? false
: (resolvedMessage.flags & WriteFlags.NoCompress) === 0;
let compress: boolean;
if (this.sendCompression instanceof IdentityHandler) {
compress = false;
} else {
compress = ((resolvedMessage.flags ?? 0) & WriteFlags.NoCompress) === 0;
}
return {

@@ -224,6 +289,7 @@ message: await this.sendCompression.writeMessage(

implements FilterFactory<CompressionFilter> {
constructor(private readonly channel: Channel) {}
private sharedFilterConfig: SharedCompressionFilterConfig = {};
constructor(private readonly channel: Channel, private readonly options: ChannelOptions) {}
createFilter(callStream: Call): CompressionFilter {
return new CompressionFilter();
return new CompressionFilter(this.options, this.sharedFilterConfig);
}
}

@@ -28,2 +28,3 @@ /*

import { Channel, ChannelImplementation } from './channel';
import { CompressionAlgorithms } from './compression-algorithms';
import { ConnectivityState } from './connectivity-state';

@@ -46,3 +47,5 @@ import { ChannelCredentials } from './channel-credentials';

MethodDefinition,
ProtobufTypeDefinition,
Serialize,
ServiceClientConstructor,
ServiceDefinition,

@@ -129,2 +132,3 @@ } from './make-client';

Propagate as propagate,
CompressionAlgorithms as compressionAlgorithms
// TODO: Other constants as well

@@ -251,3 +255,7 @@ };

export { GrpcObject } from './make-client';
export {
GrpcObject,
ServiceClientConstructor,
ProtobufTypeDefinition
} from './make-client';

@@ -254,0 +262,0 @@ export { ChannelOptions } from './channel-options';

@@ -21,2 +21,3 @@ /*

import { Duplex, Readable, Writable } from 'stream';
import * as zlib from 'zlib';

@@ -64,3 +65,3 @@ import { Deadline, StatusObject } from './call-stream';

// once compression is integrated.
[GRPC_ACCEPT_ENCODING_HEADER]: 'identity',
[GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip',
[GRPC_ENCODING_HEADER]: 'identity',

@@ -141,3 +142,4 @@ [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,

public metadata: Metadata,
public deserialize: Deserialize<RequestType>
public deserialize: Deserialize<RequestType>,
encoding: string
) {

@@ -147,3 +149,3 @@ super({ objectMode: true });

this.call.setupSurfaceCall(this);
this.call.setupReadable(this);
this.call.setupReadable(this, encoding);
}

@@ -257,3 +259,4 @@

public serialize: Serialize<ResponseType>,
public deserialize: Deserialize<RequestType>
public deserialize: Deserialize<RequestType>,
encoding: string
) {

@@ -264,3 +267,3 @@ super({ objectMode: true });

this.call.setupSurfaceCall(this);
this.call.setupReadable(this);
this.call.setupReadable(this, encoding);

@@ -447,2 +450,50 @@ this.on('error', (err) => {

private getDecompressedMessage(message: Buffer, encoding: string) {
switch (encoding) {
case 'deflate': {
return new Promise<Buffer | undefined>((resolve, reject) => {
zlib.inflate(message.slice(5), (err, output) => {
if (err) {
this.sendError({
code: Status.INTERNAL,
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
});
resolve();
} else {
resolve(output);
}
});
});
}
case 'gzip': {
return new Promise<Buffer | undefined>((resolve, reject) => {
zlib.unzip(message.slice(5), (err, output) => {
if (err) {
this.sendError({
code: Status.INTERNAL,
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
});
resolve();
} else {
resolve(output);
}
});
});
}
case 'identity': {
return Promise.resolve(message.slice(5));
}
default: {
this.sendError({
code: Status.UNIMPLEMENTED,
details: `Received message compressed with unsupported encoding "${encoding}"`,
});
return Promise.resolve();
}
}
}
sendMetadata(customMetadata?: Metadata) {

@@ -478,3 +529,3 @@ if (this.checkCancelled()) {

this.sendError(err);
return;
return metadata;
}

@@ -494,3 +545,2 @@

metadata.remove(http2.constants.HTTP2_HEADER_CONTENT_TYPE);
metadata.remove('grpc-encoding');
metadata.remove('grpc-accept-encoding');

@@ -501,3 +551,3 @@

receiveUnaryMessage(): Promise<RequestType> {
receiveUnaryMessage(encoding: string): Promise<RequestType> {
return new Promise((resolve, reject) => {

@@ -528,3 +578,15 @@ const stream = this.stream;

this.emit('receiveMessage');
resolve(this.deserializeMessage(requestBytes));
const compressed = requestBytes.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = await this.getDecompressedMessage(requestBytes, compressedMessageEncoding);
// Encountered an error with decompression; it'll already have been propogated back
// Just return early
if (!decompressedMessage) {
resolve();
}
else {
resolve(this.deserializeMessage(decompressedMessage));
}
} catch (err) {

@@ -552,6 +614,3 @@ err.code = Status.INTERNAL;

deserializeMessage(bytes: Buffer) {
// TODO(cjihrig): Call compression aware deserializeMessage().
const receivedMessage = bytes.slice(5);
return this.handler.deserialize(receivedMessage);
return this.handler.deserialize(bytes);
}

@@ -687,9 +746,25 @@

| ServerReadableStream<RequestType, ResponseType>
| ServerDuplexStream<RequestType, ResponseType>
| ServerDuplexStream<RequestType, ResponseType>,
encoding: string
) {
const decoder = new StreamDecoder();
let readsDone = false;
let pendingMessageProcessing = false;
let pushedEnd = false;
const maybePushEnd = () => {
if (!pushedEnd && readsDone && !pendingMessageProcessing) {
pushedEnd = true;
this.pushOrBufferMessage(readable, null);
}
}
this.stream.on('data', async (data: Buffer) => {
const messages = decoder.write(data);
pendingMessageProcessing = true;
this.stream.pause();
for (const message of messages) {

@@ -707,8 +782,21 @@ if (

this.emit('receiveMessage');
this.pushOrBufferMessage(readable, message);
const compressed = message.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = await this.getDecompressedMessage(message, compressedMessageEncoding);
// Encountered an error with decompression; it'll already have been propogated back
// Just return early
if (!decompressedMessage) return;
this.pushOrBufferMessage(readable, decompressedMessage);
}
pendingMessageProcessing = false;
this.stream.resume();
maybePushEnd();
});
this.stream.once('end', () => {
this.pushOrBufferMessage(readable, null);
readsDone = true;
maybePushEnd();
});

@@ -757,2 +845,3 @@ }

if (messageBytes === null) {
trace('Received end of stream');
if (this.canPush) {

@@ -767,2 +856,4 @@ readable.push(null);

trace('Received message of length ' + messageBytes.length);
this.isPushPending = true;

@@ -769,0 +860,0 @@

@@ -799,6 +799,9 @@ /*

}
const metadata: Metadata = call.receiveMetadata(headers) as Metadata;
const metadata = call.receiveMetadata(headers);
const encoding = (metadata.get('grpc-encoding')[0] as string | undefined) ?? 'identity';
metadata.remove('grpc-encoding');
switch (handler.type) {
case 'unary':
handleUnary(call, handler as UntypedUnaryHandler, metadata);
handleUnary(call, handler as UntypedUnaryHandler, metadata, encoding);
break;

@@ -809,3 +812,4 @@ case 'clientStream':

handler as UntypedClientStreamingHandler,
metadata
metadata,
encoding
);

@@ -817,3 +821,4 @@ break;

handler as UntypedServerStreamingHandler,
metadata
metadata,
encoding
);

@@ -825,3 +830,4 @@ break;

handler as UntypedBidiStreamingHandler,
metadata
metadata,
encoding
);

@@ -886,5 +892,6 @@ break;

handler: UnaryHandler<RequestType, ResponseType>,
metadata: Metadata
metadata: Metadata,
encoding: string
): Promise<void> {
const request = await call.receiveUnaryMessage();
const request = await call.receiveUnaryMessage(encoding);

@@ -917,3 +924,4 @@ if (request === undefined || call.cancelled) {

handler: ClientStreamingHandler<RequestType, ResponseType>,
metadata: Metadata
metadata: Metadata,
encoding: string
): void {

@@ -923,3 +931,4 @@ const stream = new ServerReadableStreamImpl<RequestType, ResponseType>(

metadata,
handler.deserialize
handler.deserialize,
encoding
);

@@ -948,5 +957,6 @@

handler: ServerStreamingHandler<RequestType, ResponseType>,
metadata: Metadata
metadata: Metadata,
encoding: string
): Promise<void> {
const request = await call.receiveUnaryMessage();
const request = await call.receiveUnaryMessage(encoding);

@@ -970,3 +980,4 @@ if (request === undefined || call.cancelled) {

handler: BidiStreamingHandler<RequestType, ResponseType>,
metadata: Metadata
metadata: Metadata,
encoding: string
): void {

@@ -977,3 +988,4 @@ const stream = new ServerDuplexStreamImpl<RequestType, ResponseType>(

handler.serialize,
handler.deserialize
handler.deserialize,
encoding
);

@@ -980,0 +992,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc