Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@automerge/automerge-repo

Package Overview
Dependencies
Maintainers
4
Versions
69
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@automerge/automerge-repo - npm Package Compare versions

Comparing version 1.1.0-alpha.7 to 1.1.0-alpha.13

2

dist/AutomergeUrl.js

@@ -7,3 +7,3 @@ import * as Uuid from "uuid";

const regex = new RegExp(`^${urlPrefix}(\\w+)$`);
const [_, docMatch] = url.match(regex) || [];
const [, docMatch] = url.match(regex) || [];
const documentId = docMatch;

@@ -10,0 +10,0 @@ const binaryDocumentId = documentIdToBinary(documentId);

@@ -0,1 +1,2 @@

/* c8 ignore start */
export const pause = (t = 0) => new Promise(resolve => setTimeout(() => resolve(), t));

@@ -10,1 +11,2 @@ export function rejectOnTimeout(promise, millis) {

}
/* c8 ignore end */

@@ -0,1 +1,2 @@

/* c8 ignore start */
/**

@@ -23,1 +24,2 @@ * If `promise` is resolved before `t` ms elapse, the timeout is cleared and the result of the

}
/* c8 ignore end */

@@ -32,3 +32,3 @@ /**

export { NetworkAdapter } from "./network/NetworkAdapter.js";
export { isValidRepoMessage } from "./network/messages.js";
export { isRepoMessage } from "./network/messages.js";
export { StorageAdapter } from "./storage/StorageAdapter.js";

@@ -35,0 +35,0 @@ /** @hidden **/

@@ -32,3 +32,3 @@ /**

export { NetworkAdapter } from "./network/NetworkAdapter.js";
export { isValidRepoMessage } from "./network/messages.js";
export { isRepoMessage } from "./network/messages.js";
export { StorageAdapter } from "./storage/StorageAdapter.js";

@@ -35,0 +35,0 @@ /** @hidden **/

import { SyncState } from "@automerge/automerge";
import { StorageId } from "../storage/types.js";
import { DocumentId, PeerId, SessionId } from "../types.js";
import { StorageId } from "../storage/types.js";
export type Message = {
type: string;
/** The peer ID of the sender of this message */
senderId: PeerId;
/** The peer ID of the recipient of this message */
targetId: PeerId;
data?: Uint8Array;
documentId?: DocumentId;
};
/**

@@ -9,5 +18,3 @@ * A sync message for a particular document

type: "sync";
/** The peer ID of the sender of this message */
senderId: PeerId;
/** The peer ID of the recipient of this message */
targetId: PeerId;

