Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@electric-sql/client

Package Overview
Dependencies
Maintainers
3
Versions
28
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@electric-sql/client - npm Package Compare versions

Comparing version 0.7.3 to 0.8.0

28

dist/index.d.ts

@@ -133,2 +133,7 @@ /**

type ParamsRecord = Omit<Record<string, string>, ReservedParamKeys>;
type RetryOpts = {
params?: ParamsRecord;
headers?: Record<string, string>;
};
type ShapeStreamErrorHandler = (error: Error) => void | RetryOpts | Promise<void | RetryOpts>;
/**

@@ -185,4 +190,3 @@ * Options for constructing a ShapeStream.

*/
shapeHandle?: string;
backoffOptions?: BackoffOptions;
handle?: string;
/**

@@ -207,9 +211,16 @@ * HTTP headers to attach to requests made by the client.

fetchClient?: typeof fetch;
backoffOptions?: BackoffOptions;
parser?: Parser<T>;
/**
* A function for handling shapestream errors.
* This is optional, when it is not provided any shapestream errors will be thrown.
* If the function is provided and returns an object containing parameters and/or headers
* the shapestream will apply those changes and try syncing again.
* If the function returns void the shapestream is stopped.
*/
onError?: ShapeStreamErrorHandler;
}
interface ShapeStreamInterface<T extends Row<unknown> = Row> {
subscribe(callback: (messages: Message<T>[]) => MaybePromise<void>, onError?: (error: FetchError | Error) => void): void;
unsubscribeAllUpToDateSubscribers(): void;
unsubscribeAll(): void;
subscribeOnceToUpToDate(callback: () => MaybePromise<void>, error: (err: FetchError | Error) => void): () => void;
isLoading(): boolean;

@@ -222,2 +233,3 @@ lastSyncedAt(): number | undefined;

shapeHandle?: string;
error?: unknown;
}

@@ -263,10 +275,7 @@ /**

get shapeHandle(): string;
get error(): unknown;
get isUpToDate(): boolean;
get lastOffset(): Offset;
get error(): unknown;
start(): Promise<void>;
subscribe(callback: (messages: Message<T>[]) => MaybePromise<void>, onError?: (error: FetchError | Error) => void): () => void;
subscribe(callback: (messages: Message<T>[]) => MaybePromise<void>, onError?: (error: Error) => void): () => void;
unsubscribeAll(): void;
subscribeOnceToUpToDate(callback: () => MaybePromise<void>, error: (err: FetchError | Error) => void): () => void;
unsubscribeAllUpToDateSubscribers(): void;
/** Unix time at which we last synced. Undefined when `isLoading` is true. */

@@ -323,2 +332,3 @@ lastSyncedAt(): number | undefined;

get lastOffset(): Offset;
get handle(): string | undefined;
get rows(): Promise<T[]>;

@@ -325,0 +335,0 @@ get currentRows(): T[];

@@ -41,2 +41,85 @@ var __defProp = Object.defineProperty;

// src/error.ts
var FetchError = class _FetchError extends Error {
constructor(status, text, json, headers, url, message) {
super(
message || `HTTP Error ${status} at ${url}: ${text != null ? text : JSON.stringify(json)}`
);
this.url = url;
this.name = `FetchError`;
this.status = status;
this.text = text;
this.json = json;
this.headers = headers;
}
static async fromResponse(response, url) {
const status = response.status;
const headers = Object.fromEntries([...response.headers.entries()]);
let text = void 0;
let json = void 0;
const contentType = response.headers.get(`content-type`);
if (contentType && contentType.includes(`application/json`)) {
json = await response.json();
} else {
text = await response.text();
}
return new _FetchError(status, text, json, headers, url);
}
};
var FetchBackoffAbortError = class extends Error {
constructor() {
super(`Fetch with backoff aborted`);
this.name = `FetchBackoffAbortError`;
}
};
var MissingShapeUrlError = class extends Error {
constructor() {
super(`Invalid shape options: missing required url parameter`);
this.name = `MissingShapeUrlError`;
}
};
var InvalidSignalError = class extends Error {
constructor() {
super(`Invalid signal option. It must be an instance of AbortSignal.`);
this.name = `InvalidSignalError`;
}
};
var MissingShapeHandleError = class extends Error {
constructor() {
super(
`shapeHandle is required if this isn't an initial fetch (i.e. offset > -1)`
);
this.name = `MissingShapeHandleError`;
}
};
var ReservedParamError = class extends Error {
constructor(reservedParams) {
super(
`Cannot use reserved Electric parameter names in custom params: ${reservedParams.join(`, `)}`
);
this.name = `ReservedParamError`;
}
};
var ParserNullValueError = class extends Error {
constructor(columnName) {
super(`Column "${columnName != null ? columnName : `unknown`}" does not allow NULL values`);
this.name = `ParserNullValueError`;
}
};
var MissingHeadersError = class extends Error {
constructor(url, missingHeaders) {
let msg = `The response for the shape request to ${url} didn't include the following required headers:
`;
missingHeaders.forEach((h) => {
msg += `- ${h}
`;
});
msg += `
This is often due to a proxy not setting CORS correctly so that all Electric headers can be read by the client.`;
msg += `
For more information visit the troubleshooting guide: /docs/guides/troubleshooting/missing-headers`;
super(msg);
}
};
// src/parser.ts

