Skip to content

Commit 6405852

Browse files
committed
fix(coprocessor): wip
1 parent 9f94772 commit 6405852

File tree

6 files changed

+74
-33
lines changed

6 files changed

+74
-33
lines changed

coprocessor/fhevm-engine/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/gw-listener/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ tokio-util = { workspace = true }
2626
tracing = { workspace = true }
2727
tracing-subscriber = { workspace = true }
2828
tower-http = { workspace = true }
29+
url = "2.5.7"
2930

3031
fhevm-engine-common = { path = "../fhevm-engine-common" }
3132

coprocessor/fhevm-engine/gw-listener/src/aws_s3.rs

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::time::Duration;
33
use aws_config::{retry::RetryConfig, timeout::TimeoutConfig, BehaviorVersion};
44
use aws_sdk_s3::{config::Builder, Client};
55
use tokio_util::bytes;
6-
use tracing::{error, info};
6+
use tracing::{error, info, warn};
77

88
#[derive(Clone, Debug, Default)]
99
pub struct S3Policy {
@@ -26,7 +26,7 @@ impl S3Policy {
2626
};
2727
}
2828

29-
pub async fn create_s3_client(retry_policy: &S3Policy) -> aws_sdk_s3::Client {
29+
pub async fn create_s3_client(retry_policy: &S3Policy, url: &str) -> aws_sdk_s3::Client {
3030
let sdk_config = aws_config::load_defaults(BehaviorVersion::latest()).await;
3131
let timeout_config = TimeoutConfig::builder()
3232
.connect_timeout(retry_policy.connect_timeout)
@@ -40,43 +40,59 @@ pub async fn create_s3_client(retry_policy: &S3Policy) -> aws_sdk_s3::Client {
4040
let config = Builder::from(&sdk_config)
4141
.timeout_config(timeout_config)
4242
.retry_config(retry_config)
43+
.endpoint_url(url)
4344
.build();
4445

4546
Client::from_conf(config)
4647
}
4748

48-
pub async fn default_aws_s3_client() -> AwsS3Client {
49-
let s3_client = create_s3_client(&S3Policy::DEFAULT).await;
50-
AwsS3Client { s3_client }
51-
}
52-
53-
// Let's wrap Aws Client to have an interface for it so we can mock it.
49+
// Let's wrap Aws access to have an interface for it so we can mock it.
5450
#[derive(Clone)]
55-
pub struct AwsS3Client {
56-
pub s3_client: Client,
57-
}
51+
pub struct AwsS3Client {}
5852

5953
impl AwsS3Interface for AwsS3Client {
60-
async fn get_bucket_key(&self, bucket: &str, key: &str) -> anyhow::Result<bytes::Bytes> {
61-
let result = self
62-
.s3_client
54+
async fn get_bucket_key(&self, url: &str, bucket: &str, key: &str) -> anyhow::Result<bytes::Bytes> {
55+
// let parsed_url_and_bucket = url::Url::parse(_url_and_bucket)?;
56+
// let url = parsed_url_and_bucket.
57+
Ok(create_s3_client(&S3Policy::DEFAULT, url).await
6358
.get_object()
6459
.bucket(bucket)
6560
.key(key)
6661
.send()
67-
.await?;
68-
Ok(result.body.collect().await?.into_bytes())
62+
.await?
63+
.body.collect().await?
64+
.into_bytes()
65+
)
6966
}
7067
}
7168

7269
pub trait AwsS3Interface {
7370
fn get_bucket_key(
7471
&self,
72+
url: &str,
7573
bucket: &str,
7674
key: &str,
7775
) -> impl std::future::Future<Output = anyhow::Result<bytes::Bytes>>;
7876
}
7977

78+
fn split_url(s3_bucket_url: &String) -> anyhow::Result<(String, String)> {
79+
let parsed_url_and_bucket = url::Url::parse(s3_bucket_url)?;
80+
let bucket = parsed_url_and_bucket.path();
81+
let scheme = parsed_url_and_bucket.scheme();
82+
let host = parsed_url_and_bucket.host_str().ok_or_else(|| anyhow::anyhow!("No host in S3 url"))?;
83+
let host = if host.contains("minio:9000") {
84+
warn!(s3_bucket_url, "Using localhost for minio access");
85+
host.replace("minio:9000", "172.17.0.1:9000")
86+
} else {
87+
host.to_owned()
88+
};
89+
let url = format!(
90+
"{scheme}://{host}",
91+
);
92+
info!(s3_bucket_url, url, bucket, "Parsed S3 url");
93+
Ok((url.to_owned(), bucket.to_owned()))
94+
}
95+
8096
pub async fn download_key_from_s3<A: AwsS3Interface>(
8197
s3_client: &A,
8298
s3_bucket_urls: &[String],
@@ -92,7 +108,11 @@ pub async fn download_key_from_s3<A: AwsS3Interface>(
92108
key_path,
93109
s3_bucket_url, i_s3_bucket_url, nb_urls, url_index, "Try downloading"
94110
);
95-
let result = s3_client.get_bucket_key(s3_bucket_url, &key_path).await;
111+
let Ok((url, bucket)) = split_url(s3_bucket_url) else {
112+
error!(s3_bucket_url, "Failed to parse S3 url");
113+
continue;
114+
};
115+
let result = s3_client.get_bucket_key(&url, &bucket, &key_path).await;
96116
let Ok(result) = result else {
97117
error!(s3_bucket_url, key_path, result = ?result, "Downloading failed");
98118
continue;

coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use std::time::Duration;
33
use alloy::providers::{ProviderBuilder, WsConnect};
44
use alloy::{primitives::Address, transports::http::reqwest::Url};
55
use clap::Parser;
6-
use gw_listener::aws_s3::default_aws_s3_client;
7-
use gw_listener::gw_listener::GatewayListener;
6+
use gw_listener::aws_s3::AwsS3Client;
7+
use gw_listener::gw_listener::{GatewayListener};
88
use gw_listener::http_server::HttpServer;
99
use gw_listener::ConfigSettings;
1010
use humantime::parse_duration;
@@ -116,7 +116,7 @@ async fn main() -> anyhow::Result<()> {
116116
}
117117
};
118118

119-
let aws_s3_client = default_aws_s3_client().await;
119+
let aws_s3_client = AwsS3Client {};
120120

121121
let cancel_token = CancellationToken::new();
122122

coprocessor/fhevm-engine/gw-listener/src/gw_listener.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use alloy::{
44
eips::BlockNumberOrTag, network::Ethereum, primitives::Address, providers::Provider,
55
rpc::types::Log, sol,
66
};
7+
use aws_sdk_s3::Client;
78
use futures_util::{future::join_all, StreamExt};
89
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
910
use tokio_util::sync::CancellationToken;

coprocessor/fhevm-engine/gw-listener/tests/gw_listener_tests.rs

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,17 @@ use alloy::{
99
sol,
1010
};
1111

12-
use aws_sdk_s3::operation::get_object::GetObjectError;
13-
use gw_listener::gw_listener::GatewayListener;
14-
use gw_listener::ConfigSettings;
12+
use aws_sdk_s3::{operation::get_object::GetObjectError, Client};
1513
use gw_listener::{
16-
aws_s3::{default_aws_s3_client, AwsS3Client},
17-
gw_listener::{key_id_to_key_bucket, to_bucket_key_prefix},
14+
ConfigSettings,
15+
aws_s3::{AwsS3Client, AwsS3Interface},
16+
gw_listener::{key_id_to_key_bucket, to_bucket_key_prefix, GatewayListener},
1817
};
1918
use serial_test::serial;
2019
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
2120
use test_harness::instance::ImportMode;
2221
use tokio::time::sleep;
22+
use tokio_util::bytes;
2323
use tokio_util::sync::CancellationToken;
2424
use tracing::Level;
2525

@@ -105,7 +105,7 @@ async fn verify_proof_request_inserted_into_db() -> anyhow::Result<()> {
105105
.wallet(env.wallet)
106106
.connect_ws(WsConnect::new(env.anvil.ws_endpoint_url()))
107107
.await?;
108-
let aws_s3_client = default_aws_s3_client().await;
108+
let aws_s3_client = AwsS3Client {};
109109
let input_verification = InputVerification::deploy(&provider).await?;
110110
let kms_management = KMSManagement::deploy(&provider).await?;
111111
let gw_listener = GatewayListener::new(
@@ -213,6 +213,24 @@ async fn has_crs(db_pool: &Pool<Postgres>) -> anyhow::Result<bool> {
213213
Ok(false)
214214
}
215215

216+
#[derive(Clone)]
217+
pub struct AwsS3ClientMocked(Client);
218+
219+
impl AwsS3Interface for AwsS3ClientMocked {
220+
async fn get_bucket_key(&self, url: &str, bucket: &str, key: &str) -> anyhow::Result<bytes::Bytes> {
221+
Ok(self.0
222+
.get_object()
223+
.bucket(bucket)
224+
.key(key)
225+
.send()
226+
.await?
227+
.body.collect().await?
228+
.into_bytes()
229+
)
230+
}
231+
}
232+
233+
216234
// test bad bucket
217235
// test bad key
218236
#[tokio::test]
@@ -233,10 +251,10 @@ async fn keygen_ok() -> anyhow::Result<()> {
233251

234252
// see ../contracts/KMSManagement.sol
235253
let buckets = [
236-
"https://s3.amazonaws.com/test-bucket1/PUB-P1",
237-
"https://s3.amazonaws.com/test-bucket2/PUB-P2",
238-
"https://s3.amazonaws.com/test-bucket3/PUB-P3",
239-
"https://s3.amazonaws.com/test-bucket4/PUB-P4",
254+
"/test-bucket1/PUB-P1",
255+
"/test-bucket2/PUB-P2",
256+
"/test-bucket3/PUB-P3",
257+
"/test-bucket4/PUB-P4",
240258
];
241259

242260
let keys_digests = [KeyType::PublicKey, KeyType::ServerKey];
@@ -283,7 +301,7 @@ async fn keygen_ok() -> anyhow::Result<()> {
283301
.wallet(env.wallet)
284302
.connect_ws(WsConnect::new(env.anvil.ws_endpoint_url()))
285303
.await?;
286-
let aws_s3_client = AwsS3Client { s3_client: s3 };
304+
let aws_s3_client = AwsS3ClientMocked(s3);
287305
let input_verification = InputVerification::deploy(&provider).await?;
288306
let kms_management = KMSManagement::deploy(&provider).await?;
289307
let gw_listener = GatewayListener::new(
@@ -383,7 +401,7 @@ async fn keygen_compromised_key() -> anyhow::Result<()> {
383401
.wallet(env.wallet)
384402
.connect_ws(WsConnect::new(env.anvil.ws_endpoint_url()))
385403
.await?;
386-
let aws_s3_client = AwsS3Client { s3_client: s3 };
404+
let aws_s3_client = AwsS3ClientMocked(s3);
387405
let input_verification = InputVerification::deploy(&provider).await?;
388406
let kms_management = KMSManagement::deploy(&provider).await?;
389407
let gw_listener = GatewayListener::new(
@@ -489,7 +507,7 @@ async fn keygen_bad_key_or_bucket() -> anyhow::Result<()> {
489507
.wallet(env.wallet)
490508
.connect_ws(WsConnect::new(env.anvil.ws_endpoint_url()))
491509
.await?;
492-
let aws_s3_client = AwsS3Client { s3_client: s3 };
510+
let aws_s3_client = AwsS3ClientMocked(s3);
493511
let input_verification = InputVerification::deploy(&provider).await?;
494512
let kms_management = KMSManagement::deploy(&provider).await?;
495513
let gw_listener = GatewayListener::new(

0 commit comments

Comments
 (0)