Skip to content

Commit 1dd2d66

Browse files
authored
Chore: stream download large clickbench file (#6210)
Just a nice thing to have. Note that we do not run this on our normal CI benchmarks so we don't really ever run this that often. But the Clickbench (real) benchmarks do run this. Signed-off-by: Connor Tsui <[email protected]>
1 parent 12078a9 commit 1dd2d66

File tree

1 file changed

+38
-7
lines changed

1 file changed

+38
-7
lines changed

vortex-bench/src/clickbench/data.rs

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ use arrow_schema::Schema;
1414
use arrow_schema::TimeUnit;
1515
use bytes::Bytes;
1616
use clap::ValueEnum;
17+
use futures::StreamExt;
18+
use indicatif::ProgressBar;
19+
use indicatif::ProgressStyle;
1720
use reqwest::IntoUrl;
1821
use serde::Deserialize;
1922
use serde::Serialize;
@@ -196,16 +199,13 @@ impl Flavor {
196199
match self {
197200
Flavor::Single => {
198201
let output_path = basepath.join(Format::Parquet.name()).join("hits.parquet");
199-
idempotent_async(output_path.as_path(), |output_path| async {
202+
idempotent_async(output_path.as_path(), |output_path| async move {
200203
info!("Downloading single clickbench file");
201204
let url = "https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_single/hits.parquet";
202-
let body = retry_get(&client, url).await?;
203-
let mut file = File::create(output_path).await?;
204-
205-
file.write_all(&body).await?;
206-
205+
download_large_file(&client, url, &output_path).await?;
207206
anyhow::Ok(())
208-
}).await?;
207+
})
208+
.await?;
209209
}
210210
Flavor::Partitioned => {
211211
// The clickbench-provided file is missing some higher-level type info, so we reprocess it
@@ -235,6 +235,37 @@ impl Flavor {
235235
}
236236
}
237237

238+
/// Downloads a large file with streaming and progress indication.
239+
async fn download_large_file(
240+
client: &reqwest::Client,
241+
url: &str,
242+
output_path: &Path,
243+
) -> anyhow::Result<()> {
244+
let response = client.get(url).send().await?.error_for_status()?;
245+
246+
let total_size = response.content_length().unwrap_or(0);
247+
248+
let progress = ProgressBar::new(total_size);
249+
progress.set_style(
250+
ProgressStyle::with_template(
251+
"[{elapsed_precise}] {bar:40.cyan/blue} {bytes}/{total_bytes} ({bytes_per_sec})",
252+
)
253+
.expect("valid template"),
254+
);
255+
256+
let mut file = File::create(output_path).await?;
257+
let mut stream = response.bytes_stream();
258+
259+
while let Some(chunk) = stream.next().await {
260+
let chunk = chunk?;
261+
file.write_all(&chunk).await?;
262+
progress.inc(chunk.len() as u64);
263+
}
264+
265+
progress.finish();
266+
Ok(())
267+
}
268+
238269
async fn retry_get(client: &reqwest::Client, url: impl IntoUrl) -> anyhow::Result<Bytes> {
239270
let url = url.as_str();
240271
let make_req = || async { client.get(url).send().await };

0 commit comments

Comments
 (0)