@grpc/grpc-js
Advanced tools
Comparing version 1.3.8 to 1.4.0
@@ -5,7 +5,7 @@ /// <reference types="node" /> | ||
import { Status } from './constants'; | ||
import { Filter, FilterFactory } from './filter'; | ||
import { FilterStackFactory } from './filter-stack'; | ||
import { Filter } from './filter'; | ||
import { FilterStackFactory, FilterStack } from './filter-stack'; | ||
import { Metadata } from './metadata'; | ||
import { ChannelImplementation } from './channel'; | ||
import { Subchannel } from './subchannel'; | ||
import { SubchannelCallStatsTracker, Subchannel } from './subchannel'; | ||
import { ServerSurfaceCall } from './server-call'; | ||
@@ -95,3 +95,3 @@ export declare type Deadline = Date | number; | ||
credentials: CallCredentials; | ||
filterStack: Filter; | ||
filterStack: FilterStack; | ||
private http2Stream; | ||
@@ -120,2 +120,6 @@ private pendingRead; | ||
private internalError; | ||
private configDeadline; | ||
private statusWatchers; | ||
private streamEndWatchers; | ||
private callStatsTracker; | ||
constructor(methodName: string, channel: ChannelImplementation, options: CallStreamOptions, filterStackFactory: FilterStackFactory, channelCallCredentials: CallCredentials, callNumber: number); | ||
@@ -137,3 +141,4 @@ private outputStatus; | ||
private handleTrailers; | ||
attachHttp2Stream(stream: http2.ClientHttp2Stream, subchannel: Subchannel, extraFilterFactory?: FilterFactory<Filter>): void; | ||
private writeMessageToStream; | ||
attachHttp2Stream(stream: http2.ClientHttp2Stream, subchannel: Subchannel, extraFilters: Filter[], callStatsTracker: SubchannelCallStatsTracker): void; | ||
start(metadata: Metadata, listener: InterceptingListener): void; | ||
@@ -149,2 +154,6 @@ private destroyHttp2Stream; | ||
getHost(): string; | ||
setConfigDeadline(configDeadline: Deadline): void; | ||
addStatusWatcher(watcher: (status: StatusObject) => void): void; | ||
addStreamEndWatcher(watcher: (success: boolean) => void): void; | ||
addFilters(extraFilters: Filter[]): void; | ||
startRead(): void; | ||
@@ -151,0 +160,0 @@ private maybeCloseWrites; |
@@ -23,3 +23,2 @@ "use strict"; | ||
const constants_1 = require("./constants"); | ||
const filter_stack_1 = require("./filter-stack"); | ||
const metadata_1 = require("./metadata"); | ||
@@ -45,2 +44,12 @@ const stream_decoder_1 = require("./stream-decoder"); | ||
} | ||
function getMinDeadline(deadlineList) { | ||
let minValue = Infinity; | ||
for (const deadline of deadlineList) { | ||
const deadlineMsecs = deadline instanceof Date ? deadline.getTime() : deadline; | ||
if (deadlineMsecs < minValue) { | ||
minValue = deadlineMsecs; | ||
} | ||
} | ||
return minValue; | ||
} | ||
function isInterceptingListener(listener) { | ||
@@ -119,2 +128,6 @@ return (listener.onReceiveMetadata !== undefined && | ||
this.internalError = null; | ||
this.configDeadline = Infinity; | ||
this.statusWatchers = []; | ||
this.streamEndWatchers = []; | ||
this.callStatsTracker = null; | ||
this.filterStack = filterStackFactory.createFilter(this); | ||
@@ -129,3 +142,4 @@ this.credentials = channelCallCredentials; | ||
}; | ||
if (this.options.parentCall && this.options.flags & constants_1.Propagate.CANCELLATION) { | ||
if (this.options.parentCall && | ||
this.options.flags & constants_1.Propagate.CANCELLATION) { | ||
this.options.parentCall.on('cancelled', () => { | ||
@@ -141,2 +155,3 @@ this.cancelWithStatus(constants_1.Status.CANCELLED, 'Cancelled by parent call'); | ||
const filteredStatus = this.filterStack.receiveTrailers(this.finalStatus); | ||
this.statusWatchers.forEach(watcher => watcher(filteredStatus)); | ||
/* We delay the actual action of bubbling up the status to insulate the | ||
@@ -263,2 +278,3 @@ * cleanup code in this class from any errors that may be thrown in the | ||
handleTrailers(headers) { | ||
this.streamEndWatchers.forEach(watcher => watcher(true)); | ||
let headersString = ''; | ||
@@ -297,9 +313,9 @@ for (const header of Object.keys(headers)) { | ||
} | ||
attachHttp2Stream(stream, subchannel, extraFilterFactory) { | ||
if (extraFilterFactory !== undefined) { | ||
this.filterStack = new filter_stack_1.FilterStack([ | ||
this.filterStack, | ||
extraFilterFactory.createFilter(this), | ||
]); | ||
} | ||
writeMessageToStream(message, callback) { | ||
var _a; | ||
(_a = this.callStatsTracker) === null || _a === void 0 ? void 0 : _a.addMessageSent(); | ||
this.http2Stream.write(message, callback); | ||
} | ||
attachHttp2Stream(stream, subchannel, extraFilters, callStatsTracker) { | ||
this.filterStack.push(extraFilters); | ||
if (this.finalStatus !== null) { | ||
@@ -312,2 +328,3 @@ stream.close(NGHTTP2_CANCEL); | ||
this.subchannel = subchannel; | ||
this.callStatsTracker = callStatsTracker; | ||
subchannel.addDisconnectListener(this.disconnectListener); | ||
@@ -380,2 +397,3 @@ subchannel.callRef(); | ||
this.trace('parsed message of length ' + message.length); | ||
this.callStatsTracker.addMessageReceived(); | ||
this.tryPush(message); | ||
@@ -442,3 +460,3 @@ } | ||
else { | ||
if (this.internalError.code === 'ECONNRESET') { | ||
if (this.internalError.code === 'ECONNRESET' || this.internalError.code === 'ETIMEDOUT') { | ||
code = constants_1.Status.UNAVAILABLE; | ||
@@ -476,5 +494,13 @@ details = this.internalError.message; | ||
if (err.code !== 'ERR_HTTP2_STREAM_ERROR') { | ||
this.trace('Node error event: message=' + err.message + ' code=' + err.code + ' errno=' + getSystemErrorName(err.errno) + ' syscall=' + err.syscall); | ||
this.trace('Node error event: message=' + | ||
err.message + | ||
' code=' + | ||
err.code + | ||
' errno=' + | ||
getSystemErrorName(err.errno) + | ||
' syscall=' + | ||
err.syscall); | ||
this.internalError = err; | ||
} | ||
this.streamEndWatchers.forEach(watcher => watcher(false)); | ||
}); | ||
@@ -492,3 +518,3 @@ if (!this.pendingRead) { | ||
try { | ||
stream.write(this.pendingWrite, this.pendingWriteCallback); | ||
this.writeMessageToStream(this.pendingWrite, this.pendingWriteCallback); | ||
} | ||
@@ -535,12 +561,10 @@ catch (error) { | ||
getDeadline() { | ||
const deadlineList = [this.options.deadline]; | ||
if (this.options.parentCall && this.options.flags & constants_1.Propagate.DEADLINE) { | ||
const parentDeadline = this.options.parentCall.getDeadline(); | ||
const selfDeadline = this.options.deadline; | ||
const parentDeadlineMsecs = parentDeadline instanceof Date ? parentDeadline.getTime() : parentDeadline; | ||
const selfDeadlineMsecs = selfDeadline instanceof Date ? selfDeadline.getTime() : selfDeadline; | ||
return Math.min(parentDeadlineMsecs, selfDeadlineMsecs); | ||
deadlineList.push(this.options.parentCall.getDeadline()); | ||
} | ||
else { | ||
return this.options.deadline; | ||
if (this.configDeadline) { | ||
deadlineList.push(this.configDeadline); | ||
} | ||
return getMinDeadline(deadlineList); | ||
} | ||
@@ -566,2 +590,14 @@ getCredentials() { | ||
} | ||
setConfigDeadline(configDeadline) { | ||
this.configDeadline = configDeadline; | ||
} | ||
addStatusWatcher(watcher) { | ||
this.statusWatchers.push(watcher); | ||
} | ||
addStreamEndWatcher(watcher) { | ||
this.streamEndWatchers.push(watcher); | ||
} | ||
addFilters(extraFilters) { | ||
this.filterStack.push(extraFilters); | ||
} | ||
startRead() { | ||
@@ -617,3 +653,3 @@ /* If the stream has ended with an error, we should not emit any more | ||
try { | ||
this.http2Stream.write(message.message, cb); | ||
this.writeMessageToStream(message.message, cb); | ||
} | ||
@@ -620,0 +656,0 @@ catch (error) { |
@@ -6,9 +6,4 @@ import { Deadline, Call, Http2CallStream } from './call-stream'; | ||
import { ServerSurfaceCall } from './server-call'; | ||
export declare enum ConnectivityState { | ||
IDLE = 0, | ||
CONNECTING = 1, | ||
READY = 2, | ||
TRANSIENT_FAILURE = 3, | ||
SHUTDOWN = 4 | ||
} | ||
import { ConnectivityState } from './connectivity-state'; | ||
import { ChannelRef } from './channelz'; | ||
/** | ||
@@ -49,2 +44,8 @@ * An interface that represents a communication channel to a server specified | ||
/** | ||
* Get the channelz reference object for this channel. A request to the | ||
* channelz service for the id in this object will provide information | ||
* about this channel. | ||
*/ | ||
getChannelzRef(): ChannelRef; | ||
/** | ||
* Create a call object. Call is an opaque type that is used by the Client | ||
@@ -89,3 +90,10 @@ * class. This function is called by the gRPC library when starting a | ||
private configSelector; | ||
private originalTarget; | ||
private channelzRef; | ||
private channelzTrace; | ||
private callTracker; | ||
private childrenTracker; | ||
constructor(target: string, credentials: ChannelCredentials, options: ChannelOptions); | ||
private getChannelzInfo; | ||
private trace; | ||
private callRefTimerRef; | ||
@@ -110,3 +118,4 @@ private callRefTimerUnref; | ||
watchConnectivityState(currentState: ConnectivityState, deadline: Date | number, callback: (error?: Error) => void): void; | ||
getChannelzRef(): ChannelRef; | ||
createCall(method: string, deadline: Deadline, host: string | null | undefined, parentCall: ServerSurfaceCall | null, propagateFlags: number | null | undefined): Call; | ||
} |
@@ -19,3 +19,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.ChannelImplementation = exports.ConnectivityState = void 0; | ||
exports.ChannelImplementation = void 0; | ||
const call_stream_1 = require("./call-stream"); | ||
@@ -36,10 +36,4 @@ const channel_credentials_1 = require("./channel-credentials"); | ||
const uri_parser_1 = require("./uri-parser"); | ||
var ConnectivityState; | ||
(function (ConnectivityState) { | ||
ConnectivityState[ConnectivityState["IDLE"] = 0] = "IDLE"; | ||
ConnectivityState[ConnectivityState["CONNECTING"] = 1] = "CONNECTING"; | ||
ConnectivityState[ConnectivityState["READY"] = 2] = "READY"; | ||
ConnectivityState[ConnectivityState["TRANSIENT_FAILURE"] = 3] = "TRANSIENT_FAILURE"; | ||
ConnectivityState[ConnectivityState["SHUTDOWN"] = 4] = "SHUTDOWN"; | ||
})(ConnectivityState = exports.ConnectivityState || (exports.ConnectivityState = {})); | ||
const connectivity_state_1 = require("./connectivity-state"); | ||
const channelz_1 = require("./channelz"); | ||
/** | ||
@@ -63,3 +57,3 @@ * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args | ||
this.options = options; | ||
this.connectivityState = ConnectivityState.IDLE; | ||
this.connectivityState = connectivity_state_1.ConnectivityState.IDLE; | ||
this.currentPicker = new picker_1.UnavailablePicker(); | ||
@@ -74,2 +68,4 @@ /** | ||
this.configSelector = null; | ||
this.callTracker = new channelz_1.ChannelzCallTracker(); | ||
this.childrenTracker = new channelz_1.ChannelzChildrenTracker(); | ||
if (typeof target !== 'string') { | ||
@@ -82,9 +78,7 @@ throw new TypeError('Channel target must be a string'); | ||
if (options) { | ||
if (typeof options !== 'object' || | ||
!Object.values(options).every((value) => typeof value === 'string' || | ||
typeof value === 'number' || | ||
typeof value === 'undefined')) { | ||
throw new TypeError('Channel options must be an object with string or number values'); | ||
if (typeof options !== 'object') { | ||
throw new TypeError('Channel options must be an object'); | ||
} | ||
} | ||
this.originalTarget = target; | ||
const originalTargetUri = uri_parser_1.parseUri(target); | ||
@@ -102,2 +96,5 @@ if (originalTargetUri === null) { | ||
(_b = (_a = this.callRefTimer).unref) === null || _b === void 0 ? void 0 : _b.call(_a); | ||
this.channelzRef = channelz_1.registerChannelzChannel(target, () => this.getChannelzInfo()); | ||
this.channelzTrace = new channelz_1.ChannelzTrace(); | ||
this.channelzTrace.addTrace('CT_INFO', 'Channel created'); | ||
if (this.options['grpc.default_authority']) { | ||
@@ -117,3 +114,5 @@ this.defaultAuthority = this.options['grpc.default_authority']; | ||
createSubchannel: (subchannelAddress, subchannelArgs) => { | ||
return this.subchannelPool.getOrCreateSubchannel(this.target, subchannelAddress, Object.assign({}, this.options, subchannelArgs), this.credentials); | ||
const subchannel = this.subchannelPool.getOrCreateSubchannel(this.target, subchannelAddress, Object.assign({}, this.options, subchannelArgs), this.credentials); | ||
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef()); | ||
return subchannel; | ||
}, | ||
@@ -125,4 +124,4 @@ updateState: (connectivityState, picker) => { | ||
this.callRefTimerUnref(); | ||
for (const { callStream, callMetadata, callConfig } of queueCopy) { | ||
this.tryPick(callStream, callMetadata, callConfig); | ||
for (const { callStream, callMetadata, callConfig, dynamicFilters } of queueCopy) { | ||
this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); | ||
} | ||
@@ -135,4 +134,11 @@ this.updateState(connectivityState); | ||
}, | ||
addChannelzChild: (child) => { | ||
this.childrenTracker.refChild(child); | ||
}, | ||
removeChannelzChild: (child) => { | ||
this.childrenTracker.unrefChild(child); | ||
} | ||
}; | ||
this.resolvingLoadBalancer = new resolving_load_balancer_1.ResolvingLoadBalancer(this.target, channelControlHelper, options, (configSelector) => { | ||
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded'); | ||
this.configSelector = configSelector; | ||
@@ -151,4 +157,5 @@ /* We process the queue asynchronously to ensure that the corresponding | ||
}, (status) => { | ||
this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"'); | ||
if (this.configSelectionQueue.length > 0) { | ||
logging_1.trace(constants_1.LogVerbosity.DEBUG, 'channel', 'Name resolution failed for target ' + uri_parser_1.uriToString(this.target) + ' with calls queued for config selection'); | ||
this.trace('Name resolution failed with calls queued for config selection'); | ||
} | ||
@@ -174,4 +181,16 @@ const localQueue = this.configSelectionQueue; | ||
]); | ||
logging_1.trace(constants_1.LogVerbosity.DEBUG, 'channel', 'Channel constructed with options ' + JSON.stringify(options, undefined, 2)); | ||
this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2)); | ||
} | ||
getChannelzInfo() { | ||
return { | ||
target: this.originalTarget, | ||
state: this.connectivityState, | ||
trace: this.channelzTrace, | ||
callTracker: this.callTracker, | ||
children: this.childrenTracker.getChildLists() | ||
}; | ||
} | ||
trace(text, verbosityOverride) { | ||
logging_1.trace(verbosityOverride !== null && verbosityOverride !== void 0 ? verbosityOverride : constants_1.LogVerbosity.DEBUG, 'channel', '(' + this.channelzRef.id + ') ' + uri_parser_1.uriToString(this.target) + ' ' + text); | ||
} | ||
callRefTimerRef() { | ||
@@ -181,3 +200,6 @@ var _a, _b, _c, _d; | ||
if (!((_b = (_a = this.callRefTimer).hasRef) === null || _b === void 0 ? void 0 : _b.call(_a))) { | ||
logging_1.trace(constants_1.LogVerbosity.DEBUG, 'channel', 'callRefTimer.ref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length); | ||
this.trace('callRefTimer.ref | configSelectionQueue.length=' + | ||
this.configSelectionQueue.length + | ||
' pickQueue.length=' + | ||
this.pickQueue.length); | ||
(_d = (_c = this.callRefTimer).ref) === null || _d === void 0 ? void 0 : _d.call(_c); | ||
@@ -189,9 +211,12 @@ } | ||
// If the hasRef function does not exist, always run the code | ||
if ((!this.callRefTimer.hasRef) || (this.callRefTimer.hasRef())) { | ||
logging_1.trace(constants_1.LogVerbosity.DEBUG, 'channel', 'callRefTimer.unref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length); | ||
if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) { | ||
this.trace('callRefTimer.unref | configSelectionQueue.length=' + | ||
this.configSelectionQueue.length + | ||
' pickQueue.length=' + | ||
this.pickQueue.length); | ||
(_b = (_a = this.callRefTimer).unref) === null || _b === void 0 ? void 0 : _b.call(_a); | ||
} | ||
} | ||
pushPick(callStream, callMetadata, callConfig) { | ||
this.pickQueue.push({ callStream, callMetadata, callConfig }); | ||
pushPick(callStream, callMetadata, callConfig, dynamicFilters) { | ||
this.pickQueue.push({ callStream, callMetadata, callConfig, dynamicFilters }); | ||
this.callRefTimerRef(); | ||
@@ -206,6 +231,9 @@ } | ||
*/ | ||
tryPick(callStream, callMetadata, callConfig) { | ||
tryPick(callStream, callMetadata, callConfig, dynamicFilters) { | ||
var _a, _b, _c; | ||
const pickResult = this.currentPicker.pick({ metadata: callMetadata, extraPickInfo: callConfig.pickInformation }); | ||
logging_1.trace(constants_1.LogVerbosity.DEBUG, 'channel', 'Pick result: ' + | ||
const pickResult = this.currentPicker.pick({ | ||
metadata: callMetadata, | ||
extraPickInfo: callConfig.pickInformation, | ||
}); | ||
this.trace('Pick result: ' + | ||
picker_1.PickResultType[pickResult.pickResultType] + | ||
@@ -226,8 +254,8 @@ ' subchannel: ' + ((_a = pickResult.subchannel) === null || _a === void 0 ? void 0 : _a.getAddress()) + | ||
if (pickResult.subchannel.getConnectivityState() !== | ||
ConnectivityState.READY) { | ||
connectivity_state_1.ConnectivityState.READY) { | ||
logging_1.log(constants_1.LogVerbosity.ERROR, 'Error: COMPLETE pick result subchannel ' + | ||
pickResult.subchannel.getAddress() + | ||
' has state ' + | ||
ConnectivityState[pickResult.subchannel.getConnectivityState()]); | ||
this.pushPick(callStream, callMetadata, callConfig); | ||
connectivity_state_1.ConnectivityState[pickResult.subchannel.getConnectivityState()]); | ||
this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); | ||
break; | ||
@@ -241,11 +269,12 @@ } | ||
.then((finalMetadata) => { | ||
var _a, _b, _c; | ||
var _a, _b; | ||
const subchannelState = pickResult.subchannel.getConnectivityState(); | ||
if (subchannelState === ConnectivityState.READY) { | ||
if (subchannelState === connectivity_state_1.ConnectivityState.READY) { | ||
try { | ||
pickResult.subchannel.startCallStream(finalMetadata, callStream, (_a = pickResult.extraFilterFactory) !== null && _a !== void 0 ? _a : undefined); | ||
const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream)); | ||
pickResult.subchannel.startCallStream(finalMetadata, callStream, [...dynamicFilters, ...pickExtraFilters]); | ||
/* If we reach this point, the call stream has started | ||
* successfully */ | ||
(_b = callConfig.onCommitted) === null || _b === void 0 ? void 0 : _b.call(callConfig); | ||
(_c = pickResult.onCallStarted) === null || _c === void 0 ? void 0 : _c.call(pickResult); | ||
(_a = callConfig.onCommitted) === null || _a === void 0 ? void 0 : _a.call(callConfig); | ||
(_b = pickResult.onCallStarted) === null || _b === void 0 ? void 0 : _b.call(pickResult); | ||
} | ||
@@ -268,15 +297,15 @@ catch (error) { | ||
* tryPick */ | ||
logging_1.trace(constants_1.LogVerbosity.INFO, 'channel', 'Failed to start call on picked subchannel ' + | ||
this.trace('Failed to start call on picked subchannel ' + | ||
pickResult.subchannel.getAddress() + | ||
' with error ' + | ||
error.message + | ||
'. Retrying pick'); | ||
this.tryPick(callStream, callMetadata, callConfig); | ||
'. Retrying pick', constants_1.LogVerbosity.INFO); | ||
this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); | ||
} | ||
else { | ||
logging_1.trace(constants_1.LogVerbosity.INFO, 'channel', 'Failed to start call on picked subchanel ' + | ||
this.trace('Failed to start call on picked subchanel ' + | ||
pickResult.subchannel.getAddress() + | ||
' with error ' + | ||
error.message + | ||
'. Ending call'); | ||
'. Ending call', constants_1.LogVerbosity.INFO); | ||
callStream.cancelWithStatus(constants_1.Status.INTERNAL, `Failed to start HTTP/2 stream with error: ${error.message}`); | ||
@@ -289,12 +318,12 @@ } | ||
* block above */ | ||
logging_1.trace(constants_1.LogVerbosity.INFO, 'channel', 'Picked subchannel ' + | ||
this.trace('Picked subchannel ' + | ||
pickResult.subchannel.getAddress() + | ||
' has state ' + | ||
ConnectivityState[subchannelState] + | ||
' after metadata filters. Retrying pick'); | ||
this.tryPick(callStream, callMetadata, callConfig); | ||
connectivity_state_1.ConnectivityState[subchannelState] + | ||
' after metadata filters. Retrying pick', constants_1.LogVerbosity.INFO); | ||
this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); | ||
} | ||
}, (error) => { | ||
// We assume the error code isn't 0 (Status.OK) | ||
callStream.cancelWithStatus((typeof error.code === 'number') ? error.code : constants_1.Status.UNKNOWN, `Getting metadata from plugin failed with error: ${error.message}`); | ||
callStream.cancelWithStatus(typeof error.code === 'number' ? error.code : constants_1.Status.UNKNOWN, `Getting metadata from plugin failed with error: ${error.message}`); | ||
}); | ||
@@ -304,7 +333,7 @@ } | ||
case picker_1.PickResultType.QUEUE: | ||
this.pushPick(callStream, callMetadata, callConfig); | ||
this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); | ||
break; | ||
case picker_1.PickResultType.TRANSIENT_FAILURE: | ||
if (callMetadata.getOptions().waitForReady) { | ||
this.pushPick(callStream, callMetadata, callConfig); | ||
this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); | ||
} | ||
@@ -329,7 +358,9 @@ else { | ||
updateState(newState) { | ||
logging_1.trace(constants_1.LogVerbosity.DEBUG, 'connectivity_state', uri_parser_1.uriToString(this.target) + | ||
logging_1.trace(constants_1.LogVerbosity.DEBUG, 'connectivity_state', '(' + this.channelzRef.id + ') ' + | ||
uri_parser_1.uriToString(this.target) + | ||
' ' + | ||
ConnectivityState[this.connectivityState] + | ||
connectivity_state_1.ConnectivityState[this.connectivityState] + | ||
' -> ' + | ||
ConnectivityState[newState]); | ||
connectivity_state_1.ConnectivityState[newState]); | ||
this.channelzTrace.addTrace('CT_INFO', connectivity_state_1.ConnectivityState[this.connectivityState] + ' -> ' + connectivity_state_1.ConnectivityState[newState]); | ||
this.connectivityState = newState; | ||
@@ -348,2 +379,7 @@ const watchersCopy = this.connectivityStateWatchers.slice(); | ||
tryGetConfig(stream, metadata) { | ||
if (stream.getStatus() !== null) { | ||
/* If the stream has a status, it has already finished and we don't need | ||
* to take any more actions on it. */ | ||
return; | ||
} | ||
if (this.configSelector === null) { | ||
@@ -357,3 +393,3 @@ /* This branch will only be taken at the beginning of the channel's life, | ||
callStream: stream, | ||
callMetadata: metadata | ||
callMetadata: metadata, | ||
}); | ||
@@ -365,6 +401,36 @@ this.callRefTimerRef(); | ||
if (callConfig.status === constants_1.Status.OK) { | ||
this.tryPick(stream, metadata, callConfig); | ||
if (callConfig.methodConfig.timeout) { | ||
const deadline = new Date(); | ||
deadline.setSeconds(deadline.getSeconds() + callConfig.methodConfig.timeout.seconds); | ||
deadline.setMilliseconds(deadline.getMilliseconds() + | ||
callConfig.methodConfig.timeout.nanos / 1000000); | ||
stream.setConfigDeadline(deadline); | ||
// Refreshing the filters makes the deadline filter pick up the new deadline | ||
stream.filterStack.refresh(); | ||
} | ||
if (callConfig.dynamicFilterFactories.length > 0) { | ||
/* These dynamicFilters are the mechanism for implementing gRFC A39: | ||
* https://github.com/grpc/proposal/blob/master/A39-xds-http-filters.md | ||
* We run them here instead of with the rest of the filters because | ||
* that spec says "the xDS HTTP filters will run in between name | ||
* resolution and load balancing". | ||
* | ||
* We use the filter stack here to simplify the multi-filter async | ||
* waterfall logic, but we pass along the underlying list of filters | ||
* to avoid having nested filter stacks when combining it with the | ||
* original filter stack. We do not pass along the original filter | ||
* factory list because these filters may need to persist data | ||
* between sending headers and other operations. */ | ||
const dynamicFilterStackFactory = new filter_stack_1.FilterStackFactory(callConfig.dynamicFilterFactories); | ||
const dynamicFilterStack = dynamicFilterStackFactory.createFilter(stream); | ||
dynamicFilterStack.sendMetadata(Promise.resolve(metadata)).then(filteredMetadata => { | ||
this.tryPick(stream, filteredMetadata, callConfig, dynamicFilterStack.getFilters()); | ||
}); | ||
} | ||
else { | ||
this.tryPick(stream, metadata, callConfig, []); | ||
} | ||
} | ||
else { | ||
stream.cancelWithStatus(callConfig.status, "Failed to route call to method " + stream.getMethod()); | ||
stream.cancelWithStatus(callConfig.status, 'Failed to route call to method ' + stream.getMethod()); | ||
} | ||
@@ -378,4 +444,5 @@ } | ||
this.resolvingLoadBalancer.destroy(); | ||
this.updateState(ConnectivityState.SHUTDOWN); | ||
this.updateState(connectivity_state_1.ConnectivityState.SHUTDOWN); | ||
clearInterval(this.callRefTimer); | ||
channelz_1.unregisterChannelzRef(this.channelzRef); | ||
this.subchannelPool.unrefUnusedSubchannels(); | ||
@@ -394,3 +461,3 @@ } | ||
watchConnectivityState(currentState, deadline, callback) { | ||
if (this.connectivityState === ConnectivityState.SHUTDOWN) { | ||
if (this.connectivityState === connectivity_state_1.ConnectivityState.SHUTDOWN) { | ||
throw new Error('Channel has been shut down'); | ||
@@ -414,6 +481,9 @@ } | ||
callback, | ||
timer | ||
timer, | ||
}; | ||
this.connectivityStateWatchers.push(watcherObject); | ||
} | ||
getChannelzRef() { | ||
return this.channelzRef; | ||
} | ||
createCall(method, deadline, host, parentCall, propagateFlags) { | ||
@@ -426,8 +496,7 @@ if (typeof method !== 'string') { | ||
} | ||
if (this.connectivityState === ConnectivityState.SHUTDOWN) { | ||
if (this.connectivityState === connectivity_state_1.ConnectivityState.SHUTDOWN) { | ||
throw new Error('Channel has been shut down'); | ||
} | ||
const callNumber = getNewCallNumber(); | ||
logging_1.trace(constants_1.LogVerbosity.DEBUG, 'channel', uri_parser_1.uriToString(this.target) + | ||
' createCall [' + | ||
this.trace('createCall [' + | ||
callNumber + | ||
@@ -445,2 +514,11 @@ '] method="' + | ||
const stream = new call_stream_1.Http2CallStream(method, this, finalOptions, this.filterStackFactory, this.credentials._getCallCredentials(), callNumber); | ||
this.callTracker.addCallStarted(); | ||
stream.addStatusWatcher(status => { | ||
if (status.code === constants_1.Status.OK) { | ||
this.callTracker.addCallSucceeded(); | ||
} | ||
else { | ||
this.callTracker.addCallFailed(); | ||
} | ||
}); | ||
return stream; | ||
@@ -447,0 +525,0 @@ } |
@@ -22,2 +22,3 @@ "use strict"; | ||
const channel_1 = require("./channel"); | ||
const connectivity_state_1 = require("./connectivity-state"); | ||
const constants_1 = require("./constants"); | ||
@@ -85,3 +86,3 @@ const metadata_1 = require("./metadata"); | ||
} | ||
if (newState === channel_1.ConnectivityState.READY) { | ||
if (newState === connectivity_state_1.ConnectivityState.READY) { | ||
callback(); | ||
@@ -88,0 +89,0 @@ } |
@@ -11,2 +11,5 @@ import { Call, StatusObject } from './call-stream'; | ||
constructor(channel: Channel, callStream: Call); | ||
private retreiveDeadline; | ||
private runTimer; | ||
refresh(): void; | ||
sendMetadata(metadata: Promise<Metadata>): Promise<Metadata>; | ||
@@ -13,0 +16,0 @@ receiveTrailers(status: StatusObject): StatusObject; |
@@ -41,3 +41,2 @@ "use strict"; | ||
constructor(channel, callStream) { | ||
var _a, _b; | ||
super(); | ||
@@ -47,3 +46,8 @@ this.channel = channel; | ||
this.timer = null; | ||
const callDeadline = callStream.getDeadline(); | ||
this.deadline = Infinity; | ||
this.retreiveDeadline(); | ||
this.runTimer(); | ||
} | ||
retreiveDeadline() { | ||
const callDeadline = this.callStream.getDeadline(); | ||
if (callDeadline instanceof Date) { | ||
@@ -55,7 +59,13 @@ this.deadline = callDeadline.getTime(); | ||
} | ||
} | ||
runTimer() { | ||
var _a, _b; | ||
if (this.timer) { | ||
clearTimeout(this.timer); | ||
} | ||
const now = new Date().getTime(); | ||
let timeout = this.deadline - now; | ||
const timeout = this.deadline - now; | ||
if (timeout <= 0) { | ||
process.nextTick(() => { | ||
callStream.cancelWithStatus(constants_1.Status.DEADLINE_EXCEEDED, 'Deadline exceeded'); | ||
this.callStream.cancelWithStatus(constants_1.Status.DEADLINE_EXCEEDED, 'Deadline exceeded'); | ||
}); | ||
@@ -65,3 +75,3 @@ } | ||
this.timer = setTimeout(() => { | ||
callStream.cancelWithStatus(constants_1.Status.DEADLINE_EXCEEDED, 'Deadline exceeded'); | ||
this.callStream.cancelWithStatus(constants_1.Status.DEADLINE_EXCEEDED, 'Deadline exceeded'); | ||
}, timeout); | ||
@@ -71,2 +81,6 @@ (_b = (_a = this.timer).unref) === null || _b === void 0 ? void 0 : _b.call(_a); | ||
} | ||
refresh() { | ||
this.retreiveDeadline(); | ||
this.runTimer(); | ||
} | ||
async sendMetadata(metadata) { | ||
@@ -73,0 +87,0 @@ if (this.deadline === Infinity) { |
export { trace } from './logging'; | ||
export { Resolver, ResolverListener, registerResolver, ConfigSelector } from './resolver'; | ||
export { Resolver, ResolverListener, registerResolver, ConfigSelector, } from './resolver'; | ||
export { GrpcUri, uriToString } from './uri-parser'; | ||
export { ServiceConfig } from './service-config'; | ||
export { ServiceConfig, Duration } from './service-config'; | ||
export { BackoffTimeout } from './backoff-timeout'; | ||
export { LoadBalancer, LoadBalancingConfig, ChannelControlHelper, registerLoadBalancerType, getFirstUsableConfig, validateLoadBalancingConfig } from './load-balancer'; | ||
export { SubchannelAddress, subchannelAddressToString } from './subchannel'; | ||
export { LoadBalancer, LoadBalancingConfig, ChannelControlHelper, createChildChannelControlHelper, registerLoadBalancerType, getFirstUsableConfig, validateLoadBalancingConfig, } from './load-balancer'; | ||
export { SubchannelAddress, subchannelAddressToString, } from './subchannel-address'; | ||
export { ChildLoadBalancerHandler } from './load-balancer-child-handler'; | ||
export { Picker, UnavailablePicker, QueuePicker, PickResult, PickArgs, PickResultType } from './picker'; | ||
export { Picker, UnavailablePicker, QueuePicker, PickResult, PickArgs, PickResultType, } from './picker'; | ||
export { Call as CallStream } from './call-stream'; | ||
export { Filter, BaseFilter, FilterFactory } from './filter'; | ||
export { FilterStackFactory } from './filter-stack'; | ||
export { registerAdminService } from './admin'; |
@@ -12,7 +12,8 @@ "use strict"; | ||
var load_balancer_1 = require("./load-balancer"); | ||
Object.defineProperty(exports, "createChildChannelControlHelper", { enumerable: true, get: function () { return load_balancer_1.createChildChannelControlHelper; } }); | ||
Object.defineProperty(exports, "registerLoadBalancerType", { enumerable: true, get: function () { return load_balancer_1.registerLoadBalancerType; } }); | ||
Object.defineProperty(exports, "getFirstUsableConfig", { enumerable: true, get: function () { return load_balancer_1.getFirstUsableConfig; } }); | ||
Object.defineProperty(exports, "validateLoadBalancingConfig", { enumerable: true, get: function () { return load_balancer_1.validateLoadBalancingConfig; } }); | ||
var subchannel_1 = require("./subchannel"); | ||
Object.defineProperty(exports, "subchannelAddressToString", { enumerable: true, get: function () { return subchannel_1.subchannelAddressToString; } }); | ||
var subchannel_address_1 = require("./subchannel-address"); | ||
Object.defineProperty(exports, "subchannelAddressToString", { enumerable: true, get: function () { return subchannel_address_1.subchannelAddressToString; } }); | ||
var load_balancer_child_handler_1 = require("./load-balancer-child-handler"); | ||
@@ -28,2 +29,4 @@ Object.defineProperty(exports, "ChildLoadBalancerHandler", { enumerable: true, get: function () { return load_balancer_child_handler_1.ChildLoadBalancerHandler; } }); | ||
Object.defineProperty(exports, "FilterStackFactory", { enumerable: true, get: function () { return filter_stack_1.FilterStackFactory; } }); | ||
var admin_1 = require("./admin"); | ||
Object.defineProperty(exports, "registerAdminService", { enumerable: true, get: function () { return admin_1.registerAdminService; } }); | ||
//# sourceMappingURL=experimental.js.map |
@@ -13,2 +13,5 @@ /// <reference types="node" /> | ||
receiveTrailers(status: StatusObject): StatusObject; | ||
refresh(): void; | ||
push(filters: Filter[]): void; | ||
getFilters(): Filter[]; | ||
} | ||
@@ -18,3 +21,4 @@ export declare class FilterStackFactory implements FilterFactory<FilterStack> { | ||
constructor(factories: Array<FilterFactory<Filter>>); | ||
push(filterFactories: FilterFactory<Filter>[]): void; | ||
createFilter(callStream: Call): FilterStack; | ||
} |
@@ -59,2 +59,13 @@ "use strict"; | ||
} | ||
refresh() { | ||
for (const filter of this.filters) { | ||
filter.refresh(); | ||
} | ||
} | ||
push(filters) { | ||
this.filters.unshift(...filters); | ||
} | ||
getFilters() { | ||
return this.filters; | ||
} | ||
} | ||
@@ -66,2 +77,5 @@ exports.FilterStack = FilterStack; | ||
} | ||
push(filterFactories) { | ||
this.factories.unshift(...filterFactories); | ||
} | ||
createFilter(callStream) { | ||
@@ -68,0 +82,0 @@ return new FilterStack(this.factories.map((factory) => factory.createFilter(callStream))); |
@@ -14,2 +14,3 @@ /// <reference types="node" /> | ||
receiveTrailers(status: StatusObject): StatusObject; | ||
refresh(): void; | ||
} | ||
@@ -22,2 +23,3 @@ export declare abstract class BaseFilter implements Filter { | ||
receiveTrailers(status: StatusObject): StatusObject; | ||
refresh(): void; | ||
} | ||
@@ -24,0 +26,0 @@ export interface FilterFactory<T extends Filter> { |
@@ -36,4 +36,5 @@ "use strict"; | ||
} | ||
refresh() { } | ||
} | ||
exports.BaseFilter = BaseFilter; | ||
//# sourceMappingURL=filter.js.map |
/// <reference types="node" /> | ||
import { Socket } from 'net'; | ||
import * as tls from 'tls'; | ||
import { SubchannelAddress } from './subchannel'; | ||
import { SubchannelAddress } from './subchannel-address'; | ||
import { ChannelOptions } from './channel-options'; | ||
@@ -6,0 +6,0 @@ import { GrpcUri } from './uri-parser'; |
@@ -26,3 +26,3 @@ "use strict"; | ||
const logging = require("./logging"); | ||
const subchannel_1 = require("./subchannel"); | ||
const subchannel_address_1 = require("./subchannel-address"); | ||
const uri_parser_1 = require("./uri-parser"); | ||
@@ -87,3 +87,3 @@ const url_1 = require("url"); | ||
const result = { | ||
address: `${hostname}:${port}` | ||
address: `${hostname}:${port}`, | ||
}; | ||
@@ -164,4 +164,7 @@ if (userCred) { | ||
}; | ||
const headers = { | ||
Host: parsedTarget.path, | ||
}; | ||
// Connect to the subchannel address as a proxy | ||
if (subchannel_1.isTcpSubchannelAddress(address)) { | ||
if (subchannel_address_1.isTcpSubchannelAddress(address)) { | ||
options.host = address.host; | ||
@@ -174,8 +177,8 @@ options.port = address.port; | ||
if ('grpc.http_connect_creds' in channelOptions) { | ||
options.headers = { | ||
'Proxy-Authorization': 'Basic ' + | ||
Buffer.from(channelOptions['grpc.http_connect_creds']).toString('base64'), | ||
}; | ||
headers['Proxy-Authorization'] = | ||
'Basic ' + | ||
Buffer.from(channelOptions['grpc.http_connect_creds']).toString('base64'); | ||
} | ||
const proxyAddressString = subchannel_1.subchannelAddressToString(address); | ||
options.headers = headers; | ||
const proxyAddressString = subchannel_address_1.subchannelAddressToString(address); | ||
trace('Using proxy ' + proxyAddressString + ' to connect to ' + options.path); | ||
@@ -182,0 +185,0 @@ return new Promise((resolve, reject) => { |
@@ -5,3 +5,4 @@ /// <reference types="node" /> | ||
import { Deadline, StatusObject } from './call-stream'; | ||
import { Channel, ConnectivityState, ChannelImplementation } from './channel'; | ||
import { Channel, ChannelImplementation } from './channel'; | ||
import { ConnectivityState } from './connectivity-state'; | ||
import { ChannelCredentials } from './channel-credentials'; | ||
@@ -15,3 +16,3 @@ import { CallOptions, Client, ClientOptions, CallInvocationTransformer, CallProperties, UnaryCallback } from './client'; | ||
import { StatusBuilder } from './status-builder'; | ||
import { handleBidiStreamingCall, handleServerStreamingCall, handleClientStreamingCall, handleUnaryCall, sendUnaryData, ServerUnaryCall, ServerReadableStream, ServerWritableStream, ServerDuplexStream } from './server-call'; | ||
import { handleBidiStreamingCall, handleServerStreamingCall, handleClientStreamingCall, handleUnaryCall, sendUnaryData, ServerUnaryCall, ServerReadableStream, ServerWritableStream, ServerDuplexStream, ServerErrorResponse } from './server-call'; | ||
export { OAuth2Client }; | ||
@@ -54,5 +55,5 @@ /**** Client Credentials ****/ | ||
export declare const waitForClientReady: (client: Client, deadline: Date | number, callback: (error?: Error | undefined) => void) => void; | ||
export { sendUnaryData, ChannelCredentials, CallCredentials, Deadline, Serialize as serialize, Deserialize as deserialize, ClientUnaryCall, ClientReadableStream, ClientWritableStream, ClientDuplexStream, CallOptions, MethodDefinition, StatusObject, ServiceError, ServerUnaryCall, ServerReadableStream, ServerWritableStream, ServerDuplexStream, ServiceDefinition, UntypedHandleCall, UntypedServiceImplementation, }; | ||
export { sendUnaryData, ChannelCredentials, CallCredentials, Deadline, Serialize as serialize, Deserialize as deserialize, ClientUnaryCall, ClientReadableStream, ClientWritableStream, ClientDuplexStream, CallOptions, MethodDefinition, StatusObject, ServiceError, ServerUnaryCall, ServerReadableStream, ServerWritableStream, ServerDuplexStream, ServerErrorResponse, ServiceDefinition, UntypedHandleCall, UntypedServiceImplementation, }; | ||
/**** Server ****/ | ||
export { handleBidiStreamingCall, handleServerStreamingCall, handleUnaryCall, handleClientStreamingCall }; | ||
export { handleBidiStreamingCall, handleServerStreamingCall, handleUnaryCall, handleClientStreamingCall, }; | ||
export declare type Call = ClientUnaryCall | ClientReadableStream<any> | ClientWritableStream<any> | ClientDuplexStream<any, any>; | ||
@@ -73,3 +74,5 @@ /**** Unimplemented function stubs ****/ | ||
export { ChannelOptions } from './channel-options'; | ||
export { getChannelzServiceDefinition, getChannelzHandlers } from './channelz'; | ||
export { addAdminServicesToServer } from './admin'; | ||
import * as experimental from './experimental'; | ||
export { experimental }; |
@@ -23,4 +23,5 @@ "use strict"; | ||
const channel_1 = require("./channel"); | ||
Object.defineProperty(exports, "connectivityState", { enumerable: true, get: function () { return channel_1.ConnectivityState; } }); | ||
Object.defineProperty(exports, "Channel", { enumerable: true, get: function () { return channel_1.ChannelImplementation; } }); | ||
const connectivity_state_1 = require("./connectivity-state"); | ||
Object.defineProperty(exports, "connectivityState", { enumerable: true, get: function () { return connectivity_state_1.ConnectivityState; } }); | ||
const channel_credentials_1 = require("./channel-credentials"); | ||
@@ -107,12 +108,25 @@ Object.defineProperty(exports, "ChannelCredentials", { enumerable: true, get: function () { return channel_credentials_1.ChannelCredentials; } }); | ||
Object.defineProperty(exports, "InterceptorConfigurationError", { enumerable: true, get: function () { return client_interceptors_1.InterceptorConfigurationError; } }); | ||
var channelz_1 = require("./channelz"); | ||
Object.defineProperty(exports, "getChannelzServiceDefinition", { enumerable: true, get: function () { return channelz_1.getChannelzServiceDefinition; } }); | ||
Object.defineProperty(exports, "getChannelzHandlers", { enumerable: true, get: function () { return channelz_1.getChannelzHandlers; } }); | ||
var admin_1 = require("./admin"); | ||
Object.defineProperty(exports, "addAdminServicesToServer", { enumerable: true, get: function () { return admin_1.addAdminServicesToServer; } }); | ||
const experimental = require("./experimental"); | ||
exports.experimental = experimental; | ||
const resolver = require("./resolver"); | ||
const load_balancer = require("./load-balancer"); | ||
const resolver_dns = require("./resolver-dns"); | ||
const resolver_uds = require("./resolver-uds"); | ||
const resolver_ip = require("./resolver-ip"); | ||
const load_balancer_pick_first = require("./load-balancer-pick-first"); | ||
const load_balancer_round_robin = require("./load-balancer-round-robin"); | ||
const channelz = require("./channelz"); | ||
const clientVersion = require('../../package.json').version; | ||
(() => { | ||
logging.trace(constants_1.LogVerbosity.DEBUG, 'index', 'Loading @grpc/grpc-js version ' + clientVersion); | ||
resolver.registerAll(); | ||
load_balancer.registerAll(); | ||
resolver_dns.setup(); | ||
resolver_uds.setup(); | ||
resolver_ip.setup(); | ||
load_balancer_pick_first.setup(); | ||
load_balancer_round_robin.setup(); | ||
channelz.setup(); | ||
})(); | ||
//# sourceMappingURL=index.js.map |
import { LoadBalancer, ChannelControlHelper, LoadBalancingConfig } from './load-balancer'; | ||
import { SubchannelAddress } from './subchannel'; | ||
import { SubchannelAddress } from './subchannel-address'; | ||
export declare class ChildLoadBalancerHandler implements LoadBalancer { | ||
@@ -4,0 +4,0 @@ private readonly channelControlHelper; |
@@ -21,3 +21,3 @@ "use strict"; | ||
const load_balancer_1 = require("./load-balancer"); | ||
const channel_1 = require("./channel"); | ||
const connectivity_state_1 = require("./connectivity-state"); | ||
const TYPE_NAME = 'child_load_balancer_helper'; | ||
@@ -40,3 +40,3 @@ class ChildLoadBalancerHandler { | ||
if (this.calledByPendingChild()) { | ||
if (connectivityState !== channel_1.ConnectivityState.READY) { | ||
if (connectivityState !== connectivity_state_1.ConnectivityState.READY) { | ||
return; | ||
@@ -63,2 +63,8 @@ } | ||
} | ||
addChannelzChild(child) { | ||
this.parent.channelControlHelper.addChannelzChild(child); | ||
} | ||
removeChannelzChild(child) { | ||
this.parent.channelControlHelper.removeChannelzChild(child); | ||
} | ||
calledByPendingChild() { | ||
@@ -65,0 +71,0 @@ return this.child === this.parent.pendingChild; |
import { LoadBalancer, ChannelControlHelper, LoadBalancingConfig } from './load-balancer'; | ||
import { SubchannelAddress } from './subchannel'; | ||
import { SubchannelAddress } from './subchannel-address'; | ||
export declare class PickFirstLoadBalancingConfig implements LoadBalancingConfig { | ||
@@ -4,0 +4,0 @@ getLoadBalancerName(): string; |
@@ -21,5 +21,5 @@ "use strict"; | ||
const load_balancer_1 = require("./load-balancer"); | ||
const channel_1 = require("./channel"); | ||
const connectivity_state_1 = require("./connectivity-state"); | ||
const picker_1 = require("./picker"); | ||
const subchannel_1 = require("./subchannel"); | ||
const subchannel_address_1 = require("./subchannel-address"); | ||
const logging = require("./logging"); | ||
@@ -44,5 +44,6 @@ const constants_1 = require("./constants"); | ||
return { | ||
[TYPE_NAME]: {} | ||
[TYPE_NAME]: {}, | ||
}; | ||
} | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
static createFromJson(obj) { | ||
@@ -66,3 +67,3 @@ return new PickFirstLoadBalancingConfig(); | ||
status: null, | ||
extraFilterFactory: null, | ||
extraFilterFactories: [], | ||
onCallStarted: null, | ||
@@ -94,3 +95,3 @@ }; | ||
*/ | ||
this.currentState = channel_1.ConnectivityState.IDLE; | ||
this.currentState = connectivity_state_1.ConnectivityState.IDLE; | ||
/** | ||
@@ -109,7 +110,7 @@ * The index within the `subchannels` array of the subchannel with the most | ||
this.subchannelStateCounts = { | ||
[channel_1.ConnectivityState.CONNECTING]: 0, | ||
[channel_1.ConnectivityState.IDLE]: 0, | ||
[channel_1.ConnectivityState.READY]: 0, | ||
[channel_1.ConnectivityState.SHUTDOWN]: 0, | ||
[channel_1.ConnectivityState.TRANSIENT_FAILURE]: 0, | ||
[connectivity_state_1.ConnectivityState.CONNECTING]: 0, | ||
[connectivity_state_1.ConnectivityState.IDLE]: 0, | ||
[connectivity_state_1.ConnectivityState.READY]: 0, | ||
[connectivity_state_1.ConnectivityState.SHUTDOWN]: 0, | ||
[connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE]: 0, | ||
}; | ||
@@ -124,6 +125,6 @@ this.subchannelStateListener = (subchannel, previousState, newState) => { | ||
if (subchannel === this.subchannels[this.currentSubchannelIndex] && | ||
newState === channel_1.ConnectivityState.TRANSIENT_FAILURE) { | ||
newState === connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE) { | ||
this.startNextSubchannelConnecting(); | ||
} | ||
if (newState === channel_1.ConnectivityState.READY) { | ||
if (newState === connectivity_state_1.ConnectivityState.READY) { | ||
this.pickSubchannel(subchannel); | ||
@@ -134,3 +135,3 @@ return; | ||
if (this.triedAllSubchannels && | ||
this.subchannelStateCounts[channel_1.ConnectivityState.IDLE] === | ||
this.subchannelStateCounts[connectivity_state_1.ConnectivityState.IDLE] === | ||
this.subchannels.length) { | ||
@@ -141,3 +142,3 @@ /* If all of the subchannels are IDLE we should go back to a | ||
this.resetSubchannelList(); | ||
this.updateState(channel_1.ConnectivityState.IDLE, new picker_1.QueuePicker(this)); | ||
this.updateState(connectivity_state_1.ConnectivityState.IDLE, new picker_1.QueuePicker(this)); | ||
return; | ||
@@ -148,14 +149,14 @@ } | ||
let newLBState; | ||
if (this.subchannelStateCounts[channel_1.ConnectivityState.CONNECTING] > 0) { | ||
newLBState = channel_1.ConnectivityState.CONNECTING; | ||
if (this.subchannelStateCounts[connectivity_state_1.ConnectivityState.CONNECTING] > 0) { | ||
newLBState = connectivity_state_1.ConnectivityState.CONNECTING; | ||
} | ||
else if (this.subchannelStateCounts[channel_1.ConnectivityState.TRANSIENT_FAILURE] > | ||
else if (this.subchannelStateCounts[connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE] > | ||
0) { | ||
newLBState = channel_1.ConnectivityState.TRANSIENT_FAILURE; | ||
newLBState = connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE; | ||
} | ||
else { | ||
newLBState = channel_1.ConnectivityState.IDLE; | ||
newLBState = connectivity_state_1.ConnectivityState.IDLE; | ||
} | ||
if (newLBState !== this.currentState) { | ||
if (newLBState === channel_1.ConnectivityState.TRANSIENT_FAILURE) { | ||
if (newLBState === connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE) { | ||
this.updateState(newLBState, new picker_1.UnavailablePicker()); | ||
@@ -169,3 +170,3 @@ } | ||
else { | ||
this.updateState(channel_1.ConnectivityState.CONNECTING, new picker_1.QueuePicker(this)); | ||
this.updateState(connectivity_state_1.ConnectivityState.CONNECTING, new picker_1.QueuePicker(this)); | ||
} | ||
@@ -176,20 +177,21 @@ } | ||
this.pickedSubchannelStateListener = (subchannel, previousState, newState) => { | ||
if (newState !== channel_1.ConnectivityState.READY) { | ||
if (newState !== connectivity_state_1.ConnectivityState.READY) { | ||
this.currentPick = null; | ||
subchannel.unref(); | ||
subchannel.removeConnectivityStateListener(this.pickedSubchannelStateListener); | ||
this.channelControlHelper.removeChannelzChild(subchannel.getChannelzRef()); | ||
if (this.subchannels.length > 0) { | ||
if (this.triedAllSubchannels) { | ||
let newLBState; | ||
if (this.subchannelStateCounts[channel_1.ConnectivityState.CONNECTING] > 0) { | ||
newLBState = channel_1.ConnectivityState.CONNECTING; | ||
if (this.subchannelStateCounts[connectivity_state_1.ConnectivityState.CONNECTING] > 0) { | ||
newLBState = connectivity_state_1.ConnectivityState.CONNECTING; | ||
} | ||
else if (this.subchannelStateCounts[channel_1.ConnectivityState.TRANSIENT_FAILURE] > | ||
else if (this.subchannelStateCounts[connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE] > | ||
0) { | ||
newLBState = channel_1.ConnectivityState.TRANSIENT_FAILURE; | ||
newLBState = connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE; | ||
} | ||
else { | ||
newLBState = channel_1.ConnectivityState.IDLE; | ||
newLBState = connectivity_state_1.ConnectivityState.IDLE; | ||
} | ||
if (newLBState === channel_1.ConnectivityState.TRANSIENT_FAILURE) { | ||
if (newLBState === connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE) { | ||
this.updateState(newLBState, new picker_1.UnavailablePicker()); | ||
@@ -202,3 +204,3 @@ } | ||
else { | ||
this.updateState(channel_1.ConnectivityState.CONNECTING, new picker_1.QueuePicker(this)); | ||
this.updateState(connectivity_state_1.ConnectivityState.CONNECTING, new picker_1.QueuePicker(this)); | ||
} | ||
@@ -211,3 +213,3 @@ } | ||
*/ | ||
this.updateState(channel_1.ConnectivityState.IDLE, new picker_1.QueuePicker(this)); | ||
this.updateState(connectivity_state_1.ConnectivityState.IDLE, new picker_1.QueuePicker(this)); | ||
} | ||
@@ -226,4 +228,4 @@ } | ||
const subchannelState = subchannel.getConnectivityState(); | ||
if (subchannelState === channel_1.ConnectivityState.IDLE || | ||
subchannelState === channel_1.ConnectivityState.CONNECTING) { | ||
if (subchannelState === connectivity_state_1.ConnectivityState.IDLE || | ||
subchannelState === connectivity_state_1.ConnectivityState.CONNECTING) { | ||
this.startConnecting(index); | ||
@@ -244,3 +246,3 @@ return; | ||
if (this.subchannels[subchannelIndex].getConnectivityState() === | ||
channel_1.ConnectivityState.IDLE) { | ||
connectivity_state_1.ConnectivityState.IDLE) { | ||
trace('Start connecting to subchannel with address ' + | ||
@@ -263,5 +265,6 @@ this.subchannels[subchannelIndex].getAddress()); | ||
this.currentPick = subchannel; | ||
this.updateState(channel_1.ConnectivityState.READY, new PickFirstPicker(subchannel)); | ||
this.updateState(connectivity_state_1.ConnectivityState.READY, new PickFirstPicker(subchannel)); | ||
subchannel.addConnectivityStateListener(this.pickedSubchannelStateListener); | ||
subchannel.ref(); | ||
this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef()); | ||
this.resetSubchannelList(); | ||
@@ -271,5 +274,5 @@ clearTimeout(this.connectionDelayTimeout); | ||
updateState(newState, picker) { | ||
trace(channel_1.ConnectivityState[this.currentState] + | ||
trace(connectivity_state_1.ConnectivityState[this.currentState] + | ||
' -> ' + | ||
channel_1.ConnectivityState[newState]); | ||
connectivity_state_1.ConnectivityState[newState]); | ||
this.currentState = newState; | ||
@@ -282,10 +285,11 @@ this.channelControlHelper.updateState(newState, picker); | ||
subchannel.unref(); | ||
this.channelControlHelper.removeChannelzChild(subchannel.getChannelzRef()); | ||
} | ||
this.currentSubchannelIndex = 0; | ||
this.subchannelStateCounts = { | ||
[channel_1.ConnectivityState.CONNECTING]: 0, | ||
[channel_1.ConnectivityState.IDLE]: 0, | ||
[channel_1.ConnectivityState.READY]: 0, | ||
[channel_1.ConnectivityState.SHUTDOWN]: 0, | ||
[channel_1.ConnectivityState.TRANSIENT_FAILURE]: 0, | ||
[connectivity_state_1.ConnectivityState.CONNECTING]: 0, | ||
[connectivity_state_1.ConnectivityState.IDLE]: 0, | ||
[connectivity_state_1.ConnectivityState.READY]: 0, | ||
[connectivity_state_1.ConnectivityState.SHUTDOWN]: 0, | ||
[connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE]: 0, | ||
}; | ||
@@ -302,6 +306,7 @@ this.subchannels = []; | ||
trace('Connect to address list ' + | ||
this.latestAddressList.map((address) => subchannel_1.subchannelAddressToString(address))); | ||
this.latestAddressList.map((address) => subchannel_address_1.subchannelAddressToString(address))); | ||
this.subchannels = this.latestAddressList.map((address) => this.channelControlHelper.createSubchannel(address, {})); | ||
for (const subchannel of this.subchannels) { | ||
subchannel.ref(); | ||
this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef()); | ||
} | ||
@@ -311,3 +316,3 @@ for (const subchannel of this.subchannels) { | ||
this.subchannelStateCounts[subchannel.getConnectivityState()] += 1; | ||
if (subchannel.getConnectivityState() === channel_1.ConnectivityState.READY) { | ||
if (subchannel.getConnectivityState() === connectivity_state_1.ConnectivityState.READY) { | ||
this.pickSubchannel(subchannel); | ||
@@ -320,7 +325,7 @@ this.resetSubchannelList(); | ||
const subchannelState = subchannel.getConnectivityState(); | ||
if (subchannelState === channel_1.ConnectivityState.IDLE || | ||
subchannelState === channel_1.ConnectivityState.CONNECTING) { | ||
if (subchannelState === connectivity_state_1.ConnectivityState.IDLE || | ||
subchannelState === connectivity_state_1.ConnectivityState.CONNECTING) { | ||
this.startConnecting(index); | ||
if (this.currentPick === null) { | ||
this.updateState(channel_1.ConnectivityState.CONNECTING, new picker_1.QueuePicker(this)); | ||
this.updateState(connectivity_state_1.ConnectivityState.CONNECTING, new picker_1.QueuePicker(this)); | ||
} | ||
@@ -332,3 +337,3 @@ return; | ||
if (this.currentPick === null) { | ||
this.updateState(channel_1.ConnectivityState.TRANSIENT_FAILURE, new picker_1.UnavailablePicker()); | ||
this.updateState(connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE, new picker_1.UnavailablePicker()); | ||
} | ||
@@ -351,3 +356,3 @@ } | ||
} | ||
if (this.currentState === channel_1.ConnectivityState.IDLE) { | ||
if (this.currentState === connectivity_state_1.ConnectivityState.IDLE) { | ||
if (this.latestAddressList.length > 0) { | ||
@@ -357,3 +362,3 @@ this.connectToAddressList(); | ||
} | ||
if (this.currentState === channel_1.ConnectivityState.IDLE || | ||
if (this.currentState === connectivity_state_1.ConnectivityState.IDLE || | ||
this.triedAllSubchannels) { | ||
@@ -372,2 +377,3 @@ this.channelControlHelper.requestReresolution(); | ||
this.currentPick.removeConnectivityStateListener(this.pickedSubchannelStateListener); | ||
this.channelControlHelper.removeChannelzChild(this.currentPick.getChannelzRef()); | ||
} | ||
@@ -382,4 +388,5 @@ } | ||
load_balancer_1.registerLoadBalancerType(TYPE_NAME, PickFirstLoadBalancer, PickFirstLoadBalancingConfig); | ||
load_balancer_1.registerDefaultLoadBalancerType(TYPE_NAME); | ||
} | ||
exports.setup = setup; | ||
//# sourceMappingURL=load-balancer-pick-first.js.map |
import { LoadBalancer, ChannelControlHelper, LoadBalancingConfig } from './load-balancer'; | ||
import { SubchannelAddress } from './subchannel'; | ||
import { SubchannelAddress } from './subchannel-address'; | ||
export declare class RoundRobinLoadBalancer implements LoadBalancer { | ||
@@ -4,0 +4,0 @@ private readonly channelControlHelper; |
@@ -21,5 +21,5 @@ "use strict"; | ||
const load_balancer_1 = require("./load-balancer"); | ||
const channel_1 = require("./channel"); | ||
const connectivity_state_1 = require("./connectivity-state"); | ||
const picker_1 = require("./picker"); | ||
const subchannel_1 = require("./subchannel"); | ||
const subchannel_address_1 = require("./subchannel-address"); | ||
const logging = require("./logging"); | ||
@@ -39,5 +39,6 @@ const constants_1 = require("./constants"); | ||
return { | ||
[TYPE_NAME]: {} | ||
[TYPE_NAME]: {}, | ||
}; | ||
} | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
static createFromJson(obj) { | ||
@@ -59,3 +60,3 @@ return new RoundRobinLoadBalancingConfig(); | ||
status: null, | ||
extraFilterFactory: null, | ||
extraFilterFactories: [], | ||
onCallStarted: null, | ||
@@ -77,10 +78,10 @@ }; | ||
this.subchannels = []; | ||
this.currentState = channel_1.ConnectivityState.IDLE; | ||
this.currentState = connectivity_state_1.ConnectivityState.IDLE; | ||
this.currentReadyPicker = null; | ||
this.subchannelStateCounts = { | ||
[channel_1.ConnectivityState.CONNECTING]: 0, | ||
[channel_1.ConnectivityState.IDLE]: 0, | ||
[channel_1.ConnectivityState.READY]: 0, | ||
[channel_1.ConnectivityState.SHUTDOWN]: 0, | ||
[channel_1.ConnectivityState.TRANSIENT_FAILURE]: 0, | ||
[connectivity_state_1.ConnectivityState.CONNECTING]: 0, | ||
[connectivity_state_1.ConnectivityState.IDLE]: 0, | ||
[connectivity_state_1.ConnectivityState.READY]: 0, | ||
[connectivity_state_1.ConnectivityState.SHUTDOWN]: 0, | ||
[connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE]: 0, | ||
}; | ||
@@ -91,4 +92,4 @@ this.subchannelStateListener = (subchannel, previousState, newState) => { | ||
this.calculateAndUpdateState(); | ||
if (newState === channel_1.ConnectivityState.TRANSIENT_FAILURE || | ||
newState === channel_1.ConnectivityState.IDLE) { | ||
if (newState === connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE || | ||
newState === connectivity_state_1.ConnectivityState.IDLE) { | ||
this.channelControlHelper.requestReresolution(); | ||
@@ -100,4 +101,4 @@ subchannel.startConnecting(); | ||
calculateAndUpdateState() { | ||
if (this.subchannelStateCounts[channel_1.ConnectivityState.READY] > 0) { | ||
const readySubchannels = this.subchannels.filter((subchannel) => subchannel.getConnectivityState() === channel_1.ConnectivityState.READY); | ||
if (this.subchannelStateCounts[connectivity_state_1.ConnectivityState.READY] > 0) { | ||
const readySubchannels = this.subchannels.filter((subchannel) => subchannel.getConnectivityState() === connectivity_state_1.ConnectivityState.READY); | ||
let index = 0; | ||
@@ -110,19 +111,19 @@ if (this.currentReadyPicker !== null) { | ||
} | ||
this.updateState(channel_1.ConnectivityState.READY, new RoundRobinPicker(readySubchannels, index)); | ||
this.updateState(connectivity_state_1.ConnectivityState.READY, new RoundRobinPicker(readySubchannels, index)); | ||
} | ||
else if (this.subchannelStateCounts[channel_1.ConnectivityState.CONNECTING] > 0) { | ||
this.updateState(channel_1.ConnectivityState.CONNECTING, new picker_1.QueuePicker(this)); | ||
else if (this.subchannelStateCounts[connectivity_state_1.ConnectivityState.CONNECTING] > 0) { | ||
this.updateState(connectivity_state_1.ConnectivityState.CONNECTING, new picker_1.QueuePicker(this)); | ||
} | ||
else if (this.subchannelStateCounts[channel_1.ConnectivityState.TRANSIENT_FAILURE] > 0) { | ||
this.updateState(channel_1.ConnectivityState.TRANSIENT_FAILURE, new picker_1.UnavailablePicker()); | ||
else if (this.subchannelStateCounts[connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE] > 0) { | ||
this.updateState(connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE, new picker_1.UnavailablePicker()); | ||
} | ||
else { | ||
this.updateState(channel_1.ConnectivityState.IDLE, new picker_1.QueuePicker(this)); | ||
this.updateState(connectivity_state_1.ConnectivityState.IDLE, new picker_1.QueuePicker(this)); | ||
} | ||
} | ||
updateState(newState, picker) { | ||
trace(channel_1.ConnectivityState[this.currentState] + | ||
trace(connectivity_state_1.ConnectivityState[this.currentState] + | ||
' -> ' + | ||
channel_1.ConnectivityState[newState]); | ||
if (newState === channel_1.ConnectivityState.READY) { | ||
connectivity_state_1.ConnectivityState[newState]); | ||
if (newState === connectivity_state_1.ConnectivityState.READY) { | ||
this.currentReadyPicker = picker; | ||
@@ -140,9 +141,10 @@ } | ||
subchannel.unref(); | ||
this.channelControlHelper.removeChannelzChild(subchannel.getChannelzRef()); | ||
} | ||
this.subchannelStateCounts = { | ||
[channel_1.ConnectivityState.CONNECTING]: 0, | ||
[channel_1.ConnectivityState.IDLE]: 0, | ||
[channel_1.ConnectivityState.READY]: 0, | ||
[channel_1.ConnectivityState.SHUTDOWN]: 0, | ||
[channel_1.ConnectivityState.TRANSIENT_FAILURE]: 0, | ||
[connectivity_state_1.ConnectivityState.CONNECTING]: 0, | ||
[connectivity_state_1.ConnectivityState.IDLE]: 0, | ||
[connectivity_state_1.ConnectivityState.READY]: 0, | ||
[connectivity_state_1.ConnectivityState.SHUTDOWN]: 0, | ||
[connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE]: 0, | ||
}; | ||
@@ -154,3 +156,3 @@ this.subchannels = []; | ||
trace('Connect to address list ' + | ||
addressList.map((address) => subchannel_1.subchannelAddressToString(address))); | ||
addressList.map((address) => subchannel_address_1.subchannelAddressToString(address))); | ||
this.subchannels = addressList.map((address) => this.channelControlHelper.createSubchannel(address, {})); | ||
@@ -160,6 +162,7 @@ for (const subchannel of this.subchannels) { | ||
subchannel.addConnectivityStateListener(this.subchannelStateListener); | ||
this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef()); | ||
const subchannelState = subchannel.getConnectivityState(); | ||
this.subchannelStateCounts[subchannelState] += 1; | ||
if (subchannelState === channel_1.ConnectivityState.IDLE || | ||
subchannelState === channel_1.ConnectivityState.TRANSIENT_FAILURE) { | ||
if (subchannelState === connectivity_state_1.ConnectivityState.IDLE || | ||
subchannelState === connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE) { | ||
subchannel.startConnecting(); | ||
@@ -166,0 +169,0 @@ } |
import { ChannelOptions } from './channel-options'; | ||
import { Subchannel, SubchannelAddress } from './subchannel'; | ||
import { ConnectivityState } from './channel'; | ||
import { Subchannel } from './subchannel'; | ||
import { SubchannelAddress } from './subchannel-address'; | ||
import { ConnectivityState } from './connectivity-state'; | ||
import { Picker } from './picker'; | ||
import { ChannelRef, SubchannelRef } from './channelz'; | ||
/** | ||
@@ -28,4 +30,15 @@ * A collection of functions associated with a channel that a load balancer | ||
requestReresolution(): void; | ||
addChannelzChild(child: ChannelRef | SubchannelRef): void; | ||
removeChannelzChild(child: ChannelRef | SubchannelRef): void; | ||
} | ||
/** | ||
* Create a child ChannelControlHelper that overrides some methods of the | ||
* parent while letting others pass through to the parent unmodified. This | ||
* allows other code to create these children without needing to know about | ||
* all of the methods to be passed through. | ||
* @param parent | ||
* @param overrides | ||
*/ | ||
export declare function createChildChannelControlHelper(parent: ChannelControlHelper, overrides: Partial<ChannelControlHelper>): ChannelControlHelper; | ||
/** | ||
* Tracks one or more connected subchannels and determines which subchannel | ||
@@ -81,6 +94,6 @@ * each request should use. | ||
export declare function registerLoadBalancerType(typeName: string, loadBalancerType: LoadBalancerConstructor, loadBalancingConfigType: LoadBalancingConfigConstructor): void; | ||
export declare function registerDefaultLoadBalancerType(typeName: string): void; | ||
export declare function createLoadBalancer(config: LoadBalancingConfig, channelControlHelper: ChannelControlHelper): LoadBalancer | null; | ||
export declare function isLoadBalancerNameRegistered(typeName: string): boolean; | ||
export declare function getFirstUsableConfig(configs: LoadBalancingConfig[], defaultPickFirst?: true): LoadBalancingConfig; | ||
export declare function getFirstUsableConfig(configs: LoadBalancingConfig[], fallbackTodefault?: true): LoadBalancingConfig; | ||
export declare function validateLoadBalancingConfig(obj: any): LoadBalancingConfig; | ||
export declare function registerAll(): void; |
@@ -19,13 +19,35 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.registerAll = exports.validateLoadBalancingConfig = exports.getFirstUsableConfig = exports.isLoadBalancerNameRegistered = exports.createLoadBalancer = exports.registerLoadBalancerType = void 0; | ||
const load_balancer_pick_first = require("./load-balancer-pick-first"); | ||
const load_balancer_round_robin = require("./load-balancer-round-robin"); | ||
exports.validateLoadBalancingConfig = exports.getFirstUsableConfig = exports.isLoadBalancerNameRegistered = exports.createLoadBalancer = exports.registerDefaultLoadBalancerType = exports.registerLoadBalancerType = exports.createChildChannelControlHelper = void 0; | ||
/** | ||
* Create a child ChannelControlHelper that overrides some methods of the | ||
* parent while letting others pass through to the parent unmodified. This | ||
* allows other code to create these children without needing to know about | ||
* all of the methods to be passed through. | ||
* @param parent | ||
* @param overrides | ||
*/ | ||
function createChildChannelControlHelper(parent, overrides) { | ||
var _a, _b, _c, _d, _e, _f, _g, _h, _j, _k; | ||
return { | ||
createSubchannel: (_b = (_a = overrides.createSubchannel) === null || _a === void 0 ? void 0 : _a.bind(overrides)) !== null && _b !== void 0 ? _b : parent.createSubchannel.bind(parent), | ||
updateState: (_d = (_c = overrides.updateState) === null || _c === void 0 ? void 0 : _c.bind(overrides)) !== null && _d !== void 0 ? _d : parent.updateState.bind(parent), | ||
requestReresolution: (_f = (_e = overrides.requestReresolution) === null || _e === void 0 ? void 0 : _e.bind(overrides)) !== null && _f !== void 0 ? _f : parent.requestReresolution.bind(parent), | ||
addChannelzChild: (_h = (_g = overrides.addChannelzChild) === null || _g === void 0 ? void 0 : _g.bind(overrides)) !== null && _h !== void 0 ? _h : parent.addChannelzChild.bind(parent), | ||
removeChannelzChild: (_k = (_j = overrides.removeChannelzChild) === null || _j === void 0 ? void 0 : _j.bind(overrides)) !== null && _k !== void 0 ? _k : parent.removeChannelzChild.bind(parent) | ||
}; | ||
} | ||
exports.createChildChannelControlHelper = createChildChannelControlHelper; | ||
const registeredLoadBalancerTypes = {}; | ||
let defaultLoadBalancerType = null; | ||
function registerLoadBalancerType(typeName, loadBalancerType, loadBalancingConfigType) { | ||
registeredLoadBalancerTypes[typeName] = { | ||
LoadBalancer: loadBalancerType, | ||
LoadBalancingConfig: loadBalancingConfigType | ||
LoadBalancingConfig: loadBalancingConfigType, | ||
}; | ||
} | ||
exports.registerLoadBalancerType = registerLoadBalancerType; | ||
function registerDefaultLoadBalancerType(typeName) { | ||
defaultLoadBalancerType = typeName; | ||
} | ||
exports.registerDefaultLoadBalancerType = registerDefaultLoadBalancerType; | ||
function createLoadBalancer(config, channelControlHelper) { | ||
@@ -45,3 +67,3 @@ const typeName = config.getLoadBalancerName(); | ||
exports.isLoadBalancerNameRegistered = isLoadBalancerNameRegistered; | ||
function getFirstUsableConfig(configs, defaultPickFirst = false) { | ||
function getFirstUsableConfig(configs, fallbackTodefault = false) { | ||
for (const config of configs) { | ||
@@ -52,4 +74,9 @@ if (config.getLoadBalancerName() in registeredLoadBalancerTypes) { | ||
} | ||
if (defaultPickFirst) { | ||
return new load_balancer_pick_first.PickFirstLoadBalancingConfig(); | ||
if (fallbackTodefault) { | ||
if (defaultLoadBalancerType) { | ||
return new registeredLoadBalancerTypes[defaultLoadBalancerType].LoadBalancingConfig(); | ||
} | ||
else { | ||
return null; | ||
} | ||
} | ||
@@ -61,4 +88,5 @@ else { | ||
exports.getFirstUsableConfig = getFirstUsableConfig; | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
function validateLoadBalancingConfig(obj) { | ||
if (!(obj !== null && (typeof obj === 'object'))) { | ||
if (!(obj !== null && typeof obj === 'object')) { | ||
throw new Error('Load balancing config must be an object'); | ||
@@ -79,7 +107,2 @@ } | ||
exports.validateLoadBalancingConfig = validateLoadBalancingConfig; | ||
function registerAll() { | ||
load_balancer_pick_first.setup(); | ||
load_balancer_round_robin.setup(); | ||
} | ||
exports.registerAll = registerAll; | ||
//# sourceMappingURL=load-balancer.js.map |
@@ -22,3 +22,14 @@ "use strict"; | ||
const constants_1 = require("./constants"); | ||
let _logger = console; | ||
const DEFAULT_LOGGER = { | ||
error: (message, ...optionalParams) => { | ||
console.error('E ' + message, ...optionalParams); | ||
}, | ||
info: (message, ...optionalParams) => { | ||
console.error('I ' + message, ...optionalParams); | ||
}, | ||
debug: (message, ...optionalParams) => { | ||
console.error('D ' + message, ...optionalParams); | ||
}, | ||
}; | ||
let _logger = DEFAULT_LOGGER; | ||
let _logVerbosity = constants_1.LogVerbosity.ERROR; | ||
@@ -53,4 +64,23 @@ const verbosityString = (_b = (_a = process.env.GRPC_NODE_VERBOSITY) !== null && _a !== void 0 ? _a : process.env.GRPC_VERBOSITY) !== null && _b !== void 0 ? _b : ''; | ||
exports.log = (severity, ...args) => { | ||
if (severity >= _logVerbosity && typeof _logger.error === 'function') { | ||
_logger.error(...args); | ||
let logFunction; | ||
if (severity >= _logVerbosity) { | ||
switch (severity) { | ||
case constants_1.LogVerbosity.DEBUG: | ||
logFunction = _logger.debug; | ||
break; | ||
case constants_1.LogVerbosity.INFO: | ||
logFunction = _logger.info; | ||
break; | ||
case constants_1.LogVerbosity.ERROR: | ||
logFunction = _logger.error; | ||
break; | ||
} | ||
/* Fall back to _logger.error when other methods are not available for | ||
* compatiblity with older behavior that always logged to _logger.error */ | ||
if (!logFunction) { | ||
logFunction = _logger.error; | ||
} | ||
if (logFunction) { | ||
logFunction.bind(_logger)(...args); | ||
} | ||
} | ||
@@ -71,3 +101,4 @@ }; | ||
function trace(severity, tracer, text) { | ||
if (!disabledTracers.has(tracer) && (allEnabled || enabledTracers.has(tracer))) { | ||
if (!disabledTracers.has(tracer) && | ||
(allEnabled || enabledTracers.has(tracer))) { | ||
exports.log(severity, new Date().toISOString() + ' | ' + tracer + ' | ' + text); | ||
@@ -74,0 +105,0 @@ } |
/// <reference types="node" /> | ||
import { BaseFilter, Filter, FilterFactory } from "./filter"; | ||
import { Call, WriteObject } from "./call-stream"; | ||
import { ChannelOptions } from "./channel-options"; | ||
import { BaseFilter, Filter, FilterFactory } from './filter'; | ||
import { Call, WriteObject } from './call-stream'; | ||
import { ChannelOptions } from './channel-options'; | ||
export declare class MaxMessageSizeFilter extends BaseFilter implements Filter { | ||
@@ -6,0 +6,0 @@ private readonly options; |
@@ -74,2 +74,9 @@ /// <reference types="node" /> | ||
/** | ||
* This modifies the behavior of JSON.stringify to show an object | ||
* representation of the metadata map. | ||
*/ | ||
toJSON(): { | ||
[key: string]: MetadataValue[]; | ||
}; | ||
/** | ||
* Returns a new Metadata object based fields in a given IncomingHttpHeaders | ||
@@ -76,0 +83,0 @@ * object. |
@@ -200,2 +200,13 @@ "use strict"; | ||
/** | ||
* This modifies the behavior of JSON.stringify to show an object | ||
* representation of the metadata map. | ||
*/ | ||
toJSON() { | ||
const result = {}; | ||
for (const [key, values] of this.internalRepr.entries()) { | ||
result[key] = values; | ||
} | ||
return result; | ||
} | ||
/** | ||
* Returns a new Metadata object based fields in a given IncomingHttpHeaders | ||
@@ -202,0 +213,0 @@ * object. |
@@ -30,3 +30,3 @@ import { Subchannel } from './subchannel'; | ||
*/ | ||
extraFilterFactory: FilterFactory<Filter> | null; | ||
extraFilterFactories: FilterFactory<Filter>[]; | ||
onCallStarted: (() => void) | null; | ||
@@ -38,3 +38,3 @@ } | ||
status: null; | ||
extraFilterFactory: FilterFactory<Filter> | null; | ||
extraFilterFactories: FilterFactory<Filter>[]; | ||
onCallStarted: (() => void) | null; | ||
@@ -46,3 +46,3 @@ } | ||
status: null; | ||
extraFilterFactory: null; | ||
extraFilterFactories: []; | ||
onCallStarted: null; | ||
@@ -54,3 +54,3 @@ } | ||
status: StatusObject; | ||
extraFilterFactory: null; | ||
extraFilterFactories: []; | ||
onCallStarted: null; | ||
@@ -62,3 +62,3 @@ } | ||
status: StatusObject; | ||
extraFilterFactory: null; | ||
extraFilterFactories: []; | ||
onCallStarted: null; | ||
@@ -65,0 +65,0 @@ } |
@@ -51,3 +51,3 @@ "use strict"; | ||
status: this.status, | ||
extraFilterFactory: null, | ||
extraFilterFactories: [], | ||
onCallStarted: null, | ||
@@ -82,3 +82,3 @@ }; | ||
status: null, | ||
extraFilterFactory: null, | ||
extraFilterFactories: [], | ||
onCallStarted: null, | ||
@@ -85,0 +85,0 @@ }; |
@@ -48,3 +48,3 @@ "use strict"; | ||
details: `Unrecognized scheme ${target.scheme} in IP resolver`, | ||
metadata: new metadata_1.Metadata() | ||
metadata: new metadata_1.Metadata(), | ||
}; | ||
@@ -60,11 +60,12 @@ return; | ||
details: `Failed to parse ${target.scheme} address ${path}`, | ||
metadata: new metadata_1.Metadata() | ||
metadata: new metadata_1.Metadata(), | ||
}; | ||
return; | ||
} | ||
if ((target.scheme === IPV4_SCHEME && !net_1.isIPv4(hostPort.host)) || (target.scheme === IPV6_SCHEME && !net_1.isIPv6(hostPort.host))) { | ||
if ((target.scheme === IPV4_SCHEME && !net_1.isIPv4(hostPort.host)) || | ||
(target.scheme === IPV6_SCHEME && !net_1.isIPv6(hostPort.host))) { | ||
this.error = { | ||
code: constants_1.Status.UNAVAILABLE, | ||
details: `Failed to parse ${target.scheme} address ${path}`, | ||
metadata: new metadata_1.Metadata() | ||
metadata: new metadata_1.Metadata(), | ||
}; | ||
@@ -75,3 +76,3 @@ return; | ||
host: hostPort.host, | ||
port: (_a = hostPort.port) !== null && _a !== void 0 ? _a : DEFAULT_PORT | ||
port: (_a = hostPort.port) !== null && _a !== void 0 ? _a : DEFAULT_PORT, | ||
}); | ||
@@ -78,0 +79,0 @@ } |
import { MethodConfig, ServiceConfig } from './service-config'; | ||
import { StatusObject } from './call-stream'; | ||
import { SubchannelAddress } from './subchannel'; | ||
import { SubchannelAddress } from './subchannel-address'; | ||
import { GrpcUri } from './uri-parser'; | ||
@@ -8,2 +8,3 @@ import { ChannelOptions } from './channel-options'; | ||
import { Status } from './constants'; | ||
import { Filter, FilterFactory } from './filter'; | ||
export interface CallConfig { | ||
@@ -16,2 +17,3 @@ methodConfig: MethodConfig; | ||
status: Status; | ||
dynamicFilterFactories: FilterFactory<Filter>[]; | ||
} | ||
@@ -103,2 +105,1 @@ /** | ||
export declare function mapUriDefaultScheme(target: GrpcUri): GrpcUri | null; | ||
export declare function registerAll(): void; |
@@ -19,6 +19,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.registerAll = exports.mapUriDefaultScheme = exports.getDefaultAuthority = exports.createResolver = exports.registerDefaultScheme = exports.registerResolver = void 0; | ||
const resolver_dns = require("./resolver-dns"); | ||
const resolver_uds = require("./resolver-uds"); | ||
const resolver_ip = require("./resolver-ip"); | ||
exports.mapUriDefaultScheme = exports.getDefaultAuthority = exports.createResolver = exports.registerDefaultScheme = exports.registerResolver = void 0; | ||
const uri_parser_1 = require("./uri-parser"); | ||
@@ -92,8 +89,2 @@ const registeredResolvers = {}; | ||
exports.mapUriDefaultScheme = mapUriDefaultScheme; | ||
function registerAll() { | ||
resolver_dns.setup(); | ||
resolver_uds.setup(); | ||
resolver_ip.setup(); | ||
} | ||
exports.registerAll = registerAll; | ||
//# sourceMappingURL=resolver.js.map |
import { ChannelControlHelper, LoadBalancer, LoadBalancingConfig } from './load-balancer'; | ||
import { ConfigSelector } from './resolver'; | ||
import { StatusObject } from './call-stream'; | ||
import { SubchannelAddress } from './subchannel'; | ||
import { SubchannelAddress } from './subchannel-address'; | ||
import { GrpcUri } from './uri-parser'; | ||
@@ -6,0 +6,0 @@ import { ChannelOptions } from './channel-options'; |
@@ -22,3 +22,3 @@ "use strict"; | ||
const service_config_1 = require("./service-config"); | ||
const channel_1 = require("./channel"); | ||
const connectivity_state_1 = require("./connectivity-state"); | ||
const resolver_1 = require("./resolver"); | ||
@@ -41,3 +41,3 @@ const picker_1 = require("./picker"); | ||
var _a, _b; | ||
const splitName = methodName.split('/').filter(x => x.length > 0); | ||
const splitName = methodName.split('/').filter((x) => x.length > 0); | ||
const service = (_a = splitName[0]) !== null && _a !== void 0 ? _a : ''; | ||
@@ -48,7 +48,9 @@ const method = (_b = splitName[1]) !== null && _b !== void 0 ? _b : ''; | ||
for (const name of methodConfig.name) { | ||
if (name.service === service && (name.method === undefined || name.method === method)) { | ||
if (name.service === service && | ||
(name.method === undefined || name.method === method)) { | ||
return { | ||
methodConfig: methodConfig, | ||
pickInformation: {}, | ||
status: constants_1.Status.OK | ||
status: constants_1.Status.OK, | ||
dynamicFilterFactories: [] | ||
}; | ||
@@ -62,3 +64,4 @@ } | ||
pickInformation: {}, | ||
status: constants_1.Status.OK | ||
status: constants_1.Status.OK, | ||
dynamicFilterFactories: [] | ||
}; | ||
@@ -86,3 +89,3 @@ }; | ||
this.onFailedResolution = onFailedResolution; | ||
this.latestChildState = channel_1.ConnectivityState.IDLE; | ||
this.latestChildState = connectivity_state_1.ConnectivityState.IDLE; | ||
this.latestChildPicker = new picker_1.QueuePicker(this); | ||
@@ -92,3 +95,3 @@ /** | ||
*/ | ||
this.currentState = channel_1.ConnectivityState.IDLE; | ||
this.currentState = connectivity_state_1.ConnectivityState.IDLE; | ||
/** | ||
@@ -114,3 +117,3 @@ * The service config object from the last successful resolution, if | ||
} | ||
this.updateState(channel_1.ConnectivityState.IDLE, new picker_1.QueuePicker(this)); | ||
this.updateState(connectivity_state_1.ConnectivityState.IDLE, new picker_1.QueuePicker(this)); | ||
this.childLoadBalancer = new load_balancer_child_handler_1.ChildLoadBalancerHandler({ | ||
@@ -135,2 +138,4 @@ createSubchannel: channelControlHelper.createSubchannel.bind(channelControlHelper), | ||
}, | ||
addChannelzChild: channelControlHelper.addChannelzChild.bind(channelControlHelper), | ||
removeChannelzChild: channelControlHelper.removeChannelzChild.bind(channelControlHelper) | ||
}); | ||
@@ -201,4 +206,4 @@ this.innerResolver = resolver_1.createResolver(target, { | ||
this.innerResolver.updateResolution(); | ||
if (this.currentState === channel_1.ConnectivityState.IDLE) { | ||
this.updateState(channel_1.ConnectivityState.CONNECTING, new picker_1.QueuePicker(this)); | ||
if (this.currentState === connectivity_state_1.ConnectivityState.IDLE) { | ||
this.updateState(connectivity_state_1.ConnectivityState.CONNECTING, new picker_1.QueuePicker(this)); | ||
} | ||
@@ -209,7 +214,7 @@ } | ||
' ' + | ||
channel_1.ConnectivityState[this.currentState] + | ||
connectivity_state_1.ConnectivityState[this.currentState] + | ||
' -> ' + | ||
channel_1.ConnectivityState[connectivityState]); | ||
connectivity_state_1.ConnectivityState[connectivityState]); | ||
// Ensure that this.exitIdle() is called by the picker | ||
if (connectivityState === channel_1.ConnectivityState.IDLE) { | ||
if (connectivityState === connectivity_state_1.ConnectivityState.IDLE) { | ||
picker = new picker_1.QueuePicker(this); | ||
@@ -221,4 +226,4 @@ } | ||
handleResolutionFailure(error) { | ||
if (this.latestChildState === channel_1.ConnectivityState.IDLE) { | ||
this.updateState(channel_1.ConnectivityState.TRANSIENT_FAILURE, new picker_1.UnavailablePicker(error)); | ||
if (this.latestChildState === connectivity_state_1.ConnectivityState.IDLE) { | ||
this.updateState(connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE, new picker_1.UnavailablePicker(error)); | ||
this.onFailedResolution(error); | ||
@@ -230,3 +235,3 @@ } | ||
this.childLoadBalancer.exitIdle(); | ||
if (this.currentState === channel_1.ConnectivityState.IDLE) { | ||
if (this.currentState === connectivity_state_1.ConnectivityState.IDLE) { | ||
if (this.backoffTimeout.isRunning()) { | ||
@@ -238,3 +243,3 @@ this.continueResolving = true; | ||
} | ||
this.updateState(channel_1.ConnectivityState.CONNECTING, new picker_1.QueuePicker(this)); | ||
this.updateState(connectivity_state_1.ConnectivityState.CONNECTING, new picker_1.QueuePicker(this)); | ||
} | ||
@@ -252,3 +257,3 @@ } | ||
this.innerResolver.destroy(); | ||
this.updateState(channel_1.ConnectivityState.SHUTDOWN, new picker_1.UnavailablePicker()); | ||
this.updateState(connectivity_state_1.ConnectivityState.SHUTDOWN, new picker_1.UnavailablePicker()); | ||
} | ||
@@ -255,0 +260,0 @@ getTypeName() { |
@@ -233,2 +233,4 @@ "use strict"; | ||
this.emit('cancelled', 'cancelled'); | ||
this.emit('streamEnd', false); | ||
this.sendStatus({ code: constants_1.Status.CANCELLED, details: 'Cancelled by client', metadata: new metadata_1.Metadata() }); | ||
}); | ||
@@ -314,2 +316,3 @@ this.stream.on('drain', () => { | ||
} | ||
this.emit('receiveMessage'); | ||
resolve(this.deserializeMessage(requestBytes)); | ||
@@ -366,2 +369,4 @@ } | ||
var _a; | ||
this.emit('callEnd', statusObj.code); | ||
this.emit('streamEnd', statusObj.code === constants_1.Status.OK); | ||
if (this.checkCancelled()) { | ||
@@ -390,5 +395,2 @@ return; | ||
sendError(error) { | ||
if (this.checkCancelled()) { | ||
return; | ||
} | ||
const status = { | ||
@@ -424,2 +426,3 @@ code: constants_1.Status.UNKNOWN, | ||
this.sendMetadata(); | ||
this.emit('sendMessage'); | ||
return this.stream.write(chunk); | ||
@@ -449,2 +452,3 @@ } | ||
} | ||
this.emit('receiveMessage'); | ||
this.pushOrBufferMessage(readable, message); | ||
@@ -451,0 +455,0 @@ } |
@@ -5,2 +5,3 @@ import { Deserialize, Serialize, ServiceDefinition } from './make-client'; | ||
import { ChannelOptions } from './channel-options'; | ||
import { ServerRef } from './channelz'; | ||
export declare type UntypedHandleCall = HandleCall<any, any>; | ||
@@ -16,3 +17,11 @@ export interface UntypedServiceImplementation { | ||
private options; | ||
private channelzRef; | ||
private channelzTrace; | ||
private callTracker; | ||
private listenerChildrenTracker; | ||
private sessionChildrenTracker; | ||
constructor(options?: ChannelOptions); | ||
private getChannelzInfo; | ||
private getChannelzSessionInfoGetter; | ||
private trace; | ||
addProtoService(): void; | ||
@@ -29,3 +38,4 @@ addService(service: ServiceDefinition, implementation: UntypedServiceImplementation): void; | ||
addHttp2Port(): void; | ||
getChannelzRef(): ServerRef; | ||
private _setupHandlers; | ||
} |
@@ -24,10 +24,9 @@ "use strict"; | ||
const server_call_1 = require("./server-call"); | ||
const server_credentials_1 = require("./server-credentials"); | ||
const resolver_1 = require("./resolver"); | ||
const logging = require("./logging"); | ||
const subchannel_1 = require("./subchannel"); | ||
const subchannel_address_1 = require("./subchannel-address"); | ||
const uri_parser_1 = require("./uri-parser"); | ||
const channelz_1 = require("./channelz"); | ||
const TRACER_NAME = 'server'; | ||
function trace(text) { | ||
logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, text); | ||
} | ||
function noop() { } | ||
@@ -68,6 +67,68 @@ function getUnimplementedStatusResponse(methodName) { | ||
this.handlers = new Map(); | ||
this.sessions = new Set(); | ||
this.sessions = new Map(); | ||
this.started = false; | ||
this.channelzTrace = new channelz_1.ChannelzTrace(); | ||
this.callTracker = new channelz_1.ChannelzCallTracker(); | ||
this.listenerChildrenTracker = new channelz_1.ChannelzChildrenTracker(); | ||
this.sessionChildrenTracker = new channelz_1.ChannelzChildrenTracker(); | ||
this.options = options !== null && options !== void 0 ? options : {}; | ||
this.channelzRef = channelz_1.registerChannelzServer(() => this.getChannelzInfo()); | ||
this.channelzTrace.addTrace('CT_INFO', 'Server created'); | ||
this.trace('Server constructed'); | ||
} | ||
getChannelzInfo() { | ||
return { | ||
trace: this.channelzTrace, | ||
callTracker: this.callTracker, | ||
listenerChildren: this.listenerChildrenTracker.getChildLists(), | ||
sessionChildren: this.sessionChildrenTracker.getChildLists() | ||
}; | ||
} | ||
getChannelzSessionInfoGetter(session) { | ||
return () => { | ||
var _a, _b, _c; | ||
const sessionInfo = this.sessions.get(session); | ||
const sessionSocket = session.socket; | ||
const remoteAddress = sessionSocket.remoteAddress ? subchannel_address_1.stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort) : null; | ||
const localAddress = subchannel_address_1.stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort); | ||
let tlsInfo; | ||
if (session.encrypted) { | ||
const tlsSocket = sessionSocket; | ||
const cipherInfo = tlsSocket.getCipher(); | ||
const certificate = tlsSocket.getCertificate(); | ||
const peerCertificate = tlsSocket.getPeerCertificate(); | ||
tlsInfo = { | ||
cipherSuiteStandardName: (_a = cipherInfo.standardName) !== null && _a !== void 0 ? _a : null, | ||
cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name, | ||
localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null, | ||
remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null | ||
}; | ||
} | ||
else { | ||
tlsInfo = null; | ||
} | ||
const socketInfo = { | ||
remoteAddress: remoteAddress, | ||
localAddress: localAddress, | ||
security: tlsInfo, | ||
remoteName: null, | ||
streamsStarted: sessionInfo.streamTracker.callsStarted, | ||
streamsSucceeded: sessionInfo.streamTracker.callsSucceeded, | ||
streamsFailed: sessionInfo.streamTracker.callsFailed, | ||
messagesSent: sessionInfo.messagesSent, | ||
messagesReceived: sessionInfo.messagesReceived, | ||
keepAlivesSent: 0, | ||
lastLocalStreamCreatedTimestamp: null, | ||
lastRemoteStreamCreatedTimestamp: sessionInfo.streamTracker.lastCallStartedTimestamp, | ||
lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp, | ||
lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp, | ||
localFlowControlWindow: (_b = session.state.localWindowSize) !== null && _b !== void 0 ? _b : null, | ||
remoteFlowControlWindow: (_c = session.state.remoteWindowSize) !== null && _c !== void 0 ? _c : null | ||
}; | ||
return socketInfo; | ||
}; | ||
} | ||
trace(text) { | ||
logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + text); | ||
} | ||
addProtoService() { | ||
@@ -124,4 +185,3 @@ throw new Error('Not implemented. Use addService() instead'); | ||
removeService(service) { | ||
if (service === null || | ||
typeof service !== 'object') { | ||
if (service === null || typeof service !== 'object') { | ||
throw new Error('removeService() requires object as argument'); | ||
@@ -145,4 +205,4 @@ } | ||
} | ||
if (creds === null || typeof creds !== 'object') { | ||
throw new TypeError('creds must be an object'); | ||
if (creds === null || !(creds instanceof server_credentials_1.ServerCredentials)) { | ||
throw new TypeError('creds must be a ServerCredentials object'); | ||
} | ||
@@ -161,3 +221,3 @@ if (typeof callback !== 'function') { | ||
const serverOptions = { | ||
maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER | ||
maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER, | ||
}; | ||
@@ -172,2 +232,5 @@ if ('grpc-node.max_session_memory' in this.options) { | ||
} | ||
const deferredCallback = (error, port) => { | ||
process.nextTick(() => callback(error, port)); | ||
}; | ||
const setupServer = () => { | ||
@@ -191,5 +254,5 @@ let http2Server; | ||
return Promise.all(addressList.map((address) => { | ||
trace('Attempting to bind ' + subchannel_1.subchannelAddressToString(address)); | ||
this.trace('Attempting to bind ' + subchannel_address_1.subchannelAddressToString(address)); | ||
let addr; | ||
if (subchannel_1.isTcpSubchannelAddress(address)) { | ||
if (subchannel_address_1.isTcpSubchannelAddress(address)) { | ||
addr = { | ||
@@ -205,16 +268,45 @@ host: address.host, | ||
return new Promise((resolve, reject) => { | ||
function onError(err) { | ||
const onError = (err) => { | ||
this.trace('Failed to bind ' + subchannel_address_1.subchannelAddressToString(address) + ' with error ' + err.message); | ||
resolve(err); | ||
} | ||
}; | ||
http2Server.once('error', onError); | ||
http2Server.listen(addr, () => { | ||
trace('Successfully bound ' + subchannel_1.subchannelAddressToString(address)); | ||
this.http2ServerList.push(http2Server); | ||
const boundAddress = http2Server.address(); | ||
let boundSubchannelAddress; | ||
if (typeof boundAddress === 'string') { | ||
resolve(portNum); | ||
boundSubchannelAddress = { | ||
path: boundAddress | ||
}; | ||
} | ||
else { | ||
resolve(boundAddress.port); | ||
boundSubchannelAddress = { | ||
host: boundAddress.address, | ||
port: boundAddress.port | ||
}; | ||
} | ||
const channelzRef = channelz_1.registerChannelzSocket(subchannel_address_1.subchannelAddressToString(boundSubchannelAddress), () => { | ||
return { | ||
localAddress: boundSubchannelAddress, | ||
remoteAddress: null, | ||
security: null, | ||
remoteName: null, | ||
streamsStarted: 0, | ||
streamsSucceeded: 0, | ||
streamsFailed: 0, | ||
messagesSent: 0, | ||
messagesReceived: 0, | ||
keepAlivesSent: 0, | ||
lastLocalStreamCreatedTimestamp: null, | ||
lastRemoteStreamCreatedTimestamp: null, | ||
lastMessageSentTimestamp: null, | ||
lastMessageReceivedTimestamp: null, | ||
localFlowControlWindow: null, | ||
remoteFlowControlWindow: null | ||
}; | ||
}); | ||
this.listenerChildrenTracker.refChild(channelzRef); | ||
this.http2ServerList.push({ server: http2Server, channelzRef: channelzRef }); | ||
this.trace('Successfully bound ' + subchannel_address_1.subchannelAddressToString(boundSubchannelAddress)); | ||
resolve('port' in boundSubchannelAddress ? boundSubchannelAddress.port : portNum); | ||
http2Server.removeListener('error', onError); | ||
@@ -246,9 +338,37 @@ }); | ||
return new Promise((resolve, reject) => { | ||
function onError(err) { | ||
const onError = (err) => { | ||
this.trace('Failed to bind ' + subchannel_address_1.subchannelAddressToString(address) + ' with error ' + err.message); | ||
resolve(bindWildcardPort(addressList.slice(1))); | ||
} | ||
}; | ||
http2Server.once('error', onError); | ||
http2Server.listen(address, () => { | ||
this.http2ServerList.push(http2Server); | ||
resolve(bindSpecificPort(addressList.slice(1), http2Server.address().port, 1)); | ||
const boundAddress = http2Server.address(); | ||
const boundSubchannelAddress = { | ||
host: boundAddress.address, | ||
port: boundAddress.port | ||
}; | ||
const channelzRef = channelz_1.registerChannelzSocket(subchannel_address_1.subchannelAddressToString(boundSubchannelAddress), () => { | ||
return { | ||
localAddress: boundSubchannelAddress, | ||
remoteAddress: null, | ||
security: null, | ||
remoteName: null, | ||
streamsStarted: 0, | ||
streamsSucceeded: 0, | ||
streamsFailed: 0, | ||
messagesSent: 0, | ||
messagesReceived: 0, | ||
keepAlivesSent: 0, | ||
lastLocalStreamCreatedTimestamp: null, | ||
lastRemoteStreamCreatedTimestamp: null, | ||
lastMessageSentTimestamp: null, | ||
lastMessageReceivedTimestamp: null, | ||
localFlowControlWindow: null, | ||
remoteFlowControlWindow: null | ||
}; | ||
}); | ||
this.listenerChildrenTracker.refChild(channelzRef); | ||
this.http2ServerList.push({ server: http2Server, channelzRef: channelzRef }); | ||
this.trace('Successfully bound ' + subchannel_address_1.subchannelAddressToString(boundSubchannelAddress)); | ||
resolve(bindSpecificPort(addressList.slice(1), boundAddress.port, 1)); | ||
http2Server.removeListener('error', onError); | ||
@@ -263,7 +383,7 @@ }); | ||
if (addressList.length === 0) { | ||
callback(new Error(`No addresses resolved for port ${port}`), 0); | ||
deferredCallback(new Error(`No addresses resolved for port ${port}`), 0); | ||
return; | ||
} | ||
let bindResultPromise; | ||
if (subchannel_1.isTcpSubchannelAddress(addressList[0])) { | ||
if (subchannel_address_1.isTcpSubchannelAddress(addressList[0])) { | ||
if (addressList[0].port === 0) { | ||
@@ -284,3 +404,3 @@ bindResultPromise = bindWildcardPort(addressList); | ||
logging.log(constants_1.LogVerbosity.ERROR, errorString); | ||
callback(new Error(errorString), 0); | ||
deferredCallback(new Error(errorString), 0); | ||
} | ||
@@ -291,3 +411,3 @@ else { | ||
} | ||
callback(null, bindResult.port); | ||
deferredCallback(null, bindResult.port); | ||
} | ||
@@ -297,7 +417,7 @@ }, (error) => { | ||
logging.log(constants_1.LogVerbosity.ERROR, errorString); | ||
callback(new Error(errorString), 0); | ||
deferredCallback(new Error(errorString), 0); | ||
}); | ||
}, | ||
onError: (error) => { | ||
callback(new Error(error.details), 0); | ||
deferredCallback(new Error(error.details), 0); | ||
}, | ||
@@ -310,5 +430,8 @@ }; | ||
// Close the server if it is still running. | ||
for (const http2Server of this.http2ServerList) { | ||
for (const { server: http2Server, channelzRef: ref } of this.http2ServerList) { | ||
if (http2Server.listening) { | ||
http2Server.close(); | ||
http2Server.close(() => { | ||
this.listenerChildrenTracker.unrefChild(ref); | ||
channelz_1.unregisterChannelzRef(ref); | ||
}); | ||
} | ||
@@ -319,3 +442,3 @@ } | ||
// tryShutdown() calls are in progress. Don't wait on them to finish. | ||
this.sessions.forEach((session) => { | ||
this.sessions.forEach((channelzInfo, session) => { | ||
// Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to | ||
@@ -327,2 +450,3 @@ // recognize destroy(code) as a valid signature. | ||
this.sessions.clear(); | ||
channelz_1.unregisterChannelzRef(this.channelzRef); | ||
} | ||
@@ -347,3 +471,3 @@ register(name, handler, serialize, deserialize, type) { | ||
if (this.http2ServerList.length === 0 || | ||
this.http2ServerList.every((http2Server) => http2Server.listening !== true)) { | ||
this.http2ServerList.every(({ server: http2Server }) => http2Server.listening !== true)) { | ||
throw new Error('server must be bound in order to start'); | ||
@@ -354,5 +478,10 @@ } | ||
} | ||
this.channelzTrace.addTrace('CT_INFO', 'Starting'); | ||
this.started = true; | ||
} | ||
tryShutdown(callback) { | ||
const wrappedCallback = (error) => { | ||
channelz_1.unregisterChannelzRef(this.channelzRef); | ||
callback(error); | ||
}; | ||
let pendingChecks = 0; | ||
@@ -362,3 +491,3 @@ function maybeCallback() { | ||
if (pendingChecks === 0) { | ||
callback(); | ||
wrappedCallback(); | ||
} | ||
@@ -368,9 +497,13 @@ } | ||
this.started = false; | ||
for (const http2Server of this.http2ServerList) { | ||
for (const { server: http2Server, channelzRef: ref } of this.http2ServerList) { | ||
if (http2Server.listening) { | ||
pendingChecks++; | ||
http2Server.close(maybeCallback); | ||
http2Server.close(() => { | ||
this.listenerChildrenTracker.unrefChild(ref); | ||
channelz_1.unregisterChannelzRef(ref); | ||
maybeCallback(); | ||
}); | ||
} | ||
} | ||
this.sessions.forEach((session) => { | ||
this.sessions.forEach((channelzInfo, session) => { | ||
if (!session.closed) { | ||
@@ -382,3 +515,3 @@ pendingChecks += 1; | ||
if (pendingChecks === 0) { | ||
callback(); | ||
wrappedCallback(); | ||
} | ||
@@ -389,2 +522,5 @@ } | ||
} | ||
getChannelzRef() { | ||
return this.channelzRef; | ||
} | ||
_setupHandlers(http2Server) { | ||
@@ -395,2 +531,5 @@ if (http2Server === null) { | ||
http2Server.on('stream', (stream, headers) => { | ||
const channelzSessionInfo = this.sessions.get(stream.session); | ||
this.callTracker.addCallStarted(); | ||
channelzSessionInfo === null || channelzSessionInfo === void 0 ? void 0 : channelzSessionInfo.streamTracker.addCallStarted(); | ||
const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE]; | ||
@@ -402,4 +541,7 @@ if (typeof contentType !== 'string' || | ||
}, { endStream: true }); | ||
this.callTracker.addCallFailed(); | ||
channelzSessionInfo === null || channelzSessionInfo === void 0 ? void 0 : channelzSessionInfo.streamTracker.addCallFailed(); | ||
return; | ||
} | ||
let call = null; | ||
try { | ||
@@ -418,3 +560,3 @@ const path = headers[http2.constants.HTTP2_HEADER_PATH]; | ||
} | ||
trace('Received call to method ' + | ||
this.trace('Received call to method ' + | ||
path + | ||
@@ -425,3 +567,3 @@ ' at address ' + | ||
if (handler === undefined) { | ||
trace('No handler registered for method ' + | ||
this.trace('No handler registered for method ' + | ||
path + | ||
@@ -431,3 +573,29 @@ '. Sending UNIMPLEMENTED status.'); | ||
} | ||
const call = new server_call_1.Http2ServerCallStream(stream, handler, this.options); | ||
call = new server_call_1.Http2ServerCallStream(stream, handler, this.options); | ||
call.once('callEnd', (code) => { | ||
if (code === constants_1.Status.OK) { | ||
this.callTracker.addCallSucceeded(); | ||
} | ||
else { | ||
this.callTracker.addCallFailed(); | ||
} | ||
}); | ||
if (channelzSessionInfo) { | ||
call.once('streamEnd', (success) => { | ||
if (success) { | ||
channelzSessionInfo.streamTracker.addCallSucceeded(); | ||
} | ||
else { | ||
channelzSessionInfo.streamTracker.addCallFailed(); | ||
} | ||
}); | ||
call.on('sendMessage', () => { | ||
channelzSessionInfo.messagesSent += 1; | ||
channelzSessionInfo.lastMessageSentTimestamp = new Date(); | ||
}); | ||
call.on('receiveMessage', () => { | ||
channelzSessionInfo.messagesReceived += 1; | ||
channelzSessionInfo.lastMessageReceivedTimestamp = new Date(); | ||
}); | ||
} | ||
const metadata = call.receiveMetadata(headers); | ||
@@ -452,3 +620,7 @@ switch (handler.type) { | ||
catch (err) { | ||
const call = new server_call_1.Http2ServerCallStream(stream, null, this.options); | ||
if (!call) { | ||
call = new server_call_1.Http2ServerCallStream(stream, null, this.options); | ||
this.callTracker.addCallFailed(); | ||
channelzSessionInfo === null || channelzSessionInfo === void 0 ? void 0 : channelzSessionInfo.streamTracker.addCallFailed(); | ||
} | ||
if (err.code === undefined) { | ||
@@ -461,2 +633,3 @@ err.code = constants_1.Status.INTERNAL; | ||
http2Server.on('session', (session) => { | ||
var _a; | ||
if (!this.started) { | ||
@@ -466,4 +639,19 @@ session.destroy(); | ||
} | ||
this.sessions.add(session); | ||
const channelzRef = channelz_1.registerChannelzSocket((_a = session.socket.remoteAddress) !== null && _a !== void 0 ? _a : 'unknown', this.getChannelzSessionInfoGetter(session)); | ||
const channelzSessionInfo = { | ||
ref: channelzRef, | ||
streamTracker: new channelz_1.ChannelzCallTracker(), | ||
messagesSent: 0, | ||
messagesReceived: 0, | ||
lastMessageSentTimestamp: null, | ||
lastMessageReceivedTimestamp: null | ||
}; | ||
this.sessions.set(session, channelzSessionInfo); | ||
const clientAddress = session.socket.remoteAddress; | ||
this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress); | ||
this.sessionChildrenTracker.refChild(channelzRef); | ||
session.on('close', () => { | ||
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress); | ||
this.sessionChildrenTracker.unrefChild(channelzRef); | ||
channelz_1.unregisterChannelzRef(channelzRef); | ||
this.sessions.delete(session); | ||
@@ -470,0 +658,0 @@ }); |
@@ -6,6 +6,10 @@ import { LoadBalancingConfig } from './load-balancer'; | ||
} | ||
export interface Duration { | ||
seconds: number; | ||
nanos: number; | ||
} | ||
export interface MethodConfig { | ||
name: MethodConfigName[]; | ||
waitForReady?: boolean; | ||
timeout?: string; | ||
timeout?: Duration; | ||
maxRequestBytes?: number; | ||
@@ -12,0 +16,0 @@ maxResponseBytes?: number; |
@@ -59,2 +59,3 @@ "use strict"; | ||
function validateMethodConfig(obj) { | ||
var _a; | ||
const result = { | ||
@@ -76,7 +77,26 @@ name: [], | ||
if ('timeout' in obj) { | ||
if (!(typeof obj.timeout === 'string') || | ||
!TIMEOUT_REGEX.test(obj.timeout)) { | ||
if (typeof obj.timeout === 'object') { | ||
if (!('seconds' in obj.timeout) || | ||
!(typeof obj.timeout.seconds === 'number')) { | ||
throw new Error('Invalid method config: invalid timeout.seconds'); | ||
} | ||
if (!('nanos' in obj.timeout) || | ||
!(typeof obj.timeout.nanos === 'number')) { | ||
throw new Error('Invalid method config: invalid timeout.nanos'); | ||
} | ||
result.timeout = obj.timeout; | ||
} | ||
else if (typeof obj.timeout === 'string' && | ||
TIMEOUT_REGEX.test(obj.timeout)) { | ||
const timeoutParts = obj.timeout | ||
.substring(0, obj.timeout.length - 1) | ||
.split('.'); | ||
result.timeout = { | ||
seconds: timeoutParts[0] | 0, | ||
nanos: ((_a = timeoutParts[1]) !== null && _a !== void 0 ? _a : 0) | 0, | ||
}; | ||
} | ||
else { | ||
throw new Error('Invalid method config: invalid timeout'); | ||
} | ||
result.timeout = obj.timeout; | ||
} | ||
@@ -83,0 +103,0 @@ if ('maxRequestBytes' in obj) { |
import { ChannelOptions } from './channel-options'; | ||
import { Subchannel, SubchannelAddress } from './subchannel'; | ||
import { Subchannel } from './subchannel'; | ||
import { SubchannelAddress } from './subchannel-address'; | ||
import { ChannelCredentials } from './channel-credentials'; | ||
@@ -4,0 +5,0 @@ import { GrpcUri } from './uri-parser'; |
@@ -22,2 +22,3 @@ "use strict"; | ||
const subchannel_1 = require("./subchannel"); | ||
const subchannel_address_1 = require("./subchannel-address"); | ||
const uri_parser_1 = require("./uri-parser"); | ||
@@ -102,3 +103,3 @@ // 10 seconds in milliseconds. This value is arbitrary. | ||
for (const subchannelObj of subchannelObjArray) { | ||
if (subchannel_1.subchannelAddressEqual(subchannelTarget, subchannelObj.subchannelAddress) && | ||
if (subchannel_address_1.subchannelAddressEqual(subchannelTarget, subchannelObj.subchannelAddress) && | ||
channel_options_1.channelOptionsEqual(channelArguments, subchannelObj.channelArguments) && | ||
@@ -105,0 +106,0 @@ channelCredentials._equals(subchannelObj.channelCredentials)) { |
@@ -5,23 +5,12 @@ import { ChannelCredentials } from './channel-credentials'; | ||
import { ChannelOptions } from './channel-options'; | ||
import { ConnectivityState } from './channel'; | ||
import { ConnectivityState } from './connectivity-state'; | ||
import { GrpcUri } from './uri-parser'; | ||
import { FilterFactory, Filter } from './filter'; | ||
import { Filter } from './filter'; | ||
import { SubchannelAddress } from './subchannel-address'; | ||
import { SubchannelRef } from './channelz'; | ||
export declare type ConnectivityStateListener = (subchannel: Subchannel, previousState: ConnectivityState, newState: ConnectivityState) => void; | ||
export interface TcpSubchannelAddress { | ||
port: number; | ||
host: string; | ||
export interface SubchannelCallStatsTracker { | ||
addMessageSent(): void; | ||
addMessageReceived(): void; | ||
} | ||
export interface IpcSubchannelAddress { | ||
path: string; | ||
} | ||
/** | ||
* This represents a single backend address to connect to. This interface is a | ||
* subset of net.SocketConnectOpts, i.e. the options described at | ||
* https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener. | ||
* Those are in turn a subset of the options that can be passed to http2.connect. | ||
*/ | ||
export declare type SubchannelAddress = TcpSubchannelAddress | IpcSubchannelAddress; | ||
export declare function isTcpSubchannelAddress(address: SubchannelAddress): address is TcpSubchannelAddress; | ||
export declare function subchannelAddressEqual(address1: SubchannelAddress, address2: SubchannelAddress): boolean; | ||
export declare function subchannelAddressToString(address: SubchannelAddress): string; | ||
export declare class Subchannel { | ||
@@ -95,3 +84,19 @@ private channelTarget; | ||
private subchannelAddressString; | ||
private channelzRef; | ||
private channelzTrace; | ||
private callTracker; | ||
private childrenTracker; | ||
private channelzSocketRef; | ||
/** | ||
* Name of the remote server, if it is not the same as the subchannel | ||
* address, i.e. if connecting through an HTTP CONNECT proxy. | ||
*/ | ||
private remoteName; | ||
private streamTracker; | ||
private keepalivesSent; | ||
private messagesSent; | ||
private messagesReceived; | ||
private lastMessageSentTimestamp; | ||
private lastMessageReceivedTimestamp; | ||
/** | ||
* A class representing a connection to a single backend. | ||
@@ -107,2 +112,7 @@ * @param channelTarget The target string for the channel as a whole | ||
constructor(channelTarget: GrpcUri, subchannelAddress: SubchannelAddress, options: ChannelOptions, credentials: ChannelCredentials); | ||
private getChannelzInfo; | ||
private getChannelzSocketInfo; | ||
private resetChannelzSocketInfo; | ||
private trace; | ||
private refTrace; | ||
private handleBackoffTimer; | ||
@@ -144,3 +154,3 @@ /** | ||
*/ | ||
startCallStream(metadata: Metadata, callStream: Http2CallStream, extraFilterFactory?: FilterFactory<Filter>): void; | ||
startCallStream(metadata: Metadata, callStream: Http2CallStream, extraFilters: Filter[]): void; | ||
/** | ||
@@ -176,2 +186,3 @@ * If the subchannel is currently IDLE, start connecting and switch to the | ||
getAddress(): string; | ||
getChannelzRef(): SubchannelRef; | ||
} |
@@ -19,6 +19,6 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.Subchannel = exports.subchannelAddressToString = exports.subchannelAddressEqual = exports.isTcpSubchannelAddress = void 0; | ||
exports.Subchannel = void 0; | ||
const http2 = require("http2"); | ||
const tls_1 = require("tls"); | ||
const channel_1 = require("./channel"); | ||
const connectivity_state_1 = require("./connectivity-state"); | ||
const backoff_timeout_1 = require("./backoff-timeout"); | ||
@@ -31,10 +31,6 @@ const resolver_1 = require("./resolver"); | ||
const uri_parser_1 = require("./uri-parser"); | ||
const subchannel_address_1 = require("./subchannel-address"); | ||
const channelz_1 = require("./channelz"); | ||
const clientVersion = require('../../package.json').version; | ||
const TRACER_NAME = 'subchannel'; | ||
function trace(text) { | ||
logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, text); | ||
} | ||
function refTrace(text) { | ||
logging.trace(constants_1.LogVerbosity.DEBUG, 'subchannel_refcount', text); | ||
} | ||
const MIN_CONNECT_TIMEOUT_MS = 20000; | ||
@@ -60,26 +56,2 @@ const INITIAL_BACKOFF_MS = 1000; | ||
const tooManyPingsData = Buffer.from('too_many_pings', 'ascii'); | ||
function isTcpSubchannelAddress(address) { | ||
return 'port' in address; | ||
} | ||
exports.isTcpSubchannelAddress = isTcpSubchannelAddress; | ||
function subchannelAddressEqual(address1, address2) { | ||
if (isTcpSubchannelAddress(address1)) { | ||
return (isTcpSubchannelAddress(address2) && | ||
address1.host === address2.host && | ||
address1.port === address2.port); | ||
} | ||
else { | ||
return !isTcpSubchannelAddress(address2) && address1.path === address2.path; | ||
} | ||
} | ||
exports.subchannelAddressEqual = subchannelAddressEqual; | ||
function subchannelAddressToString(address) { | ||
if (isTcpSubchannelAddress(address)) { | ||
return address.host + ':' + address.port; | ||
} | ||
else { | ||
return address.path; | ||
} | ||
} | ||
exports.subchannelAddressToString = subchannelAddressToString; | ||
class Subchannel { | ||
@@ -105,3 +77,3 @@ /** | ||
*/ | ||
this.connectivityState = channel_1.ConnectivityState.IDLE; | ||
this.connectivityState = connectivity_state_1.ConnectivityState.IDLE; | ||
/** | ||
@@ -148,2 +120,17 @@ * The underlying http2 session used to make requests. | ||
this.refcount = 0; | ||
this.callTracker = new channelz_1.ChannelzCallTracker(); | ||
this.childrenTracker = new channelz_1.ChannelzChildrenTracker(); | ||
// Channelz socket info | ||
this.channelzSocketRef = null; | ||
/** | ||
* Name of the remote server, if it is not the same as the subchannel | ||
* address, i.e. if connecting through an HTTP CONNECT proxy. | ||
*/ | ||
this.remoteName = null; | ||
this.streamTracker = new channelz_1.ChannelzCallTracker(); | ||
this.keepalivesSent = 0; | ||
this.messagesSent = 0; | ||
this.messagesReceived = 0; | ||
this.lastMessageSentTimestamp = null; | ||
this.lastMessageReceivedTimestamp = null; | ||
// Build user-agent string. | ||
@@ -164,3 +151,4 @@ this.userAgent = [ | ||
if ('grpc.keepalive_permit_without_calls' in options) { | ||
this.keepaliveWithoutCalls = options['grpc.keepalive_permit_without_calls'] === 1; | ||
this.keepaliveWithoutCalls = | ||
options['grpc.keepalive_permit_without_calls'] === 1; | ||
} | ||
@@ -181,11 +169,87 @@ else { | ||
}, backoffOptions); | ||
this.subchannelAddressString = subchannelAddressToString(subchannelAddress); | ||
trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2)); | ||
this.subchannelAddressString = subchannel_address_1.subchannelAddressToString(subchannelAddress); | ||
this.channelzRef = channelz_1.registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo()); | ||
this.channelzTrace = new channelz_1.ChannelzTrace(); | ||
this.channelzTrace.addTrace('CT_INFO', 'Subchannel created'); | ||
this.trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2)); | ||
} | ||
getChannelzInfo() { | ||
return { | ||
state: this.connectivityState, | ||
trace: this.channelzTrace, | ||
callTracker: this.callTracker, | ||
children: this.childrenTracker.getChildLists(), | ||
target: this.subchannelAddressString | ||
}; | ||
} | ||
getChannelzSocketInfo() { | ||
var _a, _b, _c; | ||
if (this.session === null) { | ||
return null; | ||
} | ||
const sessionSocket = this.session.socket; | ||
const remoteAddress = sessionSocket.remoteAddress ? subchannel_address_1.stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort) : null; | ||
const localAddress = subchannel_address_1.stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort); | ||
let tlsInfo; | ||
if (this.session.encrypted) { | ||
const tlsSocket = sessionSocket; | ||
const cipherInfo = tlsSocket.getCipher(); | ||
const certificate = tlsSocket.getCertificate(); | ||
const peerCertificate = tlsSocket.getPeerCertificate(); | ||
tlsInfo = { | ||
cipherSuiteStandardName: (_a = cipherInfo.standardName) !== null && _a !== void 0 ? _a : null, | ||
cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name, | ||
localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null, | ||
remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null | ||
}; | ||
} | ||
else { | ||
tlsInfo = null; | ||
} | ||
const socketInfo = { | ||
remoteAddress: remoteAddress, | ||
localAddress: localAddress, | ||
security: tlsInfo, | ||
remoteName: this.remoteName, | ||
streamsStarted: this.streamTracker.callsStarted, | ||
streamsSucceeded: this.streamTracker.callsSucceeded, | ||
streamsFailed: this.streamTracker.callsFailed, | ||
messagesSent: this.messagesSent, | ||
messagesReceived: this.messagesReceived, | ||
keepAlivesSent: this.keepalivesSent, | ||
lastLocalStreamCreatedTimestamp: this.streamTracker.lastCallStartedTimestamp, | ||
lastRemoteStreamCreatedTimestamp: null, | ||
lastMessageSentTimestamp: this.lastMessageSentTimestamp, | ||
lastMessageReceivedTimestamp: this.lastMessageReceivedTimestamp, | ||
localFlowControlWindow: (_b = this.session.state.localWindowSize) !== null && _b !== void 0 ? _b : null, | ||
remoteFlowControlWindow: (_c = this.session.state.remoteWindowSize) !== null && _c !== void 0 ? _c : null | ||
}; | ||
return socketInfo; | ||
} | ||
resetChannelzSocketInfo() { | ||
if (this.channelzSocketRef) { | ||
channelz_1.unregisterChannelzRef(this.channelzSocketRef); | ||
this.childrenTracker.unrefChild(this.channelzSocketRef); | ||
this.channelzSocketRef = null; | ||
} | ||
this.remoteName = null; | ||
this.streamTracker = new channelz_1.ChannelzCallTracker(); | ||
this.keepalivesSent = 0; | ||
this.messagesSent = 0; | ||
this.messagesReceived = 0; | ||
this.lastMessageSentTimestamp = null; | ||
this.lastMessageReceivedTimestamp = null; | ||
} | ||
trace(text) { | ||
logging.trace(constants_1.LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); | ||
} | ||
refTrace(text) { | ||
logging.trace(constants_1.LogVerbosity.DEBUG, 'subchannel_refcount', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); | ||
} | ||
handleBackoffTimer() { | ||
if (this.continueConnecting) { | ||
this.transitionToState([channel_1.ConnectivityState.TRANSIENT_FAILURE], channel_1.ConnectivityState.CONNECTING); | ||
this.transitionToState([connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE], connectivity_state_1.ConnectivityState.CONNECTING); | ||
} | ||
else { | ||
this.transitionToState([channel_1.ConnectivityState.TRANSIENT_FAILURE], channel_1.ConnectivityState.IDLE); | ||
this.transitionToState([connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE], connectivity_state_1.ConnectivityState.IDLE); | ||
} | ||
@@ -205,5 +269,7 @@ } | ||
var _a, _b; | ||
logging.trace(constants_1.LogVerbosity.DEBUG, 'keepalive', 'Sending ping to ' + this.subchannelAddressString); | ||
this.keepalivesSent += 1; | ||
logging.trace(constants_1.LogVerbosity.DEBUG, 'keepalive', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + | ||
'Sending ping'); | ||
this.keepaliveTimeoutId = setTimeout(() => { | ||
this.transitionToState([channel_1.ConnectivityState.READY], channel_1.ConnectivityState.IDLE); | ||
this.transitionToState([connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.IDLE); | ||
}, this.keepaliveTimeoutMs); | ||
@@ -231,6 +297,8 @@ (_b = (_a = this.keepaliveTimeoutId).unref) === null || _b === void 0 ? void 0 : _b.call(_a); | ||
if (proxyConnectionResult.realTarget) { | ||
trace(this.subchannelAddressString + ' creating HTTP/2 session through proxy to ' + proxyConnectionResult.realTarget); | ||
this.remoteName = uri_parser_1.uriToString(proxyConnectionResult.realTarget); | ||
this.trace('creating HTTP/2 session through proxy to ' + proxyConnectionResult.realTarget); | ||
} | ||
else { | ||
trace(this.subchannelAddressString + ' creating HTTP/2 session'); | ||
this.remoteName = null; | ||
this.trace('creating HTTP/2 session'); | ||
} | ||
@@ -308,2 +376,4 @@ const targetAuthority = resolver_1.getDefaultAuthority((_a = proxyConnectionResult.realTarget) !== null && _a !== void 0 ? _a : this.channelTarget); | ||
this.session = session; | ||
this.channelzSocketRef = channelz_1.registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()); | ||
this.childrenTracker.refChild(this.channelzSocketRef); | ||
session.unref(); | ||
@@ -316,3 +386,3 @@ /* For all of these events, check if the session at the time of the event | ||
if (this.session === session) { | ||
this.transitionToState([channel_1.ConnectivityState.CONNECTING], channel_1.ConnectivityState.READY); | ||
this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING], connectivity_state_1.ConnectivityState.READY); | ||
} | ||
@@ -322,8 +392,8 @@ }); | ||
if (this.session === session) { | ||
trace(this.subchannelAddressString + ' connection closed'); | ||
this.transitionToState([channel_1.ConnectivityState.CONNECTING], channel_1.ConnectivityState.TRANSIENT_FAILURE); | ||
this.trace('connection closed'); | ||
this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE); | ||
/* Transitioning directly to IDLE here should be OK because we are not | ||
* doing any backoff, because a connection was established at some | ||
* point */ | ||
this.transitionToState([channel_1.ConnectivityState.READY], channel_1.ConnectivityState.IDLE); | ||
this.transitionToState([connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.IDLE); | ||
} | ||
@@ -340,6 +410,5 @@ }); | ||
} | ||
trace(this.subchannelAddressString + | ||
' connection closed by GOAWAY with code ' + | ||
this.trace('connection closed by GOAWAY with code ' + | ||
errorCode); | ||
this.transitionToState([channel_1.ConnectivityState.CONNECTING, channel_1.ConnectivityState.READY], channel_1.ConnectivityState.IDLE); | ||
this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING, connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.IDLE); | ||
} | ||
@@ -350,6 +419,6 @@ }); | ||
* where we want to handle that. */ | ||
trace(this.subchannelAddressString + | ||
' connection closed with error ' + | ||
this.trace('connection closed with error ' + | ||
error.message); | ||
}); | ||
channelz_1.registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()); | ||
} | ||
@@ -392,3 +461,3 @@ startConnectingInternal() { | ||
}, (reason) => { | ||
this.transitionToState([channel_1.ConnectivityState.CONNECTING], channel_1.ConnectivityState.TRANSIENT_FAILURE); | ||
this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE); | ||
}); | ||
@@ -407,11 +476,10 @@ } | ||
} | ||
trace(this.subchannelAddressString + | ||
' ' + | ||
channel_1.ConnectivityState[this.connectivityState] + | ||
this.trace(connectivity_state_1.ConnectivityState[this.connectivityState] + | ||
' -> ' + | ||
channel_1.ConnectivityState[newState]); | ||
connectivity_state_1.ConnectivityState[newState]); | ||
this.channelzTrace.addTrace('CT_INFO', connectivity_state_1.ConnectivityState[this.connectivityState] + ' -> ' + connectivity_state_1.ConnectivityState[newState]); | ||
const previousState = this.connectivityState; | ||
this.connectivityState = newState; | ||
switch (newState) { | ||
case channel_1.ConnectivityState.READY: | ||
case connectivity_state_1.ConnectivityState.READY: | ||
this.stopBackoff(); | ||
@@ -427,3 +495,3 @@ this.session.socket.once('close', () => { | ||
break; | ||
case channel_1.ConnectivityState.CONNECTING: | ||
case connectivity_state_1.ConnectivityState.CONNECTING: | ||
this.startBackoff(); | ||
@@ -433,3 +501,3 @@ this.startConnectingInternal(); | ||
break; | ||
case channel_1.ConnectivityState.TRANSIENT_FAILURE: | ||
case connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE: | ||
if (this.session) { | ||
@@ -439,2 +507,3 @@ this.session.close(); | ||
this.session = null; | ||
this.resetChannelzSocketInfo(); | ||
this.stopKeepalivePings(); | ||
@@ -450,3 +519,3 @@ /* If the backoff timer has already ended by the time we get to the | ||
break; | ||
case channel_1.ConnectivityState.IDLE: | ||
case connectivity_state_1.ConnectivityState.IDLE: | ||
if (this.session) { | ||
@@ -456,2 +525,3 @@ this.session.close(); | ||
this.session = null; | ||
this.resetChannelzSocketInfo(); | ||
this.stopKeepalivePings(); | ||
@@ -477,11 +547,9 @@ break; | ||
if (this.callRefcount === 0 && this.refcount === 0) { | ||
this.transitionToState([ | ||
channel_1.ConnectivityState.CONNECTING, | ||
channel_1.ConnectivityState.READY, | ||
], channel_1.ConnectivityState.TRANSIENT_FAILURE); | ||
this.channelzTrace.addTrace('CT_INFO', 'Shutting down'); | ||
this.transitionToState([connectivity_state_1.ConnectivityState.CONNECTING, connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE); | ||
channelz_1.unregisterChannelzRef(this.channelzRef); | ||
} | ||
} | ||
callRef() { | ||
refTrace(this.subchannelAddressString + | ||
' callRefcount ' + | ||
this.refTrace('callRefcount ' + | ||
this.callRefcount + | ||
@@ -502,4 +570,3 @@ ' -> ' + | ||
callUnref() { | ||
refTrace(this.subchannelAddressString + | ||
' callRefcount ' + | ||
this.refTrace('callRefcount ' + | ||
this.callRefcount + | ||
@@ -521,4 +588,3 @@ ' -> ' + | ||
ref() { | ||
refTrace(this.subchannelAddressString + | ||
' refcount ' + | ||
this.refTrace('refcount ' + | ||
this.refcount + | ||
@@ -530,4 +596,3 @@ ' -> ' + | ||
unref() { | ||
refTrace(this.subchannelAddressString + | ||
' refcount ' + | ||
this.refTrace('refcount ' + | ||
this.refcount + | ||
@@ -553,3 +618,3 @@ ' -> ' + | ||
*/ | ||
startCallStream(metadata, callStream, extraFilterFactory) { | ||
startCallStream(metadata, callStream, extraFilters) { | ||
const headers = metadata.toHttp2Headers(); | ||
@@ -575,3 +640,3 @@ headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost(); | ||
catch (e) { | ||
this.transitionToState([channel_1.ConnectivityState.READY], channel_1.ConnectivityState.TRANSIENT_FAILURE); | ||
this.transitionToState([connectivity_state_1.ConnectivityState.READY], connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE); | ||
throw e; | ||
@@ -583,4 +648,37 @@ } | ||
} | ||
logging.trace(constants_1.LogVerbosity.DEBUG, 'call_stream', 'Starting stream on subchannel ' + this.subchannelAddressString + ' with headers\n' + headersString); | ||
callStream.attachHttp2Stream(http2Stream, this, extraFilterFactory); | ||
logging.trace(constants_1.LogVerbosity.DEBUG, 'call_stream', 'Starting stream on subchannel ' + | ||
'(' + this.channelzRef.id + ') ' + | ||
this.subchannelAddressString + | ||
' with headers\n' + | ||
headersString); | ||
this.callTracker.addCallStarted(); | ||
callStream.addStatusWatcher(status => { | ||
if (status.code === constants_1.Status.OK) { | ||
this.callTracker.addCallSucceeded(); | ||
} | ||
else { | ||
this.callTracker.addCallFailed(); | ||
} | ||
}); | ||
const streamSession = this.session; | ||
this.streamTracker.addCallStarted(); | ||
callStream.addStreamEndWatcher(success => { | ||
if (streamSession === this.session) { | ||
if (success) { | ||
this.streamTracker.addCallSucceeded(); | ||
} | ||
else { | ||
this.streamTracker.addCallFailed(); | ||
} | ||
} | ||
}); | ||
callStream.attachHttp2Stream(http2Stream, this, extraFilters, { | ||
addMessageSent: () => { | ||
this.messagesSent += 1; | ||
this.lastMessageSentTimestamp = new Date(); | ||
}, | ||
addMessageReceived: () => { | ||
this.messagesReceived += 1; | ||
} | ||
}); | ||
} | ||
@@ -598,4 +696,4 @@ /** | ||
* connecting after the backoff timer ends. Otherwise do nothing */ | ||
if (!this.transitionToState([channel_1.ConnectivityState.IDLE], channel_1.ConnectivityState.CONNECTING)) { | ||
if (this.connectivityState === channel_1.ConnectivityState.TRANSIENT_FAILURE) { | ||
if (!this.transitionToState([connectivity_state_1.ConnectivityState.IDLE], connectivity_state_1.ConnectivityState.CONNECTING)) { | ||
if (this.connectivityState === connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE) { | ||
this.continueConnecting = true; | ||
@@ -644,3 +742,3 @@ } | ||
this.backoffTimeout.reset(); | ||
this.transitionToState([channel_1.ConnectivityState.TRANSIENT_FAILURE], channel_1.ConnectivityState.CONNECTING); | ||
this.transitionToState([connectivity_state_1.ConnectivityState.TRANSIENT_FAILURE], connectivity_state_1.ConnectivityState.CONNECTING); | ||
} | ||
@@ -650,4 +748,7 @@ getAddress() { | ||
} | ||
getChannelzRef() { | ||
return this.channelzRef; | ||
} | ||
} | ||
exports.Subchannel = Subchannel; | ||
//# sourceMappingURL=subchannel.js.map |
{ | ||
"name": "@grpc/grpc-js", | ||
"version": "1.3.8", | ||
"version": "1.4.0", | ||
"description": "gRPC Library for Node - pure JS implementation", | ||
@@ -18,3 +18,2 @@ "homepage": "https://grpc.io/", | ||
"devDependencies": { | ||
"@grpc/proto-loader": "^0.5.5", | ||
"@types/gulp": "^4.0.6", | ||
@@ -33,2 +32,3 @@ "@types/gulp-mocha": "0.0.32", | ||
"lodash": "^4.17.4", | ||
"madge": "^5.0.1", | ||
"mocha-jenkins-reporter": "^0.4.1", | ||
@@ -58,5 +58,7 @@ "ncp": "^2.0.0", | ||
"pretest": "npm run compile", | ||
"posttest": "npm run check" | ||
"posttest": "npm run check && madge -c ./build/src", | ||
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs proto/ -O src/generated/ --grpcLib ../index channelz.proto" | ||
}, | ||
"dependencies": { | ||
"@grpc/proto-loader": "^0.6.4", | ||
"@types/node": ">=12.12.47" | ||
@@ -63,0 +65,0 @@ }, |
@@ -28,2 +28,4 @@ # Pure JavaScript gRPC Client | ||
If you need a feature from the `grpc` package that is not provided by the `@grpc/grpc-js`, please file a feature request with that information. | ||
This library does not directly handle `.proto` files. To use `.proto` files with this library we recommend using the `@grpc/proto-loader` package. | ||
@@ -30,0 +32,0 @@ |
@@ -69,3 +69,5 @@ /* | ||
); | ||
return Promise.reject<Metadata>('"authorization" metadata cannot have multiple values'); | ||
return Promise.reject<Metadata>( | ||
'"authorization" metadata cannot have multiple values' | ||
); | ||
} | ||
@@ -72,0 +74,0 @@ return resultMetadata; |
@@ -28,3 +28,3 @@ /* | ||
import { ChannelImplementation } from './channel'; | ||
import { Subchannel } from './subchannel'; | ||
import { SubchannelCallStatsTracker, Subchannel } from './subchannel'; | ||
import * as logging from './logging'; | ||
@@ -61,3 +61,3 @@ import { LogVerbosity } from './constants'; | ||
* own. | ||
* @param errno | ||
* @param errno | ||
*/ | ||
@@ -75,2 +75,14 @@ function getSystemErrorName(errno: number): string { | ||
function getMinDeadline(deadlineList: Deadline[]): Deadline { | ||
let minValue = Infinity; | ||
for (const deadline of deadlineList) { | ||
const deadlineMsecs = | ||
deadline instanceof Date ? deadline.getTime() : deadline; | ||
if (deadlineMsecs < minValue) { | ||
minValue = deadlineMsecs; | ||
} | ||
} | ||
return minValue; | ||
} | ||
export interface CallStreamOptions { | ||
@@ -205,3 +217,3 @@ deadline: Deadline; | ||
credentials: CallCredentials; | ||
filterStack: Filter; | ||
filterStack: FilterStack; | ||
private http2Stream: http2.ClientHttp2Stream | null = null; | ||
@@ -242,2 +254,9 @@ private pendingRead = false; | ||
private configDeadline: Deadline = Infinity; | ||
private statusWatchers: ((status: StatusObject) => void)[] = []; | ||
private streamEndWatchers: ((success: boolean) => void)[] = []; | ||
private callStatsTracker: SubchannelCallStatsTracker | null = null; | ||
constructor( | ||
@@ -260,3 +279,6 @@ private readonly methodName: string, | ||
}; | ||
if (this.options.parentCall && this.options.flags & Propagate.CANCELLATION) { | ||
if ( | ||
this.options.parentCall && | ||
this.options.flags & Propagate.CANCELLATION | ||
) { | ||
this.options.parentCall.on('cancelled', () => { | ||
@@ -275,2 +297,3 @@ this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call'); | ||
); | ||
this.statusWatchers.forEach(watcher => watcher(filteredStatus)); | ||
/* We delay the actual action of bubbling up the status to insulate the | ||
@@ -419,2 +442,3 @@ * cleanup code in this class from any errors that may be thrown in the | ||
private handleTrailers(headers: http2.IncomingHttpHeaders) { | ||
this.streamEndWatchers.forEach(watcher => watcher(true)); | ||
let headersString = ''; | ||
@@ -457,13 +481,14 @@ for (const header of Object.keys(headers)) { | ||
private writeMessageToStream(message: Buffer, callback: WriteCallback) { | ||
this.callStatsTracker?.addMessageSent(); | ||
this.http2Stream!.write(message, callback); | ||
} | ||
attachHttp2Stream( | ||
stream: http2.ClientHttp2Stream, | ||
subchannel: Subchannel, | ||
extraFilterFactory?: FilterFactory<Filter> | ||
extraFilters: Filter[], | ||
callStatsTracker: SubchannelCallStatsTracker | ||
): void { | ||
if (extraFilterFactory !== undefined) { | ||
this.filterStack = new FilterStack([ | ||
this.filterStack, | ||
extraFilterFactory.createFilter(this), | ||
]); | ||
} | ||
this.filterStack.push(extraFilters); | ||
if (this.finalStatus !== null) { | ||
@@ -477,2 +502,3 @@ stream.close(NGHTTP2_CANCEL); | ||
this.subchannel = subchannel; | ||
this.callStatsTracker = callStatsTracker; | ||
subchannel.addDisconnectListener(this.disconnectListener); | ||
@@ -543,2 +569,3 @@ subchannel.callRef(); | ||
this.trace('parsed message of length ' + message.length); | ||
this.callStatsTracker!.addMessageReceived(); | ||
this.tryPush(message); | ||
@@ -554,3 +581,3 @@ } | ||
* "error" event that may be emitted at about the same time, so that | ||
* we can bubble up the error message from that event. */ | ||
* we can bubble up the error message from that event. */ | ||
process.nextTick(() => { | ||
@@ -604,3 +631,3 @@ this.trace('HTTP/2 stream closed with code ' + stream.rstCode); | ||
} else { | ||
if (this.internalError.code === 'ECONNRESET') { | ||
if (this.internalError.code === 'ECONNRESET' || this.internalError.code === 'ETIMEDOUT') { | ||
code = Status.UNAVAILABLE; | ||
@@ -637,5 +664,15 @@ details = this.internalError.message; | ||
if (err.code !== 'ERR_HTTP2_STREAM_ERROR') { | ||
this.trace('Node error event: message=' + err.message + ' code=' + err.code + ' errno=' + getSystemErrorName(err.errno) + ' syscall=' + err.syscall); | ||
this.trace( | ||
'Node error event: message=' + | ||
err.message + | ||
' code=' + | ||
err.code + | ||
' errno=' + | ||
getSystemErrorName(err.errno) + | ||
' syscall=' + | ||
err.syscall | ||
); | ||
this.internalError = err; | ||
} | ||
this.streamEndWatchers.forEach(watcher => watcher(false)); | ||
}); | ||
@@ -655,3 +692,3 @@ if (!this.pendingRead) { | ||
try { | ||
stream.write(this.pendingWrite, this.pendingWriteCallback); | ||
this.writeMessageToStream(this.pendingWrite, this.pendingWriteCallback); | ||
} catch (error) { | ||
@@ -701,11 +738,10 @@ this.endCall({ | ||
getDeadline(): Deadline { | ||
const deadlineList = [this.options.deadline]; | ||
if (this.options.parentCall && this.options.flags & Propagate.DEADLINE) { | ||
const parentDeadline = this.options.parentCall.getDeadline(); | ||
const selfDeadline = this.options.deadline; | ||
const parentDeadlineMsecs = parentDeadline instanceof Date ? parentDeadline.getTime() : parentDeadline; | ||
const selfDeadlineMsecs = selfDeadline instanceof Date ? selfDeadline.getTime() : selfDeadline; | ||
return Math.min(parentDeadlineMsecs, selfDeadlineMsecs); | ||
} else { | ||
return this.options.deadline; | ||
deadlineList.push(this.options.parentCall.getDeadline()); | ||
} | ||
if (this.configDeadline) { | ||
deadlineList.push(this.configDeadline); | ||
} | ||
return getMinDeadline(deadlineList); | ||
} | ||
@@ -737,2 +773,18 @@ | ||
setConfigDeadline(configDeadline: Deadline) { | ||
this.configDeadline = configDeadline; | ||
} | ||
addStatusWatcher(watcher: (status: StatusObject) => void) { | ||
this.statusWatchers.push(watcher); | ||
} | ||
addStreamEndWatcher(watcher: (success: boolean) => void) { | ||
this.streamEndWatchers.push(watcher); | ||
} | ||
addFilters(extraFilters: Filter[]) { | ||
this.filterStack.push(extraFilters); | ||
} | ||
startRead() { | ||
@@ -791,3 +843,3 @@ /* If the stream has ended with an error, we should not emit any more | ||
try { | ||
this.http2Stream.write(message.message, cb); | ||
this.writeMessageToStream(message.message, cb); | ||
} catch (error) { | ||
@@ -794,0 +846,0 @@ this.endCall({ |
@@ -84,3 +84,4 @@ /* | ||
export class ClientUnaryCallImpl extends EventEmitter | ||
export class ClientUnaryCallImpl | ||
extends EventEmitter | ||
implements ClientUnaryCall { | ||
@@ -101,3 +102,4 @@ public call?: InterceptingCallInterface; | ||
export class ClientReadableStreamImpl<ResponseType> extends Readable | ||
export class ClientReadableStreamImpl<ResponseType> | ||
extends Readable | ||
implements ClientReadableStream<ResponseType> { | ||
@@ -122,3 +124,4 @@ public call?: InterceptingCallInterface; | ||
export class ClientWritableStreamImpl<RequestType> extends Writable | ||
export class ClientWritableStreamImpl<RequestType> | ||
extends Writable | ||
implements ClientWritableStream<RequestType> { | ||
@@ -155,3 +158,4 @@ public call?: InterceptingCallInterface; | ||
export class ClientDuplexStreamImpl<RequestType, ResponseType> extends Duplex | ||
export class ClientDuplexStreamImpl<RequestType, ResponseType> | ||
extends Duplex | ||
implements ClientDuplexStream<RequestType, ResponseType> { | ||
@@ -158,0 +162,0 @@ public call?: InterceptingCallInterface; |
@@ -40,2 +40,3 @@ /* | ||
'grpc-node.max_session_memory'?: number; | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
[key: string]: any; | ||
@@ -42,0 +43,0 @@ } |
@@ -36,5 +36,10 @@ /* | ||
import { CompressionFilterFactory } from './compression-filter'; | ||
import { CallConfig, ConfigSelector, getDefaultAuthority, mapUriDefaultScheme } from './resolver'; | ||
import { | ||
CallConfig, | ||
ConfigSelector, | ||
getDefaultAuthority, | ||
mapUriDefaultScheme, | ||
} from './resolver'; | ||
import { trace, log } from './logging'; | ||
import { SubchannelAddress } from './subchannel'; | ||
import { SubchannelAddress } from './subchannel-address'; | ||
import { MaxMessageSizeFilterFactory } from './max-message-size-filter'; | ||
@@ -45,10 +50,6 @@ import { mapProxyName } from './http_proxy'; | ||
import { SurfaceCall } from './call'; | ||
import { Filter } from './filter'; | ||
export enum ConnectivityState { | ||
IDLE, | ||
CONNECTING, | ||
READY, | ||
TRANSIENT_FAILURE, | ||
SHUTDOWN, | ||
} | ||
import { ConnectivityState } from './connectivity-state'; | ||
import { ChannelInfo, ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, SubchannelRef, unregisterChannelzRef } from './channelz'; | ||
@@ -110,2 +111,8 @@ /** | ||
/** | ||
* Get the channelz reference object for this channel. A request to the | ||
* channelz service for the id in this object will provide information | ||
* about this channel. | ||
*/ | ||
getChannelzRef(): ChannelRef; | ||
/** | ||
* Create a call object. Call is an opaque type that is used by the Client | ||
@@ -154,2 +161,3 @@ * class. This function is called by the gRPC library when starting a | ||
callConfig: CallConfig; | ||
dynamicFilters: Filter[]; | ||
}> = []; | ||
@@ -169,2 +177,10 @@ private connectivityStateWatchers: ConnectivityStateWatcher[] = []; | ||
private configSelector: ConfigSelector | null = null; | ||
// Channelz info | ||
private originalTarget: string; | ||
private channelzRef: ChannelRef; | ||
private channelzTrace: ChannelzTrace; | ||
private callTracker = new ChannelzCallTracker(); | ||
private childrenTracker = new ChannelzChildrenTracker(); | ||
constructor( | ||
@@ -184,16 +200,7 @@ target: string, | ||
if (options) { | ||
if ( | ||
typeof options !== 'object' || | ||
!Object.values(options).every( | ||
(value) => | ||
typeof value === 'string' || | ||
typeof value === 'number' || | ||
typeof value === 'undefined' | ||
) | ||
) { | ||
throw new TypeError( | ||
'Channel options must be an object with string or number values' | ||
); | ||
if (typeof options !== 'object') { | ||
throw new TypeError('Channel options must be an object'); | ||
} | ||
} | ||
this.originalTarget = target; | ||
const originalTargetUri = parseUri(target); | ||
@@ -215,2 +222,6 @@ if (originalTargetUri === null) { | ||
this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo()); | ||
this.channelzTrace = new ChannelzTrace(); | ||
this.channelzTrace.addTrace('CT_INFO', 'Channel created'); | ||
if (this.options['grpc.default_authority']) { | ||
@@ -235,3 +246,3 @@ this.defaultAuthority = this.options['grpc.default_authority'] as string; | ||
) => { | ||
return this.subchannelPool.getOrCreateSubchannel( | ||
const subchannel = this.subchannelPool.getOrCreateSubchannel( | ||
this.target, | ||
@@ -242,2 +253,4 @@ subchannelAddress, | ||
); | ||
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef()); | ||
return subchannel; | ||
}, | ||
@@ -249,4 +262,4 @@ updateState: (connectivityState: ConnectivityState, picker: Picker) => { | ||
this.callRefTimerUnref(); | ||
for (const { callStream, callMetadata, callConfig } of queueCopy) { | ||
this.tryPick(callStream, callMetadata, callConfig); | ||
for (const { callStream, callMetadata, callConfig, dynamicFilters } of queueCopy) { | ||
this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); | ||
} | ||
@@ -261,2 +274,8 @@ this.updateState(connectivityState); | ||
}, | ||
addChannelzChild: (child: ChannelRef | SubchannelRef) => { | ||
this.childrenTracker.refChild(child); | ||
}, | ||
removeChannelzChild: (child: ChannelRef | SubchannelRef) => { | ||
this.childrenTracker.unrefChild(child); | ||
} | ||
}; | ||
@@ -268,2 +287,3 @@ this.resolvingLoadBalancer = new ResolvingLoadBalancer( | ||
(configSelector) => { | ||
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded'); | ||
this.configSelector = configSelector; | ||
@@ -275,4 +295,4 @@ /* We process the queue asynchronously to ensure that the corresponding | ||
this.configSelectionQueue = []; | ||
this.callRefTimerUnref() | ||
for (const {callStream, callMetadata} of localQueue) { | ||
this.callRefTimerUnref(); | ||
for (const { callStream, callMetadata } of localQueue) { | ||
this.tryGetConfig(callStream, callMetadata); | ||
@@ -284,4 +304,5 @@ } | ||
(status) => { | ||
this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"'); | ||
if (this.configSelectionQueue.length > 0) { | ||
trace(LogVerbosity.DEBUG, 'channel', 'Name resolution failed for target ' + uriToString(this.target) + ' with calls queued for config selection'); | ||
this.trace('Name resolution failed with calls queued for config selection'); | ||
} | ||
@@ -291,6 +312,6 @@ const localQueue = this.configSelectionQueue; | ||
this.callRefTimerUnref(); | ||
for (const {callStream, callMetadata} of localQueue) { | ||
for (const { callStream, callMetadata } of localQueue) { | ||
if (callMetadata.getOptions().waitForReady) { | ||
this.callRefTimerRef(); | ||
this.configSelectionQueue.push({callStream, callMetadata}); | ||
this.configSelectionQueue.push({ callStream, callMetadata }); | ||
} else { | ||
@@ -308,9 +329,28 @@ callStream.cancelWithStatus(status.code, status.details); | ||
]); | ||
trace(LogVerbosity.DEBUG, 'channel', 'Channel constructed with options ' + JSON.stringify(options, undefined, 2)); | ||
this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2)); | ||
} | ||
private getChannelzInfo(): ChannelInfo { | ||
return { | ||
target: this.originalTarget, | ||
state: this.connectivityState, | ||
trace: this.channelzTrace, | ||
callTracker: this.callTracker, | ||
children: this.childrenTracker.getChildLists() | ||
}; | ||
} | ||
private trace(text: string, verbosityOverride?: LogVerbosity) { | ||
trace(verbosityOverride ?? LogVerbosity.DEBUG, 'channel', '(' + this.channelzRef.id + ') ' + uriToString(this.target) + ' ' + text); | ||
} | ||
private callRefTimerRef() { | ||
// If the hasRef function does not exist, always run the code | ||
if (!this.callRefTimer.hasRef?.()) { | ||
trace(LogVerbosity.DEBUG, 'channel', 'callRefTimer.ref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length); | ||
this.trace( | ||
'callRefTimer.ref | configSelectionQueue.length=' + | ||
this.configSelectionQueue.length + | ||
' pickQueue.length=' + | ||
this.pickQueue.length | ||
); | ||
this.callRefTimer.ref?.(); | ||
@@ -322,4 +362,9 @@ } | ||
// If the hasRef function does not exist, always run the code | ||
if ((!this.callRefTimer.hasRef) || (this.callRefTimer.hasRef())) { | ||
trace(LogVerbosity.DEBUG, 'channel', 'callRefTimer.unref | configSelectionQueue.length=' + this.configSelectionQueue.length + ' pickQueue.length=' + this.pickQueue.length); | ||
if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) { | ||
this.trace( | ||
'callRefTimer.unref | configSelectionQueue.length=' + | ||
this.configSelectionQueue.length + | ||
' pickQueue.length=' + | ||
this.pickQueue.length | ||
); | ||
this.callRefTimer.unref?.(); | ||
@@ -329,4 +374,9 @@ } | ||
private pushPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig) { | ||
this.pickQueue.push({ callStream, callMetadata, callConfig }); | ||
private pushPick( | ||
callStream: Http2CallStream, | ||
callMetadata: Metadata, | ||
callConfig: CallConfig, | ||
dynamicFilters: Filter[] | ||
) { | ||
this.pickQueue.push({ callStream, callMetadata, callConfig, dynamicFilters }); | ||
this.callRefTimerRef(); | ||
@@ -342,7 +392,13 @@ } | ||
*/ | ||
private tryPick(callStream: Http2CallStream, callMetadata: Metadata, callConfig: CallConfig) { | ||
const pickResult = this.currentPicker.pick({ metadata: callMetadata, extraPickInfo: callConfig.pickInformation }); | ||
trace( | ||
LogVerbosity.DEBUG, | ||
'channel', | ||
private tryPick( | ||
callStream: Http2CallStream, | ||
callMetadata: Metadata, | ||
callConfig: CallConfig, | ||
dynamicFilters: Filter[] | ||
) { | ||
const pickResult = this.currentPicker.pick({ | ||
metadata: callMetadata, | ||
extraPickInfo: callConfig.pickInformation, | ||
}); | ||
this.trace( | ||
'Pick result: ' + | ||
@@ -380,3 +436,3 @@ PickResultType[pickResult.pickResultType] + | ||
); | ||
this.pushPick(callStream, callMetadata, callConfig); | ||
this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); | ||
break; | ||
@@ -394,6 +450,7 @@ } | ||
try { | ||
const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream)); | ||
pickResult.subchannel!.startCallStream( | ||
finalMetadata, | ||
callStream, | ||
pickResult.extraFilterFactory ?? undefined | ||
[...dynamicFilters, ...pickExtraFilters] | ||
); | ||
@@ -422,5 +479,3 @@ /* If we reach this point, the call stream has started | ||
* tryPick */ | ||
trace( | ||
LogVerbosity.INFO, | ||
'channel', | ||
this.trace( | ||
'Failed to start call on picked subchannel ' + | ||
@@ -430,9 +485,8 @@ pickResult.subchannel!.getAddress() + | ||
(error as Error).message + | ||
'. Retrying pick' | ||
'. Retrying pick', | ||
LogVerbosity.INFO | ||
); | ||
this.tryPick(callStream, callMetadata, callConfig); | ||
this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); | ||
} else { | ||
trace( | ||
LogVerbosity.INFO, | ||
'channel', | ||
this.trace( | ||
'Failed to start call on picked subchanel ' + | ||
@@ -442,7 +496,10 @@ pickResult.subchannel!.getAddress() + | ||
(error as Error).message + | ||
'. Ending call' | ||
'. Ending call', | ||
LogVerbosity.INFO | ||
); | ||
callStream.cancelWithStatus( | ||
Status.INTERNAL, | ||
`Failed to start HTTP/2 stream with error: ${(error as Error).message}` | ||
`Failed to start HTTP/2 stream with error: ${ | ||
(error as Error).message | ||
}` | ||
); | ||
@@ -454,5 +511,3 @@ } | ||
* block above */ | ||
trace( | ||
LogVerbosity.INFO, | ||
'channel', | ||
this.trace( | ||
'Picked subchannel ' + | ||
@@ -462,5 +517,6 @@ pickResult.subchannel!.getAddress() + | ||
ConnectivityState[subchannelState] + | ||
' after metadata filters. Retrying pick' | ||
' after metadata filters. Retrying pick', | ||
LogVerbosity.INFO | ||
); | ||
this.tryPick(callStream, callMetadata, callConfig); | ||
this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); | ||
} | ||
@@ -471,3 +527,3 @@ }, | ||
callStream.cancelWithStatus( | ||
(typeof error.code === 'number') ? error.code : Status.UNKNOWN, | ||
typeof error.code === 'number' ? error.code : Status.UNKNOWN, | ||
`Getting metadata from plugin failed with error: ${error.message}` | ||
@@ -480,7 +536,7 @@ ); | ||
case PickResultType.QUEUE: | ||
this.pushPick(callStream, callMetadata, callConfig); | ||
this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); | ||
break; | ||
case PickResultType.TRANSIENT_FAILURE: | ||
if (callMetadata.getOptions().waitForReady) { | ||
this.pushPick(callStream, callMetadata, callConfig); | ||
this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); | ||
} else { | ||
@@ -521,3 +577,4 @@ callStream.cancelWithStatus( | ||
'connectivity_state', | ||
uriToString(this.target) + | ||
'(' + this.channelzRef.id + ') ' + | ||
uriToString(this.target) + | ||
' ' + | ||
@@ -528,2 +585,3 @@ ConnectivityState[this.connectivityState] + | ||
); | ||
this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]); | ||
this.connectivityState = newState; | ||
@@ -533,3 +591,3 @@ const watchersCopy = this.connectivityStateWatchers.slice(); | ||
if (newState !== watcherObject.currentState) { | ||
if(watcherObject.timer) { | ||
if (watcherObject.timer) { | ||
clearTimeout(watcherObject.timer); | ||
@@ -544,2 +602,7 @@ } | ||
private tryGetConfig(stream: Http2CallStream, metadata: Metadata) { | ||
if (stream.getStatus() !== null) { | ||
/* If the stream has a status, it has already finished and we don't need | ||
* to take any more actions on it. */ | ||
return; | ||
} | ||
if (this.configSelector === null) { | ||
@@ -551,5 +614,5 @@ /* This branch will only be taken at the beginning of the channel's life, | ||
this.resolvingLoadBalancer.exitIdle(); | ||
this.configSelectionQueue.push({ | ||
this.configSelectionQueue.push({ | ||
callStream: stream, | ||
callMetadata: metadata | ||
callMetadata: metadata, | ||
}); | ||
@@ -560,5 +623,41 @@ this.callRefTimerRef(); | ||
if (callConfig.status === Status.OK) { | ||
this.tryPick(stream, metadata, callConfig); | ||
if (callConfig.methodConfig.timeout) { | ||
const deadline = new Date(); | ||
deadline.setSeconds( | ||
deadline.getSeconds() + callConfig.methodConfig.timeout.seconds | ||
); | ||
deadline.setMilliseconds( | ||
deadline.getMilliseconds() + | ||
callConfig.methodConfig.timeout.nanos / 1_000_000 | ||
); | ||
stream.setConfigDeadline(deadline); | ||
// Refreshing the filters makes the deadline filter pick up the new deadline | ||
stream.filterStack.refresh(); | ||
} | ||
if (callConfig.dynamicFilterFactories.length > 0) { | ||
/* These dynamicFilters are the mechanism for implementing gRFC A39: | ||
* https://github.com/grpc/proposal/blob/master/A39-xds-http-filters.md | ||
* We run them here instead of with the rest of the filters because | ||
* that spec says "the xDS HTTP filters will run in between name | ||
* resolution and load balancing". | ||
* | ||
* We use the filter stack here to simplify the multi-filter async | ||
* waterfall logic, but we pass along the underlying list of filters | ||
* to avoid having nested filter stacks when combining it with the | ||
* original filter stack. We do not pass along the original filter | ||
* factory list because these filters may need to persist data | ||
* between sending headers and other operations. */ | ||
const dynamicFilterStackFactory = new FilterStackFactory(callConfig.dynamicFilterFactories); | ||
const dynamicFilterStack = dynamicFilterStackFactory.createFilter(stream); | ||
dynamicFilterStack.sendMetadata(Promise.resolve(metadata)).then(filteredMetadata => { | ||
this.tryPick(stream, filteredMetadata, callConfig, dynamicFilterStack.getFilters()); | ||
}); | ||
} else { | ||
this.tryPick(stream, metadata, callConfig, []); | ||
} | ||
} else { | ||
stream.cancelWithStatus(callConfig.status, "Failed to route call to method " + stream.getMethod()); | ||
stream.cancelWithStatus( | ||
callConfig.status, | ||
'Failed to route call to method ' + stream.getMethod() | ||
); | ||
} | ||
@@ -576,2 +675,3 @@ } | ||
clearInterval(this.callRefTimer); | ||
unregisterChannelzRef(this.channelzRef); | ||
@@ -602,3 +702,3 @@ this.subchannelPool.unrefUnusedSubchannels(); | ||
let timer = null; | ||
if(deadline !== Infinity) { | ||
if (deadline !== Infinity) { | ||
const deadlineDate: Date = | ||
@@ -619,3 +719,3 @@ deadline instanceof Date ? deadline : new Date(deadline); | ||
); | ||
}, deadlineDate.getTime() - now.getTime()) | ||
}, deadlineDate.getTime() - now.getTime()); | ||
} | ||
@@ -625,3 +725,3 @@ const watcherObject = { | ||
callback, | ||
timer | ||
timer, | ||
}; | ||
@@ -631,2 +731,6 @@ this.connectivityStateWatchers.push(watcherObject); | ||
getChannelzRef() { | ||
return this.channelzRef; | ||
} | ||
createCall( | ||
@@ -651,7 +755,4 @@ method: string, | ||
const callNumber = getNewCallNumber(); | ||
trace( | ||
LogVerbosity.DEBUG, | ||
'channel', | ||
uriToString(this.target) + | ||
' createCall [' + | ||
this.trace( | ||
'createCall [' + | ||
callNumber + | ||
@@ -677,4 +778,12 @@ '] method="' + | ||
); | ||
this.callTracker.addCallStarted(); | ||
stream.addStatusWatcher(status => { | ||
if (status.code === Status.OK) { | ||
this.callTracker.addCallSucceeded(); | ||
} else { | ||
this.callTracker.addCallFailed(); | ||
} | ||
}); | ||
return stream; | ||
} | ||
} |
@@ -351,3 +351,6 @@ /* | ||
} catch (e) { | ||
this.call.cancelWithStatus(Status.INTERNAL, `Request message serialization failure: ${e.message}`); | ||
this.call.cancelWithStatus( | ||
Status.INTERNAL, | ||
`Request message serialization failure: ${e.message}` | ||
); | ||
return; | ||
@@ -407,3 +410,4 @@ } | ||
*/ | ||
class BaseUnaryInterceptingCall extends BaseInterceptingCall | ||
class BaseUnaryInterceptingCall | ||
extends BaseInterceptingCall | ||
implements InterceptingCallInterface { | ||
@@ -440,3 +444,4 @@ // eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
*/ | ||
class BaseStreamingInterceptingCall extends BaseInterceptingCall | ||
class BaseStreamingInterceptingCall | ||
extends BaseInterceptingCall | ||
implements InterceptingCallInterface {} | ||
@@ -443,0 +448,0 @@ |
@@ -33,3 +33,4 @@ /* | ||
import { Deadline, StatusObject } from './call-stream'; | ||
import { Channel, ConnectivityState, ChannelImplementation } from './channel'; | ||
import { Channel, ChannelImplementation } from './channel'; | ||
import { ConnectivityState } from './connectivity-state'; | ||
import { ChannelCredentials } from './channel-credentials'; | ||
@@ -59,3 +60,5 @@ import { ChannelOptions } from './channel-options'; | ||
function isFunction<ResponseType>(arg: Metadata | CallOptions | UnaryCallback<ResponseType> | undefined): arg is UnaryCallback<ResponseType>{ | ||
function isFunction<ResponseType>( | ||
arg: Metadata | CallOptions | UnaryCallback<ResponseType> | undefined | ||
): arg is UnaryCallback<ResponseType> { | ||
return typeof arg === 'function'; | ||
@@ -270,5 +273,7 @@ } | ||
): ClientUnaryCall { | ||
const checkedArguments = this.checkOptionalUnaryResponseArguments< | ||
ResponseType | ||
>(metadata, options, callback); | ||
const checkedArguments = this.checkOptionalUnaryResponseArguments<ResponseType>( | ||
metadata, | ||
options, | ||
callback | ||
); | ||
const methodDefinition: ClientMethodDefinition< | ||
@@ -387,5 +392,7 @@ RequestType, | ||
): ClientWritableStream<RequestType> { | ||
const checkedArguments = this.checkOptionalUnaryResponseArguments< | ||
ResponseType | ||
>(metadata, options, callback); | ||
const checkedArguments = this.checkOptionalUnaryResponseArguments<ResponseType>( | ||
metadata, | ||
options, | ||
callback | ||
); | ||
const methodDefinition: ClientMethodDefinition< | ||
@@ -414,5 +421,3 @@ RequestType, | ||
} | ||
const emitter: ClientWritableStream<RequestType> = callProperties.call as ClientWritableStream< | ||
RequestType | ||
>; | ||
const emitter: ClientWritableStream<RequestType> = callProperties.call as ClientWritableStream<RequestType>; | ||
const interceptorArgs: InterceptorArguments = { | ||
@@ -539,5 +544,3 @@ clientInterceptors: this[INTERCEPTOR_SYMBOL], | ||
} | ||
const stream: ClientReadableStream<ResponseType> = callProperties.call as ClientReadableStream< | ||
ResponseType | ||
>; | ||
const stream: ClientReadableStream<ResponseType> = callProperties.call as ClientReadableStream<ResponseType>; | ||
const interceptorArgs: InterceptorArguments = { | ||
@@ -667,3 +670,3 @@ clientInterceptors: this[INTERCEPTOR_SYMBOL], | ||
onReceiveMessage(message: Buffer) { | ||
stream.push(message) | ||
stream.push(message); | ||
}, | ||
@@ -685,2 +688,1 @@ onReceiveStatus(status: StatusObject) { | ||
} | ||
@@ -55,3 +55,7 @@ /* | ||
// https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/propagation_bits.h#L43 | ||
DEFAULTS = 0xffff | Propagate.DEADLINE | Propagate.CENSUS_STATS_CONTEXT | Propagate.CENSUS_TRACING_CONTEXT | Propagate.CANCELLATION, | ||
DEFAULTS = 0xffff | | ||
Propagate.DEADLINE | | ||
Propagate.CENSUS_STATS_CONTEXT | | ||
Propagate.CENSUS_TRACING_CONTEXT | | ||
Propagate.CANCELLATION, | ||
} | ||
@@ -58,0 +62,0 @@ |
@@ -45,3 +45,3 @@ /* | ||
private timer: NodeJS.Timer | null = null; | ||
private deadline: number; | ||
private deadline = Infinity; | ||
constructor( | ||
@@ -52,3 +52,8 @@ private readonly channel: Channel, | ||
super(); | ||
const callDeadline = callStream.getDeadline(); | ||
this.retreiveDeadline(); | ||
this.runTimer(); | ||
} | ||
private retreiveDeadline() { | ||
const callDeadline = this.callStream.getDeadline(); | ||
if (callDeadline instanceof Date) { | ||
@@ -59,7 +64,13 @@ this.deadline = callDeadline.getTime(); | ||
} | ||
} | ||
private runTimer() { | ||
if (this.timer) { | ||
clearTimeout(this.timer); | ||
} | ||
const now: number = new Date().getTime(); | ||
let timeout = this.deadline - now; | ||
const timeout = this.deadline - now; | ||
if (timeout <= 0) { | ||
process.nextTick(() => { | ||
callStream.cancelWithStatus( | ||
this.callStream.cancelWithStatus( | ||
Status.DEADLINE_EXCEEDED, | ||
@@ -71,3 +82,3 @@ 'Deadline exceeded' | ||
this.timer = setTimeout(() => { | ||
callStream.cancelWithStatus( | ||
this.callStream.cancelWithStatus( | ||
Status.DEADLINE_EXCEEDED, | ||
@@ -81,2 +92,7 @@ 'Deadline exceeded' | ||
refresh() { | ||
this.retreiveDeadline(); | ||
this.runTimer(); | ||
} | ||
async sendMetadata(metadata: Promise<Metadata>) { | ||
@@ -83,0 +99,0 @@ if (this.deadline === Infinity) { |
export { trace } from './logging'; | ||
export { Resolver, ResolverListener, registerResolver, ConfigSelector } from './resolver'; | ||
export { | ||
Resolver, | ||
ResolverListener, | ||
registerResolver, | ||
ConfigSelector, | ||
} from './resolver'; | ||
export { GrpcUri, uriToString } from './uri-parser'; | ||
export { ServiceConfig } from './service-config'; | ||
export { ServiceConfig, Duration } from './service-config'; | ||
export { BackoffTimeout } from './backoff-timeout'; | ||
export { LoadBalancer, LoadBalancingConfig, ChannelControlHelper, registerLoadBalancerType, getFirstUsableConfig, validateLoadBalancingConfig } from './load-balancer'; | ||
export { SubchannelAddress, subchannelAddressToString } from './subchannel'; | ||
export { | ||
LoadBalancer, | ||
LoadBalancingConfig, | ||
ChannelControlHelper, | ||
createChildChannelControlHelper, | ||
registerLoadBalancerType, | ||
getFirstUsableConfig, | ||
validateLoadBalancingConfig, | ||
} from './load-balancer'; | ||
export { | ||
SubchannelAddress, | ||
subchannelAddressToString, | ||
} from './subchannel-address'; | ||
export { ChildLoadBalancerHandler } from './load-balancer-child-handler'; | ||
export { Picker, UnavailablePicker, QueuePicker, PickResult, PickArgs, PickResultType } from './picker'; | ||
export { | ||
Picker, | ||
UnavailablePicker, | ||
QueuePicker, | ||
PickResult, | ||
PickArgs, | ||
PickResultType, | ||
} from './picker'; | ||
export { Call as CallStream } from './call-stream'; | ||
export { Filter, BaseFilter, FilterFactory } from './filter'; | ||
export { FilterStackFactory } from './filter-stack'; | ||
export { FilterStackFactory } from './filter-stack'; | ||
export { registerAdminService } from './admin'; |
@@ -74,2 +74,16 @@ /* | ||
} | ||
refresh(): void { | ||
for (const filter of this.filters) { | ||
filter.refresh(); | ||
} | ||
} | ||
push(filters: Filter[]) { | ||
this.filters.unshift(...filters); | ||
} | ||
getFilters(): Filter[] { | ||
return this.filters; | ||
} | ||
} | ||
@@ -80,2 +94,6 @@ | ||
push(filterFactories: FilterFactory<Filter>[]) { | ||
this.factories.unshift(...filterFactories); | ||
} | ||
createFilter(callStream: Call): FilterStack { | ||
@@ -82,0 +100,0 @@ return new FilterStack( |
@@ -35,2 +35,4 @@ /* | ||
receiveTrailers(status: StatusObject): StatusObject; | ||
refresh(): void; | ||
} | ||
@@ -58,2 +60,4 @@ | ||
} | ||
refresh(): void {} | ||
} | ||
@@ -60,0 +64,0 @@ |
@@ -29,3 +29,3 @@ /* | ||
subchannelAddressToString, | ||
} from './subchannel'; | ||
} from './subchannel-address'; | ||
import { ChannelOptions } from './channel-options'; | ||
@@ -97,3 +97,3 @@ import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser'; | ||
const result: ProxyInfo = { | ||
address: `${hostname}:${port}` | ||
address: `${hostname}:${port}`, | ||
}; | ||
@@ -152,3 +152,5 @@ if (userCred) { | ||
if (host === serverHost) { | ||
trace('Not using proxy for target in no_proxy list: ' + uriToString(target)); | ||
trace( | ||
'Not using proxy for target in no_proxy list: ' + uriToString(target) | ||
); | ||
return noProxyResult; | ||
@@ -194,2 +196,5 @@ } | ||
}; | ||
const headers: http.OutgoingHttpHeaders = { | ||
Host: parsedTarget.path, | ||
}; | ||
// Connect to the subchannel address as a proxy | ||
@@ -203,10 +208,9 @@ if (isTcpSubchannelAddress(address)) { | ||
if ('grpc.http_connect_creds' in channelOptions) { | ||
options.headers = { | ||
'Proxy-Authorization': | ||
'Basic ' + | ||
Buffer.from( | ||
channelOptions['grpc.http_connect_creds'] as string | ||
).toString('base64'), | ||
}; | ||
headers['Proxy-Authorization'] = | ||
'Basic ' + | ||
Buffer.from( | ||
channelOptions['grpc.http_connect_creds'] as string | ||
).toString('base64'); | ||
} | ||
options.headers = headers | ||
const proxyAddressString = subchannelAddressToString(address); | ||
@@ -234,3 +238,3 @@ trace('Using proxy ' + proxyAddressString + ' to connect to ' + options.path); | ||
const remoteHost = hostPort?.host ?? targetPath; | ||
const cts = tls.connect( | ||
@@ -237,0 +241,0 @@ { |
@@ -27,3 +27,4 @@ /* | ||
import { Deadline, StatusObject } from './call-stream'; | ||
import { Channel, ConnectivityState, ChannelImplementation } from './channel'; | ||
import { Channel, ChannelImplementation } from './channel'; | ||
import { ConnectivityState } from './connectivity-state'; | ||
import { ChannelCredentials } from './channel-credentials'; | ||
@@ -66,2 +67,3 @@ import { | ||
ServerDuplexStream, | ||
ServerErrorResponse, | ||
} from './server-call'; | ||
@@ -178,2 +180,3 @@ | ||
ServerDuplexStream, | ||
ServerErrorResponse, | ||
ServiceDefinition, | ||
@@ -186,3 +189,8 @@ UntypedHandleCall, | ||
export { handleBidiStreamingCall, handleServerStreamingCall, handleUnaryCall, handleClientStreamingCall }; | ||
export { | ||
handleBidiStreamingCall, | ||
handleServerStreamingCall, | ||
handleUnaryCall, | ||
handleClientStreamingCall, | ||
}; | ||
@@ -248,7 +256,18 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ | ||
export { | ||
getChannelzServiceDefinition, | ||
getChannelzHandlers | ||
} from './channelz'; | ||
export { addAdminServicesToServer } from './admin'; | ||
import * as experimental from './experimental'; | ||
export { experimental }; | ||
import * as resolver from './resolver'; | ||
import * as load_balancer from './load-balancer'; | ||
import * as resolver_dns from './resolver-dns'; | ||
import * as resolver_uds from './resolver-uds'; | ||
import * as resolver_ip from './resolver-ip'; | ||
import * as load_balancer_pick_first from './load-balancer-pick-first'; | ||
import * as load_balancer_round_robin from './load-balancer-round-robin'; | ||
import * as channelz from './channelz'; | ||
@@ -259,4 +278,8 @@ const clientVersion = require('../../package.json').version; | ||
logging.trace(LogVerbosity.DEBUG, 'index', 'Loading @grpc/grpc-js version ' + clientVersion); | ||
resolver.registerAll(); | ||
load_balancer.registerAll(); | ||
resolver_dns.setup(); | ||
resolver_uds.setup(); | ||
resolver_ip.setup(); | ||
load_balancer_pick_first.setup(); | ||
load_balancer_round_robin.setup(); | ||
channelz.setup(); | ||
})(); |
@@ -21,9 +21,11 @@ /* | ||
ChannelControlHelper, | ||
LoadBalancingConfig, | ||
createLoadBalancer, | ||
LoadBalancingConfig | ||
} from './load-balancer'; | ||
import { SubchannelAddress, Subchannel } from './subchannel'; | ||
import { Subchannel } from './subchannel'; | ||
import { SubchannelAddress } from './subchannel-address'; | ||
import { ChannelOptions } from './channel-options'; | ||
import { ConnectivityState } from './channel'; | ||
import { ConnectivityState } from './connectivity-state'; | ||
import { Picker } from './picker'; | ||
import { ChannelRef, SubchannelRef } from './channelz'; | ||
@@ -70,2 +72,9 @@ const TYPE_NAME = 'child_load_balancer_helper'; | ||
} | ||
addChannelzChild(child: ChannelRef | SubchannelRef) { | ||
this.parent.channelControlHelper.addChannelzChild(child); | ||
} | ||
removeChannelzChild(child: ChannelRef | SubchannelRef) { | ||
this.parent.channelControlHelper.removeChannelzChild(child); | ||
} | ||
private calledByPendingChild(): boolean { | ||
@@ -72,0 +81,0 @@ return this.child === this.parent.pendingChild; |
@@ -21,6 +21,7 @@ /* | ||
ChannelControlHelper, | ||
LoadBalancingConfig, | ||
registerDefaultLoadBalancerType, | ||
registerLoadBalancerType, | ||
LoadBalancingConfig | ||
} from './load-balancer'; | ||
import { ConnectivityState } from './channel'; | ||
import { ConnectivityState } from './connectivity-state'; | ||
import { | ||
@@ -34,8 +35,7 @@ QueuePicker, | ||
} from './picker'; | ||
import { Subchannel, ConnectivityStateListener } from './subchannel'; | ||
import { | ||
Subchannel, | ||
ConnectivityStateListener, | ||
SubchannelAddress, | ||
subchannelAddressToString, | ||
} from './subchannel'; | ||
} from './subchannel-address'; | ||
import * as logging from './logging'; | ||
@@ -67,6 +67,7 @@ import { LogVerbosity } from './constants'; | ||
return { | ||
[TYPE_NAME]: {} | ||
[TYPE_NAME]: {}, | ||
}; | ||
} | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
static createFromJson(obj: any) { | ||
@@ -89,3 +90,3 @@ return new PickFirstLoadBalancingConfig(); | ||
status: null, | ||
extraFilterFactory: null, | ||
extraFilterFactories: [], | ||
onCallStarted: null, | ||
@@ -235,2 +236,3 @@ }; | ||
); | ||
this.channelControlHelper.removeChannelzChild(subchannel.getChannelzRef()); | ||
if (this.subchannels.length > 0) { | ||
@@ -328,2 +330,3 @@ if (this.triedAllSubchannels) { | ||
subchannel.ref(); | ||
this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef()); | ||
this.resetSubchannelList(); | ||
@@ -347,2 +350,3 @@ clearTimeout(this.connectionDelayTimeout); | ||
subchannel.unref(); | ||
this.channelControlHelper.removeChannelzChild(subchannel.getChannelzRef()); | ||
} | ||
@@ -378,2 +382,3 @@ this.currentSubchannelIndex = 0; | ||
subchannel.ref(); | ||
this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef()); | ||
} | ||
@@ -459,2 +464,3 @@ for (const subchannel of this.subchannels) { | ||
); | ||
this.channelControlHelper.removeChannelzChild(this.currentPick.getChannelzRef()); | ||
} | ||
@@ -469,3 +475,8 @@ } | ||
export function setup(): void { | ||
registerLoadBalancerType(TYPE_NAME, PickFirstLoadBalancer, PickFirstLoadBalancingConfig); | ||
registerLoadBalancerType( | ||
TYPE_NAME, | ||
PickFirstLoadBalancer, | ||
PickFirstLoadBalancingConfig | ||
); | ||
registerDefaultLoadBalancerType(TYPE_NAME); | ||
} |
@@ -21,6 +21,6 @@ /* | ||
ChannelControlHelper, | ||
LoadBalancingConfig, | ||
registerLoadBalancerType, | ||
LoadBalancingConfig | ||
} from './load-balancer'; | ||
import { ConnectivityState } from './channel'; | ||
import { ConnectivityState } from './connectivity-state'; | ||
import { | ||
@@ -34,8 +34,7 @@ QueuePicker, | ||
} from './picker'; | ||
import { Subchannel, ConnectivityStateListener } from './subchannel'; | ||
import { | ||
Subchannel, | ||
ConnectivityStateListener, | ||
SubchannelAddress, | ||
subchannelAddressToString, | ||
} from './subchannel'; | ||
} from './subchannel-address'; | ||
import * as logging from './logging'; | ||
@@ -61,6 +60,7 @@ import { LogVerbosity } from './constants'; | ||
return { | ||
[TYPE_NAME]: {} | ||
[TYPE_NAME]: {}, | ||
}; | ||
} | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
static createFromJson(obj: any) { | ||
@@ -84,3 +84,3 @@ return new RoundRobinLoadBalancingConfig(); | ||
status: null, | ||
extraFilterFactory: null, | ||
extraFilterFactories: [], | ||
onCallStarted: null, | ||
@@ -135,3 +135,3 @@ }; | ||
this.calculateAndUpdateState(); | ||
if ( | ||
@@ -199,2 +199,3 @@ newState === ConnectivityState.TRANSIENT_FAILURE || | ||
subchannel.unref(); | ||
this.channelControlHelper.removeChannelzChild(subchannel.getChannelzRef()); | ||
} | ||
@@ -226,2 +227,3 @@ this.subchannelStateCounts = { | ||
subchannel.addConnectivityStateListener(this.subchannelStateListener); | ||
this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef()); | ||
const subchannelState = subchannel.getConnectivityState(); | ||
@@ -257,3 +259,7 @@ this.subchannelStateCounts[subchannelState] += 1; | ||
export function setup() { | ||
registerLoadBalancerType(TYPE_NAME, RoundRobinLoadBalancer, RoundRobinLoadBalancingConfig); | ||
registerLoadBalancerType( | ||
TYPE_NAME, | ||
RoundRobinLoadBalancer, | ||
RoundRobinLoadBalancingConfig | ||
); | ||
} |
@@ -19,7 +19,7 @@ /* | ||
import { ChannelOptions } from './channel-options'; | ||
import { Subchannel, SubchannelAddress } from './subchannel'; | ||
import { ConnectivityState } from './channel'; | ||
import { Subchannel } from './subchannel'; | ||
import { SubchannelAddress } from './subchannel-address'; | ||
import { ConnectivityState } from './connectivity-state'; | ||
import { Picker } from './picker'; | ||
import * as load_balancer_pick_first from './load-balancer-pick-first'; | ||
import * as load_balancer_round_robin from './load-balancer-round-robin'; | ||
import { ChannelRef, SubchannelRef } from './channelz'; | ||
@@ -52,5 +52,25 @@ /** | ||
requestReresolution(): void; | ||
addChannelzChild(child: ChannelRef | SubchannelRef): void; | ||
removeChannelzChild(child: ChannelRef | SubchannelRef): void; | ||
} | ||
/** | ||
* Create a child ChannelControlHelper that overrides some methods of the | ||
* parent while letting others pass through to the parent unmodified. This | ||
* allows other code to create these children without needing to know about | ||
* all of the methods to be passed through. | ||
* @param parent | ||
* @param overrides | ||
*/ | ||
export function createChildChannelControlHelper(parent: ChannelControlHelper, overrides: Partial<ChannelControlHelper>): ChannelControlHelper { | ||
return { | ||
createSubchannel: overrides.createSubchannel?.bind(overrides) ?? parent.createSubchannel.bind(parent), | ||
updateState: overrides.updateState?.bind(overrides) ?? parent.updateState.bind(parent), | ||
requestReresolution: overrides.requestReresolution?.bind(overrides) ?? parent.requestReresolution.bind(parent), | ||
addChannelzChild: overrides.addChannelzChild?.bind(overrides) ?? parent.addChannelzChild.bind(parent), | ||
removeChannelzChild: overrides.removeChannelzChild?.bind(overrides) ?? parent.removeChannelzChild.bind(parent) | ||
}; | ||
} | ||
/** | ||
* Tracks one or more connected subchannels and determines which subchannel | ||
@@ -107,3 +127,5 @@ * each request should use. | ||
export interface LoadBalancingConfigConstructor { | ||
new(...args: any): LoadBalancingConfig; | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
new (...args: any): LoadBalancingConfig; | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
createFromJson(obj: any): LoadBalancingConfig; | ||
@@ -114,7 +136,9 @@ } | ||
[name: string]: { | ||
LoadBalancer: LoadBalancerConstructor, | ||
LoadBalancingConfig: LoadBalancingConfigConstructor | ||
LoadBalancer: LoadBalancerConstructor; | ||
LoadBalancingConfig: LoadBalancingConfigConstructor; | ||
}; | ||
} = {}; | ||
let defaultLoadBalancerType: string | null = null; | ||
export function registerLoadBalancerType( | ||
@@ -127,6 +151,10 @@ typeName: string, | ||
LoadBalancer: loadBalancerType, | ||
LoadBalancingConfig: loadBalancingConfigType | ||
LoadBalancingConfig: loadBalancingConfigType, | ||
}; | ||
} | ||
export function registerDefaultLoadBalancerType(typeName: string) { | ||
defaultLoadBalancerType = typeName; | ||
} | ||
export function createLoadBalancer( | ||
@@ -138,3 +166,5 @@ config: LoadBalancingConfig, | ||
if (typeName in registeredLoadBalancerTypes) { | ||
return new registeredLoadBalancerTypes[typeName].LoadBalancer(channelControlHelper); | ||
return new registeredLoadBalancerTypes[typeName].LoadBalancer( | ||
channelControlHelper | ||
); | ||
} else { | ||
@@ -149,6 +179,9 @@ return null; | ||
export function getFirstUsableConfig(configs: LoadBalancingConfig[], defaultPickFirst?: true): LoadBalancingConfig; | ||
export function getFirstUsableConfig( | ||
configs: LoadBalancingConfig[], | ||
defaultPickFirst: boolean = false | ||
fallbackTodefault?: true | ||
): LoadBalancingConfig; | ||
export function getFirstUsableConfig( | ||
configs: LoadBalancingConfig[], | ||
fallbackTodefault = false | ||
): LoadBalancingConfig | null { | ||
@@ -160,4 +193,10 @@ for (const config of configs) { | ||
} | ||
if (defaultPickFirst) { | ||
return new load_balancer_pick_first.PickFirstLoadBalancingConfig() | ||
if (fallbackTodefault) { | ||
if (defaultLoadBalancerType) { | ||
return new registeredLoadBalancerTypes[ | ||
defaultLoadBalancerType | ||
]!.LoadBalancingConfig(); | ||
} else { | ||
return null; | ||
} | ||
} else { | ||
@@ -168,4 +207,5 @@ return null; | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
export function validateLoadBalancingConfig(obj: any): LoadBalancingConfig { | ||
if (!(obj !== null && (typeof obj === 'object'))) { | ||
if (!(obj !== null && typeof obj === 'object')) { | ||
throw new Error('Load balancing config must be an object'); | ||
@@ -175,7 +215,11 @@ } | ||
if (keys.length !== 1) { | ||
throw new Error('Provided load balancing config has multiple conflicting entries'); | ||
throw new Error( | ||
'Provided load balancing config has multiple conflicting entries' | ||
); | ||
} | ||
const typeName = keys[0]; | ||
if (typeName in registeredLoadBalancerTypes) { | ||
return registeredLoadBalancerTypes[typeName].LoadBalancingConfig.createFromJson(obj[typeName]); | ||
return registeredLoadBalancerTypes[ | ||
typeName | ||
].LoadBalancingConfig.createFromJson(obj[typeName]); | ||
} else { | ||
@@ -185,6 +229,1 @@ throw new Error(`Unrecognized load balancing config name ${typeName}`); | ||
} | ||
export function registerAll() { | ||
load_balancer_pick_first.setup(); | ||
load_balancer_round_robin.setup(); | ||
} |
@@ -20,6 +20,19 @@ /* | ||
let _logger: Partial<Console> = console; | ||
const DEFAULT_LOGGER: Partial<Console> = { | ||
error: (message?: any, ...optionalParams: any[]) => { | ||
console.error('E ' + message, ...optionalParams); | ||
}, | ||
info: (message?: any, ...optionalParams: any[]) => { | ||
console.error('I ' + message, ...optionalParams); | ||
}, | ||
debug: (message?: any, ...optionalParams: any[]) => { | ||
console.error('D ' + message, ...optionalParams); | ||
}, | ||
} | ||
let _logger: Partial<Console> = DEFAULT_LOGGER; | ||
let _logVerbosity: LogVerbosity = LogVerbosity.ERROR; | ||
const verbosityString = process.env.GRPC_NODE_VERBOSITY ?? process.env.GRPC_VERBOSITY ?? ''; | ||
const verbosityString = | ||
process.env.GRPC_NODE_VERBOSITY ?? process.env.GRPC_VERBOSITY ?? ''; | ||
@@ -57,8 +70,28 @@ switch (verbosityString.toUpperCase()) { | ||
export const log = (severity: LogVerbosity, ...args: any[]): void => { | ||
if (severity >= _logVerbosity && typeof _logger.error === 'function') { | ||
_logger.error(...args); | ||
let logFunction: typeof DEFAULT_LOGGER.error; | ||
if (severity >= _logVerbosity) { | ||
switch (severity) { | ||
case LogVerbosity.DEBUG: | ||
logFunction = _logger.debug; | ||
break; | ||
case LogVerbosity.INFO: | ||
logFunction = _logger.info; | ||
break; | ||
case LogVerbosity.ERROR: | ||
logFunction = _logger.error; | ||
break; | ||
} | ||
/* Fall back to _logger.error when other methods are not available for | ||
* compatiblity with older behavior that always logged to _logger.error */ | ||
if (!logFunction) { | ||
logFunction = _logger.error; | ||
} | ||
if (logFunction) { | ||
logFunction.bind(_logger)(...args); | ||
} | ||
} | ||
}; | ||
const tracersString = process.env.GRPC_NODE_TRACE ?? process.env.GRPC_TRACE ?? ''; | ||
const tracersString = | ||
process.env.GRPC_NODE_TRACE ?? process.env.GRPC_TRACE ?? ''; | ||
const enabledTracers = new Set<string>(); | ||
@@ -70,3 +103,3 @@ const disabledTracers = new Set<string>(); | ||
} else { | ||
enabledTracers.add(tracerName) | ||
enabledTracers.add(tracerName); | ||
} | ||
@@ -81,5 +114,8 @@ } | ||
): void { | ||
if (!disabledTracers.has(tracer) && (allEnabled || enabledTracers.has(tracer))) { | ||
if ( | ||
!disabledTracers.has(tracer) && | ||
(allEnabled || enabledTracers.has(tracer)) | ||
) { | ||
log(severity, new Date().toISOString() + ' | ' + tracer + ' | ' + text); | ||
} | ||
} |
@@ -101,3 +101,3 @@ /* | ||
*/ | ||
function isPrototypePolluted(key: string): Boolean { | ||
function isPrototypePolluted(key: string): boolean { | ||
return ['__proto__', 'prototype', 'constructor'].includes(key); | ||
@@ -104,0 +104,0 @@ } |
@@ -18,6 +18,10 @@ /* | ||
import { BaseFilter, Filter, FilterFactory } from "./filter"; | ||
import { Call, WriteObject } from "./call-stream"; | ||
import { Status, DEFAULT_MAX_SEND_MESSAGE_LENGTH, DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH } from "./constants"; | ||
import { ChannelOptions } from "./channel-options"; | ||
import { BaseFilter, Filter, FilterFactory } from './filter'; | ||
import { Call, WriteObject } from './call-stream'; | ||
import { | ||
Status, | ||
DEFAULT_MAX_SEND_MESSAGE_LENGTH, | ||
DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, | ||
} from './constants'; | ||
import { ChannelOptions } from './channel-options'; | ||
@@ -48,3 +52,6 @@ export class MaxMessageSizeFilter extends BaseFilter implements Filter { | ||
if (concreteMessage.message.length > this.maxSendMessageSize) { | ||
this.callStream.cancelWithStatus(Status.RESOURCE_EXHAUSTED, `Sent message larger than max (${concreteMessage.message.length} vs. ${this.maxSendMessageSize})`); | ||
this.callStream.cancelWithStatus( | ||
Status.RESOURCE_EXHAUSTED, | ||
`Sent message larger than max (${concreteMessage.message.length} vs. ${this.maxSendMessageSize})` | ||
); | ||
return Promise.reject<WriteObject>('Message too large'); | ||
@@ -65,3 +72,6 @@ } else { | ||
if (concreteMessage.length > this.maxReceiveMessageSize) { | ||
this.callStream.cancelWithStatus(Status.RESOURCE_EXHAUSTED, `Received message larger than max (${concreteMessage.length} vs. ${this.maxReceiveMessageSize})`); | ||
this.callStream.cancelWithStatus( | ||
Status.RESOURCE_EXHAUSTED, | ||
`Received message larger than max (${concreteMessage.length} vs. ${this.maxReceiveMessageSize})` | ||
); | ||
return Promise.reject<Buffer>('Message too large'); | ||
@@ -75,3 +85,4 @@ } else { | ||
export class MaxMessageSizeFilterFactory implements FilterFactory<MaxMessageSizeFilter> { | ||
export class MaxMessageSizeFilterFactory | ||
implements FilterFactory<MaxMessageSizeFilter> { | ||
constructor(private readonly options: ChannelOptions) {} | ||
@@ -78,0 +89,0 @@ |
@@ -246,2 +246,14 @@ /* | ||
/** | ||
* This modifies the behavior of JSON.stringify to show an object | ||
* representation of the metadata map. | ||
*/ | ||
toJSON() { | ||
const result: { [key: string]: MetadataValue[] } = {}; | ||
for (const [key, values] of this.internalRepr.entries()) { | ||
result[key] = values; | ||
} | ||
return result; | ||
} | ||
/** | ||
* Returns a new Metadata object based fields in a given IncomingHttpHeaders | ||
@@ -248,0 +260,0 @@ * object. |
@@ -50,3 +50,3 @@ /* | ||
*/ | ||
extraFilterFactory: FilterFactory<Filter> | null; | ||
extraFilterFactories: FilterFactory<Filter>[]; | ||
onCallStarted: (() => void) | null; | ||
@@ -59,3 +59,3 @@ } | ||
status: null; | ||
extraFilterFactory: FilterFactory<Filter> | null; | ||
extraFilterFactories: FilterFactory<Filter>[]; | ||
onCallStarted: (() => void) | null; | ||
@@ -68,3 +68,3 @@ } | ||
status: null; | ||
extraFilterFactory: null; | ||
extraFilterFactories: []; | ||
onCallStarted: null; | ||
@@ -77,3 +77,3 @@ } | ||
status: StatusObject; | ||
extraFilterFactory: null; | ||
extraFilterFactories: []; | ||
onCallStarted: null; | ||
@@ -86,3 +86,3 @@ } | ||
status: StatusObject; | ||
extraFilterFactory: null; | ||
extraFilterFactories: []; | ||
onCallStarted: null; | ||
@@ -93,3 +93,3 @@ } | ||
metadata: Metadata; | ||
extraPickInfo: {[key: string]: string}; | ||
extraPickInfo: { [key: string]: string }; | ||
} | ||
@@ -128,3 +128,3 @@ | ||
status: this.status, | ||
extraFilterFactory: null, | ||
extraFilterFactories: [], | ||
onCallStarted: null, | ||
@@ -158,3 +158,3 @@ }; | ||
status: null, | ||
extraFilterFactory: null, | ||
extraFilterFactories: [], | ||
onCallStarted: null, | ||
@@ -161,0 +161,0 @@ }; |
@@ -31,3 +31,3 @@ /* | ||
import { LogVerbosity } from './constants'; | ||
import { SubchannelAddress, TcpSubchannelAddress } from './subchannel'; | ||
import { SubchannelAddress, TcpSubchannelAddress } from './subchannel-address'; | ||
import { GrpcUri, uriToString, splitHostPort } from './uri-parser'; | ||
@@ -133,3 +133,9 @@ import { isIPv6, isIPv4 } from 'net'; | ||
setImmediate(() => { | ||
this.listener.onSuccessfulResolution(this.ipResult!, null, null, null, {}); | ||
this.listener.onSuccessfulResolution( | ||
this.ipResult!, | ||
null, | ||
null, | ||
null, | ||
{} | ||
); | ||
}); | ||
@@ -136,0 +142,0 @@ return; |
@@ -17,10 +17,10 @@ /* | ||
import { isIPv4, isIPv6 } from "net"; | ||
import { StatusObject } from "./call-stream"; | ||
import { ChannelOptions } from "./channel-options"; | ||
import { LogVerbosity, Status } from "./constants"; | ||
import { Metadata } from "./metadata"; | ||
import { registerResolver, Resolver, ResolverListener } from "./resolver"; | ||
import { SubchannelAddress } from "./subchannel"; | ||
import { GrpcUri, splitHostPort, uriToString } from "./uri-parser"; | ||
import { isIPv4, isIPv6 } from 'net'; | ||
import { StatusObject } from './call-stream'; | ||
import { ChannelOptions } from './channel-options'; | ||
import { LogVerbosity, Status } from './constants'; | ||
import { Metadata } from './metadata'; | ||
import { registerResolver, Resolver, ResolverListener } from './resolver'; | ||
import { SubchannelAddress } from './subchannel-address'; | ||
import { GrpcUri, splitHostPort, uriToString } from './uri-parser'; | ||
import * as logging from './logging'; | ||
@@ -56,3 +56,3 @@ | ||
details: `Unrecognized scheme ${target.scheme} in IP resolver`, | ||
metadata: new Metadata() | ||
metadata: new Metadata(), | ||
}; | ||
@@ -68,11 +68,14 @@ return; | ||
details: `Failed to parse ${target.scheme} address ${path}`, | ||
metadata: new Metadata() | ||
metadata: new Metadata(), | ||
}; | ||
return; | ||
} | ||
if ((target.scheme === IPV4_SCHEME && !isIPv4(hostPort.host)) || (target.scheme === IPV6_SCHEME && !isIPv6(hostPort.host))) { | ||
if ( | ||
(target.scheme === IPV4_SCHEME && !isIPv4(hostPort.host)) || | ||
(target.scheme === IPV6_SCHEME && !isIPv6(hostPort.host)) | ||
) { | ||
this.error = { | ||
code: Status.UNAVAILABLE, | ||
details: `Failed to parse ${target.scheme} address ${path}`, | ||
metadata: new Metadata() | ||
metadata: new Metadata(), | ||
}; | ||
@@ -83,3 +86,3 @@ return; | ||
host: hostPort.host, | ||
port: hostPort.port ?? DEFAULT_PORT | ||
port: hostPort.port ?? DEFAULT_PORT, | ||
}); | ||
@@ -93,5 +96,11 @@ } | ||
if (this.error) { | ||
this.listener.onError(this.error) | ||
this.listener.onError(this.error); | ||
} else { | ||
this.listener.onSuccessfulResolution(this.addresses, null, null, null, {}); | ||
this.listener.onSuccessfulResolution( | ||
this.addresses, | ||
null, | ||
null, | ||
null, | ||
{} | ||
); | ||
} | ||
@@ -112,2 +121,2 @@ }); | ||
registerResolver(IPV6_SCHEME, IpResolver); | ||
} | ||
} |
@@ -18,3 +18,3 @@ /* | ||
import { Resolver, ResolverListener, registerResolver } from './resolver'; | ||
import { SubchannelAddress } from './subchannel'; | ||
import { SubchannelAddress } from './subchannel-address'; | ||
import { GrpcUri } from './uri-parser'; | ||
@@ -21,0 +21,0 @@ import { ChannelOptions } from './channel-options'; |
@@ -19,7 +19,4 @@ /* | ||
import { MethodConfig, ServiceConfig } from './service-config'; | ||
import * as resolver_dns from './resolver-dns'; | ||
import * as resolver_uds from './resolver-uds'; | ||
import * as resolver_ip from './resolver-ip'; | ||
import { StatusObject } from './call-stream'; | ||
import { SubchannelAddress } from './subchannel'; | ||
import { SubchannelAddress } from './subchannel-address'; | ||
import { GrpcUri, uriToString } from './uri-parser'; | ||
@@ -29,2 +26,3 @@ import { ChannelOptions } from './channel-options'; | ||
import { Status } from './constants'; | ||
import { Filter, FilterFactory } from './filter'; | ||
@@ -34,4 +32,5 @@ export interface CallConfig { | ||
onCommitted?: () => void; | ||
pickInformation: {[key: string]: string}; | ||
pickInformation: { [key: string]: string }; | ||
status: Status; | ||
dynamicFilterFactories: FilterFactory<Filter>[]; | ||
} | ||
@@ -87,3 +86,3 @@ | ||
updateResolution(): void; | ||
/** | ||
@@ -183,7 +182,1 @@ * Destroy the resolver. Should be called when the owning channel shuts down. | ||
} | ||
export function registerAll() { | ||
resolver_dns.setup(); | ||
resolver_uds.setup(); | ||
resolver_ip.setup(); | ||
} |
@@ -21,7 +21,7 @@ /* | ||
LoadBalancer, | ||
LoadBalancingConfig, | ||
getFirstUsableConfig, | ||
LoadBalancingConfig | ||
} from './load-balancer'; | ||
import { ServiceConfig, validateServiceConfig } from './service-config'; | ||
import { ConnectivityState } from './channel'; | ||
import { ConnectivityState } from './connectivity-state'; | ||
import { ConfigSelector, createResolver, Resolver } from './resolver'; | ||
@@ -36,3 +36,3 @@ import { ServiceError } from './call'; | ||
import { LogVerbosity } from './constants'; | ||
import { SubchannelAddress } from './subchannel'; | ||
import { SubchannelAddress } from './subchannel-address'; | ||
import { GrpcUri, uriToString } from './uri-parser'; | ||
@@ -51,5 +51,10 @@ import { ChildLoadBalancerHandler } from './load-balancer-child-handler'; | ||
function getDefaultConfigSelector(serviceConfig: ServiceConfig | null): ConfigSelector { | ||
return function defaultConfigSelector(methodName: string, metadata: Metadata) { | ||
const splitName = methodName.split('/').filter(x => x.length > 0); | ||
function getDefaultConfigSelector( | ||
serviceConfig: ServiceConfig | null | ||
): ConfigSelector { | ||
return function defaultConfigSelector( | ||
methodName: string, | ||
metadata: Metadata | ||
) { | ||
const splitName = methodName.split('/').filter((x) => x.length > 0); | ||
const service = splitName[0] ?? ''; | ||
@@ -60,7 +65,11 @@ const method = splitName[1] ?? ''; | ||
for (const name of methodConfig.name) { | ||
if (name.service === service && (name.method === undefined || name.method === method)) { | ||
if ( | ||
name.service === service && | ||
(name.method === undefined || name.method === method) | ||
) { | ||
return { | ||
methodConfig: methodConfig, | ||
pickInformation: {}, | ||
status: Status.OK | ||
status: Status.OK, | ||
dynamicFilterFactories: [] | ||
}; | ||
@@ -72,7 +81,8 @@ } | ||
return { | ||
methodConfig: {name: []}, | ||
methodConfig: { name: [] }, | ||
pickInformation: {}, | ||
status: Status.OK | ||
status: Status.OK, | ||
dynamicFilterFactories: [] | ||
}; | ||
} | ||
}; | ||
} | ||
@@ -170,2 +180,8 @@ | ||
}, | ||
addChannelzChild: channelControlHelper.addChannelzChild.bind( | ||
channelControlHelper | ||
), | ||
removeChannelzChild: channelControlHelper.removeChannelzChild.bind( | ||
channelControlHelper | ||
) | ||
}); | ||
@@ -210,3 +226,6 @@ this.innerResolver = createResolver( | ||
workingServiceConfig?.loadBalancingConfig ?? []; | ||
const loadBalancingConfig = getFirstUsableConfig(workingConfigList, true); | ||
const loadBalancingConfig = getFirstUsableConfig( | ||
workingConfigList, | ||
true | ||
); | ||
if (loadBalancingConfig === null) { | ||
@@ -227,4 +246,7 @@ // There were load balancing configs but none are supported. This counts as a resolution failure | ||
); | ||
const finalServiceConfig = workingServiceConfig ?? this.defaultServiceConfig; | ||
this.onSuccessfulResolution(configSelector ?? getDefaultConfigSelector(finalServiceConfig)); | ||
const finalServiceConfig = | ||
workingServiceConfig ?? this.defaultServiceConfig; | ||
this.onSuccessfulResolution( | ||
configSelector ?? getDefaultConfigSelector(finalServiceConfig) | ||
); | ||
}, | ||
@@ -231,0 +253,0 @@ onError: (error: StatusObject) => { |
@@ -103,3 +103,4 @@ /* | ||
export class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter | ||
export class ServerUnaryCallImpl<RequestType, ResponseType> | ||
extends EventEmitter | ||
implements ServerUnaryCall<RequestType, ResponseType> { | ||
@@ -243,3 +244,4 @@ cancelled: boolean; | ||
export class ServerDuplexStreamImpl<RequestType, ResponseType> extends Duplex | ||
export class ServerDuplexStreamImpl<RequestType, ResponseType> | ||
extends Duplex | ||
implements ServerDuplexStream<RequestType, ResponseType> { | ||
@@ -414,2 +416,4 @@ cancelled: boolean; | ||
this.emit('cancelled', 'cancelled'); | ||
this.emit('streamEnd', false); | ||
this.sendStatus({code: Status.CANCELLED, details: 'Cancelled by client', metadata: new Metadata()}); | ||
}); | ||
@@ -517,2 +521,3 @@ | ||
this.emit('receiveMessage'); | ||
resolve(this.deserializeMessage(requestBytes)); | ||
@@ -580,2 +585,4 @@ } catch (err) { | ||
sendStatus(statusObj: StatusObject) { | ||
this.emit('callEnd', statusObj.code); | ||
this.emit('streamEnd', statusObj.code === Status.OK); | ||
if (this.checkCancelled()) { | ||
@@ -615,5 +622,2 @@ return; | ||
sendError(error: ServerErrorResponse | ServerStatusResponse) { | ||
if (this.checkCancelled()) { | ||
return; | ||
} | ||
const status: StatusObject = { | ||
@@ -660,2 +664,3 @@ code: Status.UNKNOWN, | ||
this.sendMetadata(); | ||
this.emit('sendMessage'); | ||
return this.stream.write(chunk); | ||
@@ -696,2 +701,3 @@ } | ||
} | ||
this.emit('receiveMessage'); | ||
this.pushOrBufferMessage(readable, message); | ||
@@ -698,0 +704,0 @@ } |
@@ -59,11 +59,10 @@ /* | ||
subchannelAddressToString, | ||
} from './subchannel'; | ||
stringToSubchannelAddress, | ||
} from './subchannel-address'; | ||
import { parseUri } from './uri-parser'; | ||
import { ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzServer, registerChannelzSocket, ServerInfo, ServerRef, SocketInfo, SocketRef, TlsInfo, unregisterChannelzRef } from './channelz'; | ||
import { CipherNameAndProtocol, TLSSocket } from 'tls'; | ||
const TRACER_NAME = 'server'; | ||
function trace(text: string): void { | ||
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); | ||
} | ||
interface BindResult { | ||
@@ -129,4 +128,17 @@ port: number; | ||
interface ChannelzSessionInfo { | ||
ref: SocketRef; | ||
streamTracker: ChannelzCallTracker; | ||
messagesSent: number; | ||
messagesReceived: number; | ||
lastMessageSentTimestamp: Date | null; | ||
lastMessageReceivedTimestamp: Date | null; | ||
} | ||
interface ChannelzListenerInfo { | ||
ref: SocketRef; | ||
} | ||
export class Server { | ||
private http2ServerList: (http2.Http2Server | http2.Http2SecureServer)[] = []; | ||
private http2ServerList: { server: (http2.Http2Server | http2.Http2SecureServer), channelzRef: SocketRef }[] = []; | ||
@@ -137,10 +149,77 @@ private handlers: Map<string, UntypedHandler> = new Map< | ||
>(); | ||
private sessions = new Set<http2.ServerHttp2Session>(); | ||
private sessions = new Map<http2.ServerHttp2Session, ChannelzSessionInfo>(); | ||
private started = false; | ||
private options: ChannelOptions; | ||
// Channelz Info | ||
private channelzRef: ServerRef; | ||
private channelzTrace = new ChannelzTrace(); | ||
private callTracker = new ChannelzCallTracker(); | ||
private listenerChildrenTracker = new ChannelzChildrenTracker(); | ||
private sessionChildrenTracker = new ChannelzChildrenTracker(); | ||
constructor(options?: ChannelOptions) { | ||
this.options = options ?? {}; | ||
this.channelzRef = registerChannelzServer(() => this.getChannelzInfo()); | ||
this.channelzTrace.addTrace('CT_INFO', 'Server created'); | ||
this.trace('Server constructed'); | ||
} | ||
private getChannelzInfo(): ServerInfo { | ||
return { | ||
trace: this.channelzTrace, | ||
callTracker: this.callTracker, | ||
listenerChildren: this.listenerChildrenTracker.getChildLists(), | ||
sessionChildren: this.sessionChildrenTracker.getChildLists() | ||
}; | ||
} | ||
private getChannelzSessionInfoGetter(session: http2.ServerHttp2Session): () => SocketInfo { | ||
return () => { | ||
const sessionInfo = this.sessions.get(session)!; | ||
const sessionSocket = session.socket; | ||
const remoteAddress = sessionSocket.remoteAddress ? stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort) : null; | ||
const localAddress = stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort); | ||
let tlsInfo: TlsInfo | null; | ||
if (session.encrypted) { | ||
const tlsSocket: TLSSocket = sessionSocket as TLSSocket; | ||
const cipherInfo: CipherNameAndProtocol & {standardName?: string} = tlsSocket.getCipher(); | ||
const certificate = tlsSocket.getCertificate(); | ||
const peerCertificate = tlsSocket.getPeerCertificate(); | ||
tlsInfo = { | ||
cipherSuiteStandardName: cipherInfo.standardName ?? null, | ||
cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name, | ||
localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null, | ||
remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null | ||
}; | ||
} else { | ||
tlsInfo = null; | ||
} | ||
const socketInfo: SocketInfo = { | ||
remoteAddress: remoteAddress, | ||
localAddress: localAddress, | ||
security: tlsInfo, | ||
remoteName: null, | ||
streamsStarted: sessionInfo.streamTracker.callsStarted, | ||
streamsSucceeded: sessionInfo.streamTracker.callsSucceeded, | ||
streamsFailed: sessionInfo.streamTracker.callsFailed, | ||
messagesSent: sessionInfo.messagesSent, | ||
messagesReceived: sessionInfo.messagesReceived, | ||
keepAlivesSent: 0, | ||
lastLocalStreamCreatedTimestamp: null, | ||
lastRemoteStreamCreatedTimestamp: sessionInfo.streamTracker.lastCallStartedTimestamp, | ||
lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp, | ||
lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp, | ||
localFlowControlWindow: session.state.localWindowSize ?? null, | ||
remoteFlowControlWindow: session.state.remoteWindowSize ?? null | ||
}; | ||
return socketInfo; | ||
}; | ||
} | ||
private trace(text: string): void { | ||
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + text); | ||
} | ||
addProtoService(): void { | ||
@@ -215,6 +294,3 @@ throw new Error('Not implemented. Use addService() instead'); | ||
removeService(service: ServiceDefinition): void { | ||
if ( | ||
service === null || | ||
typeof service !== 'object' | ||
) { | ||
if (service === null || typeof service !== 'object') { | ||
throw new Error('removeService() requires object as argument'); | ||
@@ -247,4 +323,4 @@ } | ||
if (creds === null || typeof creds !== 'object') { | ||
throw new TypeError('creds must be an object'); | ||
if (creds === null || !(creds instanceof ServerCredentials)) { | ||
throw new TypeError('creds must be a ServerCredentials object'); | ||
} | ||
@@ -266,6 +342,8 @@ | ||
const serverOptions: http2.ServerOptions = { | ||
maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER | ||
maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER, | ||
}; | ||
if ('grpc-node.max_session_memory' in this.options) { | ||
serverOptions.maxSessionMemory = this.options['grpc-node.max_session_memory']; | ||
serverOptions.maxSessionMemory = this.options[ | ||
'grpc-node.max_session_memory' | ||
]; | ||
} | ||
@@ -278,2 +356,6 @@ if ('grpc.max_concurrent_streams' in this.options) { | ||
const deferredCallback = (error: Error | null, port: number) => { | ||
process.nextTick(() => callback(error, port)); | ||
} | ||
const setupServer = (): http2.Http2Server | http2.Http2SecureServer => { | ||
@@ -306,3 +388,3 @@ let http2Server: http2.Http2Server | http2.Http2SecureServer; | ||
addressList.map((address) => { | ||
trace('Attempting to bind ' + subchannelAddressToString(address)); | ||
this.trace('Attempting to bind ' + subchannelAddressToString(address)); | ||
let addr: SubchannelAddress; | ||
@@ -320,3 +402,4 @@ if (isTcpSubchannelAddress(address)) { | ||
return new Promise<number | Error>((resolve, reject) => { | ||
function onError(err: Error): void { | ||
const onError = (err: Error) => { | ||
this.trace('Failed to bind ' + subchannelAddressToString(address) + ' with error ' + err.message); | ||
resolve(err); | ||
@@ -328,10 +411,38 @@ } | ||
http2Server.listen(addr, () => { | ||
trace('Successfully bound ' + subchannelAddressToString(address)); | ||
this.http2ServerList.push(http2Server); | ||
const boundAddress = http2Server.address()!; | ||
let boundSubchannelAddress: SubchannelAddress; | ||
if (typeof boundAddress === 'string') { | ||
resolve(portNum); | ||
boundSubchannelAddress = { | ||
path: boundAddress | ||
}; | ||
} else { | ||
resolve(boundAddress.port); | ||
boundSubchannelAddress = { | ||
host: boundAddress.address, | ||
port: boundAddress.port | ||
} | ||
} | ||
const channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => { | ||
return { | ||
localAddress: boundSubchannelAddress, | ||
remoteAddress: null, | ||
security: null, | ||
remoteName: null, | ||
streamsStarted: 0, | ||
streamsSucceeded: 0, | ||
streamsFailed: 0, | ||
messagesSent: 0, | ||
messagesReceived: 0, | ||
keepAlivesSent: 0, | ||
lastLocalStreamCreatedTimestamp: null, | ||
lastRemoteStreamCreatedTimestamp: null, | ||
lastMessageSentTimestamp: null, | ||
lastMessageReceivedTimestamp: null, | ||
localFlowControlWindow: null, | ||
remoteFlowControlWindow: null | ||
}; | ||
}); | ||
this.listenerChildrenTracker.refChild(channelzRef); | ||
this.http2ServerList.push({server: http2Server, channelzRef: channelzRef}); | ||
this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress)); | ||
resolve('port' in boundSubchannelAddress ? boundSubchannelAddress.port : portNum); | ||
http2Server.removeListener('error', onError); | ||
@@ -369,3 +480,4 @@ }); | ||
return new Promise<BindResult>((resolve, reject) => { | ||
function onError(err: Error): void { | ||
const onError = (err: Error) => { | ||
this.trace('Failed to bind ' + subchannelAddressToString(address) + ' with error ' + err.message); | ||
resolve(bindWildcardPort(addressList.slice(1))); | ||
@@ -377,7 +489,34 @@ } | ||
http2Server.listen(address, () => { | ||
this.http2ServerList.push(http2Server); | ||
const boundAddress = http2Server.address() as AddressInfo; | ||
const boundSubchannelAddress: SubchannelAddress = { | ||
host: boundAddress.address, | ||
port: boundAddress.port | ||
}; | ||
const channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => { | ||
return { | ||
localAddress: boundSubchannelAddress, | ||
remoteAddress: null, | ||
security: null, | ||
remoteName: null, | ||
streamsStarted: 0, | ||
streamsSucceeded: 0, | ||
streamsFailed: 0, | ||
messagesSent: 0, | ||
messagesReceived: 0, | ||
keepAlivesSent: 0, | ||
lastLocalStreamCreatedTimestamp: null, | ||
lastRemoteStreamCreatedTimestamp: null, | ||
lastMessageSentTimestamp: null, | ||
lastMessageReceivedTimestamp: null, | ||
localFlowControlWindow: null, | ||
remoteFlowControlWindow: null | ||
}; | ||
}); | ||
this.listenerChildrenTracker.refChild(channelzRef); | ||
this.http2ServerList.push({server: http2Server, channelzRef: channelzRef}); | ||
this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress)); | ||
resolve( | ||
bindSpecificPort( | ||
addressList.slice(1), | ||
(http2Server.address() as AddressInfo).port, | ||
boundAddress.port, | ||
1 | ||
@@ -400,3 +539,3 @@ ) | ||
if (addressList.length === 0) { | ||
callback(new Error(`No addresses resolved for port ${port}`), 0); | ||
deferredCallback(new Error(`No addresses resolved for port ${port}`), 0); | ||
return; | ||
@@ -424,3 +563,3 @@ } | ||
logging.log(LogVerbosity.ERROR, errorString); | ||
callback(new Error(errorString), 0); | ||
deferredCallback(new Error(errorString), 0); | ||
} else { | ||
@@ -433,3 +572,3 @@ if (bindResult.count < addressList.length) { | ||
} | ||
callback(null, bindResult.port); | ||
deferredCallback(null, bindResult.port); | ||
} | ||
@@ -440,3 +579,3 @@ }, | ||
logging.log(LogVerbosity.ERROR, errorString); | ||
callback(new Error(errorString), 0); | ||
deferredCallback(new Error(errorString), 0); | ||
} | ||
@@ -446,3 +585,3 @@ ); | ||
onError: (error) => { | ||
callback(new Error(error.details), 0); | ||
deferredCallback(new Error(error.details), 0); | ||
}, | ||
@@ -458,5 +597,8 @@ }; | ||
for (const http2Server of this.http2ServerList) { | ||
for (const {server: http2Server, channelzRef: ref} of this.http2ServerList) { | ||
if (http2Server.listening) { | ||
http2Server.close(); | ||
http2Server.close(() => { | ||
this.listenerChildrenTracker.unrefChild(ref); | ||
unregisterChannelzRef(ref); | ||
}); | ||
} | ||
@@ -469,3 +611,3 @@ } | ||
// tryShutdown() calls are in progress. Don't wait on them to finish. | ||
this.sessions.forEach((session) => { | ||
this.sessions.forEach((channelzInfo, session) => { | ||
// Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to | ||
@@ -477,2 +619,3 @@ // recognize destroy(code) as a valid signature. | ||
this.sessions.clear(); | ||
unregisterChannelzRef(this.channelzRef); | ||
} | ||
@@ -509,3 +652,3 @@ | ||
this.http2ServerList.every( | ||
(http2Server) => http2Server.listening !== true | ||
({server: http2Server}) => http2Server.listening !== true | ||
) | ||
@@ -519,3 +662,3 @@ ) { | ||
} | ||
this.channelzTrace.addTrace('CT_INFO', 'Starting'); | ||
this.started = true; | ||
@@ -525,2 +668,6 @@ } | ||
tryShutdown(callback: (error?: Error) => void): void { | ||
const wrappedCallback = (error?: Error) => { | ||
unregisterChannelzRef(this.channelzRef); | ||
callback(error); | ||
}; | ||
let pendingChecks = 0; | ||
@@ -532,3 +679,3 @@ | ||
if (pendingChecks === 0) { | ||
callback(); | ||
wrappedCallback(); | ||
} | ||
@@ -540,10 +687,14 @@ } | ||
for (const http2Server of this.http2ServerList) { | ||
for (const {server: http2Server, channelzRef: ref} of this.http2ServerList) { | ||
if (http2Server.listening) { | ||
pendingChecks++; | ||
http2Server.close(maybeCallback); | ||
http2Server.close(() => { | ||
this.listenerChildrenTracker.unrefChild(ref); | ||
unregisterChannelzRef(ref); | ||
maybeCallback(); | ||
}); | ||
} | ||
} | ||
this.sessions.forEach((session) => { | ||
this.sessions.forEach((channelzInfo, session) => { | ||
if (!session.closed) { | ||
@@ -555,3 +706,3 @@ pendingChecks += 1; | ||
if (pendingChecks === 0) { | ||
callback(); | ||
wrappedCallback(); | ||
} | ||
@@ -564,2 +715,6 @@ } | ||
getChannelzRef() { | ||
return this.channelzRef; | ||
} | ||
private _setupHandlers( | ||
@@ -575,2 +730,5 @@ http2Server: http2.Http2Server | http2.Http2SecureServer | ||
(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) => { | ||
const channelzSessionInfo = this.sessions.get(stream.session as http2.ServerHttp2Session); | ||
this.callTracker.addCallStarted(); | ||
channelzSessionInfo?.streamTracker.addCallStarted(); | ||
const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE]; | ||
@@ -589,5 +747,9 @@ | ||
); | ||
this.callTracker.addCallFailed(); | ||
channelzSessionInfo?.streamTracker.addCallFailed(); | ||
return; | ||
} | ||
let call: Http2ServerCallStream<any, any> | null = null; | ||
try { | ||
@@ -605,3 +767,3 @@ const path = headers[http2.constants.HTTP2_HEADER_PATH] as string; | ||
} | ||
trace( | ||
this.trace( | ||
'Received call to method ' + | ||
@@ -615,3 +777,3 @@ path + | ||
if (handler === undefined) { | ||
trace( | ||
this.trace( | ||
'No handler registered for method ' + | ||
@@ -624,3 +786,27 @@ path + | ||
const call = new Http2ServerCallStream(stream, handler, this.options); | ||
call = new Http2ServerCallStream(stream, handler, this.options); | ||
call.once('callEnd', (code: Status) => { | ||
if (code === Status.OK) { | ||
this.callTracker.addCallSucceeded(); | ||
} else { | ||
this.callTracker.addCallFailed(); | ||
} | ||
}); | ||
if (channelzSessionInfo) { | ||
call.once('streamEnd', (success: boolean) => { | ||
if (success) { | ||
channelzSessionInfo.streamTracker.addCallSucceeded(); | ||
} else { | ||
channelzSessionInfo.streamTracker.addCallFailed(); | ||
} | ||
}); | ||
call.on('sendMessage', () => { | ||
channelzSessionInfo.messagesSent += 1; | ||
channelzSessionInfo.lastMessageSentTimestamp = new Date(); | ||
}); | ||
call.on('receiveMessage', () => { | ||
channelzSessionInfo.messagesReceived += 1; | ||
channelzSessionInfo.lastMessageReceivedTimestamp = new Date(); | ||
}); | ||
} | ||
const metadata: Metadata = call.receiveMetadata(headers) as Metadata; | ||
@@ -656,3 +842,7 @@ switch (handler.type) { | ||
} catch (err) { | ||
const call = new Http2ServerCallStream(stream, null!, this.options); | ||
if (!call) { | ||
call = new Http2ServerCallStream(stream, null!, this.options); | ||
this.callTracker.addCallFailed(); | ||
channelzSessionInfo?.streamTracker.addCallFailed() | ||
} | ||
@@ -674,5 +864,21 @@ if (err.code === undefined) { | ||
this.sessions.add(session); | ||
const channelzRef = registerChannelzSocket(session.socket.remoteAddress ?? 'unknown', this.getChannelzSessionInfoGetter(session)); | ||
const channelzSessionInfo: ChannelzSessionInfo = { | ||
ref: channelzRef, | ||
streamTracker: new ChannelzCallTracker(), | ||
messagesSent: 0, | ||
messagesReceived: 0, | ||
lastMessageSentTimestamp: null, | ||
lastMessageReceivedTimestamp: null | ||
}; | ||
this.sessions.set(session, channelzSessionInfo); | ||
const clientAddress = session.socket.remoteAddress; | ||
this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress); | ||
this.sessionChildrenTracker.refChild(channelzRef); | ||
session.on('close', () => { | ||
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress); | ||
this.sessionChildrenTracker.unrefChild(channelzRef); | ||
unregisterChannelzRef(channelzRef); | ||
this.sessions.delete(session); | ||
@@ -679,0 +885,0 @@ }); |
@@ -30,3 +30,6 @@ /* | ||
import * as os from 'os'; | ||
import { LoadBalancingConfig, validateLoadBalancingConfig } from './load-balancer'; | ||
import { | ||
LoadBalancingConfig, | ||
validateLoadBalancingConfig, | ||
} from './load-balancer'; | ||
@@ -38,6 +41,11 @@ export interface MethodConfigName { | ||
export interface Duration { | ||
seconds: number; | ||
nanos: number; | ||
} | ||
export interface MethodConfig { | ||
name: MethodConfigName[]; | ||
waitForReady?: boolean; | ||
timeout?: string; | ||
timeout?: Duration; | ||
maxRequestBytes?: number; | ||
@@ -106,9 +114,30 @@ maxResponseBytes?: number; | ||
if ('timeout' in obj) { | ||
if ( | ||
!(typeof obj.timeout === 'string') || | ||
!TIMEOUT_REGEX.test(obj.timeout) | ||
if (typeof obj.timeout === 'object') { | ||
if ( | ||
!('seconds' in obj.timeout) || | ||
!(typeof obj.timeout.seconds === 'number') | ||
) { | ||
throw new Error('Invalid method config: invalid timeout.seconds'); | ||
} | ||
if ( | ||
!('nanos' in obj.timeout) || | ||
!(typeof obj.timeout.nanos === 'number') | ||
) { | ||
throw new Error('Invalid method config: invalid timeout.nanos'); | ||
} | ||
result.timeout = obj.timeout; | ||
} else if ( | ||
typeof obj.timeout === 'string' && | ||
TIMEOUT_REGEX.test(obj.timeout) | ||
) { | ||
const timeoutParts = obj.timeout | ||
.substring(0, obj.timeout.length - 1) | ||
.split('.'); | ||
result.timeout = { | ||
seconds: timeoutParts[0] | 0, | ||
nanos: (timeoutParts[1] ?? 0) | 0, | ||
}; | ||
} else { | ||
throw new Error('Invalid method config: invalid timeout'); | ||
} | ||
result.timeout = obj.timeout; | ||
} | ||
@@ -115,0 +144,0 @@ if ('maxRequestBytes' in obj) { |
@@ -19,7 +19,7 @@ /* | ||
import { ChannelOptions, channelOptionsEqual } from './channel-options'; | ||
import { Subchannel } from './subchannel'; | ||
import { | ||
Subchannel, | ||
SubchannelAddress, | ||
subchannelAddressEqual, | ||
} from './subchannel'; | ||
} from './subchannel-address'; | ||
import { ChannelCredentials } from './channel-credentials'; | ||
@@ -26,0 +26,0 @@ import { GrpcUri, uriToString } from './uri-parser'; |
@@ -21,10 +21,10 @@ /* | ||
import { Metadata } from './metadata'; | ||
import { Http2CallStream } from './call-stream'; | ||
import { Call, Http2CallStream, WriteObject } from './call-stream'; | ||
import { ChannelOptions } from './channel-options'; | ||
import { PeerCertificate, checkServerIdentity } from 'tls'; | ||
import { ConnectivityState } from './channel'; | ||
import { PeerCertificate, checkServerIdentity, TLSSocket, CipherNameAndProtocol } from 'tls'; | ||
import { ConnectivityState } from './connectivity-state'; | ||
import { BackoffTimeout, BackoffOptions } from './backoff-timeout'; | ||
import { getDefaultAuthority } from './resolver'; | ||
import * as logging from './logging'; | ||
import { LogVerbosity } from './constants'; | ||
import { LogVerbosity, Status } from './constants'; | ||
import { getProxiedConnection, ProxyConnectionResult } from './http_proxy'; | ||
@@ -34,3 +34,9 @@ import * as net from 'net'; | ||
import { ConnectionOptions } from 'tls'; | ||
import { FilterFactory, Filter } from './filter'; | ||
import { FilterFactory, Filter, BaseFilter } from './filter'; | ||
import { | ||
stringToSubchannelAddress, | ||
SubchannelAddress, | ||
subchannelAddressToString, | ||
} from './subchannel-address'; | ||
import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, SocketInfo, SocketRef, unregisterChannelzRef, registerChannelzSocket, TlsInfo } from './channelz'; | ||
@@ -41,10 +47,2 @@ const clientVersion = require('../../package.json').version; | ||
function trace(text: string): void { | ||
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); | ||
} | ||
function refTrace(text: string): void { | ||
logging.trace(LogVerbosity.DEBUG, 'subchannel_refcount', text); | ||
} | ||
const MIN_CONNECT_TIMEOUT_MS = 20000; | ||
@@ -68,2 +66,7 @@ const INITIAL_BACKOFF_MS = 1000; | ||
export interface SubchannelCallStatsTracker { | ||
addMessageSent(): void; | ||
addMessageReceived(): void; | ||
} | ||
const { | ||
@@ -89,48 +92,2 @@ HTTP2_HEADER_AUTHORITY, | ||
export interface TcpSubchannelAddress { | ||
port: number; | ||
host: string; | ||
} | ||
export interface IpcSubchannelAddress { | ||
path: string; | ||
} | ||
/** | ||
* This represents a single backend address to connect to. This interface is a | ||
* subset of net.SocketConnectOpts, i.e. the options described at | ||
* https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener. | ||
* Those are in turn a subset of the options that can be passed to http2.connect. | ||
*/ | ||
export type SubchannelAddress = TcpSubchannelAddress | IpcSubchannelAddress; | ||
export function isTcpSubchannelAddress( | ||
address: SubchannelAddress | ||
): address is TcpSubchannelAddress { | ||
return 'port' in address; | ||
} | ||
export function subchannelAddressEqual( | ||
address1: SubchannelAddress, | ||
address2: SubchannelAddress | ||
): boolean { | ||
if (isTcpSubchannelAddress(address1)) { | ||
return ( | ||
isTcpSubchannelAddress(address2) && | ||
address1.host === address2.host && | ||
address1.port === address2.port | ||
); | ||
} else { | ||
return !isTcpSubchannelAddress(address2) && address1.path === address2.path; | ||
} | ||
} | ||
export function subchannelAddressToString(address: SubchannelAddress): string { | ||
if (isTcpSubchannelAddress(address)) { | ||
return address.host + ':' + address.port; | ||
} else { | ||
return address.path; | ||
} | ||
} | ||
export class Subchannel { | ||
@@ -191,3 +148,3 @@ /** | ||
*/ | ||
private keepaliveWithoutCalls: boolean = false; | ||
private keepaliveWithoutCalls = false; | ||
@@ -208,3 +165,23 @@ /** | ||
// Channelz info | ||
private channelzRef: SubchannelRef; | ||
private channelzTrace: ChannelzTrace; | ||
private callTracker = new ChannelzCallTracker(); | ||
private childrenTracker = new ChannelzChildrenTracker(); | ||
// Channelz socket info | ||
private channelzSocketRef: SocketRef | null = null; | ||
/** | ||
* Name of the remote server, if it is not the same as the subchannel | ||
* address, i.e. if connecting through an HTTP CONNECT proxy. | ||
*/ | ||
private remoteName: string | null = null; | ||
private streamTracker = new ChannelzCallTracker(); | ||
private keepalivesSent = 0; | ||
private messagesSent = 0; | ||
private messagesReceived = 0; | ||
private lastMessageSentTimestamp: Date | null = null; | ||
private lastMessageReceivedTimestamp: Date | null = null; | ||
/** | ||
* A class representing a connection to a single backend. | ||
@@ -241,3 +218,4 @@ * @param channelTarget The target string for the channel as a whole | ||
if ('grpc.keepalive_permit_without_calls' in options) { | ||
this.keepaliveWithoutCalls = options['grpc.keepalive_permit_without_calls'] === 1; | ||
this.keepaliveWithoutCalls = | ||
options['grpc.keepalive_permit_without_calls'] === 1; | ||
} else { | ||
@@ -258,5 +236,85 @@ this.keepaliveWithoutCalls = false; | ||
this.subchannelAddressString = subchannelAddressToString(subchannelAddress); | ||
trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2)); | ||
this.channelzRef = registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo()); | ||
this.channelzTrace = new ChannelzTrace(); | ||
this.channelzTrace.addTrace('CT_INFO', 'Subchannel created'); | ||
this.trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2)); | ||
} | ||
private getChannelzInfo(): SubchannelInfo { | ||
return { | ||
state: this.connectivityState, | ||
trace: this.channelzTrace, | ||
callTracker: this.callTracker, | ||
children: this.childrenTracker.getChildLists(), | ||
target: this.subchannelAddressString | ||
}; | ||
} | ||
private getChannelzSocketInfo(): SocketInfo | null { | ||
if (this.session === null) { | ||
return null; | ||
} | ||
const sessionSocket = this.session.socket; | ||
const remoteAddress = sessionSocket.remoteAddress ? stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort) : null; | ||
const localAddress = stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort); | ||
let tlsInfo: TlsInfo | null; | ||
if (this.session.encrypted) { | ||
const tlsSocket: TLSSocket = sessionSocket as TLSSocket; | ||
const cipherInfo: CipherNameAndProtocol & {standardName?: string} = tlsSocket.getCipher(); | ||
const certificate = tlsSocket.getCertificate(); | ||
const peerCertificate = tlsSocket.getPeerCertificate(); | ||
tlsInfo = { | ||
cipherSuiteStandardName: cipherInfo.standardName ?? null, | ||
cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name, | ||
localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null, | ||
remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null | ||
}; | ||
} else { | ||
tlsInfo = null; | ||
} | ||
const socketInfo: SocketInfo = { | ||
remoteAddress: remoteAddress, | ||
localAddress: localAddress, | ||
security: tlsInfo, | ||
remoteName: this.remoteName, | ||
streamsStarted: this.streamTracker.callsStarted, | ||
streamsSucceeded: this.streamTracker.callsSucceeded, | ||
streamsFailed: this.streamTracker.callsFailed, | ||
messagesSent: this.messagesSent, | ||
messagesReceived: this.messagesReceived, | ||
keepAlivesSent: this.keepalivesSent, | ||
lastLocalStreamCreatedTimestamp: this.streamTracker.lastCallStartedTimestamp, | ||
lastRemoteStreamCreatedTimestamp: null, | ||
lastMessageSentTimestamp: this.lastMessageSentTimestamp, | ||
lastMessageReceivedTimestamp: this.lastMessageReceivedTimestamp, | ||
localFlowControlWindow: this.session.state.localWindowSize ?? null, | ||
remoteFlowControlWindow: this.session.state.remoteWindowSize ?? null | ||
}; | ||
return socketInfo; | ||
} | ||
private resetChannelzSocketInfo() { | ||
if (this.channelzSocketRef) { | ||
unregisterChannelzRef(this.channelzSocketRef); | ||
this.childrenTracker.unrefChild(this.channelzSocketRef); | ||
this.channelzSocketRef = null; | ||
} | ||
this.remoteName = null; | ||
this.streamTracker = new ChannelzCallTracker(); | ||
this.keepalivesSent = 0; | ||
this.messagesSent = 0; | ||
this.messagesReceived = 0; | ||
this.lastMessageSentTimestamp = null; | ||
this.lastMessageReceivedTimestamp = null; | ||
} | ||
private trace(text: string): void { | ||
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); | ||
} | ||
private refTrace(text: string): void { | ||
logging.trace(LogVerbosity.DEBUG, 'subchannel_refcount', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); | ||
} | ||
private handleBackoffTimer() { | ||
@@ -289,3 +347,9 @@ if (this.continueConnecting) { | ||
private sendPing() { | ||
logging.trace(LogVerbosity.DEBUG, 'keepalive', 'Sending ping to ' + this.subchannelAddressString); | ||
this.keepalivesSent += 1; | ||
logging.trace( | ||
LogVerbosity.DEBUG, | ||
'keepalive', | ||
'(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + | ||
'Sending ping' | ||
); | ||
this.keepaliveTimeoutId = setTimeout(() => { | ||
@@ -306,3 +370,3 @@ this.transitionToState([ConnectivityState.READY], ConnectivityState.IDLE); | ||
}, this.keepaliveTimeMs); | ||
this.keepaliveIntervalId.unref?.() | ||
this.keepaliveIntervalId.unref?.(); | ||
/* Don't send a ping immediately because whatever caused us to start | ||
@@ -319,5 +383,7 @@ * sending pings should also involve some network activity. */ | ||
if (proxyConnectionResult.realTarget) { | ||
trace(this.subchannelAddressString + ' creating HTTP/2 session through proxy to ' + proxyConnectionResult.realTarget); | ||
this.remoteName = uriToString(proxyConnectionResult.realTarget); | ||
this.trace('creating HTTP/2 session through proxy to ' + proxyConnectionResult.realTarget); | ||
} else { | ||
trace(this.subchannelAddressString + ' creating HTTP/2 session'); | ||
this.remoteName = null; | ||
this.trace('creating HTTP/2 session'); | ||
} | ||
@@ -331,3 +397,5 @@ const targetAuthority = getDefaultAuthority( | ||
if ('grpc-node.max_session_memory' in this.options) { | ||
connectionOptions.maxSessionMemory = this.options['grpc-node.max_session_memory']; | ||
connectionOptions.maxSessionMemory = this.options[ | ||
'grpc-node.max_session_memory' | ||
]; | ||
} | ||
@@ -410,2 +478,4 @@ let addressScheme = 'http://'; | ||
this.session = session; | ||
this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!); | ||
this.childrenTracker.refChild(this.channelzSocketRef); | ||
session.unref(); | ||
@@ -426,3 +496,3 @@ /* For all of these events, check if the session at the time of the event | ||
if (this.session === session) { | ||
trace(this.subchannelAddressString + ' connection closed'); | ||
this.trace('connection closed'); | ||
this.transitionToState( | ||
@@ -457,8 +527,11 @@ [ConnectivityState.CONNECTING], | ||
LogVerbosity.ERROR, | ||
`Connection to ${uriToString(this.channelTarget)} at ${this.subchannelAddressString} rejected by server because of excess pings. Increasing ping interval to ${this.keepaliveTimeMs} ms` | ||
`Connection to ${uriToString(this.channelTarget)} at ${ | ||
this.subchannelAddressString | ||
} rejected by server because of excess pings. Increasing ping interval to ${ | ||
this.keepaliveTimeMs | ||
} ms` | ||
); | ||
} | ||
trace( | ||
this.subchannelAddressString + | ||
' connection closed by GOAWAY with code ' + | ||
this.trace( | ||
'connection closed by GOAWAY with code ' + | ||
errorCode | ||
@@ -476,8 +549,8 @@ ); | ||
* where we want to handle that. */ | ||
trace( | ||
this.subchannelAddressString + | ||
' connection closed with error ' + | ||
this.trace( | ||
'connection closed with error ' + | ||
(error as Error).message | ||
); | ||
}); | ||
registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!); | ||
} | ||
@@ -557,9 +630,8 @@ | ||
} | ||
trace( | ||
this.subchannelAddressString + | ||
' ' + | ||
ConnectivityState[this.connectivityState] + | ||
this.trace( | ||
ConnectivityState[this.connectivityState] + | ||
' -> ' + | ||
ConnectivityState[newState] | ||
); | ||
this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]); | ||
const previousState = this.connectivityState; | ||
@@ -589,2 +661,3 @@ this.connectivityState = newState; | ||
this.session = null; | ||
this.resetChannelzSocketInfo(); | ||
this.stopKeepalivePings(); | ||
@@ -605,2 +678,3 @@ /* If the backoff timer has already ended by the time we get to the | ||
this.session = null; | ||
this.resetChannelzSocketInfo(); | ||
this.stopKeepalivePings(); | ||
@@ -627,9 +701,8 @@ break; | ||
if (this.callRefcount === 0 && this.refcount === 0) { | ||
this.channelzTrace.addTrace('CT_INFO', 'Shutting down'); | ||
this.transitionToState( | ||
[ | ||
ConnectivityState.CONNECTING, | ||
ConnectivityState.READY, | ||
], | ||
[ConnectivityState.CONNECTING, ConnectivityState.READY], | ||
ConnectivityState.TRANSIENT_FAILURE | ||
); | ||
unregisterChannelzRef(this.channelzRef); | ||
} | ||
@@ -639,5 +712,4 @@ } | ||
callRef() { | ||
refTrace( | ||
this.subchannelAddressString + | ||
' callRefcount ' + | ||
this.refTrace( | ||
'callRefcount ' + | ||
this.callRefcount + | ||
@@ -660,5 +732,4 @@ ' -> ' + | ||
callUnref() { | ||
refTrace( | ||
this.subchannelAddressString + | ||
' callRefcount ' + | ||
this.refTrace( | ||
'callRefcount ' + | ||
this.callRefcount + | ||
@@ -682,5 +753,4 @@ ' -> ' + | ||
ref() { | ||
refTrace( | ||
this.subchannelAddressString + | ||
' refcount ' + | ||
this.refTrace( | ||
'refcount ' + | ||
this.refcount + | ||
@@ -694,5 +764,4 @@ ' -> ' + | ||
unref() { | ||
refTrace( | ||
this.subchannelAddressString + | ||
' refcount ' + | ||
this.refTrace( | ||
'refcount ' + | ||
this.refcount + | ||
@@ -724,3 +793,3 @@ ' -> ' + | ||
callStream: Http2CallStream, | ||
extraFilterFactory?: FilterFactory<Filter> | ||
extraFilters: Filter[] | ||
) { | ||
@@ -756,4 +825,39 @@ const headers = metadata.toHttp2Headers(); | ||
} | ||
logging.trace(LogVerbosity.DEBUG, 'call_stream', 'Starting stream on subchannel ' + this.subchannelAddressString + ' with headers\n' + headersString); | ||
callStream.attachHttp2Stream(http2Stream, this, extraFilterFactory); | ||
logging.trace( | ||
LogVerbosity.DEBUG, | ||
'call_stream', | ||
'Starting stream on subchannel ' + | ||
'(' + this.channelzRef.id + ') ' + | ||
this.subchannelAddressString + | ||
' with headers\n' + | ||
headersString | ||
); | ||
this.callTracker.addCallStarted(); | ||
callStream.addStatusWatcher(status => { | ||
if (status.code === Status.OK) { | ||
this.callTracker.addCallSucceeded(); | ||
} else { | ||
this.callTracker.addCallFailed(); | ||
} | ||
}); | ||
const streamSession = this.session; | ||
this.streamTracker.addCallStarted(); | ||
callStream.addStreamEndWatcher(success => { | ||
if (streamSession === this.session) { | ||
if (success) { | ||
this.streamTracker.addCallSucceeded(); | ||
} else { | ||
this.streamTracker.addCallFailed(); | ||
} | ||
} | ||
}); | ||
callStream.attachHttp2Stream(http2Stream, this, extraFilters, { | ||
addMessageSent: () => { | ||
this.messagesSent += 1; | ||
this.lastMessageSentTimestamp = new Date(); | ||
}, | ||
addMessageReceived: () => { | ||
this.messagesReceived += 1; | ||
} | ||
}); | ||
} | ||
@@ -837,2 +941,6 @@ | ||
} | ||
getChannelzRef(): SubchannelRef { | ||
return this.channelzRef; | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
1205907
243
24880
48
2
+ Added@grpc/proto-loader@^0.6.4
+ Added@grpc/proto-loader@0.6.13(transitive)
+ Added@protobufjs/aspromise@1.1.2(transitive)
+ Added@protobufjs/base64@1.1.2(transitive)
+ Added@protobufjs/codegen@2.0.4(transitive)
+ Added@protobufjs/eventemitter@1.1.0(transitive)
+ Added@protobufjs/fetch@1.1.0(transitive)
+ Added@protobufjs/float@1.0.2(transitive)
+ Added@protobufjs/inquire@1.1.0(transitive)
+ Added@protobufjs/path@1.1.2(transitive)
+ Added@protobufjs/pool@1.1.0(transitive)
+ Added@protobufjs/utf8@1.1.0(transitive)
+ Added@types/long@4.0.2(transitive)
+ Addedansi-regex@5.0.1(transitive)
+ Addedansi-styles@4.3.0(transitive)
+ Addedcliui@7.0.4(transitive)
+ Addedcolor-convert@2.0.1(transitive)
+ Addedcolor-name@1.1.4(transitive)
+ Addedemoji-regex@8.0.0(transitive)
+ Addedescalade@3.2.0(transitive)
+ Addedget-caller-file@2.0.5(transitive)
+ Addedis-fullwidth-code-point@3.0.0(transitive)
+ Addedlodash.camelcase@4.3.0(transitive)
+ Addedlong@4.0.0(transitive)
+ Addedprotobufjs@6.11.4(transitive)
+ Addedrequire-directory@2.1.1(transitive)
+ Addedstring-width@4.2.3(transitive)
+ Addedstrip-ansi@6.0.1(transitive)
+ Addedwrap-ansi@7.0.0(transitive)
+ Addedy18n@5.0.8(transitive)
+ Addedyargs@16.2.0(transitive)
+ Addedyargs-parser@20.2.9(transitive)