@@ -143,3 +226,3 @@ var parseNumber = (value) => Number(value);

if (!isNullable) {
throw new Error(`Column ${columnName != null ? columnName : `unknown`} is not nullable`);
throw new ParserNullValueError(columnName != null ? columnName : `unknown`);
}

@@ -166,48 +249,2 @@ return null;

// src/error.ts
var FetchError = class _FetchError extends Error {
constructor(status, text, json, headers, url, message) {
super(
message || `HTTP Error ${status} at ${url}: ${text != null ? text : JSON.stringify(json)}`
);
this.url = url;
this.name = `FetchError`;
this.status = status;
this.text = text;
this.json = json;
this.headers = headers;
}
static async fromResponse(response, url) {
const status = response.status;
const headers = Object.fromEntries([...response.headers.entries()]);
let text = void 0;
let json = void 0;
const contentType = response.headers.get(`content-type`);
if (contentType && contentType.includes(`application/json`)) {
json = await response.json();
} else {
text = await response.text();
}
return new _FetchError(status, text, json, headers, url);
}
};
var FetchBackoffAbortError = class extends Error {
constructor() {
super(`Fetch with backoff aborted`);
}
};
var MissingHeadersError = class extends Error {
constructor(url, missingHeaders) {
let msg = `The response for the shape request to ${url} didn't include the following required headers:
`;
missingHeaders.forEach((h) => {
msg += `- ${h}
`;
});
msg += `
This is often due to a proxy not setting CORS correctly so that all Electric headers can be read by the client.`;
super(msg);
}
};
// src/constants.ts

@@ -312,3 +349,3 @@ var LIVE_CACHE_BUSTER_HEADER = `electric-cursor`;

const missingHeaders = [];
const addMissingHeaders = (requiredHeaders) => requiredHeaders.filter((h) => !headers.has(h));
const addMissingHeaders = (requiredHeaders) => missingHeaders.push(...requiredHeaders.filter((h) => !headers.has(h)));
addMissingHeaders(requiredElectricResponseHeaders);

