@electric-sql/client
Advanced tools
Comparing version 0.5.0 to 0.5.1
@@ -75,2 +75,3 @@ type Value = string | number | boolean | bigint | null | Value[] | { | ||
}; | ||
type MaybePromise<T> = T | Promise<T>; | ||
@@ -84,8 +85,24 @@ type NullToken = null | `NULL`; | ||
type ShapeData<T extends Row = Row> = Map<string, T>; | ||
type ShapeChangedCallback<T extends Row = Row> = (value: ShapeData<T>) => void; | ||
declare class FetchError extends Error { | ||
url: string; | ||
status: number; | ||
text?: string; | ||
json?: object; | ||
headers: Record<string, string>; | ||
constructor(status: number, text: string | undefined, json: object | undefined, headers: Record<string, string>, url: string, message?: string); | ||
static fromResponse(response: Response, url: string): Promise<FetchError>; | ||
} | ||
interface BackoffOptions { | ||
/** | ||
* Initial delay before retrying in milliseconds | ||
*/ | ||
initialDelay: number; | ||
/** | ||
* Maximum retry delay in milliseconds | ||
*/ | ||
maxDelay: number; | ||
multiplier: number; | ||
onFailedAttempt?: () => void; | ||
debug?: boolean; | ||
} | ||
@@ -97,2 +114,3 @@ declare const BackoffDefaults: { | ||
}; | ||
/** | ||
@@ -135,10 +153,13 @@ * Options for constructing a ShapeStream. | ||
} | ||
declare class FetchError extends Error { | ||
url: string; | ||
status: number; | ||
text?: string; | ||
json?: object; | ||
headers: Record<string, string>; | ||
constructor(status: number, text: string | undefined, json: object | undefined, headers: Record<string, string>, url: string, message?: string); | ||
static fromResponse(response: Response, url: string): Promise<FetchError>; | ||
interface ShapeStreamInterface<T extends Row = Row> { | ||
subscribe(callback: (messages: Message<T>[]) => MaybePromise<void>, onError?: (error: FetchError | Error) => void): void; | ||
unsubscribeAllUpToDateSubscribers(): void; | ||
unsubscribeAll(): void; | ||
subscribeOnceToUpToDate(callback: () => MaybePromise<void>, error: (err: FetchError | Error) => void): () => void; | ||
isLoading(): boolean; | ||
lastSyncedAt(): number | undefined; | ||
lastSynced(): number; | ||
isConnected(): boolean; | ||
isUpToDate: boolean; | ||
shapeId?: string; | ||
} | ||
@@ -175,38 +196,25 @@ /** | ||
*/ | ||
declare class ShapeStream<T extends Row = Row> { | ||
private options; | ||
private backoffOptions; | ||
private fetchClient; | ||
private schema?; | ||
private subscribers; | ||
private upToDateSubscribers; | ||
private lastOffset; | ||
private messageParser; | ||
private lastSyncedAt?; | ||
isUpToDate: boolean; | ||
private connected; | ||
shapeId?: string; | ||
declare class ShapeStream<T extends Row = Row> implements ShapeStreamInterface<T> { | ||
#private; | ||
readonly options: ShapeStreamOptions; | ||
constructor(options: ShapeStreamOptions); | ||
get shapeId(): string; | ||
get isUpToDate(): boolean; | ||
start(): Promise<void>; | ||
subscribe(callback: (messages: Message<T>[]) => void | Promise<void>, onError?: (error: FetchError | Error) => void): () => void; | ||
subscribe(callback: (messages: Message<T>[]) => MaybePromise<void>, onError?: (error: FetchError | Error) => void): () => void; | ||
unsubscribeAll(): void; | ||
private publish; | ||
private sendErrorToSubscribers; | ||
subscribeOnceToUpToDate(callback: () => void | Promise<void>, error: (err: FetchError | Error) => void): () => void; | ||
subscribeOnceToUpToDate(callback: () => MaybePromise<void>, error: (err: FetchError | Error) => void): () => void; | ||
unsubscribeAllUpToDateSubscribers(): void; | ||
/** Unix time at which we last synced. Undefined when `isLoading` is true. */ | ||
lastSyncedAt(): number | undefined; | ||
/** Time elapsed since last sync (in ms). Infinity if we did not yet sync. */ | ||
lastSynced(): number; | ||
/** Indicates if we are connected to the Electric sync service. */ | ||
isConnected(): boolean; | ||
/** True during initial fetch. False afterwise. */ | ||
isLoading(): boolean; | ||
private notifyUpToDateSubscribers; | ||
private sendErrorToUpToDateSubscribers; | ||
/** | ||
* Resets the state of the stream, optionally with a provided | ||
* shape ID | ||
*/ | ||
private reset; | ||
private validateOptions; | ||
private fetchWithBackoff; | ||
} | ||
type ShapeData<T extends Row = Row> = Map<string, T>; | ||
type ShapeChangedCallback<T extends Row = Row> = (value: ShapeData<T>) => void; | ||
/** | ||
@@ -244,20 +252,19 @@ * A Shape is an object that subscribes to a shape log, | ||
declare class Shape<T extends Row = Row> { | ||
private stream; | ||
private data; | ||
private subscribers; | ||
error: FetchError | false; | ||
private hasNotifiedSubscribersUpToDate; | ||
constructor(stream: ShapeStream<T>); | ||
#private; | ||
constructor(stream: ShapeStreamInterface<T>); | ||
get isUpToDate(): boolean; | ||
get value(): Promise<ShapeData<T>>; | ||
get valueSync(): ShapeData<T>; | ||
get error(): false | FetchError; | ||
/** Unix time at which we last synced. Undefined when `isLoading` is true. */ | ||
lastSyncedAt(): number | undefined; | ||
/** Time elapsed since last sync (in ms). Infinity if we did not yet sync. */ | ||
lastSynced(): number; | ||
/** True during initial fetch. False afterwise. */ | ||
isLoading(): boolean; | ||
/** Indicates if we are connected to the Electric sync service. */ | ||
isConnected(): boolean; | ||
/** True during initial fetch. False afterwise. */ | ||
isLoading(): boolean; | ||
get value(): Promise<ShapeData<T>>; | ||
get valueSync(): ShapeData<T>; | ||
subscribe(callback: ShapeChangedCallback<T>): () => void; | ||
unsubscribeAll(): void; | ||
get numSubscribers(): number; | ||
private process; | ||
private handleError; | ||
private notify; | ||
} | ||
@@ -302,2 +309,2 @@ | ||
export { BackoffDefaults, type BackoffOptions, type BitColumn, type BpcharColumn, type ChangeMessage, type ColumnInfo, type CommonColumnProps, type ControlMessage, FetchError, type IntervalColumn, type IntervalColumnWithPrecision, type Message, type NumericColumn, type Offset, type RegularColumn, type Row, type Schema, Shape, type ShapeChangedCallback, type ShapeData, ShapeStream, type ShapeStreamOptions, type TimeColumn, type TypedMessages, type Value, type VarcharColumn, isChangeMessage, isControlMessage }; | ||
export { BackoffDefaults, type BackoffOptions, type BitColumn, type BpcharColumn, type ChangeMessage, type ColumnInfo, type CommonColumnProps, type ControlMessage, FetchError, type IntervalColumn, type IntervalColumnWithPrecision, type MaybePromise, type Message, type NumericColumn, type Offset, type RegularColumn, type Row, type Schema, Shape, type ShapeChangedCallback, type ShapeData, ShapeStream, type ShapeStreamInterface, type ShapeStreamOptions, type TimeColumn, type TypedMessages, type Value, type VarcharColumn, isChangeMessage, isControlMessage }; |
var __defProp = Object.defineProperty; | ||
var __defProps = Object.defineProperties; | ||
var __getOwnPropDescs = Object.getOwnPropertyDescriptors; | ||
var __getOwnPropSymbols = Object.getOwnPropertySymbols; | ||
var __hasOwnProp = Object.prototype.hasOwnProperty; | ||
var __propIsEnum = Object.prototype.propertyIsEnumerable; | ||
var __typeError = (msg) => { | ||
throw TypeError(msg); | ||
}; | ||
var __defNormalProp = (obj, key, value) => key in obj ? __defProp(obj, key, { enumerable: true, configurable: true, writable: true, value }) : obj[key] = value; | ||
@@ -17,2 +22,3 @@ var __spreadValues = (a, b) => { | ||
}; | ||
var __spreadProps = (a, b) => __defProps(a, __getOwnPropDescs(b)); | ||
var __objRest = (source, exclude) => { | ||
@@ -30,2 +36,7 @@ var target = {}; | ||
}; | ||
var __accessCheck = (obj, member, msg) => member.has(obj) || __typeError("Cannot " + msg); | ||
var __privateGet = (obj, member, getter) => (__accessCheck(obj, member, "read from private field"), getter ? getter.call(obj) : member.get(obj)); | ||
var __privateAdd = (obj, member, value) => member.has(obj) ? __typeError("Cannot add the same private member more than once") : member instanceof WeakSet ? member.add(obj) : member.set(obj, value); | ||
var __privateSet = (obj, member, value, setter) => (__accessCheck(obj, member, "write to private field"), setter ? setter.call(obj, value) : member.set(obj, value), value); | ||
var __privateMethod = (obj, member, method) => (__accessCheck(obj, member, "access private method"), method); | ||
@@ -151,30 +162,46 @@ // src/parser.ts | ||
} | ||
function isUpToDateMessage(message) { | ||
return isControlMessage(message) && message.headers.control === `up-to-date`; | ||
} | ||
// src/client.ts | ||
var BackoffDefaults = { | ||
initialDelay: 100, | ||
maxDelay: 1e4, | ||
multiplier: 1.3 | ||
// src/queue.ts | ||
function isThenable(value) { | ||
return !!value && typeof value === `object` && `then` in value && typeof value.then === `function`; | ||
} | ||
var _processingChain; | ||
var AsyncProcessingQueue = class { | ||
constructor() { | ||
__privateAdd(this, _processingChain); | ||
} | ||
process(callback) { | ||
__privateSet(this, _processingChain, isThenable(__privateGet(this, _processingChain)) ? __privateGet(this, _processingChain).then(callback) : callback()); | ||
return __privateGet(this, _processingChain); | ||
} | ||
async waitForProcessing() { | ||
let currentChain; | ||
do { | ||
currentChain = __privateGet(this, _processingChain); | ||
await currentChain; | ||
} while (__privateGet(this, _processingChain) !== currentChain); | ||
} | ||
}; | ||
_processingChain = new WeakMap(); | ||
var _queue, _callback; | ||
var MessageProcessor = class { | ||
constructor(callback) { | ||
this.messageQueue = []; | ||
this.isProcessing = false; | ||
this.callback = callback; | ||
__privateAdd(this, _queue, new AsyncProcessingQueue()); | ||
__privateAdd(this, _callback); | ||
__privateSet(this, _callback, callback); | ||
} | ||
process(messages) { | ||
this.messageQueue.push(messages); | ||
if (!this.isProcessing) { | ||
this.processQueue(); | ||
} | ||
__privateGet(this, _queue).process(() => __privateGet(this, _callback).call(this, messages)); | ||
} | ||
async processQueue() { | ||
this.isProcessing = true; | ||
while (this.messageQueue.length > 0) { | ||
const messages = this.messageQueue.shift(); | ||
await this.callback(messages); | ||
} | ||
this.isProcessing = false; | ||
async waitForProcessing() { | ||
await __privateGet(this, _queue).waitForProcessing(); | ||
} | ||
}; | ||
_queue = new WeakMap(); | ||
_callback = new WeakMap(); | ||
// src/error.ts | ||
var FetchError = class _FetchError extends Error { | ||
@@ -206,53 +233,135 @@ constructor(status, text, json, headers, url, message) { | ||
}; | ||
var FetchBackoffAbortError = class extends Error { | ||
constructor() { | ||
super(`Fetch with backoff aborted`); | ||
} | ||
}; | ||
// src/fetch.ts | ||
var BackoffDefaults = { | ||
initialDelay: 100, | ||
maxDelay: 1e4, | ||
multiplier: 1.3 | ||
}; | ||
function createFetchWithBackoff(fetchClient, backoffOptions = BackoffDefaults) { | ||
const { | ||
initialDelay, | ||
maxDelay, | ||
multiplier, | ||
debug = false, | ||
onFailedAttempt | ||
} = backoffOptions; | ||
return async (...args) => { | ||
var _a; | ||
const url = args[0]; | ||
const options = args[1]; | ||
let delay = initialDelay; | ||
let attempt = 0; | ||
while (true) { | ||
try { | ||
const result = await fetchClient(...args); | ||
if (result.ok) return result; | ||
else throw await FetchError.fromResponse(result, url.toString()); | ||
} catch (e) { | ||
onFailedAttempt == null ? void 0 : onFailedAttempt(); | ||
if ((_a = options == null ? void 0 : options.signal) == null ? void 0 : _a.aborted) { | ||
throw new FetchBackoffAbortError(); | ||
} else if (e instanceof FetchError && e.status >= 400 && e.status < 500) { | ||
throw e; | ||
} else { | ||
await new Promise((resolve) => setTimeout(resolve, delay)); | ||
delay = Math.min(delay * multiplier, maxDelay); | ||
if (debug) { | ||
attempt++; | ||
console.log(`Retry attempt #${attempt} after ${delay}ms`); | ||
} | ||
} | ||
} | ||
} | ||
}; | ||
} | ||
// src/constants.ts | ||
var SHAPE_ID_HEADER = `x-electric-shape-id`; | ||
var CHUNK_LAST_OFFSET_HEADER = `x-electric-chunk-last-offset`; | ||
var SHAPE_SCHEMA_HEADER = `x-electric-schema`; | ||
var SHAPE_ID_QUERY_PARAM = `shape_id`; | ||
var OFFSET_QUERY_PARAM = `offset`; | ||
var WHERE_QUERY_PARAM = `where`; | ||
var LIVE_QUERY_PARAM = `live`; | ||
// src/client.ts | ||
var _fetchClient, _messageParser, _subscribers, _upToDateSubscribers, _lastOffset, _lastSyncedAt, _isUpToDate, _connected, _shapeId, _schema, _ShapeStream_instances, publish_fn, sendErrorToSubscribers_fn, notifyUpToDateSubscribers_fn, sendErrorToUpToDateSubscribers_fn, reset_fn; | ||
var ShapeStream = class { | ||
constructor(options) { | ||
this.subscribers = /* @__PURE__ */ new Map(); | ||
this.upToDateSubscribers = /* @__PURE__ */ new Map(); | ||
__privateAdd(this, _ShapeStream_instances); | ||
__privateAdd(this, _fetchClient); | ||
__privateAdd(this, _messageParser); | ||
__privateAdd(this, _subscribers, /* @__PURE__ */ new Map()); | ||
__privateAdd(this, _upToDateSubscribers, /* @__PURE__ */ new Map()); | ||
__privateAdd(this, _lastOffset); | ||
__privateAdd(this, _lastSyncedAt); | ||
// unix time | ||
this.isUpToDate = false; | ||
this.connected = false; | ||
__privateAdd(this, _isUpToDate, false); | ||
__privateAdd(this, _connected, false); | ||
__privateAdd(this, _shapeId); | ||
__privateAdd(this, _schema); | ||
var _a, _b, _c; | ||
this.validateOptions(options); | ||
validateOptions(options); | ||
this.options = __spreadValues({ subscribe: true }, options); | ||
this.lastOffset = (_a = this.options.offset) != null ? _a : `-1`; | ||
this.shapeId = this.options.shapeId; | ||
this.messageParser = new MessageParser(options.parser); | ||
this.backoffOptions = (_b = options.backoffOptions) != null ? _b : BackoffDefaults; | ||
this.fetchClient = (_c = options.fetchClient) != null ? _c : (...args) => fetch(...args); | ||
__privateSet(this, _lastOffset, (_a = this.options.offset) != null ? _a : `-1`); | ||
__privateSet(this, _shapeId, this.options.shapeId); | ||
__privateSet(this, _messageParser, new MessageParser(options.parser)); | ||
__privateSet(this, _fetchClient, createFetchWithBackoff( | ||
(_b = options.fetchClient) != null ? _b : (...args) => fetch(...args), | ||
__spreadProps(__spreadValues({}, (_c = options.backoffOptions) != null ? _c : BackoffDefaults), { | ||
onFailedAttempt: () => { | ||
var _a2, _b2; | ||
__privateSet(this, _connected, false); | ||
(_b2 = (_a2 = options.backoffOptions) == null ? void 0 : _a2.onFailedAttempt) == null ? void 0 : _b2.call(_a2); | ||
} | ||
}) | ||
)); | ||
this.start(); | ||
} | ||
get shapeId() { | ||
return __privateGet(this, _shapeId); | ||
} | ||
get isUpToDate() { | ||
return __privateGet(this, _isUpToDate); | ||
} | ||
async start() { | ||
var _a; | ||
this.isUpToDate = false; | ||
__privateSet(this, _isUpToDate, false); | ||
const { url, where, signal } = this.options; | ||
try { | ||
while (!(signal == null ? void 0 : signal.aborted) && !this.isUpToDate || this.options.subscribe) { | ||
while (!(signal == null ? void 0 : signal.aborted) && !__privateGet(this, _isUpToDate) || this.options.subscribe) { | ||
const fetchUrl = new URL(url); | ||
if (where) fetchUrl.searchParams.set(`where`, where); | ||
fetchUrl.searchParams.set(`offset`, this.lastOffset); | ||
if (this.isUpToDate) { | ||
fetchUrl.searchParams.set(`live`, `true`); | ||
if (where) fetchUrl.searchParams.set(WHERE_QUERY_PARAM, where); | ||
fetchUrl.searchParams.set(OFFSET_QUERY_PARAM, __privateGet(this, _lastOffset)); | ||
if (__privateGet(this, _isUpToDate)) { | ||
fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`); | ||
} | ||
if (this.shapeId) { | ||
fetchUrl.searchParams.set(`shape_id`, this.shapeId); | ||
if (__privateGet(this, _shapeId)) { | ||
fetchUrl.searchParams.set(SHAPE_ID_QUERY_PARAM, __privateGet(this, _shapeId)); | ||
} | ||
let response; | ||
try { | ||
const maybeResponse = await this.fetchWithBackoff(fetchUrl); | ||
if (maybeResponse) response = maybeResponse; | ||
else break; | ||
response = await __privateGet(this, _fetchClient).call(this, fetchUrl.toString(), { signal }); | ||
__privateSet(this, _connected, true); | ||
} catch (e) { | ||
if (e instanceof FetchBackoffAbortError) break; | ||
if (!(e instanceof FetchError)) throw e; | ||
if (e.status == 400) { | ||
this.reset(); | ||
this.publish(e.json); | ||
__privateMethod(this, _ShapeStream_instances, reset_fn).call(this); | ||
__privateMethod(this, _ShapeStream_instances, publish_fn).call(this, e.json); | ||
continue; | ||
} else if (e.status == 409) { | ||
const newShapeId = e.headers[`x-electric-shape-id`]; | ||
this.reset(newShapeId); | ||
this.publish(e.json); | ||
const newShapeId = e.headers[SHAPE_ID_HEADER]; | ||
__privateMethod(this, _ShapeStream_instances, reset_fn).call(this, newShapeId); | ||
__privateMethod(this, _ShapeStream_instances, publish_fn).call(this, e.json); | ||
continue; | ||
} else if (e.status >= 400 && e.status < 500) { | ||
this.sendErrorToUpToDateSubscribers(e); | ||
this.sendErrorToSubscribers(e); | ||
__privateMethod(this, _ShapeStream_instances, sendErrorToUpToDateSubscribers_fn).call(this, e); | ||
__privateMethod(this, _ShapeStream_instances, sendErrorToSubscribers_fn).call(this, e); | ||
throw e; | ||
@@ -262,34 +371,34 @@ } | ||
const { headers, status } = response; | ||
const shapeId = headers.get(`X-Electric-Shape-Id`); | ||
const shapeId = headers.get(SHAPE_ID_HEADER); | ||
if (shapeId) { | ||
this.shapeId = shapeId; | ||
__privateSet(this, _shapeId, shapeId); | ||
} | ||
const lastOffset = headers.get(`X-Electric-Chunk-Last-Offset`); | ||
const lastOffset = headers.get(CHUNK_LAST_OFFSET_HEADER); | ||
if (lastOffset) { | ||
this.lastOffset = lastOffset; | ||
__privateSet(this, _lastOffset, lastOffset); | ||
} | ||
const getSchema = () => { | ||
const schemaHeader = headers.get(`X-Electric-Schema`); | ||
const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER); | ||
return schemaHeader ? JSON.parse(schemaHeader) : {}; | ||
}; | ||
this.schema = (_a = this.schema) != null ? _a : getSchema(); | ||
__privateSet(this, _schema, (_a = __privateGet(this, _schema)) != null ? _a : getSchema()); | ||
const messages = status === 204 ? `[]` : await response.text(); | ||
if (status === 204) { | ||
this.lastSyncedAt = Date.now(); | ||
__privateSet(this, _lastSyncedAt, Date.now()); | ||
} | ||
const batch = this.messageParser.parse(messages, this.schema); | ||
const batch = __privateGet(this, _messageParser).parse(messages, __privateGet(this, _schema)); | ||
if (batch.length > 0) { | ||
const lastMessage = batch[batch.length - 1]; | ||
if (isControlMessage(lastMessage) && lastMessage.headers.control === `up-to-date`) { | ||
this.lastSyncedAt = Date.now(); | ||
if (!this.isUpToDate) { | ||
this.isUpToDate = true; | ||
this.notifyUpToDateSubscribers(); | ||
if (isUpToDateMessage(lastMessage)) { | ||
__privateSet(this, _lastSyncedAt, Date.now()); | ||
if (!__privateGet(this, _isUpToDate)) { | ||
__privateSet(this, _isUpToDate, true); | ||
__privateMethod(this, _ShapeStream_instances, notifyUpToDateSubscribers_fn).call(this); | ||
} | ||
} | ||
this.publish(batch); | ||
__privateMethod(this, _ShapeStream_instances, publish_fn).call(this, batch); | ||
} | ||
} | ||
} finally { | ||
this.connected = false; | ||
__privateSet(this, _connected, false); | ||
} | ||
@@ -300,37 +409,32 @@ } | ||
const subscriber = new MessageProcessor(callback); | ||
this.subscribers.set(subscriptionId, [subscriber, onError]); | ||
__privateGet(this, _subscribers).set(subscriptionId, [subscriber, onError]); | ||
return () => { | ||
this.subscribers.delete(subscriptionId); | ||
__privateGet(this, _subscribers).delete(subscriptionId); | ||
}; | ||
} | ||
unsubscribeAll() { | ||
this.subscribers.clear(); | ||
__privateGet(this, _subscribers).clear(); | ||
} | ||
publish(messages) { | ||
this.subscribers.forEach(([subscriber, _]) => { | ||
subscriber.process(messages); | ||
}); | ||
} | ||
sendErrorToSubscribers(error) { | ||
this.subscribers.forEach(([_, errorFn]) => { | ||
errorFn == null ? void 0 : errorFn(error); | ||
}); | ||
} | ||
subscribeOnceToUpToDate(callback, error) { | ||
const subscriptionId = Math.random(); | ||
this.upToDateSubscribers.set(subscriptionId, [callback, error]); | ||
__privateGet(this, _upToDateSubscribers).set(subscriptionId, [callback, error]); | ||
return () => { | ||
this.upToDateSubscribers.delete(subscriptionId); | ||
__privateGet(this, _upToDateSubscribers).delete(subscriptionId); | ||
}; | ||
} | ||
unsubscribeAllUpToDateSubscribers() { | ||
this.upToDateSubscribers.clear(); | ||
__privateGet(this, _upToDateSubscribers).clear(); | ||
} | ||
/** Unix time at which we last synced. Undefined when `isLoading` is true. */ | ||
lastSyncedAt() { | ||
return __privateGet(this, _lastSyncedAt); | ||
} | ||
/** Time elapsed since last sync (in ms). Infinity if we did not yet sync. */ | ||
lastSynced() { | ||
if (this.lastSyncedAt === void 0) return Infinity; | ||
return Date.now() - this.lastSyncedAt; | ||
if (__privateGet(this, _lastSyncedAt) === void 0) return Infinity; | ||
return Date.now() - __privateGet(this, _lastSyncedAt); | ||
} | ||
/** Indicates if we are connected to the Electric sync service. */ | ||
isConnected() { | ||
return this.connected; | ||
return __privateGet(this, _connected); | ||
} | ||
@@ -341,77 +445,78 @@ /** True during initial fetch. False afterwise. */ | ||
} | ||
notifyUpToDateSubscribers() { | ||
this.upToDateSubscribers.forEach(([callback]) => { | ||
callback(); | ||
}); | ||
}; | ||
_fetchClient = new WeakMap(); | ||
_messageParser = new WeakMap(); | ||
_subscribers = new WeakMap(); | ||
_upToDateSubscribers = new WeakMap(); | ||
_lastOffset = new WeakMap(); | ||
_lastSyncedAt = new WeakMap(); | ||
_isUpToDate = new WeakMap(); | ||
_connected = new WeakMap(); | ||
_shapeId = new WeakMap(); | ||
_schema = new WeakMap(); | ||
_ShapeStream_instances = new WeakSet(); | ||
publish_fn = function(messages) { | ||
__privateGet(this, _subscribers).forEach(([subscriber, _]) => { | ||
subscriber.process(messages); | ||
}); | ||
}; | ||
sendErrorToSubscribers_fn = function(error) { | ||
__privateGet(this, _subscribers).forEach(([_, errorFn]) => { | ||
errorFn == null ? void 0 : errorFn(error); | ||
}); | ||
}; | ||
notifyUpToDateSubscribers_fn = function() { | ||
__privateGet(this, _upToDateSubscribers).forEach(([callback]) => { | ||
callback(); | ||
}); | ||
}; | ||
sendErrorToUpToDateSubscribers_fn = function(error) { | ||
__privateGet(this, _upToDateSubscribers).forEach( | ||
([_, errorCallback]) => errorCallback(error) | ||
); | ||
}; | ||
/** | ||
* Resets the state of the stream, optionally with a provided | ||
* shape ID | ||
*/ | ||
reset_fn = function(shapeId) { | ||
__privateSet(this, _lastOffset, `-1`); | ||
__privateSet(this, _shapeId, shapeId); | ||
__privateSet(this, _isUpToDate, false); | ||
__privateSet(this, _connected, false); | ||
__privateSet(this, _schema, void 0); | ||
}; | ||
function validateOptions(options) { | ||
if (!options.url) { | ||
throw new Error(`Invalid shape option. It must provide the url`); | ||
} | ||
sendErrorToUpToDateSubscribers(error) { | ||
this.upToDateSubscribers.forEach( | ||
([_, errorCallback]) => errorCallback(error) | ||
if (options.signal && !(options.signal instanceof AbortSignal)) { | ||
throw new Error( | ||
`Invalid signal option. It must be an instance of AbortSignal.` | ||
); | ||
} | ||
/** | ||
* Resets the state of the stream, optionally with a provided | ||
* shape ID | ||
*/ | ||
reset(shapeId) { | ||
this.lastOffset = `-1`; | ||
this.shapeId = shapeId; | ||
this.isUpToDate = false; | ||
this.connected = false; | ||
this.schema = void 0; | ||
if (options.offset !== void 0 && options.offset !== `-1` && !options.shapeId) { | ||
throw new Error( | ||
`shapeId is required if this isn't an initial fetch (i.e. offset > -1)` | ||
); | ||
} | ||
validateOptions(options) { | ||
if (!options.url) { | ||
throw new Error(`Invalid shape option. It must provide the url`); | ||
} | ||
if (options.signal && !(options.signal instanceof AbortSignal)) { | ||
throw new Error( | ||
`Invalid signal option. It must be an instance of AbortSignal.` | ||
); | ||
} | ||
if (options.offset !== void 0 && options.offset !== `-1` && !options.shapeId) { | ||
throw new Error( | ||
`shapeId is required if this isn't an initial fetch (i.e. offset > -1)` | ||
); | ||
} | ||
} | ||
async fetchWithBackoff(url) { | ||
const { initialDelay, maxDelay, multiplier } = this.backoffOptions; | ||
const signal = this.options.signal; | ||
let delay = initialDelay; | ||
let attempt = 0; | ||
while (true) { | ||
try { | ||
const result = await this.fetchClient(url.toString(), { signal }); | ||
if (result.ok) { | ||
if (this.options.subscribe) { | ||
this.connected = true; | ||
} | ||
return result; | ||
} else throw await FetchError.fromResponse(result, url.toString()); | ||
} catch (e) { | ||
this.connected = false; | ||
if (signal == null ? void 0 : signal.aborted) { | ||
return void 0; | ||
} else if (e instanceof FetchError && e.status >= 400 && e.status < 500) { | ||
throw e; | ||
} else { | ||
await new Promise((resolve) => setTimeout(resolve, delay)); | ||
delay = Math.min(delay * multiplier, maxDelay); | ||
attempt++; | ||
console.log(`Retry attempt #${attempt} after ${delay}ms`); | ||
} | ||
} | ||
} | ||
} | ||
}; | ||
return; | ||
} | ||
// src/shape.ts | ||
var _stream, _data, _subscribers2, _hasNotifiedSubscribersUpToDate, _error, _Shape_instances, process_fn, handleError_fn, notify_fn; | ||
var Shape = class { | ||
constructor(stream) { | ||
this.data = /* @__PURE__ */ new Map(); | ||
this.subscribers = /* @__PURE__ */ new Map(); | ||
this.error = false; | ||
this.hasNotifiedSubscribersUpToDate = false; | ||
this.stream = stream; | ||
this.stream.subscribe(this.process.bind(this), this.handleError.bind(this)); | ||
const unsubscribe = this.stream.subscribeOnceToUpToDate( | ||
__privateAdd(this, _Shape_instances); | ||
__privateAdd(this, _stream); | ||
__privateAdd(this, _data, /* @__PURE__ */ new Map()); | ||
__privateAdd(this, _subscribers2, /* @__PURE__ */ new Map()); | ||
__privateAdd(this, _hasNotifiedSubscribersUpToDate, false); | ||
__privateAdd(this, _error, false); | ||
__privateSet(this, _stream, stream); | ||
__privateGet(this, _stream).subscribe( | ||
__privateMethod(this, _Shape_instances, process_fn).bind(this), | ||
__privateMethod(this, _Shape_instances, handleError_fn).bind(this) | ||
); | ||
const unsubscribe = __privateGet(this, _stream).subscribeOnceToUpToDate( | ||
() => { | ||
@@ -421,3 +526,3 @@ unsubscribe(); | ||
(e) => { | ||
this.handleError(e); | ||
__privateMethod(this, _Shape_instances, handleError_fn).call(this, e); | ||
throw e; | ||
@@ -427,26 +532,15 @@ } | ||
} | ||
lastSynced() { | ||
return this.stream.lastSynced(); | ||
get isUpToDate() { | ||
return __privateGet(this, _stream).isUpToDate; | ||
} | ||
isConnected() { | ||
return this.stream.isConnected(); | ||
} | ||
/** True during initial fetch. False afterwise. */ | ||
isLoading() { | ||
return this.stream.isLoading(); | ||
} | ||
get value() { | ||
return new Promise((resolve) => { | ||
if (this.stream.isUpToDate) { | ||
return new Promise((resolve, reject) => { | ||
if (__privateGet(this, _stream).isUpToDate) { | ||
resolve(this.valueSync); | ||
} else { | ||
const unsubscribe = this.stream.subscribeOnceToUpToDate( | ||
() => { | ||
unsubscribe(); | ||
resolve(this.valueSync); | ||
}, | ||
(e) => { | ||
throw e; | ||
} | ||
); | ||
const unsubscribe = this.subscribe((shapeData) => { | ||
unsubscribe(); | ||
if (__privateGet(this, _error)) reject(__privateGet(this, _error)); | ||
resolve(shapeData); | ||
}); | ||
} | ||
@@ -456,72 +550,97 @@ }); | ||
get valueSync() { | ||
return this.data; | ||
return __privateGet(this, _data); | ||
} | ||
get error() { | ||
return __privateGet(this, _error); | ||
} | ||
/** Unix time at which we last synced. Undefined when `isLoading` is true. */ | ||
lastSyncedAt() { | ||
return __privateGet(this, _stream).lastSyncedAt(); | ||
} | ||
/** Time elapsed since last sync (in ms). Infinity if we did not yet sync. */ | ||
lastSynced() { | ||
return __privateGet(this, _stream).lastSynced(); | ||
} | ||
/** True during initial fetch. False afterwise. */ | ||
isLoading() { | ||
return __privateGet(this, _stream).isLoading(); | ||
} | ||
/** Indicates if we are connected to the Electric sync service. */ | ||
isConnected() { | ||
return __privateGet(this, _stream).isConnected(); | ||
} | ||
subscribe(callback) { | ||
const subscriptionId = Math.random(); | ||
this.subscribers.set(subscriptionId, callback); | ||
__privateGet(this, _subscribers2).set(subscriptionId, callback); | ||
return () => { | ||
this.subscribers.delete(subscriptionId); | ||
__privateGet(this, _subscribers2).delete(subscriptionId); | ||
}; | ||
} | ||
unsubscribeAll() { | ||
this.subscribers.clear(); | ||
__privateGet(this, _subscribers2).clear(); | ||
} | ||
get numSubscribers() { | ||
return this.subscribers.size; | ||
return __privateGet(this, _subscribers2).size; | ||
} | ||
process(messages) { | ||
let dataMayHaveChanged = false; | ||
let isUpToDate = false; | ||
let newlyUpToDate = false; | ||
messages.forEach((message) => { | ||
if (isChangeMessage(message)) { | ||
dataMayHaveChanged = [`insert`, `update`, `delete`].includes( | ||
message.headers.operation | ||
); | ||
switch (message.headers.operation) { | ||
case `insert`: | ||
this.data.set(message.key, message.value); | ||
break; | ||
case `update`: | ||
this.data.set(message.key, __spreadValues(__spreadValues({}, this.data.get(message.key)), message.value)); | ||
break; | ||
case `delete`: | ||
this.data.delete(message.key); | ||
break; | ||
} | ||
}; | ||
_stream = new WeakMap(); | ||
_data = new WeakMap(); | ||
_subscribers2 = new WeakMap(); | ||
_hasNotifiedSubscribersUpToDate = new WeakMap(); | ||
_error = new WeakMap(); | ||
_Shape_instances = new WeakSet(); | ||
process_fn = function(messages) { | ||
let dataMayHaveChanged = false; | ||
let isUpToDate = false; | ||
let newlyUpToDate = false; | ||
messages.forEach((message) => { | ||
if (isChangeMessage(message)) { | ||
dataMayHaveChanged = [`insert`, `update`, `delete`].includes( | ||
message.headers.operation | ||
); | ||
switch (message.headers.operation) { | ||
case `insert`: | ||
__privateGet(this, _data).set(message.key, message.value); | ||
break; | ||
case `update`: | ||
__privateGet(this, _data).set(message.key, __spreadValues(__spreadValues({}, __privateGet(this, _data).get(message.key)), message.value)); | ||
break; | ||
case `delete`: | ||
__privateGet(this, _data).delete(message.key); | ||
break; | ||
} | ||
if (isControlMessage(message)) { | ||
switch (message.headers.control) { | ||
case `up-to-date`: | ||
isUpToDate = true; | ||
if (!this.hasNotifiedSubscribersUpToDate) { | ||
newlyUpToDate = true; | ||
} | ||
break; | ||
case `must-refetch`: | ||
this.data.clear(); | ||
this.error = false; | ||
isUpToDate = false; | ||
newlyUpToDate = false; | ||
break; | ||
} | ||
} | ||
if (isControlMessage(message)) { | ||
switch (message.headers.control) { | ||
case `up-to-date`: | ||
isUpToDate = true; | ||
if (!__privateGet(this, _hasNotifiedSubscribersUpToDate)) { | ||
newlyUpToDate = true; | ||
} | ||
break; | ||
case `must-refetch`: | ||
__privateGet(this, _data).clear(); | ||
__privateSet(this, _error, false); | ||
isUpToDate = false; | ||
newlyUpToDate = false; | ||
break; | ||
} | ||
}); | ||
if (newlyUpToDate || isUpToDate && dataMayHaveChanged) { | ||
this.hasNotifiedSubscribersUpToDate = true; | ||
this.notify(); | ||
} | ||
}); | ||
if (newlyUpToDate || isUpToDate && dataMayHaveChanged) { | ||
__privateSet(this, _hasNotifiedSubscribersUpToDate, true); | ||
__privateMethod(this, _Shape_instances, notify_fn).call(this); | ||
} | ||
handleError(e) { | ||
if (e instanceof FetchError) { | ||
this.error = e; | ||
this.notify(); | ||
} | ||
}; | ||
handleError_fn = function(e) { | ||
if (e instanceof FetchError) { | ||
__privateSet(this, _error, e); | ||
__privateMethod(this, _Shape_instances, notify_fn).call(this); | ||
} | ||
notify() { | ||
this.subscribers.forEach((callback) => { | ||
callback(this.valueSync); | ||
}); | ||
} | ||
}; | ||
notify_fn = function() { | ||
__privateGet(this, _subscribers2).forEach((callback) => { | ||
callback(this.valueSync); | ||
}); | ||
}; | ||
export { | ||
@@ -528,0 +647,0 @@ BackoffDefaults, |
{ | ||
"name": "@electric-sql/client", | ||
"version": "0.5.0", | ||
"version": "0.5.1", | ||
"description": "Postgres everywhere - your data, in sync, wherever you need it.", | ||
@@ -5,0 +5,0 @@ "type": "module", |
@@ -1,22 +0,21 @@ | ||
import { Message, Offset, Schema, Row } from './types' | ||
import { Message, Offset, Schema, Row, MaybePromise } from './types' | ||
import { MessageParser, Parser } from './parser' | ||
import { isChangeMessage, isControlMessage } from './helpers' | ||
import { isUpToDateMessage } from './helpers' | ||
import { MessageProcessor, MessageProcessorInterface } from './queue' | ||
import { FetchError, FetchBackoffAbortError } from './error' | ||
import { | ||
BackoffDefaults, | ||
BackoffOptions, | ||
createFetchWithBackoff, | ||
} from './fetch' | ||
import { | ||
CHUNK_LAST_OFFSET_HEADER, | ||
LIVE_QUERY_PARAM, | ||
OFFSET_QUERY_PARAM, | ||
SHAPE_ID_HEADER, | ||
SHAPE_ID_QUERY_PARAM, | ||
SHAPE_SCHEMA_HEADER, | ||
WHERE_QUERY_PARAM, | ||
} from './constants' | ||
export type ShapeData<T extends Row = Row> = Map<string, T> | ||
export type ShapeChangedCallback<T extends Row = Row> = ( | ||
value: ShapeData<T> | ||
) => void | ||
export interface BackoffOptions { | ||
initialDelay: number | ||
maxDelay: number | ||
multiplier: number | ||
} | ||
export const BackoffDefaults = { | ||
initialDelay: 100, | ||
maxDelay: 10_000, | ||
multiplier: 1.3, | ||
} | ||
/** | ||
@@ -60,84 +59,23 @@ * Options for constructing a ShapeStream. | ||
/** | ||
* Receives batches of `messages`, puts them on a queue and processes | ||
* them asynchronously by passing to a registered callback function. | ||
* | ||
* @constructor | ||
* @param {(messages: Message[]) => void} callback function | ||
*/ | ||
class MessageProcessor<T extends Row = Row> { | ||
private messageQueue: Message<T>[][] = [] | ||
private isProcessing = false | ||
private callback: (messages: Message<T>[]) => void | Promise<void> | ||
export interface ShapeStreamInterface<T extends Row = Row> { | ||
subscribe( | ||
callback: (messages: Message<T>[]) => MaybePromise<void>, | ||
onError?: (error: FetchError | Error) => void | ||
): void | ||
unsubscribeAllUpToDateSubscribers(): void | ||
unsubscribeAll(): void | ||
subscribeOnceToUpToDate( | ||
callback: () => MaybePromise<void>, | ||
error: (err: FetchError | Error) => void | ||
): () => void | ||
constructor(callback: (messages: Message<T>[]) => void | Promise<void>) { | ||
this.callback = callback | ||
} | ||
isLoading(): boolean | ||
lastSyncedAt(): number | undefined | ||
lastSynced(): number | ||
isConnected(): boolean | ||
process(messages: Message<T>[]) { | ||
this.messageQueue.push(messages) | ||
if (!this.isProcessing) { | ||
this.processQueue() | ||
} | ||
} | ||
private async processQueue() { | ||
this.isProcessing = true | ||
while (this.messageQueue.length > 0) { | ||
const messages = this.messageQueue.shift()! | ||
await this.callback(messages) | ||
} | ||
this.isProcessing = false | ||
} | ||
isUpToDate: boolean | ||
shapeId?: string | ||
} | ||
export class FetchError extends Error { | ||
status: number | ||
text?: string | ||
json?: object | ||
headers: Record<string, string> | ||
constructor( | ||
status: number, | ||
text: string | undefined, | ||
json: object | undefined, | ||
headers: Record<string, string>, | ||
public url: string, | ||
message?: string | ||
) { | ||
super( | ||
message || | ||
`HTTP Error ${status} at ${url}: ${text ?? JSON.stringify(json)}` | ||
) | ||
this.name = `FetchError` | ||
this.status = status | ||
this.text = text | ||
this.json = json | ||
this.headers = headers | ||
} | ||
static async fromResponse( | ||
response: Response, | ||
url: string | ||
): Promise<FetchError> { | ||
const status = response.status | ||
const headers = Object.fromEntries([...response.headers.entries()]) | ||
let text: string | undefined = undefined | ||
let json: object | undefined = undefined | ||
const contentType = response.headers.get(`content-type`) | ||
if (contentType && contentType.includes(`application/json`)) { | ||
json = (await response.json()) as object | ||
} else { | ||
text = await response.text() | ||
} | ||
return new FetchError(status, text, json, headers, url) | ||
} | ||
} | ||
/** | ||
@@ -173,13 +111,19 @@ * Reads updates to a shape from Electric using HTTP requests and long polling. Notifies subscribers | ||
*/ | ||
export class ShapeStream<T extends Row = Row> { | ||
private options: ShapeStreamOptions | ||
private backoffOptions: BackoffOptions | ||
private fetchClient: typeof fetch | ||
private schema?: Schema | ||
private subscribers = new Map< | ||
export class ShapeStream<T extends Row = Row> | ||
implements ShapeStreamInterface<T> | ||
{ | ||
readonly options: ShapeStreamOptions | ||
readonly #fetchClient: typeof fetch | ||
readonly #messageParser: MessageParser<T> | ||
readonly #subscribers = new Map< | ||
number, | ||
[MessageProcessor<T>, ((error: Error) => void) | undefined] | ||
[ | ||
MessageProcessorInterface<Message<T>[]>, | ||
((error: Error) => void) | undefined, | ||
] | ||
>() | ||
private upToDateSubscribers = new Map< | ||
readonly #upToDateSubscribers = new Map< | ||
number, | ||
@@ -189,21 +133,27 @@ [() => void, (error: FetchError | Error) => void] | ||
private lastOffset: Offset | ||
private messageParser: MessageParser<T> | ||
private lastSyncedAt?: number // unix time | ||
public isUpToDate: boolean = false | ||
private connected: boolean = false | ||
#lastOffset: Offset | ||
#lastSyncedAt?: number // unix time | ||
#isUpToDate: boolean = false | ||
#connected: boolean = false | ||
#shapeId?: string | ||
#schema?: Schema | ||
shapeId?: string | ||
constructor(options: ShapeStreamOptions) { | ||
this.validateOptions(options) | ||
validateOptions(options) | ||
this.options = { subscribe: true, ...options } | ||
this.lastOffset = this.options.offset ?? `-1` | ||
this.shapeId = this.options.shapeId | ||
this.messageParser = new MessageParser<T>(options.parser) | ||
this.#lastOffset = this.options.offset ?? `-1` | ||
this.#shapeId = this.options.shapeId | ||
this.#messageParser = new MessageParser<T>(options.parser) | ||
this.backoffOptions = options.backoffOptions ?? BackoffDefaults | ||
this.fetchClient = | ||
this.#fetchClient = createFetchWithBackoff( | ||
options.fetchClient ?? | ||
((...args: Parameters<typeof fetch>) => fetch(...args)) | ||
((...args: Parameters<typeof fetch>) => fetch(...args)), | ||
{ | ||
...(options.backoffOptions ?? BackoffDefaults), | ||
onFailedAttempt: () => { | ||
this.#connected = false | ||
options.backoffOptions?.onFailedAttempt?.() | ||
}, | ||
} | ||
) | ||
@@ -213,4 +163,12 @@ this.start() | ||
get shapeId() { | ||
return this.#shapeId | ||
} | ||
get isUpToDate() { | ||
return this.#isUpToDate | ||
} | ||
async start() { | ||
this.isUpToDate = false | ||
this.#isUpToDate = false | ||
@@ -220,23 +178,25 @@ const { url, where, signal } = this.options | ||
try { | ||
while ((!signal?.aborted && !this.isUpToDate) || this.options.subscribe) { | ||
while ( | ||
(!signal?.aborted && !this.#isUpToDate) || | ||
this.options.subscribe | ||
) { | ||
const fetchUrl = new URL(url) | ||
if (where) fetchUrl.searchParams.set(`where`, where) | ||
fetchUrl.searchParams.set(`offset`, this.lastOffset) | ||
if (where) fetchUrl.searchParams.set(WHERE_QUERY_PARAM, where) | ||
fetchUrl.searchParams.set(OFFSET_QUERY_PARAM, this.#lastOffset) | ||
if (this.isUpToDate) { | ||
fetchUrl.searchParams.set(`live`, `true`) | ||
if (this.#isUpToDate) { | ||
fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`) | ||
} | ||
if (this.shapeId) { | ||
if (this.#shapeId) { | ||
// This should probably be a header for better cache breaking? | ||
fetchUrl.searchParams.set(`shape_id`, this.shapeId!) | ||
fetchUrl.searchParams.set(SHAPE_ID_QUERY_PARAM, this.#shapeId!) | ||
} | ||
let response!: Response | ||
try { | ||
const maybeResponse = await this.fetchWithBackoff(fetchUrl) | ||
if (maybeResponse) response = maybeResponse | ||
else break | ||
response = await this.#fetchClient(fetchUrl.toString(), { signal }) | ||
this.#connected = true | ||
} catch (e) { | ||
if (e instanceof FetchBackoffAbortError) break // interrupted | ||
if (!(e instanceof FetchError)) throw e // should never happen | ||
@@ -246,4 +206,4 @@ if (e.status == 400) { | ||
// We should start from scratch, this will force the shape to be recreated. | ||
this.reset() | ||
this.publish(e.json as Message<T>[]) | ||
this.#reset() | ||
this.#publish(e.json as Message<T>[]) | ||
continue | ||
@@ -253,10 +213,10 @@ } else if (e.status == 409) { | ||
// with the newly provided shape ID | ||
const newShapeId = e.headers[`x-electric-shape-id`] | ||
this.reset(newShapeId) | ||
this.publish(e.json as Message<T>[]) | ||
const newShapeId = e.headers[SHAPE_ID_HEADER] | ||
this.#reset(newShapeId) | ||
this.#publish(e.json as Message<T>[]) | ||
continue | ||
} else if (e.status >= 400 && e.status < 500) { | ||
// Notify subscribers | ||
this.sendErrorToUpToDateSubscribers(e) | ||
this.sendErrorToSubscribers(e) | ||
this.#sendErrorToUpToDateSubscribers(e) | ||
this.#sendErrorToSubscribers(e) | ||
@@ -269,17 +229,17 @@ // 400 errors are not actionable without additional user input, so we're throwing them. | ||
const { headers, status } = response | ||
const shapeId = headers.get(`X-Electric-Shape-Id`) | ||
const shapeId = headers.get(SHAPE_ID_HEADER) | ||
if (shapeId) { | ||
this.shapeId = shapeId | ||
this.#shapeId = shapeId | ||
} | ||
const lastOffset = headers.get(`X-Electric-Chunk-Last-Offset`) | ||
const lastOffset = headers.get(CHUNK_LAST_OFFSET_HEADER) | ||
if (lastOffset) { | ||
this.lastOffset = lastOffset as Offset | ||
this.#lastOffset = lastOffset as Offset | ||
} | ||
const getSchema = (): Schema => { | ||
const schemaHeader = headers.get(`X-Electric-Schema`) | ||
const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER) | ||
return schemaHeader ? JSON.parse(schemaHeader) : {} | ||
} | ||
this.schema = this.schema ?? getSchema() | ||
this.#schema = this.#schema ?? getSchema() | ||
@@ -290,6 +250,6 @@ const messages = status === 204 ? `[]` : await response.text() | ||
// There's no content so we are live and up to date | ||
this.lastSyncedAt = Date.now() | ||
this.#lastSyncedAt = Date.now() | ||
} | ||
const batch = this.messageParser.parse(messages, this.schema) | ||
const batch = this.#messageParser.parse(messages, this.#schema) | ||
@@ -299,18 +259,15 @@ // Update isUpToDate | ||
const lastMessage = batch[batch.length - 1] | ||
if ( | ||
isControlMessage(lastMessage) && | ||
lastMessage.headers.control === `up-to-date` | ||
) { | ||
this.lastSyncedAt = Date.now() | ||
if (!this.isUpToDate) { | ||
this.isUpToDate = true | ||
this.notifyUpToDateSubscribers() | ||
if (isUpToDateMessage(lastMessage)) { | ||
this.#lastSyncedAt = Date.now() | ||
if (!this.#isUpToDate) { | ||
this.#isUpToDate = true | ||
this.#notifyUpToDateSubscribers() | ||
} | ||
} | ||
this.publish(batch) | ||
this.#publish(batch) | ||
} | ||
} | ||
} finally { | ||
this.connected = false | ||
this.#connected = false | ||
} | ||
@@ -320,3 +277,3 @@ } | ||
subscribe( | ||
callback: (messages: Message<T>[]) => void | Promise<void>, | ||
callback: (messages: Message<T>[]) => MaybePromise<void>, | ||
onError?: (error: FetchError | Error) => void | ||
@@ -327,6 +284,6 @@ ) { | ||
this.subscribers.set(subscriptionId, [subscriber, onError]) | ||
this.#subscribers.set(subscriptionId, [subscriber, onError]) | ||
return () => { | ||
this.subscribers.delete(subscriptionId) | ||
this.#subscribers.delete(subscriptionId) | ||
} | ||
@@ -336,19 +293,7 @@ } | ||
unsubscribeAll(): void { | ||
this.subscribers.clear() | ||
this.#subscribers.clear() | ||
} | ||
private publish(messages: Message<T>[]) { | ||
this.subscribers.forEach(([subscriber, _]) => { | ||
subscriber.process(messages) | ||
}) | ||
} | ||
private sendErrorToSubscribers(error: Error) { | ||
this.subscribers.forEach(([_, errorFn]) => { | ||
errorFn?.(error) | ||
}) | ||
} | ||
subscribeOnceToUpToDate( | ||
callback: () => void | Promise<void>, | ||
callback: () => MaybePromise<void>, | ||
error: (err: FetchError | Error) => void | ||
@@ -358,6 +303,6 @@ ) { | ||
this.upToDateSubscribers.set(subscriptionId, [callback, error]) | ||
this.#upToDateSubscribers.set(subscriptionId, [callback, error]) | ||
return () => { | ||
this.upToDateSubscribers.delete(subscriptionId) | ||
this.#upToDateSubscribers.delete(subscriptionId) | ||
} | ||
@@ -367,13 +312,19 @@ } | ||
unsubscribeAllUpToDateSubscribers(): void { | ||
this.upToDateSubscribers.clear() | ||
this.#upToDateSubscribers.clear() | ||
} | ||
/** Unix time at which we last synced. Undefined when `isLoading` is true. */ | ||
lastSyncedAt(): number | undefined { | ||
return this.#lastSyncedAt | ||
} | ||
/** Time elapsed since last sync (in ms). Infinity if we did not yet sync. */ | ||
lastSynced(): number { | ||
if (this.lastSyncedAt === undefined) return Infinity | ||
return Date.now() - this.lastSyncedAt | ||
if (this.#lastSyncedAt === undefined) return Infinity | ||
return Date.now() - this.#lastSyncedAt | ||
} | ||
/** Indicates if we are connected to the Electric sync service. */ | ||
isConnected(): boolean { | ||
return this.connected | ||
return this.#connected | ||
} | ||
@@ -386,4 +337,16 @@ | ||
private notifyUpToDateSubscribers() { | ||
this.upToDateSubscribers.forEach(([callback]) => { | ||
#publish(messages: Message<T>[]) { | ||
this.#subscribers.forEach(([subscriber, _]) => { | ||
subscriber.process(messages) | ||
}) | ||
} | ||
#sendErrorToSubscribers(error: Error) { | ||
this.#subscribers.forEach(([_, errorFn]) => { | ||
errorFn?.(error) | ||
}) | ||
} | ||
#notifyUpToDateSubscribers() { | ||
this.#upToDateSubscribers.forEach(([callback]) => { | ||
callback() | ||
@@ -393,5 +356,4 @@ }) | ||
private sendErrorToUpToDateSubscribers(error: FetchError | Error) { | ||
// eslint-disable-next-line | ||
this.upToDateSubscribers.forEach(([_, errorCallback]) => | ||
#sendErrorToUpToDateSubscribers(error: FetchError | Error) { | ||
this.#upToDateSubscribers.forEach(([_, errorCallback]) => | ||
errorCallback(error) | ||
@@ -405,246 +367,31 @@ ) | ||
*/ | ||
private reset(shapeId?: string) { | ||
this.lastOffset = `-1` | ||
this.shapeId = shapeId | ||
this.isUpToDate = false | ||
this.connected = false | ||
this.schema = undefined | ||
#reset(shapeId?: string) { | ||
this.#lastOffset = `-1` | ||
this.#shapeId = shapeId | ||
this.#isUpToDate = false | ||
this.#connected = false | ||
this.#schema = undefined | ||
} | ||
} | ||
private validateOptions(options: ShapeStreamOptions): void { | ||
if (!options.url) { | ||
throw new Error(`Invalid shape option. It must provide the url`) | ||
} | ||
if (options.signal && !(options.signal instanceof AbortSignal)) { | ||
throw new Error( | ||
`Invalid signal option. It must be an instance of AbortSignal.` | ||
) | ||
} | ||
if ( | ||
options.offset !== undefined && | ||
options.offset !== `-1` && | ||
!options.shapeId | ||
) { | ||
throw new Error( | ||
`shapeId is required if this isn't an initial fetch (i.e. offset > -1)` | ||
) | ||
} | ||
function validateOptions(options: Partial<ShapeStreamOptions>): void { | ||
if (!options.url) { | ||
throw new Error(`Invalid shape option. It must provide the url`) | ||
} | ||
private async fetchWithBackoff(url: URL) { | ||
const { initialDelay, maxDelay, multiplier } = this.backoffOptions | ||
const signal = this.options.signal | ||
let delay = initialDelay | ||
let attempt = 0 | ||
// eslint-disable-next-line no-constant-condition -- we're retrying with a lag until we get a non-500 response or the abort signal is triggered | ||
while (true) { | ||
try { | ||
const result = await this.fetchClient(url.toString(), { signal }) | ||
if (result.ok) { | ||
if (this.options.subscribe) { | ||
this.connected = true | ||
} | ||
return result | ||
} else throw await FetchError.fromResponse(result, url.toString()) | ||
} catch (e) { | ||
this.connected = false | ||
if (signal?.aborted) { | ||
return undefined | ||
} else if ( | ||
e instanceof FetchError && | ||
e.status >= 400 && | ||
e.status < 500 | ||
) { | ||
// Any client errors cannot be backed off on, leave it to the caller to handle. | ||
throw e | ||
} else { | ||
// Exponentially backoff on errors. | ||
// Wait for the current delay duration | ||
await new Promise((resolve) => setTimeout(resolve, delay)) | ||
// Increase the delay for the next attempt | ||
delay = Math.min(delay * multiplier, maxDelay) | ||
attempt++ | ||
console.log(`Retry attempt #${attempt} after ${delay}ms`) | ||
} | ||
} | ||
} | ||
if (options.signal && !(options.signal instanceof AbortSignal)) { | ||
throw new Error( | ||
`Invalid signal option. It must be an instance of AbortSignal.` | ||
) | ||
} | ||
} | ||
/** | ||
* A Shape is an object that subscribes to a shape log, | ||
* keeps a materialised shape `.value` in memory and | ||
* notifies subscribers when the value has changed. | ||
* | ||
* It can be used without a framework and as a primitive | ||
* to simplify developing framework hooks. | ||
* | ||
* @constructor | ||
* @param {ShapeStream<T extends Row>} - the underlying shape stream | ||
* @example | ||
* ``` | ||
* const shapeStream = new ShapeStream<{ foo: number }>(url: 'http://localhost:3000/v1/shape/foo'}) | ||
* const shape = new Shape(shapeStream) | ||
* ``` | ||
* | ||
* `value` returns a promise that resolves the Shape data once the Shape has been | ||
* fully loaded (and when resuming from being offline): | ||
* | ||
* const value = await shape.value | ||
* | ||
* `valueSync` returns the current data synchronously: | ||
* | ||
* const value = shape.valueSync | ||
* | ||
* Subscribe to updates. Called whenever the shape updates in Postgres. | ||
* | ||
* shape.subscribe(shapeData => { | ||
* console.log(shapeData) | ||
* }) | ||
*/ | ||
export class Shape<T extends Row = Row> { | ||
private stream: ShapeStream<T> | ||
private data: ShapeData<T> = new Map() | ||
private subscribers = new Map<number, ShapeChangedCallback<T>>() | ||
public error: FetchError | false = false | ||
private hasNotifiedSubscribersUpToDate: boolean = false | ||
constructor(stream: ShapeStream<T>) { | ||
this.stream = stream | ||
this.stream.subscribe(this.process.bind(this), this.handleError.bind(this)) | ||
const unsubscribe = this.stream.subscribeOnceToUpToDate( | ||
() => { | ||
unsubscribe() | ||
}, | ||
(e) => { | ||
this.handleError(e) | ||
throw e | ||
} | ||
if ( | ||
options.offset !== undefined && | ||
options.offset !== `-1` && | ||
!options.shapeId | ||
) { | ||
throw new Error( | ||
`shapeId is required if this isn't an initial fetch (i.e. offset > -1)` | ||
) | ||
} | ||
lastSynced(): number { | ||
return this.stream.lastSynced() | ||
} | ||
isConnected(): boolean { | ||
return this.stream.isConnected() | ||
} | ||
/** True during initial fetch. False afterwise. */ | ||
isLoading(): boolean { | ||
return this.stream.isLoading() | ||
} | ||
get value(): Promise<ShapeData<T>> { | ||
return new Promise((resolve) => { | ||
if (this.stream.isUpToDate) { | ||
resolve(this.valueSync) | ||
} else { | ||
const unsubscribe = this.stream.subscribeOnceToUpToDate( | ||
() => { | ||
unsubscribe() | ||
resolve(this.valueSync) | ||
}, | ||
(e) => { | ||
throw e | ||
} | ||
) | ||
} | ||
}) | ||
} | ||
get valueSync() { | ||
return this.data | ||
} | ||
subscribe(callback: ShapeChangedCallback<T>): () => void { | ||
const subscriptionId = Math.random() | ||
this.subscribers.set(subscriptionId, callback) | ||
return () => { | ||
this.subscribers.delete(subscriptionId) | ||
} | ||
} | ||
unsubscribeAll(): void { | ||
this.subscribers.clear() | ||
} | ||
get numSubscribers() { | ||
return this.subscribers.size | ||
} | ||
private process(messages: Message<T>[]): void { | ||
let dataMayHaveChanged = false | ||
let isUpToDate = false | ||
let newlyUpToDate = false | ||
messages.forEach((message) => { | ||
if (isChangeMessage(message)) { | ||
dataMayHaveChanged = [`insert`, `update`, `delete`].includes( | ||
message.headers.operation | ||
) | ||
switch (message.headers.operation) { | ||
case `insert`: | ||
this.data.set(message.key, message.value) | ||
break | ||
case `update`: | ||
this.data.set(message.key, { | ||
...this.data.get(message.key)!, | ||
...message.value, | ||
}) | ||
break | ||
case `delete`: | ||
this.data.delete(message.key) | ||
break | ||
} | ||
} | ||
if (isControlMessage(message)) { | ||
switch (message.headers.control) { | ||
case `up-to-date`: | ||
isUpToDate = true | ||
if (!this.hasNotifiedSubscribersUpToDate) { | ||
newlyUpToDate = true | ||
} | ||
break | ||
case `must-refetch`: | ||
this.data.clear() | ||
this.error = false | ||
isUpToDate = false | ||
newlyUpToDate = false | ||
break | ||
} | ||
} | ||
}) | ||
// Always notify subscribers when the Shape first is up to date. | ||
// FIXME this would be cleaner with a simple state machine. | ||
if (newlyUpToDate || (isUpToDate && dataMayHaveChanged)) { | ||
this.hasNotifiedSubscribersUpToDate = true | ||
this.notify() | ||
} | ||
} | ||
private handleError(e: Error): void { | ||
if (e instanceof FetchError) { | ||
this.error = e | ||
this.notify() | ||
} | ||
} | ||
private notify(): void { | ||
this.subscribers.forEach((callback) => { | ||
callback(this.valueSync) | ||
}) | ||
} | ||
return | ||
} |
@@ -48,1 +48,7 @@ import { ChangeMessage, ControlMessage, Message, Row } from './types' | ||
} | ||
export function isUpToDateMessage<T extends Row = Row>( | ||
message: Message<T> | ||
): message is ControlMessage & { up_to_date: true } { | ||
return isControlMessage(message) && message.headers.control === `up-to-date` | ||
} |
export * from './client' | ||
export * from './shape' | ||
export * from './types' | ||
export * from './helpers' | ||
export { isChangeMessage, isControlMessage } from './helpers' | ||
export { FetchError } from './error' | ||
export { type BackoffOptions, BackoffDefaults } from './fetch' |
@@ -111,1 +111,3 @@ export type Value = | ||
} | ||
export type MaybePromise<T> = T | Promise<T> |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
302861
22
3299