Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 139 additions & 74 deletions eth/p2p/discoveryv5/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -467,83 +467,91 @@ proc banNode*(d: Protocol, n: Node, banPeriod: chronos.Duration) =
proc isBanned*(d: Protocol, nodeId: NodeId): bool =
d.banNodes and d.routingTable.isBanned(nodeId)

proc receive*(d: Protocol, a: Address, packet: openArray[byte]) =
proc receive*(d: Protocol, a: Address, packet: openArray[byte]) : DiscResult[void] =
discv5_network_bytes.inc(packet.len.int64, labelValues = [$Direction.In])

let decoded = d.codec.decodePacket(a, packet)
if decoded.isOk:
let packet = decoded[]
case packet.flag
of OrdinaryMessage:
if d.isBanned(packet.srcId):
trace "Ignoring received OrdinaryMessage from banned node", nodeId = packet.srcId
return

if packet.messageOpt.isSome():
let message = packet.messageOpt.get()
trace "Received message packet", srcId = packet.srcId, address = a,
kind = message.kind
d.handleMessage(packet.srcId, a, message)
else:
trace "Not decryptable message packet received",
srcId = packet.srcId, address = a
d.sendWhoareyou(packet.srcId, a, packet.requestNonce,
d.getNode(packet.srcId))

of Flag.Whoareyou:
trace "Received whoareyou packet", address = a
var pr: PendingRequest
if d.pendingRequests.take(packet.whoareyou.requestNonce, pr):
let toNode = pr.node
# This is a node we previously contacted and thus must have an address.
doAssert(toNode.address.isSome())
let address = toNode.address.get()
let data = encodeHandshakePacket(d.rng[], d.codec, toNode.id,
address, pr.message, packet.whoareyou, toNode.pubkey)

# Finished setting up the session on our side, so store the ENR of the
# peer in the session cache.
d.codec.sessions.setEnr(toNode.id, address, toNode.record)

trace "Send handshake message packet", dstId = toNode.id, address
d.send(toNode, data)
else:
debug "Timed out or unrequested whoareyou packet", address = a
of HandshakeMessage:
if d.isBanned(packet.srcIdHs):
trace "Ignoring received HandshakeMessage from banned node", nodeId = packet.srcIdHs
return

trace "Received handshake message packet", srcId = packet.srcIdHs,
address = a, kind = packet.message.kind

# For a handshake message it is possible that we received an newer ENR.
# In that case we can add/update it to the routing table.
if packet.node.isSome():
let node = packet.node.get()
# Lets not add nodes without correct IP in the ENR to the routing table.
# The ENR could contain bogus IPs and although they would get removed
# on the next revalidation, one could spam these as the handshake
# message occurs on (first) incoming messages.
if node.address.isSome() and a == node.address.get():
if d.addNode(node):
trace "Added new node to routing table after handshake", node

# Received an ENR in the handshake, add it to the session that was just
# created in the session cache.
d.codec.sessions.setEnr(packet.srcIdHs, a, node.record)
else:
# Did not receive an ENR in the handshake, this means that the ENR used
# is up to date. Get it from the routing table which should normally
# be there unless the request was started manually (E.g. from a JSON-RPC call).
let node = d.getNode(packet.srcIdHs)
if node.isSome():
d.codec.sessions.setEnr(packet.srcIdHs, a, node.value().record)

# The handling of the message needs to be done after adding the ENR.
d.handleMessage(packet.srcIdHs, a, packet.message, packet.node)
else:
if decoded.isErr:
Copy link
Contributor

@kdeme kdeme Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are to restructure this without the big if/else clause (which is indeed cleaner) then it would be even cleaner to use Result's valueOr.

trace "Packet decoding error", error = decoded.error, address = a
return err("Failed to decode packet")

let decodedPacket = decoded[]
case decodedPacket.flag
of OrdinaryMessage:
if d.isBanned(decodedPacket.srcId):
trace "Ignoring received OrdinaryMessage from banned node", nodeId = decodedPacket.srcId
return ok()