@@ -430,10 +467,10 @@ const input = args[0];

]);
var _fetchClient2, _messageParser, _subscribers, _upToDateSubscribers, _lastOffset, _liveCacheBuster, _lastSyncedAt, _isUpToDate, _connected, _shapeHandle, _databaseId, _schema, _error, _replica, _ShapeStream_instances, publish_fn, sendErrorToSubscribers_fn, notifyUpToDateSubscribers_fn, sendErrorToUpToDateSubscribers_fn, reset_fn;
var _error, _fetchClient2, _messageParser, _subscribers, _lastOffset, _liveCacheBuster, _lastSyncedAt, _isUpToDate, _connected, _shapeHandle, _databaseId, _schema, _onError, _replica, _ShapeStream_instances, start_fn, publish_fn, sendErrorToSubscribers_fn, reset_fn;
var _ShapeStream = class _ShapeStream {
constructor(options) {
__privateAdd(this, _ShapeStream_instances);
__privateAdd(this, _error, null);
__privateAdd(this, _fetchClient2);
__privateAdd(this, _messageParser);
__privateAdd(this, _subscribers, /* @__PURE__ */ new Map());
__privateAdd(this, _upToDateSubscribers, /* @__PURE__ */ new Map());
__privateAdd(this, _lastOffset);

@@ -449,13 +486,14 @@ __privateAdd(this, _liveCacheBuster);

__privateAdd(this, _schema);
__privateAdd(this, _error);
__privateAdd(this, _onError);
__privateAdd(this, _replica);
var _a, _b, _c;
validateOptions(options);
this.options = __spreadValues({ subscribe: true }, options);
validateOptions(this.options);
__privateSet(this, _lastOffset, (_a = this.options.offset) != null ? _a : `-1`);
__privateSet(this, _liveCacheBuster, ``);
__privateSet(this, _shapeHandle, this.options.shapeHandle);
__privateSet(this, _shapeHandle, this.options.handle);
__privateSet(this, _databaseId, this.options.databaseId);
__privateSet(this, _messageParser, new MessageParser(options.parser));
__privateSet(this, _replica, this.options.replica);
__privateSet(this, _onError, this.options.onError);
const baseFetchClient = (_b = options.fetchClient) != null ? _b : (...args) => fetch(...args);

@@ -472,3 +510,3 @@ const fetchWithBackoffClient = createFetchWithBackoff(baseFetchClient, __spreadProps(__spreadValues({}, (_c = options.backoffOptions) != null ? _c : BackoffDefaults), {

));
this.start();
__privateMethod(this, _ShapeStream_instances, start_fn).call(this);
}

@@ -478,2 +516,5 @@ get shapeHandle() {

}
get error() {
return __privateGet(this, _error);
}
get isUpToDate() {

@@ -485,114 +526,4 @@ return __privateGet(this, _isUpToDate);

}
get error() {
return __privateGet(this, _error);
}
async start() {
var _a, _b;
__privateSet(this, _isUpToDate, false);
const { url, table, where, columns, signal } = this.options;
try {
while (!(signal == null ? void 0 : signal.aborted) && !__privateGet(this, _isUpToDate) || this.options.subscribe) {
const fetchUrl = new URL(url);
if (this.options.params) {
const reservedParams = Object.keys(this.options.params).filter(
(key) => RESERVED_PARAMS.has(key)
);
if (reservedParams.length > 0) {
throw new Error(
`Cannot use reserved Electric parameter names in custom params: ${reservedParams.join(`, `)}`
);
}
for (const [key, value] of Object.entries(this.options.params)) {
fetchUrl.searchParams.set(key, value);
}
}
if (table) fetchUrl.searchParams.set(TABLE_QUERY_PARAM, table);
if (where) fetchUrl.searchParams.set(WHERE_QUERY_PARAM, where);
if (columns && columns.length > 0)
fetchUrl.searchParams.set(COLUMNS_QUERY_PARAM, columns.join(`,`));
fetchUrl.searchParams.set(OFFSET_QUERY_PARAM, __privateGet(this, _lastOffset));
if (__privateGet(this, _isUpToDate)) {
fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`);
fetchUrl.searchParams.set(
LIVE_CACHE_BUSTER_QUERY_PARAM,
__privateGet(this, _liveCacheBuster)
);
}
if (__privateGet(this, _shapeHandle)) {
fetchUrl.searchParams.set(
SHAPE_HANDLE_QUERY_PARAM,
__privateGet(this, _shapeHandle)
);
}
if (__privateGet(this, _databaseId)) {
fetchUrl.searchParams.set(DATABASE_ID_QUERY_PARAM, __privateGet(this, _databaseId));
}
if (((_a = __privateGet(this, _replica)) != null ? _a : _ShapeStream.Replica.DEFAULT) != _ShapeStream.Replica.DEFAULT) {
fetchUrl.searchParams.set(REPLICA_PARAM, __privateGet(this, _replica));
}
let response;
try {
response = await __privateGet(this, _fetchClient2).call(this, fetchUrl.toString(), {
signal,
headers: this.options.headers
});
__privateSet(this, _connected, true);
} catch (e) {
if (e instanceof FetchBackoffAbortError) break;
if (e instanceof MissingHeadersError) throw e;
if (!(e instanceof FetchError)) throw e;
if (e.status == 409) {
const newShapeHandle = e.headers[SHAPE_HANDLE_HEADER];
__privateMethod(this, _ShapeStream_instances, reset_fn).call(this, newShapeHandle);
await __privateMethod(this, _ShapeStream_instances, publish_fn).call(this, e.json);
continue;
} else if (e.status >= 400 && e.status < 500) {
__privateMethod(this, _ShapeStream_instances, sendErrorToUpToDateSubscribers_fn).call(this, e);
__privateMethod(this, _ShapeStream_instances, sendErrorToSubscribers_fn).call(this, e);
throw e;
}
}
const { headers, status } = response;
const shapeHandle = headers.get(SHAPE_HANDLE_HEADER);
if (shapeHandle) {
__privateSet(this, _shapeHandle, shapeHandle);
}
const lastOffset = headers.get(CHUNK_LAST_OFFSET_HEADER);
if (lastOffset) {
__privateSet(this, _lastOffset, lastOffset);
}
const liveCacheBuster = headers.get(LIVE_CACHE_BUSTER_HEADER);
if (liveCacheBuster) {
__privateSet(this, _liveCacheBuster, liveCacheBuster);
}
const getSchema = () => {
const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER);
return schemaHeader ? JSON.parse(schemaHeader) : {};
};
__privateSet(this, _schema, (_b = __privateGet(this, _schema)) != null ? _b : getSchema());
const messages = status === 204 ? `[]` : await response.text();
if (status === 204) {
__privateSet(this, _lastSyncedAt, Date.now());
}
const batch = __privateGet(this, _messageParser).parse(messages, __privateGet(this, _schema));
if (batch.length > 0) {
const prevUpToDate = __privateGet(this, _isUpToDate);
const lastMessage = batch[batch.length - 1];
if (isUpToDateMessage(lastMessage)) {
__privateSet(this, _lastSyncedAt, Date.now());
__privateSet(this, _isUpToDate, true);
}
await __privateMethod(this, _ShapeStream_instances, publish_fn).call(this, batch);
if (!prevUpToDate && __privateGet(this, _isUpToDate)) {
__privateMethod(this, _ShapeStream_instances, notifyUpToDateSubscribers_fn).call(this);
}
}
}
} catch (err) {
__privateSet(this, _error, err);
} finally {
__privateSet(this, _connected, false);
}
}
subscribe(callback, onError) {
subscribe(callback, onError = () => {
}) {
const subscriptionId = Math.random();

@@ -607,12 +538,2 @@ __privateGet(this, _subscribers).set(subscriptionId, [callback, onError]);

}
subscribeOnceToUpToDate(callback, error) {
const subscriptionId = Math.random();
__privateGet(this, _upToDateSubscribers).set(subscriptionId, [callback, error]);
return () => {
__privateGet(this, _upToDateSubscribers).delete(subscriptionId);
};
}
unsubscribeAllUpToDateSubscribers() {
__privateGet(this, _upToDateSubscribers).clear();
}
/** Unix time at which we last synced. Undefined when `isLoading` is true. */

@@ -633,9 +554,9 @@ lastSyncedAt() {

isLoading() {
return !this.isUpToDate;
return !__privateGet(this, _isUpToDate);
}
};
_error = new WeakMap();
_fetchClient2 = new WeakMap();
_messageParser = new WeakMap();
_subscribers = new WeakMap();
_upToDateSubscribers = new WeakMap();
_lastOffset = new WeakMap();

@@ -649,5 +570,121 @@ _liveCacheBuster = new WeakMap();

_schema = new WeakMap();
_error = new WeakMap();
_onError = new WeakMap();
_replica = new WeakMap();
_ShapeStream_instances = new WeakSet();
start_fn = async function() {
var _a, _b, _c;
try {
while (!((_a = this.options.signal) == null ? void 0 : _a.aborted) && !__privateGet(this, _isUpToDate) || this.options.subscribe) {
const { url, table, where, columns, signal } = this.options;
const fetchUrl = new URL(url);
if (this.options.params) {
const reservedParams = Object.keys(this.options.params).filter(
(key) => RESERVED_PARAMS.has(key)
);
if (reservedParams.length > 0) {
throw new Error(
`Cannot use reserved Electric parameter names in custom params: ${reservedParams.join(`, `)}`
);
}
for (const [key, value] of Object.entries(this.options.params)) {
fetchUrl.searchParams.set(key, value);
}
}
if (table) fetchUrl.searchParams.set(TABLE_QUERY_PARAM, table);
if (where) fetchUrl.searchParams.set(WHERE_QUERY_PARAM, where);
if (columns && columns.length > 0)
fetchUrl.searchParams.set(COLUMNS_QUERY_PARAM, columns.join(`,`));
fetchUrl.searchParams.set(OFFSET_QUERY_PARAM, __privateGet(this, _lastOffset));
if (__privateGet(this, _isUpToDate)) {
fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`);
fetchUrl.searchParams.set(
LIVE_CACHE_BUSTER_QUERY_PARAM,
__privateGet(this, _liveCacheBuster)
);
}
if (__privateGet(this, _shapeHandle)) {
fetchUrl.searchParams.set(
SHAPE_HANDLE_QUERY_PARAM,
__privateGet(this, _shapeHandle)
);
}
if (__privateGet(this, _databaseId)) {
fetchUrl.searchParams.set(DATABASE_ID_QUERY_PARAM, __privateGet(this, _databaseId));
}
if (((_b = __privateGet(this, _replica)) != null ? _b : _ShapeStream.Replica.DEFAULT) != _ShapeStream.Replica.DEFAULT) {
fetchUrl.searchParams.set(REPLICA_PARAM, __privateGet(this, _replica));
}
let response;
try {
response = await __privateGet(this, _fetchClient2).call(this, fetchUrl.toString(), {
signal,
headers: this.options.headers
});
__privateSet(this, _connected, true);
} catch (e) {
if (e instanceof FetchBackoffAbortError) break;
if (!(e instanceof FetchError)) throw e;
if (e.status == 409) {
const newShapeHandle = e.headers[SHAPE_HANDLE_HEADER];
__privateMethod(this, _ShapeStream_instances, reset_fn).call(this, newShapeHandle);
await __privateMethod(this, _ShapeStream_instances, publish_fn).call(this, e.json);
continue;
} else if (e.status >= 400 && e.status < 500) {
__privateMethod(this, _ShapeStream_instances, sendErrorToSubscribers_fn).call(this, e);
throw e;
}
}
const { headers, status } = response;
const shapeHandle = headers.get(SHAPE_HANDLE_HEADER);
if (shapeHandle) {
__privateSet(this, _shapeHandle, shapeHandle);
}
const lastOffset = headers.get(CHUNK_LAST_OFFSET_HEADER);
if (lastOffset) {
__privateSet(this, _lastOffset, lastOffset);
}
const liveCacheBuster = headers.get(LIVE_CACHE_BUSTER_HEADER);
if (liveCacheBuster) {
__privateSet(this, _liveCacheBuster, liveCacheBuster);
}
const getSchema = () => {
const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER);
return schemaHeader ? JSON.parse(schemaHeader) : {};
};
__privateSet(this, _schema, (_c = __privateGet(this, _schema)) != null ? _c : getSchema());
const messages = status === 204 ? `[]` : await response.text();
if (status === 204) {
__privateSet(this, _lastSyncedAt, Date.now());
}
const batch = __privateGet(this, _messageParser).parse(messages, __privateGet(this, _schema));
if (batch.length > 0) {
const lastMessage = batch[batch.length - 1];
if (isUpToDateMessage(lastMessage)) {
__privateSet(this, _lastSyncedAt, Date.now());
__privateSet(this, _isUpToDate, true);
}
await __privateMethod(this, _ShapeStream_instances, publish_fn).call(this, batch);
}
}
} catch (err) {
__privateSet(this, _error, err);
if (__privateGet(this, _onError)) {
const retryOpts = await __privateGet(this, _onError).call(this, err);
if (typeof retryOpts === `object`) {
__privateMethod(this, _ShapeStream_instances, reset_fn).call(this);
if (`params` in retryOpts) {
this.options.params = retryOpts.params;
}
if (`headers` in retryOpts) {
this.options.headers = retryOpts.headers;
}
__privateMethod(this, _ShapeStream_instances, start_fn).call(this);
}
return;
}
throw err;
} finally {
__privateSet(this, _connected, false);
}
};
publish_fn = async function(messages) {

@@ -671,12 +708,2 @@ await Promise.all(

};
notifyUpToDateSubscribers_fn = function() {
__privateGet(this, _upToDateSubscribers).forEach(([callback]) => {
callback();
});
};
sendErrorToUpToDateSubscribers_fn = function(error) {
__privateGet(this, _upToDateSubscribers).forEach(
([_, errorCallback]) => errorCallback(error)
);
};
/**

@@ -686,6 +713,6 @@ * Resets the state of the stream, optionally with a provided

*/
reset_fn = function(shapeHandle) {
reset_fn = function(handle) {
__privateSet(this, _lastOffset, `-1`);
__privateSet(this, _liveCacheBuster, ``);
__privateSet(this, _shapeHandle, shapeHandle);
__privateSet(this, _shapeHandle, handle);
__privateSet(this, _isUpToDate, false);

@@ -702,13 +729,9 @@ __privateSet(this, _connected, false);

if (!options.url) {
throw new Error(`Invalid shape options. It must provide the url`);
throw new MissingShapeUrlError();
}
if (options.signal && !(options.signal instanceof AbortSignal)) {
throw new Error(
`Invalid signal option. It must be an instance of AbortSignal.`
);
throw new InvalidSignalError();
}
if (options.offset !== void 0 && options.offset !== `-1` && !options.shapeHandle) {
throw new Error(
`shapeHandle is required if this isn't an initial fetch (i.e. offset > -1)`
);
if (options.offset !== void 0 && options.offset !== `-1` && !options.handle) {
throw new MissingShapeHandleError();
}

@@ -720,5 +743,3 @@ if (options.params) {

if (reservedParams.length > 0) {
throw new Error(
`Cannot use reserved Electric parameter names in custom params: ${reservedParams.join(`, `)}`
);
throw new ReservedParamError(reservedParams);
}

@@ -744,11 +765,2 @@ }

);
const unsubscribe = __privateGet(this, _stream).subscribeOnceToUpToDate(
() => {
unsubscribe();
},
(e) => {
__privateMethod(this, _Shape_instances, handleError_fn).call(this, e);
throw e;
}
);
}

@@ -761,2 +773,5 @@ get isUpToDate() {

}
get handle() {
return __privateGet(this, _stream).shapeHandle;
}
get rows() {

@@ -763,0 +778,0 @@ return this.value.then((v) => Array.from(v.values()));

{
"name": "@electric-sql/client",
"version": "0.7.3",
"version": "0.8.0",
"description": "Postgres everywhere - your data, in sync, wherever you need it.",

@@ -5,0 +5,0 @@ "type": "module",

@@ -96,2 +96,77 @@ <p align="center">

See the [Docs](https://electric-sql.com) and [Examples](https://electric-sql.com/examples/basic) for more information.
### Error Handling
The ShapeStream provides two ways to handle errors:
1. Using the `onError` handler:
```typescript
const stream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
table: `foo`,
onError: (error) => {
// Handle all stream errors here
console.error('Stream error:', error)
}
})
```
If no `onError` handler is provided, the ShapeStream will throw errors that occur during streaming.
2. Individual subscribers can optionally handle errors specific to their subscription:
```typescript
stream.subscribe(
(messages) => {
// Handle messages
},
(error) => {
// Handle errors for this specific subscription
console.error('Subscription error:', error)
}
)
```
Common error types include:
- `MissingShapeUrlError`: Missing required URL parameter
- `InvalidSignalError`: Invalid AbortSignal instance
- `ReservedParamError`: Using reserved parameter names
Runtime errors:
- `FetchError`: HTTP errors during shape fetching
- `FetchBackoffAbortError`: Fetch aborted using AbortSignal
- `MissingShapeHandleError`: Missing required shape handle
- `ParserNullValueError`: Parser encountered NULL value in a column that doesn't allow NULL values
See the [typescript client docs on the website](https://electric-sql.com/docs/api/clients/typescript#error-handling) for more details on error handling.
And in general, see the [docs website](https://electric-sql.com) and [examples folder](https://electric-sql.com/examples/basic) for more information.
## Develop
Install the pnpm workspace at the repo root:
```shell
pnpm install
```
Build the package:
```shell
cd packages/typescript-client
pnpm build
```
## Test
In one terminal, start the backend running:
```shell
cd ../sync-service
mix deps.get
mix stop_dev && mix compile && mix start_dev && ies -S mix
```
Then in this folder:
```shell
pnpm test
```

@@ -14,3 +14,6 @@ import {

FetchBackoffAbortError,
MissingHeadersError,
MissingShapeUrlError,
InvalidSignalError,
MissingShapeHandleError,
ReservedParamError,
} from './error'

@@ -67,2 +70,10 @@ import {

type RetryOpts = {
params?: ParamsRecord
headers?: Record<string, string>
}
type ShapeStreamErrorHandler = (
error: Error
) => void | RetryOpts | Promise<void | RetryOpts>
/**

@@ -111,2 +122,3 @@ * Options for constructing a ShapeStream.

replica?: Replica
/**

@@ -121,2 +133,3 @@ * The "offset" on the shape log. This is typically not set as the ShapeStream

offset?: Offset
/**

@@ -126,4 +139,3 @@ * Similar to `offset`, this isn't typically used unless you're maintaining

*/
shapeHandle?: string
backoffOptions?: BackoffOptions
handle?: string

@@ -149,5 +161,16 @@ /**

subscribe?: boolean
signal?: AbortSignal
fetchClient?: typeof fetch
backoffOptions?: BackoffOptions
parser?: Parser<T>
/**
* A function for handling shapestream errors.
* This is optional, when it is not provided any shapestream errors will be thrown.
* If the function is provided and returns an object containing parameters and/or headers
* the shapestream will apply those changes and try syncing again.
* If the function returns void the shapestream is stopped.
*/
onError?: ShapeStreamErrorHandler
}

@@ -160,8 +183,3 @@

): void
unsubscribeAllUpToDateSubscribers(): void
unsubscribeAll(): void
subscribeOnceToUpToDate(
callback: () => MaybePromise<void>,
error: (err: FetchError | Error) => void
): () => void

@@ -176,2 +194,3 @@ isLoading(): boolean

shapeHandle?: string
error?: unknown
}

@@ -219,2 +238,3 @@

readonly options: ShapeStreamOptions<GetExtensions<T>>
#error: unknown = null

@@ -231,6 +251,2 @@ readonly #fetchClient: typeof fetch

>()
readonly #upToDateSubscribers = new Map<
number,
[() => void, (error: FetchError | Error) => void]
>()

@@ -245,14 +261,15 @@ #lastOffset: Offset

#schema?: Schema
#error?: unknown
#onError?: ShapeStreamErrorHandler
#replica?: Replica
constructor(options: ShapeStreamOptions<GetExtensions<T>>) {
validateOptions(options)
this.options = { subscribe: true, ...options }
validateOptions(this.options)
this.#lastOffset = this.options.offset ?? `-1`
this.#liveCacheBuster = ``
this.#shapeHandle = this.options.shapeHandle
this.#shapeHandle = this.options.handle
this.#databaseId = this.options.databaseId
this.#messageParser = new MessageParser<T>(options.parser)
this.#replica = this.options.replica
this.#onError = this.options.onError

@@ -275,3 +292,3 @@ const baseFetchClient =

this.start()
this.#start()
}

@@ -283,2 +300,6 @@

get error() {
return this.#error
}
get isUpToDate() {

@@ -292,16 +313,10 @@ return this.#isUpToDate

get error() {
return this.#error
}
async start() {
this.#isUpToDate = false
const { url, table, where, columns, signal } = this.options
async #start() {
try {
while (
(!signal?.aborted && !this.#isUpToDate) ||
(!this.options.signal?.aborted && !this.#isUpToDate) ||
this.options.subscribe
) {
const { url, table, where, columns, signal } = this.options
const fetchUrl = new URL(url)

@@ -369,3 +384,2 @@

if (e instanceof FetchBackoffAbortError) break // interrupted
if (e instanceof MissingHeadersError) throw e
if (!(e instanceof FetchError)) throw e // should never happen

@@ -381,3 +395,2 @@ if (e.status == 409) {

// Notify subscribers
this.#sendErrorToUpToDateSubscribers(e)
this.#sendErrorToSubscribers(e)

@@ -424,3 +437,2 @@

if (batch.length > 0) {
const prevUpToDate = this.#isUpToDate
const lastMessage = batch[batch.length - 1]

@@ -433,5 +445,2 @@ if (isUpToDateMessage(lastMessage)) {

await this.#publish(batch)
if (!prevUpToDate && this.#isUpToDate) {
this.#notifyUpToDateSubscribers()
}
}

@@ -441,2 +450,23 @@ }

this.#error = err
if (this.#onError) {
const retryOpts = await this.#onError(err as Error)
if (typeof retryOpts === `object`) {
this.#reset()
if (`params` in retryOpts) {
this.options.params = retryOpts.params
}
if (`headers` in retryOpts) {
this.options.headers = retryOpts.headers
}
// Restart
this.#start()
}
return
}
// If no handler is provided for errors just throw so the error still bubbles up.
throw err
} finally {

@@ -449,3 +479,3 @@ this.#connected = false

callback: (messages: Message<T>[]) => MaybePromise<void>,
onError?: (error: FetchError | Error) => void
onError: (error: Error) => void = () => {}
) {

@@ -465,19 +495,2 @@ const subscriptionId = Math.random()

subscribeOnceToUpToDate(
callback: () => MaybePromise<void>,
error: (err: FetchError | Error) => void
) {
const subscriptionId = Math.random()
this.#upToDateSubscribers.set(subscriptionId, [callback, error])
return () => {
this.#upToDateSubscribers.delete(subscriptionId)
}
}
unsubscribeAllUpToDateSubscribers(): void {
this.#upToDateSubscribers.clear()
}
/** Unix time at which we last synced. Undefined when `isLoading` is true. */

@@ -501,3 +514,3 @@ lastSyncedAt(): number | undefined {

isLoading(): boolean {
return !this.isUpToDate
return !this.#isUpToDate
}

@@ -525,14 +538,2 @@

#notifyUpToDateSubscribers() {
this.#upToDateSubscribers.forEach(([callback]) => {
callback()
})
}
#sendErrorToUpToDateSubscribers(error: FetchError | Error) {
this.#upToDateSubscribers.forEach(([_, errorCallback]) =>
errorCallback(error)
)
}
/**

@@ -542,6 +543,6 @@ * Resets the state of the stream, optionally with a provided

*/
#reset(shapeHandle?: string) {
#reset(handle?: string) {
this.#lastOffset = `-1`
this.#liveCacheBuster = ``
this.#shapeHandle = shapeHandle
this.#shapeHandle = handle
this.#isUpToDate = false

@@ -555,8 +556,6 @@ this.#connected = false

if (!options.url) {
throw new Error(`Invalid shape options. It must provide the url`)
throw new MissingShapeUrlError()
}
if (options.signal && !(options.signal instanceof AbortSignal)) {
throw new Error(
`Invalid signal option. It must be an instance of AbortSignal.`
)
throw new InvalidSignalError()
}

@@ -567,7 +566,5 @@

options.offset !== `-1` &&
!options.shapeHandle
!options.handle
) {
throw new Error(
`shapeHandle is required if this isn't an initial fetch (i.e. offset > -1)`
)
throw new MissingShapeHandleError()
}

@@ -581,5 +578,3 @@

if (reservedParams.length > 0) {
throw new Error(
`Cannot use reserved Electric parameter names in custom params: ${reservedParams.join(`, `)}`
)
throw new ReservedParamError(reservedParams)
}

@@ -586,0 +581,0 @@ }

@@ -49,5 +49,59 @@ export class FetchError extends Error {

super(`Fetch with backoff aborted`)
this.name = `FetchBackoffAbortError`
}
}
export class InvalidShapeOptionsError extends Error {
constructor(message: string) {
super(message)
this.name = `InvalidShapeOptionsError`
}
}
export class MissingShapeUrlError extends Error {
constructor() {
super(`Invalid shape options: missing required url parameter`)
this.name = `MissingShapeUrlError`
}
}
export class InvalidSignalError extends Error {
constructor() {
super(`Invalid signal option. It must be an instance of AbortSignal.`)
this.name = `InvalidSignalError`
}
}
export class MissingShapeHandleError extends Error {
constructor() {
super(
`shapeHandle is required if this isn't an initial fetch (i.e. offset > -1)`
)
this.name = `MissingShapeHandleError`
}
}
export class ReservedParamError extends Error {
constructor(reservedParams: string[]) {
super(
`Cannot use reserved Electric parameter names in custom params: ${reservedParams.join(`, `)}`
)
this.name = `ReservedParamError`
}
}
export class ParserNullValueError extends Error {
constructor(columnName: string) {
super(`Column "${columnName ?? `unknown`}" does not allow NULL values`)
this.name = `ParserNullValueError`
}
}
export class ShapeStreamAlreadyRunningError extends Error {
constructor() {
super(`ShapeStream is already running`)
this.name = `ShapeStreamAlreadyRunningError`
}
}
export class MissingHeadersError extends Error {

@@ -60,4 +114,5 @@ constructor(url: string, missingHeaders: Array<string>) {

msg += `\nThis is often due to a proxy not setting CORS correctly so that all Electric headers can be read by the client.`
msg += `\nFor more information visit the troubleshooting guide: /docs/guides/troubleshooting/missing-headers`
super(msg)
}
}

@@ -174,3 +174,3 @@ import {

const addMissingHeaders = (requiredHeaders: Array<string>) =>
requiredHeaders.filter((h) => !headers.has(h))
missingHeaders.push(...requiredHeaders.filter((h) => !headers.has(h)))
addMissingHeaders(requiredElectricResponseHeaders)

@@ -177,0 +177,0 @@

import { ColumnInfo, GetExtensions, Message, Row, Schema, Value } from './types'
import { ParserNullValueError } from './error'

@@ -165,3 +166,3 @@ type NullToken = null | `NULL`

if (!isNullable) {
throw new Error(`Column ${columnName ?? `unknown`} is not nullable`)
throw new ParserNullValueError(columnName ?? `unknown`)
}

@@ -168,0 +169,0 @@ return null

@@ -58,11 +58,2 @@ import { Message, Offset, Row } from './types'

)
const unsubscribe = this.#stream.subscribeOnceToUpToDate(
() => {
unsubscribe()
},
(e) => {
this.#handleError(e)
throw e
}
)
}

@@ -78,2 +69,6 @@

get handle(): string | undefined {
return this.#stream.shapeHandle
}
get rows(): Promise<T[]> {

@@ -80,0 +75,0 @@ return this.value.then((v) => Array.from(v.values()))

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc