Socket
Socket
Sign inDemoInstall

@grpc/grpc-js

Package Overview
Dependencies
Maintainers
3
Versions
175
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@grpc/grpc-js - npm Package Compare versions

Comparing version 1.9.14 to 1.10.0

build/src/server-interceptors.d.ts

2

build/src/call-interface.d.ts

@@ -20,3 +20,3 @@ /// <reference types="node" />

export type PartialStatusObject = Pick<StatusObject, 'code' | 'details'> & {
metadata: Metadata | null;
metadata?: Metadata | null | undefined;
};

@@ -23,0 +23,0 @@ export declare const enum WriteFlags {

@@ -94,4 +94,4 @@ "use strict";

class InsecureChannelCredentialsImpl extends ChannelCredentials {
constructor(callCredentials) {
super(callCredentials);
constructor() {
super();
}

@@ -98,0 +98,0 @@ compose(callCredentials) {

@@ -38,2 +38,3 @@ import { CompressionAlgorithms } from './compression-algorithms';

'grpc-node.tls_enable_trace'?: number;
'grpc.lb.ring_hash.ring_size_cap'?: number;
[key: string]: any;

@@ -72,3 +73,4 @@ }

'grpc-node.tls_enable_trace': boolean;
'grpc.lb.ring_hash.ring_size_cap': boolean;
};
export declare function channelOptionsEqual(options1: ChannelOptions, options2: ChannelOptions): boolean;

@@ -51,2 +51,3 @@ "use strict";

'grpc-node.tls_enable_trace': true,
'grpc.lb.ring_hash.ring_size_cap': true,
};

@@ -53,0 +54,0 @@ function channelOptionsEqual(options1, options2) {

@@ -5,6 +5,6 @@ export { trace, log } from './logging';

export { Duration, durationToMs } from './duration';
export { ServiceConfig, MethodConfig, RetryPolicy } from './service-config';
export { BackoffTimeout } from './backoff-timeout';
export { LoadBalancer, LoadBalancingConfig, ChannelControlHelper, createChildChannelControlHelper, registerLoadBalancerType, getFirstUsableConfig, validateLoadBalancingConfig, } from './load-balancer';
export { SubchannelAddress, subchannelAddressToString, } from './subchannel-address';
export { LoadBalancer, TypedLoadBalancingConfig, ChannelControlHelper, createChildChannelControlHelper, registerLoadBalancerType, selectLbConfigFromList, parseLoadBalancingConfig, isLoadBalancerNameRegistered, } from './load-balancer';
export { LeafLoadBalancer } from './load-balancer-pick-first';
export { SubchannelAddress, subchannelAddressToString, Endpoint, endpointToString, endpointHasAddress, EndpointMap, } from './subchannel-address';
export { ChildLoadBalancerHandler } from './load-balancer-child-handler';

@@ -16,3 +16,3 @@ export { Picker, UnavailablePicker, QueuePicker, PickResult, PickArgs, PickResultType, } from './picker';

export { registerAdminService } from './admin';
export { SubchannelInterface, BaseSubchannelWrapper, ConnectivityStateListener, } from './subchannel-interface';
export { OutlierDetectionLoadBalancingConfig, SuccessRateEjectionConfig, FailurePercentageEjectionConfig, } from './load-balancer-outlier-detection';
export { SubchannelInterface, BaseSubchannelWrapper, ConnectivityStateListener, HealthListener, } from './subchannel-interface';
export { OutlierDetectionRawConfig, SuccessRateEjectionConfig, FailurePercentageEjectionConfig, } from './load-balancer-outlier-detection';
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.OutlierDetectionLoadBalancingConfig = exports.BaseSubchannelWrapper = exports.registerAdminService = exports.FilterStackFactory = exports.BaseFilter = exports.PickResultType = exports.QueuePicker = exports.UnavailablePicker = exports.ChildLoadBalancerHandler = exports.subchannelAddressToString = exports.validateLoadBalancingConfig = exports.getFirstUsableConfig = exports.registerLoadBalancerType = exports.createChildChannelControlHelper = exports.BackoffTimeout = exports.durationToMs = exports.uriToString = exports.createResolver = exports.registerResolver = exports.log = exports.trace = void 0;
exports.BaseSubchannelWrapper = exports.registerAdminService = exports.FilterStackFactory = exports.BaseFilter = exports.PickResultType = exports.QueuePicker = exports.UnavailablePicker = exports.ChildLoadBalancerHandler = exports.EndpointMap = exports.endpointHasAddress = exports.endpointToString = exports.subchannelAddressToString = exports.LeafLoadBalancer = exports.isLoadBalancerNameRegistered = exports.parseLoadBalancingConfig = exports.selectLbConfigFromList = exports.registerLoadBalancerType = exports.createChildChannelControlHelper = exports.BackoffTimeout = exports.durationToMs = exports.uriToString = exports.createResolver = exports.registerResolver = exports.log = exports.trace = void 0;
var logging_1 = require("./logging");

@@ -19,6 +19,12 @@ Object.defineProperty(exports, "trace", { enumerable: true, get: function () { return logging_1.trace; } });

Object.defineProperty(exports, "registerLoadBalancerType", { enumerable: true, get: function () { return load_balancer_1.registerLoadBalancerType; } });
Object.defineProperty(exports, "getFirstUsableConfig", { enumerable: true, get: function () { return load_balancer_1.getFirstUsableConfig; } });
Object.defineProperty(exports, "validateLoadBalancingConfig", { enumerable: true, get: function () { return load_balancer_1.validateLoadBalancingConfig; } });
Object.defineProperty(exports, "selectLbConfigFromList", { enumerable: true, get: function () { return load_balancer_1.selectLbConfigFromList; } });
Object.defineProperty(exports, "parseLoadBalancingConfig", { enumerable: true, get: function () { return load_balancer_1.parseLoadBalancingConfig; } });
Object.defineProperty(exports, "isLoadBalancerNameRegistered", { enumerable: true, get: function () { return load_balancer_1.isLoadBalancerNameRegistered; } });
var load_balancer_pick_first_1 = require("./load-balancer-pick-first");
Object.defineProperty(exports, "LeafLoadBalancer", { enumerable: true, get: function () { return load_balancer_pick_first_1.LeafLoadBalancer; } });
var subchannel_address_1 = require("./subchannel-address");
Object.defineProperty(exports, "subchannelAddressToString", { enumerable: true, get: function () { return subchannel_address_1.subchannelAddressToString; } });
Object.defineProperty(exports, "endpointToString", { enumerable: true, get: function () { return subchannel_address_1.endpointToString; } });
Object.defineProperty(exports, "endpointHasAddress", { enumerable: true, get: function () { return subchannel_address_1.endpointHasAddress; } });
Object.defineProperty(exports, "EndpointMap", { enumerable: true, get: function () { return subchannel_address_1.EndpointMap; } });
var load_balancer_child_handler_1 = require("./load-balancer-child-handler");

@@ -38,4 +44,2 @@ Object.defineProperty(exports, "ChildLoadBalancerHandler", { enumerable: true, get: function () { return load_balancer_child_handler_1.ChildLoadBalancerHandler; } });

Object.defineProperty(exports, "BaseSubchannelWrapper", { enumerable: true, get: function () { return subchannel_interface_1.BaseSubchannelWrapper; } });
var load_balancer_outlier_detection_1 = require("./load-balancer-outlier-detection");
Object.defineProperty(exports, "OutlierDetectionLoadBalancingConfig", { enumerable: true, get: function () { return load_balancer_outlier_detection_1.OutlierDetectionLoadBalancingConfig; } });
//# sourceMappingURL=experimental.js.map

@@ -7,3 +7,3 @@ import { ClientDuplexStream, ClientReadableStream, ClientUnaryCall, ClientWritableStream, ServiceError } from './call';

import { ConnectivityState } from './connectivity-state';
import { ChannelCredentials } from './channel-credentials';
import { ChannelCredentials, VerifyOptions } from './channel-credentials';
import { CallOptions, Client, ClientOptions, CallInvocationTransformer, CallProperties, UnaryCallback } from './client';

@@ -13,3 +13,3 @@ import { LogVerbosity, Status, Propagate } from './constants';

import { Metadata, MetadataOptions, MetadataValue } from './metadata';
import { Server, UntypedHandleCall, UntypedServiceImplementation } from './server';
import { Server, ServerOptions, UntypedHandleCall, UntypedServiceImplementation } from './server';
import { KeyCertPair, ServerCredentials } from './server-credentials';

@@ -56,3 +56,3 @@ import { StatusBuilder } from './status-builder';

export declare const waitForClientReady: (client: Client, deadline: Date | number, callback: (error?: Error) => void) => void;
export { sendUnaryData, ChannelCredentials, CallCredentials, Deadline, Serialize as serialize, Deserialize as deserialize, ClientUnaryCall, ClientReadableStream, ClientWritableStream, ClientDuplexStream, CallOptions, MethodDefinition, StatusObject, ServiceError, ServerUnaryCall, ServerReadableStream, ServerWritableStream, ServerDuplexStream, ServerErrorResponse, ServiceDefinition, UntypedHandleCall, UntypedServiceImplementation, };
export { sendUnaryData, ChannelCredentials, CallCredentials, Deadline, Serialize as serialize, Deserialize as deserialize, ClientUnaryCall, ClientReadableStream, ClientWritableStream, ClientDuplexStream, CallOptions, MethodDefinition, StatusObject, ServiceError, ServerUnaryCall, ServerReadableStream, ServerWritableStream, ServerDuplexStream, ServerErrorResponse, ServiceDefinition, UntypedHandleCall, UntypedServiceImplementation, VerifyOptions };
/**** Server ****/

@@ -66,3 +66,3 @@ export { handleBidiStreamingCall, handleServerStreamingCall, handleUnaryCall, handleClientStreamingCall, };

export declare const setLogVerbosity: (verbosity: LogVerbosity) => void;
export { Server };
export { Server, ServerOptions };
export { ServerCredentials };

@@ -78,4 +78,6 @@ export { KeyCertPair };

export { addAdminServicesToServer } from './admin';
export { ServiceConfig, LoadBalancingConfig, MethodConfig, RetryPolicy } from './service-config';
export { ServerListener, FullServerListener, ServerListenerBuilder, Responder, FullResponder, ResponderBuilder, ServerInterceptingCallInterface, ServerInterceptingCall, ServerInterceptor } from './server-interceptors';
import * as experimental from './experimental';
export { experimental };
import { Deadline } from './deadline';

@@ -19,3 +19,3 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.experimental = exports.addAdminServicesToServer = exports.getChannelzHandlers = exports.getChannelzServiceDefinition = exports.InterceptorConfigurationError = exports.InterceptingCall = exports.RequesterBuilder = exports.ListenerBuilder = exports.StatusBuilder = exports.getClientChannel = exports.ServerCredentials = exports.Server = exports.setLogVerbosity = exports.setLogger = exports.load = exports.loadObject = exports.CallCredentials = exports.ChannelCredentials = exports.waitForClientReady = exports.closeClient = exports.Channel = exports.makeGenericClientConstructor = exports.makeClientConstructor = exports.loadPackageDefinition = exports.Client = exports.compressionAlgorithms = exports.propagate = exports.connectivityState = exports.status = exports.logVerbosity = exports.Metadata = exports.credentials = void 0;
exports.experimental = exports.ServerInterceptingCall = exports.ResponderBuilder = exports.ServerListenerBuilder = exports.addAdminServicesToServer = exports.getChannelzHandlers = exports.getChannelzServiceDefinition = exports.InterceptorConfigurationError = exports.InterceptingCall = exports.RequesterBuilder = exports.ListenerBuilder = exports.StatusBuilder = exports.getClientChannel = exports.ServerCredentials = exports.Server = exports.setLogVerbosity = exports.setLogger = exports.load = exports.loadObject = exports.CallCredentials = exports.ChannelCredentials = exports.waitForClientReady = exports.closeClient = exports.Channel = exports.makeGenericClientConstructor = exports.makeClientConstructor = exports.loadPackageDefinition = exports.Client = exports.compressionAlgorithms = exports.propagate = exports.connectivityState = exports.status = exports.logVerbosity = exports.Metadata = exports.credentials = void 0;
const call_credentials_1 = require("./call-credentials");

@@ -123,2 +123,6 @@ Object.defineProperty(exports, "CallCredentials", { enumerable: true, get: function () { return call_credentials_1.CallCredentials; } });

Object.defineProperty(exports, "addAdminServicesToServer", { enumerable: true, get: function () { return admin_1.addAdminServicesToServer; } });
var server_interceptors_1 = require("./server-interceptors");
Object.defineProperty(exports, "ServerListenerBuilder", { enumerable: true, get: function () { return server_interceptors_1.ServerListenerBuilder; } });
Object.defineProperty(exports, "ResponderBuilder", { enumerable: true, get: function () { return server_interceptors_1.ResponderBuilder; } });
Object.defineProperty(exports, "ServerInterceptingCall", { enumerable: true, get: function () { return server_interceptors_1.ServerInterceptingCall; } });
const experimental = require("./experimental");

@@ -125,0 +129,0 @@ exports.experimental = experimental;

@@ -82,2 +82,8 @@ import { ChannelCredentials } from './channel-credentials';

private readonly childrenTracker;
/**
* Randomly generated ID to be passed to the config selector, for use by
* ring_hash in xDS. An integer distributed approximately uniformly between
* 0 and MAX_SAFE_INTEGER.
*/
private readonly randomChannelId;
constructor(target: string, credentials: ChannelCredentials, options: ChannelOptions);

@@ -84,0 +90,0 @@ private getChannelzInfo;

@@ -104,2 +104,8 @@ "use strict";

this.childrenTracker = new channelz_1.ChannelzChildrenTracker();
/**
* Randomly generated ID to be passed to the config selector, for use by
* ring_hash in xDS. An integer distributed approximately uniformly between
* 0 and MAX_SAFE_INTEGER.
*/
this.randomChannelId = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER);
if (typeof target !== 'string') {

@@ -344,3 +350,3 @@ throw new TypeError('Channel target must be a string');

type: 'SUCCESS',
config: this.configSelector(method, metadata),
config: this.configSelector(method, metadata, this.randomChannelId),
};

@@ -347,0 +353,0 @@ }

@@ -1,5 +0,7 @@

import { LoadBalancer, ChannelControlHelper, LoadBalancingConfig } from './load-balancer';
import { SubchannelAddress } from './subchannel-address';
import { LoadBalancer, ChannelControlHelper, TypedLoadBalancingConfig } from './load-balancer';
import { Endpoint } from './subchannel-address';
import { ChannelOptions } from './channel-options';
export declare class ChildLoadBalancerHandler implements LoadBalancer {
private readonly channelControlHelper;
private readonly options;
private currentChild;

@@ -9,11 +11,11 @@ private pendingChild;

private ChildPolicyHelper;
constructor(channelControlHelper: ChannelControlHelper);
protected configUpdateRequiresNewPolicyInstance(oldConfig: LoadBalancingConfig, newConfig: LoadBalancingConfig): boolean;
constructor(channelControlHelper: ChannelControlHelper, options: ChannelOptions);
protected configUpdateRequiresNewPolicyInstance(oldConfig: TypedLoadBalancingConfig, newConfig: TypedLoadBalancingConfig): boolean;
/**
* Prerequisites: lbConfig !== null and lbConfig.name is registered
* @param addressList
* @param endpointList
* @param lbConfig
* @param attributes
*/
updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: {
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: {
[key: string]: unknown;

@@ -20,0 +22,0 @@ }): void;

@@ -24,4 +24,5 @@ "use strict";

class ChildLoadBalancerHandler {
constructor(channelControlHelper) {
constructor(channelControlHelper, options) {
this.channelControlHelper = channelControlHelper;
this.options = options;
this.currentChild = null;

@@ -82,7 +83,7 @@ this.pendingChild = null;

* Prerequisites: lbConfig !== null and lbConfig.name is registered
* @param addressList
* @param endpointList
* @param lbConfig
* @param attributes
*/
updateAddressList(addressList, lbConfig, attributes) {
updateAddressList(endpointList, lbConfig, attributes) {
let childToUpdate;

@@ -93,3 +94,3 @@ if (this.currentChild === null ||

const newHelper = new this.ChildPolicyHelper(this);
const newChild = (0, load_balancer_1.createLoadBalancer)(lbConfig, newHelper);
const newChild = (0, load_balancer_1.createLoadBalancer)(lbConfig, newHelper, this.options);
newHelper.setChild(newChild);

@@ -117,3 +118,3 @@ if (this.currentChild === null) {

this.latestConfig = lbConfig;
childToUpdate.updateAddressList(addressList, lbConfig, attributes);
childToUpdate.updateAddressList(endpointList, lbConfig, attributes);
}

@@ -120,0 +121,0 @@ exitIdle() {

@@ -0,4 +1,7 @@

import { ChannelOptions } from './channel-options';
import { Duration } from './duration';
import { ChannelControlHelper } from './experimental';
import { LoadBalancer, LoadBalancingConfig } from './load-balancer';
import { SubchannelAddress } from './subchannel-address';
import { LoadBalancer, TypedLoadBalancingConfig } from './load-balancer';
import { Endpoint } from './subchannel-address';
import { LoadBalancingConfig } from './service-config';
export interface SuccessRateEjectionConfig {

@@ -16,3 +19,12 @@ readonly stdev_factor: number;

}
export declare class OutlierDetectionLoadBalancingConfig implements LoadBalancingConfig {
export interface OutlierDetectionRawConfig {
interval?: Duration;
base_ejection_time?: Duration;
max_ejection_time?: Duration;
max_ejection_percent?: number;
success_rate_ejection?: Partial<SuccessRateEjectionConfig>;
failure_percentage_ejection?: Partial<FailurePercentageEjectionConfig>;
child_policy: LoadBalancingConfig[];
}
export declare class OutlierDetectionLoadBalancingConfig implements TypedLoadBalancingConfig {
private readonly childPolicy;

@@ -25,3 +37,3 @@ private readonly intervalMs;

private readonly failurePercentageEjection;
constructor(intervalMs: number | null, baseEjectionTimeMs: number | null, maxEjectionTimeMs: number | null, maxEjectionPercent: number | null, successRateEjection: Partial<SuccessRateEjectionConfig> | null, failurePercentageEjection: Partial<FailurePercentageEjectionConfig> | null, childPolicy: LoadBalancingConfig[]);
constructor(intervalMs: number | null, baseEjectionTimeMs: number | null, maxEjectionTimeMs: number | null, maxEjectionPercent: number | null, successRateEjection: Partial<SuccessRateEjectionConfig> | null, failurePercentageEjection: Partial<FailurePercentageEjectionConfig> | null, childPolicy: TypedLoadBalancingConfig);
getLoadBalancerName(): string;

@@ -35,4 +47,3 @@ toJsonObject(): object;

getFailurePercentageEjectionConfig(): FailurePercentageEjectionConfig | null;
getChildPolicy(): LoadBalancingConfig[];
copyWithChildPolicy(childPolicy: LoadBalancingConfig[]): OutlierDetectionLoadBalancingConfig;
getChildPolicy(): TypedLoadBalancingConfig;
static createFromJson(obj: any): OutlierDetectionLoadBalancingConfig;

@@ -42,7 +53,7 @@ }

private childBalancer;
private addressMap;
private entryMap;
private latestConfig;
private ejectionTimer;
private timerStartTime;
constructor(channelControlHelper: ChannelControlHelper);
constructor(channelControlHelper: ChannelControlHelper, options: ChannelOptions);
private isCountingEnabled;

@@ -57,3 +68,3 @@ private getCurrentEjectionPercent;

private runChecks;
updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: {
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: {
[key: string]: unknown;

@@ -60,0 +71,0 @@ }): void;

@@ -50,3 +50,5 @@ "use strict";

function validateFieldType(obj, fieldName, expectedType, objectName) {
if (fieldName in obj && typeof obj[fieldName] !== expectedType) {
if (fieldName in obj &&
obj[fieldName] !== undefined &&
typeof obj[fieldName] !== expectedType) {
const fullFieldName = objectName ? `${objectName}.${fieldName}` : fieldName;

@@ -58,3 +60,3 @@ throw new Error(`outlier detection config ${fullFieldName} parse error: expected ${expectedType}, got ${typeof obj[fieldName]}`);

const fullFieldName = objectName ? `${objectName}.${fieldName}` : fieldName;
if (fieldName in obj) {
if (fieldName in obj && obj[fieldName] !== undefined) {
if (!(0, duration_1.isDuration)(obj[fieldName])) {

@@ -74,3 +76,5 @@ throw new Error(`outlier detection config ${fullFieldName} parse error: expected Duration, got ${typeof obj[fieldName]}`);

validateFieldType(obj, fieldName, 'number', objectName);
if (fieldName in obj && !(obj[fieldName] >= 0 && obj[fieldName] <= 100)) {
if (fieldName in obj &&
obj[fieldName] !== undefined &&
!(obj[fieldName] >= 0 && obj[fieldName] <= 100)) {
throw new Error(`outlier detection config ${fullFieldName} parse error: value out of range for percentage (0-100)`);

@@ -82,4 +86,3 @@ }

this.childPolicy = childPolicy;
if (childPolicy.length > 0 &&
childPolicy[0].getLoadBalancerName() === 'pick_first') {
if (childPolicy.getLoadBalancerName() === 'pick_first') {
throw new Error('outlier_detection LB policy cannot have a pick_first child policy');

@@ -100,10 +103,13 @@ }

toJsonObject() {
var _a, _b;
return {
interval: (0, duration_1.msToDuration)(this.intervalMs),
base_ejection_time: (0, duration_1.msToDuration)(this.baseEjectionTimeMs),
max_ejection_time: (0, duration_1.msToDuration)(this.maxEjectionTimeMs),
max_ejection_percent: this.maxEjectionPercent,
success_rate_ejection: this.successRateEjection,
failure_percentage_ejection: this.failurePercentageEjection,
child_policy: this.childPolicy.map(policy => policy.toJsonObject()),
outlier_detection: {
interval: (0, duration_1.msToDuration)(this.intervalMs),
base_ejection_time: (0, duration_1.msToDuration)(this.baseEjectionTimeMs),
max_ejection_time: (0, duration_1.msToDuration)(this.maxEjectionTimeMs),
max_ejection_percent: this.maxEjectionPercent,
success_rate_ejection: (_a = this.successRateEjection) !== null && _a !== void 0 ? _a : undefined,
failure_percentage_ejection: (_b = this.failurePercentageEjection) !== null && _b !== void 0 ? _b : undefined,
child_policy: [this.childPolicy.toJsonObject()],
},
};

@@ -132,5 +138,2 @@ }

}
copyWithChildPolicy(childPolicy) {
return new OutlierDetectionLoadBalancingConfig(this.intervalMs, this.baseEjectionTimeMs, this.maxEjectionTimeMs, this.maxEjectionPercent, this.successRateEjection, this.failurePercentageEjection, childPolicy);
}
static createFromJson(obj) {

@@ -142,3 +145,4 @@ var _a;

validatePercentage(obj, 'max_ejection_percent');
if ('success_rate_ejection' in obj) {
if ('success_rate_ejection' in obj &&
obj.success_rate_ejection !== undefined) {
if (typeof obj.success_rate_ejection !== 'object') {

@@ -152,3 +156,4 @@ throw new Error('outlier detection config success_rate_ejection must be an object');

}
if ('failure_percentage_ejection' in obj) {
if ('failure_percentage_ejection' in obj &&
obj.failure_percentage_ejection !== undefined) {
if (typeof obj.failure_percentage_ejection !== 'object') {

@@ -162,3 +167,10 @@ throw new Error('outlier detection config failure_percentage_ejection must be an object');

}
return new OutlierDetectionLoadBalancingConfig(obj.interval ? (0, duration_1.durationToMs)(obj.interval) : null, obj.base_ejection_time ? (0, duration_1.durationToMs)(obj.base_ejection_time) : null, obj.max_ejection_time ? (0, duration_1.durationToMs)(obj.max_ejection_time) : null, (_a = obj.max_ejection_percent) !== null && _a !== void 0 ? _a : null, obj.success_rate_ejection, obj.failure_percentage_ejection, obj.child_policy.map(load_balancer_1.validateLoadBalancingConfig));
if (!('child_policy' in obj) || !Array.isArray(obj.child_policy)) {
throw new Error('outlier detection config child_policy must be an array');
}
const childPolicy = (0, load_balancer_1.selectLbConfigFromList)(obj.child_policy);
if (!childPolicy) {
throw new Error('outlier detection config child_policy: no valid recognized policy found');
}
return new OutlierDetectionLoadBalancingConfig(obj.interval ? (0, duration_1.durationToMs)(obj.interval) : null, obj.base_ejection_time ? (0, duration_1.durationToMs)(obj.base_ejection_time) : null, obj.max_ejection_time ? (0, duration_1.durationToMs)(obj.max_ejection_time) : null, (_a = obj.max_ejection_percent) !== null && _a !== void 0 ? _a : null, obj.success_rate_ejection, obj.failure_percentage_ejection, childPolicy);
}

@@ -171,42 +183,4 @@ }

this.mapEntry = mapEntry;
this.stateListeners = [];
this.ejected = false;
this.refCount = 0;
this.childSubchannelState = childSubchannel.getConnectivityState();
childSubchannel.addConnectivityStateListener((subchannel, previousState, newState, keepaliveTime) => {
this.childSubchannelState = newState;
if (!this.ejected) {
for (const listener of this.stateListeners) {
listener(this, previousState, newState, keepaliveTime);
}
}
});
}
getConnectivityState() {
if (this.ejected) {
return connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE;
}
else {
return this.childSubchannelState;
}
}
/**
* Add a listener function to be called whenever the wrapper's
* connectivity state changes.
* @param listener
*/
addConnectivityStateListener(listener) {
this.stateListeners.push(listener);
}
/**
* Remove a listener previously added with `addConnectivityStateListener`
* @param listener A reference to a function previously passed to
* `addConnectivityStateListener`
*/
removeConnectivityStateListener(listener) {
const listenerIndex = this.stateListeners.indexOf(listener);
if (listenerIndex > -1) {
this.stateListeners.splice(listenerIndex, 1);
}
}
ref() {

@@ -229,12 +203,6 @@ this.child.ref();

eject() {
this.ejected = true;
for (const listener of this.stateListeners) {
listener(this, this.childSubchannelState, connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE, -1);
}
this.setHealthy(false);
}
uneject() {
this.ejected = false;
for (const listener of this.stateListeners) {
listener(this, connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE, this.childSubchannelState, -1);
}
this.setHealthy(true);
}

@@ -312,4 +280,4 @@ getMapEntry() {

class OutlierDetectionLoadBalancer {
constructor(channelControlHelper) {
this.addressMap = new Map();
constructor(channelControlHelper, options) {
this.entryMap = new subchannel_address_1.EndpointMap();
this.latestConfig = null;

@@ -320,3 +288,3 @@ this.timerStartTime = null;

const originalSubchannel = channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs);
const mapEntry = this.addressMap.get((0, subchannel_address_1.subchannelAddressToString)(subchannelAddress));
const mapEntry = this.entryMap.getForSubchannelAddress(subchannelAddress);
const subchannelWrapper = new OutlierDetectionSubchannelWrapper(originalSubchannel, mapEntry);

@@ -338,3 +306,3 @@ if ((mapEntry === null || mapEntry === void 0 ? void 0 : mapEntry.currentEjectionTimestamp) !== null) {

},
}));
}), options);
this.ejectionTimer = setInterval(() => { }, 0);

@@ -350,3 +318,3 @@ clearInterval(this.ejectionTimer);

let ejectionCount = 0;
for (const mapEntry of this.addressMap.values()) {
for (const mapEntry of this.entryMap.values()) {
if (mapEntry.currentEjectionTimestamp !== null) {

@@ -356,3 +324,3 @@ ejectionCount += 1;

}
return (ejectionCount * 100) / this.addressMap.size;
return (ejectionCount * 100) / this.entryMap.size;
}

@@ -372,7 +340,7 @@ runSuccessRateCheck(ejectionTimestamp) {

const successRates = [];
for (const [address, mapEntry] of this.addressMap) {
for (const [endpoint, mapEntry] of this.entryMap.entries()) {
const successes = mapEntry.counter.getLastSuccesses();
const failures = mapEntry.counter.getLastFailures();
trace('Stats for ' +
address +
(0, subchannel_address_1.endpointToString)(endpoint) +
': successes=' +

@@ -412,3 +380,3 @@ successes +

// Step 3
for (const [address, mapEntry] of this.addressMap.entries()) {
for (const [address, mapEntry] of this.entryMap.entries()) {
// Step 3.i

@@ -457,3 +425,3 @@ if (this.getCurrentEjectionPercent() >=

let addressesWithTargetVolume = 0;
for (const mapEntry of this.addressMap.values()) {
for (const mapEntry of this.entryMap.values()) {
const successes = mapEntry.counter.getLastSuccesses();

@@ -469,3 +437,3 @@ const failures = mapEntry.counter.getLastFailures();

// Step 2
for (const [address, mapEntry] of this.addressMap.entries()) {
for (const [address, mapEntry] of this.entryMap.entries()) {
// Step 2.i

@@ -514,3 +482,3 @@ if (this.getCurrentEjectionPercent() >=

switchAllBuckets() {
for (const mapEntry of this.addressMap.values()) {
for (const mapEntry of this.entryMap.values()) {
mapEntry.counter.switchBuckets();

@@ -535,3 +503,3 @@ }

this.runFailurePercentageCheck(ejectionTimestamp);
for (const [address, mapEntry] of this.addressMap.entries()) {
for (const [address, mapEntry] of this.entryMap.entries()) {
if (mapEntry.currentEjectionTimestamp === null) {

@@ -555,14 +523,10 @@ if (mapEntry.ejectionTimeMultiplier > 0) {

}
updateAddressList(addressList, lbConfig, attributes) {
updateAddressList(endpointList, lbConfig, attributes) {
if (!(lbConfig instanceof OutlierDetectionLoadBalancingConfig)) {
return;
}
const subchannelAddresses = new Set();
for (const address of addressList) {
subchannelAddresses.add((0, subchannel_address_1.subchannelAddressToString)(address));
}
for (const address of subchannelAddresses) {
if (!this.addressMap.has(address)) {
trace('Adding map entry for ' + address);
this.addressMap.set(address, {
for (const endpoint of endpointList) {
if (!this.entryMap.has(endpoint)) {
trace('Adding map entry for ' + (0, subchannel_address_1.endpointToString)(endpoint));
this.entryMap.set(endpoint, {
counter: new CallCounter(),

@@ -575,10 +539,5 @@ currentEjectionTimestamp: null,

}
for (const key of this.addressMap.keys()) {
if (!subchannelAddresses.has(key)) {
trace('Removing map entry for ' + key);
this.addressMap.delete(key);
}
}
const childPolicy = (0, load_balancer_1.getFirstUsableConfig)(lbConfig.getChildPolicy(), true);
this.childBalancer.updateAddressList(addressList, childPolicy, attributes);
this.entryMap.deleteMissing(endpointList);
const childPolicy = lbConfig.getChildPolicy();
this.childBalancer.updateAddressList(endpointList, childPolicy, attributes);
if (lbConfig.getSuccessRateEjectionConfig() ||

@@ -604,3 +563,3 @@ lbConfig.getFailurePercentageEjectionConfig()) {

clearTimeout(this.ejectionTimer);
for (const mapEntry of this.addressMap.values()) {
for (const mapEntry of this.entryMap.values()) {
this.uneject(mapEntry);

@@ -607,0 +566,0 @@ mapEntry.ejectionTimeMultiplier = 0;

@@ -1,4 +0,7 @@

import { LoadBalancer, ChannelControlHelper, LoadBalancingConfig } from './load-balancer';
import { SubchannelAddress } from './subchannel-address';
export declare class PickFirstLoadBalancingConfig implements LoadBalancingConfig {
import { LoadBalancer, ChannelControlHelper, TypedLoadBalancingConfig } from './load-balancer';
import { ConnectivityState } from './connectivity-state';
import { Picker } from './picker';
import { Endpoint } from './subchannel-address';
import { ChannelOptions } from './channel-options';
export declare class PickFirstLoadBalancingConfig implements TypedLoadBalancingConfig {
private readonly shuffleAddressList;

@@ -44,2 +47,3 @@ constructor(shuffleAddressList: boolean);

private subchannelStateListener;
private pickedSubchannelHealthListener;
/**

@@ -57,2 +61,3 @@ * Timer reference for the timer tracking when to start

private stickyTransientFailureMode;
private reportHealthStatus;
/**

@@ -76,3 +81,3 @@ * Indicates whether we called channelControlHelper.requestReresolution since

*/
constructor(channelControlHelper: ChannelControlHelper);
constructor(channelControlHelper: ChannelControlHelper, options: ChannelOptions);
private allChildrenHaveReportedTF;

@@ -94,3 +99,3 @@ private calculateAndReportNewState;

private connectToAddressList;
updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig): void;
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig): void;
exitIdle(): void;

@@ -101,2 +106,26 @@ resetBackoff(): void;

}
/**
* This class handles the leaf load balancing operations for a single endpoint.
* It is a thin wrapper around a PickFirstLoadBalancer with a different API
* that more closely reflects how it will be used as a leaf balancer.
*/
export declare class LeafLoadBalancer {
private endpoint;
private pickFirstBalancer;
private latestState;
private latestPicker;
constructor(endpoint: Endpoint, channelControlHelper: ChannelControlHelper, options: ChannelOptions);
startConnecting(): void;
/**
* Update the endpoint associated with this LeafLoadBalancer to a new
* endpoint. Does not trigger connection establishment if a connection
* attempt is not already in progress.
* @param newEndpoint
*/
updateEndpoint(newEndpoint: Endpoint): void;
getConnectivityState(): ConnectivityState;
getPicker(): Picker;
getEndpoint(): Endpoint;
destroy(): void;
}
export declare function setup(): void;

@@ -19,3 +19,3 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.setup = exports.PickFirstLoadBalancer = exports.shuffled = exports.PickFirstLoadBalancingConfig = void 0;
exports.setup = exports.LeafLoadBalancer = exports.PickFirstLoadBalancer = exports.shuffled = exports.PickFirstLoadBalancingConfig = void 0;
const load_balancer_1 = require("./load-balancer");

@@ -26,2 +26,4 @@ const connectivity_state_1 = require("./connectivity-state");

const constants_1 = require("./constants");
const subchannel_address_1 = require("./subchannel-address");
const net_1 = require("net");
const TRACER_NAME = 'pick_first';

@@ -98,2 +100,33 @@ function trace(text) {

exports.shuffled = shuffled;
/**
* Interleave addresses in addressList by family in accordance with RFC-8304 section 4
* @param addressList
* @returns
*/
function interleaveAddressFamilies(addressList) {
const result = [];
const ipv6Addresses = [];
const ipv4Addresses = [];
const ipv6First = (0, subchannel_address_1.isTcpSubchannelAddress)(addressList[0]) && (0, net_1.isIPv6)(addressList[0].host);
for (const address of addressList) {
if ((0, subchannel_address_1.isTcpSubchannelAddress)(address) && (0, net_1.isIPv6)(address.host)) {
ipv6Addresses.push(address);
}
else {
ipv4Addresses.push(address);
}
}
const firstList = ipv6First ? ipv6Addresses : ipv4Addresses;
const secondList = ipv6First ? ipv4Addresses : ipv6Addresses;
for (let i = 0; i < Math.max(firstList.length, secondList.length); i++) {
if (i < firstList.length) {
result.push(firstList[i]);
}
if (i < secondList.length) {
result.push(secondList[i]);
}
}
return result;
}
const REPORT_HEALTH_STATUS_OPTION_NAME = 'grpc-node.internal.pick-first.report_health_status';
class PickFirstLoadBalancer {

@@ -107,3 +140,3 @@ /**

*/
constructor(channelControlHelper) {
constructor(channelControlHelper, options) {
this.channelControlHelper = channelControlHelper;

@@ -137,2 +170,3 @@ /**

};
this.pickedSubchannelHealthListener = () => this.calculateAndReportNewState();
this.triedAllSubchannels = false;

@@ -159,2 +193,3 @@ /**

clearTimeout(this.connectionDelayTimeout);
this.reportHealthStatus = options[REPORT_HEALTH_STATUS_OPTION_NAME];
}

@@ -166,3 +201,10 @@ allChildrenHaveReportedTF() {

if (this.currentPick) {
this.updateState(connectivity_state_1.ConnectivityState.READY, new PickFirstPicker(this.currentPick));
if (this.reportHealthStatus && !this.currentPick.isHealthy()) {
this.updateState(connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE, new picker_1.UnavailablePicker({
details: `Picked subchannel ${this.currentPick.getAddress()} is unhealthy`,
}));
}
else {
this.updateState(connectivity_state_1.ConnectivityState.READY, new PickFirstPicker(this.currentPick));
}
}

@@ -217,2 +259,5 @@ else if (this.children.length === 0) {

this.channelControlHelper.removeChannelzChild(currentPick.getChannelzRef());
if (this.reportHealthStatus) {
currentPick.removeHealthStateWatcher(this.pickedSubchannelHealthListener);
}
}

@@ -295,9 +340,8 @@ }

this.stickyTransientFailureMode = false;
if (this.currentPick !== null) {
this.currentPick.unref();
this.channelControlHelper.removeChannelzChild(this.currentPick.getChannelzRef());
this.currentPick.removeConnectivityStateListener(this.subchannelStateListener);
}
this.removeCurrentPick();
this.currentPick = subchannel;
subchannel.ref();
if (this.reportHealthStatus) {
subchannel.addHealthStateWatcher(this.pickedSubchannelHealthListener);
}
this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());

@@ -366,3 +410,3 @@ this.resetSubchannelList();

}
updateAddressList(addressList, lbConfig) {
updateAddressList(endpointList, lbConfig) {
if (!(lbConfig instanceof PickFirstLoadBalancingConfig)) {

@@ -375,4 +419,9 @@ return;

if (lbConfig.getShuffleAddressList()) {
addressList = shuffled(addressList);
endpointList = shuffled(endpointList);
}
const rawAddressList = [].concat(...endpointList.map(endpoint => endpoint.addresses));
if (rawAddressList.length === 0) {
throw new Error('No addresses in endpoint list passed to pick_first');
}
const addressList = interleaveAddressFamilies(rawAddressList);
this.latestAddressList = addressList;

@@ -399,2 +448,51 @@ this.connectToAddressList(addressList);

exports.PickFirstLoadBalancer = PickFirstLoadBalancer;
const LEAF_CONFIG = new PickFirstLoadBalancingConfig(false);
/**
* This class handles the leaf load balancing operations for a single endpoint.
* It is a thin wrapper around a PickFirstLoadBalancer with a different API
* that more closely reflects how it will be used as a leaf balancer.
*/
class LeafLoadBalancer {
constructor(endpoint, channelControlHelper, options) {
this.endpoint = endpoint;
this.latestState = connectivity_state_1.ConnectivityState.IDLE;
const childChannelControlHelper = (0, load_balancer_1.createChildChannelControlHelper)(channelControlHelper, {
updateState: (connectivityState, picker) => {
this.latestState = connectivityState;
this.latestPicker = picker;
channelControlHelper.updateState(connectivityState, picker);
},
});
this.pickFirstBalancer = new PickFirstLoadBalancer(childChannelControlHelper, Object.assign(Object.assign({}, options), { [REPORT_HEALTH_STATUS_OPTION_NAME]: true }));
this.latestPicker = new picker_1.QueuePicker(this.pickFirstBalancer);
}
startConnecting() {
this.pickFirstBalancer.updateAddressList([this.endpoint], LEAF_CONFIG);
}
/**
* Update the endpoint associated with this LeafLoadBalancer to a new
* endpoint. Does not trigger connection establishment if a connection
* attempt is not already in progress.
* @param newEndpoint
*/
updateEndpoint(newEndpoint) {
this.endpoint = newEndpoint;
if (this.latestState !== connectivity_state_1.ConnectivityState.IDLE) {
this.startConnecting();
}
}
getConnectivityState() {
return this.latestState;
}
getPicker() {
return this.latestPicker;
}
getEndpoint() {
return this.endpoint;
}
destroy() {
this.pickFirstBalancer.destroy();
}
}
exports.LeafLoadBalancer = LeafLoadBalancer;
function setup() {

@@ -401,0 +499,0 @@ (0, load_balancer_1.registerLoadBalancerType)(TYPE_NAME, PickFirstLoadBalancer, PickFirstLoadBalancingConfig);

@@ -1,16 +0,19 @@

import { LoadBalancer, ChannelControlHelper, LoadBalancingConfig } from './load-balancer';
import { SubchannelAddress } from './subchannel-address';
import { LoadBalancer, ChannelControlHelper, TypedLoadBalancingConfig } from './load-balancer';
import { Endpoint } from './subchannel-address';
import { ChannelOptions } from './channel-options';
export declare class RoundRobinLoadBalancer implements LoadBalancer {
private readonly channelControlHelper;
private subchannels;
private readonly options;
private children;
private currentState;
private subchannelStateListener;
private currentReadyPicker;
private updatesPaused;
private childChannelControlHelper;
private lastError;
constructor(channelControlHelper: ChannelControlHelper);
private countSubchannelsWithState;
constructor(channelControlHelper: ChannelControlHelper, options: ChannelOptions);
private countChildrenWithState;
private calculateAndUpdateState;
private updateState;
private resetSubchannelList;
updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig): void;
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig): void;
exitIdle(): void;

@@ -17,0 +20,0 @@ resetBackoff(): void;

@@ -23,5 +23,6 @@ "use strict";

const picker_1 = require("./picker");
const subchannel_address_1 = require("./subchannel-address");
const logging = require("./logging");
const constants_1 = require("./constants");
const subchannel_address_1 = require("./subchannel-address");
const load_balancer_pick_first_1 = require("./load-balancer-pick-first");
const TRACER_NAME = 'round_robin';

@@ -48,16 +49,10 @@ function trace(text) {

class RoundRobinPicker {
constructor(subchannelList, nextIndex = 0) {
this.subchannelList = subchannelList;
constructor(children, nextIndex = 0) {
this.children = children;
this.nextIndex = nextIndex;
}
pick(pickArgs) {
const pickedSubchannel = this.subchannelList[this.nextIndex];
this.nextIndex = (this.nextIndex + 1) % this.subchannelList.length;
return {
pickResultType: picker_1.PickResultType.COMPLETE,
subchannel: pickedSubchannel,
status: null,
onCallStarted: null,
onCallEnded: null,
};
const childPicker = this.children[this.nextIndex].picker;
this.nextIndex = (this.nextIndex + 1) % this.children.length;
return childPicker.pick(pickArgs);
}

@@ -69,34 +64,35 @@ /**

*/
peekNextSubchannel() {
return this.subchannelList[this.nextIndex];
peekNextEndpoint() {
return this.children[this.nextIndex].endpoint;
}
}
class RoundRobinLoadBalancer {
constructor(channelControlHelper) {
constructor(channelControlHelper, options) {
this.channelControlHelper = channelControlHelper;
this.subchannels = [];
this.options = options;
this.children = [];
this.currentState = connectivity_state_1.ConnectivityState.IDLE;
this.currentReadyPicker = null;
this.updatesPaused = false;
this.lastError = null;
this.subchannelStateListener = (subchannel, previousState, newState, keepaliveTime, errorMessage) => {
this.calculateAndUpdateState();
if (newState === connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE ||
newState === connectivity_state_1.ConnectivityState.IDLE) {
if (errorMessage) {
this.lastError = errorMessage;
}
this.channelControlHelper.requestReresolution();
subchannel.startConnecting();
}
};
this.childChannelControlHelper = (0, load_balancer_1.createChildChannelControlHelper)(channelControlHelper, {
updateState: (connectivityState, picker) => {
this.calculateAndUpdateState();
},
});
}
countSubchannelsWithState(state) {
return this.subchannels.filter(subchannel => subchannel.getConnectivityState() === state).length;
countChildrenWithState(state) {
return this.children.filter(child => child.getConnectivityState() === state)
.length;
}
calculateAndUpdateState() {
if (this.countSubchannelsWithState(connectivity_state_1.ConnectivityState.READY) > 0) {
const readySubchannels = this.subchannels.filter(subchannel => subchannel.getConnectivityState() === connectivity_state_1.ConnectivityState.READY);
if (this.updatesPaused) {
return;
}
if (this.countChildrenWithState(connectivity_state_1.ConnectivityState.READY) > 0) {
const readyChildren = this.children.filter(child => child.getConnectivityState() === connectivity_state_1.ConnectivityState.READY);
let index = 0;
if (this.currentReadyPicker !== null) {
index = readySubchannels.indexOf(this.currentReadyPicker.peekNextSubchannel());
const nextPickedEndpoint = this.currentReadyPicker.peekNextEndpoint();
index = readyChildren.findIndex(child => (0, subchannel_address_1.endpointEqual)(child.getEndpoint(), nextPickedEndpoint));
if (index < 0) {

@@ -106,8 +102,11 @@ index = 0;

}
this.updateState(connectivity_state_1.ConnectivityState.READY, new RoundRobinPicker(readySubchannels, index));
this.updateState(connectivity_state_1.ConnectivityState.READY, new RoundRobinPicker(readyChildren.map(child => ({
endpoint: child.getEndpoint(),
picker: child.getPicker(),
})), index));
}
else if (this.countSubchannelsWithState(connectivity_state_1.ConnectivityState.CONNECTING) > 0) {
else if (this.countChildrenWithState(connectivity_state_1.ConnectivityState.CONNECTING) > 0) {
this.updateState(connectivity_state_1.ConnectivityState.CONNECTING, new picker_1.QueuePicker(this));
}
else if (this.countSubchannelsWithState(connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE) > 0) {
else if (this.countChildrenWithState(connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE) > 0) {
this.updateState(connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE, new picker_1.UnavailablePicker({ details: `No connection established. Last error: ${this.lastError}` }));

@@ -133,34 +132,24 @@ }

resetSubchannelList() {
for (const subchannel of this.subchannels) {
subchannel.removeConnectivityStateListener(this.subchannelStateListener);
subchannel.unref();
this.channelControlHelper.removeChannelzChild(subchannel.getChannelzRef());
for (const child of this.children) {
child.destroy();
}
this.subchannels = [];
}
updateAddressList(addressList, lbConfig) {
updateAddressList(endpointList, lbConfig) {
this.resetSubchannelList();
trace('Connect to address list ' +
addressList.map(address => (0, subchannel_address_1.subchannelAddressToString)(address)));
this.subchannels = addressList.map(address => this.channelControlHelper.createSubchannel(address, {}));
for (const subchannel of this.subchannels) {
subchannel.ref();
subchannel.addConnectivityStateListener(this.subchannelStateListener);
this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
const subchannelState = subchannel.getConnectivityState();
if (subchannelState === connectivity_state_1.ConnectivityState.IDLE ||
subchannelState === connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE) {
subchannel.startConnecting();
}
trace('Connect to endpoint list ' + endpointList.map(subchannel_address_1.endpointToString));
this.updatesPaused = true;
this.children = endpointList.map(endpoint => new load_balancer_pick_first_1.LeafLoadBalancer(endpoint, this.childChannelControlHelper, this.options));
for (const child of this.children) {
child.startConnecting();
}
this.updatesPaused = false;
this.calculateAndUpdateState();
}
exitIdle() {
for (const subchannel of this.subchannels) {
subchannel.startConnecting();
}
/* The round_robin LB policy is only in the IDLE state if it has no
* addresses to try to connect to and it has no picked subchannel.
* In that case, there is no meaningful action that can be taken here. */
}
resetBackoff() {
/* The pick first load balancer does not have a connection backoff, so this
* does nothing */
// This LB policy has no backoff to reset
}

@@ -167,0 +156,0 @@ destroy() {

import { ChannelOptions } from './channel-options';
import { SubchannelAddress } from './subchannel-address';
import { Endpoint, SubchannelAddress } from './subchannel-address';
import { ConnectivityState } from './connectivity-state';

@@ -7,2 +7,3 @@ import { Picker } from './picker';

import { SubchannelInterface } from './subchannel-interface';
import { LoadBalancingConfig } from './service-config';
/**

@@ -53,7 +54,7 @@ * A collection of functions associated with a channel that a load balancer

* are established
* @param addressList The new list of addresses to connect to
* @param endpointList The new list of addresses to connect to
* @param lbConfig The load balancing config object from the service config,
* if one was provided
*/
updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: {
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: {
[key: string]: unknown;

@@ -84,17 +85,18 @@ }): void;

export interface LoadBalancerConstructor {
new (channelControlHelper: ChannelControlHelper): LoadBalancer;
new (channelControlHelper: ChannelControlHelper, options: ChannelOptions): LoadBalancer;
}
export interface LoadBalancingConfig {
export interface TypedLoadBalancingConfig {
getLoadBalancerName(): string;
toJsonObject(): object;
}
export interface LoadBalancingConfigConstructor {
new (...args: any): LoadBalancingConfig;
createFromJson(obj: any): LoadBalancingConfig;
export interface TypedLoadBalancingConfigConstructor {
new (...args: any): TypedLoadBalancingConfig;
createFromJson(obj: any): TypedLoadBalancingConfig;
}
export declare function registerLoadBalancerType(typeName: string, loadBalancerType: LoadBalancerConstructor, loadBalancingConfigType: LoadBalancingConfigConstructor): void;
export declare function registerLoadBalancerType(typeName: string, loadBalancerType: LoadBalancerConstructor, loadBalancingConfigType: TypedLoadBalancingConfigConstructor): void;
export declare function registerDefaultLoadBalancerType(typeName: string): void;
export declare function createLoadBalancer(config: LoadBalancingConfig, channelControlHelper: ChannelControlHelper): LoadBalancer | null;
export declare function createLoadBalancer(config: TypedLoadBalancingConfig, channelControlHelper: ChannelControlHelper, options: ChannelOptions): LoadBalancer | null;
export declare function isLoadBalancerNameRegistered(typeName: string): boolean;
export declare function getFirstUsableConfig(configs: LoadBalancingConfig[], fallbackTodefault?: true): LoadBalancingConfig;
export declare function validateLoadBalancingConfig(obj: any): LoadBalancingConfig;
export declare function parseLoadBalancingConfig(rawConfig: LoadBalancingConfig): TypedLoadBalancingConfig;
export declare function getDefaultConfig(): TypedLoadBalancingConfig;
export declare function selectLbConfigFromList(configs: LoadBalancingConfig[], fallbackTodefault?: boolean): TypedLoadBalancingConfig | null;

@@ -19,3 +19,5 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.validateLoadBalancingConfig = exports.getFirstUsableConfig = exports.isLoadBalancerNameRegistered = exports.createLoadBalancer = exports.registerDefaultLoadBalancerType = exports.registerLoadBalancerType = exports.createChildChannelControlHelper = void 0;
exports.selectLbConfigFromList = exports.getDefaultConfig = exports.parseLoadBalancingConfig = exports.isLoadBalancerNameRegistered = exports.createLoadBalancer = exports.registerDefaultLoadBalancerType = exports.registerLoadBalancerType = exports.createChildChannelControlHelper = void 0;
const logging_1 = require("./logging");
const constants_1 = require("./constants");
/**

@@ -53,6 +55,6 @@ * Create a child ChannelControlHelper that overrides some methods of the

exports.registerDefaultLoadBalancerType = registerDefaultLoadBalancerType;
function createLoadBalancer(config, channelControlHelper) {
function createLoadBalancer(config, channelControlHelper, options) {
const typeName = config.getLoadBalancerName();
if (typeName in registeredLoadBalancerTypes) {
return new registeredLoadBalancerTypes[typeName].LoadBalancer(channelControlHelper);
return new registeredLoadBalancerTypes[typeName].LoadBalancer(channelControlHelper, options);
}

@@ -68,7 +70,37 @@ else {

exports.isLoadBalancerNameRegistered = isLoadBalancerNameRegistered;
function getFirstUsableConfig(configs, fallbackTodefault = false) {
function parseLoadBalancingConfig(rawConfig) {
const keys = Object.keys(rawConfig);
if (keys.length !== 1) {
throw new Error('Provided load balancing config has multiple conflicting entries');
}
const typeName = keys[0];
if (typeName in registeredLoadBalancerTypes) {
try {
return registeredLoadBalancerTypes[typeName].LoadBalancingConfig.createFromJson(rawConfig[typeName]);
}
catch (e) {
throw new Error(`${typeName}: ${e.message}`);
}
}
else {
throw new Error(`Unrecognized load balancing config name ${typeName}`);
}
}
exports.parseLoadBalancingConfig = parseLoadBalancingConfig;
function getDefaultConfig() {
if (!defaultLoadBalancerType) {
throw new Error('No default load balancer type registered');
}
return new registeredLoadBalancerTypes[defaultLoadBalancerType].LoadBalancingConfig();
}
exports.getDefaultConfig = getDefaultConfig;
function selectLbConfigFromList(configs, fallbackTodefault = false) {
for (const config of configs) {
if (config.getLoadBalancerName() in registeredLoadBalancerTypes) {
return config;
try {
return parseLoadBalancingConfig(config);
}
catch (e) {
(0, logging_1.log)(constants_1.LogVerbosity.DEBUG, 'Config parsing failed with error', e.message);
continue;
}
}

@@ -87,21 +119,3 @@ if (fallbackTodefault) {

}
exports.getFirstUsableConfig = getFirstUsableConfig;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function validateLoadBalancingConfig(obj) {
if (!(obj !== null && typeof obj === 'object')) {
throw new Error('Load balancing config must be an object');
}
const keys = Object.keys(obj);
if (keys.length !== 1) {
throw new Error('Provided load balancing config has multiple conflicting entries');
}
const typeName = keys[0];
if (typeName in registeredLoadBalancerTypes) {
return registeredLoadBalancerTypes[typeName].LoadBalancingConfig.createFromJson(obj[typeName]);
}
else {
throw new Error(`Unrecognized load balancing config name ${typeName}`);
}
}
exports.validateLoadBalancingConfig = validateLoadBalancingConfig;
exports.selectLbConfigFromList = selectLbConfigFromList;
//# sourceMappingURL=load-balancer.js.map

@@ -87,3 +87,4 @@ "use strict";

this.trace('Pick called');
const pickResult = this.channel.doPick(this.metadata, this.callConfig.pickInformation);
const finalMetadata = this.metadata.clone();
const pickResult = this.channel.doPick(finalMetadata, this.callConfig.pickInformation);
const subchannelString = pickResult.subchannel

@@ -116,3 +117,2 @@ ? '(' +

}
const finalMetadata = this.metadata.clone();
finalMetadata.merge(credsMetadata);

@@ -119,0 +119,0 @@ if (finalMetadata.get('authorization').length > 1) {

@@ -85,8 +85,11 @@ import { StatusObject } from './call-interface';

* once any pick is attempted.
* If the childPicker is provided, delegate to it instead of returning the
* hardcoded QUEUE pick result, but still calls exitIdle.
*/
export declare class QueuePicker {
private loadBalancer;
private childPicker?;
private calledExitIdle;
constructor(loadBalancer: LoadBalancer);
pick(pickArgs: PickArgs): QueuePickResult;
constructor(loadBalancer: LoadBalancer, childPicker?: Picker | undefined);
pick(pickArgs: PickArgs): PickResult;
}

@@ -54,7 +54,10 @@ "use strict";

* once any pick is attempted.
* If the childPicker is provided, delegate to it instead of returning the
* hardcoded QUEUE pick result, but still calls exitIdle.
*/
class QueuePicker {
// Constructed with a load balancer. Calls exitIdle on it the first time pick is called
constructor(loadBalancer) {
constructor(loadBalancer, childPicker) {
this.loadBalancer = loadBalancer;
this.childPicker = childPicker;
this.calledExitIdle = false;

@@ -69,9 +72,14 @@ }

}
return {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null,
onCallStarted: null,
onCallEnded: null,
};
if (this.childPicker) {
return this.childPicker.pick(pickArgs);
}
else {
return {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null,
onCallStarted: null,
onCallEnded: null,
};
}
}

@@ -78,0 +86,0 @@ }

@@ -42,18 +42,2 @@ "use strict";

/**
* Merge any number of arrays into a single alternating array
* @param arrays
*/
function mergeArrays(...arrays) {
const result = [];
for (let i = 0; i <
Math.max.apply(null, arrays.map(array => array.length)); i++) {
for (const array of arrays) {
if (i < array.length) {
result.push(array[i]);
}
}
}
return result;
}
/**
* Resolver implementation that handles DNS names and IP addresses.

@@ -86,4 +70,8 @@ */

{
host: hostPort.host,
port: (_a = hostPort.port) !== null && _a !== void 0 ? _a : exports.DEFAULT_PORT,
addresses: [
{
host: hostPort.host,
port: (_a = hostPort.port) !== null && _a !== void 0 ? _a : exports.DEFAULT_PORT,
},
],
},

@@ -178,7 +166,8 @@ ];

this.backoff.stop();
const ip4Addresses = addressList.filter(addr => addr.family === 4);
const ip6Addresses = addressList.filter(addr => addr.family === 6);
this.latestLookupResult = mergeArrays(ip6Addresses, ip4Addresses).map(addr => ({ host: addr.address, port: +this.port }));
const subchannelAddresses = addressList.map(addr => ({ host: addr.address, port: +this.port }));
this.latestLookupResult = subchannelAddresses.map(address => ({
addresses: [address],
}));
const allAddressesString = '[' +
this.latestLookupResult
subchannelAddresses
.map(addr => addr.host + ':' + addr.port)

@@ -185,0 +174,0 @@ .join(',') +

@@ -39,3 +39,3 @@ "use strict";

this.listener = listener;
this.addresses = [];
this.endpoints = [];
this.error = null;

@@ -78,4 +78,4 @@ this.hasReturnedResult = false;

}
this.addresses = addresses;
trace('Parsed ' + target.scheme + ' address list ' + this.addresses);
this.endpoints = addresses.map(address => ({ addresses: [address] }));
trace('Parsed ' + target.scheme + ' address list ' + addresses);
}

@@ -90,3 +90,3 @@ updateResolution() {

else {
this.listener.onSuccessfulResolution(this.addresses, null, null, null, {});
this.listener.onSuccessfulResolution(this.endpoints, null, null, null, {});
}

@@ -93,0 +93,0 @@ });

@@ -23,4 +23,4 @@ "use strict";

this.listener = listener;
this.addresses = [];
this.hasReturnedResult = false;
this.endpoints = [];
let path;

@@ -33,3 +33,3 @@ if (target.authority === '') {

}
this.addresses = [{ path }];
this.endpoints = [{ addresses: [{ path }] }];
}

@@ -39,3 +39,3 @@ updateResolution() {

this.hasReturnedResult = true;
process.nextTick(this.listener.onSuccessfulResolution, this.addresses, null, null, null, {});
process.nextTick(this.listener.onSuccessfulResolution, this.endpoints, null, null, null, {});
}

@@ -42,0 +42,0 @@ }

import { MethodConfig, ServiceConfig } from './service-config';
import { StatusObject } from './call-interface';
import { SubchannelAddress } from './subchannel-address';
import { Endpoint } from './subchannel-address';
import { GrpcUri } from './uri-parser';

@@ -23,3 +23,3 @@ import { ChannelOptions } from './channel-options';

export interface ConfigSelector {
(methodName: string, metadata: Metadata): CallConfig;
(methodName: string, metadata: Metadata, channelId: number): CallConfig;
}

@@ -40,3 +40,3 @@ /**

*/
onSuccessfulResolution(addressList: SubchannelAddress[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null, configSelector: ConfigSelector | null, attributes: {
onSuccessfulResolution(addressList: Endpoint[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null, configSelector: ConfigSelector | null, attributes: {
[key: string]: unknown;

@@ -43,0 +43,0 @@ }): void;

@@ -1,6 +0,6 @@

import { ChannelControlHelper, LoadBalancer, LoadBalancingConfig } from './load-balancer';
import { ChannelControlHelper, LoadBalancer, TypedLoadBalancingConfig } from './load-balancer';
import { ServiceConfig } from './service-config';
import { ConfigSelector } from './resolver';
import { StatusObject } from './call-interface';
import { SubchannelAddress } from './subchannel-address';
import { Endpoint } from './subchannel-address';
import { GrpcUri } from './uri-parser';

@@ -63,3 +63,3 @@ import { ChannelOptions } from './channel-options';

exitIdle(): void;
updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig | null): never;
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig | null): never;
resetBackoff(): void;

@@ -66,0 +66,0 @@ destroy(): void;

@@ -174,5 +174,5 @@ "use strict";

removeChannelzChild: channelControlHelper.removeChannelzChild.bind(channelControlHelper),
});
}, channelOptions);
this.innerResolver = (0, resolver_1.createResolver)(target, {
onSuccessfulResolution: (addressList, serviceConfig, serviceConfigError, configSelector, attributes) => {
onSuccessfulResolution: (endpointList, serviceConfig, serviceConfigError, configSelector, attributes) => {
var _a;

@@ -211,3 +211,3 @@ this.backoffTimeout.stop();

const workingConfigList = (_a = workingServiceConfig === null || workingServiceConfig === void 0 ? void 0 : workingServiceConfig.loadBalancingConfig) !== null && _a !== void 0 ? _a : [];
const loadBalancingConfig = (0, load_balancer_1.getFirstUsableConfig)(workingConfigList, true);
const loadBalancingConfig = (0, load_balancer_1.selectLbConfigFromList)(workingConfigList, true);
if (loadBalancingConfig === null) {

@@ -222,3 +222,3 @@ // There were load balancing configs but none are supported. This counts as a resolution failure

}
this.childLoadBalancer.updateAddressList(addressList, loadBalancingConfig, attributes);
this.childLoadBalancer.updateAddressList(endpointList, loadBalancingConfig, attributes);
const finalServiceConfig = workingServiceConfig !== null && workingServiceConfig !== void 0 ? workingServiceConfig : this.defaultServiceConfig;

@@ -249,3 +249,7 @@ this.onSuccessfulResolution(finalServiceConfig, configSelector !== null && configSelector !== void 0 ? configSelector : getDefaultConfigSelector(finalServiceConfig));

if (this.currentState === connectivity_state_1.ConnectivityState.IDLE) {
this.updateState(connectivity_state_1.ConnectivityState.CONNECTING, new picker_1.QueuePicker(this));
/* this.latestChildPicker is initialized as new QueuePicker(this), which
* is an appropriate value here if the child LB policy is unset.
* Otherwise, we want to delegate to the child here, in case that
* triggers something. */
this.updateState(connectivity_state_1.ConnectivityState.CONNECTING, this.latestChildPicker);
}

@@ -262,3 +266,3 @@ this.backoffTimeout.runOnce();

if (connectivityState === connectivity_state_1.ConnectivityState.IDLE) {
picker = new picker_1.QueuePicker(this);
picker = new picker_1.QueuePicker(this, picker);
}

@@ -286,3 +290,3 @@ this.currentState = connectivityState;

}
updateAddressList(addressList, lbConfig) {
updateAddressList(endpointList, lbConfig) {
throw new Error('updateAddressList not supported on ResolvingLoadBalancer');

@@ -289,0 +293,0 @@ }

/// <reference types="node" />
/// <reference types="node" />
/// <reference types="node" />
/// <reference types="node" />
/// <reference types="node" />
import { EventEmitter } from 'events';
import * as http2 from 'http2';
import { Duplex, Readable, Writable } from 'stream';

@@ -12,5 +8,5 @@ import { Deserialize, Serialize } from './make-client';

import { ObjectReadable, ObjectWritable } from './object-stream';
import { ChannelOptions } from './channel-options';
import { StatusObject, PartialStatusObject } from './call-interface';
import { Deadline } from './deadline';
import { ServerInterceptingCallInterface } from './server-interceptors';
export type ServerStatusResponse = Partial<StatusObject>;

@@ -37,3 +33,5 @@ export type ServerErrorResponse = ServerStatusResponse & Error;

};
export declare function serverErrorToStatus(error: ServerErrorResponse | ServerStatusResponse, overrideTrailers?: Metadata | undefined): PartialStatusObject;
export declare class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter implements ServerUnaryCall<RequestType, ResponseType> {
private path;
private call;

@@ -43,3 +41,3 @@ metadata: Metadata;

cancelled: boolean;
constructor(call: Http2ServerCallStream<RequestType, ResponseType>, metadata: Metadata, request: RequestType);
constructor(path: string, call: ServerInterceptingCallInterface, metadata: Metadata, request: RequestType);
getPeer(): string;

@@ -51,7 +49,7 @@ sendMetadata(responseMetadata: Metadata): void;

export declare class ServerReadableStreamImpl<RequestType, ResponseType> extends Readable implements ServerReadableStream<RequestType, ResponseType> {
private path;
private call;
metadata: Metadata;
deserialize: Deserialize<RequestType>;
cancelled: boolean;
constructor(call: Http2ServerCallStream<RequestType, ResponseType>, metadata: Metadata, deserialize: Deserialize<RequestType>, encoding: string);
constructor(path: string, call: ServerInterceptingCallInterface, metadata: Metadata);
_read(size: number): void;

@@ -64,9 +62,10 @@ getPeer(): string;

export declare class ServerWritableStreamImpl<RequestType, ResponseType> extends Writable implements ServerWritableStream<RequestType, ResponseType> {
private path;
private call;
metadata: Metadata;
serialize: Serialize<ResponseType>;
request: RequestType;
cancelled: boolean;
private trailingMetadata;
constructor(call: Http2ServerCallStream<RequestType, ResponseType>, metadata: Metadata, serialize: Serialize<ResponseType>, request: RequestType);
private pendingStatus;
constructor(path: string, call: ServerInterceptingCallInterface, metadata: Metadata, request: RequestType);
getPeer(): string;

@@ -81,9 +80,9 @@ sendMetadata(responseMetadata: Metadata): void;

export declare class ServerDuplexStreamImpl<RequestType, ResponseType> extends Duplex implements ServerDuplexStream<RequestType, ResponseType> {
private path;
private call;
metadata: Metadata;
serialize: Serialize<ResponseType>;
deserialize: Deserialize<RequestType>;
cancelled: boolean;
private trailingMetadata;
constructor(call: Http2ServerCallStream<RequestType, ResponseType>, metadata: Metadata, serialize: Serialize<ResponseType>, deserialize: Deserialize<RequestType>, encoding: string);
private pendingStatus;
constructor(path: string, call: ServerInterceptingCallInterface, metadata: Metadata);
getPeer(): string;

@@ -93,2 +92,5 @@ sendMetadata(responseMetadata: Metadata): void;

getPath(): string;
_read(size: number): void;
_write(chunk: ResponseType, encoding: string, callback: (...args: any[]) => void): void;
_final(callback: Function): void;
end(metadata?: any): this;

@@ -132,39 +134,1 @@ }

export type HandlerType = 'bidi' | 'clientStream' | 'serverStream' | 'unary';
export declare class Http2ServerCallStream<RequestType, ResponseType> extends EventEmitter {
private stream;
private handler;
cancelled: boolean;
deadlineTimer: NodeJS.Timeout | null;
private statusSent;
private deadline;
private wantTrailers;
private metadataSent;
private canPush;
private isPushPending;
private bufferedMessages;
private messagesToPush;
private maxSendMessageSize;
private maxReceiveMessageSize;
constructor(stream: http2.ServerHttp2Stream, handler: Handler<RequestType, ResponseType>, options: ChannelOptions);
private checkCancelled;
private getDecompressedMessage;
sendMetadata(customMetadata?: Metadata): void;
receiveMetadata(headers: http2.IncomingHttpHeaders): Metadata;
receiveUnaryMessage(encoding: string): Promise<RequestType>;
private deserializeMessageWithInternalError;
serializeMessage(value: ResponseType): Buffer;
deserializeMessage(bytes: Buffer): RequestType;
sendUnaryMessage(err: ServerErrorResponse | ServerStatusResponse | null, value?: ResponseType | null, metadata?: Metadata | null, flags?: number): Promise<void>;
sendStatus(statusObj: PartialStatusObject): void;
sendError(error: ServerErrorResponse | ServerStatusResponse): void;
write(chunk: Buffer): boolean | undefined;
resume(): void;
setupSurfaceCall(call: ServerSurfaceCall): void;
setupReadable(readable: ServerReadableStream<RequestType, ResponseType> | ServerDuplexStream<RequestType, ResponseType>, encoding: string): void;
consumeUnpushedMessages(readable: ServerReadableStream<RequestType, ResponseType> | ServerDuplexStream<RequestType, ResponseType>): boolean;
private pushOrBufferMessage;
private pushMessage;
getPeer(): string;
getDeadline(): Deadline;
getPath(): string;
}

@@ -19,49 +19,29 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.Http2ServerCallStream = exports.ServerDuplexStreamImpl = exports.ServerWritableStreamImpl = exports.ServerReadableStreamImpl = exports.ServerUnaryCallImpl = void 0;
exports.ServerDuplexStreamImpl = exports.ServerWritableStreamImpl = exports.ServerReadableStreamImpl = exports.ServerUnaryCallImpl = exports.serverErrorToStatus = void 0;
const events_1 = require("events");
const http2 = require("http2");
const stream_1 = require("stream");
const zlib = require("zlib");
const util_1 = require("util");
const constants_1 = require("./constants");
const metadata_1 = require("./metadata");
const stream_decoder_1 = require("./stream-decoder");
const logging = require("./logging");
const error_1 = require("./error");
const TRACER_NAME = 'server_call';
const unzip = (0, util_1.promisify)(zlib.unzip);
const inflate = (0, util_1.promisify)(zlib.inflate);
function trace(text) {
logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, text);
function serverErrorToStatus(error, overrideTrailers) {
var _a;
const status = {
code: constants_1.Status.UNKNOWN,
details: 'message' in error ? error.message : 'Unknown Error',
metadata: (_a = overrideTrailers !== null && overrideTrailers !== void 0 ? overrideTrailers : error.metadata) !== null && _a !== void 0 ? _a : null
};
if ('code' in error &&
typeof error.code === 'number' &&
Number.isInteger(error.code)) {
status.code = error.code;
if ('details' in error && typeof error.details === 'string') {
status.details = error.details;
}
}
return status;
}
const GRPC_ACCEPT_ENCODING_HEADER = 'grpc-accept-encoding';
const GRPC_ENCODING_HEADER = 'grpc-encoding';
const GRPC_MESSAGE_HEADER = 'grpc-message';
const GRPC_STATUS_HEADER = 'grpc-status';
const GRPC_TIMEOUT_HEADER = 'grpc-timeout';
const DEADLINE_REGEX = /(\d{1,8})\s*([HMSmun])/;
const deadlineUnitsToMs = {
H: 3600000,
M: 60000,
S: 1000,
m: 1,
u: 0.001,
n: 0.000001,
};
const defaultCompressionHeaders = {
// TODO(cjihrig): Remove these encoding headers from the default response
// once compression is integrated.
[GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip',
[GRPC_ENCODING_HEADER]: 'identity',
};
const defaultResponseHeaders = {
[http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
[http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
};
const defaultResponseOptions = {
waitForTrailers: true,
};
exports.serverErrorToStatus = serverErrorToStatus;
class ServerUnaryCallImpl extends events_1.EventEmitter {
constructor(call, metadata, request) {
constructor(path, call, metadata, request) {
super();
this.path = path;
this.call = call;

@@ -71,3 +51,2 @@ this.metadata = metadata;

this.cancelled = false;
this.call.setupSurfaceCall(this);
}

@@ -84,3 +63,3 @@ getPeer() {

getPath() {
return this.call.getPath();
return this.path;
}

@@ -90,16 +69,11 @@ }

class ServerReadableStreamImpl extends stream_1.Readable {
constructor(call, metadata, deserialize, encoding) {
constructor(path, call, metadata) {
super({ objectMode: true });
this.path = path;
this.call = call;
this.metadata = metadata;
this.deserialize = deserialize;
this.cancelled = false;
this.call.setupSurfaceCall(this);
this.call.setupReadable(this, encoding);
}
_read(size) {
if (!this.call.consumeUnpushedMessages(this)) {
return;
}
this.call.resume();
this.call.startRead();
}

@@ -116,3 +90,3 @@ getPeer() {

getPath() {
return this.call.getPath();
return this.path;
}

@@ -122,13 +96,16 @@ }

class ServerWritableStreamImpl extends stream_1.Writable {
constructor(call, metadata, serialize, request) {
constructor(path, call, metadata, request) {
super({ objectMode: true });
this.path = path;
this.call = call;
this.metadata = metadata;
this.serialize = serialize;
this.request = request;
this.pendingStatus = {
code: constants_1.Status.OK,
details: 'OK'
};
this.cancelled = false;
this.trailingMetadata = new metadata_1.Metadata();
this.call.setupSurfaceCall(this);
this.on('error', err => {
this.call.sendError(err);
this.pendingStatus = serverErrorToStatus(err);
this.end();

@@ -147,3 +124,3 @@ });

getPath() {
return this.call.getPath();
return this.path;
}

@@ -153,23 +130,7 @@ _write(chunk, encoding,

callback) {
try {
const response = this.call.serializeMessage(chunk);
if (!this.call.write(response)) {
this.call.once('drain', callback);
return;
}
}
catch (err) {
this.emit('error', {
details: (0, error_1.getErrorMessage)(err),
code: constants_1.Status.INTERNAL,
});
}
callback();
this.call.sendMessage(chunk, callback);
}
_final(callback) {
this.call.sendStatus({
code: constants_1.Status.OK,
details: 'OK',
metadata: this.trailingMetadata,
});
var _a;
this.call.sendStatus(Object.assign(Object.assign({}, this.pendingStatus), { metadata: (_a = this.pendingStatus.metadata) !== null && _a !== void 0 ? _a : this.trailingMetadata }));
callback(null);

@@ -187,14 +148,15 @@ }

class ServerDuplexStreamImpl extends stream_1.Duplex {
constructor(call, metadata, serialize, deserialize, encoding) {
constructor(path, call, metadata) {
super({ objectMode: true });
this.path = path;
this.call = call;
this.metadata = metadata;
this.serialize = serialize;
this.deserialize = deserialize;
this.pendingStatus = {
code: constants_1.Status.OK,
details: 'OK'
};
this.cancelled = false;
this.trailingMetadata = new metadata_1.Metadata();
this.call.setupSurfaceCall(this);
this.call.setupReadable(this, encoding);
this.on('error', err => {
this.call.sendError(err);
this.pendingStatus = serverErrorToStatus(err);
this.end();

@@ -213,5 +175,18 @@ });

getPath() {
return this.call.getPath();
return this.path;
}
_read(size) {
this.call.startRead();
}
_write(chunk, encoding,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
callback) {
this.call.sendMessage(chunk, callback);
}
_final(callback) {
var _a;
this.call.sendStatus(Object.assign(Object.assign({}, this.pendingStatus), { metadata: (_a = this.pendingStatus.metadata) !== null && _a !== void 0 ? _a : this.trailingMetadata }));
callback(null);
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
end(metadata) {

@@ -225,450 +200,2 @@ if (metadata) {

exports.ServerDuplexStreamImpl = ServerDuplexStreamImpl;
ServerDuplexStreamImpl.prototype._read =
ServerReadableStreamImpl.prototype._read;
ServerDuplexStreamImpl.prototype._write =
ServerWritableStreamImpl.prototype._write;
ServerDuplexStreamImpl.prototype._final =
ServerWritableStreamImpl.prototype._final;
// Internal class that wraps the HTTP2 request.
class Http2ServerCallStream extends events_1.EventEmitter {
constructor(stream, handler, options) {
super();
this.stream = stream;
this.handler = handler;
this.cancelled = false;
this.deadlineTimer = null;
this.statusSent = false;
this.deadline = Infinity;
this.wantTrailers = false;
this.metadataSent = false;
this.canPush = false;
this.isPushPending = false;
this.bufferedMessages = [];
this.messagesToPush = [];
this.maxSendMessageSize = constants_1.DEFAULT_MAX_SEND_MESSAGE_LENGTH;
this.maxReceiveMessageSize = constants_1.DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
this.stream.once('error', (err) => {
/* We need an error handler to avoid uncaught error event exceptions, but
* there is nothing we can reasonably do here. Any error event should
* have a corresponding close event, which handles emitting the cancelled
* event. And the stream is now in a bad state, so we can't reasonably
* expect to be able to send an error over it. */
});
this.stream.once('close', () => {
var _a;
trace('Request to method ' +
((_a = this.handler) === null || _a === void 0 ? void 0 : _a.path) +
' stream closed with rstCode ' +
this.stream.rstCode);
if (!this.statusSent) {
this.cancelled = true;
this.emit('cancelled', 'cancelled');
this.emit('streamEnd', false);
this.sendStatus({
code: constants_1.Status.CANCELLED,
details: 'Cancelled by client',
metadata: null,
});
if (this.deadlineTimer)
clearTimeout(this.deadlineTimer);
}
});
this.stream.on('drain', () => {
this.emit('drain');
});
if ('grpc.max_send_message_length' in options) {
this.maxSendMessageSize = options['grpc.max_send_message_length'];
}
if ('grpc.max_receive_message_length' in options) {
this.maxReceiveMessageSize = options['grpc.max_receive_message_length'];
}
}
checkCancelled() {
/* In some cases the stream can become destroyed before the close event
* fires. That creates a race condition that this check works around */
if (this.stream.destroyed || this.stream.closed) {
this.cancelled = true;
}
return this.cancelled;
}
getDecompressedMessage(message, encoding) {
if (encoding === 'deflate') {
return inflate(message.subarray(5));
}
else if (encoding === 'gzip') {
return unzip(message.subarray(5));
}
else if (encoding === 'identity') {
return message.subarray(5);
}
return Promise.reject({
code: constants_1.Status.UNIMPLEMENTED,
details: `Received message compressed with unsupported encoding "${encoding}"`,
});
}
sendMetadata(customMetadata) {
if (this.checkCancelled()) {
return;
}
if (this.metadataSent) {
return;
}
this.metadataSent = true;
const custom = customMetadata ? customMetadata.toHttp2Headers() : null;
// TODO(cjihrig): Include compression headers.
const headers = Object.assign(Object.assign(Object.assign({}, defaultResponseHeaders), defaultCompressionHeaders), custom);
this.stream.respond(headers, defaultResponseOptions);
}
receiveMetadata(headers) {
const metadata = metadata_1.Metadata.fromHttp2Headers(headers);
if (logging.isTracerEnabled(TRACER_NAME)) {
trace('Request to ' +
this.handler.path +
' received headers ' +
JSON.stringify(metadata.toJSON()));
}
// TODO(cjihrig): Receive compression metadata.
const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER);
if (timeoutHeader.length > 0) {
const match = timeoutHeader[0].toString().match(DEADLINE_REGEX);
if (match === null) {
const err = new Error('Invalid deadline');
err.code = constants_1.Status.OUT_OF_RANGE;
this.sendError(err);
return metadata;
}
const timeout = (+match[1] * deadlineUnitsToMs[match[2]]) | 0;
const now = new Date();
this.deadline = now.setMilliseconds(now.getMilliseconds() + timeout);
this.deadlineTimer = setTimeout(handleExpiredDeadline, timeout, this);
metadata.remove(GRPC_TIMEOUT_HEADER);
}
// Remove several headers that should not be propagated to the application
metadata.remove(http2.constants.HTTP2_HEADER_ACCEPT_ENCODING);
metadata.remove(http2.constants.HTTP2_HEADER_TE);
metadata.remove(http2.constants.HTTP2_HEADER_CONTENT_TYPE);
metadata.remove('grpc-accept-encoding');
return metadata;
}
receiveUnaryMessage(encoding) {
return new Promise((resolve, reject) => {
const { stream } = this;
let receivedLength = 0;
// eslint-disable-next-line @typescript-eslint/no-this-alias
const call = this;
const body = [];
const limit = this.maxReceiveMessageSize;
this.stream.on('data', onData);
this.stream.on('end', onEnd);
this.stream.on('error', onEnd);
function onData(chunk) {
receivedLength += chunk.byteLength;
if (limit !== -1 && receivedLength > limit) {
stream.removeListener('data', onData);
stream.removeListener('end', onEnd);
stream.removeListener('error', onEnd);
reject({
code: constants_1.Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${receivedLength} vs. ${limit})`,
});
return;
}
body.push(chunk);
}
function onEnd(err) {
stream.removeListener('data', onData);
stream.removeListener('end', onEnd);
stream.removeListener('error', onEnd);
if (err !== undefined) {
reject({ code: constants_1.Status.INTERNAL, details: err.message });
return;
}
if (receivedLength === 0) {
reject({
code: constants_1.Status.INTERNAL,
details: 'received empty unary message',
});
return;
}
call.emit('receiveMessage');
const requestBytes = Buffer.concat(body, receivedLength);
const compressed = requestBytes.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = call.getDecompressedMessage(requestBytes, compressedMessageEncoding);
if (Buffer.isBuffer(decompressedMessage)) {
resolve(call.deserializeMessageWithInternalError(decompressedMessage));
return;
}
decompressedMessage.then(decompressed => resolve(call.deserializeMessageWithInternalError(decompressed)), (err) => reject(err.code
? err
: {
code: constants_1.Status.INTERNAL,
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
}));
}
});
}
async deserializeMessageWithInternalError(buffer) {
try {
return this.deserializeMessage(buffer);
}
catch (err) {
throw {
details: (0, error_1.getErrorMessage)(err),
code: constants_1.Status.INTERNAL,
};
}
}
serializeMessage(value) {
const messageBuffer = this.handler.serialize(value);
// TODO(cjihrig): Call compression aware serializeMessage().
const byteLength = messageBuffer.byteLength;
const output = Buffer.allocUnsafe(byteLength + 5);
output.writeUInt8(0, 0);
output.writeUInt32BE(byteLength, 1);
messageBuffer.copy(output, 5);
return output;
}
deserializeMessage(bytes) {
return this.handler.deserialize(bytes);
}
async sendUnaryMessage(err, value, metadata, flags) {
if (this.checkCancelled()) {
return;
}
if (metadata === undefined) {
metadata = null;
}
if (err) {
if (!Object.prototype.hasOwnProperty.call(err, 'metadata') && metadata) {
err.metadata = metadata;
}
this.sendError(err);
return;
}
try {
const response = this.serializeMessage(value);
this.write(response);
this.sendStatus({ code: constants_1.Status.OK, details: 'OK', metadata });
}
catch (err) {
this.sendError({
details: (0, error_1.getErrorMessage)(err),
code: constants_1.Status.INTERNAL,
});
}
}
sendStatus(statusObj) {
var _a, _b;
this.emit('callEnd', statusObj.code);
this.emit('streamEnd', statusObj.code === constants_1.Status.OK);
if (this.checkCancelled()) {
return;
}
trace('Request to method ' +
((_a = this.handler) === null || _a === void 0 ? void 0 : _a.path) +
' ended with status code: ' +
constants_1.Status[statusObj.code] +
' details: ' +
statusObj.details);
if (this.deadlineTimer)
clearTimeout(this.deadlineTimer);
if (this.stream.headersSent) {
if (!this.wantTrailers) {
this.wantTrailers = true;
this.stream.once('wantTrailers', () => {
var _a;
const trailersToSend = Object.assign({ [GRPC_STATUS_HEADER]: statusObj.code, [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details) }, (_a = statusObj.metadata) === null || _a === void 0 ? void 0 : _a.toHttp2Headers());
this.stream.sendTrailers(trailersToSend);
this.statusSent = true;
});
this.stream.end();
}
}
else {
// Trailers-only response
const trailersToSend = Object.assign(Object.assign({ [GRPC_STATUS_HEADER]: statusObj.code, [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details) }, defaultResponseHeaders), (_b = statusObj.metadata) === null || _b === void 0 ? void 0 : _b.toHttp2Headers());
this.stream.respond(trailersToSend, { endStream: true });
this.statusSent = true;
}
}
sendError(error) {
const status = {
code: constants_1.Status.UNKNOWN,
details: 'message' in error ? error.message : 'Unknown Error',
metadata: 'metadata' in error && error.metadata !== undefined
? error.metadata
: null,
};
if ('code' in error &&
typeof error.code === 'number' &&
Number.isInteger(error.code)) {
status.code = error.code;
if ('details' in error && typeof error.details === 'string') {
status.details = error.details;
}
}
this.sendStatus(status);
}
write(chunk) {
if (this.checkCancelled()) {
return;
}
if (this.maxSendMessageSize !== -1 &&
chunk.length > this.maxSendMessageSize) {
this.sendError({
code: constants_1.Status.RESOURCE_EXHAUSTED,
details: `Sent message larger than max (${chunk.length} vs. ${this.maxSendMessageSize})`,
});
return;
}
this.sendMetadata();
this.emit('sendMessage');
return this.stream.write(chunk);
}
resume() {
this.stream.resume();
}
setupSurfaceCall(call) {
this.once('cancelled', reason => {
call.cancelled = true;
call.emit('cancelled', reason);
});
this.once('callEnd', status => call.emit('callEnd', status));
}
setupReadable(readable, encoding) {
const decoder = new stream_decoder_1.StreamDecoder();
let readsDone = false;
let pendingMessageProcessing = false;
let pushedEnd = false;
const maybePushEnd = async () => {
if (!pushedEnd && readsDone && !pendingMessageProcessing) {
pushedEnd = true;
await this.pushOrBufferMessage(readable, null);
}
};
this.stream.on('data', async (data) => {
const messages = decoder.write(data);
pendingMessageProcessing = true;
this.stream.pause();
for (const message of messages) {
if (this.maxReceiveMessageSize !== -1 &&
message.length > this.maxReceiveMessageSize) {
this.sendError({
code: constants_1.Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${message.length} vs. ${this.maxReceiveMessageSize})`,
});
return;
}
this.emit('receiveMessage');
const compressed = message.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = await this.getDecompressedMessage(message, compressedMessageEncoding);
// Encountered an error with decompression; it'll already have been propogated back
// Just return early
if (!decompressedMessage)
return;
await this.pushOrBufferMessage(readable, decompressedMessage);
}
pendingMessageProcessing = false;
this.stream.resume();
await maybePushEnd();
});
this.stream.once('end', async () => {
readsDone = true;
await maybePushEnd();
});
}
consumeUnpushedMessages(readable) {
this.canPush = true;
while (this.messagesToPush.length > 0) {
const nextMessage = this.messagesToPush.shift();
const canPush = readable.push(nextMessage);
if (nextMessage === null || canPush === false) {
this.canPush = false;
break;
}
}
return this.canPush;
}
async pushOrBufferMessage(readable, messageBytes) {
if (this.isPushPending) {
this.bufferedMessages.push(messageBytes);
}
else {
await this.pushMessage(readable, messageBytes);
}
}
async pushMessage(readable, messageBytes) {
if (messageBytes === null) {
trace('Received end of stream');
if (this.canPush) {
readable.push(null);
}
else {
this.messagesToPush.push(null);
}
return;
}
trace('Received message of length ' + messageBytes.length);
this.isPushPending = true;
try {
const deserialized = await this.deserializeMessage(messageBytes);
if (this.canPush) {
if (!readable.push(deserialized)) {
this.canPush = false;
this.stream.pause();
}
}
else {
this.messagesToPush.push(deserialized);
}
}
catch (error) {
// Ignore any remaining messages when errors occur.
this.bufferedMessages.length = 0;
let code = (0, error_1.getErrorCode)(error);
if (code === null || code < constants_1.Status.OK || code > constants_1.Status.UNAUTHENTICATED) {
code = constants_1.Status.INTERNAL;
}
readable.emit('error', {
details: (0, error_1.getErrorMessage)(error),
code: code,
});
}
this.isPushPending = false;
if (this.bufferedMessages.length > 0) {
await this.pushMessage(readable, this.bufferedMessages.shift());
}
}
getPeer() {
var _a;
const socket = (_a = this.stream.session) === null || _a === void 0 ? void 0 : _a.socket;
if (socket === null || socket === void 0 ? void 0 : socket.remoteAddress) {
if (socket.remotePort) {
return `${socket.remoteAddress}:${socket.remotePort}`;
}
else {
return socket.remoteAddress;
}
}
else {
return 'unknown';
}
}
getDeadline() {
return this.deadline;
}
getPath() {
return this.handler.path;
}
}
exports.Http2ServerCallStream = Http2ServerCallStream;
function handleExpiredDeadline(call) {
const err = new Error('Deadline exceeded');
err.code = constants_1.Status.DEADLINE_EXCEEDED;
call.sendError(err);
call.cancelled = true;
call.emit('cancelled', 'deadline');
}
//# sourceMappingURL=server-call.js.map

@@ -11,4 +11,5 @@ /// <reference types="node" />

abstract _getSettings(): SecureServerOptions | null;
abstract _equals(other: ServerCredentials): boolean;
static createInsecure(): ServerCredentials;
static createSsl(rootCerts: Buffer | null, keyCertPairs: KeyCertPair[], checkClientCertificate?: boolean): ServerCredentials;
}

@@ -26,2 +26,3 @@ "use strict";

static createSsl(rootCerts, keyCertPairs, checkClientCertificate = false) {
var _a;
if (rootCerts !== null && !Buffer.isBuffer(rootCerts)) {

@@ -53,3 +54,3 @@ throw new TypeError('rootCerts must be null or a Buffer');

return new SecureServerCredentials({
ca: rootCerts || (0, tls_helpers_1.getDefaultRootsData)() || undefined,
ca: (_a = rootCerts !== null && rootCerts !== void 0 ? rootCerts : (0, tls_helpers_1.getDefaultRootsData)()) !== null && _a !== void 0 ? _a : undefined,
cert,

@@ -70,2 +71,5 @@ key,

}
_equals(other) {
return other instanceof InsecureServerCredentials;
}
}

@@ -83,3 +87,85 @@ class SecureServerCredentials extends ServerCredentials {

}
/**
* Checks equality by checking the options that are actually set by
* createSsl.
* @param other
* @returns
*/
_equals(other) {
if (this === other) {
return true;
}
if (!(other instanceof SecureServerCredentials)) {
return false;
}
// options.ca equality check
if (Buffer.isBuffer(this.options.ca) && Buffer.isBuffer(other.options.ca)) {
if (!this.options.ca.equals(other.options.ca)) {
return false;
}
}
else {
if (this.options.ca !== other.options.ca) {
return false;
}
}
// options.cert equality check
if (Array.isArray(this.options.cert) && Array.isArray(other.options.cert)) {
if (this.options.cert.length !== other.options.cert.length) {
return false;
}
for (let i = 0; i < this.options.cert.length; i++) {
const thisCert = this.options.cert[i];
const otherCert = other.options.cert[i];
if (Buffer.isBuffer(thisCert) && Buffer.isBuffer(otherCert)) {
if (!thisCert.equals(otherCert)) {
return false;
}
}
else {
if (thisCert !== otherCert) {
return false;
}
}
}
}
else {
if (this.options.cert !== other.options.cert) {
return false;
}
}
// options.key equality check
if (Array.isArray(this.options.key) && Array.isArray(other.options.key)) {
if (this.options.key.length !== other.options.key.length) {
return false;
}
for (let i = 0; i < this.options.key.length; i++) {
const thisKey = this.options.key[i];
const otherKey = other.options.key[i];
if (Buffer.isBuffer(thisKey) && Buffer.isBuffer(otherKey)) {
if (!thisKey.equals(otherKey)) {
return false;
}
}
else {
if (thisKey !== otherKey) {
return false;
}
}
}
}
else {
if (this.options.key !== other.options.key) {
return false;
}
}
// options.requestCert equality check
if (this.options.requestCert !== other.options.requestCert) {
return false;
}
/* ciphers is derived from a value that is constant for the process, so no
* equality check is needed. */
return true;
}
}
//# sourceMappingURL=server-credentials.js.map

@@ -6,2 +6,3 @@ import { Deserialize, Serialize, ServiceDefinition } from './make-client';

import { ServerRef } from './channelz';
import { ServerInterceptor } from './server-interceptors';
export type UntypedHandleCall = HandleCall<any, any>;

@@ -11,6 +12,14 @@ export interface UntypedServiceImplementation {

}
export interface ServerOptions extends ChannelOptions {
interceptors?: ServerInterceptor[];
}
export declare class Server {
private http2ServerList;
private boundPorts;
private http2Servers;
private handlers;
private sessions;
/**
* This field only exists to ensure that the start method throws an error if
* it is called twice, as it did previously.
*/
private started;

@@ -30,3 +39,9 @@ private shutdown;

private readonly keepaliveTimeoutMs;
constructor(options?: ChannelOptions);
private readonly interceptors;
/**
* Options that will be used to construct all Http2Server instances for this
* Server.
*/
private commonServerOptions;
constructor(options?: ServerOptions);
private getChannelzInfo;

@@ -39,6 +54,40 @@ private getChannelzSessionInfoGetter;

bind(port: string, creds: ServerCredentials): never;
private registerListenerToChannelz;
private createHttp2Server;
private bindOneAddress;
private bindManyPorts;
private bindAddressList;
private resolvePort;
private bindPort;
private normalizePort;
bindAsync(port: string, creds: ServerCredentials, callback: (error: Error | null, port: number) => void): void;
private closeServer;
private closeSession;
private completeUnbind;
/**
* Unbind a previously bound port, or cancel an in-progress bindAsync
* operation. If port 0 was bound, only the actual bound port can be
* unbound. For example, if bindAsync was called with "localhost:0" and the
* bound port result was 54321, it can be unbound as "localhost:54321".
* @param port
*/
unbind(port: string): void;
/**
* Gracefully close all connections associated with a previously bound port.
* After the grace time, forcefully close all remaining open connections.
*
* If port 0 was bound, only the actual bound port can be
* drained. For example, if bindAsync was called with "localhost:0" and the
* bound port result was 54321, it can be drained as "localhost:54321".
* @param port
* @param graceTimeMs
* @returns
*/
drain(port: string, graceTimeMs: number): void;
forceShutdown(): void;
register<RequestType, ResponseType>(name: string, handler: HandleCall<RequestType, ResponseType>, serialize: Serialize<ResponseType>, deserialize: Deserialize<RequestType>, type: string): boolean;
unregister(name: string): boolean;
/**
* @deprecated No longer needed as of version 1.10.x
*/
start(): void;

@@ -45,0 +94,0 @@ tryShutdown(callback: (error?: Error) => void): void;

@@ -18,5 +18,40 @@ "use strict";

*/
var __runInitializers = (this && this.__runInitializers) || function (thisArg, initializers, value) {
var useValue = arguments.length > 2;
for (var i = 0; i < initializers.length; i++) {
value = useValue ? initializers[i].call(thisArg, value) : initializers[i].call(thisArg);
}
return useValue ? value : void 0;
};
var __esDecorate = (this && this.__esDecorate) || function (ctor, descriptorIn, decorators, contextIn, initializers, extraInitializers) {
function accept(f) { if (f !== void 0 && typeof f !== "function") throw new TypeError("Function expected"); return f; }
var kind = contextIn.kind, key = kind === "getter" ? "get" : kind === "setter" ? "set" : "value";
var target = !descriptorIn && ctor ? contextIn["static"] ? ctor : ctor.prototype : null;
var descriptor = descriptorIn || (target ? Object.getOwnPropertyDescriptor(target, contextIn.name) : {});
var _, done = false;
for (var i = decorators.length - 1; i >= 0; i--) {
var context = {};
for (var p in contextIn) context[p] = p === "access" ? {} : contextIn[p];
for (var p in contextIn.access) context.access[p] = contextIn.access[p];
context.addInitializer = function (f) { if (done) throw new TypeError("Cannot add initializers after decoration has completed"); extraInitializers.push(accept(f || null)); };
var result = (0, decorators[i])(kind === "accessor" ? { get: descriptor.get, set: descriptor.set } : descriptor[key], context);
if (kind === "accessor") {
if (result === void 0) continue;
if (result === null || typeof result !== "object") throw new TypeError("Object expected");
if (_ = accept(result.get)) descriptor.get = _;
if (_ = accept(result.set)) descriptor.set = _;
if (_ = accept(result.init)) initializers.unshift(_);
}
else if (_ = accept(result)) {
if (kind === "field") initializers.unshift(_);
else descriptor[key] = _;
}
}
if (target) Object.defineProperty(target, contextIn.name, descriptor);
done = true;
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.Server = void 0;
const http2 = require("http2");
const util = require("util");
const constants_1 = require("./constants");

@@ -30,2 +65,3 @@ const server_call_1 = require("./server-call");

const channelz_1 = require("./channelz");
const server_interceptors_1 = require("./server-interceptors");
const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31);

@@ -37,2 +73,12 @@ const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);

function noop() { }
/**
* Decorator to wrap a class method with util.deprecate
* @param message The message to output if the deprecated method is called
* @returns
*/
function deprecate(message) {
return function (target, context) {
return util.deprecate(target, message);
};
}
function getUnimplementedStatusResponse(methodName) {

@@ -67,246 +113,237 @@ return {

}
class Server {
constructor(options) {
var _a, _b, _c, _d;
this.http2ServerList = [];
this.handlers = new Map();
this.sessions = new Map();
this.started = false;
this.shutdown = false;
this.serverAddressString = 'null';
// Channelz Info
this.channelzEnabled = true;
this.channelzTrace = new channelz_1.ChannelzTrace();
this.callTracker = new channelz_1.ChannelzCallTracker();
this.listenerChildrenTracker = new channelz_1.ChannelzChildrenTracker();
this.sessionChildrenTracker = new channelz_1.ChannelzChildrenTracker();
this.options = options !== null && options !== void 0 ? options : {};
if (this.options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}
this.channelzRef = (0, channelz_1.registerChannelzServer)(() => this.getChannelzInfo(), this.channelzEnabled);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Server created');
}
this.maxConnectionAgeMs =
(_a = this.options['grpc.max_connection_age_ms']) !== null && _a !== void 0 ? _a : UNLIMITED_CONNECTION_AGE_MS;
this.maxConnectionAgeGraceMs =
(_b = this.options['grpc.max_connection_age_grace_ms']) !== null && _b !== void 0 ? _b : UNLIMITED_CONNECTION_AGE_MS;
this.keepaliveTimeMs =
(_c = this.options['grpc.keepalive_time_ms']) !== null && _c !== void 0 ? _c : KEEPALIVE_MAX_TIME_MS;
this.keepaliveTimeoutMs =
(_d = this.options['grpc.keepalive_timeout_ms']) !== null && _d !== void 0 ? _d : KEEPALIVE_TIMEOUT_MS;
this.trace('Server constructed');
}
getChannelzInfo() {
return {
trace: this.channelzTrace,
callTracker: this.callTracker,
listenerChildren: this.listenerChildrenTracker.getChildLists(),
sessionChildren: this.sessionChildrenTracker.getChildLists(),
};
}
getChannelzSessionInfoGetter(session) {
return () => {
var _a, _b, _c;
const sessionInfo = this.sessions.get(session);
const sessionSocket = session.socket;
const remoteAddress = sessionSocket.remoteAddress
? (0, subchannel_address_1.stringToSubchannelAddress)(sessionSocket.remoteAddress, sessionSocket.remotePort)
: null;
const localAddress = sessionSocket.localAddress
? (0, subchannel_address_1.stringToSubchannelAddress)(sessionSocket.localAddress, sessionSocket.localPort)
: null;
let tlsInfo;
if (session.encrypted) {
const tlsSocket = sessionSocket;
const cipherInfo = tlsSocket.getCipher();
const certificate = tlsSocket.getCertificate();
const peerCertificate = tlsSocket.getPeerCertificate();
tlsInfo = {
cipherSuiteStandardName: (_a = cipherInfo.standardName) !== null && _a !== void 0 ? _a : null,
cipherSuiteOtherName: cipherInfo.standardName
? null
: cipherInfo.name,
localCertificate: certificate && 'raw' in certificate ? certificate.raw : null,
remoteCertificate: peerCertificate && 'raw' in peerCertificate
? peerCertificate.raw
: null,
let Server = (() => {
var _a;
let _instanceExtraInitializers = [];
let _start_decorators;
return _a = class Server {
constructor(options) {
var _b, _c, _d, _e, _f;
this.boundPorts = (__runInitializers(this, _instanceExtraInitializers), new Map());
this.http2Servers = new Map();
this.handlers = new Map();
this.sessions = new Map();
/**
* This field only exists to ensure that the start method throws an error if
* it is called twice, as it did previously.
*/
this.started = false;
this.shutdown = false;
this.serverAddressString = 'null';
// Channelz Info
this.channelzEnabled = true;
this.channelzTrace = new channelz_1.ChannelzTrace();
this.callTracker = new channelz_1.ChannelzCallTracker();
this.listenerChildrenTracker = new channelz_1.ChannelzChildrenTracker();
this.sessionChildrenTracker = new channelz_1.ChannelzChildrenTracker();
this.options = options !== null && options !== void 0 ? options : {};
if (this.options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}
this.channelzRef = (0, channelz_1.registerChannelzServer)(() => this.getChannelzInfo(), this.channelzEnabled);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Server created');
}
this.maxConnectionAgeMs =
(_b = this.options['grpc.max_connection_age_ms']) !== null && _b !== void 0 ? _b : UNLIMITED_CONNECTION_AGE_MS;
this.maxConnectionAgeGraceMs =
(_c = this.options['grpc.max_connection_age_grace_ms']) !== null && _c !== void 0 ? _c : UNLIMITED_CONNECTION_AGE_MS;
this.keepaliveTimeMs =
(_d = this.options['grpc.keepalive_time_ms']) !== null && _d !== void 0 ? _d : KEEPALIVE_MAX_TIME_MS;
this.keepaliveTimeoutMs =
(_e = this.options['grpc.keepalive_timeout_ms']) !== null && _e !== void 0 ? _e : KEEPALIVE_TIMEOUT_MS;
this.commonServerOptions = {
maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
};
}
else {
tlsInfo = null;
}
const socketInfo = {
remoteAddress: remoteAddress,
localAddress: localAddress,
security: tlsInfo,
remoteName: null,
streamsStarted: sessionInfo.streamTracker.callsStarted,
streamsSucceeded: sessionInfo.streamTracker.callsSucceeded,
streamsFailed: sessionInfo.streamTracker.callsFailed,
messagesSent: sessionInfo.messagesSent,
messagesReceived: sessionInfo.messagesReceived,
keepAlivesSent: 0,
lastLocalStreamCreatedTimestamp: null,
lastRemoteStreamCreatedTimestamp: sessionInfo.streamTracker.lastCallStartedTimestamp,
lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp,
lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp,
localFlowControlWindow: (_b = session.state.localWindowSize) !== null && _b !== void 0 ? _b : null,
remoteFlowControlWindow: (_c = session.state.remoteWindowSize) !== null && _c !== void 0 ? _c : null,
};
return socketInfo;
};
}
trace(text) {
logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + text);
}
addProtoService() {
throw new Error('Not implemented. Use addService() instead');
}
addService(service, implementation) {
if (service === null ||
typeof service !== 'object' ||
implementation === null ||
typeof implementation !== 'object') {
throw new Error('addService() requires two objects as arguments');
}
const serviceKeys = Object.keys(service);
if (serviceKeys.length === 0) {
throw new Error('Cannot add an empty service to a server');
}
serviceKeys.forEach(name => {
const attrs = service[name];
let methodType;
if (attrs.requestStream) {
if (attrs.responseStream) {
methodType = 'bidi';
if ('grpc-node.max_session_memory' in this.options) {
this.commonServerOptions.maxSessionMemory =
this.options['grpc-node.max_session_memory'];
}
else {
methodType = 'clientStream';
/* By default, set a very large max session memory limit, to effectively
* disable enforcement of the limit. Some testing indicates that Node's
* behavior degrades badly when this limit is reached, so we solve that
* by disabling the check entirely. */
this.commonServerOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
}
}
else {
if (attrs.responseStream) {
methodType = 'serverStream';
if ('grpc.max_concurrent_streams' in this.options) {
this.commonServerOptions.settings = {
maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
};
}
else {
methodType = 'unary';
}
this.interceptors = (_f = this.options.interceptors) !== null && _f !== void 0 ? _f : [];
this.trace('Server constructed');
}
let implFn = implementation[name];
let impl;
if (implFn === undefined && typeof attrs.originalName === 'string') {
implFn = implementation[attrs.originalName];
getChannelzInfo() {
return {
trace: this.channelzTrace,
callTracker: this.callTracker,
listenerChildren: this.listenerChildrenTracker.getChildLists(),
sessionChildren: this.sessionChildrenTracker.getChildLists(),
};
}
if (implFn !== undefined) {
impl = implFn.bind(implementation);
getChannelzSessionInfoGetter(session) {
return () => {
var _b, _c, _d;
const sessionInfo = this.sessions.get(session);
const sessionSocket = session.socket;
const remoteAddress = sessionSocket.remoteAddress
? (0, subchannel_address_1.stringToSubchannelAddress)(sessionSocket.remoteAddress, sessionSocket.remotePort)
: null;
const localAddress = sessionSocket.localAddress
? (0, subchannel_address_1.stringToSubchannelAddress)(sessionSocket.localAddress, sessionSocket.localPort)
: null;
let tlsInfo;
if (session.encrypted) {
const tlsSocket = sessionSocket;
const cipherInfo = tlsSocket.getCipher();
const certificate = tlsSocket.getCertificate();
const peerCertificate = tlsSocket.getPeerCertificate();
tlsInfo = {
cipherSuiteStandardName: (_b = cipherInfo.standardName) !== null && _b !== void 0 ? _b : null,
cipherSuiteOtherName: cipherInfo.standardName
? null
: cipherInfo.name,
localCertificate: certificate && 'raw' in certificate ? certificate.raw : null,
remoteCertificate: peerCertificate && 'raw' in peerCertificate
? peerCertificate.raw
: null,
};
}
else {
tlsInfo = null;
}
const socketInfo = {
remoteAddress: remoteAddress,
localAddress: localAddress,
security: tlsInfo,
remoteName: null,
streamsStarted: sessionInfo.streamTracker.callsStarted,
streamsSucceeded: sessionInfo.streamTracker.callsSucceeded,
streamsFailed: sessionInfo.streamTracker.callsFailed,
messagesSent: sessionInfo.messagesSent,
messagesReceived: sessionInfo.messagesReceived,
keepAlivesSent: 0,
lastLocalStreamCreatedTimestamp: null,
lastRemoteStreamCreatedTimestamp: sessionInfo.streamTracker.lastCallStartedTimestamp,
lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp,
lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp,
localFlowControlWindow: (_c = session.state.localWindowSize) !== null && _c !== void 0 ? _c : null,
remoteFlowControlWindow: (_d = session.state.remoteWindowSize) !== null && _d !== void 0 ? _d : null,
};
return socketInfo;
};
}
else {
impl = getDefaultHandler(methodType, name);
trace(text) {
logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + text);
}
const success = this.register(attrs.path, impl, attrs.responseSerialize, attrs.requestDeserialize, methodType);
if (success === false) {
throw new Error(`Method handler for ${attrs.path} already provided.`);
addProtoService() {
throw new Error('Not implemented. Use addService() instead');
}
});
}
removeService(service) {
if (service === null || typeof service !== 'object') {
throw new Error('removeService() requires object as argument');
}
const serviceKeys = Object.keys(service);
serviceKeys.forEach(name => {
const attrs = service[name];
this.unregister(attrs.path);
});
}
bind(port, creds) {
throw new Error('Not implemented. Use bindAsync() instead');
}
bindAsync(port, creds, callback) {
if (this.started === true) {
throw new Error('server is already started');
}
if (this.shutdown) {
throw new Error('bindAsync called after shutdown');
}
if (typeof port !== 'string') {
throw new TypeError('port must be a string');
}
if (creds === null || !(creds instanceof server_credentials_1.ServerCredentials)) {
throw new TypeError('creds must be a ServerCredentials object');
}
if (typeof callback !== 'function') {
throw new TypeError('callback must be a function');
}
const initialPortUri = (0, uri_parser_1.parseUri)(port);
if (initialPortUri === null) {
throw new Error(`Could not parse port "${port}"`);
}
const portUri = (0, resolver_1.mapUriDefaultScheme)(initialPortUri);
if (portUri === null) {
throw new Error(`Could not get a default scheme for port "${port}"`);
}
const serverOptions = {
maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
};
if ('grpc-node.max_session_memory' in this.options) {
serverOptions.maxSessionMemory =
this.options['grpc-node.max_session_memory'];
}
else {
/* By default, set a very large max session memory limit, to effectively
* disable enforcement of the limit. Some testing indicates that Node's
* behavior degrades badly when this limit is reached, so we solve that
* by disabling the check entirely. */
serverOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
}
if ('grpc.max_concurrent_streams' in this.options) {
serverOptions.settings = {
maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
};
}
const deferredCallback = (error, port) => {
process.nextTick(() => callback(error, port));
};
const setupServer = () => {
let http2Server;
if (creds._isSecure()) {
const secureServerOptions = Object.assign(serverOptions, creds._getSettings());
secureServerOptions.enableTrace =
this.options['grpc-node.tls_enable_trace'] === 1;
http2Server = http2.createSecureServer(secureServerOptions);
http2Server.on('secureConnection', (socket) => {
/* These errors need to be handled by the user of Http2SecureServer,
* according to https://github.com/nodejs/node/issues/35824 */
socket.on('error', (e) => {
this.trace('An incoming TLS connection closed with error: ' + e.message);
});
addService(service, implementation) {
if (service === null ||
typeof service !== 'object' ||
implementation === null ||
typeof implementation !== 'object') {
throw new Error('addService() requires two objects as arguments');
}
const serviceKeys = Object.keys(service);
if (serviceKeys.length === 0) {
throw new Error('Cannot add an empty service to a server');
}
serviceKeys.forEach(name => {
const attrs = service[name];
let methodType;
if (attrs.requestStream) {
if (attrs.responseStream) {
methodType = 'bidi';
}
else {
methodType = 'clientStream';
}
}
else {
if (attrs.responseStream) {
methodType = 'serverStream';
}
else {
methodType = 'unary';
}
}
let implFn = implementation[name];
let impl;
if (implFn === undefined && typeof attrs.originalName === 'string') {
implFn = implementation[attrs.originalName];
}
if (implFn !== undefined) {
impl = implFn.bind(implementation);
}
else {
impl = getDefaultHandler(methodType, name);
}
const success = this.register(attrs.path, impl, attrs.responseSerialize, attrs.requestDeserialize, methodType);
if (success === false) {
throw new Error(`Method handler for ${attrs.path} already provided.`);
}
});
}
else {
http2Server = http2.createServer(serverOptions);
removeService(service) {
if (service === null || typeof service !== 'object') {
throw new Error('removeService() requires object as argument');
}
const serviceKeys = Object.keys(service);
serviceKeys.forEach(name => {
const attrs = service[name];
this.unregister(attrs.path);
});
}
http2Server.setTimeout(0, noop);
this._setupHandlers(http2Server);
return http2Server;
};
const bindSpecificPort = (addressList, portNum, previousCount) => {
if (addressList.length === 0) {
return Promise.resolve({ port: portNum, count: previousCount });
bind(port, creds) {
throw new Error('Not implemented. Use bindAsync() instead');
}
return Promise.all(addressList.map(address => {
this.trace('Attempting to bind ' + (0, subchannel_address_1.subchannelAddressToString)(address));
let addr;
if ((0, subchannel_address_1.isTcpSubchannelAddress)(address)) {
addr = {
host: address.host,
port: portNum,
registerListenerToChannelz(boundAddress) {
return (0, channelz_1.registerChannelzSocket)((0, subchannel_address_1.subchannelAddressToString)(boundAddress), () => {
return {
localAddress: boundAddress,
remoteAddress: null,
security: null,
remoteName: null,
streamsStarted: 0,
streamsSucceeded: 0,
streamsFailed: 0,
messagesSent: 0,
messagesReceived: 0,
keepAlivesSent: 0,
lastLocalStreamCreatedTimestamp: null,
lastRemoteStreamCreatedTimestamp: null,
lastMessageSentTimestamp: null,
lastMessageReceivedTimestamp: null,
localFlowControlWindow: null,
remoteFlowControlWindow: null,
};
}, this.channelzEnabled);
}
createHttp2Server(credentials) {
let http2Server;
if (credentials._isSecure()) {
const secureServerOptions = Object.assign(this.commonServerOptions, credentials._getSettings());
secureServerOptions.enableTrace =
this.options['grpc-node.tls_enable_trace'] === 1;
http2Server = http2.createSecureServer(secureServerOptions);
http2Server.on('secureConnection', (socket) => {
/* These errors need to be handled by the user of Http2SecureServer,
* according to https://github.com/nodejs/node/issues/35824 */
socket.on('error', (e) => {
this.trace('An incoming TLS connection closed with error: ' + e.message);
});
});
}
else {
addr = address;
http2Server = http2.createServer(this.commonServerOptions);
}
const http2Server = setupServer();
http2Server.setTimeout(0, noop);
this._setupHandlers(http2Server);
return http2Server;
}
bindOneAddress(address, boundPortObject) {
this.trace('Attempting to bind ' + (0, subchannel_address_1.subchannelAddressToString)(address));
const http2Server = this.createHttp2Server(boundPortObject.credentials);
return new Promise((resolve, reject) => {

@@ -318,11 +355,9 @@ const onError = (err) => {

err.message);
resolve(err);
resolve({
port: 'port' in address ? address.port : 1,
error: err.message
});
};
http2Server.once('error', onError);
http2Server.listen(addr, () => {
if (this.shutdown) {
http2Server.close();
resolve(new Error('bindAsync failed because server is shutdown'));
return;
}
http2Server.listen(address, () => {
const boundAddress = http2Server.address();

@@ -341,558 +376,859 @@ let boundSubchannelAddress;

}
const channelzRef = (0, channelz_1.registerChannelzSocket)((0, subchannel_address_1.subchannelAddressToString)(boundSubchannelAddress), () => {
return {
localAddress: boundSubchannelAddress,
remoteAddress: null,
security: null,
remoteName: null,
streamsStarted: 0,
streamsSucceeded: 0,
streamsFailed: 0,
messagesSent: 0,
messagesReceived: 0,
keepAlivesSent: 0,
lastLocalStreamCreatedTimestamp: null,
lastRemoteStreamCreatedTimestamp: null,
lastMessageSentTimestamp: null,
lastMessageReceivedTimestamp: null,
localFlowControlWindow: null,
remoteFlowControlWindow: null,
};
}, this.channelzEnabled);
const channelzRef = this.registerListenerToChannelz(boundSubchannelAddress);
if (this.channelzEnabled) {
this.listenerChildrenTracker.refChild(channelzRef);
}
this.http2ServerList.push({
server: http2Server,
this.http2Servers.set(http2Server, {
channelzRef: channelzRef,
sessions: new Set()
});
boundPortObject.listeningServers.add(http2Server);
this.trace('Successfully bound ' +
(0, subchannel_address_1.subchannelAddressToString)(boundSubchannelAddress));
resolve('port' in boundSubchannelAddress
? boundSubchannelAddress.port
: portNum);
resolve({
port: 'port' in boundSubchannelAddress
? boundSubchannelAddress.port
: 1
});
http2Server.removeListener('error', onError);
});
});
})).then(results => {
let count = 0;
for (const result of results) {
if (typeof result === 'number') {
count += 1;
if (result !== portNum) {
throw new Error('Invalid state: multiple port numbers added from single address');
}
}
async bindManyPorts(addressList, boundPortObject) {
if (addressList.length === 0) {
return {
count: 0,
port: 0,
errors: []
};
}
if ((0, subchannel_address_1.isTcpSubchannelAddress)(addressList[0]) && addressList[0].port === 0) {
/* If binding to port 0, first try to bind the first address, then bind
* the rest of the address list to the specific port that it binds. */
const firstAddressResult = await this.bindOneAddress(addressList[0], boundPortObject);
if (firstAddressResult.error) {
/* If the first address fails to bind, try the same operation starting
* from the second item in the list. */
const restAddressResult = await this.bindManyPorts(addressList.slice(1), boundPortObject);
return Object.assign(Object.assign({}, restAddressResult), { errors: [firstAddressResult.error, ...restAddressResult.errors] });
}
else {
const restAddresses = addressList.slice(1).map(address => (0, subchannel_address_1.isTcpSubchannelAddress)(address) ? { host: address.host, port: firstAddressResult.port } : address);
const restAddressResult = await Promise.all(restAddresses.map(address => this.bindOneAddress(address, boundPortObject)));
const allResults = [firstAddressResult, ...restAddressResult];
return {
count: allResults.filter(result => result.error === undefined).length,
port: firstAddressResult.port,
errors: allResults.filter(result => result.error).map(result => result.error)
};
}
}
return {
port: portNum,
count: count + previousCount,
};
});
};
const bindWildcardPort = (addressList) => {
if (addressList.length === 0) {
return Promise.resolve({ port: 0, count: 0 });
else {
const allResults = await Promise.all(addressList.map(address => this.bindOneAddress(address, boundPortObject)));
return {
count: allResults.filter(result => result.error === undefined).length,
port: allResults[0].port,
errors: allResults.filter(result => result.error).map(result => result.error)
};
}
}
const address = addressList[0];
const http2Server = setupServer();
return new Promise((resolve, reject) => {
const onError = (err) => {
this.trace('Failed to bind ' +
(0, subchannel_address_1.subchannelAddressToString)(address) +
' with error ' +
err.message);
resolve(bindWildcardPort(addressList.slice(1)));
};
http2Server.once('error', onError);
http2Server.listen(address, () => {
if (this.shutdown) {
http2Server.close();
resolve({ port: 0, count: 0 });
return;
async bindAddressList(addressList, boundPortObject) {
let bindResult;
try {
bindResult = await this.bindManyPorts(addressList, boundPortObject);
}
catch (error) {
throw error;
}
if (bindResult.count > 0) {
if (bindResult.count < addressList.length) {
logging.log(constants_1.LogVerbosity.INFO, `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`);
}
const boundAddress = http2Server.address();
const boundSubchannelAddress = {
host: boundAddress.address,
port: boundAddress.port,
return bindResult.port;
}
else {
const errorString = `No address added out of total ${addressList.length} resolved`;
logging.log(constants_1.LogVerbosity.ERROR, errorString);
throw new Error(`${errorString} errors: [${bindResult.errors.join(',')}]`);
}
}
resolvePort(port) {
return new Promise((resolve, reject) => {
const resolverListener = {
onSuccessfulResolution: (endpointList, serviceConfig, serviceConfigError) => {
// We only want one resolution result. Discard all future results
resolverListener.onSuccessfulResolution = () => { };
const addressList = [].concat(...endpointList.map(endpoint => endpoint.addresses));
if (addressList.length === 0) {
reject(new Error(`No addresses resolved for port ${port}`));
return;
}
resolve(addressList);
},
onError: error => {
reject(new Error(error.details));
},
};
const channelzRef = (0, channelz_1.registerChannelzSocket)((0, subchannel_address_1.subchannelAddressToString)(boundSubchannelAddress), () => {
return {
localAddress: boundSubchannelAddress,
remoteAddress: null,
security: null,
remoteName: null,
streamsStarted: 0,
streamsSucceeded: 0,
streamsFailed: 0,
messagesSent: 0,
messagesReceived: 0,
keepAlivesSent: 0,
lastLocalStreamCreatedTimestamp: null,
lastRemoteStreamCreatedTimestamp: null,
lastMessageSentTimestamp: null,
lastMessageReceivedTimestamp: null,
localFlowControlWindow: null,
remoteFlowControlWindow: null,
};
}, this.channelzEnabled);
if (this.channelzEnabled) {
this.listenerChildrenTracker.refChild(channelzRef);
}
this.http2ServerList.push({
server: http2Server,
channelzRef: channelzRef,
});
this.trace('Successfully bound ' +
(0, subchannel_address_1.subchannelAddressToString)(boundSubchannelAddress));
resolve(bindSpecificPort(addressList.slice(1), boundAddress.port, 1));
http2Server.removeListener('error', onError);
const resolver = (0, resolver_1.createResolver)(port, resolverListener, this.options);
resolver.updateResolution();
});
});
};
const resolverListener = {
onSuccessfulResolution: (addressList, serviceConfig, serviceConfigError) => {
// We only want one resolution result. Discard all future results
resolverListener.onSuccessfulResolution = () => { };
}
async bindPort(port, boundPortObject) {
const addressList = await this.resolvePort(port);
if (boundPortObject.cancelled) {
this.completeUnbind(boundPortObject);
throw new Error('bindAsync operation cancelled by unbind call');
}
const portNumber = await this.bindAddressList(addressList, boundPortObject);
if (boundPortObject.cancelled) {
this.completeUnbind(boundPortObject);
throw new Error('bindAsync operation cancelled by unbind call');
}
return portNumber;
}
normalizePort(port) {
const initialPortUri = (0, uri_parser_1.parseUri)(port);
if (initialPortUri === null) {
throw new Error(`Could not parse port "${port}"`);
}
const portUri = (0, resolver_1.mapUriDefaultScheme)(initialPortUri);
if (portUri === null) {
throw new Error(`Could not get a default scheme for port "${port}"`);
}
return portUri;
}
bindAsync(port, creds, callback) {
if (this.shutdown) {
deferredCallback(new Error(`bindAsync failed because server is shutdown`), 0);
throw new Error('bindAsync called after shutdown');
}
if (addressList.length === 0) {
deferredCallback(new Error(`No addresses resolved for port ${port}`), 0);
return;
if (typeof port !== 'string') {
throw new TypeError('port must be a string');
}
let bindResultPromise;
if ((0, subchannel_address_1.isTcpSubchannelAddress)(addressList[0])) {
if (addressList[0].port === 0) {
bindResultPromise = bindWildcardPort(addressList);
if (creds === null || !(creds instanceof server_credentials_1.ServerCredentials)) {
throw new TypeError('creds must be a ServerCredentials object');
}
if (typeof callback !== 'function') {
throw new TypeError('callback must be a function');
}
this.trace('bindAsync port=' + port);
const portUri = this.normalizePort(port);
const deferredCallback = (error, port) => {
process.nextTick(() => callback(error, port));
};
/* First, if this port is already bound or that bind operation is in
* progress, use that result. */
let boundPortObject = this.boundPorts.get((0, uri_parser_1.uriToString)(portUri));
if (boundPortObject) {
if (!creds._equals(boundPortObject.credentials)) {
deferredCallback(new Error(`${port} already bound with incompatible credentials`), 0);
return;
}
/* If that operation has previously been cancelled by an unbind call,
* uncancel it. */
boundPortObject.cancelled = false;
if (boundPortObject.completionPromise) {
boundPortObject.completionPromise.then(portNum => callback(null, portNum), error => callback(error, 0));
}
else {
bindResultPromise = bindSpecificPort(addressList, addressList[0].port, 0);
deferredCallback(null, boundPortObject.portNumber);
}
return;
}
boundPortObject = {
mapKey: (0, uri_parser_1.uriToString)(portUri),
originalUri: portUri,
completionPromise: null,
cancelled: false,
portNumber: 0,
credentials: creds,
listeningServers: new Set()
};
const splitPort = (0, uri_parser_1.splitHostPort)(portUri.path);
const completionPromise = this.bindPort(portUri, boundPortObject);
boundPortObject.completionPromise = completionPromise;
/* If the port number is 0, defer populating the map entry until after the
* bind operation completes and we have a specific port number. Otherwise,
* populate it immediately. */
if ((splitPort === null || splitPort === void 0 ? void 0 : splitPort.port) === 0) {
completionPromise.then(portNum => {
const finalUri = {
scheme: portUri.scheme,
authority: portUri.authority,
path: (0, uri_parser_1.combineHostPort)({ host: splitPort.host, port: portNum })
};
boundPortObject.mapKey = (0, uri_parser_1.uriToString)(finalUri);
boundPortObject.completionPromise = null;
boundPortObject.portNumber = portNum;
this.boundPorts.set(boundPortObject.mapKey, boundPortObject);
callback(null, portNum);
}, error => {
callback(error, 0);
});
}
else {
// Use an arbitrary non-zero port for non-TCP addresses
bindResultPromise = bindSpecificPort(addressList, 1, 0);
this.boundPorts.set(boundPortObject.mapKey, boundPortObject);
completionPromise.then(portNum => {
boundPortObject.completionPromise = null;
boundPortObject.portNumber = portNum;
callback(null, portNum);
}, error => {
callback(error, 0);
});
}
bindResultPromise.then(bindResult => {
if (bindResult.count === 0) {
const errorString = `No address added out of total ${addressList.length} resolved`;
logging.log(constants_1.LogVerbosity.ERROR, errorString);
deferredCallback(new Error(errorString), 0);
}
closeServer(server, callback) {
this.trace('Closing server with address ' + JSON.stringify(server.address()));
const serverInfo = this.http2Servers.get(server);
server.close(() => {
if (this.channelzEnabled && serverInfo) {
this.listenerChildrenTracker.unrefChild(serverInfo.channelzRef);
(0, channelz_1.unregisterChannelzRef)(serverInfo.channelzRef);
}
else {
if (bindResult.count < addressList.length) {
logging.log(constants_1.LogVerbosity.INFO, `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`);
}
deferredCallback(null, bindResult.port);
}
}, error => {
const errorString = `No address added out of total ${addressList.length} resolved`;
logging.log(constants_1.LogVerbosity.ERROR, errorString);
deferredCallback(new Error(errorString), 0);
this.http2Servers.delete(server);
callback === null || callback === void 0 ? void 0 : callback();
});
},
onError: error => {
deferredCallback(new Error(error.details), 0);
},
};
const resolver = (0, resolver_1.createResolver)(portUri, resolverListener, this.options);
resolver.updateResolution();
}
forceShutdown() {
// Close the server if it is still running.
for (const { server: http2Server, channelzRef: ref } of this
.http2ServerList) {
if (http2Server.listening) {
http2Server.close(() => {
if (this.channelzEnabled) {
this.listenerChildrenTracker.unrefChild(ref);
(0, channelz_1.unregisterChannelzRef)(ref);
}
closeSession(session, callback) {
var _b;
this.trace('Closing session initiated by ' + ((_b = session.socket) === null || _b === void 0 ? void 0 : _b.remoteAddress));
const sessionInfo = this.sessions.get(session);
const closeCallback = () => {
if (this.channelzEnabled && sessionInfo) {
this.sessionChildrenTracker.unrefChild(sessionInfo.ref);
(0, channelz_1.unregisterChannelzRef)(sessionInfo.ref);
}
});
this.sessions.delete(session);
callback === null || callback === void 0 ? void 0 : callback();
};
if (session.closed) {
process.nextTick(closeCallback);
}
else {
session.close(closeCallback);
}
}
}
this.started = false;
this.shutdown = true;
// Always destroy any available sessions. It's possible that one or more
// tryShutdown() calls are in progress. Don't wait on them to finish.
this.sessions.forEach((channelzInfo, session) => {
// Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to
// recognize destroy(code) as a valid signature.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
session.destroy(http2.constants.NGHTTP2_CANCEL);
});
this.sessions.clear();
if (this.channelzEnabled) {
(0, channelz_1.unregisterChannelzRef)(this.channelzRef);
}
}
register(name, handler, serialize, deserialize, type) {
if (this.handlers.has(name)) {
return false;
}
this.handlers.set(name, {
func: handler,
serialize,
deserialize,
type,
path: name,
});
return true;
}
unregister(name) {
return this.handlers.delete(name);
}
start() {
if (this.http2ServerList.length === 0 ||
this.http2ServerList.every(({ server: http2Server }) => http2Server.listening !== true)) {
throw new Error('server must be bound in order to start');
}
if (this.started === true) {
throw new Error('server is already started');
}
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Starting');
}
this.started = true;
}
tryShutdown(callback) {
const wrappedCallback = (error) => {
if (this.channelzEnabled) {
(0, channelz_1.unregisterChannelzRef)(this.channelzRef);
completeUnbind(boundPortObject) {
for (const server of boundPortObject.listeningServers) {
const serverInfo = this.http2Servers.get(server);
this.closeServer(server, () => {
boundPortObject.listeningServers.delete(server);
});
if (serverInfo) {
for (const session of serverInfo.sessions) {
this.closeSession(session);
}
}
}
this.boundPorts.delete(boundPortObject.mapKey);
}
callback(error);
};
let pendingChecks = 0;
function maybeCallback() {
pendingChecks--;
if (pendingChecks === 0) {
wrappedCallback();
/**
* Unbind a previously bound port, or cancel an in-progress bindAsync
* operation. If port 0 was bound, only the actual bound port can be
* unbound. For example, if bindAsync was called with "localhost:0" and the
* bound port result was 54321, it can be unbound as "localhost:54321".
* @param port
*/
unbind(port) {
this.trace('unbind port=' + port);
const portUri = this.normalizePort(port);
const splitPort = (0, uri_parser_1.splitHostPort)(portUri.path);
if ((splitPort === null || splitPort === void 0 ? void 0 : splitPort.port) === 0) {
throw new Error('Cannot unbind port 0');
}
const boundPortObject = this.boundPorts.get((0, uri_parser_1.uriToString)(portUri));
if (boundPortObject) {
this.trace('unbinding ' + boundPortObject.mapKey + ' originally bound as ' + (0, uri_parser_1.uriToString)(boundPortObject.originalUri));
/* If the bind operation is pending, the cancelled flag will trigger
* the unbind operation later. */
if (boundPortObject.completionPromise) {
boundPortObject.cancelled = true;
}
else {
this.completeUnbind(boundPortObject);
}
}
}
}
// Close the server if necessary.
this.started = false;
this.shutdown = true;
for (const { server: http2Server, channelzRef: ref } of this
.http2ServerList) {
if (http2Server.listening) {
pendingChecks++;
http2Server.close(() => {
if (this.channelzEnabled) {
this.listenerChildrenTracker.unrefChild(ref);
(0, channelz_1.unregisterChannelzRef)(ref);
/**
* Gracefully close all connections associated with a previously bound port.
* After the grace time, forcefully close all remaining open connections.
*
* If port 0 was bound, only the actual bound port can be
* drained. For example, if bindAsync was called with "localhost:0" and the
* bound port result was 54321, it can be drained as "localhost:54321".
* @param port
* @param graceTimeMs
* @returns
*/
drain(port, graceTimeMs) {
var _b, _c;
this.trace('drain port=' + port + ' graceTimeMs=' + graceTimeMs);
const portUri = this.normalizePort(port);
const splitPort = (0, uri_parser_1.splitHostPort)(portUri.path);
if ((splitPort === null || splitPort === void 0 ? void 0 : splitPort.port) === 0) {
throw new Error('Cannot drain port 0');
}
const boundPortObject = this.boundPorts.get((0, uri_parser_1.uriToString)(portUri));
if (!boundPortObject) {
return;
}
const allSessions = new Set();
for (const http2Server of boundPortObject.listeningServers) {
const serverEntry = this.http2Servers.get(http2Server);
if (!serverEntry) {
continue;
}
maybeCallback();
for (const session of serverEntry.sessions) {
allSessions.add(session);
this.closeSession(session, () => {
allSessions.delete(session);
});
}
}
/* After the grace time ends, send another goaway to all remaining sessions
* with the CANCEL code. */
(_c = (_b = setTimeout(() => {
for (const session of allSessions) {
session.destroy(http2.constants.NGHTTP2_CANCEL);
}
}, graceTimeMs)).unref) === null || _c === void 0 ? void 0 : _c.call(_b);
}
forceShutdown() {
for (const boundPortObject of this.boundPorts.values()) {
boundPortObject.cancelled = true;
}
this.boundPorts.clear();
// Close the server if it is still running.
for (const server of this.http2Servers.keys()) {
this.closeServer(server);
}
// Always destroy any available sessions. It's possible that one or more
// tryShutdown() calls are in progress. Don't wait on them to finish.
this.sessions.forEach((channelzInfo, session) => {
this.closeSession(session);
// Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to
// recognize destroy(code) as a valid signature.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
session.destroy(http2.constants.NGHTTP2_CANCEL);
});
this.sessions.clear();
if (this.channelzEnabled) {
(0, channelz_1.unregisterChannelzRef)(this.channelzRef);
}
this.shutdown = true;
}
}
this.sessions.forEach((channelzInfo, session) => {
if (!session.closed) {
pendingChecks += 1;
session.close(maybeCallback);
register(name, handler, serialize, deserialize, type) {
if (this.handlers.has(name)) {
return false;
}
this.handlers.set(name, {
func: handler,
serialize,
deserialize,
type,
path: name,
});
return true;
}
});
if (pendingChecks === 0) {
wrappedCallback();
}
}
addHttp2Port() {
throw new Error('Not yet implemented');
}
/**
* Get the channelz reference object for this server. The returned value is
* garbage if channelz is disabled for this server.
* @returns
*/
getChannelzRef() {
return this.channelzRef;
}
_verifyContentType(stream, headers) {
const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
if (typeof contentType !== 'string' ||
!contentType.startsWith('application/grpc')) {
stream.respond({
[http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
}, { endStream: true });
return false;
}
return true;
}
_retrieveHandler(path) {
this.trace('Received call to method ' +
path +
' at address ' +
this.serverAddressString);
const handler = this.handlers.get(path);
if (handler === undefined) {
this.trace('No handler registered for method ' +
path +
'. Sending UNIMPLEMENTED status.');
return null;
}
return handler;
}
_respondWithError(err, stream, channelzSessionInfo = null) {
const call = new server_call_1.Http2ServerCallStream(stream, null, this.options);
if (err.code === undefined) {
err.code = constants_1.Status.INTERNAL;
}
if (this.channelzEnabled) {
this.callTracker.addCallFailed();
channelzSessionInfo === null || channelzSessionInfo === void 0 ? void 0 : channelzSessionInfo.streamTracker.addCallFailed();
}
call.sendError(err);
}
_channelzHandler(stream, headers) {
const channelzSessionInfo = this.sessions.get(stream.session);
this.callTracker.addCallStarted();
channelzSessionInfo === null || channelzSessionInfo === void 0 ? void 0 : channelzSessionInfo.streamTracker.addCallStarted();
if (!this._verifyContentType(stream, headers)) {
this.callTracker.addCallFailed();
channelzSessionInfo === null || channelzSessionInfo === void 0 ? void 0 : channelzSessionInfo.streamTracker.addCallFailed();
return;
}
const path = headers[HTTP2_HEADER_PATH];
const handler = this._retrieveHandler(path);
if (!handler) {
this._respondWithError(getUnimplementedStatusResponse(path), stream, channelzSessionInfo);
return;
}
const call = new server_call_1.Http2ServerCallStream(stream, handler, this.options);
call.once('callEnd', (code) => {
if (code === constants_1.Status.OK) {
this.callTracker.addCallSucceeded();
unregister(name) {
return this.handlers.delete(name);
}
else {
this.callTracker.addCallFailed();
/**
* @deprecated No longer needed as of version 1.10.x
*/
start() {
if (this.http2Servers.size === 0 ||
[...this.http2Servers.keys()].every(server => !server.listening)) {
throw new Error('server must be bound in order to start');
}
if (this.started === true) {
throw new Error('server is already started');
}
this.started = true;
}
});
if (channelzSessionInfo) {
call.once('streamEnd', (success) => {
if (success) {
channelzSessionInfo.streamTracker.addCallSucceeded();
tryShutdown(callback) {
var _b;
const wrappedCallback = (error) => {
if (this.channelzEnabled) {
(0, channelz_1.unregisterChannelzRef)(this.channelzRef);
}
callback(error);
};
let pendingChecks = 0;
function maybeCallback() {
pendingChecks--;
if (pendingChecks === 0) {
wrappedCallback();
}
}
else {
channelzSessionInfo.streamTracker.addCallFailed();
this.shutdown = true;
for (const server of this.http2Servers.keys()) {
pendingChecks++;
const serverString = this.http2Servers.get(server).channelzRef.name;
this.trace('Waiting for server ' + serverString + ' to close');
this.closeServer(server, () => {
this.trace('Server ' + serverString + ' finished closing');
maybeCallback();
});
}
});
call.on('sendMessage', () => {
channelzSessionInfo.messagesSent += 1;
channelzSessionInfo.lastMessageSentTimestamp = new Date();
});
call.on('receiveMessage', () => {
channelzSessionInfo.messagesReceived += 1;
channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
});
}
if (!this._runHandlerForCall(call, handler, headers)) {
this.callTracker.addCallFailed();
channelzSessionInfo === null || channelzSessionInfo === void 0 ? void 0 : channelzSessionInfo.streamTracker.addCallFailed();
call.sendError({
code: constants_1.Status.INTERNAL,
details: `Unknown handler type: ${handler.type}`,
});
}
}
_streamHandler(stream, headers) {
if (this._verifyContentType(stream, headers) !== true) {
return;
}
const path = headers[HTTP2_HEADER_PATH];
const handler = this._retrieveHandler(path);
if (!handler) {
this._respondWithError(getUnimplementedStatusResponse(path), stream, null);
return;
}
const call = new server_call_1.Http2ServerCallStream(stream, handler, this.options);
if (!this._runHandlerForCall(call, handler, headers)) {
call.sendError({
code: constants_1.Status.INTERNAL,
details: `Unknown handler type: ${handler.type}`,
});
}
}
_runHandlerForCall(call, handler, headers) {
var _a;
const metadata = call.receiveMetadata(headers);
const encoding = (_a = metadata.get('grpc-encoding')[0]) !== null && _a !== void 0 ? _a : 'identity';
metadata.remove('grpc-encoding');
const { type } = handler;
if (type === 'unary') {
handleUnary(call, handler, metadata, encoding);
}
else if (type === 'clientStream') {
handleClientStreaming(call, handler, metadata, encoding);
}
else if (type === 'serverStream') {
handleServerStreaming(call, handler, metadata, encoding);
}
else if (type === 'bidi') {
handleBidiStreaming(call, handler, metadata, encoding);
}
else {
return false;
}
return true;
}
_setupHandlers(http2Server) {
if (http2Server === null) {
return;
}
const serverAddress = http2Server.address();
let serverAddressString = 'null';
if (serverAddress) {
if (typeof serverAddress === 'string') {
serverAddressString = serverAddress;
for (const session of this.sessions.keys()) {
pendingChecks++;
const sessionString = (_b = session.socket) === null || _b === void 0 ? void 0 : _b.remoteAddress;
this.trace('Waiting for session ' + sessionString + ' to close');
this.closeSession(session, () => {
this.trace('Session ' + sessionString + ' finished closing');
maybeCallback();
});
}
if (pendingChecks === 0) {
wrappedCallback();
}
}
else {
serverAddressString = serverAddress.address + ':' + serverAddress.port;
addHttp2Port() {
throw new Error('Not yet implemented');
}
}
this.serverAddressString = serverAddressString;
const handler = this.channelzEnabled
? this._channelzHandler
: this._streamHandler;
http2Server.on('stream', handler.bind(this));
http2Server.on('session', session => {
var _a, _b, _c, _d, _e;
if (!this.started) {
session.destroy();
return;
/**
* Get the channelz reference object for this server. The returned value is
* garbage if channelz is disabled for this server.
* @returns
*/
getChannelzRef() {
return this.channelzRef;
}
const channelzRef = (0, channelz_1.registerChannelzSocket)((_a = session.socket.remoteAddress) !== null && _a !== void 0 ? _a : 'unknown', this.getChannelzSessionInfoGetter(session), this.channelzEnabled);
const channelzSessionInfo = {
ref: channelzRef,
streamTracker: new channelz_1.ChannelzCallTracker(),
messagesSent: 0,
messagesReceived: 0,
lastMessageSentTimestamp: null,
lastMessageReceivedTimestamp: null,
};
this.sessions.set(session, channelzSessionInfo);
const clientAddress = session.socket.remoteAddress;
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress);
this.sessionChildrenTracker.refChild(channelzRef);
_verifyContentType(stream, headers) {
const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
if (typeof contentType !== 'string' ||
!contentType.startsWith('application/grpc')) {
stream.respond({
[http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
}, { endStream: true });
return false;
}
return true;
}
let connectionAgeTimer = null;
let connectionAgeGraceTimer = null;
let sessionClosedByServer = false;
if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) {
// Apply a random jitter within a +/-10% range
const jitterMagnitude = this.maxConnectionAgeMs / 10;
const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude;
connectionAgeTimer = (_c = (_b = setTimeout(() => {
var _a, _b;
sessionClosedByServer = true;
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by max connection age from ' + clientAddress);
}
try {
session.goaway(http2.constants.NGHTTP2_NO_ERROR, ~(1 << 31), Buffer.from('max_age'));
}
catch (e) {
// The goaway can't be sent because the session is already closed
session.destroy();
return;
}
session.close();
/* Allow a grace period after sending the GOAWAY before forcibly
* closing the connection. */
if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
connectionAgeGraceTimer = (_b = (_a = setTimeout(() => {
session.destroy();
}, this.maxConnectionAgeGraceMs)).unref) === null || _b === void 0 ? void 0 : _b.call(_a);
}
}, this.maxConnectionAgeMs + jitter)).unref) === null || _c === void 0 ? void 0 : _c.call(_b);
_retrieveHandler(path) {
this.trace('Received call to method ' +
path +
' at address ' +
this.serverAddressString);
const handler = this.handlers.get(path);
if (handler === undefined) {
this.trace('No handler registered for method ' +
path +
'. Sending UNIMPLEMENTED status.');
return null;
}
return handler;
}
const keeapliveTimeTimer = (_e = (_d = setInterval(() => {
var _a, _b;
const timeoutTImer = (_b = (_a = setTimeout(() => {
sessionClosedByServer = true;
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by keepalive timeout from ' + clientAddress);
_respondWithError(err, stream, channelzSessionInfo = null) {
var _b, _c;
const trailersToSend = Object.assign({ 'grpc-status': (_b = err.code) !== null && _b !== void 0 ? _b : constants_1.Status.INTERNAL, 'grpc-message': err.details, [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK, [http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto' }, (_c = err.metadata) === null || _c === void 0 ? void 0 : _c.toHttp2Headers());
stream.respond(trailersToSend, { endStream: true });
if (this.channelzEnabled) {
this.callTracker.addCallFailed();
channelzSessionInfo === null || channelzSessionInfo === void 0 ? void 0 : channelzSessionInfo.streamTracker.addCallFailed();
}
}
_channelzHandler(stream, headers) {
const channelzSessionInfo = this.sessions.get(stream.session);
this.callTracker.addCallStarted();
channelzSessionInfo === null || channelzSessionInfo === void 0 ? void 0 : channelzSessionInfo.streamTracker.addCallStarted();
if (!this._verifyContentType(stream, headers)) {
this.callTracker.addCallFailed();
channelzSessionInfo === null || channelzSessionInfo === void 0 ? void 0 : channelzSessionInfo.streamTracker.addCallFailed();
return;
}
const path = headers[HTTP2_HEADER_PATH];
const handler = this._retrieveHandler(path);
if (!handler) {
this._respondWithError(getUnimplementedStatusResponse(path), stream, channelzSessionInfo);
return;
}
let callEventTracker = {
addMessageSent: () => {
if (channelzSessionInfo) {
channelzSessionInfo.messagesSent += 1;
channelzSessionInfo.lastMessageSentTimestamp = new Date();
}
},
addMessageReceived: () => {
if (channelzSessionInfo) {
channelzSessionInfo.messagesReceived += 1;
channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
}
},
onCallEnd: status => {
if (status.code === constants_1.Status.OK) {
this.callTracker.addCallSucceeded();
}
else {
this.callTracker.addCallFailed();
}
},
onStreamEnd: success => {
if (channelzSessionInfo) {
if (success) {
channelzSessionInfo.streamTracker.addCallSucceeded();
}
else {
channelzSessionInfo.streamTracker.addCallFailed();
}
}
}
session.close();
}, this.keepaliveTimeoutMs)).unref) === null || _b === void 0 ? void 0 : _b.call(_a);
try {
session.ping((err, duration, payload) => {
clearTimeout(timeoutTImer);
};
const call = (0, server_interceptors_1.getServerInterceptingCall)(this.interceptors, stream, headers, callEventTracker, handler, this.options);
if (!this._runHandlerForCall(call, handler)) {
this.callTracker.addCallFailed();
channelzSessionInfo === null || channelzSessionInfo === void 0 ? void 0 : channelzSessionInfo.streamTracker.addCallFailed();
call.sendStatus({
code: constants_1.Status.INTERNAL,
details: `Unknown handler type: ${handler.type}`,
});
}
catch (e) {
// The ping can't be sent because the session is already closed
session.destroy();
}
_streamHandler(stream, headers) {
if (this._verifyContentType(stream, headers) !== true) {
return;
}
}, this.keepaliveTimeMs)).unref) === null || _e === void 0 ? void 0 : _e.call(_d);
session.on('close', () => {
if (this.channelzEnabled) {
if (!sessionClosedByServer) {
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
}
this.sessionChildrenTracker.unrefChild(channelzRef);
(0, channelz_1.unregisterChannelzRef)(channelzRef);
const path = headers[HTTP2_HEADER_PATH];
const handler = this._retrieveHandler(path);
if (!handler) {
this._respondWithError(getUnimplementedStatusResponse(path), stream, null);
return;
}
if (connectionAgeTimer) {
clearTimeout(connectionAgeTimer);
const call = (0, server_interceptors_1.getServerInterceptingCall)(this.interceptors, stream, headers, null, handler, this.options);
if (!this._runHandlerForCall(call, handler)) {
call.sendStatus({
code: constants_1.Status.INTERNAL,
details: `Unknown handler type: ${handler.type}`,
});
}
if (connectionAgeGraceTimer) {
clearTimeout(connectionAgeGraceTimer);
}
_runHandlerForCall(call, handler) {
const { type } = handler;
if (type === 'unary') {
handleUnary(call, handler);
}
if (keeapliveTimeTimer) {
clearTimeout(keeapliveTimeTimer);
else if (type === 'clientStream') {
handleClientStreaming(call, handler);
}
this.sessions.delete(session);
});
});
}
}
else if (type === 'serverStream') {
handleServerStreaming(call, handler);
}
else if (type === 'bidi') {
handleBidiStreaming(call, handler);
}
else {
return false;
}
return true;
}
_setupHandlers(http2Server) {
if (http2Server === null) {
return;
}
const serverAddress = http2Server.address();
let serverAddressString = 'null';
if (serverAddress) {
if (typeof serverAddress === 'string') {
serverAddressString = serverAddress;
}
else {
serverAddressString = serverAddress.address + ':' + serverAddress.port;
}
}
this.serverAddressString = serverAddressString;
const handler = this.channelzEnabled
? this._channelzHandler
: this._streamHandler;
http2Server.on('stream', handler.bind(this));
http2Server.on('session', session => {
var _b, _c, _d, _e, _f, _g;
const channelzRef = (0, channelz_1.registerChannelzSocket)((_b = session.socket.remoteAddress) !== null && _b !== void 0 ? _b : 'unknown', this.getChannelzSessionInfoGetter(session), this.channelzEnabled);
const channelzSessionInfo = {
ref: channelzRef,
streamTracker: new channelz_1.ChannelzCallTracker(),
messagesSent: 0,
messagesReceived: 0,
lastMessageSentTimestamp: null,
lastMessageReceivedTimestamp: null,
};
(_c = this.http2Servers.get(http2Server)) === null || _c === void 0 ? void 0 : _c.sessions.add(session);
this.sessions.set(session, channelzSessionInfo);
const clientAddress = session.socket.remoteAddress;
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress);
this.sessionChildrenTracker.refChild(channelzRef);
}
let connectionAgeTimer = null;
let connectionAgeGraceTimer = null;
let sessionClosedByServer = false;
if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) {
// Apply a random jitter within a +/-10% range
const jitterMagnitude = this.maxConnectionAgeMs / 10;
const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude;
connectionAgeTimer = (_e = (_d = setTimeout(() => {
var _b, _c;
sessionClosedByServer = true;
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by max connection age from ' + clientAddress);
}
try {
session.goaway(http2.constants.NGHTTP2_NO_ERROR, ~(1 << 31), Buffer.from('max_age'));
}
catch (e) {
// The goaway can't be sent because the session is already closed
session.destroy();
return;
}
session.close();
/* Allow a grace period after sending the GOAWAY before forcibly
* closing the connection. */
if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
connectionAgeGraceTimer = (_c = (_b = setTimeout(() => {
session.destroy();
}, this.maxConnectionAgeGraceMs)).unref) === null || _c === void 0 ? void 0 : _c.call(_b);
}
}, this.maxConnectionAgeMs + jitter)).unref) === null || _e === void 0 ? void 0 : _e.call(_d);
}
const keeapliveTimeTimer = (_g = (_f = setInterval(() => {
var _b, _c;
const timeoutTImer = (_c = (_b = setTimeout(() => {
sessionClosedByServer = true;
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by keepalive timeout from ' + clientAddress);
}
session.close();
}, this.keepaliveTimeoutMs)).unref) === null || _c === void 0 ? void 0 : _c.call(_b);
try {
session.ping((err, duration, payload) => {
clearTimeout(timeoutTImer);
});
}
catch (e) {
// The ping can't be sent because the session is already closed
session.destroy();
}
}, this.keepaliveTimeMs)).unref) === null || _g === void 0 ? void 0 : _g.call(_f);
session.on('close', () => {
var _b;
if (this.channelzEnabled) {
if (!sessionClosedByServer) {
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
}
this.sessionChildrenTracker.unrefChild(channelzRef);
(0, channelz_1.unregisterChannelzRef)(channelzRef);
}
if (connectionAgeTimer) {
clearTimeout(connectionAgeTimer);
}
if (connectionAgeGraceTimer) {
clearTimeout(connectionAgeGraceTimer);
}
if (keeapliveTimeTimer) {
clearTimeout(keeapliveTimeTimer);
}
(_b = this.http2Servers.get(http2Server)) === null || _b === void 0 ? void 0 : _b.sessions.delete(session);
this.sessions.delete(session);
});
});
}
},
(() => {
const _metadata = typeof Symbol === "function" && Symbol.metadata ? Object.create(null) : void 0;
_start_decorators = [deprecate('Calling start() is no longer necessary. It can be safely omitted.')];
__esDecorate(_a, null, _start_decorators, { kind: "method", name: "start", static: false, private: false, access: { has: obj => "start" in obj, get: obj => obj.start }, metadata: _metadata }, null, _instanceExtraInitializers);
if (_metadata) Object.defineProperty(_a, Symbol.metadata, { enumerable: true, configurable: true, writable: true, value: _metadata });
})(),
_a;
})();
exports.Server = Server;
async function handleUnary(call, handler, metadata, encoding) {
try {
const request = await call.receiveUnaryMessage(encoding);
if (request === undefined || call.cancelled) {
async function handleUnary(call, handler) {
let stream;
function respond(err, value, trailer, flags) {
if (err) {
call.sendStatus((0, server_call_1.serverErrorToStatus)(err, trailer));
return;
}
const emitter = new server_call_1.ServerUnaryCallImpl(call, metadata, request);
handler.func(emitter, (err, value, trailer, flags) => {
call.sendUnaryMessage(err, value, trailer, flags);
call.sendMessage(value, () => {
call.sendStatus({
code: constants_1.Status.OK,
details: 'OK',
metadata: trailer !== null && trailer !== void 0 ? trailer : null
});
});
}
catch (err) {
call.sendError(err);
}
let requestMetadata;
let requestMessage = null;
call.start({
onReceiveMetadata(metadata) {
requestMetadata = metadata;
call.startRead();
},
onReceiveMessage(message) {
if (requestMessage) {
call.sendStatus({
code: constants_1.Status.UNIMPLEMENTED,
details: `Received a second request message for server streaming method ${handler.path}`,
metadata: null
});
return;
}
requestMessage = message;
call.startRead();
},
onReceiveHalfClose() {
if (!requestMessage) {
call.sendStatus({
code: constants_1.Status.UNIMPLEMENTED,
details: `Received no request message for server streaming method ${handler.path}`,
metadata: null
});
return;
}
stream = new server_call_1.ServerWritableStreamImpl(handler.path, call, requestMetadata, requestMessage);
try {
handler.func(stream, respond);
}
catch (err) {
call.sendStatus({
code: constants_1.Status.UNKNOWN,
details: `Server method handler threw error ${err.message}`,
metadata: null
});
}
},
onCancel() {
if (stream) {
stream.cancelled = true;
stream.emit('cancelled', 'cancelled');
}
},
});
}
function handleClientStreaming(call, handler, metadata, encoding) {
const stream = new server_call_1.ServerReadableStreamImpl(call, metadata, handler.deserialize, encoding);
function handleClientStreaming(call, handler) {
let stream;
function respond(err, value, trailer, flags) {
stream.destroy();
call.sendUnaryMessage(err, value, trailer, flags);
}
if (call.cancelled) {
return;
}
stream.on('error', respond);
handler.func(stream, respond);
}
async function handleServerStreaming(call, handler, metadata, encoding) {
try {
const request = await call.receiveUnaryMessage(encoding);
if (request === undefined || call.cancelled) {
if (err) {
call.sendStatus((0, server_call_1.serverErrorToStatus)(err, trailer));
return;
}
const stream = new server_call_1.ServerWritableStreamImpl(call, metadata, handler.serialize, request);
handler.func(stream);
call.sendMessage(value, () => {
call.sendStatus({
code: constants_1.Status.OK,
details: 'OK',
metadata: trailer !== null && trailer !== void 0 ? trailer : null
});
});
}
catch (err) {
call.sendError(err);
}
call.start({
onReceiveMetadata(metadata) {
stream = new server_call_1.ServerDuplexStreamImpl(handler.path, call, metadata);
try {
handler.func(stream, respond);
}
catch (err) {
call.sendStatus({
code: constants_1.Status.UNKNOWN,
details: `Server method handler threw error ${err.message}`,
metadata: null
});
}
},
onReceiveMessage(message) {
stream.push(message);
},
onReceiveHalfClose() {
stream.push(null);
},
onCancel() {
if (stream) {
stream.cancelled = true;
stream.emit('cancelled', 'cancelled');
stream.destroy();
}
},
});
}
function handleBidiStreaming(call, handler, metadata, encoding) {
const stream = new server_call_1.ServerDuplexStreamImpl(call, metadata, handler.serialize, handler.deserialize, encoding);
if (call.cancelled) {
return;
}
handler.func(stream);
function handleServerStreaming(call, handler) {
let stream;
let requestMetadata;
let requestMessage = null;
call.start({
onReceiveMetadata(metadata) {
requestMetadata = metadata;
call.startRead();
},
onReceiveMessage(message) {
if (requestMessage) {
call.sendStatus({
code: constants_1.Status.UNIMPLEMENTED,
details: `Received a second request message for server streaming method ${handler.path}`,
metadata: null
});
return;
}
requestMessage = message;
call.startRead();
},
onReceiveHalfClose() {
if (!requestMessage) {
call.sendStatus({
code: constants_1.Status.UNIMPLEMENTED,
details: `Received no request message for server streaming method ${handler.path}`,
metadata: null
});
return;
}
stream = new server_call_1.ServerWritableStreamImpl(handler.path, call, requestMetadata, requestMessage);
try {
handler.func(stream);
}
catch (err) {
call.sendStatus({
code: constants_1.Status.UNKNOWN,
details: `Server method handler threw error ${err.message}`,
metadata: null
});
}
},
onCancel() {
if (stream) {
stream.cancelled = true;
stream.emit('cancelled', 'cancelled');
stream.destroy();
}
},
});
}
function handleBidiStreaming(call, handler) {
let stream;
call.start({
onReceiveMetadata(metadata) {
stream = new server_call_1.ServerDuplexStreamImpl(handler.path, call, metadata);
try {
handler.func(stream);
}
catch (err) {
call.sendStatus({
code: constants_1.Status.UNKNOWN,
details: `Server method handler threw error ${err.message}`,
metadata: null
});
}
},
onReceiveMessage(message) {
stream.push(message);
},
onReceiveHalfClose() {
stream.push(null);
},
onCancel() {
if (stream) {
stream.cancelled = true;
stream.emit('cancelled', 'cancelled');
stream.destroy();
}
},
});
}
//# sourceMappingURL=server.js.map
import { Status } from './constants';
import { Duration } from './duration';
import { LoadBalancingConfig } from './load-balancer';
export interface MethodConfigName {

@@ -33,2 +32,5 @@ service?: string;

}
export interface LoadBalancingConfig {
[key: string]: object;
}
export interface ServiceConfig {

@@ -35,0 +37,0 @@ loadBalancingPolicy?: string;

@@ -31,3 +31,2 @@ "use strict";

const constants_1 = require("./constants");
const load_balancer_1 = require("./load-balancer");
/**

@@ -246,2 +245,17 @@ * Recognizes a number with up to 9 digits after the decimal point, followed by

exports.validateRetryThrottling = validateRetryThrottling;
function validateLoadBalancingConfig(obj) {
if (!(typeof obj === 'object' && obj !== null)) {
throw new Error(`Invalid loadBalancingConfig: unexpected type ${typeof obj}`);
}
const keys = Object.keys(obj);
if (keys.length > 1) {
throw new Error(`Invalid loadBalancingConfig: unexpected multiple keys ${keys}`);
}
if (keys.length === 0) {
throw new Error('Invalid loadBalancingConfig: load balancing policy name required');
}
return {
[keys[0]]: obj[keys[0]]
};
}
function validateServiceConfig(obj) {

@@ -263,3 +277,3 @@ const result = {

for (const config of obj.loadBalancingConfig) {
result.loadBalancingConfig.push((0, load_balancer_1.validateLoadBalancingConfig)(config));
result.loadBalancingConfig.push(validateLoadBalancingConfig(config));
}

@@ -266,0 +280,0 @@ }

@@ -19,1 +19,25 @@ export interface TcpSubchannelAddress {

export declare function stringToSubchannelAddress(addressString: string, port?: number): SubchannelAddress;
export interface Endpoint {
addresses: SubchannelAddress[];
}
export declare function endpointEqual(endpoint1: Endpoint, endpoint2: Endpoint): boolean;
export declare function endpointToString(endpoint: Endpoint): string;
export declare function endpointHasAddress(endpoint: Endpoint, expectedAddress: SubchannelAddress): boolean;
export declare class EndpointMap<ValueType> {
private map;
get size(): number;
getForSubchannelAddress(address: SubchannelAddress): ValueType | undefined;
/**
* Delete any entries in this map with keys that are not in endpoints
* @param endpoints
*/
deleteMissing(endpoints: Endpoint[]): ValueType[];
get(endpoint: Endpoint): ValueType | undefined;
set(endpoint: Endpoint, mapEntry: ValueType): void;
delete(endpoint: Endpoint): void;
has(endpoint: Endpoint): boolean;
clear(): void;
keys(): IterableIterator<Endpoint>;
values(): IterableIterator<ValueType>;
entries(): IterableIterator<[Endpoint, ValueType]>;
}

@@ -19,3 +19,3 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.stringToSubchannelAddress = exports.subchannelAddressToString = exports.subchannelAddressEqual = exports.isTcpSubchannelAddress = void 0;
exports.EndpointMap = exports.endpointHasAddress = exports.endpointToString = exports.endpointEqual = exports.stringToSubchannelAddress = exports.subchannelAddressToString = exports.subchannelAddressEqual = exports.isTcpSubchannelAddress = void 0;
const net_1 = require("net");

@@ -67,2 +67,133 @@ function isTcpSubchannelAddress(address) {

exports.stringToSubchannelAddress = stringToSubchannelAddress;
function endpointEqual(endpoint1, endpoint2) {
if (endpoint1.addresses.length !== endpoint2.addresses.length) {
return false;
}
for (let i = 0; i < endpoint1.addresses.length; i++) {
if (!subchannelAddressEqual(endpoint1.addresses[i], endpoint2.addresses[i])) {
return false;
}
}
return true;
}
exports.endpointEqual = endpointEqual;
function endpointToString(endpoint) {
return ('[' + endpoint.addresses.map(subchannelAddressToString).join(', ') + ']');
}
exports.endpointToString = endpointToString;
function endpointHasAddress(endpoint, expectedAddress) {
for (const address of endpoint.addresses) {
if (subchannelAddressEqual(address, expectedAddress)) {
return true;
}
}
return false;
}
exports.endpointHasAddress = endpointHasAddress;
function endpointEqualUnordered(endpoint1, endpoint2) {
if (endpoint1.addresses.length !== endpoint2.addresses.length) {
return false;
}
for (const address1 of endpoint1.addresses) {
let matchFound = false;
for (const address2 of endpoint2.addresses) {
if (subchannelAddressEqual(address1, address2)) {
matchFound = true;
break;
}
}
if (!matchFound) {
return false;
}
}
return true;
}
class EndpointMap {
constructor() {
this.map = new Set();
}
get size() {
return this.map.size;
}
getForSubchannelAddress(address) {
for (const entry of this.map) {
if (endpointHasAddress(entry.key, address)) {
return entry.value;
}
}
return undefined;
}
/**
* Delete any entries in this map with keys that are not in endpoints
* @param endpoints
*/
deleteMissing(endpoints) {
const removedValues = [];
for (const entry of this.map) {
let foundEntry = false;
for (const endpoint of endpoints) {
if (endpointEqualUnordered(endpoint, entry.key)) {
foundEntry = true;
}
}
if (!foundEntry) {
removedValues.push(entry.value);
this.map.delete(entry);
}
}
return removedValues;
}
get(endpoint) {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
return entry.value;
}
}
return undefined;
}
set(endpoint, mapEntry) {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
entry.value = mapEntry;
return;
}
}
this.map.add({ key: endpoint, value: mapEntry });
}
delete(endpoint) {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
this.map.delete(entry);
return;
}
}
}
has(endpoint) {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
return true;
}
}
return false;
}
clear() {
this.map.clear();
}
*keys() {
for (const entry of this.map) {
yield entry.key;
}
}
*values() {
for (const entry of this.map) {
yield entry.value;
}
}
*entries() {
for (const entry of this.map) {
yield [entry.key, entry.value];
}
}
}
exports.EndpointMap = EndpointMap;
//# sourceMappingURL=subchannel-address.js.map

@@ -5,2 +5,3 @@ import { SubchannelRef } from './channelz';

export type ConnectivityStateListener = (subchannel: SubchannelInterface, previousState: ConnectivityState, newState: ConnectivityState, keepaliveTime: number, errorMessage?: string) => void;
export type HealthListener = (healthy: boolean) => void;
/**

@@ -25,2 +26,5 @@ * This is an interface for load balancing policies to use to interact with

getChannelzRef(): SubchannelRef;
isHealthy(): boolean;
addHealthStateWatcher(listener: HealthListener): void;
removeHealthStateWatcher(listener: HealthListener): void;
/**

@@ -39,3 +43,6 @@ * If this is a wrapper, return the wrapped subchannel, otherwise return this

protected child: SubchannelInterface;
private healthy;
private healthListeners;
constructor(child: SubchannelInterface);
private updateHealthListeners;
getConnectivityState(): ConnectivityState;

@@ -50,4 +57,8 @@ addConnectivityStateListener(listener: ConnectivityStateListener): void;

getChannelzRef(): SubchannelRef;
isHealthy(): boolean;
addHealthStateWatcher(listener: HealthListener): void;
removeHealthStateWatcher(listener: HealthListener): void;
protected setHealthy(healthy: boolean): void;
getRealSubchannel(): Subchannel;
realSubchannelEquals(other: SubchannelInterface): boolean;
}

@@ -23,3 +23,17 @@ "use strict";

this.child = child;
this.healthy = true;
this.healthListeners = new Set();
child.addHealthStateWatcher(childHealthy => {
/* A change to the child health state only affects this wrapper's overall
* health state if this wrapper is reporting healthy. */
if (this.healthy) {
this.updateHealthListeners();
}
});
}
updateHealthListeners() {
for (const listener of this.healthListeners) {
listener(this.isHealthy());
}
}
getConnectivityState() {

@@ -52,2 +66,21 @@ return this.child.getConnectivityState();

}
isHealthy() {
return this.healthy && this.child.isHealthy();
}
addHealthStateWatcher(listener) {
this.healthListeners.add(listener);
}
removeHealthStateWatcher(listener) {
this.healthListeners.delete(listener);
}
setHealthy(healthy) {
if (healthy !== this.healthy) {
this.healthy = healthy;
/* A change to this wrapper's health state only affects the overall
* reported health state if the child is healthy. */
if (this.child.isHealthy()) {
this.updateHealthListeners();
}
}
}
getRealSubchannel() {

@@ -54,0 +87,0 @@ return this.child.getRealSubchannel();

@@ -116,2 +116,5 @@ import { ChannelCredentials } from './channel-credentials';

getChannelzRef(): SubchannelRef;
isHealthy(): boolean;
addHealthStateWatcher(listener: (healthy: boolean) => void): void;
removeHealthStateWatcher(listener: (healthy: boolean) => void): void;
getRealSubchannel(): this;

@@ -118,0 +121,0 @@ realSubchannelEquals(other: SubchannelInterface): boolean;

@@ -340,2 +340,11 @@ "use strict";

}
isHealthy() {
return true;
}
addHealthStateWatcher(listener) {
// Do nothing with the listener
}
removeHealthStateWatcher(listener) {
// Do nothing with the listener
}
getRealSubchannel() {

@@ -342,0 +351,0 @@ return this;

/// <reference types="node" />
import * as http2 from 'http2';
import { StatusObject } from './call-interface';
import { PartialStatusObject } from './call-interface';
import { ChannelCredentials } from './channel-credentials';

@@ -14,3 +14,3 @@ import { ChannelOptions } from './channel-options';

addMessageReceived(): void;
onCallEnd(status: StatusObject): void;
onCallEnd(status: PartialStatusObject): void;
onStreamEnd(success: boolean): void;

@@ -17,0 +17,0 @@ }

@@ -12,2 +12,3 @@ export interface GrpcUri {

export declare function splitHostPort(path: string): HostPort | null;
export declare function combineHostPort(hostPort: HostPort): string;
export declare function uriToString(uri: GrpcUri): string;

@@ -19,3 +19,3 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.uriToString = exports.splitHostPort = exports.parseUri = void 0;
exports.uriToString = exports.combineHostPort = exports.splitHostPort = exports.parseUri = void 0;
/*

@@ -100,2 +100,17 @@ * The groups correspond to URI parts as follows:

exports.splitHostPort = splitHostPort;
function combineHostPort(hostPort) {
if (hostPort.port === undefined) {
return hostPort.host;
}
else {
// Only an IPv6 host should include a colon
if (hostPort.host.includes(':')) {
return `[${hostPort.host}]:${hostPort.port}`;
}
else {
return `${hostPort.host}:${hostPort.port}`;
}
}
}
exports.combineHostPort = combineHostPort;
function uriToString(uri) {

@@ -102,0 +117,0 @@ let result = '';

{
"name": "@grpc/grpc-js",
"version": "1.9.14",
"version": "1.10.0",
"description": "gRPC Library for Node - pure JS implementation",

@@ -5,0 +5,0 @@ "homepage": "https://grpc.io/",

@@ -40,3 +40,3 @@ /*

export type PartialStatusObject = Pick<StatusObject, 'code' | 'details'> & {
metadata: Metadata | null;
metadata?: Metadata | null | undefined;
};

@@ -43,0 +43,0 @@

@@ -166,4 +166,4 @@ /*

class InsecureChannelCredentialsImpl extends ChannelCredentials {
constructor(callCredentials?: CallCredentials) {
super(callCredentials);
constructor() {
super();
}

@@ -170,0 +170,0 @@

@@ -64,2 +64,3 @@ /*

'grpc-node.tls_enable_trace'?: number;
'grpc.lb.ring_hash.ring_size_cap'?: number;
// eslint-disable-next-line @typescript-eslint/no-explicit-any

@@ -100,2 +101,3 @@ [key: string]: any;

'grpc-node.tls_enable_trace': true,
'grpc.lb.ring_hash.ring_size_cap': true,
};

@@ -102,0 +104,0 @@

@@ -11,16 +11,21 @@ export { trace, log } from './logging';

export { Duration, durationToMs } from './duration';
export { ServiceConfig, MethodConfig, RetryPolicy } from './service-config';
export { BackoffTimeout } from './backoff-timeout';
export {
LoadBalancer,
LoadBalancingConfig,
TypedLoadBalancingConfig,
ChannelControlHelper,
createChildChannelControlHelper,
registerLoadBalancerType,
getFirstUsableConfig,
validateLoadBalancingConfig,
selectLbConfigFromList,
parseLoadBalancingConfig,
isLoadBalancerNameRegistered,
} from './load-balancer';
export { LeafLoadBalancer } from './load-balancer-pick-first';
export {
SubchannelAddress,
subchannelAddressToString,
Endpoint,
endpointToString,
endpointHasAddress,
EndpointMap,
} from './subchannel-address';

@@ -44,7 +49,8 @@ export { ChildLoadBalancerHandler } from './load-balancer-child-handler';

ConnectivityStateListener,
HealthListener,
} from './subchannel-interface';
export {
OutlierDetectionLoadBalancingConfig,
OutlierDetectionRawConfig,
SuccessRateEjectionConfig,
FailurePercentageEjectionConfig,
} from './load-balancer-outlier-detection';

@@ -30,3 +30,3 @@ /*

import { ConnectivityState } from './connectivity-state';
import { ChannelCredentials } from './channel-credentials';
import { ChannelCredentials, VerifyOptions } from './channel-credentials';
import {

@@ -53,2 +53,3 @@ CallOptions,

Server,
ServerOptions,
UntypedHandleCall,

@@ -187,2 +188,3 @@ UntypedServiceImplementation,

UntypedServiceImplementation,
VerifyOptions
};

@@ -231,3 +233,3 @@

export { Server };
export { Server, ServerOptions };
export { ServerCredentials };

@@ -268,2 +270,16 @@ export { KeyCertPair };

export { ServiceConfig, LoadBalancingConfig, MethodConfig, RetryPolicy } from './service-config';
export {
ServerListener,
FullServerListener,
ServerListenerBuilder,
Responder,
FullResponder,
ResponderBuilder,
ServerInterceptingCallInterface,
ServerInterceptingCall,
ServerInterceptor
} from './server-interceptors';
import * as experimental from './experimental';

@@ -270,0 +286,0 @@ export { experimental };

@@ -197,2 +197,11 @@ /*

/**
* Randomly generated ID to be passed to the config selector, for use by
* ring_hash in xDS. An integer distributed approximately uniformly between
* 0 and MAX_SAFE_INTEGER.
*/
private readonly randomChannelId = Math.floor(
Math.random() * Number.MAX_SAFE_INTEGER
);
constructor(

@@ -537,3 +546,3 @@ target: string,

type: 'SUCCESS',
config: this.configSelector(method, metadata),
config: this.configSelector(method, metadata, this.randomChannelId),
};

@@ -540,0 +549,0 @@ } else {

@@ -21,6 +21,6 @@ /*

ChannelControlHelper,
LoadBalancingConfig,
TypedLoadBalancingConfig,
createLoadBalancer,
} from './load-balancer';
import { SubchannelAddress } from './subchannel-address';
import { Endpoint, SubchannelAddress } from './subchannel-address';
import { ChannelOptions } from './channel-options';

@@ -37,3 +37,3 @@ import { ConnectivityState } from './connectivity-state';

private pendingChild: LoadBalancer | null = null;
private latestConfig: LoadBalancingConfig | null = null;
private latestConfig: TypedLoadBalancingConfig | null = null;

@@ -89,7 +89,10 @@ private ChildPolicyHelper = class {

constructor(private readonly channelControlHelper: ChannelControlHelper) {}
constructor(
private readonly channelControlHelper: ChannelControlHelper,
private readonly options: ChannelOptions
) {}
protected configUpdateRequiresNewPolicyInstance(
oldConfig: LoadBalancingConfig,
newConfig: LoadBalancingConfig
oldConfig: TypedLoadBalancingConfig,
newConfig: TypedLoadBalancingConfig
): boolean {

@@ -101,3 +104,3 @@ return oldConfig.getLoadBalancerName() !== newConfig.getLoadBalancerName();

* Prerequisites: lbConfig !== null and lbConfig.name is registered
* @param addressList
* @param endpointList
* @param lbConfig

@@ -107,4 +110,4 @@ * @param attributes

updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig,
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }

@@ -119,3 +122,3 @@ ): void {

const newHelper = new this.ChildPolicyHelper(this);
const newChild = createLoadBalancer(lbConfig, newHelper)!;
const newChild = createLoadBalancer(lbConfig, newHelper, this.options)!;
newHelper.setChild(newChild);

@@ -140,3 +143,3 @@ if (this.currentChild === null) {

this.latestConfig = lbConfig;
childToUpdate.updateAddressList(addressList, lbConfig, attributes);
childToUpdate.updateAddressList(endpointList, lbConfig, attributes);
}

@@ -143,0 +146,0 @@ exitIdle(): void {

@@ -21,3 +21,3 @@ /*

import { LogVerbosity, Status } from './constants';
import { durationToMs, isDuration, msToDuration } from './duration';
import { Duration, durationToMs, isDuration, msToDuration } from './duration';
import {

@@ -29,6 +29,5 @@ ChannelControlHelper,

import {
getFirstUsableConfig,
selectLbConfigFromList,
LoadBalancer,
LoadBalancingConfig,
validateLoadBalancingConfig,
TypedLoadBalancingConfig,
} from './load-balancer';

@@ -38,11 +37,13 @@ import { ChildLoadBalancerHandler } from './load-balancer-child-handler';

import {
Endpoint,
EndpointMap,
SubchannelAddress,
subchannelAddressToString,
endpointToString,
} from './subchannel-address';
import {
BaseSubchannelWrapper,
ConnectivityStateListener,
SubchannelInterface,
} from './subchannel-interface';
import * as logging from './logging';
import { LoadBalancingConfig } from './service-config';

@@ -74,2 +75,12 @@ const TRACER_NAME = 'outlier_detection';

export interface OutlierDetectionRawConfig {
interval?: Duration;
base_ejection_time?: Duration;
max_ejection_time?: Duration;
max_ejection_percent?: number;
success_rate_ejection?: Partial<SuccessRateEjectionConfig>;
failure_percentage_ejection?: Partial<FailurePercentageEjectionConfig>;
child_policy: LoadBalancingConfig[];
}
const defaultSuccessRateEjectionConfig: SuccessRateEjectionConfig = {

@@ -104,3 +115,7 @@ stdev_factor: 1900,

) {
if (fieldName in obj && typeof obj[fieldName] !== expectedType) {
if (
fieldName in obj &&
obj[fieldName] !== undefined &&
typeof obj[fieldName] !== expectedType
) {
const fullFieldName = objectName ? `${objectName}.${fieldName}` : fieldName;

@@ -121,3 +136,3 @@ throw new Error(

const fullFieldName = objectName ? `${objectName}.${fieldName}` : fieldName;
if (fieldName in obj) {
if (fieldName in obj && obj[fieldName] !== undefined) {
if (!isDuration(obj[fieldName])) {

@@ -148,3 +163,7 @@ throw new Error(

validateFieldType(obj, fieldName, 'number', objectName);
if (fieldName in obj && !(obj[fieldName] >= 0 && obj[fieldName] <= 100)) {
if (
fieldName in obj &&
obj[fieldName] !== undefined &&
!(obj[fieldName] >= 0 && obj[fieldName] <= 100)
) {
throw new Error(

@@ -157,3 +176,3 @@ `outlier detection config ${fullFieldName} parse error: value out of range for percentage (0-100)`

export class OutlierDetectionLoadBalancingConfig
implements LoadBalancingConfig
implements TypedLoadBalancingConfig
{

@@ -174,8 +193,5 @@ private readonly intervalMs: number;

failurePercentageEjection: Partial<FailurePercentageEjectionConfig> | null,
private readonly childPolicy: LoadBalancingConfig[]
private readonly childPolicy: TypedLoadBalancingConfig
) {
if (
childPolicy.length > 0 &&
childPolicy[0].getLoadBalancerName() === 'pick_first'
) {
if (childPolicy.getLoadBalancerName() === 'pick_first') {
throw new Error(

@@ -204,9 +220,12 @@ 'outlier_detection LB policy cannot have a pick_first child policy'

return {
interval: msToDuration(this.intervalMs),
base_ejection_time: msToDuration(this.baseEjectionTimeMs),
max_ejection_time: msToDuration(this.maxEjectionTimeMs),
max_ejection_percent: this.maxEjectionPercent,
success_rate_ejection: this.successRateEjection,
failure_percentage_ejection: this.failurePercentageEjection,
child_policy: this.childPolicy.map(policy => policy.toJsonObject()),
outlier_detection: {
interval: msToDuration(this.intervalMs),
base_ejection_time: msToDuration(this.baseEjectionTimeMs),
max_ejection_time: msToDuration(this.maxEjectionTimeMs),
max_ejection_percent: this.maxEjectionPercent,
success_rate_ejection: this.successRateEjection ?? undefined,
failure_percentage_ejection:
this.failurePercentageEjection ?? undefined,
child_policy: [this.childPolicy.toJsonObject()],
},
};

@@ -233,20 +252,6 @@ }

}
getChildPolicy(): LoadBalancingConfig[] {
getChildPolicy(): TypedLoadBalancingConfig {
return this.childPolicy;
}
copyWithChildPolicy(
childPolicy: LoadBalancingConfig[]
): OutlierDetectionLoadBalancingConfig {
return new OutlierDetectionLoadBalancingConfig(
this.intervalMs,
this.baseEjectionTimeMs,
this.maxEjectionTimeMs,
this.maxEjectionPercent,
this.successRateEjection,
this.failurePercentageEjection,
childPolicy
);
}
static createFromJson(obj: any): OutlierDetectionLoadBalancingConfig {

@@ -257,3 +262,6 @@ validatePositiveDuration(obj, 'interval');

validatePercentage(obj, 'max_ejection_percent');
if ('success_rate_ejection' in obj) {
if (
'success_rate_ejection' in obj &&
obj.success_rate_ejection !== undefined
) {
if (typeof obj.success_rate_ejection !== 'object') {

@@ -288,3 +296,6 @@ throw new Error(

}
if ('failure_percentage_ejection' in obj) {
if (
'failure_percentage_ejection' in obj &&
obj.failure_percentage_ejection !== undefined
) {
if (typeof obj.failure_percentage_ejection !== 'object') {

@@ -319,2 +330,12 @@ throw new Error(

if (!('child_policy' in obj) || !Array.isArray(obj.child_policy)) {
throw new Error('outlier detection config child_policy must be an array');
}
const childPolicy = selectLbConfigFromList(obj.child_policy);
if (!childPolicy) {
throw new Error(
'outlier detection config child_policy: no valid recognized policy found'
);
}
return new OutlierDetectionLoadBalancingConfig(

@@ -327,3 +348,3 @@ obj.interval ? durationToMs(obj.interval) : null,

obj.failure_percentage_ejection,
obj.child_policy.map(validateLoadBalancingConfig)
childPolicy
);

@@ -337,5 +358,2 @@ }

{
private childSubchannelState: ConnectivityState;
private stateListeners: ConnectivityStateListener[] = [];
private ejected = false;
private refCount = 0;

@@ -347,44 +365,4 @@ constructor(

super(childSubchannel);
this.childSubchannelState = childSubchannel.getConnectivityState();
childSubchannel.addConnectivityStateListener(
(subchannel, previousState, newState, keepaliveTime) => {
this.childSubchannelState = newState;
if (!this.ejected) {
for (const listener of this.stateListeners) {
listener(this, previousState, newState, keepaliveTime);
}
}
}
);
}
getConnectivityState(): ConnectivityState {
if (this.ejected) {
return ConnectivityState.TRANSIENT_FAILURE;
} else {
return this.childSubchannelState;
}
}
/**
* Add a listener function to be called whenever the wrapper's
* connectivity state changes.
* @param listener
*/
addConnectivityStateListener(listener: ConnectivityStateListener) {
this.stateListeners.push(listener);
}
/**
* Remove a listener previously added with `addConnectivityStateListener`
* @param listener A reference to a function previously passed to
* `addConnectivityStateListener`
*/
removeConnectivityStateListener(listener: ConnectivityStateListener) {
const listenerIndex = this.stateListeners.indexOf(listener);
if (listenerIndex > -1) {
this.stateListeners.splice(listenerIndex, 1);
}
}
ref() {

@@ -409,23 +387,7 @@ this.child.ref();

eject() {
this.ejected = true;
for (const listener of this.stateListeners) {
listener(
this,
this.childSubchannelState,
ConnectivityState.TRANSIENT_FAILURE,
-1
);
}
this.setHealthy(false);
}
uneject() {
this.ejected = false;
for (const listener of this.stateListeners) {
listener(
this,
ConnectivityState.TRANSIENT_FAILURE,
this.childSubchannelState,
-1
);
}
this.setHealthy(true);
}

@@ -475,9 +437,2 @@

interface MapEntry {
counter: CallCounter;
currentEjectionTimestamp: Date | null;
ejectionTimeMultiplier: number;
subchannelWrappers: OutlierDetectionSubchannelWrapper[];
}
class OutlierDetectionPicker implements Picker {

@@ -520,5 +475,12 @@ constructor(private wrappedPicker: Picker, private countCalls: boolean) {}

interface MapEntry {
counter: CallCounter;
currentEjectionTimestamp: Date | null;
ejectionTimeMultiplier: number;
subchannelWrappers: OutlierDetectionSubchannelWrapper[];
}
export class OutlierDetectionLoadBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;
private addressMap: Map<string, MapEntry> = new Map<string, MapEntry>();
private entryMap = new EndpointMap<MapEntry>();
private latestConfig: OutlierDetectionLoadBalancingConfig | null = null;

@@ -528,3 +490,6 @@ private ejectionTimer: NodeJS.Timeout;

constructor(channelControlHelper: ChannelControlHelper) {
constructor(
channelControlHelper: ChannelControlHelper,
options: ChannelOptions
) {
this.childBalancer = new ChildLoadBalancerHandler(

@@ -540,5 +505,4 @@ createChildChannelControlHelper(channelControlHelper, {

);
const mapEntry = this.addressMap.get(
subchannelAddressToString(subchannelAddress)
);
const mapEntry =
this.entryMap.getForSubchannelAddress(subchannelAddress);
const subchannelWrapper = new OutlierDetectionSubchannelWrapper(

@@ -565,3 +529,4 @@ originalSubchannel,

},
})
}),
options
);

@@ -582,3 +547,3 @@ this.ejectionTimer = setInterval(() => {}, 0);

let ejectionCount = 0;
for (const mapEntry of this.addressMap.values()) {
for (const mapEntry of this.entryMap.values()) {
if (mapEntry.currentEjectionTimestamp !== null) {

@@ -588,3 +553,3 @@ ejectionCount += 1;

}
return (ejectionCount * 100) / this.addressMap.size;
return (ejectionCount * 100) / this.entryMap.size;
}

@@ -605,3 +570,3 @@

const successRates: number[] = [];
for (const [address, mapEntry] of this.addressMap) {
for (const [endpoint, mapEntry] of this.entryMap.entries()) {
const successes = mapEntry.counter.getLastSuccesses();

@@ -611,3 +576,3 @@ const failures = mapEntry.counter.getLastFailures();

'Stats for ' +
address +
endpointToString(endpoint) +
': successes=' +

@@ -656,3 +621,3 @@ successes +

// Step 3
for (const [address, mapEntry] of this.addressMap.entries()) {
for (const [address, mapEntry] of this.entryMap.entries()) {
// Step 3.i

@@ -709,3 +674,3 @@ if (

let addressesWithTargetVolume = 0;
for (const mapEntry of this.addressMap.values()) {
for (const mapEntry of this.entryMap.values()) {
const successes = mapEntry.counter.getLastSuccesses();

@@ -722,3 +687,3 @@ const failures = mapEntry.counter.getLastFailures();

// Step 2
for (const [address, mapEntry] of this.addressMap.entries()) {
for (const [address, mapEntry] of this.entryMap.entries()) {
// Step 2.i

@@ -774,3 +739,3 @@ if (

private switchAllBuckets() {
for (const mapEntry of this.addressMap.values()) {
for (const mapEntry of this.entryMap.values()) {
mapEntry.counter.switchBuckets();

@@ -800,3 +765,3 @@ }

for (const [address, mapEntry] of this.addressMap.entries()) {
for (const [address, mapEntry] of this.entryMap.entries()) {
if (mapEntry.currentEjectionTimestamp === null) {

@@ -828,4 +793,4 @@ if (mapEntry.ejectionTimeMultiplier > 0) {

updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig,
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }

@@ -836,10 +801,6 @@ ): void {

}
const subchannelAddresses = new Set<string>();
for (const address of addressList) {
subchannelAddresses.add(subchannelAddressToString(address));
}
for (const address of subchannelAddresses) {
if (!this.addressMap.has(address)) {
trace('Adding map entry for ' + address);
this.addressMap.set(address, {
for (const endpoint of endpointList) {
if (!this.entryMap.has(endpoint)) {
trace('Adding map entry for ' + endpointToString(endpoint));
this.entryMap.set(endpoint, {
counter: new CallCounter(),

@@ -852,13 +813,5 @@ currentEjectionTimestamp: null,

}
for (const key of this.addressMap.keys()) {
if (!subchannelAddresses.has(key)) {
trace('Removing map entry for ' + key);
this.addressMap.delete(key);
}
}
const childPolicy: LoadBalancingConfig = getFirstUsableConfig(
lbConfig.getChildPolicy(),
true
);
this.childBalancer.updateAddressList(addressList, childPolicy, attributes);
this.entryMap.deleteMissing(endpointList);
const childPolicy = lbConfig.getChildPolicy();
this.childBalancer.updateAddressList(endpointList, childPolicy, attributes);

@@ -886,3 +839,3 @@ if (

clearTimeout(this.ejectionTimer);
for (const mapEntry of this.addressMap.values()) {
for (const mapEntry of this.entryMap.values()) {
this.uneject(mapEntry);

@@ -889,0 +842,0 @@ mapEntry.ejectionTimeMultiplier = 0;

@@ -21,5 +21,6 @@ /*

ChannelControlHelper,
LoadBalancingConfig,
TypedLoadBalancingConfig,
registerDefaultLoadBalancerType,
registerLoadBalancerType,
createChildChannelControlHelper,
} from './load-balancer';

@@ -35,3 +36,3 @@ import { ConnectivityState } from './connectivity-state';

} from './picker';
import { SubchannelAddress } from './subchannel-address';
import { Endpoint, SubchannelAddress } from './subchannel-address';
import * as logging from './logging';

@@ -42,3 +43,7 @@ import { LogVerbosity } from './constants';

ConnectivityStateListener,
HealthListener,
} from './subchannel-interface';
import { isTcpSubchannelAddress } from './subchannel-address';
import { isIPv6 } from 'net';
import { ChannelOptions } from './channel-options';

@@ -59,3 +64,3 @@ const TRACER_NAME = 'pick_first';

export class PickFirstLoadBalancingConfig implements LoadBalancingConfig {
export class PickFirstLoadBalancingConfig implements TypedLoadBalancingConfig {
constructor(private readonly shuffleAddressList: boolean) {}

@@ -132,2 +137,38 @@

/**
* Interleave addresses in addressList by family in accordance with RFC-8304 section 4
* @param addressList
* @returns
*/
function interleaveAddressFamilies(
addressList: SubchannelAddress[]
): SubchannelAddress[] {
const result: SubchannelAddress[] = [];
const ipv6Addresses: SubchannelAddress[] = [];
const ipv4Addresses: SubchannelAddress[] = [];
const ipv6First =
isTcpSubchannelAddress(addressList[0]) && isIPv6(addressList[0].host);
for (const address of addressList) {
if (isTcpSubchannelAddress(address) && isIPv6(address.host)) {
ipv6Addresses.push(address);
} else {
ipv4Addresses.push(address);
}
}
const firstList = ipv6First ? ipv6Addresses : ipv4Addresses;
const secondList = ipv6First ? ipv4Addresses : ipv6Addresses;
for (let i = 0; i < Math.max(firstList.length, secondList.length); i++) {
if (i < firstList.length) {
result.push(firstList[i]);
}
if (i < secondList.length) {
result.push(secondList[i]);
}
}
return result;
}
const REPORT_HEALTH_STATUS_OPTION_NAME =
'grpc-node.internal.pick-first.report_health_status';
export class PickFirstLoadBalancer implements LoadBalancer {

@@ -167,2 +208,5 @@ /**

};
private pickedSubchannelHealthListener: HealthListener = () =>
this.calculateAndReportNewState();
/**

@@ -183,2 +227,4 @@ * Timer reference for the timer tracking when to start

private reportHealthStatus: boolean;
/**

@@ -205,5 +251,9 @@ * Indicates whether we called channelControlHelper.requestReresolution since

*/
constructor(private readonly channelControlHelper: ChannelControlHelper) {
constructor(
private readonly channelControlHelper: ChannelControlHelper,
options: ChannelOptions
) {
this.connectionDelayTimeout = setTimeout(() => {}, 0);
clearTimeout(this.connectionDelayTimeout);
this.reportHealthStatus = options[REPORT_HEALTH_STATUS_OPTION_NAME];
}

@@ -217,6 +267,15 @@

if (this.currentPick) {
this.updateState(
ConnectivityState.READY,
new PickFirstPicker(this.currentPick)
);
if (this.reportHealthStatus && !this.currentPick.isHealthy()) {
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker({
details: `Picked subchannel ${this.currentPick.getAddress()} is unhealthy`,
})
);
} else {
this.updateState(
ConnectivityState.READY,
new PickFirstPicker(this.currentPick)
);
}
} else if (this.children.length === 0) {

@@ -276,2 +335,7 @@ this.updateState(ConnectivityState.IDLE, new QueuePicker(this));

);
if (this.reportHealthStatus) {
currentPick.removeHealthStateWatcher(
this.pickedSubchannelHealthListener
);
}
}

@@ -366,13 +430,8 @@ }

this.stickyTransientFailureMode = false;
if (this.currentPick !== null) {
this.currentPick.unref();
this.channelControlHelper.removeChannelzChild(
this.currentPick.getChannelzRef()
);
this.currentPick.removeConnectivityStateListener(
this.subchannelStateListener
);
}
this.removeCurrentPick();
this.currentPick = subchannel;
subchannel.ref();
if (this.reportHealthStatus) {
subchannel.addHealthStateWatcher(this.pickedSubchannelHealthListener);
}
this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());

@@ -454,4 +513,4 @@ this.resetSubchannelList();

updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig
): void {

@@ -465,4 +524,11 @@ if (!(lbConfig instanceof PickFirstLoadBalancingConfig)) {

if (lbConfig.getShuffleAddressList()) {
addressList = shuffled(addressList);
endpointList = shuffled(endpointList);
}
const rawAddressList = ([] as SubchannelAddress[]).concat(
...endpointList.map(endpoint => endpoint.addresses)
);
if (rawAddressList.length === 0) {
throw new Error('No addresses in endpoint list passed to pick_first');
}
const addressList = interleaveAddressFamilies(rawAddressList);
this.latestAddressList = addressList;

@@ -493,2 +559,69 @@ this.connectToAddressList(addressList);

const LEAF_CONFIG = new PickFirstLoadBalancingConfig(false);
/**
* This class handles the leaf load balancing operations for a single endpoint.
* It is a thin wrapper around a PickFirstLoadBalancer with a different API
* that more closely reflects how it will be used as a leaf balancer.
*/
export class LeafLoadBalancer {
private pickFirstBalancer: PickFirstLoadBalancer;
private latestState: ConnectivityState = ConnectivityState.IDLE;
private latestPicker: Picker;
constructor(
private endpoint: Endpoint,
channelControlHelper: ChannelControlHelper,
options: ChannelOptions
) {
const childChannelControlHelper = createChildChannelControlHelper(
channelControlHelper,
{
updateState: (connectivityState, picker) => {
this.latestState = connectivityState;
this.latestPicker = picker;
channelControlHelper.updateState(connectivityState, picker);
},
}
);
this.pickFirstBalancer = new PickFirstLoadBalancer(
childChannelControlHelper,
{ ...options, [REPORT_HEALTH_STATUS_OPTION_NAME]: true }
);
this.latestPicker = new QueuePicker(this.pickFirstBalancer);
}
startConnecting() {
this.pickFirstBalancer.updateAddressList([this.endpoint], LEAF_CONFIG);
}
/**
* Update the endpoint associated with this LeafLoadBalancer to a new
* endpoint. Does not trigger connection establishment if a connection
* attempt is not already in progress.
* @param newEndpoint
*/
updateEndpoint(newEndpoint: Endpoint) {
this.endpoint = newEndpoint;
if (this.latestState !== ConnectivityState.IDLE) {
this.startConnecting();
}
}
getConnectivityState() {
return this.latestState;
}
getPicker() {
return this.latestPicker;
}
getEndpoint() {
return this.endpoint;
}
destroy() {
this.pickFirstBalancer.destroy();
}
}
export function setup(): void {

@@ -495,0 +628,0 @@ registerLoadBalancerType(

@@ -21,4 +21,5 @@ /*

ChannelControlHelper,
LoadBalancingConfig,
TypedLoadBalancingConfig,
registerLoadBalancerType,
createChildChannelControlHelper,
} from './load-balancer';

@@ -30,16 +31,14 @@ import { ConnectivityState } from './connectivity-state';

PickArgs,
CompletePickResult,
PickResultType,
UnavailablePicker,
PickResult,
} from './picker';
import {
SubchannelAddress,
subchannelAddressToString,
} from './subchannel-address';
import * as logging from './logging';
import { LogVerbosity } from './constants';
import {
ConnectivityStateListener,
SubchannelInterface,
} from './subchannel-interface';
Endpoint,
endpointEqual,
endpointToString,
} from './subchannel-address';
import { LeafLoadBalancer } from './load-balancer-pick-first';
import { ChannelOptions } from './channel-options';

@@ -54,3 +53,3 @@ const TRACER_NAME = 'round_robin';

class RoundRobinLoadBalancingConfig implements LoadBalancingConfig {
class RoundRobinLoadBalancingConfig implements TypedLoadBalancingConfig {
getLoadBalancerName(): string {

@@ -76,16 +75,10 @@ return TYPE_NAME;

constructor(
private readonly subchannelList: SubchannelInterface[],
private readonly children: { endpoint: Endpoint; picker: Picker }[],
private nextIndex = 0
) {}
pick(pickArgs: PickArgs): CompletePickResult {
const pickedSubchannel = this.subchannelList[this.nextIndex];
this.nextIndex = (this.nextIndex + 1) % this.subchannelList.length;
return {
pickResultType: PickResultType.COMPLETE,
subchannel: pickedSubchannel,
status: null,
onCallStarted: null,
onCallEnded: null,
};
pick(pickArgs: PickArgs): PickResult {
const childPicker = this.children[this.nextIndex].picker;
this.nextIndex = (this.nextIndex + 1) % this.children.length;
return childPicker.pick(pickArgs);
}

@@ -98,4 +91,4 @@

*/
peekNextSubchannel(): SubchannelInterface {
return this.subchannelList[this.nextIndex];
peekNextEndpoint(): Endpoint {
return this.children[this.nextIndex].endpoint;
}

@@ -105,50 +98,46 @@ }

export class RoundRobinLoadBalancer implements LoadBalancer {
private subchannels: SubchannelInterface[] = [];
private children: LeafLoadBalancer[] = [];
private currentState: ConnectivityState = ConnectivityState.IDLE;
private subchannelStateListener: ConnectivityStateListener;
private currentReadyPicker: RoundRobinPicker | null = null;
private updatesPaused = false;
private childChannelControlHelper: ChannelControlHelper;
private lastError: string | null = null;
constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.subchannelStateListener = (
subchannel: SubchannelInterface,
previousState: ConnectivityState,
newState: ConnectivityState,
keepaliveTime: number,
errorMessage?: string
) => {
this.calculateAndUpdateState();
if (
newState === ConnectivityState.TRANSIENT_FAILURE ||
newState === ConnectivityState.IDLE
) {
if (errorMessage) {
this.lastError = errorMessage;
}
this.channelControlHelper.requestReresolution();
subchannel.startConnecting();
constructor(
private readonly channelControlHelper: ChannelControlHelper,
private readonly options: ChannelOptions
) {
this.childChannelControlHelper = createChildChannelControlHelper(
channelControlHelper,
{
updateState: (connectivityState, picker) => {
this.calculateAndUpdateState();
},
}
};
);
}
private countSubchannelsWithState(state: ConnectivityState) {
return this.subchannels.filter(
subchannel => subchannel.getConnectivityState() === state
).length;
private countChildrenWithState(state: ConnectivityState) {
return this.children.filter(child => child.getConnectivityState() === state)
.length;
}
private calculateAndUpdateState() {
if (this.countSubchannelsWithState(ConnectivityState.READY) > 0) {
const readySubchannels = this.subchannels.filter(
subchannel =>
subchannel.getConnectivityState() === ConnectivityState.READY
if (this.updatesPaused) {
return;
}
if (this.countChildrenWithState(ConnectivityState.READY) > 0) {
const readyChildren = this.children.filter(
child => child.getConnectivityState() === ConnectivityState.READY
);
let index = 0;
if (this.currentReadyPicker !== null) {
index = readySubchannels.indexOf(
this.currentReadyPicker.peekNextSubchannel()
const nextPickedEndpoint = this.currentReadyPicker.peekNextEndpoint();
index = readyChildren.findIndex(child =>
endpointEqual(child.getEndpoint(), nextPickedEndpoint)
);

@@ -161,10 +150,14 @@ if (index < 0) {

ConnectivityState.READY,
new RoundRobinPicker(readySubchannels, index)
new RoundRobinPicker(
readyChildren.map(child => ({
endpoint: child.getEndpoint(),
picker: child.getPicker(),
})),
index
)
);
} else if (
this.countSubchannelsWithState(ConnectivityState.CONNECTING) > 0
) {
} else if (this.countChildrenWithState(ConnectivityState.CONNECTING) > 0) {
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
} else if (
this.countSubchannelsWithState(ConnectivityState.TRANSIENT_FAILURE) > 0
this.countChildrenWithState(ConnectivityState.TRANSIENT_FAILURE) > 0
) {

@@ -196,36 +189,26 @@ this.updateState(

private resetSubchannelList() {
for (const subchannel of this.subchannels) {
subchannel.removeConnectivityStateListener(this.subchannelStateListener);
subchannel.unref();
this.channelControlHelper.removeChannelzChild(
subchannel.getChannelzRef()
);
for (const child of this.children) {
child.destroy();
}
this.subchannels = [];
}
updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig
): void {
this.resetSubchannelList();
trace(
'Connect to address list ' +
addressList.map(address => subchannelAddressToString(address))
trace('Connect to endpoint list ' + endpointList.map(endpointToString));
this.updatesPaused = true;
this.children = endpointList.map(
endpoint =>
new LeafLoadBalancer(
endpoint,
this.childChannelControlHelper,
this.options
)
);
this.subchannels = addressList.map(address =>
this.channelControlHelper.createSubchannel(address, {})
);
for (const subchannel of this.subchannels) {
subchannel.ref();
subchannel.addConnectivityStateListener(this.subchannelStateListener);
this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
const subchannelState = subchannel.getConnectivityState();
if (
subchannelState === ConnectivityState.IDLE ||
subchannelState === ConnectivityState.TRANSIENT_FAILURE
) {
subchannel.startConnecting();
}
for (const child of this.children) {
child.startConnecting();
}
this.updatesPaused = false;
this.calculateAndUpdateState();

@@ -235,9 +218,8 @@ }

exitIdle(): void {
for (const subchannel of this.subchannels) {
subchannel.startConnecting();
}
/* The round_robin LB policy is only in the IDLE state if it has no
* addresses to try to connect to and it has no picked subchannel.
* In that case, there is no meaningful action that can be taken here. */
}
resetBackoff(): void {
/* The pick first load balancer does not have a connection backoff, so this
* does nothing */
// This LB policy has no backoff to reset
}

@@ -244,0 +226,0 @@ destroy(): void {

@@ -19,3 +19,3 @@ /*

import { ChannelOptions } from './channel-options';
import { SubchannelAddress } from './subchannel-address';
import { Endpoint, SubchannelAddress } from './subchannel-address';
import { ConnectivityState } from './connectivity-state';

@@ -25,2 +25,5 @@ import { Picker } from './picker';

import { SubchannelInterface } from './subchannel-interface';
import { LoadBalancingConfig } from './service-config';
import { log } from './logging';
import { LogVerbosity } from './constants';

@@ -97,3 +100,3 @@ /**

* are established
* @param addressList The new list of addresses to connect to
* @param endpointList The new list of addresses to connect to
* @param lbConfig The load balancing config object from the service config,

@@ -103,4 +106,4 @@ * if one was provided

updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig,
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }

@@ -132,6 +135,9 @@ ): void;

export interface LoadBalancerConstructor {
new (channelControlHelper: ChannelControlHelper): LoadBalancer;
new (
channelControlHelper: ChannelControlHelper,
options: ChannelOptions
): LoadBalancer;
}
export interface LoadBalancingConfig {
export interface TypedLoadBalancingConfig {
getLoadBalancerName(): string;

@@ -141,7 +147,7 @@ toJsonObject(): object;

export interface LoadBalancingConfigConstructor {
export interface TypedLoadBalancingConfigConstructor {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
new (...args: any): LoadBalancingConfig;
new (...args: any): TypedLoadBalancingConfig;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
createFromJson(obj: any): LoadBalancingConfig;
createFromJson(obj: any): TypedLoadBalancingConfig;
}

@@ -152,3 +158,3 @@

LoadBalancer: LoadBalancerConstructor;
LoadBalancingConfig: LoadBalancingConfigConstructor;
LoadBalancingConfig: TypedLoadBalancingConfigConstructor;
};

@@ -162,3 +168,3 @@ } = {};

loadBalancerType: LoadBalancerConstructor,
loadBalancingConfigType: LoadBalancingConfigConstructor
loadBalancingConfigType: TypedLoadBalancingConfigConstructor
) {

@@ -176,4 +182,5 @@ registeredLoadBalancerTypes[typeName] = {

export function createLoadBalancer(
config: LoadBalancingConfig,
channelControlHelper: ChannelControlHelper
config: TypedLoadBalancingConfig,
channelControlHelper: ChannelControlHelper,
options: ChannelOptions
): LoadBalancer | null {

@@ -183,3 +190,4 @@ const typeName = config.getLoadBalancerName();

return new registeredLoadBalancerTypes[typeName].LoadBalancer(
channelControlHelper
channelControlHelper,
options
);

@@ -195,13 +203,48 @@ } else {

export function getFirstUsableConfig(
export function parseLoadBalancingConfig(
rawConfig: LoadBalancingConfig
): TypedLoadBalancingConfig {
const keys = Object.keys(rawConfig);
if (keys.length !== 1) {
throw new Error(
'Provided load balancing config has multiple conflicting entries'
);
}
const typeName = keys[0];
if (typeName in registeredLoadBalancerTypes) {
try {
return registeredLoadBalancerTypes[
typeName
].LoadBalancingConfig.createFromJson(rawConfig[typeName]);
} catch (e) {
throw new Error(`${typeName}: ${(e as Error).message}`);
}
} else {
throw new Error(`Unrecognized load balancing config name ${typeName}`);
}
}
export function getDefaultConfig() {
if (!defaultLoadBalancerType) {
throw new Error('No default load balancer type registered');
}
return new registeredLoadBalancerTypes[
defaultLoadBalancerType
]!.LoadBalancingConfig();
}
export function selectLbConfigFromList(
configs: LoadBalancingConfig[],
fallbackTodefault?: true
): LoadBalancingConfig;
export function getFirstUsableConfig(
configs: LoadBalancingConfig[],
fallbackTodefault = false
): LoadBalancingConfig | null {
): TypedLoadBalancingConfig | null {
for (const config of configs) {
if (config.getLoadBalancerName() in registeredLoadBalancerTypes) {
return config;
try {
return parseLoadBalancingConfig(config);
} catch (e) {
log(
LogVerbosity.DEBUG,
'Config parsing failed with error',
(e as Error).message
);
continue;
}

@@ -221,22 +264,1 @@ }

}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function validateLoadBalancingConfig(obj: any): LoadBalancingConfig {
if (!(obj !== null && typeof obj === 'object')) {
throw new Error('Load balancing config must be an object');
}
const keys = Object.keys(obj);
if (keys.length !== 1) {
throw new Error(
'Provided load balancing config has multiple conflicting entries'
);
}
const typeName = keys[0];
if (typeName in registeredLoadBalancerTypes) {
return registeredLoadBalancerTypes[
typeName
].LoadBalancingConfig.createFromJson(obj[typeName]);
} else {
throw new Error(`Unrecognized load balancing config name ${typeName}`);
}
}

@@ -117,4 +117,5 @@ /*

this.trace('Pick called');
const finalMetadata = this.metadata.clone();
const pickResult = this.channel.doPick(
this.metadata,
finalMetadata,
this.callConfig.pickInformation

@@ -151,3 +152,2 @@ );

}
const finalMetadata = this.metadata!.clone();
finalMetadata.merge(credsMetadata);

@@ -154,0 +154,0 @@ if (finalMetadata.get('authorization').length > 1) {

@@ -125,2 +125,4 @@ /*

* once any pick is attempted.
* If the childPicker is provided, delegate to it instead of returning the
* hardcoded QUEUE pick result, but still calls exitIdle.
*/

@@ -130,5 +132,8 @@ export class QueuePicker {

// Constructed with a load balancer. Calls exitIdle on it the first time pick is called
constructor(private loadBalancer: LoadBalancer) {}
constructor(
private loadBalancer: LoadBalancer,
private childPicker?: Picker
) {}
pick(pickArgs: PickArgs): QueuePickResult {
pick(pickArgs: PickArgs): PickResult {
if (!this.calledExitIdle) {

@@ -140,10 +145,14 @@ process.nextTick(() => {

}
return {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null,
onCallStarted: null,
onCallEnded: null,
};
if (this.childPicker) {
return this.childPicker.pick(pickArgs);
} else {
return {
pickResultType: PickResultType.QUEUE,
subchannel: null,
status: null,
onCallStarted: null,
onCallEnded: null,
};
}
}
}

@@ -31,3 +31,3 @@ /*

import { LogVerbosity } from './constants';
import { SubchannelAddress, TcpSubchannelAddress } from './subchannel-address';
import { Endpoint, TcpSubchannelAddress } from './subchannel-address';
import { GrpcUri, uriToString, splitHostPort } from './uri-parser';

@@ -55,30 +55,6 @@ import { isIPv6, isIPv4 } from 'net';

/**
* Merge any number of arrays into a single alternating array
* @param arrays
*/
function mergeArrays<T>(...arrays: T[][]): T[] {
const result: T[] = [];
for (
let i = 0;
i <
Math.max.apply(
null,
arrays.map(array => array.length)
);
i++
) {
for (const array of arrays) {
if (i < array.length) {
result.push(array[i]);
}
}
}
return result;
}
/**
* Resolver implementation that handles DNS names and IP addresses.
*/
class DnsResolver implements Resolver {
private readonly ipResult: SubchannelAddress[] | null;
private readonly ipResult: Endpoint[] | null;
private readonly dnsHostname: string | null;

@@ -94,3 +70,3 @@ private readonly port: number | null;

private pendingTxtPromise: Promise<string[][]> | null = null;
private latestLookupResult: TcpSubchannelAddress[] | null = null;
private latestLookupResult: Endpoint[] | null = null;
private latestServiceConfig: ServiceConfig | null = null;

@@ -121,4 +97,8 @@ private latestServiceConfigError: StatusObject | null = null;

{
host: hostPort.host,
port: hostPort.port ?? DEFAULT_PORT,
addresses: [
{
host: hostPort.host,
port: hostPort.port ?? DEFAULT_PORT,
},
],
},

@@ -225,14 +205,11 @@ ];

this.backoff.stop();
const ip4Addresses: dns.LookupAddress[] = addressList.filter(
addr => addr.family === 4
);
const ip6Addresses: dns.LookupAddress[] = addressList.filter(
addr => addr.family === 6
);
this.latestLookupResult = mergeArrays(ip6Addresses, ip4Addresses).map(
const subchannelAddresses: TcpSubchannelAddress[] = addressList.map(
addr => ({ host: addr.address, port: +this.port! })
);
this.latestLookupResult = subchannelAddresses.map(address => ({
addresses: [address],
}));
const allAddressesString: string =
'[' +
this.latestLookupResult
subchannelAddresses
.map(addr => addr.host + ':' + addr.port)

@@ -239,0 +216,0 @@ .join(',') +

@@ -23,3 +23,3 @@ /*

import { registerResolver, Resolver, ResolverListener } from './resolver';
import { SubchannelAddress } from './subchannel-address';
import { Endpoint, SubchannelAddress } from './subchannel-address';
import { GrpcUri, splitHostPort, uriToString } from './uri-parser';

@@ -43,3 +43,3 @@ import * as logging from './logging';

class IpResolver implements Resolver {
private addresses: SubchannelAddress[] = [];
private endpoints: Endpoint[] = [];
private error: StatusObject | null = null;

@@ -89,4 +89,4 @@ private hasReturnedResult = false;

}
this.addresses = addresses;
trace('Parsed ' + target.scheme + ' address list ' + this.addresses);
this.endpoints = addresses.map(address => ({ addresses: [address] }));
trace('Parsed ' + target.scheme + ' address list ' + addresses);
}

@@ -101,3 +101,3 @@ updateResolution(): void {

this.listener.onSuccessfulResolution(
this.addresses,
this.endpoints,
null,

@@ -104,0 +104,0 @@ null,

@@ -18,3 +18,3 @@ /*

import { Resolver, ResolverListener, registerResolver } from './resolver';
import { SubchannelAddress } from './subchannel-address';
import { Endpoint } from './subchannel-address';
import { GrpcUri } from './uri-parser';

@@ -24,4 +24,4 @@ import { ChannelOptions } from './channel-options';

class UdsResolver implements Resolver {
private addresses: SubchannelAddress[] = [];
private hasReturnedResult = false;
private endpoints: Endpoint[] = [];
constructor(

@@ -38,3 +38,3 @@ target: GrpcUri,

}
this.addresses = [{ path }];
this.endpoints = [{ addresses: [{ path }] }];
}

@@ -46,3 +46,3 @@ updateResolution(): void {

this.listener.onSuccessfulResolution,
this.addresses,
this.endpoints,
null,

@@ -49,0 +49,0 @@ null,

@@ -20,3 +20,3 @@ /*

import { StatusObject } from './call-interface';
import { SubchannelAddress } from './subchannel-address';
import { Endpoint } from './subchannel-address';
import { GrpcUri, uriToString } from './uri-parser';

@@ -41,3 +41,3 @@ import { ChannelOptions } from './channel-options';

export interface ConfigSelector {
(methodName: string, metadata: Metadata): CallConfig;
(methodName: string, metadata: Metadata, channelId: number): CallConfig;
}

@@ -60,3 +60,3 @@

onSuccessfulResolution(
addressList: SubchannelAddress[],
addressList: Endpoint[],
serviceConfig: ServiceConfig | null,

@@ -63,0 +63,0 @@ serviceConfigError: StatusObject | null,

@@ -21,4 +21,4 @@ /*

LoadBalancer,
LoadBalancingConfig,
getFirstUsableConfig,
TypedLoadBalancingConfig,
selectLbConfigFromList,
} from './load-balancer';

@@ -40,3 +40,3 @@ import {

import { LogVerbosity } from './constants';
import { SubchannelAddress } from './subchannel-address';
import { Endpoint } from './subchannel-address';
import { GrpcUri, uriToString } from './uri-parser';

@@ -218,27 +218,30 @@ import { ChildLoadBalancerHandler } from './load-balancer-child-handler';

this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
this.childLoadBalancer = new ChildLoadBalancerHandler({
createSubchannel:
channelControlHelper.createSubchannel.bind(channelControlHelper),
requestReresolution: () => {
/* If the backoffTimeout is running, we're still backing off from
* making resolve requests, so we shouldn't make another one here.
* In that case, the backoff timer callback will call
* updateResolution */
if (this.backoffTimeout.isRunning()) {
trace('requestReresolution delayed by backoff timer until ' + this.backoffTimeout.getEndTime().toISOString());
this.childLoadBalancer = new ChildLoadBalancerHandler(
{
createSubchannel:
channelControlHelper.createSubchannel.bind(channelControlHelper),
requestReresolution: () => {
/* If the backoffTimeout is running, we're still backing off from
* making resolve requests, so we shouldn't make another one here.
* In that case, the backoff timer callback will call
* updateResolution */
if (this.backoffTimeout.isRunning()) {
trace('requestReresolution delayed by backoff timer until ' + this.backoffTimeout.getEndTime().toISOString());
this.continueResolving = true;
} else {
this.updateResolution();
}
} else {
this.updateResolution();
}
},
updateState: (newState: ConnectivityState, picker: Picker) => {
this.latestChildState = newState;
this.latestChildPicker = picker;
this.updateState(newState, picker);
},
addChannelzChild:
channelControlHelper.addChannelzChild.bind(channelControlHelper),
removeChannelzChild:
channelControlHelper.removeChannelzChild.bind(channelControlHelper),
},
updateState: (newState: ConnectivityState, picker: Picker) => {
this.latestChildState = newState;
this.latestChildPicker = picker;
this.updateState(newState, picker);
},
addChannelzChild:
channelControlHelper.addChannelzChild.bind(channelControlHelper),
removeChannelzChild:
channelControlHelper.removeChannelzChild.bind(channelControlHelper),
});
channelOptions
);
this.innerResolver = createResolver(

@@ -248,3 +251,3 @@ target,

onSuccessfulResolution: (
addressList: SubchannelAddress[],
endpointList: Endpoint[],
serviceConfig: ServiceConfig | null,

@@ -285,3 +288,3 @@ serviceConfigError: ServiceError | null,

workingServiceConfig?.loadBalancingConfig ?? [];
const loadBalancingConfig = getFirstUsableConfig(
const loadBalancingConfig = selectLbConfigFromList(
workingConfigList,

@@ -301,3 +304,3 @@ true

this.childLoadBalancer.updateAddressList(
addressList,
endpointList,
loadBalancingConfig,

@@ -337,3 +340,7 @@ attributes

if (this.currentState === ConnectivityState.IDLE) {
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
/* this.latestChildPicker is initialized as new QueuePicker(this), which
* is an appropriate value here if the child LB policy is unset.
* Otherwise, we want to delegate to the child here, in case that
* triggers something. */
this.updateState(ConnectivityState.CONNECTING, this.latestChildPicker);
}

@@ -353,3 +360,3 @@ this.backoffTimeout.runOnce();

if (connectivityState === ConnectivityState.IDLE) {
picker = new QueuePicker(this);
picker = new QueuePicker(this, picker);
}

@@ -385,4 +392,4 @@ this.currentState = connectivityState;

updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig | null
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig | null
): never {

@@ -389,0 +396,0 @@ throw new Error('updateAddressList not supported on ResolvingLoadBalancer');

@@ -19,63 +19,14 @@ /*

import { EventEmitter } from 'events';
import * as http2 from 'http2';
import { Duplex, Readable, Writable } from 'stream';
import * as zlib from 'zlib';
import { promisify } from 'util';
import {
Status,
DEFAULT_MAX_SEND_MESSAGE_LENGTH,
DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH,
LogVerbosity,
} from './constants';
import { Deserialize, Serialize } from './make-client';
import { Metadata } from './metadata';
import { StreamDecoder } from './stream-decoder';
import { ObjectReadable, ObjectWritable } from './object-stream';
import { ChannelOptions } from './channel-options';
import * as logging from './logging';
import { StatusObject, PartialStatusObject } from './call-interface';
import { Deadline } from './deadline';
import { getErrorCode, getErrorMessage } from './error';
import { ServerInterceptingCallInterface } from './server-interceptors';
const TRACER_NAME = 'server_call';
const unzip = promisify(zlib.unzip);
const inflate = promisify(zlib.inflate);
function trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}
interface DeadlineUnitIndexSignature {
[name: string]: number;
}
const GRPC_ACCEPT_ENCODING_HEADER = 'grpc-accept-encoding';
const GRPC_ENCODING_HEADER = 'grpc-encoding';
const GRPC_MESSAGE_HEADER = 'grpc-message';
const GRPC_STATUS_HEADER = 'grpc-status';
const GRPC_TIMEOUT_HEADER = 'grpc-timeout';
const DEADLINE_REGEX = /(\d{1,8})\s*([HMSmun])/;
const deadlineUnitsToMs: DeadlineUnitIndexSignature = {
H: 3600000,
M: 60000,
S: 1000,
m: 1,
u: 0.001,
n: 0.000001,
};
const defaultCompressionHeaders = {
// TODO(cjihrig): Remove these encoding headers from the default response
// once compression is integrated.
[GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip',
[GRPC_ENCODING_HEADER]: 'identity',
};
const defaultResponseHeaders = {
[http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
[http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
};
const defaultResponseOptions = {
waitForTrailers: true,
} as http2.ServerStreamResponseOptions;
export type ServerStatusResponse = Partial<StatusObject>;

@@ -109,2 +60,23 @@

export function serverErrorToStatus(error: ServerErrorResponse | ServerStatusResponse, overrideTrailers?: Metadata | undefined): PartialStatusObject {
const status: PartialStatusObject = {
code: Status.UNKNOWN,
details: 'message' in error ? error.message : 'Unknown Error',
metadata: overrideTrailers ?? error.metadata ?? null
};
if (
'code' in error &&
typeof error.code === 'number' &&
Number.isInteger(error.code)
) {
status.code = error.code;
if ('details' in error && typeof error.details === 'string') {
status.details = error.details!;
}
}
return status;
}
export class ServerUnaryCallImpl<RequestType, ResponseType>

@@ -117,3 +89,4 @@ extends EventEmitter

constructor(
private call: Http2ServerCallStream<RequestType, ResponseType>,
private path: string,
private call: ServerInterceptingCallInterface,
public metadata: Metadata,

@@ -124,3 +97,2 @@ public request: RequestType

this.cancelled = false;
this.call.setupSurfaceCall(this);
}

@@ -141,3 +113,3 @@

getPath(): string {
return this.call.getPath();
return this.path;
}

@@ -153,19 +125,12 @@ }

constructor(
private call: Http2ServerCallStream<RequestType, ResponseType>,
public metadata: Metadata,
public deserialize: Deserialize<RequestType>,
encoding: string
private path: string,
private call: ServerInterceptingCallInterface,
public metadata: Metadata
) {
super({ objectMode: true });
this.cancelled = false;
this.call.setupSurfaceCall(this);
this.call.setupReadable(this, encoding);
}
_read(size: number) {
if (!this.call.consumeUnpushedMessages(this)) {
return;
}
this.call.resume();
this.call.startRead();
}

@@ -186,3 +151,3 @@

getPath(): string {
return this.call.getPath();
return this.path;
}

@@ -197,7 +162,11 @@ }

private trailingMetadata: Metadata;
private pendingStatus: PartialStatusObject = {
code: Status.OK,
details: 'OK'
};
constructor(
private call: Http2ServerCallStream<RequestType, ResponseType>,
private path: string,
private call: ServerInterceptingCallInterface,
public metadata: Metadata,
public serialize: Serialize<ResponseType>,
public request: RequestType

@@ -208,6 +177,5 @@ ) {

this.trailingMetadata = new Metadata();
this.call.setupSurfaceCall(this);
this.on('error', err => {
this.call.sendError(err);
this.pendingStatus = serverErrorToStatus(err);
this.end();

@@ -230,3 +198,3 @@ });

getPath(): string {
return this.call.getPath();
return this.path;
}

@@ -240,17 +208,3 @@

) {
try {
const response = this.call.serializeMessage(chunk);
if (!this.call.write(response)) {
this.call.once('drain', callback);
return;
}
} catch (err) {
this.emit('error', {
details: getErrorMessage(err),
code: Status.INTERNAL,
});
}
callback();
this.call.sendMessage(chunk, callback);
}

@@ -260,5 +214,4 @@

this.call.sendStatus({
code: Status.OK,
details: 'OK',
metadata: this.trailingMetadata,
...this.pendingStatus,
metadata: this.pendingStatus.metadata ?? this.trailingMetadata,
});

@@ -283,14 +236,12 @@ callback(null);

cancelled: boolean;
/* This field appears to be unsued, but it is actually used in _final, which is assiged from
* ServerWritableStreamImpl.prototype._final below. */
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore noUnusedLocals
private trailingMetadata: Metadata;
private pendingStatus: PartialStatusObject = {
code: Status.OK,
details: 'OK'
};
constructor(
private call: Http2ServerCallStream<RequestType, ResponseType>,
public metadata: Metadata,
public serialize: Serialize<ResponseType>,
public deserialize: Deserialize<RequestType>,
encoding: string
private path: string,
private call: ServerInterceptingCallInterface,
public metadata: Metadata
) {

@@ -300,7 +251,5 @@ super({ objectMode: true });

this.trailingMetadata = new Metadata();
this.call.setupSurfaceCall(this);
this.call.setupReadable(this, encoding);
this.on('error', err => {
this.call.sendError(err);
this.pendingStatus = serverErrorToStatus(err);
this.end();

@@ -323,5 +272,26 @@ });

getPath(): string {
return this.call.getPath();
return this.path;
}
_read(size: number) {
this.call.startRead();
}
_write(
chunk: ResponseType,
encoding: string,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
callback: (...args: any[]) => void
) {
this.call.sendMessage(chunk, callback);
}
_final(callback: Function): void {
this.call.sendStatus({
...this.pendingStatus,
metadata: this.pendingStatus.metadata ?? this.trailingMetadata,
});
callback(null);
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any

@@ -337,9 +307,2 @@ end(metadata?: any) {

ServerDuplexStreamImpl.prototype._read =
ServerReadableStreamImpl.prototype._read;
ServerDuplexStreamImpl.prototype._write =
ServerWritableStreamImpl.prototype._write;
ServerDuplexStreamImpl.prototype._final =
ServerWritableStreamImpl.prototype._final;
// Unary response callback signature.

@@ -420,595 +383,1 @@ export type sendUnaryData<ResponseType> = (

export type HandlerType = 'bidi' | 'clientStream' | 'serverStream' | 'unary';
// Internal class that wraps the HTTP2 request.
export class Http2ServerCallStream<
RequestType,
ResponseType
> extends EventEmitter {
cancelled = false;
deadlineTimer: NodeJS.Timeout | null = null;
private statusSent = false;
private deadline: Deadline = Infinity;
private wantTrailers = false;
private metadataSent = false;
private canPush = false;
private isPushPending = false;
private bufferedMessages: Array<Buffer | null> = [];
private messagesToPush: Array<RequestType | null> = [];
private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
constructor(
private stream: http2.ServerHttp2Stream,
private handler: Handler<RequestType, ResponseType>,
options: ChannelOptions
) {
super();
this.stream.once('error', (err: ServerErrorResponse) => {
/* We need an error handler to avoid uncaught error event exceptions, but
* there is nothing we can reasonably do here. Any error event should
* have a corresponding close event, which handles emitting the cancelled
* event. And the stream is now in a bad state, so we can't reasonably
* expect to be able to send an error over it. */
});
this.stream.once('close', () => {
trace(
'Request to method ' +
this.handler?.path +
' stream closed with rstCode ' +
this.stream.rstCode
);
if (!this.statusSent) {
this.cancelled = true;
this.emit('cancelled', 'cancelled');
this.emit('streamEnd', false);
this.sendStatus({
code: Status.CANCELLED,
details: 'Cancelled by client',
metadata: null,
});
if (this.deadlineTimer) clearTimeout(this.deadlineTimer);
}
});
this.stream.on('drain', () => {
this.emit('drain');
});
if ('grpc.max_send_message_length' in options) {
this.maxSendMessageSize = options['grpc.max_send_message_length']!;
}
if ('grpc.max_receive_message_length' in options) {
this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
}
}
private checkCancelled(): boolean {
/* In some cases the stream can become destroyed before the close event
* fires. That creates a race condition that this check works around */
if (this.stream.destroyed || this.stream.closed) {
this.cancelled = true;
}
return this.cancelled;
}
private getDecompressedMessage(
message: Buffer,
encoding: string
): Buffer | Promise<Buffer> {
if (encoding === 'deflate') {
return inflate(message.subarray(5));
} else if (encoding === 'gzip') {
return unzip(message.subarray(5));
} else if (encoding === 'identity') {
return message.subarray(5);
}
return Promise.reject({
code: Status.UNIMPLEMENTED,
details: `Received message compressed with unsupported encoding "${encoding}"`,
});
}
sendMetadata(customMetadata?: Metadata) {
if (this.checkCancelled()) {
return;
}
if (this.metadataSent) {
return;
}
this.metadataSent = true;
const custom = customMetadata ? customMetadata.toHttp2Headers() : null;
// TODO(cjihrig): Include compression headers.
const headers = {
...defaultResponseHeaders,
...defaultCompressionHeaders,
...custom,
};
this.stream.respond(headers, defaultResponseOptions);
}
receiveMetadata(headers: http2.IncomingHttpHeaders) {
const metadata = Metadata.fromHttp2Headers(headers);
if (logging.isTracerEnabled(TRACER_NAME)) {
trace(
'Request to ' +
this.handler.path +
' received headers ' +
JSON.stringify(metadata.toJSON())
);
}
// TODO(cjihrig): Receive compression metadata.
const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER);
if (timeoutHeader.length > 0) {
const match = timeoutHeader[0].toString().match(DEADLINE_REGEX);
if (match === null) {
const err = new Error('Invalid deadline') as ServerErrorResponse;
err.code = Status.OUT_OF_RANGE;
this.sendError(err);
return metadata;
}
const timeout = (+match[1] * deadlineUnitsToMs[match[2]]) | 0;
const now = new Date();
this.deadline = now.setMilliseconds(now.getMilliseconds() + timeout);
this.deadlineTimer = setTimeout(handleExpiredDeadline, timeout, this);
metadata.remove(GRPC_TIMEOUT_HEADER);
}
// Remove several headers that should not be propagated to the application
metadata.remove(http2.constants.HTTP2_HEADER_ACCEPT_ENCODING);
metadata.remove(http2.constants.HTTP2_HEADER_TE);
metadata.remove(http2.constants.HTTP2_HEADER_CONTENT_TYPE);
metadata.remove('grpc-accept-encoding');
return metadata;
}
receiveUnaryMessage(encoding: string): Promise<RequestType> {
return new Promise((resolve, reject) => {
const { stream } = this;
let receivedLength = 0;
// eslint-disable-next-line @typescript-eslint/no-this-alias
const call = this;
const body: Buffer[] = [];
const limit = this.maxReceiveMessageSize;
this.stream.on('data', onData);
this.stream.on('end', onEnd);
this.stream.on('error', onEnd);
function onData(chunk: Buffer) {
receivedLength += chunk.byteLength;
if (limit !== -1 && receivedLength > limit) {
stream.removeListener('data', onData);
stream.removeListener('end', onEnd);
stream.removeListener('error', onEnd);
reject({
code: Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${receivedLength} vs. ${limit})`,
});
return;
}
body.push(chunk);
}
function onEnd(err?: Error) {
stream.removeListener('data', onData);
stream.removeListener('end', onEnd);
stream.removeListener('error', onEnd);
if (err !== undefined) {
reject({ code: Status.INTERNAL, details: err.message });
return;
}
if (receivedLength === 0) {
reject({
code: Status.INTERNAL,
details: 'received empty unary message',
});
return;
}
call.emit('receiveMessage');
const requestBytes = Buffer.concat(body, receivedLength);
const compressed = requestBytes.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = call.getDecompressedMessage(
requestBytes,
compressedMessageEncoding
);
if (Buffer.isBuffer(decompressedMessage)) {
resolve(
call.deserializeMessageWithInternalError(decompressedMessage)
);
return;
}
decompressedMessage.then(
decompressed =>
resolve(call.deserializeMessageWithInternalError(decompressed)),
(err: any) =>
reject(
err.code
? err
: {
code: Status.INTERNAL,
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
}
)
);
}
});
}
private async deserializeMessageWithInternalError(buffer: Buffer) {
try {
return this.deserializeMessage(buffer);
} catch (err) {
throw {
details: getErrorMessage(err),
code: Status.INTERNAL,
};
}
}
serializeMessage(value: ResponseType) {
const messageBuffer = this.handler.serialize(value);
// TODO(cjihrig): Call compression aware serializeMessage().
const byteLength = messageBuffer.byteLength;
const output = Buffer.allocUnsafe(byteLength + 5);
output.writeUInt8(0, 0);
output.writeUInt32BE(byteLength, 1);
messageBuffer.copy(output, 5);
return output;
}
deserializeMessage(bytes: Buffer) {
return this.handler.deserialize(bytes);
}
async sendUnaryMessage(
err: ServerErrorResponse | ServerStatusResponse | null,
value?: ResponseType | null,
metadata?: Metadata | null,
flags?: number
) {
if (this.checkCancelled()) {
return;
}
if (metadata === undefined) {
metadata = null;
}
if (err) {
if (!Object.prototype.hasOwnProperty.call(err, 'metadata') && metadata) {
err.metadata = metadata;
}
this.sendError(err);
return;
}
try {
const response = this.serializeMessage(value!);
this.write(response);
this.sendStatus({ code: Status.OK, details: 'OK', metadata });
} catch (err) {
this.sendError({
details: getErrorMessage(err),
code: Status.INTERNAL,
});
}
}
sendStatus(statusObj: PartialStatusObject) {
this.emit('callEnd', statusObj.code);
this.emit('streamEnd', statusObj.code === Status.OK);
if (this.checkCancelled()) {
return;
}
trace(
'Request to method ' +
this.handler?.path +
' ended with status code: ' +
Status[statusObj.code] +
' details: ' +
statusObj.details
);
if (this.deadlineTimer) clearTimeout(this.deadlineTimer);
if (this.stream.headersSent) {
if (!this.wantTrailers) {
this.wantTrailers = true;
this.stream.once('wantTrailers', () => {
const trailersToSend = {
[GRPC_STATUS_HEADER]: statusObj.code,
[GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details),
...statusObj.metadata?.toHttp2Headers(),
};
this.stream.sendTrailers(trailersToSend);
this.statusSent = true;
});
this.stream.end();
}
} else {
// Trailers-only response
const trailersToSend = {
[GRPC_STATUS_HEADER]: statusObj.code,
[GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details),
...defaultResponseHeaders,
...statusObj.metadata?.toHttp2Headers(),
};
this.stream.respond(trailersToSend, { endStream: true });
this.statusSent = true;
}
}
sendError(error: ServerErrorResponse | ServerStatusResponse) {
const status: PartialStatusObject = {
code: Status.UNKNOWN,
details: 'message' in error ? error.message : 'Unknown Error',
metadata:
'metadata' in error && error.metadata !== undefined
? error.metadata
: null,
};
if (
'code' in error &&
typeof error.code === 'number' &&
Number.isInteger(error.code)
) {
status.code = error.code;
if ('details' in error && typeof error.details === 'string') {
status.details = error.details!;
}
}
this.sendStatus(status);
}
write(chunk: Buffer) {
if (this.checkCancelled()) {
return;
}
if (
this.maxSendMessageSize !== -1 &&
chunk.length > this.maxSendMessageSize
) {
this.sendError({
code: Status.RESOURCE_EXHAUSTED,
details: `Sent message larger than max (${chunk.length} vs. ${this.maxSendMessageSize})`,
});
return;
}
this.sendMetadata();
this.emit('sendMessage');
return this.stream.write(chunk);
}
resume() {
this.stream.resume();
}
setupSurfaceCall(call: ServerSurfaceCall) {
this.once('cancelled', reason => {
call.cancelled = true;
call.emit('cancelled', reason);
});
this.once('callEnd', status => call.emit('callEnd', status));
}
setupReadable(
readable:
| ServerReadableStream<RequestType, ResponseType>
| ServerDuplexStream<RequestType, ResponseType>,
encoding: string
) {
const decoder = new StreamDecoder();
let readsDone = false;
let pendingMessageProcessing = false;
let pushedEnd = false;
const maybePushEnd = async () => {
if (!pushedEnd && readsDone && !pendingMessageProcessing) {
pushedEnd = true;
await this.pushOrBufferMessage(readable, null);
}
};
this.stream.on('data', async (data: Buffer) => {
const messages = decoder.write(data);
pendingMessageProcessing = true;
this.stream.pause();
for (const message of messages) {
if (
this.maxReceiveMessageSize !== -1 &&
message.length > this.maxReceiveMessageSize
) {
this.sendError({
code: Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${message.length} vs. ${this.maxReceiveMessageSize})`,
});
return;
}
this.emit('receiveMessage');
const compressed = message.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = await this.getDecompressedMessage(
message,
compressedMessageEncoding
);
// Encountered an error with decompression; it'll already have been propogated back
// Just return early
if (!decompressedMessage) return;
await this.pushOrBufferMessage(readable, decompressedMessage);
}
pendingMessageProcessing = false;
this.stream.resume();
await maybePushEnd();
});
this.stream.once('end', async () => {
readsDone = true;
await maybePushEnd();
});
}
consumeUnpushedMessages(
readable:
| ServerReadableStream<RequestType, ResponseType>
| ServerDuplexStream<RequestType, ResponseType>
): boolean {
this.canPush = true;
while (this.messagesToPush.length > 0) {
const nextMessage = this.messagesToPush.shift();
const canPush = readable.push(nextMessage);
if (nextMessage === null || canPush === false) {
this.canPush = false;
break;
}
}
return this.canPush;
}
private async pushOrBufferMessage(
readable:
| ServerReadableStream<RequestType, ResponseType>
| ServerDuplexStream<RequestType, ResponseType>,
messageBytes: Buffer | null
): Promise<void> {
if (this.isPushPending) {
this.bufferedMessages.push(messageBytes);
} else {
await this.pushMessage(readable, messageBytes);
}
}
private async pushMessage(
readable:
| ServerReadableStream<RequestType, ResponseType>
| ServerDuplexStream<RequestType, ResponseType>,
messageBytes: Buffer | null
) {
if (messageBytes === null) {
trace('Received end of stream');
if (this.canPush) {
readable.push(null);
} else {
this.messagesToPush.push(null);
}
return;
}
trace('Received message of length ' + messageBytes.length);
this.isPushPending = true;
try {
const deserialized = await this.deserializeMessage(messageBytes);
if (this.canPush) {
if (!readable.push(deserialized)) {
this.canPush = false;
this.stream.pause();
}
} else {
this.messagesToPush.push(deserialized);
}
} catch (error) {
// Ignore any remaining messages when errors occur.
this.bufferedMessages.length = 0;
let code = getErrorCode(error);
if (code === null || code < Status.OK || code > Status.UNAUTHENTICATED) {
code = Status.INTERNAL;
}
readable.emit('error', {
details: getErrorMessage(error),
code: code,
});
}
this.isPushPending = false;
if (this.bufferedMessages.length > 0) {
await this.pushMessage(
readable,
this.bufferedMessages.shift() as Buffer | null
);
}
}
getPeer(): string {
const socket = this.stream.session?.socket;
if (socket?.remoteAddress) {
if (socket.remotePort) {
return `${socket.remoteAddress}:${socket.remotePort}`;
} else {
return socket.remoteAddress;
}
} else {
return 'unknown';
}
}
getDeadline(): Deadline {
return this.deadline;
}
getPath(): string {
return this.handler.path;
}
}
/* eslint-disable @typescript-eslint/no-explicit-any */
type UntypedServerCall = Http2ServerCallStream<any, any>;
function handleExpiredDeadline(call: UntypedServerCall) {
const err = new Error('Deadline exceeded') as ServerErrorResponse;
err.code = Status.DEADLINE_EXCEEDED;
call.sendError(err);
call.cancelled = true;
call.emit('cancelled', 'deadline');
}

@@ -29,2 +29,3 @@ /*

abstract _getSettings(): SecureServerOptions | null;
abstract _equals(other: ServerCredentials): boolean;

@@ -52,4 +53,4 @@ static createInsecure(): ServerCredentials {

const cert = [];
const key = [];
const cert: Buffer[] = [];
const key: Buffer[] = [];

@@ -76,3 +77,3 @@ for (let i = 0; i < keyCertPairs.length; i++) {

return new SecureServerCredentials({
ca: rootCerts || getDefaultRootsData() || undefined,
ca: rootCerts ?? getDefaultRootsData() ?? undefined,
cert,

@@ -94,2 +95,6 @@ key,

}
_equals(other: ServerCredentials): boolean {
return other instanceof InsecureServerCredentials;
}
}

@@ -112,2 +117,80 @@

}
/**
* Checks equality by checking the options that are actually set by
* createSsl.
* @param other
* @returns
*/
_equals(other: ServerCredentials): boolean {
if (this === other) {
return true;
}
if (!(other instanceof SecureServerCredentials)) {
return false;
}
// options.ca equality check
if (Buffer.isBuffer(this.options.ca) && Buffer.isBuffer(other.options.ca)) {
if (!this.options.ca.equals(other.options.ca)) {
return false;
}
} else {
if (this.options.ca !== other.options.ca) {
return false;
}
}
// options.cert equality check
if (Array.isArray(this.options.cert) && Array.isArray(other.options.cert)) {
if (this.options.cert.length !== other.options.cert.length) {
return false;
}
for (let i = 0; i < this.options.cert.length; i++) {
const thisCert = this.options.cert[i];
const otherCert = other.options.cert[i];
if (Buffer.isBuffer(thisCert) && Buffer.isBuffer(otherCert)) {
if (!thisCert.equals(otherCert)) {
return false;
}
} else {
if (thisCert !== otherCert) {
return false;
}
}
}
} else {
if (this.options.cert !== other.options.cert) {
return false;
}
}
// options.key equality check
if (Array.isArray(this.options.key) && Array.isArray(other.options.key)) {
if (this.options.key.length !== other.options.key.length) {
return false;
}
for (let i = 0; i < this.options.key.length; i++) {
const thisKey = this.options.key[i];
const otherKey = other.options.key[i];
if (Buffer.isBuffer(thisKey) && Buffer.isBuffer(otherKey)) {
if (!thisKey.equals(otherKey)) {
return false;
}
} else {
if (thisKey !== otherKey) {
return false;
}
}
}
} else {
if (this.options.key !== other.options.key) {
return false;
}
}
// options.requestCert equality check
if (this.options.requestCert !== other.options.requestCert) {
return false;
}
/* ciphers is derived from a value that is constant for the process, so no
* equality check is needed. */
return true;
}
}

@@ -19,3 +19,3 @@ /*

import * as http2 from 'http2';
import { AddressInfo } from 'net';
import * as util from 'util';

@@ -32,3 +32,2 @@ import { ServiceError } from './call';

HandlerType,
Http2ServerCallStream,
sendUnaryData,

@@ -38,6 +37,4 @@ ServerDuplexStream,

ServerReadableStream,
ServerReadableStreamImpl,
ServerStreamingHandler,
ServerUnaryCall,
ServerUnaryCallImpl,
ServerWritableStream,

@@ -48,2 +45,3 @@ ServerWritableStreamImpl,

ServerStatusResponse,
serverErrorToStatus,
} from './server-call';

@@ -60,3 +58,2 @@ import { ServerCredentials } from './server-credentials';

SubchannelAddress,
TcpSubchannelAddress,
isTcpSubchannelAddress,

@@ -66,3 +63,3 @@ subchannelAddressToString,

} from './subchannel-address';
import { parseUri } from './uri-parser';
import { GrpcUri, combineHostPort, parseUri, splitHostPort, uriToString } from './uri-parser';
import {

@@ -82,2 +79,5 @@ ChannelzCallTracker,

import { CipherNameAndProtocol, TLSSocket } from 'tls';
import { ServerInterceptingCallInterface, ServerInterceptor, getServerInterceptingCall } from './server-interceptors';
import { PartialStatusObject } from './call-interface';
import { CallEventTracker } from './transport';

@@ -92,12 +92,31 @@ const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31);

type AnyHttp2Server = http2.Http2Server | http2.Http2SecureServer;
interface BindResult {
port: number;
count: number;
errors: string[];
}
interface SingleAddressBindResult {
port: number;
error?: string;
}
function noop(): void {}
/**
* Decorator to wrap a class method with util.deprecate
* @param message The message to output if the deprecated method is called
* @returns
*/
function deprecate(message: string) {
return function <This, Args extends any[], Return>(target: (this: This, ...args: Args) => Return, context: ClassMethodDecoratorContext<This, (this: This, ...args: Args) => Return>) {
return util.deprecate(target, message);
}
}
function getUnimplementedStatusResponse(
methodName: string
): Partial<ServiceError> {
): PartialStatusObject {
return {

@@ -160,7 +179,61 @@ code: Status.UNIMPLEMENTED,

/**
* Information related to a single invocation of bindAsync. This should be
* tracked in a map keyed by target string, normalized with a pass through
* parseUri -> mapUriDefaultScheme -> uriToString. If the target has a port
* number and the port number is 0, the target string is modified with the
* concrete bound port.
*/
interface BoundPort {
/**
* The key used to refer to this object in the boundPorts map.
*/
mapKey: string;
/**
* The target string, passed through parseUri -> mapUriDefaultScheme. Used
* to determine the final key when the port number is 0.
*/
originalUri: GrpcUri;
/**
* If there is a pending bindAsync operation, this is a promise that resolves
* with the port number when that operation succeeds. If there is no such
* operation pending, this is null.
*/
completionPromise: Promise<number> | null;
/**
* The port number that was actually bound. Populated only after
* completionPromise resolves.
*/
portNumber: number;
/**
* Set by unbind if called while pending is true.
*/
cancelled: boolean;
/**
* The credentials object passed to the original bindAsync call.
*/
credentials: ServerCredentials;
/**
* The set of servers associated with this listening port. A target string
* that expands to multiple addresses will result in multiple listening
* servers.
*/
listeningServers: Set<AnyHttp2Server>
}
/**
* Should be in a map keyed by AnyHttp2Server.
*/
interface Http2ServerInfo {
channelzRef: SocketRef;
sessions: Set<http2.ServerHttp2Session>;
}
export interface ServerOptions extends ChannelOptions {
interceptors?: ServerInterceptor[]
}
export class Server {
private http2ServerList: {
server: http2.Http2Server | http2.Http2SecureServer;
channelzRef: SocketRef;
}[] = [];
private boundPorts: Map<string, BoundPort>= new Map();
private http2Servers: Map<AnyHttp2Server, Http2ServerInfo> = new Map();

@@ -172,5 +245,9 @@ private handlers: Map<string, UntypedHandler> = new Map<

private sessions = new Map<http2.ServerHttp2Session, ChannelzSessionInfo>();
/**
* This field only exists to ensure that the start method throws an error if
* it is called twice, as it did previously.
*/
private started = false;
private shutdown = false;
private options: ChannelOptions;
private options: ServerOptions;
private serverAddressString = 'null';

@@ -192,3 +269,11 @@

constructor(options?: ChannelOptions) {
private readonly interceptors: ServerInterceptor[];
/**
* Options that will be used to construct all Http2Server instances for this
* Server.
*/
private commonServerOptions: http2.ServerOptions;
constructor(options?: ServerOptions) {
this.options = options ?? {};

@@ -214,2 +299,21 @@ if (this.options['grpc.enable_channelz'] === 0) {

this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS;
this.commonServerOptions = {
maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
};
if ('grpc-node.max_session_memory' in this.options) {
this.commonServerOptions.maxSessionMemory =
this.options['grpc-node.max_session_memory'];
} else {
/* By default, set a very large max session memory limit, to effectively
* disable enforcement of the limit. Some testing indicates that Node's
* behavior degrades badly when this limit is reached, so we solve that
* by disabling the check entirely. */
this.commonServerOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
}
if ('grpc.max_concurrent_streams' in this.options) {
this.commonServerOptions.settings = {
maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
};
}
this.interceptors = this.options.interceptors ?? [];
this.trace('Server constructed');

@@ -382,2 +486,234 @@ }

private registerListenerToChannelz(boundAddress: SubchannelAddress) {
return registerChannelzSocket(
subchannelAddressToString(boundAddress),
() => {
return {
localAddress: boundAddress,
remoteAddress: null,
security: null,
remoteName: null,
streamsStarted: 0,
streamsSucceeded: 0,
streamsFailed: 0,
messagesSent: 0,
messagesReceived: 0,
keepAlivesSent: 0,
lastLocalStreamCreatedTimestamp: null,
lastRemoteStreamCreatedTimestamp: null,
lastMessageSentTimestamp: null,
lastMessageReceivedTimestamp: null,
localFlowControlWindow: null,
remoteFlowControlWindow: null,
};
},
this.channelzEnabled
);
}
private createHttp2Server(credentials: ServerCredentials) {
let http2Server: http2.Http2Server | http2.Http2SecureServer;
if (credentials._isSecure()) {
const secureServerOptions = Object.assign(
this.commonServerOptions,
credentials._getSettings()!
);
secureServerOptions.enableTrace =
this.options['grpc-node.tls_enable_trace'] === 1;
http2Server = http2.createSecureServer(secureServerOptions);
http2Server.on('secureConnection', (socket: TLSSocket) => {
/* These errors need to be handled by the user of Http2SecureServer,
* according to https://github.com/nodejs/node/issues/35824 */
socket.on('error', (e: Error) => {
this.trace(
'An incoming TLS connection closed with error: ' + e.message
);
});
});
} else {
http2Server = http2.createServer(this.commonServerOptions);
}
http2Server.setTimeout(0, noop);
this._setupHandlers(http2Server);
return http2Server;
}
private bindOneAddress(address: SubchannelAddress, boundPortObject: BoundPort): Promise<SingleAddressBindResult> {
this.trace(
'Attempting to bind ' + subchannelAddressToString(address)
);
const http2Server = this.createHttp2Server(boundPortObject.credentials);
return new Promise<SingleAddressBindResult>((resolve, reject) => {
const onError = (err: Error) => {
this.trace(
'Failed to bind ' +
subchannelAddressToString(address) +
' with error ' +
err.message
);
resolve({
port: 'port' in address ? address.port : 1,
error: err.message
});
};
http2Server.once('error', onError);
http2Server.listen(address, () => {
const boundAddress = http2Server.address()!;
let boundSubchannelAddress: SubchannelAddress;
if (typeof boundAddress === 'string') {
boundSubchannelAddress = {
path: boundAddress,
};
} else {
boundSubchannelAddress = {
host: boundAddress.address,
port: boundAddress.port,
};
}
const channelzRef = this.registerListenerToChannelz(boundSubchannelAddress);
if (this.channelzEnabled) {
this.listenerChildrenTracker.refChild(channelzRef);
}
this.http2Servers.set(http2Server, {
channelzRef: channelzRef,
sessions: new Set()
});
boundPortObject.listeningServers.add(http2Server);
this.trace(
'Successfully bound ' +
subchannelAddressToString(boundSubchannelAddress)
);
resolve({
port: 'port' in boundSubchannelAddress
? boundSubchannelAddress.port
: 1
});
http2Server.removeListener('error', onError);
});
});
}
private async bindManyPorts(addressList: SubchannelAddress[], boundPortObject: BoundPort): Promise<BindResult> {
if (addressList.length === 0) {
return {
count: 0,
port: 0,
errors: []
};
}
if (isTcpSubchannelAddress(addressList[0]) && addressList[0].port === 0) {
/* If binding to port 0, first try to bind the first address, then bind
* the rest of the address list to the specific port that it binds. */
const firstAddressResult = await this.bindOneAddress(addressList[0], boundPortObject);
if (firstAddressResult.error) {
/* If the first address fails to bind, try the same operation starting
* from the second item in the list. */
const restAddressResult = await this.bindManyPorts(addressList.slice(1), boundPortObject);
return {
...restAddressResult,
errors: [firstAddressResult.error, ...restAddressResult.errors]
};
} else {
const restAddresses = addressList.slice(1).map(address => isTcpSubchannelAddress(address) ? {host: address.host, port: firstAddressResult.port} : address)
const restAddressResult = await Promise.all(restAddresses.map(address => this.bindOneAddress(address, boundPortObject)));
const allResults = [firstAddressResult, ...restAddressResult];
return {
count: allResults.filter(result => result.error === undefined).length,
port: firstAddressResult.port,
errors: allResults.filter(result => result.error).map(result => result.error!)
};
}
} else {
const allResults = await Promise.all(addressList.map(address => this.bindOneAddress(address, boundPortObject)));
return {
count: allResults.filter(result => result.error === undefined).length,
port: allResults[0].port,
errors: allResults.filter(result => result.error).map(result => result.error!)
};
}
}
private async bindAddressList(addressList: SubchannelAddress[], boundPortObject: BoundPort): Promise<number> {
let bindResult: BindResult;
try {
bindResult = await this.bindManyPorts(addressList, boundPortObject);
} catch (error) {
throw error;
}
if (bindResult.count > 0) {
if (bindResult.count < addressList.length) {
logging.log(
LogVerbosity.INFO,
`WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
);
}
return bindResult.port;
} else {
const errorString = `No address added out of total ${addressList.length} resolved`;
logging.log(LogVerbosity.ERROR, errorString);
throw new Error(`${errorString} errors: [${bindResult.errors.join(',')}]`);
}
}
private resolvePort(port: GrpcUri): Promise<SubchannelAddress[]> {
return new Promise<SubchannelAddress[]>((resolve, reject) => {
const resolverListener: ResolverListener = {
onSuccessfulResolution: (
endpointList,
serviceConfig,
serviceConfigError
) => {
// We only want one resolution result. Discard all future results
resolverListener.onSuccessfulResolution = () => {};
const addressList = ([] as SubchannelAddress[]).concat(
...endpointList.map(endpoint => endpoint.addresses)
);
if (addressList.length === 0) {
reject(
new Error(`No addresses resolved for port ${port}`)
);
return;
}
resolve(addressList);
},
onError: error => {
reject(new Error(error.details));
},
};
const resolver = createResolver(port, resolverListener, this.options);
resolver.updateResolution();
});
}
private async bindPort(port: GrpcUri, boundPortObject: BoundPort): Promise<number> {
const addressList = await this.resolvePort(port);
if (boundPortObject.cancelled) {
this.completeUnbind(boundPortObject);
throw new Error('bindAsync operation cancelled by unbind call');
}
const portNumber = await this.bindAddressList(addressList, boundPortObject);
if (boundPortObject.cancelled) {
this.completeUnbind(boundPortObject);
throw new Error('bindAsync operation cancelled by unbind call');
}
return portNumber;
}
private normalizePort(port: string): GrpcUri {
const initialPortUri = parseUri(port);
if (initialPortUri === null) {
throw new Error(`Could not parse port "${port}"`);
}
const portUri = mapUriDefaultScheme(initialPortUri);
if (portUri === null) {
throw new Error(`Could not get a default scheme for port "${port}"`);
}
return portUri;
}
bindAsync(

@@ -388,10 +724,5 @@ port: string,

): void {
if (this.started === true) {
throw new Error('server is already started');
}
if (this.shutdown) {
throw new Error('bindAsync called after shutdown');
}
if (typeof port !== 'string') {

@@ -409,29 +740,5 @@ throw new TypeError('port must be a string');

const initialPortUri = parseUri(port);
if (initialPortUri === null) {
throw new Error(`Could not parse port "${port}"`);
}
const portUri = mapUriDefaultScheme(initialPortUri);
if (portUri === null) {
throw new Error(`Could not get a default scheme for port "${port}"`);
}
this.trace('bindAsync port=' + port);
const serverOptions: http2.ServerOptions = {
maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
};
if ('grpc-node.max_session_memory' in this.options) {
serverOptions.maxSessionMemory =
this.options['grpc-node.max_session_memory'];
} else {
/* By default, set a very large max session memory limit, to effectively
* disable enforcement of the limit. Some testing indicates that Node's
* behavior degrades badly when this limit is reached, so we solve that
* by disabling the check entirely. */
serverOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
}
if ('grpc.max_concurrent_streams' in this.options) {
serverOptions.settings = {
maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
};
}
const portUri = this.normalizePort(port);

@@ -442,307 +749,189 @@ const deferredCallback = (error: Error | null, port: number) => {

const setupServer = (): http2.Http2Server | http2.Http2SecureServer => {
let http2Server: http2.Http2Server | http2.Http2SecureServer;
if (creds._isSecure()) {
const secureServerOptions = Object.assign(
serverOptions,
creds._getSettings()!
);
secureServerOptions.enableTrace =
this.options['grpc-node.tls_enable_trace'] === 1;
http2Server = http2.createSecureServer(secureServerOptions);
http2Server.on('secureConnection', (socket: TLSSocket) => {
/* These errors need to be handled by the user of Http2SecureServer,
* according to https://github.com/nodejs/node/issues/35824 */
socket.on('error', (e: Error) => {
this.trace(
'An incoming TLS connection closed with error: ' + e.message
);
});
});
/* First, if this port is already bound or that bind operation is in
* progress, use that result. */
let boundPortObject = this.boundPorts.get(uriToString(portUri));
if (boundPortObject) {
if (!creds._equals(boundPortObject.credentials)) {
deferredCallback(new Error(`${port} already bound with incompatible credentials`), 0);
return;
}
/* If that operation has previously been cancelled by an unbind call,
* uncancel it. */
boundPortObject.cancelled = false;
if (boundPortObject.completionPromise) {
boundPortObject.completionPromise.then(portNum => callback(null, portNum), error => callback(error as Error, 0));
} else {
http2Server = http2.createServer(serverOptions);
deferredCallback(null, boundPortObject.portNumber);
}
http2Server.setTimeout(0, noop);
this._setupHandlers(http2Server);
return http2Server;
return;
}
boundPortObject = {
mapKey: uriToString(portUri),
originalUri: portUri,
completionPromise: null,
cancelled: false,
portNumber: 0,
credentials: creds,
listeningServers: new Set()
};
const bindSpecificPort = (
addressList: SubchannelAddress[],
portNum: number,
previousCount: number
): Promise<BindResult> => {
if (addressList.length === 0) {
return Promise.resolve({ port: portNum, count: previousCount });
}
return Promise.all(
addressList.map(address => {
this.trace(
'Attempting to bind ' + subchannelAddressToString(address)
);
let addr: SubchannelAddress;
if (isTcpSubchannelAddress(address)) {
addr = {
host: (address as TcpSubchannelAddress).host,
port: portNum,
};
} else {
addr = address;
}
const http2Server = setupServer();
return new Promise<number | Error>((resolve, reject) => {
const onError = (err: Error) => {
this.trace(
'Failed to bind ' +
subchannelAddressToString(address) +
' with error ' +
err.message
);
resolve(err);
};
http2Server.once('error', onError);
http2Server.listen(addr, () => {
if (this.shutdown) {
http2Server.close();
resolve(new Error('bindAsync failed because server is shutdown'));
return;
}
const boundAddress = http2Server.address()!;
let boundSubchannelAddress: SubchannelAddress;
if (typeof boundAddress === 'string') {
boundSubchannelAddress = {
path: boundAddress,
};
} else {
boundSubchannelAddress = {
host: boundAddress.address,
port: boundAddress.port,
};
}
const channelzRef = registerChannelzSocket(
subchannelAddressToString(boundSubchannelAddress),
() => {
return {
localAddress: boundSubchannelAddress,
remoteAddress: null,
security: null,
remoteName: null,
streamsStarted: 0,
streamsSucceeded: 0,
streamsFailed: 0,
messagesSent: 0,
messagesReceived: 0,
keepAlivesSent: 0,
lastLocalStreamCreatedTimestamp: null,
lastRemoteStreamCreatedTimestamp: null,
lastMessageSentTimestamp: null,
lastMessageReceivedTimestamp: null,
localFlowControlWindow: null,
remoteFlowControlWindow: null,
};
},
this.channelzEnabled
);
if (this.channelzEnabled) {
this.listenerChildrenTracker.refChild(channelzRef);
}
this.http2ServerList.push({
server: http2Server,
channelzRef: channelzRef,
});
this.trace(
'Successfully bound ' +
subchannelAddressToString(boundSubchannelAddress)
);
resolve(
'port' in boundSubchannelAddress
? boundSubchannelAddress.port
: portNum
);
http2Server.removeListener('error', onError);
});
});
})
).then(results => {
let count = 0;
for (const result of results) {
if (typeof result === 'number') {
count += 1;
if (result !== portNum) {
throw new Error(
'Invalid state: multiple port numbers added from single address'
);
}
}
}
return {
port: portNum,
count: count + previousCount,
const splitPort = splitHostPort(portUri.path);
const completionPromise = this.bindPort(portUri, boundPortObject);
boundPortObject.completionPromise = completionPromise;
/* If the port number is 0, defer populating the map entry until after the
* bind operation completes and we have a specific port number. Otherwise,
* populate it immediately. */
if (splitPort?.port === 0) {
completionPromise.then(portNum => {
const finalUri: GrpcUri = {
scheme: portUri.scheme,
authority: portUri.authority,
path: combineHostPort({host: splitPort.host, port: portNum})
};
boundPortObject!.mapKey = uriToString(finalUri);
boundPortObject!.completionPromise = null;
boundPortObject!.portNumber = portNum;
this.boundPorts.set(boundPortObject!.mapKey, boundPortObject!);
callback(null, portNum);
}, error => {
callback(error, 0);
})
} else {
this.boundPorts.set(boundPortObject.mapKey, boundPortObject);
completionPromise.then(portNum => {
boundPortObject!.completionPromise = null;
boundPortObject!.portNumber = portNum;
callback(null, portNum);
}, error => {
callback(error, 0);
});
};
}
}
const bindWildcardPort = (
addressList: SubchannelAddress[]
): Promise<BindResult> => {
if (addressList.length === 0) {
return Promise.resolve<BindResult>({ port: 0, count: 0 });
private closeServer(server: AnyHttp2Server, callback?: () => void) {
this.trace('Closing server with address ' + JSON.stringify(server.address()));
const serverInfo = this.http2Servers.get(server);
server.close(() => {
if (this.channelzEnabled && serverInfo) {
this.listenerChildrenTracker.unrefChild(serverInfo.channelzRef);
unregisterChannelzRef(serverInfo.channelzRef);
}
const address = addressList[0];
const http2Server = setupServer();
return new Promise<BindResult>((resolve, reject) => {
const onError = (err: Error) => {
this.trace(
'Failed to bind ' +
subchannelAddressToString(address) +
' with error ' +
err.message
);
resolve(bindWildcardPort(addressList.slice(1)));
};
this.http2Servers.delete(server);
callback?.();
});
http2Server.once('error', onError);
}
http2Server.listen(address, () => {
if (this.shutdown) {
http2Server.close();
resolve({port: 0, count: 0});
return;
}
const boundAddress = http2Server.address() as AddressInfo;
const boundSubchannelAddress: SubchannelAddress = {
host: boundAddress.address,
port: boundAddress.port,
};
const channelzRef = registerChannelzSocket(
subchannelAddressToString(boundSubchannelAddress),
() => {
return {
localAddress: boundSubchannelAddress,
remoteAddress: null,
security: null,
remoteName: null,
streamsStarted: 0,
streamsSucceeded: 0,
streamsFailed: 0,
messagesSent: 0,
messagesReceived: 0,
keepAlivesSent: 0,
lastLocalStreamCreatedTimestamp: null,
lastRemoteStreamCreatedTimestamp: null,
lastMessageSentTimestamp: null,
lastMessageReceivedTimestamp: null,
localFlowControlWindow: null,
remoteFlowControlWindow: null,
};
},
this.channelzEnabled
);
if (this.channelzEnabled) {
this.listenerChildrenTracker.refChild(channelzRef);
}
this.http2ServerList.push({
server: http2Server,
channelzRef: channelzRef,
});
this.trace(
'Successfully bound ' +
subchannelAddressToString(boundSubchannelAddress)
);
resolve(bindSpecificPort(addressList.slice(1), boundAddress.port, 1));
http2Server.removeListener('error', onError);
});
});
private closeSession(session: http2.ServerHttp2Session, callback?: () => void) {
this.trace('Closing session initiated by ' + session.socket?.remoteAddress);
const sessionInfo = this.sessions.get(session);
const closeCallback = () => {
if (this.channelzEnabled && sessionInfo) {
this.sessionChildrenTracker.unrefChild(sessionInfo.ref);
unregisterChannelzRef(sessionInfo.ref);
}
this.sessions.delete(session);
callback?.();
};
if (session.closed) {
process.nextTick(closeCallback);
} else {
session.close(closeCallback);
}
}
const resolverListener: ResolverListener = {
onSuccessfulResolution: (
addressList,
serviceConfig,
serviceConfigError
) => {
// We only want one resolution result. Discard all future results
resolverListener.onSuccessfulResolution = () => {};
if (this.shutdown) {
deferredCallback(
new Error(`bindAsync failed because server is shutdown`),
0
);
private completeUnbind(boundPortObject: BoundPort) {
for (const server of boundPortObject.listeningServers) {
const serverInfo = this.http2Servers.get(server);
this.closeServer(server, () => {
boundPortObject.listeningServers.delete(server);
});
if (serverInfo) {
for (const session of serverInfo.sessions) {
this.closeSession(session);
}
if (addressList.length === 0) {
deferredCallback(
new Error(`No addresses resolved for port ${port}`),
0
);
return;
}
let bindResultPromise: Promise<BindResult>;
if (isTcpSubchannelAddress(addressList[0])) {
if (addressList[0].port === 0) {
bindResultPromise = bindWildcardPort(addressList);
} else {
bindResultPromise = bindSpecificPort(
addressList,
addressList[0].port,
0
);
}
} else {
// Use an arbitrary non-zero port for non-TCP addresses
bindResultPromise = bindSpecificPort(addressList, 1, 0);
}
bindResultPromise.then(
bindResult => {
if (bindResult.count === 0) {
const errorString = `No address added out of total ${addressList.length} resolved`;
logging.log(LogVerbosity.ERROR, errorString);
deferredCallback(new Error(errorString), 0);
} else {
if (bindResult.count < addressList.length) {
logging.log(
LogVerbosity.INFO,
`WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
);
}
deferredCallback(null, bindResult.port);
}
},
error => {
const errorString = `No address added out of total ${addressList.length} resolved`;
logging.log(LogVerbosity.ERROR, errorString);
deferredCallback(new Error(errorString), 0);
}
);
},
onError: error => {
deferredCallback(new Error(error.details), 0);
},
};
}
}
this.boundPorts.delete(boundPortObject.mapKey);
}
const resolver = createResolver(portUri, resolverListener, this.options);
resolver.updateResolution();
/**
* Unbind a previously bound port, or cancel an in-progress bindAsync
* operation. If port 0 was bound, only the actual bound port can be
* unbound. For example, if bindAsync was called with "localhost:0" and the
* bound port result was 54321, it can be unbound as "localhost:54321".
* @param port
*/
unbind(port: string): void {
this.trace('unbind port=' + port);
const portUri = this.normalizePort(port);
const splitPort = splitHostPort(portUri.path);
if (splitPort?.port === 0) {
throw new Error('Cannot unbind port 0');
}
const boundPortObject = this.boundPorts.get(uriToString(portUri));
if (boundPortObject) {
this.trace('unbinding ' + boundPortObject.mapKey + ' originally bound as ' + uriToString(boundPortObject.originalUri));
/* If the bind operation is pending, the cancelled flag will trigger
* the unbind operation later. */
if (boundPortObject.completionPromise) {
boundPortObject.cancelled = true;
} else {
this.completeUnbind(boundPortObject);
}
}
}
forceShutdown(): void {
// Close the server if it is still running.
for (const { server: http2Server, channelzRef: ref } of this
.http2ServerList) {
if (http2Server.listening) {
http2Server.close(() => {
if (this.channelzEnabled) {
this.listenerChildrenTracker.unrefChild(ref);
unregisterChannelzRef(ref);
}
/**
* Gracefully close all connections associated with a previously bound port.
* After the grace time, forcefully close all remaining open connections.
*
* If port 0 was bound, only the actual bound port can be
* drained. For example, if bindAsync was called with "localhost:0" and the
* bound port result was 54321, it can be drained as "localhost:54321".
* @param port
* @param graceTimeMs
* @returns
*/
drain(port: string, graceTimeMs: number): void {
this.trace('drain port=' + port + ' graceTimeMs=' + graceTimeMs);
const portUri = this.normalizePort(port);
const splitPort = splitHostPort(portUri.path);
if (splitPort?.port === 0) {
throw new Error('Cannot drain port 0');
}
const boundPortObject = this.boundPorts.get(uriToString(portUri));
if (!boundPortObject) {
return;
}
const allSessions: Set<http2.Http2Session> = new Set();
for (const http2Server of boundPortObject.listeningServers) {
const serverEntry = this.http2Servers.get(http2Server);
if (!serverEntry) {
continue;
}
for (const session of serverEntry.sessions) {
allSessions.add(session);
this.closeSession(session, () => {
allSessions.delete(session);
});
}
}
/* After the grace time ends, send another goaway to all remaining sessions
* with the CANCEL code. */
setTimeout(() => {
for (const session of allSessions) {
session.destroy(http2.constants.NGHTTP2_CANCEL as any);
}
}, graceTimeMs).unref?.();
}
this.started = false;
this.shutdown = true;
forceShutdown(): void {
for (const boundPortObject of this.boundPorts.values()) {
boundPortObject.cancelled = true;
}
this.boundPorts.clear();
// Close the server if it is still running.
for (const server of this.http2Servers.keys()) {
this.closeServer(server);
}

@@ -752,2 +941,3 @@ // Always destroy any available sessions. It's possible that one or more

this.sessions.forEach((channelzInfo, session) => {
this.closeSession(session);
// Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to

@@ -762,2 +952,4 @@ // recognize destroy(code) as a valid signature.

}
this.shutdown = true;
}

@@ -790,7 +982,11 @@

/**
* @deprecated No longer needed as of version 1.10.x
*/
@deprecate('Calling start() is no longer necessary. It can be safely omitted.')
start(): void {
if (
this.http2ServerList.length === 0 ||
this.http2ServerList.every(
({ server: http2Server }) => http2Server.listening !== true
this.http2Servers.size === 0 ||
[...this.http2Servers.keys()].every(
server => !server.listening
)

@@ -804,5 +1000,2 @@ ) {

}
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Starting');
}
this.started = true;

@@ -827,27 +1020,22 @@ }

}
// Close the server if necessary.
this.started = false;
this.shutdown = true;
for (const { server: http2Server, channelzRef: ref } of this
.http2ServerList) {
if (http2Server.listening) {
pendingChecks++;
http2Server.close(() => {
if (this.channelzEnabled) {
this.listenerChildrenTracker.unrefChild(ref);
unregisterChannelzRef(ref);
}
maybeCallback();
});
}
for (const server of this.http2Servers.keys()) {
pendingChecks++;
const serverString = this.http2Servers.get(server)!.channelzRef.name;
this.trace('Waiting for server ' + serverString + ' to close');
this.closeServer(server, () => {
this.trace('Server ' + serverString + ' finished closing');
maybeCallback();
});
}
this.sessions.forEach((channelzInfo, session) => {
if (!session.closed) {
pendingChecks += 1;
session.close(maybeCallback);
}
});
for (const session of this.sessions.keys()) {
pendingChecks++;
const sessionString = session.socket?.remoteAddress;
this.trace('Waiting for session ' + sessionString + ' to close');
this.closeSession(session, () => {
this.trace('Session ' + sessionString + ' finished closing');
maybeCallback();
});
}
if (pendingChecks === 0) {

@@ -916,13 +1104,16 @@ wrappedCallback();

private _respondWithError<T extends Partial<ServiceError>>(
err: T,
private _respondWithError(
err: PartialStatusObject,
stream: http2.ServerHttp2Stream,
channelzSessionInfo: ChannelzSessionInfo | null = null
) {
const call = new Http2ServerCallStream(stream, null!, this.options);
const trailersToSend = {
'grpc-status': err.code ?? Status.INTERNAL,
'grpc-message': err.details,
[http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
[http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
...err.metadata?.toHttp2Headers()
};
stream.respond(trailersToSend, {endStream: true});
if (err.code === undefined) {
err.code = Status.INTERNAL;
}
if (this.channelzEnabled) {

@@ -932,4 +1123,2 @@ this.callTracker.addCallFailed();

}
call.sendError(err);
}

@@ -966,35 +1155,40 @@

const call = new Http2ServerCallStream(stream, handler, this.options);
call.once('callEnd', (code: Status) => {
if (code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
});
if (channelzSessionInfo) {
call.once('streamEnd', (success: boolean) => {
if (success) {
channelzSessionInfo.streamTracker.addCallSucceeded();
let callEventTracker: CallEventTracker = {
addMessageSent: () => {
if (channelzSessionInfo) {
channelzSessionInfo.messagesSent += 1;
channelzSessionInfo.lastMessageSentTimestamp = new Date();
}
},
addMessageReceived: () => {
if (channelzSessionInfo) {
channelzSessionInfo.messagesReceived += 1;
channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
}
},
onCallEnd: status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
channelzSessionInfo.streamTracker.addCallFailed();
this.callTracker.addCallFailed();
}
});
call.on('sendMessage', () => {
channelzSessionInfo.messagesSent += 1;
channelzSessionInfo.lastMessageSentTimestamp = new Date();
});
call.on('receiveMessage', () => {
channelzSessionInfo.messagesReceived += 1;
channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
});
},
onStreamEnd: success => {
if (channelzSessionInfo) {
if (success) {
channelzSessionInfo.streamTracker.addCallSucceeded();
} else {
channelzSessionInfo.streamTracker.addCallFailed();
}
}
}
}
if (!this._runHandlerForCall(call, handler, headers)) {
const call = getServerInterceptingCall(this.interceptors, stream, headers, callEventTracker, handler, this.options);
if (!this._runHandlerForCall(call, handler)) {
this.callTracker.addCallFailed();
channelzSessionInfo?.streamTracker.addCallFailed();
call.sendError({
call.sendStatus({
code: Status.INTERNAL,

@@ -1026,5 +1220,6 @@ details: `Unknown handler type: ${handler.type}`,

const call = new Http2ServerCallStream(stream, handler, this.options);
if (!this._runHandlerForCall(call, handler, headers)) {
call.sendError({
const call = getServerInterceptingCall(this.interceptors, stream, headers, null, handler, this.options);
if (!this._runHandlerForCall(call, handler)) {
call.sendStatus({
code: Status.INTERNAL,

@@ -1037,20 +1232,13 @@ details: `Unknown handler type: ${handler.type}`,

private _runHandlerForCall(
call: Http2ServerCallStream<any, any>,
handler: Handler<any, any>,
headers: http2.IncomingHttpHeaders
call: ServerInterceptingCallInterface,
handler: Handler<any, any>
): boolean {
const metadata = call.receiveMetadata(headers);
const encoding =
(metadata.get('grpc-encoding')[0] as string | undefined) ?? 'identity';
metadata.remove('grpc-encoding');
const { type } = handler;
if (type === 'unary') {
handleUnary(call, handler as UntypedUnaryHandler, metadata, encoding);
handleUnary(call, handler as UntypedUnaryHandler);
} else if (type === 'clientStream') {
handleClientStreaming(
call,
handler as UntypedClientStreamingHandler,
metadata,
encoding
handler as UntypedClientStreamingHandler
);

@@ -1060,5 +1248,3 @@ } else if (type === 'serverStream') {

call,
handler as UntypedServerStreamingHandler,
metadata,
encoding
handler as UntypedServerStreamingHandler
);

@@ -1068,5 +1254,3 @@ } else if (type === 'bidi') {

call,
handler as UntypedBidiStreamingHandler,
metadata,
encoding
handler as UntypedBidiStreamingHandler
);

@@ -1104,6 +1288,2 @@ } else {

http2Server.on('session', session => {
if (!this.started) {
session.destroy();
return;
}

@@ -1125,2 +1305,3 @@ const channelzRef = registerChannelzSocket(

this.http2Servers.get(http2Server)?.sessions.add(session);
this.sessions.set(session, channelzSessionInfo);

@@ -1213,2 +1394,3 @@ const clientAddress = session.socket.remoteAddress;

}
this.http2Servers.get(http2Server)?.sessions.delete(session);
this.sessions.delete(session);

@@ -1221,48 +1403,79 @@ });

async function handleUnary<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>,
handler: UnaryHandler<RequestType, ResponseType>,
metadata: Metadata,
encoding: string
call: ServerInterceptingCallInterface,
handler: UnaryHandler<RequestType, ResponseType>
): Promise<void> {
try {
const request = await call.receiveUnaryMessage(encoding);
let stream: ServerUnaryCall<RequestType, ResponseType>;
if (request === undefined || call.cancelled) {
function respond(
err: ServerErrorResponse | ServerStatusResponse | null,
value?: ResponseType | null,
trailer?: Metadata,
flags?: number
) {
if (err) {
call.sendStatus(serverErrorToStatus(err, trailer));
return;
}
call.sendMessage(value, () => {
call.sendStatus({
code: Status.OK,
details: 'OK',
metadata: trailer ?? null
});
});
}
const emitter = new ServerUnaryCallImpl<RequestType, ResponseType>(
call,
metadata,
request
);
handler.func(
emitter,
(
err: ServerErrorResponse | ServerStatusResponse | null,
value?: ResponseType | null,
trailer?: Metadata,
flags?: number
) => {
call.sendUnaryMessage(err, value, trailer, flags);
let requestMetadata: Metadata;
let requestMessage: RequestType | null = null;
call.start({
onReceiveMetadata(metadata) {
requestMetadata = metadata;
call.startRead();
},
onReceiveMessage(message) {
if (requestMessage) {
call.sendStatus({
code: Status.UNIMPLEMENTED,
details: `Received a second request message for server streaming method ${handler.path}`,
metadata: null
});
return;
}
);
} catch (err) {
call.sendError(err as ServerErrorResponse);
}
requestMessage = message;
call.startRead();
},
onReceiveHalfClose() {
if (!requestMessage) {
call.sendStatus({
code: Status.UNIMPLEMENTED,
details: `Received no request message for server streaming method ${handler.path}`,
metadata: null
});
return;
}
stream = new ServerWritableStreamImpl(handler.path, call, requestMetadata, requestMessage);
try {
handler.func(stream, respond);
} catch (err) {
call.sendStatus({
code: Status.UNKNOWN,
details: `Server method handler threw error ${(err as Error).message}`,
metadata: null
});
}
},
onCancel() {
if (stream) {
stream.cancelled = true;
stream.emit('cancelled', 'cancelled');
}
},
});
}
function handleClientStreaming<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>,
handler: ClientStreamingHandler<RequestType, ResponseType>,
metadata: Metadata,
encoding: string
call: ServerInterceptingCallInterface,
handler: ClientStreamingHandler<RequestType, ResponseType>
): void {
const stream = new ServerReadableStreamImpl<RequestType, ResponseType>(
call,
metadata,
handler.deserialize,
encoding
);
let stream: ServerReadableStream<RequestType, ResponseType>;

@@ -1275,59 +1488,132 @@ function respond(

) {
stream.destroy();
call.sendUnaryMessage(err, value, trailer, flags);
if (err) {
call.sendStatus(serverErrorToStatus(err, trailer));
return;
}
call.sendMessage(value, () => {
call.sendStatus({
code: Status.OK,
details: 'OK',
metadata: trailer ?? null
});
});
}
if (call.cancelled) {
return;
}
stream.on('error', respond);
handler.func(stream, respond);
call.start({
onReceiveMetadata(metadata) {
stream = new ServerDuplexStreamImpl(handler.path, call, metadata);
try {
handler.func(stream, respond);
} catch (err) {
call.sendStatus({
code: Status.UNKNOWN,
details: `Server method handler threw error ${(err as Error).message}`,
metadata: null
});
}
},
onReceiveMessage(message) {
stream.push(message);
},
onReceiveHalfClose() {
stream.push(null);
},
onCancel() {
if (stream) {
stream.cancelled = true;
stream.emit('cancelled', 'cancelled');
stream.destroy();
}
},
});
}
async function handleServerStreaming<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>,
handler: ServerStreamingHandler<RequestType, ResponseType>,
metadata: Metadata,
encoding: string
): Promise<void> {
try {
const request = await call.receiveUnaryMessage(encoding);
function handleServerStreaming<RequestType, ResponseType>(
call: ServerInterceptingCallInterface,
handler: ServerStreamingHandler<RequestType, ResponseType>
): void {
let stream: ServerWritableStream<RequestType, ResponseType>;
if (request === undefined || call.cancelled) {
return;
}
const stream = new ServerWritableStreamImpl<RequestType, ResponseType>(
call,
metadata,
handler.serialize,
request
);
handler.func(stream);
} catch (err) {
call.sendError(err as ServerErrorResponse);
}
let requestMetadata: Metadata;
let requestMessage: RequestType | null = null;
call.start({
onReceiveMetadata(metadata) {
requestMetadata = metadata;
call.startRead();
},
onReceiveMessage(message) {
if (requestMessage) {
call.sendStatus({
code: Status.UNIMPLEMENTED,
details: `Received a second request message for server streaming method ${handler.path}`,
metadata: null
});
return;
}
requestMessage = message;
call.startRead();
},
onReceiveHalfClose() {
if (!requestMessage) {
call.sendStatus({
code: Status.UNIMPLEMENTED,
details: `Received no request message for server streaming method ${handler.path}`,
metadata: null
});
return;
}
stream = new ServerWritableStreamImpl(handler.path, call, requestMetadata, requestMessage);
try {
handler.func(stream);
} catch (err) {
call.sendStatus({
code: Status.UNKNOWN,
details: `Server method handler threw error ${(err as Error).message}`,
metadata: null
});
}
},
onCancel() {
if (stream) {
stream.cancelled = true;
stream.emit('cancelled', 'cancelled');
stream.destroy();
}
},
});
}
function handleBidiStreaming<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>,
handler: BidiStreamingHandler<RequestType, ResponseType>,
metadata: Metadata,
encoding: string
call: ServerInterceptingCallInterface,
handler: BidiStreamingHandler<RequestType, ResponseType>
): void {
const stream = new ServerDuplexStreamImpl<RequestType, ResponseType>(
call,
metadata,
handler.serialize,
handler.deserialize,
encoding
);
let stream: ServerDuplexStream<RequestType, ResponseType>;
if (call.cancelled) {
return;
}
handler.func(stream);
call.start({
onReceiveMetadata(metadata) {
stream = new ServerDuplexStreamImpl(handler.path, call, metadata);
try {
handler.func(stream);
} catch (err) {
call.sendStatus({
code: Status.UNKNOWN,
details: `Server method handler threw error ${(err as Error).message}`,
metadata: null
});
}
},
onReceiveMessage(message) {
stream.push(message);
},
onReceiveHalfClose() {
stream.push(null);
},
onCancel() {
if (stream) {
stream.cancelled = true;
stream.emit('cancelled', 'cancelled');
stream.destroy();
}
},
});
}

@@ -32,6 +32,2 @@ /*

import { Duration } from './duration';
import {
LoadBalancingConfig,
validateLoadBalancingConfig,
} from './load-balancer';

@@ -72,2 +68,6 @@ export interface MethodConfigName {

export interface LoadBalancingConfig {
[key: string]: object;
}
export interface ServiceConfig {

@@ -359,2 +359,18 @@ loadBalancingPolicy?: string;

function validateLoadBalancingConfig(obj: any): LoadBalancingConfig {
if (!(typeof obj === 'object' && obj !== null)) {
throw new Error(`Invalid loadBalancingConfig: unexpected type ${typeof obj}`);
}
const keys = Object.keys(obj);
if (keys.length > 1) {
throw new Error(`Invalid loadBalancingConfig: unexpected multiple keys ${keys}`);
}
if (keys.length === 0) {
throw new Error('Invalid loadBalancingConfig: load balancing policy name required');
}
return {
[keys[0]]: obj[keys[0]]
};
}
export function validateServiceConfig(obj: any): ServiceConfig {

@@ -375,2 +391,3 @@ const result: ServiceConfig = {

for (const config of obj.loadBalancingConfig) {
result.loadBalancingConfig.push(validateLoadBalancingConfig(config));

@@ -377,0 +394,0 @@ }

@@ -89,1 +89,161 @@ /*

}
export interface Endpoint {
addresses: SubchannelAddress[];
}
export function endpointEqual(endpoint1: Endpoint, endpoint2: Endpoint) {
if (endpoint1.addresses.length !== endpoint2.addresses.length) {
return false;
}
for (let i = 0; i < endpoint1.addresses.length; i++) {
if (
!subchannelAddressEqual(endpoint1.addresses[i], endpoint2.addresses[i])
) {
return false;
}
}
return true;
}
export function endpointToString(endpoint: Endpoint): string {
return (
'[' + endpoint.addresses.map(subchannelAddressToString).join(', ') + ']'
);
}
export function endpointHasAddress(
endpoint: Endpoint,
expectedAddress: SubchannelAddress
): boolean {
for (const address of endpoint.addresses) {
if (subchannelAddressEqual(address, expectedAddress)) {
return true;
}
}
return false;
}
interface EndpointMapEntry<ValueType> {
key: Endpoint;
value: ValueType;
}
function endpointEqualUnordered(
endpoint1: Endpoint,
endpoint2: Endpoint
): boolean {
if (endpoint1.addresses.length !== endpoint2.addresses.length) {
return false;
}
for (const address1 of endpoint1.addresses) {
let matchFound = false;
for (const address2 of endpoint2.addresses) {
if (subchannelAddressEqual(address1, address2)) {
matchFound = true;
break;
}
}
if (!matchFound) {
return false;
}
}
return true;
}
export class EndpointMap<ValueType> {
private map: Set<EndpointMapEntry<ValueType>> = new Set();
get size() {
return this.map.size;
}
getForSubchannelAddress(address: SubchannelAddress): ValueType | undefined {
for (const entry of this.map) {
if (endpointHasAddress(entry.key, address)) {
return entry.value;
}
}
return undefined;
}
/**
* Delete any entries in this map with keys that are not in endpoints
* @param endpoints
*/
deleteMissing(endpoints: Endpoint[]): ValueType[] {
const removedValues: ValueType[] = [];
for (const entry of this.map) {
let foundEntry = false;
for (const endpoint of endpoints) {
if (endpointEqualUnordered(endpoint, entry.key)) {
foundEntry = true;
}
}
if (!foundEntry) {
removedValues.push(entry.value);
this.map.delete(entry);
}
}
return removedValues;
}
get(endpoint: Endpoint): ValueType | undefined {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
return entry.value;
}
}
return undefined;
}
set(endpoint: Endpoint, mapEntry: ValueType) {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
entry.value = mapEntry;
return;
}
}
this.map.add({ key: endpoint, value: mapEntry });
}
delete(endpoint: Endpoint) {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
this.map.delete(entry);
return;
}
}
}
has(endpoint: Endpoint): boolean {
for (const entry of this.map) {
if (endpointEqualUnordered(endpoint, entry.key)) {
return true;
}
}
return false;
}
clear() {
this.map.clear();
}
*keys(): IterableIterator<Endpoint> {
for (const entry of this.map) {
yield entry.key;
}
}
*values(): IterableIterator<ValueType> {
for (const entry of this.map) {
yield entry.value;
}
}
*entries(): IterableIterator<[Endpoint, ValueType]> {
for (const entry of this.map) {
yield [entry.key, entry.value];
}
}
}

@@ -30,2 +30,4 @@ /*

export type HealthListener = (healthy: boolean) => void;
/**

@@ -50,2 +52,5 @@ * This is an interface for load balancing policies to use to interact with

getChannelzRef(): SubchannelRef;
isHealthy(): boolean;
addHealthStateWatcher(listener: HealthListener): void;
removeHealthStateWatcher(listener: HealthListener): void;
/**

@@ -64,4 +69,20 @@ * If this is a wrapper, return the wrapped subchannel, otherwise return this

export abstract class BaseSubchannelWrapper implements SubchannelInterface {
constructor(protected child: SubchannelInterface) {}
private healthy = true;
private healthListeners: Set<HealthListener> = new Set();
constructor(protected child: SubchannelInterface) {
child.addHealthStateWatcher(childHealthy => {
/* A change to the child health state only affects this wrapper's overall
* health state if this wrapper is reporting healthy. */
if (this.healthy) {
this.updateHealthListeners();
}
});
}
private updateHealthListeners(): void {
for (const listener of this.healthListeners) {
listener(this.isHealthy());
}
}
getConnectivityState(): ConnectivityState {

@@ -94,2 +115,21 @@ return this.child.getConnectivityState();

}
isHealthy(): boolean {
return this.healthy && this.child.isHealthy();
}
addHealthStateWatcher(listener: HealthListener): void {
this.healthListeners.add(listener);
}
removeHealthStateWatcher(listener: HealthListener): void {
this.healthListeners.delete(listener);
}
protected setHealthy(healthy: boolean): void {
if (healthy !== this.healthy) {
this.healthy = healthy;
/* A change to this wrapper's health state only affects the overall
* reported health state if the child is healthy. */
if (this.child.isHealthy()) {
this.updateHealthListeners();
}
}
}
getRealSubchannel(): Subchannel {

@@ -96,0 +136,0 @@ return this.child.getRealSubchannel();

@@ -469,2 +469,14 @@ /*

isHealthy(): boolean {
return true;
}
addHealthStateWatcher(listener: (healthy: boolean) => void): void {
// Do nothing with the listener
}
removeHealthStateWatcher(listener: (healthy: boolean) => void): void {
// Do nothing with the listener
}
getRealSubchannel(): this {

@@ -471,0 +483,0 @@ return this;

@@ -26,3 +26,3 @@ /*

} from 'tls';
import { StatusObject } from './call-interface';
import { PartialStatusObject } from './call-interface';
import { ChannelCredentials } from './channel-credentials';

@@ -76,3 +76,3 @@ import { ChannelOptions } from './channel-options';

addMessageReceived(): void;
onCallEnd(status: StatusObject): void;
onCallEnd(status: PartialStatusObject): void;
onStreamEnd(success: boolean): void;

@@ -79,0 +79,0 @@ }

@@ -104,2 +104,15 @@ /*

export function combineHostPort(hostPort: HostPort): string {
if (hostPort.port === undefined) {
return hostPort.host;
} else {
// Only an IPv6 host should include a colon
if (hostPort.host.includes(':')) {
return `[${hostPort.host}]:${hostPort.port}`;
} else {
return `${hostPort.host}:${hostPort.port}`;
}
}
}
export function uriToString(uri: GrpcUri): string {

@@ -106,0 +119,0 @@ let result = '';

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc