6 changes: 6 additions & 0 deletions etc/config.yaml
@@ -162,6 +162,12 @@ Container:
RuntimeDelete: /usr/bin/runc --rootless=true --root=/run/user/%U/ delete --force %u.%U.%j.%x
RuntimeRun: /usr/bin/runc --rootless=true --root=/run/user/%U/ run -b %b %u.%U.%j.%x

HealthCheck:
Program: /usr/sbin/nhc
Interval: 60
NodeState: ANY # IDLE, ALLOC, MIXED, ANY
Cycle: false

Plugin:
# Toggle the plugin module in CraneSched
Enabled: false
18 changes: 18 additions & 0 deletions protos/Crane.proto
@@ -922,6 +922,21 @@ message QueryTxnLogReply {
repeated Txn txn_log_list = 3;
}

message SendHealthCheckResultRequest {
string craned_id = 1;
bool healthy = 2;
}

message QueryNodeStateRequest {
string craned_id = 1;
}

message QueryNodeStateReply {
CranedResourceState state = 1;
bool drain = 2;
bool ok = 3;
}

// Todo: Divide service into two parts: one for Craned and one for Crun
// We need to distinguish the message sender
// and have some kind of authentication
@@ -988,6 +1003,9 @@ service CraneCtldForInternal {
rpc CranedRegister(CranedRegisterRequest) returns (CranedRegisterReply);
rpc CranedPing(CranedPingRequest) returns (CranedPingReply);

rpc SendHealthCheckResult(SendHealthCheckResultRequest) returns (google.protobuf.Empty);
rpc QueryNodeState(QueryNodeStateRequest) returns (QueryNodeStateReply);

/* RPCs called from Cfored */
rpc CforedStream(stream StreamCforedRequest) returns (stream StreamCtldReply);
}
45 changes: 45 additions & 0 deletions src/CraneCtld/CranedMetaContainer.cpp
@@ -825,6 +825,51 @@ crane::grpc::ModifyCranedStateReply CranedMetaContainer::ChangeNodeState(
return reply;
}

void CranedMetaContainer::UpdateNodeStateWithHealthCheck_(
const CranedId& craned_id, bool is_health) {
if (!craned_meta_map_.Contains(craned_id)) {
CRANE_ERROR(
"Health check: unknown craned_id '{}', cannot update drain state.",
craned_id);
return;
}

auto craned_meta = craned_meta_map_[craned_id];
// only update drain state when the reason is 'Node failed health check'
if (craned_meta->drain &&
craned_meta->state_reason == HealthCheckFailedReason) {
craned_meta->drain = !is_health;
if (is_health) craned_meta->state_reason.clear();
} else if (!craned_meta->drain && !is_health) {
craned_meta->drain = true;
craned_meta->state_reason = HealthCheckFailedReason;
}
CRANE_TRACE("Health check: craned_id '{}' drain state changed to {}.",
craned_id, craned_meta->drain);
}

void CranedMetaContainer::QueryNodeState(
Collaborator (PR author) comment:

  • ALLOC
    Run on nodes in the ALLOC state (all CPUs allocated).
  • ANY
    Run on nodes in any state.
  • CYCLE
    Rather than running the health-check program on all nodes at the same time, rotate through all compute nodes over the course of one HealthCheckInterval. Can be combined with the other node-state options.
  • IDLE
    Run on nodes in the IDLE state (idle).
  • NONDRAINED_IDLE
    Run on nodes that are in the IDLE state and have not been DRAINED.
  • MIXED
    Run on nodes in the MIXED state (some CPUs idle, some CPUs allocated).
  • START_ONLY
    Run only when the slurmd daemon starts.
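For illustration, a hypothetical combination of these options using the keys from the etc/config.yaml block above (the values shown are examples, not the defaults this PR ships):

  HealthCheck:
    Program: /usr/sbin/nhc
    Interval: 300     # health-check interval, in seconds
    NodeState: IDLE   # run only while the node is idle
    Cycle: true       # randomize the start within each interval so checks are staggered across nodes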

const CranedId& craned_id, crane::grpc::QueryNodeStateReply* response) {
if (!craned_meta_map_.Contains(craned_id)) {
CRANE_ERROR(
"Health check: unknown craned_id '{}', cannot query node state.",
craned_id);
response->set_ok(false);
return;
}

auto craned_meta = craned_meta_map_[craned_id];
if (craned_meta->res_in_use.IsZero())
response->set_state(crane::grpc::CranedResourceState::CRANE_IDLE);
else if (craned_meta->res_avail.allocatable_res.IsAnyZero())
response->set_state(crane::grpc::CranedResourceState::CRANE_ALLOC);
else
response->set_state(crane::grpc::CranedResourceState::CRANE_MIX);

response->set_drain(craned_meta->drain);
response->set_ok(true);
}

CraneExpected<void> CranedMetaContainer::ModifyPartitionAcl(
const std::string& partition_name, bool is_allowed_list,
std::unordered_set<std::string>&& accounts) {
6 changes: 6 additions & 0 deletions src/CraneCtld/CranedMetaContainer.h
@@ -89,6 +89,12 @@ class CranedMetaContainer final {
crane::grpc::ModifyCranedStateReply ChangeNodeState(
const crane::grpc::ModifyCranedStateRequest& request);

void UpdateNodeStateWithHealthCheck_(const CranedId& craned_id,
bool is_health);

void QueryNodeState(const CranedId& craned_id,
crane::grpc::QueryNodeStateReply* response);

CraneExpected<void> ModifyPartitionAcl(
const std::string& partition_name, bool is_allowed_list,
std::unordered_set<std::string>&& accounts);
19 changes: 19 additions & 0 deletions src/CraneCtld/RpcService/CtldGrpcServer.cpp
@@ -168,6 +168,25 @@ grpc::Status CtldForInternalServiceImpl::CranedPing(
return grpc::Status::OK;
}

grpc::Status CtldForInternalServiceImpl::SendHealthCheckResult(
grpc::ServerContext *context,
const crane::grpc::SendHealthCheckResultRequest *request,
google::protobuf::Empty *response) {
g_meta_container->UpdateNodeStateWithHealthCheck_(request->craned_id(),
request->healthy());

return grpc::Status::OK;
}

grpc::Status CtldForInternalServiceImpl::QueryNodeState(
grpc::ServerContext *context,
const crane::grpc::QueryNodeStateRequest *request,
crane::grpc::QueryNodeStateReply *response) {
g_meta_container->QueryNodeState(request->craned_id(), response);

return grpc::Status::OK;
}

grpc::Status CtldForInternalServiceImpl::CforedStream(
grpc::ServerContext *context,
grpc::ServerReaderWriter<crane::grpc::StreamCtldReply,
10 changes: 10 additions & 0 deletions src/CraneCtld/RpcService/CtldGrpcServer.h
@@ -192,6 +192,16 @@ class CtldForInternalServiceImpl final
const crane::grpc::CranedPingRequest *request,
crane::grpc::CranedPingReply *response) override;

grpc::Status SendHealthCheckResult(
grpc::ServerContext *context,
const crane::grpc::SendHealthCheckResultRequest *request,
google::protobuf::Empty *response) override;

grpc::Status QueryNodeState(
grpc::ServerContext *context,
const crane::grpc::QueryNodeStateRequest *request,
crane::grpc::QueryNodeStateReply *response) override;

grpc::Status CforedStream(
grpc::ServerContext *context,
grpc::ServerReaderWriter<crane::grpc::StreamCtldReply,
35 changes: 35 additions & 0 deletions src/Craned/Core/Craned.cpp
@@ -648,6 +648,41 @@ void ParseConfig(int argc, char** argv) {
}
}

if (config["HealthCheck"]) {
const auto& health_check_config = config["HealthCheck"];
g_config.HealthCheck.Program =
YamlValueOr(health_check_config["Program"], "");
if (g_config.HealthCheck.Program.empty()) {
CRANE_ERROR("HealthCheckProgram is not configured");
std::exit(1);
}
g_config.HealthCheck.Interval =
YamlValueOr<uint64_t>(health_check_config["Interval"], 0L);
std::string node_state;
node_state = absl::StripAsciiWhitespace(absl::AsciiStrToLower(
YamlValueOr(health_check_config["NodeState"], "any")));
if (node_state != "any" && node_state != "idle" &&
node_state != "mixed" && node_state != "alloc") {
CRANE_WARN("HealthCheckNodeState is not valid, reset to any");
node_state = "any";
}
if (node_state == "any") {
g_config.HealthCheck.NodeState =
Craned::Config::HealthCheckConfig::ANY;
} else if (node_state == "idle") {
g_config.HealthCheck.NodeState =
Craned::Config::HealthCheckConfig::IDLE;
} else if (node_state == "mixed") {
g_config.HealthCheck.NodeState =
Craned::Config::HealthCheckConfig::MIXED;
} else if (node_state == "alloc") {
g_config.HealthCheck.NodeState =
Craned::Config::HealthCheckConfig::ALLOC;
}
g_config.HealthCheck.Cycle =
YamlValueOr<bool>(health_check_config["Cycle"], false);
}
Comment on lines +664 to +684
Contributor comment:

⚠️ Potential issue | 🟠 Major

Allow parsing of the NONDRAINED_IDLE option.

HealthCheckConfig::NodeStateEnum (see src/Craned/Core/CranedPublicDefs.h) now exposes NONDRAINED_IDLE, but this block treats it as invalid and silently downgrades it to any, so operators can never activate the new behavior. Please accept the token when present.

-          if (node_state != "any" && node_state != "idle" &&
-              node_state != "mixed" && node_state != "alloc") {
+          if (node_state != "any" && node_state != "idle" &&
+              node_state != "mixed" && node_state != "alloc" &&
+              node_state != "nondrained_idle") {
             CRANE_WARN("HealthCheckNodeState is not valid, reset to any");
             node_state = "any";
           }
@@
-          } else if (node_state == "alloc") {
+          } else if (node_state == "alloc") {
             g_config.HealthCheck.NodeState =
                 Craned::Config::HealthCheckConfig::ALLOC;
+          } else if (node_state == "nondrained_idle") {
+            g_config.HealthCheck.NodeState =
+                Craned::Config::HealthCheckConfig::NONDRAINED_IDLE;
           }
🤖 Prompt for AI Agents
In src/Craned/Core/Craned.cpp around lines 664 to 684, the validation currently
rejects the new NONDRAINED_IDLE node state and resets it to "any"; update the
logic to accept the token (e.g. "nondrained_idle") by adding it to the initial
validity check and add an else-if branch that sets
g_config.HealthCheck.NodeState =
Craned::Config::HealthCheckConfig::NONDRAINED_IDLE when node_state equals the
new token; ensure the token string matches the canonical representation used
elsewhere (case/underscore) so operators can enable the new behavior.


if (config["Plugin"]) {
const auto& plugin_config = config["Plugin"];
g_config.Plugin.Enabled =
9 changes: 9 additions & 0 deletions src/Craned/Core/CranedPublicDefs.h
@@ -102,6 +102,15 @@ struct Config {
};
ContainerConfig Container;

struct HealthCheckConfig {
enum NodeStateEnum { IDLE, ALLOC, MIXED, ANY, NONDRAINED_IDLE };
std::string Program;
uint64_t Interval{0};
NodeStateEnum NodeState;
Collaborator (PR author) comment:

Consider allowing multiple states to be specified at the same time.
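A minimal sketch of how combinable states could look, assuming NodeStateEnum were turned into bit flags (the flag names and helper below are hypothetical, not part of this PR):

  #include <cstdint>

  // Hypothetical bit-flag variant of NodeStateEnum so that several states
  // can be OR-ed together in the config, e.g. IDLE | MIXED.
  enum NodeStateFlags : uint32_t {
    NODE_IDLE = 1u << 0,
    NODE_ALLOC = 1u << 1,
    NODE_MIXED = 1u << 2,
    NODE_NONDRAINED_IDLE = 1u << 3,
    NODE_ANY = NODE_IDLE | NODE_ALLOC | NODE_MIXED | NODE_NONDRAINED_IDLE,
  };

  // The health-check trigger would then test membership instead of equality.
  inline bool ShouldRunHealthCheck(uint32_t configured, NodeStateFlags current) {
    return (configured & current) != 0;
  }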

bool Cycle;
};
HealthCheckConfig HealthCheck;

struct PluginConfig {
bool Enabled{false};
std::string PlugindSockPath;
132 changes: 132 additions & 0 deletions src/Craned/Core/CtldClient.cpp
@@ -380,6 +380,7 @@ CtldClient::~CtldClient() {
CRANE_TRACE("Waiting for CtldClient thread to finish.");
if (m_async_send_thread_.joinable()) m_async_send_thread_.join();
if (m_uvw_thread_.joinable()) m_uvw_thread_.join();
if (m_health_check_thread_.joinable()) m_health_check_thread_.join();
}

void CtldClient::Init() {
@@ -460,6 +461,24 @@ void CtldClient::InitGrpcChannel(const std::string& server_address) {
// std::unique_ptr will automatically release the dangling stub.
m_stub_ = CraneCtldForInternal::NewStub(m_ctld_channel_);

if (g_config.HealthCheck.Interval > 0L) {
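// Per the review discussion below: run one check immediately so that a node
// failing its health check never comes online.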
HealthCheck_();
Collaborator comment:

Does this really need to run once immediately here? It could slow down how quickly a Craned node comes online.

Collaborator (PR author) reply:

The requirement is to run it right away, to make sure a faulty node never connects.

m_health_check_thread_ = std::thread([this] {
std::mt19937 rng{std::random_device{}()};
do {
uint64_t interval = g_config.HealthCheck.Interval;
int delay = interval;
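// With Cycle enabled, pick a random delay within one interval so that checks
// are staggered across nodes rather than all firing at the same time.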
if (g_config.HealthCheck.Cycle) {
std::uniform_int_distribution<int> dist(1, interval);
delay = dist(rng);
}
std::this_thread::sleep_for(std::chrono::seconds(delay));
if (m_stopping_ || !m_stub_) return;
if (CheckNodeState_()) HealthCheck_();
} while (true);
});
}

m_async_send_thread_ = std::thread([this] { AsyncSendThread_(); });
}

@@ -703,6 +722,119 @@ void CtldClient::SendStatusChanges_() {
}
}

void CtldClient::SendHealthCheckResult_(bool is_health) const {
if (m_stopping_ || !m_stub_) return;

grpc::ClientContext context;
crane::grpc::SendHealthCheckResultRequest request;
google::protobuf::Empty reply;

request.set_craned_id(g_config.CranedIdOfThisNode);
request.set_healthy(is_health);

auto result = m_stub_->SendHealthCheckResult(&context, request, &reply);
if (!result.ok()) {
CRANE_ERROR("SendHealthCheckResult failed: is_health={}", is_health);
}
Comment on lines +725 to +738
Contributor comment:

⚠️ Potential issue | 🟠 Major

Add an RPC deadline to avoid hanging the health-check thread.

SendHealthCheckResult_ invokes m_stub_->SendHealthCheckResult with a naked grpc::ClientContext. If Ctld stops responding, this call blocks indefinitely and the health-check thread never recovers. All other outbound RPCs in this class set context.set_deadline(...); we should do the same here (e.g. reuse kCranedRpcTimeoutSeconds).

🤖 Prompt for AI Agents
In src/Craned/Core/CtldClient.cpp around lines 725 to 738, the health-check RPC
uses a bare grpc::ClientContext causing a potential indefinite block; set a
deadline on the context (e.g.
context.set_deadline(std::chrono::system_clock::now() +
std::chrono::seconds(kCranedRpcTimeoutSeconds))) before calling
m_stub_->SendHealthCheckResult so the call times out like the other RPCs, and
ensure necessary <chrono> includes and the kCranedRpcTimeoutSeconds constant are
available.
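For reference, a minimal sketch of the suggested change (assuming kCranedRpcTimeoutSeconds is visible in this translation unit, as the comment above presumes):

  // Same call as in the PR, but with a bounded deadline so the health-check
  // thread cannot block forever if CraneCtld stops responding.
  void CtldClient::SendHealthCheckResult_(bool is_health) const {
    if (m_stopping_ || !m_stub_) return;

    grpc::ClientContext context;
    context.set_deadline(std::chrono::system_clock::now() +
                         std::chrono::seconds(kCranedRpcTimeoutSeconds));

    crane::grpc::SendHealthCheckResultRequest request;
    google::protobuf::Empty reply;
    request.set_craned_id(g_config.CranedIdOfThisNode);
    request.set_healthy(is_health);

    grpc::Status status = m_stub_->SendHealthCheckResult(&context, request, &reply);
    if (!status.ok())
      CRANE_ERROR("SendHealthCheckResult failed ({}): is_health={}",
                  status.error_message(), is_health);
  }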

}

void CtldClient::HealthCheck_() {
if (!g_server->ReadyFor(RequestSource::CTLD)) return;

CRANE_DEBUG("Health checking.....");

subprocess_s subprocess{};
std::vector<const char*> argv = {g_config.HealthCheck.Program.c_str(),
nullptr};

if (subprocess_create(argv.data(), 0, &subprocess) != 0) {
CRANE_ERROR(
"[Craned Subprocess] HealthCheck subprocess creation failed: {}.",
strerror(errno));
SendHealthCheckResult_(false);
return;
}

pid_t pid = subprocess.child;
int result = 0;

auto fut = std::async(std::launch::async,
[pid, &result]() { return waitpid(pid, &result, 0); });

bool child_exited = false;
if (fut.wait_for(std::chrono::milliseconds(MaxHealthCheckWaitTime)) ==
std::future_status::ready) {
if (fut.get() == pid) {
child_exited = true;
}
}

auto read_stream = [](std::FILE* f) {
std::string out;
char buf[4096];
while (std::fgets(buf, sizeof(buf), f)) out.append(buf);
return out;
};

if (!child_exited) {
kill(pid, SIGKILL);
waitpid(pid, &result, 0);
std::string stdout_str = read_stream(subprocess_stdout(&subprocess));
std::string stderr_str = read_stream(subprocess_stderr(&subprocess));
CRANE_WARN("HealthCheck: Timeout. stdout: {}, stderr: {}", stdout_str,
stderr_str);
SendHealthCheckResult_(false);
subprocess_destroy(&subprocess);
return;
}

if (result != 0) {
std::string stdout_str = read_stream(subprocess_stdout(&subprocess));
std::string stderr_str = read_stream(subprocess_stderr(&subprocess));
CRANE_WARN("HealthCheck: Failed (exit code:{}). stdout: {}, stderr: {}",
result, stdout_str, stderr_str);
SendHealthCheckResult_(false);
subprocess_destroy(&subprocess);
return;
}

if (subprocess_destroy(&subprocess) != 0)
CRANE_ERROR("[Craned Subprocess] HealthCheck destroy failed.");

CRANE_DEBUG("Health check success.");
SendHealthCheckResult_(true);
}

bool CtldClient::CheckNodeState_() {
if (g_config.HealthCheck.NodeState == Config::HealthCheckConfig::ANY)
return true;

grpc::ClientContext context;
crane::grpc::QueryNodeStateRequest req;
crane::grpc::QueryNodeStateReply reply;
req.set_craned_id(g_config.CranedIdOfThisNode);
auto result = m_stub_->QueryNodeState(&context, req, &reply);
if (!result.ok() || !reply.ok()) {
CRANE_ERROR("QueryNodeState failed");
return false;
}

Comment on lines +811 to +820
Contributor comment:

⚠️ Potential issue | 🟠 Major

Set a deadline on the QueryNodeState RPC.

CheckNodeState_ issues QueryNodeState with no deadline, so a stalled Ctld will block this worker thread (and therefore all future health checks) indefinitely. Please mirror the other call sites by setting a sensible deadline on the ClientContext before invoking the RPC.

🤖 Prompt for AI Agents
In src/Craned/Core/CtldClient.cpp around lines 811 to 820, the ClientContext for
QueryNodeState has no deadline which can hang the health-check thread; set a
deadline on the grpc::ClientContext (mirror other call sites) before calling
m_stub_->QueryNodeState, using a sensible timeout (e.g., a short configurable
duration or a constant like 1s–5s) calculated from
std::chrono::system_clock::now() plus that duration, then proceed with the RPC
and existing result/error handling.

switch (g_config.HealthCheck.NodeState) {
case Config::HealthCheckConfig::NONDRAINED_IDLE:
return !reply.drain() &&
reply.state() == crane::grpc::CranedResourceState::CRANE_IDLE;
case Config::HealthCheckConfig::IDLE:
return reply.state() == crane::grpc::CranedResourceState::CRANE_IDLE;
case Config::HealthCheckConfig::MIXED:
return reply.state() == crane::grpc::CranedResourceState::CRANE_MIX;
case Config::HealthCheckConfig::ALLOC:
return reply.state() == crane::grpc::CranedResourceState::CRANE_ALLOC;
case Config::HealthCheckConfig::ANY:
break;
}

return false;
}

bool CtldClient::Ping_() {
grpc::ClientContext context;
context.set_deadline(std::chrono::system_clock::now() +
8 changes: 8 additions & 0 deletions src/Craned/Core/CtldClient.h
@@ -209,6 +209,12 @@ class CtldClient {

void SendStatusChanges_();

void SendHealthCheckResult_(bool is_health) const;

void HealthCheck_();

bool CheckNodeState_();

absl::Mutex m_step_status_change_mtx_;

std::list<TaskStatusChangeQueueElem> m_step_status_change_list_
@@ -237,6 +243,8 @@ class CtldClient {
std::shared_ptr<uvw::timer_handle> m_ping_handle_;
std::atomic_bool m_ping_ctld_{false};
std::atomic<std::chrono::steady_clock::time_point> m_last_active_time_;

std::thread m_health_check_thread_;
};

} // namespace Craned
3 changes: 3 additions & 0 deletions src/Utilities/PublicHeader/include/crane/PublicHeader.h
@@ -49,6 +49,9 @@ using CraneExpectedRich = std::expected<T, CraneRichError>;
constexpr const char* kLogPattern =
"[%^%L%$ %C-%m-%d %H:%M:%S.%e %s:%#][%n] %v";

constexpr const char* HealthCheckFailedReason = "Node failed health check";
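// Maximum time to wait for the health-check program before it is killed, in milliseconds.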
constexpr int MaxHealthCheckWaitTime = 60000;

inline const char* const kDefaultHost = "0.0.0.0";

inline const char* kCtldDefaultPort = "10011";