Skip to content

feat: enhance Sync logs and metrics #3370

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions waku/waku_store_sync/protocols_metrics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ declarePublicHistogram reconciliation_roundtrips,
"the nubmer of roundtrips for each reconciliation",
buckets = [0.0, 1.0, 2.0, 3.0, 5.0, 10.0, Inf]

declarePublicHistogram reconciliation_differences,
"the nubmer of differences for each reconciliation",
buckets = [0.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 10000.0, Inf]

declarePublicSummary total_bytes_exchanged,
"the number of bytes sent and received by the protocols", ["protocol", "direction"]

Expand Down
26 changes: 19 additions & 7 deletions waku/waku_store_sync/reconciliation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,23 @@ proc messageIngress*(self: SyncReconciliation, id: SyncID) =
proc processRequest(
self: SyncReconciliation, conn: Connection
): Future[Result[void, string]] {.async.} =
var roundTrips = 0
var
roundTrips = 0
diffs = 0

while true:
let readRes = catch:
await conn.readLp(int.high)

let buffer: seq[byte] = readRes.valueOr:
return err("connection read error: " & error.msg)
await conn.close()
return err("remote " & $conn.peerId & " connection read error: " & error.msg)

total_bytes_exchanged.observe(buffer.len, labelValues = [Reconciliation, Receiving])

let recvPayload = RangesData.deltaDecode(buffer).valueOr:
return err("payload decoding error: " & error)
await conn.close()
return err("remote " & $conn.peerId & " payload decoding error: " & error)

roundTrips.inc()

Expand Down Expand Up @@ -136,9 +140,11 @@ proc processRequest(

for hash in hashToSend:
self.remoteNeedsTx.addLastNoWait((conn.peerId, hash))
diffs.inc()

for hash in hashToRecv:
self.localWantsTx.addLastNoWait((conn.peerId, hash))
diffs.inc()

rawPayload = sendPayload.deltaEncode()

Expand All @@ -150,7 +156,9 @@ proc processRequest(
await conn.writeLP(rawPayload)

if writeRes.isErr():
return err("connection write error: " & writeRes.error.msg)
await conn.close()
return
err("remote " & $conn.peerId & " connection write error: " & writeRes.error.msg)

trace "sync payload sent",
local = self.peerManager.switch.peerInfo.peerId,
Expand All @@ -163,6 +171,7 @@ proc processRequest(
continue

reconciliation_roundtrips.observe(roundTrips)
reconciliation_differences.observe(diffs)

await conn.close()

Expand Down Expand Up @@ -196,12 +205,15 @@ proc initiate(
await connection.writeLP(sendPayload)

if writeRes.isErr():
return err("connection write error: " & writeRes.error.msg)
await connection.close()
return err(
"remote " & $connection.peerId & " connection write error: " & writeRes.error.msg
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to differentiate it a little from the other:

Suggested change
"remote " & $connection.peerId & " connection write error: " & writeRes.error.msg
"remote " & $connection.peerId & " connection write error initiate: " & writeRes.error.msg

)

trace "sync payload sent",
local = self.peerManager.switch.peerInfo.peerId,
remote = connection.peerId,
payload = sendPayload
payload = initPayload

?await self.processRequest(connection)

Expand All @@ -217,7 +229,7 @@ proc storeSynchronization*(
let connOpt = await self.peerManager.dialPeer(peer, WakuReconciliationCodec)

let conn: Connection = connOpt.valueOr:
return err("cannot establish sync connection")
return err("fail to dial remote " & $peer.peerId)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe?

Suggested change
return err("fail to dial remote " & $peer.peerId)
return err("fail to dial remote " & $peer.peerId & " store sync conn error: " & $error)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an Option there's no error to log sadly.


debug "sync session initialized",
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId
Expand Down
12 changes: 8 additions & 4 deletions waku/waku_store_sync/transfer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ proc sendMessage(
await conn.writeLP(rawPayload)

if writeRes.isErr():
return err("connection write error: " & writeRes.error.msg)
return
err("remote " & $conn.peerId & " connection write error: " & writeRes.error.msg)

total_transfer_messages_exchanged.inc(labelValues = [Sending])

Expand All @@ -69,7 +70,7 @@ proc openConnection(
let connOpt = await self.peerManager.dialPeer(peerId, WakuTransferCodec)

let conn: Connection = connOpt.valueOr:
return err("Cannot establish transfer connection")
return err("fail to dial remote " & $peerId)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe :)?

Suggested change
return err("fail to dial remote " & $peerId)
return err("fail to dial remote " & $peerId & " open conn error: " & $error)


debug "transfer session initialized",
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId
Expand Down Expand Up @@ -126,6 +127,8 @@ proc needsReceiverLoop(self: SyncTransfer) {.async.} =
WakuMessageAndTopic(pubsub: response.topics[0], message: response.messages[0])

(await sendMessage(connection, msg)).isOkOr:
self.outSessions.del(peerId)
await connection.close()
error "failed to send message", error = error
continue

Expand Down Expand Up @@ -158,24 +161,25 @@ proc initProtocolHandler(self: SyncTransfer) =
if value[].missingOrExcl(hash):
error "unwanted hash received, disconnecting"
self.inSessions.del(conn.peerId)
await conn.close()
break
do:
error "unwanted hash received, disconnecting"
self.inSessions.del(conn.peerId)
await conn.close()
break

#TODO verify msg RLN proof...

(await self.wakuArchive.syncMessageIngress(hash, pubsub, msg)).isOkOr:
error "failed to archive message", error = $error
continue

let id = SyncID(time: msg.timestamp, hash: hash)
await self.idsTx.addLast(id)

continue

await conn.close()

debug "transfer session ended",
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId

Expand Down
Loading