Skip to content

Commit dafe79e

Browse files
authored
refactor: Move duplicated S3 source and target structs into a single struct (#56)
1 parent 0b20750 commit dafe79e

8 files changed

Lines changed: 126 additions & 135 deletions

File tree

crates/data-generation/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@ pub mod dataset;
1919
pub mod generator;
2020
pub mod metrics;
2121
pub mod source;
22+
pub mod storage;
2223
pub mod target;

crates/data-generation/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use data_generation::dataset;
2525
use data_generation::dataset::tpch::TpchDataset;
2626
use data_generation::generator::DataGenerator;
2727
use data_generation::metrics::{IngestResult, Metrics};
28-
use data_generation::target::s3::S3Target;
28+
use data_generation::storage::s3::S3Storage;
2929

3030
fn print_summary(result: &IngestResult) {
3131
println!(" Duration: {:?}", result.elapsed);
@@ -69,7 +69,7 @@ fn build(args: &CommonArgs) -> anyhow::Result<DataGenerator> {
6969
other => anyhow::bail!("Unknown dataset type: {other}. Supported: tpch"),
7070
};
7171

72-
let target = Arc::new(S3Target::new(&target_config)?);
72+
let target = Arc::new(S3Storage::new(&target_config)?);
7373
let metrics = Metrics::new();
7474

7575
let ingestor = DataGenerator::new(

crates/data-generation/src/source/s3.rs

Lines changed: 4 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -14,62 +14,18 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
use std::sync::Arc;
18-
1917
use async_trait::async_trait;
2018
use futures::TryStreamExt;
21-
use object_store::ObjectStore;
22-
use object_store::aws::AmazonS3Builder;
23-
use object_store::path::Path as ObjectPath;
2419
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
2520

26-
use crate::config::TargetConfig;
21+
use crate::storage::s3::S3Storage;
2722

2823
use super::{ReadResult, Source};
2924

30-
/// Reads Parquet data from S3 (or S3-compatible storage).
31-
#[derive(Clone)]
32-
pub struct S3Source {
33-
store: Arc<dyn ObjectStore>,
34-
prefix: String,
35-
}
36-
37-
impl S3Source {
38-
/// Create a new [`S3Source`] from the same config used for [`S3Target`].
39-
///
40-
/// The source and target typically share the same bucket/prefix, so they
41-
/// reuse [`TargetConfig`] to avoid duplicating configuration structs.
42-
pub fn new(config: &TargetConfig) -> anyhow::Result<Self> {
43-
let mut builder = AmazonS3Builder::from_env().with_bucket_name(&config.bucket);
44-
45-
if let Some(region) = &config.region {
46-
builder = builder.with_region(region);
47-
}
48-
if let Some(endpoint) = &config.endpoint
49-
&& !endpoint.is_empty()
50-
{
51-
builder = builder.with_endpoint(endpoint);
52-
if endpoint.starts_with("http://") {
53-
builder = builder.with_allow_http(true);
54-
}
55-
}
56-
57-
let store = Arc::new(builder.build()?);
58-
Ok(Self {
59-
store,
60-
prefix: config.prefix.clone(),
61-
})
62-
}
63-
}
64-
6525
#[async_trait]
66-
impl Source for S3Source {
26+
impl Source for S3Storage {
6727
async fn list_batches(&self, table_name: &str) -> anyhow::Result<Vec<String>> {
68-
let prefix = if self.prefix.is_empty() {
69-
ObjectPath::from(format!("{table_name}/"))
70-
} else {
71-
ObjectPath::from(format!("{}/{table_name}/", self.prefix))
72-
};
28+
let prefix = self.table_object_prefix(table_name);
7329

7430
let objects: Vec<_> = self.store.list(Some(&prefix)).try_collect().await?;
7531

@@ -87,14 +43,7 @@ impl Source for S3Source {
8743
table_name: &str,
8844
batch_id: u64,
8945
) -> anyhow::Result<Option<ReadResult>> {
90-
let location = if self.prefix.is_empty() {
91-
ObjectPath::from(format!("{table_name}/batch-{batch_id:06}.parquet"))
92-
} else {
93-
ObjectPath::from(format!(
94-
"{}/{table_name}/batch-{batch_id:06}.parquet",
95-
self.prefix
96-
))
97-
};
46+
let location = self.batch_object_path(table_name, batch_id);
9847

9948
let get_result = match self.store.get(&location).await {
10049
Ok(r) => r,
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
Copyright 2024-2025 The Spice.ai OSS Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
pub mod s3;
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
Copyright 2024-2025 The Spice.ai OSS Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
use std::sync::Arc;
18+
19+
use object_store::ObjectStore;
20+
use object_store::aws::AmazonS3Builder;
21+
use object_store::path::Path as ObjectPath;
22+
23+
use crate::config::TargetConfig;
24+
25+
/// Unified S3 storage backend that implements both [`Source`] and [`Target`].
26+
///
27+
/// The trait implementations live in their respective modules
28+
/// (`source/s3.rs` and `target/s3.rs`) to keep concerns separated.
29+
#[derive(Clone)]
30+
pub struct S3Storage {
31+
pub(crate) store: Arc<dyn ObjectStore>,
32+
pub(crate) bucket: String,
33+
pub(crate) prefix: String,
34+
pub(crate) region: Option<String>,
35+
}
36+
37+
impl S3Storage {
38+
pub fn new(config: &TargetConfig) -> anyhow::Result<Self> {
39+
let mut builder = AmazonS3Builder::from_env().with_bucket_name(&config.bucket);
40+
41+
if let Some(region) = &config.region {
42+
tracing::info!("S3 storage with region: {region}");
43+
builder = builder.with_region(region);
44+
}
45+
if let Some(endpoint) = &config.endpoint
46+
&& !endpoint.is_empty()
47+
{
48+
builder = builder.with_endpoint(endpoint);
49+
if endpoint.starts_with("http://") {
50+
builder = builder.with_allow_http(true);
51+
}
52+
}
53+
54+
let store = Arc::new(builder.build()?);
55+
Ok(Self {
56+
store,
57+
bucket: config.bucket.clone(),
58+
prefix: config.prefix.clone(),
59+
region: config.region.clone(),
60+
})
61+
}
62+
63+
/// Returns the S3 URI for a given table name (e.g. `s3://bucket/prefix/customer/`).
64+
pub fn table_s3_path(&self, table_name: &str) -> String {
65+
if self.prefix.is_empty() {
66+
format!("s3://{}/{table_name}/", self.bucket)
67+
} else {
68+
format!("s3://{}/{}/{table_name}/", self.bucket, self.prefix)
69+
}
70+
}
71+
72+
/// Returns the [`ObjectPath`] for a batch file within a table directory.
73+
pub(crate) fn batch_object_path(&self, table_name: &str, batch_id: u64) -> ObjectPath {
74+
if self.prefix.is_empty() {
75+
ObjectPath::from(format!("{table_name}/batch-{batch_id:06}.parquet"))
76+
} else {
77+
ObjectPath::from(format!(
78+
"{}/{table_name}/batch-{batch_id:06}.parquet",
79+
self.prefix
80+
))
81+
}
82+
}
83+
84+
/// Returns the [`ObjectPath`] prefix for listing objects in a table directory.
85+
pub(crate) fn table_object_prefix(&self, table_name: &str) -> ObjectPath {
86+
if self.prefix.is_empty() {
87+
ObjectPath::from(format!("{table_name}/"))
88+
} else {
89+
ObjectPath::from(format!("{}/{table_name}/", self.prefix))
90+
}
91+
}
92+
}

crates/data-generation/src/target/s3.rs

Lines changed: 4 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -15,71 +15,20 @@ limitations under the License.
1515
*/
1616

1717
use std::collections::HashMap;
18-
use std::sync::Arc;
1918

2019
use arrow::array::RecordBatch;
2120
use async_trait::async_trait;
22-
use object_store::aws::AmazonS3Builder;
23-
use object_store::path::Path as ObjectPath;
24-
use object_store::{ObjectStore, PutPayload};
21+
use object_store::PutPayload;
2522
use parquet::arrow::ArrowWriter;
2623
use parquet::basic::Compression;
2724
use parquet::file::properties::WriterProperties;
2825

29-
use crate::config::TargetConfig;
26+
use crate::storage::s3::S3Storage;
3027

3128
use super::{Target, WriteResult};
3229

33-
#[derive(Clone)]
34-
pub struct S3Target {
35-
store: Arc<dyn ObjectStore>,
36-
bucket: String,
37-
prefix: String,
38-
table_format: String,
39-
executor_instance_type: String,
40-
region: Option<String>,
41-
}
42-
43-
impl S3Target {
44-
pub fn new(config: &TargetConfig) -> anyhow::Result<Self> {
45-
let mut builder = AmazonS3Builder::from_env().with_bucket_name(&config.bucket);
46-
47-
if let Some(region) = &config.region {
48-
tracing::info!("S3 Target with region: {region}");
49-
builder = builder.with_region(region);
50-
}
51-
if let Some(endpoint) = &config.endpoint
52-
&& !endpoint.is_empty()
53-
{
54-
builder = builder.with_endpoint(endpoint);
55-
if endpoint.starts_with("http://") {
56-
builder = builder.with_allow_http(true);
57-
}
58-
}
59-
60-
let store = Arc::new(builder.build()?);
61-
Ok(Self {
62-
store,
63-
bucket: config.bucket.clone(),
64-
prefix: config.prefix.clone(),
65-
table_format: config.table_format.to_string(),
66-
executor_instance_type: config.executor_instance_type.clone(),
67-
region: config.region.clone(),
68-
})
69-
}
70-
71-
/// Returns the S3 URI for a given table name (e.g. `s3://bucket/prefix/customer/`).
72-
pub fn table_s3_path(&self, table_name: &str) -> String {
73-
if self.prefix.is_empty() {
74-
format!("s3://{}/{table_name}/", self.bucket)
75-
} else {
76-
format!("s3://{}/{}/{table_name}/", self.bucket, self.prefix)
77-
}
78-
}
79-
}
80-
8130
#[async_trait]
82-
impl Target for S3Target {
31+
impl Target for S3Storage {
8332
fn expected_files(&self, table_name: &str, batch_ids: &[u64]) -> Vec<String> {
8433
batch_ids
8534
.iter()
@@ -110,14 +59,6 @@ impl Target for S3Target {
11059
"file_format".to_string(),
11160
serde_json::Value::String("parquet".to_string()),
11261
);
113-
params.insert(
114-
"table_format".to_string(),
115-
serde_json::Value::String(self.table_format.clone()),
116-
);
117-
params.insert(
118-
"executor_instance_type".to_string(),
119-
serde_json::Value::String(self.executor_instance_type.clone()),
120-
);
12162

12263
if let Some(region) = &self.region {
12364
params.insert(
@@ -151,14 +92,7 @@ impl Target for S3Target {
15192
let bytes_written = buf.len() as u64;
15293

15394
// Upload to S3 with per-table directory structure
154-
let path = if self.prefix.is_empty() {
155-
ObjectPath::from(format!("{table_name}/batch-{batch_id:06}.parquet"))
156-
} else {
157-
ObjectPath::from(format!(
158-
"{}/{table_name}/batch-{batch_id:06}.parquet",
159-
self.prefix
160-
))
161-
};
95+
let path = self.batch_object_path(table_name, batch_id);
16296

16397
self.store.put(&path, PutPayload::from(buf)).await?;
16498

crates/etl/src/main.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ use std::sync::Arc;
1818

1919
use clap::Parser;
2020
use data_generation::config::{DatasetConfig, TableFormat, TargetConfig};
21-
use data_generation::source::s3::S3Source;
22-
use data_generation::target::s3::S3Target;
21+
use data_generation::storage::s3::S3Storage;
2322
use etl::{DatasetSource, ETLPipeline, PipelineState, StopReason};
2423
use tracing_subscriber::EnvFilter;
2524

@@ -127,8 +126,8 @@ async fn main() -> anyhow::Result<()> {
127126
let dataset_source = cli.dataset_source()?;
128127
let dataset_config = cli.dataset_config();
129128

130-
let source = Arc::new(S3Source::new(&cli.source_config())?);
131-
let target = Arc::new(S3Target::new(&cli.target_config())?);
129+
let source = Arc::new(S3Storage::new(&cli.source_config())?);
130+
let target = Arc::new(S3Storage::new(&cli.target_config())?);
132131

133132
let mut pipeline = ETLPipeline::new(dataset_source, &dataset_config, source, target)?;
134133

src/main.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ use std::sync::Arc;
1919
use adbc_client::AdbcConnection;
2020
use clap::Parser;
2121
use data_generation::config::{DatasetConfig as GenerationDatasetConfig, TargetConfig};
22-
use data_generation::source::s3::S3Source;
23-
use data_generation::target::s3::S3Target;
22+
use data_generation::storage::s3::S3Storage;
2423
use etl::{DatasetSource, ETLPipeline, PipelineState, StopReason};
2524
use test_framework::{anyhow, rustls};
2625
use tracing::Level;
@@ -101,8 +100,8 @@ async fn main() -> anyhow::Result<()> {
101100
executor_instance_type: cli.common.executor_instance_type.clone(),
102101
};
103102

104-
let source = Arc::new(S3Source::new(&source_config)?);
105-
let target = Arc::new(S3Target::new(&target_config)?);
103+
let source = Arc::new(S3Storage::new(&source_config)?);
104+
let target = Arc::new(S3Storage::new(&target_config)?);
106105

107106
let mut pipeline = ETLPipeline::new(dataset_source, &generation_config, source, target)?;
108107

0 commit comments

Comments
 (0)