@@ -19,33 +26,33 @@ /** The automerge sync message */

};
/** An ephemeral message
/**
* An ephemeral message.
*
* @remarks
* Ephemeral messages are not persisted anywhere and have no particular
* structure. `automerge-repo` will gossip them around, in order to avoid
* eternal loops of ephemeral messages every message has a session ID, which
* is a random number generated by the sender at startup time, and a sequence
* number. The combination of these two things allows us to discard messages
* we have already seen.
* Ephemeral messages are not persisted anywhere. The data property can be used by the application
* as needed. The repo gossips these around.
*
* In order to avoid infinite loops of ephemeral messages, every message has (a) a session ID, which
* is a random number generated by the sender at startup time; and (b) a sequence number. The
* combination of these two things allows us to discard messages we have already seen.
* */
export type EphemeralMessage = {
type: "ephemeral";
/** The peer ID of the sender of this message */
senderId: PeerId;
/** The peer ID of the recipient of this message */
targetId: PeerId;
/** A sequence number which must be incremented for each message sent by this peer */
/** A sequence number which must be incremented for each message sent by this peer. */
count: number;
/** The ID of the session this message is part of. The sequence number for a given session always increases */
/** The ID of the session this message is part of. The sequence number for a given session always increases. */
sessionId: SessionId;
/** The document ID this message pertains to */
/** The document ID this message pertains to. */
documentId: DocumentId;
/** The actual data of the message */
/** The actual data of the message. */
data: Uint8Array;
};
/** Sent by a {@link Repo} to indicate that it does not have the document and none of it's connected peers do either */
/**
* Sent by a {@link Repo} to indicate that it does not have the document and none of its connected
* peers do either.
*/
export type DocumentUnavailableMessage = {
type: "doc-unavailable";
/** The peer ID of the sender of this message */
senderId: PeerId;
/** The peer ID of the recipient of this message */
targetId: PeerId;

@@ -55,3 +62,4 @@ /** The document which the peer claims it doesn't have */

};
/** Sent by a {@link Repo} to request a document from a peer
/**
* Sent by a {@link Repo} to request a document from a peer.
*

@@ -64,21 +72,12 @@ * @remarks

type: "request";
/** The peer ID of the sender of this message */
senderId: PeerId;
/** The peer ID of the recipient of this message */
targetId: PeerId;
/** The initial automerge sync message */
/** The automerge sync message */
data: Uint8Array;
/** The document ID this message requests */
/** The document ID of the document this message is for */
documentId: DocumentId;
};
/** (anticipating work in progress) */
export type AuthMessage<TPayload = any> = {
type: "auth";
/** The peer ID of the sender of this message */
senderId: PeerId;
/** The peer ID of the recipient of this message */
targetId: PeerId;
/** The payload of the auth message (up to the specific auth provider) */
payload: TPayload;
};
/**
* Sent by a {@link Repo} to add or remove storage IDs from a remote peer's subscription.
*/
export type RemoteSubscriptionControlMessage = {

@@ -88,5 +87,10 @@ type: "remote-subscription-change";

targetId: PeerId;
/** The storage IDs to add to the subscription */
add?: StorageId[];
/** The storage IDs to remove from the subscription */
remove?: StorageId[];
};
/**
* Sent by a {@link Repo} to indicate that the heads of a document have changed on a remote peer.
*/
export type RemoteHeadsChanged = {

@@ -96,3 +100,5 @@ type: "remote-heads-changed";

targetId: PeerId;
/** The document ID of the document that has changed */
documentId: DocumentId;
/** The document's new heads */
newHeads: {

@@ -107,9 +113,8 @@ [key: StorageId]: {

export type RepoMessage = SyncMessage | EphemeralMessage | RequestMessage | DocumentUnavailableMessage | RemoteSubscriptionControlMessage | RemoteHeadsChanged;
/** These are message types that are handled by the {@link CollectionSynchronizer}.*/
export type DocMessage = SyncMessage | EphemeralMessage | RequestMessage | DocumentUnavailableMessage;
/** These are all the message types that a {@link NetworkAdapter} might see. */
export type Message = RepoMessage | AuthMessage;
/**
* The contents of a message, without the sender ID or other properties added by the {@link NetworkSubsystem})
*/
export type MessageContents<T extends Message = Message> = T extends EphemeralMessage ? Omit<T, "senderId" | "count" | "sessionId"> : Omit<T, "senderId">;
export type MessageContents<T extends Message = RepoMessage> = T extends EphemeralMessage ? Omit<T, "senderId" | "count" | "sessionId"> : Omit<T, "senderId">;
/** Notify the repo that the sync state has changed */

@@ -126,3 +131,3 @@ export interface SyncStateMessage {

}
export declare const isValidRepoMessage: (message: Message) => message is RepoMessage;
export declare const isRepoMessage: (message: Message) => message is RepoMessage;
export declare const isDocumentUnavailableMessage: (msg: Message) => msg is DocumentUnavailableMessage;

@@ -129,0 +134,0 @@ export declare const isRequestMessage: (msg: Message) => msg is RequestMessage;

// TYPE GUARDS
export const isValidRepoMessage = (message) => typeof message === "object" &&
typeof message.type === "string" &&
typeof message.senderId === "string" &&
(isSyncMessage(message) ||
isEphemeralMessage(message) ||
isRequestMessage(message) ||
isDocumentUnavailableMessage(message) ||
isRemoteSubscriptionControlMessage(message) ||
isRemoteHeadsChanged(message));
export const isRepoMessage = (message) => isSyncMessage(message) ||
isEphemeralMessage(message) ||
isRequestMessage(message) ||
isDocumentUnavailableMessage(message) ||
isRemoteSubscriptionControlMessage(message) ||
isRemoteHeadsChanged(message);
// prettier-ignore

@@ -16,3 +13,4 @@ export const isDocumentUnavailableMessage = (msg) => msg.type === "doc-unavailable";

export const isEphemeralMessage = (msg) => msg.type === "ephemeral";
// prettier-ignore
export const isRemoteSubscriptionControlMessage = (msg) => msg.type === "remote-subscription-change";
export const isRemoteHeadsChanged = (msg) => msg.type === "remote-heads-changed";

@@ -0,1 +1,2 @@

/* c8 ignore start */
import { EventEmitter } from "eventemitter3";

@@ -2,0 +3,0 @@ /** An interface representing some way to connect to other peers

import debug from "debug";
import { EventEmitter } from "eventemitter3";
import { isEphemeralMessage, isValidRepoMessage, } from "./messages.js";
import { isEphemeralMessage, isRepoMessage, } from "./messages.js";
const getEphemeralMessageSource = (message) => `${message.senderId}:${message.sessionId}`;

@@ -46,3 +46,3 @@ export class NetworkSubsystem extends EventEmitter {

networkAdapter.on("message", msg => {
if (!isValidRepoMessage(msg)) {
if (!isRepoMessage(msg)) {
this.#log(`invalid message: ${JSON.stringify(msg)}`);

@@ -111,3 +111,3 @@ return;

const outbound = prepareMessage(message);
this.#log("sending message", outbound);
this.#log("sending message %o", outbound);
peer.send(outbound);

@@ -114,0 +114,0 @@ }

@@ -7,4 +7,4 @@ import { EventEmitter } from "eventemitter3";

import { StorageSubsystem } from "./storage/StorageSubsystem.js";
import { StorageId } from "./storage/types.js";
import type { AnyDocumentId, DocumentId, PeerId } from "./types.js";
import { StorageId } from "./storage/types.js";
/** A Repo is a collection of documents with networking, syncing, and storage capabilities. */

@@ -33,3 +33,3 @@ /** The `Repo` is the main entry point of this library

peerMetadataByPeerId: Record<PeerId, PeerMetadata>;
constructor({ storage, network, peerId, sharePolicy, isEphemeral, }: RepoConfig);
constructor({ storage, network, peerId, sharePolicy, isEphemeral, enableRemoteHeadsGossiping, }: RepoConfig);
/** Returns all the handles we have cached. */

@@ -73,2 +73,10 @@ get handles(): Record<DocumentId, DocHandle<any>>;

/**
* Exports a document to a binary format.
* @param id - The url or documentId of the handle to export
*
* @returns Promise<Uint8Array | undefined> - A Promise containing the binary document,
* or undefined if the document is unavailable.
*/
export(id: AnyDocumentId): Promise<Uint8Array | undefined>;
/**
* Imports document binary into the repo.

@@ -96,2 +104,6 @@ * @param binary - The binary to import

sharePolicy?: SharePolicy;
/**
* Whether to enable the experimental remote heads gossiping feature
*/
enableRemoteHeadsGossiping?: boolean;
}

@@ -98,0 +110,0 @@ /** A function that determines whether we should share a document with a peer

@@ -6,2 +6,4 @@ import { next as Automerge } from "@automerge/automerge";

import { DocHandle } from "./DocHandle.js";
import { RemoteHeadsSubscriptions } from "./RemoteHeadsSubscriptions.js";
import { headsAreSame } from "./helpers/headsAreSame.js";
import { throttle } from "./helpers/throttle.js";

@@ -11,4 +13,2 @@ import { NetworkSubsystem } from "./network/NetworkSubsystem.js";

import { CollectionSynchronizer } from "./synchronizer/CollectionSynchronizer.js";
import { RemoteHeadsSubscriptions } from "./RemoteHeadsSubscriptions.js";
import { headsAreSame } from "./helpers/headsAreSame.js";
/** A Repo is a collection of documents with networking, syncing, and storage capabilities. */

@@ -40,4 +40,6 @@ /** The `Repo` is the main entry point of this library

#remoteHeadsSubscriptions = new RemoteHeadsSubscriptions();
constructor({ storage, network, peerId, sharePolicy, isEphemeral = storage === undefined, }) {
#remoteHeadsGossipingEnabled = false;
constructor({ storage, network, peerId, sharePolicy, isEphemeral = storage === undefined, enableRemoteHeadsGossiping = false, }) {
super();
this.#remoteHeadsGossipingEnabled = enableRemoteHeadsGossiping;
this.#log = debug(`automerge-repo:repo`);

@@ -107,5 +109,7 @@ this.sharePolicy = sharePolicy ?? this.sharePolicy;

});
this.#synchronizer.on("open-doc", ({ peerId, documentId }) => {
this.#remoteHeadsSubscriptions.subscribePeerToDoc(peerId, documentId);
});
if (this.#remoteHeadsGossipingEnabled) {
this.#synchronizer.on("open-doc", ({ peerId, documentId }) => {
this.#remoteHeadsSubscriptions.subscribePeerToDoc(peerId, documentId);
});
}
// STORAGE

@@ -133,3 +137,3 @@ // The storage subsystem has access to some form of persistence, and deals with save and loading documents.

.then(shouldShare => {
if (shouldShare) {
if (shouldShare && this.#remoteHeadsGossipingEnabled) {
this.#remoteHeadsSubscriptions.addGenerousPeer(peerId);

@@ -164,3 +168,3 @@ }

handle.setRemoteHeads(storageId, message.syncState.theirHeads);
if (storageId) {
if (storageId && this.#remoteHeadsGossipingEnabled) {
this.#remoteHeadsSubscriptions.handleImmediateRemoteHeadsChanged(message.documentId, storageId, message.syncState.theirHeads);

@@ -170,30 +174,32 @@ }

});
this.#remoteHeadsSubscriptions.on("notify-remote-heads", message => {
this.networkSubsystem.send({
type: "remote-heads-changed",
targetId: message.targetId,
documentId: message.documentId,
newHeads: {
[message.storageId]: {
heads: message.heads,
timestamp: message.timestamp,
if (this.#remoteHeadsGossipingEnabled) {
this.#remoteHeadsSubscriptions.on("notify-remote-heads", message => {
this.networkSubsystem.send({
type: "remote-heads-changed",
targetId: message.targetId,
documentId: message.documentId,
newHeads: {
[message.storageId]: {
heads: message.heads,
timestamp: message.timestamp,
},
},
},
});
});
});
this.#remoteHeadsSubscriptions.on("change-remote-subs", message => {
this.#log("change-remote-subs", message);
for (const peer of message.peers) {
this.networkSubsystem.send({
type: "remote-subscription-change",
targetId: peer,
add: message.add,
remove: message.remove,
});
}
});
this.#remoteHeadsSubscriptions.on("remote-heads-changed", message => {
const handle = this.#handleCache[message.documentId];
handle.setRemoteHeads(message.storageId, message.remoteHeads);
});
this.#remoteHeadsSubscriptions.on("change-remote-subs", message => {
this.#log("change-remote-subs", message);
for (const peer of message.peers) {
this.networkSubsystem.send({
type: "remote-subscription-change",
targetId: peer,
add: message.add,
remove: message.remove,
});
}
});
this.#remoteHeadsSubscriptions.on("remote-heads-changed", message => {
const handle = this.#handleCache[message.documentId];
handle.setRemoteHeads(message.storageId, message.remoteHeads);
});
}
}

@@ -203,6 +209,10 @@ #receiveMessage(message) {

case "remote-subscription-change":
this.#remoteHeadsSubscriptions.handleControlMessage(message);
if (this.#remoteHeadsGossipingEnabled) {
this.#remoteHeadsSubscriptions.handleControlMessage(message);
}
break;
case "remote-heads-changed":
this.#remoteHeadsSubscriptions.handleRemoteHeads(message);
if (this.#remoteHeadsGossipingEnabled) {
this.#remoteHeadsSubscriptions.handleRemoteHeads(message);
}
break;

@@ -220,7 +230,7 @@ case "sync":

/** saves sync state throttled per storage id, if a peer doesn't have a storage id it's sync state is not persisted */
#saveSyncState(message) {
#saveSyncState(payload) {
if (!this.storageSubsystem) {
return;
}
const { storageId, isEphemeral } = this.peerMetadataByPeerId[message.peerId] || {};
const { storageId, isEphemeral } = this.peerMetadataByPeerId[payload.peerId] || {};
if (!storageId || isEphemeral) {

@@ -232,9 +242,6 @@ return;

handler = this.#throttledSaveSyncStateHandlers[storageId] = throttle(({ documentId, syncState }) => {
this.storageSubsystem.saveSyncState(documentId, storageId, syncState)
.catch(err => {
this.#log("error saving sync state", { err });
});
void this.storageSubsystem.saveSyncState(documentId, storageId, syncState);
}, this.saveDebounceRate);
}
handler(message);
handler(payload);
}

