Skip to content

Commit e4d114b

Browse files
ajaysubraclaude
andcommitted
feat(analytics): add demand-adaptive token-bucket flush governor
Replace the rigid fixed-interval flush decision with a token-bucket gate so the SDK adapts to demand: bursts of activity after an idle period flush immediately (the bucket banks up to `flushTokenBucketCapacity` tokens), while sustained activity is capped at the long-term refill rate. Refill rate is derived from the existing network-aware `flushInterval` (wifi/cell), and the bucket is frozen while offline. Also add a queue-depth early-flush trigger (mirrors the Android SDK's batch flush depth) so large bursts flush early instead of waiting for the next interval tick; the token bucket still gates whether the early flush proceeds. Defaults preserve today's steady-state cadence — no behavior change unless the queue bursts or sustained load would otherwise exceed the refill rate. Token-bucket state is transient (not persisted), so the bucket starts full on each launch and the first post-launch flush proceeds immediately. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 640c182 commit e4d114b

10 files changed

Lines changed: 265 additions & 2 deletions
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
//
2+
// KlaviyoState+FlushTokenBucket.swift
3+
//
4+
// Demand-adaptive flush governor: a token-bucket rate limiter plus a queue-depth
5+
// early-flush trigger. Together these let the SDK absorb bursts of activity after
6+
// idle periods while capping the long-term flush rate, instead of flushing on a
7+
// rigid fixed interval regardless of demand.
8+
//
9+
// Copyright (c) 2026 Klaviyo
10+
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
11+
//
12+
13+
import Foundation
14+
15+
extension KlaviyoState {
16+
/// `true` when the queue has grown large enough to warrant an early flush attempt
17+
/// rather than waiting for the next flush-interval tick. Suppressed when the
18+
/// `flushInterval` is non-finite (i.e. offline) so we don't kick off doomed requests.
19+
var shouldFlushForQueueDepth: Bool {
20+
flushInterval.isFinite && queue.count >= StateManagementConstants.flushDepth
21+
}
22+
23+
/// Refills the flush-token bucket based on the time elapsed since the last refill, then
24+
/// attempts to consume a single token.
25+
///
26+
/// Tokens accrue at a rate of `1 / flushInterval` per second (the same network-aware
27+
/// cadence used by the flush timer) and are capped at `flushTokenBucketCapacity`. Because
28+
/// the bucket can bank up to `capacity` tokens during quiet periods, a burst of flushes
29+
/// immediately after an idle stretch is allowed through, while sustained activity is
30+
/// throttled to the long-term refill rate. When offline (`flushInterval` is non-finite)
31+
/// the bucket is frozen — no tokens accrue until connectivity returns.
32+
///
33+
/// - Parameter currentTime: The current time, injected for testability.
34+
/// - Returns: `true` if a token was available and consumed (the caller may flush);
35+
/// `false` if the bucket is empty (the caller should defer until tokens refill).
36+
mutating func consumeFlushToken(currentTime: Date) -> Bool {
37+
if let lastRefill = lastFlushTokenRefill,
38+
flushInterval.isFinite, flushInterval > 0 {
39+
let elapsed = currentTime.timeIntervalSince(lastRefill)
40+
if elapsed > 0 {
41+
let refillRate = 1.0 / flushInterval // tokens per second
42+
availableFlushTokens = min(
43+
StateManagementConstants.flushTokenBucketCapacity,
44+
availableFlushTokens + elapsed * refillRate
45+
)
46+
}
47+
}
48+
lastFlushTokenRefill = currentTime
49+
50+
guard availableFlushTokens >= 1.0 else {
51+
return false
52+
}
53+
availableFlushTokens -= 1.0
54+
return true
55+
}
56+
}

Sources/KlaviyoSwift/StateManagement/KlaviyoState.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,13 @@ struct KlaviyoState: Equatable, Codable {
6262
var pendingProfile: [Profile.ProfileKey: AnyEncodable]?
6363
var isProcessingDeepLink = false
6464

65+
// Token-bucket state for the flush governor.
66+
// These govern the flush cadence (see `consumeFlushToken(currentTime:)`). They are intentionally
67+
// transient — left out of `CodingKeys` below — so the bucket starts full on every launch,
68+
// letting the first post-launch flush proceed immediately.
69+
var availableFlushTokens = StateManagementConstants.flushTokenBucketCapacity
70+
var lastFlushTokenRefill: Date?
71+
6572
enum CodingKeys: CodingKey {
6673
case apiKey
6774
case email

Sources/KlaviyoSwift/StateManagement/StateManagement.swift

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,19 @@ enum StateManagementConstants {
2424
static let wifiFlushInterval = 10.0
2525
static let maxQueueSize = 200
2626
static let initialAttempt = 1
27+
28+
/// Maximum number of flushes the token bucket can hold. Each flush consumes one
29+
/// token; tokens refill over time at a rate derived from `flushInterval` (the
30+
/// network-aware cadence). A capacity greater than 1 lets the SDK absorb a burst
31+
/// of flushes after an idle period while still capping the long-term flush rate at
32+
/// the refill rate — see `KlaviyoState.consumeFlushToken(currentTime:)`.
33+
static let flushTokenBucketCapacity = 5.0
34+
35+
/// Queue depth that triggers an early flush attempt instead of waiting for the next
36+
/// flush-interval tick. Mirrors the Android SDK's batch flush depth so the burst
37+
/// behavior is consistent across platforms. The token bucket still gates whether the
38+
/// early flush actually proceeds.
39+
static let flushDepth = 25
2740
}
2841

2942
/// Describes how the state machine should handle retrying a request after a failure.
@@ -318,6 +331,14 @@ struct KlaviyoReducer: ReducerProtocol {
318331
return .none
319332
}
320333

334+
// Gate the flush on the token bucket: this enforces the long-term flush rate
335+
// while still allowing bursts immediately after idle periods. If no token is
336+
// available we defer — the next flush-interval tick (or queue-depth trigger)
337+
// will retry once tokens have refilled.
338+
guard state.consumeFlushToken(currentTime: environment.date()) else {
339+
return .none
340+
}
341+
321342
state.requestsInFlight.append(contentsOf: state.queue)
322343
state.queue.removeAll()
323344
state.flushing = true
@@ -513,7 +534,11 @@ struct KlaviyoReducer: ReducerProtocol {
513534
state.enqueueRequest(request: request)
514535
}
515536

516-
let baseEffect = shouldPrioritize ? EffectTask<KlaviyoAction>.task { .flushQueue } : .none
537+
// Prioritized events flush immediately; otherwise flush early only once the queue
538+
// reaches the depth threshold. The token bucket in `.flushQueue` decides whether the
539+
// attempt actually proceeds, so this stays within the long-term rate limit.
540+
let shouldFlushNow = shouldPrioritize || state.shouldFlushForQueueDepth
541+
let baseEffect = shouldFlushNow ? EffectTask<KlaviyoAction>.task { .flushQueue } : .none
517542
return .merge([
518543
baseEffect,
519544
.fireAndForget { KlaviyoInternal.publishEvent(event) }
@@ -531,7 +556,7 @@ struct KlaviyoReducer: ReducerProtocol {
531556

532557
state.enqueueRequest(request: request)
533558

534-
return .none
559+
return state.shouldFlushForQueueDepth ? EffectTask<KlaviyoAction>.task { .flushQueue } : .none
535560

536561
case let .enqueueProfile(profile):
537562
guard case .initialized = state.initalizationState
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
//
2+
// FlushTokenBucketTests.swift
3+
//
4+
// Tests for the demand-adaptive (token-bucket) flush governor and the
5+
// queue-depth early-flush trigger.
6+
//
7+
// Copyright (c) 2026 Klaviyo
8+
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
9+
//
10+
11+
@testable import KlaviyoCore
12+
@testable import KlaviyoSwift
13+
import Foundation
14+
import XCTest
15+
16+
final class FlushTokenBucketTests: XCTestCase {
17+
@MainActor
18+
override func setUp() async throws {
19+
environment = KlaviyoEnvironment.test()
20+
klaviyoSwiftEnvironment = KlaviyoSwiftEnvironment.test()
21+
}
22+
23+
private func sampleRequest(name: String = "test") -> KlaviyoRequest {
24+
KlaviyoRequest(endpoint: .createEvent("foo", CreateEventPayload(data: .init(name: name))))
25+
}
26+
27+
// MARK: - consumeFlushToken(currentTime:)
28+
29+
func test_consumeFlushToken_fullBucketAllowsFlushAndDecrementsByOne() {
30+
var state = KlaviyoState(queue: [])
31+
state.flushInterval = StateManagementConstants.wifiFlushInterval
32+
33+
let flushTime = Date(timeIntervalSince1970: 1000)
34+
XCTAssertTrue(state.consumeFlushToken(currentTime: flushTime))
35+
XCTAssertEqual(
36+
state.availableFlushTokens,
37+
StateManagementConstants.flushTokenBucketCapacity - 1,
38+
accuracy: 0.0001
39+
)
40+
XCTAssertEqual(state.lastFlushTokenRefill, flushTime)
41+
}
42+
43+
func test_consumeFlushToken_emptyBucketIsDeniedUntilEnoughTimeElapses() {
44+
var state = KlaviyoState(queue: [])
45+
state.flushInterval = StateManagementConstants.wifiFlushInterval // 10s -> 0.1 token/sec
46+
state.availableFlushTokens = 0
47+
let start = Date(timeIntervalSince1970: 1000)
48+
state.lastFlushTokenRefill = start
49+
50+
// After 5s only half a token has accrued -> still denied.
51+
XCTAssertFalse(state.consumeFlushToken(currentTime: start.addingTimeInterval(5)))
52+
XCTAssertEqual(state.availableFlushTokens, 0.5, accuracy: 0.0001)
53+
54+
// 10 more seconds accrues a full token -> allowed, then decremented.
55+
XCTAssertTrue(state.consumeFlushToken(currentTime: start.addingTimeInterval(15)))
56+
XCTAssertEqual(state.availableFlushTokens, 0.5, accuracy: 0.0001)
57+
}
58+
59+
func test_consumeFlushToken_refillIsCappedAtCapacity() {
60+
var state = KlaviyoState(queue: [])
61+
state.flushInterval = StateManagementConstants.wifiFlushInterval
62+
state.availableFlushTokens = 4
63+
let start = Date(timeIntervalSince1970: 1000)
64+
state.lastFlushTokenRefill = start
65+
66+
// 100s would add 10 tokens, but the bucket is capped at capacity before consuming.
67+
XCTAssertTrue(state.consumeFlushToken(currentTime: start.addingTimeInterval(100)))
68+
XCTAssertEqual(
69+
state.availableFlushTokens,
70+
StateManagementConstants.flushTokenBucketCapacity - 1,
71+
accuracy: 0.0001
72+
)
73+
}
74+
75+
func test_consumeFlushToken_doesNotRefillWhenOffline() {
76+
var state = KlaviyoState(queue: [])
77+
state.flushInterval = .infinity // offline: timer paused, bucket frozen
78+
state.availableFlushTokens = 0
79+
let start = Date(timeIntervalSince1970: 1000)
80+
state.lastFlushTokenRefill = start
81+
82+
XCTAssertFalse(state.consumeFlushToken(currentTime: start.addingTimeInterval(10_000)))
83+
XCTAssertEqual(state.availableFlushTokens, 0, accuracy: 0.0001)
84+
XCTAssertEqual(state.lastFlushTokenRefill, start.addingTimeInterval(10_000))
85+
}
86+
87+
// MARK: - shouldFlushForQueueDepth
88+
89+
func test_shouldFlushForQueueDepth_belowThresholdIsFalse() {
90+
let queue = Array(repeating: sampleRequest(), count: StateManagementConstants.flushDepth - 1)
91+
var state = KlaviyoState(queue: queue)
92+
state.flushInterval = StateManagementConstants.wifiFlushInterval
93+
XCTAssertFalse(state.shouldFlushForQueueDepth)
94+
}
95+
96+
func test_shouldFlushForQueueDepth_atThresholdIsTrue() {
97+
let queue = Array(repeating: sampleRequest(), count: StateManagementConstants.flushDepth)
98+
var state = KlaviyoState(queue: queue)
99+
state.flushInterval = StateManagementConstants.wifiFlushInterval
100+
XCTAssertTrue(state.shouldFlushForQueueDepth)
101+
}
102+
103+
func test_shouldFlushForQueueDepth_isFalseWhenOffline() {
104+
let queue = Array(repeating: sampleRequest(), count: StateManagementConstants.flushDepth)
105+
var state = KlaviyoState(queue: queue)
106+
state.flushInterval = .infinity
107+
XCTAssertFalse(state.shouldFlushForQueueDepth)
108+
}
109+
110+
// MARK: - flushQueue token gate (reducer)
111+
112+
@MainActor
113+
func test_flushQueue_withDepletedBucketDoesNotFlush() async {
114+
var initialState = INITIALIZED_TEST_STATE()
115+
initialState.flushing = false
116+
initialState.availableFlushTokens = 0
117+
// Refilled "now", so no time has elapsed at flush time -> no token accrues.
118+
initialState.lastFlushTokenRefill = environment.date()
119+
initialState.queue = [initialState.buildProfileRequest(
120+
apiKey: initialState.apiKey!,
121+
anonymousId: initialState.anonymousId!
122+
)]
123+
let store = TestStore(initialState: initialState, reducer: KlaviyoReducer())
124+
125+
// Bucket is empty and nothing refills, so the flush is deferred:
126+
// no state change and, crucially, no `.sendRequest` is emitted.
127+
_ = await store.send(.flushQueue)
128+
}
129+
130+
// MARK: - queue-depth early-flush trigger (reducer)
131+
132+
@MainActor
133+
func test_enqueueAggregateEventReachingQueueDepthTriggersFlush() async {
134+
var initialState = INITIALIZED_TEST_STATE()
135+
initialState.flushing = false
136+
// Empty the bucket so the triggered flush is gated. This keeps the test focused on the
137+
// depth trigger itself rather than the downstream send cascade.
138+
initialState.availableFlushTokens = 0
139+
initialState.queue = Array(
140+
repeating: sampleRequest(name: "filler"),
141+
count: StateManagementConstants.flushDepth - 1
142+
)
143+
let store = TestStore(initialState: initialState, reducer: KlaviyoReducer())
144+
store.exhaustivity = .off
145+
146+
// Enqueuing one more request reaches `flushDepth`, which schedules an early flush.
147+
await store.send(.enqueueAggregateEvent(Data("agg".utf8)))
148+
await store.receive(.flushQueue)
149+
}
150+
}

Tests/KlaviyoSwiftTests/KlaviyoTestUtils.swift

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,16 @@ extension Store where State == KlaviyoState, Action == KlaviyoAction {
9292
static let test = Store(initialState: .test, reducer: KlaviyoTestReducer())
9393
}
9494

95+
extension KlaviyoState {
96+
/// Mirrors the token-bucket mutation performed by a single successful `.flushQueue`:
97+
/// one token is consumed and the refill timestamp is advanced to "now". Use inside
98+
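/// `TestStore` expectation closures to keep flush assertions DRY. Does not model token refill, so it is only accurate when no net refill occurs before the flush (e.g. the bucket is already at capacity).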
99+
mutating func expectFlushTokenConsumed() {
100+
availableFlushTokens -= 1
101+
lastFlushTokenRefill = environment.date()
102+
}
103+
}
104+
95105
extension FileClient {
96106
static let test = FileClient(
97107
write: { _, _ in },

Tests/KlaviyoSwiftTests/StateManagementTests.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ class StateManagementTests: XCTestCase {
140140
$0.flushing = true
141141
$0.requestsInFlight = $0.queue
142142
$0.queue = []
143+
$0.expectFlushTokenConsumed()
143144
}
144145

145146
await store.receive(.sendRequest)
@@ -173,6 +174,7 @@ class StateManagementTests: XCTestCase {
173174
$0.flushing = true
174175
$0.requestsInFlight = $0.queue
175176
$0.queue = []
177+
$0.expectFlushTokenConsumed()
176178
}
177179

178180
await store.receive(.sendRequest)
@@ -206,6 +208,7 @@ class StateManagementTests: XCTestCase {
206208
$0.flushing = true
207209
$0.requestsInFlight = $0.queue
208210
$0.queue = []
211+
$0.expectFlushTokenConsumed()
209212
}
210213

211214
await store.receive(.sendRequest)
@@ -313,6 +316,7 @@ class StateManagementTests: XCTestCase {
313316
$0.flushing = true
314317
$0.requestsInFlight = $0.queue
315318
$0.queue = []
319+
$0.expectFlushTokenConsumed()
316320
}
317321
await store.receive(.sendRequest)
318322

@@ -360,6 +364,7 @@ class StateManagementTests: XCTestCase {
360364
$0.flushing = true
361365
$0.requestsInFlight = $0.queue
362366
$0.queue = []
367+
$0.expectFlushTokenConsumed()
363368
}
364369
await store.receive(.sendRequest)
365370

@@ -480,6 +485,7 @@ class StateManagementTests: XCTestCase {
480485
$0.requestsInFlight = $0.queue
481486
$0.queue = []
482487
$0.flushing = true
488+
$0.expectFlushTokenConsumed()
483489
$0.pendingProfile = nil
484490
request = $0.requestsInFlight[0]
485491
switch request?.endpoint {
@@ -761,6 +767,7 @@ class StateManagementTests: XCTestCase {
761767
$0.flushing = true
762768
$0.requestsInFlight = $0.queue
763769
$0.queue = []
770+
$0.expectFlushTokenConsumed()
764771
XCTAssertEqual($0.requestsInFlight.count, 3, "Should have 3 requests in flight")
765772
actualGeofenceRequest = $0.requestsInFlight[0]
766773
if case let .createEvent(_, payload) = actualGeofenceRequest!.endpoint {

Tests/KlaviyoSwiftTests/__Snapshots__/KlaviyoStateTests/testLoadNewKlaviyoState.1.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
- some: "00000000-0000-0000-0000-000000000001"
44
▿ apiKey: Optional<String>
55
- some: "foo"
6+
- availableFlushTokens: 5.0
67
- email: Optional<String>.none
78
- externalId: Optional<String>.none
89
- flushInterval: 10.0
910
- flushing: false
1011
- initalizationState: InitializationState.uninitialized
1112
- isProcessingDeepLink: false
13+
- lastFlushTokenRefill: Optional<Date>.none
1214
- pendingProfile: Optional<Dictionary<ProfileKey, AnyEncodable>>.none
1315
- pendingRequests: 0 elements
1416
- phoneNumber: Optional<String>.none

Tests/KlaviyoSwiftTests/__Snapshots__/KlaviyoStateTests/testStateFileExistsInvalidData.1.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
- some: "00000000-0000-0000-0000-000000000001"
44
▿ apiKey: Optional<String>
55
- some: "foo"
6+
- availableFlushTokens: 5.0
67
- email: Optional<String>.none
78
- externalId: Optional<String>.none
89
- flushInterval: 10.0
910
- flushing: false
1011
- initalizationState: InitializationState.uninitialized
1112
- isProcessingDeepLink: false
13+
- lastFlushTokenRefill: Optional<Date>.none
1214
- pendingProfile: Optional<Dictionary<ProfileKey, AnyEncodable>>.none
1315
- pendingRequests: 0 elements
1416
- phoneNumber: Optional<String>.none

Tests/KlaviyoSwiftTests/__Snapshots__/KlaviyoStateTests/testStateFileExistsInvalidJSON.1.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
- some: "00000000-0000-0000-0000-000000000001"
44
▿ apiKey: Optional<String>
55
- some: "foo"
6+
- availableFlushTokens: 5.0
67
- email: Optional<String>.none
78
- externalId: Optional<String>.none
89
- flushInterval: 10.0
910
- flushing: false
1011
- initalizationState: InitializationState.uninitialized
1112
- isProcessingDeepLink: false
13+
- lastFlushTokenRefill: Optional<Date>.none
1214
- pendingProfile: Optional<Dictionary<ProfileKey, AnyEncodable>>.none
1315
- pendingRequests: 0 elements
1416
- phoneNumber: Optional<String>.none

0 commit comments

Comments
 (0)