New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@electric-sql/client

Package Overview
Dependencies
Maintainers
0
Versions
28
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@electric-sql/client - npm Package Compare versions

Comparing version 0.5.0 to 0.5.1

src/constants.ts

107

dist/index.d.ts

@@ -75,2 +75,3 @@ type Value = string | number | boolean | bigint | null | Value[] | {

};
type MaybePromise<T> = T | Promise<T>;

@@ -84,8 +85,24 @@ type NullToken = null | `NULL`;

type ShapeData<T extends Row = Row> = Map<string, T>;
type ShapeChangedCallback<T extends Row = Row> = (value: ShapeData<T>) => void;
declare class FetchError extends Error {
url: string;
status: number;
text?: string;
json?: object;
headers: Record<string, string>;
constructor(status: number, text: string | undefined, json: object | undefined, headers: Record<string, string>, url: string, message?: string);
static fromResponse(response: Response, url: string): Promise<FetchError>;
}
interface BackoffOptions {
/**
* Initial delay before retrying in milliseconds
*/
initialDelay: number;
/**
* Maximum retry delay in milliseconds
*/
maxDelay: number;
multiplier: number;
onFailedAttempt?: () => void;
debug?: boolean;
}

@@ -97,2 +114,3 @@ declare const BackoffDefaults: {

};
/**

@@ -135,10 +153,13 @@ * Options for constructing a ShapeStream.

}
declare class FetchError extends Error {
url: string;
status: number;
text?: string;
json?: object;
headers: Record<string, string>;
constructor(status: number, text: string | undefined, json: object | undefined, headers: Record<string, string>, url: string, message?: string);
static fromResponse(response: Response, url: string): Promise<FetchError>;
interface ShapeStreamInterface<T extends Row = Row> {
subscribe(callback: (messages: Message<T>[]) => MaybePromise<void>, onError?: (error: FetchError | Error) => void): void;
unsubscribeAllUpToDateSubscribers(): void;
unsubscribeAll(): void;
subscribeOnceToUpToDate(callback: () => MaybePromise<void>, error: (err: FetchError | Error) => void): () => void;
isLoading(): boolean;
lastSyncedAt(): number | undefined;
lastSynced(): number;
isConnected(): boolean;
isUpToDate: boolean;
shapeId?: string;
}

@@ -175,38 +196,25 @@ /**

*/
declare class ShapeStream<T extends Row = Row> {
private options;
private backoffOptions;
private fetchClient;
private schema?;
private subscribers;
private upToDateSubscribers;
private lastOffset;
private messageParser;
private lastSyncedAt?;
isUpToDate: boolean;
private connected;
shapeId?: string;
declare class ShapeStream<T extends Row = Row> implements ShapeStreamInterface<T> {
#private;
readonly options: ShapeStreamOptions;
constructor(options: ShapeStreamOptions);
get shapeId(): string;
get isUpToDate(): boolean;
start(): Promise<void>;
subscribe(callback: (messages: Message<T>[]) => void | Promise<void>, onError?: (error: FetchError | Error) => void): () => void;
subscribe(callback: (messages: Message<T>[]) => MaybePromise<void>, onError?: (error: FetchError | Error) => void): () => void;
unsubscribeAll(): void;
private publish;
private sendErrorToSubscribers;
subscribeOnceToUpToDate(callback: () => void | Promise<void>, error: (err: FetchError | Error) => void): () => void;
subscribeOnceToUpToDate(callback: () => MaybePromise<void>, error: (err: FetchError | Error) => void): () => void;
unsubscribeAllUpToDateSubscribers(): void;
/** Unix time at which we last synced. Undefined when `isLoading` is true. */
lastSyncedAt(): number | undefined;
/** Time elapsed since last sync (in ms). Infinity if we did not yet sync. */
lastSynced(): number;
/** Indicates if we are connected to the Electric sync service. */
isConnected(): boolean;
/** True during initial fetch. False afterwise. */
isLoading(): boolean;
private notifyUpToDateSubscribers;
private sendErrorToUpToDateSubscribers;
/**
* Resets the state of the stream, optionally with a provided
* shape ID
*/
private reset;
private validateOptions;
private fetchWithBackoff;
}
type ShapeData<T extends Row = Row> = Map<string, T>;
type ShapeChangedCallback<T extends Row = Row> = (value: ShapeData<T>) => void;
/**

@@ -244,20 +252,19 @@ * A Shape is an object that subscribes to a shape log,

declare class Shape<T extends Row = Row> {
private stream;
private data;
private subscribers;
error: FetchError | false;
private hasNotifiedSubscribersUpToDate;
constructor(stream: ShapeStream<T>);
#private;
constructor(stream: ShapeStreamInterface<T>);
get isUpToDate(): boolean;
get value(): Promise<ShapeData<T>>;
get valueSync(): ShapeData<T>;
get error(): false | FetchError;
/** Unix time at which we last synced. Undefined when `isLoading` is true. */
lastSyncedAt(): number | undefined;
/** Time elapsed since last sync (in ms). Infinity if we did not yet sync. */
lastSynced(): number;
/** True during initial fetch. False afterwise. */
isLoading(): boolean;
/** Indicates if we are connected to the Electric sync service. */
isConnected(): boolean;
/** True during initial fetch. False afterwise. */
isLoading(): boolean;
get value(): Promise<ShapeData<T>>;
get valueSync(): ShapeData<T>;
subscribe(callback: ShapeChangedCallback<T>): () => void;
unsubscribeAll(): void;
get numSubscribers(): number;
private process;
private handleError;
private notify;
}

@@ -302,2 +309,2 @@

export { BackoffDefaults, type BackoffOptions, type BitColumn, type BpcharColumn, type ChangeMessage, type ColumnInfo, type CommonColumnProps, type ControlMessage, FetchError, type IntervalColumn, type IntervalColumnWithPrecision, type Message, type NumericColumn, type Offset, type RegularColumn, type Row, type Schema, Shape, type ShapeChangedCallback, type ShapeData, ShapeStream, type ShapeStreamOptions, type TimeColumn, type TypedMessages, type Value, type VarcharColumn, isChangeMessage, isControlMessage };
export { BackoffDefaults, type BackoffOptions, type BitColumn, type BpcharColumn, type ChangeMessage, type ColumnInfo, type CommonColumnProps, type ControlMessage, FetchError, type IntervalColumn, type IntervalColumnWithPrecision, type MaybePromise, type Message, type NumericColumn, type Offset, type RegularColumn, type Row, type Schema, Shape, type ShapeChangedCallback, type ShapeData, ShapeStream, type ShapeStreamInterface, type ShapeStreamOptions, type TimeColumn, type TypedMessages, type Value, type VarcharColumn, isChangeMessage, isControlMessage };
var __defProp = Object.defineProperty;
var __defProps = Object.defineProperties;
var __getOwnPropDescs = Object.getOwnPropertyDescriptors;
var __getOwnPropSymbols = Object.getOwnPropertySymbols;
var __hasOwnProp = Object.prototype.hasOwnProperty;
var __propIsEnum = Object.prototype.propertyIsEnumerable;
var __typeError = (msg) => {
throw TypeError(msg);
};
var __defNormalProp = (obj, key, value) => key in obj ? __defProp(obj, key, { enumerable: true, configurable: true, writable: true, value }) : obj[key] = value;

