Skip to content

Commit 057da24

Browse files
committed
AWS crate replacement enhancements
1 parent abe265b commit 057da24

File tree

6 files changed

+27
-34
lines changed

6 files changed

+27
-34
lines changed

.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
AWS_ACCESS_KEY_ID=test
22
AWS_SECRET_ACCESS_KEY=test
3+
AWS_REGION=us-west-2
34
S3_ENDPOINT=http://localhost:4566
45
KAFKA_BROKERS=127.0.0.1:9092
56
DATABASE_URL=postgres://star:password@localhost:5642

Cargo.lock

Lines changed: 10 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ calendar-duration = "1.0"
3030
time = { version = "0.3", features = ["parsing", "serde"] }
3131
tokio-util = "0.7"
3232
reqwest = { version = "0.11", features = ["json"] }
33-
aws-config = "1.6"
34-
aws-sdk-s3 = { version = "1.83", features = ["behavior-version-latest"] }
33+
aws-config = { version = "1.6", features = ["behavior-version-latest"] }
34+
aws-sdk-s3 = { version = "1.85", features = ["behavior-version-latest"] }
3535
prometheus-client = "0.22"
3636
sentry = "0.36"
3737
jemallocator = "0.5"

docker-compose.yml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ networks:
55

66
services:
77
localstack:
8-
image: localstack/localstack@sha256:41fe0bd366e68ad9c29d75b1ed2bc4a0888b0295fee5e4dff366ff475e57c4d8
8+
image: localstack/localstack@sha256:b1b9f264d6c63fcacc7009a0924fd444f20d2149e9874500683d6c37e980dd60
99
ports:
1010
- 4510-4559:4510-4559
1111
- 4566:4566
@@ -14,9 +14,8 @@ services:
1414
- DOCKER_HOST=unix:///var/run/docker.sock
1515
- LOCALSTACK_SERVICES=s3
1616
volumes:
17-
- "/tmp/localstack:/tmp/localstack"
1817
- "/var/run/docker.sock:/var/run/docker.sock"
19-
- "./misc/create_localstack_bucket.sh:/docker-entrypoint-initaws.d/create_bucket.sh"
18+
- "./misc/create_localstack_bucket.sh:/etc/localstack/init/ready.d/create_bucket.sh"
2019
networks:
2120
- star-agg
2221
db:

src/lake.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use aws_sdk_s3::{
2-
config::{http::HttpResponse, Region},
2+
config::http::HttpResponse,
33
error::SdkError,
44
operation::put_object::PutObjectError,
55
primitives::ByteStream,
@@ -27,19 +27,15 @@ pub struct DataLake {
2727

2828
impl DataLake {
2929
pub async fn new() -> Self {
30-
let region = Region::new("us-west-2");
31-
let provider = aws_config::default_provider::credentials::DefaultCredentialsChain::builder()
32-
.region(region.clone())
33-
.build()
34-
.await;
35-
let endpoint = env::var(S3_ENDPOINT_ENV_VAR).unwrap_or_default();
36-
let config = aws_sdk_s3::config::Builder::new()
37-
.region(region)
38-
.credentials_provider(provider)
39-
.endpoint_url(endpoint)
40-
.force_path_style(true)
41-
.build();
42-
let s3 = Client::from_conf(config);
30+
let endpoint = env::var(S3_ENDPOINT_ENV_VAR).ok();
31+
let aws_config = aws_config::from_env().load().await;
32+
let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
33+
if let Some(endpoint) = endpoint {
34+
s3_config_builder = s3_config_builder.endpoint_url(endpoint).force_path_style(true);
35+
}
36+
let s3_config = s3_config_builder.build();
37+
38+
let s3 = Client::from_conf(s3_config);
4339

4440
Self {
4541
s3,
@@ -62,7 +58,7 @@ impl DataLake {
6258
.put_object()
6359
.acl(aws_sdk_s3::types::ObjectCannedAcl::BucketOwnerFullControl)
6460
.body(ByteStream::from(contents))
65-
.bucket(self.bucket_name.clone())
61+
.bucket(&self.bucket_name)
6662
.key(full_key)
6763
.send()
6864
.await?;

src/lakesink.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use tokio_util::sync::CancellationToken;
1313

1414
const BATCH_SIZE_ENV_KEY: &str = "LAKE_SINK_BATCH_SIZE";
1515
const BATCH_SIZE_DEFAULT: &str = "1000";
16-
const BATCH_TIMEOUT_SECS: u64 = 45;
16+
const BATCH_TIMEOUT_SECS: u64 = 10;
1717

1818
#[derive(Error, From, Display, Debug)]
1919
#[display(fmt = "Lake sink error: {}")]

0 commit comments

Comments
 (0)