Skip to content

Commit 86b8688

Browse files
committed
feat(host-listener): throttle dependent ops
1 parent d59a2d4 commit 86b8688

File tree

7 files changed

+240
-7
lines changed

7 files changed

+240
-7
lines changed

coprocessor/fhevm-engine/Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/host-listener/src/bin/poller.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,20 @@ struct Args {
118118
help = "Dependence chain are across blocks"
119119
)]
120120
pub dependence_cross_block: bool,
121+
122+
#[arg(
123+
long,
124+
default_value_t = 0,
125+
help = "Global dependent ops rate limit per minute (0 disables)"
126+
)]
127+
pub dependent_ops_rate_per_min: u32,
128+
129+
#[arg(
130+
long,
131+
default_value_t = 0,
132+
help = "Burst size for dependent ops limiter (0 = same as rate)"
133+
)]
134+
pub dependent_ops_burst: u32,
121135
}
122136

123137
#[tokio::main]
@@ -159,6 +173,8 @@ async fn main() -> anyhow::Result<()> {
159173
dependence_cache_size: args.dependence_cache_size,
160174
dependence_by_connexity: args.dependence_by_connexity,
161175
dependence_cross_block: args.dependence_cross_block,
176+
dependent_ops_rate_per_min: args.dependent_ops_rate_per_min,
177+
dependent_ops_burst: args.dependent_ops_burst,
162178
};
163179

164180
run_poller(config).await

coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,20 @@ pub struct Args {
125125
)]
126126
pub dependence_cross_block: bool,
127127

128+
#[arg(
129+
long,
130+
default_value_t = 0,
131+
help = "Global dependent ops rate limit per minute (0 disables)"
132+
)]
133+
pub dependent_ops_rate_per_min: u32,
134+
135+
#[arg(
136+
long,
137+
default_value_t = 0,
138+
help = "Burst size for dependent ops limiter (0 = same as rate)"
139+
)]
140+
pub dependent_ops_burst: u32,
141+
128142
#[arg(
129143
long,
130144
default_value = "50",
@@ -1047,6 +1061,10 @@ pub async fn main(args: Args) -> anyhow::Result<()> {
10471061
args.dependence_cache_size,
10481062
)
10491063
.await?;
1064+
db.set_dependent_ops_limiter(
1065+
args.dependent_ops_rate_per_min,
1066+
args.dependent_ops_burst,
1067+
);
10501068

