Skip to content

Commit 5595317

Browse files
feat: Added simple, configurable rate limit for lightpush and store-query (#2390)
* feat: Added simple, configurable rate limit for lightpush and store-query Adjust lightpush rest response to rate limit, added tests ann some fixes Add rest store query test for rate limit checks and proper error response Update apps/wakunode2/external_config.nim Move chronos/tokenbucket to nwaku codebasee with limited and fixed feature set Add meterics counter to lightpush rate limits Co-authored-by: gabrielmer <[email protected]>
1 parent 55d6b95 commit 5595317

File tree

22 files changed

+686
-29
lines changed

22 files changed

+686
-29
lines changed

tests/waku_lightpush/lightpush_utils.nim

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ import
1010
../testlib/[common, wakucore]
1111

1212
proc newTestWakuLightpushNode*(
13-
switch: Switch, handler: PushMessageHandler
13+
switch: Switch,
14+
handler: PushMessageHandler,
15+
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
1416
): Future[WakuLightPush] {.async.} =
1517
let
1618
peerManager = PeerManager.new(switch)
17-
proto = WakuLightPush.new(peerManager, rng, handler)
19+
proto = WakuLightPush.new(peerManager, rng, handler, rateLimitSetting)
1820

1921
await proto.start()
2022
switch.mount(proto)

tests/waku_lightpush/test_all.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
import ./test_client
1+
import ./test_client, ./test_ratelimit

tests/waku_lightpush/test_client.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ suite "Waku Lightpush Client":
203203
# 1KiB
204204
message2 = fakeWakuMessage(
205205
contentTopic = contentTopic, payload = getByteSequence(10 * 1024)
206-
) # 10KiB
206+
) # 10KiB
207207
message3 = fakeWakuMessage(
208208
contentTopic = contentTopic, payload = getByteSequence(100 * 1024)
209209
) # 100KiB
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
{.used.}
2+
3+
import
4+
std/[options, strscans],
5+
testutils/unittests,
6+
chronicles,
7+
chronos,
8+
libp2p/crypto/crypto
9+
10+
import
11+
../../waku/[
12+
node/peer_manager,
13+
common/ratelimit,
14+
waku_core,
15+
waku_lightpush,
16+
waku_lightpush/client,
17+
waku_lightpush/common,
18+
waku_lightpush/protocol_metrics,
19+
waku_lightpush/rpc,
20+
waku_lightpush/rpc_codec,
21+
],
22+
../testlib/[assertions, wakucore, testasync, futures, testutils],
23+
./lightpush_utils,
24+
../resources/[pubsub_topics, content_topics, payloads]
25+
26+
suite "Rate limited push service":
27+
asyncTest "push message with rate limit not violated":
28+
## Setup
29+
let
30+
serverSwitch = newTestSwitch()
31+
clientSwitch = newTestSwitch()
32+
33+
await allFutures(serverSwitch.start(), clientSwitch.start())
34+
35+
## Given
36+
var handlerFuture = newFuture[(string, WakuMessage)]()
37+
let handler: PushMessageHandler = proc(
38+
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
39+
): Future[WakuLightPushResult[void]] {.async.} =
40+
handlerFuture.complete((pubsubTopic, message))
41+
return ok()
42+
43+
let
44+
tokenPeriod = 500.millis
45+
server =
46+
await newTestWakuLightpushNode(serverSwitch, handler, some((3, tokenPeriod)))
47+
client = newTestWakuLightpushClient(clientSwitch)
48+
49+
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
50+
51+
let sendMsgProc = proc(): Future[void] {.async.} =
52+
let message = fakeWakuMessage()
53+
54+
handlerFuture = newFuture[(string, WakuMessage)]()
55+
let requestRes =
56+
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
57+
58+
check await handlerFuture.withTimeout(50.millis)
59+
60+
assert requestRes.isOk(), requestRes.error
61+
check handlerFuture.finished()
62+
63+
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
64+
65+
check:
66+
handledMessagePubsubTopic == DefaultPubsubTopic
67+
handledMessage == message
68+
69+
let waitInBetweenFor = 20.millis
70+
71+
# Test cannot be too explicit about the time when the TokenBucket resets
72+
# the internal timer, although in normal use there is no use case to care about it.
73+
var firstWaitExtend = 300.millis
74+
75+
for runCnt in 0 ..< 3:
76+
let startTime = Moment.now()
77+
for testCnt in 0 ..< 3:
78+
await sendMsgProc()
79+
await sleepAsync(20.millis)
80+
81+
var endTime = Moment.now()
82+
var elapsed: Duration = (endTime - startTime)
83+
await sleepAsync(tokenPeriod - elapsed + firstWaitExtend)
84+
firstWaitEXtend = 100.millis
85+
86+
## Cleanup
87+
await allFutures(clientSwitch.stop(), serverSwitch.stop())
88+
89+
asyncTest "push message with rate limit reject":
90+
## Setup
91+
let
92+
serverSwitch = newTestSwitch()
93+
clientSwitch = newTestSwitch()
94+
95+
await allFutures(serverSwitch.start(), clientSwitch.start())
96+
97+
## Given
98+
var handlerFuture = newFuture[(string, WakuMessage)]()
99+
let handler = proc(
100+
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
101+
): Future[WakuLightPushResult[void]] {.async.} =
102+
handlerFuture.complete((pubsubTopic, message))
103+
return ok()
104+
105+
let
106+
server =
107+
await newTestWakuLightpushNode(serverSwitch, handler, some((3, 500.millis)))
108+
client = newTestWakuLightpushClient(clientSwitch)
109+
110+
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
111+
let topic = DefaultPubsubTopic
112+
113+
let successProc = proc(): Future[void] {.async.} =
114+
let message = fakeWakuMessage()
115+
handlerFuture = newFuture[(string, WakuMessage)]()
116+
let requestRes =
117+
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
118+
discard await handlerFuture.withTimeout(10.millis)
119+
120+
check:
121+
requestRes.isOk()
122+
handlerFuture.finished()
123+
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
124+
check:
125+
handledMessagePubsubTopic == DefaultPubsubTopic
126+
handledMessage == message
127+
128+
let rejectProc = proc(): Future[void] {.async.} =
129+
let message = fakeWakuMessage()
130+
handlerFuture = newFuture[(string, WakuMessage)]()
131+
let requestRes =
132+
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
133+
discard await handlerFuture.withTimeout(10.millis)
134+
135+
check:
136+
requestRes.isErr()
137+
requestRes.error == "TOO_MANY_REQUESTS"
138+
139+
for testCnt in 0 .. 2:
140+
await successProc()
141+
await sleepAsync(20.millis)
142+
143+
await rejectProc()
144+
145+
await sleepAsync(500.millis)
146+
147+
## next one shall succeed due to the rate limit time window has passed
148+
await successProc()
149+
150+
## Cleanup
151+
await allFutures(clientSwitch.stop(), serverSwitch.stop())

tests/waku_store/test_wakunode_store.nim

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -244,9 +244,7 @@ procSuite "WakuNode - Store":
244244

245245
server.wakuFilterClient.registerPushHandler(filterHandler)
246246
let resp = waitFor server.filterSubscribe(
247-
some(DefaultPubsubTopic),
248-
DefaultContentTopic,
249-
peer = filterSourcePeer,
247+
some(DefaultPubsubTopic), DefaultContentTopic, peer = filterSourcePeer
250248
)
251249

252250
waitFor sleepAsync(100.millis)
@@ -319,3 +317,97 @@ procSuite "WakuNode - Store":
319317

320318
# Cleanup
321319
waitFor allFutures(client.stop(), server.stop())
320+
321+
test "Store protocol queries does not violate request rate limitation":
322+
## Setup
323+
let
324+
serverKey = generateSecp256k1Key()
325+
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
326+
clientKey = generateSecp256k1Key()
327+
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))
328+
329+
waitFor allFutures(client.start(), server.start())
330+
331+
let mountArchiveRes = server.mountArchive(archiveA)
332+
assert mountArchiveRes.isOk(), mountArchiveRes.error
333+
334+
waitFor server.mountStore((4, 500.millis))
335+
336+
client.mountStoreClient()
337+
338+
## Given
339+
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
340+
let serverPeer = server.peerInfo.toRemotePeerInfo()
341+
342+
let requestProc = proc() {.async.} =
343+
let queryRes = waitFor client.query(req, peer = serverPeer)
344+
345+
assert queryRes.isOk(), queryRes.error
346+
347+
let response = queryRes.get()
348+
check:
349+
response.messages == msgListA
350+
351+
for count in 0 ..< 4:
352+
waitFor requestProc()
353+
waitFor sleepAsync(20.millis)
354+
355+
waitFor sleepAsync(500.millis)
356+
357+
for count in 0 ..< 4:
358+
waitFor requestProc()
359+
waitFor sleepAsync(20.millis)
360+
361+
# Cleanup
362+
waitFor allFutures(client.stop(), server.stop())
363+
364+
test "Store protocol queries overrun request rate limitation":
365+
## Setup
366+
let
367+
serverKey = generateSecp256k1Key()
368+
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
369+
clientKey = generateSecp256k1Key()
370+
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))
371+
372+
waitFor allFutures(client.start(), server.start())
373+
374+
let mountArchiveRes = server.mountArchive(archiveA)
375+
assert mountArchiveRes.isOk(), mountArchiveRes.error
376+
377+
waitFor server.mountStore((3, 500.millis))
378+
379+
client.mountStoreClient()
380+
381+
## Given
382+
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
383+
let serverPeer = server.peerInfo.toRemotePeerInfo()
384+
385+
let successProc = proc() {.async.} =
386+
let queryRes = waitFor client.query(req, peer = serverPeer)
387+
388+
check queryRes.isOk()
389+
390+
let response = queryRes.get()
391+
check:
392+
response.messages == msgListA
393+
394+
let failsProc = proc() {.async.} =
395+
let queryRes = waitFor client.query(req, peer = serverPeer)
396+
397+
check queryRes.isErr()
398+
check queryRes.error == "TOO_MANY_REQUESTS"
399+
400+
for count in 0 ..< 3:
401+
waitFor successProc()
402+
waitFor sleepAsync(20.millis)
403+
404+
waitFor failsProc()
405+
406+
waitFor sleepAsync(500.millis)
407+
408+
for count in 0 ..< 3:
409+
waitFor successProc()
410+
waitFor sleepAsync(20.millis)
411+
412+
# Cleanup
413+
waitFor allFutures(client.stop(), server.stop())

