Skip to content

Commit aca30b4

Browse files
Simplify
1 parent b8432b3 commit aca30b4

3 files changed

Lines changed: 39 additions & 125 deletions

File tree

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,6 @@ arrow-flight = { version = "54", features = [
1818
"flight-sql-experimental",
1919
], optional = true }
2020
axum = { version = "0.7.9", features = ["macros"], optional = true }
21-
chrono = "=0.4.39"
2221
clap = { version = "4.5.27", features = ["derive"] }
2322
color-eyre = "0.6.3"
2423
crossterm = { version = "0.28.1", features = ["event-stream"] }
@@ -57,8 +56,8 @@ tower-http = { version = "0.6.2", features = [
5756
"timeout",
5857
"trace",
5958
], optional = true }
60-
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "07745d653516f232c616d795f7bc794b2fdf9bba"}
61-
tpchgen-arrow = {git = "https://github.com/clflushopt/tpchgen-rs", rev = "07745d653516f232c616d795f7bc794b2fdf9bba"}
59+
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "07745d653516f232c616d795f7bc794b2fdf9bba" }
60+
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "07745d653516f232c616d795f7bc794b2fdf9bba" }
6261
tracing = { version = "0.1.41", features = ["log"] }
6362
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
6463
tui-logger = { version = "0.12", features = ["tracing-support"] }

src/tpch.rs

Lines changed: 37 additions & 121 deletions
Original file line number | Diff line number | Diff line change
@@ -15,9 +15,14 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::{fs::create_dir_all, path::PathBuf, sync::Arc};
18+
use std::{
19+
fs::create_dir_all,
20+
path::{Path, PathBuf},
21+
sync::Arc,
22+
};
1923

2024
use color_eyre::{eyre, Result};
25+
use datafusion::arrow::record_batch::RecordBatch;
2126
use log::info;
2227
use parquet::arrow::ArrowWriter;
2328
use tpchgen::generators::{
@@ -101,6 +106,29 @@ fn create_tpch_dirs(config: &AppConfig) -> Result<Vec<(GeneratorType, PathBuf)>>
101106
Ok(table_paths)
102107
}
103108