@@ -359,2 +366,17 @@ /** Returns an existing handle if we have it; creates one otherwise. */

/**
* Exports a document to a binary format.
* @param id - The url or documentId of the handle to export
*
* @returns Promise<Uint8Array | undefined> - A Promise containing the binary document,
* or undefined if the document is unavailable.
*/
async export(id) {
const documentId = interpretAsDocumentId(id);
const handle = this.#getHandle(documentId, false);
const doc = await handle.doc();
if (!doc)
return undefined;
return Automerge.save(doc);
}
/**
* Imports document binary into the repo.

@@ -372,4 +394,9 @@ * @param binary - The binary to import

subscribeToRemotes = (remotes) => {
this.#log("subscribeToRemotes", { remotes });
this.#remoteHeadsSubscriptions.subscribeToRemotes(remotes);
if (this.#remoteHeadsGossipingEnabled) {
this.#log("subscribeToRemotes", { remotes });
this.#remoteHeadsSubscriptions.subscribeToRemotes(remotes);
}
else {
this.#log("WARN: subscribeToRemotes called but remote heads gossiping is not enabled");
}
};

@@ -376,0 +403,0 @@ storageId = async () => {

@@ -90,2 +90,3 @@ import debug from "debug";

// TODO: implement this
// eslint-disable-next-line @typescript-eslint/no-unused-vars
removeDocument(documentId) {

@@ -92,0 +93,0 @@ throw new Error("not implemented");

@@ -80,5 +80,7 @@ import * as A from "@automerge/automerge/next";

if (!pendingCallbacks) {
this.#onLoadSyncState(peerId).then(syncState => {
this.#onLoadSyncState(peerId)
.then(syncState => {
this.#initSyncState(peerId, syncState ?? A.initSyncState());
}).catch(err => {
})
.catch(err => {
this.#log(`Error loading sync state for ${peerId}: ${err}`);

@@ -180,7 +182,9 @@ });

this.#setSyncState(peerId, reparsedSyncState);
docPromise.then(doc => {
docPromise
.then(doc => {
if (doc) {
this.#sendSyncMessage(peerId, doc);
}
}).catch(err => {
})
.catch(err => {
this.#log(`Error loading doc for ${peerId}: ${err}`);

@@ -240,5 +244,5 @@ });

this.#processAllPendingSyncMessages();
this.#processSyncMessage(message, new Date());
this.#processSyncMessage(message);
}
#processSyncMessage(message, received) {
#processSyncMessage(message) {
if (isRequestMessage(message)) {

@@ -284,3 +288,3 @@ this.#peerDocumentStatuses[message.senderId] = "wants";

for (const message of this.#pendingSyncMessages) {
this.#processSyncMessage(message.message, message.received);
this.#processSyncMessage(message.message);
}

@@ -287,0 +291,0 @@ this.#pendingSyncMessages = [];

import { EventEmitter } from "eventemitter3";
import { MessageContents, OpenDocMessage, RepoMessage, SyncStateMessage } from "../network/messages.js";
import { MessageContents, OpenDocMessage, RepoMessage } from "../network/messages.js";
import { SyncState } from "@automerge/automerge";
import { PeerId, DocumentId } from "../types.js";
export declare abstract class Synchronizer extends EventEmitter<SynchronizerEvents> {

@@ -7,6 +9,12 @@ abstract receiveMessage(message: RepoMessage): void;

export interface SynchronizerEvents {
message: (arg: MessageContents) => void;
"sync-state": (arg: SyncStateMessage) => void;
message: (payload: MessageContents) => void;
"sync-state": (payload: SyncStatePayload) => void;
"open-doc": (arg: OpenDocMessage) => void;
}
/** Notify the repo that the sync state has changed */
export interface SyncStatePayload {
peerId: PeerId;
documentId: DocumentId;
syncState: SyncState;
}
//# sourceMappingURL=Synchronizer.d.ts.map
{
"name": "@automerge/automerge-repo",
"version": "1.1.0-alpha.7",
"version": "1.1.0-alpha.13",
"description": "A repository object to manage a collection of automerge documents",

@@ -12,5 +12,4 @@ "repository": "https://github.com/automerge/automerge-repo/tree/master/packages/automerge-repo",

"build": "tsc",
"lint": "eslint --ext .ts src",
"watch": "npm-watch build",
"test:coverage": "c8 --reporter=lcov --reporter=html --reporter=text yarn test",
"test:coverage": "c8 --reporter=lcov --reporter=html --reporter=text pnpm test",
"test": "vitest",

@@ -60,3 +59,3 @@ "test:watch": "npm-watch test",

},
"gitHead": "9a4711e39c93273d992c5686257246ddfaaafddd"
"gitHead": "f4ce1376d900ad98f00a638626be9611077460b5"
}

