Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions coprocessor/fhevm-engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- One row per dependence chain, holding the most recent schedule_order
-- handed out for that chain. Host-listener instances upsert into this table
-- to clamp new schedule_order values monotonically, so concurrent listeners
-- (main/catchup/poller) cannot schedule a chain's ops out of order.
CREATE TABLE IF NOT EXISTS dependence_chain_schedule (
    dependence_chain_id bytea PRIMARY KEY,
    last_scheduled_at TIMESTAMP NOT NULL
);
11 changes: 11 additions & 0 deletions coprocessor/fhevm-engine/host-listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ By default the listener propagate TFHE operation events to the database.
You can change the database URL with `--database-url`; it defaults to a local test database URL.
To disable propagation of TFHE operation events, provide an empty `--database-url`.

### Dependent ops throttling (optional)

Two flags enable a per-replica limiter for dependent ops:

- `--dependent-ops-rate-per-min` (0 disables, mainnet can leave at 0)
- `--dependent-ops-burst` (defaults to rate when set to 0)

When enabled, the listener defers dependent ops by writing a future `schedule_order`.
To avoid order inversion across multiple host-listener types (main/catchup/poller),
we clamp `schedule_order` via the `dependence_chain_schedule` table (one row per chain).

## Events in FHEVM

### Blockchain Events
Expand Down
16 changes: 16 additions & 0 deletions coprocessor/fhevm-engine/host-listener/src/bin/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,20 @@ struct Args {
help = "Dependence chain are across blocks"
)]
pub dependence_cross_block: bool,

#[arg(
long,
default_value_t = 0,
help = "Global dependent ops rate limit per minute (0 disables)"
)]
pub dependent_ops_rate_per_min: u32,

#[arg(
long,
default_value_t = 0,
help = "Burst size for dependent ops limiter (0 = same as rate)"
)]
pub dependent_ops_burst: u32,
}

#[tokio::main]
Expand Down Expand Up @@ -159,6 +173,8 @@ async fn main() -> anyhow::Result<()> {
dependence_cache_size: args.dependence_cache_size,
dependence_by_connexity: args.dependence_by_connexity,
dependence_cross_block: args.dependence_cross_block,
dependent_ops_rate_per_min: args.dependent_ops_rate_per_min,
dependent_ops_burst: args.dependent_ops_burst,
};

run_poller(config).await
Expand Down
18 changes: 18 additions & 0 deletions coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,20 @@ pub struct Args {
)]
pub dependence_cross_block: bool,

#[arg(
long,
default_value_t = 0,
help = "Global dependent ops rate limit per minute (0 disables)"
)]
pub dependent_ops_rate_per_min: u32,

#[arg(
long,
default_value_t = 0,
help = "Burst size for dependent ops limiter (0 = same as rate)"
)]
pub dependent_ops_burst: u32,

#[arg(
long,
default_value = "50",
Expand Down Expand Up @@ -1047,6 +1061,10 @@ pub async fn main(args: Args) -> anyhow::Result<()> {
args.dependence_cache_size,
)
.await?;
db.set_dependent_ops_limiter(
args.dependent_ops_rate_per_min,
args.dependent_ops_burst,
);

if chain_id != db.chain_id {
error!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@ use fhevm_engine_common::types::AllowEvents;
use fhevm_engine_common::types::SupportedFheOperations;
use fhevm_engine_common::utils::DatabaseURL;
use fhevm_engine_common::utils::{to_hex, HeartBeat};
use prometheus::{register_int_counter_vec, IntCounterVec};
use sqlx::postgres::PgConnectOptions;
use sqlx::postgres::PgPoolOptions;
use sqlx::types::Uuid;
use sqlx::Error as SqlxError;
use sqlx::{PgPool, Postgres};
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;
use std::time::Instant;
use time::{Duration as TimeDuration, PrimitiveDateTime};
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tracing::error;
use tracing::info;
Expand All @@ -39,6 +43,24 @@ pub type ScalarByte = FixedBytes<1>;
pub type ClearConst = Uint<256, 4>;
pub type ChainHash = TransactionHash;

// Counter of dependent ops that passed the rate limiter without being
// deferred, labeled by chain id. Registered lazily on first access.
// NOTE(review): Prometheus convention suggests a `_total` suffix for
// counters — confirm before renaming, since dashboards may depend on it.
static DEPENDENT_OPS_ALLOWED: LazyLock<IntCounterVec> = LazyLock::new(|| {
    register_int_counter_vec!(
        "host_listener_dependent_ops_allowed",
        "Number of dependent ops allowed by the limiter",
        &["chain_id"]
    )
    .unwrap()
});

// Counter of dependent ops whose schedule_order was pushed into the future
// by the limiter, labeled by chain id. Registered lazily on first access.
static DEPENDENT_OPS_THROTTLED: LazyLock<IntCounterVec> = LazyLock::new(|| {
    register_int_counter_vec!(
        "host_listener_dependent_ops_throttled",
        "Number of dependent ops deferred by the limiter",
        &["chain_id"]
    )
    .unwrap()
});

#[derive(Clone, Debug)]
pub struct Chain {
pub hash: ChainHash,
Expand Down Expand Up @@ -92,8 +114,10 @@ pub struct Database {
pub pool: Arc<RwLock<sqlx::Pool<Postgres>>>,
pub tenant_id: TenantId,
pub chain_id: ChainId,
chain_id_label: String,
pub dependence_chain: ChainCache,
pub tick: HeartBeat,
dependent_ops_limiter: Option<Arc<Mutex<DependentOpsLimiter>>>,
}

#[derive(Debug)]
Expand All @@ -107,6 +131,67 @@ pub struct LogTfhe {
pub dependence_chain: TransactionHash,
}

/// Token-bucket limiter used to smooth the scheduling of dependent ops.
///
/// Tokens refill continuously at `rate_per_min / 60` per second and are
/// capped at `burst`. The bucket is allowed to go into debt (negative
/// tokens): a caller that drains it past zero is told how long to defer,
/// and deeper debt yields a proportionally longer deferral.
#[derive(Debug)]
struct DependentOpsLimiter {
    rate_per_min: u32,
    burst: u32,
    tokens: f64,
    last_refill: Instant,
}

impl DependentOpsLimiter {
    /// Builds a limiter, or `None` when `rate_per_min` is 0 (disabled).
    /// A `burst` of 0 defaults to the per-minute rate.
    fn new(rate_per_min: u32, burst: u32) -> Option<Self> {
        if rate_per_min == 0 {
            return None;
        }
        let burst = match burst {
            0 => rate_per_min.max(1),
            explicit => explicit,
        };
        Some(Self {
            rate_per_min,
            burst,
            tokens: f64::from(burst),
            last_refill: Instant::now(),
        })
    }

    /// Consumes one token and returns how long the caller should defer the
    /// op: `Duration::ZERO` while the bucket still has tokens, otherwise
    /// the time the refill rate needs to pay back the current debt.
    fn defer_duration(&mut self) -> Duration {
        let now = Instant::now();
        self.refill(now);
        let per_second = f64::from(self.rate_per_min) / 60.0;
        if per_second <= 0.0 {
            // Unreachable in practice (`new` rejects a zero rate); kept as
            // a defensive guard against a division by zero below.
            return Duration::ZERO;
        }
        self.tokens -= 1.0;
        let debt = -self.tokens;
        if debt <= 0.0 {
            return Duration::ZERO;
        }
        let wait_secs = debt / per_second;
        if wait_secs > 0.0 {
            Duration::from_secs_f64(wait_secs)
        } else {
            Duration::ZERO
        }
    }

    /// Credits the tokens earned since `last_refill`, clamped at `burst`.
    fn refill(&mut self, now: Instant) {
        let elapsed_secs =
            now.duration_since(self.last_refill).as_secs_f64();
        if elapsed_secs <= 0.0 {
            return;
        }
        let earned = elapsed_secs * f64::from(self.rate_per_min) / 60.0;
        if earned > 0.0 {
            self.tokens = (self.tokens + earned).min(f64::from(self.burst));
            self.last_refill = now;
        }
    }
}

pub type Transaction<'l> = sqlx::Transaction<'l, Postgres>;

impl Database {
Expand All @@ -129,12 +214,20 @@ impl Database {
url: url.clone(),
tenant_id,
chain_id,
chain_id_label: chain_id.to_string(),
pool: Arc::new(RwLock::new(pool)),
dependence_chain: bucket_cache,
tick: HeartBeat::default(),
dependent_ops_limiter: None,
})
}

pub fn set_dependent_ops_limiter(&mut self, rate_per_min: u32, burst: u32) {
self.dependent_ops_limiter =
DependentOpsLimiter::new(rate_per_min, burst)
.map(|limiter| Arc::new(Mutex::new(limiter)));
}

async fn new_pool(url: &DatabaseURL) -> PgPool {
let options: PgConnectOptions = url.parse().expect("bad url");
let options = options.options([
Expand Down Expand Up @@ -280,6 +373,43 @@ impl Database {
) -> Result<bool, SqlxError> {
let is_scalar = !scalar_byte.is_zero();
let output_handle = result.to_vec();
let dependence_chain_id = log.dependence_chain.to_vec();
let mut schedule_order =
log.block_timestamp
.saturating_add(TimeDuration::microseconds(
log.tx_depth_size as i64,
));
let has_dependencies = log.is_allowed && !dependencies.is_empty();
if has_dependencies {
if let Some(limiter) = &self.dependent_ops_limiter {
let defer_for = limiter.lock().await.defer_duration();
if defer_for.is_zero() {
DEPENDENT_OPS_ALLOWED
.with_label_values(&[self.chain_id_label.as_str()])
.inc();
} else {
// Defer by writing a future schedule_order; worker still
// pulls earliest schedule_order, so this smooths load without blocking ingest.
let defer_micros =
defer_for.as_micros().min(i64::MAX as u128) as i64;
schedule_order = schedule_order.saturating_add(
TimeDuration::microseconds(defer_micros),
);
DEPENDENT_OPS_THROTTLED
.with_label_values(&[self.chain_id_label.as_str()])
.inc();
}
}
}
if self.dependent_ops_limiter.is_some() && has_dependencies {
schedule_order = self
.clamp_schedule_order_db(
tx,
&dependence_chain_id,
schedule_order,
)
.await?;
}
let query = sqlx::query!(
r#"
INSERT INTO computations (
Expand All @@ -303,13 +433,10 @@ impl Database {
&dependencies,
fhe_operation as i16,
is_scalar,
log.dependence_chain.to_vec(),
dependence_chain_id,
log.transaction_hash.map(|txh| txh.to_vec()),
log.is_allowed,
log.block_timestamp
.saturating_add(time::Duration::microseconds(
log.tx_depth_size as i64
)),
schedule_order,
!log.is_allowed,
);
query
Expand All @@ -318,6 +445,36 @@ impl Database {
.map(|result| result.rows_affected() > 0)
}

async fn clamp_schedule_order_db(
&self,
tx: &mut Transaction<'_>,
dependence_chain_id: &[u8],
schedule_order: PrimitiveDateTime,
) -> Result<PrimitiveDateTime, SqlxError> {
// Monotonic clamp shared across all host-listener types.
// Uses a single row per dependence_chain_id to avoid order inversions
// across restarts or concurrent HL instances.
let clamped = sqlx::query_scalar(
r#"
INSERT INTO dependence_chain_schedule(
dependence_chain_id,
last_scheduled_at
) VALUES ($1, $2::timestamp)
ON CONFLICT (dependence_chain_id) DO UPDATE
SET last_scheduled_at = GREATEST(
dependence_chain_schedule.last_scheduled_at + INTERVAL '1 microsecond',
EXCLUDED.last_scheduled_at
)
RETURNING last_scheduled_at
"#,
)
.bind(dependence_chain_id)
.bind(schedule_order)
.fetch_one(tx.deref_mut())
.await?;
Ok(clamped)
}

#[rustfmt::skip]
pub async fn insert_tfhe_event(
&self,
Expand Down Expand Up @@ -1094,3 +1251,34 @@ pub fn tfhe_inputs_handle(op: &TfheContractEvents) -> Vec<Handle> {
E::Initialized(_) | E::Upgraded(_) | E::VerifyInput(_) => vec![],
}
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn dependent_ops_limiter_defers_after_burst() {
        // rate 60/min, burst 2: two ops pass immediately, the third waits.
        let mut limiter = DependentOpsLimiter::new(60, 2).unwrap();
        for _ in 0..2 {
            assert!(limiter.defer_duration().is_zero());
        }
        assert!(limiter.defer_duration() > Duration::ZERO);
    }

    #[test]
    fn dependent_ops_limiter_refills_over_time() {
        let mut limiter = DependentOpsLimiter::new(60, 1).unwrap();
        assert!(limiter.defer_duration().is_zero());
        assert!(limiter.defer_duration() > Duration::ZERO);
        // Pretend two seconds elapsed: at 1 token/sec the bucket refills
        // enough for the next op to pass without deferral.
        limiter.last_refill =
            Instant::now().checked_sub(Duration::from_secs(2)).unwrap();
        assert!(limiter.defer_duration().is_zero());
    }

    #[test]
    fn dependent_ops_limiter_disabled_when_rate_zero() {
        assert!(DependentOpsLimiter::new(0, 10).is_none());
    }
}
6 changes: 6 additions & 0 deletions coprocessor/fhevm-engine/host-listener/src/poller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ pub struct PollerConfig {
pub dependence_cache_size: u16,
pub dependence_by_connexity: bool,
pub dependence_cross_block: bool,
pub dependent_ops_rate_per_min: u32,
pub dependent_ops_burst: u32,
}

pub async fn run_poller(config: PollerConfig) -> Result<()> {
Expand Down Expand Up @@ -139,6 +141,10 @@ pub async fn run_poller(config: PollerConfig) -> Result<()> {
config.dependence_cache_size,
)
.await?;
db.set_dependent_ops_limiter(
config.dependent_ops_rate_per_min,
config.dependent_ops_burst,
);

if chain_id != db.chain_id {
error!(
Expand Down
Loading
Loading