Socket
Socket
Sign inDemoInstall

@grpc/grpc-js

Package Overview
Dependencies
Maintainers
3
Versions
178
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@grpc/grpc-js - npm Package Compare versions

Comparing version 1.1.3 to 1.1.4

build/src/load-balancer-cds.d.ts

16

build/src/call-stream.js

@@ -112,3 +112,2 @@ "use strict";

outputStatus() {
var _a;
/* Precondition: this.finalStatus !== null */

@@ -118,3 +117,12 @@ if (!this.statusOutput) {

const filteredStatus = this.filterStack.receiveTrailers(this.finalStatus);
(_a = this.listener) === null || _a === void 0 ? void 0 : _a.onReceiveStatus(filteredStatus);
/* We delay the actual action of bubbling up the status to insulate the
* cleanup code in this class from any errors that may be thrown in the
* upper layers as a result of bubbling up the status. In particular,
* if the status is not OK, the "error" event may be emitted
* synchronously at the top level, which will result in a thrown error if
* the user does not handle that event. */
process.nextTick(() => {
var _a;
(_a = this.listener) === null || _a === void 0 ? void 0 : _a.onReceiveStatus(filteredStatus);
});
if (this.subchannel) {

@@ -464,2 +472,3 @@ this.subchannel.callUnref();

}
this.trace('close http2 stream with code ' + code);
this.http2Stream.close(code);

@@ -485,3 +494,4 @@ }

getPeer() {
throw new Error('Not yet implemented');
var _a, _b;
return (_b = (_a = this.subchannel) === null || _a === void 0 ? void 0 : _a.getAddress()) !== null && _b !== void 0 ? _b : this.channel.getTarget();
}

@@ -488,0 +498,0 @@ getMethod() {

8

build/src/call.js

@@ -44,3 +44,3 @@ "use strict";

var _a, _b;
return (_b = (_a = this.call) === null || _a === void 0 ? void 0 : _a.getPeer()) !== null && _b !== void 0 ? _b : '';
return (_b = (_a = this.call) === null || _a === void 0 ? void 0 : _a.getPeer()) !== null && _b !== void 0 ? _b : 'unknown';
}

@@ -60,3 +60,3 @@ }

var _a, _b;
return (_b = (_a = this.call) === null || _a === void 0 ? void 0 : _a.getPeer()) !== null && _b !== void 0 ? _b : '';
return (_b = (_a = this.call) === null || _a === void 0 ? void 0 : _a.getPeer()) !== null && _b !== void 0 ? _b : 'unknown';
}

@@ -80,3 +80,3 @@ _read(_size) {

var _a, _b;
return (_b = (_a = this.call) === null || _a === void 0 ? void 0 : _a.getPeer()) !== null && _b !== void 0 ? _b : '';
return (_b = (_a = this.call) === null || _a === void 0 ? void 0 : _a.getPeer()) !== null && _b !== void 0 ? _b : 'unknown';
}

@@ -113,3 +113,3 @@ _write(chunk, encoding, cb) {

var _a, _b;
return (_b = (_a = this.call) === null || _a === void 0 ? void 0 : _a.getPeer()) !== null && _b !== void 0 ? _b : '';
return (_b = (_a = this.call) === null || _a === void 0 ? void 0 : _a.getPeer()) !== null && _b !== void 0 ? _b : 'unknown';
}

