Skip to content

Commit 3dca744

Browse files
committed
feat(coprocessor): gw-listener, catchup for KMSGeneration contract
add --catchup-kms-generation-from-block
1 parent ec766d3 commit 3dca744

File tree

5 files changed

+173
-2
lines changed

5 files changed

+173
-2
lines changed

coprocessor/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ Options:
160160
[default: 1]
161161
--error-sleep-max-secs <ERROR_SLEEP_MAX_SECS>
162162
[default: 10]
163+
--catchup-kms-generation-from-block <BLOCK_NUMBER OR BLOCKS IN PAST>
164+
[default: None]
163165
-h, --help
164166
Print help
165167
-V, --version

coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ struct Conf {
7272
/// gw-listener service name in OTLP traces
7373
#[arg(long, default_value = "gw-listener")]
7474
pub service_name: String,
75+
76+
#[arg(long, default_value = None, help = "Can be negative from last processed block", allow_hyphen_values = true)]
77+
pub catchup_kms_generation_from_block: Option<i64>,
7578
}
7679

7780
fn install_signal_handlers(cancel_token: CancellationToken) -> anyhow::Result<()> {
@@ -142,7 +145,7 @@ async fn main() -> anyhow::Result<()> {
142145
let cancel_token = CancellationToken::new();
143146

144147
let Some(host_chain_id) = conf.host_chain_id.or_else(chain_id_from_env) else {
145-
anyhow::bail!("--chain-id or CHAIN_ID env var is missing.")
148+
anyhow::bail!("--host-chain-id or CHAIN_ID env var is missing.")
146149
};
147150
let config = ConfigSettings {
148151
host_chain_id,
@@ -156,6 +159,7 @@ async fn main() -> anyhow::Result<()> {
156159
health_check_timeout: conf.health_check_timeout,
157160
get_logs_poll_interval: conf.get_logs_poll_interval,
158161
get_logs_block_batch_size: conf.get_logs_block_batch_size,
162+
catchup_kms_generation_from_block: conf.catchup_kms_generation_from_block,
159163
};
160164

161165
let gw_listener = GatewayListener::new(

coprocessor/fhevm-engine/gw-listener/src/gw_listener.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,13 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface + Clone + 'stati
111111
let s = self.clone();
112112
let d = db_pool.clone();
113113
tokio::spawn(async move {
114+
let mut catchup_kms_generation_from = s.conf.catchup_kms_generation_from_block;
114115
let mut sleep_duration = s.conf.error_sleep_initial_secs as u64;
115116
loop {
116-
match s.run_get_logs(&d, &mut sleep_duration).await {
117+
match s
118+
.run_get_logs(&d, &mut sleep_duration, &mut catchup_kms_generation_from)
119+
.await
120+
{
117121
Ok(_) => {
118122
info!("run_get_logs() stopped");
119123
break;
@@ -182,9 +186,26 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface + Clone + 'stati
182186
&self,
183187
db_pool: &Pool<Postgres>,
184188
sleep_duration: &mut u64,
189+
catchup_kms_generation_from: &mut Option<i64>,
185190
) -> anyhow::Result<()> {
186191
let mut ticker = tokio::time::interval(self.conf.get_logs_poll_interval);
187192
let mut last_processed_block_num = self.get_last_processed_block_num(db_pool).await?;
193+
if let Some(from_block) = *catchup_kms_generation_from {
194+
info!(from_block, "Catchup on KMSGeneration, start");
195+
let from_block = if from_block >= 0 {
196+
// start from specified block
197+
from_block
198+
} else if let Some(last) = last_processed_block_num {
199+
// go N block in past from processed
200+
last as i64 + from_block
201+
} else {
202+
// go N block in past
203+
let current_block = self.provider.get_block_number().await?;
204+
current_block as i64 + from_block
205+
};
206+
// note, we cannot catchup block 0
207+
last_processed_block_num = Some(from_block.try_into().unwrap_or(0));
208+
}
188209

189210
loop {
190211
tokio::select! {
@@ -217,6 +238,9 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface + Clone + 'stati
217238
.to_block(to_block);
218239

219240
let logs = self.provider.get_logs(&filter).await?;
241+
if catchup_kms_generation_from.is_some() && from_block < current_block {
242+
info!(from_block, to_block, nb_events=logs.len(), "Catchup on KMSGeneration, get_logs");
243+
}
220244
for log in logs {
221245
if let Ok(event) = KMSGeneration::KMSGenerationEvents::decode_log(&log.inner) {
222246
match event.data {
@@ -246,6 +270,16 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface + Clone + 'stati
246270
}
247271
}
248272
last_processed_block_num = Some(to_block);
273+
if catchup_kms_generation_from.is_some() {
274+
if to_block == current_block {
275+
info!("Catchup on KMSGeneration, finished");
276+
*catchup_kms_generation_from = None;
277+
} else {
278+
// if an error happens catchup will restart here
279+
*catchup_kms_generation_from = Some(to_block as i64 + 1);
280+
info!(catchup_kms_generation_from, "Catchup on KMSGeneration continue");
281+
}
282+
}
249283
self.update_last_block_num(db_pool, last_processed_block_num).await?;
250284
if to_block < current_block {
251285
debug!(to_block = to_block,

coprocessor/fhevm-engine/gw-listener/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ pub struct ConfigSettings {
4848

4949
pub get_logs_poll_interval: Duration,
5050
pub get_logs_block_batch_size: u64,
51+
pub catchup_kms_generation_from_block: Option<i64>,
5152
}
5253

5354
pub fn chain_id_from_env() -> Option<ChainId> {
@@ -74,6 +75,7 @@ impl Default for ConfigSettings {
7475
health_check_timeout: Duration::from_secs(4),
7576
get_logs_poll_interval: Duration::from_secs(1),
7677
get_logs_block_batch_size: 100,
78+
catchup_kms_generation_from_block: None,
7779
}
7880
}
7981
}

coprocessor/fhevm-engine/gw-listener/tests/gw_listener_tests.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,135 @@ async fn keygen_ok() -> anyhow::Result<()> {
459459
Ok(())
460460
}
461461

462+
#[tokio::test]
463+
#[serial(db)]
464+
async fn keygen_ok_catchup_positive() -> anyhow::Result<()> {
465+
keygen_ok_catchup_gen(true).await
466+
}
467+
468+
#[tokio::test]
469+
#[serial(db)]
470+
async fn keygen_ok_catchup_negative() -> anyhow::Result<()> {
471+
keygen_ok_catchup_gen(false).await
472+
}
473+
474+
async fn keygen_ok_catchup_gen(positive: bool) -> anyhow::Result<()> {
475+
use aws_sdk_s3::operation::get_object::GetObjectOutput;
476+
use aws_sdk_s3::primitives::ByteStream;
477+
use aws_sdk_s3::Client;
478+
use aws_smithy_mocks::RuleMode;
479+
use aws_smithy_mocks::{mock, mock_client};
480+
use gw_listener::KeyType;
481+
482+
// see ../contracts/KMSGeneration.sol
483+
let buckets = [
484+
"test-bucket1/PUB-P1",
485+
"test-bucket2/PUB-P2",
486+
"test-bucket3/PUB-P3",
487+
"test-bucket4/PUB-P4",
488+
];
489+
490+
let keys_digests = [KeyType::PublicKey, KeyType::ServerKey];
491+
492+
let key_id = U256::from(16);
493+
494+
let mut rules = vec![];
495+
for &bucket in &buckets {
496+
for key_type in &keys_digests {
497+
let key_type_str: &str = to_bucket_key_prefix(*key_type);
498+
let key_id_no_0x = key_id_to_key_bucket(key_id);
499+
let key = format!("{}/{}", key_type_str, key_id_no_0x);
500+
eprintln!("Adding {}/{}", bucket, key);
501+
let get_object_rule = mock!(Client::get_object)
502+
.match_requests(move |req| req.bucket() == Some(bucket) && req.key() == Some(&key))
503+
.then_output(|| {
504+
GetObjectOutput::builder()
505+
.body(ByteStream::from_static(b"key_bytes"))
506+
.build()
507+
});
508+
rules.push(get_object_rule);
509+
}
510+
}
511+
for &bucket in &buckets {
512+
let key_id_no_0x = &format!("{key_id:064X}");
513+
let key = format!("PUB/CRS/{key_id_no_0x}");
514+
eprintln!("Adding {}/{}", bucket, key);
515+
let get_object_rule = mock!(Client::get_object)
516+
.match_requests(move |req| req.bucket() == Some(bucket) && req.key() == Some(&key))
517+
.then_output(|| {
518+
GetObjectOutput::builder()
519+
.body(ByteStream::from_static(b"key_bytes"))
520+
.build()
521+
});
522+
rules.push(get_object_rule);
523+
}
524+
let rules_ref: Vec<_> = rules.iter().collect();
525+
526+
// Create a mocked client with the rule
527+
let s3 = mock_client!(aws_sdk_s3, RuleMode::MatchAny, &rules_ref);
528+
529+
let env = TestEnvironment::new().await?;
530+
let provider = ProviderBuilder::new()
531+
.wallet(env.wallet)
532+
.connect_ws(WsConnect::new(env.anvil.ws_endpoint_url()))
533+
.await?;
534+
let aws_s3_client = AwsS3ClientMocked(s3);
535+
let input_verification = InputVerification::deploy(&provider).await?;
536+
let kms_generation = KMSGeneration::deploy(&provider).await?;
537+
538+
assert!(provider.get_block_number().await? > 0);
539+
540+
let txn_req = kms_generation
541+
.keygen_public_key()
542+
.into_transaction_request();
543+
let pending_txn = provider.send_transaction(txn_req).await?;
544+
let receipt = pending_txn.get_receipt().await?;
545+
assert!(receipt.status());
546+
547+
let txn_req = kms_generation
548+
.keygen_server_key()
549+
.into_transaction_request();
550+
let pending_txn = provider.send_transaction(txn_req).await?;
551+
let receipt = pending_txn.get_receipt().await?;
552+
assert!(receipt.status());
553+
554+
let txn_req = kms_generation.crsgen().into_transaction_request();
555+
let pending_txn = provider.send_transaction(txn_req).await?;
556+
let receipt = pending_txn.get_receipt().await?;
557+
assert!(receipt.status());
558+
559+
assert!(has_not_public_key(&env.db_pool.clone()).await?);
560+
assert!(has_not_server_key(&env.db_pool.clone()).await?);
561+
assert!(has_not_crs(&env.db_pool.clone()).await?);
562+
563+
let catchup_kms_generation_from_block = if positive {
564+
Some(0)
565+
} else {
566+
Some(-(provider.get_block_number().await? as i64))
567+
};
568+
let conf = ConfigSettings {
569+
catchup_kms_generation_from_block,
570+
..env.conf.clone()
571+
};
572+
let gw_listener = GatewayListener::new(
573+
*input_verification.address(),
574+
*kms_generation.address(),
575+
conf,
576+
env.cancel_token.clone(),
577+
provider.clone(),
578+
aws_s3_client.clone(),
579+
);
580+
let listener = tokio::spawn(async move { gw_listener.run().await });
581+
582+
assert!(has_public_key(&env.db_pool.clone()).await?);
583+
assert!(has_server_key(&env.db_pool.clone()).await?);
584+
assert!(has_crs(&env.db_pool.clone()).await?);
585+
586+
env.cancel_token.cancel();
587+
listener.abort();
588+
Ok(())
589+
}
590+
462591
#[tokio::test]
463592
#[serial(db)]
464593
async fn keygen_compromised_key() -> anyhow::Result<()> {

0 commit comments

Comments
 (0)