@@ -17,2 +22,3 @@ var __spreadValues = (a, b) => {

};
var __spreadProps = (a, b) => __defProps(a, __getOwnPropDescs(b));
var __objRest = (source, exclude) => {

@@ -30,2 +36,7 @@ var target = {};

};
var __accessCheck = (obj, member, msg) => member.has(obj) || __typeError("Cannot " + msg);
var __privateGet = (obj, member, getter) => (__accessCheck(obj, member, "read from private field"), getter ? getter.call(obj) : member.get(obj));
var __privateAdd = (obj, member, value) => member.has(obj) ? __typeError("Cannot add the same private member more than once") : member instanceof WeakSet ? member.add(obj) : member.set(obj, value);
var __privateSet = (obj, member, value, setter) => (__accessCheck(obj, member, "write to private field"), setter ? setter.call(obj, value) : member.set(obj, value), value);
var __privateMethod = (obj, member, method) => (__accessCheck(obj, member, "access private method"), method);

@@ -151,30 +162,46 @@ // src/parser.ts

}
function isUpToDateMessage(message) {
return isControlMessage(message) && message.headers.control === `up-to-date`;
}
// src/client.ts
var BackoffDefaults = {
initialDelay: 100,
maxDelay: 1e4,
multiplier: 1.3
// src/queue.ts
function isThenable(value) {
return !!value && typeof value === `object` && `then` in value && typeof value.then === `function`;
}
var _processingChain;
var AsyncProcessingQueue = class {
constructor() {
__privateAdd(this, _processingChain);
}
process(callback) {
__privateSet(this, _processingChain, isThenable(__privateGet(this, _processingChain)) ? __privateGet(this, _processingChain).then(callback) : callback());
return __privateGet(this, _processingChain);
}
async waitForProcessing() {
let currentChain;
do {
currentChain = __privateGet(this, _processingChain);
await currentChain;
} while (__privateGet(this, _processingChain) !== currentChain);
}
};
_processingChain = new WeakMap();
var _queue, _callback;
var MessageProcessor = class {
constructor(callback) {
this.messageQueue = [];
this.isProcessing = false;
this.callback = callback;
__privateAdd(this, _queue, new AsyncProcessingQueue());
__privateAdd(this, _callback);
__privateSet(this, _callback, callback);
}
process(messages) {
this.messageQueue.push(messages);
if (!this.isProcessing) {
this.processQueue();
}
__privateGet(this, _queue).process(() => __privateGet(this, _callback).call(this, messages));
}
async processQueue() {
this.isProcessing = true;
while (this.messageQueue.length > 0) {
const messages = this.messageQueue.shift();
await this.callback(messages);
}
this.isProcessing = false;
async waitForProcessing() {
await __privateGet(this, _queue).waitForProcessing();
}
};
_queue = new WeakMap();
_callback = new WeakMap();
// src/error.ts
var FetchError = class _FetchError extends Error {

@@ -206,53 +233,135 @@ constructor(status, text, json, headers, url, message) {

};
var FetchBackoffAbortError = class extends Error {
constructor() {
super(`Fetch with backoff aborted`);
}
};
// src/fetch.ts
var BackoffDefaults = {
initialDelay: 100,
maxDelay: 1e4,
multiplier: 1.3
};
function createFetchWithBackoff(fetchClient, backoffOptions = BackoffDefaults) {
const {
initialDelay,
maxDelay,
multiplier,
debug = false,
onFailedAttempt
} = backoffOptions;
return async (...args) => {
var _a;
const url = args[0];
const options = args[1];
let delay = initialDelay;
let attempt = 0;
while (true) {
try {
const result = await fetchClient(...args);
if (result.ok) return result;
else throw await FetchError.fromResponse(result, url.toString());
} catch (e) {
onFailedAttempt == null ? void 0 : onFailedAttempt();
if ((_a = options == null ? void 0 : options.signal) == null ? void 0 : _a.aborted) {
throw new FetchBackoffAbortError();
} else if (e instanceof FetchError && e.status >= 400 && e.status < 500) {
throw e;
} else {
await new Promise((resolve) => setTimeout(resolve, delay));
delay = Math.min(delay * multiplier, maxDelay);
if (debug) {
attempt++;
console.log(`Retry attempt #${attempt} after ${delay}ms`);
}
}
}
}
};
}
// src/constants.ts
var SHAPE_ID_HEADER = `x-electric-shape-id`;
var CHUNK_LAST_OFFSET_HEADER = `x-electric-chunk-last-offset`;
var SHAPE_SCHEMA_HEADER = `x-electric-schema`;
var SHAPE_ID_QUERY_PARAM = `shape_id`;
var OFFSET_QUERY_PARAM = `offset`;
var WHERE_QUERY_PARAM = `where`;
var LIVE_QUERY_PARAM = `live`;
// src/client.ts
var _fetchClient, _messageParser, _subscribers, _upToDateSubscribers, _lastOffset, _lastSyncedAt, _isUpToDate, _connected, _shapeId, _schema, _ShapeStream_instances, publish_fn, sendErrorToSubscribers_fn, notifyUpToDateSubscribers_fn, sendErrorToUpToDateSubscribers_fn, reset_fn;
var ShapeStream = class {
constructor(options) {
this.subscribers = /* @__PURE__ */ new Map();
this.upToDateSubscribers = /* @__PURE__ */ new Map();
__privateAdd(this, _ShapeStream_instances);
__privateAdd(this, _fetchClient);
__privateAdd(this, _messageParser);
__privateAdd(this, _subscribers, /* @__PURE__ */ new Map());
__privateAdd(this, _upToDateSubscribers, /* @__PURE__ */ new Map());
__privateAdd(this, _lastOffset);
__privateAdd(this, _lastSyncedAt);
// unix time
this.isUpToDate = false;
this.connected = false;
__privateAdd(this, _isUpToDate, false);
__privateAdd(this, _connected, false);
__privateAdd(this, _shapeId);
__privateAdd(this, _schema);
var _a, _b, _c;
this.validateOptions(options);
validateOptions(options);
this.options = __spreadValues({ subscribe: true }, options);
this.lastOffset = (_a = this.options.offset) != null ? _a : `-1`;
this.shapeId = this.options.shapeId;
this.messageParser = new MessageParser(options.parser);
this.backoffOptions = (_b = options.backoffOptions) != null ? _b : BackoffDefaults;
this.fetchClient = (_c = options.fetchClient) != null ? _c : (...args) => fetch(...args);
__privateSet(this, _lastOffset, (_a = this.options.offset) != null ? _a : `-1`);
__privateSet(this, _shapeId, this.options.shapeId);
__privateSet(this, _messageParser, new MessageParser(options.parser));
__privateSet(this, _fetchClient, createFetchWithBackoff(
(_b = options.fetchClient) != null ? _b : (...args) => fetch(...args),
__spreadProps(__spreadValues({}, (_c = options.backoffOptions) != null ? _c : BackoffDefaults), {
onFailedAttempt: () => {
var _a2, _b2;
__privateSet(this, _connected, false);
(_b2 = (_a2 = options.backoffOptions) == null ? void 0 : _a2.onFailedAttempt) == null ? void 0 : _b2.call(_a2);
}
})
));
this.start();
}
get shapeId() {
return __privateGet(this, _shapeId);
}
get isUpToDate() {
return __privateGet(this, _isUpToDate);
}
async start() {
var _a;
this.isUpToDate = false;
__privateSet(this, _isUpToDate, false);
const { url, where, signal } = this.options;
try {
while (!(signal == null ? void 0 : signal.aborted) && !this.isUpToDate || this.options.subscribe) {
while (!(signal == null ? void 0 : signal.aborted) && !__privateGet(this, _isUpToDate) || this.options.subscribe) {
const fetchUrl = new URL(url);
if (where) fetchUrl.searchParams.set(`where`, where);
fetchUrl.searchParams.set(`offset`, this.lastOffset);
if (this.isUpToDate) {
fetchUrl.searchParams.set(`live`, `true`);
if (where) fetchUrl.searchParams.set(WHERE_QUERY_PARAM, where);
fetchUrl.searchParams.set(OFFSET_QUERY_PARAM, __privateGet(this, _lastOffset));
if (__privateGet(this, _isUpToDate)) {
fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`);
}
if (this.shapeId) {
fetchUrl.searchParams.set(`shape_id`, this.shapeId);
if (__privateGet(this, _shapeId)) {
fetchUrl.searchParams.set(SHAPE_ID_QUERY_PARAM, __privateGet(this, _shapeId));
}
let response;
try {
const maybeResponse = await this.fetchWithBackoff(fetchUrl);
if (maybeResponse) response = maybeResponse;
else break;
response = await __privateGet(this, _fetchClient).call(this, fetchUrl.toString(), { signal });
__privateSet(this, _connected, true);
} catch (e) {
if (e instanceof FetchBackoffAbortError) break;
if (!(e instanceof FetchError)) throw e;
if (e.status == 400) {
this.reset();
this.publish(e.json);
__privateMethod(this, _ShapeStream_instances, reset_fn).call(this);
__privateMethod(this, _ShapeStream_instances, publish_fn).call(this, e.json);
continue;
} else if (e.status == 409) {
const newShapeId = e.headers[`x-electric-shape-id`];
this.reset(newShapeId);
this.publish(e.json);
const newShapeId = e.headers[SHAPE_ID_HEADER];
__privateMethod(this, _ShapeStream_instances, reset_fn).call(this, newShapeId);
__privateMethod(this, _ShapeStream_instances, publish_fn).call(this, e.json);
continue;
} else if (e.status >= 400 && e.status < 500) {
this.sendErrorToUpToDateSubscribers(e);
this.sendErrorToSubscribers(e);
__privateMethod(this, _ShapeStream_instances, sendErrorToUpToDateSubscribers_fn).call(this, e);
__privateMethod(this, _ShapeStream_instances, sendErrorToSubscribers_fn).call(this, e);
throw e;

@@ -262,34 +371,34 @@ }

const { headers, status } = response;
const shapeId = headers.get(`X-Electric-Shape-Id`);
const shapeId = headers.get(SHAPE_ID_HEADER);
if (shapeId) {
this.shapeId = shapeId;
__privateSet(this, _shapeId, shapeId);
}
const lastOffset = headers.get(`X-Electric-Chunk-Last-Offset`);
const lastOffset = headers.get(CHUNK_LAST_OFFSET_HEADER);
if (lastOffset) {
this.lastOffset = lastOffset;
__privateSet(this, _lastOffset, lastOffset);
}
const getSchema = () => {
const schemaHeader = headers.get(`X-Electric-Schema`);
const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER);
return schemaHeader ? JSON.parse(schemaHeader) : {};
};
this.schema = (_a = this.schema) != null ? _a : getSchema();
__privateSet(this, _schema, (_a = __privateGet(this, _schema)) != null ? _a : getSchema());
const messages = status === 204 ? `[]` : await response.text();
if (status === 204) {
this.lastSyncedAt = Date.now();
__privateSet(this, _lastSyncedAt, Date.now());
}
const batch = this.messageParser.parse(messages, this.schema);
const batch = __privateGet(this, _messageParser).parse(messages, __privateGet(this, _schema));
if (batch.length > 0) {
const lastMessage = batch[batch.length - 1];
if (isControlMessage(lastMessage) && lastMessage.headers.control === `up-to-date`) {
this.lastSyncedAt = Date.now();
if (!this.isUpToDate) {
this.isUpToDate = true;
this.notifyUpToDateSubscribers();
if (isUpToDateMessage(lastMessage)) {
__privateSet(this, _lastSyncedAt, Date.now());
if (!__privateGet(this, _isUpToDate)) {
__privateSet(this, _isUpToDate, true);
__privateMethod(this, _ShapeStream_instances, notifyUpToDateSubscribers_fn).call(this);
}
}
this.publish(batch);
__privateMethod(this, _ShapeStream_instances, publish_fn).call(this, batch);
}
}
} finally {
this.connected = false;
__privateSet(this, _connected, false);
}

@@ -300,37 +409,32 @@ }

const subscriber = new MessageProcessor(callback);
this.subscribers.set(subscriptionId, [subscriber, onError]);
__privateGet(this, _subscribers).set(subscriptionId, [subscriber, onError]);
return () => {
this.subscribers.delete(subscriptionId);
__privateGet(this, _subscribers).delete(subscriptionId);
};
}
unsubscribeAll() {
this.subscribers.clear();
__privateGet(this, _subscribers).clear();
}
publish(messages) {
this.subscribers.forEach(([subscriber, _]) => {
subscriber.process(messages);
});
}
sendErrorToSubscribers(error) {
this.subscribers.forEach(([_, errorFn]) => {
errorFn == null ? void 0 : errorFn(error);
});
}
subscribeOnceToUpToDate(callback, error) {
const subscriptionId = Math.random();
this.upToDateSubscribers.set(subscriptionId, [callback, error]);
__privateGet(this, _upToDateSubscribers).set(subscriptionId, [callback, error]);
return () => {
this.upToDateSubscribers.delete(subscriptionId);
__privateGet(this, _upToDateSubscribers).delete(subscriptionId);
};
}
unsubscribeAllUpToDateSubscribers() {
this.upToDateSubscribers.clear();
__privateGet(this, _upToDateSubscribers).clear();
}
/** Unix time at which we last synced. Undefined when `isLoading` is true. */
lastSyncedAt() {
return __privateGet(this, _lastSyncedAt);
}
/** Time elapsed since last sync (in ms). Infinity if we did not yet sync. */
lastSynced() {
if (this.lastSyncedAt === void 0) return Infinity;
return Date.now() - this.lastSyncedAt;
if (__privateGet(this, _lastSyncedAt) === void 0) return Infinity;
return Date.now() - __privateGet(this, _lastSyncedAt);
}
/** Indicates if we are connected to the Electric sync service. */
isConnected() {
return this.connected;
return __privateGet(this, _connected);
}

@@ -341,77 +445,78 @@ /** True during initial fetch. False afterwise. */

}
notifyUpToDateSubscribers() {
this.upToDateSubscribers.forEach(([callback]) => {
callback();
});
};
_fetchClient = new WeakMap();
_messageParser = new WeakMap();
_subscribers = new WeakMap();
_upToDateSubscribers = new WeakMap();
_lastOffset = new WeakMap();
_lastSyncedAt = new WeakMap();
_isUpToDate = new WeakMap();
_connected = new WeakMap();
_shapeId = new WeakMap();
_schema = new WeakMap();
_ShapeStream_instances = new WeakSet();
publish_fn = function(messages) {
__privateGet(this, _subscribers).forEach(([subscriber, _]) => {
subscriber.process(messages);
});
};
sendErrorToSubscribers_fn = function(error) {
__privateGet(this, _subscribers).forEach(([_, errorFn]) => {
errorFn == null ? void 0 : errorFn(error);
});
};
notifyUpToDateSubscribers_fn = function() {
__privateGet(this, _upToDateSubscribers).forEach(([callback]) => {
callback();
});
};
sendErrorToUpToDateSubscribers_fn = function(error) {
__privateGet(this, _upToDateSubscribers).forEach(
([_, errorCallback]) => errorCallback(error)
);
};
/**
* Resets the state of the stream, optionally with a provided
* shape ID
*/
reset_fn = function(shapeId) {
__privateSet(this, _lastOffset, `-1`);
__privateSet(this, _shapeId, shapeId);
__privateSet(this, _isUpToDate, false);
__privateSet(this, _connected, false);
__privateSet(this, _schema, void 0);
};
function validateOptions(options) {
if (!options.url) {
throw new Error(`Invalid shape option. It must provide the url`);
}
sendErrorToUpToDateSubscribers(error) {
this.upToDateSubscribers.forEach(
([_, errorCallback]) => errorCallback(error)
if (options.signal && !(options.signal instanceof AbortSignal)) {
throw new Error(
`Invalid signal option. It must be an instance of AbortSignal.`
);
}
/**
* Resets the state of the stream, optionally with a provided
* shape ID
*/
reset(shapeId) {
this.lastOffset = `-1`;
this.shapeId = shapeId;
this.isUpToDate = false;
this.connected = false;
this.schema = void 0;
if (options.offset !== void 0 && options.offset !== `-1` && !options.shapeId) {
throw new Error(
`shapeId is required if this isn't an initial fetch (i.e. offset > -1)`
);
}
validateOptions(options) {
if (!options.url) {
throw new Error(`Invalid shape option. It must provide the url`);
}
if (options.signal && !(options.signal instanceof AbortSignal)) {
throw new Error(
`Invalid signal option. It must be an instance of AbortSignal.`
);
}
if (options.offset !== void 0 && options.offset !== `-1` && !options.shapeId) {
throw new Error(
`shapeId is required if this isn't an initial fetch (i.e. offset > -1)`
);
}
}
async fetchWithBackoff(url) {
const { initialDelay, maxDelay, multiplier } = this.backoffOptions;
const signal = this.options.signal;
let delay = initialDelay;
let attempt = 0;
while (true) {
try {
const result = await this.fetchClient(url.toString(), { signal });
if (result.ok) {
if (this.options.subscribe) {
this.connected = true;
}
return result;
} else throw await FetchError.fromResponse(result, url.toString());
} catch (e) {
this.connected = false;
if (signal == null ? void 0 : signal.aborted) {
return void 0;
} else if (e instanceof FetchError && e.status >= 400 && e.status < 500) {
throw e;
} else {
await new Promise((resolve) => setTimeout(resolve, delay));
delay = Math.min(delay * multiplier, maxDelay);
attempt++;
console.log(`Retry attempt #${attempt} after ${delay}ms`);
}
}
}
}
};
return;
}
// src/shape.ts
var _stream, _data, _subscribers2, _hasNotifiedSubscribersUpToDate, _error, _Shape_instances, process_fn, handleError_fn, notify_fn;
var Shape = class {
constructor(stream) {
this.data = /* @__PURE__ */ new Map();
this.subscribers = /* @__PURE__ */ new Map();
this.error = false;
this.hasNotifiedSubscribersUpToDate = false;
this.stream = stream;
this.stream.subscribe(this.process.bind(this), this.handleError.bind(this));
const unsubscribe = this.stream.subscribeOnceToUpToDate(
__privateAdd(this, _Shape_instances);
__privateAdd(this, _stream);
__privateAdd(this, _data, /* @__PURE__ */ new Map());
__privateAdd(this, _subscribers2, /* @__PURE__ */ new Map());
__privateAdd(this, _hasNotifiedSubscribersUpToDate, false);
__privateAdd(this, _error, false);
__privateSet(this, _stream, stream);
__privateGet(this, _stream).subscribe(
__privateMethod(this, _Shape_instances, process_fn).bind(this),
__privateMethod(this, _Shape_instances, handleError_fn).bind(this)
);
const unsubscribe = __privateGet(this, _stream).subscribeOnceToUpToDate(
() => {

@@ -421,3 +526,3 @@ unsubscribe();

(e) => {
this.handleError(e);
__privateMethod(this, _Shape_instances, handleError_fn).call(this, e);
throw e;

@@ -427,26 +532,15 @@ }

}
lastSynced() {
return this.stream.lastSynced();
get isUpToDate() {
return __privateGet(this, _stream).isUpToDate;
}
isConnected() {
return this.stream.isConnected();
}
/** True during initial fetch. False afterwise. */
isLoading() {
return this.stream.isLoading();
}
get value() {
return new Promise((resolve) => {
if (this.stream.isUpToDate) {
return new Promise((resolve, reject) => {
if (__privateGet(this, _stream).isUpToDate) {
resolve(this.valueSync);
} else {
const unsubscribe = this.stream.subscribeOnceToUpToDate(
() => {
unsubscribe();
resolve(this.valueSync);
},
(e) => {
throw e;
}
);
const unsubscribe = this.subscribe((shapeData) => {
unsubscribe();
if (__privateGet(this, _error)) reject(__privateGet(this, _error));
resolve(shapeData);
});
}

@@ -456,72 +550,97 @@ });

get valueSync() {
return this.data;
return __privateGet(this, _data);
}
get error() {
return __privateGet(this, _error);
}
/** Unix time at which we last synced. Undefined when `isLoading` is true. */
lastSyncedAt() {
return __privateGet(this, _stream).lastSyncedAt();
}
/** Time elapsed since last sync (in ms). Infinity if we did not yet sync. */
lastSynced() {
return __privateGet(this, _stream).lastSynced();
}
/** True during initial fetch. False afterwise. */
isLoading() {
return __privateGet(this, _stream).isLoading();
}
/** Indicates if we are connected to the Electric sync service. */
isConnected() {
return __privateGet(this, _stream).isConnected();
}
subscribe(callback) {
const subscriptionId = Math.random();
this.subscribers.set(subscriptionId, callback);
__privateGet(this, _subscribers2).set(subscriptionId, callback);
return () => {
this.subscribers.delete(subscriptionId);
__privateGet(this, _subscribers2).delete(subscriptionId);
};
}
unsubscribeAll() {
this.subscribers.clear();
__privateGet(this, _subscribers2).clear();
}
get numSubscribers() {
return this.subscribers.size;
return __privateGet(this, _subscribers2).size;
}
process(messages) {
let dataMayHaveChanged = false;
let isUpToDate = false;
let newlyUpToDate = false;
messages.forEach((message) => {
if (isChangeMessage(message)) {
dataMayHaveChanged = [`insert`, `update`, `delete`].includes(
message.headers.operation
);
switch (message.headers.operation) {
case `insert`:
this.data.set(message.key, message.value);
break;
case `update`:
this.data.set(message.key, __spreadValues(__spreadValues({}, this.data.get(message.key)), message.value));
break;
case `delete`:
this.data.delete(message.key);
break;
}
};
_stream = new WeakMap();
_data = new WeakMap();
_subscribers2 = new WeakMap();
_hasNotifiedSubscribersUpToDate = new WeakMap();
_error = new WeakMap();
_Shape_instances = new WeakSet();
process_fn = function(messages) {
let dataMayHaveChanged = false;
let isUpToDate = false;
let newlyUpToDate = false;
messages.forEach((message) => {
if (isChangeMessage(message)) {
dataMayHaveChanged = [`insert`, `update`, `delete`].includes(
message.headers.operation
);
switch (message.headers.operation) {
case `insert`:
__privateGet(this, _data).set(message.key, message.value);
break;
case `update`:
__privateGet(this, _data).set(message.key, __spreadValues(__spreadValues({}, __privateGet(this, _data).get(message.key)), message.value));
break;
case `delete`:
__privateGet(this, _data).delete(message.key);
break;
}
if (isControlMessage(message)) {
switch (message.headers.control) {
case `up-to-date`:
isUpToDate = true;
if (!this.hasNotifiedSubscribersUpToDate) {
newlyUpToDate = true;
}
break;
case `must-refetch`:
this.data.clear();
this.error = false;
isUpToDate = false;
newlyUpToDate = false;
break;
}
}
if (isControlMessage(message)) {
switch (message.headers.control) {
case `up-to-date`:
isUpToDate = true;
if (!__privateGet(this, _hasNotifiedSubscribersUpToDate)) {
newlyUpToDate = true;
}
break;
case `must-refetch`:
__privateGet(this, _data).clear();
__privateSet(this, _error, false);
isUpToDate = false;
newlyUpToDate = false;
break;
}
});
if (newlyUpToDate || isUpToDate && dataMayHaveChanged) {
this.hasNotifiedSubscribersUpToDate = true;
this.notify();
}
});
if (newlyUpToDate || isUpToDate && dataMayHaveChanged) {
__privateSet(this, _hasNotifiedSubscribersUpToDate, true);
__privateMethod(this, _Shape_instances, notify_fn).call(this);
}
handleError(e) {
if (e instanceof FetchError) {
this.error = e;
this.notify();
}
};
handleError_fn = function(e) {
if (e instanceof FetchError) {
__privateSet(this, _error, e);
__privateMethod(this, _Shape_instances, notify_fn).call(this);
}
notify() {
this.subscribers.forEach((callback) => {
callback(this.valueSync);
});
}
};
notify_fn = function() {
__privateGet(this, _subscribers2).forEach((callback) => {
callback(this.valueSync);
});
};
export {

@@ -528,0 +647,0 @@ BackoffDefaults,

{
"name": "@electric-sql/client",
"version": "0.5.0",
"version": "0.5.1",
"description": "Postgres everywhere - your data, in sync, wherever you need it.",

@@ -5,0 +5,0 @@ "type": "module",

@@ -1,22 +0,21 @@

import { Message, Offset, Schema, Row } from './types'
import { Message, Offset, Schema, Row, MaybePromise } from './types'
import { MessageParser, Parser } from './parser'
import { isChangeMessage, isControlMessage } from './helpers'
import { isUpToDateMessage } from './helpers'
import { MessageProcessor, MessageProcessorInterface } from './queue'
import { FetchError, FetchBackoffAbortError } from './error'
import {
BackoffDefaults,
BackoffOptions,
createFetchWithBackoff,
} from './fetch'
import {
CHUNK_LAST_OFFSET_HEADER,
LIVE_QUERY_PARAM,
OFFSET_QUERY_PARAM,
SHAPE_ID_HEADER,
SHAPE_ID_QUERY_PARAM,
SHAPE_SCHEMA_HEADER,
WHERE_QUERY_PARAM,
} from './constants'
export type ShapeData<T extends Row = Row> = Map<string, T>
export type ShapeChangedCallback<T extends Row = Row> = (
value: ShapeData<T>
) => void
export interface BackoffOptions {
initialDelay: number
maxDelay: number
multiplier: number
}
export const BackoffDefaults = {
initialDelay: 100,
maxDelay: 10_000,
multiplier: 1.3,
}
/**

@@ -60,84 +59,23 @@ * Options for constructing a ShapeStream.

/**
* Receives batches of `messages`, puts them on a queue and processes
* them asynchronously by passing to a registered callback function.
*
* @constructor
* @param {(messages: Message[]) => void} callback function
*/
class MessageProcessor<T extends Row = Row> {
private messageQueue: Message<T>[][] = []
private isProcessing = false
private callback: (messages: Message<T>[]) => void | Promise<void>
export interface ShapeStreamInterface<T extends Row = Row> {
subscribe(
callback: (messages: Message<T>[]) => MaybePromise<void>,
onError?: (error: FetchError | Error) => void
): void
unsubscribeAllUpToDateSubscribers(): void
unsubscribeAll(): void
subscribeOnceToUpToDate(
callback: () => MaybePromise<void>,
error: (err: FetchError | Error) => void
): () => void
constructor(callback: (messages: Message<T>[]) => void | Promise<void>) {
this.callback = callback
}
isLoading(): boolean
lastSyncedAt(): number | undefined
lastSynced(): number
isConnected(): boolean
process(messages: Message<T>[]) {
this.messageQueue.push(messages)
if (!this.isProcessing) {
this.processQueue()
}
}
private async processQueue() {
this.isProcessing = true
while (this.messageQueue.length > 0) {
const messages = this.messageQueue.shift()!
await this.callback(messages)
}
this.isProcessing = false
}
isUpToDate: boolean
shapeId?: string
}
export class FetchError extends Error {
status: number
text?: string
json?: object
headers: Record<string, string>
constructor(
status: number,
text: string | undefined,
json: object | undefined,
headers: Record<string, string>,
public url: string,
message?: string
) {
super(
message ||
`HTTP Error ${status} at ${url}: ${text ?? JSON.stringify(json)}`
)
this.name = `FetchError`
this.status = status
this.text = text
this.json = json
this.headers = headers
}
static async fromResponse(
response: Response,
url: string
): Promise<FetchError> {
const status = response.status
const headers = Object.fromEntries([...response.headers.entries()])
let text: string | undefined = undefined
let json: object | undefined = undefined
const contentType = response.headers.get(`content-type`)
if (contentType && contentType.includes(`application/json`)) {
json = (await response.json()) as object
} else {
text = await response.text()
}
return new FetchError(status, text, json, headers, url)
}
}
/**

@@ -173,13 +111,19 @@ * Reads updates to a shape from Electric using HTTP requests and long polling. Notifies subscribers

*/
export class ShapeStream<T extends Row = Row> {
private options: ShapeStreamOptions
private backoffOptions: BackoffOptions
private fetchClient: typeof fetch
private schema?: Schema
private subscribers = new Map<
export class ShapeStream<T extends Row = Row>
implements ShapeStreamInterface<T>
{
readonly options: ShapeStreamOptions
readonly #fetchClient: typeof fetch
readonly #messageParser: MessageParser<T>
readonly #subscribers = new Map<
number,
[MessageProcessor<T>, ((error: Error) => void) | undefined]
[
MessageProcessorInterface<Message<T>[]>,
((error: Error) => void) | undefined,
]
>()
private upToDateSubscribers = new Map<
readonly #upToDateSubscribers = new Map<
number,

@@ -189,21 +133,27 @@ [() => void, (error: FetchError | Error) => void]

private lastOffset: Offset
private messageParser: MessageParser<T>
private lastSyncedAt?: number // unix time
public isUpToDate: boolean = false
private connected: boolean = false
#lastOffset: Offset
#lastSyncedAt?: number // unix time
#isUpToDate: boolean = false
#connected: boolean = false
#shapeId?: string
#schema?: Schema
shapeId?: string
constructor(options: ShapeStreamOptions) {
this.validateOptions(options)
validateOptions(options)
this.options = { subscribe: true, ...options }
this.lastOffset = this.options.offset ?? `-1`
this.shapeId = this.options.shapeId
this.messageParser = new MessageParser<T>(options.parser)
this.#lastOffset = this.options.offset ?? `-1`
this.#shapeId = this.options.shapeId
this.#messageParser = new MessageParser<T>(options.parser)
this.backoffOptions = options.backoffOptions ?? BackoffDefaults
this.fetchClient =
this.#fetchClient = createFetchWithBackoff(
options.fetchClient ??
((...args: Parameters<typeof fetch>) => fetch(...args))
((...args: Parameters<typeof fetch>) => fetch(...args)),
{
...(options.backoffOptions ?? BackoffDefaults),
onFailedAttempt: () => {
this.#connected = false
options.backoffOptions?.onFailedAttempt?.()
},
}
)

@@ -213,4 +163,12 @@ this.start()

get shapeId() {
return this.#shapeId
}
get isUpToDate() {
return this.#isUpToDate
}
async start() {
this.isUpToDate = false
this.#isUpToDate = false

@@ -220,23 +178,25 @@ const { url, where, signal } = this.options

try {
while ((!signal?.aborted && !this.isUpToDate) || this.options.subscribe) {
while (
(!signal?.aborted && !this.#isUpToDate) ||
this.options.subscribe
) {
const fetchUrl = new URL(url)
if (where) fetchUrl.searchParams.set(`where`, where)
fetchUrl.searchParams.set(`offset`, this.lastOffset)
if (where) fetchUrl.searchParams.set(WHERE_QUERY_PARAM, where)
fetchUrl.searchParams.set(OFFSET_QUERY_PARAM, this.#lastOffset)
if (this.isUpToDate) {
fetchUrl.searchParams.set(`live`, `true`)
if (this.#isUpToDate) {
fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`)
}
if (this.shapeId) {
if (this.#shapeId) {
// This should probably be a header for better cache breaking?
fetchUrl.searchParams.set(`shape_id`, this.shapeId!)
fetchUrl.searchParams.set(SHAPE_ID_QUERY_PARAM, this.#shapeId!)
}
let response!: Response
try {
const maybeResponse = await this.fetchWithBackoff(fetchUrl)
if (maybeResponse) response = maybeResponse
else break
response = await this.#fetchClient(fetchUrl.toString(), { signal })
this.#connected = true
} catch (e) {
if (e instanceof FetchBackoffAbortError) break // interrupted
if (!(e instanceof FetchError)) throw e // should never happen

@@ -246,4 +206,4 @@ if (e.status == 400) {

// We should start from scratch, this will force the shape to be recreated.
this.reset()
this.publish(e.json as Message<T>[])
this.#reset()
this.#publish(e.json as Message<T>[])
continue

@@ -253,10 +213,10 @@ } else if (e.status == 409) {

// with the newly provided shape ID
const newShapeId = e.headers[`x-electric-shape-id`]
this.reset(newShapeId)
this.publish(e.json as Message<T>[])
const newShapeId = e.headers[SHAPE_ID_HEADER]
this.#reset(newShapeId)
this.#publish(e.json as Message<T>[])
continue
} else if (e.status >= 400 && e.status < 500) {
// Notify subscribers
this.sendErrorToUpToDateSubscribers(e)
this.sendErrorToSubscribers(e)
this.#sendErrorToUpToDateSubscribers(e)
this.#sendErrorToSubscribers(e)

@@ -269,17 +229,17 @@ // 400 errors are not actionable without additional user input, so we're throwing them.

const { headers, status } = response
const shapeId = headers.get(`X-Electric-Shape-Id`)
const shapeId = headers.get(SHAPE_ID_HEADER)
if (shapeId) {
this.shapeId = shapeId
this.#shapeId = shapeId
}
const lastOffset = headers.get(`X-Electric-Chunk-Last-Offset`)
const lastOffset = headers.get(CHUNK_LAST_OFFSET_HEADER)
if (lastOffset) {
this.lastOffset = lastOffset as Offset
this.#lastOffset = lastOffset as Offset
}
const getSchema = (): Schema => {
const schemaHeader = headers.get(`X-Electric-Schema`)
const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER)
return schemaHeader ? JSON.parse(schemaHeader) : {}
}
this.schema = this.schema ?? getSchema()
this.#schema = this.#schema ?? getSchema()

@@ -290,6 +250,6 @@ const messages = status === 204 ? `[]` : await response.text()

// There's no content so we are live and up to date
this.lastSyncedAt = Date.now()
this.#lastSyncedAt = Date.now()
}
const batch = this.messageParser.parse(messages, this.schema)
const batch = this.#messageParser.parse(messages, this.#schema)

@@ -299,18 +259,15 @@ // Update isUpToDate

const lastMessage = batch[batch.length - 1]
if (
isControlMessage(lastMessage) &&
lastMessage.headers.control === `up-to-date`
) {
this.lastSyncedAt = Date.now()
if (!this.isUpToDate) {
this.isUpToDate = true
this.notifyUpToDateSubscribers()
if (isUpToDateMessage(lastMessage)) {
this.#lastSyncedAt = Date.now()
if (!this.#isUpToDate) {
this.#isUpToDate = true
this.#notifyUpToDateSubscribers()
}
}
this.publish(batch)
this.#publish(batch)
}
}
} finally {
this.connected = false
this.#connected = false
}

@@ -320,3 +277,3 @@ }

subscribe(
callback: (messages: Message<T>[]) => void | Promise<void>,
callback: (messages: Message<T>[]) => MaybePromise<void>,
onError?: (error: FetchError | Error) => void

@@ -327,6 +284,6 @@ ) {

this.subscribers.set(subscriptionId, [subscriber, onError])
this.#subscribers.set(subscriptionId, [subscriber, onError])
return () => {
this.subscribers.delete(subscriptionId)
this.#subscribers.delete(subscriptionId)
}

@@ -336,19 +293,7 @@ }

unsubscribeAll(): void {
this.subscribers.clear()
this.#subscribers.clear()
}
private publish(messages: Message<T>[]) {
this.subscribers.forEach(([subscriber, _]) => {
subscriber.process(messages)
})
}
private sendErrorToSubscribers(error: Error) {
this.subscribers.forEach(([_, errorFn]) => {
errorFn?.(error)
})
}
subscribeOnceToUpToDate(
callback: () => void | Promise<void>,
callback: () => MaybePromise<void>,
error: (err: FetchError | Error) => void

@@ -358,6 +303,6 @@ ) {

this.upToDateSubscribers.set(subscriptionId, [callback, error])
this.#upToDateSubscribers.set(subscriptionId, [callback, error])
return () => {
this.upToDateSubscribers.delete(subscriptionId)
this.#upToDateSubscribers.delete(subscriptionId)
}

@@ -367,13 +312,19 @@ }

unsubscribeAllUpToDateSubscribers(): void {
this.upToDateSubscribers.clear()
this.#upToDateSubscribers.clear()
}
/** Unix time at which we last synced. Undefined when `isLoading` is true. */
lastSyncedAt(): number | undefined {
return this.#lastSyncedAt
}
/** Time elapsed since last sync (in ms). Infinity if we did not yet sync. */
lastSynced(): number {
if (this.lastSyncedAt === undefined) return Infinity
return Date.now() - this.lastSyncedAt
if (this.#lastSyncedAt === undefined) return Infinity
return Date.now() - this.#lastSyncedAt
}
/** Indicates if we are connected to the Electric sync service. */
isConnected(): boolean {
return this.connected
return this.#connected
}

@@ -386,4 +337,16 @@

private notifyUpToDateSubscribers() {
this.upToDateSubscribers.forEach(([callback]) => {
#publish(messages: Message<T>[]) {
this.#subscribers.forEach(([subscriber, _]) => {
subscriber.process(messages)
})
}
#sendErrorToSubscribers(error: Error) {
this.#subscribers.forEach(([_, errorFn]) => {
errorFn?.(error)
})
}
#notifyUpToDateSubscribers() {
this.#upToDateSubscribers.forEach(([callback]) => {
callback()

@@ -393,5 +356,4 @@ })

private sendErrorToUpToDateSubscribers(error: FetchError | Error) {
// eslint-disable-next-line
this.upToDateSubscribers.forEach(([_, errorCallback]) =>
#sendErrorToUpToDateSubscribers(error: FetchError | Error) {
this.#upToDateSubscribers.forEach(([_, errorCallback]) =>
errorCallback(error)

@@ -405,246 +367,31 @@ )

*/
private reset(shapeId?: string) {
this.lastOffset = `-1`
this.shapeId = shapeId
this.isUpToDate = false
this.connected = false
this.schema = undefined
#reset(shapeId?: string) {
this.#lastOffset = `-1`
this.#shapeId = shapeId
this.#isUpToDate = false
this.#connected = false
this.#schema = undefined
}
}
private validateOptions(options: ShapeStreamOptions): void {
if (!options.url) {
throw new Error(`Invalid shape option. It must provide the url`)
}
if (options.signal && !(options.signal instanceof AbortSignal)) {
throw new Error(
`Invalid signal option. It must be an instance of AbortSignal.`
)
}
if (
options.offset !== undefined &&
options.offset !== `-1` &&
!options.shapeId
) {
throw new Error(
`shapeId is required if this isn't an initial fetch (i.e. offset > -1)`
)
}
function validateOptions(options: Partial<ShapeStreamOptions>): void {
if (!options.url) {
throw new Error(`Invalid shape option. It must provide the url`)
}
private async fetchWithBackoff(url: URL) {
const { initialDelay, maxDelay, multiplier } = this.backoffOptions
const signal = this.options.signal
let delay = initialDelay
let attempt = 0
// eslint-disable-next-line no-constant-condition -- we're retrying with a lag until we get a non-500 response or the abort signal is triggered
while (true) {
try {
const result = await this.fetchClient(url.toString(), { signal })
if (result.ok) {
if (this.options.subscribe) {
this.connected = true
}
return result
} else throw await FetchError.fromResponse(result, url.toString())
} catch (e) {
this.connected = false
if (signal?.aborted) {
return undefined
} else if (
e instanceof FetchError &&
e.status >= 400 &&
e.status < 500
) {
// Any client errors cannot be backed off on, leave it to the caller to handle.
throw e
} else {
// Exponentially backoff on errors.
// Wait for the current delay duration
await new Promise((resolve) => setTimeout(resolve, delay))
// Increase the delay for the next attempt
delay = Math.min(delay * multiplier, maxDelay)
attempt++
console.log(`Retry attempt #${attempt} after ${delay}ms`)
}
}
}
if (options.signal && !(options.signal instanceof AbortSignal)) {
throw new Error(
`Invalid signal option. It must be an instance of AbortSignal.`
)
}
}
/**
* A Shape is an object that subscribes to a shape log,
* keeps a materialised shape `.value` in memory and
* notifies subscribers when the value has changed.
*
* It can be used without a framework and as a primitive
* to simplify developing framework hooks.
*
* @constructor
* @param {ShapeStream<T extends Row>} - the underlying shape stream
* @example
* ```
* const shapeStream = new ShapeStream<{ foo: number }>(url: 'http://localhost:3000/v1/shape/foo'})
* const shape = new Shape(shapeStream)
* ```
*
* `value` returns a promise that resolves the Shape data once the Shape has been
* fully loaded (and when resuming from being offline):
*
* const value = await shape.value
*
* `valueSync` returns the current data synchronously:
*
* const value = shape.valueSync
*
* Subscribe to updates. Called whenever the shape updates in Postgres.
*
* shape.subscribe(shapeData => {
* console.log(shapeData)
* })
*/
export class Shape<T extends Row = Row> {
private stream: ShapeStream<T>
private data: ShapeData<T> = new Map()
private subscribers = new Map<number, ShapeChangedCallback<T>>()
public error: FetchError | false = false
private hasNotifiedSubscribersUpToDate: boolean = false
constructor(stream: ShapeStream<T>) {
this.stream = stream
this.stream.subscribe(this.process.bind(this), this.handleError.bind(this))
const unsubscribe = this.stream.subscribeOnceToUpToDate(
() => {
unsubscribe()
},
(e) => {
this.handleError(e)
throw e
}
if (
options.offset !== undefined &&
options.offset !== `-1` &&
!options.shapeId
) {
throw new Error(
`shapeId is required if this isn't an initial fetch (i.e. offset > -1)`
)
}
lastSynced(): number {
return this.stream.lastSynced()
}
isConnected(): boolean {
return this.stream.isConnected()
}
/** True during initial fetch. False afterwise. */
isLoading(): boolean {
return this.stream.isLoading()
}
get value(): Promise<ShapeData<T>> {
return new Promise((resolve) => {
if (this.stream.isUpToDate) {
resolve(this.valueSync)
} else {
const unsubscribe = this.stream.subscribeOnceToUpToDate(
() => {
unsubscribe()
resolve(this.valueSync)
},
(e) => {
throw e
}
)
}
})
}
get valueSync() {
return this.data
}
subscribe(callback: ShapeChangedCallback<T>): () => void {
const subscriptionId = Math.random()
this.subscribers.set(subscriptionId, callback)
return () => {
this.subscribers.delete(subscriptionId)
}
}
unsubscribeAll(): void {
this.subscribers.clear()
}
get numSubscribers() {
return this.subscribers.size
}
private process(messages: Message<T>[]): void {
let dataMayHaveChanged = false
let isUpToDate = false
let newlyUpToDate = false
messages.forEach((message) => {
if (isChangeMessage(message)) {
dataMayHaveChanged = [`insert`, `update`, `delete`].includes(
message.headers.operation
)
switch (message.headers.operation) {
case `insert`:
this.data.set(message.key, message.value)
break
case `update`:
this.data.set(message.key, {
...this.data.get(message.key)!,
...message.value,
})
break
case `delete`:
this.data.delete(message.key)
break
}
}
if (isControlMessage(message)) {
switch (message.headers.control) {
case `up-to-date`:
isUpToDate = true
if (!this.hasNotifiedSubscribersUpToDate) {
newlyUpToDate = true
}
break
case `must-refetch`:
this.data.clear()
this.error = false
isUpToDate = false
newlyUpToDate = false
break
}
}
})
// Always notify subscribers when the Shape first is up to date.
// FIXME this would be cleaner with a simple state machine.
if (newlyUpToDate || (isUpToDate && dataMayHaveChanged)) {
this.hasNotifiedSubscribersUpToDate = true
this.notify()
}
}
private handleError(e: Error): void {
if (e instanceof FetchError) {
this.error = e
this.notify()
}
}
private notify(): void {
this.subscribers.forEach((callback) => {
callback(this.valueSync)
})
}
return
}

@@ -48,1 +48,7 @@ import { ChangeMessage, ControlMessage, Message, Row } from './types'

}
export function isUpToDateMessage<T extends Row = Row>(
message: Message<T>
): message is ControlMessage & { up_to_date: true } {
return isControlMessage(message) && message.headers.control === `up-to-date`
}
export * from './client'
export * from './shape'
export * from './types'
export * from './helpers'
export { isChangeMessage, isControlMessage } from './helpers'
export { FetchError } from './error'
export { type BackoffOptions, BackoffDefaults } from './fetch'

@@ -111,1 +111,3 @@ export type Value =

}
export type MaybePromise<T> = T | Promise<T>

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc