@electric-sql/client
Advanced tools
Comparing version 0.7.3 to 0.8.0
@@ -133,2 +133,7 @@ /** | ||
type ParamsRecord = Omit<Record<string, string>, ReservedParamKeys>; | ||
type RetryOpts = { | ||
params?: ParamsRecord; | ||
headers?: Record<string, string>; | ||
}; | ||
type ShapeStreamErrorHandler = (error: Error) => void | RetryOpts | Promise<void | RetryOpts>; | ||
/** | ||
@@ -185,4 +190,3 @@ * Options for constructing a ShapeStream. | ||
*/ | ||
shapeHandle?: string; | ||
backoffOptions?: BackoffOptions; | ||
handle?: string; | ||
/** | ||
@@ -207,9 +211,16 @@ * HTTP headers to attach to requests made by the client. | ||
fetchClient?: typeof fetch; | ||
backoffOptions?: BackoffOptions; | ||
parser?: Parser<T>; | ||
/** | ||
* A function for handling shapestream errors. | ||
* This is optional, when it is not provided any shapestream errors will be thrown. | ||
* If the function is provided and returns an object containing parameters and/or headers | ||
* the shapestream will apply those changes and try syncing again. | ||
* If the function returns void the shapestream is stopped. | ||
*/ | ||
onError?: ShapeStreamErrorHandler; | ||
} | ||
interface ShapeStreamInterface<T extends Row<unknown> = Row> { | ||
subscribe(callback: (messages: Message<T>[]) => MaybePromise<void>, onError?: (error: FetchError | Error) => void): void; | ||
unsubscribeAllUpToDateSubscribers(): void; | ||
unsubscribeAll(): void; | ||
subscribeOnceToUpToDate(callback: () => MaybePromise<void>, error: (err: FetchError | Error) => void): () => void; | ||
isLoading(): boolean; | ||
@@ -222,2 +233,3 @@ lastSyncedAt(): number | undefined; | ||
shapeHandle?: string; | ||
error?: unknown; | ||
} | ||
@@ -263,10 +275,7 @@ /** | ||
get shapeHandle(): string; | ||
get error(): unknown; | ||
get isUpToDate(): boolean; | ||
get lastOffset(): Offset; | ||
get error(): unknown; | ||
start(): Promise<void>; | ||
subscribe(callback: (messages: Message<T>[]) => MaybePromise<void>, onError?: (error: FetchError | Error) => void): () => void; | ||
subscribe(callback: (messages: Message<T>[]) => MaybePromise<void>, onError?: (error: Error) => void): () => void; | ||
unsubscribeAll(): void; | ||
subscribeOnceToUpToDate(callback: () => MaybePromise<void>, error: (err: FetchError | Error) => void): () => void; | ||
unsubscribeAllUpToDateSubscribers(): void; | ||
/** Unix time at which we last synced. Undefined when `isLoading` is true. */ | ||
@@ -323,2 +332,3 @@ lastSyncedAt(): number | undefined; | ||
get lastOffset(): Offset; | ||
get handle(): string | undefined; | ||
get rows(): Promise<T[]>; | ||
@@ -325,0 +335,0 @@ get currentRows(): T[]; |
@@ -41,2 +41,85 @@ var __defProp = Object.defineProperty; | ||
// src/error.ts | ||
var FetchError = class _FetchError extends Error { | ||
constructor(status, text, json, headers, url, message) { | ||
super( | ||
message || `HTTP Error ${status} at ${url}: ${text != null ? text : JSON.stringify(json)}` | ||
); | ||
this.url = url; | ||
this.name = `FetchError`; | ||
this.status = status; | ||
this.text = text; | ||
this.json = json; | ||
this.headers = headers; | ||
} | ||
static async fromResponse(response, url) { | ||
const status = response.status; | ||
const headers = Object.fromEntries([...response.headers.entries()]); | ||
let text = void 0; | ||
let json = void 0; | ||
const contentType = response.headers.get(`content-type`); | ||
if (contentType && contentType.includes(`application/json`)) { | ||
json = await response.json(); | ||
} else { | ||
text = await response.text(); | ||
} | ||
return new _FetchError(status, text, json, headers, url); | ||
} | ||
}; | ||
var FetchBackoffAbortError = class extends Error { | ||
constructor() { | ||
super(`Fetch with backoff aborted`); | ||
this.name = `FetchBackoffAbortError`; | ||
} | ||
}; | ||
var MissingShapeUrlError = class extends Error { | ||
constructor() { | ||
super(`Invalid shape options: missing required url parameter`); | ||
this.name = `MissingShapeUrlError`; | ||
} | ||
}; | ||
var InvalidSignalError = class extends Error { | ||
constructor() { | ||
super(`Invalid signal option. It must be an instance of AbortSignal.`); | ||
this.name = `InvalidSignalError`; | ||
} | ||
}; | ||
var MissingShapeHandleError = class extends Error { | ||
constructor() { | ||
super( | ||
`shapeHandle is required if this isn't an initial fetch (i.e. offset > -1)` | ||
); | ||
this.name = `MissingShapeHandleError`; | ||
} | ||
}; | ||
var ReservedParamError = class extends Error { | ||
constructor(reservedParams) { | ||
super( | ||
`Cannot use reserved Electric parameter names in custom params: ${reservedParams.join(`, `)}` | ||
); | ||
this.name = `ReservedParamError`; | ||
} | ||
}; | ||
var ParserNullValueError = class extends Error { | ||
constructor(columnName) { | ||
super(`Column "${columnName != null ? columnName : `unknown`}" does not allow NULL values`); | ||
this.name = `ParserNullValueError`; | ||
} | ||
}; | ||
var MissingHeadersError = class extends Error { | ||
constructor(url, missingHeaders) { | ||
let msg = `The response for the shape request to ${url} didn't include the following required headers: | ||
`; | ||
missingHeaders.forEach((h) => { | ||
msg += `- ${h} | ||
`; | ||
}); | ||
msg += ` | ||
This is often due to a proxy not setting CORS correctly so that all Electric headers can be read by the client.`; | ||
msg += ` | ||
For more information visit the troubleshooting guide: /docs/guides/troubleshooting/missing-headers`; | ||
super(msg); | ||
} | ||
}; | ||
// src/parser.ts | ||
@@ -143,3 +226,3 @@ var parseNumber = (value) => Number(value); | ||
if (!isNullable) { | ||
throw new Error(`Column ${columnName != null ? columnName : `unknown`} is not nullable`); | ||
throw new ParserNullValueError(columnName != null ? columnName : `unknown`); | ||
} | ||
@@ -166,48 +249,2 @@ return null; | ||
// src/error.ts | ||
var FetchError = class _FetchError extends Error { | ||
constructor(status, text, json, headers, url, message) { | ||
super( | ||
message || `HTTP Error ${status} at ${url}: ${text != null ? text : JSON.stringify(json)}` | ||
); | ||
this.url = url; | ||
this.name = `FetchError`; | ||
this.status = status; | ||
this.text = text; | ||
this.json = json; | ||
this.headers = headers; | ||
} | ||
static async fromResponse(response, url) { | ||
const status = response.status; | ||
const headers = Object.fromEntries([...response.headers.entries()]); | ||
let text = void 0; | ||
let json = void 0; | ||
const contentType = response.headers.get(`content-type`); | ||
if (contentType && contentType.includes(`application/json`)) { | ||
json = await response.json(); | ||
} else { | ||
text = await response.text(); | ||
} | ||
return new _FetchError(status, text, json, headers, url); | ||
} | ||
}; | ||
var FetchBackoffAbortError = class extends Error { | ||
constructor() { | ||
super(`Fetch with backoff aborted`); | ||
} | ||
}; | ||
var MissingHeadersError = class extends Error { | ||
constructor(url, missingHeaders) { | ||
let msg = `The response for the shape request to ${url} didn't include the following required headers: | ||
`; | ||
missingHeaders.forEach((h) => { | ||
msg += `- ${h} | ||
`; | ||
}); | ||
msg += ` | ||
This is often due to a proxy not setting CORS correctly so that all Electric headers can be read by the client.`; | ||
super(msg); | ||
} | ||
}; | ||
// src/constants.ts | ||
@@ -312,3 +349,3 @@ var LIVE_CACHE_BUSTER_HEADER = `electric-cursor`; | ||
const missingHeaders = []; | ||
const addMissingHeaders = (requiredHeaders) => requiredHeaders.filter((h) => !headers.has(h)); | ||
const addMissingHeaders = (requiredHeaders) => missingHeaders.push(...requiredHeaders.filter((h) => !headers.has(h))); | ||
addMissingHeaders(requiredElectricResponseHeaders); | ||
@@ -430,10 +467,10 @@ const input = args[0]; | ||
]); | ||
var _fetchClient2, _messageParser, _subscribers, _upToDateSubscribers, _lastOffset, _liveCacheBuster, _lastSyncedAt, _isUpToDate, _connected, _shapeHandle, _databaseId, _schema, _error, _replica, _ShapeStream_instances, publish_fn, sendErrorToSubscribers_fn, notifyUpToDateSubscribers_fn, sendErrorToUpToDateSubscribers_fn, reset_fn; | ||
var _error, _fetchClient2, _messageParser, _subscribers, _lastOffset, _liveCacheBuster, _lastSyncedAt, _isUpToDate, _connected, _shapeHandle, _databaseId, _schema, _onError, _replica, _ShapeStream_instances, start_fn, publish_fn, sendErrorToSubscribers_fn, reset_fn; | ||
var _ShapeStream = class _ShapeStream { | ||
constructor(options) { | ||
__privateAdd(this, _ShapeStream_instances); | ||
__privateAdd(this, _error, null); | ||
__privateAdd(this, _fetchClient2); | ||
__privateAdd(this, _messageParser); | ||
__privateAdd(this, _subscribers, /* @__PURE__ */ new Map()); | ||
__privateAdd(this, _upToDateSubscribers, /* @__PURE__ */ new Map()); | ||
__privateAdd(this, _lastOffset); | ||
@@ -449,13 +486,14 @@ __privateAdd(this, _liveCacheBuster); | ||
__privateAdd(this, _schema); | ||
__privateAdd(this, _error); | ||
__privateAdd(this, _onError); | ||
__privateAdd(this, _replica); | ||
var _a, _b, _c; | ||
validateOptions(options); | ||
this.options = __spreadValues({ subscribe: true }, options); | ||
validateOptions(this.options); | ||
__privateSet(this, _lastOffset, (_a = this.options.offset) != null ? _a : `-1`); | ||
__privateSet(this, _liveCacheBuster, ``); | ||
__privateSet(this, _shapeHandle, this.options.shapeHandle); | ||
__privateSet(this, _shapeHandle, this.options.handle); | ||
__privateSet(this, _databaseId, this.options.databaseId); | ||
__privateSet(this, _messageParser, new MessageParser(options.parser)); | ||
__privateSet(this, _replica, this.options.replica); | ||
__privateSet(this, _onError, this.options.onError); | ||
const baseFetchClient = (_b = options.fetchClient) != null ? _b : (...args) => fetch(...args); | ||
@@ -472,3 +510,3 @@ const fetchWithBackoffClient = createFetchWithBackoff(baseFetchClient, __spreadProps(__spreadValues({}, (_c = options.backoffOptions) != null ? _c : BackoffDefaults), { | ||
)); | ||
this.start(); | ||
__privateMethod(this, _ShapeStream_instances, start_fn).call(this); | ||
} | ||
@@ -478,2 +516,5 @@ get shapeHandle() { | ||
} | ||
get error() { | ||
return __privateGet(this, _error); | ||
} | ||
get isUpToDate() { | ||
@@ -485,114 +526,4 @@ return __privateGet(this, _isUpToDate); | ||
} | ||
get error() { | ||
return __privateGet(this, _error); | ||
} | ||
async start() { | ||
var _a, _b; | ||
__privateSet(this, _isUpToDate, false); | ||
const { url, table, where, columns, signal } = this.options; | ||
try { | ||
while (!(signal == null ? void 0 : signal.aborted) && !__privateGet(this, _isUpToDate) || this.options.subscribe) { | ||
const fetchUrl = new URL(url); | ||
if (this.options.params) { | ||
const reservedParams = Object.keys(this.options.params).filter( | ||
(key) => RESERVED_PARAMS.has(key) | ||
); | ||
if (reservedParams.length > 0) { | ||
throw new Error( | ||
`Cannot use reserved Electric parameter names in custom params: ${reservedParams.join(`, `)}` | ||
); | ||
} | ||
for (const [key, value] of Object.entries(this.options.params)) { | ||
fetchUrl.searchParams.set(key, value); | ||
} | ||
} | ||
if (table) fetchUrl.searchParams.set(TABLE_QUERY_PARAM, table); | ||
if (where) fetchUrl.searchParams.set(WHERE_QUERY_PARAM, where); | ||
if (columns && columns.length > 0) | ||
fetchUrl.searchParams.set(COLUMNS_QUERY_PARAM, columns.join(`,`)); | ||
fetchUrl.searchParams.set(OFFSET_QUERY_PARAM, __privateGet(this, _lastOffset)); | ||
if (__privateGet(this, _isUpToDate)) { | ||
fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`); | ||
fetchUrl.searchParams.set( | ||
LIVE_CACHE_BUSTER_QUERY_PARAM, | ||
__privateGet(this, _liveCacheBuster) | ||
); | ||
} | ||
if (__privateGet(this, _shapeHandle)) { | ||
fetchUrl.searchParams.set( | ||
SHAPE_HANDLE_QUERY_PARAM, | ||
__privateGet(this, _shapeHandle) | ||
); | ||
} | ||
if (__privateGet(this, _databaseId)) { | ||
fetchUrl.searchParams.set(DATABASE_ID_QUERY_PARAM, __privateGet(this, _databaseId)); | ||
} | ||
if (((_a = __privateGet(this, _replica)) != null ? _a : _ShapeStream.Replica.DEFAULT) != _ShapeStream.Replica.DEFAULT) { | ||
fetchUrl.searchParams.set(REPLICA_PARAM, __privateGet(this, _replica)); | ||
} | ||
let response; | ||
try { | ||
response = await __privateGet(this, _fetchClient2).call(this, fetchUrl.toString(), { | ||
signal, | ||
headers: this.options.headers | ||
}); | ||
__privateSet(this, _connected, true); | ||
} catch (e) { | ||
if (e instanceof FetchBackoffAbortError) break; | ||
if (e instanceof MissingHeadersError) throw e; | ||
if (!(e instanceof FetchError)) throw e; | ||
if (e.status == 409) { | ||
const newShapeHandle = e.headers[SHAPE_HANDLE_HEADER]; | ||
__privateMethod(this, _ShapeStream_instances, reset_fn).call(this, newShapeHandle); | ||
await __privateMethod(this, _ShapeStream_instances, publish_fn).call(this, e.json); | ||
continue; | ||
} else if (e.status >= 400 && e.status < 500) { | ||
__privateMethod(this, _ShapeStream_instances, sendErrorToUpToDateSubscribers_fn).call(this, e); | ||
__privateMethod(this, _ShapeStream_instances, sendErrorToSubscribers_fn).call(this, e); | ||
throw e; | ||
} | ||
} | ||
const { headers, status } = response; | ||
const shapeHandle = headers.get(SHAPE_HANDLE_HEADER); | ||
if (shapeHandle) { | ||
__privateSet(this, _shapeHandle, shapeHandle); | ||
} | ||
const lastOffset = headers.get(CHUNK_LAST_OFFSET_HEADER); | ||
if (lastOffset) { | ||
__privateSet(this, _lastOffset, lastOffset); | ||
} | ||
const liveCacheBuster = headers.get(LIVE_CACHE_BUSTER_HEADER); | ||
if (liveCacheBuster) { | ||
__privateSet(this, _liveCacheBuster, liveCacheBuster); | ||
} | ||
const getSchema = () => { | ||
const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER); | ||
return schemaHeader ? JSON.parse(schemaHeader) : {}; | ||
}; | ||
__privateSet(this, _schema, (_b = __privateGet(this, _schema)) != null ? _b : getSchema()); | ||
const messages = status === 204 ? `[]` : await response.text(); | ||
if (status === 204) { | ||
__privateSet(this, _lastSyncedAt, Date.now()); | ||
} | ||
const batch = __privateGet(this, _messageParser).parse(messages, __privateGet(this, _schema)); | ||
if (batch.length > 0) { | ||
const prevUpToDate = __privateGet(this, _isUpToDate); | ||
const lastMessage = batch[batch.length - 1]; | ||
if (isUpToDateMessage(lastMessage)) { | ||
__privateSet(this, _lastSyncedAt, Date.now()); | ||
__privateSet(this, _isUpToDate, true); | ||
} | ||
await __privateMethod(this, _ShapeStream_instances, publish_fn).call(this, batch); | ||
if (!prevUpToDate && __privateGet(this, _isUpToDate)) { | ||
__privateMethod(this, _ShapeStream_instances, notifyUpToDateSubscribers_fn).call(this); | ||
} | ||
} | ||
} | ||
} catch (err) { | ||
__privateSet(this, _error, err); | ||
} finally { | ||
__privateSet(this, _connected, false); | ||
} | ||
} | ||
subscribe(callback, onError) { | ||
subscribe(callback, onError = () => { | ||
}) { | ||
const subscriptionId = Math.random(); | ||
@@ -607,12 +538,2 @@ __privateGet(this, _subscribers).set(subscriptionId, [callback, onError]); | ||
} | ||
subscribeOnceToUpToDate(callback, error) { | ||
const subscriptionId = Math.random(); | ||
__privateGet(this, _upToDateSubscribers).set(subscriptionId, [callback, error]); | ||
return () => { | ||
__privateGet(this, _upToDateSubscribers).delete(subscriptionId); | ||
}; | ||
} | ||
unsubscribeAllUpToDateSubscribers() { | ||
__privateGet(this, _upToDateSubscribers).clear(); | ||
} | ||
/** Unix time at which we last synced. Undefined when `isLoading` is true. */ | ||
@@ -633,9 +554,9 @@ lastSyncedAt() { | ||
isLoading() { | ||
return !this.isUpToDate; | ||
return !__privateGet(this, _isUpToDate); | ||
} | ||
}; | ||
_error = new WeakMap(); | ||
_fetchClient2 = new WeakMap(); | ||
_messageParser = new WeakMap(); | ||
_subscribers = new WeakMap(); | ||
_upToDateSubscribers = new WeakMap(); | ||
_lastOffset = new WeakMap(); | ||
@@ -649,5 +570,121 @@ _liveCacheBuster = new WeakMap(); | ||
_schema = new WeakMap(); | ||
_error = new WeakMap(); | ||
_onError = new WeakMap(); | ||
_replica = new WeakMap(); | ||
_ShapeStream_instances = new WeakSet(); | ||
start_fn = async function() { | ||
var _a, _b, _c; | ||
try { | ||
while (!((_a = this.options.signal) == null ? void 0 : _a.aborted) && !__privateGet(this, _isUpToDate) || this.options.subscribe) { | ||
const { url, table, where, columns, signal } = this.options; | ||
const fetchUrl = new URL(url); | ||
if (this.options.params) { | ||
const reservedParams = Object.keys(this.options.params).filter( | ||
(key) => RESERVED_PARAMS.has(key) | ||
); | ||
if (reservedParams.length > 0) { | ||
throw new Error( | ||
`Cannot use reserved Electric parameter names in custom params: ${reservedParams.join(`, `)}` | ||
); | ||
} | ||
for (const [key, value] of Object.entries(this.options.params)) { | ||
fetchUrl.searchParams.set(key, value); | ||
} | ||
} | ||
if (table) fetchUrl.searchParams.set(TABLE_QUERY_PARAM, table); | ||
if (where) fetchUrl.searchParams.set(WHERE_QUERY_PARAM, where); | ||
if (columns && columns.length > 0) | ||
fetchUrl.searchParams.set(COLUMNS_QUERY_PARAM, columns.join(`,`)); | ||
fetchUrl.searchParams.set(OFFSET_QUERY_PARAM, __privateGet(this, _lastOffset)); | ||
if (__privateGet(this, _isUpToDate)) { | ||
fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`); | ||
fetchUrl.searchParams.set( | ||
LIVE_CACHE_BUSTER_QUERY_PARAM, | ||
__privateGet(this, _liveCacheBuster) | ||
); | ||
} | ||
if (__privateGet(this, _shapeHandle)) { | ||
fetchUrl.searchParams.set( | ||
SHAPE_HANDLE_QUERY_PARAM, | ||
__privateGet(this, _shapeHandle) | ||
); | ||
} | ||
if (__privateGet(this, _databaseId)) { | ||
fetchUrl.searchParams.set(DATABASE_ID_QUERY_PARAM, __privateGet(this, _databaseId)); | ||
} | ||
if (((_b = __privateGet(this, _replica)) != null ? _b : _ShapeStream.Replica.DEFAULT) != _ShapeStream.Replica.DEFAULT) { | ||
fetchUrl.searchParams.set(REPLICA_PARAM, __privateGet(this, _replica)); | ||
} | ||
let response; | ||
try { | ||
response = await __privateGet(this, _fetchClient2).call(this, fetchUrl.toString(), { | ||
signal, | ||
headers: this.options.headers | ||
}); | ||
__privateSet(this, _connected, true); | ||
} catch (e) { | ||
if (e instanceof FetchBackoffAbortError) break; | ||
if (!(e instanceof FetchError)) throw e; | ||
if (e.status == 409) { | ||
const newShapeHandle = e.headers[SHAPE_HANDLE_HEADER]; | ||
__privateMethod(this, _ShapeStream_instances, reset_fn).call(this, newShapeHandle); | ||
await __privateMethod(this, _ShapeStream_instances, publish_fn).call(this, e.json); | ||
continue; | ||
} else if (e.status >= 400 && e.status < 500) { | ||
__privateMethod(this, _ShapeStream_instances, sendErrorToSubscribers_fn).call(this, e); | ||
throw e; | ||
} | ||
} | ||
const { headers, status } = response; | ||
const shapeHandle = headers.get(SHAPE_HANDLE_HEADER); | ||
if (shapeHandle) { | ||
__privateSet(this, _shapeHandle, shapeHandle); | ||
} | ||
const lastOffset = headers.get(CHUNK_LAST_OFFSET_HEADER); | ||
if (lastOffset) { | ||
__privateSet(this, _lastOffset, lastOffset); | ||
} | ||
const liveCacheBuster = headers.get(LIVE_CACHE_BUSTER_HEADER); | ||
if (liveCacheBuster) { | ||
__privateSet(this, _liveCacheBuster, liveCacheBuster); | ||
} | ||
const getSchema = () => { | ||
const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER); | ||
return schemaHeader ? JSON.parse(schemaHeader) : {}; | ||
}; | ||
__privateSet(this, _schema, (_c = __privateGet(this, _schema)) != null ? _c : getSchema()); | ||
const messages = status === 204 ? `[]` : await response.text(); | ||
if (status === 204) { | ||
__privateSet(this, _lastSyncedAt, Date.now()); | ||
} | ||
const batch = __privateGet(this, _messageParser).parse(messages, __privateGet(this, _schema)); | ||
if (batch.length > 0) { | ||
const lastMessage = batch[batch.length - 1]; | ||
if (isUpToDateMessage(lastMessage)) { | ||
__privateSet(this, _lastSyncedAt, Date.now()); | ||
__privateSet(this, _isUpToDate, true); | ||
} | ||
await __privateMethod(this, _ShapeStream_instances, publish_fn).call(this, batch); | ||
} | ||
} | ||
} catch (err) { | ||
__privateSet(this, _error, err); | ||
if (__privateGet(this, _onError)) { | ||
const retryOpts = await __privateGet(this, _onError).call(this, err); | ||
if (typeof retryOpts === `object`) { | ||
__privateMethod(this, _ShapeStream_instances, reset_fn).call(this); | ||
if (`params` in retryOpts) { | ||
this.options.params = retryOpts.params; | ||
} | ||
if (`headers` in retryOpts) { | ||
this.options.headers = retryOpts.headers; | ||
} | ||
__privateMethod(this, _ShapeStream_instances, start_fn).call(this); | ||
} | ||
return; | ||
} | ||
throw err; | ||
} finally { | ||
__privateSet(this, _connected, false); | ||
} | ||
}; | ||
publish_fn = async function(messages) { | ||
@@ -671,12 +708,2 @@ await Promise.all( | ||
}; | ||
notifyUpToDateSubscribers_fn = function() { | ||
__privateGet(this, _upToDateSubscribers).forEach(([callback]) => { | ||
callback(); | ||
}); | ||
}; | ||
sendErrorToUpToDateSubscribers_fn = function(error) { | ||
__privateGet(this, _upToDateSubscribers).forEach( | ||
([_, errorCallback]) => errorCallback(error) | ||
); | ||
}; | ||
/** | ||
@@ -686,6 +713,6 @@ * Resets the state of the stream, optionally with a provided | ||
*/ | ||
reset_fn = function(shapeHandle) { | ||
reset_fn = function(handle) { | ||
__privateSet(this, _lastOffset, `-1`); | ||
__privateSet(this, _liveCacheBuster, ``); | ||
__privateSet(this, _shapeHandle, shapeHandle); | ||
__privateSet(this, _shapeHandle, handle); | ||
__privateSet(this, _isUpToDate, false); | ||
@@ -702,13 +729,9 @@ __privateSet(this, _connected, false); | ||
if (!options.url) { | ||
throw new Error(`Invalid shape options. It must provide the url`); | ||
throw new MissingShapeUrlError(); | ||
} | ||
if (options.signal && !(options.signal instanceof AbortSignal)) { | ||
throw new Error( | ||
`Invalid signal option. It must be an instance of AbortSignal.` | ||
); | ||
throw new InvalidSignalError(); | ||
} | ||
if (options.offset !== void 0 && options.offset !== `-1` && !options.shapeHandle) { | ||
throw new Error( | ||
`shapeHandle is required if this isn't an initial fetch (i.e. offset > -1)` | ||
); | ||
if (options.offset !== void 0 && options.offset !== `-1` && !options.handle) { | ||
throw new MissingShapeHandleError(); | ||
} | ||
@@ -720,5 +743,3 @@ if (options.params) { | ||
if (reservedParams.length > 0) { | ||
throw new Error( | ||
`Cannot use reserved Electric parameter names in custom params: ${reservedParams.join(`, `)}` | ||
); | ||
throw new ReservedParamError(reservedParams); | ||
} | ||
@@ -744,11 +765,2 @@ } | ||
); | ||
const unsubscribe = __privateGet(this, _stream).subscribeOnceToUpToDate( | ||
() => { | ||
unsubscribe(); | ||
}, | ||
(e) => { | ||
__privateMethod(this, _Shape_instances, handleError_fn).call(this, e); | ||
throw e; | ||
} | ||
); | ||
} | ||
@@ -761,2 +773,5 @@ get isUpToDate() { | ||
} | ||
get handle() { | ||
return __privateGet(this, _stream).shapeHandle; | ||
} | ||
get rows() { | ||
@@ -763,0 +778,0 @@ return this.value.then((v) => Array.from(v.values())); |
{ | ||
"name": "@electric-sql/client", | ||
"version": "0.7.3", | ||
"version": "0.8.0", | ||
"description": "Postgres everywhere - your data, in sync, wherever you need it.", | ||
@@ -5,0 +5,0 @@ "type": "module", |
@@ -96,2 +96,77 @@ <p align="center"> | ||
See the [Docs](https://electric-sql.com) and [Examples](https://electric-sql.com/examples/basic) for more information. | ||
### Error Handling | ||
The ShapeStream provides two ways to handle errors: | ||
1. Using the `onError` handler: | ||
```typescript | ||
const stream = new ShapeStream({ | ||
url: `${BASE_URL}/v1/shape`, | ||
table: `foo`, | ||
onError: (error) => { | ||
// Handle all stream errors here | ||
console.error('Stream error:', error) | ||
} | ||
}) | ||
``` | ||
If no `onError` handler is provided, the ShapeStream will throw errors that occur during streaming. | ||
2. Individual subscribers can optionally handle errors specific to their subscription: | ||
```typescript | ||
stream.subscribe( | ||
(messages) => { | ||
// Handle messages | ||
}, | ||
(error) => { | ||
// Handle errors for this specific subscription | ||
console.error('Subscription error:', error) | ||
} | ||
) | ||
``` | ||
Common error types include: | ||
- `MissingShapeUrlError`: Missing required URL parameter | ||
- `InvalidSignalError`: Invalid AbortSignal instance | ||
- `ReservedParamError`: Using reserved parameter names | ||
Runtime errors: | ||
- `FetchError`: HTTP errors during shape fetching | ||
- `FetchBackoffAbortError`: Fetch aborted using AbortSignal | ||
- `MissingShapeHandleError`: Missing required shape handle | ||
- `ParserNullValueError`: Parser encountered NULL value in a column that doesn't allow NULL values | ||
See the [typescript client docs on the website](https://electric-sql.com/docs/api/clients/typescript#error-handling) for more details on error handling. | ||
And in general, see the [docs website](https://electric-sql.com) and [examples folder](https://electric-sql.com/examples/basic) for more information. | ||
## Develop | ||
Install the pnpm workspace at the repo root: | ||
```shell | ||
pnpm install | ||
``` | ||
Build the package: | ||
```shell | ||
cd packages/typescript-client | ||
pnpm build | ||
``` | ||
## Test | ||
In one terminal, start the backend running: | ||
```shell | ||
cd ../sync-service | ||
mix deps.get | ||
mix stop_dev && mix compile && mix start_dev && ies -S mix | ||
``` | ||
Then in this folder: | ||
```shell | ||
pnpm test | ||
``` |
@@ -14,3 +14,6 @@ import { | ||
FetchBackoffAbortError, | ||
MissingHeadersError, | ||
MissingShapeUrlError, | ||
InvalidSignalError, | ||
MissingShapeHandleError, | ||
ReservedParamError, | ||
} from './error' | ||
@@ -67,2 +70,10 @@ import { | ||
type RetryOpts = { | ||
params?: ParamsRecord | ||
headers?: Record<string, string> | ||
} | ||
type ShapeStreamErrorHandler = ( | ||
error: Error | ||
) => void | RetryOpts | Promise<void | RetryOpts> | ||
/** | ||
@@ -111,2 +122,3 @@ * Options for constructing a ShapeStream. | ||
replica?: Replica | ||
/** | ||
@@ -121,2 +133,3 @@ * The "offset" on the shape log. This is typically not set as the ShapeStream | ||
offset?: Offset | ||
/** | ||
@@ -126,4 +139,3 @@ * Similar to `offset`, this isn't typically used unless you're maintaining | ||
*/ | ||
shapeHandle?: string | ||
backoffOptions?: BackoffOptions | ||
handle?: string | ||
@@ -149,5 +161,16 @@ /** | ||
subscribe?: boolean | ||
signal?: AbortSignal | ||
fetchClient?: typeof fetch | ||
backoffOptions?: BackoffOptions | ||
parser?: Parser<T> | ||
/** | ||
* A function for handling shapestream errors. | ||
* This is optional, when it is not provided any shapestream errors will be thrown. | ||
* If the function is provided and returns an object containing parameters and/or headers | ||
* the shapestream will apply those changes and try syncing again. | ||
* If the function returns void the shapestream is stopped. | ||
*/ | ||
onError?: ShapeStreamErrorHandler | ||
} | ||
@@ -160,8 +183,3 @@ | ||
): void | ||
unsubscribeAllUpToDateSubscribers(): void | ||
unsubscribeAll(): void | ||
subscribeOnceToUpToDate( | ||
callback: () => MaybePromise<void>, | ||
error: (err: FetchError | Error) => void | ||
): () => void | ||
@@ -176,2 +194,3 @@ isLoading(): boolean | ||
shapeHandle?: string | ||
error?: unknown | ||
} | ||
@@ -219,2 +238,3 @@ | ||
readonly options: ShapeStreamOptions<GetExtensions<T>> | ||
#error: unknown = null | ||
@@ -231,6 +251,2 @@ readonly #fetchClient: typeof fetch | ||
>() | ||
readonly #upToDateSubscribers = new Map< | ||
number, | ||
[() => void, (error: FetchError | Error) => void] | ||
>() | ||
@@ -245,14 +261,15 @@ #lastOffset: Offset | ||
#schema?: Schema | ||
#error?: unknown | ||
#onError?: ShapeStreamErrorHandler | ||
#replica?: Replica | ||
constructor(options: ShapeStreamOptions<GetExtensions<T>>) { | ||
validateOptions(options) | ||
this.options = { subscribe: true, ...options } | ||
validateOptions(this.options) | ||
this.#lastOffset = this.options.offset ?? `-1` | ||
this.#liveCacheBuster = `` | ||
this.#shapeHandle = this.options.shapeHandle | ||
this.#shapeHandle = this.options.handle | ||
this.#databaseId = this.options.databaseId | ||
this.#messageParser = new MessageParser<T>(options.parser) | ||
this.#replica = this.options.replica | ||
this.#onError = this.options.onError | ||
@@ -275,3 +292,3 @@ const baseFetchClient = | ||
this.start() | ||
this.#start() | ||
} | ||
@@ -283,2 +300,6 @@ | ||
get error() { | ||
return this.#error | ||
} | ||
get isUpToDate() { | ||
@@ -292,16 +313,10 @@ return this.#isUpToDate | ||
get error() { | ||
return this.#error | ||
} | ||
async start() { | ||
this.#isUpToDate = false | ||
const { url, table, where, columns, signal } = this.options | ||
async #start() { | ||
try { | ||
while ( | ||
(!signal?.aborted && !this.#isUpToDate) || | ||
(!this.options.signal?.aborted && !this.#isUpToDate) || | ||
this.options.subscribe | ||
) { | ||
const { url, table, where, columns, signal } = this.options | ||
const fetchUrl = new URL(url) | ||
@@ -369,3 +384,2 @@ | ||
if (e instanceof FetchBackoffAbortError) break // interrupted | ||
if (e instanceof MissingHeadersError) throw e | ||
if (!(e instanceof FetchError)) throw e // should never happen | ||
@@ -381,3 +395,2 @@ if (e.status == 409) { | ||
// Notify subscribers | ||
this.#sendErrorToUpToDateSubscribers(e) | ||
this.#sendErrorToSubscribers(e) | ||
@@ -424,3 +437,2 @@ | ||
if (batch.length > 0) { | ||
const prevUpToDate = this.#isUpToDate | ||
const lastMessage = batch[batch.length - 1] | ||
@@ -433,5 +445,2 @@ if (isUpToDateMessage(lastMessage)) { | ||
await this.#publish(batch) | ||
if (!prevUpToDate && this.#isUpToDate) { | ||
this.#notifyUpToDateSubscribers() | ||
} | ||
} | ||
@@ -441,2 +450,23 @@ } | ||
this.#error = err | ||
if (this.#onError) { | ||
const retryOpts = await this.#onError(err as Error) | ||
if (typeof retryOpts === `object`) { | ||
this.#reset() | ||
if (`params` in retryOpts) { | ||
this.options.params = retryOpts.params | ||
} | ||
if (`headers` in retryOpts) { | ||
this.options.headers = retryOpts.headers | ||
} | ||
// Restart | ||
this.#start() | ||
} | ||
return | ||
} | ||
// If no handler is provided for errors just throw so the error still bubbles up. | ||
throw err | ||
} finally { | ||
@@ -449,3 +479,3 @@ this.#connected = false | ||
callback: (messages: Message<T>[]) => MaybePromise<void>, | ||
onError?: (error: FetchError | Error) => void | ||
onError: (error: Error) => void = () => {} | ||
) { | ||
@@ -465,19 +495,2 @@ const subscriptionId = Math.random() | ||
subscribeOnceToUpToDate( | ||
callback: () => MaybePromise<void>, | ||
error: (err: FetchError | Error) => void | ||
) { | ||
const subscriptionId = Math.random() | ||
this.#upToDateSubscribers.set(subscriptionId, [callback, error]) | ||
return () => { | ||
this.#upToDateSubscribers.delete(subscriptionId) | ||
} | ||
} | ||
unsubscribeAllUpToDateSubscribers(): void { | ||
this.#upToDateSubscribers.clear() | ||
} | ||
/** Unix time at which we last synced. Undefined when `isLoading` is true. */ | ||
@@ -501,3 +514,3 @@ lastSyncedAt(): number | undefined { | ||
isLoading(): boolean { | ||
return !this.isUpToDate | ||
return !this.#isUpToDate | ||
} | ||
@@ -525,14 +538,2 @@ | ||
#notifyUpToDateSubscribers() { | ||
this.#upToDateSubscribers.forEach(([callback]) => { | ||
callback() | ||
}) | ||
} | ||
#sendErrorToUpToDateSubscribers(error: FetchError | Error) { | ||
this.#upToDateSubscribers.forEach(([_, errorCallback]) => | ||
errorCallback(error) | ||
) | ||
} | ||
/** | ||
@@ -542,6 +543,6 @@ * Resets the state of the stream, optionally with a provided | ||
*/ | ||
#reset(shapeHandle?: string) { | ||
#reset(handle?: string) { | ||
this.#lastOffset = `-1` | ||
this.#liveCacheBuster = `` | ||
this.#shapeHandle = shapeHandle | ||
this.#shapeHandle = handle | ||
this.#isUpToDate = false | ||
@@ -555,8 +556,6 @@ this.#connected = false | ||
if (!options.url) { | ||
throw new Error(`Invalid shape options. It must provide the url`) | ||
throw new MissingShapeUrlError() | ||
} | ||
if (options.signal && !(options.signal instanceof AbortSignal)) { | ||
throw new Error( | ||
`Invalid signal option. It must be an instance of AbortSignal.` | ||
) | ||
throw new InvalidSignalError() | ||
} | ||
@@ -567,7 +566,5 @@ | ||
options.offset !== `-1` && | ||
!options.shapeHandle | ||
!options.handle | ||
) { | ||
throw new Error( | ||
`shapeHandle is required if this isn't an initial fetch (i.e. offset > -1)` | ||
) | ||
throw new MissingShapeHandleError() | ||
} | ||
@@ -581,5 +578,3 @@ | ||
if (reservedParams.length > 0) { | ||
throw new Error( | ||
`Cannot use reserved Electric parameter names in custom params: ${reservedParams.join(`, `)}` | ||
) | ||
throw new ReservedParamError(reservedParams) | ||
} | ||
@@ -586,0 +581,0 @@ } |
@@ -49,5 +49,59 @@ export class FetchError extends Error { | ||
super(`Fetch with backoff aborted`) | ||
this.name = `FetchBackoffAbortError` | ||
} | ||
} | ||
export class InvalidShapeOptionsError extends Error { | ||
constructor(message: string) { | ||
super(message) | ||
this.name = `InvalidShapeOptionsError` | ||
} | ||
} | ||
export class MissingShapeUrlError extends Error { | ||
constructor() { | ||
super(`Invalid shape options: missing required url parameter`) | ||
this.name = `MissingShapeUrlError` | ||
} | ||
} | ||
export class InvalidSignalError extends Error { | ||
constructor() { | ||
super(`Invalid signal option. It must be an instance of AbortSignal.`) | ||
this.name = `InvalidSignalError` | ||
} | ||
} | ||
export class MissingShapeHandleError extends Error { | ||
constructor() { | ||
super( | ||
`shapeHandle is required if this isn't an initial fetch (i.e. offset > -1)` | ||
) | ||
this.name = `MissingShapeHandleError` | ||
} | ||
} | ||
export class ReservedParamError extends Error { | ||
constructor(reservedParams: string[]) { | ||
super( | ||
`Cannot use reserved Electric parameter names in custom params: ${reservedParams.join(`, `)}` | ||
) | ||
this.name = `ReservedParamError` | ||
} | ||
} | ||
export class ParserNullValueError extends Error { | ||
constructor(columnName: string) { | ||
super(`Column "${columnName ?? `unknown`}" does not allow NULL values`) | ||
this.name = `ParserNullValueError` | ||
} | ||
} | ||
export class ShapeStreamAlreadyRunningError extends Error { | ||
constructor() { | ||
super(`ShapeStream is already running`) | ||
this.name = `ShapeStreamAlreadyRunningError` | ||
} | ||
} | ||
export class MissingHeadersError extends Error { | ||
@@ -60,4 +114,5 @@ constructor(url: string, missingHeaders: Array<string>) { | ||
msg += `\nThis is often due to a proxy not setting CORS correctly so that all Electric headers can be read by the client.` | ||
msg += `\nFor more information visit the troubleshooting guide: /docs/guides/troubleshooting/missing-headers` | ||
super(msg) | ||
} | ||
} |
@@ -174,3 +174,3 @@ import { | ||
const addMissingHeaders = (requiredHeaders: Array<string>) => | ||
requiredHeaders.filter((h) => !headers.has(h)) | ||
missingHeaders.push(...requiredHeaders.filter((h) => !headers.has(h))) | ||
addMissingHeaders(requiredElectricResponseHeaders) | ||
@@ -177,0 +177,0 @@ |
import { ColumnInfo, GetExtensions, Message, Row, Schema, Value } from './types' | ||
import { ParserNullValueError } from './error' | ||
@@ -165,3 +166,3 @@ type NullToken = null | `NULL` | ||
if (!isNullable) { | ||
throw new Error(`Column ${columnName ?? `unknown`} is not nullable`) | ||
throw new ParserNullValueError(columnName ?? `unknown`) | ||
} | ||
@@ -168,0 +169,0 @@ return null |
@@ -58,11 +58,2 @@ import { Message, Offset, Row } from './types' | ||
) | ||
const unsubscribe = this.#stream.subscribeOnceToUpToDate( | ||
() => { | ||
unsubscribe() | ||
}, | ||
(e) => { | ||
this.#handleError(e) | ||
throw e | ||
} | ||
) | ||
} | ||
@@ -78,2 +69,6 @@ | ||
get handle(): string | undefined { | ||
return this.#stream.shapeHandle | ||
} | ||
get rows(): Promise<T[]> { | ||
@@ -80,0 +75,0 @@ return this.value.then((v) => Array.from(v.values())) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
432742
4516
172