@whatwg-node/server
Advanced tools
Comparing version 0.10.0-alpha-20240726141316-c6ce93b3598457ebe73b3b725986723af8f5e609 to 0.10.0-alpha-20241123133536-975c9068dde45574fcfa26567e4bab96f45d1f85
@@ -6,8 +6,6 @@ "use strict"; | ||
/* eslint-disable @typescript-eslint/ban-types */ | ||
const disposablestack_1 = require("@whatwg-node/disposablestack"); | ||
const DefaultFetchAPI = tslib_1.__importStar(require("@whatwg-node/fetch")); | ||
const utils_js_1 = require("./utils.js"); | ||
const uwebsockets_js_1 = require("./uwebsockets.js"); | ||
async function handleWaitUntils(waitUntilPromises) { | ||
await Promise.allSettled(waitUntilPromises); | ||
} | ||
// Required for envs like nextjs edge runtime | ||
@@ -33,2 +31,32 @@ function isRequestAccessible(serverContext) { | ||
const onResponseHooks = []; | ||
const waitUntilPromises = new Set(); | ||
const disposableStack = new disposablestack_1.AsyncDisposableStack(); | ||
const signals = new Set(); | ||
function registerSignal(signal) { | ||
signals.add(signal); | ||
signal.addEventListener('abort', () => { | ||
signals.delete(signal); | ||
}); | ||
} | ||
disposableStack.defer(() => { | ||
for (const signal of signals) { | ||
signal.sendAbort(); | ||
} | ||
}); | ||
function handleWaitUntils() { | ||
return Promise.allSettled(waitUntilPromises).then(() => { }, () => { }); | ||
} | ||
disposableStack.defer(handleWaitUntils); | ||
function waitUntil(promiseLike) { | ||
// If it is a Node.js environment, we should register the disposable stack to handle process termination events | ||
if (globalThis.process) { | ||
(0, utils_js_1.ensureDisposableStackRegisteredForTerminateEvents)(disposableStack); | ||
} | ||
waitUntilPromises.add(promiseLike.then(() => { | ||
waitUntilPromises.delete(promiseLike); | ||
}, err => { | ||
console.error(`Unexpected error while waiting: ${err.message || err}`); | ||
waitUntilPromises.delete(promiseLike); | ||
})); | ||
} | ||
if (options?.plugins != null) { | ||
@@ -112,16 +140,17 @@ for (const plugin of options.plugins) { | ||
// TODO: Remove this on the next major version | ||
function handleNodeRequestAndResponse(nodeRequest, nodeResponseOrContainer, ...ctx) { | ||
const nodeResponse = nodeResponseOrContainer.raw || nodeResponseOrContainer; | ||
function handleNodeRequest(nodeRequest, ...ctx) { | ||
const serverContext = ctx.length > 1 ? (0, utils_js_1.completeAssign)(...ctx) : ctx[0] || {}; | ||
const request = (0, utils_js_1.normalizeNodeRequest)(nodeRequest, nodeResponse, fetchAPI.Request); | ||
const request = (0, utils_js_1.normalizeNodeRequest)(nodeRequest, fetchAPI, registerSignal); | ||
return handleRequest(request, serverContext); | ||
} | ||
function handleNodeRequestAndResponse(nodeRequest, nodeResponseOrContainer, ...ctx) { | ||
const nodeResponse = nodeResponseOrContainer.raw || nodeResponseOrContainer; | ||
utils_js_1.nodeRequestResponseMap.set(nodeRequest, nodeResponse); | ||
return handleNodeRequest(nodeRequest, ...ctx); | ||
} | ||
function requestListener(nodeRequest, nodeResponse, ...ctx) { | ||
const waitUntilPromises = []; | ||
const defaultServerContext = { | ||
req: nodeRequest, | ||
res: nodeResponse, | ||
waitUntil(cb) { | ||
waitUntilPromises.push(cb.catch(err => console.error(err))); | ||
}, | ||
waitUntil, | ||
}; | ||
@@ -151,9 +180,6 @@ let response$; | ||
function handleUWS(res, req, ...ctx) { | ||
const waitUntilPromises = []; | ||
const defaultServerContext = { | ||
res, | ||
req, | ||
waitUntil(cb) { | ||
waitUntilPromises.push(cb.catch(err => console.error(err))); | ||
}, | ||
waitUntil, | ||
}; | ||
@@ -165,2 +191,3 @@ const filteredCtxParts = ctx.filter(partCtx => partCtx != null); | ||
const signal = new utils_js_1.ServerAdapterRequestAbortSignal(); | ||
registerSignal(signal); | ||
const originalResEnd = res.end.bind(res); | ||
@@ -197,3 +224,3 @@ let resEnded = false; | ||
if (!signal.aborted && !resEnded) { | ||
return (0, uwebsockets_js_1.sendResponseToUwsOpts)(res, response, signal); | ||
return (0, uwebsockets_js_1.sendResponseToUwsOpts)(res, response, signal, fetchAPI); | ||
} | ||
@@ -207,3 +234,3 @@ }) | ||
if (!signal.aborted && !resEnded) { | ||
return (0, uwebsockets_js_1.sendResponseToUwsOpts)(res, response$, signal); | ||
return (0, uwebsockets_js_1.sendResponseToUwsOpts)(res, response$, signal, fetchAPI); | ||
} | ||
@@ -228,13 +255,8 @@ } | ||
const filteredCtxParts = ctx.filter(partCtx => partCtx != null); | ||
let waitUntilPromises; | ||
const serverContext = filteredCtxParts.length > 1 | ||
? (0, utils_js_1.completeAssign)({}, ...filteredCtxParts) | ||
: (0, utils_js_1.isolateObject)(filteredCtxParts[0], filteredCtxParts[0] == null || filteredCtxParts[0].waitUntil == null | ||
? (waitUntilPromises = []) | ||
? waitUntil | ||
: undefined); | ||
const response$ = handleRequest(request, serverContext); | ||
if (waitUntilPromises?.length) { | ||
return handleWaitUntils(waitUntilPromises).then(() => response$); | ||
} | ||
return response$; | ||
return handleRequest(request, serverContext); | ||
} | ||
@@ -286,2 +308,3 @@ const fetchFn = (input, ...maybeCtx) => { | ||
fetch: fetchFn, | ||
handleNodeRequest, | ||
handleNodeRequestAndResponse, | ||
@@ -292,2 +315,9 @@ requestListener, | ||
handle: genericRequestHandler, | ||
disposableStack, | ||
[disposablestack_1.DisposableSymbols.asyncDispose]() { | ||
return disposableStack.disposeAsync(); | ||
}, | ||
dispose() { | ||
return disposableStack.disposeAsync(); | ||
}, | ||
}; | ||
@@ -294,0 +324,0 @@ const serverAdapter = new Proxy(genericRequestHandler, { |
@@ -24,3 +24,3 @@ "use strict"; | ||
} | ||
const newRequest = new fetchAPI.Request(request.url, { | ||
request = new fetchAPI.Request(request.url, { | ||
body: newBody, | ||
@@ -42,3 +42,3 @@ cache: request.cache, | ||
}); | ||
setRequest(newRequest); | ||
setRequest(request); | ||
} | ||
@@ -51,3 +51,4 @@ } | ||
}, | ||
onResponse({ request, response, setResponse, fetchAPI }) { | ||
onResponse({ request, response, setResponse, fetchAPI, serverContext }) { | ||
const waitUntil = serverContext.waitUntil?.bind(serverContext) || (() => { }); | ||
// Hack for avoiding to create whatwg-node to create a readable stream until it's needed | ||
@@ -66,17 +67,11 @@ if (response['bodyInit'] || response.body) { | ||
const writer = compressionStream.writable.getWriter(); | ||
writer.write(bufOfRes); | ||
writer.close(); | ||
const reader = compressionStream.readable.getReader(); | ||
return Promise.resolve().then(async () => { | ||
const chunks = []; | ||
while (true) { | ||
const { done, value } = await reader.read(); | ||
if (done) { | ||
reader.releaseLock(); | ||
break; | ||
} | ||
else if (value) { | ||
chunks.push(...value); | ||
} | ||
} | ||
waitUntil(writer.write(bufOfRes)); | ||
waitUntil(writer.close()); | ||
const uint8Arrays$ = (0, utils_js_1.isReadable)(compressionStream.readable['readable']) | ||
? collectReadableValues(compressionStream.readable['readable']) | ||
: (0, utils_js_1.isAsyncIterable)(compressionStream.readable) | ||
? collectAsyncIterableValues(compressionStream.readable) | ||
: collectReadableStreamValues(compressionStream.readable); | ||
return uint8Arrays$.then(uint8Arrays => { | ||
const chunks = uint8Arrays.flatMap(uint8Array => [...uint8Array]); | ||
const uint8Array = new Uint8Array(chunks); | ||
@@ -92,2 +87,3 @@ const newHeaders = new fetchAPI.Headers(response.headers); | ||
setResponse(compressedResponse); | ||
waitUntil(compressionStream.writable.close()); | ||
}); | ||
@@ -113,1 +109,31 @@ } | ||
} | ||
function collectReadableValues(readable) { | ||
const values = []; | ||
readable.on('data', value => values.push(value)); | ||
return new Promise((resolve, reject) => { | ||
readable.once('end', () => resolve(values)); | ||
readable.once('error', reject); | ||
}); | ||
} | ||
async function collectAsyncIterableValues(asyncIterable) { | ||
const values = []; | ||
for await (const value of asyncIterable) { | ||
values.push(value); | ||
} | ||
return values; | ||
} | ||
async function collectReadableStreamValues(readableStream) { | ||
const reader = readableStream.getReader(); | ||
const values = []; | ||
while (true) { | ||
const { done, value } = await reader.read(); | ||
if (done) { | ||
reader.releaseLock(); | ||
break; | ||
} | ||
else if (value) { | ||
values.push(value); | ||
} | ||
} | ||
return values; | ||
} |
@@ -27,2 +27,7 @@ "use strict"; | ||
class HTTPError extends Error { | ||
status; | ||
message; | ||
headers; | ||
details; | ||
name = 'HTTPError'; | ||
constructor(status = 500, message, headers = {}, details) { | ||
@@ -34,3 +39,2 @@ super(message); | ||
this.details = details; | ||
this.name = 'HTTPError'; | ||
Error.captureStackTrace(this, HTTPError); | ||
@@ -37,0 +41,0 @@ } |
218
cjs/utils.js
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.decompressedResponseMap = exports.ServerAdapterRequestAbortSignal = void 0; | ||
exports.decompressedResponseMap = exports.nodeRequestResponseMap = exports.ServerAdapterRequestAbortSignal = void 0; | ||
exports.isAsyncIterable = isAsyncIterable; | ||
@@ -22,3 +22,3 @@ exports.normalizeNodeRequest = normalizeNodeRequest; | ||
exports.handleResponseDecompression = handleResponseDecompression; | ||
const fetch_1 = require("@whatwg-node/fetch"); | ||
exports.ensureDisposableStackRegisteredForTerminateEvents = ensureDisposableStackRegisteredForTerminateEvents; | ||
function isAsyncIterable(body) { | ||
@@ -74,7 +74,5 @@ return (body != null && typeof body === 'object' && typeof body[Symbol.asyncIterator] === 'function'); | ||
class ServerAdapterRequestAbortSignal extends EventTarget { | ||
constructor() { | ||
super(...arguments); | ||
this.aborted = false; | ||
this._onabort = null; | ||
} | ||
aborted = false; | ||
_onabort = null; | ||
reason; | ||
throwIfAborted() { | ||
@@ -108,7 +106,8 @@ if (this.aborted) { | ||
let bunNodeCompatModeWarned = false; | ||
function normalizeNodeRequest(nodeRequest, nodeResponse, RequestCtor) { | ||
exports.nodeRequestResponseMap = new WeakMap(); | ||
function normalizeNodeRequest(nodeRequest, fetchAPI, registerSignal) { | ||
const rawRequest = nodeRequest.raw || nodeRequest.req || nodeRequest; | ||
let fullUrl = buildFullUrl(rawRequest); | ||
if (nodeRequest.query) { | ||
const url = new fetch_1.URL(fullUrl); | ||
const url = new fetchAPI.URL(fullUrl); | ||
for (const key in nodeRequest.query) { | ||
@@ -120,2 +119,4 @@ url.searchParams.set(key, nodeRequest.query[key]); | ||
let signal; | ||
const nodeResponse = exports.nodeRequestResponseMap.get(nodeRequest); | ||
exports.nodeRequestResponseMap.delete(nodeRequest); | ||
let normalizedHeaders = nodeRequest.headers; | ||
@@ -133,4 +134,6 @@ if (nodeRequest.headers?.[':method']) { | ||
// If ponyfilled | ||
if (RequestCtor !== globalThis.Request) { | ||
signal = new ServerAdapterRequestAbortSignal(); | ||
if (fetchAPI.Request !== globalThis.Request) { | ||
const newSignal = new ServerAdapterRequestAbortSignal(); | ||
registerSignal?.(newSignal); | ||
signal = newSignal; | ||
sendAbortSignal = () => signal.sendAbort(); | ||
@@ -156,3 +159,3 @@ } | ||
if (nodeRequest.method === 'GET' || nodeRequest.method === 'HEAD') { | ||
return new RequestCtor(fullUrl, { | ||
return new fetchAPI.Request(fullUrl, { | ||
method: nodeRequest.method, | ||
@@ -172,3 +175,3 @@ headers: normalizedHeaders, | ||
if (isRequestBody(maybeParsedBody)) { | ||
return new RequestCtor(fullUrl, { | ||
return new fetchAPI.Request(fullUrl, { | ||
method: nodeRequest.method, | ||
@@ -180,3 +183,3 @@ headers: normalizedHeaders, | ||
} | ||
const request = new RequestCtor(fullUrl, { | ||
const request = new fetchAPI.Request(fullUrl, { | ||
method: nodeRequest.method, | ||
@@ -209,3 +212,3 @@ headers: normalizedHeaders, | ||
} | ||
return new RequestCtor(fullUrl, { | ||
return new fetchAPI.Request(fullUrl, { | ||
method: nodeRequest.method, | ||
@@ -234,3 +237,3 @@ headers: normalizedHeaders, | ||
// perf: instead of spreading the object, we can just pass it as is and it performs better | ||
return new RequestCtor(fullUrl, { | ||
return new fetchAPI.Request(fullUrl, { | ||
method: nodeRequest.method, | ||
@@ -273,7 +276,22 @@ headers: normalizedHeaders, | ||
async function sendAsyncIterable(serverResponse, asyncIterable) { | ||
let closed = false; | ||
const closeEventListener = () => { | ||
closed = true; | ||
}; | ||
serverResponse.once('error', closeEventListener); | ||
serverResponse.once('close', closeEventListener); | ||
serverResponse.once('finish', () => { | ||
serverResponse.removeListener('close', closeEventListener); | ||
}); | ||
for await (const chunk of asyncIterable) { | ||
if (closed) { | ||
break; | ||
} | ||
if (!serverResponse | ||
// @ts-expect-error http and http2 writes are actually compatible | ||
.write(chunk)) { | ||
break; | ||
if (closed) { | ||
break; | ||
} | ||
await new Promise(resolve => serverResponse.once('drain', resolve)); | ||
} | ||
@@ -289,3 +307,3 @@ } | ||
serverResponse.statusCode = 404; | ||
serverResponse.end(); | ||
endResponse(serverResponse); | ||
return; | ||
@@ -339,2 +357,5 @@ } | ||
} | ||
if (isReadableStream(fetchBody)) { | ||
return sendReadableStream(serverResponse, fetchBody); | ||
} | ||
if (isAsyncIterable(fetchBody)) { | ||
@@ -344,2 +365,20 @@ return sendAsyncIterable(serverResponse, fetchBody); | ||
} | ||
async function sendReadableStream(serverResponse, readableStream) { | ||
const reader = readableStream.getReader(); | ||
serverResponse.req.once('error', err => { | ||
reader.cancel(err); | ||
}); | ||
while (true) { | ||
const { done, value } = await reader.read(); | ||
if (done) { | ||
break; | ||
} | ||
if (!serverResponse | ||
// @ts-expect-error http and http2 writes are actually compatible | ||
.write(value)) { | ||
await new Promise(resolve => serverResponse.once('drain', resolve)); | ||
} | ||
} | ||
endResponse(serverResponse); | ||
} | ||
function isRequestInit(val) { | ||
@@ -421,73 +460,14 @@ return (val != null && | ||
} | ||
function isolateObject(originalCtx, waitUntilPromises) { | ||
function isolateObject(originalCtx, waitUntilFn) { | ||
if (originalCtx == null) { | ||
return {}; | ||
if (waitUntilFn == null) { | ||
return {}; | ||
} | ||
return { | ||
waitUntil: waitUntilFn, | ||
}; | ||
} | ||
const extraProps = {}; | ||
const deletedProps = new Set(); | ||
return new Proxy(originalCtx, { | ||
get(originalCtx, prop) { | ||
if (waitUntilPromises != null && prop === 'waitUntil') { | ||
return function waitUntil(promise) { | ||
waitUntilPromises.push(promise.catch(err => console.error(err))); | ||
}; | ||
} | ||
const extraPropVal = extraProps[prop]; | ||
if (extraPropVal != null) { | ||
if (typeof extraPropVal === 'function') { | ||
return extraPropVal.bind(extraProps); | ||
} | ||
return extraPropVal; | ||
} | ||
if (deletedProps.has(prop)) { | ||
return undefined; | ||
} | ||
return originalCtx[prop]; | ||
}, | ||
set(_originalCtx, prop, value) { | ||
extraProps[prop] = value; | ||
return true; | ||
}, | ||
has(originalCtx, prop) { | ||
if (waitUntilPromises != null && prop === 'waitUntil') { | ||
return true; | ||
} | ||
if (deletedProps.has(prop)) { | ||
return false; | ||
} | ||
if (prop in extraProps) { | ||
return true; | ||
} | ||
return prop in originalCtx; | ||
}, | ||
defineProperty(_originalCtx, prop, descriptor) { | ||
return Reflect.defineProperty(extraProps, prop, descriptor); | ||
}, | ||
deleteProperty(_originalCtx, prop) { | ||
if (prop in extraProps) { | ||
return Reflect.deleteProperty(extraProps, prop); | ||
} | ||
deletedProps.add(prop); | ||
return true; | ||
}, | ||
ownKeys(originalCtx) { | ||
const extraKeys = Reflect.ownKeys(extraProps); | ||
const originalKeys = Reflect.ownKeys(originalCtx); | ||
const deletedKeys = Array.from(deletedProps); | ||
const allKeys = new Set(extraKeys.concat(originalKeys.filter(keys => !deletedKeys.includes(keys)))); | ||
if (waitUntilPromises != null) { | ||
allKeys.add('waitUntil'); | ||
} | ||
return Array.from(allKeys); | ||
}, | ||
getOwnPropertyDescriptor(originalCtx, prop) { | ||
if (prop in extraProps) { | ||
return Reflect.getOwnPropertyDescriptor(extraProps, prop); | ||
} | ||
if (deletedProps.has(prop)) { | ||
return undefined; | ||
} | ||
return Reflect.getOwnPropertyDescriptor(originalCtx, prop); | ||
}, | ||
}); | ||
return completeAssign(Object.create(originalCtx), { | ||
waitUntil: waitUntilFn, | ||
}, originalCtx); | ||
} | ||
@@ -534,18 +514,23 @@ function createDeferredPromise() { | ||
const possibleEncodings = ['deflate', 'gzip', 'deflate-raw', 'br']; | ||
supportedEncodings = possibleEncodings.filter(encoding => { | ||
// deflate-raw is not supported in Node.js >v20 | ||
if (globalThis.process?.version?.startsWith('v2') && | ||
fetchAPI.DecompressionStream === globalThis.DecompressionStream && | ||
encoding === 'deflate-raw') { | ||
return false; | ||
} | ||
try { | ||
// eslint-disable-next-line no-new | ||
new fetchAPI.DecompressionStream(encoding); | ||
return true; | ||
} | ||
catch { | ||
return false; | ||
} | ||
}); | ||
if (fetchAPI.DecompressionStream?.['supportedFormats']) { | ||
supportedEncodings = fetchAPI.DecompressionStream['supportedFormats']; | ||
} | ||
else { | ||
supportedEncodings = possibleEncodings.filter(encoding => { | ||
// deflate-raw is not supported in Node.js >v20 | ||
if (globalThis.process?.version?.startsWith('v2') && | ||
fetchAPI.DecompressionStream === globalThis.DecompressionStream && | ||
encoding === 'deflate-raw') { | ||
return false; | ||
} | ||
try { | ||
// eslint-disable-next-line no-new | ||
new fetchAPI.DecompressionStream(encoding); | ||
return true; | ||
} | ||
catch { | ||
return false; | ||
} | ||
}); | ||
} | ||
supportedEncodingsByFetchAPI.set(fetchAPI, supportedEncodings); | ||
@@ -581,1 +566,28 @@ } | ||
} | ||
const terminateEvents = ['SIGINT', 'SIGTERM', 'exit']; | ||
const disposableStacks = new Set(); | ||
let eventListenerRegistered = false; | ||
function ensureEventListenerForDisposableStacks() { | ||
if (eventListenerRegistered) { | ||
return; | ||
} | ||
eventListenerRegistered = true; | ||
for (const event of terminateEvents) { | ||
globalThis.process.once(event, function terminateHandler() { | ||
return Promise.allSettled([...disposableStacks].map(stack => stack.disposeAsync().catch(e => { | ||
console.error('Error while disposing:', e); | ||
}))); | ||
}); | ||
} | ||
} | ||
function ensureDisposableStackRegisteredForTerminateEvents(disposableStack) { | ||
if (globalThis.process) { | ||
ensureEventListenerForDisposableStacks(); | ||
if (!disposableStacks.has(disposableStack)) { | ||
disposableStacks.add(disposableStack); | ||
disposableStack.defer(() => { | ||
disposableStacks.delete(disposableStack); | ||
}); | ||
} | ||
} | ||
} |
@@ -5,3 +5,6 @@ "use strict"; | ||
exports.getRequestFromUWSRequest = getRequestFromUWSRequest; | ||
exports.createWritableFromUWS = createWritableFromUWS; | ||
exports.sendResponseToUwsOpts = sendResponseToUwsOpts; | ||
exports.fakePromise = fakePromise; | ||
const utils_js_1 = require("./utils.js"); | ||
function isUWSResponse(res) { | ||
@@ -11,41 +14,63 @@ return !!res.onData; | ||
function getRequestFromUWSRequest({ req, res, fetchAPI, signal }) { | ||
let body; | ||
const method = req.getMethod(); | ||
let duplex; | ||
const chunks = []; | ||
const pushFns = [ | ||
(chunk) => { | ||
chunks.push(chunk); | ||
}, | ||
]; | ||
const push = (chunk) => { | ||
for (const pushFn of pushFns) { | ||
pushFn(chunk); | ||
} | ||
}; | ||
let stopped = false; | ||
const stopFns = [ | ||
() => { | ||
stopped = true; | ||
}, | ||
]; | ||
const stop = () => { | ||
for (const stopFn of stopFns) { | ||
stopFn(); | ||
} | ||
}; | ||
res.onData(function (ab, isLast) { | ||
push(Buffer.from(Buffer.from(ab, 0, ab.byteLength))); | ||
if (isLast) { | ||
stop(); | ||
} | ||
}); | ||
let getReadableStream; | ||
if (method !== 'get' && method !== 'head') { | ||
let controller; | ||
body = new fetchAPI.ReadableStream({ | ||
start(c) { | ||
controller = c; | ||
}, | ||
duplex = 'half'; | ||
signal.addEventListener('abort', () => { | ||
stop(); | ||
}); | ||
const readable = body.readable; | ||
if (readable) { | ||
signal.addEventListener('abort', () => { | ||
readable.push(null); | ||
}); | ||
res.onData(function (ab, isLast) { | ||
const chunk = Buffer.from(ab, 0, ab.byteLength); | ||
readable.push(Buffer.from(chunk)); | ||
if (isLast) { | ||
readable.push(null); | ||
} | ||
}); | ||
} | ||
else { | ||
let closed = false; | ||
signal.addEventListener('abort', () => { | ||
if (!closed) { | ||
closed = true; | ||
controller.close(); | ||
} | ||
}); | ||
res.onData(function (ab, isLast) { | ||
const chunk = Buffer.from(ab, 0, ab.byteLength); | ||
controller.enqueue(Buffer.from(chunk)); | ||
if (isLast) { | ||
closed = true; | ||
controller.close(); | ||
} | ||
}); | ||
} | ||
let readableStream; | ||
getReadableStream = () => { | ||
if (!readableStream) { | ||
readableStream = new fetchAPI.ReadableStream({ | ||
start(controller) { | ||
for (const chunk of chunks) { | ||
controller.enqueue(chunk); | ||
} | ||
if (stopped) { | ||
controller.close(); | ||
return; | ||
} | ||
pushFns.push((chunk) => { | ||
controller.enqueue(chunk); | ||
}); | ||
stopFns.push(() => { | ||
if (controller.desiredSize) { | ||
controller.close(); | ||
} | ||
}); | ||
}, | ||
}); | ||
} | ||
return readableStream; | ||
}; | ||
} | ||
@@ -61,26 +86,93 @@ const headers = new fetchAPI.Headers(); | ||
} | ||
return new fetchAPI.Request(url, { | ||
let buffer; | ||
function getBody() { | ||
if (!getReadableStream) { | ||
return null; | ||
} | ||
if (stopped) { | ||
return getBufferFromChunks(); | ||
} | ||
return getReadableStream(); | ||
} | ||
const request = new fetchAPI.Request(url, { | ||
method, | ||
headers, | ||
body: body, | ||
get body() { | ||
return getBody(); | ||
}, | ||
signal, | ||
// eslint-disable-next-line @typescript-eslint/ban-ts-comment | ||
// @ts-ignore - not in the TS types yet | ||
duplex: 'half', | ||
duplex, | ||
}); | ||
} | ||
async function forwardResponseBodyToUWSResponse(uwsResponse, fetchResponse, signal) { | ||
for await (const chunk of fetchResponse.body) { | ||
if (signal.aborted) { | ||
return; | ||
function getBufferFromChunks() { | ||
if (!buffer) { | ||
buffer = chunks.length === 1 ? chunks[0] : Buffer.concat(chunks); | ||
} | ||
uwsResponse.cork(() => { | ||
uwsResponse.write(chunk); | ||
return buffer; | ||
} | ||
function collectBuffer() { | ||
if (stopped) { | ||
return fakePromise(getBufferFromChunks()); | ||
} | ||
return new Promise((resolve, reject) => { | ||
try { | ||
stopFns.push(() => { | ||
resolve(getBufferFromChunks()); | ||
}); | ||
} | ||
catch (e) { | ||
reject(e); | ||
} | ||
}); | ||
} | ||
uwsResponse.cork(() => { | ||
uwsResponse.end(); | ||
Object.defineProperties(request, { | ||
body: { | ||
get() { | ||
return getBody(); | ||
}, | ||
configurable: true, | ||
enumerable: true, | ||
}, | ||
json: { | ||
value() { | ||
return collectBuffer() | ||
.then(b => b.toString('utf8')) | ||
.then(t => JSON.parse(t)); | ||
}, | ||
configurable: true, | ||
enumerable: true, | ||
}, | ||
text: { | ||
value() { | ||
return collectBuffer().then(b => b.toString('utf8')); | ||
}, | ||
configurable: true, | ||
enumerable: true, | ||
}, | ||
arrayBuffer: { | ||
value() { | ||
return collectBuffer(); | ||
}, | ||
configurable: true, | ||
enumerable: true, | ||
}, | ||
}); | ||
return request; | ||
} | ||
function sendResponseToUwsOpts(uwsResponse, fetchResponse, signal) { | ||
function createWritableFromUWS(uwsResponse, fetchAPI) { | ||
return new fetchAPI.WritableStream({ | ||
write(chunk) { | ||
uwsResponse.cork(() => { | ||
uwsResponse.write(chunk); | ||
}); | ||
}, | ||
close() { | ||
uwsResponse.cork(() => { | ||
uwsResponse.end(); | ||
}); | ||
}, | ||
}); | ||
} | ||
function sendResponseToUwsOpts(uwsResponse, fetchResponse, signal, fetchAPI) { | ||
if (!fetchResponse) { | ||
@@ -115,11 +207,57 @@ uwsResponse.writeStatus('404 Not Found'); | ||
} | ||
else if (!fetchResponse.body) { | ||
uwsResponse.end(); | ||
} | ||
}); | ||
if (bufferOfRes) { | ||
if (bufferOfRes || !fetchResponse.body) { | ||
return; | ||
} | ||
if (!fetchResponse.body) { | ||
uwsResponse.end(); | ||
return; | ||
signal.addEventListener('abort', () => { | ||
if (!fetchResponse.body?.locked) { | ||
fetchResponse.body?.cancel(signal.reason); | ||
} | ||
}); | ||
return fetchResponse.body | ||
.pipeTo(createWritableFromUWS(uwsResponse, fetchAPI), { | ||
signal, | ||
}) | ||
.catch(err => { | ||
if (signal.aborted) { | ||
return; | ||
} | ||
throw err; | ||
}); | ||
} | ||
function fakePromise(value) { | ||
if ((0, utils_js_1.isPromise)(value)) { | ||
return value; | ||
} | ||
return forwardResponseBodyToUWSResponse(uwsResponse, fetchResponse, signal); | ||
// Write a fake promise to avoid the promise constructor | ||
// being called with `new Promise` in the browser. | ||
return { | ||
then(resolve) { | ||
if (resolve) { | ||
const callbackResult = resolve(value); | ||
if ((0, utils_js_1.isPromise)(callbackResult)) { | ||
return callbackResult; | ||
} | ||
return fakePromise(callbackResult); | ||
} | ||
return this; | ||
}, | ||
catch() { | ||
return this; | ||
}, | ||
finally(cb) { | ||
if (cb) { | ||
const callbackResult = cb(); | ||
if ((0, utils_js_1.isPromise)(callbackResult)) { | ||
return callbackResult.then(() => value); | ||
} | ||
return fakePromise(value); | ||
} | ||
return this; | ||
}, | ||
[Symbol.toStringTag]: 'Promise', | ||
}; | ||
} |
/* eslint-disable @typescript-eslint/ban-types */ | ||
import { AsyncDisposableStack, DisposableSymbols } from '@whatwg-node/disposablestack'; | ||
import * as DefaultFetchAPI from '@whatwg-node/fetch'; | ||
import { completeAssign, handleAbortSignalAndPromiseResponse, handleErrorFromRequestHandler, isFetchEvent, isNodeRequest, isolateObject, isPromise, isRequestInit, isServerResponse, iterateAsyncVoid, normalizeNodeRequest, sendNodeResponse, ServerAdapterRequestAbortSignal, } from './utils.js'; | ||
import { completeAssign, ensureDisposableStackRegisteredForTerminateEvents, handleAbortSignalAndPromiseResponse, handleErrorFromRequestHandler, isFetchEvent, isNodeRequest, isolateObject, isPromise, isRequestInit, isServerResponse, iterateAsyncVoid, nodeRequestResponseMap, normalizeNodeRequest, sendNodeResponse, ServerAdapterRequestAbortSignal, } from './utils.js'; | ||
import { getRequestFromUWSRequest, isUWSResponse, sendResponseToUwsOpts, } from './uwebsockets.js'; | ||
async function handleWaitUntils(waitUntilPromises) { | ||
await Promise.allSettled(waitUntilPromises); | ||
} | ||
// Required for envs like nextjs edge runtime | ||
@@ -28,2 +26,32 @@ function isRequestAccessible(serverContext) { | ||
const onResponseHooks = []; | ||
const waitUntilPromises = new Set(); | ||
const disposableStack = new AsyncDisposableStack(); | ||
const signals = new Set(); | ||
function registerSignal(signal) { | ||
signals.add(signal); | ||
signal.addEventListener('abort', () => { | ||
signals.delete(signal); | ||
}); | ||
} | ||
disposableStack.defer(() => { | ||
for (const signal of signals) { | ||
signal.sendAbort(); | ||
} | ||
}); | ||
function handleWaitUntils() { | ||
return Promise.allSettled(waitUntilPromises).then(() => { }, () => { }); | ||
} | ||
disposableStack.defer(handleWaitUntils); | ||
function waitUntil(promiseLike) { | ||
// If it is a Node.js environment, we should register the disposable stack to handle process termination events | ||
if (globalThis.process) { | ||
ensureDisposableStackRegisteredForTerminateEvents(disposableStack); | ||
} | ||
waitUntilPromises.add(promiseLike.then(() => { | ||
waitUntilPromises.delete(promiseLike); | ||
}, err => { | ||
console.error(`Unexpected error while waiting: ${err.message || err}`); | ||
waitUntilPromises.delete(promiseLike); | ||
})); | ||
} | ||
if (options?.plugins != null) { | ||
@@ -107,16 +135,17 @@ for (const plugin of options.plugins) { | ||
// TODO: Remove this on the next major version | ||
function handleNodeRequestAndResponse(nodeRequest, nodeResponseOrContainer, ...ctx) { | ||
const nodeResponse = nodeResponseOrContainer.raw || nodeResponseOrContainer; | ||
function handleNodeRequest(nodeRequest, ...ctx) { | ||
const serverContext = ctx.length > 1 ? completeAssign(...ctx) : ctx[0] || {}; | ||
const request = normalizeNodeRequest(nodeRequest, nodeResponse, fetchAPI.Request); | ||
const request = normalizeNodeRequest(nodeRequest, fetchAPI, registerSignal); | ||
return handleRequest(request, serverContext); | ||
} | ||
function handleNodeRequestAndResponse(nodeRequest, nodeResponseOrContainer, ...ctx) { | ||
const nodeResponse = nodeResponseOrContainer.raw || nodeResponseOrContainer; | ||
nodeRequestResponseMap.set(nodeRequest, nodeResponse); | ||
return handleNodeRequest(nodeRequest, ...ctx); | ||
} | ||
function requestListener(nodeRequest, nodeResponse, ...ctx) { | ||
const waitUntilPromises = []; | ||
const defaultServerContext = { | ||
req: nodeRequest, | ||
res: nodeResponse, | ||
waitUntil(cb) { | ||
waitUntilPromises.push(cb.catch(err => console.error(err))); | ||
}, | ||
waitUntil, | ||
}; | ||
@@ -146,9 +175,6 @@ let response$; | ||
function handleUWS(res, req, ...ctx) { | ||
const waitUntilPromises = []; | ||
const defaultServerContext = { | ||
res, | ||
req, | ||
waitUntil(cb) { | ||
waitUntilPromises.push(cb.catch(err => console.error(err))); | ||
}, | ||
waitUntil, | ||
}; | ||
@@ -160,2 +186,3 @@ const filteredCtxParts = ctx.filter(partCtx => partCtx != null); | ||
const signal = new ServerAdapterRequestAbortSignal(); | ||
registerSignal(signal); | ||
const originalResEnd = res.end.bind(res); | ||
@@ -192,3 +219,3 @@ let resEnded = false; | ||
if (!signal.aborted && !resEnded) { | ||
return sendResponseToUwsOpts(res, response, signal); | ||
return sendResponseToUwsOpts(res, response, signal, fetchAPI); | ||
} | ||
@@ -202,3 +229,3 @@ }) | ||
if (!signal.aborted && !resEnded) { | ||
return sendResponseToUwsOpts(res, response$, signal); | ||
return sendResponseToUwsOpts(res, response$, signal, fetchAPI); | ||
} | ||
@@ -223,13 +250,8 @@ } | ||
const filteredCtxParts = ctx.filter(partCtx => partCtx != null); | ||
let waitUntilPromises; | ||
const serverContext = filteredCtxParts.length > 1 | ||
? completeAssign({}, ...filteredCtxParts) | ||
: isolateObject(filteredCtxParts[0], filteredCtxParts[0] == null || filteredCtxParts[0].waitUntil == null | ||
? (waitUntilPromises = []) | ||
? waitUntil | ||
: undefined); | ||
const response$ = handleRequest(request, serverContext); | ||
if (waitUntilPromises?.length) { | ||
return handleWaitUntils(waitUntilPromises).then(() => response$); | ||
} | ||
return response$; | ||
return handleRequest(request, serverContext); | ||
} | ||
@@ -281,2 +303,3 @@ const fetchFn = (input, ...maybeCtx) => { | ||
fetch: fetchFn, | ||
handleNodeRequest, | ||
handleNodeRequestAndResponse, | ||
@@ -287,2 +310,9 @@ requestListener, | ||
handle: genericRequestHandler, | ||
disposableStack, | ||
[DisposableSymbols.asyncDispose]() { | ||
return disposableStack.disposeAsync(); | ||
}, | ||
dispose() { | ||
return disposableStack.disposeAsync(); | ||
}, | ||
}; | ||
@@ -289,0 +319,0 @@ const serverAdapter = new Proxy(genericRequestHandler, { |
@@ -1,2 +0,2 @@ | ||
import { decompressedResponseMap, getSupportedEncodings } from '../utils.js'; | ||
import { decompressedResponseMap, getSupportedEncodings, isAsyncIterable, isReadable, } from '../utils.js'; | ||
export function useContentEncoding() { | ||
@@ -21,3 +21,3 @@ const encodingMap = new WeakMap(); | ||
} | ||
const newRequest = new fetchAPI.Request(request.url, { | ||
request = new fetchAPI.Request(request.url, { | ||
body: newBody, | ||
@@ -39,3 +39,3 @@ cache: request.cache, | ||
}); | ||
setRequest(newRequest); | ||
setRequest(request); | ||
} | ||
@@ -48,3 +48,4 @@ } | ||
}, | ||
onResponse({ request, response, setResponse, fetchAPI }) { | ||
onResponse({ request, response, setResponse, fetchAPI, serverContext }) { | ||
const waitUntil = serverContext.waitUntil?.bind(serverContext) || (() => { }); | ||
// Hack for avoiding to create whatwg-node to create a readable stream until it's needed | ||
@@ -63,17 +64,11 @@ if (response['bodyInit'] || response.body) { | ||
const writer = compressionStream.writable.getWriter(); | ||
writer.write(bufOfRes); | ||
writer.close(); | ||
const reader = compressionStream.readable.getReader(); | ||
return Promise.resolve().then(async () => { | ||
const chunks = []; | ||
while (true) { | ||
const { done, value } = await reader.read(); | ||
if (done) { | ||
reader.releaseLock(); | ||
break; | ||
} | ||
else if (value) { | ||
chunks.push(...value); | ||
} | ||
} | ||
waitUntil(writer.write(bufOfRes)); | ||
waitUntil(writer.close()); | ||
const uint8Arrays$ = isReadable(compressionStream.readable['readable']) | ||
? collectReadableValues(compressionStream.readable['readable']) | ||
: isAsyncIterable(compressionStream.readable) | ||
? collectAsyncIterableValues(compressionStream.readable) | ||
: collectReadableStreamValues(compressionStream.readable); | ||
return uint8Arrays$.then(uint8Arrays => { | ||
const chunks = uint8Arrays.flatMap(uint8Array => [...uint8Array]); | ||
const uint8Array = new Uint8Array(chunks); | ||
@@ -89,2 +84,3 @@ const newHeaders = new fetchAPI.Headers(response.headers); | ||
setResponse(compressedResponse); | ||
waitUntil(compressionStream.writable.close()); | ||
}); | ||
@@ -110,1 +106,31 @@ } | ||
} | ||
function collectReadableValues(readable) { | ||
const values = []; | ||
readable.on('data', value => values.push(value)); | ||
return new Promise((resolve, reject) => { | ||
readable.once('end', () => resolve(values)); | ||
readable.once('error', reject); | ||
}); | ||
} | ||
async function collectAsyncIterableValues(asyncIterable) { | ||
const values = []; | ||
for await (const value of asyncIterable) { | ||
values.push(value); | ||
} | ||
return values; | ||
} | ||
async function collectReadableStreamValues(readableStream) { | ||
const reader = readableStream.getReader(); | ||
const values = []; | ||
while (true) { | ||
const { done, value } = await reader.read(); | ||
if (done) { | ||
reader.releaseLock(); | ||
break; | ||
} | ||
else if (value) { | ||
values.push(value); | ||
} | ||
} | ||
return values; | ||
} |
@@ -22,2 +22,7 @@ import { Response as DefaultResponseCtor } from '@whatwg-node/fetch'; | ||
export class HTTPError extends Error { | ||
status; | ||
message; | ||
headers; | ||
details; | ||
name = 'HTTPError'; | ||
constructor(status = 500, message, headers = {}, details) { | ||
@@ -29,3 +34,2 @@ super(message); | ||
this.details = details; | ||
this.name = 'HTTPError'; | ||
Error.captureStackTrace(this, HTTPError); | ||
@@ -32,0 +36,0 @@ } |
215
esm/utils.js
@@ -1,2 +0,1 @@ | ||
import { URL } from '@whatwg-node/fetch'; | ||
export function isAsyncIterable(body) { | ||
@@ -52,7 +51,5 @@ return (body != null && typeof body === 'object' && typeof body[Symbol.asyncIterator] === 'function'); | ||
export class ServerAdapterRequestAbortSignal extends EventTarget { | ||
constructor() { | ||
super(...arguments); | ||
this.aborted = false; | ||
this._onabort = null; | ||
} | ||
aborted = false; | ||
_onabort = null; | ||
reason; | ||
throwIfAborted() { | ||
@@ -85,7 +82,8 @@ if (this.aborted) { | ||
let bunNodeCompatModeWarned = false; | ||
export function normalizeNodeRequest(nodeRequest, nodeResponse, RequestCtor) { | ||
export const nodeRequestResponseMap = new WeakMap(); | ||
export function normalizeNodeRequest(nodeRequest, fetchAPI, registerSignal) { | ||
const rawRequest = nodeRequest.raw || nodeRequest.req || nodeRequest; | ||
let fullUrl = buildFullUrl(rawRequest); | ||
if (nodeRequest.query) { | ||
const url = new URL(fullUrl); | ||
const url = new fetchAPI.URL(fullUrl); | ||
for (const key in nodeRequest.query) { | ||
@@ -97,2 +95,4 @@ url.searchParams.set(key, nodeRequest.query[key]); | ||
let signal; | ||
const nodeResponse = nodeRequestResponseMap.get(nodeRequest); | ||
nodeRequestResponseMap.delete(nodeRequest); | ||
let normalizedHeaders = nodeRequest.headers; | ||
@@ -110,4 +110,6 @@ if (nodeRequest.headers?.[':method']) { | ||
// If ponyfilled | ||
if (RequestCtor !== globalThis.Request) { | ||
signal = new ServerAdapterRequestAbortSignal(); | ||
if (fetchAPI.Request !== globalThis.Request) { | ||
const newSignal = new ServerAdapterRequestAbortSignal(); | ||
registerSignal?.(newSignal); | ||
signal = newSignal; | ||
sendAbortSignal = () => signal.sendAbort(); | ||
@@ -133,3 +135,3 @@ } | ||
if (nodeRequest.method === 'GET' || nodeRequest.method === 'HEAD') { | ||
return new RequestCtor(fullUrl, { | ||
return new fetchAPI.Request(fullUrl, { | ||
method: nodeRequest.method, | ||
@@ -149,3 +151,3 @@ headers: normalizedHeaders, | ||
if (isRequestBody(maybeParsedBody)) { | ||
return new RequestCtor(fullUrl, { | ||
return new fetchAPI.Request(fullUrl, { | ||
method: nodeRequest.method, | ||
@@ -157,3 +159,3 @@ headers: normalizedHeaders, | ||
} | ||
const request = new RequestCtor(fullUrl, { | ||
const request = new fetchAPI.Request(fullUrl, { | ||
method: nodeRequest.method, | ||
@@ -186,3 +188,3 @@ headers: normalizedHeaders, | ||
} | ||
return new RequestCtor(fullUrl, { | ||
return new fetchAPI.Request(fullUrl, { | ||
method: nodeRequest.method, | ||
@@ -211,3 +213,3 @@ headers: normalizedHeaders, | ||
// perf: instead of spreading the object, we can just pass it as is and it performs better | ||
return new RequestCtor(fullUrl, { | ||
return new fetchAPI.Request(fullUrl, { | ||
method: nodeRequest.method, | ||
@@ -250,7 +252,22 @@ headers: normalizedHeaders, | ||
async function sendAsyncIterable(serverResponse, asyncIterable) { | ||
let closed = false; | ||
const closeEventListener = () => { | ||
closed = true; | ||
}; | ||
serverResponse.once('error', closeEventListener); | ||
serverResponse.once('close', closeEventListener); | ||
serverResponse.once('finish', () => { | ||
serverResponse.removeListener('close', closeEventListener); | ||
}); | ||
for await (const chunk of asyncIterable) { | ||
if (closed) { | ||
break; | ||
} | ||
if (!serverResponse | ||
// @ts-expect-error http and http2 writes are actually compatible | ||
.write(chunk)) { | ||
break; | ||
if (closed) { | ||
break; | ||
} | ||
await new Promise(resolve => serverResponse.once('drain', resolve)); | ||
} | ||
@@ -266,3 +283,3 @@ } | ||
serverResponse.statusCode = 404; | ||
serverResponse.end(); | ||
endResponse(serverResponse); | ||
return; | ||
@@ -316,2 +333,5 @@ } | ||
} | ||
if (isReadableStream(fetchBody)) { | ||
return sendReadableStream(serverResponse, fetchBody); | ||
} | ||
if (isAsyncIterable(fetchBody)) { | ||
@@ -321,2 +341,20 @@ return sendAsyncIterable(serverResponse, fetchBody); | ||
} | ||
async function sendReadableStream(serverResponse, readableStream) { | ||
const reader = readableStream.getReader(); | ||
serverResponse.req.once('error', err => { | ||
reader.cancel(err); | ||
}); | ||
while (true) { | ||
const { done, value } = await reader.read(); | ||
if (done) { | ||
break; | ||
} | ||
if (!serverResponse | ||
// @ts-expect-error http and http2 writes are actually compatible | ||
.write(value)) { | ||
await new Promise(resolve => serverResponse.once('drain', resolve)); | ||
} | ||
} | ||
endResponse(serverResponse); | ||
} | ||
export function isRequestInit(val) { | ||
@@ -398,73 +436,14 @@ return (val != null && | ||
} | ||
export function isolateObject(originalCtx, waitUntilPromises) { | ||
export function isolateObject(originalCtx, waitUntilFn) { | ||
if (originalCtx == null) { | ||
return {}; | ||
if (waitUntilFn == null) { | ||
return {}; | ||
} | ||
return { | ||
waitUntil: waitUntilFn, | ||
}; | ||
} | ||
const extraProps = {}; | ||
const deletedProps = new Set(); | ||
return new Proxy(originalCtx, { | ||
get(originalCtx, prop) { | ||
if (waitUntilPromises != null && prop === 'waitUntil') { | ||
return function waitUntil(promise) { | ||
waitUntilPromises.push(promise.catch(err => console.error(err))); | ||
}; | ||
} | ||
const extraPropVal = extraProps[prop]; | ||
if (extraPropVal != null) { | ||
if (typeof extraPropVal === 'function') { | ||
return extraPropVal.bind(extraProps); | ||
} | ||
return extraPropVal; | ||
} | ||
if (deletedProps.has(prop)) { | ||
return undefined; | ||
} | ||
return originalCtx[prop]; | ||
}, | ||
set(_originalCtx, prop, value) { | ||
extraProps[prop] = value; | ||
return true; | ||
}, | ||
has(originalCtx, prop) { | ||
if (waitUntilPromises != null && prop === 'waitUntil') { | ||
return true; | ||
} | ||
if (deletedProps.has(prop)) { | ||
return false; | ||
} | ||
if (prop in extraProps) { | ||
return true; | ||
} | ||
return prop in originalCtx; | ||
}, | ||
defineProperty(_originalCtx, prop, descriptor) { | ||
return Reflect.defineProperty(extraProps, prop, descriptor); | ||
}, | ||
deleteProperty(_originalCtx, prop) { | ||
if (prop in extraProps) { | ||
return Reflect.deleteProperty(extraProps, prop); | ||
} | ||
deletedProps.add(prop); | ||
return true; | ||
}, | ||
ownKeys(originalCtx) { | ||
const extraKeys = Reflect.ownKeys(extraProps); | ||
const originalKeys = Reflect.ownKeys(originalCtx); | ||
const deletedKeys = Array.from(deletedProps); | ||
const allKeys = new Set(extraKeys.concat(originalKeys.filter(keys => !deletedKeys.includes(keys)))); | ||
if (waitUntilPromises != null) { | ||
allKeys.add('waitUntil'); | ||
} | ||
return Array.from(allKeys); | ||
}, | ||
getOwnPropertyDescriptor(originalCtx, prop) { | ||
if (prop in extraProps) { | ||
return Reflect.getOwnPropertyDescriptor(extraProps, prop); | ||
} | ||
if (deletedProps.has(prop)) { | ||
return undefined; | ||
} | ||
return Reflect.getOwnPropertyDescriptor(originalCtx, prop); | ||
}, | ||
}); | ||
return completeAssign(Object.create(originalCtx), { | ||
waitUntil: waitUntilFn, | ||
}, originalCtx); | ||
} | ||
@@ -511,18 +490,23 @@ export function createDeferredPromise() { | ||
const possibleEncodings = ['deflate', 'gzip', 'deflate-raw', 'br']; | ||
supportedEncodings = possibleEncodings.filter(encoding => { | ||
// deflate-raw is not supported in Node.js >v20 | ||
if (globalThis.process?.version?.startsWith('v2') && | ||
fetchAPI.DecompressionStream === globalThis.DecompressionStream && | ||
encoding === 'deflate-raw') { | ||
return false; | ||
} | ||
try { | ||
// eslint-disable-next-line no-new | ||
new fetchAPI.DecompressionStream(encoding); | ||
return true; | ||
} | ||
catch { | ||
return false; | ||
} | ||
}); | ||
if (fetchAPI.DecompressionStream?.['supportedFormats']) { | ||
supportedEncodings = fetchAPI.DecompressionStream['supportedFormats']; | ||
} | ||
else { | ||
supportedEncodings = possibleEncodings.filter(encoding => { | ||
// deflate-raw is not supported in Node.js >v20 | ||
if (globalThis.process?.version?.startsWith('v2') && | ||
fetchAPI.DecompressionStream === globalThis.DecompressionStream && | ||
encoding === 'deflate-raw') { | ||
return false; | ||
} | ||
try { | ||
// eslint-disable-next-line no-new | ||
new fetchAPI.DecompressionStream(encoding); | ||
return true; | ||
} | ||
catch { | ||
return false; | ||
} | ||
}); | ||
} | ||
supportedEncodingsByFetchAPI.set(fetchAPI, supportedEncodings); | ||
@@ -558,1 +542,28 @@ } | ||
} | ||
const terminateEvents = ['SIGINT', 'SIGTERM', 'exit']; | ||
const disposableStacks = new Set(); | ||
let eventListenerRegistered = false; | ||
function ensureEventListenerForDisposableStacks() { | ||
if (eventListenerRegistered) { | ||
return; | ||
} | ||
eventListenerRegistered = true; | ||
for (const event of terminateEvents) { | ||
globalThis.process.once(event, function terminateHandler() { | ||
return Promise.allSettled([...disposableStacks].map(stack => stack.disposeAsync().catch(e => { | ||
console.error('Error while disposing:', e); | ||
}))); | ||
}); | ||
} | ||
} | ||
export function ensureDisposableStackRegisteredForTerminateEvents(disposableStack) { | ||
if (globalThis.process) { | ||
ensureEventListenerForDisposableStacks(); | ||
if (!disposableStacks.has(disposableStack)) { | ||
disposableStacks.add(disposableStack); | ||
disposableStack.defer(() => { | ||
disposableStacks.delete(disposableStack); | ||
}); | ||
} | ||
} | ||
} |
@@ -0,1 +1,2 @@ | ||
import { isPromise } from './utils.js'; | ||
export function isUWSResponse(res) { | ||
@@ -5,41 +6,63 @@ return !!res.onData; | ||
export function getRequestFromUWSRequest({ req, res, fetchAPI, signal }) { | ||
let body; | ||
const method = req.getMethod(); | ||
let duplex; | ||
const chunks = []; | ||
const pushFns = [ | ||
(chunk) => { | ||
chunks.push(chunk); | ||
}, | ||
]; | ||
const push = (chunk) => { | ||
for (const pushFn of pushFns) { | ||
pushFn(chunk); | ||
} | ||
}; | ||
let stopped = false; | ||
const stopFns = [ | ||
() => { | ||
stopped = true; | ||
}, | ||
]; | ||
const stop = () => { | ||
for (const stopFn of stopFns) { | ||
stopFn(); | ||
} | ||
}; | ||
res.onData(function (ab, isLast) { | ||
push(Buffer.from(Buffer.from(ab, 0, ab.byteLength))); | ||
if (isLast) { | ||
stop(); | ||
} | ||
}); | ||
let getReadableStream; | ||
if (method !== 'get' && method !== 'head') { | ||
let controller; | ||
body = new fetchAPI.ReadableStream({ | ||
start(c) { | ||
controller = c; | ||
}, | ||
duplex = 'half'; | ||
signal.addEventListener('abort', () => { | ||
stop(); | ||
}); | ||
const readable = body.readable; | ||
if (readable) { | ||
signal.addEventListener('abort', () => { | ||
readable.push(null); | ||
}); | ||
res.onData(function (ab, isLast) { | ||
const chunk = Buffer.from(ab, 0, ab.byteLength); | ||
readable.push(Buffer.from(chunk)); | ||
if (isLast) { | ||
readable.push(null); | ||
} | ||
}); | ||
} | ||
else { | ||
let closed = false; | ||
signal.addEventListener('abort', () => { | ||
if (!closed) { | ||
closed = true; | ||
controller.close(); | ||
} | ||
}); | ||
res.onData(function (ab, isLast) { | ||
const chunk = Buffer.from(ab, 0, ab.byteLength); | ||
controller.enqueue(Buffer.from(chunk)); | ||
if (isLast) { | ||
closed = true; | ||
controller.close(); | ||
} | ||
}); | ||
} | ||
let readableStream; | ||
getReadableStream = () => { | ||
if (!readableStream) { | ||
readableStream = new fetchAPI.ReadableStream({ | ||
start(controller) { | ||
for (const chunk of chunks) { | ||
controller.enqueue(chunk); | ||
} | ||
if (stopped) { | ||
controller.close(); | ||
return; | ||
} | ||
pushFns.push((chunk) => { | ||
controller.enqueue(chunk); | ||
}); | ||
stopFns.push(() => { | ||
if (controller.desiredSize) { | ||
controller.close(); | ||
} | ||
}); | ||
}, | ||
}); | ||
} | ||
return readableStream; | ||
}; | ||
} | ||
@@ -55,26 +78,93 @@ const headers = new fetchAPI.Headers(); | ||
} | ||
return new fetchAPI.Request(url, { | ||
let buffer; | ||
function getBody() { | ||
if (!getReadableStream) { | ||
return null; | ||
} | ||
if (stopped) { | ||
return getBufferFromChunks(); | ||
} | ||
return getReadableStream(); | ||
} | ||
const request = new fetchAPI.Request(url, { | ||
method, | ||
headers, | ||
body: body, | ||
get body() { | ||
return getBody(); | ||
}, | ||
signal, | ||
// eslint-disable-next-line @typescript-eslint/ban-ts-comment | ||
// @ts-ignore - not in the TS types yet | ||
duplex: 'half', | ||
duplex, | ||
}); | ||
} | ||
async function forwardResponseBodyToUWSResponse(uwsResponse, fetchResponse, signal) { | ||
for await (const chunk of fetchResponse.body) { | ||
if (signal.aborted) { | ||
return; | ||
function getBufferFromChunks() { | ||
if (!buffer) { | ||
buffer = chunks.length === 1 ? chunks[0] : Buffer.concat(chunks); | ||
} | ||
uwsResponse.cork(() => { | ||
uwsResponse.write(chunk); | ||
return buffer; | ||
} | ||
function collectBuffer() { | ||
if (stopped) { | ||
return fakePromise(getBufferFromChunks()); | ||
} | ||
return new Promise((resolve, reject) => { | ||
try { | ||
stopFns.push(() => { | ||
resolve(getBufferFromChunks()); | ||
}); | ||
} | ||
catch (e) { | ||
reject(e); | ||
} | ||
}); | ||
} | ||
uwsResponse.cork(() => { | ||
uwsResponse.end(); | ||
Object.defineProperties(request, { | ||
body: { | ||
get() { | ||
return getBody(); | ||
}, | ||
configurable: true, | ||
enumerable: true, | ||
}, | ||
json: { | ||
value() { | ||
return collectBuffer() | ||
.then(b => b.toString('utf8')) | ||
.then(t => JSON.parse(t)); | ||
}, | ||
configurable: true, | ||
enumerable: true, | ||
}, | ||
text: { | ||
value() { | ||
return collectBuffer().then(b => b.toString('utf8')); | ||
}, | ||
configurable: true, | ||
enumerable: true, | ||
}, | ||
arrayBuffer: { | ||
value() { | ||
return collectBuffer(); | ||
}, | ||
configurable: true, | ||
enumerable: true, | ||
}, | ||
}); | ||
return request; | ||
} | ||
export function sendResponseToUwsOpts(uwsResponse, fetchResponse, signal) { | ||
export function createWritableFromUWS(uwsResponse, fetchAPI) { | ||
return new fetchAPI.WritableStream({ | ||
write(chunk) { | ||
uwsResponse.cork(() => { | ||
uwsResponse.write(chunk); | ||
}); | ||
}, | ||
close() { | ||
uwsResponse.cork(() => { | ||
uwsResponse.end(); | ||
}); | ||
}, | ||
}); | ||
} | ||
export function sendResponseToUwsOpts(uwsResponse, fetchResponse, signal, fetchAPI) { | ||
if (!fetchResponse) { | ||
@@ -109,11 +199,57 @@ uwsResponse.writeStatus('404 Not Found'); | ||
} | ||
else if (!fetchResponse.body) { | ||
uwsResponse.end(); | ||
} | ||
}); | ||
if (bufferOfRes) { | ||
if (bufferOfRes || !fetchResponse.body) { | ||
return; | ||
} | ||
if (!fetchResponse.body) { | ||
uwsResponse.end(); | ||
return; | ||
signal.addEventListener('abort', () => { | ||
if (!fetchResponse.body?.locked) { | ||
fetchResponse.body?.cancel(signal.reason); | ||
} | ||
}); | ||
return fetchResponse.body | ||
.pipeTo(createWritableFromUWS(uwsResponse, fetchAPI), { | ||
signal, | ||
}) | ||
.catch(err => { | ||
if (signal.aborted) { | ||
return; | ||
} | ||
throw err; | ||
}); | ||
} | ||
export function fakePromise(value) { | ||
if (isPromise(value)) { | ||
return value; | ||
} | ||
return forwardResponseBodyToUWSResponse(uwsResponse, fetchResponse, signal); | ||
// Write a fake promise to avoid the promise constructor | ||
// being called with `new Promise` in the browser. | ||
return { | ||
then(resolve) { | ||
if (resolve) { | ||
const callbackResult = resolve(value); | ||
if (isPromise(callbackResult)) { | ||
return callbackResult; | ||
} | ||
return fakePromise(callbackResult); | ||
} | ||
return this; | ||
}, | ||
catch() { | ||
return this; | ||
}, | ||
finally(cb) { | ||
if (cb) { | ||
const callbackResult = cb(); | ||
if (isPromise(callbackResult)) { | ||
return callbackResult.then(() => value); | ||
} | ||
return fakePromise(value); | ||
} | ||
return this; | ||
}, | ||
[Symbol.toStringTag]: 'Promise', | ||
}; | ||
} |
{ | ||
"name": "@whatwg-node/server", | ||
"version": "0.10.0-alpha-20240726141316-c6ce93b3598457ebe73b3b725986723af8f5e609", | ||
"version": "0.10.0-alpha-20241123133536-975c9068dde45574fcfa26567e4bab96f45d1f85", | ||
"description": "Fetch API compliant HTTP Server adapter", | ||
"sideEffects": false, | ||
"dependencies": { | ||
"@whatwg-node/fetch": "^0.9.19", | ||
"@whatwg-node/disposablestack": "^0.0.5", | ||
"@whatwg-node/fetch": "^0.10.0", | ||
"tslib": "^2.6.3" | ||
@@ -18,3 +19,3 @@ }, | ||
"engines": { | ||
"node": ">=16.0.0" | ||
"node": ">=18.0.0" | ||
}, | ||
@@ -21,0 +22,0 @@ "main": "cjs/index.js", |
@@ -1,2 +0,2 @@ | ||
import { ServerAdapterPlugin } from './types.js'; | ||
import type { ServerAdapterPlugin } from './types.js'; | ||
export type CORSOptions = { | ||
@@ -3,0 +3,0 @@ origin?: string[] | string; |
@@ -1,2 +0,2 @@ | ||
import { ServerAdapterPlugin } from './types.js'; | ||
import type { ServerAdapterPlugin } from './types.js'; | ||
export declare function createDefaultErrorHandler<TServerContext = {}>(ResponseCtor?: typeof Response): ErrorHandler<TServerContext>; | ||
@@ -9,5 +9,5 @@ export declare class HTTPError extends Error { | ||
name: string; | ||
constructor(status: number, message: string, headers?: HeadersInit, details?: any); | ||
constructor(status: number | undefined, message: string, headers?: HeadersInit, details?: any); | ||
} | ||
export type ErrorHandler<TServerContext> = (e: any, request: Request, ctx: TServerContext) => Response | Promise<Response>; | ||
export declare function useErrorHandling<TServerContext>(onError?: ErrorHandler<TServerContext>): ServerAdapterPlugin<TServerContext>; |
@@ -16,3 +16,3 @@ import type { RequestListener } from 'http'; | ||
} | ||
export interface ServerAdapterObject<TServerContext> extends EventListenerObject { | ||
export interface ServerAdapterObject<TServerContext> extends EventListenerObject, AsyncDisposable { | ||
/** | ||
@@ -36,2 +36,8 @@ * A basic request listener that takes a `Request` with the server context and returns a `Response`. | ||
/** | ||
* This function takes Node's request object and returns a WHATWG Fetch spec compliant `Response` object. | ||
* | ||
* @deprecated Use `handleNodeRequestAndResponse` instead. | ||
**/ | ||
handleNodeRequest(nodeRequest: NodeRequest, ...ctx: Partial<TServerContext & ServerAdapterInitialContext>[]): Promise<Response> | Response; | ||
/** | ||
* This function takes Node's request and response objects and returns a WHATWG Fetch spec compliant `Response` object. | ||
@@ -55,2 +61,4 @@ */ | ||
} & Partial<TServerContext & ServerAdapterInitialContext>, ...ctx: Partial<TServerContext & ServerAdapterInitialContext>[]): Promise<Response> | Response; | ||
disposableStack: AsyncDisposableStack; | ||
dispose(): Promise<void>; | ||
} | ||
@@ -74,3 +82,3 @@ export interface RequestLike { | ||
}; | ||
export type WaitUntilFn = (promise: Promise<unknown>) => void; | ||
export type WaitUntilFn = (promise: Promise<void> | void) => void; | ||
export type FetchAPI = ReturnType<typeof import('@whatwg-node/fetch').createFetch>; | ||
@@ -77,0 +85,0 @@ export type ServerAdapterInitialContext = { |
@@ -33,3 +33,4 @@ import type { IncomingMessage, ServerResponse } from 'http'; | ||
} | ||
export declare function normalizeNodeRequest(nodeRequest: NodeRequest, nodeResponse: NodeResponse, RequestCtor: typeof Request): Request; | ||
export declare const nodeRequestResponseMap: WeakMap<NodeRequest, NodeResponse>; | ||
export declare function normalizeNodeRequest(nodeRequest: NodeRequest, fetchAPI: FetchAPI, registerSignal?: (signal: ServerAdapterRequestAbortSignal) => void): Request; | ||
export declare function isReadable(stream: any): stream is Readable; | ||
@@ -46,3 +47,3 @@ export declare function isNodeRequest(request: any): request is NodeRequest; | ||
export declare function handleErrorFromRequestHandler(error: any, ResponseCtor: typeof Response): Response; | ||
export declare function isolateObject<TIsolatedObject extends object>(originalCtx: TIsolatedObject, waitUntilPromises?: Promise<unknown>[]): TIsolatedObject; | ||
export declare function isolateObject<TIsolatedObject extends object>(originalCtx: TIsolatedObject, waitUntilFn?: (promiseLike: PromiseLike<unknown>) => void): TIsolatedObject; | ||
export interface DeferredPromise<T = void> { | ||
@@ -58,1 +59,2 @@ promise: Promise<T>; | ||
export declare function handleResponseDecompression(response: Response, fetchAPI: FetchAPI): Response; | ||
export declare function ensureDisposableStackRegisteredForTerminateEvents(disposableStack: AsyncDisposableStack): void; |
@@ -30,3 +30,5 @@ import type { FetchAPI } from './types.js'; | ||
export declare function getRequestFromUWSRequest({ req, res, fetchAPI, signal }: GetRequestFromUWSOpts): Request; | ||
export declare function sendResponseToUwsOpts(uwsResponse: UWSResponse, fetchResponse: Response, signal: ServerAdapterRequestAbortSignal): Promise<void> | undefined; | ||
export declare function createWritableFromUWS(uwsResponse: UWSResponse, fetchAPI: FetchAPI): WritableStream<any>; | ||
export declare function sendResponseToUwsOpts(uwsResponse: UWSResponse, fetchResponse: Response, signal: ServerAdapterRequestAbortSignal, fetchAPI: FetchAPI): Promise<void> | undefined; | ||
export declare function fakePromise<T>(value: T): Promise<T>; | ||
export {}; |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
152298
3248
3
+ Added@whatwg-node/disposablestack@0.0.5(transitive)
+ Added@whatwg-node/fetch@0.10.1(transitive)
+ Added@whatwg-node/node-fetch@0.7.4(transitive)
- Removed@whatwg-node/fetch@0.9.23(transitive)
- Removed@whatwg-node/node-fetch@0.6.0(transitive)
Updated@whatwg-node/fetch@^0.10.0