Skip to content

Commit 9d82083

Browse files
authored
always teardown after a setup (#86)
Entire-Checkpoint: bf89cf7ab0b7
1 parent f302744 commit 9d82083

1 file changed

Lines changed: 120 additions & 112 deletions

File tree

src/main.rs

Lines changed: 120 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ mod commands;
3636
mod metrics;
3737
mod scenario;
3838

39+
use crate::args::CommonArgs;
3940
use crate::commands::connect_system_adapter;
4041
use crate::scenario::Scenario;
4142

@@ -76,87 +77,26 @@ pub enum SystemAdapterExecutionMode {
7677
DirectQuery,
7778
}
7879

79-
#[tokio::main]
80-
async fn main() -> anyhow::Result<()> {
81-
let _ = rustls::crypto::CryptoProvider::install_default(
82-
rustls::crypto::aws_lc_rs::default_provider(),
83-
);
84-
85-
tracing_subscriber::fmt()
86-
.with_max_level(Level::INFO)
87-
.with_env_filter(EnvFilter::from_default_env())
88-
.init();
89-
90-
let cli = Cli::parse();
91-
92-
// --- Construct the ETL pipeline ---
93-
let dataset_source = match &cli.common.scenario {
94-
Scenario::TPCH => DatasetSource::Tpch,
95-
};
96-
97-
let generation_config = GenerationDatasetConfig {
98-
dataset_type: match &dataset_source {
99-
DatasetSource::Tpch => "tpch".to_string(),
100-
DatasetSource::SimpleSequence => "simple_sequence".to_string(),
101-
},
102-
scale_factor: cli.common.scale_factor,
103-
num_steps: cli.common.etl_num_steps,
104-
};
105-
106-
let source_config = TargetConfig {
107-
bucket: cli.common.etl_bucket.clone(),
108-
prefix: cli.common.etl_source_prefix.clone(),
109-
region: cli.common.etl_region.clone(),
110-
endpoint: cli.common.etl_endpoint.clone(),
111-
};
112-
113-
let source = Arc::new(S3Storage::new(&source_config)?);
114-
115-
// --- Connect to the system adapter ---
116-
let mut system_adapter_client = match connect_system_adapter(&cli.common).await {
117-
Ok(system_adapter_client) => system_adapter_client,
118-
Err(e) => {
119-
return Err(anyhow::anyhow!("Failed to connect to system adapter: {e}"));
120-
}
121-
};
122-
123-
let run_id = uuid::Uuid::new_v4();
124-
let mutations = MutationConfig::new(0.1, 0.1);
125-
126-
let setup_dataset = dataset_source.create(
127-
&generation_config,
128-
&mutations,
129-
Arc::clone(&source) as Arc<dyn DataStorage>,
130-
)?;
131-
let datasets = create_tables_request_datasets(&setup_dataset);
132-
133-
let setup_metadata = std::collections::HashMap::from([
134-
(
135-
"executor_instance_type".to_string(),
136-
serde_json::Value::String(cli.common.executor_instance_type.clone()),
137-
),
138-
(
139-
"table_format".to_string(),
140-
serde_json::Value::String(cli.common.table_format.to_string()),
141-
),
142-
]);
143-
144-
let adbc_driver = match system_adapter_client.setup(run_id, setup_metadata).await {
145-
Ok(response) => response,
146-
Err(e) => {
147-
return Err(anyhow::anyhow!("Failed to setup system adapter: {e}"));
148-
}
149-
};
150-
80+
async fn run_benchmark(
81+
common: &CommonArgs,
82+
system_adapter_client: &mut system_adapter_protocol::Client,
83+
run_id: uuid::Uuid,
84+
adbc_driver: system_adapter_protocol::SetupResponse,
85+
dataset_source: DatasetSource,
86+
generation_config: &GenerationDatasetConfig,
87+
mutations: &MutationConfig,
88+
source: Arc<S3Storage>,
89+
datasets: HashMap<String, system_adapter_protocol::DatasetConfig>,
90+
) -> anyhow::Result<()> {
15191
// --- Download checkpoints from S3 ---
152-
let scenario_name = cli.common.scenario.to_string();
92+
let scenario_name = common.scenario.to_string();
15393
let checkpoint_dir = tempfile::tempdir()?;
15494

15595
let checkpoint_store = CheckpointStore::new(
156-
&cli.common.etl_bucket,
157-
&cli.common.etl_source_prefix,
158-
cli.common.etl_region.as_deref(),
159-
cli.common.etl_endpoint.as_deref(),
96+
&common.etl_bucket,
97+
&common.etl_source_prefix,
98+
common.etl_region.as_deref(),
99+
common.etl_endpoint.as_deref(),
160100
)?;
161101

162102
let manifest = checkpoint_store.download_manifest().await.map_err(|e| {
@@ -199,38 +139,17 @@ async fn main() -> anyhow::Result<()> {
199139
let sink_kwargs = adbc_driver.db_kwargs.clone();
200140
let load_kwargs = adbc_driver.db_kwargs;
201141

202-
let adbc_conn: Option<AdbcConnection> = match AdbcConnection::create(&driver_name, sink_kwargs)
203-
{
204-
Ok(conn) => {
205-
println!(
206-
"ADBC connection established (driver: {})",
207-
adbc_driver.driver
208-
);
209-
Some(conn)
210-
}
211-
Err(e) => {
212-
eprintln!(
213-
"Failed to create ADBC connection for driver {}: {e}",
214-
adbc_driver.driver
215-
);
216-
None
217-
}
218-
};
219-
220-
let Some(adbc_conn) = adbc_conn else {
221-
return Err(anyhow::anyhow!(
222-
"ADBC connection is required to run benchmarks"
223-
));
224-
};
142+
let adbc_conn = AdbcConnection::create(&driver_name, sink_kwargs).map_err(|e| {
143+
anyhow::anyhow!(
144+
"Failed to create ADBC connection for driver {}: {e}",
145+
driver_name
146+
)
147+
})?;
148+
println!("ADBC connection established (driver: {})", driver_name);
225149

226150
let target = Arc::new(AdbcSink::new_without_table_creation(adbc_conn, None));
227-
let mut pipeline = ETLPipeline::new(
228-
dataset_source,
229-
&generation_config,
230-
source,
231-
target,
232-
&mutations,
233-
)?;
151+
let mut pipeline =
152+
ETLPipeline::new(dataset_source, generation_config, source, target, mutations)?;
234153

235154
if let Err(e) = system_adapter_client.create_tables(run_id, datasets).await {
236155
pipeline.cancel();
@@ -250,14 +169,14 @@ async fn main() -> anyhow::Result<()> {
250169
pipeline.cancel();
251170
return Err(anyhow::anyhow!(
252171
"Failed to create benchmark ADBC connection for driver {}: {e}",
253-
adbc_driver.driver
172+
driver_name
254173
));
255174
}
256175
};
257176

258177
commands::load::run(
259-
&cli.common.scenario,
260-
&cli.common,
178+
&common.scenario,
179+
common,
261180
load_conn,
262181
&mut pipeline,
263182
checkpoint_steps,
@@ -285,9 +204,98 @@ async fn main() -> anyhow::Result<()> {
285204
}
286205
}
287206

207+
Ok(())
208+
}
209+
210+
#[tokio::main]
211+
async fn main() -> anyhow::Result<()> {
212+
let _ = rustls::crypto::CryptoProvider::install_default(
213+
rustls::crypto::aws_lc_rs::default_provider(),
214+
);
215+
216+
tracing_subscriber::fmt()
217+
.with_max_level(Level::INFO)
218+
.with_env_filter(EnvFilter::from_default_env())
219+
.init();
220+
221+
let cli = Cli::parse();
222+
223+
// --- Construct the ETL pipeline ---
224+
let dataset_source = match &cli.common.scenario {
225+
Scenario::TPCH => DatasetSource::Tpch,
226+
};
227+
228+
let generation_config = GenerationDatasetConfig {
229+
dataset_type: match &dataset_source {
230+
DatasetSource::Tpch => "tpch".to_string(),
231+
DatasetSource::SimpleSequence => "simple_sequence".to_string(),
232+
},
233+
scale_factor: cli.common.scale_factor,
234+
num_steps: cli.common.etl_num_steps,
235+
};
236+
237+
let source_config = TargetConfig {
238+
bucket: cli.common.etl_bucket.clone(),
239+
prefix: cli.common.etl_source_prefix.clone(),
240+
region: cli.common.etl_region.clone(),
241+
endpoint: cli.common.etl_endpoint.clone(),
242+
};
243+
244+
let source = Arc::new(S3Storage::new(&source_config)?);
245+
246+
// --- Connect to the system adapter ---
247+
let mut system_adapter_client = match connect_system_adapter(&cli.common).await {
248+
Ok(system_adapter_client) => system_adapter_client,
249+
Err(e) => {
250+
return Err(anyhow::anyhow!("Failed to connect to system adapter: {e}"));
251+
}
252+
};
253+
254+
let run_id = uuid::Uuid::new_v4();
255+
let mutations = MutationConfig::new(0.1, 0.1);
256+
257+
let setup_dataset = dataset_source.create(
258+
&generation_config,
259+
&mutations,
260+
Arc::clone(&source) as Arc<dyn DataStorage>,
261+
)?;
262+
let datasets = create_tables_request_datasets(&setup_dataset);
263+
264+
let setup_metadata = std::collections::HashMap::from([
265+
(
266+
"executor_instance_type".to_string(),
267+
serde_json::Value::String(cli.common.executor_instance_type.clone()),
268+
),
269+
(
270+
"table_format".to_string(),
271+
serde_json::Value::String(cli.common.table_format.to_string()),
272+
),
273+
]);
274+
275+
let adbc_driver = match system_adapter_client.setup(run_id, setup_metadata).await {
276+
Ok(response) => response,
277+
Err(e) => {
278+
return Err(anyhow::anyhow!("Failed to setup system adapter: {e}"));
279+
}
280+
};
281+
282+
let result = run_benchmark(
283+
&cli.common,
284+
&mut system_adapter_client,
285+
run_id,
286+
adbc_driver,
287+
dataset_source,
288+
&generation_config,
289+
&mutations,
290+
source,
291+
datasets,
292+
)
293+
.await;
294+
295+
// After successful setup, always teardown even if there are errors in between.
288296
if let Err(e) = system_adapter_client.teardown(run_id).await {
289-
return Err(anyhow::anyhow!("Failed to teardown system adapter: {e}"));
297+
tracing::error!("Failed to teardown system adapter: {e}");
290298
}
291299

292-
Ok(())
300+
result
293301
}

0 commit comments

Comments
 (0)