Skip to content

Commit 73e3054

Browse files
authored
fix: Cleanup dead code (#185)
* fix: Cleanup dead code
* fix: Delete more dead code
* chore: Clippy
1 parent b26c7fc commit 73e3054

25 files changed

Lines changed: 151 additions & 1213 deletions

File tree

.github/workflows/pr.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,6 @@ jobs:
139139
--scale-factor 0.001 \
140140
--bucket spiceai-public-datasets \
141141
--prefix pr-validation \
142-
--max-concurrency 4 \
143142
--region us-east-1
144143
145144
- name: Install ADBC driver

crates/adbc_client/src/lib.rs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -306,25 +306,22 @@ fn is_opaque_numeric(field: &Field) -> bool {
306306
let metadata = field.metadata();
307307

308308
// PostgreSQL: arrow.opaque extension type for numeric
309-
if let Some(ext_name) = metadata.get("ARROW:extension:name") {
310-
if ext_name == "arrow.opaque" {
311-
if let Some(ext_meta) = metadata.get("ARROW:extension:metadata") {
312-
if serde_json::from_str::<serde_json::Value>(ext_meta)
313-
.ok()
314-
.and_then(|v| v.get("type_name")?.as_str().map(|s| s == "numeric"))
315-
.unwrap_or(false)
316-
{
317-
return true;
318-
}
319-
}
320-
}
309+
if let Some(ext_name) = metadata.get("ARROW:extension:name")
310+
&& ext_name == "arrow.opaque"
311+
&& let Some(ext_meta) = metadata.get("ARROW:extension:metadata")
312+
&& serde_json::from_str::<serde_json::Value>(ext_meta)
313+
.ok()
314+
.and_then(|v| v.get("type_name")?.as_str().map(|s| s == "numeric"))
315+
.unwrap_or(false)
316+
{
317+
return true;
321318
}
322319

323320
// Databricks Spark: decimal type serialised as Utf8 with Spark metadata
324-
if let Some(sql_name) = metadata.get("Spark:DataType:SqlName") {
325-
if sql_name.starts_with("DECIMAL(") {
326-
return true;
327-
}
321+
if let Some(sql_name) = metadata.get("Spark:DataType:SqlName")
322+
&& sql_name.starts_with("DECIMAL(")
323+
{
324+
return true;
328325
}
329326

330327
false

crates/checkpointer/src/lib.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,6 @@ pub struct ScenarioCheckpoint {
7272
/// S3‑backed store for uploading and downloading checkpoint artefacts.
7373
pub struct CheckpointStore {
7474
store: Arc<dyn ObjectStore>,
75-
#[allow(dead_code)]
76-
bucket: String,
7775
prefix: String,
7876
}
7977

@@ -105,7 +103,6 @@ impl CheckpointStore {
105103
let store = Arc::new(builder.build()?);
106104
Ok(Self {
107105
store,
108-
bucket: bucket.to_owned(),
109106
prefix: prefix.to_owned(),
110107
})
111108
}

crates/data-generation/src/config.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,6 @@ pub struct CommonArgs {
6767
/// S3 endpoint URL (for MinIO/LocalStack)
6868
#[arg(long)]
6969
pub endpoint: Option<String>,
70-
71-
/// Maximum number of concurrent S3 writes (legacy, unused with file storage)
72-
#[arg(long, default_value_t = 16)]
73-
pub max_concurrency: usize,
7470
}
7571

7672
pub struct DatasetConfig {
@@ -92,10 +88,6 @@ pub struct TargetConfig {
9288
pub partition_columns: Vec<String>,
9389
}
9490

95-
pub struct IngestorConfig {
96-
pub max_concurrency: usize,
97-
}
98-
9991
impl CommonArgs {
10092
pub fn dataset_config(&self) -> DatasetConfig {
10193
DatasetConfig {
@@ -133,12 +125,6 @@ impl CommonArgs {
133125
partition_columns: vec![],
134126
})
135127
}
136-
137-
pub fn ingestor_config(&self) -> IngestorConfig {
138-
IngestorConfig {
139-
max_concurrency: self.max_concurrency,
140-
}
141-
}
142128
}
143129

144130
/// Builds the versioned storage prefix: `{prefix}/{scenario}/{version}`.

crates/data-generation/src/generator.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use std::time::Instant;
2020

2121
use tokio::task::JoinSet;
2222

23-
use super::config::IngestorConfig;
2423
use super::dataset::Dataset;
2524
use super::metrics::{IngestResult, Metrics};
2625
use super::storage::DataStorage;
@@ -51,7 +50,6 @@ impl DataGenerator {
5150
pub fn new(
5251
dataset: Arc<dyn Dataset>,
5352
target: Arc<dyn DataStorage>,
54-
_config: &IngestorConfig,
5553
metrics: Metrics,
5654
version_config: VersionConfig,
5755
) -> Self {
@@ -88,12 +86,13 @@ impl DataGenerator {
8886

8987
// For each table, spawn a single task that generates and writes inline.
9088
let mut join_set = JoinSet::new();
91-
for table_name in self.dataset.tables().keys().cloned() {
89+
for table_name in self.dataset.tables().keys() {
9290
let dataset = Arc::clone(&self.dataset);
9391
let target = self.target.clone();
9492
let metrics = self.metrics.clone();
9593
let written_ids = Arc::clone(&written_batches);
9694

95+
let table_name = table_name.clone();
9796
join_set.spawn(async move {
9897
let mut batch_id: u64 = 0;
9998
loop {
@@ -453,7 +452,6 @@ mod tests {
453452
let generator = DataGenerator::new(
454453
dataset,
455454
target,
456-
&IngestorConfig { max_concurrency: 4 },
457455
Metrics::new(),
458456
VersionConfig {
459457
scenario: "test".to_string(),

crates/data-generation/src/main.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,11 @@ fn print_summary(result: &IngestResult) {
5555

5656
fn build(args: &CommonArgs, file_storage: Arc<FileStorage>) -> anyhow::Result<DataGenerator> {
5757
let dataset_config = args.dataset_config();
58-
let ingestor_config = args.ingestor_config();
5958
let version = format_scale_factor(args.scale_factor);
6059

6160
tracing::info!(
6261
dataset_type = dataset_config.dataset_type,
6362
num_steps = dataset_config.num_steps,
64-
max_concurrency = ingestor_config.max_concurrency,
6563
version = %version,
6664
scenario = %args.scenario,
6765
scale_factor = args.scale_factor,
@@ -88,7 +86,6 @@ fn build(args: &CommonArgs, file_storage: Arc<FileStorage>) -> anyhow::Result<Da
8886
let ingestor = DataGenerator::new(
8987
dataset,
9088
file_storage as Arc<dyn DataStorage>,
91-
&ingestor_config,
9289
metrics,
9390
version_config,
9491
);

crates/data-generation/src/storage/file.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ impl DataStorage for FileStorage {
103103
for entry in std::fs::read_dir(&dir)? {
104104
let entry = entry?;
105105
let path = entry.path();
106-
if path.extension().map_or(false, |ext| ext == "parquet") {
106+
if path.extension().is_some_and(|ext| ext == "parquet") {
107107
paths.push(path.display().to_string());
108108
}
109109
}

crates/etl/README.md

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22

33
`etl` reads a generated archive, rehydrates records, and writes to either:
44

5-
- S3 as hive-partitioned Parquet (default)
6-
- an ADBC target via bulk ingest
5+
- an ADBC target via bulk ingest (default)
76
- a null sink that discards writes for throughput benchmarking
87

98
Dataset configuration is read from the extracted `version.json` metadata written by `data-generation`.
@@ -17,25 +16,6 @@ Provide one of these source modes:
1716

1817
The version path is derived automatically from `--scale-factor`, so `--scale-factor 1` reads from the `1.0` version path.
1918

20-
## S3 Hive Sink (default)
21-
22-
Use `--sink s3-hive` to write hive-partitioned Parquet to S3.
23-
24-
- `--target-prefix`: Base S3 key prefix for ETL output. Defaults to the source prefix when empty.
25-
- `--partition-by`: Comma-separated partition columns. Defaults to `__created_at`.
26-
27-
### Example
28-
29-
```bash
30-
cargo run -p etl -- \
31-
--scenario tpch \
32-
--scale-factor 1 \
33-
--bucket peasee-indexes \
34-
--prefix raw \
35-
--target-prefix rehydrated \
36-
--partition-by __created_at
37-
```
38-
3919
## ADBC Sink
4020

4121
Use `--sink adbc` to write via ADBC bulk ingest.

crates/etl/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1518,6 +1518,7 @@ impl ETLPipeline {
15181518
/// it is consumed. If `step_limit` is `Some(n)`, at most `n` logical steps are
15191519
/// consumed before the function returns [`PipelineState::Paused`]. Unconsumed
15201520
/// steps remain in the shared work state for a subsequent call.
1521+
#[allow(clippy::too_many_arguments)]
15211522
async fn run_pipeline(
15221523
data_storage: Arc<dyn DataStorage>,
15231524
data_sink: Arc<dyn Sink>,

crates/etl/src/main.rs

Lines changed: 2 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use data_generation::storage::s3::S3Storage;
2525
use etl::sink::Sink;
2626
use etl::sink::adbc::AdbcSink;
2727
use etl::sink::null::NullSink;
28-
use etl::sink::s3_hive::S3HiveSink;
2928
use etl::{DatasetSource, ETLPipeline, PipelineState, StopReason};
3029
use tracing_subscriber::EnvFilter;
3130

@@ -35,8 +34,6 @@ const DEFAULT_FLIGHTSQL_MAX_MSG_SIZE_BYTES: &str = "78643200";
3534
#[derive(Clone, Debug, Default, ValueEnum)]
3635
enum SinkType {
3736
#[default]
38-
#[value(name = "s3-hive")]
39-
S3Hive,
4037
#[value(name = "adbc")]
4138
Adbc,
4239
#[value(name = "null")]
@@ -45,7 +42,7 @@ enum SinkType {
4542

4643
#[derive(Parser)]
4744
#[command(
48-
about = "Run an ETL pipeline that reads from a data archive, rehydrates data, and writes to S3 Hive, ADBC, or a null sink"
45+
about = "Run an ETL pipeline that reads from a data archive, rehydrates data, and writes to ADBC or a null sink"
4946
)]
5047
struct Cli {
5148
/// Scenario name (e.g. "tpch") — used in the storage path `{prefix}/{scenario}/{version}/`
@@ -82,23 +79,11 @@ struct Cli {
8279
#[arg(long)]
8380
endpoint: Option<String>,
8481

85-
/// Base S3 key prefix for the ETL target (hive-partitioned output).
86-
/// Defaults to the source prefix if not specified.
87-
#[arg(long, default_value = "")]
88-
target_prefix: String,
89-
90-
/// Ordered list of columns used for hive-style partitioning.
91-
///
92-
/// Example: `--partition-by __created_at,product_type`
93-
#[arg(long, value_delimiter = ',', default_value = "__created_at")]
94-
partition_by: Vec<String>,
95-
9682
/// ETL sink target.
9783
///
98-
/// - s3-hive: write hive-partitioned parquet to S3
9984
/// - adbc: write via ADBC bulk ingest
10085
/// - null: discard all writes (throughput benchmark mode)
101-
#[arg(long, value_enum, default_value_t = SinkType::S3Hive)]
86+
#[arg(long, value_enum, default_value_t = SinkType::Adbc)]
10287
sink: SinkType,
10388

10489
/// ADBC driver name (for example: "databricks" or "flightsql").
@@ -271,55 +256,6 @@ async fn main() -> anyhow::Result<()> {
271256
Some(adbc_sink),
272257
)
273258
}
274-
SinkType::S3Hive => {
275-
if cli.adbc_driver.is_some()
276-
|| cli.adbc_uri.is_some()
277-
|| !cli.adbc_options.is_empty()
278-
|| cli.adbc_catalog.is_some()
279-
|| cli.adbc_schema.is_some()
280-
|| cli.adbc_create_tables
281-
{
282-
anyhow::bail!(
283-
"ADBC options are only valid with --sink adbc. Remove ADBC flags or set --sink adbc."
284-
);
285-
}
286-
287-
let hive_prefix = if cli.target_prefix.is_empty() {
288-
format!(
289-
"{}/{}/{}",
290-
cli.prefix.trim_matches('/'),
291-
cli.scenario,
292-
version
293-
)
294-
} else {
295-
format!(
296-
"{}/{}/{}",
297-
cli.target_prefix.trim_matches('/'),
298-
cli.scenario,
299-
version
300-
)
301-
};
302-
303-
let bucket = cli
304-
.bucket
305-
.as_ref()
306-
.ok_or_else(|| anyhow::anyhow!("--bucket is required for --sink s3-hive"))?;
307-
308-
let hive_config = TargetConfig {
309-
bucket: bucket.clone(),
310-
prefix: hive_prefix,
311-
region: cli.region.clone(),
312-
endpoint: cli.endpoint.clone(),
313-
partition_columns: cli.partition_by.clone(),
314-
};
315-
316-
(
317-
Arc::new(S3HiveSink::new(&hive_config)?),
318-
Some(hive_config),
319-
"s3-hive".to_string(),
320-
None,
321-
)
322-
}
323259
SinkType::Null => {
324260
if cli.adbc_driver.is_some()
325261
|| cli.adbc_uri.is_some()
@@ -373,8 +309,6 @@ async fn main() -> anyhow::Result<()> {
373309
adbc_catalog = ?cli.adbc_catalog,
374310
adbc_schema = ?cli.adbc_schema,
375311
adbc_create_tables = cli.adbc_create_tables,
376-
target_prefix = %cli.target_prefix,
377-
partition_by = ?cli.partition_by,
378312
scale_factor = version_metadata.scale_factor,
379313
num_steps = version_metadata.num_steps,
380314
"Starting ETL pipeline"

0 commit comments

Comments
 (0)