Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions coprocessor/fhevm-engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- One row per dependence chain: the latest schedule_order handed out for
-- that chain. Host-listener replicas use it to clamp new schedule_order
-- values so they never move backwards across restarts or concurrent
-- instances (see clamp_schedule_order_db).
--
-- NOTE: TIMESTAMP (without time zone), not TIMESTAMPTZ: sqlx maps
-- TIMESTAMP to time::PrimitiveDateTime, which is the type decoded from
-- the RETURNING clause in clamp_schedule_order_db(); it also matches the
-- existing schedule_order TIMESTAMP columns.
CREATE TABLE IF NOT EXISTS dependence_chain_schedule (
    dependence_chain_id bytea PRIMARY KEY,
    last_scheduled_at TIMESTAMP NOT NULL
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[AUTOMATED] Bug: Type mismatch between TIMESTAMPTZ column and PrimitiveDateTime return type

The column last_scheduled_at is defined as TIMESTAMPTZ, but the Rust function clamp_schedule_order_db() returns PrimitiveDateTime. In sqlx with the time crate:

  • TIMESTAMP (without timezone) maps to PrimitiveDateTime
  • TIMESTAMPTZ (with timezone) maps to OffsetDateTime

The RETURNING last_scheduled_at clause will return a TIMESTAMPTZ value, and sqlx will fail at runtime when trying to decode it into PrimitiveDateTime.

Evidence: The existing schedule_order columns in the codebase use TIMESTAMP (not TIMESTAMPTZ):

-- 20250703000000_add_schedule_order_column.sql
ADD COLUMN IF NOT EXISTS schedule_order TIMESTAMP NOT NULL DEFAULT NOW();

Suggested fix: Change TIMESTAMPTZ to TIMESTAMP for consistency with existing schedule_order columns:

CREATE TABLE IF NOT EXISTS dependence_chain_schedule (
    dependence_chain_id  bytea PRIMARY KEY,
    last_scheduled_at     TIMESTAMP NOT NULL
);

Confidence: 92/100

);
11 changes: 11 additions & 0 deletions coprocessor/fhevm-engine/host-listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ By default the listener propagate TFHE operation events to the database.
You can change the database URL using `--database-url`; it defaults to a local test database URL.
If you want to disable TFHE operation events propagation, you can provide an empty database-url.

### Dependent ops throttling (optional)

Two flags enable a per-replica limiter for dependent ops:

- `--dependent-ops-rate-per-min` — global dependent-ops rate limit per minute (0 disables the limiter; mainnet can leave it at 0)
- `--dependent-ops-burst` — burst size for the limiter (0 means "same as the rate")

When enabled, the listener defers dependent ops by writing a future `schedule_order`.
To avoid order inversion across multiple host-listener types (main/catchup/poller),
we clamp `schedule_order` via the `dependence_chain_schedule` table (one row per chain).

## Events in FHEVM

### Blockchain Events
Expand Down
16 changes: 16 additions & 0 deletions coprocessor/fhevm-engine/host-listener/src/bin/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,20 @@ struct Args {
help = "Dependence chain are across blocks"
)]
pub dependence_cross_block: bool,

#[arg(
long,
default_value_t = 0,
help = "Global dependent ops rate limit per minute (0 disables)"
)]
pub dependent_ops_rate_per_min: u32,

#[arg(
long,
default_value_t = 0,
help = "Burst size for dependent ops limiter (0 = same as rate)"
)]
pub dependent_ops_burst: u32,
}

#[tokio::main]
Expand Down Expand Up @@ -159,6 +173,8 @@ async fn main() -> anyhow::Result<()> {
dependence_cache_size: args.dependence_cache_size,
dependence_by_connexity: args.dependence_by_connexity,
dependence_cross_block: args.dependence_cross_block,
dependent_ops_rate_per_min: args.dependent_ops_rate_per_min,
dependent_ops_burst: args.dependent_ops_burst,
};

run_poller(config).await
Expand Down
18 changes: 18 additions & 0 deletions coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,20 @@ pub struct Args {
)]
pub dependence_cross_block: bool,

#[arg(
long,
default_value_t = 0,
help = "Global dependent ops rate limit per minute (0 disables)"
)]
pub dependent_ops_rate_per_min: u32,

#[arg(
long,
default_value_t = 0,
help = "Burst size for dependent ops limiter (0 = same as rate)"
)]
pub dependent_ops_burst: u32,

#[arg(
long,
default_value = "50",
Expand Down Expand Up @@ -1047,6 +1061,10 @@ pub async fn main(args: Args) -> anyhow::Result<()> {
args.dependence_cache_size,
)
.await?;
db.set_dependent_ops_limiter(
args.dependent_ops_rate_per_min,
args.dependent_ops_burst,
);

if chain_id != db.chain_id {
error!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@ use fhevm_engine_common::types::AllowEvents;
use fhevm_engine_common::types::SupportedFheOperations;
use fhevm_engine_common::utils::DatabaseURL;
use fhevm_engine_common::utils::{to_hex, HeartBeat};
use prometheus::{register_int_counter_vec, IntCounterVec};
use sqlx::postgres::PgConnectOptions;
use sqlx::postgres::PgPoolOptions;
use sqlx::types::Uuid;
use sqlx::Error as SqlxError;
use sqlx::{PgPool, Postgres};
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;
use std::time::Instant;
use time::{Duration as TimeDuration, PrimitiveDateTime};
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tracing::error;
use tracing::info;
Expand All @@ -39,6 +43,24 @@ pub type ScalarByte = FixedBytes<1>;
pub type ClearConst = Uint<256, 4>;
pub type ChainHash = TransactionHash;

/// Prometheus counter: dependent ops that passed the limiter without being
/// deferred, labelled by host chain id.
static DEPENDENT_OPS_ALLOWED: LazyLock<IntCounterVec> = LazyLock::new(|| {
register_int_counter_vec!(
"host_listener_dependent_ops_allowed",
"Number of dependent ops allowed by the limiter",
&["chain_id"]
)
// Registration only fails on invalid metric name/labels; acceptable to
// panic at first use during startup.
.unwrap()
});

/// Prometheus counter: dependent ops whose schedule_order was pushed into
/// the future by the limiter, labelled by host chain id.
static DEPENDENT_OPS_THROTTLED: LazyLock<IntCounterVec> = LazyLock::new(|| {
register_int_counter_vec!(
"host_listener_dependent_ops_throttled",
"Number of dependent ops deferred by the limiter",
&["chain_id"]
)
// Same rationale as above: invalid registration is a programming error.
.unwrap()
});

#[derive(Clone, Debug)]
pub struct Chain {
pub hash: ChainHash,
Expand Down Expand Up @@ -92,8 +114,10 @@ pub struct Database {
pub pool: Arc<RwLock<sqlx::Pool<Postgres>>>,
pub tenant_id: TenantId,
pub chain_id: ChainId,
chain_id_label: String,
pub dependence_chain: ChainCache,
pub tick: HeartBeat,
dependent_ops_limiter: Option<Arc<Mutex<DependentOpsLimiter>>>,
}

#[derive(Debug)]
Expand All @@ -107,6 +131,67 @@ pub struct LogTfhe {
pub dependence_chain: TransactionHash,
}

/// Token-bucket limiter for dependent ops.
///
/// The bucket starts full (`tokens == burst`), refills continuously at
/// `rate_per_min / 60` tokens per second, and never holds more than
/// `burst` tokens. Each call to [`Self::defer_duration`] consumes one
/// token; once the balance goes negative, callers are told how long to
/// defer so the long-run rate converges to `rate_per_min`.
#[derive(Debug)]
struct DependentOpsLimiter {
    rate_per_min: u32,
    burst: u32,
    // Fractional token balance; may go negative while throttled.
    tokens: f64,
    last_refill: Instant,
}

impl DependentOpsLimiter {
    /// Builds a limiter, or `None` when `rate_per_min == 0` (disabled).
    ///
    /// A `burst` of 0 falls back to the per-minute rate (at least 1).
    fn new(rate_per_min: u32, burst: u32) -> Option<Self> {
        if rate_per_min == 0 {
            return None;
        }
        let capacity = match burst {
            0 => rate_per_min.max(1),
            b => b,
        };
        Some(Self {
            rate_per_min,
            burst: capacity,
            // Start with a full bucket so the initial burst is admitted.
            tokens: capacity as f64,
            last_refill: Instant::now(),
        })
    }

    /// Consumes one token and returns how long the caller should defer:
    /// `Duration::ZERO` while within budget, otherwise the time needed for
    /// the bucket to pay off the current deficit.
    fn defer_duration(&mut self) -> Duration {
        let at = Instant::now();
        self.refill(at);
        let per_second = f64::from(self.rate_per_min) / 60.0;
        // Unreachable in practice (constructor rejects rate 0), kept as a
        // defensive guard against division by zero below.
        if per_second <= 0.0 {
            return Duration::ZERO;
        }
        self.tokens -= 1.0;
        if self.tokens >= 0.0 {
            return Duration::ZERO;
        }
        // Negative balance: defer long enough to earn the deficit back.
        let wait = -self.tokens / per_second;
        if wait <= 0.0 {
            Duration::ZERO
        } else {
            Duration::from_secs_f64(wait)
        }
    }

    /// Adds tokens accrued since `last_refill`, capped at `burst`.
    fn refill(&mut self, now: Instant) {
        let seconds = now.duration_since(self.last_refill).as_secs_f64();
        if seconds <= 0.0 {
            return;
        }
        let earned = seconds * (f64::from(self.rate_per_min) / 60.0);
        if earned > 0.0 {
            self.tokens = (self.tokens + earned).min(f64::from(self.burst));
            self.last_refill = now;
        }
    }
}

pub type Transaction<'l> = sqlx::Transaction<'l, Postgres>;

impl Database {
Expand All @@ -129,12 +214,20 @@ impl Database {
url: url.clone(),
tenant_id,
chain_id,
chain_id_label: chain_id.to_string(),
pool: Arc::new(RwLock::new(pool)),
dependence_chain: bucket_cache,
tick: HeartBeat::default(),
dependent_ops_limiter: None,
})
}

/// Installs (or disables) the dependent-ops rate limiter on this database
/// handle.
///
/// A `rate_per_min` of 0 disables throttling (`DependentOpsLimiter::new`
/// returns `None`); a `burst` of 0 defaults to the per-minute rate.
pub fn set_dependent_ops_limiter(&mut self, rate_per_min: u32, burst: u32) {
    let limiter = DependentOpsLimiter::new(rate_per_min, burst);
    self.dependent_ops_limiter = limiter.map(Mutex::new).map(Arc::new);
}

async fn new_pool(url: &DatabaseURL) -> PgPool {
let options: PgConnectOptions = url.parse().expect("bad url");
let options = options.options([
Expand Down Expand Up @@ -280,6 +373,43 @@ impl Database {
) -> Result<bool, SqlxError> {
let is_scalar = !scalar_byte.is_zero();
let output_handle = result.to_vec();
let dependence_chain_id = log.dependence_chain.to_vec();
let mut schedule_order =
log.block_timestamp
.saturating_add(TimeDuration::microseconds(
log.tx_depth_size as i64,
));
let has_dependencies = log.is_allowed && !dependencies.is_empty();
if has_dependencies {
if let Some(limiter) = &self.dependent_ops_limiter {
let defer_for = limiter.lock().await.defer_duration();
if defer_for.is_zero() {
DEPENDENT_OPS_ALLOWED
.with_label_values(&[self.chain_id_label.as_str()])
.inc();
} else {
// Defer by writing a future schedule_order; worker still
// pulls earliest schedule_order, so this smooths load without blocking ingest.
let defer_micros =
defer_for.as_micros().min(i64::MAX as u128) as i64;
schedule_order = schedule_order.saturating_add(
TimeDuration::microseconds(defer_micros),
);
DEPENDENT_OPS_THROTTLED
.with_label_values(&[self.chain_id_label.as_str()])
.inc();
}
}
}
if self.dependent_ops_limiter.is_some() && has_dependencies {
schedule_order = self
.clamp_schedule_order_db(
tx,
&dependence_chain_id,
schedule_order,
)
.await?;
}
let query = sqlx::query!(
r#"
INSERT INTO computations (
Expand All @@ -303,13 +433,10 @@ impl Database {
&dependencies,
fhe_operation as i16,
is_scalar,
log.dependence_chain.to_vec(),
dependence_chain_id,
log.transaction_hash.map(|txh| txh.to_vec()),
log.is_allowed,
log.block_timestamp
.saturating_add(time::Duration::microseconds(
log.tx_depth_size as i64
)),
schedule_order,
!log.is_allowed,
);
query
Expand All @@ -318,6 +445,36 @@ impl Database {
.map(|result| result.rows_affected() > 0)
}

/// Clamps `schedule_order` against a per-chain high-water mark stored in
/// `dependence_chain_schedule`, returning the effective (possibly later)
/// schedule time to use for the computation row.
///
/// The upsert keeps the stored value strictly increasing — at least the
/// previous value plus one microsecond, or the proposed value if larger —
/// so two ops on the same chain can never receive the same or decreasing
/// schedule_order, even when written by different host-listener instances.
///
/// Runs inside the caller's transaction `tx`; the row lock taken by the
/// upsert serializes concurrent writers on the same chain id.
async fn clamp_schedule_order_db(
&self,
tx: &mut Transaction<'_>,
dependence_chain_id: &[u8],
schedule_order: PrimitiveDateTime,
) -> Result<PrimitiveDateTime, SqlxError> {
// Monotonic clamp shared across all host-listener types.
// Uses a single row per dependence_chain_id to avoid order inversions
// across restarts or concurrent HL instances.
// NOTE(review): decoding into PrimitiveDateTime requires the column to be
// TIMESTAMP (not TIMESTAMPTZ) — confirm against the migration.
let clamped = sqlx::query_scalar(
r#"
INSERT INTO dependence_chain_schedule(
dependence_chain_id,
last_scheduled_at
) VALUES ($1, $2::timestamp)
ON CONFLICT (dependence_chain_id) DO UPDATE
SET last_scheduled_at = GREATEST(
dependence_chain_schedule.last_scheduled_at + INTERVAL '1 microsecond',
EXCLUDED.last_scheduled_at
)
RETURNING last_scheduled_at
"#,
)
.bind(dependence_chain_id)
.bind(schedule_order)
.fetch_one(tx.deref_mut())
.await?;
Ok(clamped)
}

#[rustfmt::skip]
pub async fn insert_tfhe_event(
&self,
Expand Down Expand Up @@ -1094,3 +1251,34 @@ pub fn tfhe_inputs_handle(op: &TfheContractEvents) -> Vec<Handle> {
E::Initialized(_) | E::Upgraded(_) | E::VerifyInput(_) => vec![],
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Once the burst budget is spent, the limiter must ask for a delay.
    #[test]
    fn dependent_ops_limiter_defers_after_burst() {
        let mut limiter = DependentOpsLimiter::new(60, 2).unwrap();
        // Burst of two: the first two calls pass immediately.
        for _ in 0..2 {
            assert!(limiter.defer_duration().is_zero());
        }
        // The third call exceeds the burst and is deferred.
        assert!(limiter.defer_duration() > Duration::ZERO);
    }

    /// Tokens earned over elapsed time let ops through again.
    #[test]
    fn dependent_ops_limiter_refills_over_time() {
        let mut limiter = DependentOpsLimiter::new(60, 1).unwrap();
        assert!(limiter.defer_duration().is_zero());
        assert!(limiter.defer_duration() > Duration::ZERO);
        // Pretend two seconds passed so the bucket earns its token back.
        let past =
            Instant::now().checked_sub(Duration::from_secs(2)).unwrap();
        limiter.last_refill = past;
        assert!(limiter.defer_duration().is_zero());
    }

    /// A zero rate disables the limiter entirely.
    #[test]
    fn dependent_ops_limiter_disabled_when_rate_zero() {
        assert!(DependentOpsLimiter::new(0, 10).is_none());
    }
}
6 changes: 6 additions & 0 deletions coprocessor/fhevm-engine/host-listener/src/poller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ pub struct PollerConfig {
pub dependence_cache_size: u16,
pub dependence_by_connexity: bool,
pub dependence_cross_block: bool,
pub dependent_ops_rate_per_min: u32,
pub dependent_ops_burst: u32,
}

pub async fn run_poller(config: PollerConfig) -> Result<()> {
Expand Down Expand Up @@ -139,6 +141,10 @@ pub async fn run_poller(config: PollerConfig) -> Result<()> {
config.dependence_cache_size,
)
.await?;
db.set_dependent_ops_limiter(
config.dependent_ops_rate_per_min,
config.dependent_ops_burst,
);

if chain_id != db.chain_id {
error!(
Expand Down
Loading
Loading