From 8fa7d7c51b9ad2d1e7cc72fef57db7d5eb958db6 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 26 Dec 2025 16:54:29 +0800 Subject: [PATCH 1/2] feat(hard-split): Standardize naming by hard-split and add metric --- riffle-server/src/app_manager/app.rs | 18 ++++++++++-------- riffle-server/src/client_configs.rs | 11 +++++------ riffle-server/src/error.rs | 4 ++-- riffle-server/src/grpc/service.rs | 2 +- riffle-server/src/metric.rs | 3 +++ 5 files changed, 21 insertions(+), 17 deletions(-) diff --git a/riffle-server/src/app_manager/app.rs b/riffle-server/src/app_manager/app.rs index 84a78b3c..afb3613a 100644 --- a/riffle-server/src/app_manager/app.rs +++ b/riffle-server/src/app_manager/app.rs @@ -8,7 +8,7 @@ use crate::app_manager::request_context::{ RequireBufferContext, ShuffleResult, WritingViewContext, }; use crate::block_id_manager::{get_block_id_manager, BlockIdManager}; -use crate::client_configs::STORAGE_CAPACITY_PARTITION_SPLIT_ENABLED; +use crate::client_configs::HARD_SPLIT_ENABLED; use crate::config::Config; use crate::config_reconfigure::ReconfigurableConfManager; use crate::config_ref::{ByteString, ConfigOption}; @@ -16,8 +16,8 @@ use crate::constant::ALL_LABEL; use crate::ddashmap::DDashMap; use crate::error::WorkerError; use crate::metric::{ - BLOCK_ID_NUMBER, GAUGE_HUGE_PARTITION_NUMBER, GAUGE_PARTITION_NUMBER, RESIDENT_BYTES, - TOTAL_HUGE_PARTITION_NUMBER, TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED, + BLOCK_ID_NUMBER, GAUGE_HUGE_PARTITION_NUMBER, GAUGE_PARTITION_NUMBER, HARD_SPLIT_COUNTER, + RESIDENT_BYTES, TOTAL_HUGE_PARTITION_NUMBER, TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED, TOTAL_PARTITION_NUMBER, TOTAL_READ_DATA, TOTAL_READ_DATA_FROM_LOCALFILE, TOTAL_READ_DATA_FROM_MEMORY, TOTAL_READ_INDEX_FROM_LOCALFILE, TOTAL_RECEIVED_DATA, TOTAL_REQUIRE_BUFFER_FAILED, @@ -348,15 +348,17 @@ impl App { if self .app_config_options .client_configs - .get(&STORAGE_CAPACITY_PARTITION_SPLIT_ENABLED) + .get(&HARD_SPLIT_ENABLED) .unwrap_or(false) + // TODO: If the store is corrupted and only a single replica exists, fail the job fast instead of performing a hard split. && !self.store.is_healthy().await? { warn!( - "[{}] writing is limited due to the unhealthy storage", + "Hard split is activated for [{}] due to the unhealthy storage", &app_id.to_string() ); - return Err(WorkerError::WRITE_LIMITED_BY_STORAGE_STATE); + HARD_SPLIT_COUNTER.inc(); + return Err(WorkerError::HARD_SPLIT_BY_UNHEALTHY_STORAGE); } let mut partition_split_candidates = HashSet::new(); @@ -516,7 +518,7 @@ mod tests { use crate::app_manager::application_identifier::ApplicationId; use crate::app_manager::partition_identifier::PartitionUId; use crate::app_manager::request_context::RequireBufferContext; - use crate::client_configs::{ClientRssConf, STORAGE_CAPACITY_PARTITION_SPLIT_ENABLED}; + use crate::client_configs::{ClientRssConf, HARD_SPLIT_ENABLED}; use crate::config::StorageType; use crate::config::StorageType::LOCALFILE; use crate::config_reconfigure::ReconfigurableConfManager; @@ -596,7 +598,7 @@ mod tests { healthy_tag.store(false, Ordering::SeqCst); match runtime.block_on(async { app.require_buffer(ctx.clone()).await }) { - Err(WorkerError::WRITE_LIMITED_BY_STORAGE_STATE) => { + Err(WorkerError::HARD_SPLIT_BY_UNHEALTHY_STORAGE) => { // pass } _ => { diff --git a/riffle-server/src/client_configs.rs b/riffle-server/src/client_configs.rs index cd2147e3..8e8e3aa6 100644 --- a/riffle-server/src/client_configs.rs +++ b/riffle-server/src/client_configs.rs @@ -41,12 +41,11 @@ pub static GET_MEMORY_DATA_URPC_VERSION: Lazy> = .with_description("the urpc version of getMemoryData") }); -pub static STORAGE_CAPACITY_PARTITION_SPLIT_ENABLED: Lazy> = - Lazy::new(|| { - ClientConfigOption::key("spark.rss.riffle.storageCapacityPartitionSplitEnabled") - .default_value(false) - .with_description("whether to trigger partition split by the storage capacity") - }); +pub static HARD_SPLIT_ENABLED: Lazy> = Lazy::new(|| { + ClientConfigOption::key("spark.rss.riffle.hardSplitEnabled") + .default_value(false) + .with_description("whether to trigger partition hard split by the server") +}); #[derive(Debug, Clone, Default)] pub struct ClientRssConf { diff --git a/riffle-server/src/error.rs b/riffle-server/src/error.rs index 42458313..18c19aa7 100644 --- a/riffle-server/src/error.rs +++ b/riffle-server/src/error.rs @@ -55,8 +55,8 @@ pub enum WorkerError { #[error("The memory usage is limited by huge partition mechanism")] MEMORY_USAGE_LIMITED_BY_HUGE_PARTITION, - #[error("shuffle writing is limited due to the unhealthy storage state")] - WRITE_LIMITED_BY_STORAGE_STATE, + #[error("trigger hard split due to the unhealthy storage state")] + HARD_SPLIT_BY_UNHEALTHY_STORAGE, #[error("Http request failed. {0}")] HTTP_SERVICE_ERROR(String), diff --git a/riffle-server/src/grpc/service.rs b/riffle-server/src/grpc/service.rs index 1d761b56..8678d25a 100644 --- a/riffle-server/src/grpc/service.rs +++ b/riffle-server/src/grpc/service.rs @@ -925,7 +925,7 @@ impl ShuffleServer for DefaultShuffleServer { "".to_string(), vec![], ), - Err(WorkerError::WRITE_LIMITED_BY_STORAGE_STATE) => ( + Err(WorkerError::HARD_SPLIT_BY_UNHEALTHY_STORAGE) => ( StatusCode::HARD_SPLIT_FROM_SERVER, -1i64, "".to_string(), diff --git a/riffle-server/src/metric.rs b/riffle-server/src/metric.rs index 91d8670f..4a841f6a 100644 --- a/riffle-server/src/metric.rs +++ b/riffle-server/src/metric.rs @@ -576,6 +576,9 @@ pub static GAUGE_LOCAL_DISK_IS_HEALTHY: Lazy = Lazy::new(|| { .unwrap() }); +pub static HARD_SPLIT_COUNTER: Lazy = + Lazy::new(|| IntCounter::new("hard_split_count", "hard_split_count").expect("")); + pub static SERVICE_IS_HEALTHY: Lazy = Lazy::new(|| IntGauge::new("service_is_healthy", "service_is_healthy").expect("")); From c3bc033c9af7e4da04b36946157b3e89dc5e2d20 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 26 Dec 2025 17:18:59 +0800 Subject: [PATCH 2/2] fix --- riffle-server/src/app_manager/app.rs | 5 +---- riffle-server/src/client_configs.rs | 4 ++++ 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/riffle-server/src/app_manager/app.rs b/riffle-server/src/app_manager/app.rs index afb3613a..f85a889d 100644 --- a/riffle-server/src/app_manager/app.rs +++ b/riffle-server/src/app_manager/app.rs @@ -539,10 +539,7 @@ mod tests { let app_id = ApplicationId::YARN(1, 1, 1); let mut hmap = HashMap::new(); - hmap.insert( - "spark.rss.riffle.storageCapacityPartitionSplitEnabled".to_string(), - "true".to_string(), - ); + hmap.insert(HARD_SPLIT_ENABLED.get_key(), "true".to_string()); let conf = ClientRssConf::from(hmap); let options = AppConfigOptions::new(DataDistribution::NORMAL, 1, None, conf); diff --git a/riffle-server/src/client_configs.rs b/riffle-server/src/client_configs.rs index 8e8e3aa6..466ef26c 100644 --- a/riffle-server/src/client_configs.rs +++ b/riffle-server/src/client_configs.rs @@ -96,6 +96,10 @@ impl ClientConfigOption { self.description = Some(desc.to_string()); self } + + pub fn get_key(&self) -> String { + self.key.clone() + } } #[cfg(test)] mod tests {