-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathstable.rs
More file actions
243 lines (214 loc) · 8.99 KB
/
stable.rs
File metadata and controls
243 lines (214 loc) · 8.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
//! Stable throttle
//!
//! This throttle refills capacity at a steady rate.
use std::num::NonZeroU32;
use super::{Clock, RealClock};
// Number of ticks that make up one 'interval', the period after which the
// valve's capacity is reset to its maximum. The throttle makes no claims on
// units. As an illustration: if a user intends to produce 1 MB/s then the
// 'interval' is one second, each tick corresponds to one microsecond and a
// maximum capacity of 1,000,000 bytes averages out to 1 byte per microsecond.
const INTERVAL_TICKS: u64 = 1_000_000;
/// Errors produced by [`Stable`] when capacity is requested from it.
#[derive(thiserror::Error, Debug, Clone, Copy)]
pub enum Error {
/// Requested capacity is greater than the maximum allowed capacity. Such a
/// request can never be satisfied, no matter how long the caller waits, so
/// it is rejected immediately.
#[error("Capacity")]
Capacity,
}
#[derive(Debug)]
/// A throttle type.
///
/// This throttle is stable in that it will steadily refill units at a known
/// rate and does not inspect the target in any way. Capacity accounting is
/// delegated to an inner `Valve`; time is read from, and waited on through,
/// the clock `C`, which defaults to `RealClock`.
pub struct Stable<C = RealClock> {
/// Synchronous capacity bookkeeping. For every request it answers either
/// "proceed" or "wait this many ticks".
valve: Valve,
/// The clock that `Stable` will use, both to read elapsed ticks and to wait.
clock: C,
}
impl<C> Stable<C>
where
    C: Clock + Send + Sync,
{
    /// Wait until the smallest possible request -- `NonZeroU32::MIN` units --
    /// has been granted.
    ///
    /// # Errors
    ///
    /// Propagates any [`Error`] returned by [`Stable::wait_for`].
    #[inline]
    pub(crate) async fn wait(&mut self) -> Result<(), Error> {
        let smallest = NonZeroU32::MIN;
        self.wait_for(smallest).await
    }

    /// Wait until `request` units of capacity have been granted.
    ///
    /// The valve is polled with the clock's current tick count. Whenever it
    /// answers with a non-zero number of ticks the clock is asked to wait that
    /// long and the valve is polled again; a zero answer means the capacity
    /// has been drawn and the call returns.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Capacity`] if `request` exceeds the maximum capacity
    /// of the throttle.
    pub(crate) async fn wait_for(&mut self, request: NonZeroU32) -> Result<(), Error> {
        let wanted = request.get();
        loop {
            let now = self.clock.ticks_elapsed();
            match self.valve.request(now, wanted)? {
                // Capacity was available and has been deducted.
                0 => return Ok(()),
                // Not enough capacity: sleep for the suggested number of
                // ticks, then ask again.
                ticks => {
                    self.clock.wait(ticks).await;
                }
            }
        }
    }

    /// Build a throttle that grants at most `maximum_capacity` units per
    /// interval and keeps time with `clock`.
    pub(crate) fn with_clock(maximum_capacity: NonZeroU32, clock: C) -> Self {
        let valve = Valve::new(maximum_capacity);
        Self { valve, clock }
    }
}
/// The non-async interior to Stable, about which we can make proof claims. The
/// mechanical analogue isn't quite right but think of this as a poppet valve
/// for the stable throttle.
///
/// The valve holds no clock of its own: callers pass the current tick count
/// into every request.
#[derive(Debug)]
struct Valve {
/// The maximum capacity of `Valve` past which no more capacity will be
/// added. Requests larger than this are rejected outright.
maximum_capacity: u32,
/// The capacity of the `Valve`. This amount will be drawn on by every
/// request. It is refilled to maximum at every interval roll-over.
capacity: u32,
/// Index of the most recent interval the valve has observed, computed as
/// `ticks_elapsed / INTERVAL_TICKS`. It only ever moves forward.
interval: u64,
}
impl Valve {
    /// Build a `Valve` that starts out full, holding `maximum_capacity` units,
    /// and positioned in interval zero.
    fn new(maximum_capacity: NonZeroU32) -> Self {
        let full = maximum_capacity.get();
        Self {
            maximum_capacity: full,
            capacity: full,
            interval: 0,
        }
    }

    /// Try to draw `capacity_request` units from the valve at time
    /// `ticks_elapsed`.
    ///
    /// `ticks_elapsed` is an absolute tick count, not a delta since the
    /// previous call; the caller is responsible for maintaining the clock.
    ///
    /// Returns `Ok(0)` when the request was granted -- the units have been
    /// deducted -- or when the request was for zero units. Otherwise returns
    /// `Ok(ticks)` where `ticks` is the distance to the next interval
    /// roll-over; nothing is deducted and the caller is expected, though not
    /// forced, to wait that long and call again.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Capacity`] when `capacity_request` is larger than the
    /// maximum capacity. In that case the interval is not advanced.
    // NOTE(review): no casts are visible in this body, so this allow may be
    // stale -- confirm before removing it.
    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
    fn request(&mut self, ticks_elapsed: u64, capacity_request: u32) -> Result<u64, Error> {
        // A request that can never fit is refused before any state changes.
        if self.maximum_capacity < capacity_request {
            return Err(Error::Capacity);
        }

        // The bucket refills once per interval. If the clock has moved into a
        // later interval, top the bucket up -- regardless of how far into that
        // interval we are -- and remember where we are now.
        let observed_interval = ticks_elapsed / INTERVAL_TICKS;
        if self.interval < observed_interval {
            self.interval = observed_interval;
            self.capacity = self.maximum_capacity;
        }

        // Zero requests are odd but valid. They succeed immediately, having
        // still advanced the interval above.
        if capacity_request == 0 {
            return Ok(0);
        }

        match self.capacity.checked_sub(capacity_request) {
            // Enough capacity on hand: draw it down and let the caller go.
            Some(remaining) => {
                self.capacity = remaining;
                Ok(0)
            }
            // Capacity never grows inside an interval, so the earliest the
            // request could succeed is at the next roll-over.
            None => {
                let into_interval = ticks_elapsed % INTERVAL_TICKS;
                Ok(INTERVAL_TICKS.saturating_sub(into_interval))
            }
        }
    }
}
#[cfg(kani)]
mod verification {
    //! Kani proof harnesses for `Valve`. Each harness builds a valve with an
    //! arbitrary non-zero maximum capacity and checks one property of
    //! `Valve::request` over symbolic inputs.
    use crate::stable::{INTERVAL_TICKS, Valve};
    use std::num::NonZeroU32;

