Skip to content

Commit 848b306

Browse files
committed
feat: Catch up the localfile/hdfs underlying flush failure
1 parent cb3c718 commit 848b306

File tree

2 files changed

+29
-4
lines changed

2 files changed

+29
-4
lines changed

src/metric.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -322,8 +322,13 @@ pub static TOTAL_MEMORY_SPILL_IN_FLUSHING_OPERATION: Lazy<IntCounter> = Lazy::ne
322322
.expect("metric should be created")
323323
});
324324
pub static TOTAL_MEMORY_SPILL_OPERATION_FAILED: Lazy<IntCounter> = Lazy::new(|| {
325-
IntCounter::new("total_memory_spill_failed", "memory capacity")
326-
.expect("metric should be created")
325+
IntCounter::new("total_memory_spill_failed", "").expect("metric should be created")
326+
});
327+
pub static TOTAL_MEMORY_SPILL_TO_LOCALFILE_OPERATION_FAILED: Lazy<IntCounter> = Lazy::new(|| {
328+
IntCounter::new("total_memory_to_localfile_spill_failed", "").expect("metric should be created")
329+
});
330+
pub static TOTAL_MEMORY_SPILL_TO_HDFS_OPERATION_FAILED: Lazy<IntCounter> = Lazy::new(|| {
331+
IntCounter::new("total_memory_to_hdfs_spill_failed", "").expect("metric should be created")
327332
});
328333
pub static TOTAL_MEMORY_SPILL_TO_LOCALFILE: Lazy<IntCounter> = Lazy::new(|| {
329334
IntCounter::new(
@@ -631,7 +636,17 @@ fn register_custom_metrics() {
631636
.expect("total_memory_spill_operation must be registered");
632637
REGISTRY
633638
.register(Box::new(TOTAL_MEMORY_SPILL_OPERATION_FAILED.clone()))
634-
.expect("total_memory_spill_operation_failed must be registered");
639+
.expect("total_memory_spill_failed must be registered");
640+
REGISTRY
641+
.register(Box::new(
642+
TOTAL_MEMORY_SPILL_TO_LOCALFILE_OPERATION_FAILED.clone(),
643+
))
644+
.expect("total_memory_to_localfile_spill_failed must be registered");
645+
REGISTRY
646+
.register(Box::new(
647+
TOTAL_MEMORY_SPILL_TO_HDFS_OPERATION_FAILED.clone(),
648+
))
649+
.expect("total_memory_to_hdfs_spill_failed must be registered");
635650
REGISTRY
636651
.register(Box::new(TOTAL_APP_NUMBER.clone()))
637652
.expect("total_app_number must be registered");

src/store/spill/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ use crate::app::PartitionedUId;
22
use crate::config::StorageType;
33
use crate::error::WorkerError;
44
use crate::metric::{
5-
TOTAL_MEMORY_SPILL_OPERATION_FAILED, TOTAL_SPILL_EVENTS_DROPPED,
5+
TOTAL_MEMORY_SPILL_OPERATION_FAILED, TOTAL_MEMORY_SPILL_TO_HDFS_OPERATION_FAILED,
6+
TOTAL_MEMORY_SPILL_TO_LOCALFILE_OPERATION_FAILED, TOTAL_SPILL_EVENTS_DROPPED,
67
TOTAL_SPILL_EVENTS_DROPPED_WITH_APP_NOT_FOUND,
78
};
89
use crate::store::hybrid::{HybridStore, PersistentStore};
@@ -127,6 +128,15 @@ async fn handle_spill_failure(
127128
}
128129
error => {
129130
TOTAL_MEMORY_SPILL_OPERATION_FAILED.inc();
131+
if let Some(stype) = message.get_candidate_storage_type() {
132+
match stype {
133+
StorageType::LOCALFILE => {
134+
TOTAL_MEMORY_SPILL_TO_LOCALFILE_OPERATION_FAILED.inc()
135+
}
136+
StorageType::HDFS => TOTAL_MEMORY_SPILL_TO_HDFS_OPERATION_FAILED.inc(),
137+
_ => {}
138+
}
139+
}
130140
error!(
131141
"Errors on spill memory data to persistent storage. The error: {:#?}",
132142
error

0 commit comments

Comments
 (0)