|
| 1 | +use std::sync::Arc; |
| 2 | + |
| 3 | +use arrow_csv2::clickbench; |
| 4 | +use arrow_csv2::{ArrowCsv2Decoder, ParallelCsvSource}; |
| 5 | +use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder}; |
| 6 | +use datafusion::execution::object_store::ObjectStoreUrl; |
| 7 | +use datafusion::physical_plan::collect; |
| 8 | +use datafusion::prelude::SessionContext; |
| 9 | +use datafusion_datasource::PartitionedFile; |
| 10 | +use datafusion_datasource::source::DataSourceExec; |
| 11 | +use object_store::path::Path as ObjectPath; |
| 12 | + |
| 13 | +const FILE: &str = "hits_100mb.csv"; |
| 14 | +const BATCH_SIZE: usize = 8192; |
| 15 | + |
| 16 | +fn main() { |
| 17 | + let schema = clickbench::schema(); |
| 18 | + let file_len = std::fs::metadata(FILE) |
| 19 | + .expect("hits_100mb.csv not found") |
| 20 | + .len() as usize; |
| 21 | + |
| 22 | + let num_partitions = std::env::args() |
| 23 | + .nth(1) |
| 24 | + .and_then(|s| s.parse().ok()) |
| 25 | + .unwrap_or(0); |
| 26 | + |
| 27 | + if num_partitions == 0 { |
| 28 | + let file = std::fs::File::open(FILE).unwrap(); |
| 29 | + let reader = arrow_csv2::ReaderBuilder::new(schema) |
| 30 | + .with_batch_size(BATCH_SIZE) |
| 31 | + .build(file); |
| 32 | + |
| 33 | + let _ = reader.collect::<Vec<_>>(); |
| 34 | + } else { |
| 35 | + let rt = tokio::runtime::Runtime::new().unwrap(); |
| 36 | + rt.block_on(async { |
| 37 | + let plan = build_parallel_csv(&schema, file_len, num_partitions); |
| 38 | + let ctx = SessionContext::new(); |
| 39 | + let _ = collect(plan, ctx.task_ctx()).await.unwrap(); |
| 40 | + }) |
| 41 | + } |
| 42 | +} |
| 43 | + |
| 44 | +fn build_parallel_csv( |
| 45 | + schema: &arrow_schema::SchemaRef, |
| 46 | + file_len: usize, |
| 47 | + num_partitions: usize, |
| 48 | +) -> Arc<dyn datafusion::physical_plan::ExecutionPlan> { |
| 49 | + let chunk_size = file_len / num_partitions; |
| 50 | + let boundaries: Arc<[usize]> = (0..num_partitions) |
| 51 | + .map(|i| i * chunk_size) |
| 52 | + .chain(std::iter::once(file_len)) |
| 53 | + .collect::<Vec<_>>() |
| 54 | + .into(); |
| 55 | + |
| 56 | + let abs_path = std::fs::canonicalize(FILE).unwrap(); |
| 57 | + let object_path = ObjectPath::from_absolute_path(&abs_path).unwrap(); |
| 58 | + |
| 59 | + let source = Arc::new(ParallelCsvSource::new( |
| 60 | + schema.clone(), |
| 61 | + object_path, |
| 62 | + boundaries.clone(), |
| 63 | + BATCH_SIZE, |
| 64 | + ArrowCsv2Decoder, |
| 65 | + )); |
| 66 | + |
| 67 | + let url = ObjectStoreUrl::parse("file://").unwrap(); |
| 68 | + let mut builder = FileScanConfigBuilder::new(url, source); |
| 69 | + for i in 0..num_partitions { |
| 70 | + let file = PartitionedFile::new(FILE.to_string(), file_len as u64) |
| 71 | + .with_range(boundaries[i] as i64, boundaries[i + 1] as i64); |
| 72 | + builder = builder.with_file_group(FileGroup::new(vec![file])); |
| 73 | + } |
| 74 | + |
| 75 | + DataSourceExec::from_data_source(builder.build()) |
| 76 | +} |
0 commit comments