    /// Capacity requests that are too large always error.
    #[kani::proof]
    fn request_too_large_always_errors() {
        let maximum_capacity: NonZeroU32 = kani::any();
        let mut valve = Valve::new(maximum_capacity);
        let maximum_capacity = maximum_capacity.get();

        let request: u32 = kani::any_where(|r: &u32| *r > maximum_capacity);
        let ticks_elapsed: u64 = kani::any();

        let res = valve.request(ticks_elapsed, request);
        kani::assert(
            res.is_err(),
            "Requests that are too large must always fail.",
        );
    }

    /// Capacity requests that are zero always succeed.
    #[kani::proof]
    fn request_zero_always_succeed() {
        let maximum_capacity: NonZeroU32 = kani::any();
        let mut valve = Valve::new(maximum_capacity);

        let ticks_elapsed: u64 = kani::any();

        let slop = valve.request(ticks_elapsed, 0).expect("request failed.");
        kani::assert(slop == 0, "Requests that are zero always succeed.");
    }

    /// If a request is made on a fresh throttle such that request <=
    /// max_capacity and ticks_elapsed <= INTERVAL_TICKS then the request
    /// should return with zero slop and the internal capacity of the valve
    /// should be reduced by exactly the request size.
    #[kani::proof]
    fn request_in_cap_interval() {
        let maximum_capacity: NonZeroU32 = kani::any();
        let mut valve = Valve::new(maximum_capacity);
        let maximum_capacity = maximum_capacity.get();

        let request: u32 = kani::any_where(|r: &u32| *r <= maximum_capacity);
        let ticks_elapsed: u64 = kani::any_where(|t: &u64| *t <= INTERVAL_TICKS);

        let slop = valve
            .request(ticks_elapsed, request)
            .expect("request failed");
        kani::assert(
            slop == 0,
            "Request in-capacity, interval should succeed without wait.",
        );
        kani::assert(
            valve.capacity == maximum_capacity - request,
            "Request in-capacity, interval should reduce capacity by request size.",
        );
    }

    /// If a request is made on a partially drained throttle such that
    /// capacity < request <= max_capacity and ticks_elapsed < INTERVAL_TICKS
    /// then the request should return with non-zero slop and the internal
    /// capacity of the valve should not be reduced.
    #[kani::proof]
    fn request_out_in_cap_interval() {
        let maximum_capacity: NonZeroU32 = kani::any();
        let mut valve = Valve::new(maximum_capacity);
        let maximum_capacity = maximum_capacity.get();

        // A freshly built valve is full, so there is no request with
        // `capacity < request <= maximum_capacity` and a proof constrained
        // that way would pass vacuously. Drain an arbitrary non-zero amount
        // first so that such requests exist.
        let drawdown: u32 = kani::any_where(|d: &u32| 0 < *d && *d <= maximum_capacity);
        let drawdown_slop = valve.request(0, drawdown).expect("draw-down failed");
        kani::assert(
            drawdown_slop == 0,
            "Draw-down on a full valve should succeed without wait.",
        );

        let original_capacity = valve.capacity;
        let request: u32 =
            kani::any_where(|r: &u32| original_capacity < *r && *r <= maximum_capacity);
        // Strictly inside the first interval: at exactly INTERVAL_TICKS the
        // valve rolls over, refills and would grant the request.
        let ticks_elapsed: u64 = kani::any_where(|t: &u64| *t < INTERVAL_TICKS);

        let slop = valve
            .request(ticks_elapsed, request)
            .expect("request failed");
        kani::assert(slop > 0, "Should be forced to wait.");
        kani::assert(
            valve.capacity == original_capacity,
            "Capacity should not be reduced.",
        );
    }

    /// No matter the request size the valve's interval measure should always be
    /// consistent with the time passed in ticks_elapsed.
    #[kani::proof]
    fn interval_time_preserved() {
        let maximum_capacity: NonZeroU32 = kani::any();
        let mut valve = Valve::new(maximum_capacity);
        let maximum_capacity = maximum_capacity.get();

        let request: u32 = kani::any_where(|r: &u32| *r <= maximum_capacity);
        // 2**32 microseconds is roughly 1 hour 11 minutes. While callers
        // _may_ be waiting longer than this we deem it unlikely.
        let ticks_elapsed = kani::any::<u32>() as u64;

        let _ = valve.request(ticks_elapsed, request);
        kani::assert(
            valve.interval == ticks_elapsed / INTERVAL_TICKS,
            "Interval should be consistent with ticks_elapsed.",
        );
    }
}