Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions datafusion-fuzzer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,8 @@ max_row_count = 100
max_expr_level = 3
max_table_count = 3
max_insert_per_table = 20

# Supported oracles: NoCrash, NestedQueries.
# Randomly select one oracle from the configured set for each query.
oracles = ["NoCrash"]
# oracles = ["NoCrash", "NestedQueries"]
46 changes: 27 additions & 19 deletions src/cli/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::common::{InclusionConfig, LogicalTable, Result};
use crate::datasource_generator::dataset_generator::DatasetGenerator;
use crate::fuzz_context::{GlobalContext, ctx_observability::display_all_tables};
use crate::fuzz_runner::{record_query_with_time, update_stat_for_round_completion};
use crate::oracle::{NoCrashOracle, Oracle, QueryContext, QueryExecutionResult};
use crate::oracle::{Oracle, QueryContext, QueryExecutionResult};
use crate::query_generator::stmt_select_def::SelectStatementBuilder;

use super::error_whitelist;
Expand Down Expand Up @@ -214,22 +214,12 @@ async fn execute_oracle_test(
seed: u64,
ctx: &Arc<GlobalContext>,
) -> Result<bool> {
// Create a deterministic RNG instance for this test
let mut rng = StdRng::seed_from_u64(seed);

// === Select a random oracle ===
// TODO: disabled views since joining too many table is slow
let available_oracles: Vec<Box<dyn Oracle + Send>> = vec![
Box::new(NoCrashOracle::new(seed, Arc::clone(ctx))),
// Box::new(NestedQueriesOracle::new(seed, Arc::clone(ctx))),
];
let oracle_index = rng.random_range(0..available_oracles.len());
let mut selected_oracle = available_oracles.into_iter().nth(oracle_index).unwrap();
let mut randomly_selected_oracle = select_random_configured_oracle(seed, ctx);

info!("Selected oracle: {}", selected_oracle);
info!("Selected oracle: {}", randomly_selected_oracle);

// === Generate query group ===
let query_group = match selected_oracle.generate_query_group() {
let query_group = match randomly_selected_oracle.generate_query_group() {
Ok(group) => group,
Err(e) => {
let err_msg = format!("Failed to generate query group: {}", e);
Expand All @@ -250,7 +240,7 @@ async fn execute_oracle_test(
round,
query_index,
seed,
selected_oracle.name(),
randomly_selected_oracle.name(),
&query_group,
)?;

Expand All @@ -269,7 +259,7 @@ async fn execute_oracle_test(
}

// === Validate execution results ===
match selected_oracle
match randomly_selected_oracle
.validate_consistency(&execution_results)
.await
{
Expand All @@ -281,14 +271,31 @@ async fn execute_oracle_test(
error!("Oracle test failed: {}", e);

// Log error report if available
if let Ok(error_report) = selected_oracle.create_error_report(&execution_results) {
if let Ok(error_report) =
randomly_selected_oracle.create_error_report(&execution_results)
{
error!("Error Report:\n{}", error_report);
}
Ok(false)
}
}
}

fn select_random_configured_oracle(seed: u64, ctx: &Arc<GlobalContext>) -> Box<dyn Oracle + Send> {
// Randomly pick one oracle for this query; the configured oracle set bounds the choice.
let available_oracles: Vec<Box<dyn Oracle + Send>> = ctx
.runner_config
.oracles
.iter()
.copied()
.map(|oracle| oracle.build(seed, Arc::clone(ctx)))
.collect();

let mut rng = StdRng::seed_from_u64(seed);
let oracle_index = rng.random_range(0..available_oracles.len());
available_oracles.into_iter().nth(oracle_index).unwrap()
}

fn append_query_log(
ctx: &Arc<GlobalContext>,
round: u32,
Expand Down Expand Up @@ -475,6 +482,7 @@ mod tests {
max_expr_level: 2,
max_table_count: 3,
max_insert_per_table: 20,
oracles: vec![crate::oracle::ConfiguredOracle::NoCrash],
};

// Collect results from multiple runs
Expand Down Expand Up @@ -593,6 +601,7 @@ mod tests {
max_expr_level: 2,
max_table_count: 3,
max_insert_per_table: 20,
oracles: vec![crate::oracle::ConfiguredOracle::NoCrash],
};

let mut results_by_seed = Vec::new();
Expand Down Expand Up @@ -691,8 +700,7 @@ mod tests {
for i in 0..ctx.runner_config.queries_per_round {
let query_seed = query_base_seed.wrapping_add(i as u64);

// Generate a query using the same logic as execute_oracle_test
let mut oracle = NoCrashOracle::new(query_seed, Arc::clone(&ctx));
let mut oracle = select_random_configured_oracle(query_seed, &ctx);
if let Ok(query_group) = oracle.generate_query_group() {
if let Some(query_context) = query_group.first() {
captured_queries
Expand Down
175 changes: 4 additions & 171 deletions src/fuzz_context/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
pub mod ctx_observability;
mod runner_config;

use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{
Arc, Mutex, RwLock,
atomic::{AtomicU32, Ordering},
};

use datafusion::{common::HashMap, prelude::SessionContext};
use serde::{Deserialize, Serialize};

use crate::common::LogicalTable;
use crate::common::value_generator::ValueGenerationConfig;
use crate::common::{LogicalTable, Result, fuzzer_err};
use crate::fuzz_runner::FuzzerStats;

pub use runner_config::RunnerConfig;

/// Create a default DataFusion SessionContext with standard configuration
/// This ensures consistency between initial creation and reset operations
fn default_df_session_context() -> Arc<SessionContext> {
Expand Down Expand Up @@ -53,8 +53,6 @@ impl GlobalContext {
/// Reset the DataFusion context to drop all registered tables
/// This creates a fresh SessionContext and clears all table registrations
pub fn reset_datafusion_context(&self) {
use std::sync::atomic::Ordering;

// Create a new SessionContext to completely reset the DataFusion state
let new_session_context = default_df_session_context();

Expand All @@ -77,171 +75,6 @@ impl GlobalContext {
}
}

/// Unified configuration for the DataFusion fuzzer.
///
/// This configuration controls both:
/// 1. The overall fuzzing process (rounds, queries, timeout)
/// 2. The table and query generation parameters
/// 3. UI and display parameters
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RunnerConfig {
// General fuzzing parameters
pub seed: u64,
pub rounds: u32,
pub queries_per_round: u32,
pub timeout_seconds: u64,
pub log_path: Option<PathBuf>,

// UI and display parameters
pub display_logs: bool,
pub enable_tui: bool,
pub sample_interval_secs: u64,

// Table and query generation parameters
pub max_column_count: u64,
pub max_row_count: u64,
pub max_expr_level: u32,
pub max_table_count: u32,
pub max_insert_per_table: u32,
}

impl RunnerConfig {
pub fn new() -> Self {
Self::default()
}

pub fn with_seed(mut self, seed: u64) -> Self {
self.seed = seed;
self
}

pub fn with_rounds(mut self, rounds: u32) -> Self {
self.rounds = rounds;
self
}

pub fn with_queries_per_round(mut self, queries_per_round: u32) -> Self {
self.queries_per_round = queries_per_round;
self
}

pub fn with_timeout_seconds(mut self, timeout_seconds: u64) -> Self {
self.timeout_seconds = timeout_seconds;
self
}

pub fn with_log_path(mut self, log_path: Option<PathBuf>) -> Self {
self.log_path = log_path;
self
}

pub fn with_display_logs(mut self, display_logs: bool) -> Self {
self.display_logs = display_logs;
self
}

pub fn with_enable_tui(mut self, enable_tui: bool) -> Self {
self.enable_tui = enable_tui;
self
}

pub fn with_sample_interval_secs(mut self, sample_interval_secs: u64) -> Self {
self.sample_interval_secs = sample_interval_secs;
self
}

pub fn with_max_column_count(mut self, max_column_count: u64) -> Self {
self.max_column_count = max_column_count;
self
}

pub fn with_max_row_count(mut self, max_row_count: u64) -> Self {
self.max_row_count = max_row_count;
self
}

pub fn with_max_expr_level(mut self, max_expr_level: u32) -> Self {
self.max_expr_level = max_expr_level;
self
}

pub fn with_max_table_count(mut self, max_table_count: u32) -> Self {
self.max_table_count = max_table_count;
self
}

pub fn with_max_insert_per_table(mut self, max_insert_per_table: u32) -> Self {
self.max_insert_per_table = max_insert_per_table;
self
}

pub fn from_file(path: &Path) -> Result<Self> {
let content = fs::read_to_string(path)
.map_err(|e| fuzzer_err(&format!("Failed to read config file: {}", e)))?;

let config: Self = toml::from_str(&content)
.map_err(|e| fuzzer_err(&format!("Failed to parse config file: {}", e)))?;

Ok(config)
}

pub fn from_cli(cli: &crate::cli::Cli) -> Result<Self> {
// Start with default or config file if provided
let mut config = if let Some(config_path) = &cli.config {
Self::from_file(config_path)?
} else {
Self::default()
};

// Override with CLI arguments if provided
if cli.seed != 42 {
config.seed = cli.seed;
}

if let Some(rounds) = cli.rounds {
config.rounds = rounds;
}

if let Some(queries) = cli.queries_per_round {
config.queries_per_round = queries;
}

if let Some(timeout) = cli.timeout {
config.timeout_seconds = timeout;
}

if let Some(log_path) = &cli.log_path {
config.log_path = Some(log_path.clone());
}

// Set display_logs from CLI argument
config.display_logs = cli.display_logs;

// Set enable_tui from CLI argument
config.enable_tui = cli.enable_tui;

Ok(config)
}

pub fn default() -> Self {
Self {
seed: 42,
rounds: 3,
queries_per_round: 10,
timeout_seconds: 2,
log_path: Some(PathBuf::from("logs")),
display_logs: false,
enable_tui: true,
sample_interval_secs: 5,
max_column_count: 5,
max_row_count: 100,
max_expr_level: 3,
max_table_count: 3,
max_insert_per_table: 20,
}
}
}

pub struct RuntimeContext {
pub df_ctx: Arc<RwLock<Arc<SessionContext>>>,
pub registered_tables: Arc<RwLock<HashMap<String, Arc<LogicalTable>>>>,
Expand Down
Loading
Loading