Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions riffle-server/src/app_manager/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ use crate::app_manager::request_context::{
RequireBufferContext, ShuffleResult, WritingViewContext,
};
use crate::block_id_manager::{get_block_id_manager, BlockIdManager};
use crate::client_configs::STORAGE_CAPACITY_PARTITION_SPLIT_ENABLED;
use crate::client_configs::HARD_SPLIT_ENABLED;
use crate::config::Config;
use crate::config_reconfigure::ReconfigurableConfManager;
use crate::config_ref::{ByteString, ConfigOption};
use crate::constant::ALL_LABEL;
use crate::ddashmap::DDashMap;
use crate::error::WorkerError;
use crate::metric::{
BLOCK_ID_NUMBER, GAUGE_HUGE_PARTITION_NUMBER, GAUGE_PARTITION_NUMBER, RESIDENT_BYTES,
TOTAL_HUGE_PARTITION_NUMBER, TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED,
BLOCK_ID_NUMBER, GAUGE_HUGE_PARTITION_NUMBER, GAUGE_PARTITION_NUMBER, HARD_SPLIT_COUNTER,
RESIDENT_BYTES, TOTAL_HUGE_PARTITION_NUMBER, TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED,
TOTAL_PARTITION_NUMBER, TOTAL_READ_DATA, TOTAL_READ_DATA_FROM_LOCALFILE,
TOTAL_READ_DATA_FROM_MEMORY, TOTAL_READ_INDEX_FROM_LOCALFILE, TOTAL_RECEIVED_DATA,
TOTAL_REQUIRE_BUFFER_FAILED,
Expand Down Expand Up @@ -348,15 +348,17 @@ impl App {
if self
.app_config_options
.client_configs
.get(&STORAGE_CAPACITY_PARTITION_SPLIT_ENABLED)
.get(&HARD_SPLIT_ENABLED)
.unwrap_or(false)
// TODO: If the store is corrupted and only a single replica exists, fail the job fast instead of performing a hard split.
&& !self.store.is_healthy().await?
{
warn!(
"[{}] writing is limited due to the unhealthy storage",
"Hard split is activated for [{}] due to the unhealthy storage",
&app_id.to_string()
);
return Err(WorkerError::WRITE_LIMITED_BY_STORAGE_STATE);
HARD_SPLIT_COUNTER.inc();
return Err(WorkerError::HARD_SPLIT_BY_UNHEALTHY_STORAGE);
}

let mut partition_split_candidates = HashSet::new();
Expand Down Expand Up @@ -516,7 +518,7 @@ mod tests {
use crate::app_manager::application_identifier::ApplicationId;
use crate::app_manager::partition_identifier::PartitionUId;
use crate::app_manager::request_context::RequireBufferContext;
use crate::client_configs::{ClientRssConf, STORAGE_CAPACITY_PARTITION_SPLIT_ENABLED};
use crate::client_configs::{ClientRssConf, HARD_SPLIT_ENABLED};
use crate::config::StorageType;
use crate::config::StorageType::LOCALFILE;
use crate::config_reconfigure::ReconfigurableConfManager;
Expand All @@ -537,10 +539,7 @@ mod tests {
let app_id = ApplicationId::YARN(1, 1, 1);

let mut hmap = HashMap::new();
hmap.insert(
"spark.rss.riffle.storageCapacityPartitionSplitEnabled".to_string(),
"true".to_string(),
);
hmap.insert(HARD_SPLIT_ENABLED.get_key(), "true".to_string());
let conf = ClientRssConf::from(hmap);
let options = AppConfigOptions::new(DataDistribution::NORMAL, 1, None, conf);

Expand Down Expand Up @@ -596,7 +595,7 @@ mod tests {
healthy_tag.store(false, Ordering::SeqCst);

match runtime.block_on(async { app.require_buffer(ctx.clone()).await }) {
Err(WorkerError::WRITE_LIMITED_BY_STORAGE_STATE) => {
Err(WorkerError::HARD_SPLIT_BY_UNHEALTHY_STORAGE) => {
// pass
}
_ => {
Expand Down
15 changes: 9 additions & 6 deletions riffle-server/src/client_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,11 @@ pub static GET_MEMORY_DATA_URPC_VERSION: Lazy<ClientConfigOption<RpcVersion>> =
.with_description("the urpc version of getMemoryData")
});

pub static STORAGE_CAPACITY_PARTITION_SPLIT_ENABLED: Lazy<ClientConfigOption<bool>> =
Lazy::new(|| {
ClientConfigOption::key("spark.rss.riffle.storageCapacityPartitionSplitEnabled")
.default_value(false)
.with_description("whether to trigger partition split by the storage capacity")
});
pub static HARD_SPLIT_ENABLED: Lazy<ClientConfigOption<bool>> = Lazy::new(|| {
ClientConfigOption::key("spark.rss.riffle.hardSplitEnabled")
.default_value(false)
.with_description("whether to trigger partition hard split by the server")
});

#[derive(Debug, Clone, Default)]
pub struct ClientRssConf {
Expand Down Expand Up @@ -97,6 +96,10 @@ impl<T: Clone + Send + Sync + 'static> ClientConfigOption<T> {
self.description = Some(desc.to_string());
self
}

pub fn get_key(&self) -> String {
self.key.clone()
}
}
#[cfg(test)]
mod tests {
Expand Down
4 changes: 2 additions & 2 deletions riffle-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ pub enum WorkerError {
#[error("The memory usage is limited by huge partition mechanism")]
MEMORY_USAGE_LIMITED_BY_HUGE_PARTITION,

#[error("shuffle writing is limited due to the unhealthy storage state")]
WRITE_LIMITED_BY_STORAGE_STATE,
#[error("trigger hard split due to the unhealthy storage state")]
HARD_SPLIT_BY_UNHEALTHY_STORAGE,

#[error("Http request failed. {0}")]
HTTP_SERVICE_ERROR(String),
Expand Down
2 changes: 1 addition & 1 deletion riffle-server/src/grpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ impl ShuffleServer for DefaultShuffleServer {
"".to_string(),
vec![],
),
Err(WorkerError::WRITE_LIMITED_BY_STORAGE_STATE) => (
Err(WorkerError::HARD_SPLIT_BY_UNHEALTHY_STORAGE) => (
StatusCode::HARD_SPLIT_FROM_SERVER,
-1i64,
"".to_string(),
Expand Down
3 changes: 3 additions & 0 deletions riffle-server/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,9 @@ pub static GAUGE_LOCAL_DISK_IS_HEALTHY: Lazy<IntGaugeVec> = Lazy::new(|| {
.unwrap()
});

pub static HARD_SPLIT_COUNTER: Lazy<IntCounter> =
Lazy::new(|| IntCounter::new("hard_split_count", "hard_split_count").expect(""));

pub static SERVICE_IS_HEALTHY: Lazy<IntGauge> =
Lazy::new(|| IntGauge::new("service_is_healthy", "service_is_healthy").expect(""));

Expand Down
Loading