diff --git a/coprocessor/fhevm-engine/.sqlx/query-081a15f82a405de28992b48a0bc989e47c62f841f3c642735ce468e8ac144a2d.json b/coprocessor/fhevm-engine/.sqlx/query-081a15f82a405de28992b48a0bc989e47c62f841f3c642735ce468e8ac144a2d.json new file mode 100644 index 0000000000..373909cf24 --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-081a15f82a405de28992b48a0bc989e47c62f841f3c642735ce468e8ac144a2d.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "NOTIFY new_host_block", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "081a15f82a405de28992b48a0bc989e47c62f841f3c642735ce468e8ac144a2d" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-1c23ffbe8e1840eb63722e1d1811ba32941e6d8bc4d5c369edef3a66d622a46e.json b/coprocessor/fhevm-engine/.sqlx/query-1c23ffbe8e1840eb63722e1d1811ba32941e6d8bc4d5c369edef3a66d622a46e.json deleted file mode 100644 index 8edd510b82..0000000000 --- a/coprocessor/fhevm-engine/.sqlx/query-1c23ffbe8e1840eb63722e1d1811ba32941e6d8bc4d5c369edef3a66d622a46e.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE delegate_user_decrypt\n SET reorg_out = true\n WHERE block_hash IN (SELECT unnest($1::bytea[]))\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "ByteaArray" - ] - }, - "nullable": [] - }, - "hash": "1c23ffbe8e1840eb63722e1d1811ba32941e6d8bc4d5c369edef3a66d622a46e" -} diff --git a/coprocessor/fhevm-engine/.sqlx/query-41f1e1ec2e2ca8cc6fe2395105767fa28e0020847366a86cdeb18cd8db1354d7.json b/coprocessor/fhevm-engine/.sqlx/query-41f1e1ec2e2ca8cc6fe2395105767fa28e0020847366a86cdeb18cd8db1354d7.json new file mode 100644 index 0000000000..fe4ab16500 --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-41f1e1ec2e2ca8cc6fe2395105767fa28e0020847366a86cdeb18cd8db1354d7.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE host_chain_blocks_valid SET block_status = 'orphaned' WHERE block_number = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "41f1e1ec2e2ca8cc6fe2395105767fa28e0020847366a86cdeb18cd8db1354d7" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-51b0ba894dbdd2b26c9ad13e1a5b3d4657af9aa912bbe652eabeae2959588589.json b/coprocessor/fhevm-engine/.sqlx/query-51b0ba894dbdd2b26c9ad13e1a5b3d4657af9aa912bbe652eabeae2959588589.json new file mode 100644 index 0000000000..a51b305119 --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-51b0ba894dbdd2b26c9ad13e1a5b3d4657af9aa912bbe652eabeae2959588589.json @@ -0,0 +1,32 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT block_number, block_hash, block_status FROM host_chain_blocks_valid", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "block_number", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "block_hash", + "type_info": "Bytea" + }, + { + "ordinal": 2, + "name": "block_status", + "type_info": "Text" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "51b0ba894dbdd2b26c9ad13e1a5b3d4657af9aa912bbe652eabeae2959588589" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-596dea818737c64f6d34646c47febc27968cb38e73f65b1ee98f57107b97b501.json b/coprocessor/fhevm-engine/.sqlx/query-596dea818737c64f6d34646c47febc27968cb38e73f65b1ee98f57107b97b501.json new file mode 100644 index 0000000000..d2b862f4d3 --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-596dea818737c64f6d34646c47febc27968cb38e73f65b1ee98f57107b97b501.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT block_number FROM host_chain_blocks_valid\n WHERE block_status = 'pending' AND block_number <= $1 AND chain_id = $2\n ORDER BY block_number DESC\n LIMIT 10\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "block_number", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "596dea818737c64f6d34646c47febc27968cb38e73f65b1ee98f57107b97b501" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-641036eba016313ea7cf191d71f2b69c1def70ea46139dd02fb510581b6322c2.json b/coprocessor/fhevm-engine/.sqlx/query-641036eba016313ea7cf191d71f2b69c1def70ea46139dd02fb510581b6322c2.json deleted file mode 100644 index 3ea2edb291..0000000000 --- a/coprocessor/fhevm-engine/.sqlx/query-641036eba016313ea7cf191d71f2b69c1def70ea46139dd02fb510581b6322c2.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO host_chain_blocks_valid (chain_id, block_hash, block_number)\n VALUES ($1, $2, $3)\n ON CONFLICT (chain_id, block_hash) DO NOTHING;\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Bytea", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "641036eba016313ea7cf191d71f2b69c1def70ea46139dd02fb510581b6322c2" -} diff --git a/coprocessor/fhevm-engine/.sqlx/query-8907197e8a8331301874d45165cfd616f1c2bccd985dd6bb24269f53dff9e66b.json b/coprocessor/fhevm-engine/.sqlx/query-8a2918ace6c8fe642dc6b8badc952c7a3df9b2e0ac113b93d20b2a78bcab75b7.json similarity index 84% rename from coprocessor/fhevm-engine/.sqlx/query-8907197e8a8331301874d45165cfd616f1c2bccd985dd6bb24269f53dff9e66b.json rename to coprocessor/fhevm-engine/.sqlx/query-8a2918ace6c8fe642dc6b8badc952c7a3df9b2e0ac113b93d20b2a78bcab75b7.json index c16d7cbc64..d736753177 100644 --- a/coprocessor/fhevm-engine/.sqlx/query-8907197e8a8331301874d45165cfd616f1c2bccd985dd6bb24269f53dff9e66b.json +++ b/coprocessor/fhevm-engine/.sqlx/query-8a2918ace6c8fe642dc6b8badc952c7a3df9b2e0ac113b93d20b2a78bcab75b7.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT key, delegator, delegate, contract_address, delegation_counter, old_expiration_date, new_expiration_date, host_chain_id, block_number, block_hash, transaction_id, gateway_nb_attempts\n FROM delegate_user_decrypt\n WHERE block_number <= $1\n AND on_gateway = false\n AND reorg_out = false\n AND gateway_nb_attempts <= $2\n ORDER BY block_number ASC, delegation_counter ASC, transaction_id ASC\n FOR UPDATE\n ", + "query": "\n SELECT key, delegator, delegate, contract_address, delegation_counter, old_expiration_date, new_expiration_date, host_chain_id, block_number, block_hash, transaction_id, gateway_nb_attempts\n FROM delegate_user_decrypt\n WHERE on_gateway = false\n AND reorg_out = false\n AND gateway_nb_attempts <= $1\n ORDER BY block_number ASC, delegation_counter ASC, transaction_id ASC\n FOR UPDATE\n ", "describe": { "columns": [ { @@ -66,7 +66,6 @@ ], "parameters": { "Left": [ - "Int8", "Int8" ] }, @@ -85,5 +84,5 @@ false ] }, - "hash": "8907197e8a8331301874d45165cfd616f1c2bccd985dd6bb24269f53dff9e66b" + "hash": "8a2918ace6c8fe642dc6b8badc952c7a3df9b2e0ac113b93d20b2a78bcab75b7" } diff --git a/coprocessor/fhevm-engine/.sqlx/query-8b46c95180daf944b99d16dca194420f46cf495d5738d25b453a745cb83797a0.json b/coprocessor/fhevm-engine/.sqlx/query-8b46c95180daf944b99d16dca194420f46cf495d5738d25b453a745cb83797a0.json new file mode 100644 index 0000000000..679be2fb1c --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-8b46c95180daf944b99d16dca194420f46cf495d5738d25b453a745cb83797a0.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE host_chain_blocks_valid\n SET block_status = CASE\n WHEN block_hash = $2\n THEN 'finalized'\n ELSE 'orphaned'\n END\n WHERE block_number = $3 AND chain_id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Bytea", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "8b46c95180daf944b99d16dca194420f46cf495d5738d25b453a745cb83797a0" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-8e2e1efee7317633a7c75aa4e750db5583341a7a5fda81949d49029db7468829.json b/coprocessor/fhevm-engine/.sqlx/query-8e2e1efee7317633a7c75aa4e750db5583341a7a5fda81949d49029db7468829.json new file mode 100644 index 0000000000..6494f75b89 --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-8e2e1efee7317633a7c75aa4e750db5583341a7a5fda81949d49029db7468829.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE delegate_user_decrypt\n SET reorg_out = true\n WHERE key = ANY($1)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8Array" + ] + }, + "nullable": [] + }, + "hash": "8e2e1efee7317633a7c75aa4e750db5583341a7a5fda81949d49029db7468829" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-a5631e0977691e152b0e25d1f2c53d39302e5f3436c7790a6f0824330bd1fb3a.json b/coprocessor/fhevm-engine/.sqlx/query-a5631e0977691e152b0e25d1f2c53d39302e5f3436c7790a6f0824330bd1fb3a.json deleted file mode 100644 index 85548e906f..0000000000 --- a/coprocessor/fhevm-engine/.sqlx/query-a5631e0977691e152b0e25d1f2c53d39302e5f3436c7790a6f0824330bd1fb3a.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM delegate_user_decrypt\n WHERE block_number < $1\n AND gateway_nb_attempts = 0\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [] - }, - "hash": "a5631e0977691e152b0e25d1f2c53d39302e5f3436c7790a6f0824330bd1fb3a" -} diff --git a/coprocessor/fhevm-engine/.sqlx/query-d28852ae21252e3cfed6f82f912d44301291ccd97d88c3ea6f124316dce09ffd.json b/coprocessor/fhevm-engine/.sqlx/query-d28852ae21252e3cfed6f82f912d44301291ccd97d88c3ea6f124316dce09ffd.json new file mode 100644 index 0000000000..b323236249 --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-d28852ae21252e3cfed6f82f912d44301291ccd97d88c3ea6f124316dce09ffd.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO host_chain_blocks_valid (chain_id, block_hash, block_number, block_status) VALUES ($1, $2, $3, 'pending') ON CONFLICT DO NOTHING", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Bytea", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "d28852ae21252e3cfed6f82f912d44301291ccd97d88c3ea6f124316dce09ffd" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-d689a7a2fc154b39cd8662c515c9e80c3cdad919dd41b595790079843445e664.json b/coprocessor/fhevm-engine/.sqlx/query-d689a7a2fc154b39cd8662c515c9e80c3cdad919dd41b595790079843445e664.json new file mode 100644 index 0000000000..cef481531e --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-d689a7a2fc154b39cd8662c515c9e80c3cdad919dd41b595790079843445e664.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO host_chain_blocks_valid (chain_id, block_hash, block_number, block_status)\n VALUES ($1, $2, $3, $4)\n ON CONFLICT (chain_id, block_hash) DO NOTHING;\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Bytea", + "Int8", + "Text" + ] + }, + "nullable": [] + }, + "hash": "d689a7a2fc154b39cd8662c515c9e80c3cdad919dd41b595790079843445e664" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-e007c4af2864544c0eaa5d27f456f611b3d9f9909a845f78f85cdd69787c7106.json b/coprocessor/fhevm-engine/.sqlx/query-e007c4af2864544c0eaa5d27f456f611b3d9f9909a845f78f85cdd69787c7106.json new file mode 100644 index 0000000000..6440a52e48 --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-e007c4af2864544c0eaa5d27f456f611b3d9f9909a845f78f85cdd69787c7106.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT block_status\n FROM host_chain_blocks_valid\n WHERE block_hash = $2 AND chain_id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "block_status", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Int8", + "Bytea" + ] + }, + "nullable": [ + false + ] + }, + "hash": "e007c4af2864544c0eaa5d27f456f611b3d9f9909a845f78f85cdd69787c7106" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-e8e1a20c2a71d8658815aed49df37fe3e7ad9a10416da01bfc4a885f78199532.json b/coprocessor/fhevm-engine/.sqlx/query-e8e1a20c2a71d8658815aed49df37fe3e7ad9a10416da01bfc4a885f78199532.json new file mode 100644 index 0000000000..135e79122e --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-e8e1a20c2a71d8658815aed49df37fe3e7ad9a10416da01bfc4a885f78199532.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE host_chain_blocks_valid SET block_status = 'finalized' WHERE block_number = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "e8e1a20c2a71d8658815aed49df37fe3e7ad9a10416da01bfc4a885f78199532" +} diff --git a/coprocessor/fhevm-engine/Cargo.lock b/coprocessor/fhevm-engine/Cargo.lock index 52f6f08c0a..5f53716dee 100644 --- a/coprocessor/fhevm-engine/Cargo.lock +++ b/coprocessor/fhevm-engine/Cargo.lock @@ -1922,7 +1922,7 @@ dependencies = [ "bitflags 2.9.4", "cexpr", "clang-sys", - "itertools 0.13.0", + "itertools 0.10.5", "log", "prettyplease", "proc-macro2", @@ -1942,7 +1942,7 @@ dependencies = [ "bitflags 2.9.4", "cexpr", "clang-sys", - "itertools 0.13.0", + "itertools 0.10.5", "log", "prettyplease", "proc-macro2", @@ -4420,7 +4420,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.53.3", + "windows-targets 0.48.5", ] [[package]] @@ -5389,9 +5389,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.13" +version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" dependencies = [ "bytes", "getrandom 0.3.3", @@ -6622,7 +6622,7 @@ dependencies = [ "solar-config", "solar-data-structures", "solar-macros", - "thiserror 2.0.16", + "thiserror 1.0.69", "tracing", "unicode-width", ] @@ -7089,7 +7089,7 @@ dependencies = [ "serde_json", "sha2", "tempfile", - "thiserror 2.0.16", + "thiserror 1.0.69", "url", "zip", ] diff --git a/coprocessor/fhevm-engine/db-migration/migrations/20260218155637_add_block_status.sql b/coprocessor/fhevm-engine/db-migration/migrations/20260218155637_add_block_status.sql new file mode 100644 index 0000000000..c3d8d4016c --- /dev/null +++ b/coprocessor/fhevm-engine/db-migration/migrations/20260218155637_add_block_status.sql @@ -0,0 +1,5 @@ +ALTER TABLE IF EXISTS host_chain_blocks_valid +ADD COLUMN IF NOT EXISTS block_status TEXT NOT NULL DEFAULT 'unknown' CHECK (block_status IN ('pending', 'unknown', 'finalized', 'orphaned')); + +ALTER TABLE IF EXISTS host_chain_blocks_valid +ALTER COLUMN block_status DROP DEFAULT; diff --git a/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs b/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs index 218d2f381b..d033523bf4 100644 --- a/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs +++ b/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs @@ -21,7 +21,9 @@ use fhevm_engine_common::healthz_server::HttpServer as HealthHttpServer; use fhevm_engine_common::types::BlockchainProvider; use fhevm_engine_common::utils::{DatabaseURL, HeartBeat}; -use crate::database::ingest::{ingest_block_logs, BlockLogs, IngestOptions}; +use crate::database::ingest::{ + ingest_block_logs, update_finalized_blocks, BlockLogs, IngestOptions, +}; use crate::database::tfhe_event_propagate::Database; use crate::health_check::HealthCheck; use fhevm_engine_common::chain_id::ChainId; @@ -171,7 +173,7 @@ pub struct Args { } // TODO: to merge with Levent works -struct InfiniteLogIter { +pub struct InfiniteLogIter { url: String, block_time: u64, /* A default value that is refined with real-time * events data */ @@ -459,6 +461,7 @@ impl InfiniteLogIter { logs: std::mem::take(&mut current_logs), summary, catchup: true, + finalized: true, }; blocks_logs.push(block_logs); } @@ -538,7 +541,7 @@ impl InfiniteLogIter { self.catchup_blocks = Some((paging_to_block + 1, to_block)); // end is detected at function start } - async fn get_block_by_number(&self, number: u64) -> Result { + pub async fn get_block_by_number(&self, number: u64) -> Result { self.get_block_by_id(BlockId::number(number)).await } @@ -731,6 +734,7 @@ impl InfiniteLogIter { logs, summary: missing_block, catchup: true, + finalized: false, // let catchups with finality conditions do the finalize later }); self.block_history.add_block(missing_block); } @@ -825,6 +829,7 @@ impl InfiniteLogIter { logs: self.get_logs_at_hash(block_header.hash).await?, summary: block_header.into(), catchup: false, + finalized: false, }) } @@ -958,15 +963,6 @@ async fn db_insert_block( ) .await; let Err(err) = res else { - // Notify the database of the new block - // Delayed delegation rely on this signal to reconsider ready delegation - if !block_logs.catchup { - if let Err(err) = - db.block_notification(block_logs.summary.number).await - { - error!(error = %err, "Error notifying listener for new block"); - }; - } return Ok(()); }; if retries == 0 { @@ -1112,6 +1108,15 @@ pub async fn main(args: Args) -> anyhow::Result<()> { .number .max(log_iter.last_valid_block.unwrap_or(0)), ); + if !block_logs.catchup { + update_finalized_blocks( + &mut db, + &mut log_iter, + block_logs.summary.number, + args.catchup_finalization_in_blocks, + ) + .await; + } } if !args.only_catchup_loop { diff --git a/coprocessor/fhevm-engine/host-listener/src/database/ingest.rs b/coprocessor/fhevm-engine/host-listener/src/database/ingest.rs index f820a15c22..3a2627944c 100644 --- a/coprocessor/fhevm-engine/host-listener/src/database/ingest.rs +++ b/coprocessor/fhevm-engine/host-listener/src/database/ingest.rs @@ -9,6 +9,7 @@ use sqlx::types::time::{OffsetDateTime, PrimitiveDateTime}; use tracing::{error, info}; use crate::cmd::block_history::BlockSummary; +use crate::cmd::InfiniteLogIter; use crate::contracts::{AclContract, TfheContract}; use crate::database::dependence_chains::dependence_chains; use crate::database::tfhe_event_propagate::{ @@ -19,6 +20,7 @@ pub struct BlockLogs { pub logs: Vec, pub summary: BlockSummary, pub catchup: bool, + pub finalized: bool, } #[derive(Copy, Clone, Debug)] @@ -309,8 +311,8 @@ pub async fn ingest_block_logs( info!(block_number, catchup_insertion, "Catchup inserted events"); } } - - db.mark_block_as_valid(&mut tx, &block_logs.summary).await?; + db.mark_block_as_valid(&mut tx, &block_logs.summary, block_logs.finalized) + .await?; if at_least_one_insertion { db.update_dependence_chain( &mut tx, @@ -324,6 +326,76 @@ pub async fn ingest_block_logs( tx.commit().await } +pub async fn update_finalized_blocks( + db: &mut Database, + log_iter: &mut InfiniteLogIter, + last_block_number: u64, + finality_lag: u64, +) { + info!(last_block_number, finality_lag, "Updating finalized blocks"); + let mut tx = match db.new_transaction().await { + Ok(tx) => tx, + Err(err) => { + error!( + ?err, + "Failed to create transaction for finalized blocks update" + ); + return; + } + }; + let last_finalized_block = last_block_number - finality_lag; + let blocks_number = match Database::get_finalized_blocks_number( + &mut tx, + last_finalized_block as i64, + db.chain_id, + ) + .await + { + Ok(numbers) => numbers, + Err(err) => { + error!( + ?err, + last_finalized_block, "Failed to fetch finalized blocks number" + ); + return; + } + }; + info!(?blocks_number, "Finalizing blocks"); + for block_number in blocks_number { + let block = + match log_iter.get_block_by_number(block_number as u64).await { + Ok(block) => block, + Err(err) => { + error!( + block_number, + ?err, + "Failed to fetch block for finalization" + ); + continue; + } + }; + if let Err(err) = db + .update_block_as_finalized( + &mut tx, + block_number, + &block.header.hash, + ) + .await + { + error!(block_number, ?err, "Failed to update block as finalized"); + } + } + if let Err(err) = tx.commit().await { + error!(?err, "Failed to commit finalized blocks update"); + return; + } + // Notify the database of the new block + // Delayed delegation rely on this signal to reconsider ready delegation + if let Err(err) = db.block_notification().await { + error!(error = %err, "Error notifying listener for new block"); + } +} + #[cfg(test)] mod tests { use alloy::primitives::FixedBytes; diff --git a/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs b/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs index 913a8d43ba..0110f9eb83 100644 --- a/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs +++ b/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs @@ -27,6 +27,7 @@ use tracing::error; use tracing::info; use tracing::warn; +use crate::cmd::block_history::BlockHash; use crate::cmd::block_history::BlockSummary; use crate::contracts::AclContract::AclContractEvents; use crate::contracts::TfheContract; @@ -491,26 +492,89 @@ impl Database { } } + pub async fn update_block_as_finalized( + &self, + tx: &mut Transaction<'_>, + block_number: i64, + block_hash: &BlockHash, + ) -> Result<(), SqlxError> { + sqlx::query!( + r#" + UPDATE host_chain_blocks_valid + SET block_status = CASE + WHEN block_hash = $2 + THEN 'finalized' + ELSE 'orphaned' + END + WHERE block_number = $3 AND chain_id = $1 + "#, + self.chain_id.as_i64(), + block_hash.to_vec(), + block_number, + ) + .execute(tx.deref_mut()) + .await?; + Ok(()) + } + pub async fn mark_block_as_valid( &self, tx: &mut Transaction<'_>, block_summary: &BlockSummary, + finalized: bool, ) -> Result<(), SqlxError> { + let status = if finalized { "finalized" } else { "pending" }; + // 1. Insert if not exists (never overwrites existing row) sqlx::query!( r#" - INSERT INTO host_chain_blocks_valid (chain_id, block_hash, block_number) - VALUES ($1, $2, $3) + INSERT INTO host_chain_blocks_valid (chain_id, block_hash, block_number, block_status) + VALUES ($1, $2, $3, $4) ON CONFLICT (chain_id, block_hash) DO NOTHING; "#, self.chain_id.as_i64(), block_summary.hash.to_vec(), block_summary.number as i64, + status, ) .execute(tx.deref_mut()) .await?; + + // 2. Update to finalized or orphan if needed + if finalized { + self.update_block_as_finalized( + tx, + block_summary.number as i64, + &block_summary.hash, + ) + .await?; + } Ok(()) } + pub async fn get_finalized_blocks_number( + tx: &mut Transaction<'_>, + last_block_max: i64, + chain_id: ChainId, + ) -> Result, SqlxError> { + // most of the time there is only 1 block pending + let blocks_number = sqlx::query!( + r#" + SELECT block_number FROM host_chain_blocks_valid + WHERE block_status = 'pending' AND block_number <= $1 AND chain_id = $2 + ORDER BY block_number DESC + LIMIT 10 + "#, + last_block_max, + chain_id.as_i64(), + ) + .fetch_all(tx.deref_mut()) + .await?; + Ok(blocks_number + .into_iter() + .map(|record| record.block_number) + .collect()) + } + pub async fn poller_get_last_caught_up_block( &self, chain_id: ChainId, @@ -835,15 +899,8 @@ impl Database { Ok(inserted) } - pub async fn block_notification( - &mut self, - last_block_number: u64, - ) -> Result<(), SqlxError> { - let query = sqlx::query!( - "SELECT pg_notify($1, $2)", - "new_host_block", - last_block_number.to_string() - ); + pub async fn block_notification(&mut self) -> Result<(), SqlxError> { + let query = sqlx::query!("NOTIFY new_host_block",); query.execute(&self.pool().await).await?; Ok(()) } diff --git a/coprocessor/fhevm-engine/host-listener/src/poller/mod.rs b/coprocessor/fhevm-engine/host-listener/src/poller/mod.rs index 8eb5559f94..b8b88e093e 100644 --- a/coprocessor/fhevm-engine/host-listener/src/poller/mod.rs +++ b/coprocessor/fhevm-engine/host-listener/src/poller/mod.rs @@ -272,6 +272,7 @@ pub async fn run_poller(config: PollerConfig) -> Result<()> { logs, summary, catchup: true, + finalized: true, }; let ingest_options = IngestOptions { diff --git a/coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs b/coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs index 8c9ba4c635..d0e73663d5 100644 --- a/coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs +++ b/coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs @@ -232,7 +232,7 @@ async fn setup_with_block_time( dependence_cache_size: 128, reorg_maximum_duration_in_blocks: 100, // to go beyond chain start service_name: "host-listener-test".to_string(), - catchup_finalization_in_blocks: 2, + catchup_finalization_in_blocks: 3, dependence_by_connexity: false, dependence_cross_block: true, dependent_ops_max_per_chain: 0, @@ -331,6 +331,7 @@ async fn ingest_blocks_for_receipts( logs, summary: block.header.into(), catchup: false, + finalized: false, }; ingest_block_logs( db.chain_id, @@ -916,7 +917,7 @@ async fn test_only_catchup_loop_requires_negative_start_at_block( dependence_cache_size: 128, reorg_maximum_duration_in_blocks: 50, service_name: String::new(), - catchup_finalization_in_blocks: 20, + catchup_finalization_in_blocks: 3, only_catchup_loop: true, catchup_loop_sleep_secs: 60, dependence_by_connexity: false, @@ -944,6 +945,81 @@ async fn test_listener_restart_and_chain_reorg() -> Result<(), anyhow::Error> { test_listener_no_event_loss(true, true).await } +async fn check_finalization_status(setup: &Setup) { + let provider = ProviderBuilder::new() + .wallet(setup.wallets[0].clone()) + .connect_ws(WsConnect::new(setup.args.url.to_string())) + .await + .unwrap(); + // Verify block finalization status: for each block number, one should be finalized and others orphaned + let blocks = sqlx::query!( + "SELECT block_number, block_hash, block_status FROM host_chain_blocks_valid", + ) + .fetch_all(&setup.db_pool) + .await; + + let blocks = blocks.expect("Failed to fetch blocks from database"); + let block_max = blocks + .iter() + .map(|b| b.block_number) + .max() + .expect("At least one block should be ingested"); + + let mut blocks_by_number: std::collections::HashMap< + i64, + Vec<(Vec, String)>, + > = std::collections::HashMap::new(); + for block in blocks { + if block.block_number > block_max - 5 { + continue; // pending blocks within finalization window can be ignored for this assert + } + blocks_by_number + .entry(block.block_number) + .or_default() + .push((block.block_hash, block.block_status)); + } + + for (block_number, block_variants) in blocks_by_number.iter() { + let finalized_count = block_variants + .iter() + .filter(|(_, status)| status == "finalized") + .count(); + let orphan_count = block_variants + .iter() + .filter(|(_, status)| status == "orphaned") + .count(); + assert_eq!( + finalized_count, 1, + "Block {} should have exactly one finalized variant, found {}", + block_number, finalized_count + ); + let finalized_hash = block_variants + .iter() + .find(|(_, status)| status == "finalized") + .map(|(hash, _)| hash) + .unwrap(); + assert_eq!( + orphan_count, + block_variants.len() - 1, + "Block {} should have remaining variants as orphan", + block_number + ); + let expected_hash = provider + .get_block_by_number((*block_number as u64).into()) + .await + .unwrap() + .unwrap() + .header + .hash; + assert_eq!( + &expected_hash.0, + finalized_hash.as_slice(), + "Finalized block hash for block {} does not match expected", + block_number + ); + } +} + async fn test_listener_no_event_loss( kill: bool, reorg: bool, @@ -1007,6 +1083,7 @@ async fn test_listener_no_event_loss( // 10 mins max to avoid stalled CI let listener_handle = tokio::spawn(main(args.clone())); tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + check_finalization_status(&setup).await; let tfhe_new_count = sqlx::query!("SELECT COUNT(*) FROM computations") .fetch_one(&setup.db_pool) .await? @@ -1046,6 +1123,7 @@ async fn test_listener_no_event_loss( } assert_eq!(tfhe_events_count, expected_tfhe_events); assert_eq!(acl_events_count, expected_acl_events); + Ok(()) } diff --git a/coprocessor/fhevm-engine/transaction-sender/src/bin/transaction_sender.rs b/coprocessor/fhevm-engine/transaction-sender/src/bin/transaction_sender.rs index fd8d43b419..3f24b4d9e9 100644 --- a/coprocessor/fhevm-engine/transaction-sender/src/bin/transaction_sender.rs +++ b/coprocessor/fhevm-engine/transaction-sender/src/bin/transaction_sender.rs @@ -50,9 +50,6 @@ struct Conf { #[arg(short, long)] gateway_url: Url, - #[arg(long)] - host_chain_url: Url, - #[arg(short, long, value_enum, default_value = "private-key")] signer_type: SignerType, @@ -164,20 +161,6 @@ struct Conf { #[arg(long, default_value_t = 10, value_parser = clap::value_parser!(u64).range(1..))] pub gauge_update_interval_secs: u64, - #[arg( - long, - default_value = "10", - help = "Delay for transmitting delegation to gateway in block number (to avoid most reorg)" - )] - pub delegation_block_delay: u64, - - #[arg( - long, - default_value = "648000", // 3 months assuming 12s block time on host chain - help = "Clear delegation entries after N blocks (default to 3 months)" - )] - pub delegation_clear_after_n_blocks: u64, - #[arg( long, default_value = "30", @@ -348,18 +331,6 @@ async fn main() -> anyhow::Result<()> { }; let gateway_provider = NonceManagedProvider::new(gateway_provider, Some(wallet.default_signer().address())); - let Ok(host_chain_provider) = get_provider( - &conf, - &conf.host_chain_url, - "HostChain", - wallet.clone(), - &cancel_token, - ) - .await - else { - info!("Cancellation requested before host chain provider was created on startup, exiting"); - return Ok(()); - }; let config = ConfigSettings { verify_proof_resp_db_channel: conf.verify_proof_resp_database_channel, @@ -381,8 +352,6 @@ async fn main() -> anyhow::Result<()> { health_check_timeout: conf.health_check_timeout, gas_limit_overprovision_percent: conf.gas_limit_overprovision_percent, graceful_shutdown_timeout: conf.graceful_shutdown_timeout, - delegation_block_delay: conf.delegation_block_delay, - delegation_clear_after_n_blocks: conf.delegation_clear_after_n_blocks, delegation_fallback_polling: conf.delegation_fallback_polling, delegation_max_retry: conf.delegation_max_retry, }; @@ -400,7 +369,6 @@ async fn main() -> anyhow::Result<()> { conf.multichain_acl_address, abstract_signer, gateway_provider, - host_chain_provider, cancel_token.clone(), config.clone(), None, diff --git a/coprocessor/fhevm-engine/transaction-sender/src/config.rs b/coprocessor/fhevm-engine/transaction-sender/src/config.rs index c4a27de345..929440f432 100644 --- a/coprocessor/fhevm-engine/transaction-sender/src/config.rs +++ b/coprocessor/fhevm-engine/transaction-sender/src/config.rs @@ -39,8 +39,6 @@ pub struct ConfigSettings { pub graceful_shutdown_timeout: Duration, - pub delegation_block_delay: u64, - pub delegation_clear_after_n_blocks: u64, pub delegation_fallback_polling: u64, pub delegation_max_retry: u64, } @@ -67,8 +65,6 @@ impl Default for ConfigSettings { health_check_timeout: Duration::from_secs(4), gas_limit_overprovision_percent: DEFAULT_GAS_LIMIT_OVERPROVISION_PERCENT, graceful_shutdown_timeout: Duration::from_secs(8), - delegation_block_delay: 10, - delegation_clear_after_n_blocks: 7776000 / 12, // 3 months assuming 12s block time delegation_fallback_polling: 30, delegation_max_retry: 100_000, } diff --git a/coprocessor/fhevm-engine/transaction-sender/src/ops/delegate_user_decrypt.rs b/coprocessor/fhevm-engine/transaction-sender/src/ops/delegate_user_decrypt.rs index 1afb881bec..6cf8e8038a 100644 --- a/coprocessor/fhevm-engine/transaction-sender/src/ops/delegate_user_decrypt.rs +++ b/coprocessor/fhevm-engine/transaction-sender/src/ops/delegate_user_decrypt.rs @@ -9,14 +9,11 @@ use crate::metrics::{ use crate::nonce_managed_provider::NonceManagedProvider; use crate::ops::common::{try_extract_non_retryable_config_error, CoprocessorConfigError}; -use alloy::primitives::{Address, FixedBytes}; +use alloy::network::{Ethereum, TransactionBuilder}; +use alloy::primitives::Address; use alloy::providers::Provider; use alloy::rpc::types::TransactionRequest; use alloy::transports::{RpcError, TransportErrorKind}; -use alloy::{ - eips::BlockNumberOrTag, - network::{Ethereum, TransactionBuilder}, -}; use anyhow::Result; use async_trait::async_trait; @@ -28,7 +25,6 @@ use tracing::{error, info, warn}; use super::TransactionOperation; -pub type BlockHash = FixedBytes<32>; pub type DbTransaction<'l> = sqlx::Transaction<'l, Postgres>; use fhevm_gateway_bindings::multichain_acl::MultichainACL; @@ -62,7 +58,6 @@ enum BlockStatus { pub struct DelegateUserDecryptOperation + Clone + 'static> { multichain_acl_address: Address, gateway_provider: NonceManagedProvider

, - host_chain_provider: P, conf: crate::ConfigSettings, gas: Option, db_pool: Pool, @@ -92,7 +87,6 @@ impl + Clone + 'static> DelegateUserDecryptOperation

{ pub fn new( multichain_acl_address: Address, gateway_provider: NonceManagedProvider

, - host_chain_provider: P, conf: crate::ConfigSettings, gas: Option, db_pool: Pool, @@ -107,7 +101,6 @@ impl + Clone + 'static> DelegateUserDecryptOperation

{ Self { multichain_acl_address, gateway_provider, - host_chain_provider, conf, gas, db_pool, @@ -219,10 +212,8 @@ impl + Clone + 'static> DelegateUserDecryptOperation

{ pub async fn tx_check_ready_delegations( &self, tx: &mut DbTransaction<'_>, - last_ready_block: u64, - ) -> Result<(Vec, Vec>)> { - let delegations = - delayed_sorted_delegation(tx, last_ready_block, self.conf.delegation_max_retry).await?; + ) -> Result<(Vec, Vec)> { + let delegations = delayed_sorted_delegation(tx, self.conf.delegation_max_retry).await?; let nb_ready_delegations = delegations.len(); if delegations.is_empty() { return Ok((vec![], vec![])); @@ -235,16 +226,14 @@ impl + Clone + 'static> DelegateUserDecryptOperation

{ let retry_up_to_error_level = retry_error_up_to_error_level(max_error_level); info!( nb_ready_delegations, - last_ready_block, max_error_level, retry_up_to_error_level, "Ready delegations" + max_error_level, retry_up_to_error_level, "Ready delegations" ); - let mut blocks_status = HashMap::new(); // avoid multiple host chain call + let mut blocks_status = HashMap::new(); // cache db access let mut stable_delegations = vec![]; - let mut unsure_block = vec![]; - let mut dismissed = 0; let mut nb_unsure_delegations = 0; - let mut reorg_out_block = vec![]; + let mut reorg_out_delegations = vec![]; let mut past_error_backlog = 0; - for delegation in &delegations { + for delegation in delegations { if delegation.gateway_nb_attempts > 0 { past_error_backlog += 1; } @@ -254,53 +243,38 @@ impl + Clone + 'static> DelegateUserDecryptOperation

{ if delegation.gateway_nb_attempts > retry_up_to_error_level { continue; } - let block_status = if let Some(status) = blocks_status.get(&delegation.block_number) { + let block_status = if let Some(status) = blocks_status.get(&delegation.block_hash) { *status } else { - let status = match self.get_block_hash(delegation.block_number).await { - Ok(block_hash) if delegation.block_hash == block_hash.to_vec() => { - BlockStatus::Stable - } - Ok(block_hash) => { - warn!( - delegation_block_hash = ?delegation.block_hash, - block_hash = ?block_hash.to_vec(), - block_number = delegation.block_number, - "Block hash mismatch for delegation, block was reorged out" - ); - // ignoring delegation due to reorg, will be marked as reorg_out - reorg_out_block.push(delegation.block_hash.clone()); - BlockStatus::Dismissed - } - Err(_) => { - error!( - block_number = delegation.block_number, - "Cannot get block hash for delegation, will retry next block" - ); - unsure_block.push(delegation.block_number); - BlockStatus::Unknown - } - }; - blocks_status.insert(delegation.block_number, status); + let status = self.get_block_status(&delegation).await; + blocks_status.insert(delegation.block_hash.clone(), status); status }; match block_status { - BlockStatus::Stable => { - stable_delegations.push(delegation.clone()); + BlockStatus::Dismissed => { + warn!( + delegation_block_hash = ?delegation.block_hash, + block_number = delegation.block_number, + "Block hash mismatch for delegation, block was reorged out" + ); + // ignoring delegation due to reorg, will be marked as reorg_out + reorg_out_delegations.push(delegation.clone()); } BlockStatus::Unknown => { - // skip the full block, will retry on the delegation on next call + warn!( + block_number = delegation.block_number, + "Cannot get block hash for delegation, will retry next block" + ); nb_unsure_delegations += 1; - continue; } - BlockStatus::Dismissed => { - dismissed += 1; - continue; + BlockStatus::Stable => { + stable_delegations.push(delegation.clone()); } } } DELEGATE_USER_DECRYPT_ERROR_BACKLOG.set(past_error_backlog); let nb_stable_delegations = stable_delegations.len(); + let dismissed = reorg_out_delegations.len(); if dismissed > 0 { info!(dismissed, "Some delegations were dismissed due to reorg"); }; @@ -312,58 +286,76 @@ impl + Clone + 'static> DelegateUserDecryptOperation

{ }; info!(nb_stable_delegations, "Processing ready delegations"); - Ok((stable_delegations, reorg_out_block)) + Ok((stable_delegations, reorg_out_delegations)) } - async fn get_block_hash(&self, block_number: u64) -> Result { - let search_block = BlockNumberOrTag::Number(block_number); - let some_block = self - .host_chain_provider - .get_block_by_number(search_block) - .await?; - let Some(block) = some_block else { - error!(block_number, "A past block cannot be found by number"); - anyhow::bail!("Cannot get past block by number, giving up"); - }; - Ok(block.header.hash) + async fn get_block_status(&self, delegation: &DelegationRow) -> BlockStatus { + let status = sqlx::query!( + r#" + SELECT block_status + FROM host_chain_blocks_valid + WHERE block_hash = $2 AND chain_id = $1 + "#, + delegation.host_chain_id.as_i64(), + delegation.block_hash, + ) + .fetch_optional(&self.db_pool) + .await; + match status { + Ok(Some(record)) => match record.block_status.as_str() { + "finalized" => BlockStatus::Stable, + "orphaned" => BlockStatus::Dismissed, + "unknown" => { + warn!( + ?delegation, + "Block with unknown status for delegation, delegation was introduced during migration, please manually fix block status in host_chain_blocks_valid table to process the delegation" + ); + BlockStatus::Unknown + } + "pending" => BlockStatus::Unknown, + _ => { + error!( + ?delegation, + status = record.block_status, + "Invalid block status for delegation, manually fix it to process the delegation", + ); + BlockStatus::Unknown + } + }, + Ok(None) => { + error!(?delegation, "No block status found for delegation"); + BlockStatus::Unknown + } + Err(e) => { + error!( + %e, + ?delegation, + "Error querying block status from database" + ); + BlockStatus::Unknown + } + } } - async fn wait_last_block_number(&self) -> Result { + async fn wait_new_block(&self) -> Result<()> { let mut listener = PgListener::connect_with(&self.db_pool).await?; listener.listen(self.channel()).await?; - let notification = tokio::select! { + tokio::select! { _ = self.cancel_token.cancelled() => anyhow::bail!("Operation cancelled"), - notification = listener.recv() => Some(notification), - _ = tokio::time::sleep(Duration::from_secs(self.conf.delegation_fallback_polling)) => None, - }; - let Some(notification) = notification else { - // timeout - let block_number = self.host_chain_provider.get_block_number().await?; - warn!( - block_number, - "Delegation notification, based on timeout, use last block number" - ); - return Ok(block_number); - }; - let Ok(notification) = notification else { - // connection error, try to go further in case of a real db issue, db read will fail later - let block_number = self.host_chain_provider.get_block_number().await?; - warn!( - block_number, - "Delegation notification, db error, use last block number" - ); - return Ok(block_number); - }; - let payload = notification.payload(); - let Ok(block_number) = notification.payload().parse() else { - let block_number = self.host_chain_provider.get_block_number().await?; - error!( - payload, - block_number, "Delegation notification, invalid payload, use last block number" - ); - return Ok(block_number); - }; - Ok(block_number) + notification = listener.recv() => { + match notification { + Ok(notification) => { + info!(?notification, "Received new block notification"); + } + Err(e) => { + tokio::time::sleep(Duration::from_secs(1)).await; // avoid busy loop if db connection is lost + error!(%e, "Error receiving new block notification, will process delegations"); + } + } + Ok(()) + } + _ = tokio::time::sleep(Duration::from_secs(self.conf.delegation_fallback_polling)) => Ok(()), + } } } @@ -378,37 +370,24 @@ where } async fn execute(&self) -> Result { - let block_number = self.wait_last_block_number().await?; - let multichain_acl = MultichainACL::new( - self.multichain_acl_address, - self.host_chain_provider.clone(), - ); - let up_to_block_number: u64 = block_number.saturating_sub(self.conf.delegation_block_delay); - let clean_before_block = - block_number.saturating_sub(self.conf.delegation_clear_after_n_blocks); + self.wait_new_block().await?; + let multichain_acl = + MultichainACL::new(self.multichain_acl_address, self.gateway_provider.inner()); let mut tx = self.db_pool.begin().await?; - let delegations = self - .tx_check_ready_delegations(&mut tx, up_to_block_number) - .await; + let delegations = self.tx_check_ready_delegations(&mut tx).await; - let Ok((ready_delegations, reorg_out_block)) = delegations else { + let Ok((ready_delegations, reorg_out_delegations)) = delegations else { tx.rollback().await?; warn!("Error checking ready delegations, will retry later"); anyhow::bail!("Error checking ready delegations, will retry later"); }; - if ready_delegations.is_empty() && reorg_out_block.is_empty() { + if ready_delegations.is_empty() && reorg_out_delegations.is_empty() { tx.commit().await?; - info!( - block_number, - up_to_block_number, "No delegations to handle at block up to block number" - ); + info!("No delegations to handle"); return Ok(true); // will automatically rewait for new tasks via listen channel } - if update_useless_delegations(&mut tx, clean_before_block, &reorg_out_block) - .await - .is_err() - { - error!("Cannot update useless delegations"); + if let Err(err) = update_reorged_delegations(&mut tx, &reorg_out_delegations).await { + error!(?err, "Cannot update reorged delegations, will retry later, continuing on finalized delegations"); } let mut requests = Vec::with_capacity(ready_delegations.len()); let to_transaction = |delegation: &DelegationRow| { @@ -537,21 +516,18 @@ fn expiration_date_to_u64(value: BigDecimal) -> u64 { pub async fn delayed_sorted_delegation( tx: &mut DbTransaction<'_>, - up_to_block_number: u64, delegation_max_retry: u64, ) -> Result> { let query = sqlx::query!( r#" SELECT key, delegator, delegate, contract_address, delegation_counter, old_expiration_date, new_expiration_date, host_chain_id, block_number, block_hash, transaction_id, gateway_nb_attempts FROM delegate_user_decrypt - WHERE block_number <= $1 - AND on_gateway = false + WHERE on_gateway = false AND reorg_out = false - AND gateway_nb_attempts <= $2 + AND gateway_nb_attempts <= $1 ORDER BY block_number ASC, delegation_counter ASC, transaction_id ASC FOR UPDATE "#, - up_to_block_number as i64, delegation_max_retry as i64, // excludes delegations retired after a non-retryable config error (set to max_retry + 1) ); let delegations_rows = query.fetch_all(tx.deref_mut()).await?; @@ -574,7 +550,7 @@ pub async fn delayed_sorted_delegation( }; delegations.push(delegation); } - Ok(delegations) // delegations) + Ok(delegations) } pub async fn update_error_delegation( @@ -671,47 +647,33 @@ pub async fn update_transmitted_delegation( } } -pub async fn update_useless_delegations( +pub async fn update_reorged_delegations( tx: &mut DbTransaction<'_>, - clean_before_block: u64, - reorg_out_blocks: &[Vec], + reorged_delegations: &[DelegationRow], ) -> Result<()> { // update reorg out - let reorg_out = sqlx::query!( - r#" - UPDATE delegate_user_decrypt - SET reorg_out = true - WHERE block_hash IN (SELECT unnest($1::bytea[])) - "#, - reorg_out_blocks, - ); - if !reorg_out_blocks.is_empty() { - let reorg_out = reorg_out.execute(tx.deref_mut()).await?; + if !reorged_delegations.is_empty() { + let keys = reorged_delegations + .iter() + .map(|d| d.key) + .collect::>(); + let reorg_out = sqlx::query!( + r#" + UPDATE delegate_user_decrypt + SET reorg_out = true + WHERE key = ANY($1) + "#, + &keys + ) + .execute(tx.deref_mut()) + .await?; if reorg_out.rows_affected() == 0 { error!( - reorg_out_blocks = ?reorg_out_blocks, + nb_delegations = keys.len(), "No rows updated when updating reorg out delegation" ); } } - // clean table past blocks except for errors - let cleaned = sqlx::query!( - r#" - DELETE FROM delegate_user_decrypt - WHERE block_number < $1 - AND gateway_nb_attempts = 0 - "#, - clean_before_block as i64, - ) - .execute(tx.deref_mut()) - .await?; - let nb_cleaned = cleaned.rows_affected(); - if nb_cleaned > 0 { - info!( - nb_cleaned, - clean_before_block, "Cleaning old entries in delegate_user_decrypt" - ); - } Ok(()) } diff --git a/coprocessor/fhevm-engine/transaction-sender/src/transaction_sender.rs b/coprocessor/fhevm-engine/transaction-sender/src/transaction_sender.rs index 6ed7b39db4..f637a58b95 100644 --- a/coprocessor/fhevm-engine/transaction-sender/src/transaction_sender.rs +++ b/coprocessor/fhevm-engine/transaction-sender/src/transaction_sender.rs @@ -38,7 +38,6 @@ where multichain_acl_address: Address, signer: AbstractSigner, gateway_provider: NonceManagedProvider

, - host_chain_provider: P, cancel_token: CancellationToken, conf: ConfigSettings, gas: Option, @@ -73,7 +72,6 @@ where ops::delegate_user_decrypt::DelegateUserDecryptOperation::new( multichain_acl_address, gateway_provider.clone(), - host_chain_provider, conf.clone(), gas, db_pool.clone(), diff --git a/coprocessor/fhevm-engine/transaction-sender/tests/add_ciphertext_tests.rs b/coprocessor/fhevm-engine/transaction-sender/tests/add_ciphertext_tests.rs index 260188c8fc..f6e03d9104 100644 --- a/coprocessor/fhevm-engine/transaction-sender/tests/add_ciphertext_tests.rs +++ b/coprocessor/fhevm-engine/transaction-sender/tests/add_ciphertext_tests.rs @@ -49,7 +49,6 @@ async fn add_ciphertext_digests(#[case] signer_type: SignerType) -> anyhow::Resu PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -147,7 +146,6 @@ async fn ciphertext_digest_already_added(#[case] signer_type: SignerType) -> any PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -243,7 +241,6 @@ async fn recover_from_transport_error(#[case] signer_type: SignerType) -> anyhow PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -351,7 +348,6 @@ async fn stop_on_backend_gone(#[case] signer_type: SignerType) -> anyhow::Result PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -446,7 +442,6 @@ async fn retry_mechanism(#[case] signer_type: SignerType) -> anyhow::Result<()> PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -552,7 +547,6 @@ async fn retry_on_aws_kms_error(#[case] signer_type: SignerType) -> anyhow::Resu PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -658,7 +652,6 @@ async fn stop_retrying_add_ciphertext_on_gw_config_error( PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, diff --git a/coprocessor/fhevm-engine/transaction-sender/tests/allow_handle_tests.rs b/coprocessor/fhevm-engine/transaction-sender/tests/allow_handle_tests.rs index f30d4e0fa3..de8aeafaf0 100644 --- a/coprocessor/fhevm-engine/transaction-sender/tests/allow_handle_tests.rs +++ b/coprocessor/fhevm-engine/transaction-sender/tests/allow_handle_tests.rs @@ -114,7 +114,6 @@ async fn allow_call( *multichain_acl.address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), // shared blockchain env.cancel_token.clone(), env.conf.clone(), None, @@ -234,7 +233,6 @@ async fn stop_on_backend_gone(#[case] signer_type: SignerType) -> anyhow::Result *multichain_acl.address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), // shared blockchain env.cancel_token.clone(), env.conf.clone(), None, @@ -327,7 +325,6 @@ async fn retry_on_aws_kms_error(#[case] signer_type: SignerType) -> anyhow::Resu *multichain_acl.address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -429,7 +426,6 @@ async fn stop_retrying_allow_handle_on_gw_config_error( *multichain_acl.address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, diff --git a/coprocessor/fhevm-engine/transaction-sender/tests/delegate_user_decrypt.rs b/coprocessor/fhevm-engine/transaction-sender/tests/delegate_user_decrypt.rs index 057307d456..d568d70b89 100644 --- a/coprocessor/fhevm-engine/transaction-sender/tests/delegate_user_decrypt.rs +++ b/coprocessor/fhevm-engine/transaction-sender/tests/delegate_user_decrypt.rs @@ -5,6 +5,7 @@ use alloy::signers::local::PrivateKeySigner; use alloy::{primitives::Address, providers::WsConnect}; use common::{is_coprocessor_config_error, MultichainACL, SignerType, TestEnvironment}; +use fhevm_engine_common::chain_id::ChainId; use rstest::*; use serial_test::serial; use sqlx::PgPool; @@ -24,7 +25,7 @@ async fn insert_delegate_user_decrypt( delegation_counter: u64, old_expiration_date: u64, new_expiration_date: u64, - chain_id: u64, + chain_id: ChainId, block_hash: &[u8], block_number: u64, transaction_id: Option>, @@ -39,12 +40,20 @@ async fn insert_delegate_user_decrypt( delegation_counter as i64, old_expiration_date as i64, new_expiration_date as i64, - chain_id as i64, + chain_id.as_i64(), block_number as i64, block_hash, transaction_id, ); query.execute(pool).await?; + sqlx::query!( + "INSERT INTO host_chain_blocks_valid (chain_id, block_hash, block_number, block_status) VALUES ($1, $2, $3, 'pending') ON CONFLICT DO NOTHING", + chain_id.as_i64(), + block_hash, + block_number as i64, + ) + .execute(pool) + .await?; Ok(()) } @@ -77,8 +86,6 @@ async fn delegate_user_decrypt_life_cycle_aux( let multichain_acl = MultichainACL::deploy(&provider_deploy, already_allowed_revert).await?; let config = ConfigSettings { - delegation_block_delay: 2, - delegation_clear_after_n_blocks: 5, delegation_fallback_polling: 1000, // disable ..env.conf.clone() }; @@ -90,7 +97,6 @@ async fn delegate_user_decrypt_life_cycle_aux( *multichain_acl.address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), // shared blockchain env.cancel_token.clone(), config.clone(), None, @@ -99,7 +105,7 @@ async fn delegate_user_decrypt_life_cycle_aux( let run_handle = tokio::spawn(async move { txn_sender.run().await }); - let chain_id = provider.get_chain_id().await?; + let chain_id = ChainId::try_from(provider.get_chain_id().await?)?; let block = provider .inner() @@ -141,8 +147,24 @@ async fn delegate_user_decrypt_life_cycle_aux( } tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - - for i in 0..(config.delegation_clear_after_n_blocks + 1) { + let finalization_delay = 3; + for i in 0..(finalization_delay + 3) { + if i == finalization_delay { + sqlx::query!( + "UPDATE host_chain_blocks_valid SET block_status = 'finalized' WHERE block_number = $1", + start_block as i64 + ) + .execute(&env.db_pool) + .await?; + } + if i == finalization_delay + 1 { + sqlx::query!( + "UPDATE host_chain_blocks_valid SET block_status = 'orphaned' WHERE block_number = $1", + (start_block + 1) as i64 + ) + .execute(&env.db_pool) + .await?; + } sqlx::query!( "SELECT pg_notify($1, $2)", "new_host_block", @@ -168,20 +190,14 @@ async fn delegate_user_decrypt_life_cycle_aux( .await? .count .unwrap_or(0); - if i < config.delegation_block_delay { + if i < finalization_delay { assert_eq!(present, 2); assert_eq!(reorg_out, 0); assert_eq!(on_gateway, 0); - } else if i == config.delegation_block_delay { + } else if i == finalization_delay { assert_eq!(present, 2); assert_eq!(reorg_out, 0); assert_eq!(on_gateway, 1); - } else if i == config.delegation_block_delay + 1 { - assert_eq!(present, 2); - assert_eq!(reorg_out, 1); - assert_eq!(on_gateway, 1); - } else if i > config.delegation_clear_after_n_blocks { - assert_eq!(present, 0); } else { assert_eq!(present, 2); assert_eq!(reorg_out, 1); @@ -236,8 +252,6 @@ async fn delegate_user_decrypt_idempotent_error_call( let multichain_acl = MultichainACL::deploy(&provider_deploy, already_allowed_revert).await?; let config = ConfigSettings { - delegation_block_delay: 2, - delegation_clear_after_n_blocks: 5, delegation_fallback_polling: 1000, // disable ..env.conf.clone() }; @@ -249,7 +263,6 @@ async fn delegate_user_decrypt_idempotent_error_call( *multichain_acl.address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), // shared blockchain env.cancel_token.clone(), config.clone(), None, @@ -258,7 +271,7 @@ async fn delegate_user_decrypt_idempotent_error_call( let run_handle = tokio::spawn(async move { txn_sender.run().await }); - let chain_id = provider.get_chain_id().await?; + let chain_id = ChainId::try_from(provider.get_chain_id().await?)?; let block = provider .inner() @@ -302,8 +315,16 @@ async fn delegate_user_decrypt_idempotent_error_call( } tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - - for i in 0..(config.delegation_clear_after_n_blocks + 1) { + let finalization_delay = 3; + for i in 0..(finalization_delay + 3) { + if i == finalization_delay { + sqlx::query!( + "UPDATE host_chain_blocks_valid SET block_status = 'finalized' WHERE block_number = $1", + start_block as i64 + ) + .execute(&env.db_pool) + .await?; + } sqlx::query!( "SELECT pg_notify($1, $2)", "new_host_block", @@ -337,16 +358,10 @@ async fn delegate_user_decrypt_idempotent_error_call( .count .unwrap_or(0); error!("{i} {present} {on_gateway} {reorg_out} {error}"); - if i < config.delegation_block_delay { + if i < finalization_delay { assert_eq!(present, 2); assert_eq!(reorg_out, 0); assert_eq!(on_gateway, 0); - } else if i == config.delegation_block_delay { - assert_eq!(present, 2); - assert_eq!(reorg_out, 0); - assert_eq!(on_gateway, 2); - } else if i > config.delegation_clear_after_n_blocks { - assert_eq!(present, 0); } else { assert_eq!(present, 2); assert_eq!(reorg_out, 0); @@ -367,8 +382,6 @@ async fn stop_retrying_delegation_on_gw_config_error( ) -> anyhow::Result<()> { let config_error_mode: u8 = 1; let base_conf = ConfigSettings { - delegation_block_delay: 0, - delegation_clear_after_n_blocks: 5, delegation_fallback_polling: 1, delegation_max_retry: 3, ..Default::default() @@ -405,7 +418,6 @@ async fn stop_retrying_delegation_on_gw_config_error( *multichain_acl.address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -417,7 +429,7 @@ async fn stop_retrying_delegation_on_gw_config_error( .await?; let run_handle = tokio::spawn(async move { txn_sender.run().await }); - let chain_id = provider.get_chain_id().await?; + let chain_id = ChainId::try_from(provider.get_chain_id().await?)?; let block = provider .inner() .get_block_by_number(BlockNumberOrTag::Latest) @@ -450,6 +462,12 @@ async fn stop_retrying_delegation_on_gw_config_error( let mut attempts = 0; let row = loop { + sqlx::query!( + "UPDATE host_chain_blocks_valid SET block_status = 'finalized' WHERE block_number = $1", + start_block as i64 + ) + .execute(&env.db_pool) + .await?; sqlx::query!( "SELECT pg_notify($1, $2)", "new_host_block", @@ -457,7 +475,6 @@ async fn stop_retrying_delegation_on_gw_config_error( ) .execute(&env.db_pool) .await?; - let row = sqlx::query!( "SELECT on_gateway, gateway_nb_attempts, gateway_last_error FROM delegate_user_decrypt diff --git a/coprocessor/fhevm-engine/transaction-sender/tests/verify_proof_tests.rs b/coprocessor/fhevm-engine/transaction-sender/tests/verify_proof_tests.rs index 5359a1040c..6bdcd03b91 100644 --- a/coprocessor/fhevm-engine/transaction-sender/tests/verify_proof_tests.rs +++ b/coprocessor/fhevm-engine/transaction-sender/tests/verify_proof_tests.rs @@ -68,7 +68,6 @@ async fn verify_proof_response_success(#[case] signer_type: SignerType) -> anyho PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -186,7 +185,6 @@ async fn verify_proof_response_empty_handles_success( PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -307,7 +305,6 @@ async fn verify_proof_response_concurrent_success( PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -426,7 +423,6 @@ async fn reject_proof_response_success(#[case] signer_type: SignerType) -> anyho PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -538,7 +534,6 @@ async fn verify_proof_response_reversal_already_verified( PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -642,7 +637,6 @@ async fn reject_proof_response_reversal_already_rejected( PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -746,7 +740,6 @@ async fn verify_proof_response_other_reversal( PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), Some(1_000_000_000_000_000), @@ -847,7 +840,6 @@ async fn reject_proof_response_other_reversal( PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), Some(1_000_000_000_000_000), @@ -943,7 +935,6 @@ async fn verify_proof_response_other_reversal_gas_estimation( PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -1043,7 +1034,6 @@ async fn reject_proof_response_other_reversal_gas_estimation( PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -1145,7 +1135,6 @@ async fn verify_proof_max_retries_remove_entry( PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -1237,7 +1226,6 @@ async fn verify_proof_max_retries_do_not_remove_entry( PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, @@ -1356,7 +1344,6 @@ async fn stop_retrying_verify_proof_on_gw_config_error( PrivateKeySigner::random().address(), env.signer.clone(), provider.clone(), - provider.inner().clone(), env.cancel_token.clone(), env.conf.clone(), None, diff --git a/test-suite/fhevm/docker-compose/coprocessor-docker-compose.yml b/test-suite/fhevm/docker-compose/coprocessor-docker-compose.yml index 43939fe0ac..921111de82 100644 --- a/test-suite/fhevm/docker-compose/coprocessor-docker-compose.yml +++ b/test-suite/fhevm/docker-compose/coprocessor-docker-compose.yml @@ -57,6 +57,7 @@ services: - --tfhe-contract-address=${FHEVM_EXECUTOR_CONTRACT_ADDRESS} - --url=${RPC_WS_URL} - --initial-block-time=1 + - --catchup-finalization-in-blocks=5 depends_on: coprocessor-db-migration: condition: service_completed_successfully @@ -236,9 +237,6 @@ services: - --verify-proof-resp-max-retries=15 - --verify-proof-remove-after-max-retries - --signer-type=private-key - - --host-chain-url=${RPC_WS_URL} - - --delegation-block-delay=10 - - --delegation-clear-after-n-blocks=648000 - --delegation-fallback-polling=30 - --delegation-max-retry=100000 - --retry-immediately-on-nonce-error=2