diff --git a/coprocessor/fhevm-engine/Cargo.lock b/coprocessor/fhevm-engine/Cargo.lock index c7a2d44112..d293687cf2 100644 --- a/coprocessor/fhevm-engine/Cargo.lock +++ b/coprocessor/fhevm-engine/Cargo.lock @@ -2378,9 +2378,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.10.1" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" dependencies = [ "serde", ] diff --git a/coprocessor/fhevm-engine/db-migration/migrations/20260204120000_dependence_chain_schedule.sql b/coprocessor/fhevm-engine/db-migration/migrations/20260204120000_dependence_chain_schedule.sql new file mode 100644 index 0000000000..a6a1a67532 --- /dev/null +++ b/coprocessor/fhevm-engine/db-migration/migrations/20260204120000_dependence_chain_schedule.sql @@ -0,0 +1,4 @@ +CREATE TABLE IF NOT EXISTS dependence_chain_schedule ( + dependence_chain_id bytea PRIMARY KEY, + last_scheduled_at TIMESTAMP NOT NULL +); diff --git a/coprocessor/fhevm-engine/host-listener/README.md b/coprocessor/fhevm-engine/host-listener/README.md index 57a92dbb67..6bdd67a0d8 100644 --- a/coprocessor/fhevm-engine/host-listener/README.md +++ b/coprocessor/fhevm-engine/host-listener/README.md @@ -33,6 +33,17 @@ By default the listener propagate TFHE operation events to the database. You can change the database url using --database-url, it defaults to a local test database url. If you want to disable TFHE operation events propagation, you can provide an empty database-url. 
+### Dependent ops throttling (optional) + +Two flags enable a per-replica limiter for dependent ops: + +- `--dependent-ops-rate-per-min` (0 disables, mainnet can leave at 0) +- `--dependent-ops-burst` (defaults to rate when set to 0) + +When enabled, the listener defers dependent ops by writing a future `schedule_order`. +To avoid order inversion across multiple host-listener types (main/catchup/poller), +we clamp `schedule_order` via the `dependence_chain_schedule` table (one row per chain). + ## Events in FHEVM ### Blockchain Events diff --git a/coprocessor/fhevm-engine/host-listener/src/bin/poller.rs b/coprocessor/fhevm-engine/host-listener/src/bin/poller.rs index 6c24d7c82d..864fe65ece 100644 --- a/coprocessor/fhevm-engine/host-listener/src/bin/poller.rs +++ b/coprocessor/fhevm-engine/host-listener/src/bin/poller.rs @@ -118,6 +118,20 @@ struct Args { help = "Dependence chain are across blocks" )] pub dependence_cross_block: bool, + + #[arg( + long, + default_value_t = 0, + help = "Global dependent ops rate limit per minute (0 disables)" + )] + pub dependent_ops_rate_per_min: u32, + + #[arg( + long, + default_value_t = 0, + help = "Burst size for dependent ops limiter (0 = same as rate)" + )] + pub dependent_ops_burst: u32, } #[tokio::main] @@ -159,6 +173,8 @@ async fn main() -> anyhow::Result<()> { dependence_cache_size: args.dependence_cache_size, dependence_by_connexity: args.dependence_by_connexity, dependence_cross_block: args.dependence_cross_block, + dependent_ops_rate_per_min: args.dependent_ops_rate_per_min, + dependent_ops_burst: args.dependent_ops_burst, }; run_poller(config).await diff --git a/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs b/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs index 2898f3aeca..5843d46dc5 100644 --- a/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs +++ b/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs @@ -125,6 +125,20 @@ pub struct Args { )] pub dependence_cross_block: bool, + #[arg( + long, + 
default_value_t = 0, + help = "Global dependent ops rate limit per minute (0 disables)" + )] + pub dependent_ops_rate_per_min: u32, + + #[arg( + long, + default_value_t = 0, + help = "Burst size for dependent ops limiter (0 = same as rate)" + )] + pub dependent_ops_burst: u32, + #[arg( long, default_value = "50", @@ -1047,6 +1061,10 @@ pub async fn main(args: Args) -> anyhow::Result<()> { args.dependence_cache_size, ) .await?; + db.set_dependent_ops_limiter( + args.dependent_ops_rate_per_min, + args.dependent_ops_burst, + ); if chain_id != db.chain_id { error!( diff --git a/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs b/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs index 56ba741541..3f003cae61 100644 --- a/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs +++ b/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs @@ -9,6 +9,7 @@ use fhevm_engine_common::types::AllowEvents; use fhevm_engine_common::types::SupportedFheOperations; use fhevm_engine_common::utils::DatabaseURL; use fhevm_engine_common::utils::{to_hex, HeartBeat}; +use prometheus::{register_int_counter_vec, IntCounterVec}; use sqlx::postgres::PgConnectOptions; use sqlx::postgres::PgPoolOptions; use sqlx::types::Uuid; @@ -16,8 +17,11 @@ use sqlx::Error as SqlxError; use sqlx::{PgPool, Postgres}; use std::ops::DerefMut; use std::sync::Arc; +use std::sync::LazyLock; use std::time::Duration; +use std::time::Instant; use time::{Duration as TimeDuration, PrimitiveDateTime}; +use tokio::sync::Mutex; use tokio::sync::RwLock; use tracing::error; use tracing::info; @@ -39,6 +43,24 @@ pub type ScalarByte = FixedBytes<1>; pub type ClearConst = Uint<256, 4>; pub type ChainHash = TransactionHash; +static DEPENDENT_OPS_ALLOWED: LazyLock<IntCounterVec> = LazyLock::new(|| { + register_int_counter_vec!( + "host_listener_dependent_ops_allowed", + "Number of dependent ops allowed by the limiter", + &["chain_id"] + ) + .unwrap()
+}); + +static DEPENDENT_OPS_THROTTLED: LazyLock<IntCounterVec> = LazyLock::new(|| { + register_int_counter_vec!( + "host_listener_dependent_ops_throttled", + "Number of dependent ops deferred by the limiter", + &["chain_id"] + ) + .unwrap() +}); + #[derive(Clone, Debug)] pub struct Chain { pub hash: ChainHash, @@ -92,8 +114,10 @@ pub struct Database { pub pool: Arc<RwLock<PgPool>>, pub tenant_id: TenantId, pub chain_id: ChainId, + chain_id_label: String, pub dependence_chain: ChainCache, pub tick: HeartBeat, + dependent_ops_limiter: Option<Arc<Mutex<DependentOpsLimiter>>>, } #[derive(Debug)] @@ -107,6 +131,67 @@ pub struct LogTfhe { pub dependence_chain: TransactionHash, } +#[derive(Debug)] +struct DependentOpsLimiter { + rate_per_min: u32, + burst: u32, + tokens: f64, + last_refill: Instant, +} + +impl DependentOpsLimiter { + fn new(rate_per_min: u32, burst: u32) -> Option<Self> { + if rate_per_min == 0 { + return None; + } + let burst = if burst == 0 { + rate_per_min.max(1) + } else { + burst + }; + Some(Self { + rate_per_min, + burst, + tokens: burst as f64, + last_refill: Instant::now(), + }) + } + + fn defer_duration(&mut self) -> Duration { + let now = Instant::now(); + self.refill(now); + let rate_per_sec = self.rate_per_min as f64 / 60.0; + if rate_per_sec <= 0.0 { + return Duration::ZERO; + } + self.tokens -= 1.0; + if self.tokens >= 0.0 { + Duration::ZERO + } else { + let deficit = -self.tokens; + let wait_secs = deficit / rate_per_sec; + if wait_secs <= 0.0 { + Duration::ZERO + } else { + Duration::from_secs_f64(wait_secs) + } + } + } + + fn refill(&mut self, now: Instant) { + let elapsed = now.duration_since(self.last_refill).as_secs_f64(); + if elapsed <= 0.0 { + return; + } + let rate_per_sec = self.rate_per_min as f64 / 60.0; + let added = elapsed * rate_per_sec; + if added > 0.0 { + self.tokens = (self.tokens + added).min(self.burst as f64); + self.last_refill = now; + } + } +} + pub type Transaction<'l> = sqlx::Transaction<'l, Postgres>; impl Database { @@ -129,12 +214,20 @@ impl Database { url: url.clone(),
tenant_id, chain_id, + chain_id_label: chain_id.to_string(), pool: Arc::new(RwLock::new(pool)), dependence_chain: bucket_cache, tick: HeartBeat::default(), + dependent_ops_limiter: None, }) } + pub fn set_dependent_ops_limiter(&mut self, rate_per_min: u32, burst: u32) { + self.dependent_ops_limiter = + DependentOpsLimiter::new(rate_per_min, burst) + .map(|limiter| Arc::new(Mutex::new(limiter))); + } + async fn new_pool(url: &DatabaseURL) -> PgPool { let options: PgConnectOptions = url.parse().expect("bad url"); let options = options.options([ @@ -280,6 +373,43 @@ ) -> Result<bool, SqlxError> { let is_scalar = !scalar_byte.is_zero(); let output_handle = result.to_vec(); + let dependence_chain_id = log.dependence_chain.to_vec(); + let mut schedule_order = + log.block_timestamp + .saturating_add(TimeDuration::microseconds( + log.tx_depth_size as i64, + )); + let has_dependencies = log.is_allowed && !dependencies.is_empty(); + if has_dependencies { + if let Some(limiter) = &self.dependent_ops_limiter { + let defer_for = limiter.lock().await.defer_duration(); + if defer_for.is_zero() { + DEPENDENT_OPS_ALLOWED + .with_label_values(&[self.chain_id_label.as_str()]) + .inc(); + } else { + // Defer by writing a future schedule_order; worker still + // pulls earliest schedule_order, so this smooths load without blocking ingest.
+ let defer_micros = + defer_for.as_micros().min(i64::MAX as u128) as i64; + schedule_order = schedule_order.saturating_add( + TimeDuration::microseconds(defer_micros), + ); + DEPENDENT_OPS_THROTTLED + .with_label_values(&[self.chain_id_label.as_str()]) + .inc(); + } + } + } + if self.dependent_ops_limiter.is_some() && has_dependencies { + schedule_order = self + .clamp_schedule_order_db( + tx, + &dependence_chain_id, + schedule_order, + ) + .await?; + } let query = sqlx::query!( r#" INSERT INTO computations ( @@ -303,13 +433,10 @@ &dependencies, fhe_operation as i16, is_scalar, - log.dependence_chain.to_vec(), + dependence_chain_id, log.transaction_hash.map(|txh| txh.to_vec()), log.is_allowed, - log.block_timestamp - .saturating_add(time::Duration::microseconds( - log.tx_depth_size as i64 - )), + schedule_order, !log.is_allowed, ); query @@ -318,6 +445,36 @@ .map(|result| result.rows_affected() > 0) } + async fn clamp_schedule_order_db( + &self, + tx: &mut Transaction<'_>, + dependence_chain_id: &[u8], + schedule_order: PrimitiveDateTime, + ) -> Result<PrimitiveDateTime, SqlxError> { + // Monotonic clamp shared across all host-listener types. + // Uses a single row per dependence_chain_id to avoid order inversions + // across restarts or concurrent HL instances.
+ let clamped = sqlx::query_scalar( + r#" + INSERT INTO dependence_chain_schedule( + dependence_chain_id, + last_scheduled_at + ) VALUES ($1, $2::timestamp) + ON CONFLICT (dependence_chain_id) DO UPDATE + SET last_scheduled_at = GREATEST( + dependence_chain_schedule.last_scheduled_at + INTERVAL '1 microsecond', + EXCLUDED.last_scheduled_at + ) + RETURNING last_scheduled_at + "#, + ) + .bind(dependence_chain_id) + .bind(schedule_order) + .fetch_one(tx.deref_mut()) + .await?; + Ok(clamped) + } + #[rustfmt::skip] pub async fn insert_tfhe_event( &self, @@ -1094,3 +1251,34 @@ pub fn tfhe_inputs_handle(op: &TfheContractEvents) -> Vec { E::Initialized(_) | E::Upgraded(_) | E::VerifyInput(_) => vec![], } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn dependent_ops_limiter_defers_after_burst() { + let mut limiter = DependentOpsLimiter::new(60, 2).unwrap(); + assert!(limiter.defer_duration().is_zero()); + assert!(limiter.defer_duration().is_zero()); + let deferred = limiter.defer_duration(); + assert!(deferred > Duration::ZERO); + } + + #[test] + fn dependent_ops_limiter_refills_over_time() { + let mut limiter = DependentOpsLimiter::new(60, 1).unwrap(); + assert!(limiter.defer_duration().is_zero()); + let deferred = limiter.defer_duration(); + assert!(deferred > Duration::ZERO); + limiter.last_refill = + Instant::now().checked_sub(Duration::from_secs(2)).unwrap(); + let allowed_after_refill = limiter.defer_duration(); + assert!(allowed_after_refill.is_zero()); + } + + #[test] + fn dependent_ops_limiter_disabled_when_rate_zero() { + assert!(DependentOpsLimiter::new(0, 10).is_none()); + } +} diff --git a/coprocessor/fhevm-engine/host-listener/src/poller/mod.rs b/coprocessor/fhevm-engine/host-listener/src/poller/mod.rs index 8c6520aeaf..7cbb54e681 100644 --- a/coprocessor/fhevm-engine/host-listener/src/poller/mod.rs +++ b/coprocessor/fhevm-engine/host-listener/src/poller/mod.rs @@ -87,6 +87,8 @@ pub struct PollerConfig { pub dependence_cache_size: u16, pub 
dependence_by_connexity: bool, pub dependence_cross_block: bool, + pub dependent_ops_rate_per_min: u32, + pub dependent_ops_burst: u32, } pub async fn run_poller(config: PollerConfig) -> Result<()> { @@ -139,6 +141,10 @@ pub async fn run_poller(config: PollerConfig) -> Result<()> { config.dependence_cache_size, ) .await?; + db.set_dependent_ops_limiter( + config.dependent_ops_rate_per_min, + config.dependent_ops_burst, + ); if chain_id != db.chain_id { error!( diff --git a/coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs b/coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs index dc254de057..0c4d3ce0b0 100644 --- a/coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs +++ b/coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs @@ -234,6 +234,8 @@ async fn setup(node_chain_id: Option) -> Result { catchup_finalization_in_blocks: 2, dependence_by_connexity: false, dependence_cross_block: true, + dependent_ops_rate_per_min: 0, + dependent_ops_burst: 0, timeout_request_websocket: 30, }; let health_check_url = format!("http://127.0.0.1:{}", args.health_port); @@ -289,6 +291,8 @@ async fn test_only_catchup_loop_requires_negative_start_at_block( catchup_loop_sleep_secs: 60, dependence_by_connexity: false, dependence_cross_block: true, + dependent_ops_rate_per_min: 0, + dependent_ops_burst: 0, timeout_request_websocket: 30, }; diff --git a/coprocessor/fhevm-engine/host-listener/tests/poller_integration_tests.rs b/coprocessor/fhevm-engine/host-listener/tests/poller_integration_tests.rs index 1f87b2e67c..d26b89ef7f 100644 --- a/coprocessor/fhevm-engine/host-listener/tests/poller_integration_tests.rs +++ b/coprocessor/fhevm-engine/host-listener/tests/poller_integration_tests.rs @@ -200,6 +200,8 @@ async fn poller_catches_up_to_safe_tip( dependence_cache_size: 10_000, dependence_by_connexity: false, dependence_cross_block: false, + dependent_ops_rate_per_min: 0, + 
dependent_ops_burst: 0, }; let poller_handle = tokio::spawn(run_poller(config));