Skip to content

Commit 38f205c

Browse files
committed
feat: Introduce independent health service for more policies
1 parent a697611 commit 38f205c

File tree

7 files changed

+83
-5
lines changed

7 files changed

+83
-5
lines changed

src/app.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -766,6 +766,10 @@ impl AppManager {
766766
self.apps.get(app_id).map(|v| v.value().clone())
767767
}
768768

769+
/// Returns the number of entries currently held in `self.apps`.
///
/// NOTE(review): this treats every registered entry as "alive"; it presumably
/// relies on expired apps being removed from the map elsewhere — confirm.
pub fn get_alive_app_number(&self) -> usize {
770+
self.apps.len()
771+
}
772+
769773
pub fn register(
770774
&self,
771775
app_id: String,

src/config.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,13 @@ impl Default for RuntimeConfig {
183183

184184
// =========================================================
185185

186+
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
187+
pub struct HealthServiceConfig {
188+
pub alive_app_number_max_limit: Option<usize>,
189+
}
190+
191+
// =========================================================
192+
186193
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
187194
pub struct HybridStoreConfig {
188195
#[serde(default = "as_default_memory_spill_high_watermark")]
@@ -280,9 +287,15 @@ pub struct Config {
280287
pub http_monitor_service_port: u16,
281288

282289
pub tracing: Option<TracingConfig>,
290+
291+
#[serde(default = "as_default_health_service_config")]
292+
pub health_service_config: HealthServiceConfig,
283293
}
284294

285295
// ====
296+
fn as_default_health_service_config() -> HealthServiceConfig {
297+
Default::default()
298+
}
286299
/// Returns the value of `HybridStoreConfig::default()`.
fn as_default_hybrid_store_config() -> HybridStoreConfig {
287300
HybridStoreConfig::default()
288301
}

src/health_service.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
use crate::app::AppManagerRef;
2+
use crate::config::HealthServiceConfig;
3+
use crate::storage::HybridStorage;
4+
use anyhow::Result;
5+
6+
#[derive(Clone)]
7+
pub struct HealthService {
8+
app_manager_ref: AppManagerRef,
9+
hybrid_storage: HybridStorage,
10+
11+
alive_app_number_limit: Option<usize>,
12+
}
13+
14+
impl HealthService {
15+
pub fn new(
16+
app_manager: &AppManagerRef,
17+
storage: &HybridStorage,
18+
conf: &HealthServiceConfig,
19+
) -> Self {
20+
Self {
21+
app_manager_ref: app_manager.clone(),
22+
hybrid_storage: storage.clone(),
23+
alive_app_number_limit: conf.alive_app_number_max_limit,
24+
}
25+
}
26+
27+
pub async fn is_healthy(&self) -> Result<bool> {
28+
if !self.app_manager_ref.store_is_healthy().await? {
29+
return Ok(false);
30+
}
31+
32+
// for the initial deploy, to ensure the service stable.
33+
// this could be removed in the future.
34+
// case1: app number limit
35+
// case2: once disk unhealthy, mark the service unhealthy
36+
37+
if let Some(limit) = self.alive_app_number_limit {
38+
let alive_app_number = self.app_manager_ref.get_alive_app_number();
39+
if alive_app_number > limit {
40+
return Ok(false);
41+
}
42+
}
43+
44+
Ok(true)
45+
}
46+
}

src/heartbeat.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use crate::app::{AppManagerRef, SHUFFLE_SERVER_ID, SHUFFLE_SERVER_IP};
22
use crate::config::Config;
33
use crate::grpc::protobuf::uniffle::coordinator_server_client::CoordinatorServerClient;
44
use crate::grpc::protobuf::uniffle::{ShuffleServerHeartBeatRequest, ShuffleServerId};
5+
use crate::health_service::HealthService;
56
use crate::runtime::manager::RuntimeManager;
6-
use crate::util::get_local_ip;
77
use log::info;
88
use std::time::Duration;
99
use tonic::transport::Channel;
@@ -13,7 +13,16 @@ const DEFAULT_SHUFFLE_SERVER_TAG: &str = "ss_v4";
1313
pub struct HeartbeatTask;
1414

1515
impl HeartbeatTask {
16-
pub fn init(config: &Config, runtime_manager: RuntimeManager, app_manager: AppManagerRef) {
16+
pub fn init(
17+
config: &Config,
18+
runtime_manager: &RuntimeManager,
19+
app_manager: &AppManagerRef,
20+
health_service: &HealthService,
21+
) {
22+
let runtime_manager = runtime_manager.clone();
23+
let app_manager = app_manager.clone();
24+
let health_service = health_service.clone();
25+
1726
let coordinator_quorum = config.coordinator_quorum.clone();
1827
let tags = config.tags.clone().unwrap_or(vec![]);
1928

@@ -48,7 +57,7 @@ impl HeartbeatTask {
4857
all_tags.push(DEFAULT_SHUFFLE_SERVER_TAG.to_string());
4958
all_tags.extend_from_slice(&*tags);
5059

51-
let healthy = app_manager.store_is_healthy().await.unwrap_or(false);
60+
let healthy = health_service.is_healthy().await.unwrap_or(false);
5261
let memory_snapshot = app_manager
5362
.store_memory_snapshot()
5463
.await

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub mod urpc;
4040
pub mod util;
4141

4242
pub mod event_bus;
43+
mod health_service;
4344
mod kerberos;
4445
mod reject;
4546
pub mod semaphore_with_index;

src/main.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use crate::app::AppManager;
2121
use crate::common::init_global_variable;
2222
use crate::config::Config;
23+
use crate::health_service::HealthService;
2324
use crate::heartbeat::HeartbeatTask;
2425
use crate::http::{HTTPServer, HttpMonitorService};
2526
use crate::log_service::LogService;
@@ -46,6 +47,7 @@ pub mod constant;
4647
mod error;
4748
pub mod event_bus;
4849
pub mod grpc;
50+
pub mod health_service;
4951
pub mod heartbeat;
5052
mod http;
5153
pub mod kerberos;
@@ -96,9 +98,12 @@ fn main() -> Result<()> {
9698
let app_manager_ref = AppManager::get_ref(runtime_manager.clone(), config.clone(), &storage);
9799
storage.with_app_manager(&app_manager_ref);
98100

101+
let health_service =
102+
HealthService::new(&app_manager_ref, &storage, &config.health_service_config);
103+
99104
MetricService::init(&config, runtime_manager.clone());
100105
FastraceWrapper::init(config.clone());
101-
HeartbeatTask::init(&config, runtime_manager.clone(), app_manager_ref.clone());
106+
HeartbeatTask::init(&config, &runtime_manager, &app_manager_ref, &health_service);
102107
HttpMonitorService::init(&config, runtime_manager.clone());
103108

104109
DefaultRpcService {}.start(&config, runtime_manager, app_manager_ref)?;

src/store/hybrid.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ impl Store for HybridStore {
536536
let cold = check_healthy(self.cold_store.as_ref())
537537
.await
538538
.unwrap_or(false);
539-
Ok(self.hot_store.is_healthy().await? && (warm || cold))
539+
Ok(self.hot_store.is_healthy().await? && warm && cold)
540540
}
541541

542542
#[trace]

0 commit comments

Comments
 (0)