@@ -49,3 +49,5 @@ # Automerge Repo

- `import(binary: Uint8Array)`
Imports a document binary (from `Automerge.save(doc)`) into the repo, returning a new handle
Imports a document binary (from `export()` or `Automerge.save(doc)`) into the repo, returning a new handle
- `export(docId: DocumentId)`
Exports the document. Returns a Promise containing either the Uint8Array of the document or undefined if the document is currently unavailable. See the [Automerge binary format spec](https://automerge.org/automerge-binary-format-spec/) for more details on the shape of the Uint8Array.
- `.on("document", ({handle: DocHandle}) => void)`

@@ -52,0 +54,0 @@ Registers a callback to be fired each time a new document is loaded or created.

@@ -16,3 +16,3 @@ import type {

const regex = new RegExp(`^${urlPrefix}(\\w+)$`)
const [_, docMatch] = url.match(regex) || []
const [, docMatch] = url.match(regex) || []
const documentId = docMatch as DocumentId

@@ -19,0 +19,0 @@ const binaryDocumentId = documentIdToBinary(documentId)

@@ -0,1 +1,3 @@

/* c8 ignore start */
export const pause = (t = 0) =>

@@ -15,1 +17,3 @@ new Promise<void>(resolve => setTimeout(() => resolve(), t))

}
/* c8 ignore end */

@@ -0,1 +1,2 @@

/* c8 ignore start */
/**

@@ -29,1 +30,2 @@ * If `promise` is resolved before `t` ms elapse, the timeout is cleared and the result of the

}
/* c8 ignore end */

@@ -37,3 +37,3 @@ /**

export { NetworkAdapter } from "./network/NetworkAdapter.js"
export { isValidRepoMessage } from "./network/messages.js"
export { isRepoMessage } from "./network/messages.js"
export { StorageAdapter } from "./storage/StorageAdapter.js"

@@ -40,0 +40,0 @@

import { SyncState } from "@automerge/automerge"
import { StorageId } from "../storage/types.js"
import { DocumentId, PeerId, SessionId } from "../types.js"
import { StorageId } from "../storage/types.js"
export type Message = {
type: string
/** The peer ID of the sender of this message */
senderId: PeerId
/** The peer ID of the recipient of this message */
targetId: PeerId
data?: Uint8Array
documentId?: DocumentId
}
/**

@@ -10,7 +24,3 @@ * A sync message for a particular document

type: "sync"
/** The peer ID of the sender of this message */
senderId: PeerId
/** The peer ID of the recipient of this message */
targetId: PeerId

@@ -25,42 +35,38 @@

/** An ephemeral message
/**
* An ephemeral message.
*
* @remarks
* Ephemeral messages are not persisted anywhere and have no particular
* structure. `automerge-repo` will gossip them around, in order to avoid
* eternal loops of ephemeral messages every message has a session ID, which
* is a random number generated by the sender at startup time, and a sequence
* number. The combination of these two things allows us to discard messages
* we have already seen.
* Ephemeral messages are not persisted anywhere. The data property can be used by the application
* as needed. The repo gossips these around.
*
* In order to avoid infinite loops of ephemeral messages, every message has (a) a session ID, which
* is a random number generated by the sender at startup time; and (b) a sequence number. The
* combination of these two things allows us to discard messages we have already seen.
* */
export type EphemeralMessage = {
type: "ephemeral"
/** The peer ID of the sender of this message */
senderId: PeerId
/** The peer ID of the recipient of this message */
targetId: PeerId
/** A sequence number which must be incremented for each message sent by this peer */
/** A sequence number which must be incremented for each message sent by this peer. */
count: number
/** The ID of the session this message is part of. The sequence number for a given session always increases */
/** The ID of the session this message is part of. The sequence number for a given session always increases. */
sessionId: SessionId
/** The document ID this message pertains to */
/** The document ID this message pertains to. */
documentId: DocumentId
/** The actual data of the message */
/** The actual data of the message. */
data: Uint8Array
}
/** Sent by a {@link Repo} to indicate that it does not have the document and none of it's connected peers do either */
/**
* Sent by a {@link Repo} to indicate that it does not have the document and none of its connected
* peers do either.
*/
export type DocumentUnavailableMessage = {
type: "doc-unavailable"
/** The peer ID of the sender of this message */
senderId: PeerId
/** The peer ID of the recipient of this message */
targetId: PeerId

@@ -72,3 +78,4 @@

/** Sent by a {@link Repo} to request a document from a peer
/**
* Sent by a {@link Repo} to request a document from a peer.
*

@@ -81,30 +88,15 @@ * @remarks

type: "request"
/** The peer ID of the sender of this message */
senderId: PeerId
/** The peer ID of the recipient of this message */
targetId: PeerId
/** The initial automerge sync message */
/** The automerge sync message */
data: Uint8Array
/** The document ID this message requests */
/** The document ID of the document this message is for */
documentId: DocumentId
}
/** (anticipating work in progress) */
export type AuthMessage<TPayload = any> = {
type: "auth"
/** The peer ID of the sender of this message */
senderId: PeerId
/** The peer ID of the recipient of this message */
targetId: PeerId
/** The payload of the auth message (up to the specific auth provider) */
payload: TPayload
}
/**
* Sent by a {@link Repo} to add or remove storage IDs from a remote peer's subscription.
*/
export type RemoteSubscriptionControlMessage = {

@@ -114,6 +106,13 @@ type: "remote-subscription-change"

targetId: PeerId
/** The storage IDs to add to the subscription */
add?: StorageId[]
/** The storage IDs to remove from the subscription */
remove?: StorageId[]
}
/**
* Sent by a {@link Repo} to indicate that the heads of a document have changed on a remote peer.
*/
export type RemoteHeadsChanged = {

@@ -123,3 +122,7 @@ type: "remote-heads-changed"

targetId: PeerId
/** The document ID of the document that has changed */
documentId: DocumentId
/** The document's new heads */
newHeads: { [key: StorageId]: { heads: string[]; timestamp: number } }

@@ -137,2 +140,3 @@ }

/** These are message types that are handled by the {@link CollectionSynchronizer}.*/
export type DocMessage =

@@ -144,9 +148,6 @@ | SyncMessage

/** These are all the message types that a {@link NetworkAdapter} might see. */
export type Message = RepoMessage | AuthMessage
/**
* The contents of a message, without the sender ID or other properties added by the {@link NetworkSubsystem})
*/
export type MessageContents<T extends Message = Message> =
export type MessageContents<T extends Message = RepoMessage> =
T extends EphemeralMessage

