Skip to content

Commit 094375d

Browse files
authored
feat(hard-split): Standardize naming by hard-split and add metric (#565)
1 parent 27576c9 commit 094375d

File tree

5 files changed

+26
-21
lines changed

5 files changed

+26
-21
lines changed

riffle-server/src/app_manager/app.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,16 @@ use crate::app_manager::request_context::{
88
RequireBufferContext, ShuffleResult, WritingViewContext,
99
};
1010
use crate::block_id_manager::{get_block_id_manager, BlockIdManager};
11-
use crate::client_configs::STORAGE_CAPACITY_PARTITION_SPLIT_ENABLED;
11+
use crate::client_configs::HARD_SPLIT_ENABLED;
1212
use crate::config::Config;
1313
use crate::config_reconfigure::ReconfigurableConfManager;
1414
use crate::config_ref::{ByteString, ConfigOption};
1515
use crate::constant::ALL_LABEL;
1616
use crate::ddashmap::DDashMap;
1717
use crate::error::WorkerError;
1818
use crate::metric::{
19-
BLOCK_ID_NUMBER, GAUGE_HUGE_PARTITION_NUMBER, GAUGE_PARTITION_NUMBER, RESIDENT_BYTES,
20-
TOTAL_HUGE_PARTITION_NUMBER, TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED,
19+
BLOCK_ID_NUMBER, GAUGE_HUGE_PARTITION_NUMBER, GAUGE_PARTITION_NUMBER, HARD_SPLIT_COUNTER,
20+
RESIDENT_BYTES, TOTAL_HUGE_PARTITION_NUMBER, TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED,
2121
TOTAL_PARTITION_NUMBER, TOTAL_READ_DATA, TOTAL_READ_DATA_FROM_LOCALFILE,
2222
TOTAL_READ_DATA_FROM_MEMORY, TOTAL_READ_INDEX_FROM_LOCALFILE, TOTAL_RECEIVED_DATA,
2323
TOTAL_REQUIRE_BUFFER_FAILED,
@@ -348,15 +348,17 @@ impl App {
348348
if self
349349
.app_config_options
350350
.client_configs
351-
.get(&STORAGE_CAPACITY_PARTITION_SPLIT_ENABLED)
351+
.get(&HARD_SPLIT_ENABLED)
352352
.unwrap_or(false)
353+
// TODO: If the store is corrupted and only a single replica exists, fail the job fast instead of performing a hard split.
353354
&& !self.store.is_healthy().await?
354355
{
355356
warn!(
356-
"[{}] writing is limited due to the unhealthy storage",
357+
"Hard split is activated for [{}] due to the unhealthy storage",
357358
&app_id.to_string()
358359
);
359-
return Err(WorkerError::WRITE_LIMITED_BY_STORAGE_STATE);
360+
HARD_SPLIT_COUNTER.inc();
361+
return Err(WorkerError::HARD_SPLIT_BY_UNHEALTHY_STORAGE);
360362
}
361363

362364
let mut partition_split_candidates = HashSet::new();
@@ -516,7 +518,7 @@ mod tests {
516518
use crate::app_manager::application_identifier::ApplicationId;
517519
use crate::app_manager::partition_identifier::PartitionUId;
518520
use crate::app_manager::request_context::RequireBufferContext;
519-
use crate::client_configs::{ClientRssConf, STORAGE_CAPACITY_PARTITION_SPLIT_ENABLED};
521+
use crate::client_configs::{ClientRssConf, HARD_SPLIT_ENABLED};
520522
use crate::config::StorageType;
521523
use crate::config::StorageType::LOCALFILE;
522524
use crate::config_reconfigure::ReconfigurableConfManager;
@@ -537,10 +539,7 @@ mod tests {
537539
let app_id = ApplicationId::YARN(1, 1, 1);
538540

539541
let mut hmap = HashMap::new();
540-
hmap.insert(
541-
"spark.rss.riffle.storageCapacityPartitionSplitEnabled".to_string(),
542-
"true".to_string(),
543-
);
542+
hmap.insert(HARD_SPLIT_ENABLED.get_key(), "true".to_string());
544543
let conf = ClientRssConf::from(hmap);
545544
let options = AppConfigOptions::new(DataDistribution::NORMAL, 1, None, conf);
546545

@@ -596,7 +595,7 @@ mod tests {
596595
healthy_tag.store(false, Ordering::SeqCst);
597596

598597
match runtime.block_on(async { app.require_buffer(ctx.clone()).await }) {
599-
Err(WorkerError::WRITE_LIMITED_BY_STORAGE_STATE) => {
598+
Err(WorkerError::HARD_SPLIT_BY_UNHEALTHY_STORAGE) => {
600599
// pass
601600
}
602601
_ => {

riffle-server/src/client_configs.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,11 @@ pub static GET_MEMORY_DATA_URPC_VERSION: Lazy<ClientConfigOption<RpcVersion>> =
4141
.with_description("the urpc version of getMemoryData")
4242
});
4343

44-
pub static STORAGE_CAPACITY_PARTITION_SPLIT_ENABLED: Lazy<ClientConfigOption<bool>> =
45-
Lazy::new(|| {
46-
ClientConfigOption::key("spark.rss.riffle.storageCapacityPartitionSplitEnabled")
47-
.default_value(false)
48-
.with_description("whether to trigger partition split by the storage capacity")
49-
});
44+
pub static HARD_SPLIT_ENABLED: Lazy<ClientConfigOption<bool>> = Lazy::new(|| {
45+
ClientConfigOption::key("spark.rss.riffle.hardSplitEnabled")
46+
.default_value(false)
47+
.with_description("whether to trigger partition hard split by the server")
48+
});
5049

5150
#[derive(Debug, Clone, Default)]
5251
pub struct ClientRssConf {
@@ -97,6 +96,10 @@ impl<T: Clone + Send + Sync + 'static> ClientConfigOption<T> {
9796
self.description = Some(desc.to_string());
9897
self
9998
}
99+
100+
pub fn get_key(&self) -> String {
101+
self.key.clone()
102+
}
100103
}
101104
#[cfg(test)]
102105
mod tests {

riffle-server/src/error.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ pub enum WorkerError {
5555
#[error("The memory usage is limited by huge partition mechanism")]
5656
MEMORY_USAGE_LIMITED_BY_HUGE_PARTITION,
5757

58-
#[error("shuffle writing is limited due to the unhealthy storage state")]
59-
WRITE_LIMITED_BY_STORAGE_STATE,
58+
#[error("trigger hard split due to the unhealthy storage state")]
59+
HARD_SPLIT_BY_UNHEALTHY_STORAGE,
6060

6161
#[error("Http request failed. {0}")]
6262
HTTP_SERVICE_ERROR(String),

riffle-server/src/grpc/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -925,7 +925,7 @@ impl ShuffleServer for DefaultShuffleServer {
925925
"".to_string(),
926926
vec![],
927927
),
928-
Err(WorkerError::WRITE_LIMITED_BY_STORAGE_STATE) => (
928+
Err(WorkerError::HARD_SPLIT_BY_UNHEALTHY_STORAGE) => (
929929
StatusCode::HARD_SPLIT_FROM_SERVER,
930930
-1i64,
931931
"".to_string(),

riffle-server/src/metric.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,9 @@ pub static GAUGE_LOCAL_DISK_IS_HEALTHY: Lazy<IntGaugeVec> = Lazy::new(|| {
576576
.unwrap()
577577
});
578578

579+
pub static HARD_SPLIT_COUNTER: Lazy<IntCounter> =
580+
Lazy::new(|| IntCounter::new("hard_split_count", "hard_split_count").expect(""));
581+
579582
pub static SERVICE_IS_HEALTHY: Lazy<IntGauge> =
580583
Lazy::new(|| IntGauge::new("service_is_healthy", "service_is_healthy").expect(""));
581584

0 commit comments

Comments
 (0)