Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion protos/Crane.proto
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ message QuerySshStepEnvVariablesReply {

message QuerySshStepEnvVariablesForwardRequest {
uint32 task_id = 1;
string execution_node = 2;
string execution_node = 2 [deprecated = true];
}

message QuerySshStepEnvVariablesForwardReply {
Expand Down Expand Up @@ -768,6 +768,9 @@ message StreamCrunRequest {

// X11 forwarding payload sent on the crun request stream.
// NOTE(review): per-field meanings below are inferred from the field names
// only — confirm against the cfored/supervisor implementation.
message TaskX11ForwardReq {
// Raw bytes being forwarded.
bytes msg = 1;
// Presumably set when the sending side has no more data for this
// connection — TODO confirm.
bool eof = 2;
// Presumably identifies one X11 connection; scope of uniqueness is not
// visible here — TODO confirm.
uint32 local_id = 3;
// Presumably the craned node the connection belongs to — TODO confirm.
string craned_id = 4;
}

CrunRequestType type = 1;
Expand All @@ -790,6 +793,9 @@ message StreamCrunReply {
TASK_IO_FORWARD = 4;
TASK_IO_FORWARD_READY = 5;
TASK_X11_FORWARD = 6;
TASK_EXIT_STATUS = 7;
TASK_X11_CONN = 8;
TASK_X11_EOF = 9;
}

message TaskIdReply {
Expand Down Expand Up @@ -819,8 +825,26 @@ message StreamCrunReply {
bytes msg = 1;
}

// Payload type of the payload_task_x11_conn_reply field; accompanies the
// TASK_X11_CONN reply type.
// NOTE(review): presumably announces a newly opened X11 connection to crun —
// confirm against the sender.
message TaskX11ConnReply {
// Presumably the craned node on which the connection originated.
string craned_id = 1;
// Presumably the identifier of the X11 connection on that node.
uint32 local_id = 2;
}

// Payload type of the payload_task_x11_forward_reply field; accompanies the
// TASK_X11_FORWARD reply type.
message TaskX11ForwardReply {
// Raw X11 bytes being forwarded.
bytes msg = 1;
// Presumably the craned node the data came from — TODO confirm.
string craned_id = 2;
// Presumably the X11 connection this data belongs to — TODO confirm.
uint32 local_id = 3;
}

// Payload type of the payload_task_x11_eof_reply field; accompanies the
// TASK_X11_EOF reply type.
// NOTE(review): presumably signals that one X11 connection has reached
// end-of-stream — confirm against the sender.
message TaskX11EofReply {
// Presumably the craned node owning the connection.
string craned_id = 1;
// Presumably the identifier of the connection that ended.
uint32 local_id = 2;
}

// Payload type of the payload_task_exit_status_reply field; accompanies the
// TASK_EXIT_STATUS reply type.
message TaskExitStatusReply {
// Id of the task whose exit status is being reported.
uint32 task_id = 1;
// Exit code reported for the task.
// NOTE(review): when `signaled` is true this may carry a signal number
// rather than an exit code — not visible here, confirm.
uint32 exit_code = 2;
// Presumably true when the task was terminated by a signal — TODO confirm.
bool signaled = 3;
}

CforedCrunReplyType type = 1;
Expand All @@ -833,6 +857,9 @@ message StreamCrunReply {
TaskIOForwardReadyReply payload_task_io_forward_ready_reply = 6;
TaskIOForwardReply payload_task_io_forward_reply = 7;
TaskX11ForwardReply payload_task_x11_forward_reply = 8;
TaskExitStatusReply payload_task_exit_status_reply = 9;
TaskX11ConnReply payload_task_x11_conn_reply = 10;
TaskX11EofReply payload_task_x11_eof_reply = 11;
}
}

Expand All @@ -842,6 +869,9 @@ message StreamTaskIORequest {
TASK_OUTPUT = 1;
SUPERVISOR_UNREGISTER = 2;
TASK_X11_OUTPUT = 3;
TASK_EXIT_STATUS = 4;
TASK_X11_CONN = 5;
TASK_X11_EOF = 6;
}

message SupervisorRegisterReq {
Expand All @@ -856,8 +886,26 @@ message StreamTaskIORequest {

message SupervisorUnRegisterReq {}

// Payload type of the payload_task_x11_fwd_conn_req field; accompanies the
// TASK_X11_CONN request type.
// NOTE(review): presumably sent by the supervisor when a new X11 connection
// is accepted — confirm against the sender.
message TaskX11FwdConnReq {
// Presumably the identifier assigned to the new X11 connection.
uint32 local_id = 1;
// Presumably the craned node on which the connection was accepted.
string craned_id = 2;
}

// Payload type of the payload_task_x11_output_req field; accompanies the
// TASK_X11_OUTPUT request type.
message TaskX11OutputReq {
// Raw X11 bytes being forwarded.
bytes msg = 1;
// Presumably the X11 connection this data belongs to — TODO confirm.
uint32 local_id = 2;
// Presumably the craned node the data originates from — TODO confirm.
string craned_id = 3;
}

// Payload type of the payload_task_x11_eof_req field; accompanies the
// TASK_X11_EOF request type.
// NOTE(review): presumably signals end-of-stream for one X11 connection —
// confirm against the sender.
message TaskX11OutputEofReq {
// Presumably the identifier of the connection that ended.
uint32 local_id = 1;
// Presumably the craned node owning the connection.
string craned_id = 2;
}

// Payload type of the payload_task_exit_status_req field; accompanies the
// TASK_EXIT_STATUS request type. Has the same fields as TaskExitStatusReply.
message TaskExitStatus {
// Id of the task whose exit status is being reported.
uint32 task_id = 1;
// Exit code reported for the task.
// NOTE(review): interpretation when `signaled` is true is not visible
// here — confirm.
uint32 exit_code = 2;
// Presumably true when the task was terminated by a signal — TODO confirm.
bool signaled = 3;
}

SupervisorRequestType type = 1;
Expand All @@ -867,6 +915,9 @@ message StreamTaskIORequest {
TaskOutputReq payload_task_output_req = 3;
SupervisorUnRegisterReq payload_unregister_req = 4;
TaskX11OutputReq payload_task_x11_output_req = 5;
TaskExitStatus payload_task_exit_status_req = 6;
TaskX11FwdConnReq payload_task_x11_fwd_conn_req = 7;
TaskX11OutputEofReq payload_task_x11_eof_req = 8;
}
}

Expand All @@ -893,6 +944,8 @@ message StreamTaskIOReply {

// X11 input payload sent on the task IO reply stream.
// NOTE(review): per-field meanings below are inferred from the field names
// only — confirm against the supervisor implementation.
message TaskX11InputReq {
// Raw X11 bytes being forwarded.
bytes msg = 1;
// Presumably set when no more input will follow for this connection —
// TODO confirm.
bool eof = 2;
// Presumably identifies the X11 connection the input is destined for —
// TODO confirm.
uint32 local_id = 3;
}

SupervisorReplyType type = 1;
Expand Down
3 changes: 2 additions & 1 deletion protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ enum TaskStatus {
Cancelled = 5;
OutOfMemory = 6;
Configuring = 7;
Configured = 8;
Starting = 8;
Completing = 9;

Invalid = 15;
Expand Down Expand Up @@ -311,6 +311,7 @@ message StepToD {

uint32 uid = 6;
repeated uint32 gid = 7;
map<uint32 /*task id*/, ResourceInNode> task_res_map = 8;

// For execution step
optional BatchTaskAdditionalMeta batch_meta = 9;
Expand Down
9 changes: 9 additions & 0 deletions protos/Supervisor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ message TerminateTaskReply {
string reason = 2;
}

// Request message of the Supervisor.MigrateSshProcToCgroup RPC.
message MigrateSshProcToCgroupRequest {
// Presumably the PID of the ssh process to move into the step's cgroup
// (inferred from the message name) — TODO confirm.
int32 pid = 1;
}

// Reply message of the Supervisor.MigrateSshProcToCgroup RPC.
message MigrateSshProcToCgroupReply {
// Result code of the migration.
// NOTE(review): which ErrCode value denotes success is not visible here.
ErrCode err_code = 1;
}

// Request message of the Supervisor.ShutdownSupervisor RPC; carries no fields.
message ShutdownSupervisorRequest {}

// Reply message of the Supervisor.ShutdownSupervisor RPC; carries no fields.
message ShutdownSupervisorReply {}
Expand All @@ -131,5 +139,6 @@ service Supervisor {
rpc CheckStatus(CheckStatusRequest) returns (CheckStatusReply);
rpc ChangeTaskTimeLimit(ChangeTaskTimeLimitRequest) returns (ChangeTaskTimeLimitReply);
rpc TerminateTask(TerminateTaskRequest) returns (TerminateTaskReply);
rpc MigrateSshProcToCgroup(MigrateSshProcToCgroupRequest) returns (MigrateSshProcToCgroupReply);
rpc ShutdownSupervisor(ShutdownSupervisorRequest) returns (ShutdownSupervisorReply);
}
126 changes: 102 additions & 24 deletions src/CraneCtld/CtldPublicDefs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,25 @@ DaemonStepInCtld::StepStatusChange(crane::grpc::TaskStatus new_status,
}

context->craned_jobs_to_free[craned_id].emplace_back(job->TaskId());
// For interactive jobs, notify the frontend that the primary step is done.
if (job->IsInteractive()) {
  // Bind by reference: std::get returns a reference into job->meta, and a
  // plain `auto` would copy the InteractiveMeta, so the flag update below
  // would be lost and the cancel callback could fire more than once.
  auto& meta = std::get<InteractiveMeta>(job->meta);

  // Snapshot the flag before mutating it; it decides both whether a cancel
  // must be issued and whether the completion ack is sent right away.
  const bool already_cancelled = meta.has_been_cancelled_on_front_end;
  if (!already_cancelled) {
    meta.has_been_cancelled_on_front_end = true;
    meta.cb_step_cancel({.job_id = job_id, .step_id = kPrimaryStepId});
  }

  // If the cancel was issued just now, the completion ack is sent later by
  // the gRPC server when the task-complete request arrives, so it is
  // suppressed here. If the frontend had already been cancelled, the ack is
  // sent immediately.
  meta.cb_step_completed({.job_id = job_id,
                          .step_id = kPrimaryStepId,
                          .send_completion_ack = already_cancelled,
                          .cfored_name = meta.cfored_name});
}
Comment on lines +562 to +580
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Bug: meta is copied, modifications won't persist.

Line 563 copies InteractiveMeta by value. The modification to has_been_cancelled_on_front_end on line 565 affects only the local copy, not the original in job->meta. This means the flag won't be set, potentially causing duplicate cancel notifications.

🔎 Proposed fix
       if (job->IsInteractive()) {
-        auto meta = std::get<InteractiveMeta>(job->meta);
+        auto& meta = std::get<InteractiveMeta>(job->meta);
         if (!meta.has_been_cancelled_on_front_end) {
           meta.has_been_cancelled_on_front_end = true;
🤖 Prompt for AI Agents
In src/CraneCtld/CtldPublicDefs.cpp around lines 562 to 580, the code copies
InteractiveMeta into a local variable so updates (setting
has_been_cancelled_on_front_end) don't persist back into job->meta; change the
local binding to a reference to the stored variant value (e.g., obtain
InteractiveMeta by reference from job->meta or use std::get_if and a pointer) so
mutations update the original meta inside job->meta, then proceed to call the
callbacks as before.

return std::pair{this->Status(), this->ExitCode()};
} else {
if (std::optional error_status = this->PrevErrorStatus(); error_status) {
Expand All @@ -570,7 +589,17 @@ DaemonStepInCtld::StepStatusChange(crane::grpc::TaskStatus new_status,
}
CRANE_INFO("[Step #{}.{}] FINISHED with status {}.", job_id,
this->StepId(), this->Status());
return std::pair{job->PrimaryStepStatus(), job->PrimaryStepExitCode()};
// Daemon step terminated by user before primary step created
if (job->PrimaryStepStatus() == crane::grpc::TaskStatus::Invalid) {
return std::pair{this->Status(), this->ExitCode()};
} else {
if (job->AllStepsFinished()) {
return std::make_pair(job->PrimaryStepStatus(),
job->PrimaryStepExitCode());
} else {
return std::nullopt;
}
}
}
}
return std::nullopt;
Expand Down Expand Up @@ -650,6 +679,26 @@ void CommonStepInCtld::InitPrimaryStepFromJob(const TaskInCtld& job) {
}

allocated_craneds_regex = job.allocated_craneds_regex;
// FIXME: The following task fields should be set by the scheduler.
task_id_t cur_task_id = 0;
if (job.IsBatch() || job.IsCalloc()) {
// Batch/Calloc: will launch one task on one node only
craned_task_map[job.executing_craned_ids.front()].insert(cur_task_id);
task_res_map[cur_task_id] =
job.AllocatedRes().at(job.executing_craned_ids.front());
} else {
for (const auto& craned_id : job.CranedIds()) {
for (int i = 0; i < job.ntasks_per_node; ++i) {
craned_task_map[craned_id].insert(cur_task_id);
auto res = job.AllocatedRes().at(craned_id);
// Mem is allocated at step level, set to 0 here to avoid mem limit.
res.allocatable_res.memory_bytes = 0;
res.allocatable_res.memory_sw_bytes = 0;
task_res_map[cur_task_id] = res;
++cur_task_id;
}
}
}

crane::grpc::StepToCtld step;

Expand Down Expand Up @@ -772,6 +821,17 @@ crane::grpc::StepToD CommonStepInCtld::GetStepToD(

step_to_d.set_uid(uid);
step_to_d.mutable_gid()->Assign(this->gids.begin(), this->gids.end());

auto* task_res_map = step_to_d.mutable_task_res_map();
auto task_it = this->craned_task_map.find(craned_id);
if (task_it != this->craned_task_map.end()) {
for (auto task_id : task_it->second) {
auto& res = this->task_res_map.at(task_id);
task_res_map->emplace(task_id,
static_cast<crane::grpc::ResourceInNode>(res));
}
}

step_to_d.mutable_env()->insert(this->env.begin(), this->env.end());

step_to_d.set_cwd(this->cwd);
Expand All @@ -786,27 +846,31 @@ crane::grpc::StepToD CommonStepInCtld::GetStepToD(
step_to_d.mutable_submit_time()->set_seconds(
ToUnixSeconds(this->m_submit_time_));
step_to_d.mutable_time_limit()->set_seconds(ToInt64Seconds(this->time_limit));

if (this->type == crane::grpc::Batch) {
auto* mutable_meta = step_to_d.mutable_batch_meta();
mutable_meta->CopyFrom(StepToCtld().batch_meta());
} else if (this->type == crane::grpc::Interactive) {
auto* mutable_meta = step_to_d.mutable_interactive_meta();
mutable_meta->CopyFrom(StepToCtld().interactive_meta());
} else if (this->type == crane::grpc::Container) {
auto* mutable_meta = step_to_d.mutable_container_meta();
mutable_meta->CopyFrom(StepToCtld().container_meta());
switch (this->type) {
case crane::grpc::Batch:
step_to_d.mutable_batch_meta()->CopyFrom(StepToCtld().batch_meta());
break;
case crane::grpc::Interactive:
step_to_d.mutable_interactive_meta()->CopyFrom(
StepToCtld().interactive_meta());
break;
case crane::grpc::Container:
step_to_d.mutable_container_meta()->CopyFrom(StepToCtld().container_meta());
break;
default:
std::unreachable();
}

return step_to_d;
}

void CommonStepInCtld::StepStatusChange(crane::grpc::TaskStatus new_status,
uint32_t exit_code,
const std::string& reason,
const CranedId& craned_id,
google::protobuf::Timestamp timestamp,
StepStatusChangeContext* context) {
std::optional<std::pair<crane::grpc::TaskStatus, uint32_t>>
CommonStepInCtld::StepStatusChange(crane::grpc::TaskStatus new_status,
uint32_t exit_code,
const std::string& reason,
const CranedId& craned_id,
google::protobuf::Timestamp timestamp,
StepStatusChangeContext* context) {
/**
* Step final status
* finished: step configured successfully, got all step execution status
Expand All @@ -824,9 +888,9 @@ void CommonStepInCtld::StepStatusChange(crane::grpc::TaskStatus new_status,
job_id, step_id, this->Status(), new_status, craned_id);

if (this->Status() == crane::grpc::TaskStatus::Configuring) {
// Configuring -> Configured / Failed / Cancelled,
// Configuring -> Starting / Failed / Cancelled,
this->NodeConfigured(craned_id);
if (new_status != crane::grpc::TaskStatus::Configured) {
if (new_status != crane::grpc::TaskStatus::Starting) {
this->SetErrorStatus(new_status);
this->SetErrorExitCode(exit_code);
}
Expand Down Expand Up @@ -935,6 +999,9 @@ void CommonStepInCtld::StepStatusChange(crane::grpc::TaskStatus new_status,
}

std::unordered_set<step_id_t> pd_steps;
CRANE_DEBUG("[Job #{}] primary step exited, terminating other steps.",
job_id);

// Cancel all other steps, marking them with the Cancelled status
for (const auto& comm_step : job->Steps() | std::views::values) {
// All pending steps are crun steps, just set status to cancelled
Expand All @@ -953,11 +1020,18 @@ void CommonStepInCtld::StepStatusChange(crane::grpc::TaskStatus new_status,
.send_completion_ack = true,
.cfored_name = meta.cfored_name});
pd_steps.insert(comm_step->StepId());
continue;
}
for (const auto& node : comm_step->ExecutionNodes()) {
context->craned_cancel_steps[node][comm_step->job_id].emplace(
comm_step->StepId());
CRANE_TRACE(
"[Step #{}.{}] Cancelled pending step due to primary step "
"exit.",
comm_step->job_id, comm_step->StepId());
} else {
CRANE_TRACE(
"[Step #{}.{}] Sending cancel due to primary step exit.",
comm_step->job_id, comm_step->StepId());
for (const auto& node : comm_step->ExecutionNodes()) {
context->craned_cancel_steps[node][comm_step->job_id].emplace(
comm_step->StepId());
}
}
}
for (const auto& pd_step_id : pd_steps) {
Expand Down Expand Up @@ -992,6 +1066,10 @@ void CommonStepInCtld::StepStatusChange(crane::grpc::TaskStatus new_status,
context->step_ptrs.insert(job->EraseStep(step_id));
}
}
if (job->AllStepsFinished())
return std::make_pair(job->PrimaryStepStatus(), job->PrimaryStepExitCode());
else
return std::nullopt;
}

void CommonStepInCtld::RecoverFromDb(
Expand Down
Loading
Loading