Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ message TaskToCtld {
}
repeated License licenses_count = 40;
bool is_licenses_or = 41;
string submit_hostname = 42;
string submit_dir = 43;
}

message TaskInEmbeddedDb {
Expand Down Expand Up @@ -245,7 +247,7 @@ message StepToCtld {

string excludes = 34;
string nodelist = 35;
string container = 26;
string container = 36;
}

message RuntimeAttrOfStep {
Expand Down Expand Up @@ -335,6 +337,9 @@ message StepToD {
double cpus_per_task = 24;

bool get_user_env = 25;
string submit_hostname = 26;
uint32 total_gpus = 27;
string submit_dir = 28;

// Not used now.
string extra_attr = 29;
Expand Down
10 changes: 9 additions & 1 deletion src/CraneCtld/CraneCtld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,16 @@ void ParseConfig(int argc, char** argv) {
try {
YAML::Node config = YAML::LoadFile(config_path);

if (config["ClusterName"])
if (config["ClusterName"]) {
g_config.CraneClusterName = config["ClusterName"].as<std::string>();
if (g_config.CraneClusterName.empty()) {
CRANE_ERROR("ClusterName is empty.");
std::exit(1);
}
} else {
CRANE_ERROR("ClusterName is empty.");
std::exit(1);
}

g_config.ConfigCrcVal = util::CalcConfigCRC32(config);

Expand Down
10 changes: 10 additions & 0 deletions src/CraneCtld/CtldPublicDefs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,9 @@ crane::grpc::JobToD DaemonStepInCtld::GetJobToD(
crane::grpc::JobToD job_to_d;
job_to_d.set_job_id(job_id);
job_to_d.set_uid(uid);
job_to_d.set_account(job->account);
job_to_d.set_qos(job->qos);
job_to_d.set_partition(job->partition_id);
*job_to_d.mutable_res() =
crane::grpc::ResourceInNode(m_allocated_res_.at(craned_id));
return job_to_d;
Expand Down Expand Up @@ -458,6 +461,13 @@ crane::grpc::StepToD DaemonStepInCtld::GetStepToD(
step_to_d.mutable_container_meta()->CopyFrom(
crane::grpc::ContainerTaskAdditionalMeta(container_meta.value()));

step_to_d.set_submit_hostname(job->TaskToCtld().submit_hostname());
step_to_d.set_total_gpus(this->requested_node_res_view.GpuCount());
step_to_d.set_cwd(this->job->cwd);
step_to_d.set_ntasks_per_node(this->job->ntasks_per_node);
step_to_d.set_cpus_per_task(this->job->TaskToCtld().cpus_per_task());
step_to_d.set_submit_dir(this->job->TaskToCtld().submit_dir());

return step_to_d;
}

Expand Down
6 changes: 3 additions & 3 deletions src/Craned/Common/CgroupManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,9 +668,9 @@ Common::EnvMap CgroupManager::GetResourceEnvMapByResInNode(

env_map.emplace(
"CRANE_MEM_PER_NODE",
std::to_string(
res_in_node.allocatable_res_in_node().memory_limit_bytes() /
(1024 * 1024)));
std::format("{}M",
res_in_node.allocatable_res_in_node().memory_limit_bytes() /
(1024ULL * 1024ULL)));

return env_map;
}
Expand Down
10 changes: 10 additions & 0 deletions src/Craned/Core/Craned.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,16 @@ void ParseConfig(int argc, char** argv) {
g_config.CraneCtldForInternalListenPort =
YamlValueOr(config["CraneCtldForInternalListenPort"],
kCtldForInternalDefaultPort);
if (config["ClusterName"]) {
g_config.CraneClusterName = config["ClusterName"].as<std::string>();
if (g_config.CraneClusterName.empty()) {
CRANE_ERROR("Cluster name is empty.");
std::exit(1);
}
} else {
CRANE_ERROR("Cluster name is empty.");
std::exit(1);
}

if (config["Nodes"]) {
for (auto it = config["Nodes"].begin(); it != config["Nodes"].end();
Expand Down
38 changes: 36 additions & 2 deletions src/Craned/Core/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,31 @@ EnvMap JobInD::GetJobEnvMap() {
auto env_map = CgroupManager::GetResourceEnvMapByResInNode(job_to_d.res());

auto& daemon_step_to_d = step_map.at(kDaemonStepId)->step_to_d;
auto nodelist = daemon_step_to_d.nodelist();
auto node_id_to_str = [nodelist]() -> std::string {
auto it = std::ranges::find(nodelist, g_config.Hostname);
if (it == nodelist.end()) {
return "-1";
}
return std::to_string(static_cast<uint32_t>(it - nodelist.begin()));
};

auto alloc_node_num = daemon_step_to_d.nodelist().size();
auto mem_in_node =
daemon_step_to_d.res().allocatable_res_in_node().memory_limit_bytes() /
(static_cast<uint64_t>(1024 * 1024));

auto cpus_on_node =
daemon_step_to_d.res().allocatable_res_in_node().cpu_core_limit();
auto mem_per_cpu = (std::abs(cpus_on_node) > 1e-8)
? (static_cast<double>(mem_in_node) / cpus_on_node)
: 0.0;

env_map.emplace("CRANE_JOB_ACCOUNT", job_to_d.account());

auto time_limit_dur =
std::chrono::seconds(daemon_step_to_d.time_limit().seconds()) +
std::chrono::nanoseconds(daemon_step_to_d.time_limit().nanos());

env_map.emplace(
"CRANE_JOB_END_TIME",
std::to_string((std::chrono::system_clock::now() + time_limit_dur)
Expand All @@ -74,7 +93,22 @@ EnvMap JobInD::GetJobEnvMap() {
std::to_string(daemon_step_to_d.node_num()));
env_map.emplace("CRANE_JOB_PARTITION", job_to_d.partition());
env_map.emplace("CRANE_JOB_QOS", job_to_d.qos());

env_map.emplace("CRANE_SUBMIT_DIR", daemon_step_to_d.submit_dir());
env_map.emplace("CRANE_CPUS_PER_TASK",
std::format("{:.2f}", daemon_step_to_d.cpus_per_task()));
env_map.insert_or_assign("CRANE_MEM_PER_NODE",
std::format("{}M", mem_in_node));
env_map.emplace("CRANE_NTASKS_PER_NODE",
std::to_string(daemon_step_to_d.ntasks_per_node()));
env_map.emplace("CRANE_GPUS", std::to_string(daemon_step_to_d.total_gpus()));
env_map.emplace("CRANE_MEM_PER_CPU", std::format("{:.2f}", mem_per_cpu));
env_map.emplace(
"CRANE_NTASKS",
std::to_string(alloc_node_num * daemon_step_to_d.ntasks_per_node()));
env_map.emplace("CRANE_CLUSTER_NAME", g_config.CraneClusterName);
env_map.emplace("CRANE_CPUS_ON_NODE", std::format("{:.2f}", cpus_on_node));
env_map.emplace("CRANE_NODEID", node_id_to_str());
env_map.emplace("CRANE_SUBMIT_HOST", daemon_step_to_d.submit_hostname());
return env_map;
}

Expand Down
13 changes: 13 additions & 0 deletions src/Utilities/PublicHeader/PublicHeader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,19 @@ double ResourceView::CpuCount() const {
return static_cast<double>(allocatable_res.cpu_count);
}

/// Counts the GPUs described by this resource view.
///
/// Looks up the kResourceTypeGpu entry in device_map. When the entry is
/// missing the result is 0; otherwise it is the entry's untyped count plus
/// the sum of every per-type count stored alongside it.
uint64_t ResourceView::GpuCount() const {
  const auto gpu_entry = device_map.find(kResourceTypeGpu);
  if (gpu_entry == device_map.end()) {
    return 0;
  }

  const auto& [untyped, per_type] = gpu_entry->second;

  // Fold all typed counts into a 64-bit accumulator.
  uint64_t typed_total = 0;
  for (const auto& count : std::views::values(per_type)) {
    typed_total = typed_total + count;
  }

  return untyped + typed_total;
}

uint64_t ResourceView::MemoryBytes() const {
return allocatable_res.memory_bytes;
}
Expand Down
3 changes: 3 additions & 0 deletions src/Utilities/PublicHeader/include/crane/PublicHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ inline const char* const kDefaultSupervisorUnixSockDir = "/tmp/crane";

inline const char* const kDefaultPlugindUnixSockPath = "cplugind/cplugind.sock";

// Key that ResourceView::GpuCount() passes to device_map.find() to locate the
// GPU entry.
inline const char* const kResourceTypeGpu = "gpu";

constexpr uint64_t kTaskMinTimeLimitSec = 11;
constexpr int64_t kTaskMaxTimeLimitSec =
google::protobuf::util::TimeUtil::kDurationMaxSeconds;
Expand Down Expand Up @@ -516,6 +518,7 @@ class ResourceView {

double CpuCount() const;
uint64_t MemoryBytes() const;
uint64_t GpuCount() const;

AllocatableResource& GetAllocatableRes() { return allocatable_res; }
const AllocatableResource& GetAllocatableRes() const {
Expand Down
Loading