Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tests/testlib/wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ proc defaultTestWakuConfBuilder*(): WakuConfBuilder =
@[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")]
)
builder.withNatStrategy("any")
builder.withMaxConnections(50)
builder.withRelayServiceRatio("60:40")
builder.withMaxConnections(150)
builder.withRelayServiceRatio("50:50")
builder.withMaxMessageSize("1024 KiB")
builder.withClusterId(DefaultClusterId)
builder.withSubscribeShards(@[DefaultShardId])
Expand Down
18 changes: 5 additions & 13 deletions tools/confutils/cli_args.nim
Original file line number Diff line number Diff line change
Expand Up @@ -206,22 +206,17 @@ type WakuNodeConf* = object
.}: bool

maxConnections* {.
desc: "Maximum allowed number of libp2p connections.",
defaultValue: 50,
desc:
"Maximum allowed number of libp2p connections. (Default: 150) that's recommended value for better connectivity",
defaultValue: 150,
name: "max-connections"
.}: int

maxRelayPeers* {.
desc:
"Deprecated. Use relay-service-ratio instead. It represents the maximum allowed number of relay peers.",
name: "max-relay-peers"
.}: Option[int]

relayServiceRatio* {.
desc:
"This percentage ratio represents the relay peers to service peers. For example, 60:40, tells that 60% of the max-connections will be used for relay protocol and the other 40% of max-connections will be reserved for other service protocols (e.g., filter, lightpush, store, metadata, etc.)",
name: "relay-service-ratio",
defaultValue: "60:40" # 60:40 ratio of relay to service peers
defaultValue: "50:50",
name: "relay-service-ratio"
.}: string

