Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@automerge/automerge-repo

Package Overview
Dependencies
Maintainers
4
Versions
69
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@automerge/automerge-repo - npm Package Compare versions

Comparing version 1.1.0-alpha.3 to 1.1.0-alpha.4

test/helpers/waitForMessages.ts

5

dist/network/messages.d.ts

@@ -114,2 +114,7 @@ import { SyncState } from "@automerge/automerge";

}
/** Notify the repo that a peer started syncing with a doc */
export interface OpenDocMessage {
peerId: PeerId;
documentId: DocumentId;
}
export declare const isValidRepoMessage: (message: Message) => message is RepoMessage;

@@ -116,0 +121,0 @@ export declare const isDocumentUnavailableMessage: (msg: Message) => msg is DocumentUnavailableMessage;

1

dist/RemoteHeadsSubscriptions.d.ts

@@ -39,4 +39,5 @@ import { next as A } from "@automerge/automerge";

removePeer(peerId: PeerId): void;
subscribePeerToDoc(peerId: PeerId, documentId: DocumentId): void;
}
export {};
//# sourceMappingURL=RemoteHeadsSubscriptions.d.ts.map

@@ -12,2 +12,4 @@ import { EventEmitter } from "eventemitter3";

#generousPeers = new Set();
// Documents each peer has open, we need this information so we only send remote heads of documents that the peer knows
#subscribedDocsByPeer = new Map();
#log = debug("automerge-repo:remote-heads-subscriptions");

@@ -51,2 +53,3 @@ subscribeToRemotes(remotes) {

const remotesToRemove = [];
const addedRemotesWeKnow = [];
this.#log("handleControlMessage", control);

@@ -56,2 +59,5 @@ if (control.add) {

let theirSubs = this.#theirSubscriptions.get(remote);
if (this.#ourSubscriptions.has(remote) || theirSubs) {
addedRemotesWeKnow.push(remote);
}
if (!theirSubs) {

@@ -86,2 +92,24 @@ theirSubs = new Set();

}
// send all our stored heads of documents the peer knows for the remotes they've added
for (const remote of addedRemotesWeKnow) {
const subscribedDocs = this.#subscribedDocsByPeer.get(control.senderId);
if (subscribedDocs) {
for (const documentId of subscribedDocs) {
const knownHeads = this.#knownHeads.get(documentId);
if (!knownHeads) {
continue;
}
const lastHeads = knownHeads.get(remote);
if (lastHeads) {
this.emit("notify-remote-heads", {
targetId: control.senderId,
documentId,
heads: lastHeads.heads,
timestamp: lastHeads.timestamp,
storageId: remote,
});
}
}
}
}
}

@@ -119,9 +147,11 @@ /** A peer we are not directly connected to has changed their heads */

for (const peerId of theirSubs) {
this.emit("notify-remote-heads", {
targetId: peerId,
documentId: event.documentId,
heads: event.remoteHeads,
timestamp: event.timestamp,
storageId: event.storageId,
});
if (this.#isPeerSubscribedToDoc(peerId, event.documentId)) {
this.emit("notify-remote-heads", {
targetId: peerId,
documentId: event.documentId,
heads: event.remoteHeads,
timestamp: event.timestamp,
storageId: event.storageId,
});
}
}

@@ -148,9 +178,11 @@ }

for (const peerId of theirSubs) {
this.emit("notify-remote-heads", {
targetId: peerId,
documentId: documentId,
heads: heads,
timestamp: timestamp,
storageId: storageId,
});
if (this.#isPeerSubscribedToDoc(peerId, documentId)) {
this.emit("notify-remote-heads", {
targetId: peerId,
documentId: documentId,
heads: heads,
timestamp: timestamp,
storageId: storageId,
});
}
}

@@ -184,2 +216,3 @@ }

this.#generousPeers.delete(peerId);
this.#subscribedDocsByPeer.delete(peerId);
for (const [storageId, peerIds] of this.#theirSubscriptions) {

@@ -201,2 +234,29 @@ if (peerIds.has(peerId)) {

}
subscribePeerToDoc(peerId, documentId) {
let subscribedDocs = this.#subscribedDocsByPeer.get(peerId);
if (!subscribedDocs) {
subscribedDocs = new Set();
this.#subscribedDocsByPeer.set(peerId, subscribedDocs);
}
subscribedDocs.add(documentId);
const remoteHeads = this.#knownHeads.get(documentId);
if (remoteHeads) {
for (const [storageId, lastHeads] of remoteHeads) {
const subscribedPeers = this.#theirSubscriptions.get(storageId);
if (subscribedPeers && subscribedPeers.has(peerId)) {
this.emit("notify-remote-heads", {
targetId: peerId,
documentId,
heads: lastHeads.heads,
timestamp: lastHeads.timestamp,
storageId,
});
}
}
}
}
#isPeerSubscribedToDoc(peerId, documentId) {
let subscribedDocs = this.#subscribedDocsByPeer.get(peerId);
return subscribedDocs && subscribedDocs.has(documentId);
}
/** Returns the (document, storageId) pairs which have changed after processing msg */

@@ -203,0 +263,0 @@ #changedHeads(msg) {

@@ -37,2 +37,3 @@ import { EventEmitter } from "eventemitter3";

get peers(): PeerId[];
getStorageIdOfPeer(peerId: PeerId): StorageId | undefined;
/**

@@ -39,0 +40,0 @@ * Creates a new document and returns a handle to it. The initial value of the document is

@@ -104,2 +104,5 @@ import { next as Automerge } from "@automerge/automerge";

});
this.#synchronizer.on("open-doc", ({ peerId, documentId }) => {
this.#remoteHeadsSubscriptions.subscribePeerToDoc(peerId, documentId);
});
// STORAGE

@@ -248,2 +251,5 @@ // The storage subsystem has access to some form of persistence, and deals with save and loading documents.

}
getStorageIdOfPeer(peerId) {
return this.peerMetadataByPeerId[peerId]?.storageId;
}
/**

@@ -250,0 +256,0 @@ * Creates a new document and returns a handle to it. The initial value of the document is

@@ -43,2 +43,3 @@ import debug from "debug";

docSynchronizer.on("message", event => this.emit("message", event));
docSynchronizer.on("open-doc", event => this.emit("open-doc", event));
docSynchronizer.on("sync-state", event => this.emit("sync-state", event));

@@ -45,0 +46,0 @@ return docSynchronizer;

10

dist/synchronizer/DocSynchronizer.js

@@ -69,5 +69,3 @@ import * as A from "@automerge/automerge/next";

#withSyncState(peerId, callback) {
if (!this.#peers.includes(peerId)) {
this.#peers.push(peerId);
}
this.#addPeer(peerId);
if (!(peerId in this.#peerDocumentStatuses)) {

@@ -90,2 +88,8 @@ this.#peerDocumentStatuses[peerId] = "unknown";

}
#addPeer(peerId) {
if (!this.#peers.includes(peerId)) {
this.#peers.push(peerId);
this.emit("open-doc", { documentId: this.documentId, peerId });
}
}
#initSyncState(peerId, syncState) {

@@ -92,0 +96,0 @@ const pendingCallbacks = this.#pendingSyncStateCallbacks[peerId];

import { EventEmitter } from "eventemitter3";
import { MessageContents, RepoMessage, SyncStateMessage } from "../network/messages.js";
import { MessageContents, OpenDocMessage, RepoMessage, SyncStateMessage } from "../network/messages.js";
export declare abstract class Synchronizer extends EventEmitter<SynchronizerEvents> {

@@ -9,3 +9,4 @@ abstract receiveMessage(message: RepoMessage): void;

"sync-state": (arg: SyncStateMessage) => void;
"open-doc": (arg: OpenDocMessage) => void;
}
//# sourceMappingURL=Synchronizer.d.ts.map
{
"name": "@automerge/automerge-repo",
"version": "1.1.0-alpha.3",
"version": "1.1.0-alpha.4",
"description": "A repository object to manage a collection of automerge documents",

@@ -57,3 +57,3 @@ "repository": "https://github.com/automerge/automerge-repo/tree/master/packages/automerge-repo",

},
"gitHead": "0d76620579403005a01d4205ee15fd08a85d445c"
"gitHead": "4361fd01d482d5d227bd679bfd79f9e32a8fe9cc"
}

@@ -107,15 +107,15 @@ import { SyncState } from "@automerge/automerge"

export type RemoteSubscriptionControlMessage = {
type: "remote-subscription-change",
senderId: PeerId,
targetId: PeerId,
add?: StorageId[],
remove?: StorageId[],
type: "remote-subscription-change"
senderId: PeerId
targetId: PeerId
add?: StorageId[]
remove?: StorageId[]
}
export type RemoteHeadsChanged = {
type: "remote-heads-changed",
senderId: PeerId,
targetId: PeerId,
documentId: DocumentId,
newHeads: {[key: StorageId]: {heads: string[], timestamp: number}},
type: "remote-heads-changed"
senderId: PeerId
targetId: PeerId
documentId: DocumentId
newHeads: { [key: StorageId]: { heads: string[]; timestamp: number } }
}

@@ -132,3 +132,7 @@

export type DocMessage = SyncMessage | EphemeralMessage | RequestMessage | DocumentUnavailableMessage
export type DocMessage =
| SyncMessage
| EphemeralMessage
| RequestMessage
| DocumentUnavailableMessage

@@ -153,2 +157,8 @@ /** These are all the message types that a {@link NetworkAdapter} might see. */

/** Notify the repo that a peer started syncing with a doc */
export interface OpenDocMessage {
peerId: PeerId
documentId: DocumentId
}
// TYPE GUARDS

@@ -180,3 +190,5 @@

export const isRemoteSubscriptionControlMessage = (msg: Message): msg is RemoteSubscriptionControlMessage =>
export const isRemoteSubscriptionControlMessage = (
msg: Message
): msg is RemoteSubscriptionControlMessage =>
msg.type === "remote-subscription-change"

@@ -183,0 +195,0 @@

@@ -47,2 +47,5 @@ import { next as A } from "@automerge/automerge"

#generousPeers: Set<PeerId> = new Set()
// Documents each peer has open, we need this information so we only send remote heads of documents that the peer knows
#subscribedDocsByPeer: Map<PeerId, Set<DocumentId>> = new Map()
#log = debug("automerge-repo:remote-heads-subscriptions")

@@ -93,2 +96,3 @@

const remotesToRemove: StorageId[] = []
const addedRemotesWeKnow: StorageId[] = []

@@ -99,2 +103,7 @@ this.#log("handleControlMessage", control)

let theirSubs = this.#theirSubscriptions.get(remote)
if (this.#ourSubscriptions.has(remote) || theirSubs) {
addedRemotesWeKnow.push(remote)
}
if (!theirSubs) {

@@ -134,2 +143,26 @@ theirSubs = new Set()

}
// send all our stored heads of documents the peer knows for the remotes they've added
for (const remote of addedRemotesWeKnow) {
const subscribedDocs = this.#subscribedDocsByPeer.get(control.senderId)
if (subscribedDocs) {
for (const documentId of subscribedDocs) {
const knownHeads = this.#knownHeads.get(documentId)
if (!knownHeads) {
continue
}
const lastHeads = knownHeads.get(remote)
if (lastHeads) {
this.emit("notify-remote-heads", {
targetId: control.senderId,
documentId,
heads: lastHeads.heads,
timestamp: lastHeads.timestamp,
storageId: remote,
})
}
}
}
}
}

@@ -172,9 +205,11 @@

for (const peerId of theirSubs) {
this.emit("notify-remote-heads", {
targetId: peerId,
documentId: event.documentId,
heads: event.remoteHeads,
timestamp: event.timestamp,
storageId: event.storageId,
})
if (this.#isPeerSubscribedToDoc(peerId, event.documentId)) {
this.emit("notify-remote-heads", {
targetId: peerId,
documentId: event.documentId,
heads: event.remoteHeads,
timestamp: event.timestamp,
storageId: event.storageId,
})
}
}

@@ -208,9 +243,11 @@ }

for (const peerId of theirSubs) {
this.emit("notify-remote-heads", {
targetId: peerId,
documentId: documentId,
heads: heads,
timestamp: timestamp,
storageId: storageId,
})
if (this.#isPeerSubscribedToDoc(peerId, documentId)) {
this.emit("notify-remote-heads", {
targetId: peerId,
documentId: documentId,
heads: heads,
timestamp: timestamp,
storageId: storageId,
})
}
}

@@ -250,2 +287,3 @@ }

this.#generousPeers.delete(peerId)
this.#subscribedDocsByPeer.delete(peerId)

@@ -271,2 +309,33 @@ for (const [storageId, peerIds] of this.#theirSubscriptions) {

subscribePeerToDoc(peerId: PeerId, documentId: DocumentId) {
let subscribedDocs = this.#subscribedDocsByPeer.get(peerId)
if (!subscribedDocs) {
subscribedDocs = new Set()
this.#subscribedDocsByPeer.set(peerId, subscribedDocs)
}
subscribedDocs.add(documentId)
const remoteHeads = this.#knownHeads.get(documentId)
if (remoteHeads) {
for (const [storageId, lastHeads] of remoteHeads) {
const subscribedPeers = this.#theirSubscriptions.get(storageId)
if (subscribedPeers && subscribedPeers.has(peerId)) {
this.emit("notify-remote-heads", {
targetId: peerId,
documentId,
heads: lastHeads.heads,
timestamp: lastHeads.timestamp,
storageId,
})
}
}
}
}
#isPeerSubscribedToDoc(peerId: PeerId, documentId: DocumentId) {
let subscribedDocs = this.#subscribedDocsByPeer.get(peerId)
return subscribedDocs && subscribedDocs.has(documentId)
}
/** Returns the (document, storageId) pairs which have changed after processing msg */

@@ -273,0 +342,0 @@ #changedHeads(msg: RemoteHeadsChanged): {

@@ -143,2 +143,6 @@ import { next as Automerge } from "@automerge/automerge"

this.#synchronizer.on("open-doc", ({ peerId, documentId }) => {
this.#remoteHeadsSubscriptions.subscribePeerToDoc(peerId, documentId)
})
// STORAGE

@@ -336,2 +340,6 @@ // The storage subsystem has access to some form of persistence, and deals with save and loading documents.

getStorageIdOfPeer(peerId: PeerId): StorageId | undefined {
return this.peerMetadataByPeerId[peerId]?.storageId
}
/**

@@ -338,0 +346,0 @@ * Creates a new document and returns a handle to it. The initial value of the document is

@@ -58,2 +58,3 @@ import debug from "debug"

docSynchronizer.on("message", event => this.emit("message", event))
docSynchronizer.on("open-doc", event => this.emit("open-doc", event))
docSynchronizer.on("sync-state", event => this.emit("sync-state", event))

@@ -60,0 +61,0 @@ return docSynchronizer

@@ -23,3 +23,2 @@ import * as A from "@automerge/automerge/next"

import { throttle } from "../helpers/throttle.js"
import { headsAreSame } from "../helpers/headsAreSame.js"

@@ -128,5 +127,3 @@ type PeerDocumentStatus = "unknown" | "has" | "unavailable" | "wants"

#withSyncState(peerId: PeerId, callback: (syncState: A.SyncState) => void) {
if (!this.#peers.includes(peerId)) {
this.#peers.push(peerId)
}
this.#addPeer(peerId)

@@ -154,2 +151,9 @@ if (!(peerId in this.#peerDocumentStatuses)) {

#addPeer(peerId: PeerId) {
if (!this.#peers.includes(peerId)) {
this.#peers.push(peerId)
this.emit("open-doc", { documentId: this.documentId, peerId })
}
}
#initSyncState(peerId: PeerId, syncState: A.SyncState) {

@@ -156,0 +160,0 @@ const pendingCallbacks = this.#pendingSyncStateCallbacks[peerId]

import { EventEmitter } from "eventemitter3"
import {
MessageContents,
OpenDocMessage,
RepoMessage,

@@ -15,2 +16,3 @@ SyncStateMessage,

"sync-state": (arg: SyncStateMessage) => void
"open-doc": (arg: OpenDocMessage) => void
}

@@ -0,8 +1,8 @@

import { MessageChannelNetworkAdapter } from "@automerge/automerge-repo-network-messagechannel"
import * as A from "@automerge/automerge/next"
import assert from "assert"
import { decode } from "cbor-x"
import { setTimeout } from "timers/promises"
import { describe, it } from "vitest"
import { generateAutomergeUrl, parseAutomergeUrl } from "../src/AutomergeUrl.js"
import { eventPromise } from "../src/helpers/eventPromise.js"
import { pause } from "../src/helpers/pause.js"
import {

@@ -14,6 +14,5 @@ DocHandle,

} from "../src/index.js"
import { DummyStorageAdapter } from "./helpers/DummyStorageAdapter.js"
import { waitForMessages } from "./helpers/waitForMessages.js"
import { TestDoc } from "./types.js"
import { MessageChannelNetworkAdapter } from "@automerge/automerge-repo-network-messagechannel"
import { setTimeout } from "timers/promises"
import { DummyStorageAdapter } from "./helpers/DummyStorageAdapter.js"

@@ -44,82 +43,203 @@ describe("DocHandle.remoteHeads", () => {

it("should report remoteHeads for peers who are several hops away", async () => {
// replicates a tab -> service worker -> sync server <- service worker <- tab scenario
const leftTab = new Repo({
peerId: "left-tab" as PeerId,
network: [],
sharePolicy: async () => true,
describe("multi hop sync", () => {
async function setup() {
// setup topology: tab -> service worker -> sync server <- service worker <- tab
const leftTab1 = new Repo({
peerId: "left-tab-1" as PeerId,
network: [],
sharePolicy: async () => true,
})
const leftTab2 = new Repo({
peerId: "left-tab-2" as PeerId,
network: [],
sharePolicy: async () => true,
})
const leftServiceWorker = new Repo({
peerId: "left-service-worker" as PeerId,
network: [],
sharePolicy: async peer => peer === "sync-server",
storage: new DummyStorageAdapter(),
isEphemeral: false,
})
const syncServer = new Repo({
peerId: "sync-server" as PeerId,
network: [],
isEphemeral: false,
sharePolicy: async () => false,
storage: new DummyStorageAdapter(),
})
const rightServiceWorker = new Repo({
peerId: "right-service-worker" as PeerId,
network: [],
sharePolicy: async peer => peer === "sync-server",
isEphemeral: false,
storage: new DummyStorageAdapter(),
})
const rightTab = new Repo({
peerId: "right-tab" as PeerId,
network: [],
sharePolicy: async () => true,
})
// connect them all up
connectRepos(leftTab1, leftServiceWorker)
connectRepos(leftTab2, leftServiceWorker)
connectRepos(leftServiceWorker, syncServer)
connectRepos(syncServer, rightServiceWorker)
connectRepos(rightServiceWorker, rightTab)
await setTimeout(100)
return {
leftTab1,
leftTab2,
leftServiceWorker,
syncServer,
rightServiceWorker,
rightTab,
}
}
it("should report remoteHeads for peers", async () => {
const { rightTab, rightServiceWorker, leftServiceWorker, leftTab1 } =
await setup()
// subscribe to the left service worker storage ID on the right tab
rightTab.subscribeToRemotes([await leftServiceWorker.storageId()!])
await setTimeout(100)
// create a doc in the left tab
const leftTabDoc = leftTab1.create<TestDoc>()
leftTabDoc.change(d => (d.foo = "bar"))
// wait for the document to arrive on the right tab
const rightTabDoc = rightTab.find<TestDoc>(leftTabDoc.url)
await rightTabDoc.whenReady()
// wait for the document to arrive in the left service worker
const leftServiceWorkerDoc = leftServiceWorker.find(leftTabDoc.documentId)
await leftServiceWorkerDoc.whenReady()
const leftServiceWorkerStorageId = await leftServiceWorker.storageId()
let leftSeenByRightPromise = new Promise<DocHandleRemoteHeadsPayload>(
resolve => {
rightTabDoc.on("remote-heads", message => {
if (message.storageId === leftServiceWorkerStorageId) {
resolve(message)
}
})
}
)
// make a change on the right
rightTabDoc.change(d => (d.foo = "baz"))
// wait for the change to be acknolwedged by the left
const leftSeenByRight = await leftSeenByRightPromise
assert.deepStrictEqual(
leftSeenByRight.heads,
A.getHeads(leftServiceWorkerDoc.docSync())
)
})
const leftServiceWorker = new Repo({
peerId: "left-service-worker" as PeerId,
network: [],
sharePolicy: async peer => peer === "sync-server",
storage: new DummyStorageAdapter(),
isEphemeral: false,
it("should report remoteHeads only for documents the subscriber has open", async () => {
const { leftTab1, rightTab, rightServiceWorker } = await setup()
// subscribe leftTab to storageId of rightServiceWorker
leftTab1.subscribeToRemotes([await rightServiceWorker.storageId()!])
await setTimeout(100)
// create 2 docs in right tab
const rightTabDocA = rightTab.create<TestDoc>()
rightTabDocA.change(d => (d.foo = "A"))
const rightTabDocB = rightTab.create<TestDoc>()
rightTabDocB.change(d => (d.foo = "B"))
// open doc b in left tab 1
const leftTabDocA = leftTab1.find<TestDoc>(rightTabDocA.url)
const remoteHeadsChangedMessages = (
await waitForMessages(leftTab1.networkSubsystem, "message")
).filter(({ type }) => type === "remote-heads-changed")
// we should only be notified of the head changes of doc A
assert.strictEqual(remoteHeadsChangedMessages.length, 1)
assert.strictEqual(
remoteHeadsChangedMessages[0].documentId,
leftTabDocA.documentId
)
})
const syncServer = new Repo({
peerId: "sync-server" as PeerId,
network: [],
isEphemeral: false,
sharePolicy: async () => false,
storage: new DummyStorageAdapter(),
})
const rightServiceWorker = new Repo({
peerId: "right-service-worker" as PeerId,
network: [],
sharePolicy: async peer => peer === "sync-server",
isEphemeral: false,
storage: new DummyStorageAdapter(),
})
const rightTab = new Repo({
peerId: "right-tab" as PeerId,
network: [],
sharePolicy: async () => true,
})
// connect them all up
connectRepos(leftTab, leftServiceWorker)
connectRepos(leftServiceWorker, syncServer)
connectRepos(syncServer, rightServiceWorker)
connectRepos(rightServiceWorker, rightTab)
it("should report remote heads for doc on subscribe if peer already knows them", async () => {
const { leftTab1, leftTab2, rightTab, rightServiceWorker } = await setup()
await setTimeout(100)
// create 2 docs in right tab
const rightTabDocA = rightTab.create<TestDoc>()
rightTabDocA.change(d => (d.foo = "A"))
// subscribe to the left service worker storage ID on the right tab
rightTab.subscribeToRemotes([await leftServiceWorker.storageId()!])
const rightTabDocB = rightTab.create<TestDoc>()
rightTabDocB.change(d => (d.foo = "B"))
await setTimeout(100)
// open docs in left tab 1
const leftTab1DocA = leftTab1.find<TestDoc>(rightTabDocA.url)
const leftTab1DocB = leftTab1.find<TestDoc>(rightTabDocB.url)
// create a doc in the left tab
const leftTabDoc = leftTab.create<TestDoc>()
leftTabDoc.change(d => (d.foo = "bar"))
// subscribe leftTab 1 to storageId of rightServiceWorker
leftTab1.subscribeToRemotes([await rightServiceWorker.storageId()!])
// wait for the document to arrive on the right tab
const rightTabDoc = rightTab.find<TestDoc>(leftTabDoc.url)
await rightTabDoc.whenReady()
await setTimeout(200)
// wait for the document to arrive in the left service worker
const leftServiceWorkerDoc = leftServiceWorker.find(leftTabDoc.documentId)
await leftServiceWorkerDoc.whenReady()
// now the left service worker has the remote heads of the right service worker for both doc A and doc B
// if we subscribe from left tab 1 the left service workers should send it's stored remote heads immediately
const leftServiceWorkerStorageId = await leftServiceWorker.storageId()
let leftSeenByRightPromise = new Promise<DocHandleRemoteHeadsPayload>(
resolve => {
rightTabDoc.on("remote-heads", message => {
if (message.storageId === leftServiceWorkerStorageId) {
resolve(message)
}
})
}
)
// open doc and subscribe leftTab 2 to storageId of rightServiceWorker
const leftTab2DocA = leftTab2.find<TestDoc>(rightTabDocA.url)
leftTab2.subscribeToRemotes([await rightServiceWorker.storageId()!])
// make a change on the right
rightTabDoc.change(d => (d.foo = "baz"))
const remoteHeadsChangedMessages = (
await waitForMessages(leftTab2.networkSubsystem, "message")
).filter(({ type }) => type === "remote-heads-changed")
// wait for the change to be acknolwedged by the left
const leftSeenByRight = await leftSeenByRightPromise
// we should only be notified of the head changes of doc A
assert.strictEqual(remoteHeadsChangedMessages.length, 1)
assert.strictEqual(
remoteHeadsChangedMessages[0].documentId,
leftTab1DocA.documentId
)
})
assert.deepStrictEqual(
leftSeenByRight.heads,
A.getHeads(leftServiceWorkerDoc.docSync())
)
it("should report remote heads for subscribed storage id once we open a new doc", async () => {
const { leftTab1, leftTab2, rightTab, rightServiceWorker } = await setup()
// create 2 docs in right tab
const rightTabDocA = rightTab.create<TestDoc>()
rightTabDocA.change(d => (d.foo = "A"))
const rightTabDocB = rightTab.create<TestDoc>()
rightTabDocB.change(d => (d.foo = "B"))
await setTimeout(200)
// subscribe leftTab 1 to storageId of rightServiceWorker
leftTab1.subscribeToRemotes([await rightServiceWorker.storageId()!])
// in leftTab 1 open doc A
const leftTab1DocA = leftTab1.find<TestDoc>(rightTabDocA.url)
const remoteHeadsChangedMessages = (
await waitForMessages(leftTab1.networkSubsystem, "message")
).filter(({ type }) => type === "remote-heads-changed")
console.log(JSON.stringify(remoteHeadsChangedMessages, null, 2))
assert.strictEqual(remoteHeadsChangedMessages.length, 1)
assert.strictEqual(
remoteHeadsChangedMessages[0].documentId,
leftTab1DocA.documentId
)
})
})

@@ -126,0 +246,0 @@ })

import * as A from "@automerge/automerge"
import assert from "assert"
import { describe, it } from "vitest"
import { generateAutomergeUrl, parseAutomergeUrl } from "../src/AutomergeUrl.js"
import { RemoteHeadsSubscriptions } from "../src/RemoteHeadsSubscriptions.js"
import { PeerId, StorageId } from "../src/index.js"
import { generateAutomergeUrl, parseAutomergeUrl } from "../src/AutomergeUrl.js"
import { pause } from "../src/helpers/pause.js"
import { EventEmitter } from "eventemitter3"
import {

@@ -13,2 +11,3 @@ RemoteHeadsChanged,

} from "../src/network/messages.js"
import { waitForMessages } from "./helpers/waitForMessages.js"

@@ -228,2 +227,4 @@ describe("RepoHeadsSubscriptions", () => {

)
remoteHeadsSubscriptions.subscribePeerToDoc(peerC, docA)
remoteHeadsSubscriptions.subscribePeerToDoc(peerC, docC)

@@ -265,2 +266,28 @@ // change message for docA in storageB

it("should not send remote heads for docs that the peer is not subscribed to", async () => {
const remoteHeadsSubscriptions = new RemoteHeadsSubscriptions()
remoteHeadsSubscriptions.subscribeToRemotes([storageB])
// subscribe peer c to storage b
remoteHeadsSubscriptions.handleControlMessage(subscribePeerCToStorageB)
const messagesAfterSubscribePromise = waitForMessages(
remoteHeadsSubscriptions,
"notify-remote-heads"
)
// change message for docA in storageB
remoteHeadsSubscriptions.handleRemoteHeads(docAHeadsChangedForStorageB)
// change heads directly
remoteHeadsSubscriptions.handleImmediateRemoteHeadsChanged(
docC,
storageB,
[]
)
// expect peer c to be notified both changes
let messages = await messagesAfterSubscribePromise
assert.strictEqual(messages.length, 0)
})
it("should ignore sync states with an older timestamp", async () => {

@@ -327,21 +354,1 @@ const remoteHeadsSubscription = new RemoteHeadsSubscriptions()

})
async function waitForMessages(
emitter: EventEmitter,
event: string,
timeout: number = 100
): Promise<any[]> {
const messages = []
const onEvent = message => {
messages.push(message)
}
emitter.on(event, onEvent)
await pause(timeout)
emitter.off(event)
return messages
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc