Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions doc/discv5.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and `rlp`.

## How to use

### Standard usage

```Nim
let
rng = keys.newRng
Expand All @@ -36,6 +38,18 @@ d = newProtocol(privKey, ip, tcpPort, udpPort,
d.open() # Start listening and add bootstrap nodes to the routing table.
```

### Discovery v5 with shared transport

```Nim
d1 = newDiscoveryV5WithTransport(privKey1, enrIp = Opt.some(ip),
enrTcpPort = Opt.some(port), enrUdpPort = Opt.some(port), transp = transport, bindAddress)
d2 = newDiscoveryV5WithTransport(privKey2, enrIp = Opt.some(ip),
enrTcpPort = Opt.some(port), enrUdpPort = Opt.some(port), transp = transport, bindAddress)
d1.openWithTransport() # Start listening with shared transport
d2.openWithTransport() # Start listening with shared transport
```
This allows multiple Discovery v5 nodes to share the same UDP socket, useful for running multiple protocols on the same port.

Next there are two ways to run the protocol.

One can call `d.start()` and two loops will be started:
Expand Down
246 changes: 170 additions & 76 deletions eth/p2p/discoveryv5/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -467,83 +467,89 @@ proc banNode*(d: Protocol, n: Node, banPeriod: chronos.Duration) =
proc isBanned*(d: Protocol, nodeId: NodeId): bool =
d.banNodes and d.routingTable.isBanned(nodeId)

proc receive*(d: Protocol, a: Address, packet: openArray[byte]) =
proc receive*(d: Protocol, a: Address, packet: openArray[byte]) : DiscResult[void] =
discv5_network_bytes.inc(packet.len.int64, labelValues = [$Direction.In])

let decoded = d.codec.decodePacket(a, packet)
if decoded.isOk:
let packet = decoded[]
case packet.flag
of OrdinaryMessage:
if d.isBanned(packet.srcId):
trace "Ignoring received OrdinaryMessage from banned node", nodeId = packet.srcId
return

if packet.messageOpt.isSome():
let message = packet.messageOpt.get()
trace "Received message packet", srcId = packet.srcId, address = a,
kind = message.kind
d.handleMessage(packet.srcId, a, message)
else:
trace "Not decryptable message packet received",
srcId = packet.srcId, address = a
d.sendWhoareyou(packet.srcId, a, packet.requestNonce,
d.getNode(packet.srcId))

of Flag.Whoareyou:
trace "Received whoareyou packet", address = a
var pr: PendingRequest
if d.pendingRequests.take(packet.whoareyou.requestNonce, pr):
let toNode = pr.node
# This is a node we previously contacted and thus must have an address.
doAssert(toNode.address.isSome())
let address = toNode.address.get()
let data = encodeHandshakePacket(d.rng[], d.codec, toNode.id,
address, pr.message, packet.whoareyou, toNode.pubkey)

# Finished setting up the session on our side, so store the ENR of the
# peer in the session cache.
d.codec.sessions.setEnr(toNode.id, address, toNode.record)

trace "Send handshake message packet", dstId = toNode.id, address
d.send(toNode, data)
else:
debug "Timed out or unrequested whoareyou packet", address = a
of HandshakeMessage:
if d.isBanned(packet.srcIdHs):
trace "Ignoring received HandshakeMessage from banned node", nodeId = packet.srcIdHs
return

trace "Received handshake message packet", srcId = packet.srcIdHs,
address = a, kind = packet.message.kind

# For a handshake message it is possible that we received an newer ENR.
# In that case we can add/update it to the routing table.
if packet.node.isSome():
let node = packet.node.get()
# Lets not add nodes without correct IP in the ENR to the routing table.
# The ENR could contain bogus IPs and although they would get removed
# on the next revalidation, one could spam these as the handshake
# message occurs on (first) incoming messages.
if node.address.isSome() and a == node.address.get():
if d.addNode(node):
trace "Added new node to routing table after handshake", node

# Received an ENR in the handshake, add it to the session that was just
# created in the session cache.
d.codec.sessions.setEnr(packet.srcIdHs, a, node.record)
else:
# Did not receive an ENR in the handshake, this means that the ENR used
# is up to date. Get it from the routing table which should normally
# be there unless the request was started manually (E.g. from a JSON-RPC call).
let node = d.getNode(packet.srcIdHs)
if node.isSome():
d.codec.sessions.setEnr(packet.srcIdHs, a, node.value().record)

