@electric-sql/client
Comparing version 0.5.1 to 0.6.0
@@ -164,41 +164,2 @@ var __defProp = Object.defineProperty;
// src/queue.ts
function isThenable(value) {
  return !!value && typeof value === `object` && `then` in value && typeof value.then === `function`;
}
var _processingChain;
var AsyncProcessingQueue = class {
  constructor() {
    __privateAdd(this, _processingChain);
  }
  process(callback) {
    __privateSet(this, _processingChain, isThenable(__privateGet(this, _processingChain)) ? __privateGet(this, _processingChain).then(callback) : callback());
    return __privateGet(this, _processingChain);
  }
  async waitForProcessing() {
    let currentChain;
    do {
      currentChain = __privateGet(this, _processingChain);
      await currentChain;
    } while (__privateGet(this, _processingChain) !== currentChain);
  }
};
_processingChain = new WeakMap();
var _queue, _callback;
var MessageProcessor = class {
  constructor(callback) {
    __privateAdd(this, _queue, new AsyncProcessingQueue());
    __privateAdd(this, _callback);
    __privateSet(this, _callback, callback);
  }
  process(messages) {
    __privateGet(this, _queue).process(() => __privateGet(this, _callback).call(this, messages));
  }
  async waitForProcessing() {
    await __privateGet(this, _queue).waitForProcessing();
  }
};
_queue = new WeakMap();
_callback = new WeakMap();
// src/error.ts
@@ -237,2 +198,12 @@ var FetchError = class _FetchError extends Error {
// src/constants.ts
var SHAPE_ID_HEADER = `electric-shape-id`;
var CHUNK_LAST_OFFSET_HEADER = `electric-chunk-last-offset`;
var CHUNK_UP_TO_DATE_HEADER = `electric-chunk-up-to-date`;
var SHAPE_SCHEMA_HEADER = `electric-schema`;
var SHAPE_ID_QUERY_PARAM = `shape_id`;
var OFFSET_QUERY_PARAM = `offset`;
var WHERE_QUERY_PARAM = `where`;
var LIVE_QUERY_PARAM = `live`;
// src/fetch.ts
@@ -281,18 +252,119 @@ var BackoffDefaults = {
}
var ChunkPrefetchDefaults = {
  maxChunksToPrefetch: 2
};
function createFetchWithChunkBuffer(fetchClient, prefetchOptions = ChunkPrefetchDefaults) {
  const { maxChunksToPrefetch } = prefetchOptions;
  let prefetchQueue;
  const prefetchClient = async (...args) => {
    const url = args[0].toString();
    const prefetchedRequest = prefetchQueue == null ? void 0 : prefetchQueue.consume(...args);
    if (prefetchedRequest) {
      return prefetchedRequest;
    }
    prefetchQueue == null ? void 0 : prefetchQueue.abort();
    const response = await fetchClient(...args);
    const nextUrl = getNextChunkUrl(url, response);
    if (nextUrl) {
      prefetchQueue = new PrefetchQueue({
        fetchClient,
        maxPrefetchedRequests: maxChunksToPrefetch,
        url: nextUrl,
        requestInit: args[1]
      });
    }
    return response;
  };
  return prefetchClient;
}
var _fetchClient, _maxPrefetchedRequests, _prefetchQueue, _queueHeadUrl, _queueTailUrl, _PrefetchQueue_instances, prefetch_fn;
var PrefetchQueue = class {
  constructor(options) {
    __privateAdd(this, _PrefetchQueue_instances);
    __privateAdd(this, _fetchClient);
    __privateAdd(this, _maxPrefetchedRequests);
    __privateAdd(this, _prefetchQueue, /* @__PURE__ */ new Map());
    __privateAdd(this, _queueHeadUrl);
    __privateAdd(this, _queueTailUrl);
    var _a;
    __privateSet(this, _fetchClient, (_a = options.fetchClient) != null ? _a : (...args) => fetch(...args));
    __privateSet(this, _maxPrefetchedRequests, options.maxPrefetchedRequests);
    __privateSet(this, _queueHeadUrl, options.url.toString());
    __privateSet(this, _queueTailUrl, __privateGet(this, _queueHeadUrl));
    __privateMethod(this, _PrefetchQueue_instances, prefetch_fn).call(this, options.url, options.requestInit);
  }
  abort() {
    __privateGet(this, _prefetchQueue).forEach(([_, aborter]) => aborter.abort());
  }
  consume(...args) {
    var _a;
    const url = args[0].toString();
    const request = (_a = __privateGet(this, _prefetchQueue).get(url)) == null ? void 0 : _a[0];
    if (!request || url !== __privateGet(this, _queueHeadUrl)) return;
    __privateGet(this, _prefetchQueue).delete(url);
    request.then((response) => {
      const nextUrl = getNextChunkUrl(url, response);
      __privateSet(this, _queueHeadUrl, nextUrl);
      if (!__privateGet(this, _prefetchQueue).has(__privateGet(this, _queueTailUrl))) {
        __privateMethod(this, _PrefetchQueue_instances, prefetch_fn).call(this, __privateGet(this, _queueTailUrl), args[1]);
      }
    }).catch(() => {
    });
    return request;
  }
};
_fetchClient = new WeakMap();
_maxPrefetchedRequests = new WeakMap();
_prefetchQueue = new WeakMap();
_queueHeadUrl = new WeakMap();
_queueTailUrl = new WeakMap();
_PrefetchQueue_instances = new WeakSet();
prefetch_fn = function(...args) {
  var _a, _b;
  const url = args[0].toString();
  if (__privateGet(this, _prefetchQueue).size >= __privateGet(this, _maxPrefetchedRequests)) return;
  const aborter = new AbortController();
  try {
    const request = __privateGet(this, _fetchClient).call(this, url, __spreadProps(__spreadValues({}, (_a = args[1]) != null ? _a : {}), {
      signal: chainAborter(aborter, (_b = args[1]) == null ? void 0 : _b.signal)
    }));
    __privateGet(this, _prefetchQueue).set(url, [request, aborter]);
    request.then((response) => {
      if (!response.ok || aborter.signal.aborted) return;
      const nextUrl = getNextChunkUrl(url, response);
      if (!nextUrl || nextUrl === url) return;
      __privateSet(this, _queueTailUrl, nextUrl);
      return __privateMethod(this, _PrefetchQueue_instances, prefetch_fn).call(this, nextUrl, args[1]);
    }).catch(() => {
    });
  } catch (_) {
  }
};
function getNextChunkUrl(url, res) {
  const shapeId = res.headers.get(SHAPE_ID_HEADER);
  const lastOffset = res.headers.get(CHUNK_LAST_OFFSET_HEADER);
  const isUpToDate = res.headers.has(CHUNK_UP_TO_DATE_HEADER);
  if (!shapeId || !lastOffset || isUpToDate) return;
  const nextUrl = new URL(url);
  if (nextUrl.searchParams.has(LIVE_QUERY_PARAM)) return;
  nextUrl.searchParams.set(SHAPE_ID_QUERY_PARAM, shapeId);
  nextUrl.searchParams.set(OFFSET_QUERY_PARAM, lastOffset);
  return nextUrl.toString();
}
function chainAborter(aborter, sourceSignal) {
  if (!sourceSignal) return aborter.signal;
  if (sourceSignal.aborted) aborter.abort();
  else
    sourceSignal.addEventListener(`abort`, () => aborter.abort(), {
      once: true
    });
  return aborter.signal;
}
// src/constants.ts
var SHAPE_ID_HEADER = `x-electric-shape-id`;
var CHUNK_LAST_OFFSET_HEADER = `x-electric-chunk-last-offset`;
var SHAPE_SCHEMA_HEADER = `x-electric-schema`;
var SHAPE_ID_QUERY_PARAM = `shape_id`;
var OFFSET_QUERY_PARAM = `offset`;
var WHERE_QUERY_PARAM = `where`;
var LIVE_QUERY_PARAM = `live`;
// src/client.ts
var _fetchClient, _messageParser, _subscribers, _upToDateSubscribers, _lastOffset, _lastSyncedAt, _isUpToDate, _connected, _shapeId, _schema, _ShapeStream_instances, publish_fn, sendErrorToSubscribers_fn, notifyUpToDateSubscribers_fn, sendErrorToUpToDateSubscribers_fn, reset_fn;
var _fetchClient2, _messageParser, _subscribers, _upToDateSubscribers, _lastOffset, _lastSyncedAt, _isUpToDate, _connected, _shapeId, _schema, _ShapeStream_instances, publish_fn, sendErrorToSubscribers_fn, notifyUpToDateSubscribers_fn, sendErrorToUpToDateSubscribers_fn, reset_fn;
var ShapeStream = class {
  constructor(options) {
    __privateAdd(this, _ShapeStream_instances);
    __privateAdd(this, _fetchClient);
    __privateAdd(this, _fetchClient2);
    __privateAdd(this, _messageParser);
@@ -314,12 +386,11 @@ __privateAdd(this, _subscribers, /* @__PURE__ */ new Map());
    __privateSet(this, _messageParser, new MessageParser(options.parser));
    __privateSet(this, _fetchClient, createFetchWithBackoff(
      (_b = options.fetchClient) != null ? _b : (...args) => fetch(...args),
      __spreadProps(__spreadValues({}, (_c = options.backoffOptions) != null ? _c : BackoffDefaults), {
        onFailedAttempt: () => {
          var _a2, _b2;
          __privateSet(this, _connected, false);
          (_b2 = (_a2 = options.backoffOptions) == null ? void 0 : _a2.onFailedAttempt) == null ? void 0 : _b2.call(_a2);
        }
      })
    ));
    const baseFetchClient = (_b = options.fetchClient) != null ? _b : (...args) => fetch(...args);
    const fetchWithBackoffClient = createFetchWithBackoff(baseFetchClient, __spreadProps(__spreadValues({}, (_c = options.backoffOptions) != null ? _c : BackoffDefaults), {
      onFailedAttempt: () => {
        var _a2, _b2;
        __privateSet(this, _connected, false);
        (_b2 = (_a2 = options.backoffOptions) == null ? void 0 : _a2.onFailedAttempt) == null ? void 0 : _b2.call(_a2);
      }
    }));
    __privateSet(this, _fetchClient2, createFetchWithChunkBuffer(fetchWithBackoffClient));
    this.start();
@@ -350,3 +421,3 @@ }
      try {
        response = await __privateGet(this, _fetchClient).call(this, fetchUrl.toString(), { signal });
        response = await __privateGet(this, _fetchClient2).call(this, fetchUrl.toString(), { signal });
        __privateSet(this, _connected, true);
@@ -358,3 +429,3 @@ } catch (e) {
        __privateMethod(this, _ShapeStream_instances, reset_fn).call(this);
        __privateMethod(this, _ShapeStream_instances, publish_fn).call(this, e.json);
        await __privateMethod(this, _ShapeStream_instances, publish_fn).call(this, e.json);
        continue;
@@ -364,3 +435,3 @@ } else if (e.status == 409) {
        __privateMethod(this, _ShapeStream_instances, reset_fn).call(this, newShapeId);
        __privateMethod(this, _ShapeStream_instances, publish_fn).call(this, e.json);
        await __privateMethod(this, _ShapeStream_instances, publish_fn).call(this, e.json);
        continue;
@@ -393,11 +464,12 @@ } else if (e.status >= 400 && e.status < 500) {
      if (batch.length > 0) {
        const prevUpToDate = __privateGet(this, _isUpToDate);
        const lastMessage = batch[batch.length - 1];
        if (isUpToDateMessage(lastMessage)) {
          __privateSet(this, _lastSyncedAt, Date.now());
          if (!__privateGet(this, _isUpToDate)) {
            __privateSet(this, _isUpToDate, true);
            __privateMethod(this, _ShapeStream_instances, notifyUpToDateSubscribers_fn).call(this);
          }
          __privateSet(this, _isUpToDate, true);
        }
        __privateMethod(this, _ShapeStream_instances, publish_fn).call(this, batch);
        await __privateMethod(this, _ShapeStream_instances, publish_fn).call(this, batch);
        if (!prevUpToDate && __privateGet(this, _isUpToDate)) {
          __privateMethod(this, _ShapeStream_instances, notifyUpToDateSubscribers_fn).call(this);
        }
      }
@@ -411,4 +483,3 @@ }
    const subscriptionId = Math.random();
    const subscriber = new MessageProcessor(callback);
    __privateGet(this, _subscribers).set(subscriptionId, [subscriber, onError]);
    __privateGet(this, _subscribers).set(subscriptionId, [callback, onError]);
    return () => {
@@ -449,3 +520,3 @@ __privateGet(this, _subscribers).delete(subscriptionId);
};
_fetchClient = new WeakMap();
_fetchClient2 = new WeakMap();
_messageParser = new WeakMap();
@@ -461,6 +532,14 @@ _subscribers = new WeakMap();
_ShapeStream_instances = new WeakSet();
publish_fn = function(messages) {
  __privateGet(this, _subscribers).forEach(([subscriber, _]) => {
    subscriber.process(messages);
  });
publish_fn = async function(messages) {
  await Promise.all(
    Array.from(__privateGet(this, _subscribers).values()).map(async ([callback, __]) => {
      try {
        await callback(messages);
      } catch (err) {
        queueMicrotask(() => {
          throw err;
        });
      }
    })
  );
};
@@ -467,0 +546,0 @@ sendErrorToSubscribers_fn = function(error) {
package.json
{
  "name": "@electric-sql/client",
  "version": "0.5.1",
  "version": "0.6.0",
  "description": "Postgres everywhere - your data, in sync, wherever you need it.",
@@ -5,0 +5,0 @@ "type": "module",
README.md
@@ -58,2 +58,4 @@ <p align="center">
  // messages is an array with one or more row updates
  // and the stream will wait for all subscribers to process them
  // before proceeding
})
@@ -60,0 +62,0 @@ ```
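
As the updated README comment notes, the stream now waits for every subscriber to finish processing a batch before proceeding. A minimal sketch of an async subscriber under these semantics, assuming the README's usual `ShapeStream` setup (the endpoint URL is a placeholder and `saveToDatabase` is a hypothetical helper, not part of this package):

```ts
import { ShapeStream } from '@electric-sql/client'

const stream = new ShapeStream({
  url: `http://localhost:3000/v1/shape/foo`, // placeholder endpoint
})

stream.subscribe(async (messages) => {
  // As of 0.6.0 the stream awaits this promise before requesting the
  // next chunk, so a slow consumer naturally throttles the sync loop.
  await saveToDatabase(messages)
})

// Hypothetical helper standing in for any asynchronous processing.
async function saveToDatabase(messages: unknown[]): Promise<void> {
  console.log(messages.length)
}
```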
src/client.ts
import { Message, Offset, Schema, Row, MaybePromise } from './types'
import { MessageParser, Parser } from './parser'
import { isUpToDateMessage } from './helpers'
import { MessageProcessor, MessageProcessorInterface } from './queue'
import { FetchError, FetchBackoffAbortError } from './error'
@@ -10,2 +9,3 @@ import {
  createFetchWithBackoff,
  createFetchWithChunkBuffer,
} from './fetch'
@@ -123,3 +123,3 @@ import {
    [
      MessageProcessorInterface<Message<T>[]>,
      (messages: Message<T>[]) => MaybePromise<void>,
      ((error: Error) => void) | undefined,
@@ -147,14 +147,16 @@ ]
    this.#fetchClient = createFetchWithBackoff(
    const baseFetchClient =
      options.fetchClient ??
      ((...args: Parameters<typeof fetch>) => fetch(...args)),
      {
        ...(options.backoffOptions ?? BackoffDefaults),
        onFailedAttempt: () => {
          this.#connected = false
          options.backoffOptions?.onFailedAttempt?.()
        },
      }
    )
      ((...args: Parameters<typeof fetch>) => fetch(...args))
    const fetchWithBackoffClient = createFetchWithBackoff(baseFetchClient, {
      ...(options.backoffOptions ?? BackoffDefaults),
      onFailedAttempt: () => {
        this.#connected = false
        options.backoffOptions?.onFailedAttempt?.()
      },
    })
    this.#fetchClient = createFetchWithChunkBuffer(fetchWithBackoffClient)
    this.start()
@@ -205,3 +207,3 @@ }
          this.#reset()
          this.#publish(e.json as Message<T>[])
          await this.#publish(e.json as Message<T>[])
          continue
@@ -213,3 +215,3 @@ } else if (e.status == 409) {
          this.#reset(newShapeId)
          this.#publish(e.json as Message<T>[])
          await this.#publish(e.json as Message<T>[])
          continue
@@ -254,12 +256,13 @@ } else if (e.status >= 400 && e.status < 500) {
      if (batch.length > 0) {
        const prevUpToDate = this.#isUpToDate
        const lastMessage = batch[batch.length - 1]
        if (isUpToDateMessage(lastMessage)) {
          this.#lastSyncedAt = Date.now()
          if (!this.#isUpToDate) {
            this.#isUpToDate = true
            this.#notifyUpToDateSubscribers()
          }
          this.#isUpToDate = true
        }
        this.#publish(batch)
        await this.#publish(batch)
        if (!prevUpToDate && this.#isUpToDate) {
          this.#notifyUpToDateSubscribers()
        }
      }
@@ -277,5 +280,4 @@ }
    const subscriptionId = Math.random()
    const subscriber = new MessageProcessor(callback)
    this.#subscribers.set(subscriptionId, [subscriber, onError])
    this.#subscribers.set(subscriptionId, [callback, onError])
@@ -329,6 +331,14 @@ return () => {
  #publish(messages: Message<T>[]) {
    this.#subscribers.forEach(([subscriber, _]) => {
      subscriber.process(messages)
    })
  async #publish(messages: Message<T>[]): Promise<void> {
    await Promise.all(
      Array.from(this.#subscribers.values()).map(async ([callback, __]) => {
        try {
          await callback(messages)
        } catch (err) {
          queueMicrotask(() => {
            throw err
          })
        }
      })
    )
  }
@@ -335,0 +345,0 @@
src/constants.ts
@@ -1,4 +0,5 @@
export const SHAPE_ID_HEADER = `x-electric-shape-id`
export const CHUNK_LAST_OFFSET_HEADER = `x-electric-chunk-last-offset`
export const SHAPE_SCHEMA_HEADER = `x-electric-schema`
export const SHAPE_ID_HEADER = `electric-shape-id`
export const CHUNK_LAST_OFFSET_HEADER = `electric-chunk-last-offset`
export const CHUNK_UP_TO_DATE_HEADER = `electric-chunk-up-to-date`
export const SHAPE_SCHEMA_HEADER = `electric-schema`
export const SHAPE_ID_QUERY_PARAM = `shape_id`
@@ -5,0 +6,0 @@ export const OFFSET_QUERY_PARAM = `offset`
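
Because this release drops the `x-` prefix from these header names, any proxy or CDN in front of Electric must expose the renamed headers for chunk prefetching to keep working. A hedged sketch using the standard Fetch API (the handler shape and upstream URL are assumptions, not part of this package):

```ts
// Headers renamed in 0.6.0 that a proxy should pass through verbatim.
const FORWARDED_HEADERS = [
  `electric-shape-id`,
  `electric-chunk-last-offset`,
  `electric-chunk-up-to-date`,
  `electric-schema`,
]

// Hypothetical pass-through handler: copies only the renamed headers
// onto the downstream response.
async function proxyShapeRequest(upstreamUrl: string): Promise<Response> {
  const upstream = await fetch(upstreamUrl)
  const headers = new Headers()
  for (const name of FORWARDED_HEADERS) {
    const value = upstream.headers.get(name)
    if (value !== null) headers.set(name, value)
  }
  return new Response(upstream.body, { status: upstream.status, headers })
}
```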
src/fetch.ts
@@ -0,1 +1,9 @@
import {
  CHUNK_LAST_OFFSET_HEADER,
  CHUNK_UP_TO_DATE_HEADER,
  LIVE_QUERY_PARAM,
  OFFSET_QUERY_PARAM,
  SHAPE_ID_HEADER,
  SHAPE_ID_QUERY_PARAM,
} from './constants'
import { FetchError, FetchBackoffAbortError } from './error'
@@ -80,1 +88,186 @@
}
interface ChunkPrefetchOptions {
  maxChunksToPrefetch: number
}
const ChunkPrefetchDefaults = {
  maxChunksToPrefetch: 2,
}
/**
 * Creates a fetch client that prefetches subsequent log chunks for
 * consumption by the shape stream without waiting for the chunk bodies
 * themselves to be loaded.
 *
 * @param fetchClient the client to wrap
 * @param prefetchOptions options to configure prefetching
 * @returns wrapped client with prefetch capabilities
 */
export function createFetchWithChunkBuffer(
  fetchClient: typeof fetch,
  prefetchOptions: ChunkPrefetchOptions = ChunkPrefetchDefaults
): typeof fetch {
  const { maxChunksToPrefetch } = prefetchOptions
  let prefetchQueue: PrefetchQueue
  const prefetchClient = async (...args: Parameters<typeof fetchClient>) => {
    const url = args[0].toString()
    // try to consume from the prefetch queue first, and if request is
    // not present abort the prefetch queue as it must no longer be valid
    const prefetchedRequest = prefetchQueue?.consume(...args)
    if (prefetchedRequest) {
      return prefetchedRequest
    }
    prefetchQueue?.abort()
    // perform request and fire off prefetch queue if request is eligible
    const response = await fetchClient(...args)
    const nextUrl = getNextChunkUrl(url, response)
    if (nextUrl) {
      prefetchQueue = new PrefetchQueue({
        fetchClient,
        maxPrefetchedRequests: maxChunksToPrefetch,
        url: nextUrl,
        requestInit: args[1],
      })
    }
    return response
  }
  return prefetchClient
}
class PrefetchQueue {
  readonly #fetchClient: typeof fetch
  readonly #maxPrefetchedRequests: number
  readonly #prefetchQueue = new Map<
    string,
    [Promise<Response>, AbortController]
  >()
  #queueHeadUrl: string | void
  #queueTailUrl: string
  constructor(options: {
    url: Parameters<typeof fetch>[0]
    requestInit: Parameters<typeof fetch>[1]
    maxPrefetchedRequests: number
    fetchClient?: typeof fetch
  }) {
    this.#fetchClient =
      options.fetchClient ??
      ((...args: Parameters<typeof fetch>) => fetch(...args))
    this.#maxPrefetchedRequests = options.maxPrefetchedRequests
    this.#queueHeadUrl = options.url.toString()
    this.#queueTailUrl = this.#queueHeadUrl
    this.#prefetch(options.url, options.requestInit)
  }
  abort(): void {
    this.#prefetchQueue.forEach(([_, aborter]) => aborter.abort())
  }
  consume(...args: Parameters<typeof fetch>): Promise<Response> | void {
    const url = args[0].toString()
    const request = this.#prefetchQueue.get(url)?.[0]
    // only consume if request is in queue and is the queue "head"
    // if request is in the queue but not the head, the queue is being
    // consumed out of order and should be restarted
    if (!request || url !== this.#queueHeadUrl) return
    this.#prefetchQueue.delete(url)
    // fire off new prefetch since request has been consumed
    request
      .then((response) => {
        const nextUrl = getNextChunkUrl(url, response)
        this.#queueHeadUrl = nextUrl
        if (!this.#prefetchQueue.has(this.#queueTailUrl)) {
          this.#prefetch(this.#queueTailUrl, args[1])
        }
      })
      .catch(() => {})
    return request
  }
  #prefetch(...args: Parameters<typeof fetch>): void {
    const url = args[0].toString()
    // only prefetch when queue is not full
    if (this.#prefetchQueue.size >= this.#maxPrefetchedRequests) return
    // initialize aborter per request, to avoid aborting consumed requests that
    // are still streaming their bodies to the consumer
    const aborter = new AbortController()
    try {
      const request = this.#fetchClient(url, {
        ...(args[1] ?? {}),
        signal: chainAborter(aborter, args[1]?.signal),
      })
      this.#prefetchQueue.set(url, [request, aborter])
      request
        .then((response) => {
          // only keep prefetching if response chain is uninterrupted
          if (!response.ok || aborter.signal.aborted) return
          const nextUrl = getNextChunkUrl(url, response)
          // only prefetch when there is a next URL
          if (!nextUrl || nextUrl === url) return
          this.#queueTailUrl = nextUrl
          return this.#prefetch(nextUrl, args[1])
        })
        .catch(() => {})
    } catch (_) {
      // ignore prefetch errors
    }
  }
}
/**
 * Generate the next chunk's URL if the url and response are valid
 */
function getNextChunkUrl(url: string, res: Response): string | void {
  const shapeId = res.headers.get(SHAPE_ID_HEADER)
  const lastOffset = res.headers.get(CHUNK_LAST_OFFSET_HEADER)
  const isUpToDate = res.headers.has(CHUNK_UP_TO_DATE_HEADER)
  // only prefetch if shape ID and offset for next chunk are available, and
  // response is not already up-to-date
  if (!shapeId || !lastOffset || isUpToDate) return
  const nextUrl = new URL(url)
  // don't prefetch live requests, rushing them will only
  // potentially miss more recent data
  if (nextUrl.searchParams.has(LIVE_QUERY_PARAM)) return
  nextUrl.searchParams.set(SHAPE_ID_QUERY_PARAM, shapeId)
  nextUrl.searchParams.set(OFFSET_QUERY_PARAM, lastOffset)
  return nextUrl.toString()
}
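// Worked example (illustration only, not part of the package; header
// values are invented): a response to
//   https://example.com/v1/shape/items?offset=-1
// carrying
//   electric-shape-id: 24680-1700000000000
//   electric-chunk-last-offset: 12345_6
// yields the next chunk URL
//   https://example.com/v1/shape/items?offset=12345_6&shape_id=24680-1700000000000
// whereas a response carrying electric-chunk-up-to-date, or a request URL
// that already has the live query param, yields no next URL and stops
// prefetching.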
/**
 * Chains an abort controller on an optional source signal's
 * aborted state - if the source signal is aborted, the provided abort
 * controller will also abort
 */
function chainAborter(
  aborter: AbortController,
  sourceSignal?: AbortSignal
): AbortSignal {
  if (!sourceSignal) return aborter.signal
  if (sourceSignal.aborted) aborter.abort()
  else
    sourceSignal.addEventListener(`abort`, () => aborter.abort(), {
      once: true,
    })
  return aborter.signal
}
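
To see how the pieces above compose, a minimal usage sketch. Within the package the helper is imported from `./fetch`; whether it is re-exported from the public entry point is not shown in this diff, so treat the import path as an assumption:

```ts
import { createFetchWithChunkBuffer } from './fetch'

// Wrap a base fetch client; ShapeStream does the same, layering this
// over createFetchWithBackoff.
const prefetchingFetch = createFetchWithChunkBuffer(
  (...args: Parameters<typeof fetch>) => fetch(...args),
  { maxChunksToPrefetch: 2 } // the default shown above
)

async function readChunk(url: string): Promise<Response> {
  // A non-live response that names a next chunk (via electric-shape-id
  // and electric-chunk-last-offset) kicks off up to two background
  // requests, which subsequent calls consume in order.
  return prefetchingFetch(url)
}

// Placeholder shape endpoint.
void readChunk(`http://localhost:3000/v1/shape/items?offset=-1`)
```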