Skip to content

Commit 26d26a4

Browse files
committed
feat(host-listener): throttle dependent ops
1 parent d59a2d4 commit 26d26a4

File tree

6 files changed

+197
-4
lines changed

6 files changed

+197
-4
lines changed

coprocessor/fhevm-engine/host-listener/src/bin/poller.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,20 @@ struct Args {
118118
help = "Dependence chain are across blocks"
119119
)]
120120
pub dependence_cross_block: bool,
121+
122+
#[arg(
123+
long,
124+
default_value_t = 0,
125+
help = "Global dependent ops rate limit per minute (0 disables)"
126+
)]
127+
pub dependent_ops_rate_per_min: u32,
128+
129+
#[arg(
130+
long,
131+
default_value_t = 0,
132+
help = "Burst size for dependent ops limiter (0 = same as rate)"
133+
)]
134+
pub dependent_ops_burst: u32,
121135
}
122136

123137
#[tokio::main]
@@ -159,6 +173,8 @@ async fn main() -> anyhow::Result<()> {
159173
dependence_cache_size: args.dependence_cache_size,
160174
dependence_by_connexity: args.dependence_by_connexity,
161175
dependence_cross_block: args.dependence_cross_block,
176+
dependent_ops_rate_per_min: args.dependent_ops_rate_per_min,
177+
dependent_ops_burst: args.dependent_ops_burst,
162178
};
163179

164180
run_poller(config).await

coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,20 @@ pub struct Args {
125125
)]
126126
pub dependence_cross_block: bool,
127127

128+
#[arg(
129+
long,
130+
default_value_t = 0,
131+
help = "Global dependent ops rate limit per minute (0 disables)"
132+
)]
133+
pub dependent_ops_rate_per_min: u32,
134+
135+
#[arg(
136+
long,
137+
default_value_t = 0,
138+
help = "Burst size for dependent ops limiter (0 = same as rate)"
139+
)]
140+
pub dependent_ops_burst: u32,
141+
128142
#[arg(
129143
long,
130144
default_value = "50",
@@ -1047,6 +1061,10 @@ pub async fn main(args: Args) -> anyhow::Result<()> {
10471061
args.dependence_cache_size,
10481062
)
10491063
.await?;
1064+
db.set_dependent_ops_limiter(
1065+
args.dependent_ops_rate_per_min,
1066+
args.dependent_ops_burst,
1067+
);
10501068

10511069
if chain_id != db.chain_id {
10521070
error!(

coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs

Lines changed: 151 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,19 @@ use fhevm_engine_common::types::AllowEvents;
99
use fhevm_engine_common::types::SupportedFheOperations;
1010
use fhevm_engine_common::utils::DatabaseURL;
1111
use fhevm_engine_common::utils::{to_hex, HeartBeat};
12+
use prometheus::{register_int_counter_vec, IntCounterVec};
1213
use sqlx::postgres::PgConnectOptions;
1314
use sqlx::postgres::PgPoolOptions;
1415
use sqlx::types::Uuid;
1516
use sqlx::Error as SqlxError;
1617
use sqlx::{PgPool, Postgres};
1718
use std::ops::DerefMut;
1819
use std::sync::Arc;
20+
use std::sync::LazyLock;
1921
use std::time::Duration;
22+
use std::time::Instant;
2023
use time::{Duration as TimeDuration, PrimitiveDateTime};
24+
use tokio::sync::Mutex;
2125
use tokio::sync::RwLock;
2226
use tracing::error;
2327
use tracing::info;
@@ -39,6 +43,24 @@ pub type ScalarByte = FixedBytes<1>;
3943
pub type ClearConst = Uint<256, 4>;
4044
pub type ChainHash = TransactionHash;
4145

46+
static DEPENDENT_OPS_ALLOWED: LazyLock<IntCounterVec> = LazyLock::new(|| {
47+
register_int_counter_vec!(
48+
"host_listener_dependent_ops_allowed",
49+
"Number of dependent ops allowed by the limiter",
50+
&["chain_id"]
51+
)
52+
.unwrap()
53+
});
54+
55+
static DEPENDENT_OPS_THROTTLED: LazyLock<IntCounterVec> = LazyLock::new(|| {
56+
register_int_counter_vec!(
57+
"host_listener_dependent_ops_throttled",
58+
"Number of dependent ops deferred by the limiter",
59+
&["chain_id"]
60+
)
61+
.unwrap()
62+
});
63+
4264
#[derive(Clone, Debug)]
4365
pub struct Chain {
4466
pub hash: ChainHash,
@@ -92,8 +114,10 @@ pub struct Database {
92114
pub pool: Arc<RwLock<sqlx::Pool<Postgres>>>,
93115
pub tenant_id: TenantId,
94116
pub chain_id: ChainId,
117+
chain_id_label: String,
95118
pub dependence_chain: ChainCache,
96119
pub tick: HeartBeat,
120+
dependent_ops_limiter: Option<Arc<Mutex<DependentOpsLimiter>>>,
97121
}
98122

99123
#[derive(Debug)]
@@ -107,6 +131,67 @@ pub struct LogTfhe {
107131
pub dependence_chain: TransactionHash,
108132
}
109133

134+
#[derive(Debug)]
135+
struct DependentOpsLimiter {
136+
rate_per_min: u32,
137+
burst: u32,
138+
tokens: f64,
139+
last_refill: Instant,
140+
}
141+
142+
impl DependentOpsLimiter {
143+
fn new(rate_per_min: u32, burst: u32) -> Option<Self> {
144+
if rate_per_min == 0 {
145+
return None;
146+
}
147+
let burst = if burst == 0 {
148+
rate_per_min.max(1)
149+
} else {
150+
burst
151+
};
152+
Some(Self {
153+
rate_per_min,
154+
burst,
155+
tokens: burst as f64,
156+
last_refill: Instant::now(),
157+
})
158+
}
159+
160+
fn defer_duration(&mut self) -> Duration {
161+
let now = Instant::now();
162+
self.refill(now);
163+
let rate_per_sec = self.rate_per_min as f64 / 60.0;
164+
if rate_per_sec <= 0.0 {
165+
return Duration::ZERO;
166+
}
167+
self.tokens -= 1.0;
168+
if self.tokens >= 0.0 {
169+
Duration::ZERO
170+
} else {
171+
let deficit = -self.tokens;
172+
let wait_secs = deficit / rate_per_sec;
173+
if wait_secs <= 0.0 {
174+
Duration::ZERO
175+
} else {
176+
Duration::from_secs_f64(wait_secs)
177+
}
178+
}
179+
}
180+
181+
fn refill(&mut self, now: Instant) {
182+
let elapsed = now.duration_since(self.last_refill).as_secs_f64();
183+
if elapsed <= 0.0 {
184+
return;
185+
}
186+
let rate_per_sec = self.rate_per_min as f64 / 60.0;
187+
let added = elapsed * rate_per_sec;
188+
if added > 0.0 {
189+
self.tokens = (self.tokens + added).min(self.burst as f64);
190+
self.last_refill = now;
191+
}
192+
}
193+
}
194+
110195
pub type Transaction<'l> = sqlx::Transaction<'l, Postgres>;
111196

112197
impl Database {
@@ -129,12 +214,20 @@ impl Database {
129214
url: url.clone(),
130215
tenant_id,
131216
chain_id,
217+
chain_id_label: chain_id.to_string(),
132218
pool: Arc::new(RwLock::new(pool)),
133219
dependence_chain: bucket_cache,
134220
tick: HeartBeat::default(),
221+
dependent_ops_limiter: None,
135222
})
136223
}
137224

225+
pub fn set_dependent_ops_limiter(&mut self, rate_per_min: u32, burst: u32) {
226+
self.dependent_ops_limiter =
227+
DependentOpsLimiter::new(rate_per_min, burst)
228+
.map(|limiter| Arc::new(Mutex::new(limiter)));
229+
}
230+
138231
async fn new_pool(url: &DatabaseURL) -> PgPool {
139232
let options: PgConnectOptions = url.parse().expect("bad url");
140233
let options = options.options([
@@ -280,6 +373,32 @@ impl Database {
280373
) -> Result<bool, SqlxError> {
281374
let is_scalar = !scalar_byte.is_zero();
282375
let output_handle = result.to_vec();
376+
let mut schedule_order =
377+
log.block_timestamp
378+
.saturating_add(TimeDuration::microseconds(
379+
log.tx_depth_size as i64,
380+
));
381+
if log.is_allowed && !dependencies.is_empty() {
382+
if let Some(limiter) = &self.dependent_ops_limiter {
383+
let defer_for = limiter.lock().await.defer_duration();
384+
if defer_for.is_zero() {
385+
DEPENDENT_OPS_ALLOWED
386+
.with_label_values(&[self.chain_id_label.as_str()])
387+
.inc();
388+
} else {
389+
// Defer by writing a future schedule_order; worker still
390+
// pulls earliest schedule_order, so this smooths load without blocking ingest.
391+
let defer_micros =
392+
defer_for.as_micros().min(i64::MAX as u128) as i64;
393+
schedule_order = schedule_order.saturating_add(
394+
TimeDuration::microseconds(defer_micros),
395+
);
396+
DEPENDENT_OPS_THROTTLED
397+
.with_label_values(&[self.chain_id_label.as_str()])
398+
.inc();
399+
}
400+
}
401+
}
283402
let query = sqlx::query!(
284403
r#"
285404
INSERT INTO computations (
@@ -306,10 +425,7 @@ impl Database {
306425
log.dependence_chain.to_vec(),
307426
log.transaction_hash.map(|txh| txh.to_vec()),
308427
log.is_allowed,
309-
log.block_timestamp
310-
.saturating_add(time::Duration::microseconds(
311-
log.tx_depth_size as i64
312-
)),
428+
schedule_order,
313429
!log.is_allowed,
314430
);
315431
query
@@ -1094,3 +1210,34 @@ pub fn tfhe_inputs_handle(op: &TfheContractEvents) -> Vec<Handle> {
10941210
E::Initialized(_) | E::Upgraded(_) | E::VerifyInput(_) => vec![],
10951211
}
10961212
}
1213+
1214+
#[cfg(test)]
1215+
mod tests {
1216+
use super::*;
1217+
1218+
#[test]
1219+
fn dependent_ops_limiter_defers_after_burst() {
1220+
let mut limiter = DependentOpsLimiter::new(60, 2).unwrap();
1221+
assert!(limiter.defer_duration().is_zero());
1222+
assert!(limiter.defer_duration().is_zero());
1223+
let deferred = limiter.defer_duration();
1224+
assert!(deferred > Duration::ZERO);
1225+
}
1226+
1227+
#[test]
1228+
fn dependent_ops_limiter_refills_over_time() {
1229+
let mut limiter = DependentOpsLimiter::new(60, 1).unwrap();
1230+
assert!(limiter.defer_duration().is_zero());
1231+
let deferred = limiter.defer_duration();
1232+
assert!(deferred > Duration::ZERO);
1233+
limiter.last_refill =
1234+
Instant::now().checked_sub(Duration::from_secs(2)).unwrap();
1235+
let allowed_after_refill = limiter.defer_duration();
1236+
assert!(allowed_after_refill.is_zero());
1237+
}
1238+
1239+
#[test]
1240+
fn dependent_ops_limiter_disabled_when_rate_zero() {
1241+
assert!(DependentOpsLimiter::new(0, 10).is_none());
1242+
}
1243+
}

coprocessor/fhevm-engine/host-listener/src/poller/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ pub struct PollerConfig {
8787
pub dependence_cache_size: u16,
8888
pub dependence_by_connexity: bool,
8989
pub dependence_cross_block: bool,
90+
pub dependent_ops_rate_per_min: u32,
91+
pub dependent_ops_burst: u32,
9092
}
9193

9294
pub async fn run_poller(config: PollerConfig) -> Result<()> {
@@ -139,6 +141,10 @@ pub async fn run_poller(config: PollerConfig) -> Result<()> {
139141
config.dependence_cache_size,
140142
)
141143
.await?;
144+
db.set_dependent_ops_limiter(
145+
config.dependent_ops_rate_per_min,
146+
config.dependent_ops_burst,
147+
);
142148

143149
if chain_id != db.chain_id {
144150
error!(

coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,8 @@ async fn setup(node_chain_id: Option<u64>) -> Result<Setup, anyhow::Error> {
234234
catchup_finalization_in_blocks: 2,
235235
dependence_by_connexity: false,
236236
dependence_cross_block: true,
237+
dependent_ops_rate_per_min: 0,
238+
dependent_ops_burst: 0,
237239
timeout_request_websocket: 30,
238240
};
239241
let health_check_url = format!("http://127.0.0.1:{}", args.health_port);
@@ -289,6 +291,8 @@ async fn test_only_catchup_loop_requires_negative_start_at_block(
289291
catchup_loop_sleep_secs: 60,
290292
dependence_by_connexity: false,
291293
dependence_cross_block: true,
294+
dependent_ops_rate_per_min: 0,
295+
dependent_ops_burst: 0,
292296
timeout_request_websocket: 30,
293297
};
294298

coprocessor/fhevm-engine/host-listener/tests/poller_integration_tests.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ async fn poller_catches_up_to_safe_tip(
200200
dependence_cache_size: 10_000,
201201
dependence_by_connexity: false,
202202
dependence_cross_block: false,
203+
dependent_ops_rate_per_min: 0,
204+
dependent_ops_burst: 0,
203205
};
204206

205207
let poller_handle = tokio::spawn(run_poller(config));

0 commit comments

Comments
 (0)