@automerge/automerge-repo
Advanced tools
Comparing version 1.1.0-alpha.3 to 1.1.0-alpha.4
@@ -114,2 +114,7 @@ import { SyncState } from "@automerge/automerge"; | ||
} | ||
/** Notify the repo that a peer started syncing with a doc */ | ||
export interface OpenDocMessage { | ||
peerId: PeerId; | ||
documentId: DocumentId; | ||
} | ||
export declare const isValidRepoMessage: (message: Message) => message is RepoMessage; | ||
@@ -116,0 +121,0 @@ export declare const isDocumentUnavailableMessage: (msg: Message) => msg is DocumentUnavailableMessage; |
@@ -39,4 +39,5 @@ import { next as A } from "@automerge/automerge"; | ||
removePeer(peerId: PeerId): void; | ||
subscribePeerToDoc(peerId: PeerId, documentId: DocumentId): void; | ||
} | ||
export {}; | ||
//# sourceMappingURL=RemoteHeadsSubscriptions.d.ts.map |
@@ -12,2 +12,4 @@ import { EventEmitter } from "eventemitter3"; | ||
#generousPeers = new Set(); | ||
// Documents each peer has open, we need this information so we only send remote heads of documents that the peer knows | ||
#subscribedDocsByPeer = new Map(); | ||
#log = debug("automerge-repo:remote-heads-subscriptions"); | ||
@@ -51,2 +53,3 @@ subscribeToRemotes(remotes) { | ||
const remotesToRemove = []; | ||
const addedRemotesWeKnow = []; | ||
this.#log("handleControlMessage", control); | ||
@@ -56,2 +59,5 @@ if (control.add) { | ||
let theirSubs = this.#theirSubscriptions.get(remote); | ||
if (this.#ourSubscriptions.has(remote) || theirSubs) { | ||
addedRemotesWeKnow.push(remote); | ||
} | ||
if (!theirSubs) { | ||
@@ -86,2 +92,24 @@ theirSubs = new Set(); | ||
} | ||
// send all our stored heads of documents the peer knows for the remotes they've added | ||
for (const remote of addedRemotesWeKnow) { | ||
const subscribedDocs = this.#subscribedDocsByPeer.get(control.senderId); | ||
if (subscribedDocs) { | ||
for (const documentId of subscribedDocs) { | ||
const knownHeads = this.#knownHeads.get(documentId); | ||
if (!knownHeads) { | ||
continue; | ||
} | ||
const lastHeads = knownHeads.get(remote); | ||
if (lastHeads) { | ||
this.emit("notify-remote-heads", { | ||
targetId: control.senderId, | ||
documentId, | ||
heads: lastHeads.heads, | ||
timestamp: lastHeads.timestamp, | ||
storageId: remote, | ||
}); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
@@ -119,9 +147,11 @@ /** A peer we are not directly connected to has changed their heads */ | ||
for (const peerId of theirSubs) { | ||
this.emit("notify-remote-heads", { | ||
targetId: peerId, | ||
documentId: event.documentId, | ||
heads: event.remoteHeads, | ||
timestamp: event.timestamp, | ||
storageId: event.storageId, | ||
}); | ||
if (this.#isPeerSubscribedToDoc(peerId, event.documentId)) { | ||
this.emit("notify-remote-heads", { | ||
targetId: peerId, | ||
documentId: event.documentId, | ||
heads: event.remoteHeads, | ||
timestamp: event.timestamp, | ||
storageId: event.storageId, | ||
}); | ||
} | ||
} | ||
@@ -148,9 +178,11 @@ } | ||
for (const peerId of theirSubs) { | ||
this.emit("notify-remote-heads", { | ||
targetId: peerId, | ||
documentId: documentId, | ||
heads: heads, | ||
timestamp: timestamp, | ||
storageId: storageId, | ||
}); | ||
if (this.#isPeerSubscribedToDoc(peerId, documentId)) { | ||
this.emit("notify-remote-heads", { | ||
targetId: peerId, | ||
documentId: documentId, | ||
heads: heads, | ||
timestamp: timestamp, | ||
storageId: storageId, | ||
}); | ||
} | ||
} | ||
@@ -184,2 +216,3 @@ } | ||
this.#generousPeers.delete(peerId); | ||
this.#subscribedDocsByPeer.delete(peerId); | ||
for (const [storageId, peerIds] of this.#theirSubscriptions) { | ||
@@ -201,2 +234,29 @@ if (peerIds.has(peerId)) { | ||
} | ||
subscribePeerToDoc(peerId, documentId) { | ||
let subscribedDocs = this.#subscribedDocsByPeer.get(peerId); | ||
if (!subscribedDocs) { | ||
subscribedDocs = new Set(); | ||
this.#subscribedDocsByPeer.set(peerId, subscribedDocs); | ||
} | ||
subscribedDocs.add(documentId); | ||
const remoteHeads = this.#knownHeads.get(documentId); | ||
if (remoteHeads) { | ||
for (const [storageId, lastHeads] of remoteHeads) { | ||
const subscribedPeers = this.#theirSubscriptions.get(storageId); | ||
if (subscribedPeers && subscribedPeers.has(peerId)) { | ||
this.emit("notify-remote-heads", { | ||
targetId: peerId, | ||
documentId, | ||
heads: lastHeads.heads, | ||
timestamp: lastHeads.timestamp, | ||
storageId, | ||
}); | ||
} | ||
} | ||
} | ||
} | ||
#isPeerSubscribedToDoc(peerId, documentId) { | ||
let subscribedDocs = this.#subscribedDocsByPeer.get(peerId); | ||
return subscribedDocs && subscribedDocs.has(documentId); | ||
} | ||
/** Returns the (document, storageId) pairs which have changed after processing msg */ | ||
@@ -203,0 +263,0 @@ #changedHeads(msg) { |
@@ -37,2 +37,3 @@ import { EventEmitter } from "eventemitter3"; | ||
get peers(): PeerId[]; | ||
getStorageIdOfPeer(peerId: PeerId): StorageId | undefined; | ||
/** | ||
@@ -39,0 +40,0 @@ * Creates a new document and returns a handle to it. The initial value of the document is |
@@ -104,2 +104,5 @@ import { next as Automerge } from "@automerge/automerge"; | ||
}); | ||
this.#synchronizer.on("open-doc", ({ peerId, documentId }) => { | ||
this.#remoteHeadsSubscriptions.subscribePeerToDoc(peerId, documentId); | ||
}); | ||
// STORAGE | ||
@@ -248,2 +251,5 @@ // The storage subsystem has access to some form of persistence, and deals with save and loading documents. | ||
} | ||
getStorageIdOfPeer(peerId) { | ||
return this.peerMetadataByPeerId[peerId]?.storageId; | ||
} | ||
/** | ||
@@ -250,0 +256,0 @@ * Creates a new document and returns a handle to it. The initial value of the document is |
@@ -43,2 +43,3 @@ import debug from "debug"; | ||
docSynchronizer.on("message", event => this.emit("message", event)); | ||
docSynchronizer.on("open-doc", event => this.emit("open-doc", event)); | ||
docSynchronizer.on("sync-state", event => this.emit("sync-state", event)); | ||
@@ -45,0 +46,0 @@ return docSynchronizer; |
@@ -69,5 +69,3 @@ import * as A from "@automerge/automerge/next"; | ||
#withSyncState(peerId, callback) { | ||
if (!this.#peers.includes(peerId)) { | ||
this.#peers.push(peerId); | ||
} | ||
this.#addPeer(peerId); | ||
if (!(peerId in this.#peerDocumentStatuses)) { | ||
@@ -90,2 +88,8 @@ this.#peerDocumentStatuses[peerId] = "unknown"; | ||
} | ||
#addPeer(peerId) { | ||
if (!this.#peers.includes(peerId)) { | ||
this.#peers.push(peerId); | ||
this.emit("open-doc", { documentId: this.documentId, peerId }); | ||
} | ||
} | ||
#initSyncState(peerId, syncState) { | ||
@@ -92,0 +96,0 @@ const pendingCallbacks = this.#pendingSyncStateCallbacks[peerId]; |
import { EventEmitter } from "eventemitter3"; | ||
import { MessageContents, RepoMessage, SyncStateMessage } from "../network/messages.js"; | ||
import { MessageContents, OpenDocMessage, RepoMessage, SyncStateMessage } from "../network/messages.js"; | ||
export declare abstract class Synchronizer extends EventEmitter<SynchronizerEvents> { | ||
@@ -9,3 +9,4 @@ abstract receiveMessage(message: RepoMessage): void; | ||
"sync-state": (arg: SyncStateMessage) => void; | ||
"open-doc": (arg: OpenDocMessage) => void; | ||
} | ||
//# sourceMappingURL=Synchronizer.d.ts.map |
{ | ||
"name": "@automerge/automerge-repo", | ||
"version": "1.1.0-alpha.3", | ||
"version": "1.1.0-alpha.4", | ||
"description": "A repository object to manage a collection of automerge documents", | ||
@@ -57,3 +57,3 @@ "repository": "https://github.com/automerge/automerge-repo/tree/master/packages/automerge-repo", | ||
}, | ||
"gitHead": "0d76620579403005a01d4205ee15fd08a85d445c" | ||
"gitHead": "4361fd01d482d5d227bd679bfd79f9e32a8fe9cc" | ||
} |
@@ -107,15 +107,15 @@ import { SyncState } from "@automerge/automerge" | ||
export type RemoteSubscriptionControlMessage = { | ||
type: "remote-subscription-change", | ||
senderId: PeerId, | ||
targetId: PeerId, | ||
add?: StorageId[], | ||
remove?: StorageId[], | ||
type: "remote-subscription-change" | ||
senderId: PeerId | ||
targetId: PeerId | ||
add?: StorageId[] | ||
remove?: StorageId[] | ||
} | ||
export type RemoteHeadsChanged = { | ||
type: "remote-heads-changed", | ||
senderId: PeerId, | ||
targetId: PeerId, | ||
documentId: DocumentId, | ||
newHeads: {[key: StorageId]: {heads: string[], timestamp: number}}, | ||
type: "remote-heads-changed" | ||
senderId: PeerId | ||
targetId: PeerId | ||
documentId: DocumentId | ||
newHeads: { [key: StorageId]: { heads: string[]; timestamp: number } } | ||
} | ||
@@ -132,3 +132,7 @@ | ||
export type DocMessage = SyncMessage | EphemeralMessage | RequestMessage | DocumentUnavailableMessage | ||
export type DocMessage = | ||
| SyncMessage | ||
| EphemeralMessage | ||
| RequestMessage | ||
| DocumentUnavailableMessage | ||
@@ -153,2 +157,8 @@ /** These are all the message types that a {@link NetworkAdapter} might see. */ | ||
/** Notify the repo that a peer started syncing with a doc */ | ||
export interface OpenDocMessage { | ||
peerId: PeerId | ||
documentId: DocumentId | ||
} | ||
// TYPE GUARDS | ||
@@ -180,3 +190,5 @@ | ||
export const isRemoteSubscriptionControlMessage = (msg: Message): msg is RemoteSubscriptionControlMessage => | ||
export const isRemoteSubscriptionControlMessage = ( | ||
msg: Message | ||
): msg is RemoteSubscriptionControlMessage => | ||
msg.type === "remote-subscription-change" | ||
@@ -183,0 +195,0 @@ |
@@ -47,2 +47,5 @@ import { next as A } from "@automerge/automerge" | ||
#generousPeers: Set<PeerId> = new Set() | ||
// Documents each peer has open, we need this information so we only send remote heads of documents that the peer knows | ||
#subscribedDocsByPeer: Map<PeerId, Set<DocumentId>> = new Map() | ||
#log = debug("automerge-repo:remote-heads-subscriptions") | ||
@@ -93,2 +96,3 @@ | ||
const remotesToRemove: StorageId[] = [] | ||
const addedRemotesWeKnow: StorageId[] = [] | ||
@@ -99,2 +103,7 @@ this.#log("handleControlMessage", control) | ||
let theirSubs = this.#theirSubscriptions.get(remote) | ||
if (this.#ourSubscriptions.has(remote) || theirSubs) { | ||
addedRemotesWeKnow.push(remote) | ||
} | ||
if (!theirSubs) { | ||
@@ -134,2 +143,26 @@ theirSubs = new Set() | ||
} | ||
// send all our stored heads of documents the peer knows for the remotes they've added | ||
for (const remote of addedRemotesWeKnow) { | ||
const subscribedDocs = this.#subscribedDocsByPeer.get(control.senderId) | ||
if (subscribedDocs) { | ||
for (const documentId of subscribedDocs) { | ||
const knownHeads = this.#knownHeads.get(documentId) | ||
if (!knownHeads) { | ||
continue | ||
} | ||
const lastHeads = knownHeads.get(remote) | ||
if (lastHeads) { | ||
this.emit("notify-remote-heads", { | ||
targetId: control.senderId, | ||
documentId, | ||
heads: lastHeads.heads, | ||
timestamp: lastHeads.timestamp, | ||
storageId: remote, | ||
}) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
@@ -172,9 +205,11 @@ | ||
for (const peerId of theirSubs) { | ||
this.emit("notify-remote-heads", { | ||
targetId: peerId, | ||
documentId: event.documentId, | ||
heads: event.remoteHeads, | ||
timestamp: event.timestamp, | ||
storageId: event.storageId, | ||
}) | ||
if (this.#isPeerSubscribedToDoc(peerId, event.documentId)) { | ||
this.emit("notify-remote-heads", { | ||
targetId: peerId, | ||
documentId: event.documentId, | ||
heads: event.remoteHeads, | ||
timestamp: event.timestamp, | ||
storageId: event.storageId, | ||
}) | ||
} | ||
} | ||
@@ -208,9 +243,11 @@ } | ||
for (const peerId of theirSubs) { | ||
this.emit("notify-remote-heads", { | ||
targetId: peerId, | ||
documentId: documentId, | ||
heads: heads, | ||
timestamp: timestamp, | ||
storageId: storageId, | ||
}) | ||
if (this.#isPeerSubscribedToDoc(peerId, documentId)) { | ||
this.emit("notify-remote-heads", { | ||
targetId: peerId, | ||
documentId: documentId, | ||
heads: heads, | ||
timestamp: timestamp, | ||
storageId: storageId, | ||
}) | ||
} | ||
} | ||
@@ -250,2 +287,3 @@ } | ||
this.#generousPeers.delete(peerId) | ||
this.#subscribedDocsByPeer.delete(peerId) | ||
@@ -271,2 +309,33 @@ for (const [storageId, peerIds] of this.#theirSubscriptions) { | ||
subscribePeerToDoc(peerId: PeerId, documentId: DocumentId) { | ||
let subscribedDocs = this.#subscribedDocsByPeer.get(peerId) | ||
if (!subscribedDocs) { | ||
subscribedDocs = new Set() | ||
this.#subscribedDocsByPeer.set(peerId, subscribedDocs) | ||
} | ||
subscribedDocs.add(documentId) | ||
const remoteHeads = this.#knownHeads.get(documentId) | ||
if (remoteHeads) { | ||
for (const [storageId, lastHeads] of remoteHeads) { | ||
const subscribedPeers = this.#theirSubscriptions.get(storageId) | ||
if (subscribedPeers && subscribedPeers.has(peerId)) { | ||
this.emit("notify-remote-heads", { | ||
targetId: peerId, | ||
documentId, | ||
heads: lastHeads.heads, | ||
timestamp: lastHeads.timestamp, | ||
storageId, | ||
}) | ||
} | ||
} | ||
} | ||
} | ||
#isPeerSubscribedToDoc(peerId: PeerId, documentId: DocumentId) { | ||
let subscribedDocs = this.#subscribedDocsByPeer.get(peerId) | ||
return subscribedDocs && subscribedDocs.has(documentId) | ||
} | ||
/** Returns the (document, storageId) pairs which have changed after processing msg */ | ||
@@ -273,0 +342,0 @@ #changedHeads(msg: RemoteHeadsChanged): { |
@@ -143,2 +143,6 @@ import { next as Automerge } from "@automerge/automerge" | ||
this.#synchronizer.on("open-doc", ({ peerId, documentId }) => { | ||
this.#remoteHeadsSubscriptions.subscribePeerToDoc(peerId, documentId) | ||
}) | ||
// STORAGE | ||
@@ -336,2 +340,6 @@ // The storage subsystem has access to some form of persistence, and deals with save and loading documents. | ||
getStorageIdOfPeer(peerId: PeerId): StorageId | undefined { | ||
return this.peerMetadataByPeerId[peerId]?.storageId | ||
} | ||
/** | ||
@@ -338,0 +346,0 @@ * Creates a new document and returns a handle to it. The initial value of the document is |
@@ -58,2 +58,3 @@ import debug from "debug" | ||
docSynchronizer.on("message", event => this.emit("message", event)) | ||
docSynchronizer.on("open-doc", event => this.emit("open-doc", event)) | ||
docSynchronizer.on("sync-state", event => this.emit("sync-state", event)) | ||
@@ -60,0 +61,0 @@ return docSynchronizer |
@@ -23,3 +23,2 @@ import * as A from "@automerge/automerge/next" | ||
import { throttle } from "../helpers/throttle.js" | ||
import { headsAreSame } from "../helpers/headsAreSame.js" | ||
@@ -128,5 +127,3 @@ type PeerDocumentStatus = "unknown" | "has" | "unavailable" | "wants" | ||
#withSyncState(peerId: PeerId, callback: (syncState: A.SyncState) => void) { | ||
if (!this.#peers.includes(peerId)) { | ||
this.#peers.push(peerId) | ||
} | ||
this.#addPeer(peerId) | ||
@@ -154,2 +151,9 @@ if (!(peerId in this.#peerDocumentStatuses)) { | ||
#addPeer(peerId: PeerId) { | ||
if (!this.#peers.includes(peerId)) { | ||
this.#peers.push(peerId) | ||
this.emit("open-doc", { documentId: this.documentId, peerId }) | ||
} | ||
} | ||
#initSyncState(peerId: PeerId, syncState: A.SyncState) { | ||
@@ -156,0 +160,0 @@ const pendingCallbacks = this.#pendingSyncStateCallbacks[peerId] |
import { EventEmitter } from "eventemitter3" | ||
import { | ||
MessageContents, | ||
OpenDocMessage, | ||
RepoMessage, | ||
@@ -15,2 +16,3 @@ SyncStateMessage, | ||
"sync-state": (arg: SyncStateMessage) => void | ||
"open-doc": (arg: OpenDocMessage) => void | ||
} |
@@ -0,8 +1,8 @@ | ||
import { MessageChannelNetworkAdapter } from "@automerge/automerge-repo-network-messagechannel" | ||
import * as A from "@automerge/automerge/next" | ||
import assert from "assert" | ||
import { decode } from "cbor-x" | ||
import { setTimeout } from "timers/promises" | ||
import { describe, it } from "vitest" | ||
import { generateAutomergeUrl, parseAutomergeUrl } from "../src/AutomergeUrl.js" | ||
import { eventPromise } from "../src/helpers/eventPromise.js" | ||
import { pause } from "../src/helpers/pause.js" | ||
import { | ||
@@ -14,6 +14,5 @@ DocHandle, | ||
} from "../src/index.js" | ||
import { DummyStorageAdapter } from "./helpers/DummyStorageAdapter.js" | ||
import { waitForMessages } from "./helpers/waitForMessages.js" | ||
import { TestDoc } from "./types.js" | ||
import { MessageChannelNetworkAdapter } from "@automerge/automerge-repo-network-messagechannel" | ||
import { setTimeout } from "timers/promises" | ||
import { DummyStorageAdapter } from "./helpers/DummyStorageAdapter.js" | ||
@@ -44,82 +43,203 @@ describe("DocHandle.remoteHeads", () => { | ||
it("should report remoteHeads for peers who are several hops away", async () => { | ||
// replicates a tab -> service worker -> sync server <- service worker <- tab scenario | ||
const leftTab = new Repo({ | ||
peerId: "left-tab" as PeerId, | ||
network: [], | ||
sharePolicy: async () => true, | ||
describe("multi hop sync", () => { | ||
async function setup() { | ||
// setup topology: tab -> service worker -> sync server <- service worker <- tab | ||
const leftTab1 = new Repo({ | ||
peerId: "left-tab-1" as PeerId, | ||
network: [], | ||
sharePolicy: async () => true, | ||
}) | ||
const leftTab2 = new Repo({ | ||
peerId: "left-tab-2" as PeerId, | ||
network: [], | ||
sharePolicy: async () => true, | ||
}) | ||
const leftServiceWorker = new Repo({ | ||
peerId: "left-service-worker" as PeerId, | ||
network: [], | ||
sharePolicy: async peer => peer === "sync-server", | ||
storage: new DummyStorageAdapter(), | ||
isEphemeral: false, | ||
}) | ||
const syncServer = new Repo({ | ||
peerId: "sync-server" as PeerId, | ||
network: [], | ||
isEphemeral: false, | ||
sharePolicy: async () => false, | ||
storage: new DummyStorageAdapter(), | ||
}) | ||
const rightServiceWorker = new Repo({ | ||
peerId: "right-service-worker" as PeerId, | ||
network: [], | ||
sharePolicy: async peer => peer === "sync-server", | ||
isEphemeral: false, | ||
storage: new DummyStorageAdapter(), | ||
}) | ||
const rightTab = new Repo({ | ||
peerId: "right-tab" as PeerId, | ||
network: [], | ||
sharePolicy: async () => true, | ||
}) | ||
// connect them all up | ||
connectRepos(leftTab1, leftServiceWorker) | ||
connectRepos(leftTab2, leftServiceWorker) | ||
connectRepos(leftServiceWorker, syncServer) | ||
connectRepos(syncServer, rightServiceWorker) | ||
connectRepos(rightServiceWorker, rightTab) | ||
await setTimeout(100) | ||
return { | ||
leftTab1, | ||
leftTab2, | ||
leftServiceWorker, | ||
syncServer, | ||
rightServiceWorker, | ||
rightTab, | ||
} | ||
} | ||
it("should report remoteHeads for peers", async () => { | ||
const { rightTab, rightServiceWorker, leftServiceWorker, leftTab1 } = | ||
await setup() | ||
// subscribe to the left service worker storage ID on the right tab | ||
rightTab.subscribeToRemotes([await leftServiceWorker.storageId()!]) | ||
await setTimeout(100) | ||
// create a doc in the left tab | ||
const leftTabDoc = leftTab1.create<TestDoc>() | ||
leftTabDoc.change(d => (d.foo = "bar")) | ||
// wait for the document to arrive on the right tab | ||
const rightTabDoc = rightTab.find<TestDoc>(leftTabDoc.url) | ||
await rightTabDoc.whenReady() | ||
// wait for the document to arrive in the left service worker | ||
const leftServiceWorkerDoc = leftServiceWorker.find(leftTabDoc.documentId) | ||
await leftServiceWorkerDoc.whenReady() | ||
const leftServiceWorkerStorageId = await leftServiceWorker.storageId() | ||
let leftSeenByRightPromise = new Promise<DocHandleRemoteHeadsPayload>( | ||
resolve => { | ||
rightTabDoc.on("remote-heads", message => { | ||
if (message.storageId === leftServiceWorkerStorageId) { | ||
resolve(message) | ||
} | ||
}) | ||
} | ||
) | ||
// make a change on the right | ||
rightTabDoc.change(d => (d.foo = "baz")) | ||
// wait for the change to be acknolwedged by the left | ||
const leftSeenByRight = await leftSeenByRightPromise | ||
assert.deepStrictEqual( | ||
leftSeenByRight.heads, | ||
A.getHeads(leftServiceWorkerDoc.docSync()) | ||
) | ||
}) | ||
const leftServiceWorker = new Repo({ | ||
peerId: "left-service-worker" as PeerId, | ||
network: [], | ||
sharePolicy: async peer => peer === "sync-server", | ||
storage: new DummyStorageAdapter(), | ||
isEphemeral: false, | ||
it("should report remoteHeads only for documents the subscriber has open", async () => { | ||
const { leftTab1, rightTab, rightServiceWorker } = await setup() | ||
// subscribe leftTab to storageId of rightServiceWorker | ||
leftTab1.subscribeToRemotes([await rightServiceWorker.storageId()!]) | ||
await setTimeout(100) | ||
// create 2 docs in right tab | ||
const rightTabDocA = rightTab.create<TestDoc>() | ||
rightTabDocA.change(d => (d.foo = "A")) | ||
const rightTabDocB = rightTab.create<TestDoc>() | ||
rightTabDocB.change(d => (d.foo = "B")) | ||
// open doc b in left tab 1 | ||
const leftTabDocA = leftTab1.find<TestDoc>(rightTabDocA.url) | ||
const remoteHeadsChangedMessages = ( | ||
await waitForMessages(leftTab1.networkSubsystem, "message") | ||
).filter(({ type }) => type === "remote-heads-changed") | ||
// we should only be notified of the head changes of doc A | ||
assert.strictEqual(remoteHeadsChangedMessages.length, 1) | ||
assert.strictEqual( | ||
remoteHeadsChangedMessages[0].documentId, | ||
leftTabDocA.documentId | ||
) | ||
}) | ||
const syncServer = new Repo({ | ||
peerId: "sync-server" as PeerId, | ||
network: [], | ||
isEphemeral: false, | ||
sharePolicy: async () => false, | ||
storage: new DummyStorageAdapter(), | ||
}) | ||
const rightServiceWorker = new Repo({ | ||
peerId: "right-service-worker" as PeerId, | ||
network: [], | ||
sharePolicy: async peer => peer === "sync-server", | ||
isEphemeral: false, | ||
storage: new DummyStorageAdapter(), | ||
}) | ||
const rightTab = new Repo({ | ||
peerId: "right-tab" as PeerId, | ||
network: [], | ||
sharePolicy: async () => true, | ||
}) | ||
// connect them all up | ||
connectRepos(leftTab, leftServiceWorker) | ||
connectRepos(leftServiceWorker, syncServer) | ||
connectRepos(syncServer, rightServiceWorker) | ||
connectRepos(rightServiceWorker, rightTab) | ||
it("should report remote heads for doc on subscribe if peer already knows them", async () => { | ||
const { leftTab1, leftTab2, rightTab, rightServiceWorker } = await setup() | ||
await setTimeout(100) | ||
// create 2 docs in right tab | ||
const rightTabDocA = rightTab.create<TestDoc>() | ||
rightTabDocA.change(d => (d.foo = "A")) | ||
// subscribe to the left service worker storage ID on the right tab | ||
rightTab.subscribeToRemotes([await leftServiceWorker.storageId()!]) | ||
const rightTabDocB = rightTab.create<TestDoc>() | ||
rightTabDocB.change(d => (d.foo = "B")) | ||
await setTimeout(100) | ||
// open docs in left tab 1 | ||
const leftTab1DocA = leftTab1.find<TestDoc>(rightTabDocA.url) | ||
const leftTab1DocB = leftTab1.find<TestDoc>(rightTabDocB.url) | ||
// create a doc in the left tab | ||
const leftTabDoc = leftTab.create<TestDoc>() | ||
leftTabDoc.change(d => (d.foo = "bar")) | ||
// subscribe leftTab 1 to storageId of rightServiceWorker | ||
leftTab1.subscribeToRemotes([await rightServiceWorker.storageId()!]) | ||
// wait for the document to arrive on the right tab | ||
const rightTabDoc = rightTab.find<TestDoc>(leftTabDoc.url) | ||
await rightTabDoc.whenReady() | ||
await setTimeout(200) | ||
// wait for the document to arrive in the left service worker | ||
const leftServiceWorkerDoc = leftServiceWorker.find(leftTabDoc.documentId) | ||
await leftServiceWorkerDoc.whenReady() | ||
// now the left service worker has the remote heads of the right service worker for both doc A and doc B | ||
// if we subscribe from left tab 1 the left service workers should send it's stored remote heads immediately | ||
const leftServiceWorkerStorageId = await leftServiceWorker.storageId() | ||
let leftSeenByRightPromise = new Promise<DocHandleRemoteHeadsPayload>( | ||
resolve => { | ||
rightTabDoc.on("remote-heads", message => { | ||
if (message.storageId === leftServiceWorkerStorageId) { | ||
resolve(message) | ||
} | ||
}) | ||
} | ||
) | ||
// open doc and subscribe leftTab 2 to storageId of rightServiceWorker | ||
const leftTab2DocA = leftTab2.find<TestDoc>(rightTabDocA.url) | ||
leftTab2.subscribeToRemotes([await rightServiceWorker.storageId()!]) | ||
// make a change on the right | ||
rightTabDoc.change(d => (d.foo = "baz")) | ||
const remoteHeadsChangedMessages = ( | ||
await waitForMessages(leftTab2.networkSubsystem, "message") | ||
).filter(({ type }) => type === "remote-heads-changed") | ||
// wait for the change to be acknolwedged by the left | ||
const leftSeenByRight = await leftSeenByRightPromise | ||
// we should only be notified of the head changes of doc A | ||
assert.strictEqual(remoteHeadsChangedMessages.length, 1) | ||
assert.strictEqual( | ||
remoteHeadsChangedMessages[0].documentId, | ||
leftTab1DocA.documentId | ||
) | ||
}) | ||
assert.deepStrictEqual( | ||
leftSeenByRight.heads, | ||
A.getHeads(leftServiceWorkerDoc.docSync()) | ||
) | ||
it("should report remote heads for subscribed storage id once we open a new doc", async () => { | ||
const { leftTab1, leftTab2, rightTab, rightServiceWorker } = await setup() | ||
// create 2 docs in right tab | ||
const rightTabDocA = rightTab.create<TestDoc>() | ||
rightTabDocA.change(d => (d.foo = "A")) | ||
const rightTabDocB = rightTab.create<TestDoc>() | ||
rightTabDocB.change(d => (d.foo = "B")) | ||
await setTimeout(200) | ||
// subscribe leftTab 1 to storageId of rightServiceWorker | ||
leftTab1.subscribeToRemotes([await rightServiceWorker.storageId()!]) | ||
// in leftTab 1 open doc A | ||
const leftTab1DocA = leftTab1.find<TestDoc>(rightTabDocA.url) | ||
const remoteHeadsChangedMessages = ( | ||
await waitForMessages(leftTab1.networkSubsystem, "message") | ||
).filter(({ type }) => type === "remote-heads-changed") | ||
console.log(JSON.stringify(remoteHeadsChangedMessages, null, 2)) | ||
assert.strictEqual(remoteHeadsChangedMessages.length, 1) | ||
assert.strictEqual( | ||
remoteHeadsChangedMessages[0].documentId, | ||
leftTab1DocA.documentId | ||
) | ||
}) | ||
}) | ||
@@ -126,0 +246,0 @@ }) |
import * as A from "@automerge/automerge" | ||
import assert from "assert" | ||
import { describe, it } from "vitest" | ||
import { generateAutomergeUrl, parseAutomergeUrl } from "../src/AutomergeUrl.js" | ||
import { RemoteHeadsSubscriptions } from "../src/RemoteHeadsSubscriptions.js" | ||
import { PeerId, StorageId } from "../src/index.js" | ||
import { generateAutomergeUrl, parseAutomergeUrl } from "../src/AutomergeUrl.js" | ||
import { pause } from "../src/helpers/pause.js" | ||
import { EventEmitter } from "eventemitter3" | ||
import { | ||
@@ -13,2 +11,3 @@ RemoteHeadsChanged, | ||
} from "../src/network/messages.js" | ||
import { waitForMessages } from "./helpers/waitForMessages.js" | ||
@@ -228,2 +227,4 @@ describe("RepoHeadsSubscriptions", () => { | ||
) | ||
remoteHeadsSubscriptions.subscribePeerToDoc(peerC, docA) | ||
remoteHeadsSubscriptions.subscribePeerToDoc(peerC, docC) | ||
@@ -265,2 +266,28 @@ // change message for docA in storageB | ||
it("should not send remote heads for docs that the peer is not subscribed to", async () => { | ||
const remoteHeadsSubscriptions = new RemoteHeadsSubscriptions() | ||
remoteHeadsSubscriptions.subscribeToRemotes([storageB]) | ||
// subscribe peer c to storage b | ||
remoteHeadsSubscriptions.handleControlMessage(subscribePeerCToStorageB) | ||
const messagesAfterSubscribePromise = waitForMessages( | ||
remoteHeadsSubscriptions, | ||
"notify-remote-heads" | ||
) | ||
// change message for docA in storageB | ||
remoteHeadsSubscriptions.handleRemoteHeads(docAHeadsChangedForStorageB) | ||
// change heads directly | ||
remoteHeadsSubscriptions.handleImmediateRemoteHeadsChanged( | ||
docC, | ||
storageB, | ||
[] | ||
) | ||
// expect peer c to be notified both changes | ||
let messages = await messagesAfterSubscribePromise | ||
assert.strictEqual(messages.length, 0) | ||
}) | ||
it("should ignore sync states with an older timestamp", async () => { | ||
@@ -327,21 +354,1 @@ const remoteHeadsSubscription = new RemoteHeadsSubscriptions() | ||
}) | ||
async function waitForMessages( | ||
emitter: EventEmitter, | ||
event: string, | ||
timeout: number = 100 | ||
): Promise<any[]> { | ||
const messages = [] | ||
const onEvent = message => { | ||
messages.push(message) | ||
} | ||
emitter.on(event, onEvent) | ||
await pause(timeout) | ||
emitter.off(event) | ||
return messages | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
372697
133
8624