@@ -171,12 +172,9 @@ ? Omit<T, "senderId" | "count" | "sessionId">

export const isValidRepoMessage = (message: Message): message is RepoMessage =>
typeof message === "object" &&
typeof message.type === "string" &&
typeof message.senderId === "string" &&
(isSyncMessage(message) ||
isEphemeralMessage(message) ||
isRequestMessage(message) ||
isDocumentUnavailableMessage(message) ||
isRemoteSubscriptionControlMessage(message) ||
isRemoteHeadsChanged(message))
export const isRepoMessage = (message: Message): message is RepoMessage =>
isSyncMessage(message) ||
isEphemeralMessage(message) ||
isRequestMessage(message) ||
isDocumentUnavailableMessage(message) ||
isRemoteSubscriptionControlMessage(message) ||
isRemoteHeadsChanged(message)

@@ -196,5 +194,4 @@ // prettier-ignore

export const isRemoteSubscriptionControlMessage = (
msg: Message
): msg is RemoteSubscriptionControlMessage =>
// prettier-ignore
export const isRemoteSubscriptionControlMessage = (msg: Message): msg is RemoteSubscriptionControlMessage =>
msg.type === "remote-subscription-change"

@@ -201,0 +198,0 @@

@@ -0,1 +1,3 @@

/* c8 ignore start */
import { EventEmitter } from "eventemitter3"

@@ -6,7 +8,7 @@ import { PeerId } from "../types.js"

/**
/**
* Describes a peer intent to the system
* storageId: the key for syncState to decide what the other peer already has
* isEphemeral: to decide if we bother recording this peer's sync state
*
*
*/

@@ -13,0 +15,0 @@ export interface PeerMetadata {

@@ -14,3 +14,3 @@ import debug from "debug"

isEphemeralMessage,
isValidRepoMessage,
isRepoMessage,
} from "./messages.js"

@@ -77,3 +77,3 @@

networkAdapter.on("message", msg => {
if (!isValidRepoMessage(msg)) {
if (!isRepoMessage(msg)) {
this.#log(`invalid message: ${JSON.stringify(msg)}`)

@@ -151,3 +151,3 @@ return

const outbound = prepareMessage(message)
this.#log("sending message", outbound)
this.#log("sending message %o", outbound)
peer.send(outbound as RepoMessage)

@@ -154,0 +154,0 @@ }

@@ -10,13 +10,14 @@ import { next as Automerge } from "@automerge/automerge"

import { DocHandle, DocHandleEncodedChangePayload } from "./DocHandle.js"
import { RemoteHeadsSubscriptions } from "./RemoteHeadsSubscriptions.js"
import { headsAreSame } from "./helpers/headsAreSame.js"
import { throttle } from "./helpers/throttle.js"
import { NetworkAdapter, type PeerMetadata } from "./network/NetworkAdapter.js"
import { NetworkSubsystem } from "./network/NetworkSubsystem.js"
import { RepoMessage } from "./network/messages.js"
import { StorageAdapter } from "./storage/StorageAdapter.js"
import { StorageSubsystem } from "./storage/StorageSubsystem.js"
import { StorageId } from "./storage/types.js"
import { CollectionSynchronizer } from "./synchronizer/CollectionSynchronizer.js"
import { SyncStatePayload } from "./synchronizer/Synchronizer.js"
import type { AnyDocumentId, DocumentId, PeerId } from "./types.js"
import { RepoMessage, SyncStateMessage } from "./network/messages.js"
import { StorageId } from "./storage/types.js"
import { RemoteHeadsSubscriptions } from "./RemoteHeadsSubscriptions.js"
import { headsAreSame } from "./helpers/headsAreSame.js"

@@ -56,2 +57,3 @@ /** A Repo is a collection of documents with networking, syncing, and storage capabilities. */

#remoteHeadsSubscriptions = new RemoteHeadsSubscriptions()
#remoteHeadsGossipingEnabled = false

@@ -64,4 +66,6 @@ constructor({

isEphemeral = storage === undefined,
enableRemoteHeadsGossiping = false,
}: RepoConfig) {
super()
this.#remoteHeadsGossipingEnabled = enableRemoteHeadsGossiping
this.#log = debug(`automerge-repo:repo`)

@@ -83,6 +87,3 @@ this.sharePolicy = sharePolicy ?? this.sharePolicy

}
handle.on(
"heads-changed",
throttle(saveFn, this.saveDebounceRate)
)
handle.on("heads-changed", throttle(saveFn, this.saveDebounceRate))

@@ -147,5 +148,7 @@ if (isNew) {

this.#synchronizer.on("open-doc", ({ peerId, documentId }) => {
this.#remoteHeadsSubscriptions.subscribePeerToDoc(peerId, documentId)
})
if (this.#remoteHeadsGossipingEnabled) {
this.#synchronizer.on("open-doc", ({ peerId, documentId }) => {
this.#remoteHeadsSubscriptions.subscribePeerToDoc(peerId, documentId)
})
}

@@ -162,3 +165,3 @@ // STORAGE

// eslint-disable-next-line no-async-promise-executor -- TODO: fix
async (resolve) =>
async resolve =>
resolve({

@@ -187,3 +190,3 @@ storageId: await storageSubsystem?.id(),

.then(shouldShare => {
if (shouldShare) {
if (shouldShare && this.#remoteHeadsGossipingEnabled) {
this.#remoteHeadsSubscriptions.addGenerousPeer(peerId)

@@ -228,3 +231,3 @@ }

if (storageId) {
if (storageId && this.#remoteHeadsGossipingEnabled) {
this.#remoteHeadsSubscriptions.handleImmediateRemoteHeadsChanged(

@@ -239,32 +242,34 @@ message.documentId,

this.#remoteHeadsSubscriptions.on("notify-remote-heads", message => {
this.networkSubsystem.send({
type: "remote-heads-changed",
targetId: message.targetId,
documentId: message.documentId,
newHeads: {
[message.storageId]: {
heads: message.heads,
timestamp: message.timestamp,
if (this.#remoteHeadsGossipingEnabled) {
this.#remoteHeadsSubscriptions.on("notify-remote-heads", message => {
this.networkSubsystem.send({
type: "remote-heads-changed",
targetId: message.targetId,
documentId: message.documentId,
newHeads: {
[message.storageId]: {
heads: message.heads,
timestamp: message.timestamp,
},
},
},
})
})
})
this.#remoteHeadsSubscriptions.on("change-remote-subs", message => {
this.#log("change-remote-subs", message)
for (const peer of message.peers) {
this.networkSubsystem.send({
type: "remote-subscription-change",
targetId: peer,
add: message.add,
remove: message.remove,
})
}
})
this.#remoteHeadsSubscriptions.on("change-remote-subs", message => {
this.#log("change-remote-subs", message)
for (const peer of message.peers) {
this.networkSubsystem.send({
type: "remote-subscription-change",
targetId: peer,
add: message.add,
remove: message.remove,
})
}
})
this.#remoteHeadsSubscriptions.on("remote-heads-changed", message => {
const handle = this.#handleCache[message.documentId]
handle.setRemoteHeads(message.storageId, message.remoteHeads)
})
this.#remoteHeadsSubscriptions.on("remote-heads-changed", message => {
const handle = this.#handleCache[message.documentId]
handle.setRemoteHeads(message.storageId, message.remoteHeads)
})
}
}

@@ -275,6 +280,10 @@

case "remote-subscription-change":
this.#remoteHeadsSubscriptions.handleControlMessage(message)
if (this.#remoteHeadsGossipingEnabled) {
this.#remoteHeadsSubscriptions.handleControlMessage(message)
}
break
case "remote-heads-changed":
this.#remoteHeadsSubscriptions.handleRemoteHeads(message)
if (this.#remoteHeadsGossipingEnabled) {
this.#remoteHeadsSubscriptions.handleRemoteHeads(message)
}
break

@@ -293,7 +302,7 @@ case "sync":

StorageId,
(message: SyncStateMessage) => void
(payload: SyncStatePayload) => void
> = {}
/** saves sync state throttled per storage id, if a peer doesn't have a storage id it's sync state is not persisted */
#saveSyncState(message: SyncStateMessage) {
#saveSyncState(payload: SyncStatePayload) {
if (!this.storageSubsystem) {

@@ -304,3 +313,3 @@ return

const { storageId, isEphemeral } =
this.peerMetadataByPeerId[message.peerId] || {}
this.peerMetadataByPeerId[payload.peerId] || {}

@@ -314,7 +323,8 @@ if (!storageId || isEphemeral) {

handler = this.#throttledSaveSyncStateHandlers[storageId] = throttle(
({ documentId, syncState }: SyncStateMessage) => {
this.storageSubsystem!.saveSyncState(documentId, storageId, syncState)
.catch(err => {
this.#log("error saving sync state", { err })
})
({ documentId, syncState }: SyncStatePayload) => {
void this.storageSubsystem!.saveSyncState(
documentId,
storageId,
syncState
)
},

@@ -325,3 +335,3 @@ this.saveDebounceRate

handler(message)
handler(payload)
}

@@ -472,2 +482,18 @@

/**
* Exports a document to a binary format.
* @param id - The url or documentId of the handle to export
*
* @returns Promise<Uint8Array | undefined> - A Promise containing the binary document,
* or undefined if the document is unavailable.
*/
async export(id: AnyDocumentId): Promise<Uint8Array | undefined> {
const documentId = interpretAsDocumentId(id)
const handle = this.#getHandle(documentId, false)
const doc = await handle.doc()
if (!doc) return undefined
return Automerge.save(doc)
}
/**
* Imports document binary into the repo.

@@ -489,4 +515,10 @@ * @param binary - The binary to import

subscribeToRemotes = (remotes: StorageId[]) => {
this.#log("subscribeToRemotes", { remotes })
this.#remoteHeadsSubscriptions.subscribeToRemotes(remotes)
if (this.#remoteHeadsGossipingEnabled) {
this.#log("subscribeToRemotes", { remotes })
this.#remoteHeadsSubscriptions.subscribeToRemotes(remotes)
} else {
this.#log(
"WARN: subscribeToRemotes called but remote heads gossiping is not enabled"
)
}
}

@@ -522,2 +554,7 @@

sharePolicy?: SharePolicy
/**
* Whether to enable the experimental remote heads gossiping feature
*/
enableRemoteHeadsGossiping?: boolean
}

@@ -524,0 +561,0 @@

@@ -120,2 +120,3 @@ import debug from "debug"

// TODO: implement this
// eslint-disable-next-line @typescript-eslint/no-unused-vars
removeDocument(documentId: DocumentId) {

@@ -122,0 +123,0 @@ throw new Error("not implemented")

@@ -140,7 +140,9 @@ import * as A from "@automerge/automerge/next"

if (!pendingCallbacks) {
this.#onLoadSyncState(peerId).then(syncState => {
this.#initSyncState(peerId, syncState ?? A.initSyncState())
}).catch(err => {
this.#log(`Error loading sync state for ${peerId}: ${err}`)
})
this.#onLoadSyncState(peerId)
.then(syncState => {
this.#initSyncState(peerId, syncState ?? A.initSyncState())
})
.catch(err => {
this.#log(`Error loading sync state for ${peerId}: ${err}`)
})
pendingCallbacks = this.#pendingSyncStateCallbacks[peerId] = []

@@ -266,9 +268,11 @@ }

docPromise.then(doc => {
if (doc) {
this.#sendSyncMessage(peerId, doc)
}
}).catch(err => {
this.#log(`Error loading doc for ${peerId}: ${err}`)
})
docPromise
.then(doc => {
if (doc) {
this.#sendSyncMessage(peerId, doc)
}
})
.catch(err => {
this.#log(`Error loading doc for ${peerId}: ${err}`)
})
})

@@ -335,6 +339,6 @@ })

this.#processAllPendingSyncMessages()
this.#processSyncMessage(message, new Date())
this.#processSyncMessage(message)
}
#processSyncMessage(message: SyncMessage | RequestMessage, received: Date) {
#processSyncMessage(message: SyncMessage | RequestMessage) {
if (isRequestMessage(message)) {

@@ -398,3 +402,3 @@ this.#peerDocumentStatuses[message.senderId] = "wants"

for (const message of this.#pendingSyncMessages) {
this.#processSyncMessage(message.message, message.received)
this.#processSyncMessage(message.message)
}

@@ -401,0 +405,0 @@

@@ -6,4 +6,5 @@ import { EventEmitter } from "eventemitter3"

RepoMessage,
SyncStateMessage,
} from "../network/messages.js"
import { SyncState } from "@automerge/automerge"
import { PeerId, DocumentId } from "../types.js"

@@ -15,5 +16,12 @@ export abstract class Synchronizer extends EventEmitter<SynchronizerEvents> {

export interface SynchronizerEvents {
message: (arg: MessageContents) => void
"sync-state": (arg: SyncStateMessage) => void
message: (payload: MessageContents) => void
"sync-state": (payload: SyncStatePayload) => void
"open-doc": (arg: OpenDocMessage) => void
}
/** Notify the repo that the sync state has changed */
export interface SyncStatePayload {
peerId: PeerId
documentId: DocumentId
syncState: SyncState
}

@@ -1,2 +0,2 @@

import { MessageChannelNetworkAdapter } from "@automerge/automerge-repo-network-messagechannel"
import { MessageChannelNetworkAdapter } from "../../automerge-repo-network-messagechannel/dist/index.js"
import * as A from "@automerge/automerge/next"

@@ -27,2 +27,3 @@ import assert from "assert"

storage: new DummyStorageAdapter(),
enableRemoteHeadsGossiping: true,
})

@@ -50,2 +51,3 @@ const bobStorageId = await bobRepo.storageId()

sharePolicy: async () => true,
enableRemoteHeadsGossiping: true,
})

@@ -56,2 +58,3 @@ const leftTab2 = new Repo({

sharePolicy: async () => true,
enableRemoteHeadsGossiping: true,
})

@@ -64,2 +67,3 @@ const leftServiceWorker = new Repo({

isEphemeral: false,
enableRemoteHeadsGossiping: true,
})

@@ -72,2 +76,3 @@ const syncServer = new Repo({

storage: new DummyStorageAdapter(),
enableRemoteHeadsGossiping: true,
})

@@ -80,2 +85,3 @@ const rightServiceWorker = new Repo({

storage: new DummyStorageAdapter(),
enableRemoteHeadsGossiping: true,
})

@@ -86,2 +92,3 @@ const rightTab = new Repo({

sharePolicy: async () => true,
enableRemoteHeadsGossiping: true,
})

@@ -240,3 +247,3 @@

console.log(JSON.stringify(remoteHeadsChangedMessages, null, 2))
// console.log(JSON.stringify(remoteHeadsChangedMessages, null, 2))

@@ -243,0 +250,0 @@ assert.strictEqual(remoteHeadsChangedMessages.length, 1)

import { next as A } from "@automerge/automerge"
import { MessageChannelNetworkAdapter } from "@automerge/automerge-repo-network-messagechannel"
import { MessageChannelNetworkAdapter } from "../../automerge-repo-network-messagechannel/src/index.js"
import assert from "assert"

@@ -16,2 +16,3 @@ import * as Uuid from "uuid"

import {
AnyDocumentId,
AutomergeUrl,

@@ -35,2 +36,11 @@ DocHandle,

describe("Repo", () => {
describe("constructor", () => {
it("can be instantiated without network adapters", () => {
const repo = new Repo({
network: [],
})
expect(repo).toBeInstanceOf(Repo)
})
})
describe("local only", () => {

@@ -326,2 +336,23 @@ const setup = ({ startReady = true } = {}) => {

it("exports a document", async () => {
const { repo } = setup()
const handle = repo.create<TestDoc>()
handle.change(d => {
d.foo = "bar"
})
assert.equal(handle.isReady(), true)
const exported = await repo.export(handle.documentId)
const loaded = A.load(exported)
const doc = await handle.doc()
assert.deepEqual(doc, loaded)
})
it("rejects when exporting a document that does not exist", async () => {
const { repo } = setup()
assert.rejects(async () => {
await repo.export("foo" as AnyDocumentId)
})
})
it("storage state doesn't change across reloads when the document hasn't changed", async () => {

@@ -328,0 +359,0 @@ const storage = new DummyStorageAdapter()

@@ -1,2 +0,2 @@

import { NodeFSStorageAdapter } from "@automerge/automerge-repo-storage-nodefs"
import { NodeFSStorageAdapter } from "../../automerge-repo-storage-nodefs/src/index.js"
import * as A from "@automerge/automerge/next"

@@ -3,0 +3,0 @@ import assert from "assert"

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc