From b3832cba5928fb78c0215095466eb1ee5488d480 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Fri, 11 Apr 2025 10:24:49 -0400 Subject: [PATCH 1/6] add remote peer id to logs --- waku/waku_store_sync/reconciliation.nim | 17 ++++++++++++----- waku/waku_store_sync/transfer.nim | 10 ++++++---- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim index 80c025140a..1ea68308d6 100644 --- a/waku/waku_store_sync/reconciliation.nim +++ b/waku/waku_store_sync/reconciliation.nim @@ -103,12 +103,14 @@ proc processRequest( await conn.readLp(int.high) let buffer: seq[byte] = readRes.valueOr: - return err("connection read error: " & error.msg) + await conn.close() + return err("remote " & $conn.peerId & " connection read error: " & error.msg) total_bytes_exchanged.observe(buffer.len, labelValues = [Reconciliation, Receiving]) let recvPayload = RangesData.deltaDecode(buffer).valueOr: - return err("payload decoding error: " & error) + await conn.close() + return err("remote " & $conn.peerId & " payload decoding error: " & error) roundTrips.inc() @@ -150,7 +152,9 @@ proc processRequest( await conn.writeLP(rawPayload) if writeRes.isErr(): - return err("connection write error: " & writeRes.error.msg) + await conn.close() + return + err("remote " & $conn.peerId & " connection write error: " & writeRes.error.msg) trace "sync payload sent", local = self.peerManager.switch.peerInfo.peerId, @@ -196,7 +200,10 @@ proc initiate( await connection.writeLP(sendPayload) if writeRes.isErr(): - return err("connection write error: " & writeRes.error.msg) + await connection.close() + return err( + "remote " & $connection.peerId & " connection write error: " & writeRes.error.msg + ) trace "sync payload sent", local = self.peerManager.switch.peerInfo.peerId, @@ -217,7 +224,7 @@ proc storeSynchronization*( let connOpt = await self.peerManager.dialPeer(peer, WakuReconciliationCodec) let conn: Connection = connOpt.valueOr: - return err("cannot establish sync connection") + return err("fail to dial remote " & $peer.peerId) debug "sync session initialized", local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId diff --git a/waku/waku_store_sync/transfer.nim b/waku/waku_store_sync/transfer.nim index 0ac959de00..2413e14326 100644 --- a/waku/waku_store_sync/transfer.nim +++ b/waku/waku_store_sync/transfer.nim @@ -57,7 +57,9 @@ proc sendMessage( await conn.writeLP(rawPayload) if writeRes.isErr(): - return err("connection write error: " & writeRes.error.msg) + await conn.close() + return + err("remote " & $conn.peerId & " connection write error: " & writeRes.error.msg) total_transfer_messages_exchanged.inc(labelValues = [Sending]) @@ -69,7 +71,7 @@ proc openConnection( let connOpt = await self.peerManager.dialPeer(peerId, WakuTransferCodec) let conn: Connection = connOpt.valueOr: - return err("Cannot establish transfer connection") + return err("fail to dial remote " & $peerId) debug "transfer session initialized", local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId @@ -158,12 +160,10 @@ proc initProtocolHandler(self: SyncTransfer) = if value[].missingOrExcl(hash): error "unwanted hash received, disconnecting" self.inSessions.del(conn.peerId) - await conn.close() break do: error "unwanted hash received, disconnecting" self.inSessions.del(conn.peerId) - await conn.close() break #TODO verify msg RLN proof... @@ -176,6 +176,8 @@ proc initProtocolHandler(self: SyncTransfer) = continue + await conn.close() + debug "transfer session ended", local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId From ff2bd6286918c837384ef63133a12cda1f27975f Mon Sep 17 00:00:00 2001 From: SionoiS Date: Fri, 11 Apr 2025 10:48:12 -0400 Subject: [PATCH 2/6] added more metrics --- waku/waku_store_sync/protocols_metrics.nim | 4 ++++ waku/waku_store_sync/reconciliation.nim | 7 ++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/waku/waku_store_sync/protocols_metrics.nim b/waku/waku_store_sync/protocols_metrics.nim index 2d2776674e..bb22f11c7a 100644 --- a/waku/waku_store_sync/protocols_metrics.nim +++ b/waku/waku_store_sync/protocols_metrics.nim @@ -10,6 +10,10 @@ declarePublicHistogram reconciliation_roundtrips, "the nubmer of roundtrips for each reconciliation", buckets = [0.0, 1.0, 2.0, 3.0, 5.0, 10.0, Inf] +declarePublicHistogram reconciliation_differences, + "the nubmer of differences for each reconciliation", + buckets = [0.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 10000.0, Inf] + declarePublicSummary total_bytes_exchanged, "the number of bytes sent and received by the protocols", ["protocol", "direction"] diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim index 1ea68308d6..5f1dc9ded5 100644 --- a/waku/waku_store_sync/reconciliation.nim +++ b/waku/waku_store_sync/reconciliation.nim @@ -96,7 +96,9 @@ proc messageIngress*(self: SyncReconciliation, id: SyncID) = proc processRequest( self: SyncReconciliation, conn: Connection ): Future[Result[void, string]] {.async.} = - var roundTrips = 0 + var + roundTrips = 0 + diffs = 0 while true: let readRes = catch: @@ -138,9 +140,11 @@ proc processRequest( for hash in hashToSend: self.remoteNeedsTx.addLastNoWait((conn.peerId, hash)) + diffs.inc() for hash in hashToRecv: self.localWantsTx.addLastNoWait((conn.peerId, hash)) + diffs.inc() rawPayload = sendPayload.deltaEncode() @@ -167,6 +171,7 @@ proc processRequest( continue reconciliation_roundtrips.observe(roundTrips) + reconciliation_differences.observe(diffs) await conn.close() From 99d723bef2c50db679fca3e76878b4f4098b7e6d Mon Sep 17 00:00:00 2001 From: SionoiS Date: Mon, 14 Apr 2025 10:27:41 -0400 Subject: [PATCH 3/6] one more error --- waku/waku_store_sync/transfer.nim | 1 + 1 file changed, 1 insertion(+) diff --git a/waku/waku_store_sync/transfer.nim b/waku/waku_store_sync/transfer.nim index 2413e14326..3a3a2597fe 100644 --- a/waku/waku_store_sync/transfer.nim +++ b/waku/waku_store_sync/transfer.nim @@ -169,6 +169,7 @@ proc initProtocolHandler(self: SyncTransfer) = #TODO verify msg RLN proof... (await self.wakuArchive.syncMessageIngress(hash, pubsub, msg)).isOkOr: + error "failed to archive message", error = $error continue let id = SyncID(time: msg.timestamp, hash: hash) From edcbfc7d64424c1a4522190f4743a4dfb8fc340b Mon Sep 17 00:00:00 2001 From: SionoiS Date: Mon, 14 Apr 2025 11:36:03 -0400 Subject: [PATCH 4/6] temp. log at debug lvl --- waku/waku_store_sync/reconciliation.nim | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim index 5f1dc9ded5..aa6e5b3cf6 100644 --- a/waku/waku_store_sync/reconciliation.nim +++ b/waku/waku_store_sync/reconciliation.nim @@ -116,7 +116,7 @@ proc processRequest( roundTrips.inc() - trace "sync payload received", + debug "sync payload received", local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId, payload = recvPayload @@ -160,7 +160,7 @@ proc processRequest( return err("remote " & $conn.peerId & " connection write error: " & writeRes.error.msg) - trace "sync payload sent", + debug "sync payload sent", local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId, payload = sendPayload @@ -210,7 +210,7 @@ proc initiate( "remote " & $connection.peerId & " connection write error: " & writeRes.error.msg ) - trace "sync payload sent", + debug "sync payload sent", local = self.peerManager.switch.peerInfo.peerId, remote = connection.peerId, payload = sendPayload From 5a1cf15263b8df358b4eccbb69886a2418eac0db Mon Sep 17 00:00:00 2001 From: SionoiS Date: Mon, 14 Apr 2025 16:21:06 -0400 Subject: [PATCH 5/6] log init payload --- waku/waku_store_sync/reconciliation.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim index aa6e5b3cf6..102986d78f 100644 --- a/waku/waku_store_sync/reconciliation.nim +++ b/waku/waku_store_sync/reconciliation.nim @@ -213,7 +213,7 @@ proc initiate( debug "sync payload sent", local = self.peerManager.switch.peerInfo.peerId, remote = connection.peerId, - payload = sendPayload + payload = initPayload ?await self.processRequest(connection) From 5faf5ddd1286c4955dea3c1c2ade408dfff6aee3 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Tue, 15 Apr 2025 15:55:35 -0400 Subject: [PATCH 6/6] trace log & fix --- waku/waku_store_sync/reconciliation.nim | 6 +++--- waku/waku_store_sync/transfer.nim | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim index 102986d78f..f7c13d42cc 100644 --- a/waku/waku_store_sync/reconciliation.nim +++ b/waku/waku_store_sync/reconciliation.nim @@ -116,7 +116,7 @@ proc processRequest( roundTrips.inc() - debug "sync payload received", + trace "sync payload received", local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId, payload = recvPayload @@ -160,7 +160,7 @@ proc processRequest( return err("remote " & $conn.peerId & " connection write error: " & writeRes.error.msg) - debug "sync payload sent", + trace "sync payload sent", local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId, payload = sendPayload @@ -210,7 +210,7 @@ proc initiate( "remote " & $connection.peerId & " connection write error: " & writeRes.error.msg ) - debug "sync payload sent", + trace "sync payload sent", local = self.peerManager.switch.peerInfo.peerId, remote = connection.peerId, payload = initPayload diff --git a/waku/waku_store_sync/transfer.nim b/waku/waku_store_sync/transfer.nim index 3a3a2597fe..81bed5ece7 100644 --- a/waku/waku_store_sync/transfer.nim +++ b/waku/waku_store_sync/transfer.nim @@ -57,7 +57,6 @@ proc sendMessage( await conn.writeLP(rawPayload) if writeRes.isErr(): - await conn.close() return err("remote " & $conn.peerId & " connection write error: " & writeRes.error.msg) @@ -128,6 +127,8 @@ proc needsReceiverLoop(self: SyncTransfer) {.async.} = WakuMessageAndTopic(pubsub: response.topics[0], message: response.messages[0]) (await sendMessage(connection, msg)).isOkOr: + self.outSessions.del(peerId) + await connection.close() error "failed to send message", error = error continue