Skip to content

Commit 23eeb15

Browse files
committed
fix(coprocessor): wip
1 parent 9f94772 commit 23eeb15

File tree

6 files changed

+96
-37
lines changed

6 files changed

+96
-37
lines changed

coprocessor/fhevm-engine/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/gw-listener/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ tokio-util = { workspace = true }
2626
tracing = { workspace = true }
2727
tracing-subscriber = { workspace = true }
2828
tower-http = { workspace = true }
29+
url = "2.5.7"
2930

3031
fhevm-engine-common = { path = "../fhevm-engine-common" }
3132

coprocessor/fhevm-engine/gw-listener/src/aws_s3.rs

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::time::Duration;
33
use aws_config::{retry::RetryConfig, timeout::TimeoutConfig, BehaviorVersion};
44
use aws_sdk_s3::{config::Builder, Client};
55
use tokio_util::bytes;
6-
use tracing::{error, info};
6+
use tracing::{error, info, warn};
77

88
#[derive(Clone, Debug, Default)]
99
pub struct S3Policy {
@@ -26,7 +26,7 @@ impl S3Policy {
2626
};
2727
}
2828

29-
pub async fn create_s3_client(retry_policy: &S3Policy) -> aws_sdk_s3::Client {
29+
pub async fn create_s3_client(retry_policy: &S3Policy, url: &str) -> aws_sdk_s3::Client {
3030
let sdk_config = aws_config::load_defaults(BehaviorVersion::latest()).await;
3131
let timeout_config = TimeoutConfig::builder()
3232
.connect_timeout(retry_policy.connect_timeout)
@@ -40,43 +40,65 @@ pub async fn create_s3_client(retry_policy: &S3Policy) -> aws_sdk_s3::Client {
4040
let config = Builder::from(&sdk_config)
4141
.timeout_config(timeout_config)
4242
.retry_config(retry_config)
43+
.endpoint_url(url)
4344
.build();
4445

4546
Client::from_conf(config)
4647
}
4748

48-
pub async fn default_aws_s3_client() -> AwsS3Client {
49-
let s3_client = create_s3_client(&S3Policy::DEFAULT).await;
50-
AwsS3Client { s3_client }
51-
}
52-
53-
// Let's wrap Aws Client to have an interface for it so we can mock it.
49+
// Let's wrap Aws access to have an interface for it so we can mock it.
5450
#[derive(Clone)]
55-
pub struct AwsS3Client {
56-
pub s3_client: Client,
57-
}
51+
pub struct AwsS3Client {}
5852

5953
impl AwsS3Interface for AwsS3Client {
60-
async fn get_bucket_key(&self, bucket: &str, key: &str) -> anyhow::Result<bytes::Bytes> {
61-
let result = self
62-
.s3_client
54+
async fn get_bucket_key(
55+
&self,
56+
url: &str,
57+
bucket: &str,
58+
key: &str,
59+
) -> anyhow::Result<bytes::Bytes> {
60+
// let parsed_url_and_bucket = url::Url::parse(_url_and_bucket)?;
61+
// let url = parsed_url_and_bucket.
62+
Ok(create_s3_client(&S3Policy::DEFAULT, url)
63+
.await
6364
.get_object()
6465
.bucket(bucket)
6566
.key(key)
6667
.send()
67-
.await?;
68-
Ok(result.body.collect().await?.into_bytes())
68+
.await?
69+
.body
70+
.collect()
71+
.await?
72+
.into_bytes())
6973
}
7074
}
7175

7276
pub trait AwsS3Interface {
7377
fn get_bucket_key(
7478
&self,
79+
url: &str,
7580
bucket: &str,
7681
key: &str,
7782
) -> impl std::future::Future<Output = anyhow::Result<bytes::Bytes>>;
7883
}
7984

85+
fn split_url(s3_bucket_url: &String) -> anyhow::Result<(String, String)> {
86+
let parsed_url_and_bucket = url::Url::parse(s3_bucket_url)?;
87+
let bucket = parsed_url_and_bucket.path();
88+
let scheme = parsed_url_and_bucket.scheme();
89+
let host = s3_bucket_url.replace(bucket, "").trim_end_matches('/').to_owned();
90+
let host = if host.contains("minio:9000") {
91+
// TODO: replace by docker configuration
92+
warn!(s3_bucket_url, "Using localhost for minio access");
93+
host.replace("minio:9000", "172.17.0.1:9000")
94+
} else {
95+
host.to_owned()
96+
};
97+
let url = format!("{scheme}://{host}",);
98+
info!(s3_bucket_url, url, bucket, "Parsed S3 url");
99+
Ok((url.to_owned(), bucket.to_owned()))
100+
}
101+
80102
pub async fn download_key_from_s3<A: AwsS3Interface>(
81103
s3_client: &A,
82104
s3_bucket_urls: &[String],
@@ -92,7 +114,11 @@ pub async fn download_key_from_s3<A: AwsS3Interface>(
92114
key_path,
93115
s3_bucket_url, i_s3_bucket_url, nb_urls, url_index, "Try downloading"
94116
);
95-
let result = s3_client.get_bucket_key(s3_bucket_url, &key_path).await;
117+
let Ok((url, bucket)) = split_url(s3_bucket_url) else {
118+
error!(s3_bucket_url, "Failed to parse S3 url");
119+
continue;
120+
};
121+
let result = s3_client.get_bucket_key(&url, &bucket, &key_path).await;
96122
let Ok(result) = result else {
97123
error!(s3_bucket_url, key_path, result = ?result, "Downloading failed");
98124
continue;

coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::time::Duration;
33
use alloy::providers::{ProviderBuilder, WsConnect};
44
use alloy::{primitives::Address, transports::http::reqwest::Url};
55
use clap::Parser;
6-
use gw_listener::aws_s3::default_aws_s3_client;
6+
use gw_listener::aws_s3::AwsS3Client;
77
use gw_listener::gw_listener::GatewayListener;
88
use gw_listener::http_server::HttpServer;
99
use gw_listener::ConfigSettings;
@@ -116,7 +116,7 @@ async fn main() -> anyhow::Result<()> {
116116
}
117117
};
118118

119-
let aws_s3_client = default_aws_s3_client().await;
119+
let aws_s3_client = AwsS3Client {};
120120

121121
let cancel_token = CancellationToken::new();
122122

coprocessor/fhevm-engine/gw-listener/tests/gw_listener_tests.rs

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,17 @@ use alloy::{
99
sol,
1010
};
1111

12-
use aws_sdk_s3::operation::get_object::GetObjectError;
13-
use gw_listener::gw_listener::GatewayListener;
14-
use gw_listener::ConfigSettings;
12+
use aws_sdk_s3::{operation::get_object::GetObjectError, Client};
1513
use gw_listener::{
16-
aws_s3::{default_aws_s3_client, AwsS3Client},
17-
gw_listener::{key_id_to_key_bucket, to_bucket_key_prefix},
14+
aws_s3::{AwsS3Client, AwsS3Interface},
15+
gw_listener::{key_id_to_key_bucket, to_bucket_key_prefix, GatewayListener},
16+
ConfigSettings,
1817
};
1918
use serial_test::serial;
2019
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
2120
use test_harness::instance::ImportMode;
2221
use tokio::time::sleep;
22+
use tokio_util::bytes;
2323
use tokio_util::sync::CancellationToken;
2424
use tracing::Level;
2525

@@ -105,7 +105,7 @@ async fn verify_proof_request_inserted_into_db() -> anyhow::Result<()> {
105105
.wallet(env.wallet)
106106
.connect_ws(WsConnect::new(env.anvil.ws_endpoint_url()))
107107
.await?;
108-
let aws_s3_client = default_aws_s3_client().await;
108+
let aws_s3_client = AwsS3Client {};
109109
let input_verification = InputVerification::deploy(&provider).await?;
110110
let kms_management = KMSManagement::deploy(&provider).await?;
111111
let gw_listener = GatewayListener::new(
@@ -213,6 +213,30 @@ async fn has_crs(db_pool: &Pool<Postgres>) -> anyhow::Result<bool> {
213213
Ok(false)
214214
}
215215

216+
#[derive(Clone)]
217+
pub struct AwsS3ClientMocked(Client);
218+
219+
impl AwsS3Interface for AwsS3ClientMocked {
220+
async fn get_bucket_key(
221+
&self,
222+
url: &str,
223+
bucket: &str,
224+
key: &str,
225+
) -> anyhow::Result<bytes::Bytes> {
226+
Ok(self
227+
.0
228+
.get_object()
229+
.bucket(bucket)
230+
.key(key)
231+
.send()
232+
.await?
233+
.body
234+
.collect()
235+
.await?
236+
.into_bytes())
237+
}
238+
}
239+
216240
// test bad bucket
217241
// test bad key
218242
#[tokio::test]
@@ -233,10 +257,10 @@ async fn keygen_ok() -> anyhow::Result<()> {
233257

234258
// see ../contracts/KMSManagement.sol
235259
let buckets = [
236-
"https://s3.amazonaws.com/test-bucket1/PUB-P1",
237-
"https://s3.amazonaws.com/test-bucket2/PUB-P2",
238-
"https://s3.amazonaws.com/test-bucket3/PUB-P3",
239-
"https://s3.amazonaws.com/test-bucket4/PUB-P4",
260+
"/test-bucket1/PUB-P1",
261+
"/test-bucket2/PUB-P2",
262+
"/test-bucket3/PUB-P3",
263+
"/test-bucket4/PUB-P4",
240264
];
241265

242266
let keys_digests = [KeyType::PublicKey, KeyType::ServerKey];
@@ -283,7 +307,7 @@ async fn keygen_ok() -> anyhow::Result<()> {
283307
.wallet(env.wallet)
284308
.connect_ws(WsConnect::new(env.anvil.ws_endpoint_url()))
285309
.await?;
286-
let aws_s3_client = AwsS3Client { s3_client: s3 };
310+
let aws_s3_client = AwsS3ClientMocked(s3);
287311
let input_verification = InputVerification::deploy(&provider).await?;
288312
let kms_management = KMSManagement::deploy(&provider).await?;
289313
let gw_listener = GatewayListener::new(
@@ -383,7 +407,7 @@ async fn keygen_compromised_key() -> anyhow::Result<()> {
383407
.wallet(env.wallet)
384408
.connect_ws(WsConnect::new(env.anvil.ws_endpoint_url()))
385409
.await?;
386-
let aws_s3_client = AwsS3Client { s3_client: s3 };
410+
let aws_s3_client = AwsS3ClientMocked(s3);
387411
let input_verification = InputVerification::deploy(&provider).await?;
388412
let kms_management = KMSManagement::deploy(&provider).await?;
389413
let gw_listener = GatewayListener::new(
@@ -489,7 +513,7 @@ async fn keygen_bad_key_or_bucket() -> anyhow::Result<()> {
489513
.wallet(env.wallet)
490514
.connect_ws(WsConnect::new(env.anvil.ws_endpoint_url()))
491515
.await?;
492-
let aws_s3_client = AwsS3Client { s3_client: s3 };
516+
let aws_s3_client = AwsS3ClientMocked(s3);
493517
let input_verification = InputVerification::deploy(&provider).await?;
494518
let kms_management = KMSManagement::deploy(&provider).await?;
495519
let gw_listener = GatewayListener::new(

coprocessor/fhevm-engine/sns-worker/src/executor.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,10 @@ impl SwitchNSquashService {
154154
}
155155
}
156156

157-
async fn get_keyset(pool: PgPool,
157+
async fn get_keyset(
158+
pool: PgPool,
158159
keys_cache: Arc<RwLock<lru::LruCache<String, KeySet>>>,
159-
tenant_api_key: &String
160+
tenant_api_key: &String,
160161
) -> Result<Option<KeySet>, ExecutionError> {
161162
let t = telemetry::tracer("worker_loop_init");
162163
let s = t.child_span("fetch_keyset");
@@ -191,7 +192,10 @@ pub(crate) async fn run_loop(
191192
Some(keys)
192193
}
193194
None => {
194-
warn!(tenant_api_key = tenant_api_key, "No keys found for the given tenant_api_key");
195+
warn!(
196+
tenant_api_key = tenant_api_key,
197+
"No keys found for the given tenant_api_key"
198+
);
195199
None
196200
}
197201
};
@@ -205,7 +209,10 @@ pub(crate) async fn run_loop(
205209
update_last_active(last_active_at.clone()).await;
206210

207211
let Some(keys) = keys.as_ref() else {
208-
warn!(tenant_api_key = tenant_api_key, "No keys available, retrying in 5 seconds");
212+
warn!(
213+
tenant_api_key = tenant_api_key,
214+
"No keys available, retrying in 5 seconds"
215+
);
209216
tokio::time::sleep(Duration::from_secs(5)).await;
210217
if token.is_cancelled() {
211218
return Ok(());
@@ -214,7 +221,7 @@ pub(crate) async fn run_loop(
214221
continue;
215222
};
216223

217-
let maybe_remaining = fetch_and_execute_sns_tasks(&pool, &tx, &keys, &conf, &token).await?;
224+
let maybe_remaining = fetch_and_execute_sns_tasks(&pool, &tx, keys, &conf, &token).await?;
218225
if maybe_remaining {
219226
if token.is_cancelled() {
220227
return Ok(());

0 commit comments

Comments
 (0)