Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@electric-sql/client

Package Overview
Dependencies
Maintainers
0
Versions
28
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@electric-sql/client - npm Package Compare versions

Comparing version 0.5.1 to 0.6.0

229

dist/index.legacy-esm.js

@@ -164,41 +164,2 @@ var __defProp = Object.defineProperty;

// src/queue.ts
function isThenable(value) {
return !!value && typeof value === `object` && `then` in value && typeof value.then === `function`;
}
var _processingChain;
var AsyncProcessingQueue = class {
constructor() {
__privateAdd(this, _processingChain);
}
process(callback) {
__privateSet(this, _processingChain, isThenable(__privateGet(this, _processingChain)) ? __privateGet(this, _processingChain).then(callback) : callback());
return __privateGet(this, _processingChain);
}
async waitForProcessing() {
let currentChain;
do {
currentChain = __privateGet(this, _processingChain);
await currentChain;
} while (__privateGet(this, _processingChain) !== currentChain);
}
};
_processingChain = new WeakMap();
var _queue, _callback;
var MessageProcessor = class {
constructor(callback) {
__privateAdd(this, _queue, new AsyncProcessingQueue());
__privateAdd(this, _callback);
__privateSet(this, _callback, callback);
}
process(messages) {
__privateGet(this, _queue).process(() => __privateGet(this, _callback).call(this, messages));
}
async waitForProcessing() {
await __privateGet(this, _queue).waitForProcessing();
}
};
_queue = new WeakMap();
_callback = new WeakMap();
// src/error.ts

@@ -237,2 +198,12 @@ var FetchError = class _FetchError extends Error {

// src/constants.ts
var SHAPE_ID_HEADER = `electric-shape-id`;
var CHUNK_LAST_OFFSET_HEADER = `electric-chunk-last-offset`;
var CHUNK_UP_TO_DATE_HEADER = `electric-chunk-up-to-date`;
var SHAPE_SCHEMA_HEADER = `electric-schema`;
var SHAPE_ID_QUERY_PARAM = `shape_id`;
var OFFSET_QUERY_PARAM = `offset`;
var WHERE_QUERY_PARAM = `where`;
var LIVE_QUERY_PARAM = `live`;
// src/fetch.ts

@@ -281,18 +252,119 @@ var BackoffDefaults = {

}
var ChunkPrefetchDefaults = {
maxChunksToPrefetch: 2
};
function createFetchWithChunkBuffer(fetchClient, prefetchOptions = ChunkPrefetchDefaults) {
const { maxChunksToPrefetch } = prefetchOptions;
let prefetchQueue;
const prefetchClient = async (...args) => {
const url = args[0].toString();
const prefetchedRequest = prefetchQueue == null ? void 0 : prefetchQueue.consume(...args);
if (prefetchedRequest) {
return prefetchedRequest;
}
prefetchQueue == null ? void 0 : prefetchQueue.abort();
const response = await fetchClient(...args);
const nextUrl = getNextChunkUrl(url, response);
if (nextUrl) {
prefetchQueue = new PrefetchQueue({
fetchClient,
maxPrefetchedRequests: maxChunksToPrefetch,
url: nextUrl,
requestInit: args[1]
});
}
return response;
};
return prefetchClient;
}
var _fetchClient, _maxPrefetchedRequests, _prefetchQueue, _queueHeadUrl, _queueTailUrl, _PrefetchQueue_instances, prefetch_fn;
var PrefetchQueue = class {
constructor(options) {
__privateAdd(this, _PrefetchQueue_instances);
__privateAdd(this, _fetchClient);
__privateAdd(this, _maxPrefetchedRequests);
__privateAdd(this, _prefetchQueue, /* @__PURE__ */ new Map());
__privateAdd(this, _queueHeadUrl);
__privateAdd(this, _queueTailUrl);
var _a;
__privateSet(this, _fetchClient, (_a = options.fetchClient) != null ? _a : (...args) => fetch(...args));
__privateSet(this, _maxPrefetchedRequests, options.maxPrefetchedRequests);
__privateSet(this, _queueHeadUrl, options.url.toString());
__privateSet(this, _queueTailUrl, __privateGet(this, _queueHeadUrl));
__privateMethod(this, _PrefetchQueue_instances, prefetch_fn).call(this, options.url, options.requestInit);
}
abort() {
__privateGet(this, _prefetchQueue).forEach(([_, aborter]) => aborter.abort());
}
consume(...args) {
var _a;
const url = args[0].toString();
const request = (_a = __privateGet(this, _prefetchQueue).get(url)) == null ? void 0 : _a[0];
if (!request || url !== __privateGet(this, _queueHeadUrl)) return;
__privateGet(this, _prefetchQueue).delete(url);
request.then((response) => {
const nextUrl = getNextChunkUrl(url, response);
__privateSet(this, _queueHeadUrl, nextUrl);
if (!__privateGet(this, _prefetchQueue).has(__privateGet(this, _queueTailUrl))) {
__privateMethod(this, _PrefetchQueue_instances, prefetch_fn).call(this, __privateGet(this, _queueTailUrl), args[1]);
}
}).catch(() => {
});
return request;
}
};
_fetchClient = new WeakMap();
_maxPrefetchedRequests = new WeakMap();
_prefetchQueue = new WeakMap();
_queueHeadUrl = new WeakMap();
_queueTailUrl = new WeakMap();
_PrefetchQueue_instances = new WeakSet();
prefetch_fn = function(...args) {
var _a, _b;
const url = args[0].toString();
if (__privateGet(this, _prefetchQueue).size >= __privateGet(this, _maxPrefetchedRequests)) return;
const aborter = new AbortController();
try {
const request = __privateGet(this, _fetchClient).call(this, url, __spreadProps(__spreadValues({}, (_a = args[1]) != null ? _a : {}), {
signal: chainAborter(aborter, (_b = args[1]) == null ? void 0 : _b.signal)
}));
__privateGet(this, _prefetchQueue).set(url, [request, aborter]);
request.then((response) => {
if (!response.ok || aborter.signal.aborted) return;
const nextUrl = getNextChunkUrl(url, response);
if (!nextUrl || nextUrl === url) return;
__privateSet(this, _queueTailUrl, nextUrl);
return __privateMethod(this, _PrefetchQueue_instances, prefetch_fn).call(this, nextUrl, args[1]);
}).catch(() => {
});
} catch (_) {
}
};
function getNextChunkUrl(url, res) {
const shapeId = res.headers.get(SHAPE_ID_HEADER);
const lastOffset = res.headers.get(CHUNK_LAST_OFFSET_HEADER);
const isUpToDate = res.headers.has(CHUNK_UP_TO_DATE_HEADER);
if (!shapeId || !lastOffset || isUpToDate) return;
const nextUrl = new URL(url);
if (nextUrl.searchParams.has(LIVE_QUERY_PARAM)) return;
nextUrl.searchParams.set(SHAPE_ID_QUERY_PARAM, shapeId);
nextUrl.searchParams.set(OFFSET_QUERY_PARAM, lastOffset);
return nextUrl.toString();
}
function chainAborter(aborter, sourceSignal) {
if (!sourceSignal) return aborter.signal;
if (sourceSignal.aborted) aborter.abort();
else
sourceSignal.addEventListener(`abort`, () => aborter.abort(), {
once: true
});
return aborter.signal;
}
// src/constants.ts
var SHAPE_ID_HEADER = `x-electric-shape-id`;
var CHUNK_LAST_OFFSET_HEADER = `x-electric-chunk-last-offset`;
var SHAPE_SCHEMA_HEADER = `x-electric-schema`;
var SHAPE_ID_QUERY_PARAM = `shape_id`;
var OFFSET_QUERY_PARAM = `offset`;
var WHERE_QUERY_PARAM = `where`;
var LIVE_QUERY_PARAM = `live`;
// src/client.ts
var _fetchClient, _messageParser, _subscribers, _upToDateSubscribers, _lastOffset, _lastSyncedAt, _isUpToDate, _connected, _shapeId, _schema, _ShapeStream_instances, publish_fn, sendErrorToSubscribers_fn, notifyUpToDateSubscribers_fn, sendErrorToUpToDateSubscribers_fn, reset_fn;
var _fetchClient2, _messageParser, _subscribers, _upToDateSubscribers, _lastOffset, _lastSyncedAt, _isUpToDate, _connected, _shapeId, _schema, _ShapeStream_instances, publish_fn, sendErrorToSubscribers_fn, notifyUpToDateSubscribers_fn, sendErrorToUpToDateSubscribers_fn, reset_fn;
var ShapeStream = class {
constructor(options) {
__privateAdd(this, _ShapeStream_instances);
__privateAdd(this, _fetchClient);
__privateAdd(this, _fetchClient2);
__privateAdd(this, _messageParser);

@@ -314,12 +386,11 @@ __privateAdd(this, _subscribers, /* @__PURE__ */ new Map());

__privateSet(this, _messageParser, new MessageParser(options.parser));
__privateSet(this, _fetchClient, createFetchWithBackoff(
(_b = options.fetchClient) != null ? _b : (...args) => fetch(...args),
__spreadProps(__spreadValues({}, (_c = options.backoffOptions) != null ? _c : BackoffDefaults), {
onFailedAttempt: () => {
var _a2, _b2;
__privateSet(this, _connected, false);
(_b2 = (_a2 = options.backoffOptions) == null ? void 0 : _a2.onFailedAttempt) == null ? void 0 : _b2.call(_a2);
}
})
));
const baseFetchClient = (_b = options.fetchClient) != null ? _b : (...args) => fetch(...args);
const fetchWithBackoffClient = createFetchWithBackoff(baseFetchClient, __spreadProps(__spreadValues({}, (_c = options.backoffOptions) != null ? _c : BackoffDefaults), {
onFailedAttempt: () => {
var _a2, _b2;
__privateSet(this, _connected, false);
(_b2 = (_a2 = options.backoffOptions) == null ? void 0 : _a2.onFailedAttempt) == null ? void 0 : _b2.call(_a2);
}
}));
__privateSet(this, _fetchClient2, createFetchWithChunkBuffer(fetchWithBackoffClient));
this.start();

@@ -350,3 +421,3 @@ }

try {
response = await __privateGet(this, _fetchClient).call(this, fetchUrl.toString(), { signal });
response = await __privateGet(this, _fetchClient2).call(this, fetchUrl.toString(), { signal });
__privateSet(this, _connected, true);

@@ -358,3 +429,3 @@ } catch (e) {

__privateMethod(this, _ShapeStream_instances, reset_fn).call(this);
__privateMethod(this, _ShapeStream_instances, publish_fn).call(this, e.json);
await __privateMethod(this, _ShapeStream_instances, publish_fn).call(this, e.json);
continue;

@@ -364,3 +435,3 @@ } else if (e.status == 409) {

__privateMethod(this, _ShapeStream_instances, reset_fn).call(this, newShapeId);
__privateMethod(this, _ShapeStream_instances, publish_fn).call(this, e.json);
await __privateMethod(this, _ShapeStream_instances, publish_fn).call(this, e.json);
continue;

@@ -393,11 +464,12 @@ } else if (e.status >= 400 && e.status < 500) {

if (batch.length > 0) {
const prevUpToDate = __privateGet(this, _isUpToDate);
const lastMessage = batch[batch.length - 1];
if (isUpToDateMessage(lastMessage)) {
__privateSet(this, _lastSyncedAt, Date.now());
if (!__privateGet(this, _isUpToDate)) {
__privateSet(this, _isUpToDate, true);
__privateMethod(this, _ShapeStream_instances, notifyUpToDateSubscribers_fn).call(this);
}
__privateSet(this, _isUpToDate, true);
}
__privateMethod(this, _ShapeStream_instances, publish_fn).call(this, batch);
await __privateMethod(this, _ShapeStream_instances, publish_fn).call(this, batch);
if (!prevUpToDate && __privateGet(this, _isUpToDate)) {
__privateMethod(this, _ShapeStream_instances, notifyUpToDateSubscribers_fn).call(this);
}
}

@@ -411,4 +483,3 @@ }

const subscriptionId = Math.random();
const subscriber = new MessageProcessor(callback);
__privateGet(this, _subscribers).set(subscriptionId, [subscriber, onError]);
__privateGet(this, _subscribers).set(subscriptionId, [callback, onError]);
return () => {

@@ -449,3 +520,3 @@ __privateGet(this, _subscribers).delete(subscriptionId);

};
_fetchClient = new WeakMap();
_fetchClient2 = new WeakMap();
_messageParser = new WeakMap();

@@ -461,6 +532,14 @@ _subscribers = new WeakMap();

_ShapeStream_instances = new WeakSet();
publish_fn = function(messages) {
__privateGet(this, _subscribers).forEach(([subscriber, _]) => {
subscriber.process(messages);
});
publish_fn = async function(messages) {
await Promise.all(
Array.from(__privateGet(this, _subscribers).values()).map(async ([callback, __]) => {
try {
await callback(messages);
} catch (err) {
queueMicrotask(() => {
throw err;
});
}
})
);
};

@@ -467,0 +546,0 @@ sendErrorToSubscribers_fn = function(error) {

{
"name": "@electric-sql/client",
"version": "0.5.1",
"version": "0.6.0",
"description": "Postgres everywhere - your data, in sync, wherever you need it.",

@@ -5,0 +5,0 @@ "type": "module",

@@ -58,2 +58,4 @@ <p align="center">

// messages is an array with one or more row updates
// and the stream will wait for all subscribers to process them
// before proceeding
})

@@ -60,0 +62,0 @@ ```

import { Message, Offset, Schema, Row, MaybePromise } from './types'
import { MessageParser, Parser } from './parser'
import { isUpToDateMessage } from './helpers'
import { MessageProcessor, MessageProcessorInterface } from './queue'
import { FetchError, FetchBackoffAbortError } from './error'

@@ -10,2 +9,3 @@ import {

createFetchWithBackoff,
createFetchWithChunkBuffer,
} from './fetch'

@@ -123,3 +123,3 @@ import {

[
MessageProcessorInterface<Message<T>[]>,
(messages: Message<T>[]) => MaybePromise<void>,
((error: Error) => void) | undefined,

@@ -147,14 +147,16 @@ ]

this.#fetchClient = createFetchWithBackoff(
const baseFetchClient =
options.fetchClient ??
((...args: Parameters<typeof fetch>) => fetch(...args)),
{
...(options.backoffOptions ?? BackoffDefaults),
onFailedAttempt: () => {
this.#connected = false
options.backoffOptions?.onFailedAttempt?.()
},
}
)
((...args: Parameters<typeof fetch>) => fetch(...args))
const fetchWithBackoffClient = createFetchWithBackoff(baseFetchClient, {
...(options.backoffOptions ?? BackoffDefaults),
onFailedAttempt: () => {
this.#connected = false
options.backoffOptions?.onFailedAttempt?.()
},
})
this.#fetchClient = createFetchWithChunkBuffer(fetchWithBackoffClient)
this.start()

@@ -205,3 +207,3 @@ }

this.#reset()
this.#publish(e.json as Message<T>[])
await this.#publish(e.json as Message<T>[])
continue

@@ -213,3 +215,3 @@ } else if (e.status == 409) {

this.#reset(newShapeId)
this.#publish(e.json as Message<T>[])
await this.#publish(e.json as Message<T>[])
continue

@@ -254,12 +256,13 @@ } else if (e.status >= 400 && e.status < 500) {

if (batch.length > 0) {
const prevUpToDate = this.#isUpToDate
const lastMessage = batch[batch.length - 1]
if (isUpToDateMessage(lastMessage)) {
this.#lastSyncedAt = Date.now()
if (!this.#isUpToDate) {
this.#isUpToDate = true
this.#notifyUpToDateSubscribers()
}
this.#isUpToDate = true
}
this.#publish(batch)
await this.#publish(batch)
if (!prevUpToDate && this.#isUpToDate) {
this.#notifyUpToDateSubscribers()
}
}

@@ -277,5 +280,4 @@ }

const subscriptionId = Math.random()
const subscriber = new MessageProcessor(callback)
this.#subscribers.set(subscriptionId, [subscriber, onError])
this.#subscribers.set(subscriptionId, [callback, onError])

@@ -329,6 +331,14 @@ return () => {

#publish(messages: Message<T>[]) {
this.#subscribers.forEach(([subscriber, _]) => {
subscriber.process(messages)
})
async #publish(messages: Message<T>[]): Promise<void> {
await Promise.all(
Array.from(this.#subscribers.values()).map(async ([callback, __]) => {
try {
await callback(messages)
} catch (err) {
queueMicrotask(() => {
throw err
})
}
})
)
}

@@ -335,0 +345,0 @@

@@ -1,4 +0,5 @@

export const SHAPE_ID_HEADER = `x-electric-shape-id`
export const CHUNK_LAST_OFFSET_HEADER = `x-electric-chunk-last-offset`
export const SHAPE_SCHEMA_HEADER = `x-electric-schema`
export const SHAPE_ID_HEADER = `electric-shape-id`
export const CHUNK_LAST_OFFSET_HEADER = `electric-chunk-last-offset`
export const CHUNK_UP_TO_DATE_HEADER = `electric-chunk-up-to-date`
export const SHAPE_SCHEMA_HEADER = `electric-schema`
export const SHAPE_ID_QUERY_PARAM = `shape_id`

@@ -5,0 +6,0 @@ export const OFFSET_QUERY_PARAM = `offset`

@@ -0,1 +1,9 @@

import {
CHUNK_LAST_OFFSET_HEADER,
CHUNK_UP_TO_DATE_HEADER,
LIVE_QUERY_PARAM,
OFFSET_QUERY_PARAM,
SHAPE_ID_HEADER,
SHAPE_ID_QUERY_PARAM,
} from './constants'
import { FetchError, FetchBackoffAbortError } from './error'

@@ -80,1 +88,186 @@

}
interface ChunkPrefetchOptions {
maxChunksToPrefetch: number
}
const ChunkPrefetchDefaults = {
maxChunksToPrefetch: 2,
}
/**
* Creates a fetch client that prefetches subsequent log chunks for
* consumption by the shape stream without waiting for the chunk bodies
* themselves to be loaded.
*
* @param fetchClient the client to wrap
* @param prefetchOptions options to configure prefetching
* @returns wrapped client with prefetch capabilities
*/
export function createFetchWithChunkBuffer(
fetchClient: typeof fetch,
prefetchOptions: ChunkPrefetchOptions = ChunkPrefetchDefaults
): typeof fetch {
const { maxChunksToPrefetch } = prefetchOptions
let prefetchQueue: PrefetchQueue
const prefetchClient = async (...args: Parameters<typeof fetchClient>) => {
const url = args[0].toString()
// try to consume from the prefetch queue first, and if request is
// not present abort the prefetch queue as it must no longer be valid
const prefetchedRequest = prefetchQueue?.consume(...args)
if (prefetchedRequest) {
return prefetchedRequest
}
prefetchQueue?.abort()
// perform request and fire off prefetch queue if request is eligible
const response = await fetchClient(...args)
const nextUrl = getNextChunkUrl(url, response)
if (nextUrl) {
prefetchQueue = new PrefetchQueue({
fetchClient,
maxPrefetchedRequests: maxChunksToPrefetch,
url: nextUrl,
requestInit: args[1],
})
}
return response
}
return prefetchClient
}
class PrefetchQueue {
readonly #fetchClient: typeof fetch
readonly #maxPrefetchedRequests: number
readonly #prefetchQueue = new Map<
string,
[Promise<Response>, AbortController]
>()
#queueHeadUrl: string | void
#queueTailUrl: string
constructor(options: {
url: Parameters<typeof fetch>[0]
requestInit: Parameters<typeof fetch>[1]
maxPrefetchedRequests: number
fetchClient?: typeof fetch
}) {
this.#fetchClient =
options.fetchClient ??
((...args: Parameters<typeof fetch>) => fetch(...args))
this.#maxPrefetchedRequests = options.maxPrefetchedRequests
this.#queueHeadUrl = options.url.toString()
this.#queueTailUrl = this.#queueHeadUrl
this.#prefetch(options.url, options.requestInit)
}
abort(): void {
this.#prefetchQueue.forEach(([_, aborter]) => aborter.abort())
}
consume(...args: Parameters<typeof fetch>): Promise<Response> | void {
const url = args[0].toString()
const request = this.#prefetchQueue.get(url)?.[0]
// only consume if request is in queue and is the queue "head"
// if request is in the queue but not the head, the queue is being
// consumed out of order and should be restarted
if (!request || url !== this.#queueHeadUrl) return
this.#prefetchQueue.delete(url)
// fire off new prefetch since request has been consumed
request
.then((response) => {
const nextUrl = getNextChunkUrl(url, response)
this.#queueHeadUrl = nextUrl
if (!this.#prefetchQueue.has(this.#queueTailUrl)) {
this.#prefetch(this.#queueTailUrl, args[1])
}
})
.catch(() => {})
return request
}
#prefetch(...args: Parameters<typeof fetch>): void {
const url = args[0].toString()
// only prefetch when queue is not full
if (this.#prefetchQueue.size >= this.#maxPrefetchedRequests) return
// initialize aborter per request, to avoid aborting consumed requests that
// are still streaming their bodies to the consumer
const aborter = new AbortController()
try {
const request = this.#fetchClient(url, {
...(args[1] ?? {}),
signal: chainAborter(aborter, args[1]?.signal),
})
this.#prefetchQueue.set(url, [request, aborter])
request
.then((response) => {
// only keep prefetching if response chain is uninterrupted
if (!response.ok || aborter.signal.aborted) return
const nextUrl = getNextChunkUrl(url, response)
// only prefetch when there is a next URL
if (!nextUrl || nextUrl === url) return
this.#queueTailUrl = nextUrl
return this.#prefetch(nextUrl, args[1])
})
.catch(() => {})
} catch (_) {
// ignore prefetch errors
}
}
}
/**
* Generate the next chunk's URL if the url and response are valid
*/
function getNextChunkUrl(url: string, res: Response): string | void {
const shapeId = res.headers.get(SHAPE_ID_HEADER)
const lastOffset = res.headers.get(CHUNK_LAST_OFFSET_HEADER)
const isUpToDate = res.headers.has(CHUNK_UP_TO_DATE_HEADER)
// only prefetch if shape ID and offset for next chunk are available, and
// response is not already up-to-date
if (!shapeId || !lastOffset || isUpToDate) return
const nextUrl = new URL(url)
// don't prefetch live requests, rushing them will only
// potentially miss more recent data
if (nextUrl.searchParams.has(LIVE_QUERY_PARAM)) return
nextUrl.searchParams.set(SHAPE_ID_QUERY_PARAM, shapeId)
nextUrl.searchParams.set(OFFSET_QUERY_PARAM, lastOffset)
return nextUrl.toString()
}
/**
* Chains an abort controller on an optional source signal's
* aborted state - if the source signal is aborted, the provided abort
* controller will also abort
*/
function chainAborter(
aborter: AbortController,
sourceSignal?: AbortSignal
): AbortSignal {
if (!sourceSignal) return aborter.signal
if (sourceSignal.aborted) aborter.abort()
else
sourceSignal.addEventListener(`abort`, () => aborter.abort(), {
once: true,
})
return aborter.signal
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc