From 16cc61043b226682256d3ba456b2a55ff4ac9223 Mon Sep 17 00:00:00 2001 From: yarolegovich Date: Thu, 1 Aug 2024 19:18:28 +0300 Subject: [PATCH 1/3] added failing tests --- .../src/helpers/DummyNetworkAdapter.ts | 23 +++++- packages/automerge-repo/test/Repo.test.ts | 76 +++++++++++++++++++ 2 files changed, 97 insertions(+), 2 deletions(-) diff --git a/packages/automerge-repo/src/helpers/DummyNetworkAdapter.ts b/packages/automerge-repo/src/helpers/DummyNetworkAdapter.ts index d4d5d230c..fa31e85b9 100644 --- a/packages/automerge-repo/src/helpers/DummyNetworkAdapter.ts +++ b/packages/automerge-repo/src/helpers/DummyNetworkAdapter.ts @@ -5,11 +5,22 @@ export class DummyNetworkAdapter extends NetworkAdapter { #sendMessage?: SendMessageFn #ready = false + #isDroppingMessages = false; #readyResolver?: () => void #readyPromise: Promise = new Promise(resolve => { this.#readyResolver = resolve }) + #droppedMessages: Message[] = []; + + dropMessages(drop: boolean) { + this.#isDroppingMessages = drop; + } + + getDroppedMessages() { + return [...this.#droppedMessages]; + } + isReady() { return this.#ready } @@ -49,11 +60,19 @@ export class DummyNetworkAdapter extends NetworkAdapter { } override send(message: Message) { - this.#sendMessage?.(message) + if (this.#isDroppingMessages) { + this.#droppedMessages.push(message); + } else { + this.#sendMessage?.(message) + } } receive(message: Message) { - this.emit("message", message) + if (this.#isDroppingMessages) { + this.#droppedMessages.push(message); + } else { + this.emit("message", message) + } } static createConnectedPair({ latency = 10 }: { latency?: number } = {}) { diff --git a/packages/automerge-repo/test/Repo.test.ts b/packages/automerge-repo/test/Repo.test.ts index 2a40a38a8..f86faedaf 100644 --- a/packages/automerge-repo/test/Repo.test.ts +++ b/packages/automerge-repo/test/Repo.test.ts @@ -1275,6 +1275,82 @@ describe("Repo", () => { assert.equal(bobDoc.isReady(), true) }) + it("changes get replicated after the next change if a sync message was dropped", async () => { + const alice = "alice" as PeerId + const bob = "bob" as PeerId + const [aliceAdapter, bobAdapter] = DummyNetworkAdapter.createConnectedPair() + const aliceRepo = new Repo({ + network: [aliceAdapter], + peerId: alice, + }) + const bobRepo = new Repo({ + network: [bobAdapter], + peerId: bob, + }) + aliceAdapter.peerCandidate(bob); + bobAdapter.peerCandidate(alice); + + const bobHandle = bobRepo.create() + bobHandle.change(d => { + d.foo = "bar" + }) + const aliceHandle = await aliceRepo.find(bobHandle.documentId) + await eventPromise(aliceHandle, "heads-changed") + assert.deepEqual(aliceHandle.docSync(), bobHandle.docSync()) + + bobAdapter.dropMessages(true); + bobHandle.change(d => { + d.bar = "foo" + }) + await pause(10); + assert.equal(bobAdapter.getDroppedMessages()[0].type, "sync"); + + bobAdapter.dropMessages(false); + bobHandle.change(d => { + d.baz = "42" + }) + await eventPromise(aliceHandle, "heads-changed") + }) + + it("changes get replicated the peer's next change if a sync message was dropped", async () => { + const alice = "alice" as PeerId + const bob = "bob" as PeerId + const [aliceAdapter, bobAdapter] = DummyNetworkAdapter.createConnectedPair() + const aliceRepo = new Repo({ + network: [aliceAdapter], + peerId: alice, + }) + const bobRepo = new Repo({ + network: [bobAdapter], + peerId: bob, + }) + aliceAdapter.peerCandidate(bob); + bobAdapter.peerCandidate(alice); + + const bobHandle = bobRepo.create() + bobHandle.change(d => { + d.foo = "bar" + }) + const aliceHandle = await aliceRepo.find(bobHandle.documentId) + await eventPromise(aliceHandle, "heads-changed") + assert.deepEqual(aliceHandle.docSync(), bobHandle.docSync()) + + bobAdapter.dropMessages(true); + bobHandle.change(d => { + d.bar = "foo" + }) + await pause(10); + assert.equal(bobAdapter.getDroppedMessages()[0].type, "sync"); + + bobAdapter.dropMessages(false); + aliceHandle.change(d => { + d.baz = "42" + }) + await eventPromise(bobHandle, "heads-changed") + await pause(200); + assert.deepEqual(aliceHandle.docSync(), bobHandle.docSync()) + }) + describe("with peers (mesh network)", () => { const setup = async () => { // Set up three repos; connect Alice to Bob, Bob to Charlie, and Alice to Charlie From 70031582f27b7af22bcda5e9159e1e8037cefcbf Mon Sep 17 00:00:00 2001 From: yarolegovich Date: Thu, 1 Aug 2024 20:39:06 +0300 Subject: [PATCH 2/3] repo level fix --- packages/automerge-repo/src/synchronizer/DocSynchronizer.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts b/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts index 5077af55e..976bc0dca 100644 --- a/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts +++ b/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts @@ -351,6 +351,11 @@ export class DocSynchronizer extends Synchronizer { this.#withSyncState(message.senderId, syncState => { this.#handle.update(doc => { + // Retry if message sending failed. + for (const need of A.decodeSyncMessage(message.data).need) { + delete syncState.sentHashes[need]; + } + const [newDoc, newSyncState] = A.receiveSyncMessage( doc, syncState, From d52dc78840e0527451377238dda4543ac19c0086 Mon Sep 17 00:00:00 2001 From: yarolegovich Date: Fri, 2 Aug 2024 11:17:27 +0300 Subject: [PATCH 3/3] formatting and build --- .../src/helpers/DummyNetworkAdapter.ts | 12 ++++----- .../src/synchronizer/DocSynchronizer.ts | 2 +- packages/automerge-repo/test/Repo.test.ts | 26 +++++++++---------- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/packages/automerge-repo/src/helpers/DummyNetworkAdapter.ts b/packages/automerge-repo/src/helpers/DummyNetworkAdapter.ts index fa31e85b9..0b111fda5 100644 --- a/packages/automerge-repo/src/helpers/DummyNetworkAdapter.ts +++ b/packages/automerge-repo/src/helpers/DummyNetworkAdapter.ts @@ -5,20 +5,20 @@ export class DummyNetworkAdapter extends NetworkAdapter { #sendMessage?: SendMessageFn #ready = false - #isDroppingMessages = false; + #isDroppingMessages = false #readyResolver?: () => void #readyPromise: Promise = new Promise(resolve => { this.#readyResolver = resolve }) - #droppedMessages: Message[] = []; + #droppedMessages: Message[] = [] dropMessages(drop: boolean) { - this.#isDroppingMessages = drop; + this.#isDroppingMessages = drop } getDroppedMessages() { - return [...this.#droppedMessages]; + return [...this.#droppedMessages] } isReady() { @@ -61,7 +61,7 @@ export class DummyNetworkAdapter extends NetworkAdapter { override send(message: Message) { if (this.#isDroppingMessages) { - this.#droppedMessages.push(message); + this.#droppedMessages.push(message) } else { this.#sendMessage?.(message) } @@ -69,7 +69,7 @@ export class DummyNetworkAdapter extends NetworkAdapter { receive(message: Message) { if (this.#isDroppingMessages) { - this.#droppedMessages.push(message); + this.#droppedMessages.push(message) } else { this.emit("message", message) } diff --git a/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts b/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts index 976bc0dca..9527974b9 100644 --- a/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts +++ b/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts @@ -353,7 +353,7 @@ export class DocSynchronizer extends Synchronizer { this.#handle.update(doc => { // Retry if message sending failed. for (const need of A.decodeSyncMessage(message.data).need) { - delete syncState.sentHashes[need]; + delete (syncState.sentHashes as any)[need] } const [newDoc, newSyncState] = A.receiveSyncMessage( diff --git a/packages/automerge-repo/test/Repo.test.ts b/packages/automerge-repo/test/Repo.test.ts index f86faedaf..ff643f1a6 100644 --- a/packages/automerge-repo/test/Repo.test.ts +++ b/packages/automerge-repo/test/Repo.test.ts @@ -1287,8 +1287,8 @@ describe("Repo", () => { network: [bobAdapter], peerId: bob, }) - aliceAdapter.peerCandidate(bob); - bobAdapter.peerCandidate(alice); + aliceAdapter.peerCandidate(bob) + bobAdapter.peerCandidate(alice) const bobHandle = bobRepo.create() bobHandle.change(d => { @@ -1298,14 +1298,14 @@ describe("Repo", () => { await eventPromise(aliceHandle, "heads-changed") assert.deepEqual(aliceHandle.docSync(), bobHandle.docSync()) - bobAdapter.dropMessages(true); + bobAdapter.dropMessages(true) bobHandle.change(d => { d.bar = "foo" }) - await pause(10); - assert.equal(bobAdapter.getDroppedMessages()[0].type, "sync"); + await pause(10) + assert.equal(bobAdapter.getDroppedMessages()[0].type, "sync") - bobAdapter.dropMessages(false); + bobAdapter.dropMessages(false) bobHandle.change(d => { d.baz = "42" }) @@ -1324,8 +1324,8 @@ describe("Repo", () => { network: [bobAdapter], peerId: bob, }) - aliceAdapter.peerCandidate(bob); - bobAdapter.peerCandidate(alice); + aliceAdapter.peerCandidate(bob) + bobAdapter.peerCandidate(alice) const bobHandle = bobRepo.create() bobHandle.change(d => { @@ -1335,19 +1335,19 @@ describe("Repo", () => { await eventPromise(aliceHandle, "heads-changed") assert.deepEqual(aliceHandle.docSync(), bobHandle.docSync()) - bobAdapter.dropMessages(true); + bobAdapter.dropMessages(true) bobHandle.change(d => { d.bar = "foo" }) - await pause(10); - assert.equal(bobAdapter.getDroppedMessages()[0].type, "sync"); + await pause(10) + assert.equal(bobAdapter.getDroppedMessages()[0].type, "sync") - bobAdapter.dropMessages(false); + bobAdapter.dropMessages(false) aliceHandle.change(d => { d.baz = "42" }) await eventPromise(bobHandle, "heads-changed") - await pause(200); + await pause(200) assert.deepEqual(aliceHandle.docSync(), bobHandle.docSync()) })