@grpc/grpc-js
Advanced tools
Comparing version 0.5.4 to 0.6.0
import { Call } from './call-stream'; | ||
import { Http2Channel } from './channel'; | ||
import { Channel } from './channel'; | ||
import { BaseFilter, Filter, FilterFactory } from './filter'; | ||
@@ -9,3 +9,3 @@ import { Metadata } from './metadata'; | ||
private serviceUrl; | ||
constructor(channel: Http2Channel, stream: Call); | ||
constructor(channel: Channel, stream: Call); | ||
sendMetadata(metadata: Promise<Metadata>): Promise<Metadata>; | ||
@@ -15,4 +15,4 @@ } | ||
private readonly channel; | ||
constructor(channel: Http2Channel); | ||
constructor(channel: Channel); | ||
createFilter(callStream: Call): CallCredentialsFilter; | ||
} |
@@ -40,5 +40,3 @@ "use strict"; | ||
async sendMetadata(metadata) { | ||
const channelCredentials = this.channel.credentials._getCallCredentials(); | ||
const streamCredentials = this.stream.getCredentials(); | ||
const credentials = channelCredentials.compose(streamCredentials); | ||
const credentials = this.stream.getCredentials(); | ||
const credsMetadata = credentials.generateMetadata({ | ||
@@ -56,2 +54,3 @@ service_url: this.serviceUrl, | ||
this.channel = channel; | ||
this.channel = channel; | ||
} | ||
@@ -58,0 +57,0 @@ createFilter(callStream) { |
@@ -24,2 +24,9 @@ import { Metadata } from './metadata'; | ||
/** | ||
* Check whether two call credentials objects are equal. Separate | ||
* SingleCallCredentials with identical metadata generator functions are | ||
* equal. | ||
* @param other The other CallCredentials object to compare with. | ||
*/ | ||
abstract _equals(other: CallCredentials): boolean; | ||
/** | ||
* Creates a new CallCredentials object from a given function that generates | ||
@@ -26,0 +33,0 @@ * Metadata objects. |
@@ -56,2 +56,13 @@ "use strict"; | ||
} | ||
_equals(other) { | ||
if (this === other) { | ||
return true; | ||
} | ||
if (other instanceof ComposedCallCredentials) { | ||
return this.creds.every((value, index) => value._equals(other.creds[index])); | ||
} | ||
else { | ||
return false; | ||
} | ||
} | ||
} | ||
@@ -78,2 +89,13 @@ class SingleCallCredentials extends CallCredentials { | ||
} | ||
_equals(other) { | ||
if (this === other) { | ||
return true; | ||
} | ||
if (other instanceof SingleCallCredentials) { | ||
return this.metadataGenerator === other.metadataGenerator; | ||
} | ||
else { | ||
return false; | ||
} | ||
} | ||
} | ||
@@ -87,3 +109,6 @@ class EmptyCallCredentials extends CallCredentials { | ||
} | ||
_equals(other) { | ||
return other instanceof EmptyCallCredentials; | ||
} | ||
} | ||
//# sourceMappingURL=call-credentials.js.map |
@@ -5,3 +5,2 @@ /// <reference types="node" /> | ||
import { CallCredentials } from './call-credentials'; | ||
import { Http2Channel } from './channel'; | ||
import { Status } from './constants'; | ||
@@ -13,2 +12,4 @@ import { EmitterAugmentation1 } from './events'; | ||
import { ObjectDuplex, WriteCallback } from './object-stream'; | ||
import { ChannelImplementation } from './channel'; | ||
import { Subchannel } from './subchannel'; | ||
export declare type Deadline = Date | number; | ||
@@ -54,2 +55,3 @@ export interface CallStreamOptions { | ||
private readonly options; | ||
private readonly channelCallCredentials; | ||
credentials: CallCredentials; | ||
@@ -71,3 +73,5 @@ filterStack: Filter; | ||
private finalStatus; | ||
constructor(methodName: string, channel: Http2Channel, options: CallStreamOptions, filterStackFactory: FilterStackFactory); | ||
private subchannel; | ||
private disconnectListener; | ||
constructor(methodName: string, channel: ChannelImplementation, options: CallStreamOptions, filterStackFactory: FilterStackFactory, channelCallCredentials: CallCredentials); | ||
/** | ||
@@ -84,3 +88,3 @@ * On first call, emits a 'status' event with the given StatusObject. | ||
private handleTrailers; | ||
attachHttp2Stream(stream: http2.ClientHttp2Stream): void; | ||
attachHttp2Stream(stream: http2.ClientHttp2Stream, subchannel: Subchannel): void; | ||
sendMetadata(metadata: Metadata): void; | ||
@@ -87,0 +91,0 @@ private destroyHttp2Stream; |
@@ -21,3 +21,2 @@ "use strict"; | ||
const stream_1 = require("stream"); | ||
const call_credentials_1 = require("./call-credentials"); | ||
const constants_1 = require("./constants"); | ||
@@ -28,3 +27,3 @@ const metadata_1 = require("./metadata"); | ||
class Http2CallStream extends stream_1.Duplex { | ||
constructor(methodName, channel, options, filterStackFactory) { | ||
constructor(methodName, channel, options, filterStackFactory, channelCallCredentials) { | ||
super({ objectMode: true }); | ||
@@ -34,3 +33,3 @@ this.methodName = methodName; | ||
this.options = options; | ||
this.credentials = call_credentials_1.CallCredentials.createEmpty(); | ||
this.channelCallCredentials = channelCallCredentials; | ||
this.http2Stream = null; | ||
@@ -56,3 +55,8 @@ this.pendingRead = false; | ||
this.finalStatus = null; | ||
this.subchannel = null; | ||
this.filterStack = filterStackFactory.createFilter(this); | ||
this.credentials = channelCallCredentials; | ||
this.disconnectListener = () => { | ||
this.endCall({ code: constants_1.Status.UNAVAILABLE, details: 'Connection dropped', metadata: new metadata_1.Metadata() }); | ||
}; | ||
} | ||
@@ -77,2 +81,6 @@ /** | ||
}); | ||
if (this.subchannel) { | ||
this.subchannel.callUnref(); | ||
this.subchannel.removeDisconnectListener(this.disconnectListener); | ||
} | ||
} | ||
@@ -169,3 +177,3 @@ } | ||
} | ||
attachHttp2Stream(stream) { | ||
attachHttp2Stream(stream, subchannel) { | ||
if (this.finalStatus !== null) { | ||
@@ -176,2 +184,5 @@ stream.close(NGHTTP2_CANCEL); | ||
this.http2Stream = stream; | ||
this.subchannel = subchannel; | ||
subchannel.addDisconnectListener(this.disconnectListener); | ||
subchannel.callRef(); | ||
stream.on('response', (headers, flags) => { | ||
@@ -292,3 +303,3 @@ switch (headers[':status']) { | ||
sendMetadata(metadata) { | ||
this.channel._startHttp2Stream(this.options.host, this.methodName, this, metadata); | ||
this.channel._startCallStream(this, metadata); | ||
} | ||
@@ -320,3 +331,3 @@ destroyHttp2Stream() { | ||
setCredentials(credentials) { | ||
this.credentials = credentials; | ||
this.credentials = this.channelCallCredentials.compose(credentials); | ||
} | ||
@@ -323,0 +334,0 @@ getStatus() { |
@@ -61,2 +61,8 @@ /// <reference types="node" /> | ||
/** | ||
* Check whether two channel credentials objects are equal. Two secure | ||
* credentials are equal if they were constructed with the same parameters. | ||
* @param other The other ChannelCredentials Object | ||
*/ | ||
abstract _equals(other: ChannelCredentials): boolean; | ||
/** | ||
* Return a new ChannelCredentials instance with a given set of credentials. | ||
@@ -63,0 +69,0 @@ * The resulting instance can be used to construct a Channel that communicates |
@@ -27,2 +27,10 @@ "use strict"; | ||
} | ||
function bufferOrNullEqual(buf1, buf2) { | ||
if (buf1 === null && buf2 === null) { | ||
return true; | ||
} | ||
else { | ||
return buf1 !== null && buf2 !== null && buf1.equals(buf2); | ||
} | ||
} | ||
/** | ||
@@ -61,14 +69,3 @@ * A class that contains credentials for communicating over a channel, as well | ||
} | ||
const secureContext = tls_1.createSecureContext({ | ||
ca: rootCerts || undefined, | ||
key: privateKey || undefined, | ||
cert: certChain || undefined, | ||
}); | ||
const connectionOptions = { secureContext }; | ||
if (verifyOptions && verifyOptions.checkServerIdentity) { | ||
connectionOptions.checkServerIdentity = (host, cert) => { | ||
return verifyOptions.checkServerIdentity(host, { raw: cert.raw }); | ||
}; | ||
} | ||
return new SecureChannelCredentialsImpl(connectionOptions); | ||
return new SecureChannelCredentialsImpl(rootCerts || null, privateKey || null, certChain || null, verifyOptions || {}); | ||
} | ||
@@ -96,11 +93,28 @@ /** | ||
} | ||
_equals(other) { | ||
return other instanceof InsecureChannelCredentialsImpl; | ||
} | ||
} | ||
class SecureChannelCredentialsImpl extends ChannelCredentials { | ||
constructor(connectionOptions, callCredentials) { | ||
super(callCredentials); | ||
this.connectionOptions = connectionOptions; | ||
constructor(rootCerts, privateKey, certChain, verifyOptions) { | ||
super(); | ||
this.rootCerts = rootCerts; | ||
this.privateKey = privateKey; | ||
this.certChain = certChain; | ||
this.verifyOptions = verifyOptions; | ||
const secureContext = tls_1.createSecureContext({ | ||
ca: rootCerts || undefined, | ||
key: privateKey || undefined, | ||
cert: certChain || undefined, | ||
}); | ||
this.connectionOptions = { secureContext }; | ||
if (verifyOptions && verifyOptions.checkServerIdentity) { | ||
this.connectionOptions.checkServerIdentity = (host, cert) => { | ||
return verifyOptions.checkServerIdentity(host, { raw: cert.raw }); | ||
}; | ||
} | ||
} | ||
compose(callCredentials) { | ||
const combinedCallCredentials = this.callCredentials.compose(callCredentials); | ||
return new SecureChannelCredentialsImpl(this.connectionOptions, combinedCallCredentials); | ||
return new ComposedChannelCredentialsImpl(this, combinedCallCredentials); | ||
} | ||
@@ -113,3 +127,52 @@ _getConnectionOptions() { | ||
} | ||
_equals(other) { | ||
if (this === other) { | ||
return true; | ||
} | ||
if (other instanceof SecureChannelCredentialsImpl) { | ||
if (!bufferOrNullEqual(this.rootCerts, other.rootCerts)) { | ||
return false; | ||
} | ||
if (!bufferOrNullEqual(this.privateKey, other.privateKey)) { | ||
return false; | ||
} | ||
if (!bufferOrNullEqual(this.certChain, other.certChain)) { | ||
return false; | ||
} | ||
return (this.verifyOptions.checkServerIdentity === | ||
other.verifyOptions.checkServerIdentity); | ||
} | ||
else { | ||
return false; | ||
} | ||
} | ||
} | ||
class ComposedChannelCredentialsImpl extends ChannelCredentials { | ||
constructor(channelCredentials, callCreds) { | ||
super(callCreds); | ||
this.channelCredentials = channelCredentials; | ||
} | ||
compose(callCredentials) { | ||
const combinedCallCredentials = this.callCredentials.compose(callCredentials); | ||
return new ComposedChannelCredentialsImpl(this.channelCredentials, combinedCallCredentials); | ||
} | ||
_getConnectionOptions() { | ||
return this.channelCredentials._getConnectionOptions(); | ||
} | ||
_isSecure() { | ||
return true; | ||
} | ||
_equals(other) { | ||
if (this === other) { | ||
return true; | ||
} | ||
if (other instanceof ComposedChannelCredentialsImpl) { | ||
return (this.channelCredentials._equals(other.channelCredentials) && | ||
this.callCredentials._equals(other.callCredentials)); | ||
} | ||
else { | ||
return false; | ||
} | ||
} | ||
} | ||
//# sourceMappingURL=channel-credentials.js.map |
@@ -5,9 +5,9 @@ /** | ||
export interface ChannelOptions { | ||
'grpc.ssl_target_name_override': string; | ||
'grpc.primary_user_agent': string; | ||
'grpc.secondary_user_agent': string; | ||
'grpc.default_authority': string; | ||
'grpc.keepalive_time_ms': number; | ||
'grpc.keepalive_timeout_ms': number; | ||
[key: string]: string | number; | ||
'grpc.ssl_target_name_override'?: string; | ||
'grpc.primary_user_agent'?: string; | ||
'grpc.secondary_user_agent'?: string; | ||
'grpc.default_authority'?: string; | ||
'grpc.keepalive_time_ms'?: number; | ||
'grpc.keepalive_timeout_ms'?: number; | ||
[key: string]: string | number | undefined; | ||
} | ||
@@ -26,1 +26,2 @@ /** | ||
}; | ||
export declare function channelOptionsEqual(options1: ChannelOptions, options2: ChannelOptions): boolean; |
@@ -31,2 +31,19 @@ "use strict"; | ||
}; | ||
function channelOptionsEqual(options1, options2) { | ||
const keys1 = Object.keys(options1).sort(); | ||
const keys2 = Object.keys(options2).sort(); | ||
if (keys1.length !== keys2.length) { | ||
return false; | ||
} | ||
for (let i = 0; i < keys1.length; i += 1) { | ||
if (keys1[i] !== keys2[i]) { | ||
return false; | ||
} | ||
if (options1[keys1[i]] !== options2[keys2[i]]) { | ||
return false; | ||
} | ||
} | ||
return true; | ||
} | ||
exports.channelOptionsEqual = channelOptionsEqual; | ||
//# sourceMappingURL=channel-options.js.map |
@@ -1,4 +0,2 @@ | ||
/// <reference types="node" /> | ||
import { EventEmitter } from 'events'; | ||
import { Call, Deadline, Http2CallStream } from './call-stream'; | ||
import { Deadline, Call, Http2CallStream } from './call-stream'; | ||
import { ChannelCredentials } from './channel-credentials'; | ||
@@ -62,32 +60,31 @@ import { ChannelOptions } from './channel-options'; | ||
} | ||
export declare class Http2Channel extends EventEmitter implements Channel { | ||
readonly credentials: ChannelCredentials; | ||
export declare class ChannelImplementation implements Channel { | ||
private target; | ||
private readonly credentials; | ||
private readonly options; | ||
private readonly userAgent; | ||
private readonly target; | ||
private readonly defaultAuthority; | ||
private resolvingLoadBalancer; | ||
private subchannelPool; | ||
private connectivityState; | ||
private connecting; | ||
private subChannel; | ||
private currentPicker; | ||
private pickQueue; | ||
private connectivityStateWatchers; | ||
private defaultAuthority; | ||
private filterStackFactory; | ||
private subChannelConnectCallback; | ||
private subChannelCloseCallback; | ||
private backoffTimerId; | ||
private currentBackoff; | ||
private currentBackoffDeadline; | ||
private handleStateChange; | ||
private transitionToState; | ||
private startConnecting; | ||
constructor(address: string, credentials: ChannelCredentials, options: Partial<ChannelOptions>); | ||
_startHttp2Stream(authority: string, methodName: string, stream: Http2CallStream, metadata: Metadata): void; | ||
createCall(method: string, deadline: Deadline | null | undefined, host: string | null | undefined, parentCall: Call | null | undefined, propagateFlags: number | null | undefined): Call; | ||
constructor(target: string, credentials: ChannelCredentials, options: ChannelOptions); | ||
/** | ||
* Attempts to connect, returning a Promise that resolves when the connection | ||
* is successful, or rejects if the channel is shut down. | ||
* Check the picker output for the given call and corresponding metadata, | ||
* and take any relevant actions. Should not be called while iterating | ||
* over pickQueue. | ||
* @param callStream | ||
* @param callMetadata | ||
*/ | ||
private connect; | ||
getConnectivityState(tryToConnect: boolean): ConnectivityState; | ||
private tryPick; | ||
private removeConnectivityStateWatcher; | ||
private updateState; | ||
_startCallStream(stream: Http2CallStream, metadata: Metadata): void; | ||
close(): void; | ||
getTarget(): string; | ||
getConnectivityState(): ConnectivityState; | ||
watchConnectivityState(currentState: ConnectivityState, deadline: Date | number, callback: (error?: Error) => void): void; | ||
getTarget(): string; | ||
close(): void; | ||
createCall(method: string, deadline: Deadline | null | undefined, host: string | null | undefined, parentCall: Call | null | undefined, propagateFlags: number | null | undefined): Call; | ||
} |
@@ -19,22 +19,14 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const events_1 = require("events"); | ||
const http2 = require("http2"); | ||
const tls_1 = require("tls"); | ||
const url = require("url"); | ||
const call_credentials_filter_1 = require("./call-credentials-filter"); | ||
const call_stream_1 = require("./call-stream"); | ||
const channel_options_1 = require("./channel-options"); | ||
const compression_filter_1 = require("./compression-filter"); | ||
const resolving_load_balancer_1 = require("./resolving-load-balancer"); | ||
const subchannel_pool_1 = require("./subchannel-pool"); | ||
const picker_1 = require("./picker"); | ||
const metadata_1 = require("./metadata"); | ||
const constants_1 = require("./constants"); | ||
const filter_stack_1 = require("./filter-stack"); | ||
const call_credentials_filter_1 = require("./call-credentials-filter"); | ||
const deadline_filter_1 = require("./deadline-filter"); | ||
const filter_stack_1 = require("./filter-stack"); | ||
const metadata_status_filter_1 = require("./metadata-status-filter"); | ||
const subchannel_1 = require("./subchannel"); | ||
const { version: clientVersion } = require('../../package.json'); | ||
const MIN_CONNECT_TIMEOUT_MS = 20000; | ||
const INITIAL_BACKOFF_MS = 1000; | ||
const BACKOFF_MULTIPLIER = 1.6; | ||
const MAX_BACKOFF_MS = 120000; | ||
const BACKOFF_JITTER = 0.2; | ||
const { HTTP2_HEADER_AUTHORITY, HTTP2_HEADER_CONTENT_TYPE, HTTP2_HEADER_METHOD, HTTP2_HEADER_PATH, HTTP2_HEADER_TE, HTTP2_HEADER_USER_AGENT, } = http2.constants; | ||
const compression_filter_1 = require("./compression-filter"); | ||
const resolver_1 = require("./resolver"); | ||
var ConnectivityState; | ||
@@ -48,32 +40,43 @@ (function (ConnectivityState) { | ||
})(ConnectivityState = exports.ConnectivityState || (exports.ConnectivityState = {})); | ||
function uniformRandom(min, max) { | ||
return Math.random() * (max - min) + min; | ||
} | ||
class Http2Channel extends events_1.EventEmitter { | ||
constructor(address, credentials, options) { | ||
super(); | ||
class ChannelImplementation { | ||
constructor(target, credentials, options) { | ||
this.target = target; | ||
this.credentials = credentials; | ||
this.options = options; | ||
this.connectivityState = ConnectivityState.IDLE; | ||
// Helper Promise object only used in the implementation of connect(). | ||
this.connecting = null; | ||
/* For now, we have up to one subchannel, which will exist as long as we are | ||
* connecting or trying to connect */ | ||
this.subChannel = null; | ||
this.subChannelConnectCallback = () => { }; | ||
this.subChannelCloseCallback = () => { }; | ||
this.currentBackoff = INITIAL_BACKOFF_MS; | ||
for (const option in options) { | ||
if (options.hasOwnProperty(option)) { | ||
if (!channel_options_1.recognizedOptions.hasOwnProperty(option)) { | ||
console.warn(`Unrecognized channel argument '${option}' will be ignored.`); | ||
this.currentPicker = new picker_1.UnavailablePicker(); | ||
this.pickQueue = []; | ||
this.connectivityStateWatchers = []; | ||
// TODO(murgatroid99): check channel arg for getting a private pool | ||
this.subchannelPool = subchannel_pool_1.getSubchannelPool(true); | ||
const channelControlHelper = { | ||
createSubchannel: (subchannelAddress, subchannelArgs) => { | ||
return this.subchannelPool.getOrCreateSubchannel(this.target, subchannelAddress, Object.assign({}, this.options, subchannelArgs), this.credentials); | ||
}, | ||
updateState: (connectivityState, picker) => { | ||
this.currentPicker = picker; | ||
const queueCopy = this.pickQueue.slice(); | ||
this.pickQueue = []; | ||
for (const { callStream, callMetadata } of queueCopy) { | ||
this.tryPick(callStream, callMetadata); | ||
} | ||
} | ||
} | ||
if (credentials._isSecure()) { | ||
this.target = new url.URL(`https://${address}`); | ||
} | ||
else { | ||
this.target = new url.URL(`http://${address}`); | ||
} | ||
this.updateState(connectivityState); | ||
}, | ||
requestReresolution: () => { | ||
// This should never be called. | ||
throw new Error('Resolving load balancer should never call requestReresolution'); | ||
}, | ||
}; | ||
// TODO(murgatroid99): check channel arg for default service config | ||
const defaultServiceConfig = { | ||
loadBalancingConfig: [], | ||
methodConfig: [], | ||
}; | ||
this.resolvingLoadBalancer = new resolving_load_balancer_1.ResolvingLoadBalancer(target, channelControlHelper, defaultServiceConfig); | ||
this.filterStackFactory = new filter_stack_1.FilterStackFactory([ | ||
new call_credentials_filter_1.CallCredentialsFilterFactory(this), | ||
new deadline_filter_1.DeadlineFilterFactory(this), | ||
new metadata_status_filter_1.MetadataStatusFilterFactory(this), | ||
new compression_filter_1.CompressionFilterFactory(this), | ||
]); | ||
// TODO(murgatroid99): Add more centralized handling of channel options | ||
@@ -84,140 +87,101 @@ if (this.options['grpc.default_authority']) { | ||
else { | ||
this.defaultAuthority = this.target.host; | ||
this.defaultAuthority = resolver_1.getDefaultAuthority(target); | ||
} | ||
this.filterStackFactory = new filter_stack_1.FilterStackFactory([ | ||
new call_credentials_filter_1.CallCredentialsFilterFactory(this), | ||
new deadline_filter_1.DeadlineFilterFactory(this), | ||
new metadata_status_filter_1.MetadataStatusFilterFactory(this), | ||
new compression_filter_1.CompressionFilterFactory(this), | ||
]); | ||
this.currentBackoffDeadline = new Date(); | ||
/* The only purpose of these lines is to ensure that this.backoffTimerId has | ||
* a value of type NodeJS.Timer. */ | ||
this.backoffTimerId = setTimeout(() => { }, 0); | ||
// Build user-agent string. | ||
this.userAgent = [ | ||
options['grpc.primary_user_agent'], | ||
`grpc-node-js/${clientVersion}`, | ||
options['grpc.secondary_user_agent'], | ||
] | ||
.filter(e => e) | ||
.join(' '); // remove falsey values first | ||
} | ||
handleStateChange(oldState, newState) { | ||
const now = new Date(); | ||
switch (newState) { | ||
case ConnectivityState.CONNECTING: | ||
if (oldState === ConnectivityState.IDLE) { | ||
this.currentBackoff = INITIAL_BACKOFF_MS; | ||
this.currentBackoffDeadline = new Date(now.getTime() + INITIAL_BACKOFF_MS); | ||
/** | ||
* Check the picker output for the given call and corresponding metadata, | ||
* and take any relevant actions. Should not be called while iterating | ||
* over pickQueue. | ||
* @param callStream | ||
* @param callMetadata | ||
*/ | ||
tryPick(callStream, callMetadata) { | ||
const pickResult = this.currentPicker.pick({ metadata: callMetadata }); | ||
switch (pickResult.pickResultType) { | ||
case picker_1.PickResultType.COMPLETE: | ||
if (pickResult.subchannel === null) { | ||
callStream.cancelWithStatus(constants_1.Status.UNAVAILABLE, 'Request dropped by load balancing policy'); | ||
// End the call with an error | ||
} | ||
else if (oldState === ConnectivityState.TRANSIENT_FAILURE) { | ||
this.currentBackoff = Math.min(this.currentBackoff * BACKOFF_MULTIPLIER, MAX_BACKOFF_MS); | ||
const jitterMagnitude = BACKOFF_JITTER * this.currentBackoff; | ||
this.currentBackoffDeadline = new Date(now.getTime() + | ||
this.currentBackoff + | ||
uniformRandom(-jitterMagnitude, jitterMagnitude)); | ||
else { | ||
/* If the subchannel disconnects between calling pick and getting | ||
* the filter stack metadata, the call will end with an error. */ | ||
callStream.filterStack | ||
.sendMetadata(Promise.resolve(new metadata_1.Metadata())) | ||
.then(finalMetadata => { | ||
if (pickResult.subchannel.getConnectivityState() === | ||
ConnectivityState.READY) { | ||
pickResult.subchannel.startCallStream(callMetadata, callStream); | ||
} | ||
else { | ||
callStream.cancelWithStatus(constants_1.Status.UNAVAILABLE, 'Connection dropped while starting call'); | ||
} | ||
}, (error) => { | ||
// We assume the error code isn't 0 (Status.OK) | ||
callStream.cancelWithStatus(error.code || constants_1.Status.UNKNOWN, `Getting metadata from plugin failed with error: ${error.message}`); | ||
}); | ||
} | ||
this.startConnecting(); | ||
break; | ||
case ConnectivityState.READY: | ||
this.emit('connect'); | ||
case picker_1.PickResultType.QUEUE: | ||
this.pickQueue.push({ callStream, callMetadata }); | ||
break; | ||
case ConnectivityState.TRANSIENT_FAILURE: | ||
this.subChannel = null; | ||
this.backoffTimerId = setTimeout(() => { | ||
this.transitionToState([ConnectivityState.TRANSIENT_FAILURE], ConnectivityState.CONNECTING); | ||
}, this.currentBackoffDeadline.getTime() - now.getTime()); | ||
break; | ||
case ConnectivityState.IDLE: | ||
case ConnectivityState.SHUTDOWN: | ||
if (this.subChannel) { | ||
this.subChannel.close(); | ||
this.subChannel.removeListener('connect', this.subChannelConnectCallback); | ||
this.subChannel.removeListener('close', this.subChannelCloseCallback); | ||
this.subChannel = null; | ||
this.emit('shutdown'); | ||
clearTimeout(this.backoffTimerId); | ||
case picker_1.PickResultType.TRANSIENT_FAILURE: | ||
if (callMetadata.getOptions().waitForReady) { | ||
this.pickQueue.push({ callStream, callMetadata }); | ||
} | ||
else { | ||
callStream.cancelWithStatus(pickResult.status.code, pickResult.status.details); | ||
} | ||
break; | ||
default: | ||
throw new Error('This should never happen'); | ||
throw new Error(`Invalid state: unknown pickResultType ${pickResult.pickResultType}`); | ||
} | ||
} | ||
// Transition from any of a set of oldStates to a specific newState | ||
transitionToState(oldStates, newState) { | ||
if (oldStates.indexOf(this.connectivityState) > -1) { | ||
const oldState = this.connectivityState; | ||
this.connectivityState = newState; | ||
this.handleStateChange(oldState, newState); | ||
this.emit('connectivityStateChanged', newState); | ||
removeConnectivityStateWatcher(watcherObject) { | ||
const watcherIndex = this.connectivityStateWatchers.findIndex(value => value === watcherObject); | ||
if (watcherIndex >= 0) { | ||
this.connectivityStateWatchers.splice(watcherIndex, 1); | ||
} | ||
} | ||
startConnecting() { | ||
const connectionOptions = this.credentials._getConnectionOptions() || {}; | ||
if (connectionOptions.secureContext !== null) { | ||
// If provided, the value of grpc.ssl_target_name_override should be used | ||
// to override the target hostname when checking server identity. | ||
// This option is used for testing only. | ||
if (this.options['grpc.ssl_target_name_override']) { | ||
const sslTargetNameOverride = this.options['grpc.ssl_target_name_override']; | ||
connectionOptions.checkServerIdentity = (host, cert) => { | ||
return tls_1.checkServerIdentity(sslTargetNameOverride, cert); | ||
}; | ||
connectionOptions.servername = sslTargetNameOverride; | ||
updateState(newState) { | ||
this.connectivityState = newState; | ||
const watchersCopy = this.connectivityStateWatchers.slice(); | ||
for (const watcherObject of watchersCopy) { | ||
if (newState !== watcherObject.currentState) { | ||
watcherObject.callback(); | ||
clearTimeout(watcherObject.timer); | ||
this.removeConnectivityStateWatcher(watcherObject); | ||
} | ||
} | ||
const subChannel = new subchannel_1.Http2SubChannel(this.target, connectionOptions, this.userAgent, this.options); | ||
this.subChannel = subChannel; | ||
} | ||
_startCallStream(stream, metadata) { | ||
this.tryPick(stream, metadata); | ||
} | ||
close() { | ||
this.resolvingLoadBalancer.destroy(); | ||
this.updateState(ConnectivityState.SHUTDOWN); | ||
} | ||
getTarget() { | ||
return this.target; | ||
} | ||
getConnectivityState() { | ||
return this.connectivityState; | ||
} | ||
watchConnectivityState(currentState, deadline, callback) { | ||
const deadlineDate = deadline instanceof Date ? deadline : new Date(deadline); | ||
const now = new Date(); | ||
const connectionTimeout = Math.max(this.currentBackoffDeadline.getTime() - now.getTime(), MIN_CONNECT_TIMEOUT_MS); | ||
const connectionTimerId = setTimeout(() => { | ||
// This should trigger the 'close' event, which will send us back to | ||
// TRANSIENT_FAILURE | ||
subChannel.close(); | ||
}, connectionTimeout); | ||
this.subChannelConnectCallback = () => { | ||
// Connection succeeded | ||
clearTimeout(connectionTimerId); | ||
this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.READY); | ||
if (deadlineDate <= now) { | ||
process.nextTick(callback, new Error('Deadline passed without connectivity state change')); | ||
return; | ||
} | ||
const watcherObject = { | ||
currentState, | ||
callback, | ||
timer: setTimeout(() => { | ||
this.removeConnectivityStateWatcher(watcherObject); | ||
callback(new Error('Deadline passed without connectivity state change')); | ||
}, deadlineDate.getTime() - now.getTime()), | ||
}; | ||
subChannel.once('connect', this.subChannelConnectCallback); | ||
this.subChannelCloseCallback = () => { | ||
// Connection failed | ||
clearTimeout(connectionTimerId); | ||
/* TODO(murgatroid99): verify that this works for | ||
* CONNECTING->TRANSITIVE_FAILURE see nodejs/node#16645 */ | ||
this.transitionToState([ConnectivityState.CONNECTING, ConnectivityState.READY], ConnectivityState.TRANSIENT_FAILURE); | ||
}; | ||
subChannel.once('close', this.subChannelCloseCallback); | ||
this.connectivityStateWatchers.push(watcherObject); | ||
} | ||
_startHttp2Stream(authority, methodName, stream, metadata) { | ||
const connectMetadata = this.connect().then(() => metadata.clone()); | ||
const finalMetadata = stream.filterStack.sendMetadata(connectMetadata); | ||
finalMetadata | ||
.then(metadataValue => { | ||
const headers = metadataValue.toHttp2Headers(); | ||
headers[HTTP2_HEADER_AUTHORITY] = authority; | ||
headers[HTTP2_HEADER_USER_AGENT] = this.userAgent; | ||
headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc'; | ||
headers[HTTP2_HEADER_METHOD] = 'POST'; | ||
headers[HTTP2_HEADER_PATH] = methodName; | ||
headers[HTTP2_HEADER_TE] = 'trailers'; | ||
if (this.connectivityState === ConnectivityState.READY) { | ||
const subChannel = this.subChannel; | ||
subChannel.startCallStream(metadataValue, stream); | ||
} | ||
else { | ||
/* In this case, we lost the connection while finalizing | ||
* metadata. That should be very unusual */ | ||
setImmediate(() => { | ||
this._startHttp2Stream(authority, methodName, stream, metadata); | ||
}); | ||
} | ||
}) | ||
.catch((error) => { | ||
// We assume the error code isn't 0 (Status.OK) | ||
stream.cancelWithStatus(error.code || constants_1.Status.UNKNOWN, `Getting metadata from plugin failed with error: ${error.message}`); | ||
}); | ||
} | ||
createCall(method, deadline, host, parentCall, propagateFlags) { | ||
@@ -233,91 +197,7 @@ if (this.connectivityState === ConnectivityState.SHUTDOWN) { | ||
}; | ||
const stream = new call_stream_1.Http2CallStream(method, this, finalOptions, this.filterStackFactory); | ||
const stream = new call_stream_1.Http2CallStream(method, this, finalOptions, this.filterStackFactory, this.credentials._getCallCredentials()); | ||
return stream; | ||
} | ||
/** | ||
* Attempts to connect, returning a Promise that resolves when the connection | ||
* is successful, or rejects if the channel is shut down. | ||
*/ | ||
connect() { | ||
if (this.connectivityState === ConnectivityState.READY) { | ||
return Promise.resolve(); | ||
} | ||
else if (this.connectivityState === ConnectivityState.SHUTDOWN) { | ||
return Promise.reject(new Error('Channel has been shut down')); | ||
} | ||
else { | ||
// In effect, this.connecting is only assigned upon the first attempt to | ||
// transition from IDLE to CONNECTING, so this condition could have also | ||
// been (connectivityState === IDLE). | ||
if (!this.connecting) { | ||
this.connecting = new Promise((resolve, reject) => { | ||
this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING); | ||
const onConnect = () => { | ||
this.connecting = null; | ||
this.removeListener('shutdown', onShutdown); | ||
resolve(); | ||
}; | ||
const onShutdown = () => { | ||
this.connecting = null; | ||
this.removeListener('connect', onConnect); | ||
reject(new Error('Channel has been shut down')); | ||
}; | ||
this.once('connect', onConnect); | ||
this.once('shutdown', onShutdown); | ||
}); | ||
} | ||
return this.connecting; | ||
} | ||
} | ||
getConnectivityState(tryToConnect) { | ||
if (tryToConnect) { | ||
this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING); | ||
} | ||
return this.connectivityState; | ||
} | ||
watchConnectivityState(currentState, deadline, callback) { | ||
if (this.connectivityState !== currentState) { | ||
/* If the connectivity state is different from the provided currentState, | ||
* we assume that a state change has successfully occurred */ | ||
setImmediate(callback); | ||
} | ||
else { | ||
let deadlineMs = 0; | ||
if (deadline instanceof Date) { | ||
deadlineMs = deadline.getTime(); | ||
} | ||
else { | ||
deadlineMs = deadline; | ||
} | ||
let timeout = deadlineMs - Date.now(); | ||
if (timeout < 0) { | ||
timeout = 0; | ||
} | ||
const timeoutId = setTimeout(() => { | ||
this.removeListener('connectivityStateChanged', eventCb); | ||
callback(new Error('Channel state did not change before deadline')); | ||
}, timeout); | ||
const eventCb = () => { | ||
clearTimeout(timeoutId); | ||
callback(); | ||
}; | ||
this.once('connectivityStateChanged', eventCb); | ||
} | ||
} | ||
getTarget() { | ||
return this.target.toString(); | ||
} | ||
close() { | ||
if (this.connectivityState === ConnectivityState.SHUTDOWN) { | ||
throw new Error('Channel has been shut down'); | ||
} | ||
this.transitionToState([ | ||
ConnectivityState.CONNECTING, | ||
ConnectivityState.READY, | ||
ConnectivityState.TRANSIENT_FAILURE, | ||
ConnectivityState.IDLE, | ||
], ConnectivityState.SHUTDOWN); | ||
} | ||
} | ||
exports.Http2Channel = Http2Channel; | ||
exports.ChannelImplementation = ChannelImplementation; | ||
//# sourceMappingURL=channel.js.map |
@@ -37,3 +37,3 @@ "use strict"; | ||
else { | ||
this[CHANNEL_SYMBOL] = new channel_1.Http2Channel(address, credentials, options); | ||
this[CHANNEL_SYMBOL] = new channel_1.ChannelImplementation(address, credentials, options); | ||
} | ||
@@ -40,0 +40,0 @@ } |
import { Call } from './call-stream'; | ||
import { Http2Channel } from './channel'; | ||
import { Channel } from './channel'; | ||
import { BaseFilter, Filter, FilterFactory } from './filter'; | ||
@@ -10,3 +10,3 @@ import { Metadata } from './metadata'; | ||
private deadline; | ||
constructor(channel: Http2Channel, callStream: Call); | ||
constructor(channel: Channel, callStream: Call); | ||
sendMetadata(metadata: Promise<Metadata>): Promise<Metadata>; | ||
@@ -16,4 +16,4 @@ } | ||
private readonly channel; | ||
constructor(channel: Http2Channel); | ||
constructor(channel: Channel); | ||
createFilter(callStream: Call): DeadlineFilter; | ||
} |
@@ -5,3 +5,3 @@ /// <reference types="node" /> | ||
import { Deadline, StatusObject } from './call-stream'; | ||
import { Channel, ConnectivityState, Http2Channel } from './channel'; | ||
import { Channel, ConnectivityState, ChannelImplementation } from './channel'; | ||
import { ChannelCredentials } from './channel-credentials'; | ||
@@ -33,3 +33,3 @@ import { CallOptions, Client } from './client'; | ||
/**** Client ****/ | ||
export { Client, loadPackageDefinition, makeClientConstructor, makeClientConstructor as makeGenericClientConstructor, Http2Channel as Channel, }; | ||
export { Client, loadPackageDefinition, makeClientConstructor, makeClientConstructor as makeGenericClientConstructor, ChannelImplementation as Channel, }; | ||
/** | ||
@@ -36,0 +36,0 @@ * Close a Client object. |
@@ -24,3 +24,3 @@ "use strict"; | ||
exports.connectivityState = channel_1.ConnectivityState; | ||
exports.Channel = channel_1.Http2Channel; | ||
exports.Channel = channel_1.ChannelImplementation; | ||
const channel_credentials_1 = require("./channel-credentials"); | ||
@@ -151,2 +151,8 @@ exports.ChannelCredentials = channel_credentials_1.ChannelCredentials; | ||
}; | ||
const resolver = require("./resolver"); | ||
const load_balancer = require("./load-balancer"); | ||
(() => { | ||
resolver.registerAll(); | ||
load_balancer.registerAll(); | ||
})(); | ||
//# sourceMappingURL=index.js.map |
@@ -15,5 +15,5 @@ /// <reference types="node" /> | ||
export declare class Metadata { | ||
private options?; | ||
protected internalRepr: MetadataObject; | ||
constructor(options?: MetadataOptions | undefined); | ||
private options; | ||
constructor(options?: MetadataOptions); | ||
/** | ||
@@ -68,2 +68,3 @@ * Sets the given value for the given key by replacing any other values | ||
setOptions(options: MetadataOptions): void; | ||
getOptions(): MetadataOptions; | ||
/** | ||
@@ -70,0 +71,0 @@ * Creates an OutgoingHttpHeaders object that can be used with the http2 API. |
@@ -58,4 +58,9 @@ "use strict"; | ||
constructor(options) { | ||
this.options = options; | ||
this.internalRepr = new Map(); | ||
if (options === undefined) { | ||
this.options = {}; | ||
} | ||
else { | ||
this.options = options; | ||
} | ||
} | ||
@@ -162,2 +167,5 @@ /** | ||
} | ||
getOptions() { | ||
return this.options; | ||
} | ||
/** | ||
@@ -164,0 +172,0 @@ * Creates an OutgoingHttpHeaders object that can be used with the http2 API. |
@@ -1,33 +0,142 @@ | ||
/// <reference types="node" /> | ||
import { EventEmitter } from 'events'; | ||
import * as http2 from 'http2'; | ||
import * as url from 'url'; | ||
import { Call, Http2CallStream } from './call-stream'; | ||
import { ChannelCredentials } from './channel-credentials'; | ||
import { Metadata } from './metadata'; | ||
import { Http2CallStream } from './call-stream'; | ||
import { ChannelOptions } from './channel-options'; | ||
import { Metadata } from './metadata'; | ||
export interface SubChannel extends EventEmitter { | ||
import { ConnectivityState } from './channel'; | ||
export declare type ConnectivityStateListener = (subchannel: Subchannel, previousState: ConnectivityState, newState: ConnectivityState) => void; | ||
export declare class Subchannel { | ||
private channelTarget; | ||
private subchannelAddress; | ||
private options; | ||
private credentials; | ||
/** | ||
* Attach a call stream to this subchannel's connection to start it | ||
* @param headers The headers to start the stream with | ||
* @param callStream The stream to start | ||
* The subchannel's current connectivity state. Invariant: `session` === `null` | ||
* if and only if `connectivityState` is IDLE or TRANSIENT_FAILURE. | ||
*/ | ||
startCallStream(metadata: Metadata, callStream: Call): void; | ||
close(): void; | ||
} | ||
export declare class Http2SubChannel extends EventEmitter implements SubChannel { | ||
private connectivityState; | ||
/** | ||
* The underlying http2 session used to make requests. | ||
*/ | ||
private session; | ||
private refCount; | ||
/** | ||
* Indicates that the subchannel should transition from TRANSIENT_FAILURE to | ||
* CONNECTING instead of IDLE when the backoff timeout ends. | ||
*/ | ||
private continueConnecting; | ||
/** | ||
* A list of listener functions that will be called whenever the connectivity | ||
* state changes. Will be modified by `addConnectivityStateListener` and | ||
* `removeConnectivityStateListener` | ||
*/ | ||
private stateListeners; | ||
/** | ||
* A list of listener functions that will be called when the underlying | ||
* socket disconnects. Used for ending active calls with an UNAVAILABLE | ||
* status. | ||
*/ | ||
private disconnectListeners; | ||
private backoffTimeout; | ||
/** | ||
* The complete user agent string constructed using channel args. | ||
*/ | ||
private userAgent; | ||
/** | ||
* The amount of time in between sending pings | ||
*/ | ||
private keepaliveTimeMs; | ||
/** | ||
* The amount of time to wait for an acknowledgement after sending a ping | ||
*/ | ||
private keepaliveTimeoutMs; | ||
/** | ||
* Timer reference for timeout that indicates when to send the next ping | ||
*/ | ||
private keepaliveIntervalId; | ||
/** | ||
* Timer reference tracking when the most recent ping will be considered lost | ||
*/ | ||
private keepaliveTimeoutId; | ||
constructor(target: url.URL, connectionOptions: http2.SecureClientSessionOptions, userAgent: string, channelArgs: Partial<ChannelOptions>); | ||
private ref; | ||
private unref; | ||
/** | ||
* Tracks calls with references to this subchannel | ||
*/ | ||
private callRefcount; | ||
/** | ||
* Tracks channels and subchannel pools with references to this subchannel | ||
*/ | ||
private refcount; | ||
/** | ||
* A class representing a connection to a single backend. | ||
* @param channelTarget The target string for the channel as a whole | ||
* @param subchannelAddress The address for the backend that this subchannel | ||
* will connect to | ||
* @param options The channel options, plus any specific subchannel options | ||
* for this subchannel | ||
* @param credentials The channel credentials used to establish this | ||
* connection | ||
*/ | ||
constructor(channelTarget: string, subchannelAddress: string, options: ChannelOptions, credentials: ChannelCredentials); | ||
/** | ||
* Start a backoff timer with the current nextBackoff timeout | ||
*/ | ||
private startBackoff; | ||
private stopBackoff; | ||
private sendPing; | ||
private startKeepalivePings; | ||
private stopKeepalivePings; | ||
private startConnectingInternal; | ||
/** | ||
* Initiate a state transition from any element of oldStates to the new | ||
* state. If the current connectivityState is not in oldStates, do nothing. | ||
* @param oldStates The set of states to transition from | ||
* @param newState The state to transition to | ||
* @returns True if the state changed, false otherwise | ||
*/ | ||
private transitionToState; | ||
/** | ||
* Check if the subchannel associated with zero calls and with zero channels. | ||
* If so, shut it down. | ||
*/ | ||
private checkBothRefcounts; | ||
callRef(): void; | ||
callUnref(): void; | ||
ref(): void; | ||
unref(): void; | ||
unrefIfOneRef(): boolean; | ||
/** | ||
* Start a stream on the current session with the given `metadata` as headers | ||
* and then attach it to the `callStream`. Must only be called if the | ||
* subchannel's current connectivity state is READY. | ||
* @param metadata | ||
* @param callStream | ||
*/ | ||
startCallStream(metadata: Metadata, callStream: Http2CallStream): void; | ||
close(): void; | ||
/** | ||
* If the subchannel is currently IDLE, start connecting and switch to the | ||
* CONNECTING state. If the subchannel is current in TRANSIENT_FAILURE, | ||
* the next time it would transition to IDLE, start connecting again instead. | ||
* Otherwise, do nothing. | ||
*/ | ||
startConnecting(): void; | ||
/** | ||
* Get the subchannel's current connectivity state. | ||
*/ | ||
getConnectivityState(): ConnectivityState; | ||
/** | ||
* Add a listener function to be called whenever the subchannel's | ||
* connectivity state changes. | ||
* @param listener | ||
*/ | ||
addConnectivityStateListener(listener: ConnectivityStateListener): void; | ||
/** | ||
* Remove a listener previously added with `addConnectivityStateListener` | ||
* @param listener A reference to a function previously passed to | ||
* `addConnectivityStateListener` | ||
*/ | ||
removeConnectivityStateListener(listener: ConnectivityStateListener): void; | ||
addDisconnectListener(listener: () => void): void; | ||
removeDisconnectListener(listener: () => void): void; | ||
/** | ||
* Reset the backoff timeout, and immediately start connecting if in backoff. | ||
*/ | ||
resetBackoff(): void; | ||
} |
@@ -19,5 +19,12 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const events_1 = require("events"); | ||
const http2 = require("http2"); | ||
const { HTTP2_HEADER_AUTHORITY, HTTP2_HEADER_CONTENT_TYPE, HTTP2_HEADER_METHOD, HTTP2_HEADER_PATH, HTTP2_HEADER_TE, HTTP2_HEADER_USER_AGENT, } = http2.constants; | ||
const tls_1 = require("tls"); | ||
const channel_1 = require("./channel"); | ||
const backoff_timeout_1 = require("./backoff-timeout"); | ||
const { version: clientVersion } = require('../../package.json'); | ||
const MIN_CONNECT_TIMEOUT_MS = 20000; | ||
const INITIAL_BACKOFF_MS = 1000; | ||
const BACKOFF_MULTIPLIER = 1.6; | ||
const MAX_BACKOFF_MS = 120000; | ||
const BACKOFF_JITTER = 0.2; | ||
/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't | ||
@@ -28,31 +35,82 @@ * have a constant for the max signed 32 bit integer, so this is a simple way | ||
const KEEPALIVE_TIMEOUT_MS = 20000; | ||
class Http2SubChannel extends events_1.EventEmitter { | ||
constructor(target, connectionOptions, userAgent, channelArgs) { | ||
super(); | ||
this.refCount = 0; | ||
const { HTTP2_HEADER_AUTHORITY, HTTP2_HEADER_CONTENT_TYPE, HTTP2_HEADER_METHOD, HTTP2_HEADER_PATH, HTTP2_HEADER_TE, HTTP2_HEADER_USER_AGENT, } = http2.constants; | ||
/** | ||
* Get a number uniformly at random in the range [min, max) | ||
* @param min | ||
* @param max | ||
*/ | ||
function uniformRandom(min, max) { | ||
return Math.random() * (max - min) + min; | ||
} | ||
class Subchannel { | ||
/** | ||
* A class representing a connection to a single backend. | ||
* @param channelTarget The target string for the channel as a whole | ||
* @param subchannelAddress The address for the backend that this subchannel | ||
* will connect to | ||
* @param options The channel options, plus any specific subchannel options | ||
* for this subchannel | ||
* @param credentials The channel credentials used to establish this | ||
* connection | ||
*/ | ||
constructor(channelTarget, subchannelAddress, options, credentials) { | ||
this.channelTarget = channelTarget; | ||
this.subchannelAddress = subchannelAddress; | ||
this.options = options; | ||
this.credentials = credentials; | ||
/** | ||
* The subchannel's current connectivity state. Invariant: `session` === `null` | ||
* if and only if `connectivityState` is IDLE or TRANSIENT_FAILURE. | ||
*/ | ||
this.connectivityState = channel_1.ConnectivityState.IDLE; | ||
/** | ||
* The underlying http2 session used to make requests. | ||
*/ | ||
this.session = null; | ||
/** | ||
* Indicates that the subchannel should transition from TRANSIENT_FAILURE to | ||
* CONNECTING instead of IDLE when the backoff timeout ends. | ||
*/ | ||
this.continueConnecting = false; | ||
/** | ||
* A list of listener functions that will be called whenever the connectivity | ||
* state changes. Will be modified by `addConnectivityStateListener` and | ||
* `removeConnectivityStateListener` | ||
*/ | ||
this.stateListeners = []; | ||
/** | ||
* A list of listener functions that will be called when the underlying | ||
* socket disconnects. Used for ending active calls with an UNAVAILABLE | ||
* status. | ||
*/ | ||
this.disconnectListeners = []; | ||
/** | ||
* The amount of time in between sending pings | ||
*/ | ||
this.keepaliveTimeMs = KEEPALIVE_TIME_MS; | ||
/** | ||
* The amount of time to wait for an acknowledgement after sending a ping | ||
*/ | ||
this.keepaliveTimeoutMs = KEEPALIVE_TIMEOUT_MS; | ||
this.session = http2.connect(target, connectionOptions); | ||
this.session.unref(); | ||
this.session.on('connect', () => { | ||
this.emit('connect'); | ||
}); | ||
this.session.on('close', () => { | ||
this.stopKeepalivePings(); | ||
this.emit('close'); | ||
}); | ||
this.session.on('error', () => { | ||
this.stopKeepalivePings(); | ||
this.emit('close'); | ||
}); | ||
this.session.on('goaway', () => { | ||
this.stopKeepalivePings(); | ||
this.emit('close'); | ||
}); | ||
this.userAgent = userAgent; | ||
if (channelArgs['grpc.keepalive_time_ms']) { | ||
this.keepaliveTimeMs = channelArgs['grpc.keepalive_time_ms']; | ||
/** | ||
* Tracks calls with references to this subchannel | ||
*/ | ||
this.callRefcount = 0; | ||
/** | ||
* Tracks channels and subchannel pools with references to this subchannel | ||
*/ | ||
this.refcount = 0; | ||
// Build user-agent string. | ||
this.userAgent = [ | ||
options['grpc.primary_user_agent'], | ||
`grpc-node-js/${clientVersion}`, | ||
options['grpc.secondary_user_agent'], | ||
] | ||
.filter(e => e) | ||
.join(' '); // remove falsey values first | ||
if ('grpc.keepalive_time_ms' in options) { | ||
this.keepaliveTimeMs = options['grpc.keepalive_time_ms']; | ||
} | ||
if (channelArgs['grpc.keepalive_timeout_ms']) { | ||
this.keepaliveTimeoutMs = channelArgs['grpc.keepalive_timeout_ms']; | ||
if ('grpc.keepalive_timeout_ms' in options) { | ||
this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']; | ||
} | ||
@@ -63,20 +121,24 @@ this.keepaliveIntervalId = setTimeout(() => { }, 0); | ||
clearTimeout(this.keepaliveTimeoutId); | ||
this.backoffTimeout = new backoff_timeout_1.BackoffTimeout(() => { | ||
if (this.continueConnecting) { | ||
this.transitionToState([channel_1.ConnectivityState.TRANSIENT_FAILURE, channel_1.ConnectivityState.CONNECTING], channel_1.ConnectivityState.CONNECTING); | ||
} | ||
else { | ||
this.transitionToState([channel_1.ConnectivityState.TRANSIENT_FAILURE, channel_1.ConnectivityState.CONNECTING], channel_1.ConnectivityState.IDLE); | ||
} | ||
}); | ||
} | ||
ref() { | ||
if (this.refCount === 0) { | ||
this.session.ref(); | ||
this.startKeepalivePings(); | ||
} | ||
this.refCount += 1; | ||
/** | ||
* Start a backoff timer with the current nextBackoff timeout | ||
*/ | ||
startBackoff() { | ||
this.backoffTimeout.runOnce(); | ||
} | ||
unref() { | ||
this.refCount -= 1; | ||
if (this.refCount === 0) { | ||
this.session.unref(); | ||
this.stopKeepalivePings(); | ||
} | ||
stopBackoff() { | ||
this.backoffTimeout.stop(); | ||
this.backoffTimeout.reset(); | ||
} | ||
sendPing() { | ||
this.keepaliveTimeoutId = setTimeout(() => { | ||
this.emit('close'); | ||
this.transitionToState([channel_1.ConnectivityState.READY], channel_1.ConnectivityState.IDLE); | ||
}, this.keepaliveTimeoutMs); | ||
@@ -87,4 +149,2 @@ this.session.ping((err, duration, payload) => { | ||
} | ||
/* TODO(murgatroid99): refactor subchannels so that keepalives can be handled | ||
* per subchannel */ | ||
startKeepalivePings() { | ||
@@ -100,3 +160,141 @@ this.keepaliveIntervalId = setInterval(() => { | ||
} | ||
// Prerequisite: this subchannel is connected | ||
startConnectingInternal() { | ||
const connectionOptions = this.credentials._getConnectionOptions() || {}; | ||
let addressScheme = 'http://'; | ||
if ('secureContext' in connectionOptions) { | ||
addressScheme = 'https://'; | ||
// If provided, the value of grpc.ssl_target_name_override should be used | ||
// to override the target hostname when checking server identity. | ||
// This option is used for testing only. | ||
if (this.options['grpc.ssl_target_name_override']) { | ||
const sslTargetNameOverride = this.options['grpc.ssl_target_name_override']; | ||
connectionOptions.checkServerIdentity = (host, cert) => { | ||
return tls_1.checkServerIdentity(sslTargetNameOverride, cert); | ||
}; | ||
connectionOptions.servername = sslTargetNameOverride; | ||
} | ||
else { | ||
connectionOptions.servername = this.channelTarget; | ||
} | ||
} | ||
this.session = http2.connect(addressScheme + this.subchannelAddress, connectionOptions); | ||
this.session.unref(); | ||
this.session.once('connect', () => { | ||
this.transitionToState([channel_1.ConnectivityState.CONNECTING], channel_1.ConnectivityState.READY); | ||
}); | ||
this.session.once('close', () => { | ||
this.transitionToState([channel_1.ConnectivityState.CONNECTING, channel_1.ConnectivityState.READY], channel_1.ConnectivityState.TRANSIENT_FAILURE); | ||
}); | ||
this.session.once('goaway', () => { | ||
this.transitionToState([channel_1.ConnectivityState.CONNECTING, channel_1.ConnectivityState.READY], channel_1.ConnectivityState.IDLE); | ||
}); | ||
this.session.once('error', error => { | ||
/* Do nothing here. Any error should also trigger a close event, which is | ||
* where we want to handle that. */ | ||
}); | ||
} | ||
/** | ||
* Initiate a state transition from any element of oldStates to the new | ||
* state. If the current connectivityState is not in oldStates, do nothing. | ||
* @param oldStates The set of states to transition from | ||
* @param newState The state to transition to | ||
* @returns True if the state changed, false otherwise | ||
*/ | ||
transitionToState(oldStates, newState) { | ||
if (oldStates.indexOf(this.connectivityState) === -1) { | ||
return false; | ||
} | ||
const previousState = this.connectivityState; | ||
this.connectivityState = newState; | ||
switch (newState) { | ||
case channel_1.ConnectivityState.READY: | ||
this.stopBackoff(); | ||
this.session.socket.once('close', () => { | ||
for (const listener of this.disconnectListeners) { | ||
listener(); | ||
} | ||
}); | ||
break; | ||
case channel_1.ConnectivityState.CONNECTING: | ||
this.startBackoff(); | ||
this.startConnectingInternal(); | ||
this.continueConnecting = false; | ||
break; | ||
case channel_1.ConnectivityState.TRANSIENT_FAILURE: | ||
this.session = null; | ||
this.stopKeepalivePings(); | ||
break; | ||
case channel_1.ConnectivityState.IDLE: | ||
/* Stopping the backoff timer here is probably redundant because we | ||
* should only transition to the IDLE state as a result of the timer | ||
* ending, but we still want to reset the backoff timeout. */ | ||
this.stopBackoff(); | ||
this.session = null; | ||
this.stopKeepalivePings(); | ||
break; | ||
default: | ||
throw new Error(`Invalid state: unknown ConnectivityState ${newState}`); | ||
} | ||
/* We use a shallow copy of the stateListeners array in case a listener | ||
* is removed during this iteration */ | ||
for (const listener of [...this.stateListeners]) { | ||
listener(this, previousState, newState); | ||
} | ||
return true; | ||
} | ||
/** | ||
* Check if the subchannel associated with zero calls and with zero channels. | ||
* If so, shut it down. | ||
*/ | ||
checkBothRefcounts() { | ||
/* If no calls, channels, or subchannel pools have any more references to | ||
* this subchannel, we can be sure it will never be used again. */ | ||
if (this.callRefcount === 0 && this.refcount === 0) { | ||
this.transitionToState([ | ||
channel_1.ConnectivityState.CONNECTING, | ||
channel_1.ConnectivityState.IDLE, | ||
channel_1.ConnectivityState.READY, | ||
], channel_1.ConnectivityState.TRANSIENT_FAILURE); | ||
} | ||
} | ||
callRef() { | ||
if (this.callRefcount === 0) { | ||
if (this.session) { | ||
this.session.ref(); | ||
} | ||
this.startKeepalivePings(); | ||
} | ||
this.callRefcount += 1; | ||
} | ||
callUnref() { | ||
this.callRefcount -= 1; | ||
if (this.callRefcount === 0) { | ||
if (this.session) { | ||
this.session.unref(); | ||
} | ||
this.stopKeepalivePings(); | ||
this.checkBothRefcounts(); | ||
} | ||
} | ||
ref() { | ||
this.refcount += 1; | ||
} | ||
unref() { | ||
this.refcount -= 1; | ||
this.checkBothRefcounts(); | ||
} | ||
unrefIfOneRef() { | ||
if (this.refcount === 1) { | ||
this.unref(); | ||
return true; | ||
} | ||
return false; | ||
} | ||
/** | ||
* Start a stream on the current session with the given `metadata` as headers | ||
* and then attach it to the `callStream`. Must only be called if the | ||
* subchannel's current connectivity state is READY. | ||
* @param metadata | ||
* @param callStream | ||
*/ | ||
startCallStream(metadata, callStream) { | ||
@@ -111,13 +309,64 @@ const headers = metadata.toHttp2Headers(); | ||
const http2Stream = this.session.request(headers); | ||
this.ref(); | ||
http2Stream.on('close', () => { | ||
this.unref(); | ||
}); | ||
callStream.attachHttp2Stream(http2Stream); | ||
callStream.attachHttp2Stream(http2Stream, this); | ||
} | ||
close() { | ||
this.session.close(); | ||
/** | ||
* If the subchannel is currently IDLE, start connecting and switch to the | ||
* CONNECTING state. If the subchannel is current in TRANSIENT_FAILURE, | ||
* the next time it would transition to IDLE, start connecting again instead. | ||
* Otherwise, do nothing. | ||
*/ | ||
startConnecting() { | ||
/* First, try to transition from IDLE to connecting. If that doesn't happen | ||
* because the state is not currently IDLE, check if it is | ||
* TRANSIENT_FAILURE, and if so indicate that it should go back to | ||
* connecting after the backoff timer ends. Otherwise do nothing */ | ||
if (!this.transitionToState([channel_1.ConnectivityState.IDLE], channel_1.ConnectivityState.CONNECTING)) { | ||
if (this.connectivityState === channel_1.ConnectivityState.TRANSIENT_FAILURE) { | ||
this.continueConnecting = true; | ||
} | ||
} | ||
} | ||
/** | ||
* Get the subchannel's current connectivity state. | ||
*/ | ||
getConnectivityState() { | ||
return this.connectivityState; | ||
} | ||
/** | ||
* Add a listener function to be called whenever the subchannel's | ||
* connectivity state changes. | ||
* @param listener | ||
*/ | ||
addConnectivityStateListener(listener) { | ||
this.stateListeners.push(listener); | ||
} | ||
/** | ||
* Remove a listener previously added with `addConnectivityStateListener` | ||
* @param listener A reference to a function previously passed to | ||
* `addConnectivityStateListener` | ||
*/ | ||
removeConnectivityStateListener(listener) { | ||
const listenerIndex = this.stateListeners.indexOf(listener); | ||
if (listenerIndex > -1) { | ||
this.stateListeners.splice(listenerIndex, 1); | ||
} | ||
} | ||
addDisconnectListener(listener) { | ||
this.disconnectListeners.push(listener); | ||
} | ||
removeDisconnectListener(listener) { | ||
const listenerIndex = this.disconnectListeners.indexOf(listener); | ||
if (listenerIndex > -1) { | ||
this.disconnectListeners.splice(listenerIndex, 1); | ||
} | ||
} | ||
/** | ||
* Reset the backoff timeout, and immediately start connecting if in backoff. | ||
*/ | ||
resetBackoff() { | ||
this.backoffTimeout.reset(); | ||
this.transitionToState([channel_1.ConnectivityState.TRANSIENT_FAILURE], channel_1.ConnectivityState.CONNECTING); | ||
} | ||
} | ||
exports.Http2SubChannel = Http2SubChannel; | ||
exports.Subchannel = Subchannel; | ||
//# sourceMappingURL=subchannel.js.map |
{ | ||
"name": "@grpc/grpc-js", | ||
"version": "0.5.4", | ||
"version": "0.6.0", | ||
"description": "gRPC Library for Node - pure JS implementation", | ||
@@ -23,4 +23,5 @@ "homepage": "https://grpc.io/", | ||
"@types/mocha": "^5.2.6", | ||
"@types/node": "^12.7.5", | ||
"@types/ncp": "^2.0.1", | ||
"@types/node": "^12.0.2", | ||
"@types/node": "^12.7.5", | ||
"@types/pify": "^3.0.2", | ||
@@ -27,0 +28,0 @@ "@types/semver": "^6.0.1", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
289744
6977
7