diff --git a/tests/common/test_all.nim b/tests/common/test_all.nim index ae37337cdd..2b4cc0bfc1 100644 --- a/tests/common/test_all.nim +++ b/tests/common/test_all.nim @@ -8,7 +8,6 @@ import ./test_protobuf_validation, ./test_sqlite_migrations, ./test_parse_size, - ./test_tokenbucket, ./test_requestratelimiter, ./test_ratelimit_setting, ./test_timed_map diff --git a/tests/common/test_requestratelimiter.nim b/tests/common/test_requestratelimiter.nim index be910b38e1..e13275128c 100644 --- a/tests/common/test_requestratelimiter.nim +++ b/tests/common/test_requestratelimiter.nim @@ -44,8 +44,9 @@ suite "RequestRateLimiter": # conn1 reached the 75% of the main bucket over 2 periods of time check limiter.checkUsage(proto, conn1, now + 3.minutes) == false - # conn2 has not used its tokens while we have 1 more tokens left in the main bucket - check limiter.checkUsage(proto, conn2, now + 3.minutes) == true + # conn2 has not used its tokens but while we have 0 more tokens left in the main bucket + # it will tolerate some burst within period. + check limiter.checkUsage(proto, conn2, now + 3.minutes + 50.seconds) == true test "RequestRateLimiter Restrict overusing peer": # keep limits low for easier calculation of ratios @@ -55,6 +56,8 @@ suite "RequestRateLimiter": # as ratio is 2 in this case but max tokens are main tokens*ratio . 0.75 # notice meanwhile we have 20 tokens over 2 period (4 mins) in sum # See: waku/common/rate_limit/request_limiter.nim #func calcPeriodRatio + # This tests shows balanced token utilization as being replenished in between periods but never overusing + # intended rates. let now = Moment.now() # with first use we register the peer also and start its timer @@ -64,23 +67,22 @@ suite "RequestRateLimiter": # run out of main tokens but still used one more token from the peer's bucket check limiter.checkUsage(proto, conn1, now) == false - for i in 0 ..< 4: + for i in 0 ..< 10: check limiter.checkUsage(proto, conn1, now + 3.minutes) == true # conn1 reached the 75% of the main bucket over 2 periods of time check limiter.checkUsage(proto, conn1, now + 3.minutes) == false - check limiter.checkUsage(proto, conn2, now + 3.minutes) == true - check limiter.checkUsage(proto, conn2, now + 3.minutes) == true - check limiter.checkUsage(proto, conn3, now + 3.minutes) == true - check limiter.checkUsage(proto, conn2, now + 3.minutes) == true - check limiter.checkUsage(proto, conn3, now + 3.minutes) == true + check limiter.checkUsage(proto, conn2, now + 3.minutes + 50.seconds) == true + check limiter.checkUsage(proto, conn2, now + 3.minutes + 50.seconds) == true + check limiter.checkUsage(proto, conn3, now + 3.minutes + 50.seconds) == true + check limiter.checkUsage(proto, conn2, now + 3.minutes + 50.seconds) == true # conn1 gets replenished as the ratio was 2 giving twice as long replenish period than the main bucket # see waku/common/rate_limit/request_limiter.nim #func calcPeriodRatio and calcPeerTokenSetting check limiter.checkUsage(proto, conn1, now + 4.minutes) == true # requests of other peers can also go - check limiter.checkUsage(proto, conn2, now + 4100.milliseconds) == true + check limiter.checkUsage(proto, conn2, now + 5.minutes) == true check limiter.checkUsage(proto, conn3, now + 5.minutes) == true test "RequestRateLimiter lowest possible volume": diff --git a/tests/common/test_tokenbucket.nim b/tests/common/test_tokenbucket.nim deleted file mode 100644 index 5bc1a05830..0000000000 --- a/tests/common/test_tokenbucket.nim +++ /dev/null @@ -1,69 +0,0 @@ -# Chronos Test Suite -# (c) Copyright 2022-Present -# Status Research & Development GmbH -# -# Licensed under either of -# Apache License, version 2.0, (LICENSE-APACHEv2) -# MIT license (LICENSE-MIT) - -{.used.} - -import testutils/unittests -import chronos -import ../../waku/common/rate_limit/token_bucket - -suite "Token Bucket": - test "TokenBucket Sync test - strict": - var bucket = TokenBucket.newStrict(1000, 1.milliseconds) - let - start = Moment.now() - fullTime = start + 1.milliseconds - check: - bucket.tryConsume(800, start) == true - bucket.tryConsume(200, start) == true - # Out of budget - bucket.tryConsume(100, start) == false - bucket.tryConsume(800, fullTime) == true - bucket.tryConsume(200, fullTime) == true - # Out of budget - bucket.tryConsume(100, fullTime) == false - - test "TokenBucket Sync test - compensating": - var bucket = TokenBucket.new(1000, 1.milliseconds) - let - start = Moment.now() - fullTime = start + 1.milliseconds - check: - bucket.tryConsume(800, start) == true - bucket.tryConsume(200, start) == true - # Out of budget - bucket.tryConsume(100, start) == false - bucket.tryConsume(800, fullTime) == true - bucket.tryConsume(200, fullTime) == true - # Due not using the bucket for a full period the compensation will satisfy this request - bucket.tryConsume(100, fullTime) == true - - test "TokenBucket Max compensation": - var bucket = TokenBucket.new(1000, 1.minutes) - var reqTime = Moment.now() - - check bucket.tryConsume(1000, reqTime) - check bucket.tryConsume(1, reqTime) == false - reqTime += 1.minutes - check bucket.tryConsume(500, reqTime) == true - reqTime += 1.minutes - check bucket.tryConsume(1000, reqTime) == true - reqTime += 10.seconds - # max compensation is 25% so try to consume 250 more - check bucket.tryConsume(250, reqTime) == true - reqTime += 49.seconds - # out of budget within the same period - check bucket.tryConsume(1, reqTime) == false - - test "TokenBucket Short replenish": - var bucket = TokenBucket.new(15000, 1.milliseconds) - let start = Moment.now() - check bucket.tryConsume(15000, start) - check bucket.tryConsume(1, start) == false - - check bucket.tryConsume(15000, start + 1.milliseconds) == true diff --git a/vendor/nim-chronos b/vendor/nim-chronos index 0646c444fc..1c85269576 160000 --- a/vendor/nim-chronos +++ b/vendor/nim-chronos @@ -1 +1 @@ -Subproject commit 0646c444fce7c7ed08ef6f2c9a7abfd172ffe655 +Subproject commit 1c85269576c47389f6e74a671963937a2679f2ea diff --git a/waku/common/rate_limit/per_peer_limiter.nim b/waku/common/rate_limit/per_peer_limiter.nim index 5cb96a2d13..916bd366f4 100644 --- a/waku/common/rate_limit/per_peer_limiter.nim +++ b/waku/common/rate_limit/per_peer_limiter.nim @@ -6,11 +6,10 @@ {.push raises: [].} -import std/[options, tables], libp2p/stream/connection - +import std/[options, tables], libp2p/stream/connection, chronos/ratelimit import ./[single_token_limiter, service_metrics], ../../utils/tableutils -export token_bucket, setting, service_metrics +export setting, service_metrics type PerPeerRateLimiter* = ref object of RootObj setting*: Option[RateLimitSetting] @@ -20,7 +19,7 @@ proc mgetOrPut( perPeerRateLimiter: var PerPeerRateLimiter, peerId: PeerId ): var Option[TokenBucket] = return perPeerRateLimiter.peerBucket.mgetOrPut( - peerId, newTokenBucket(perPeerRateLimiter.setting, ReplenishMode.Compensating) + peerId, newTokenBucket(perPeerRateLimiter.setting, ReplenishMode.Balanced) ) template checkUsageLimit*( diff --git a/waku/common/rate_limit/request_limiter.nim b/waku/common/rate_limit/request_limiter.nim index 0ede20be4a..b134d3cff4 100644 --- a/waku/common/rate_limit/request_limiter.nim +++ b/waku/common/rate_limit/request_limiter.nim @@ -19,6 +19,7 @@ import std/[options, math], chronicles, chronos/timer, + chronos/ratelimit, libp2p/stream/connection, libp2p/utility @@ -26,7 +27,7 @@ import std/times except TimeInterval, Duration, seconds, minutes import ./[single_token_limiter, service_metrics, timed_map] -export token_bucket, setting, service_metrics +export setting, service_metrics logScope: topics = "waku ratelimit" @@ -59,6 +60,9 @@ proc checkUsage*( return true let peerBucket = t.mgetOrPut(conn.peerId) + let avail = peerBucket.getAvailableCapacity(now) + let globAvail = t.tokenBucket.get().getAvailableCapacity(now) + ## check requesting peer's usage is not over the calculated ratio and let that peer go which not requested much/or this time... if not peerBucket.tryConsume(1, now): trace "peer usage limit reached", peer = conn.peerId diff --git a/waku/common/rate_limit/single_token_limiter.nim b/waku/common/rate_limit/single_token_limiter.nim index 50fb2d64cb..d3dc7158d0 100644 --- a/waku/common/rate_limit/single_token_limiter.nim +++ b/waku/common/rate_limit/single_token_limiter.nim @@ -2,16 +2,16 @@ {.push raises: [].} -import std/[options], chronos/timer, libp2p/stream/connection, libp2p/utility +import std/[options], chronos/timer, chronos/ratelimit, libp2p/stream/connection, libp2p/utility import std/times except TimeInterval, Duration -import ./[token_bucket, setting, service_metrics] -export token_bucket, setting, service_metrics +import ./[setting, service_metrics] +export setting, service_metrics proc newTokenBucket*( setting: Option[RateLimitSetting], - replenishMode: ReplenishMode = ReplenishMode.Compensating, + replenishMode: ReplenishMode = ReplenishMode.Balanced, ): Option[TokenBucket] = if setting.isNone(): return none[TokenBucket]() @@ -19,7 +19,7 @@ proc newTokenBucket*( if setting.get().isUnlimited(): return none[TokenBucket]() - return some(TokenBucket.new(setting.get().volume, setting.get().period)) + return some(TokenBucket.new(setting.get().volume, setting.get().period, replenishMode)) proc checkUsage( t: var TokenBucket, proto: string, now = Moment.now() diff --git a/waku/common/rate_limit/token_bucket.nim b/waku/common/rate_limit/token_bucket.nim deleted file mode 100644 index 799817ebd8..0000000000 --- a/waku/common/rate_limit/token_bucket.nim +++ /dev/null @@ -1,182 +0,0 @@ -{.push raises: [].} - -import chronos, std/math, std/options - -const BUDGET_COMPENSATION_LIMIT_PERCENT = 0.25 - -## This is an extract from chronos/rate_limit.nim due to the found bug in the original implementation. -## Unfortunately that bug cannot be solved without harm the original features of TokenBucket class. -## So, this current shortcut is used to enable move ahead with nwaku rate limiter implementation. -## ref: https://github.com/status-im/nim-chronos/issues/500 -## -## This version of TokenBucket is different from the original one in chronos/rate_limit.nim in many ways: -## - It has a new mode called `Compensating` which is the default mode. -## Compensation is calculated as the not used bucket capacity in the last measured period(s) in average. -## or up until maximum the allowed compansation treshold (Currently it is const 25%). -## Also compensation takes care of the proper time period calculation to avoid non-usage periods that can lead to -## overcompensation. -## - Strict mode is also available which will only replenish when time period is over but also will fill -## the bucket to the max capacity. - -type - ReplenishMode* = enum - Strict - Compensating - - TokenBucket* = ref object - budget: int ## Current number of tokens in the bucket - budgetCap: int ## Bucket capacity - lastTimeFull: Moment - ## This timer measures the proper periodizaiton of the bucket refilling - fillDuration: Duration ## Refill period - case replenishMode*: ReplenishMode - of Strict: - ## In strict mode, the bucket is refilled only till the budgetCap - discard - of Compensating: - ## This is the default mode. - maxCompensation: float - -func periodDistance(bucket: TokenBucket, currentTime: Moment): float = - ## notice fillDuration cannot be zero by design - ## period distance is a float number representing the calculated period time - ## since the last time bucket was refilled. - return - nanoseconds(currentTime - bucket.lastTimeFull).float / - nanoseconds(bucket.fillDuration).float - -func getUsageAverageSince(bucket: TokenBucket, distance: float): float = - if distance == 0.float: - ## in case there is zero time difference than the usage percentage is 100% - return 1.0 - - ## budgetCap can never be zero - ## usage average is calculated as a percentage of total capacity available over - ## the measured period - return bucket.budget.float / bucket.budgetCap.float / distance - -proc calcCompensation(bucket: TokenBucket, averageUsage: float): int = - # if we already fully used or even overused the tokens, there is no place for compensation - if averageUsage >= 1.0: - return 0 - - ## compensation is the not used bucket capacity in the last measured period(s) in average. - ## or maximum the allowed compansation treshold - let compensationPercent = - min((1.0 - averageUsage) * bucket.budgetCap.float, bucket.maxCompensation) - return trunc(compensationPercent).int - -func periodElapsed(bucket: TokenBucket, currentTime: Moment): bool = - return currentTime - bucket.lastTimeFull >= bucket.fillDuration - -## Update will take place if bucket is empty and trying to consume tokens. -## It checks if the bucket can be replenished as refill duration is passed or not. -## - strict mode: -proc updateStrict(bucket: TokenBucket, currentTime: Moment) = - if bucket.fillDuration == default(Duration): - bucket.budget = min(bucket.budgetCap, bucket.budget) - return - - if not periodElapsed(bucket, currentTime): - return - - bucket.budget = bucket.budgetCap - bucket.lastTimeFull = currentTime - -## - compensating - ballancing load: -## - between updates we calculate average load (current bucket capacity / number of periods till last update) -## - gives the percentage load used recently -## - with this we can replenish bucket up to 100% + calculated leftover from previous period (caped with max treshold) -proc updateWithCompensation(bucket: TokenBucket, currentTime: Moment) = - if bucket.fillDuration == default(Duration): - bucket.budget = min(bucket.budgetCap, bucket.budget) - return - - # do not replenish within the same period - if not periodElapsed(bucket, currentTime): - return - - let distance = bucket.periodDistance(currentTime) - let recentAvgUsage = bucket.getUsageAverageSince(distance) - let compensation = bucket.calcCompensation(recentAvgUsage) - - bucket.budget = bucket.budgetCap + compensation - bucket.lastTimeFull = currentTime - -proc update(bucket: TokenBucket, currentTime: Moment) = - if bucket.replenishMode == ReplenishMode.Compensating: - updateWithCompensation(bucket, currentTime) - else: - updateStrict(bucket, currentTime) - -proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool = - ## If `tokens` are available, consume them, - ## Otherwhise, return false. - - if bucket.budget >= bucket.budgetCap: - bucket.lastTimeFull = now - - if bucket.budget >= tokens: - bucket.budget -= tokens - return true - - bucket.update(now) - - if bucket.budget >= tokens: - bucket.budget -= tokens - return true - else: - return false - -proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) = - ## Add `tokens` to the budget (capped to the bucket capacity) - bucket.budget += tokens - bucket.update(now) - -proc new*( - T: type[TokenBucket], - budgetCap: int, - fillDuration: Duration = 1.seconds, - mode: ReplenishMode = ReplenishMode.Compensating, -): T = - assert not isZero(fillDuration) - assert budgetCap != 0 - - ## Create different mode TokenBucket - case mode - of ReplenishMode.Strict: - return T( - budget: budgetCap, - budgetCap: budgetCap, - fillDuration: fillDuration, - lastTimeFull: Moment.now(), - replenishMode: mode, - ) - of ReplenishMode.Compensating: - T( - budget: budgetCap, - budgetCap: budgetCap, - fillDuration: fillDuration, - lastTimeFull: Moment.now(), - replenishMode: mode, - maxCompensation: budgetCap.float * BUDGET_COMPENSATION_LIMIT_PERCENT, - ) - -proc newStrict*(T: type[TokenBucket], capacity: int, period: Duration): TokenBucket = - T.new(capacity, period, ReplenishMode.Strict) - -proc newCompensating*( - T: type[TokenBucket], capacity: int, period: Duration -): TokenBucket = - T.new(capacity, period, ReplenishMode.Compensating) - -func `$`*(b: TokenBucket): string {.inline.} = - if isNil(b): - return "nil" - return $b.budgetCap & "/" & $b.fillDuration - -func `$`*(ob: Option[TokenBucket]): string {.inline.} = - if ob.isNone(): - return "no-limit" - - return $ob.get()