@@ -116,0 +116,0 @@ _read(_size) {

@@ -31,3 +31,2 @@ "use strict";

const resolver_1 = require("./resolver");
const service_config_1 = require("./service-config");
const logging_1 = require("./logging");

@@ -117,11 +116,3 @@ const max_message_size_filter_1 = require("./max-message-size-filter");

};
// TODO(murgatroid99): check channel arg for default service config
let defaultServiceConfig = {
loadBalancingConfig: [],
methodConfig: [],
};
if (options['grpc.service_config']) {
defaultServiceConfig = service_config_1.validateServiceConfig(JSON.parse(options['grpc.service_config']));
}
this.resolvingLoadBalancer = new resolving_load_balancer_1.ResolvingLoadBalancer(this.target, channelControlHelper, defaultServiceConfig);
this.resolvingLoadBalancer = new resolving_load_balancer_1.ResolvingLoadBalancer(this.target, channelControlHelper, options);
this.filterStackFactory = new filter_stack_1.FilterStackFactory([

@@ -243,2 +234,5 @@ new call_credentials_filter_1.CallCredentialsFilterFactory(this),

break;
case picker_1.PickResultType.DROP:
callStream.cancelWithStatus(pickResult.status.code, pickResult.status.details);
break;
default:

@@ -245,0 +239,0 @@ throw new Error(`Invalid state: unknown pickResultType ${pickResult.pickResultType}`);

@@ -37,4 +37,11 @@ import { LoadBalancer, ChannelControlHelper } from './load-balancer';

private nextPriorityChildNumber;
private clusterDropStats;
constructor(channelControlHelper: ChannelControlHelper);
/**
* Check whether a single call should be dropped according to the current
* policy, based on randomly chosen numbers. Returns the drop category if
* the call should be dropped, and null otherwise.
*/
private checkForDrop;
/**
* Should be called when this balancer gets a new config and when the

@@ -41,0 +48,0 @@ * XdsClient returns a new ClusterLoadAssignment.

@@ -60,3 +60,39 @@ "use strict";

this.nextPriorityChildNumber = 0;
this.childBalancer = new load_balancer_child_handler_1.ChildLoadBalancerHandler(channelControlHelper);
this.clusterDropStats = null;
this.childBalancer = new load_balancer_child_handler_1.ChildLoadBalancerHandler({
createSubchannel: (subchannelAddress, subchannelArgs) => this.channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs),
requestReresolution: () => this.channelControlHelper.requestReresolution(),
updateState: (connectivityState, originalPicker) => {
if (this.latestEdsUpdate === null) {
return;
}
const edsPicker = {
pick: (pickArgs) => {
var _a;
const dropCategory = this.checkForDrop();
/* If we drop the call, it ends with an UNAVAILABLE status.
* Otherwise, delegate picking the subchannel to the child
* balancer. */
if (dropCategory === null) {
return originalPicker.pick(pickArgs);
}
else {
(_a = this.clusterDropStats) === null || _a === void 0 ? void 0 : _a.addCallDropped(dropCategory);
return {
pickResultType: picker_1.PickResultType.DROP,
status: {
code: constants_1.Status.UNAVAILABLE,
details: `Call dropped by load balancing policy. Category: ${dropCategory}`,
metadata: new metadata_1.Metadata(),
},
subchannel: null,
extraFilterFactory: null,
onCallStarted: null,
};
}
},
};
this.channelControlHelper.updateState(connectivityState, edsPicker);
},
});
this.watcher = {

@@ -68,4 +104,5 @@ onValidUpdate: (update) => {

onResourceDoesNotExist: () => {
/* TODO(murgatroid99): Figure out what needs to be done here after
* implementing CDS */
var _a;
(_a = this.xdsClient) === null || _a === void 0 ? void 0 : _a.removeEndpointWatcher(this.edsServiceName, this.watcher);
this.isWatcherActive = false;
},

@@ -84,2 +121,40 @@ onTransientError: (status) => {

/**
* Check whether a single call should be dropped according to the current
* policy, based on randomly chosen numbers. Returns the drop category if
* the call should be dropped, and null otherwise.
*/
checkForDrop() {
var _a;
if (!((_a = this.latestEdsUpdate) === null || _a === void 0 ? void 0 : _a.policy)) {
return null;
}
/* The drop_overloads policy is a list of pairs of category names and
* probabilities. For each one, if the random number is within that
* probability range, we drop the call citing that category. Otherwise, the
* call proceeds as usual. */
for (const dropOverload of this.latestEdsUpdate.policy.drop_overloads) {
if (!dropOverload.drop_percentage) {
continue;
}
let randNum;
switch (dropOverload.drop_percentage.denominator) {
case 'HUNDRED':
randNum = Math.random() * 100;
break;
case 'TEN_THOUSAND':
randNum = Math.random() * 10000;
break;
case 'MILLION':
randNum = Math.random() * 1000000;
break;
default:
continue;
}
if (randNum < dropOverload.drop_percentage.numerator) {
return dropOverload.category;
}
}
return null;
}
/**
* Should be called when this balancer gets a new config and when the

@@ -89,3 +164,3 @@ * XdsClient returns a new ClusterLoadAssignment.

updateChild() {
var _a, _b;
var _a, _b, _c;
if (!(this.lastestConfig && this.latestEdsUpdate)) {

@@ -171,12 +246,30 @@ return;

for (const localityObj of localityArray) {
/* Use the endpoint picking policy from the config, default to
* round_robin. */
const endpointPickingPolicy = [
...this.lastestConfig.eds.endpointPickingPolicy,
{ name: 'round_robin', round_robin: {} },
];
let childPolicy;
if (this.lastestConfig.eds.lrsLoadReportingServerName) {
childPolicy = [
{
name: 'lrs',
lrs: {
cluster_name: this.lastestConfig.eds.cluster,
eds_service_name: (_c = this.lastestConfig.eds.edsServiceName) !== null && _c !== void 0 ? _c : '',
lrs_load_reporting_server_name: this.lastestConfig.eds
.lrsLoadReportingServerName,
locality: localityObj.locality,
child_policy: endpointPickingPolicy,
},
},
];
}
else {
childPolicy = endpointPickingPolicy;
}
childTargets.set(localityToName(localityObj.locality), {
weight: localityObj.weight,
/* TODO(murgatroid99): Insert an lrs config around the round_robin
* config after implementing lrs */
/* Use the endpoint picking policy from the config, default to
* round_robin. */
child_policy: [
...this.lastestConfig.eds.endpointPickingPolicy,
{ name: 'round_robin', round_robin: {} },
],
child_policy: childPolicy,
});

@@ -215,3 +308,3 @@ for (const address of localityObj.addresses) {

updateAddressList(addressList, lbConfig, attributes) {
var _a;
var _a, _b;
if (!load_balancing_config_1.isEdsLoadBalancingConfig(lbConfig)) {

@@ -232,6 +325,6 @@ return;

/* Setting isWatcherActive to false here lets us have one code path for
* calling addEndpointWatcher */
* calling addEndpointWatcher */
this.isWatcherActive = false;
/* If we have a new name, the latestEdsUpdate does not correspond to
* the new config, so it is no longer valid */
* the new config, so it is no longer valid */
this.latestEdsUpdate = null;

@@ -244,2 +337,5 @@ }

}
if (lbConfig.eds.lrsLoadReportingServerName) {
this.clusterDropStats = this.xdsClient.addClusterDropStats(lbConfig.eds.lrsLoadReportingServerName, lbConfig.eds.cluster, (_b = lbConfig.eds.edsServiceName) !== null && _b !== void 0 ? _b : '');
}
/* If updateAddressList is called after receiving an update and the update

@@ -246,0 +342,0 @@ * is still valid, we want to update the child config with the information

@@ -24,2 +24,5 @@ "use strict";

const load_balancer_weighted_target = require("./load-balancer-weighted-target");
const load_balancer_eds = require("./load-balancer-eds");
const load_balancer_cds = require("./load-balancer-cds");
const load_balancer_lrs = require("./load-balancer-lrs");
const registeredLoadBalancerTypes = {};

@@ -57,4 +60,7 @@ function registerLoadBalancerType(typeName, loadBalancerType) {

load_balancer_weighted_target.setup();
load_balancer_eds.setup();
load_balancer_cds.setup();
load_balancer_lrs.setup();
}
exports.registerAll = registerAll;
//# sourceMappingURL=load-balancer.js.map

@@ -0,1 +1,2 @@

import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
export declare type PickFirstConfig = {};

@@ -42,2 +43,12 @@ export declare type RoundRobinConfig = {};

}
export interface CdsLbConfig {
cluster: string;
}
export interface LrsLbConfig {
cluster_name: string;
eds_service_name: string;
lrs_load_reporting_server_name: string;
locality: Locality__Output;
child_policy: LoadBalancingConfig[];
}
export interface PickFirstLoadBalancingConfig {

@@ -71,3 +82,11 @@ name: 'pick_first';

}
export declare type LoadBalancingConfig = PickFirstLoadBalancingConfig | RoundRobinLoadBalancingConfig | XdsLoadBalancingConfig | GrpcLbLoadBalancingConfig | PriorityLoadBalancingConfig | WeightedTargetLoadBalancingConfig | EdsLoadBalancingConfig;
export interface CdsLoadBalancingConfig {
name: 'cds';
cds: CdsLbConfig;
}
export interface LrsLoadBalancingConfig {
name: 'lrs';
lrs: LrsLbConfig;
}
export declare type LoadBalancingConfig = PickFirstLoadBalancingConfig | RoundRobinLoadBalancingConfig | XdsLoadBalancingConfig | GrpcLbLoadBalancingConfig | PriorityLoadBalancingConfig | WeightedTargetLoadBalancingConfig | EdsLoadBalancingConfig | CdsLoadBalancingConfig | LrsLoadBalancingConfig;
export declare function isRoundRobinLoadBalancingConfig(lbconfig: LoadBalancingConfig): lbconfig is RoundRobinLoadBalancingConfig;

@@ -79,2 +98,4 @@ export declare function isXdsLoadBalancingConfig(lbconfig: LoadBalancingConfig): lbconfig is XdsLoadBalancingConfig;

export declare function isEdsLoadBalancingConfig(lbconfig: LoadBalancingConfig): lbconfig is EdsLoadBalancingConfig;
export declare function isCdsLoadBalancingConfig(lbconfig: LoadBalancingConfig): lbconfig is CdsLoadBalancingConfig;
export declare function isLrsLoadBalancingConfig(lbconfig: LoadBalancingConfig): lbconfig is LrsLoadBalancingConfig;
export declare function validateConfig(obj: any): LoadBalancingConfig;

@@ -19,3 +19,3 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.validateConfig = exports.isEdsLoadBalancingConfig = exports.isWeightedTargetLoadBalancingConfig = exports.isPriorityLoadBalancingConfig = exports.isGrpcLbLoadBalancingConfig = exports.isXdsLoadBalancingConfig = exports.isRoundRobinLoadBalancingConfig = void 0;
exports.validateConfig = exports.isLrsLoadBalancingConfig = exports.isCdsLoadBalancingConfig = exports.isEdsLoadBalancingConfig = exports.isWeightedTargetLoadBalancingConfig = exports.isPriorityLoadBalancingConfig = exports.isGrpcLbLoadBalancingConfig = exports.isXdsLoadBalancingConfig = exports.isRoundRobinLoadBalancingConfig = void 0;
function isRoundRobinLoadBalancingConfig(lbconfig) {

@@ -45,2 +45,10 @@ return lbconfig.name === 'round_robin';

exports.isEdsLoadBalancingConfig = isEdsLoadBalancingConfig;
function isCdsLoadBalancingConfig(lbconfig) {
return lbconfig.name === 'cds';
}
exports.isCdsLoadBalancingConfig = isCdsLoadBalancingConfig;
function isLrsLoadBalancingConfig(lbconfig) {
return lbconfig.name === 'lrs';
}
exports.isLrsLoadBalancingConfig = isLrsLoadBalancingConfig;
/* In these functions we assume the input came from a JSON object. Therefore we

@@ -47,0 +55,0 @@ * expect that the prototype is uninteresting and that `in` can be used

@@ -9,3 +9,4 @@ import { Subchannel } from './subchannel';

QUEUE = 1,
TRANSIENT_FAILURE = 2
TRANSIENT_FAILURE = 2,
DROP = 3
}

@@ -54,2 +55,9 @@ export interface PickResult {

}
export interface DropCallPickResult extends PickResult {
pickResultType: PickResultType.DROP;
subchannel: null;
status: StatusObject;
extraFilterFactory: null;
onCallStarted: null;
}
export interface PickArgs {

@@ -56,0 +64,0 @@ metadata: Metadata;

@@ -27,2 +27,3 @@ "use strict";

PickResultType[PickResultType["TRANSIENT_FAILURE"] = 2] = "TRANSIENT_FAILURE";
PickResultType[PickResultType["DROP"] = 3] = "DROP";
})(PickResultType = exports.PickResultType || (exports.PickResultType = {}));

@@ -29,0 +30,0 @@ /**

@@ -59,3 +59,3 @@ "use strict";

class DnsResolver {
constructor(target, listener) {
constructor(target, listener, channelOptions) {
var _a, _b;

@@ -62,0 +62,0 @@ this.target = target;

@@ -21,3 +21,3 @@ "use strict";

class UdsResolver {
constructor(target, listener) {
constructor(target, listener, channelOptions) {
this.listener = listener;

@@ -24,0 +24,0 @@ this.addresses = [];

@@ -5,2 +5,3 @@ import { ServiceConfig } from './service-config';

import { GrpcUri } from './uri-parser';
import { ChannelOptions } from './channel-options';
/**

@@ -43,3 +44,3 @@ * A listener object passed to the resolver's constructor that provides name

export interface ResolverConstructor {
new (target: GrpcUri, listener: ResolverListener): Resolver;
new (target: GrpcUri, listener: ResolverListener, channelOptions: ChannelOptions): Resolver;
/**

@@ -73,3 +74,3 @@ * Get the default authority for a target. This loosely corresponds to that

*/
export declare function createResolver(target: GrpcUri, listener: ResolverListener): Resolver;
export declare function createResolver(target: GrpcUri, listener: ResolverListener, options: ChannelOptions): Resolver;
/**

@@ -76,0 +77,0 @@ * Get the default authority for the specified target, if possible. Throws an

@@ -51,5 +51,5 @@ "use strict";

*/
function createResolver(target, listener) {
function createResolver(target, listener, options) {
if (target.scheme !== undefined && target.scheme in registeredResolvers) {
return new registeredResolvers[target.scheme](target, listener);
return new registeredResolvers[target.scheme](target, listener, options);
}

@@ -56,0 +56,0 @@ else {

import { ChannelControlHelper, LoadBalancer } from './load-balancer';
import { ServiceConfig } from './service-config';
import { LoadBalancingConfig } from './load-balancing-config';
import { SubchannelAddress } from './subchannel';
import { GrpcUri } from './uri-parser';
import { ChannelOptions } from './channel-options';
export declare class ResolvingLoadBalancer implements LoadBalancer {
private readonly target;
private readonly channelControlHelper;
private readonly defaultServiceConfig;
private readonly channelOptions;
/**

@@ -21,2 +21,3 @@ * The resolver class constructed for the target address.

private currentState;
private readonly defaultServiceConfig;
/**

@@ -49,3 +50,3 @@ * The service config object from the last successful resolution, if

*/
constructor(target: GrpcUri, channelControlHelper: ChannelControlHelper, defaultServiceConfig: ServiceConfig | null);
constructor(target: GrpcUri, channelControlHelper: ChannelControlHelper, channelOptions: ChannelOptions);
private updateResolution;

@@ -52,0 +53,0 @@ private updateState;

@@ -21,2 +21,3 @@ "use strict";

const load_balancer_1 = require("./load-balancer");
const service_config_1 = require("./service-config");
const channel_1 = require("./channel");

@@ -50,6 +51,6 @@ const resolver_1 = require("./resolver");

*/
constructor(target, channelControlHelper, defaultServiceConfig) {
constructor(target, channelControlHelper, channelOptions) {
this.target = target;
this.channelControlHelper = channelControlHelper;
this.defaultServiceConfig = defaultServiceConfig;
this.channelOptions = channelOptions;
this.latestChildState = channel_1.ConnectivityState.IDLE;

@@ -72,2 +73,11 @@ this.latestChildPicker = new picker_1.QueuePicker(this);

this.continueResolving = false;
if (channelOptions['grpc.service_config']) {
this.defaultServiceConfig = service_config_1.validateServiceConfig(JSON.parse(channelOptions['grpc.service_config']));
}
else {
this.defaultServiceConfig = {
loadBalancingConfig: [],
methodConfig: [],
};
}
this.updateState(channel_1.ConnectivityState.IDLE, new picker_1.QueuePicker(this));

@@ -148,3 +158,3 @@ this.childLoadBalancer = new load_balancer_child_handler_1.ChildLoadBalancerHandler({

},
});
}, channelOptions);
this.backoffTimeout = new backoff_timeout_1.BackoffTimeout(() => {

@@ -151,0 +161,0 @@ if (this.continueResolving) {

@@ -24,4 +24,7 @@ /// <reference types="node" />

request: RequestType | null;
end: (metadata?: Metadata) => void;
};
export declare type ServerDuplexStream<RequestType, ResponseType> = ServerSurfaceCall & ObjectReadable<RequestType> & ObjectWritable<ResponseType>;
export declare type ServerDuplexStream<RequestType, ResponseType> = ServerSurfaceCall & ObjectReadable<RequestType> & ObjectWritable<ResponseType> & {
end: (metadata?: Metadata) => void;
};
export declare class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter implements ServerUnaryCall<RequestType, ResponseType> {

@@ -70,2 +73,3 @@ private call;

sendMetadata(responseMetadata: Metadata): void;
end(metadata?: any): void;
}

@@ -139,2 +143,3 @@ export declare type sendUnaryData<ResponseType> = (error: ServerErrorResponse | ServerStatusResponse | null, value: ResponseType | null, trailer?: Metadata, flags?: number) => void;

private pushMessage;
getPeer(): string;
}

@@ -66,3 +66,3 @@ "use strict";

getPeer() {
throw new Error('not implemented yet');
return this.call.getPeer();
}

@@ -91,3 +91,3 @@ sendMetadata(responseMetadata) {

getPeer() {
throw new Error('not implemented yet');
return this.call.getPeer();
}

@@ -115,3 +115,3 @@ sendMetadata(responseMetadata) {

getPeer() {
throw new Error('not implemented yet');
return this.call.getPeer();
}

@@ -171,3 +171,3 @@ sendMetadata(responseMetadata) {

getPeer() {
throw new Error('not implemented yet');
return this.call.getPeer();
}

@@ -177,2 +177,9 @@ sendMetadata(responseMetadata) {

}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
end(metadata) {
if (metadata) {
this.trailingMetadata = metadata;
}
super.end();
}
}

@@ -212,2 +219,6 @@ exports.ServerDuplexStreamImpl = ServerDuplexStreamImpl;

this.stream.once('close', () => {
var _a;
trace('Request to method ' + ((_a = this.handler) === null || _a === void 0 ? void 0 : _a.path) +
' stream closed with rstCode ' +
this.stream.rstCode);
this.cancelled = true;

@@ -246,3 +257,3 @@ this.emit('cancelled', 'cancelled');

// TODO(cjihrig): Include compression headers.
const headers = Object.assign(defaultResponseHeaders, custom);
const headers = Object.assign({}, defaultResponseHeaders, custom);
this.stream.respond(headers, defaultResponseOptions);

@@ -486,2 +497,16 @@ }

}
getPeer() {
const socket = this.stream.session.socket;
if (socket.remoteAddress) {
if (socket.remotePort) {
return `${socket.remoteAddress}:${socket.remotePort}`;
}
else {
return socket.remoteAddress;
}
}
else {
return 'unknown';
}
}
}

@@ -488,0 +513,0 @@ exports.Http2ServerCallStream = Http2ServerCallStream;

@@ -279,3 +279,3 @@ "use strict";

};
const resolver = resolver_1.createResolver(portUri, resolverListener);
const resolver = resolver_1.createResolver(portUri, resolverListener, this.options);
resolver.updateResolution();

@@ -358,3 +358,2 @@ }

http2Server.on('stream', (stream, headers) => {
var _a;
const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];

@@ -370,5 +369,17 @@ if (typeof contentType !== 'string' ||

const path = headers[http2.constants.HTTP2_HEADER_PATH];
const serverAddress = http2Server.address();
let serverAddressString = 'null';
if (serverAddress) {
if (typeof serverAddress === 'string') {
serverAddressString = serverAddress;
}
else {
serverAddressString =
serverAddress.address + ':' + serverAddress.port;
}
}
trace('Received call to method ' +
path +
' at address ' + ((_a = http2Server.address()) === null || _a === void 0 ? void 0 : _a.toString()));
' at address ' +
serverAddressString);
const handler = this.handlers.get(path);

@@ -375,0 +386,0 @@ if (handler === undefined) {

@@ -297,3 +297,3 @@ "use strict";

}
trace(this.subchannelAddress +
trace(this.subchannelAddressString +
' connection closed by GOAWAY with code ' +

@@ -300,0 +300,0 @@ errorCode);

@@ -5,2 +5,9 @@ import { StatusObject } from './call-stream';

import { ClusterLoadAssignment__Output } from './generated/envoy/api/v2/ClusterLoadAssignment';
import { Cluster__Output } from './generated/envoy/api/v2/Cluster';
import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
declare type EdsTypeUrl = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment';
declare type CdsTypeUrl = 'type.googleapis.com/envoy.api.v2.Cluster';
declare type LdsTypeUrl = 'type.googleapis.com/envoy.api.v2.Listener';
declare type RdsTypeUrl = 'type.googleapis.com/envoy.api.v2.RouteConfiguration';
declare type AdsTypeUrl = EdsTypeUrl | CdsTypeUrl | RdsTypeUrl | LdsTypeUrl;
export interface Watcher<UpdateType> {

@@ -11,14 +18,23 @@ onValidUpdate(update: UpdateType): void;

}
export interface XdsClusterDropStats {
addCallDropped(category: string): void;
}
export interface XdsClusterLocalityStats {
addCallStarted(): void;
addCallFinished(fail: boolean): void;
}
export declare class XdsClient {
private targetName;
private serviceConfigWatcher;
private node;
private client;
private adsNode;
private adsClient;
private adsCall;
private lrsNode;
private lrsClient;
private lrsCall;
private latestLrsSettings;
private clusterStatsMap;
private statsTimer;
private hasShutdown;
private endpointWatchers;
private lastEdsVersionInfo;
private lastEdsNonce;
private latestEdsResponses;
private adsState;
constructor(targetName: string, serviceConfigWatcher: Watcher<ServiceConfig>, channelOptions: ChannelOptions);
private handleAdsResponse;
/**

@@ -29,25 +45,32 @@ * Start the ADS stream if the client exists and there is not already an

private maybeStartAdsStream;
private nackUnknown;
/**
* Acknowledge an EDS update. This should be called after the local nonce and
* Acknowledge an update. This should be called after the local nonce and
* version info are updated so that it sends the post-update values.
*/
private ackEds;
ack(typeUrl: AdsTypeUrl): void;
/**
* Reject an EDS update. This should be called without updating the local
* Reject an update. This should be called without updating the local
* nonce and version info.
*/
private nackEds;
/**
* Validate the ClusterLoadAssignment object by these rules:
* https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto
* @param message
*/
private validateEdsResponse;
private handleEdsResponse;
private updateEdsNames;
private nack;
private updateNames;
private reportStreamError;
private maybeStartLrsStream;
private sendStats;
addEndpointWatcher(edsServiceName: string, watcher: Watcher<ClusterLoadAssignment__Output>): void;
removeEndpointWatcher(edsServiceName: string, watcher: Watcher<ClusterLoadAssignment__Output>): void;
addClusterWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void;
removeClusterWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void;
/**
*
* @param lrsServer The target name of the server to send stats to. An empty
* string indicates that the default LRS client should be used. Currently
* only the empty string is supported here.
* @param clusterName
* @param edsServiceName
*/
addClusterDropStats(lrsServer: string, clusterName: string, edsServiceName: string): XdsClusterDropStats;
addClusterLocalityStats(lrsServer: string, clusterName: string, edsServiceName: string, locality: Locality__Output): XdsClusterLocalityStats;
shutdown(): void;
}
export {};

@@ -34,2 +34,6 @@ "use strict";

const EDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment';
const CDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.Cluster';
const LDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.Listener';
const RDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.RouteConfiguration';
const HTTP_CONNECTION_MANGER_TYPE_URL = 'type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager';
let loadedProtos = null;

@@ -43,2 +47,3 @@ function loadAdsProtos() {

'envoy/service/discovery/v2/ads.proto',
'envoy/service/load_stats/v2/lrs.proto',
'envoy/api/v2/listener.proto',

@@ -48,2 +53,3 @@ 'envoy/api/v2/route.proto',

'envoy/api/v2/endpoint.proto',
'envoy/config/filter/network/http_connection_manager/v2/http_connection_manager.proto',
], {

@@ -66,14 +72,453 @@ keepCase: true,

}
function localityEqual(loc1, loc2) {
return (loc1.region === loc2.region &&
loc1.zone === loc2.zone &&
loc1.sub_zone === loc2.sub_zone);
}
class ClusterLoadReportMap {
constructor() {
this.statsMap = [];
}
get(clusterName, edsServiceName) {
for (const statsObj of this.statsMap) {
if (statsObj.clusterName === clusterName &&
statsObj.edsServiceName === edsServiceName) {
return statsObj.stats;
}
}
return undefined;
}
getOrCreate(clusterName, edsServiceName) {
for (const statsObj of this.statsMap) {
if (statsObj.clusterName === clusterName &&
statsObj.edsServiceName === edsServiceName) {
return statsObj.stats;
}
}
const newStats = {
callsDropped: new Map(),
localityStats: [],
intervalStart: process.hrtime(),
};
this.statsMap.push({
clusterName,
edsServiceName,
stats: newStats,
});
return newStats;
}
*entries() {
for (const statsEntry of this.statsMap) {
yield [
{
clusterName: statsEntry.clusterName,
edsServiceName: statsEntry.edsServiceName,
},
statsEntry.stats,
];
}
}
}
class EdsState {
constructor(updateResourceNames) {
this.updateResourceNames = updateResourceNames;
this.versionInfo = '';
this.nonce = '';
this.watchers = new Map();
this.latestResponses = [];
}
/**
* Add the watcher to the watcher list. Returns true if the list of resource
* names has changed, and false otherwise.
* @param edsServiceName
* @param watcher
*/
addWatcher(edsServiceName, watcher) {
let watchersEntry = this.watchers.get(edsServiceName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(edsServiceName, watchersEntry);
}
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
for (const message of this.latestResponses) {
if (message.cluster_name === edsServiceName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
watcher.onValidUpdate(message);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
}
removeWatcher(edsServiceName, watcher) {
const watchersEntry = this.watchers.get(edsServiceName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(edsServiceName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
}
getResourceNames() {
return Array.from(this.watchers.keys());
}
/**
* Validate the ClusterLoadAssignment object by these rules:
* https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto
* @param message
*/
validateResponse(message) {
var _a, _b;
for (const endpoint of message.endpoints) {
for (const lb of endpoint.lb_endpoints) {
const socketAddress = (_b = (_a = lb.endpoint) === null || _a === void 0 ? void 0 : _a.address) === null || _b === void 0 ? void 0 : _b.socket_address;
if (!socketAddress) {
return false;
}
if (socketAddress.port_specifier !== 'port_value') {
return false;
}
if (!(net_1.isIPv4(socketAddress.address) || net_1.isIPv6(socketAddress.address))) {
return false;
}
}
}
return true;
}
/**
* Given a list of edsServiceNames (which may actually be the cluster name),
* for each watcher watching a name not on the list, call that watcher's
* onResourceDoesNotExist method.
* @param allClusterNames
*/
handleMissingNames(allEdsServiceNames) {
for (const [edsServiceName, watcherList] of this.watchers.entries()) {
if (!allEdsServiceNames.has(edsServiceName)) {
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
}
handleResponses(responses) {
var _a;
for (const message of responses) {
if (!this.validateResponse(message)) {
return 'ClusterLoadAssignment validation failed';
}
}
this.latestResponses = responses;
const allClusterNames = new Set();
for (const message of responses) {
allClusterNames.add(message.cluster_name);
const watchers = (_a = this.watchers.get(message.cluster_name)) !== null && _a !== void 0 ? _a : [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
}
this.handleMissingNames(allClusterNames);
return null;
}
reportStreamError(status) {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}
class CdsState {
constructor(edsState, updateResourceNames) {
this.edsState = edsState;
this.updateResourceNames = updateResourceNames;
this.versionInfo = '';
this.nonce = '';
this.watchers = new Map();
this.latestResponses = [];
}
/**
* Add the watcher to the watcher list. Returns true if the list of resource
* names has changed, and false otherwise.
* @param clusterName
* @param watcher
*/
addWatcher(clusterName, watcher) {
let watchersEntry = this.watchers.get(clusterName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(clusterName, watchersEntry);
}
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
for (const message of this.latestResponses) {
if (message.name === clusterName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
watcher.onValidUpdate(message);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
}
removeWatcher(clusterName, watcher) {
const watchersEntry = this.watchers.get(clusterName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(clusterName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
}
getResourceNames() {
return Array.from(this.watchers.keys());
}
validateResponse(message) {
var _a, _b;
if (message.type !== 'EDS') {
return false;
}
if (!((_b = (_a = message.eds_cluster_config) === null || _a === void 0 ? void 0 : _a.eds_config) === null || _b === void 0 ? void 0 : _b.ads)) {
return false;
}
if (message.lb_policy !== 'ROUND_ROBIN') {
return false;
}
if (message.lrs_server) {
if (!message.lrs_server.self) {
return false;
}
}
return true;
}
/**
* Given a list of edsServiceNames (which may actually be the cluster name),
* for each watcher watching a name not on the list, call that watcher's
* onResourceDoesNotExist method.
* @param allClusterNames
*/
handleMissingNames(allClusterNames) {
for (const [edsServiceName, watcherList] of this.watchers.entries()) {
if (!allClusterNames.has(edsServiceName)) {
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
}
handleResponses(responses) {
var _a, _b, _c;
for (const message of responses) {
if (!this.validateResponse(message)) {
return 'Cluster validation failed';
}
}
this.latestResponses = responses;
const allEdsServiceNames = new Set();
const allClusterNames = new Set();
for (const message of responses) {
allClusterNames.add(message.name);
const edsServiceName = (_b = (_a = message.eds_cluster_config) === null || _a === void 0 ? void 0 : _a.service_name) !== null && _b !== void 0 ? _b : '';
allEdsServiceNames.add(edsServiceName === '' ? message.name : edsServiceName);
const watchers = (_c = this.watchers.get(message.name)) !== null && _c !== void 0 ? _c : [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
}
this.handleMissingNames(allClusterNames);
this.edsState.handleMissingNames(allEdsServiceNames);
return null;
}
reportStreamError(status) {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}
class RdsState {
constructor(watcher, updateResouceNames) {
this.watcher = watcher;
this.updateResouceNames = updateResouceNames;
this.versionInfo = '';
this.nonce = '';
this.routeConfigName = null;
}
getResourceNames() {
return this.routeConfigName ? [this.routeConfigName] : [];
}
handleSingleMessage(message) {
var _a, _b;
for (const virtualHost of message.virtual_hosts) {
if (virtualHost.domains.indexOf(this.routeConfigName) >= 0) {
const route = virtualHost.routes[virtualHost.routes.length - 1];
if (((_a = route.match) === null || _a === void 0 ? void 0 : _a.prefix) === '' && ((_b = route.route) === null || _b === void 0 ? void 0 : _b.cluster)) {
this.watcher.onValidUpdate({
methodConfig: [],
loadBalancingConfig: [
{
name: 'cds',
cds: {
cluster: route.route.cluster,
},
},
],
});
break;
}
}
}
/* If none of the routes match the one we are looking for, bubble up an
* error. */
this.watcher.onResourceDoesNotExist();
}
handleResponses(responses) {
if (this.routeConfigName !== null) {
for (const message of responses) {
if (message.name === this.routeConfigName) {
this.handleSingleMessage(message);
return null;
}
}
}
return null;
}
setRouteConfigName(name) {
const oldName = this.routeConfigName;
this.routeConfigName = name;
if (name !== oldName) {
this.updateResouceNames();
}
}
reportStreamError(status) {
this.watcher.onTransientError(status);
}
}
class LdsState {
constructor(targetName, rdsState) {
this.targetName = targetName;
this.rdsState = rdsState;
this.versionInfo = '';
this.nonce = '';
}
getResourceNames() {
return [this.targetName];
}
validateResponse(message) {
var _a, _b, _c, _d, _e;
if (!(((_a = message.api_listener) === null || _a === void 0 ? void 0 : _a.api_listener) &&
protoLoader.isAnyExtension(message.api_listener.api_listener) &&
((_b = message.api_listener) === null || _b === void 0 ? void 0 : _b.api_listener['@type']) ===
HTTP_CONNECTION_MANGER_TYPE_URL)) {
return false;
}
const httpConnectionManager = (_c = message.api_listener) === null || _c === void 0 ? void 0 : _c.api_listener;
switch (httpConnectionManager.route_specifier) {
case 'rds':
return !!((_e = (_d = httpConnectionManager.rds) === null || _d === void 0 ? void 0 : _d.config_source) === null || _e === void 0 ? void 0 : _e.ads);
case 'route_config':
return true;
}
return false;
}
handleResponses(responses) {
for (const message of responses) {
if (message.name === this.targetName) {
if (this.validateResponse(message)) {
// The validation step ensures that this is correct
const httpConnectionManager = message.api_listener
.api_listener;
switch (httpConnectionManager.route_specifier) {
case 'rds':
this.rdsState.setRouteConfigName(httpConnectionManager.rds.route_config_name);
break;
case 'route_config':
this.rdsState.setRouteConfigName(null);
this.rdsState.handleSingleMessage(httpConnectionManager.route_config);
break;
default:
// The validation rules should prevent this
}
}
else {
return 'Listener validation failed';
}
}
}
throw new Error('Method not implemented.');
}
reportStreamError(status) {
// Nothing to do here
}
}
function getResponseMessages(typeUrl, resources) {
const result = [];
for (const resource of resources) {
if (protoLoader.isAnyExtension(resource) && resource['@type'] === typeUrl) {
result.push(resource);
}
else {
throw new Error(`Invalid resource type ${protoLoader.isAnyExtension(resource)
? resource['@type']
: resource.type_url}`);
}
}
return result;
}
class XdsClient {
constructor(targetName, serviceConfigWatcher, channelOptions) {
this.targetName = targetName;
this.serviceConfigWatcher = serviceConfigWatcher;
this.node = null;
this.client = null;
this.adsNode = null;
this.adsClient = null;
this.adsCall = null;
this.lrsNode = null;
this.lrsClient = null;
this.lrsCall = null;
this.latestLrsSettings = null;
this.clusterStatsMap = new ClusterLoadReportMap();
this.hasShutdown = false;
this.endpointWatchers = new Map();
this.lastEdsVersionInfo = '';
this.lastEdsNonce = '';
this.latestEdsResponses = [];
const edsState = new EdsState(() => {
this.updateNames(EDS_TYPE_URL);
});
const cdsState = new CdsState(edsState, () => {
this.updateNames(CDS_TYPE_URL);
});
const rdsState = new RdsState(serviceConfigWatcher, () => {
this.updateNames(RDS_TYPE_URL);
});
const ldsState = new LdsState(targetName, rdsState);
this.adsState = {
[EDS_TYPE_URL]: edsState,
[CDS_TYPE_URL]: cdsState,
[RDS_TYPE_URL]: rdsState,
[LDS_TYPE_URL]: ldsState,
};
const channelArgs = Object.assign({}, channelOptions);

@@ -102,5 +547,9 @@ const channelArgsToRemove = [

}
this.node = Object.assign(Object.assign({}, bootstrapInfo.node), { build_version: `gRPC Node Pure JS ${clientVersion}`, user_agent_name: 'gRPC Node Pure JS' });
this.client = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(bootstrapInfo.xdsServers[0].serverUri, channel_credentials_1.createGoogleDefaultCredentials(), channelArgs);
const node = Object.assign(Object.assign({}, bootstrapInfo.node), { build_version: `gRPC Node Pure JS ${clientVersion}`, user_agent_name: 'gRPC Node Pure JS' });
this.adsNode = Object.assign(Object.assign({}, node), { client_features: ['envoy.lb.does_not_support_overprovisioning'] });
this.lrsNode = Object.assign(Object.assign({}, node), { client_features: ['envoy.lrs.supports_send_all_clusters'] });
this.adsClient = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(bootstrapInfo.xdsServers[0].serverUri, channel_credentials_1.createGoogleDefaultCredentials(), channelArgs);
this.maybeStartAdsStream();
this.lrsClient = new protoDefinitions.envoy.service.load_stats.v2.LoadReportingService(bootstrapInfo.xdsServers[0].serverUri, channel_credentials_1.createGoogleDefaultCredentials(), channelArgs);
this.maybeStartLrsStream();
}, (error) => {

@@ -115,3 +564,39 @@ trace('Failed to initialize xDS Client. ' + error.message);

});
this.statsTimer = setInterval(() => { }, 0);
clearInterval(this.statsTimer);
}
handleAdsResponse(message) {
let errorString;
/* The cases in this switch statement look redundant but separating them
* out like this is necessary for the typechecker to validate the types
* as narrowly as we need it to. */
switch (message.type_url) {
case EDS_TYPE_URL:
errorString = this.adsState[message.type_url].handleResponses(getResponseMessages(message.type_url, message.resources));
break;
case CDS_TYPE_URL:
errorString = this.adsState[message.type_url].handleResponses(getResponseMessages(message.type_url, message.resources));
break;
case RDS_TYPE_URL:
errorString = this.adsState[message.type_url].handleResponses(getResponseMessages(message.type_url, message.resources));
break;
case LDS_TYPE_URL:
errorString = this.adsState[message.type_url].handleResponses(getResponseMessages(message.type_url, message.resources));
break;
default:
errorString = `Unknown type_url ${message.type_url}`;
}
if (errorString === null) {
/* errorString can only be null in one of the first 4 cases, which
* implies that message.type_url is one of the 4 known type URLs, which
* means that this type assertion is valid. */
const typeUrl = message.type_url;
this.adsState[typeUrl].nonce = message.nonce;
this.adsState[typeUrl].versionInfo = message.version_info;
this.ack(typeUrl);
}
else {
this.nack(message.type_url, errorString);
}
}
/**

@@ -122,3 +607,3 @@ * Start the ADS stream if the client exists and there is not already an

maybeStartAdsStream() {
if (this.client === null) {
if (this.adsClient === null) {
return;

@@ -132,36 +617,5 @@ }

}
this.adsCall = this.client.StreamAggregatedResources();
this.adsCall = this.adsClient.StreamAggregatedResources();
this.adsCall.on('data', (message) => {
switch (message.type_url) {
case EDS_TYPE_URL: {
const edsResponses = [];
for (const resource of message.resources) {
if (protoLoader.isAnyExtension(resource) &&
resource['@type'] === EDS_TYPE_URL) {
const resp = resource;
if (!this.validateEdsResponse(resp)) {
this.nackEds('ClusterLoadAssignment validation failed');
return;
}
edsResponses.push(resp);
}
else {
this.nackEds(`Invalid resource type ${protoLoader.isAnyExtension(resource)
? resource['@type']
: resource.type_url}`);
return;
}
}
for (const message of edsResponses) {
this.handleEdsResponse(message);
}
this.lastEdsVersionInfo = message.version_info;
this.lastEdsNonce = message.nonce;
this.latestEdsResponses = edsResponses;
this.ackEds();
break;
}
default:
this.nackUnknown(message.type_url, message.version_info, message.nonce);
}
this.handleAdsResponse(message);
});

@@ -177,156 +631,266 @@ this.adsCall.on('error', (error) => {

});
const endpointWatcherNames = Array.from(this.endpointWatchers.keys());
if (endpointWatcherNames.length > 0) {
this.adsCall.write({
node: this.node,
type_url: EDS_TYPE_URL,
resource_names: endpointWatcherNames,
});
const allTypeUrls = [
EDS_TYPE_URL,
CDS_TYPE_URL,
RDS_TYPE_URL,
LDS_TYPE_URL,
];
for (const typeUrl of allTypeUrls) {
const state = this.adsState[typeUrl];
state.nonce = '';
state.versionInfo = '';
if (state.getResourceNames().length > 0) {
this.updateNames(typeUrl);
}
}
}
nackUnknown(typeUrl, versionInfo, nonce) {
if (!this.adsCall) {
return;
/**
* Acknowledge an update. This should be called after the local nonce and
* version info are updated so that it sends the post-update values.
*/
ack(typeUrl) {
this.updateNames(typeUrl);
}
/**
* Reject an update. This should be called without updating the local
* nonce and version info.
*/
nack(typeUrl, message) {
var _a;
let resourceNames;
let nonce;
let versionInfo;
switch (typeUrl) {
case EDS_TYPE_URL:
case CDS_TYPE_URL:
case RDS_TYPE_URL:
case LDS_TYPE_URL:
resourceNames = this.adsState[typeUrl].getResourceNames();
nonce = this.adsState[typeUrl].nonce;
versionInfo = this.adsState[typeUrl].versionInfo;
break;
default:
resourceNames = [];
nonce = '';
versionInfo = '';
}
this.adsCall.write({
node: this.node,
(_a = this.adsCall) === null || _a === void 0 ? void 0 : _a.write({
node: this.adsNode,
type_url: typeUrl,
resource_names: resourceNames,
response_nonce: nonce,
version_info: versionInfo,
response_nonce: nonce,
error_detail: {
message: `Unknown type_url ${typeUrl}`,
message: message,
},
});
}
/**
* Acknowledge an EDS update. This should be called after the local nonce and
* version info are updated so that it sends the post-update values.
*/
ackEds() {
if (!this.adsCall) {
updateNames(typeUrl) {
var _a;
(_a = this.adsCall) === null || _a === void 0 ? void 0 : _a.write({
node: this.adsNode,
type_url: typeUrl,
resource_names: this.adsState[typeUrl].getResourceNames(),
response_nonce: this.adsState[typeUrl].nonce,
version_info: this.adsState[typeUrl].versionInfo,
});
}
reportStreamError(status) {
this.adsState[EDS_TYPE_URL].reportStreamError(status);
this.adsState[CDS_TYPE_URL].reportStreamError(status);
this.adsState[RDS_TYPE_URL].reportStreamError(status);
this.adsState[LDS_TYPE_URL].reportStreamError(status);
}
maybeStartLrsStream() {
if (!this.lrsClient) {
return;
}
this.adsCall.write({
node: this.node,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
version_info: this.lastEdsVersionInfo,
if (this.lrsCall) {
return;
}
if (this.hasShutdown) {
return;
}
this.lrsCall = this.lrsClient.streamLoadStats();
this.lrsCall.on('data', (message) => {
var _a, _b, _c, _d, _e, _f;
if (((_a = message.load_reporting_interval) === null || _a === void 0 ? void 0 : _a.seconds) !== ((_c = (_b = this.latestLrsSettings) === null || _b === void 0 ? void 0 : _b.load_reporting_interval) === null || _c === void 0 ? void 0 : _c.seconds) ||
((_d = message.load_reporting_interval) === null || _d === void 0 ? void 0 : _d.nanos) !== ((_f = (_e = this.latestLrsSettings) === null || _e === void 0 ? void 0 : _e.load_reporting_interval) === null || _f === void 0 ? void 0 : _f.nanos)) {
/* Only reset the timer if the interval has changed or was not set
* before. */
clearInterval(this.statsTimer);
/* Convert a google.protobuf.Duration to a number of milliseconds for
* use with setInterval. */
const loadReportingIntervalMs = Number.parseInt(message.load_reporting_interval.seconds) * 1000 +
message.load_reporting_interval.nanos / 1000000;
setInterval(() => {
this.sendStats();
}, loadReportingIntervalMs);
}
this.latestLrsSettings = message;
});
this.lrsCall.on('error', (error) => {
trace('LRS stream ended. code=' + error.code + ' details= ' + error.details);
this.lrsCall = null;
clearInterval(this.statsTimer);
/* Connection backoff is handled by the client object, so we can
* immediately start a new request to indicate that it should try to
* reconnect */
this.maybeStartAdsStream();
});
this.lrsCall.write({
node: this.lrsNode,
});
}
/**
* Reject an EDS update. This should be called without updating the local
* nonce and version info.
*/
nackEds(message) {
if (!this.adsCall) {
sendStats() {
if (!this.lrsCall) {
return;
}
this.adsCall.write({
node: this.node,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
version_info: this.lastEdsVersionInfo,
error_detail: {
message,
},
});
}
/**
* Validate the ClusterLoadAssignment object by these rules:
* https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto
* @param message
*/
validateEdsResponse(message) {
var _a, _b;
for (const endpoint of message.endpoints) {
for (const lb of endpoint.lb_endpoints) {
const socketAddress = (_b = (_a = lb.endpoint) === null || _a === void 0 ? void 0 : _a.address) === null || _b === void 0 ? void 0 : _b.socket_address;
if (!socketAddress) {
return false;
const clusterStats = [];
for (const [{ clusterName, edsServiceName }, stats,] of this.clusterStatsMap.entries()) {
if (this.latestLrsSettings.send_all_clusters ||
this.latestLrsSettings.clusters.indexOf(clusterName) > 0) {
const upstreamLocalityStats = [];
for (const localityStats of stats.localityStats) {
// Skip localities with 0 requests
if (localityStats.callsStarted > 0 ||
localityStats.callsSucceeded > 0 ||
localityStats.callsFailed > 0) {
upstreamLocalityStats.push({
locality: localityStats.locality,
total_issued_requests: localityStats.callsStarted,
total_successful_requests: localityStats.callsSucceeded,
total_error_requests: localityStats.callsFailed,
total_requests_in_progress: localityStats.callsInProgress,
});
localityStats.callsStarted = 0;
localityStats.callsSucceeded = 0;
localityStats.callsFailed = 0;
}
}
if (socketAddress.port_specifier !== 'port_value') {
return false;
const droppedRequests = [];
let totalDroppedRequests = 0;
for (const [category, count] of stats.callsDropped.entries()) {
if (count > 0) {
droppedRequests.push({
category,
dropped_count: count,
});
totalDroppedRequests += count;
}
}
if (!(net_1.isIPv4(socketAddress.address) || net_1.isIPv6(socketAddress.address))) {
return false;
// Clear out dropped call stats after sending them
stats.callsDropped.clear();
const interval = process.hrtime(stats.intervalStart);
stats.intervalStart = process.hrtime();
// Skip clusters with 0 requests
if (upstreamLocalityStats.length > 0 || totalDroppedRequests > 0) {
clusterStats.push({
cluster_name: clusterName,
cluster_service_name: edsServiceName,
dropped_requests: droppedRequests,
total_dropped_requests: totalDroppedRequests,
upstream_locality_stats: upstreamLocalityStats,
load_report_interval: {
seconds: interval[0],
nanos: interval[1],
},
});
}
}
}
return true;
this.lrsCall.write({
node: this.lrsNode,
cluster_stats: clusterStats,
});
}
handleEdsResponse(message) {
var _a;
const watchers = (_a = this.endpointWatchers.get(message.cluster_name)) !== null && _a !== void 0 ? _a : [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
addEndpointWatcher(edsServiceName, watcher) {
trace('Watcher added for endpoint ' + edsServiceName);
this.adsState[EDS_TYPE_URL].addWatcher(edsServiceName, watcher);
}
updateEdsNames() {
if (this.adsCall) {
this.adsCall.write({
node: this.node,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
version_info: this.lastEdsVersionInfo,
});
}
removeEndpointWatcher(edsServiceName, watcher) {
trace('Watcher removed for endpoint ' + edsServiceName);
this.adsState[EDS_TYPE_URL].removeWatcher(edsServiceName, watcher);
}
reportStreamError(status) {
for (const watcherList of this.endpointWatchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
addClusterWatcher(clusterName, watcher) {
trace('Watcher added for cluster ' + clusterName);
this.adsState[CDS_TYPE_URL].addWatcher(clusterName, watcher);
}
removeClusterWatcher(clusterName, watcher) {
trace('Watcher removed for endpoint ' + clusterName);
this.adsState[CDS_TYPE_URL].removeWatcher(clusterName, watcher);
}
/**
*
* @param lrsServer The target name of the server to send stats to. An empty
* string indicates that the default LRS client should be used. Currently
* only the empty string is supported here.
* @param clusterName
* @param edsServiceName
*/
addClusterDropStats(lrsServer, clusterName, edsServiceName) {
if (lrsServer !== '') {
return {
addCallDropped: (category) => { },
};
}
// Also do the same for other types of watchers when those are implemented
const clusterStats = this.clusterStatsMap.getOrCreate(clusterName, edsServiceName);
return {
addCallDropped: (category) => {
var _a;
const prevCount = (_a = clusterStats.callsDropped.get(category)) !== null && _a !== void 0 ? _a : 0;
clusterStats.callsDropped.set(category, prevCount + 1);
},
};
}
addEndpointWatcher(edsServiceName, watcher) {
trace('Watcher added for endpoint ' + edsServiceName);
let watchersEntry = this.endpointWatchers.get(edsServiceName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.endpointWatchers.set(edsServiceName, watchersEntry);
addClusterLocalityStats(lrsServer, clusterName, edsServiceName, locality) {
if (lrsServer !== '') {
return {
addCallStarted: () => { },
addCallFinished: (fail) => { },
};
}
watchersEntry.push(watcher);
if (addedServiceName) {
this.updateEdsNames();
}
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
for (const message of this.latestEdsResponses) {
if (message.cluster_name === edsServiceName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
watcher.onValidUpdate(message);
});
const clusterStats = this.clusterStatsMap.getOrCreate(clusterName, edsServiceName);
let localityStats = null;
for (const statsObj of clusterStats.localityStats) {
if (localityEqual(locality, statsObj.locality)) {
localityStats = statsObj;
break;
}
}
}
removeEndpointWatcher(edsServiceName, watcher) {
trace('Watcher removed for endpoint ' + edsServiceName);
const watchersEntry = this.endpointWatchers.get(edsServiceName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.endpointWatchers.delete(edsServiceName);
}
if (localityStats === null) {
localityStats = {
locality: locality,
callsInProgress: 0,
callsStarted: 0,
callsSucceeded: 0,
callsFailed: 0,
};
clusterStats.localityStats.push(localityStats);
}
if (removedServiceName) {
this.updateEdsNames();
}
/* Help the compiler understand that this object is always non-null in the
* closure */
const finalLocalityStats = localityStats;
return {
addCallStarted: () => {
finalLocalityStats.callsStarted += 1;
finalLocalityStats.callsInProgress += 1;
},
addCallFinished: (fail) => {
if (fail) {
finalLocalityStats.callsFailed += 1;
}
else {
finalLocalityStats.callsSucceeded += 1;
}
finalLocalityStats.callsInProgress -= 1;
},
};
}
shutdown() {
var _a, _b;
var _a, _b, _c, _d;
(_a = this.adsCall) === null || _a === void 0 ? void 0 : _a.cancel();
(_b = this.client) === null || _b === void 0 ? void 0 : _b.close();
(_b = this.adsClient) === null || _b === void 0 ? void 0 : _b.close();
(_c = this.lrsCall) === null || _c === void 0 ? void 0 : _c.cancel();
(_d = this.lrsClient) === null || _d === void 0 ? void 0 : _d.close();
this.hasShutdown = true;

@@ -333,0 +897,0 @@ }

{
"name": "@grpc/grpc-js",
"version": "1.1.3",
"version": "1.1.4",
"description": "gRPC Library for Node - pure JS implementation",

@@ -24,3 +24,2 @@ "homepage": "https://grpc.io/",

"@types/ncp": "^2.0.1",
"@types/node": "^12.7.5",
"@types/pify": "^3.0.2",

@@ -52,3 +51,3 @@ "@types/semver": "^6.0.1",

"format": "clang-format -i -style=\"{Language: JavaScript, BasedOnStyle: Google, ColumnLimit: 80}\" src/*.ts test/*.ts",
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs deps/envoy-api/ deps/udpa/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib ../index envoy/service/discovery/v2/ads.proto envoy/api/v2/listener.proto envoy/api/v2/route.proto envoy/api/v2/cluster.proto envoy/api/v2/endpoint.proto",
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs deps/envoy-api/ deps/udpa/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib ../index envoy/service/discovery/v2/ads.proto envoy/service/load_stats/v2/lrs.proto envoy/api/v2/listener.proto envoy/api/v2/route.proto envoy/api/v2/cluster.proto envoy/api/v2/endpoint.proto envoy/config/filter/network/http_connection_manager/v2/http_connection_manager.proto",
"lint": "npm run check",

@@ -63,2 +62,3 @@ "prepare": "npm run compile",

"dependencies": {
"@types/node": "^12.12.47",
"semver": "^6.2.0"

@@ -65,0 +65,0 @@ },

@@ -11,2 +11,6 @@ # Pure JavaScript gRPC Client

## Documentation
Documentation specifically for the `@grpc/grpc-js` package is currently not available. However, [documentation is available for the `grpc` package](https://grpc.github.io/grpc/node/grpc.html), and the two packages contain mostly the same interface. There are a few notable differences, however, and these differences are noted in the "Migrating from grpc" section below.
## Features

@@ -32,3 +36,3 @@

- If you are currently loading `.proto` files using `grpc.load`, that function is not available in this library. You should instead load your `.proto` files using `@grpc/proto-loader` and load the resulting package definition objects into `@grpc/grpc-js` using `grpc.loadPackageDefinition`.
- If you are currently loading packages generated by `grpc-tools`, you should instead generate your files using the `--generate_package_definitions` option in `grpc-tools`, then load the object exported by the generated file into `@grpc/grpc-js` using `grpc.loadPackageDefinition`.
- If you are currently loading packages generated by `grpc-tools`, you should instead generate your files using the `generate_package_definition` option in `grpc-tools`, then load the object exported by the generated file into `@grpc/grpc-js` using `grpc.loadPackageDefinition`.
- If you have a server and you are using `Server#bind` to bind ports, you will need to use `Server#bindAsync` instead.

@@ -35,0 +39,0 @@

@@ -230,3 +230,11 @@ /*

);
this.listener?.onReceiveStatus(filteredStatus);
/* We delay the actual action of bubbling up the status to insulate the
* cleanup code in this class from any errors that may be thrown in the
* upper layers as a result of bubbling up the status. In particular,
* if the status is not OK, the "error" event may be emitted
* synchronously at the top level, which will result in a thrown error if
* the user does not handle that event. */
process.nextTick(() => {
this.listener?.onReceiveStatus(filteredStatus);
});
if (this.subchannel) {

@@ -606,2 +614,3 @@ this.subchannel.callUnref();

}
this.trace('close http2 stream with code ' + code);
this.http2Stream.close(code);

@@ -635,3 +644,3 @@ }

getPeer(): string {
throw new Error('Not yet implemented');
return this.subchannel?.getAddress() ?? this.channel.getTarget();
}

@@ -638,0 +647,0 @@

@@ -96,3 +96,3 @@ /*

getPeer(): string {
return this.call?.getPeer() ?? '';
return this.call?.getPeer() ?? 'unknown';
}

@@ -113,3 +113,3 @@ }

getPeer(): string {
return this.call?.getPeer() ?? '';
return this.call?.getPeer() ?? 'unknown';
}

@@ -134,3 +134,3 @@

getPeer(): string {
return this.call?.getPeer() ?? '';
return this.call?.getPeer() ?? 'unknown';
}

@@ -170,3 +170,3 @@

getPeer(): string {
return this.call?.getPeer() ?? '';
return this.call?.getPeer() ?? 'unknown';
}

@@ -173,0 +173,0 @@

@@ -37,3 +37,2 @@ /*

import { getDefaultAuthority, mapUriDefaultScheme } from './resolver';
import { ServiceConfig, validateServiceConfig } from './service-config';
import { trace, log } from './logging';

@@ -224,16 +223,6 @@ import { SubchannelAddress } from './subchannel';

};
// TODO(murgatroid99): check channel arg for default service config
let defaultServiceConfig: ServiceConfig = {
loadBalancingConfig: [],
methodConfig: [],
};
if (options['grpc.service_config']) {
defaultServiceConfig = validateServiceConfig(
JSON.parse(options['grpc.service_config']!)
);
}
this.resolvingLoadBalancer = new ResolvingLoadBalancer(
this.target,
channelControlHelper,
defaultServiceConfig
options
);

@@ -395,2 +384,8 @@ this.filterStackFactory = new FilterStackFactory([

break;
case PickResultType.DROP:
callStream.cancelWithStatus(
pickResult.status!.code,
pickResult.status!.details
);
break;
default:

@@ -397,0 +392,0 @@ throw new Error(

// Original file: null
import { UninterpretedOption as _google_protobuf_UninterpretedOption, UninterpretedOption__Output as _google_protobuf_UninterpretedOption__Output } from '../../google/protobuf/UninterpretedOption';
import { HttpRule as _google_api_HttpRule, HttpRule__Output as _google_api_HttpRule__Output } from '../../google/api/HttpRule';

@@ -9,3 +8,2 @@ export interface MethodOptions {

'uninterpretedOption'?: (_google_protobuf_UninterpretedOption)[];
'.google.api.http'?: (_google_api_HttpRule);
}

@@ -16,3 +14,2 @@

'uninterpretedOption': (_google_protobuf_UninterpretedOption__Output)[];
'.google.api.http'?: (_google_api_HttpRule__Output);
}

@@ -35,6 +35,6 @@ /*

import { ChildLoadBalancerHandler } from './load-balancer-child-handler';
import { XdsClient, Watcher } from './xds-client';
import { XdsClient, Watcher, XdsClusterDropStats } from './xds-client';
import { ClusterLoadAssignment__Output } from './generated/envoy/api/v2/ClusterLoadAssignment';
import { ConnectivityState } from './channel';
import { UnavailablePicker } from './picker';
import { UnavailablePicker, Picker, PickResultType } from './picker';
import { Locality__Output } from './generated/envoy/api/v2/core/Locality';

@@ -87,4 +87,44 @@ import { LocalitySubchannelAddress } from './load-balancer-priority';

private clusterDropStats: XdsClusterDropStats | null = null;
constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper);
this.childBalancer = new ChildLoadBalancerHandler({
createSubchannel: (subchannelAddress, subchannelArgs) =>
this.channelControlHelper.createSubchannel(
subchannelAddress,
subchannelArgs
),
requestReresolution: () =>
this.channelControlHelper.requestReresolution(),
updateState: (connectivityState, originalPicker) => {
if (this.latestEdsUpdate === null) {
return;
}
const edsPicker: Picker = {
pick: (pickArgs) => {
const dropCategory = this.checkForDrop();
/* If we drop the call, it ends with an UNAVAILABLE status.
* Otherwise, delegate picking the subchannel to the child
* balancer. */
if (dropCategory === null) {
return originalPicker.pick(pickArgs);
} else {
this.clusterDropStats?.addCallDropped(dropCategory);
return {
pickResultType: PickResultType.DROP,
status: {
code: Status.UNAVAILABLE,
details: `Call dropped by load balancing policy. Category: ${dropCategory}`,
metadata: new Metadata(),
},
subchannel: null,
extraFilterFactory: null,
onCallStarted: null,
};
}
},
};
this.channelControlHelper.updateState(connectivityState, edsPicker);
},
});
this.watcher = {

@@ -96,4 +136,7 @@ onValidUpdate: (update) => {

onResourceDoesNotExist: () => {
/* TODO(murgatroid99): Figure out what needs to be done here after
* implementing CDS */
this.xdsClient?.removeEndpointWatcher(
this.edsServiceName!,
this.watcher
);
this.isWatcherActive = false;
},

@@ -116,2 +159,40 @@ onTransientError: (status) => {

/**
* Check whether a single call should be dropped according to the current
* policy, based on randomly chosen numbers. Returns the drop category if
* the call should be dropped, and null otherwise.
*/
private checkForDrop(): string | null {
if (!this.latestEdsUpdate?.policy) {
return null;
}
/* The drop_overloads policy is a list of pairs of category names and
* probabilities. For each one, if the random number is within that
* probability range, we drop the call citing that category. Otherwise, the
* call proceeds as usual. */
for (const dropOverload of this.latestEdsUpdate.policy.drop_overloads) {
if (!dropOverload.drop_percentage) {
continue;
}
let randNum: number;
switch (dropOverload.drop_percentage.denominator) {
case 'HUNDRED':
randNum = Math.random() * 100;
break;
case 'TEN_THOUSAND':
randNum = Math.random() * 10_000;
break;
case 'MILLION':
randNum = Math.random() * 1_000_000;
break;
default:
continue;
}
if (randNum < dropOverload.drop_percentage.numerator) {
return dropOverload.category;
}
}
return null;
}
/**
* Should be called when this balancer gets a new config and when the

@@ -223,12 +304,29 @@ * XdsClient returns a new ClusterLoadAssignment.

for (const localityObj of localityArray) {
/* Use the endpoint picking policy from the config, default to
* round_robin. */
const endpointPickingPolicy: LoadBalancingConfig[] = [
...this.lastestConfig.eds.endpointPickingPolicy,
{ name: 'round_robin', round_robin: {} },
];
let childPolicy: LoadBalancingConfig[];
if (this.lastestConfig.eds.lrsLoadReportingServerName) {
childPolicy = [
{
name: 'lrs',
lrs: {
cluster_name: this.lastestConfig.eds.cluster,
eds_service_name: this.lastestConfig.eds.edsServiceName ?? '',
lrs_load_reporting_server_name: this.lastestConfig.eds
.lrsLoadReportingServerName,
locality: localityObj.locality,
child_policy: endpointPickingPolicy,
},
},
];
} else {
childPolicy = endpointPickingPolicy;
}
childTargets.set(localityToName(localityObj.locality), {
weight: localityObj.weight,
/* TODO(murgatroid99): Insert an lrs config around the round_robin
* config after implementing lrs */
/* Use the endpoint picking policy from the config, default to
* round_robin. */
child_policy: [
...this.lastestConfig.eds.endpointPickingPolicy,
{ name: 'round_robin', round_robin: {} },
],
child_policy: childPolicy,
});

@@ -290,3 +388,4 @@ for (const address of localityObj.addresses) {

this.xdsClient = attributes.xdsClient;
const newEdsServiceName = lbConfig.eds.edsServiceName ?? lbConfig.eds.cluster;
const newEdsServiceName =
lbConfig.eds.edsServiceName ?? lbConfig.eds.cluster;

@@ -298,6 +397,6 @@ /* If the name is changing, disable the old watcher before adding the new

/* Setting isWatcherActive to false here lets us have one code path for
* calling addEndpointWatcher */
* calling addEndpointWatcher */
this.isWatcherActive = false;
/* If we have a new name, the latestEdsUpdate does not correspond to
* the new config, so it is no longer valid */
* the new config, so it is no longer valid */
this.latestEdsUpdate = null;

@@ -313,2 +412,10 @@ }

if (lbConfig.eds.lrsLoadReportingServerName) {
this.clusterDropStats = this.xdsClient.addClusterDropStats(
lbConfig.eds.lrsLoadReportingServerName,
lbConfig.eds.cluster,
lbConfig.eds.edsServiceName ?? ''
);
}
/* If updateAddressList is called after receiving an update and the update

@@ -315,0 +422,0 @@ * is still valid, we want to update the child config with the information

@@ -27,2 +27,5 @@ /*

import * as load_balancer_weighted_target from './load-balancer-weighted-target';
import * as load_balancer_eds from './load-balancer-eds';
import * as load_balancer_cds from './load-balancer-cds';
import * as load_balancer_lrs from './load-balancer-lrs';

@@ -145,2 +148,5 @@ /**

load_balancer_weighted_target.setup();
load_balancer_eds.setup();
load_balancer_cds.setup();
load_balancer_lrs.setup();
}

@@ -18,2 +18,4 @@ /*

import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
/* This file is an implementation of gRFC A24:

@@ -79,2 +81,14 @@ * https://github.com/grpc/proposal/blob/master/A24-lb-policy-config.md. Each

export interface CdsLbConfig {
cluster: string;
}
export interface LrsLbConfig {
cluster_name: string;
eds_service_name: string;
lrs_load_reporting_server_name: string;
locality: Locality__Output;
child_policy: LoadBalancingConfig[];
}
export interface PickFirstLoadBalancingConfig {

@@ -115,2 +129,12 @@ name: 'pick_first';

export interface CdsLoadBalancingConfig {
name: 'cds';
cds: CdsLbConfig;
}
export interface LrsLoadBalancingConfig {
name: 'lrs';
lrs: LrsLbConfig;
}
export type LoadBalancingConfig =

@@ -123,3 +147,5 @@ | PickFirstLoadBalancingConfig

| WeightedTargetLoadBalancingConfig
| EdsLoadBalancingConfig;
| EdsLoadBalancingConfig
| CdsLoadBalancingConfig
| LrsLoadBalancingConfig;

@@ -162,2 +188,14 @@ export function isRoundRobinLoadBalancingConfig(

export function isCdsLoadBalancingConfig(
lbconfig: LoadBalancingConfig
): lbconfig is CdsLoadBalancingConfig {
return lbconfig.name === 'cds';
}
export function isLrsLoadBalancingConfig(
lbconfig: LoadBalancingConfig
): lbconfig is LrsLoadBalancingConfig {
return lbconfig.name === 'lrs';
}
/* In these functions we assume the input came from a JSON object. Therefore we

@@ -164,0 +202,0 @@ * expect that the prototype is uninteresting and that `in` can be used

@@ -29,2 +29,3 @@ /*

TRANSIENT_FAILURE,
DROP,
}

@@ -78,2 +79,10 @@

export interface DropCallPickResult extends PickResult {
pickResultType: PickResultType.DROP;
subchannel: null;
status: StatusObject;
extraFilterFactory: null;
onCallStarted: null;
}
export interface PickArgs {

@@ -80,0 +89,0 @@ metadata: Metadata;

@@ -34,2 +34,3 @@ /*

import { isIPv6, isIPv4 } from 'net';
import { ChannelOptions } from './channel-options';

@@ -88,3 +89,7 @@ const TRACER_NAME = 'dns_resolver';

private defaultResolutionError: StatusObject;
constructor(private target: GrpcUri, private listener: ResolverListener) {
constructor(
private target: GrpcUri,
private listener: ResolverListener,
channelOptions: ChannelOptions
) {
trace('Resolver constructed for target ' + uriToString(target));

@@ -91,0 +96,0 @@ const hostPort = splitHostPort(target.path);

@@ -20,6 +20,11 @@ /*

import { GrpcUri } from './uri-parser';
import { ChannelOptions } from './channel-options';
class UdsResolver implements Resolver {
private addresses: SubchannelAddress[] = [];
constructor(target: GrpcUri, private listener: ResolverListener) {
constructor(
target: GrpcUri,
private listener: ResolverListener,
channelOptions: ChannelOptions
) {
let path: string;

@@ -26,0 +31,0 @@ if (target.authority === '') {

@@ -24,2 +24,3 @@ /*

import { GrpcUri, uriToString } from './uri-parser';
import { ChannelOptions } from './channel-options';

@@ -68,3 +69,7 @@ /**

export interface ResolverConstructor {
new (target: GrpcUri, listener: ResolverListener): Resolver;
new (
target: GrpcUri,
listener: ResolverListener,
channelOptions: ChannelOptions
): Resolver;
/**

@@ -113,6 +118,7 @@ * Get the default authority for a target. This loosely corresponds to that

target: GrpcUri,
listener: ResolverListener
listener: ResolverListener,
options: ChannelOptions
): Resolver {
if (target.scheme !== undefined && target.scheme in registeredResolvers) {
return new registeredResolvers[target.scheme](target, listener);
return new registeredResolvers[target.scheme](target, listener, options);
} else {

@@ -119,0 +125,0 @@ throw new Error(

@@ -23,3 +23,3 @@ /*

} from './load-balancer';
import { ServiceConfig } from './service-config';
import { ServiceConfig, validateServiceConfig } from './service-config';
import { ConnectivityState } from './channel';

@@ -39,2 +39,3 @@ import { createResolver, Resolver } from './resolver';

import { ChildLoadBalancerHandler } from './load-balancer-child-handler';
import { ChannelOptions } from './channel-options';

@@ -62,2 +63,3 @@ const TRACER_NAME = 'resolving_load_balancer';

private currentState: ConnectivityState = ConnectivityState.IDLE;
private readonly defaultServiceConfig: ServiceConfig;
/**

@@ -96,4 +98,14 @@ * The service config object from the last successful resolution, if

private readonly channelControlHelper: ChannelControlHelper,
private readonly defaultServiceConfig: ServiceConfig | null
private readonly channelOptions: ChannelOptions
) {
if (channelOptions['grpc.service_config']) {
this.defaultServiceConfig = validateServiceConfig(
JSON.parse(channelOptions['grpc.service_config']!)
);
} else {
this.defaultServiceConfig = {
loadBalancingConfig: [],
methodConfig: [],
};
}
this.updateState(ConnectivityState.IDLE, new QueuePicker(this));

@@ -121,64 +133,68 @@ this.childLoadBalancer = new ChildLoadBalancerHandler({

});
this.innerResolver = createResolver(target, {
onSuccessfulResolution: (
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: ServiceError | null,
attributes: { [key: string]: unknown }
) => {
let workingServiceConfig: ServiceConfig | null = null;
/* This first group of conditionals implements the algorithm described
* in https://github.com/grpc/proposal/blob/master/A21-service-config-error-handling.md
* in the section called "Behavior on receiving a new gRPC Config".
*/
if (serviceConfig === null) {
// Step 4 and 5
if (serviceConfigError === null) {
// Step 5
this.previousServiceConfig = null;
workingServiceConfig = this.defaultServiceConfig;
} else {
// Step 4
if (this.previousServiceConfig === null) {
// Step 4.ii
this.handleResolutionFailure(serviceConfigError);
this.innerResolver = createResolver(
target,
{
onSuccessfulResolution: (
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: ServiceError | null,
attributes: { [key: string]: unknown }
) => {
let workingServiceConfig: ServiceConfig | null = null;
/* This first group of conditionals implements the algorithm described
* in https://github.com/grpc/proposal/blob/master/A21-service-config-error-handling.md
* in the section called "Behavior on receiving a new gRPC Config".
*/
if (serviceConfig === null) {
// Step 4 and 5
if (serviceConfigError === null) {
// Step 5
this.previousServiceConfig = null;
workingServiceConfig = this.defaultServiceConfig;
} else {
// Step 4.i
workingServiceConfig = this.previousServiceConfig;
// Step 4
if (this.previousServiceConfig === null) {
// Step 4.ii
this.handleResolutionFailure(serviceConfigError);
} else {
// Step 4.i
workingServiceConfig = this.previousServiceConfig;
}
}
} else {
// Step 3
workingServiceConfig = serviceConfig;
this.previousServiceConfig = serviceConfig;
}
} else {
// Step 3
workingServiceConfig = serviceConfig;
this.previousServiceConfig = serviceConfig;
}
const workingConfigList =
workingServiceConfig?.loadBalancingConfig ?? [];
if (workingConfigList.length === 0) {
workingConfigList.push({
name: 'pick_first',
pick_first: {},
});
}
const loadBalancingConfig = getFirstUsableConfig(workingConfigList);
if (loadBalancingConfig === null) {
// There were load balancing configs but none are supported. This counts as a resolution failure
this.handleResolutionFailure({
code: Status.UNAVAILABLE,
details:
'All load balancer options in service config are not compatible',
metadata: new Metadata(),
});
return;
}
this.childLoadBalancer.updateAddressList(
addressList,
loadBalancingConfig,
attributes
);
const workingConfigList =
workingServiceConfig?.loadBalancingConfig ?? [];
if (workingConfigList.length === 0) {
workingConfigList.push({
name: 'pick_first',
pick_first: {},
});
}
const loadBalancingConfig = getFirstUsableConfig(workingConfigList);
if (loadBalancingConfig === null) {
// There were load balancing configs but none are supported. This counts as a resolution failure
this.handleResolutionFailure({
code: Status.UNAVAILABLE,
details:
'All load balancer options in service config are not compatible',
metadata: new Metadata(),
});
return;
}
this.childLoadBalancer.updateAddressList(
addressList,
loadBalancingConfig,
attributes
);
},
onError: (error: StatusObject) => {
this.handleResolutionFailure(error);
},
},
onError: (error: StatusObject) => {
this.handleResolutionFailure(error);
},
});
channelOptions
);

@@ -185,0 +201,0 @@ this.backoffTimeout = new BackoffTimeout(() => {

@@ -94,6 +94,9 @@ /*

> = ServerSurfaceCall &
ObjectWritable<ResponseType> & { request: RequestType | null };
ObjectWritable<ResponseType> & {
request: RequestType | null;
end: (metadata?: Metadata) => void;
};
export type ServerDuplexStream<RequestType, ResponseType> = ServerSurfaceCall &
ObjectReadable<RequestType> &
ObjectWritable<ResponseType>;
ObjectWritable<ResponseType> & { end: (metadata?: Metadata) => void };

@@ -116,3 +119,3 @@ export class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter

getPeer(): string {
throw new Error('not implemented yet');
return this.call.getPeer();
}

@@ -150,3 +153,3 @@

getPeer(): string {
throw new Error('not implemented yet');
return this.call.getPeer();
}

@@ -184,3 +187,3 @@

getPeer(): string {
throw new Error('not implemented yet');
return this.call.getPeer();
}

@@ -256,3 +259,3 @@

getPeer(): string {
throw new Error('not implemented yet');
return this.call.getPeer();
}

@@ -263,2 +266,11 @@

}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
end(metadata?: any) {
if (metadata) {
this.trailingMetadata = metadata;
}
super.end();
}
}

@@ -382,2 +394,8 @@

this.stream.once('close', () => {
trace(
'Request to method ' +
this.handler?.path +
' stream closed with rstCode ' +
this.stream.rstCode
);
this.cancelled = true;

@@ -423,3 +441,3 @@ this.emit('cancelled', 'cancelled');

// TODO(cjihrig): Include compression headers.
const headers = Object.assign(defaultResponseHeaders, custom);
const headers = Object.assign({}, defaultResponseHeaders, custom);
this.stream.respond(headers, defaultResponseOptions);

@@ -749,2 +767,15 @@ }

}
getPeer(): string {
const socket = this.stream.session.socket;
if (socket.remoteAddress) {
if (socket.remotePort) {
return `${socket.remoteAddress}:${socket.remotePort}`;
} else {
return socket.remoteAddress;
}
} else {
return 'unknown';
}
}
}

@@ -751,0 +782,0 @@

@@ -418,3 +418,3 @@ /*

const resolver = createResolver(portUri, resolverListener);
const resolver = createResolver(portUri, resolverListener, this.options);
resolver.updateResolution();

@@ -547,2 +547,12 @@ }

const path = headers[http2.constants.HTTP2_HEADER_PATH] as string;
const serverAddress = http2Server.address();
let serverAddressString = 'null';
if (serverAddress) {
if (typeof serverAddress === 'string') {
serverAddressString = serverAddress;
} else {
serverAddressString =
serverAddress.address + ':' + serverAddress.port;
}
}
trace(

@@ -552,3 +562,3 @@ 'Received call to method ' +

' at address ' +
http2Server.address()?.toString()
serverAddressString
);

@@ -555,0 +565,0 @@ const handler = this.handlers.get(path);

@@ -418,3 +418,3 @@ /*

trace(
this.subchannelAddress +
this.subchannelAddressString +
' connection closed by GOAWAY with code ' +

@@ -421,0 +421,0 @@ errorCode

@@ -21,2 +21,3 @@ /*

import * as adsTypes from './generated/ads';
import * as lrsTypes from './generated/lrs';
import { createGoogleDefaultCredentials } from './channel-credentials';

@@ -36,3 +37,23 @@ import { loadBootstrapInfo } from './xds-bootstrap';

import { DiscoveryResponse__Output } from './generated/envoy/api/v2/DiscoveryResponse';
import { ClusterLoadAssignment__Output } from './generated/envoy/api/v2/ClusterLoadAssignment';
import {
ClusterLoadAssignment__Output,
ClusterLoadAssignment,
} from './generated/envoy/api/v2/ClusterLoadAssignment';
import { Cluster__Output } from './generated/envoy/api/v2/Cluster';
import { LoadReportingServiceClient } from './generated/envoy/service/load_stats/v2/LoadReportingService';
import { LoadStatsRequest } from './generated/envoy/service/load_stats/v2/LoadStatsRequest';
import { LoadStatsResponse__Output } from './generated/envoy/service/load_stats/v2/LoadStatsResponse';
import {
Locality__Output,
Locality,
} from './generated/envoy/api/v2/core/Locality';
import {
ClusterStats,
_envoy_api_v2_endpoint_ClusterStats_DroppedRequests,
} from './generated/envoy/api/v2/endpoint/ClusterStats';
import { UpstreamLocalityStats } from './generated/envoy/api/v2/endpoint/UpstreamLocalityStats';
import { Listener__Output } from './generated/envoy/api/v2/Listener';
import { HttpConnectionManager__Output } from './generated/envoy/config/filter/network/http_connection_manager/v2/HttpConnectionManager';
import { RouteConfiguration__Output } from './generated/envoy/api/v2/RouteConfiguration';
import { Any__Output } from './generated/google/protobuf/Any';

@@ -48,6 +69,23 @@ const TRACER_NAME = 'xds_client';

const EDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment';
const CDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.Cluster';
const LDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.Listener';
const RDS_TYPE_URL = 'type.googleapis.com/envoy.api.v2.RouteConfiguration';
let loadedProtos: Promise<adsTypes.ProtoGrpcType> | null = null;
type EdsTypeUrl = 'type.googleapis.com/envoy.api.v2.ClusterLoadAssignment';
type CdsTypeUrl = 'type.googleapis.com/envoy.api.v2.Cluster';
type LdsTypeUrl = 'type.googleapis.com/envoy.api.v2.Listener';
type RdsTypeUrl = 'type.googleapis.com/envoy.api.v2.RouteConfiguration';
function loadAdsProtos(): Promise<adsTypes.ProtoGrpcType> {
type AdsTypeUrl = EdsTypeUrl | CdsTypeUrl | RdsTypeUrl | LdsTypeUrl;
const HTTP_CONNECTION_MANGER_TYPE_URL =
'type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager';
let loadedProtos: Promise<
adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType
> | null = null;
function loadAdsProtos(): Promise<
adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType
> {
if (loadedProtos !== null) {

@@ -60,2 +98,3 @@ return loadedProtos;

'envoy/service/discovery/v2/ads.proto',
'envoy/service/load_stats/v2/lrs.proto',
'envoy/api/v2/listener.proto',

@@ -65,2 +104,3 @@ 'envoy/api/v2/route.proto',

'envoy/api/v2/endpoint.proto',
'envoy/config/filter/network/http_connection_manager/v2/http_connection_manager.proto',
],

@@ -86,3 +126,3 @@ {

packageDefinition
) as unknown) as adsTypes.ProtoGrpcType
) as unknown) as adsTypes.ProtoGrpcType & lrsTypes.ProtoGrpcType
);

@@ -92,2 +132,13 @@ return loadedProtos;

function localityEqual(
loc1: Locality__Output,
loc2: Locality__Output
): boolean {
return (
loc1.region === loc2.region &&
loc1.zone === loc2.zone &&
loc1.sub_zone === loc2.sub_zone
);
}
export interface Watcher<UpdateType> {

@@ -99,5 +150,552 @@ onValidUpdate(update: UpdateType): void;

export interface XdsClusterDropStats {
addCallDropped(category: string): void;
}
export interface XdsClusterLocalityStats {
addCallStarted(): void;
addCallFinished(fail: boolean): void;
}
interface ClusterLocalityStats {
locality: Locality__Output;
callsStarted: number;
callsSucceeded: number;
callsFailed: number;
callsInProgress: number;
}
interface ClusterLoadReport {
callsDropped: Map<string, number>;
localityStats: ClusterLocalityStats[];
intervalStart: [number, number];
}
class ClusterLoadReportMap {
private statsMap: {
clusterName: string;
edsServiceName: string;
stats: ClusterLoadReport;
}[] = [];
get(
clusterName: string,
edsServiceName: string
): ClusterLoadReport | undefined {
for (const statsObj of this.statsMap) {
if (
statsObj.clusterName === clusterName &&
statsObj.edsServiceName === edsServiceName
) {
return statsObj.stats;
}
}
return undefined;
}
getOrCreate(clusterName: string, edsServiceName: string): ClusterLoadReport {
for (const statsObj of this.statsMap) {
if (
statsObj.clusterName === clusterName &&
statsObj.edsServiceName === edsServiceName
) {
return statsObj.stats;
}
}
const newStats: ClusterLoadReport = {
callsDropped: new Map<string, number>(),
localityStats: [],
intervalStart: process.hrtime(),
};
this.statsMap.push({
clusterName,
edsServiceName,
stats: newStats,
});
return newStats;
}
*entries(): IterableIterator<
[{ clusterName: string; edsServiceName: string }, ClusterLoadReport]
> {
for (const statsEntry of this.statsMap) {
yield [
{
clusterName: statsEntry.clusterName,
edsServiceName: statsEntry.edsServiceName,
},
statsEntry.stats,
];
}
}
}
interface XdsStreamState<ResponseType> {
versionInfo: string;
nonce: string;
getResourceNames(): string[];
/**
* Returns a string containing the error details if the message should be nacked,
* or null if it should be acked.
* @param responses
*/
handleResponses(responses: ResponseType[]): string | null;
reportStreamError(status: StatusObject): void;
}
class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
public versionInfo = '';
public nonce = '';
private watchers: Map<
string,
Watcher<ClusterLoadAssignment__Output>[]
> = new Map<string, Watcher<ClusterLoadAssignment__Output>[]>();
private latestResponses: ClusterLoadAssignment__Output[] = [];
constructor(private updateResourceNames: () => void) {}
/**
* Add the watcher to the watcher list. Returns true if the list of resource
* names has changed, and false otherwise.
* @param edsServiceName
* @param watcher
*/
addWatcher(
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
let watchersEntry = this.watchers.get(edsServiceName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(edsServiceName, watchersEntry);
}
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
for (const message of this.latestResponses) {
if (message.cluster_name === edsServiceName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
watcher.onValidUpdate(message);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
}
removeWatcher(
edsServiceName: string,
watcher: Watcher<ClusterLoadAssignment__Output>
): void {
const watchersEntry = this.watchers.get(edsServiceName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(edsServiceName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
}
getResourceNames(): string[] {
return Array.from(this.watchers.keys());
}
/**
* Validate the ClusterLoadAssignment object by these rules:
* https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto
* @param message
*/
private validateResponse(message: ClusterLoadAssignment__Output) {
for (const endpoint of message.endpoints) {
for (const lb of endpoint.lb_endpoints) {
const socketAddress = lb.endpoint?.address?.socket_address;
if (!socketAddress) {
return false;
}
if (socketAddress.port_specifier !== 'port_value') {
return false;
}
if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) {
return false;
}
}
}
return true;
}
/**
* Given a list of edsServiceNames (which may actually be the cluster name),
* for each watcher watching a name not on the list, call that watcher's
* onResourceDoesNotExist method.
* @param allClusterNames
*/
handleMissingNames(allEdsServiceNames: Set<string>) {
for (const [edsServiceName, watcherList] of this.watchers.entries()) {
if (!allEdsServiceNames.has(edsServiceName)) {
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
}
handleResponses(responses: ClusterLoadAssignment__Output[]) {
for (const message of responses) {
if (!this.validateResponse(message)) {
return 'ClusterLoadAssignment validation failed';
}
}
this.latestResponses = responses;
const allClusterNames: Set<string> = new Set<string>();
for (const message of responses) {
allClusterNames.add(message.cluster_name);
const watchers = this.watchers.get(message.cluster_name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
}
this.handleMissingNames(allClusterNames);
return null;
}
reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}
class CdsState implements XdsStreamState<Cluster__Output> {
versionInfo = '';
nonce = '';
private watchers: Map<string, Watcher<Cluster__Output>[]> = new Map<
string,
Watcher<Cluster__Output>[]
>();
private latestResponses: Cluster__Output[] = [];
constructor(
private edsState: EdsState,
private updateResourceNames: () => void
) {}
/**
* Add the watcher to the watcher list. Returns true if the list of resource
* names has changed, and false otherwise.
* @param clusterName
* @param watcher
*/
addWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
let watchersEntry = this.watchers.get(clusterName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.watchers.set(clusterName, watchersEntry);
}
watchersEntry.push(watcher);
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
for (const message of this.latestResponses) {
if (message.name === clusterName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
watcher.onValidUpdate(message);
});
}
}
if (addedServiceName) {
this.updateResourceNames();
}
}
removeWatcher(clusterName: string, watcher: Watcher<Cluster__Output>): void {
const watchersEntry = this.watchers.get(clusterName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.watchers.delete(clusterName);
}
}
if (removedServiceName) {
this.updateResourceNames();
}
}
getResourceNames(): string[] {
return Array.from(this.watchers.keys());
}
private validateResponse(message: Cluster__Output): boolean {
if (message.type !== 'EDS') {
return false;
}
if (!message.eds_cluster_config?.eds_config?.ads) {
return false;
}
if (message.lb_policy !== 'ROUND_ROBIN') {
return false;
}
if (message.lrs_server) {
if (!message.lrs_server.self) {
return false;
}
}
return true;
}
/**
* Given a list of edsServiceNames (which may actually be the cluster name),
* for each watcher watching a name not on the list, call that watcher's
* onResourceDoesNotExist method.
* @param allClusterNames
*/
private handleMissingNames(allClusterNames: Set<string>) {
for (const [edsServiceName, watcherList] of this.watchers.entries()) {
if (!allClusterNames.has(edsServiceName)) {
for (const watcher of watcherList) {
watcher.onResourceDoesNotExist();
}
}
}
}
handleResponses(responses: Cluster__Output[]): string | null {
for (const message of responses) {
if (!this.validateResponse(message)) {
return 'Cluster validation failed';
}
}
this.latestResponses = responses;
const allEdsServiceNames: Set<string> = new Set<string>();
const allClusterNames: Set<string> = new Set<string>();
for (const message of responses) {
allClusterNames.add(message.name);
const edsServiceName = message.eds_cluster_config?.service_name ?? '';
allEdsServiceNames.add(
edsServiceName === '' ? message.name : edsServiceName
);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
}
this.handleMissingNames(allClusterNames);
this.edsState.handleMissingNames(allEdsServiceNames);
return null;
}
reportStreamError(status: StatusObject): void {
for (const watcherList of this.watchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
}
}
}
}
class RdsState implements XdsStreamState<RouteConfiguration__Output> {
versionInfo = '';
nonce = '';
private routeConfigName: string | null = null;
constructor(
private watcher: Watcher<ServiceConfig>,
private updateResouceNames: () => void
) {}
getResourceNames(): string[] {
return this.routeConfigName ? [this.routeConfigName] : [];
}
handleSingleMessage(message: RouteConfiguration__Output) {
for (const virtualHost of message.virtual_hosts) {
if (virtualHost.domains.indexOf(this.routeConfigName!) >= 0) {
const route = virtualHost.routes[virtualHost.routes.length - 1];
if (route.match?.prefix === '' && route.route?.cluster) {
this.watcher.onValidUpdate({
methodConfig: [],
loadBalancingConfig: [
{
name: 'cds',
cds: {
cluster: route.route.cluster,
},
},
],
});
break;
}
}
}
/* If none of the routes match the one we are looking for, bubble up an
* error. */
this.watcher.onResourceDoesNotExist();
}
handleResponses(responses: RouteConfiguration__Output[]): string | null {
if (this.routeConfigName !== null) {
for (const message of responses) {
if (message.name === this.routeConfigName) {
this.handleSingleMessage(message);
return null;
}
}
}
return null;
}
setRouteConfigName(name: string | null) {
const oldName = this.routeConfigName;
this.routeConfigName = name;
if (name !== oldName) {
this.updateResouceNames();
}
}
reportStreamError(status: StatusObject): void {
this.watcher.onTransientError(status);
}
}
class LdsState implements XdsStreamState<Listener__Output> {
versionInfo = '';
nonce = '';
constructor(private targetName: string, private rdsState: RdsState) {}
getResourceNames(): string[] {
return [this.targetName];
}
private validateResponse(message: Listener__Output): boolean {
if (
!(
message.api_listener?.api_listener &&
protoLoader.isAnyExtension(message.api_listener.api_listener) &&
message.api_listener?.api_listener['@type'] ===
HTTP_CONNECTION_MANGER_TYPE_URL
)
) {
return false;
}
const httpConnectionManager = message.api_listener
?.api_listener as protoLoader.AnyExtension &
HttpConnectionManager__Output;
switch (httpConnectionManager.route_specifier) {
case 'rds':
return !!httpConnectionManager.rds?.config_source?.ads;
case 'route_config':
return true;
}
return false;
}
handleResponses(responses: Listener__Output[]): string | null {
for (const message of responses) {
if (message.name === this.targetName) {
if (this.validateResponse(message)) {
// The validation step ensures that this is correct
const httpConnectionManager = message.api_listener!
.api_listener as protoLoader.AnyExtension &
HttpConnectionManager__Output;
switch (httpConnectionManager.route_specifier) {
case 'rds':
this.rdsState.setRouteConfigName(
httpConnectionManager.rds!.route_config_name
);
break;
case 'route_config':
this.rdsState.setRouteConfigName(null);
this.rdsState.handleSingleMessage(
httpConnectionManager.route_config!
);
break;
default:
// The validation rules should prevent this
}
} else {
return 'Listener validation failed';
}
}
}
throw new Error('Method not implemented.');
}
reportStreamError(status: StatusObject): void {
// Nothing to do here
}
}
interface AdsState {
[EDS_TYPE_URL]: EdsState;
[CDS_TYPE_URL]: CdsState;
[RDS_TYPE_URL]: RdsState;
[LDS_TYPE_URL]: LdsState;
}
/**
* Map type URLs to their corresponding message types
*/
type OutputType<T extends AdsTypeUrl> = T extends EdsTypeUrl
? ClusterLoadAssignment__Output
: T extends CdsTypeUrl
? Cluster__Output
: T extends RdsTypeUrl
? RouteConfiguration__Output
: Listener__Output;
function getResponseMessages<T extends AdsTypeUrl>(
typeUrl: T,
resources: Any__Output[]
): OutputType<T>[] {
const result: OutputType<T>[] = [];
for (const resource of resources) {
if (protoLoader.isAnyExtension(resource) && resource['@type'] === typeUrl) {
result.push(resource as protoLoader.AnyExtension & OutputType<T>);
} else {
throw new Error(
`Invalid resource type ${
protoLoader.isAnyExtension(resource)
? resource['@type']
: resource.type_url
}`
);
}
}
return result;
}
export class XdsClient {
private node: Node | null = null;
private client: AggregatedDiscoveryServiceClient | null = null;
private adsNode: Node | null = null;
private adsClient: AggregatedDiscoveryServiceClient | null = null;
private adsCall: ClientDuplexStream<

@@ -108,17 +706,39 @@ DiscoveryRequest,

private lrsNode: Node | null = null;
private lrsClient: LoadReportingServiceClient | null = null;
private lrsCall: ClientDuplexStream<
LoadStatsRequest,
LoadStatsResponse__Output
> | null = null;
private latestLrsSettings: LoadStatsResponse__Output | null = null;
private clusterStatsMap: ClusterLoadReportMap = new ClusterLoadReportMap();
private statsTimer: NodeJS.Timer;
private hasShutdown = false;
private endpointWatchers: Map<
string,
Watcher<ClusterLoadAssignment__Output>[]
> = new Map<string, Watcher<ClusterLoadAssignment__Output>[]>();
private lastEdsVersionInfo = '';
private lastEdsNonce = '';
private latestEdsResponses: ClusterLoadAssignment__Output[] = [];
private adsState: AdsState;
constructor(
private targetName: string,
private serviceConfigWatcher: Watcher<ServiceConfig>,
targetName: string,
serviceConfigWatcher: Watcher<ServiceConfig>,
channelOptions: ChannelOptions
) {
const edsState = new EdsState(() => {
this.updateNames(EDS_TYPE_URL);
});
const cdsState = new CdsState(edsState, () => {
this.updateNames(CDS_TYPE_URL);
});
const rdsState = new RdsState(serviceConfigWatcher, () => {
this.updateNames(RDS_TYPE_URL);
});
const ldsState = new LdsState(targetName, rdsState);
this.adsState = {
[EDS_TYPE_URL]: edsState,
[CDS_TYPE_URL]: cdsState,
[RDS_TYPE_URL]: rdsState,
[LDS_TYPE_URL]: ldsState,
};
const channelArgs = { ...channelOptions };

@@ -148,3 +768,3 @@ const channelArgsToRemove = [

}
this.node = {
const node: Node = {
...bootstrapInfo.node,

@@ -154,3 +774,11 @@ build_version: `gRPC Node Pure JS ${clientVersion}`,

};
this.client = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(
this.adsNode = {
...node,
client_features: ['envoy.lb.does_not_support_overprovisioning'],
};
this.lrsNode = {
...node,
client_features: ['envoy.lrs.supports_send_all_clusters'],
};
this.adsClient = new protoDefinitions.envoy.service.discovery.v2.AggregatedDiscoveryService(
bootstrapInfo.xdsServers[0].serverUri,

@@ -161,2 +789,9 @@ createGoogleDefaultCredentials(),

this.maybeStartAdsStream();
this.lrsClient = new protoDefinitions.envoy.service.load_stats.v2.LoadReportingService(
bootstrapInfo.xdsServers[0].serverUri,
createGoogleDefaultCredentials(),
channelArgs
);
this.maybeStartLrsStream();
},

@@ -173,4 +808,48 @@ (error) => {

);
this.statsTimer = setInterval(() => {}, 0);
clearInterval(this.statsTimer);
}
private handleAdsResponse(message: DiscoveryResponse__Output) {
let errorString: string | null;
/* The cases in this switch statement look redundant but separating them
* out like this is necessary for the typechecker to validate the types
* as narrowly as we need it to. */
switch (message.type_url) {
case EDS_TYPE_URL:
errorString = this.adsState[message.type_url].handleResponses(
getResponseMessages(message.type_url, message.resources)
);
break;
case CDS_TYPE_URL:
errorString = this.adsState[message.type_url].handleResponses(
getResponseMessages(message.type_url, message.resources)
);
break;
case RDS_TYPE_URL:
errorString = this.adsState[message.type_url].handleResponses(
getResponseMessages(message.type_url, message.resources)
);
break;
case LDS_TYPE_URL:
errorString = this.adsState[message.type_url].handleResponses(
getResponseMessages(message.type_url, message.resources)
);
break;
default:
errorString = `Unknown type_url ${message.type_url}`;
}
if (errorString === null) {
/* errorString can only be null in one of the first 4 cases, which
* implies that message.type_url is one of the 4 known type URLs, which
* means that this type assertion is valid. */
const typeUrl = message.type_url as AdsTypeUrl;
this.adsState[typeUrl].nonce = message.nonce;
this.adsState[typeUrl].versionInfo = message.version_info;
this.ack(typeUrl);
} else {
this.nack(message.type_url, errorString);
}
}
/**

@@ -181,3 +860,3 @@ * Start the ADS stream if the client exists and there is not already an

private maybeStartAdsStream() {
if (this.client === null) {
if (this.adsClient === null) {
return;

@@ -191,46 +870,5 @@ }

}
this.adsCall = this.client.StreamAggregatedResources();
this.adsCall = this.adsClient.StreamAggregatedResources();
this.adsCall.on('data', (message: DiscoveryResponse__Output) => {
switch (message.type_url) {
case EDS_TYPE_URL: {
const edsResponses: ClusterLoadAssignment__Output[] = [];
for (const resource of message.resources) {
if (
protoLoader.isAnyExtension(resource) &&
resource['@type'] === EDS_TYPE_URL
) {
const resp = resource as protoLoader.AnyExtension &
ClusterLoadAssignment__Output;
if (!this.validateEdsResponse(resp)) {
this.nackEds('ClusterLoadAssignment validation failed');
return;
}
edsResponses.push(resp);
} else {
this.nackEds(
`Invalid resource type ${
protoLoader.isAnyExtension(resource)
? resource['@type']
: resource.type_url
}`
);
return;
}
}
for (const message of edsResponses) {
this.handleEdsResponse(message);
}
this.lastEdsVersionInfo = message.version_info;
this.lastEdsNonce = message.nonce;
this.latestEdsResponses = edsResponses;
this.ackEds();
break;
}
default:
this.nackUnknown(
message.type_url,
message.version_info,
message.nonce
);
}
this.handleAdsResponse(message);
});

@@ -248,60 +886,57 @@ this.adsCall.on('error', (error: ServiceError) => {

});
const endpointWatcherNames = Array.from(this.endpointWatchers.keys());
if (endpointWatcherNames.length > 0) {
this.adsCall.write({
node: this.node!,
type_url: EDS_TYPE_URL,
resource_names: endpointWatcherNames,
});
}
}
private nackUnknown(typeUrl: string, versionInfo: string, nonce: string) {
if (!this.adsCall) {
return;
const allTypeUrls: AdsTypeUrl[] = [
EDS_TYPE_URL,
CDS_TYPE_URL,
RDS_TYPE_URL,
LDS_TYPE_URL,
];
for (const typeUrl of allTypeUrls) {
const state = this.adsState[typeUrl];
state.nonce = '';
state.versionInfo = '';
if (state.getResourceNames().length > 0) {
this.updateNames(typeUrl);
}
}
this.adsCall.write({
node: this.node!,
type_url: typeUrl,
version_info: versionInfo,
response_nonce: nonce,
error_detail: {
message: `Unknown type_url ${typeUrl}`,
},
});
}
/**
* Acknowledge an EDS update. This should be called after the local nonce and
* Acknowledge an update. This should be called after the local nonce and
* version info are updated so that it sends the post-update values.
*/
private ackEds() {
if (!this.adsCall) {
return;
}
this.adsCall.write({
node: this.node!,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
version_info: this.lastEdsVersionInfo,
});
ack(typeUrl: AdsTypeUrl) {
this.updateNames(typeUrl);
}
/**
* Reject an EDS update. This should be called without updating the local
* Reject an update. This should be called without updating the local
* nonce and version info.
*/
private nackEds(message: string) {
if (!this.adsCall) {
return;
private nack(typeUrl: string, message: string) {
let resourceNames: string[];
let nonce: string;
let versionInfo: string;
switch (typeUrl) {
case EDS_TYPE_URL:
case CDS_TYPE_URL:
case RDS_TYPE_URL:
case LDS_TYPE_URL:
resourceNames = this.adsState[typeUrl].getResourceNames();
nonce = this.adsState[typeUrl].nonce;
versionInfo = this.adsState[typeUrl].versionInfo;
break;
default:
resourceNames = [];
nonce = '';
versionInfo = '';
}
this.adsCall.write({
node: this.node!,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
version_info: this.lastEdsVersionInfo,
this.adsCall?.write({
node: this.adsNode!,
type_url: typeUrl,
resource_names: resourceNames,
response_nonce: nonce,
version_info: versionInfo,
error_detail: {
message,
message: message,
},

@@ -311,51 +946,136 @@ });

/**
* Validate the ClusterLoadAssignment object by these rules:
* https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md#clusterloadassignment-proto
* @param message
*/
private validateEdsResponse(message: ClusterLoadAssignment__Output): boolean {
for (const endpoint of message.endpoints) {
for (const lb of endpoint.lb_endpoints) {
const socketAddress = lb.endpoint?.address?.socket_address;
if (!socketAddress) {
return false;
}
if (socketAddress.port_specifier !== 'port_value') {
return false;
}
if (!(isIPv4(socketAddress.address) || isIPv6(socketAddress.address))) {
return false;
}
}
}
return true;
private updateNames(typeUrl: AdsTypeUrl) {
this.adsCall?.write({
node: this.adsNode!,
type_url: typeUrl,
resource_names: this.adsState[typeUrl].getResourceNames(),
response_nonce: this.adsState[typeUrl].nonce,
version_info: this.adsState[typeUrl].versionInfo,
});
}
private handleEdsResponse(message: ClusterLoadAssignment__Output) {
const watchers = this.endpointWatchers.get(message.cluster_name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message);
}
private reportStreamError(status: StatusObject) {
this.adsState[EDS_TYPE_URL].reportStreamError(status);
this.adsState[CDS_TYPE_URL].reportStreamError(status);
this.adsState[RDS_TYPE_URL].reportStreamError(status);
this.adsState[LDS_TYPE_URL].reportStreamError(status);
}
private updateEdsNames() {
if (this.adsCall) {
this.adsCall.write({
node: this.node!,
type_url: EDS_TYPE_URL,
resource_names: Array.from(this.endpointWatchers.keys()),
response_nonce: this.lastEdsNonce,
version_info: this.lastEdsVersionInfo,
});
private maybeStartLrsStream() {
if (!this.lrsClient) {
return;
}
if (this.lrsCall) {
return;
}
if (this.hasShutdown) {
return;
}
this.lrsCall = this.lrsClient.streamLoadStats();
this.lrsCall.on('data', (message: LoadStatsResponse__Output) => {
if (
message.load_reporting_interval?.seconds !==
this.latestLrsSettings?.load_reporting_interval?.seconds ||
message.load_reporting_interval?.nanos !==
this.latestLrsSettings?.load_reporting_interval?.nanos
) {
/* Only reset the timer if the interval has changed or was not set
* before. */
clearInterval(this.statsTimer);
/* Convert a google.protobuf.Duration to a number of milliseconds for
* use with setInterval. */
const loadReportingIntervalMs =
Number.parseInt(message.load_reporting_interval!.seconds) * 1000 +
message.load_reporting_interval!.nanos / 1_000_000;
setInterval(() => {
this.sendStats();
}, loadReportingIntervalMs);
}
this.latestLrsSettings = message;
});
this.lrsCall.on('error', (error: ServiceError) => {
trace(
'LRS stream ended. code=' + error.code + ' details= ' + error.details
);
this.lrsCall = null;
clearInterval(this.statsTimer);
/* Connection backoff is handled by the client object, so we can
* immediately start a new request to indicate that it should try to
* reconnect */
this.maybeStartAdsStream();
});
this.lrsCall.write({
node: this.lrsNode!,
});
}
private reportStreamError(status: StatusObject) {
for (const watcherList of this.endpointWatchers.values()) {
for (const watcher of watcherList) {
watcher.onTransientError(status);
private sendStats() {
if (!this.lrsCall) {
return;
}
const clusterStats: ClusterStats[] = [];
for (const [
{ clusterName, edsServiceName },
stats,
] of this.clusterStatsMap.entries()) {
if (
this.latestLrsSettings!.send_all_clusters ||
this.latestLrsSettings!.clusters.indexOf(clusterName) > 0
) {
const upstreamLocalityStats: UpstreamLocalityStats[] = [];
for (const localityStats of stats.localityStats) {
// Skip localities with 0 requests
if (
localityStats.callsStarted > 0 ||
localityStats.callsSucceeded > 0 ||
localityStats.callsFailed > 0
) {
upstreamLocalityStats.push({
locality: localityStats.locality,
total_issued_requests: localityStats.callsStarted,
total_successful_requests: localityStats.callsSucceeded,
total_error_requests: localityStats.callsFailed,
total_requests_in_progress: localityStats.callsInProgress,
});
localityStats.callsStarted = 0;
localityStats.callsSucceeded = 0;
localityStats.callsFailed = 0;
}
}
const droppedRequests: _envoy_api_v2_endpoint_ClusterStats_DroppedRequests[] = [];
let totalDroppedRequests = 0;
for (const [category, count] of stats.callsDropped.entries()) {
if (count > 0) {
droppedRequests.push({
category,
dropped_count: count,
});
totalDroppedRequests += count;
}
}
// Clear out dropped call stats after sending them
stats.callsDropped.clear();
const interval = process.hrtime(stats.intervalStart);
stats.intervalStart = process.hrtime();
// Skip clusters with 0 requests
if (upstreamLocalityStats.length > 0 || totalDroppedRequests > 0) {
clusterStats.push({
cluster_name: clusterName,
cluster_service_name: edsServiceName,
dropped_requests: droppedRequests,
total_dropped_requests: totalDroppedRequests,
upstream_locality_stats: upstreamLocalityStats,
load_report_interval: {
seconds: interval[0],
nanos: interval[1],
},
});
}
}
}
// Also do the same for other types of watchers when those are implemented
this.lrsCall.write({
node: this.lrsNode!,
cluster_stats: clusterStats,
});
}

@@ -368,25 +1088,3 @@

trace('Watcher added for endpoint ' + edsServiceName);
let watchersEntry = this.endpointWatchers.get(edsServiceName);
let addedServiceName = false;
if (watchersEntry === undefined) {
addedServiceName = true;
watchersEntry = [];
this.endpointWatchers.set(edsServiceName, watchersEntry);
}
watchersEntry.push(watcher);
if (addedServiceName) {
this.updateEdsNames();
}
/* If we have already received an update for the requested edsServiceName,
* immediately pass that update along to the watcher */
for (const message of this.latestEdsResponses) {
if (message.cluster_name === edsServiceName) {
/* These updates normally occur asynchronously, so we ensure that
* the same happens here */
process.nextTick(() => {
watcher.onValidUpdate(message);
});
}
}
this.adsState[EDS_TYPE_URL].addWatcher(edsServiceName, watcher);
}

@@ -399,17 +1097,95 @@

trace('Watcher removed for endpoint ' + edsServiceName);
const watchersEntry = this.endpointWatchers.get(edsServiceName);
let removedServiceName = false;
if (watchersEntry !== undefined) {
const entryIndex = watchersEntry.indexOf(watcher);
if (entryIndex >= 0) {
watchersEntry.splice(entryIndex, 1);
this.adsState[EDS_TYPE_URL].removeWatcher(edsServiceName, watcher);
}
addClusterWatcher(clusterName: string, watcher: Watcher<Cluster__Output>) {
trace('Watcher added for cluster ' + clusterName);
this.adsState[CDS_TYPE_URL].addWatcher(clusterName, watcher);
}
removeClusterWatcher(clusterName: string, watcher: Watcher<Cluster__Output>) {
trace('Watcher removed for endpoint ' + clusterName);
this.adsState[CDS_TYPE_URL].removeWatcher(clusterName, watcher);
}
/**
*
* @param lrsServer The target name of the server to send stats to. An empty
* string indicates that the default LRS client should be used. Currently
* only the empty string is supported here.
* @param clusterName
* @param edsServiceName
*/
addClusterDropStats(
lrsServer: string,
clusterName: string,
edsServiceName: string
): XdsClusterDropStats {
if (lrsServer !== '') {
return {
addCallDropped: (category) => {},
};
}
const clusterStats = this.clusterStatsMap.getOrCreate(
clusterName,
edsServiceName
);
return {
addCallDropped: (category) => {
const prevCount = clusterStats.callsDropped.get(category) ?? 0;
clusterStats.callsDropped.set(category, prevCount + 1);
},
};
}
addClusterLocalityStats(
lrsServer: string,
clusterName: string,
edsServiceName: string,
locality: Locality__Output
): XdsClusterLocalityStats {
if (lrsServer !== '') {
return {
addCallStarted: () => {},
addCallFinished: (fail) => {},
};
}
const clusterStats = this.clusterStatsMap.getOrCreate(
clusterName,
edsServiceName
);
let localityStats: ClusterLocalityStats | null = null;
for (const statsObj of clusterStats.localityStats) {
if (localityEqual(locality, statsObj.locality)) {
localityStats = statsObj;
break;
}
if (watchersEntry.length === 0) {
removedServiceName = true;
this.endpointWatchers.delete(edsServiceName);
}
}
if (removedServiceName) {
this.updateEdsNames();
if (localityStats === null) {
localityStats = {
locality: locality,
callsInProgress: 0,
callsStarted: 0,
callsSucceeded: 0,
callsFailed: 0,
};
clusterStats.localityStats.push(localityStats);
}
/* Help the compiler understand that this object is always non-null in the
* closure */
const finalLocalityStats: ClusterLocalityStats = localityStats;
return {
addCallStarted: () => {
finalLocalityStats.callsStarted += 1;
finalLocalityStats.callsInProgress += 1;
},
addCallFinished: (fail) => {
if (fail) {
finalLocalityStats.callsFailed += 1;
} else {
finalLocalityStats.callsSucceeded += 1;
}
finalLocalityStats.callsInProgress -= 1;
},
};
}

@@ -419,5 +1195,7 @@

this.adsCall?.cancel();
this.client?.close();
this.adsClient?.close();
this.lrsCall?.cancel();
this.lrsClient?.close();
this.hasShutdown = true;
}
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc