Skip to content
Open
6 changes: 5 additions & 1 deletion etc/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ Craned:
PingInterval: 15
# register operation timeout in seconds
CraneCtldTimeout: 5

# Set the interval for node health check.
# 0 means the health check is disabled.
NodeHealthCheckInterval: 0

# Scheduling settings
# Current implemented scheduling algorithms are:
Expand Down Expand Up @@ -142,6 +144,8 @@ JobFileAppend: false
# Set the flag to ignore warnings about config files mismatches.
IgnoreConfigInconsistency: false



Supervisor:
Path: /usr/libexec/csupervisor
# Supervisor log level
Expand Down
8 changes: 7 additions & 1 deletion protos/Crane.proto
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,12 @@ message QueryTxnLogReply {
repeated Txn txn_log_list = 3;
}

message CranedReportHealthRequest {
string craned_id = 1;
bool healthy = 2;
string reason = 3;
}

// Todo: Divide service into two parts: one for Craned and one for Crun
// We need to distinguish the message sender
// and have some kind of authentication
Expand Down Expand Up @@ -1007,7 +1013,7 @@ service CraneCtldForInternal {
rpc CranedTriggerReverseConn(CranedTriggerReverseConnRequest) returns (google.protobuf.Empty);
rpc CranedRegister(CranedRegisterRequest) returns (CranedRegisterReply);
rpc CranedPing(CranedPingRequest) returns (CranedPingReply);

rpc CranedReportHealth(CranedReportHealthRequest) returns (google.protobuf.Empty);
/* RPCs called from Cfored */
rpc CforedStream(stream StreamCforedRequest) returns (stream StreamCtldReply);
}
Expand Down
24 changes: 24 additions & 0 deletions src/CraneCtld/CranedMetaContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,30 @@ CraneExpected<void> CranedMetaContainer::ModifyPartitionAcl(
return result;
}

void CranedMetaContainer::UpdateNodeStateWithNodeHealthCheck(
const CranedId& craned_id, bool is_healthy, const std::string& reason) {
if (!craned_meta_map_.Contains(craned_id)) {
CRANE_ERROR(
"Node health check: unknown craned_id '{}', cannot update drain state.",
craned_id);
return;
}

auto craned_meta = craned_meta_map_[craned_id];

if (craned_meta->drain && craned_meta->state_reason == reason) {
craned_meta->drain = !is_healthy;
if (is_healthy) craned_meta->state_reason.clear();
} else if (!craned_meta->drain && !is_healthy) {
craned_meta->drain = true;
craned_meta->state_reason = reason;
}

CRANE_WARN(
"Node health check: craned_id '{}' drain state changed to {}. Reason: {}",
craned_id, craned_meta->drain, reason);
}

CraneExpected<void> CranedMetaContainer::CheckIfAccountIsAllowedInPartition(
const std::string& partition_name, const std::string& account_name) {
auto part_metas_map = partition_meta_map_.GetMapSharedPtr();
Expand Down
3 changes: 3 additions & 0 deletions src/CraneCtld/CranedMetaContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ class CranedMetaContainer final {
crane::grpc::ModifyCranedStateReply ChangeNodeState(
const crane::grpc::ModifyCranedStateRequest& request);

void UpdateNodeStateWithNodeHealthCheck(const CranedId& craned_id,
bool is_healthy,
const std::string& reason);
CraneExpected<void> ModifyPartitionAcl(
const std::string& partition_name, bool is_allowed_list,
std::unordered_set<std::string>&& accounts);
Expand Down
10 changes: 10 additions & 0 deletions src/CraneCtld/RpcService/CtldGrpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ grpc::Status CtldForInternalServiceImpl::CranedPing(
return grpc::Status::OK;
}

grpc::Status CtldForInternalServiceImpl::CranedReportHealth(
grpc::ServerContext *context,
const crane::grpc::CranedReportHealthRequest *request,
google::protobuf::Empty *response) {
g_meta_container->UpdateNodeStateWithNodeHealthCheck(
request->craned_id(), request->healthy(), request->reason());

return grpc::Status::OK;
}

grpc::Status CtldForInternalServiceImpl::CforedStream(
grpc::ServerContext *context,
grpc::ServerReaderWriter<crane::grpc::StreamCtldReply,
Expand Down
5 changes: 5 additions & 0 deletions src/CraneCtld/RpcService/CtldGrpcServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ class CtldForInternalServiceImpl final
crane::grpc::StreamCforedRequest> *stream)
override;

grpc::Status CranedReportHealth(
grpc::ServerContext *context,
const crane::grpc::CranedReportHealthRequest *request,
google::protobuf::Empty *response) override;

private:
CtldServer *m_ctld_server_;
};
Expand Down
3 changes: 3 additions & 0 deletions src/Craned/Core/Craned.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ void ParseCranedConfig(const YAML::Node& config) {
conf.PingIntervalSec = craned_config["PingInterval"].as<uint32_t>();
if (craned_config["CraneCtldTimeout"])
conf.CtldTimeoutSec = craned_config["CraneCtldTimeout"].as<uint32_t>();

conf.NodeHealthCheckInterval =
YamlValueOr<uint32_t>(craned_config["NodeHealthCheckInterval"], 0);
}
g_config.CranedConf = std::move(conf);
}
Expand Down
1 change: 1 addition & 0 deletions src/Craned/Core/CranedPublicDefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ struct Config {
struct CranedConfig {
uint32_t PingIntervalSec;
uint32_t CtldTimeoutSec;
uint32_t NodeHealthCheckInterval;
};
CranedConfig CranedConf;
struct CranedListenConf {
Expand Down
91 changes: 91 additions & 0 deletions src/Craned/Core/CtldClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "CtldClient.h"

#include "CranedServer.h"
#include "DeviceManager.h"
#include "JobManager.h"
#include "SupervisorKeeper.h"
#include "crane/GrpcHelper.h"
Expand Down Expand Up @@ -377,6 +378,19 @@ CtldClient::CtldClient() {
m_uvw_loop_->run();
});
m_last_active_time_ = std::chrono::steady_clock::time_point{};

m_node_health_check_timer_ = m_uvw_loop_->resource<uvw::timer_handle>();
m_node_health_check_timer_->on<uvw::timer_event>(
[this](const uvw::timer_event&, uvw::timer_handle& h) {
if (m_stopping_ || !m_stub_) return;
// TODO: should check state?
NodeHealthCheck_();
});
if (g_config.CranedConf.NodeHealthCheckInterval > 0) {
auto interval =
std::chrono::seconds(g_config.CranedConf.NodeHealthCheckInterval);
m_node_health_check_timer_->start(interval, interval);
}
}

CtldClient::~CtldClient() {
Expand Down Expand Up @@ -732,4 +746,81 @@ bool CtldClient::Ping_() {
return reply.ok();
}

void CtldClient::CranedReportHealth_(bool is_healthy, std::string reason) {
if (m_stopping_ || !m_stub_) return;

grpc::ClientContext context;
crane::grpc::CranedReportHealthRequest request;
google::protobuf::Empty reply;

context.set_deadline(std::chrono::system_clock::now() +
std::chrono::seconds(kCranedRpcTimeoutSeconds));
request.set_craned_id(g_config.CranedIdOfThisNode);
request.set_healthy(is_healthy);
request.set_reason(reason);

auto result = m_stub_->CranedReportHealth(&context, request, &reply);
if (!result.ok()) {
CRANE_ERROR("CranedReportHealth failed: is_healthy={}", is_healthy);
}
}

void CtldClient::NodeHealthCheck_() {
if (!g_server->ReadyFor(RequestSource::CTLD)) return;

NodeSpecInfo node_real;

if (!util::os::GetNodeInfo(&node_real)) {
CRANE_ERROR("Failed to get node real info.");
return;
}

if (!g_config.CranedRes.contains(g_config.Hostname)) {
CRANE_ERROR("Failed to get node config info.");
return;
}

auto node_config = g_config.CranedRes.at(g_config.Hostname);

CRANE_DEBUG("Start node health checking....");

std::string reason = NodeHealthCheckFailedReason;
int64_t cpu_count =
static_cast<int64_t>(node_config->allocatable_res.cpu_count);
if (node_real.cpu < cpu_count) {
CRANE_WARN(
"Node health check fail. config cpu_count: {}, real cpu_count: {}",
cpu_count, node_real.cpu);
CranedReportHealth_(false, reason);
return;
}

uint64_t mem_bytes_config = node_config->allocatable_res.memory_bytes;
double mem_gb_config =
static_cast<double>(mem_bytes_config) / (1024 * 1024 * 1024);

if (std::abs(node_real.memory_gb - mem_gb_config) > kMemoryToleranceGB) {
CranedReportHealth_(false, reason);
CRANE_WARN("Node health check fail. config_mem : {:.3f}, real_mem : {:.3f}",
mem_gb_config, node_real.memory_gb);
return;
}

for (const auto& device_pair : Common::g_this_node_device) {
const auto& device = device_pair.second;
for (const auto& file_meta : device->device_file_metas) {
if (!std::filesystem::exists(file_meta.path)) {
CRANE_WARN("Node health check fail. Device file {} not found.",
file_meta.path);
CranedReportHealth_(false, reason);
return;
}
}
}

CRANE_DEBUG("Node health check success.");
CranedReportHealth_(true, reason);
return;
}

} // namespace Craned
6 changes: 6 additions & 0 deletions src/Craned/Core/CtldClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@ class CtldClient {

void SendStatusChanges_();

void CranedReportHealth_(bool is_healthy, std::string reason);

void NodeHealthCheck_();

absl::Mutex m_step_status_change_mtx_;

std::list<TaskStatusChangeQueueElem> m_step_status_change_list_
Expand Down Expand Up @@ -237,6 +241,8 @@ class CtldClient {
std::shared_ptr<uvw::timer_handle> m_ping_handle_;
std::atomic_bool m_ping_ctld_{false};
std::atomic<std::chrono::steady_clock::time_point> m_last_active_time_;

std::shared_ptr<uvw::timer_handle> m_node_health_check_timer_;
};

} // namespace Craned
Expand Down
4 changes: 4 additions & 0 deletions src/Utilities/PublicHeader/include/crane/PublicHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ inline const char* const kHostFilePath = "/etc/hosts";
inline constexpr size_t kDefaultQueryTaskNumLimit = 1000;
inline constexpr uint32_t kDefaultQosPriority = 1000;
inline constexpr uint64_t kPriorityDefaultMaxAge = 7UL * 24 * 3600; // 7 days
inline constexpr double kMemoryToleranceGB = 0.01;

inline const char* const NodeHealthCheckFailedReason =
"Node failed health check";

inline constexpr uint64_t kDefaultCertExpirationMinutes = 30;

Expand Down