if decodedPacket.messageOpt.isSome():
let message = decodedPacket.messageOpt.get()
trace "Received message packet", srcId = decodedPacket.srcId, address = a,
kind = message.kind
d.handleMessage(decodedPacket.srcId, a, message)
else:
trace "Not decryptable message packet received",
srcId = decodedPacket.srcId, address = a
d.sendWhoareyou(decodedPacket.srcId, a, decodedPacket.requestNonce,
d.getNode(decodedPacket.srcId))

return ok()

of Flag.Whoareyou:
trace "Received whoareyou packet", address = a
var pr: PendingRequest
if d.pendingRequests.take(decodedPacket.whoareyou.requestNonce, pr):
let toNode = pr.node
# This is a node we previously contacted and thus must have an address.
doAssert(toNode.address.isSome())
let address = toNode.address.get()
let data = encodeHandshakePacket(d.rng[], d.codec, toNode.id,
address, pr.message, decodedPacket.whoareyou, toNode.pubkey)

# Finished setting up the session on our side, so store the ENR of the
# peer in the session cache.
d.codec.sessions.setEnr(toNode.id, address, toNode.record)

trace "Send handshake message packet", dstId = toNode.id, address
d.send(toNode, data)
else:
debug "Timed out or unrequested whoareyou packet", address = a

return ok()

of HandshakeMessage:
if d.isBanned(decodedPacket.srcIdHs):
trace "Ignoring received HandshakeMessage from banned node", nodeId = decodedPacket.srcIdHs
return ok()

trace "Received handshake message packet", srcId = decodedPacket.srcIdHs,
address = a, kind = decodedPacket.message.kind

# For a handshake message it is possible that we received an newer ENR.
# In that case we can add/update it to the routing table.
if decodedPacket.node.isSome():
let node = decodedPacket.node.get()
# Lets not add nodes without correct IP in the ENR to the routing table.
# The ENR could contain bogus IPs and although they would get removed
# on the next revalidation, one could spam these as the handshake
# message occurs on (first) incoming messages.
if node.address.isSome() and a == node.address.get():
if d.addNode(node):
trace "Added new node to routing table after handshake", node

# Received an ENR in the handshake, add it to the session that was just
# created in the session cache.
d.codec.sessions.setEnr(decodedPacket.srcIdHs, a, node.record)
else:
# Did not receive an ENR in the handshake, this means that the ENR used
# is up to date. Get it from the routing table which should normally
# be there unless the request was started manually (E.g. from a JSON-RPC call).
let node = d.getNode(decodedPacket.srcIdHs)
if node.isSome():
d.codec.sessions.setEnr(decodedPacket.srcIdHs, a, node.value().record)

# The handling of the message needs to be done after adding the ENR.
d.handleMessage(decodedPacket.srcIdHs, a, decodedPacket.message, decodedPacket.node)

return ok()

proc processClient(transp: DatagramTransport, raddr: TransportAddress):
Future[void] {.async: (raises: []).} =
Expand All @@ -556,7 +564,9 @@ proc processClient(transp: DatagramTransport, raddr: TransportAddress):
warn "Transport getMessage", exception = e.name, msg = e.msg
return

proto.receive(Address(ip: raddr.toIpAddress(), port: raddr.port), buf)
let res = proto.receive(Address(ip: raddr.toIpAddress(), port: raddr.port), buf)
if res.isErr:
debug "Failed to process received packet", error = res.error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is rather redundant to forward the error message to be logged here again, when it already was up (however trace and thus also altering here the log level).

It is also not fully clear what failed processing should include. I would perhaps also consider the unsolicited whoareyou message an processing error. While here it is rather meant for failed decoding of the packet (and the decoding error gets lost in the passed along Result).

Copy link
Author

@gerceboss gerceboss Aug 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh right, we can just discard its result then ? As if the functions returns some error , it would have been logeed already.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant that you can just remove the DiscResult[void] as return value in the receive call in the first place.

Copy link
Author

@gerceboss gerceboss Aug 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This return type was needed at Execution client where we will have to rewrite this receive function again.
That's why I started to changed it here @kdeme as suggested by @jangko in the PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, that's fine then.

FYI, you can update your nimbus-eth1 PR (in draft) to use this commit of nim-eth. That way it is easier to see why certain changes are made or how the API should look.


# TODO: This could be improved to do the clean-up immediately in case a non
# whoareyou response does arrive, but we would need to store the AuthTag
Expand Down Expand Up @@ -1180,6 +1190,61 @@ proc newProtocol*(
responseTimeout: config.responseTimeout,
rng: rng)

