Skip to content

Commit c8f7e16

Browse files
chore(kms-connector): gateway events renamed protocol events (#2209)
* chore(kms-connector): gateway events renamed protocol events * chore(kms-connector): parametrized integration tests
1 parent 4aaebe5 commit c8f7e16

File tree

19 files changed

+223
-340
lines changed

19 files changed

+223
-340
lines changed

kms-connector/crates/gw-listener/src/core/gateway.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use alloy::{
1111
use anyhow::anyhow;
1212
use connector_utils::{
1313
monitoring::otlp::PropagationContext,
14-
types::{GatewayEvent, GatewayEventKind, db::EventType},
14+
types::{ProtocolEvent, ProtocolEventKind, db::EventType},
1515
};
1616
use fhevm_gateway_bindings::{
1717
decryption::Decryption::DecryptionEvents, kms_generation::KMSGeneration::KMSGenerationEvents,
@@ -210,8 +210,8 @@ where
210210
Ok((to_block.saturating_add(1), to_block < current_block))
211211
}
212212

213-
/// Decodes a log into a `GatewayEventKind`.
214-
fn decode_log(contract: MonitoredContract, log: &Log) -> anyhow::Result<GatewayEventKind> {
213+
/// Decodes a log into a `ProtocolEventKind`.
214+
fn decode_log(contract: MonitoredContract, log: &Log) -> anyhow::Result<ProtocolEventKind> {
215215
match contract {
216216
MonitoredContract::Decryption => {
217217
let event = DecryptionEvents::decode_log(&log.inner)
@@ -237,11 +237,11 @@ where
237237
}
238238
}
239239

240-
/// Decodes logs and prepares `GatewayEvent` structs with OTLP context and metrics.
240+
/// Decodes logs and prepares `ProtocolEvent` structs with OTLP context and metrics.
241241
fn prepare_events(
242242
contract: MonitoredContract,
243243
logs: Vec<Log>,
244-
) -> anyhow::Result<Vec<GatewayEvent>> {
244+
) -> anyhow::Result<Vec<ProtocolEvent>> {
245245
let mut events = Vec::with_capacity(logs.len());
246246
for log in logs {
247247
let event_kind = Self::decode_log(contract, &log)?;
@@ -251,7 +251,7 @@ where
251251

252252
let span = info_span!("handle_gateway_event", event = %event_kind);
253253
let otlp_ctx = PropagationContext::inject(&span.context());
254-
events.push(GatewayEvent::new(
254+
events.push(ProtocolEvent::new(
255255
event_kind,
256256
log.transaction_hash,
257257
otlp_ctx,

kms-connector/crates/gw-listener/src/core/publish.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use anyhow::anyhow;
33
use connector_utils::{
44
monitoring::otlp::PropagationContext,
55
types::{
6-
GatewayEvent, GatewayEventKind,
6+
ProtocolEvent, ProtocolEventKind,
77
db::{EventType, ParamsTypeDb, SnsCiphertextMaterialDbItem},
88
},
99
};
@@ -25,7 +25,7 @@ use tracing::{debug, info, warn};
2525
#[tracing::instrument(skip_all)]
2626
pub async fn publish_batch(
2727
db_pool: &Pool<Postgres>,
28-
events: Vec<GatewayEvent>,
28+
events: Vec<ProtocolEvent>,
2929
event_types: &[EventType],
3030
block_number: u64,
3131
) -> anyhow::Result<()> {
@@ -40,36 +40,36 @@ pub async fn publish_batch(
4040

4141
async fn publish_event_inner<'e>(
4242
executor: impl PgExecutor<'e>,
43-
event: GatewayEvent,
43+
event: ProtocolEvent,
4444
) -> anyhow::Result<()> {
4545
info!("Storing {:?} in DB...", event.kind);
4646

4747
let otlp_ctx = event.otlp_context;
4848
let tx_hash = event.tx_hash;
4949
let created_at = event.created_at;
5050
let query_result = match event.kind {
51-
GatewayEventKind::PublicDecryption(e) => {
51+
ProtocolEventKind::PublicDecryption(e) => {
5252
publish_public_decryption(executor, e, tx_hash, created_at, otlp_ctx).await
5353
}
54-
GatewayEventKind::UserDecryption(e) => {
54+
ProtocolEventKind::UserDecryption(e) => {
5555
publish_user_decryption(executor, e, tx_hash, created_at, otlp_ctx).await
5656
}
57-
GatewayEventKind::PrepKeygen(e) => {
57+
ProtocolEventKind::PrepKeygen(e) => {
5858
let params_type: ParamsTypeDb = e.paramsType.try_into()?;
5959
publish_prep_keygen_request(executor, e, params_type, tx_hash, created_at, otlp_ctx)
6060
.await
6161
}
62-
GatewayEventKind::Keygen(e) => {
62+
ProtocolEventKind::Keygen(e) => {
6363
publish_keygen_request(executor, e, tx_hash, created_at, otlp_ctx).await
6464
}
65-
GatewayEventKind::Crsgen(e) => {
65+
ProtocolEventKind::Crsgen(e) => {
6666
let params_type: ParamsTypeDb = e.paramsType.try_into()?;
6767
publish_crsgen_request(executor, e, params_type, tx_hash, created_at, otlp_ctx).await
6868
}
69-
GatewayEventKind::PrssInit(id) => {
69+
ProtocolEventKind::PrssInit(id) => {
7070
publish_prss_init(executor, id, tx_hash, created_at, otlp_ctx).await
7171
}
72-
GatewayEventKind::KeyReshareSameSet(e) => {
72+
ProtocolEventKind::KeyReshareSameSet(e) => {
7373
let params_type: ParamsTypeDb = e.paramsType.try_into()?;
7474
publish_key_reshare_same_set(executor, e, params_type, tx_hash, created_at, otlp_ctx)
7575
.await

kms-connector/crates/gw-listener/tests/block_tracking.rs

Lines changed: 10 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -8,59 +8,18 @@ use tokio_util::sync::CancellationToken;
88
use tracing::info;
99

1010
#[rstest]
11+
#[case::public_decryption(EventType::PublicDecryptionRequest)]
12+
#[case::user_decryption(EventType::UserDecryptionRequest)]
13+
#[case::prep_keygen(EventType::PrepKeygenRequest)]
14+
#[case::keygen(EventType::KeygenRequest)]
15+
#[case::crsgen(EventType::CrsgenRequest)]
16+
// As there is currently only one PRSS init ID allowed, the test won't pass as there will be only
17+
// one row in the DB instead of two.
18+
// #[case::prss_init(EventType::PrssInit)]
19+
#[case::key_reshare_same_set(EventType::KeyReshareSameSet)]
1120
#[timeout(Duration::from_secs(90))]
1221
#[tokio::test]
13-
async fn test_block_tracking_public_decryption() -> anyhow::Result<()> {
14-
test_block_tracking(EventType::PublicDecryptionRequest).await
15-
}
16-
17-
#[rstest]
18-
#[timeout(Duration::from_secs(90))]
19-
#[tokio::test]
20-
async fn test_block_tracking_user_decryption() -> anyhow::Result<()> {
21-
test_block_tracking(EventType::UserDecryptionRequest).await
22-
}
23-
24-
#[rstest]
25-
#[timeout(Duration::from_secs(90))]
26-
#[tokio::test]
27-
async fn test_block_tracking_prep_keygen() -> anyhow::Result<()> {
28-
test_block_tracking(EventType::PrepKeygenRequest).await
29-
}
30-
31-
#[rstest]
32-
#[timeout(Duration::from_secs(90))]
33-
#[tokio::test]
34-
async fn test_block_tracking_keygen() -> anyhow::Result<()> {
35-
test_block_tracking(EventType::KeygenRequest).await
36-
}
37-
38-
#[rstest]
39-
#[timeout(Duration::from_secs(90))]
40-
#[tokio::test]
41-
async fn test_block_tracking_crsgen() -> anyhow::Result<()> {
42-
test_block_tracking(EventType::CrsgenRequest).await
43-
}
44-
45-
#[rstest]
46-
#[timeout(Duration::from_secs(90))]
47-
#[tokio::test]
48-
#[ignore = "
49-
As there is currently only one PRSS init ID allowed,
50-
the test won't pass as there will be only one row in the DB instead of two
51-
"]
52-
async fn test_block_tracking_prss_init() -> anyhow::Result<()> {
53-
test_block_tracking(EventType::PrssInit).await
54-
}
55-
56-
#[rstest]
57-
#[timeout(Duration::from_secs(90))]
58-
#[tokio::test]
59-
async fn test_block_tracking_key_reshare_same_set() -> anyhow::Result<()> {
60-
test_block_tracking(EventType::KeyReshareSameSet).await
61-
}
62-
63-
async fn test_block_tracking(event_type: EventType) -> anyhow::Result<()> {
22+
async fn test_block_tracking(#[case] event_type: EventType) -> anyhow::Result<()> {
6423
let mut test_instance = TestInstanceBuilder::db_gw_setup().await?;
6524
let cancel_token = CancellationToken::new();
6625
let gw_listener_task =

kms-connector/crates/gw-listener/tests/catchup.rs

Lines changed: 8 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -8,55 +8,16 @@ use tokio_util::sync::CancellationToken;
88
use tracing::info;
99

1010
#[rstest]
11+
#[case::public_decryption(EventType::PublicDecryptionRequest)]
12+
#[case::user_decryption(EventType::UserDecryptionRequest)]
13+
#[case::prep_keygen(EventType::PrepKeygenRequest)]
14+
#[case::keygen(EventType::KeygenRequest)]
15+
#[case::crsgen(EventType::CrsgenRequest)]
16+
#[case::prss_init(EventType::PrssInit)]
17+
#[case::key_reshare_same_set(EventType::KeyReshareSameSet)]
1118
#[timeout(Duration::from_secs(60))]
1219
#[tokio::test]
13-
async fn test_catchup_public_decryption_from_block() -> anyhow::Result<()> {
14-
test_catchup_from_block(EventType::PublicDecryptionRequest).await
15-
}
16-
17-
#[rstest]
18-
#[timeout(Duration::from_secs(60))]
19-
#[tokio::test]
20-
async fn test_catchup_user_decryption_from_block() -> anyhow::Result<()> {
21-
test_catchup_from_block(EventType::UserDecryptionRequest).await
22-
}
23-
24-
#[rstest]
25-
#[timeout(Duration::from_secs(60))]
26-
#[tokio::test]
27-
async fn test_catchup_prep_keygen_from_block() -> anyhow::Result<()> {
28-
test_catchup_from_block(EventType::PrepKeygenRequest).await
29-
}
30-
31-
#[rstest]
32-
#[timeout(Duration::from_secs(60))]
33-
#[tokio::test]
34-
async fn test_catchup_keygen_from_block() -> anyhow::Result<()> {
35-
test_catchup_from_block(EventType::KeygenRequest).await
36-
}
37-
38-
#[rstest]
39-
#[timeout(Duration::from_secs(60))]
40-
#[tokio::test]
41-
async fn test_catchup_crsgen_from_block() -> anyhow::Result<()> {
42-
test_catchup_from_block(EventType::CrsgenRequest).await
43-
}
44-
45-
#[rstest]
46-
#[timeout(Duration::from_secs(60))]
47-
#[tokio::test]
48-
async fn test_catchup_prss_init_from_block() -> anyhow::Result<()> {
49-
test_catchup_from_block(EventType::PrssInit).await
50-
}
51-
52-
#[rstest]
53-
#[timeout(Duration::from_secs(60))]
54-
#[tokio::test]
55-
async fn test_catchup_key_reshare_same_set_from_block() -> anyhow::Result<()> {
56-
test_catchup_from_block(EventType::KeyReshareSameSet).await
57-
}
58-
59-
async fn test_catchup_from_block(event_type: EventType) -> anyhow::Result<()> {
20+
async fn test_catchup_from_block(#[case] event_type: EventType) -> anyhow::Result<()> {
6021
let mut test_instance = TestInstanceBuilder::db_gw_setup().await?;
6122
let cancel_token = CancellationToken::new();
6223

kms-connector/crates/gw-listener/tests/common/mod.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ use connector_utils::{
1111
setup::TestInstance,
1212
},
1313
types::{
14-
GatewayEventKind,
14+
ProtocolEventKind,
1515
db::{EventType, ParamsTypeDb},
16-
gw_event::PRSS_INIT_ID,
16+
event::PRSS_INIT_ID,
1717
},
1818
};
1919
use fhevm_gateway_bindings::{
@@ -67,7 +67,7 @@ pub async fn start_test_listener(
6767
pub async fn mock_event_on_gw(
6868
test_instance: &TestInstance,
6969
event_type: EventType,
70-
) -> anyhow::Result<(GatewayEventKind, Option<u64>)> {
70+
) -> anyhow::Result<(ProtocolEventKind, Option<u64>)> {
7171
info!("Mocking {event_type} on Anvil...");
7272
let (pending_tx, event) = match event_type {
7373
EventType::PublicDecryptionRequest => {
@@ -194,7 +194,7 @@ pub async fn fetch_from_db(db: &Pool<Postgres>, event_type: EventType) -> sqlx::
194194
pub async fn poll_db_for_event(
195195
db: &Pool<Postgres>,
196196
event_type: EventType,
197-
expected_event: &GatewayEventKind,
197+
expected_event: &ProtocolEventKind,
198198
) -> anyhow::Result<()> {
199199
let timeout = Duration::from_secs(30);
200200
let poll_interval = Duration::from_millis(200);
@@ -211,16 +211,16 @@ pub async fn poll_db_for_event(
211211
}
212212
}
213213

214-
pub fn check_event_in_db(rows: &[PgRow], event: GatewayEventKind) -> anyhow::Result<()> {
214+
pub fn check_event_in_db(rows: &[PgRow], event: ProtocolEventKind) -> anyhow::Result<()> {
215215
match event {
216-
GatewayEventKind::PublicDecryption(e) => {
216+
ProtocolEventKind::PublicDecryption(e) => {
217217
for r in rows {
218218
if e.extraData.to_vec() == r.try_get::<Vec<u8>, _>("extra_data")? {
219219
return Ok(());
220220
}
221221
}
222222
}
223-
GatewayEventKind::UserDecryption(e) => {
223+
ProtocolEventKind::UserDecryption(e) => {
224224
for r in rows {
225225
if e.publicKey.to_vec() == r.try_get::<Vec<u8>, _>("public_key")?
226226
&& e.userAddress == Address::from(r.try_get::<[u8; 20], _>("user_address")?)
@@ -229,14 +229,14 @@ pub fn check_event_in_db(rows: &[PgRow], event: GatewayEventKind) -> anyhow::Res
229229
}
230230
}
231231
}
232-
GatewayEventKind::PrepKeygen(_) => {
232+
ProtocolEventKind::PrepKeygen(_) => {
233233
for r in rows {
234234
if r.try_get::<ParamsTypeDb, _>("params_type")? == ParamsTypeDb::Test {
235235
return Ok(());
236236
}
237237
}
238238
}
239-
GatewayEventKind::Keygen(e) => {
239+
ProtocolEventKind::Keygen(e) => {
240240
for r in rows {
241241
if e.prepKeygenId
242242
== U256::from_le_bytes(r.try_get::<[u8; 32], _>("prep_keygen_id")?)
@@ -245,7 +245,7 @@ pub fn check_event_in_db(rows: &[PgRow], event: GatewayEventKind) -> anyhow::Res
245245
}
246246
}
247247
}
248-
GatewayEventKind::Crsgen(e) => {
248+
ProtocolEventKind::Crsgen(e) => {
249249
for r in rows {
250250
if e.maxBitLength
251251
== U256::from_le_bytes(r.try_get::<[u8; 32], _>("max_bit_length")?)
@@ -254,14 +254,14 @@ pub fn check_event_in_db(rows: &[PgRow], event: GatewayEventKind) -> anyhow::Res
254254
}
255255
}
256256
}
257-
GatewayEventKind::PrssInit(_) => {
257+
ProtocolEventKind::PrssInit(_) => {
258258
for r in rows {
259259
if U256::from_le_bytes(r.try_get::<[u8; 32], _>("id")?) == PRSS_INIT_ID {
260260
return Ok(());
261261
}
262262
}
263263
}
264-
GatewayEventKind::KeyReshareSameSet(e) => {
264+
ProtocolEventKind::KeyReshareSameSet(e) => {
265265
for r in rows {
266266
if e.keyId == U256::from_le_bytes(r.try_get::<[u8; 32], _>("key_id")?) {
267267
return Ok(());

kms-connector/crates/gw-listener/tests/integration_test.rs

Lines changed: 8 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -9,55 +9,16 @@ use tokio_util::sync::CancellationToken;
99
use tracing::info;
1010

1111
#[rstest]
12+
#[case::public_decryption(EventType::PublicDecryptionRequest)]
13+
#[case::user_decryption(EventType::UserDecryptionRequest)]
14+
#[case::prep_keygen(EventType::PrepKeygenRequest)]
15+
#[case::keygen(EventType::KeygenRequest)]
16+
#[case::crsgen(EventType::CrsgenRequest)]
17+
#[case::prss_init(EventType::PrssInit)]
18+
#[case::key_reshare_same_set(EventType::KeyReshareSameSet)]
1219
#[timeout(Duration::from_secs(60))]
1320
#[tokio::test]
14-
async fn test_publish_public_decryption() -> anyhow::Result<()> {
15-
test_publish_event(EventType::PublicDecryptionRequest).await
16-
}
17-
18-
#[rstest]
19-
#[timeout(Duration::from_secs(60))]
20-
#[tokio::test]
21-
async fn test_publish_user_decryption() -> anyhow::Result<()> {
22-
test_publish_event(EventType::UserDecryptionRequest).await
23-
}
24-
25-
#[rstest]
26-
#[timeout(Duration::from_secs(60))]
27-
#[tokio::test]
28-
async fn test_publish_prep_keygen() -> anyhow::Result<()> {
29-
test_publish_event(EventType::PrepKeygenRequest).await
30-
}
31-
32-
#[rstest]
33-
#[timeout(Duration::from_secs(60))]
34-
#[tokio::test]
35-
async fn test_publish_keygen() -> anyhow::Result<()> {
36-
test_publish_event(EventType::KeygenRequest).await
37-
}
38-
39-
#[rstest]
40-
#[timeout(Duration::from_secs(60))]
41-
#[tokio::test]
42-
async fn test_publish_crsgen() -> anyhow::Result<()> {
43-
test_publish_event(EventType::CrsgenRequest).await
44-
}
45-
46-
#[rstest]
47-
#[timeout(Duration::from_secs(60))]
48-
#[tokio::test]
49-
async fn test_publish_prss_init() -> anyhow::Result<()> {
50-
test_publish_event(EventType::PrssInit).await
51-
}
52-
53-
#[rstest]
54-
#[timeout(Duration::from_secs(60))]
55-
#[tokio::test]
56-
async fn test_publish_key_reshare_same_set() -> anyhow::Result<()> {
57-
test_publish_event(EventType::KeyReshareSameSet).await
58-
}
59-
60-
async fn test_publish_event(event_type: EventType) -> anyhow::Result<()> {
21+
async fn test_publish_event(#[case] event_type: EventType) -> anyhow::Result<()> {
6122
let mut test_instance = TestInstanceBuilder::db_gw_setup().await?;
6223
let cancel_token = CancellationToken::new();
6324
let gw_listener_task =

0 commit comments

Comments
 (0)