Skip to content

Commit 2181c22

Browse files
committed
fix(host-listener): bound slow-priority reset load in off mode
1 parent d531e99 commit 2181c22

File tree

6 files changed

+41
-28
lines changed

6 files changed

+41
-28
lines changed

coprocessor/fhevm-engine/host-listener/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ slow-lane work when fast lane is empty, which isolates heavy dependent traffic
4646
without reordering within a chain.
4747

4848
When set to `0`, host-listener disables slow-lane decisions, skips dependent-op
49-
throttling accounting, and resets existing `schedule_priority` values to `0` at
50-
startup so scheduling is effectively FIFO by `last_updated_at`.
49+
throttling accounting, and progressively resets pending slow chains back to
50+
`schedule_priority = 0` in bounded batches during ingest.
5151

5252
## Events in FHEVM
5353

coprocessor/fhevm-engine/host-listener/src/bin/poller.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ struct Args {
122122
#[arg(
123123
long,
124124
default_value_t = 0,
125-
help = "Max weighted dependent ops per chain before slow-lane (0 disables and keeps all chains fast)"
125+
help = "Max weighted dependent ops per chain before slow-lane (0 disables; pending slow chains are reset in bounded batches)"
126126
)]
127127
pub dependent_ops_max_per_chain: u32,
128128
}

coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ pub struct Args {
128128
#[arg(
129129
long,
130130
default_value_t = 0,
131-
help = "Max weighted dependent ops per chain before slow-lane (0 disables and keeps all chains fast)"
131+
help = "Max weighted dependent ops per chain before slow-lane (0 disables; pending slow chains are reset in bounded batches)"
132132
)]
133133
pub dependent_ops_max_per_chain: u32,
134134

@@ -1075,16 +1075,6 @@ pub async fn main(args: Args) -> anyhow::Result<()> {
10751075
));
10761076
}
10771077

1078-
if args.dependent_ops_max_per_chain == 0 {
1079-
let reset = db.reset_schedule_priorities().await?;
1080-
if reset > 0 {
1081-
info!(
1082-
count = reset,
1083-
"Slow-lane disabled: reset priorities to fast"
1084-
);
1085-
}
1086-
}
1087-
10881078
let health_check = HealthCheck {
10891079
blockchain_timeout_tick: log_iter.tick_timeout.clone(),
10901080
blockchain_tick: log_iter.tick_block.clone(),

coprocessor/fhevm-engine/host-listener/src/database/ingest.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use crate::database::tfhe_event_propagate::{
1515
tfhe_result_handle, ChainHash, Database, LogTfhe,
1616
};
1717

18+
const SLOW_PRIORITY_RESET_BATCH_SIZE: i64 = 5_000;
19+
1820
pub struct BlockLogs<T> {
1921
pub logs: Vec<T>,
2022
pub summary: BlockSummary,
@@ -157,6 +159,20 @@ pub async fn ingest_block_logs(
157159
.await;
158160

159161
let slow_lane_enabled = options.dependent_ops_max_per_chain > 0;
162+
if !slow_lane_enabled {
163+
let reset = db
164+
.reset_schedule_priorities_batch(
165+
&mut tx,
166+
SLOW_PRIORITY_RESET_BATCH_SIZE,
167+
)
168+
.await?;
169+
if reset > 0 {
170+
info!(
171+
count = reset,
172+
"Slow-lane disabled: reset a batch of priorities to fast"
173+
);
174+
}
175+
}
160176
let mut dependent_ops_by_chain: HashMap<ChainHash, DependentOpsStats> =
161177
HashMap::new();
162178
for tfhe_log in tfhe_event_log {

coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,15 +175,32 @@ impl Database {
175175
}
176176
}
177177

178-
pub async fn reset_schedule_priorities(&self) -> Result<u64, SqlxError> {
178+
pub async fn reset_schedule_priorities_batch(
179+
&self,
180+
tx: &mut Transaction<'_>,
181+
batch_size: i64,
182+
) -> Result<u64, SqlxError> {
179183
let rows = sqlx::query(
180184
r#"
181-
UPDATE dependence_chain
185+
WITH batch AS (
186+
SELECT dependence_chain_id
187+
FROM dependence_chain
188+
WHERE schedule_priority = 1
189+
AND status = 'updated'
190+
AND worker_id IS NULL
191+
AND dependency_count = 0
192+
ORDER BY last_updated_at ASC
193+
LIMIT $1
194+
FOR UPDATE SKIP LOCKED
195+
)
196+
UPDATE dependence_chain dc
182197
SET schedule_priority = 0
183-
WHERE schedule_priority <> 0
198+
FROM batch
199+
WHERE dc.dependence_chain_id = batch.dependence_chain_id
184200
"#,
185201
)
186-
.execute(&self.pool().await)
202+
.bind(batch_size)
203+
.execute(tx.deref_mut())
187204
.await?;
188205
Ok(rows.rows_affected())
189206
}

coprocessor/fhevm-engine/host-listener/src/poller/mod.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -158,16 +158,6 @@ pub async fn run_poller(config: PollerConfig) -> Result<()> {
158158
));
159159
}
160160

161-
if config.dependent_ops_max_per_chain == 0 {
162-
let reset = db.reset_schedule_priorities().await?;
163-
if reset > 0 {
164-
info!(
165-
count = reset,
166-
"Slow-lane disabled: reset priorities to fast"
167-
);
168-
}
169-
}
170-
171161
let initial_anchor =
172162
db.poller_get_last_caught_up_block(chain_id as i64).await?;
173163
db.tick.update();

0 commit comments

Comments
 (0)