# The handling of the message needs to be done after adding the ENR.
d.handleMessage(packet.srcIdHs, a, packet.message, packet.node)
else:
trace "Packet decoding error", error = decoded.error, address = a
let decodedPacket = d.codec.decodePacket(a, packet).valueOr:
trace "Packet decoding error", error = error, address = a
return err("Failed to decode packet")

case decodedPacket.flag
of OrdinaryMessage:
if d.isBanned(decodedPacket.srcId):
trace "Ignoring received OrdinaryMessage from banned node", nodeId = decodedPacket.srcId
return ok()

if decodedPacket.messageOpt.isSome():
let message = decodedPacket.messageOpt.get()
trace "Received message packet", srcId = decodedPacket.srcId, address = a,
kind = message.kind
d.handleMessage(decodedPacket.srcId, a, message)
else:
trace "Not decryptable message packet received",
srcId = decodedPacket.srcId, address = a
d.sendWhoareyou(decodedPacket.srcId, a, decodedPacket.requestNonce,
d.getNode(decodedPacket.srcId))

return ok()

of Flag.Whoareyou:
trace "Received whoareyou packet", address = a
var pr: PendingRequest
if d.pendingRequests.take(decodedPacket.whoareyou.requestNonce, pr):
let toNode = pr.node
# This is a node we previously contacted and thus must have an address.
doAssert(toNode.address.isSome())
let address = toNode.address.get()
let data = encodeHandshakePacket(d.rng[], d.codec, toNode.id,
address, pr.message, decodedPacket.whoareyou, toNode.pubkey)

# Finished setting up the session on our side, so store the ENR of the
# peer in the session cache.
d.codec.sessions.setEnr(toNode.id, address, toNode.record)

trace "Send handshake message packet", dstId = toNode.id, address
d.send(toNode, data)
else:
debug "Timed out or unrequested whoareyou packet", address = a

return ok()

of HandshakeMessage:
if d.isBanned(decodedPacket.srcIdHs):
trace "Ignoring received HandshakeMessage from banned node", nodeId = decodedPacket.srcIdHs
return ok()

trace "Received handshake message packet", srcId = decodedPacket.srcIdHs,
address = a, kind = decodedPacket.message.kind

# For a handshake message it is possible that we received an newer ENR.
# In that case we can add/update it to the routing table.
if decodedPacket.node.isSome():
let node = decodedPacket.node.get()
# Lets not add nodes without correct IP in the ENR to the routing table.
# The ENR could contain bogus IPs and although they would get removed
# on the next revalidation, one could spam these as the handshake
# message occurs on (first) incoming messages.
if node.address.isSome() and a == node.address.get():
if d.addNode(node):
trace "Added new node to routing table after handshake", node

# Received an ENR in the handshake, add it to the session that was just
# created in the session cache.
d.codec.sessions.setEnr(decodedPacket.srcIdHs, a, node.record)
else:
# Did not receive an ENR in the handshake, this means that the ENR used
# is up to date. Get it from the routing table which should normally
# be there unless the request was started manually (E.g. from a JSON-RPC call).
let node = d.getNode(decodedPacket.srcIdHs)
if node.isSome():
d.codec.sessions.setEnr(decodedPacket.srcIdHs, a, node.value().record)

# The handling of the message needs to be done after adding the ENR.
d.handleMessage(decodedPacket.srcIdHs, a, decodedPacket.message, decodedPacket.node)

return ok()

proc processClient(transp: DatagramTransport, raddr: TransportAddress):
Future[void] {.async: (raises: []).} =
Expand All @@ -556,7 +562,7 @@ proc processClient(transp: DatagramTransport, raddr: TransportAddress):
warn "Transport getMessage", exception = e.name, msg = e.msg
return

proto.receive(Address(ip: raddr.toIpAddress(), port: raddr.port), buf)
discard proto.receive(Address(ip: raddr.toIpAddress(), port: raddr.port), buf)

# TODO: This could be improved to do the clean-up immediately in case a non
# whoareyou response does arrive, but we would need to store the AuthTag
Expand Down Expand Up @@ -1180,6 +1186,73 @@ proc newProtocol*(
responseTimeout: config.responseTimeout,
rng: rng)

# Alias for clarity
type
DiscoveryV5Protocol* = Protocol

proc new*(T: type DiscoveryV5Protocol,
privKey: PrivateKey,
enrIp: Opt[IpAddress],
enrTcpPort: Opt[Port],
enrUdpPort: Opt[Port],
localEnrFields: openArray[(string, seq[byte])] = [],
bootstrapRecords: openArray[Record] = [],
previousRecord = Opt.none(enr.Record),
bindPort: Port,
bindIp = IPv4_any(),
enrAutoUpdate = false,
banNodes = false,
config = defaultDiscoveryConfig,
rng = newRng()
): DiscoveryV5Protocol =

let protocol = newProtocol(
privKey = privKey,
enrIp = enrIp,
enrTcpPort = enrTcpPort,
enrUdpPort = enrUdpPort,
localEnrFields = localEnrFields,
bootstrapRecords = bootstrapRecords,
previousRecord = previousRecord,
bindPort = bindPort,
bindIp = bindIp,
enrAutoUpdate = enrAutoUpdate,
banNodes = banNodes,
config = config,
rng = rng
)

return protocol

proc newDiscoveryV5WithTransport*(
privKey: PrivateKey,
enrIp: Opt[IpAddress],
enrTcpPort: Opt[Port],
enrUdpPort: Opt[Port],
bootstrapRecords: openArray[enr.Record] = [],
transp: DatagramTransport,
bindAddress: OptAddress,
enrAutoUpdate = true,
rng = newRng(),
): Protocol =
## Create a new Discovery v5 protocol instance with an existing transport
## This allows sharing the same UDP transport between multiple protocols
let protocol = newProtocol(
privKey = privKey,
enrIp = enrIp,
enrTcpPort = enrTcpPort,
enrUdpPort = enrUdpPort,
bootstrapRecords = bootstrapRecords,
bindPort = bindAddress.port,
bindIp = bindAddress.ip,
enrAutoUpdate = enrAutoUpdate,
rng = rng
)

protocol.transp = transp

return protocol

proc `$`*(a: OptAddress): string =
if a.ip.isNone():
"*:" & $a.port
Expand Down Expand Up @@ -1228,3 +1301,24 @@ proc closeWait*(d: Protocol) {.async: (raises: []).} =

proc close*(d: Protocol) {.deprecated: "Please use closeWait() instead".} =
asyncSpawn d.closeWait()

proc openWithTransport*(d: Protocol) =
## Start the discovery protocol using an externally managed transport.
## Does not create or bind a new transport.
info "Starting discovery node (with external transport)", node = d.localNode,
bindAddress = d.bindAddress
d.seedTable()

proc closeWithTransport*(d: Protocol) {.async: (raises: []).} =
## Stop the discovery protocol, but do not close the external transport.
doAssert(not d.transp.closed)
debug "Closing discovery node (with external transport)", node = d.localNode
var futures: seq[Future[void]]
if not d.revalidateLoop.isNil:
futures.add(d.revalidateLoop.cancelAndWait())
if not d.refreshLoop.isNil:
futures.add(d.refreshLoop.cancelAndWait())
if not d.ipMajorityLoop.isNil:
futures.add(d.ipMajorityLoop.cancelAndWait())
await noCancel(allFutures(futures))
# Do not close d.transp here, as it is managed externally
6 changes: 3 additions & 3 deletions tests/p2p/test_discoveryv5.nim
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ suite "Discovery v5.1 Tests":

let (packet, _) = encodeMessagePacket(rng[], codec,
receiveNode.localNode.id, receiveNode.localNode.address.get(), @[])
receiveNode.receive(a, packet)
check receiveNode.receive(a, packet).isOk()

# Checking different nodeIds but same address
check receiveNode.codec.handshakes.len == 5
Expand Down Expand Up @@ -731,7 +731,7 @@ suite "Discovery v5.1 Tests":
let a = localAddress(20303 + i)
let (packet, _) = encodeMessagePacket(rng[], codec,
receiveNode.localNode.id, receiveNode.localNode.address.get(), @[])
receiveNode.receive(a, packet)
check receiveNode.receive(a, packet).isOk()

# Checking different nodeIds but same address
check receiveNode.codec.handshakes.len == 5
Expand Down Expand Up @@ -763,7 +763,7 @@ suite "Discovery v5.1 Tests":
for i in 0 ..< 5:
let (packet, requestNonce) = encodeMessagePacket(rng[], codec,
receiveNode.localNode.id, receiveNode.localNode.address.get(), @[])
receiveNode.receive(a, packet)
check receiveNode.receive(a, packet).isOk()
if i == 0:
firstRequestNonce = requestNonce

Expand Down