Skip to content

Commit c6aec24

Browse files
committed
fix(coprocessor): support multichain
1 parent 2458fa9 commit c6aec24

File tree

7 files changed

+120
-24
lines changed

7 files changed

+120
-24
lines changed

coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub const DEFAULT_DEPENDENCE_BY_CONNEXITY: bool = false;
4040
pub const DEFAULT_DEPENDENCE_CROSS_BLOCK: bool = true;
4141

4242
const TIMEOUT_REQUEST_ON_WEBSOCKET: u64 = 15;
43+
pub const MINIMUM_DELAY_FOR_FINALIZATION_IN_BLOCKS: u64 = 5;
4344

4445
#[derive(Parser, Debug, Clone)]
4546
#[command(version, about, long_about = None)]
@@ -459,6 +460,7 @@ impl InfiniteLogIter {
459460
logs: std::mem::take(&mut current_logs),
460461
summary,
461462
catchup: true,
463+
finalized: self.catchup_finalization_in_blocks > MINIMUM_DELAY_FOR_FINALIZATION_IN_BLOCKS,
462464
};
463465
blocks_logs.push(block_logs);
464466
}
@@ -731,6 +733,7 @@ impl InfiniteLogIter {
731733
logs,
732734
summary: missing_block,
733735
catchup: true,
736+
finalized: false, // let catchups with finality conditions do the finalize later
734737
});
735738
self.block_history.add_block(missing_block);
736739
}
@@ -825,6 +828,7 @@ impl InfiniteLogIter {
825828
logs: self.get_logs_at_hash(block_header.hash).await?,
826829
summary: block_header.into(),
827830
catchup: false,
831+
finalized: false,
828832
})
829833
}
830834

coprocessor/fhevm-engine/host-listener/src/database/ingest.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub struct BlockLogs<T> {
1919
pub logs: Vec<T>,
2020
pub summary: BlockSummary,
2121
pub catchup: bool,
22+
pub finalized: bool,
2223
}
2324

2425
#[derive(Copy, Clone, Debug)]
@@ -309,8 +310,7 @@ pub async fn ingest_block_logs(
309310
info!(block_number, catchup_insertion, "Catchup inserted events");
310311
}
311312
}
312-
313-
db.mark_block_as_valid(&mut tx, &block_logs.summary).await?;
313+
db.mark_block_as_valid(&mut tx, &block_logs.summary, block_logs.finalized).await?;
314314
if at_least_one_insertion {
315315
db.update_dependence_chain(
316316
&mut tx,

coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -495,19 +495,45 @@ impl Database {
495495
&self,
496496
tx: &mut Transaction<'_>,
497497
block_summary: &BlockSummary,
498+
finalized: bool,
498499
) -> Result<(), SqlxError> {
500+
let status = if finalized { "finalized" } else { "pending" };
501+
// 1. Insert if not exists (never overwrites existing row)
499502
sqlx::query!(
500503
r#"
501-
INSERT INTO host_chain_blocks_valid (chain_id, block_hash, block_number)
502-
VALUES ($1, $2, $3)
503-
ON CONFLICT (chain_id, block_hash) DO NOTHING;
504+
INSERT INTO host_chain_blocks_valid (chain_id, block_hash, block_number, block_status, ancestor_hash)
505+
VALUES ($1, $2, $3, $4, $5)
506+
ON CONFLICT (chain_id, block_hash) DO NOTHING
507+
;
504508
"#,
505509
self.chain_id.as_i64(),
506510
block_summary.hash.to_vec(),
507511
block_summary.number as i64,
512+
status,
513+
block_summary.parent_hash.to_vec(),
508514
)
509515
.execute(tx.deref_mut())
510516
.await?;
517+
518+
// 2. Update to finalized or orphan if needed
519+
if finalized {
520+
sqlx::query!(
521+
r#"
522+
UPDATE host_chain_blocks_valid
523+
SET block_status = CASE
524+
WHEN block_hash = $2
525+
THEN 'finalized'
526+
ELSE 'orphan'
527+
END
528+
WHERE block_status = 'pending' AND block_number = $3 AND chain_id = $1
529+
"#,
530+
self.chain_id as i64,
531+
block_summary.hash.to_vec(),
532+
block_summary.number as i64,
533+
)
534+
.execute(tx.deref_mut())
535+
.await?;
536+
}
511537
Ok(())
512538
}
513539

coprocessor/fhevm-engine/host-listener/src/poller/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ pub async fn run_poller(config: PollerConfig) -> Result<()> {
272272
logs,
273273
summary,
274274
catchup: true,
275+
finalized: config.finality_lag > 5,
275276
};
276277

277278
let ingest_options = IngestOptions {

coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -944,6 +944,56 @@ async fn test_listener_restart_and_chain_reorg() -> Result<(), anyhow::Error> {
944944
test_listener_no_event_loss(true, true).await
945945
}
946946

947+
async fn check_finalization_status(
948+
setup: Setup
949+
) {
950+
let provider = ProviderBuilder::new()
951+
.wallet(setup.wallets[0].clone())
952+
.connect_ws(WsConnect::new(setup.args.url.to_string()))
953+
.await
954+
.unwrap();
955+
// Verify block finalization status: for each block number, one should be finalized and others orphaned
956+
let blocks = sqlx::query!(
957+
"SELECT block_number, block_hash, block_status FROM host_chain_blocks_valid"
958+
)
959+
.fetch_all(&setup.db_pool)
960+
.await;
961+
962+
let blocks = blocks.expect("Failed to fetch blocks from database");
963+
964+
let mut blocks_by_number: std::collections::HashMap<i64, Vec<(Vec<u8>, String)>> = std::collections::HashMap::new();
965+
for block in blocks {
966+
blocks_by_number
967+
.entry(block.block_number)
968+
.or_insert_with(Vec::new)
969+
.push((block.block_hash, block.block_status));
970+
}
971+
972+
for (block_number, block_variants) in blocks_by_number.iter() {
973+
let finalized_count = block_variants.iter().filter(|(_, status)| status == "finalized").count();
974+
let orphan_count = block_variants.iter().filter(|(_, status)| status == "orphan").count();
975+
assert_eq!(
976+
finalized_count, 1,
977+
"Block {} should have exactly one finalized variant, found {}",
978+
block_number, finalized_count
979+
);
980+
let finalized_hash = block_variants.iter().find(|(_, status)| status == "finalized").map(|(hash, _)| hash).unwrap();
981+
assert_eq!(
982+
orphan_count,
983+
block_variants.len() - 1,
984+
"Block {} should have remaining variants as orphan",
985+
block_number
986+
);
987+
let expected_hash= provider.get_block_by_number((*block_number as u64).into()).await.unwrap().unwrap().header.hash;
988+
assert_eq!(
989+
&expected_hash.0,
990+
finalized_hash.as_slice(),
991+
"Finalized block hash for block {} does not match expected",
992+
block_number
993+
);
994+
}
995+
}
996+
947997
async fn test_listener_no_event_loss(
948998
kill: bool,
949999
reorg: bool,
@@ -1046,6 +1096,8 @@ async fn test_listener_no_event_loss(
10461096
}
10471097
assert_eq!(tfhe_events_count, expected_tfhe_events);
10481098
assert_eq!(acl_events_count, expected_acl_events);
1099+
1100+
check_finalization_status(setup).await;
10491101
Ok(())
10501102
}
10511103

coprocessor/fhevm-engine/transaction-sender/src/ops/delegate_user_decrypt.rs

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -257,11 +257,9 @@ impl<P: Provider<Ethereum> + Clone + 'static> DelegateUserDecryptOperation<P> {
257257
let block_status = if let Some(status) = blocks_status.get(&delegation.block_number) {
258258
*status
259259
} else {
260-
let status = match self.get_block_hash(delegation.block_number).await {
261-
Ok(block_hash) if delegation.block_hash == block_hash.to_vec() => {
262-
BlockStatus::Stable
263-
}
264-
Ok(block_hash) => {
260+
let status = self.get_block_status(&delegation).await;
261+
match status {
262+
BlockStatus::Dismissed => {
265263
warn!(
266264
delegation_block_hash = ?delegation.block_hash,
267265
block_hash = ?block_hash.to_vec(),
@@ -270,16 +268,15 @@ impl<P: Provider<Ethereum> + Clone + 'static> DelegateUserDecryptOperation<P> {
270268
);
271269
// ignoring delegation due to reorg, will be marked as reorg_out
272270
reorg_out_block.push(delegation.block_hash.clone());
273-
BlockStatus::Dismissed
274271
}
275-
Err(_) => {
272+
BlockStatus::Unknown => {
276273
error!(
277274
block_number = delegation.block_number,
278275
"Cannot get block hash for delegation, will retry next block"
279276
);
280277
unsure_block.push(delegation.block_number);
281-
BlockStatus::Unknown
282278
}
279+
BlockStatus::Stable => (),
283280
};
284281
blocks_status.insert(delegation.block_number, status);
285282
status
@@ -315,17 +312,32 @@ impl<P: Provider<Ethereum> + Clone + 'static> DelegateUserDecryptOperation<P> {
315312
Ok((stable_delegations, reorg_out_block))
316313
}
317314

318-
async fn get_block_hash(&self, block_number: u64) -> Result<BlockHash> {
319-
let search_block = BlockNumberOrTag::Number(block_number);
320-
let some_block = self
321-
.host_chain_provider
322-
.get_block_by_number(search_block)
323-
.await?;
324-
let Some(block) = some_block else {
325-
error!(block_number, "A past block cannot be found by number");
326-
anyhow::bail!("Cannot get past block by number, giving up");
327-
};
328-
Ok(block.header.hash)
315+
async fn get_block_status(&self, delegation: &DelegationRow) -> BlockStatus {
316+
let status = sqlx::query!(r#"
317+
SELECT block_status
318+
FROM host_chain_blocks_valid
319+
WHERE block_hash = $2 AND chain_id = $1
320+
"#,
321+
delegation.host_chain_id as i64, // TODO: chain id
322+
delegation.block_hash, //.as_bytes(),
323+
)
324+
.fetch_one(&self.db_pool)
325+
.await;
326+
match status {
327+
Ok(record) => match record.block_status.as_str() {
328+
"finalized" => BlockStatus::Stable,
329+
"orphaned" => BlockStatus::Dismissed,
330+
_ => BlockStatus::Unknown,
331+
},
332+
Err(e) => {
333+
error!(
334+
%e,
335+
?delegation,
336+
"Error querying block status from database"
337+
);
338+
BlockStatus::Unknown
339+
}
340+
}
329341
}
330342

331343
async fn wait_last_block_number(&self) -> Result<u64> {

test-suite/fhevm/docker-compose/coprocessor-docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ services:
203203
- --s3-recheck-duration=2s
204204
- --s3-regular-recheck-duration=120s
205205
- --enable-compression
206+
- --private-key=${TX_SENDER_PRIVATE_KEY}
206207
depends_on:
207208
coprocessor-db-migration:
208209
condition: service_completed_successfully

0 commit comments

Comments
 (0)