109+
fn write_batches_to_parquet<I>(
110+
mut batches: std::iter::Peekable<I>,
111+
table_path: &Path,
112+
table_type: &str,
113+
) -> Result<()>
114+
where
115+
I: Iterator<Item = RecordBatch>,
116+
{
117+
let first = batches.peek().ok_or(eyre::Error::msg(format!(
118+
"unable to generate {table_type} TPC-H data"
119+
)))?;
120+
121+
let file_path = table_path.join("data.parquet");
122+
let file = std::fs::File::create(file_path)?;
123+
let mut writer = ArrowWriter::try_new(file, Arc::clone(first.schema_ref()), None)?;
124+
info!("...writing {table_type} batches");
125+
for batch in batches {
126+
writer.write(&batch)?;
127+
}
128+
writer.finish()?;
129+
Ok(())
130+
}
131+
104132
pub fn generate(config: AppConfig, scale_factor: f64) -> Result<()> {
105133
info!("Generating TPC-H data");
106134
let table_paths = create_tpch_dirs(&config)?;
@@ -111,159 +139,47 @@ pub fn generate(config: AppConfig, scale_factor: f64) -> Result<()> {
111139
info!("...generating customers");
112140
let arrow_generator =
113141
CustomerArrow::new(CustomerGenerator::new(scale_factor, 1, 1));
114-
115-
let mut peekable = arrow_generator.peekable();
116-
let first = peekable
117-
.peek()
118-
.ok_or(eyre::Error::msg("unable to generate Customer TPC-H data"))?;
119-
120-
let file_path = table_path.join("data.parquet");
121-
let file = std::fs::File::create(file_path)?;
122-
let mut writer =
123-
ArrowWriter::try_new(file, Arc::clone(first.schema_ref()), None)?;
124-
info!("...writing Customer batches");
125-
for batch in peekable {
126-
writer.write(&batch)?;
127-
}
128-
writer.finish()?;
142+
write_batches_to_parquet(arrow_generator.peekable(), &table_path, "Customer")?;
129143
}
130144
GeneratorType::Order => {
131145
info!("...generating orders");
132146
let arrow_generator = OrderArrow::new(OrderGenerator::new(scale_factor, 1, 1));
133-
134-
let mut peekable = arrow_generator.peekable();
135-
let first = peekable
136-
.peek()
137-
.ok_or(eyre::Error::msg("unable to generate Customer TPC-H data"))?;
138-
139-
let file_path = table_path.join("data.parquet");
140-
let file = std::fs::File::create(file_path)?;
141-
let mut writer =
142-
ArrowWriter::try_new(file, Arc::clone(first.schema_ref()), None)?;
143-
info!("...writing Order batches");
144-
for batch in peekable {
145-
writer.write(&batch)?;
146-
}
147-
writer.finish()?;
147+
write_batches_to_parquet(arrow_generator.peekable(), &table_path, "Order")?;
148148
}
149149
GeneratorType::LineItem => {
150150
info!("...generating LineItems");
151151
let arrow_generator =
152152
LineItemArrow::new(LineItemGenerator::new(scale_factor, 1, 1));
153-
154-
let mut peekable = arrow_generator.peekable();
155-
let first = peekable
156-
.peek()
157-
.ok_or(eyre::Error::msg("unable to generate Customer TPC-H data"))?;
158-
159-
let file_path = table_path.join("data.parquet");
160-
let file = std::fs::File::create(file_path)?;
161-
let mut writer =
162-
ArrowWriter::try_new(file, Arc::clone(first.schema_ref()), None)?;
163-
info!("...writing LineItem batches");
164-
for batch in peekable {
165-
writer.write(&batch)?;
166-
}
167-
writer.finish()?;
153+
write_batches_to_parquet(arrow_generator.peekable(), &table_path, "LineItem")?;
168154
}
169155
GeneratorType::Nation => {
170156
info!("...generating Nations");
171157
let arrow_generator =
172158
NationArrow::new(NationGenerator::new(scale_factor, 1, 1));
173-
174-
let mut peekable = arrow_generator.peekable();
175-
let first = peekable
176-
.peek()
177-
.ok_or(eyre::Error::msg("unable to generate Customer TPC-H data"))?;
178-
179-
let file_path = table_path.join("data.parquet");
180-
let file = std::fs::File::create(file_path)?;
181-
let mut writer =
182-
ArrowWriter::try_new(file, Arc::clone(first.schema_ref()), None)?;
183-
info!("...writing Nation batches");
184-
for batch in peekable {
185-
writer.write(&batch)?;
186-
}
187-
writer.finish()?;
159+
write_batches_to_parquet(arrow_generator.peekable(), &table_path, "Nation")?;
188160
}
189161
GeneratorType::Part => {
190162
info!("...generating Parts");
191163
let arrow_generator = PartArrow::new(PartGenerator::new(scale_factor, 1, 1));
192-
193-
let mut peekable = arrow_generator.peekable();
194-
let first = peekable
195-
.peek()
196-
.ok_or(eyre::Error::msg("Unable to generate Customer TPC-H data"))?;
197-
198-
let file_path = table_path.join("data.parquet");
199-
let file = std::fs::File::create(file_path)?;
200-
let mut writer =
201-
ArrowWriter::try_new(file, Arc::clone(first.schema_ref()), None)?;
202-
info!("...writing Part batches");
203-
for batch in peekable {
204-
writer.write(&batch)?;
205-
}
206-
writer.finish()?;
164+
write_batches_to_parquet(arrow_generator.peekable(), &table_path, "Part")?;
207165
}
208166
GeneratorType::PartSupp => {
209167
info!("...generating PartSupps");
210168
let arrow_generator =
211169
PartSuppArrow::new(PartSuppGenerator::new(scale_factor, 1, 1));
212-
213-
let mut peekable = arrow_generator.peekable();
214-
let first = peekable
215-
.peek()
216-
.ok_or(eyre::Error::msg("unable to generate Customer TPC-H data"))?;
217-
218-
let file_path = table_path.join("data.parquet");
219-
let file = std::fs::File::create(file_path)?;
220-
let mut writer =
221-
ArrowWriter::try_new(file, Arc::clone(first.schema_ref()), None)?;
222-
info!("...writing PartSupp batches");
223-
for batch in peekable {
224-
writer.write(&batch)?;
225-
}
226-
writer.finish()?;
170+
write_batches_to_parquet(arrow_generator.peekable(), &table_path, "PartSupp")?;
227171
}
228172
GeneratorType::Region => {
229173
info!("...generating Regions");
230174
let arrow_generator =
231175
RegionArrow::new(RegionGenerator::new(scale_factor, 1, 1));
232-
233-
let mut peekable = arrow_generator.peekable();
234-
let first = peekable
235-
.peek()
236-
.ok_or(eyre::Error::msg("unable to generate Customer TPC-H data"))?;
237-
238-
let file_path = table_path.join("data.parquet");
239-
let file = std::fs::File::create(file_path)?;
240-
let mut writer =
241-
ArrowWriter::try_new(file, Arc::clone(first.schema_ref()), None)?;
242-
info!("...writing Region batches");
243-
for batch in peekable {
244-
writer.write(&batch)?;
245-
}
246-
writer.finish()?;
176+
write_batches_to_parquet(arrow_generator.peekable(), &table_path, "Region")?;
247177
}
248178
GeneratorType::Supplier => {
249179
info!("...generating Suppliers");
250180
let arrow_generator =
251181
SupplierArrow::new(SupplierGenerator::new(scale_factor, 1, 1));
252-
253-
let mut peekable = arrow_generator.peekable();
254-
let first = peekable
255-
.peek()
256-
.ok_or(eyre::Error::msg("unable to generate Customer TPC-H data"))?;
257-
258-
let file_path = table_path.join("data.parquet");
259-
let file = std::fs::File::create(file_path)?;
260-
let mut writer =
261-
ArrowWriter::try_new(file, Arc::clone(first.schema_ref()), None)?;
262-
info!("...writing Supplier batches");
263-
for batch in peekable {
264-
writer.write(&batch)?;
265-
}
266-
writer.finish()?;
182+
write_batches_to_parquet(arrow_generator.peekable(), &table_path, "Supplier")?;
267183
}
268184
};
269185
}

0 commit comments

Comments
 (0)