Skip to content

Commit 0feec4c

Browse files
Start setting up tpch gen (#311)
Waiting on release from tpchgen-arrow to prevent version conflict on chrono
1 parent 2c19e36 commit 0feec4c

10 files changed

Lines changed: 322 additions & 8 deletions

File tree

Cargo.lock

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ tower-http = { version = "0.6.2", features = [
5656
"timeout",
5757
"trace",
5858
], optional = true }
59+
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "07745d653516f232c616d795f7bc794b2fdf9bba" }
60+
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "07745d653516f232c616d795f7bc794b2fdf9bba" }
5961
tracing = { version = "0.1.41", features = ["log"] }
6062
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
6163
tui-logger = { version = "0.12", features = ["tracing-support"] }

src/args.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,9 @@
1919
2020
use crate::config::get_data_dir;
2121
use clap::{Parser, Subcommand};
22-
use std::{
23-
net::SocketAddr,
24-
path::{Path, PathBuf},
25-
};
22+
#[cfg(any(feature = "http", feature = "flightsql"))]
23+
use std::net::SocketAddr;
24+
use std::path::{Path, PathBuf};
2625

2726
const LONG_ABOUT: &str = "
2827
dft - DataFusion TUI
@@ -58,7 +57,7 @@ pub struct DftArgs {
5857
)]
5958
pub commands: Vec<String>,
6059

61-
#[clap(long, help = "Path to the configuration file")]
60+
#[clap(long, global = true, help = "Path to the configuration file")]
6261
pub config: Option<String>,
6362

6463
#[clap(
@@ -105,7 +104,6 @@ pub struct DftArgs {
105104
)]
106105
pub output: Option<PathBuf>,
107106