tests/wakunode_rest/test_rest_lightpush.nim

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ import
2222
../../waku/waku_api/rest/lightpush/handlers as lightpush_api,
2323
../../waku/waku_api/rest/lightpush/client as lightpush_api_client,
2424
../../waku/waku_relay,
25+
../../waku/common/ratelimit,
2526
../testlib/wakucore,
26-
../testlib/wakunode
27+
../testlib/wakunode,
28+
../testlib/testutils
2729

2830
proc testWakuNode(): WakuNode =
2931
let
@@ -41,7 +43,9 @@ type RestLightPushTest = object
4143
restServer: WakuRestServerRef
4244
client: RestClientRef
4345

44-
proc init(T: type RestLightPushTest): Future[T] {.async.} =
46+
proc init(
47+
T: type RestLightPushTest, rateLimit: RateLimitSetting = (0, 0.millis)
48+
): Future[T] {.async.} =
4549
var testSetup = RestLightPushTest()
4650
testSetup.serviceNode = testWakuNode()
4751
testSetup.pushNode = testWakuNode()
@@ -55,7 +59,7 @@ proc init(T: type RestLightPushTest): Future[T] {.async.} =
5559

5660
await testSetup.consumerNode.mountRelay()
5761
await testSetup.serviceNode.mountRelay()
58-
await testSetup.serviceNode.mountLightPush()
62+
await testSetup.serviceNode.mountLightPush(rateLimit)
5963
testSetup.pushNode.mountLightPushClient()
6064

6165
testSetup.serviceNode.peerManager.addServicePeer(
@@ -178,6 +182,74 @@ suite "Waku v2 Rest API - lightpush":
178182

179183
await restLightPushTest.shutdown()
180184

185+
# disabled due to this bug in nim-chronos https://github.com/status-im/nim-chronos/issues/500
186+
xasyncTest "Request rate limit push message":
187+
# Given
188+
let budgetCap = 3
189+
let tokenPeriod = 500.millis
190+
let restLightPushTest = await RestLightPushTest.init((budgetCap, tokenPeriod))
191+
192+
restLightPushTest.consumerNode.subscribe(
193+
(kind: PubsubSub, topic: DefaultPubsubTopic)
194+
)
195+
restLightPushTest.serviceNode.subscribe(
196+
(kind: PubsubSub, topic: DefaultPubsubTopic)
197+
)
198+
require:
199+
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1
200+
201+
# When
202+
let pushProc = proc() {.async.} =
203+
let message: RelayWakuMessage = fakeWakuMessage(
204+
contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")
205+
)
206+
.toRelayWakuMessage()
207+
208+
let requestBody =
209+
PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: message)
210+
let response = await restLightPushTest.client.sendPushRequest(requestBody)
211+
212+
echo "response", $response
213+
214+
# Then
215+
check:
216+
response.status == 200
217+
$response.contentType == $MIMETYPE_TEXT
218+
219+
let pushRejectedProc = proc() {.async.} =
220+
let message: RelayWakuMessage = fakeWakuMessage(
221+
contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")
222+
)
223+
.toRelayWakuMessage()
224+
225+
let requestBody =
226+
PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: message)
227+
let response = await restLightPushTest.client.sendPushRequest(requestBody)
228+
229+
echo "response", $response
230+
231+
# Then
232+
check:
233+
response.status == 429
234+
235+
await pushProc()
236+
await pushProc()
237+
await pushProc()
238+
await pushRejectedProc()
239+
240+
await sleepAsync(tokenPeriod)
241+
242+
for runCnt in 0 ..< 3:
243+
let startTime = Moment.now()
244+
for sendCnt in 0 ..< budgetCap:
245+
await pushProc()
246+
247+
let endTime = Moment.now()
248+
let elapsed: Duration = (endTime - startTime)
249+
await sleepAsync(tokenPeriod - elapsed)
250+
251+
await restLightPushTest.shutdown()
252+
181253
## TODO: Re-work this test when lightpush protocol change is done: https://github.com/waku-org/pm/issues/93
182254
## This test is similar when no available peer exists for publish. Currently it is returning success,
183255
## that makes this test not useful.

0 commit comments

Comments
 (0)