10511069
if chain_id != db.chain_id {
10521070
error!(

coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs

Lines changed: 192 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,20 @@ use fhevm_engine_common::types::AllowEvents;
99
use fhevm_engine_common::types::SupportedFheOperations;
1010
use fhevm_engine_common::utils::DatabaseURL;
1111
use fhevm_engine_common::utils::{to_hex, HeartBeat};
12+
use prometheus::{register_int_counter_vec, IntCounterVec};
1213
use sqlx::postgres::PgConnectOptions;
1314
use sqlx::postgres::PgPoolOptions;
1415
use sqlx::types::Uuid;
1516
use sqlx::Error as SqlxError;
1617
use sqlx::{PgPool, Postgres};
18+
use std::collections::HashMap;
1719
use std::ops::DerefMut;
1820
use std::sync::Arc;
21+
use std::sync::LazyLock;
1922
use std::time::Duration;
23+
use std::time::Instant;
2024
use time::{Duration as TimeDuration, PrimitiveDateTime};
25+
use tokio::sync::Mutex;
2126
use tokio::sync::RwLock;
2227
use tracing::error;
2328
use tracing::info;
@@ -39,6 +44,24 @@ pub type ScalarByte = FixedBytes<1>;
3944
pub type ClearConst = Uint<256, 4>;
4045
pub type ChainHash = TransactionHash;
4146

47+
static DEPENDENT_OPS_ALLOWED: LazyLock<IntCounterVec> = LazyLock::new(|| {
48+
register_int_counter_vec!(
49+
"host_listener_dependent_ops_allowed",
50+
"Number of dependent ops allowed by the limiter",
51+
&["chain_id"]
52+
)
53+
.unwrap()
54+
});
55+
56+
static DEPENDENT_OPS_THROTTLED: LazyLock<IntCounterVec> = LazyLock::new(|| {
57+
register_int_counter_vec!(
58+
"host_listener_dependent_ops_throttled",
59+
"Number of dependent ops deferred by the limiter",
60+
&["chain_id"]
61+
)
62+
.unwrap()
63+
});
64+
4265
#[derive(Clone, Debug)]
4366
pub struct Chain {
4467
pub hash: ChainHash,
@@ -92,8 +115,11 @@ pub struct Database {
92115
pub pool: Arc<RwLock<sqlx::Pool<Postgres>>>,
93116
pub tenant_id: TenantId,
94117
pub chain_id: ChainId,
118+
chain_id_label: String,
119+
last_scheduled_by_chain: Arc<Mutex<HashMap<Vec<u8>, PrimitiveDateTime>>>,
95120
pub dependence_chain: ChainCache,
96121
pub tick: HeartBeat,
122+
dependent_ops_limiter: Option<Arc<Mutex<DependentOpsLimiter>>>,
97123
}
98124

99125
#[derive(Debug)]
@@ -107,6 +133,82 @@ pub struct LogTfhe {
107133
pub dependence_chain: TransactionHash,
108134
}
109135

136+
#[derive(Debug)]
137+
struct DependentOpsLimiter {
138+
rate_per_min: u32,
139+
burst: u32,
140+
tokens: f64,
141+
last_refill: Instant,
142+
}
143+
144+
impl DependentOpsLimiter {
145+
fn new(rate_per_min: u32, burst: u32) -> Option<Self> {
146+
if rate_per_min == 0 {
147+
return None;
148+
}
149+
let burst = if burst == 0 {
150+
rate_per_min.max(1)
151+
} else {
152+
burst
153+
};
154+
Some(Self {
155+
rate_per_min,
156+
burst,
157+
tokens: burst as f64,
158+
last_refill: Instant::now(),
159+
})
160+
}
161+
162+
fn defer_duration(&mut self) -> Duration {
163+
let now = Instant::now();
164+
self.refill(now);
165+
let rate_per_sec = self.rate_per_min as f64 / 60.0;
166+
if rate_per_sec <= 0.0 {
167+
return Duration::ZERO;
168+
}
169+
self.tokens -= 1.0;
170+
if self.tokens >= 0.0 {
171+
Duration::ZERO
172+
} else {
173+
let deficit = -self.tokens;
174+
let wait_secs = deficit / rate_per_sec;
175+
if wait_secs <= 0.0 {
176+
Duration::ZERO
177+
} else {
178+
Duration::from_secs_f64(wait_secs)
179+
}
180+
}
181+
}
182+
183+
fn refill(&mut self, now: Instant) {
184+
let elapsed = now.duration_since(self.last_refill).as_secs_f64();
185+
if elapsed <= 0.0 {
186+
return;
187+
}
188+
let rate_per_sec = self.rate_per_min as f64 / 60.0;
189+
let added = elapsed * rate_per_sec;
190+
if added > 0.0 {
191+
self.tokens = (self.tokens + added).min(self.burst as f64);
192+
self.last_refill = now;
193+
}
194+
}
195+
}
196+
197+
fn clamp_schedule_order(
198+
last_scheduled_by_chain: &mut HashMap<Vec<u8>, PrimitiveDateTime>,
199+
chain_id: Vec<u8>,
200+
schedule_order: PrimitiveDateTime,
201+
) -> PrimitiveDateTime {
202+
let next = match last_scheduled_by_chain.get(&chain_id) {
203+
Some(last) if *last >= schedule_order => {
204+
last.saturating_add(TimeDuration::microseconds(1))
205+
}
206+
_ => schedule_order,
207+
};
208+
last_scheduled_by_chain.insert(chain_id, next);
209+
next
210+
}
211+
110212
pub type Transaction<'l> = sqlx::Transaction<'l, Postgres>;
111213

112214
impl Database {
@@ -129,12 +231,21 @@ impl Database {
129231
url: url.clone(),
130232
tenant_id,
131233
chain_id,
234+
chain_id_label: chain_id.to_string(),
235+
last_scheduled_by_chain: Arc::new(Mutex::new(HashMap::new())),
132236
pool: Arc::new(RwLock::new(pool)),
133237
dependence_chain: bucket_cache,
134238
tick: HeartBeat::default(),
239+
dependent_ops_limiter: None,
135240
})
136241
}
137242

243+
pub fn set_dependent_ops_limiter(&mut self, rate_per_min: u32, burst: u32) {
244+
self.dependent_ops_limiter =
245+
DependentOpsLimiter::new(rate_per_min, burst)
246+
.map(|limiter| Arc::new(Mutex::new(limiter)));
247+
}
248+
138249
async fn new_pool(url: &DatabaseURL) -> PgPool {
139250
let options: PgConnectOptions = url.parse().expect("bad url");
140251
let options = options.options([
@@ -280,6 +391,41 @@ impl Database {
280391
) -> Result<bool, SqlxError> {
281392
let is_scalar = !scalar_byte.is_zero();
282393
let output_handle = result.to_vec();
394+
let dependence_chain_id = log.dependence_chain.to_vec();
395+
let mut schedule_order =
396+
log.block_timestamp
397+
.saturating_add(TimeDuration::microseconds(
398+
log.tx_depth_size as i64,
399+
));
400+
if log.is_allowed && !dependencies.is_empty() {
401+
if let Some(limiter) = &self.dependent_ops_limiter {
402+
let defer_for = limiter.lock().await.defer_duration();
403+
if defer_for.is_zero() {
404+
DEPENDENT_OPS_ALLOWED
405+
.with_label_values(&[self.chain_id_label.as_str()])
406+
.inc();
407+
} else {
408+
// Defer by writing a future schedule_order; worker still
409+
// pulls earliest schedule_order, so this smooths load without blocking ingest.
410+
let defer_micros =
411+
defer_for.as_micros().min(i64::MAX as u128) as i64;
412+
schedule_order = schedule_order.saturating_add(
413+
TimeDuration::microseconds(defer_micros),
414+
);
415+
DEPENDENT_OPS_THROTTLED
416+
.with_label_values(&[self.chain_id_label.as_str()])
417+
.inc();
418+
}
419+
}
420+
}
421+
if self.dependent_ops_limiter.is_some() {
422+
let mut last_scheduled = self.last_scheduled_by_chain.lock().await;
423+
schedule_order = clamp_schedule_order(
424+
&mut last_scheduled,
425+
dependence_chain_id.clone(),
426+
schedule_order,
427+
);
428+
}
283429
let query = sqlx::query!(
284430
r#"
285431
INSERT INTO computations (
@@ -303,13 +449,10 @@ impl Database {
303449
&dependencies,
304450
fhe_operation as i16,
305451
is_scalar,
306-
log.dependence_chain.to_vec(),
452+
dependence_chain_id,
307453
log.transaction_hash.map(|txh| txh.to_vec()),
308454
log.is_allowed,
309-
log.block_timestamp
310-
.saturating_add(time::Duration::microseconds(
311-
log.tx_depth_size as i64
312-
)),
455+
schedule_order,
313456
!log.is_allowed,
314457
);
315458
query
@@ -1094,3 +1237,47 @@ pub fn tfhe_inputs_handle(op: &TfheContractEvents) -> Vec<Handle> {
10941237
E::Initialized(_) | E::Upgraded(_) | E::VerifyInput(_) => vec![],
10951238
}
10961239
}
1240+
1241+
#[cfg(test)]
1242+
mod tests {
1243+
use super::*;
1244+
1245+
#[test]
1246+
fn dependent_ops_limiter_defers_after_burst() {
1247+
let mut limiter = DependentOpsLimiter::new(60, 2).unwrap();
1248+
assert!(limiter.defer_duration().is_zero());
1249+
assert!(limiter.defer_duration().is_zero());
1250+
let deferred = limiter.defer_duration();
1251+
assert!(deferred > Duration::ZERO);
1252+
}
1253+
1254+
#[test]
1255+
fn dependent_ops_limiter_refills_over_time() {
1256+
let mut limiter = DependentOpsLimiter::new(60, 1).unwrap();
1257+
assert!(limiter.defer_duration().is_zero());
1258+
let deferred = limiter.defer_duration();
1259+
assert!(deferred > Duration::ZERO);
1260+
limiter.last_refill =
1261+
Instant::now().checked_sub(Duration::from_secs(2)).unwrap();
1262+
let allowed_after_refill = limiter.defer_duration();
1263+
assert!(allowed_after_refill.is_zero());
1264+
}
1265+
1266+
#[test]
1267+
fn dependent_ops_limiter_disabled_when_rate_zero() {
1268+
assert!(DependentOpsLimiter::new(0, 10).is_none());
1269+
}
1270+
1271+
#[test]
1272+
fn clamp_schedule_order_is_monotonic_per_chain() {
1273+
let mut last_scheduled = HashMap::new();
1274+
let chain_id = vec![1, 2, 3];
1275+
let base = PrimitiveDateTime::MIN;
1276+
let first =
1277+
clamp_schedule_order(&mut last_scheduled, chain_id.clone(), base);
1278+
assert_eq!(first, base);
1279+
let second =
1280+
clamp_schedule_order(&mut last_scheduled, chain_id.clone(), base);
1281+
assert!(second > base);
1282+
}
1283+
}

coprocessor/fhevm-engine/host-listener/src/poller/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ pub struct PollerConfig {
8787
pub dependence_cache_size: u16,
8888
pub dependence_by_connexity: bool,
8989
pub dependence_cross_block: bool,
90+
pub dependent_ops_rate_per_min: u32,
91+
pub dependent_ops_burst: u32,
9092
}
9193

9294
pub async fn run_poller(config: PollerConfig) -> Result<()> {
@@ -139,6 +141,10 @@ pub async fn run_poller(config: PollerConfig) -> Result<()> {
139141
config.dependence_cache_size,
140142
)
141143
.await?;
144+
db.set_dependent_ops_limiter(
145+
config.dependent_ops_rate_per_min,
146+
config.dependent_ops_burst,
147+
);
142148

143149
if chain_id != db.chain_id {
144150
error!(

coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,8 @@ async fn setup(node_chain_id: Option<u64>) -> Result<Setup, anyhow::Error> {
234234
catchup_finalization_in_blocks: 2,
235235
dependence_by_connexity: false,
236236
dependence_cross_block: true,
237+
dependent_ops_rate_per_min: 0,
238+
dependent_ops_burst: 0,
237239
timeout_request_websocket: 30,
238240
};
239241
let health_check_url = format!("http://127.0.0.1:{}", args.health_port);
@@ -289,6 +291,8 @@ async fn test_only_catchup_loop_requires_negative_start_at_block(
289291
catchup_loop_sleep_secs: 60,
290292
dependence_by_connexity: false,
291293
dependence_cross_block: true,
294+
dependent_ops_rate_per_min: 0,
295+
dependent_ops_burst: 0,
292296
timeout_request_websocket: 30,
293297
};
294298

coprocessor/fhevm-engine/host-listener/tests/poller_integration_tests.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ async fn poller_catches_up_to_safe_tip(
200200
dependence_cache_size: 10_000,
201201
dependence_by_connexity: false,
202202
dependence_cross_block: false,
203+
dependent_ops_rate_per_min: 0,
204+
dependent_ops_burst: 0,
203205
};
204206

205207
let poller_handle = tokio::spawn(run_poller(config));

0 commit comments

Comments
 (0)