@smithy/node-http-handler
Advanced tools
| const ids = new Uint16Array(1); | ||
| export class ClientHttp2SessionRef { | ||
| id = ids[0]++; | ||
| total = 0; | ||
| max = 0; | ||
| session; | ||
| refs = 0; | ||
| constructor(session) { | ||
| session.unref(); | ||
| this.session = session; | ||
| } | ||
| retain() { | ||
| if (this.session.destroyed) { | ||
| throw new Error("@smithy/node-http-handler - cannot acquire reference to destroyed session."); | ||
| } | ||
| this.refs += 1; | ||
| this.total += 1; | ||
| this.max = Math.max(this.refs, this.max); | ||
| this.session.ref(); | ||
| } | ||
| free() { | ||
| if (this.session.destroyed) { | ||
| return; | ||
| } | ||
| this.refs -= 1; | ||
| if (this.refs === 0) { | ||
| this.session.unref(); | ||
| } | ||
| if (this.refs < 0) { | ||
| throw new Error("@smithy/node-http-handler - ClientHttp2Session refcount at zero, cannot decrement."); | ||
| } | ||
| } | ||
| deref() { | ||
| return this.session; | ||
| } | ||
| destroy() { | ||
| this.refs = 0; | ||
| if (!this.session.destroyed) { | ||
| this.session.destroy(); | ||
| } | ||
| } | ||
| useCount() { | ||
| return this.refs; | ||
| } | ||
| } |
| import type { ClientHttp2Session } from "node:http2"; | ||
| /** | ||
| * Shared access ref counter for ClientHttp2Session, where owners are | ||
| * in-flight requests. | ||
| * | ||
| * @internal | ||
| * @since 4.6.0 | ||
| */ | ||
| export declare class ClientHttp2SessionRef { | ||
| readonly id: number; | ||
| /** | ||
| * Total calls to retain for this session. | ||
| */ | ||
| total: number; | ||
| /** | ||
| * Max ref count observed. | ||
| */ | ||
| max: number; | ||
| private readonly session; | ||
| private refs; | ||
| constructor(session: ClientHttp2Session); | ||
| /** | ||
| * Signal that the session is entering a request span and has an additional owning request. | ||
| * This must be called when beginning a request using the session. | ||
| */ | ||
| retain(): void; | ||
| /** | ||
| * Release reference to session, to be called when it exits request span, indicating one fewer owning request. | ||
| * When reaching zero, the session is unref'd. | ||
| * This must be called when concluding a request using the session. | ||
| */ | ||
| free(): void; | ||
| /** | ||
| * Access the session (don't call ref/unref on it). | ||
| */ | ||
| deref(): ClientHttp2Session; | ||
| destroy(): void; | ||
| /** | ||
| * @returns the current number of active references (in-flight requests). | ||
| */ | ||
| useCount(): number; | ||
| } |
+188
-93
@@ -428,36 +428,97 @@ 'use strict'; | ||
| const ids = new Uint16Array(1); | ||
| class ClientHttp2SessionRef { | ||
| id = ids[0]++; | ||
| total = 0; | ||
| max = 0; | ||
| session; | ||
| refs = 0; | ||
| constructor(session) { | ||
| session.unref(); | ||
| this.session = session; | ||
| } | ||
| retain() { | ||
| if (this.session.destroyed) { | ||
| throw new Error("@smithy/node-http-handler - cannot acquire reference to destroyed session."); | ||
| } | ||
| this.refs += 1; | ||
| this.total += 1; | ||
| this.max = Math.max(this.refs, this.max); | ||
| this.session.ref(); | ||
| } | ||
| free() { | ||
| if (this.session.destroyed) { | ||
| return; | ||
| } | ||
| this.refs -= 1; | ||
| if (this.refs === 0) { | ||
| this.session.unref(); | ||
| } | ||
| if (this.refs < 0) { | ||
| throw new Error("@smithy/node-http-handler - ClientHttp2Session refcount at zero, cannot decrement."); | ||
| } | ||
| } | ||
| deref() { | ||
| return this.session; | ||
| } | ||
| destroy() { | ||
| this.refs = 0; | ||
| if (!this.session.destroyed) { | ||
| this.session.destroy(); | ||
| } | ||
| } | ||
| useCount() { | ||
| return this.refs; | ||
| } | ||
| } | ||
| class NodeHttp2ConnectionPool { | ||
| sessions = []; | ||
| maxConcurrency = 0; | ||
| constructor(sessions) { | ||
| this.sessions = sessions ?? []; | ||
| this.sessions = (sessions ?? []).map((session) => new ClientHttp2SessionRef(session)); | ||
| } | ||
| poll() { | ||
| if (this.sessions.length > 0) { | ||
| return this.sessions.shift(); | ||
| let cleanup = false; | ||
| for (const session of this.sessions) { | ||
| if (session.deref().destroyed) { | ||
| cleanup = true; | ||
| continue; | ||
| } | ||
| if (!this.maxConcurrency || session.useCount() < this.maxConcurrency) { | ||
| return session; | ||
| } | ||
| } | ||
| if (cleanup) { | ||
| for (const session of this.sessions) { | ||
| if (session.deref().destroyed) { | ||
| this.remove(session); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| offerLast(session) { | ||
| this.sessions.push(session); | ||
| offerLast(ref) { | ||
| this.sessions.push(ref); | ||
| } | ||
| contains(session) { | ||
| return this.sessions.includes(session); | ||
| remove(ref) { | ||
| const ix = this.sessions.indexOf(ref); | ||
| if (ix > -1) { | ||
| this.sessions.splice(ix, 1); | ||
| } | ||
| } | ||
| remove(session) { | ||
| this.sessions = this.sessions.filter((s) => s !== session); | ||
| } | ||
| [Symbol.iterator]() { | ||
| return this.sessions[Symbol.iterator](); | ||
| } | ||
| destroy(connection) { | ||
| for (const session of this.sessions) { | ||
| if (session === connection) { | ||
| if (!session.destroyed) { | ||
| session.destroy(); | ||
| } | ||
| } | ||
| } | ||
| setMaxConcurrency(maxConcurrency) { | ||
| this.maxConcurrency = maxConcurrency; | ||
| } | ||
| destroy(ref) { | ||
| this.remove(ref); | ||
| ref.destroy(); | ||
| } | ||
| } | ||
| class NodeHttp2ConnectionManager { | ||
| config; | ||
| connectionPools = new Map(); | ||
| constructor(config) { | ||
@@ -469,14 +530,14 @@ this.config = config; | ||
| } | ||
| config; | ||
| sessionCache = new Map(); | ||
| lease(requestContext, connectionConfiguration) { | ||
| const url = this.getUrlString(requestContext); | ||
| const existingPool = this.sessionCache.get(url); | ||
| if (existingPool) { | ||
| const existingSession = existingPool.poll(); | ||
| if (existingSession && !this.config.disableConcurrency && !connectionConfiguration.isEventStream) { | ||
| return existingSession; | ||
| const pool = this.getPool(url); | ||
| if (!this.config.disableConcurrency && !connectionConfiguration.isEventStream) { | ||
| const available = pool.poll(); | ||
| if (available) { | ||
| available.retain(); | ||
| return available; | ||
| } | ||
| } | ||
| const session = http2.connect(url); | ||
| const ref = new ClientHttp2SessionRef(http2.connect(url)); | ||
| const session = ref.deref(); | ||
| if (this.config.maxConcurrency) { | ||
@@ -492,6 +553,5 @@ session.settings({ maxConcurrentStreams: this.config.maxConcurrency }, (err) => { | ||
| } | ||
| session.unref(); | ||
| const destroySessionCb = () => { | ||
| session.destroy(); | ||
| this.deleteSession(url, session); | ||
| this.removeFromPool(url, ref); | ||
| }; | ||
@@ -501,35 +561,37 @@ session.on("goaway", destroySessionCb); | ||
| session.on("frameError", destroySessionCb); | ||
| session.on("close", () => this.deleteSession(url, session)); | ||
| session.on("close", () => this.removeFromPool(url, ref)); | ||
| if (connectionConfiguration.requestTimeout) { | ||
| session.setTimeout(connectionConfiguration.requestTimeout, destroySessionCb); | ||
| } | ||
| const connectionPool = this.sessionCache.get(url) || new NodeHttp2ConnectionPool(); | ||
| connectionPool.offerLast(session); | ||
| this.sessionCache.set(url, connectionPool); | ||
| return session; | ||
| pool.offerLast(ref); | ||
| ref.retain(); | ||
| return ref; | ||
| } | ||
| deleteSession(authority, session) { | ||
| const existingConnectionPool = this.sessionCache.get(authority); | ||
| if (!existingConnectionPool) { | ||
| return; | ||
| release(_requestContext, ref) { | ||
| ref.free(); | ||
| } | ||
| createIsolatedSession(requestContext, connectionConfiguration) { | ||
| const url = this.getUrlString(requestContext); | ||
| const ref = new ClientHttp2SessionRef(http2.connect(url)); | ||
| const session = ref.deref(); | ||
| session.settings({ maxConcurrentStreams: 1 }); | ||
| const destroySession = () => { | ||
| session.destroy(); | ||
| }; | ||
| session.on("goaway", destroySession); | ||
| session.on("error", destroySession); | ||
| session.on("frameError", destroySession); | ||
| session.on("close", destroySession); | ||
| if (connectionConfiguration.requestTimeout) { | ||
| session.setTimeout(connectionConfiguration.requestTimeout, destroySession); | ||
| } | ||
| if (!existingConnectionPool.contains(session)) { | ||
| return; | ||
| } | ||
| existingConnectionPool.remove(session); | ||
| this.sessionCache.set(authority, existingConnectionPool); | ||
| ref.retain(); | ||
| return ref; | ||
| } | ||
| release(requestContext, session) { | ||
| const cacheKey = this.getUrlString(requestContext); | ||
| this.sessionCache.get(cacheKey)?.offerLast(session); | ||
| } | ||
| destroy() { | ||
| for (const [key, connectionPool] of this.sessionCache) { | ||
| for (const session of connectionPool) { | ||
| if (!session.destroyed) { | ||
| session.destroy(); | ||
| } | ||
| connectionPool.remove(session); | ||
| for (const [url, connectionPool] of this.connectionPools) { | ||
| for (const session of [...connectionPool]) { | ||
| session.destroy(); | ||
| } | ||
| this.sessionCache.delete(key); | ||
| this.connectionPools.delete(url); | ||
| } | ||
@@ -542,2 +604,5 @@ } | ||
| this.config.maxConcurrency = maxConcurrentStreams; | ||
| for (const pool of this.connectionPools.values()) { | ||
| pool.setMaxConcurrency(maxConcurrentStreams); | ||
| } | ||
| } | ||
@@ -547,2 +612,31 @@ setDisableConcurrentStreams(disableConcurrentStreams) { | ||
| } | ||
| debug() { | ||
| const pools = {}; | ||
| for (const [url, pool] of this.connectionPools) { | ||
| const sessions = []; | ||
| for (const ref of pool) { | ||
| sessions.push({ | ||
| id: ref.id, | ||
| active: ref.useCount(), | ||
| maxConcurrent: ref.max, | ||
| totalRequests: ref.total, | ||
| }); | ||
| } | ||
| pools[url] = { sessions }; | ||
| } | ||
| return pools; | ||
| } | ||
| removeFromPool(authority, ref) { | ||
| this.connectionPools.get(authority)?.remove(ref); | ||
| } | ||
| getPool(url) { | ||
| if (!this.connectionPools.has(url)) { | ||
| const pool = new NodeHttp2ConnectionPool(); | ||
| if (this.config.maxConcurrency) { | ||
| pool.setMaxConcurrency(this.config.maxConcurrency); | ||
| } | ||
| this.connectionPools.set(url, pool); | ||
| } | ||
| return this.connectionPools.get(url); | ||
| } | ||
| getUrlString(request) { | ||
@@ -584,8 +678,10 @@ return request.destination.toString(); | ||
| this.config = await this.configProvider; | ||
| this.connectionManager.setDisableConcurrentStreams(this.config.disableConcurrentStreams ?? false); | ||
| if (this.config.maxConcurrentStreams) { | ||
| this.connectionManager.setMaxConcurrentStreams(this.config.maxConcurrentStreams); | ||
| const { disableConcurrentStreams, maxConcurrentStreams } = this.config; | ||
| this.connectionManager.setDisableConcurrentStreams(disableConcurrentStreams ?? false); | ||
| if (maxConcurrentStreams) { | ||
| this.connectionManager.setMaxConcurrentStreams(maxConcurrentStreams); | ||
| } | ||
| } | ||
| const { requestTimeout: configRequestTimeout, disableConcurrentStreams } = this.config; | ||
| const useIsolatedSession = disableConcurrentStreams || isEventStream; | ||
| const effectiveRequestTimeout = requestTimeout ?? configRequestTimeout; | ||
@@ -618,9 +714,13 @@ return new Promise((_resolve, _reject) => { | ||
| const requestContext = { destination: new URL(authority) }; | ||
| const session = this.connectionManager.lease(requestContext, { | ||
| const connectConfig = { | ||
| requestTimeout: this.config?.sessionTimeout, | ||
| isEventStream, | ||
| }); | ||
| }; | ||
| const ref = useIsolatedSession | ||
| ? this.connectionManager.createIsolatedSession(requestContext, connectConfig) | ||
| : this.connectionManager.lease(requestContext, connectConfig); | ||
| const session = ref.deref(); | ||
| const rejectWithDestroy = (err) => { | ||
| if (disableConcurrentStreams) { | ||
| this.destroySession(session); | ||
| if (useIsolatedSession) { | ||
| ref.destroy(); | ||
| } | ||
@@ -638,3 +738,3 @@ fulfilled = true; | ||
| } | ||
| const req = session.request({ | ||
| const clientHttp2Stream = session.request({ | ||
| ...request.headers, | ||
@@ -644,19 +744,5 @@ [http2.constants.HTTP2_HEADER_PATH]: path, | ||
| }); | ||
| session.ref(); | ||
| req.on("response", (headers) => { | ||
| const httpResponse = new protocolHttp.HttpResponse({ | ||
| statusCode: headers[":status"] ?? -1, | ||
| headers: getTransformedHeaders(headers), | ||
| body: req, | ||
| }); | ||
| fulfilled = true; | ||
| resolve({ response: httpResponse }); | ||
| if (disableConcurrentStreams) { | ||
| session.close(); | ||
| this.connectionManager.deleteSession(authority, session); | ||
| } | ||
| }); | ||
| if (effectiveRequestTimeout) { | ||
| req.setTimeout(effectiveRequestTimeout, () => { | ||
| req.close(); | ||
| clientHttp2Stream.setTimeout(effectiveRequestTimeout, () => { | ||
| clientHttp2Stream.close(); | ||
| const timeoutError = new Error(`Stream timed out because of no activity for ${effectiveRequestTimeout} ms`); | ||
@@ -669,3 +755,3 @@ timeoutError.name = "TimeoutError"; | ||
| const onAbort = () => { | ||
| req.close(); | ||
| clientHttp2Stream.close(); | ||
| const abortError = buildAbortError(abortSignal); | ||
@@ -677,3 +763,3 @@ rejectWithDestroy(abortError); | ||
| signal.addEventListener("abort", onAbort, { once: true }); | ||
| req.once("close", () => signal.removeEventListener("abort", onAbort)); | ||
| clientHttp2Stream.once("close", () => signal.removeEventListener("abort", onAbort)); | ||
| } | ||
@@ -684,14 +770,28 @@ else { | ||
| } | ||
| req.on("frameError", (type, code, id) => { | ||
| clientHttp2Stream.on("frameError", (type, code, id) => { | ||
| rejectWithDestroy(new Error(`Frame type id ${type} in stream id ${id} has failed with code ${code}.`)); | ||
| }); | ||
| req.on("error", rejectWithDestroy); | ||
| req.on("aborted", () => { | ||
| rejectWithDestroy(new Error(`HTTP/2 stream is abnormally aborted in mid-communication with result code ${req.rstCode}.`)); | ||
| clientHttp2Stream.on("error", rejectWithDestroy); | ||
| clientHttp2Stream.on("aborted", () => { | ||
| rejectWithDestroy(new Error(`HTTP/2 stream is abnormally aborted in mid-communication with result code ${clientHttp2Stream.rstCode}.`)); | ||
| }); | ||
| req.on("close", () => { | ||
| session.unref(); | ||
| if (disableConcurrentStreams) { | ||
| session.destroy(); | ||
| clientHttp2Stream.on("response", (headers) => { | ||
| const httpResponse = new protocolHttp.HttpResponse({ | ||
| statusCode: headers[":status"] ?? -1, | ||
| headers: getTransformedHeaders(headers), | ||
| body: clientHttp2Stream, | ||
| }); | ||
| fulfilled = true; | ||
| resolve({ response: httpResponse }); | ||
| if (useIsolatedSession) { | ||
| session.close(); | ||
| } | ||
| }); | ||
| clientHttp2Stream.on("close", () => { | ||
| if (useIsolatedSession) { | ||
| ref.destroy(); | ||
| } | ||
| else { | ||
| this.connectionManager.release(requestContext, ref); | ||
| } | ||
| if (!fulfilled) { | ||
@@ -701,3 +801,3 @@ rejectWithDestroy(new Error("Unexpected error: http2 request did not get a response")); | ||
| }); | ||
| writeRequestBodyPromise = writeRequestBody(req, request, effectiveRequestTimeout); | ||
| writeRequestBodyPromise = writeRequestBody(clientHttp2Stream, request, effectiveRequestTimeout); | ||
| }); | ||
@@ -717,7 +817,2 @@ } | ||
| } | ||
| destroySession(session) { | ||
| if (!session.destroyed) { | ||
| session.destroy(); | ||
| } | ||
| } | ||
| } | ||
@@ -724,0 +819,0 @@ |
| import http2 from "node:http2"; | ||
| import { ClientHttp2SessionRef } from "./http2/ClientHttp2SessionRef"; | ||
| import { NodeHttp2ConnectionPool } from "./node-http2-connection-pool"; | ||
| export class NodeHttp2ConnectionManager { | ||
| config; | ||
| connectionPools = new Map(); | ||
| constructor(config) { | ||
@@ -10,14 +13,14 @@ this.config = config; | ||
| } | ||
| config; | ||
| sessionCache = new Map(); | ||
| lease(requestContext, connectionConfiguration) { | ||
| const url = this.getUrlString(requestContext); | ||
| const existingPool = this.sessionCache.get(url); | ||
| if (existingPool) { | ||
| const existingSession = existingPool.poll(); | ||
| if (existingSession && !this.config.disableConcurrency && !connectionConfiguration.isEventStream) { | ||
| return existingSession; | ||
| const pool = this.getPool(url); | ||
| if (!this.config.disableConcurrency && !connectionConfiguration.isEventStream) { | ||
| const available = pool.poll(); | ||
| if (available) { | ||
| available.retain(); | ||
| return available; | ||
| } | ||
| } | ||
| const session = http2.connect(url); | ||
| const ref = new ClientHttp2SessionRef(http2.connect(url)); | ||
| const session = ref.deref(); | ||
| if (this.config.maxConcurrency) { | ||
@@ -33,6 +36,5 @@ session.settings({ maxConcurrentStreams: this.config.maxConcurrency }, (err) => { | ||
| } | ||
| session.unref(); | ||
| const destroySessionCb = () => { | ||
| session.destroy(); | ||
| this.deleteSession(url, session); | ||
| this.removeFromPool(url, ref); | ||
| }; | ||
@@ -42,35 +44,37 @@ session.on("goaway", destroySessionCb); | ||
| session.on("frameError", destroySessionCb); | ||
| session.on("close", () => this.deleteSession(url, session)); | ||
| session.on("close", () => this.removeFromPool(url, ref)); | ||
| if (connectionConfiguration.requestTimeout) { | ||
| session.setTimeout(connectionConfiguration.requestTimeout, destroySessionCb); | ||
| } | ||
| const connectionPool = this.sessionCache.get(url) || new NodeHttp2ConnectionPool(); | ||
| connectionPool.offerLast(session); | ||
| this.sessionCache.set(url, connectionPool); | ||
| return session; | ||
| pool.offerLast(ref); | ||
| ref.retain(); | ||
| return ref; | ||
| } | ||
| deleteSession(authority, session) { | ||
| const existingConnectionPool = this.sessionCache.get(authority); | ||
| if (!existingConnectionPool) { | ||
| return; | ||
| release(_requestContext, ref) { | ||
| ref.free(); | ||
| } | ||
| createIsolatedSession(requestContext, connectionConfiguration) { | ||
| const url = this.getUrlString(requestContext); | ||
| const ref = new ClientHttp2SessionRef(http2.connect(url)); | ||
| const session = ref.deref(); | ||
| session.settings({ maxConcurrentStreams: 1 }); | ||
| const destroySession = () => { | ||
| session.destroy(); | ||
| }; | ||
| session.on("goaway", destroySession); | ||
| session.on("error", destroySession); | ||
| session.on("frameError", destroySession); | ||
| session.on("close", destroySession); | ||
| if (connectionConfiguration.requestTimeout) { | ||
| session.setTimeout(connectionConfiguration.requestTimeout, destroySession); | ||
| } | ||
| if (!existingConnectionPool.contains(session)) { | ||
| return; | ||
| } | ||
| existingConnectionPool.remove(session); | ||
| this.sessionCache.set(authority, existingConnectionPool); | ||
| ref.retain(); | ||
| return ref; | ||
| } | ||
| release(requestContext, session) { | ||
| const cacheKey = this.getUrlString(requestContext); | ||
| this.sessionCache.get(cacheKey)?.offerLast(session); | ||
| } | ||
| destroy() { | ||
| for (const [key, connectionPool] of this.sessionCache) { | ||
| for (const session of connectionPool) { | ||
| if (!session.destroyed) { | ||
| session.destroy(); | ||
| } | ||
| connectionPool.remove(session); | ||
| for (const [url, connectionPool] of this.connectionPools) { | ||
| for (const session of [...connectionPool]) { | ||
| session.destroy(); | ||
| } | ||
| this.sessionCache.delete(key); | ||
| this.connectionPools.delete(url); | ||
| } | ||
@@ -83,2 +87,5 @@ } | ||
| this.config.maxConcurrency = maxConcurrentStreams; | ||
| for (const pool of this.connectionPools.values()) { | ||
| pool.setMaxConcurrency(maxConcurrentStreams); | ||
| } | ||
| } | ||
@@ -88,2 +95,31 @@ setDisableConcurrentStreams(disableConcurrentStreams) { | ||
| } | ||
| debug() { | ||
| const pools = {}; | ||
| for (const [url, pool] of this.connectionPools) { | ||
| const sessions = []; | ||
| for (const ref of pool) { | ||
| sessions.push({ | ||
| id: ref.id, | ||
| active: ref.useCount(), | ||
| maxConcurrent: ref.max, | ||
| totalRequests: ref.total, | ||
| }); | ||
| } | ||
| pools[url] = { sessions }; | ||
| } | ||
| return pools; | ||
| } | ||
| removeFromPool(authority, ref) { | ||
| this.connectionPools.get(authority)?.remove(ref); | ||
| } | ||
| getPool(url) { | ||
| if (!this.connectionPools.has(url)) { | ||
| const pool = new NodeHttp2ConnectionPool(); | ||
| if (this.config.maxConcurrency) { | ||
| pool.setMaxConcurrency(this.config.maxConcurrency); | ||
| } | ||
| this.connectionPools.set(url, pool); | ||
| } | ||
| return this.connectionPools.get(url); | ||
| } | ||
| getUrlString(request) { | ||
@@ -90,0 +126,0 @@ return request.destination.toString(); |
@@ -0,32 +1,46 @@ | ||
| import { ClientHttp2SessionRef } from "./http2/ClientHttp2SessionRef"; | ||
| export class NodeHttp2ConnectionPool { | ||
| sessions = []; | ||
| maxConcurrency = 0; | ||
| constructor(sessions) { | ||
| this.sessions = sessions ?? []; | ||
| this.sessions = (sessions ?? []).map((session) => new ClientHttp2SessionRef(session)); | ||
| } | ||
| poll() { | ||
| if (this.sessions.length > 0) { | ||
| return this.sessions.shift(); | ||
| let cleanup = false; | ||
| for (const session of this.sessions) { | ||
| if (session.deref().destroyed) { | ||
| cleanup = true; | ||
| continue; | ||
| } | ||
| if (!this.maxConcurrency || session.useCount() < this.maxConcurrency) { | ||
| return session; | ||
| } | ||
| } | ||
| if (cleanup) { | ||
| for (const session of this.sessions) { | ||
| if (session.deref().destroyed) { | ||
| this.remove(session); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| offerLast(session) { | ||
| this.sessions.push(session); | ||
| offerLast(ref) { | ||
| this.sessions.push(ref); | ||
| } | ||
| contains(session) { | ||
| return this.sessions.includes(session); | ||
| remove(ref) { | ||
| const ix = this.sessions.indexOf(ref); | ||
| if (ix > -1) { | ||
| this.sessions.splice(ix, 1); | ||
| } | ||
| } | ||
| remove(session) { | ||
| this.sessions = this.sessions.filter((s) => s !== session); | ||
| } | ||
| [Symbol.iterator]() { | ||
| return this.sessions[Symbol.iterator](); | ||
| } | ||
| destroy(connection) { | ||
| for (const session of this.sessions) { | ||
| if (session === connection) { | ||
| if (!session.destroyed) { | ||
| session.destroy(); | ||
| } | ||
| } | ||
| } | ||
| setMaxConcurrency(maxConcurrency) { | ||
| this.maxConcurrency = maxConcurrency; | ||
| } | ||
| destroy(ref) { | ||
| this.remove(ref); | ||
| ref.destroy(); | ||
| } | ||
| } |
@@ -39,8 +39,10 @@ import { HttpResponse } from "@smithy/protocol-http"; | ||
| this.config = await this.configProvider; | ||
| this.connectionManager.setDisableConcurrentStreams(this.config.disableConcurrentStreams ?? false); | ||
| if (this.config.maxConcurrentStreams) { | ||
| this.connectionManager.setMaxConcurrentStreams(this.config.maxConcurrentStreams); | ||
| const { disableConcurrentStreams, maxConcurrentStreams } = this.config; | ||
| this.connectionManager.setDisableConcurrentStreams(disableConcurrentStreams ?? false); | ||
| if (maxConcurrentStreams) { | ||
| this.connectionManager.setMaxConcurrentStreams(maxConcurrentStreams); | ||
| } | ||
| } | ||
| const { requestTimeout: configRequestTimeout, disableConcurrentStreams } = this.config; | ||
| const useIsolatedSession = disableConcurrentStreams || isEventStream; | ||
| const effectiveRequestTimeout = requestTimeout ?? configRequestTimeout; | ||
@@ -73,9 +75,13 @@ return new Promise((_resolve, _reject) => { | ||
| const requestContext = { destination: new URL(authority) }; | ||
| const session = this.connectionManager.lease(requestContext, { | ||
| const connectConfig = { | ||
| requestTimeout: this.config?.sessionTimeout, | ||
| isEventStream, | ||
| }); | ||
| }; | ||
| const ref = useIsolatedSession | ||
| ? this.connectionManager.createIsolatedSession(requestContext, connectConfig) | ||
| : this.connectionManager.lease(requestContext, connectConfig); | ||
| const session = ref.deref(); | ||
| const rejectWithDestroy = (err) => { | ||
| if (disableConcurrentStreams) { | ||
| this.destroySession(session); | ||
| if (useIsolatedSession) { | ||
| ref.destroy(); | ||
| } | ||
@@ -93,3 +99,3 @@ fulfilled = true; | ||
| } | ||
| const req = session.request({ | ||
| const clientHttp2Stream = session.request({ | ||
| ...request.headers, | ||
@@ -99,19 +105,5 @@ [constants.HTTP2_HEADER_PATH]: path, | ||
| }); | ||
| session.ref(); | ||
| req.on("response", (headers) => { | ||
| const httpResponse = new HttpResponse({ | ||
| statusCode: headers[":status"] ?? -1, | ||
| headers: getTransformedHeaders(headers), | ||
| body: req, | ||
| }); | ||
| fulfilled = true; | ||
| resolve({ response: httpResponse }); | ||
| if (disableConcurrentStreams) { | ||
| session.close(); | ||
| this.connectionManager.deleteSession(authority, session); | ||
| } | ||
| }); | ||
| if (effectiveRequestTimeout) { | ||
| req.setTimeout(effectiveRequestTimeout, () => { | ||
| req.close(); | ||
| clientHttp2Stream.setTimeout(effectiveRequestTimeout, () => { | ||
| clientHttp2Stream.close(); | ||
| const timeoutError = new Error(`Stream timed out because of no activity for ${effectiveRequestTimeout} ms`); | ||
@@ -124,3 +116,3 @@ timeoutError.name = "TimeoutError"; | ||
| const onAbort = () => { | ||
| req.close(); | ||
| clientHttp2Stream.close(); | ||
| const abortError = buildAbortError(abortSignal); | ||
@@ -132,3 +124,3 @@ rejectWithDestroy(abortError); | ||
| signal.addEventListener("abort", onAbort, { once: true }); | ||
| req.once("close", () => signal.removeEventListener("abort", onAbort)); | ||
| clientHttp2Stream.once("close", () => signal.removeEventListener("abort", onAbort)); | ||
| } | ||
@@ -139,14 +131,28 @@ else { | ||
| } | ||
| req.on("frameError", (type, code, id) => { | ||
| clientHttp2Stream.on("frameError", (type, code, id) => { | ||
| rejectWithDestroy(new Error(`Frame type id ${type} in stream id ${id} has failed with code ${code}.`)); | ||
| }); | ||
| req.on("error", rejectWithDestroy); | ||
| req.on("aborted", () => { | ||
| rejectWithDestroy(new Error(`HTTP/2 stream is abnormally aborted in mid-communication with result code ${req.rstCode}.`)); | ||
| clientHttp2Stream.on("error", rejectWithDestroy); | ||
| clientHttp2Stream.on("aborted", () => { | ||
| rejectWithDestroy(new Error(`HTTP/2 stream is abnormally aborted in mid-communication with result code ${clientHttp2Stream.rstCode}.`)); | ||
| }); | ||
| req.on("close", () => { | ||
| session.unref(); | ||
| if (disableConcurrentStreams) { | ||
| session.destroy(); | ||
| clientHttp2Stream.on("response", (headers) => { | ||
| const httpResponse = new HttpResponse({ | ||
| statusCode: headers[":status"] ?? -1, | ||
| headers: getTransformedHeaders(headers), | ||
| body: clientHttp2Stream, | ||
| }); | ||
| fulfilled = true; | ||
| resolve({ response: httpResponse }); | ||
| if (useIsolatedSession) { | ||
| session.close(); | ||
| } | ||
| }); | ||
| clientHttp2Stream.on("close", () => { | ||
| if (useIsolatedSession) { | ||
| ref.destroy(); | ||
| } | ||
| else { | ||
| this.connectionManager.release(requestContext, ref); | ||
| } | ||
| if (!fulfilled) { | ||
@@ -156,3 +162,3 @@ rejectWithDestroy(new Error("Unexpected error: http2 request did not get a response")); | ||
| }); | ||
| writeRequestBodyPromise = writeRequestBody(req, request, effectiveRequestTimeout); | ||
| writeRequestBodyPromise = writeRequestBody(clientHttp2Stream, request, effectiveRequestTimeout); | ||
| }); | ||
@@ -172,7 +178,2 @@ } | ||
| } | ||
| destroySession(session) { | ||
| if (!session.destroyed) { | ||
| session.destroy(); | ||
| } | ||
| } | ||
| } |
@@ -1,24 +0,44 @@ | ||
| import type { RequestContext } from "@smithy/types"; | ||
| import type { ConnectConfiguration } from "@smithy/types"; | ||
| import type { ConnectionManager, ConnectionManagerConfiguration } from "@smithy/types"; | ||
| import type { ClientHttp2Session } from "node:http2"; | ||
| import type { ConnectConfiguration, ConnectionManager, ConnectionManagerConfiguration, RequestContext } from "@smithy/types"; | ||
| import { ClientHttp2SessionRef } from "./http2/ClientHttp2SessionRef"; | ||
| /** | ||
| * @public | ||
| * This class previously implemented the ConnectionManager<ClientHttp2Session> interface, | ||
| * but this class isn't exported from this package, except as a private property of NodeHttp2Handler. | ||
| * | ||
| * @since 4.6.0 | ||
| * @internal | ||
| */ | ||
| export declare class NodeHttp2ConnectionManager implements ConnectionManager<ClientHttp2Session> { | ||
| export declare class NodeHttp2ConnectionManager implements ConnectionManager<ClientHttp2SessionRef> { | ||
| private config; | ||
| private readonly connectionPools; | ||
| constructor(config: ConnectionManagerConfiguration); | ||
| private config; | ||
| private readonly sessionCache; | ||
| lease(requestContext: RequestContext, connectionConfiguration: ConnectConfiguration): ClientHttp2Session; | ||
| /** | ||
| * Delete a session from the connection pool. | ||
| * @param authority The authority of the session to delete. | ||
| * @param session The session to delete. | ||
| * Acquire a session for making a request. | ||
| */ | ||
| deleteSession(authority: string, session: ClientHttp2Session): void; | ||
| release(requestContext: RequestContext, session: ClientHttp2Session): void; | ||
| lease(requestContext: RequestContext, connectionConfiguration: ConnectConfiguration): ClientHttp2SessionRef; | ||
| /** | ||
| * Signal that a request using this session has completed. | ||
| * | ||
| * The session remains in its pool for reuse. | ||
| * This method is not called for isolated sessions. | ||
| */ | ||
| release(_requestContext: RequestContext, ref: ClientHttp2SessionRef): void; | ||
| /** | ||
| * Create an isolated session that isn't part of the connection pools. | ||
| * For use in event-streams or when concurrency is turned off. | ||
| */ | ||
| createIsolatedSession(requestContext: RequestContext, connectionConfiguration: ConnectConfiguration): ClientHttp2SessionRef; | ||
| destroy(): void; | ||
| setMaxConcurrentStreams(maxConcurrentStreams: number): void; | ||
| setDisableConcurrentStreams(disableConcurrentStreams: boolean): void; | ||
| /** | ||
| * @internal | ||
| * @returns a snapshot of the state of all connection pools and their sessions. | ||
| */ | ||
| debug(): Record<string, any>; | ||
| /** | ||
| * Remove a session from the pools. Does not destroy it. | ||
| */ | ||
| private removeFromPool; | ||
| private getPool; | ||
| private getUrlString; | ||
| } |
| import type { ConnectionPool } from "@smithy/types"; | ||
| import type { ClientHttp2Session } from "node:http2"; | ||
| export declare class NodeHttp2ConnectionPool implements ConnectionPool<ClientHttp2Session> { | ||
| private sessions; | ||
| import { ClientHttp2SessionRef } from "./http2/ClientHttp2SessionRef"; | ||
| /** | ||
| * These are keyed by URL, therefore all sessions within this class' state | ||
| * are for the same URL. | ||
| * | ||
| * Sessions remain in the pool for their entire lifetime (until destroyed or | ||
| * removed). The pool tracks capacity via each session's ref count. | ||
| * | ||
| * Interface implementation changed from ConnectionPool<ClientHttp2Session>. | ||
| * @since 4.6.0 | ||
| * @internal | ||
| */ | ||
| export declare class NodeHttp2ConnectionPool implements ConnectionPool<ClientHttp2SessionRef> { | ||
| private readonly sessions; | ||
| private maxConcurrency; | ||
| constructor(sessions?: ClientHttp2Session[]); | ||
| poll(): ClientHttp2Session | void; | ||
| offerLast(session: ClientHttp2Session): void; | ||
| contains(session: ClientHttp2Session): boolean; | ||
| remove(session: ClientHttp2Session): void; | ||
| [Symbol.iterator](): ArrayIterator<ClientHttp2Session>; | ||
| destroy(connection: ClientHttp2Session): void; | ||
| /** | ||
| * Find a session with available capacity (refs < maxConcurrency). | ||
| * Returns undefined if all sessions are at capacity or the pool is empty. | ||
| */ | ||
| poll(): ClientHttp2SessionRef | undefined; | ||
| /** | ||
| * Add a session to the pool. | ||
| */ | ||
| offerLast(ref: ClientHttp2SessionRef): void; | ||
| remove(ref: ClientHttp2SessionRef): void; | ||
| [Symbol.iterator](): ArrayIterator<ClientHttp2SessionRef>; | ||
| setMaxConcurrency(maxConcurrency: number): void; | ||
| /** | ||
| * This is unused, but part of the interface. | ||
| * @deprecated | ||
| */ | ||
| destroy(ref: ClientHttp2SessionRef): void; | ||
| } |
@@ -70,8 +70,3 @@ import type { HttpHandler, HttpRequest } from "@smithy/protocol-http"; | ||
| httpHandlerConfigs(): NodeHttp2HandlerOptions; | ||
| /** | ||
| * Destroys a session. | ||
| * @param session - the session to destroy. | ||
| */ | ||
| private destroySession; | ||
| } | ||
| export {}; |
+1
-1
| { | ||
| "name": "@smithy/node-http-handler", | ||
| "version": "4.5.3", | ||
| "version": "4.6.0", | ||
| "description": "Provides a way to make requests", | ||
@@ -5,0 +5,0 @@ "scripts": { |
98197
9.7%44
4.76%2179
14.2%