Skip to content

Commit 571fa95

Browse files
committed
fix(coprocessor): wip
1 parent 9f94772 commit 571fa95

File tree

14 files changed

+150
-74
lines changed

14 files changed

+150
-74
lines changed

coprocessor/fhevm-engine/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ axum = "0.7"
3232
tower-http = { version = "0.5", features = ["trace"] }
3333
anyhow = "1.0.98"
3434
aws-config = "1.8.5"
35+
aws-credential-types = "1.2.6"
3536
aws-sdk-kms = { version = "1.68.0", default-features = false }
3637
aws-sdk-s3 = { version = "1.103.0", features = ["test-util"] }
3738
bigdecimal = "0.4.8"

coprocessor/fhevm-engine/gw-listener/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ alloy = { workspace = true }
1111
anyhow = { workspace = true }
1212
axum = { workspace = true }
1313
aws-config = { workspace = true }
14+
aws-credential-types = { workspace = true }
1415
aws-sdk-s3 = { workspace = true }
1516
clap = { workspace = true }
1617
futures-util = { workspace = true }
@@ -26,6 +27,7 @@ tokio-util = { workspace = true }
2627
tracing = { workspace = true }
2728
tracing-subscriber = { workspace = true }
2829
tower-http = { workspace = true }
30+
url = "2.5.7"
2931

3032
fhevm-engine-common = { path = "../fhevm-engine-common" }
3133

coprocessor/fhevm-engine/gw-listener/src/aws_s3.rs

Lines changed: 60 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use std::time::Duration;
22

33
use aws_config::{retry::RetryConfig, timeout::TimeoutConfig, BehaviorVersion};
4-
use aws_sdk_s3::{config::Builder, Client};
4+
use aws_sdk_s3::config::{Builder, ProvideCredentials};
5+
use aws_sdk_s3::Client;
56
use tokio_util::bytes;
6-
use tracing::{error, info};
7+
use tracing::{error, info, warn};
78

89
#[derive(Clone, Debug, Default)]
910
pub struct S3Policy {
@@ -26,8 +27,22 @@ impl S3Policy {
2627
};
2728
}
2829

29-
pub async fn create_s3_client(retry_policy: &S3Policy) -> aws_sdk_s3::Client {
30+
pub async fn create_s3_client(
31+
retry_policy: &S3Policy,
32+
url: &str,
33+
) -> anyhow::Result<aws_sdk_s3::Client> {
3034
let sdk_config = aws_config::load_defaults(BehaviorVersion::latest()).await;
35+
36+
let credentials = sdk_config
37+
.credentials_provider()
38+
.ok_or(anyhow::Error::msg("s3 client: no credential provider"))?
39+
.provide_credentials()
40+
.await?;
41+
info!(access_key = %credentials.access_key_id(), "Loaded AWS credentials");
42+
43+
let region = sdk_config.region();
44+
info!(region = ?region, "Using AWS region");
45+
3146
let timeout_config = TimeoutConfig::builder()
3247
.connect_timeout(retry_policy.connect_timeout)
3348
.operation_attempt_timeout(retry_policy.max_retries_timeout)
@@ -40,43 +55,65 @@ pub async fn create_s3_client(retry_policy: &S3Policy) -> aws_sdk_s3::Client {
4055
let config = Builder::from(&sdk_config)
4156
.timeout_config(timeout_config)
4257
.retry_config(retry_config)
58+
.endpoint_url(url)
4359
.build();
4460

45-
Client::from_conf(config)
61+
Ok(Client::from_conf(config))
4662
}
4763

48-
pub async fn default_aws_s3_client() -> AwsS3Client {
49-
let s3_client = create_s3_client(&S3Policy::DEFAULT).await;
50-
AwsS3Client { s3_client }
51-
}
52-
53-
// Let's wrap Aws Client to have an interface for it so we can mock it.
64+
// Let's wrap Aws access to have an interface for it so we can mock it.
5465
#[derive(Clone)]
55-
pub struct AwsS3Client {
56-
pub s3_client: Client,
57-
}
66+
pub struct AwsS3Client {}
5867

5968
impl AwsS3Interface for AwsS3Client {
60-
async fn get_bucket_key(&self, bucket: &str, key: &str) -> anyhow::Result<bytes::Bytes> {
61-
let result = self
62-
.s3_client
69+
async fn get_bucket_key(
70+
&self,
71+
url: &str,
72+
bucket: &str,
73+
key: &str,
74+
) -> anyhow::Result<bytes::Bytes> {
75+
Ok(create_s3_client(&S3Policy::DEFAULT, url)
76+
.await?
6377
.get_object()
6478
.bucket(bucket)
6579
.key(key)
6680
.send()
67-
.await?;
68-
Ok(result.body.collect().await?.into_bytes())
81+
.await?
82+
.body
83+
.collect()
84+
.await?
85+
.into_bytes())
6986
}
7087
}
7188

7289
pub trait AwsS3Interface {
7390
fn get_bucket_key(
7491
&self,
92+
url: &str,
7593
bucket: &str,
7694
key: &str,
7795
) -> impl std::future::Future<Output = anyhow::Result<bytes::Bytes>>;
7896
}
7997

98+
fn split_url(s3_bucket_url: &String) -> anyhow::Result<(String, String)> {
99+
let parsed_url_and_bucket = url::Url::parse(s3_bucket_url)?;
100+
let bucket = parsed_url_and_bucket.path();
101+
let host = s3_bucket_url
102+
.replace(bucket, "")
103+
.trim_end_matches('/')
104+
.to_owned();
105+
let host = if host.contains("minio:9000") {
106+
// TODO: replace by docker configuration
107+
warn!(s3_bucket_url, "Using localhost for minio access");
108+
host.replace("minio:9000", "172.17.0.1:9000")
109+
} else {
110+
host.to_owned()
111+
};
112+
let bucket = bucket.trim_start_matches('/');
113+
info!(s3_bucket_url, host, bucket, "Parsed S3 url");
114+
Ok((host.to_owned(), bucket.to_owned()))
115+
}
116+
80117
pub async fn download_key_from_s3<A: AwsS3Interface>(
81118
s3_client: &A,
82119
s3_bucket_urls: &[String],
@@ -92,7 +129,11 @@ pub async fn download_key_from_s3<A: AwsS3Interface>(
92129
key_path,
93130
s3_bucket_url, i_s3_bucket_url, nb_urls, url_index, "Try downloading"
94131
);
95-
let result = s3_client.get_bucket_key(s3_bucket_url, &key_path).await;
132+
let Ok((url, bucket)) = split_url(s3_bucket_url) else {
133+
error!(s3_bucket_url, "Failed to parse S3 url");
134+
continue;
135+
};
136+
let result = s3_client.get_bucket_key(&url, &bucket, &key_path).await;
96137
let Ok(result) = result else {
97138
error!(s3_bucket_url, key_path, result = ?result, "Downloading failed");
98139
continue;

coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::time::Duration;
33
use alloy::providers::{ProviderBuilder, WsConnect};
44
use alloy::{primitives::Address, transports::http::reqwest::Url};
55
use clap::Parser;
6-
use gw_listener::aws_s3::default_aws_s3_client;
6+
use gw_listener::aws_s3::AwsS3Client;
77
use gw_listener::gw_listener::GatewayListener;
88
use gw_listener::http_server::HttpServer;
99
use gw_listener::ConfigSettings;
@@ -116,7 +116,7 @@ async fn main() -> anyhow::Result<()> {
116116
}
117117
};
118118

119-
let aws_s3_client = default_aws_s3_client().await;
119+
let aws_s3_client = AwsS3Client {};
120120

121121
let cancel_token = CancellationToken::new();
122122

coprocessor/fhevm-engine/gw-listener/src/database.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ pub async fn update_tenant_key(
3838
key_bytes: &[u8],
3939
reduced_key_bytes: Option<Vec<u8>>,
4040
tenant_id: TenantId,
41-
chain_id: ChainId,
41+
host_chain_id: ChainId,
4242
) -> anyhow::Result<()> {
4343
let query = match key_type {
4444
KeyType::ServerKey => {
45-
info!(tenant_id, chain_id, key_id, "Updating server key");
45+
info!(tenant_id, host_chain_id, key_id, "Updating server key");
4646
let Some(reduced_key_bytes) = reduced_key_bytes else {
4747
anyhow::bail!("Reduced key bytes must be provided for server key");
4848
};
@@ -58,11 +58,11 @@ pub async fn update_tenant_key(
5858
reduced_key_bytes,
5959
key_id,
6060
tenant_id as i32,
61-
chain_id as i64,
61+
host_chain_id as i64,
6262
)
6363
}
6464
KeyType::PublicKey => {
65-
info!(tenant_id, chain_id, key_id, "Updating public key");
65+
info!(tenant_id, host_chain_id, key_id, "Updating public key");
6666
sqlx::query!(
6767
"UPDATE tenants
6868
SET
@@ -72,7 +72,7 @@ pub async fn update_tenant_key(
7272
key_bytes,
7373
key_id,
7474
tenant_id as i32,
75-
chain_id as i64,
75+
host_chain_id as i64,
7676
)
7777
}
7878
};
@@ -81,7 +81,7 @@ pub async fn update_tenant_key(
8181
anyhow::bail!(
8282
"No tenant found for tenant_id {} and chain_id {}",
8383
tenant_id,
84-
chain_id
84+
host_chain_id
8585
);
8686
}
8787
Ok(())

coprocessor/fhevm-engine/gw-listener/src/gw_listener.rs

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::aws_s3::{download_key_from_s3, AwsS3Interface};
1313
use crate::database::{tenant_id, update_tenant_crs, update_tenant_key};
1414
use crate::digest::{digest_crs, digest_key};
1515
use crate::sks_key::extract_server_key_without_ns;
16-
use crate::{ChainId, ConfigSettings, HealthStatus, KeyId, KeyType, TenantId};
16+
use crate::{ChainId, ConfigSettings, HealthStatus, KeyId, KeyType};
1717

1818
sol!(
1919
#[sol(rpc)]
@@ -27,11 +27,6 @@ sol!(
2727
"./../../../gateway-contracts/artifacts/contracts/KMSManagement.sol/KMSManagement.json"
2828
);
2929

30-
struct TenantInfo {
31-
tenant_id: Option<TenantId>,
32-
chain_id: ChainId,
33-
}
34-
3530
pub struct GatewayListener<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface> {
3631
input_verification_address: Address,
3732
kms_management_address: Address,
@@ -105,12 +100,8 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface> GatewayListener
105100
let kms_management = KMSManagement::new(self.kms_management_address, &self.provider);
106101

107102
let mut from_block = self.get_last_block_num(db_pool).await?;
108-
let chain_id = self.provider.get_chain_id().await?;
109-
let tenant_id = tenant_id(db_pool, chain_id).await?;
110-
let tenant_info = TenantInfo {
111-
tenant_id,
112-
chain_id,
113-
};
103+
// TODO: make CHAIN_ID a command line parameter
104+
let host_chain_id = std::env::var("CHAIN_ID")?.parse::<i64>()? as ChainId;
114105

115106
// We call `from_block` here, but we expect that most nodes will not honour it. That doesn't lead to issues as described below.
116107
//
@@ -163,7 +154,7 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface> GatewayListener
163154
return Err(anyhow::anyhow!("Block stream closed"));
164155
};
165156
let (request, _log) = item?;
166-
self.activate_key(db_pool, request, &self.aws_s3_client, &tenant_info).await?;
157+
self.activate_key(db_pool, request, &self.aws_s3_client, host_chain_id).await?;
167158
info!("ActivateKey event successful");
168159
}
169160
item = activate_crs.next() => {
@@ -172,7 +163,7 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface> GatewayListener
172163
return Err(anyhow::anyhow!("Block stream closed"));
173164
};
174165
let (request, _log) = item?;
175-
self.activate_crs(db_pool, request, &self.aws_s3_client, &tenant_info).await?;
166+
self.activate_crs(db_pool, request, &self.aws_s3_client, host_chain_id).await?;
176167
info!("ActivateCrs event successful");
177168
}
178169
}
@@ -218,7 +209,7 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface> GatewayListener
218209
db_pool: &Pool<Postgres>,
219210
request: KMSManagement::ActivateKey,
220211
s3_client: &A,
221-
tenant_info: &TenantInfo,
212+
host_chain_id: ChainId,
222213
) -> anyhow::Result<()> {
223214
let key_id: KeyId = request.keyId;
224215
let s3_bucket_urls = request.kmsNodeStorageUrls;
@@ -255,12 +246,12 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface> GatewayListener
255246
}
256247
keys_bytes.push(bytes);
257248
}
258-
let Some(tenant_id) = tenant_info.tenant_id else {
249+
let Some(tenant_id) = tenant_id(db_pool, host_chain_id).await? else {
259250
error!(
260-
chain_id = tenant_info.chain_id,
251+
host_chain_id,
261252
"No tenant found for chain id, stopping"
262253
);
263-
anyhow::bail!("No tenant found for chain id {}", tenant_info.chain_id);
254+
anyhow::bail!("No tenant found for chain id {}", host_chain_id);
264255
};
265256
let key_id = key_id_to_bytes(key_id);
266257
let mut tx = db_pool.begin().await?;
@@ -276,7 +267,7 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface> GatewayListener
276267
&key_bytes,
277268
reduced_key_bytes,
278269
tenant_id,
279-
tenant_info.chain_id,
270+
host_chain_id,
280271
)
281272
.await?;
282273
}
@@ -289,7 +280,7 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface> GatewayListener
289280
db_pool: &Pool<Postgres>,
290281
request: KMSManagement::ActivateCrs,
291282
s3_client: &A,
292-
tenant_info: &TenantInfo,
283+
host_chain_id: ChainId,
293284
) -> anyhow::Result<()> {
294285
let crs_id: KeyId = request.crsId;
295286
let s3_bucket_urls = request.kmsNodeStorageUrls;
@@ -312,15 +303,15 @@ impl<P: Provider<Ethereum> + Clone + 'static, A: AwsS3Interface> GatewayListener
312303
error!(download_digest = ?download_digest, expected_digest = ?expected_digest, "Key digest mismatch, stopping");
313304
anyhow::bail!("Invalid Key digest for key id:{crs_id}");
314305
}
315-
let Some(tenant_id) = tenant_info.tenant_id else {
306+
let Some(tenant_id) = tenant_id(db_pool, host_chain_id).await? else {
316307
error!(
317-
chain_id = tenant_info.chain_id,
308+
host_chain_id,
318309
"No tenant found for chain id, stopping"
319310
);
320-
anyhow::bail!("No tenant found for chain id {}", tenant_info.chain_id);
311+
anyhow::bail!("No tenant found for chain id {}", host_chain_id);
321312
};
322313
let mut tx = db_pool.begin().await?;
323-
update_tenant_crs(&mut tx, &bytes, tenant_id, tenant_info.chain_id).await?;
314+
update_tenant_crs(&mut tx, &bytes, tenant_id, host_chain_id).await?;
324315
tx.commit().await?;
325316
Ok(())
326317
}

0 commit comments

Comments
 (0)