colocationLimit* {.
Expand Down Expand Up @@ -957,9 +952,6 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
b.withExtMultiAddrsOnly(n.extMultiAddrsOnly)
b.withMaxConnections(n.maxConnections)

if n.maxRelayPeers.isSome():
b.withMaxRelayPeers(n.maxRelayPeers.get())

if n.relayServiceRatio != "":
b.withRelayServiceRatio(n.relayServiceRatio)
b.withColocationLimit(n.colocationLimit)
Expand Down
20 changes: 10 additions & 10 deletions waku/factory/conf_builder/waku_conf_builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,6 @@ proc withAgentString*(b: var WakuConfBuilder, agentString: string) =
proc withColocationLimit*(b: var WakuConfBuilder, colocationLimit: int) =
  ## Record the desired IP-colocation limit on the builder.
  b.colocationLimit = colocationLimit.some()

proc withMaxRelayPeers*(b: var WakuConfBuilder, maxRelayPeers: int) =
b.maxRelayPeers = some(maxRelayPeers)

proc withRelayServiceRatio*(b: var WakuConfBuilder, relayServiceRatio: string) =
  ## Record the relay:service connection ratio (e.g. "50:50") on the builder.
  b.relayServiceRatio = relayServiceRatio.some()

Expand Down Expand Up @@ -588,12 +585,15 @@ proc build*(
warn "Peer persistence not specified, defaulting to false"
false

let maxConnections =
if builder.maxConnections.isSome():
builder.maxConnections.get()
else:
warn "Max Connections was not specified, defaulting to 300"
300
var maxConnections: int
if builder.maxConnections.isSome():
maxConnections = builder.maxConnections.get()
if maxConnections < 150:
warn "max-connections less than 150; we suggest using 150 or more for better connectivity",
provided = maxConnections
else:
warn "Max Connections was not specified, defaulting to 150"
maxConnections = 150

# TODO: Do the git version thing here
let agentString = builder.agentString.get("nwaku")
Expand Down Expand Up @@ -663,7 +663,7 @@ proc build*(
agentString: agentString,
colocationLimit: colocationLimit,
maxRelayPeers: builder.maxRelayPeers,
relayServiceRatio: builder.relayServiceRatio.get("60:40"),
relayServiceRatio: builder.relayServiceRatio.get("50:50"),
rateLimit: rateLimit,
circuitRelayClient: builder.circuitRelayClient.get(false),
staticNodes: builder.staticNodes,
Expand Down
2 changes: 1 addition & 1 deletion waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,7 @@ proc new*(
wakuMetadata: WakuMetadata = nil,
maxRelayPeers: Option[int] = none(int),
maxServicePeers: Option[int] = none(int),
relayServiceRatio: string = "60:40",
relayServiceRatio: string = "50:50",
storage: PeerStorage = nil,
initialBackoffInSec = InitialBackoffInSec,
backoffFactor = BackoffFactor,
Expand Down
10 changes: 10 additions & 0 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,19 @@ proc validate*(msg: WakuMessage): Result[void, string] =
upperBound = now + MaxMessageTimestampVariance

if msg.timestamp < lowerBound:
warn "rejecting message with old timestamp",
msgTimestamp = msg.timestamp,
lowerBound = lowerBound,
now = now,
drift = (now - msg.timestamp) div 1_000_000_000
return err(invalidMessageOld)

if upperBound < msg.timestamp:
warn "rejecting message with future timestamp",
msgTimestamp = msg.timestamp,
upperBound = upperBound,
now = now,
drift = (msg.timestamp - now) div 1_000_000_000
return err(invalidMessageFuture)

return ok()
Expand Down
39 changes: 31 additions & 8 deletions waku/waku_store/client.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
{.push raises: [].}

import std/[options, tables], results, chronicles, chronos, metrics, bearssl/rand
import
  std/[options, tables, sequtils, algorithm, random],
  results,
  chronicles,
  chronos,
  metrics,
  bearssl/rand
import
  ../node/peer_manager, ../utils/requests, ./protocol_metrics, ./common, ./rpc_codec

Expand All @@ -10,6 +16,8 @@ logScope:
const DefaultPageSize*: uint = 20
# A recommended default number of waku messages per page

const MaxQueryRetries = 5 # Maximum number of store peers to try before giving up

type WakuStoreClient* = ref object
peerManager: PeerManager
rng: ref rand.HmacDrbgContext
Expand Down Expand Up @@ -79,18 +87,33 @@ proc query*(
proc queryToAny*(
    self: WakuStoreClient, request: StoreQueryRequest, peerId = none(PeerId)
): Future[StoreQueryResult] {.async.} =
  ## Like `query`, but the target peer is selected from the peer manager
  ## instead of being supplied by the caller.
  ## Up to `MaxQueryRetries` distinct store peers are attempted: a peer is
  ## skipped (and the next candidate tried) when dialing it fails or when
  ## the store request itself returns an error. The last error seen is
  ## returned if every candidate fails.

  if request.paginationCursor.isSome() and request.paginationCursor.get() == EmptyCursor:
    return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor"))

  # Get all available store peers
  var peers = self.peerManager.switch.peerStore.getPeersByProtocol(WakuStoreCodec)
  if peers.len == 0:
    return err(StoreError(kind: BAD_RESPONSE, cause: "no service store peer connected"))

  # Shuffle so retries are spread across the whole peer set (the previous
  # code only claimed to shuffle), then cap the number of candidates to
  # bound worst-case latency.
  shuffle(peers)
  let peersToTry = peers[0 ..< min(peers.len, MaxQueryRetries)]

  var lastError: StoreError
  for peer in peersToTry:
    let connection = (await self.peerManager.dialPeer(peer, WakuStoreCodec)).valueOr:
      waku_store_errors.inc(labelValues = [DialFailure])
      # Include which peer failed so operators can correlate with peer logs.
      warn "failed to dial store peer, trying next", peerId = peer.peerId
      lastError = StoreError(kind: ErrorCode.PEER_DIAL_FAILURE, address: $peer)
      continue

    let response = (await self.sendStoreRequest(request, connection)).valueOr:
      warn "store query failed, trying next peer", peerId = peer.peerId, error = $error
      lastError = error
      continue

    return ok(response)

  # All candidates exhausted: surface the most recent failure.
  return err(lastError)