proc newDiscoveryV5*(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we are creating new API here to initialize the discovery protocol I would prefer to go for the func new(T: type Protocol, ... way of doing this (see also https://status-im.github.io/nim-style-guide/language.objconstr.html ).

We can perhaps alias Protocol to DiscoveryV5Protocol to make this more clear (the original Protocol type naming is rather unfortunate).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to have the same parameters to be passed along as exist for newProtocol

privKey: PrivateKey,
enrIp: Opt[IpAddress],
enrTcpPort: Opt[Port],
enrUdpPort: Opt[Port],
bootstrapRecords: openArray[enr.Record] = [],
bindPort: Port,
bindIp = IPv6_any(),
enrAutoUpdate = true,
rng = newRng(),
): Protocol =
## Create a new Discovery v5 protocol instance
let protocol = newProtocol(
privKey = privKey,
enrIp = enrIp,
enrTcpPort = enrTcpPort,
enrUdpPort = enrUdpPort,
bootstrapRecords = bootstrapRecords,
bindPort = bindPort,
bindIp = bindIp,
enrAutoUpdate = enrAutoUpdate,
rng = rng
)
protocol.seedTable()
return protocol

proc newDiscoveryV5WithTransport*(
privKey: PrivateKey,
enrIp: Opt[IpAddress],
enrTcpPort: Opt[Port],
enrUdpPort: Opt[Port],
bootstrapRecords: openArray[enr.Record] = [],
transp: DatagramTransport,
bindAddress: OptAddress,
enrAutoUpdate = true,
rng = newRng(),
): Protocol =
## Create a new Discovery v5 protocol instance with an existing transport
## This allows sharing the same UDP transport between multiple protocols
let protocol = newProtocol(
privKey = privKey,
enrIp = enrIp,
enrTcpPort = enrTcpPort,
enrUdpPort = enrUdpPort,
bootstrapRecords = bootstrapRecords,
bindPort = bindAddress.port,
bindIp = bindAddress.ip,
enrAutoUpdate = enrAutoUpdate,
rng = rng
)

protocol.transp = transp
protocol.seedTable()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will become rather confusing for the user considering there are two different types of initialization, one that requires open and one that does not, and seeds the table already in the init. Easy "mistake" for a user would be to call open also after doing newDiscoveryV5WithTransport.

As a possible solution for this we could move everything out of open and deprecate it. However that would mean the TransportOsError moves to the init.
Another possibility is to move the seedTable to the start proc, and for the open call, make it a null operation in case the transport is already open.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kdeme as I add open close functions according to shared transport as well, then I'll remove the seedTable from the new functions

return protocol

proc `$`*(a: OptAddress): string =
if a.ip.isNone():
"*:" & $a.port
Expand Down
6 changes: 3 additions & 3 deletions tests/p2p/test_discoveryv5.nim
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ suite "Discovery v5.1 Tests":

let (packet, _) = encodeMessagePacket(rng[], codec,
receiveNode.localNode.id, receiveNode.localNode.address.get(), @[])
receiveNode.receive(a, packet)
check receiveNode.receive(a, packet).isOk()

# Checking different nodeIds but same address
check receiveNode.codec.handshakes.len == 5
Expand Down Expand Up @@ -731,7 +731,7 @@ suite "Discovery v5.1 Tests":
let a = localAddress(20303 + i)
let (packet, _) = encodeMessagePacket(rng[], codec,
receiveNode.localNode.id, receiveNode.localNode.address.get(), @[])
receiveNode.receive(a, packet)
check receiveNode.receive(a, packet).isOk()

# Checking different nodeIds but same address
check receiveNode.codec.handshakes.len == 5
Expand Down Expand Up @@ -763,7 +763,7 @@ suite "Discovery v5.1 Tests":
for i in 0 ..< 5:
let (packet, requestNonce) = encodeMessagePacket(rng[], codec,
receiveNode.localNode.id, receiveNode.localNode.address.get(), @[])
receiveNode.receive(a, packet)
check receiveNode.receive(a, packet).isOk()
if i == 0:
firstRequestNonce = requestNonce

Expand Down