108-
#[cfg(any(feature = "flightsql", feature = "http"))]
109107
#[command(subcommand)]
110108
pub command: Option<Command>,
111109
}
@@ -132,6 +130,7 @@ impl DftArgs {
132130
#[derive(Clone, Debug, Subcommand)]
133131
pub enum Command {
134132
/// Start a HTTP server
133+
#[cfg(feature = "http")]
135134
ServeHttp {
136135
#[clap(short, long)]
137136
config: Option<String>,
@@ -141,6 +140,7 @@ pub enum Command {
141140
metrics_addr: Option<SocketAddr>,
142141
},
143142
/// Start a FlightSQL server
143+
#[cfg(feature = "flightsql")]
144144
#[command(name = "serve-flightsql")]
145145
ServeFlightSql {
146146
#[clap(short, long)]
@@ -150,6 +150,10 @@ pub enum Command {
150150
#[clap(long, help = "Set the port to be used for serving metrics")]
151151
metrics_addr: Option<SocketAddr>,
152152
},
153+
GenerateTpch {
154+
#[clap(long, default_value = "1.0")]
155+
scale_factor: f64,
156+
},
153157
}
154158

155159
fn parse_valid_file(file: &str) -> std::result::Result<PathBuf, String> {

src/config.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,8 @@ pub struct AppConfig {
175175
#[cfg(feature = "http")]
176176
#[serde(default)]
177177
pub http_server: HttpServerConfig,
178+
#[serde(default = "default_db_config")]
179+
pub db: DbConfig,
178180
}
179181

180182
fn default_execution_config() -> ExecutionConfig {
@@ -189,6 +191,29 @@ fn default_interaction_config() -> InteractionConfig {
189191
InteractionConfig::default()
190192
}
191193

194+
#[derive(Debug, Clone, Deserialize)]
195+
pub struct DbConfig {
196+
#[serde(default = "default_db_path")]
197+
pub path: PathBuf,
198+
}
199+
200+
impl Default for DbConfig {
201+
fn default() -> Self {
202+
default_db_config()
203+
}
204+
}
205+
206+
fn default_db_config() -> DbConfig {
207+
DbConfig {
208+
path: default_db_path(),
209+
}
210+
}
211+
212+
fn default_db_path() -> PathBuf {
213+
let base = directories::BaseDirs::new().expect("Base directories should be available");
214+
base.data_dir().to_path_buf().join("dft")
215+
}
216+
192217
#[derive(Clone, Debug, Deserialize)]
193218
pub struct DisplayConfig {
194219
#[serde(default = "default_frame_rate")]

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pub mod execution;
66
pub mod server;
77
pub mod telemetry;
88
pub mod test_utils;
9+
pub mod tpch;
910
pub mod tui;
1011

1112
pub const APP_NAME: &str = "dft";

src/main.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717

1818
use clap::Parser;
1919
use color_eyre::Result;
20+
use datafusion_dft::args::Command;
2021
#[cfg(any(feature = "flightsql", feature = "http"))]
21-
use datafusion_dft::{args::Command, server};
22-
use datafusion_dft::{args::DftArgs, cli, config::create_config, tui};
22+
use datafusion_dft::server;
23+
use datafusion_dft::{args::DftArgs, cli, config::create_config, tpch, tui};
2324
#[cfg(feature = "http")]
2425
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
2526

@@ -39,12 +40,16 @@ fn main() -> Result<()> {
3940
runtime.block_on(entry_point)
4041
}
4142

43+
// TODO: FlightSQL should use tracing
4244
fn should_init_env_logger(cli: &DftArgs) -> bool {
4345
#[cfg(feature = "flightsql")]
4446
if let Some(Command::ServeFlightSql { .. }) = cli.command {
4547
return true;
4648
}
4749

50+
if let Some(Command::GenerateTpch { .. }) = cli.command {
51+
return true;
52+
}
4853
if !cli.files.is_empty() || !cli.commands.is_empty() {
4954
return true;
5055
}
@@ -56,6 +61,10 @@ async fn app_entry_point(cli: DftArgs) -> Result<()> {
5661
env_logger::init();
5762
}
5863
let cfg = create_config(cli.config_path());
64+
if let Some(Command::GenerateTpch { scale_factor }) = cli.command {
65+
tpch::generate(cfg.clone(), scale_factor)?;
66+
return Ok(());
67+
}
5968

6069
#[cfg(feature = "flightsql")]
6170
if let Some(Command::ServeFlightSql { .. }) = cli.command {

src/tpch.rs

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::{
19+
fs::create_dir_all,
20+
path::{Path, PathBuf},
21+
sync::Arc,
22+
};
23+
24+
use color_eyre::{eyre, Result};
25+
use datafusion::arrow::record_batch::RecordBatch;
26+
use log::info;
27+
use parquet::arrow::ArrowWriter;
28+
use tpchgen::generators::{
29+
CustomerGenerator, LineItemGenerator, NationGenerator, OrderGenerator, PartGenerator,
30+
PartSuppGenerator, RegionGenerator, SupplierGenerator,
31+
};
32+
use tpchgen_arrow::{
33+
CustomerArrow, LineItemArrow, NationArrow, OrderArrow, PartArrow, PartSuppArrow, RegionArrow,
34+
SupplierArrow,
35+
};
36+
37+
use crate::config::AppConfig;
38+
39+
/// The TPC-H tables that can be generated.
///
/// Each variant corresponds to one table directory created under the TPC-H
/// data directory and to one generator/Arrow-adapter pair.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum GeneratorType {
    /// The `customers` table.
    Customer,
    /// The `orders` table.
    Order,
    /// The `line_items` table.
    LineItem,
    /// The `nations` table.
    Nation,
    /// The `parts` table.
    Part,
    /// The `part_supps` table.
    PartSupp,
    /// The `regions` table.
    Region,
    /// The `suppliers` table.
    Supplier,
}
49+
50+
impl TryFrom<&str> for GeneratorType {
51+
type Error = color_eyre::Report;
52+
53+
fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
54+
match value {
55+
"customers" => Ok(Self::Customer),
56+
"orders" => Ok(Self::Order),
57+
"line_items" => Ok(Self::LineItem),
58+
"nations" => Ok(Self::Nation),
59+
"parts" => Ok(Self::Part),
60+
"part_supps" => Ok(Self::PartSupp),
61+
"regions" => Ok(Self::Region),
62+
"suppliers" => Ok(Self::Supplier),
63+
_ => Err(eyre::Report::msg(format!("Unknown generator type {value}"))),
64+
}
65+
}
66+
}
67+
68+
fn create_tpch_dirs(config: &AppConfig) -> Result<Vec<(GeneratorType, PathBuf)>> {
69+
info!("...configured DB directory is {:?}", config.db.path);
70+
if config.db.path.is_file() {
71+
eyre::bail!("config DB directory is a file and it must be a directory")
72+
}
73+
74+
if !config.db.path.exists() {
75+
info!("...DB directory does not exist, creating");
76+
std::fs::create_dir_all(config.db.path.clone())?;
77+
} else {
78+
info!("...DB directory exists");
79+
}
80+
let tpch_dir = config.db.path.join("tables").join("tpch");
81+
if !tpch_dir.exists() {
82+
info!(
83+
"...TPC-H table directory ({:?}) does not exist, creating",
84+
config.db.path
85+
);
86+
create_dir_all(&tpch_dir)?;
87+
} else {
88+
info!("...TPC-H table directory ({tpch_dir:?}) exists");
89+
};
90+
let needed_dirs = [
91+
"customers",
92+
"orders",
93+
"line_items",
94+
"nations",
95+
"parts",
96+
"part_supps",
97+
"regions",
98+
"suppliers",
99+
];
100+
let mut table_paths = Vec::new();
101+
for dir in needed_dirs {
102+
let table_path = tpch_dir.join(dir);
103+
create_dir_all(&table_path)?;
104+
table_paths.push((GeneratorType::try_from(dir)?, table_path))
105+
}
106+
Ok(table_paths)
107+
}
108+
109+
fn write_batches_to_parquet<I>(
110+
mut batches: std::iter::Peekable<I>,
111+
table_path: &Path,
112+
table_type: &str,
113+
) -> Result<()>
114+
where
115+
I: Iterator<Item = RecordBatch>,
116+
{
117+
let first = batches.peek().ok_or(eyre::Error::msg(format!(
118+
"unable to generate {table_type} TPC-H data"
119+
)))?;
120+
121+
let file_path = table_path.join("data.parquet");
122+
let file = std::fs::File::create(file_path)?;
123+
let mut writer = ArrowWriter::try_new(file, Arc::clone(first.schema_ref()), None)?;
124+
info!("...writing {table_type} batches");
125+
for batch in batches {
126+
writer.write(&batch)?;
127+
}
128+
writer.finish()?;
129+
Ok(())
130+
}
131+
132+
pub fn generate(config: AppConfig, scale_factor: f64) -> Result<()> {
133+
info!("Generating TPC-H data");
134+
let table_paths = create_tpch_dirs(&config)?;
135+
for (table, table_path) in table_paths {
136+
if table_path.is_dir() {
137+
match table {
138+
GeneratorType::Customer => {
139+
info!("...generating customers");
140+
let arrow_generator =
141+
CustomerArrow::new(CustomerGenerator::new(scale_factor, 1, 1));
142+
write_batches_to_parquet(arrow_generator.peekable(), &table_path, "Customer")?;
143+
}
144+
GeneratorType::Order => {
145+
info!("...generating orders");
146+
let arrow_generator = OrderArrow::new(OrderGenerator::new(scale_factor, 1, 1));
147+
write_batches_to_parquet(arrow_generator.peekable(), &table_path, "Order")?;
148+
}
149+
GeneratorType::LineItem => {
150+
info!("...generating LineItems");
151+
let arrow_generator =
152+
LineItemArrow::new(LineItemGenerator::new(scale_factor, 1, 1));
153+
write_batches_to_parquet(arrow_generator.peekable(), &table_path, "LineItem")?;
154+
}
155+
GeneratorType::Nation => {
156+
info!("...generating Nations");
157+
let arrow_generator =
158+
NationArrow::new(NationGenerator::new(scale_factor, 1, 1));
159+
write_batches_to_parquet(arrow_generator.peekable(), &table_path, "Nation")?;
160+
}
161+
GeneratorType::Part => {
162+
info!("...generating Parts");
163+
let arrow_generator = PartArrow::new(PartGenerator::new(scale_factor, 1, 1));
164+
write_batches_to_parquet(arrow_generator.peekable(), &table_path, "Part")?;
165+
}
166+
GeneratorType::PartSupp => {
167+
info!("...generating PartSupps");
168+
let arrow_generator =
169+
PartSuppArrow::new(PartSuppGenerator::new(scale_factor, 1, 1));
170+
write_batches_to_parquet(arrow_generator.peekable(), &table_path, "PartSupp")?;
171+
}
172+
GeneratorType::Region => {
173+
info!("...generating Regions");
174+
let arrow_generator =
175+
RegionArrow::new(RegionGenerator::new(scale_factor, 1, 1));
176+
write_batches_to_parquet(arrow_generator.peekable(), &table_path, "Region")?;
177+
}
178+
GeneratorType::Supplier => {
179+
info!("...generating Suppliers");
180+
let arrow_generator =
181+
SupplierArrow::new(SupplierGenerator::new(scale_factor, 1, 1));
182+
write_batches_to_parquet(arrow_generator.peekable(), &table_path, "Supplier")?;
183+
}
184+
};
185+
}
186+
}
187+
188+
Ok(())
189+
}

tests/cli_cases/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
mod basic;
1919
mod bench;
2020
mod config;
21+
mod tpch;
2122

2223
use assert_cmd::Command;
2324
use predicates::str::ContainsPredicate;

0 commit comments

Comments
 (0)