Skip to content

Commit c1b6a78

Browse files
committed
Don't expose final stage
1 parent a8033f9 commit c1b6a78

2 files changed

Lines changed: 47 additions & 28 deletions

File tree

ballista/core/src/config.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ pub const BALLISTA_SHUFFLE_STORAGE_TYPE: &str = "ballista.shuffle.storage_type";
5050
pub const BALLISTA_SHUFFLE_STORAGE_URL: &str = "ballista.shuffle.storage_url";
5151
/// Configuration key for shuffle storage mode (disk or memory).
5252
pub const BALLISTA_SHUFFLE_MEMORY_MODE: &str = "ballista.shuffle.memory_mode";
53-
/// Configuration key indicating if this is the final output stage.
53+
/// Internal configuration key indicating if this is the final output stage.
54+
/// It is set by the scheduler based on stage topology and is NOT user-configurable.
5455
/// When true, shuffle data is always written to disk regardless of the memory_mode setting.
5556
pub const BALLISTA_IS_FINAL_STAGE: &str = "ballista.shuffle.is_final_stage";
5657
/// Shuffle format configuration: "arrow_ipc" or "vortex"
@@ -109,10 +110,9 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
109110
"When enabled, shuffle data is kept in memory on executors instead of being written to disk. This can improve performance for workloads with sufficient memory.".to_string(),
110111
DataType::Boolean,
111112
Some((false).to_string())),
112-
ConfigEntry::new(BALLISTA_IS_FINAL_STAGE.to_string(),
113-
"When true, indicates this is the final output stage. Final stages always write to disk regardless of memory_mode setting to ensure proper cleanup.".to_string(),
114-
DataType::Boolean,
115-
Some((false).to_string())),
113+
// Note: BALLISTA_IS_FINAL_STAGE is intentionally NOT in CONFIG_ENTRIES.
114+
// It's an internal flag set by the scheduler based on stage topology,
115+
// not a user-configurable setting.
116116
ConfigEntry::new(BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS.to_string(),
117117
"Connection timeout for gRPC client in seconds".to_string(),
118118
DataType::UInt64,
@@ -352,9 +352,21 @@ impl BallistaConfig {
352352
}
353353

354354
/// Returns whether this is the final output stage.
355+
/// This is an internal flag set by the scheduler, not user-configurable.
355356
/// Final stages always write to disk regardless of the memory_mode setting.
356357
pub fn is_final_stage(&self) -> bool {
357-
self.get_bool_setting(BALLISTA_IS_FINAL_STAGE)
358+
self.settings
359+
.get(BALLISTA_IS_FINAL_STAGE)
360+
.and_then(|v| v.parse::<bool>().ok())
361+
.unwrap_or(false)
362+
}
363+
364+
/// Sets the internal is_final_stage flag.
365+
/// This should only be called by the scheduler when creating task configurations.
366+
pub fn with_is_final_stage(mut self, is_final: bool) -> Self {
367+
self.settings
368+
.insert(BALLISTA_IS_FINAL_STAGE.to_string(), is_final.to_string());
369+
self
358370
}
359371

360372
/// Returns the configured shuffle format (ArrowIpc or Vortex)

ballista/core/src/extension.rs

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@
1616
// under the License.
1717

1818
use crate::config::{
19-
BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, BALLISTA_IS_FINAL_STAGE, BALLISTA_JOB_NAME,
20-
BALLISTA_SHUFFLE_FORMAT, BALLISTA_SHUFFLE_MEMORY_MODE,
21-
BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ, BALLISTA_SHUFFLE_READER_MAX_REQUESTS,
22-
BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT, BALLISTA_SHUFFLE_STORAGE_TYPE,
23-
BALLISTA_SHUFFLE_STORAGE_URL, BALLISTA_STANDALONE_PARALLELISM, BallistaConfig,
24-
ShuffleFormat,
19+
BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, BALLISTA_JOB_NAME, BALLISTA_SHUFFLE_FORMAT,
20+
BALLISTA_SHUFFLE_MEMORY_MODE, BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ,
21+
BALLISTA_SHUFFLE_READER_MAX_REQUESTS, BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT,
22+
BALLISTA_SHUFFLE_STORAGE_TYPE, BALLISTA_SHUFFLE_STORAGE_URL,
23+
BALLISTA_STANDALONE_PARALLELISM, BallistaConfig, ShuffleFormat,
2524
};
2625
use crate::planner::BallistaQueryPlanner;
2726
use crate::serde::protobuf::KeyValuePair;
@@ -513,16 +512,20 @@ impl SessionConfigExt for SessionConfig {
513512
.extensions
514513
.get::<BallistaConfig>()
515514
.map(|c| c.is_final_stage())
516-
.unwrap_or_else(|| BallistaConfig::default().is_final_stage())
515+
.unwrap_or(false)
517516
}
518517

519518
fn with_ballista_is_final_stage(self, is_final: bool) -> Self {
520-
if self.options().extensions.get::<BallistaConfig>().is_some() {
521-
self.set_bool(BALLISTA_IS_FINAL_STAGE, is_final)
522-
} else {
523-
self.with_option_extension(BallistaConfig::default())
524-
.set_bool(BALLISTA_IS_FINAL_STAGE, is_final)
525-
}
519+
// is_final_stage is an internal flag, not a user-configurable setting,
520+
// so we modify the BallistaConfig directly instead of using set_bool
521+
let ballista_config = self
522+
.options()
523+
.extensions
524+
.get::<BallistaConfig>()
525+
.cloned()
526+
.unwrap_or_default()
527+
.with_is_final_stage(is_final);
528+
self.with_option_extension(ballista_config)
526529
}
527530

528531
fn with_ballista_grpc_metadata(self, metadata: HashMap<String, String>) -> Self {
@@ -980,7 +983,7 @@ mod test {
980983
};
981984

982985
use crate::{
983-
config::BALLISTA_JOB_NAME,
986+
config::{BALLISTA_JOB_NAME, BallistaConfig},
984987
extension::{SessionConfigExt, SessionConfigHelperExt, SessionStateExt},
985988
};
986989

@@ -1065,17 +1068,21 @@ mod test {
10651068
}
10661069

10671070
#[test]
1068-
fn test_is_final_stage_serialization() {
1069-
use crate::config::BALLISTA_IS_FINAL_STAGE;
1070-
1071-
// Test that is_final_stage is included in key-value pairs
1071+
fn test_is_final_stage_internal_setting() {
1072+
// Test that is_final_stage is properly stored in BallistaConfig
10721073
let config =
10731074
SessionConfig::new_with_ballista().with_ballista_is_final_stage(true);
1074-
let pairs = config.to_key_value_pairs();
10751075

1076-
let is_final_pair = pairs.iter().find(|p| p.key == BALLISTA_IS_FINAL_STAGE);
1077-
assert!(is_final_pair.is_some());
1078-
assert_eq!(is_final_pair.unwrap().value, Some("true".to_string()));
1076+
// Verify via the getter
1077+
assert!(config.ballista_is_final_stage());
1078+
1079+
// Verify the internal BallistaConfig has the setting
1080+
let ballista_config = config
1081+
.options()
1082+
.extensions
1083+
.get::<BallistaConfig>()
1084+
.expect("BallistaConfig should exist");
1085+
assert!(ballista_config.is_final_stage());
10791086
}
10801087

10811088
#[test]

0 commit comments

Comments
 (0)