Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ enum InteractiveTaskType {
Crun = 1;
}

message SignalParam {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SignalOption

int32 signal_number = 1;
uint32 seconds_before_kill = 2;
}

message TaskToCtld {
/* -------- Fields that are set at the submission time. ------- */
google.protobuf.Duration time_limit = 1;
Expand Down Expand Up @@ -181,6 +186,7 @@ message TaskToCtld {
}
repeated License licenses_count = 40;
bool is_licenses_or = 41;
optional SignalParam signal_param = 42;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上修改

}

message TaskInEmbeddedDb {
Expand Down Expand Up @@ -335,6 +341,7 @@ message StepToD {
double cpus_per_task = 24;

bool get_user_env = 25;
optional SignalParam signal_param = 26;

// Not used now.
string extra_attr = 29;
Expand Down
17 changes: 17 additions & 0 deletions src/CraneCtld/CtldPublicDefs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,15 @@ crane::grpc::StepToD DaemonStepInCtld::GetStepToD(
step_to_d.mutable_container_meta()->CopyFrom(
crane::grpc::ContainerTaskAdditionalMeta(container_meta.value()));

if (this->job->TaskToCtld().has_signal_param()) {
step_to_d.mutable_signal_param()->CopyFrom(
this->job->TaskToCtld().signal_param());
auto signal_param = step_to_d.signal_param();
CRANE_INFO(" seconds_before_kill {} signal_num {} ",
signal_param.seconds_before_kill(),
signal_param.signal_number());
Comment on lines +465 to +467
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

打印这个的目的是?

}

return step_to_d;
}

Expand Down Expand Up @@ -797,6 +806,14 @@ crane::grpc::StepToD CommonStepInCtld::GetStepToD(
auto* mutable_meta = step_to_d.mutable_container_meta();
mutable_meta->CopyFrom(StepToCtld().container_meta());
}
if (this->job->TaskToCtld().has_signal_param()) {
step_to_d.mutable_signal_param()->CopyFrom(
this->job->TaskToCtld().signal_param());
auto signal_param = step_to_d.signal_param();
CRANE_INFO(" common seconds_before_kill {} signal_num {} ",
signal_param.seconds_before_kill(),
signal_param.signal_number());
Comment on lines +813 to +815
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上,这种没必要打印

}

return step_to_d;
}
Expand Down
46 changes: 43 additions & 3 deletions src/Craned/Supervisor/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1593,6 +1593,7 @@ void TaskManager::TaskFinish_(task_id_t task_id,
bool orphaned = m_step_.orphaned;
if (m_step_.AllTaskFinished()) {
DelTerminationTimer_();
DelSignalTimer_();
m_step_.StopCforedClient();
if (!orphaned) {
g_craned_client->StepStatusChangeAsync(new_status, exit_code,
Expand Down Expand Up @@ -1952,6 +1953,7 @@ void TaskManager::EvCleanChangeTaskTimeLimitQueueCb_() {
}
// Delete the old timer.
DelTerminationTimer_();
DelSignalTimer_();

absl::Time start_time =
absl::FromUnixSeconds(m_step_.GetStep().start_time().seconds());
Expand All @@ -1965,8 +1967,26 @@ void TaskManager::EvCleanChangeTaskTimeLimitQueueCb_() {
m_terminate_task_async_handle_->send();
} else {
// If the task haven't timed out, set up a new timer.
AddTerminationTimer_(
ToInt64Seconds((new_time_limit - (absl::Now() - start_time))));
int64_t new_sec =
ToInt64Seconds(new_time_limit - (absl::Now() - start_time));
AddTerminationTimer_(new_sec);

if (m_step_.GetStep().has_signal_param()) {
auto signal_param = m_step_.GetStep().signal_param();
int64_t signal_sec = new_sec - signal_param.seconds_before_kill();
if (signal_sec > 0) {
int signal_num = signal_param.signal_number();
AddSignalTimer_(signal_sec, signal_num);
CRANE_INFO(
"Add a new signal timer of seconds_before_kill {} signal_num "
"{} new seconds {}, time_limit {}",
signal_param.seconds_before_kill(), signal_param.signal_number(),
signal_sec, new_sec);
Comment on lines +1981 to +1984
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个日志不对啊,改一下吧:
Change signal timer from {}s to {}s for signal {}, as time limit changed to {}.

} else {
CRANE_WARN("Signal offset {} >= time_limit {}, skipping signal timer",
signal_param.seconds_before_kill(), new_sec);
}
}
}

elem.ok_prom.set_value(CraneErrCode::SUCCESS);
Expand Down Expand Up @@ -2000,7 +2020,27 @@ void TaskManager::EvGrpcExecuteTaskCb_() {
// so we move it outside the multithreading part.
int64_t sec = m_step_.GetStep().time_limit().seconds();
AddTerminationTimer_(sec);
CRANE_INFO("Add a timer of {} seconds", sec);
CRANE_INFO("Add a timer of {} seconds {}", sec,
m_step_.GetStep().has_signal_param());
Comment on lines +2023 to +2024
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个日志被改错了,这个地方是打印 timelimit,怎么把 Signal 加进去了?保留原样,下面的分支会打印 signal 相关的信息了。

if (m_step_.GetStep().has_signal_param()) {
auto signal_param = m_step_.GetStep().signal_param();
int64_t signal_sec = sec - signal_param.seconds_before_kill();
if (signal_sec > 0) {
int signal_num = signal_param.signal_number();
AddSignalTimer_(signal_sec, signal_num);
CRANE_INFO(
"Add a signal timer of seconds_before_kill {} signal_num {} for "
"job #{}",
signal_param.seconds_before_kill(), signal_param.signal_number(),
Comment on lines +2032 to +2034
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

日志的问题同上。
Add a signal time of {}s for signal {} to job #{}.

m_step_.GetStep().job_id());
Comment on lines +2031 to +2035
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

前面已经有signal_sec和signal_num了,后面没必要再从 proto 里面拿了。另外 jobid 直接用 g_config.JobId 更方便。

} else {
CRANE_WARN(
"Signal offset {} >= time_limit {} for job #{}, skipping signal "
"timer",
signal_param.seconds_before_kill(), sec,
m_step_.GetStep().job_id());
}
}

m_step_.pwd.Init(m_step_.uid);
if (!m_step_.pwd.Valid()) {
Expand Down
24 changes: 24 additions & 0 deletions src/Craned/Supervisor/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ITaskInstance;
class StepInstance {
public:
std::shared_ptr<uvw::timer_handle> termination_timer{nullptr};
std::shared_ptr<uvw::timer_handle> signal_timer{nullptr};
PasswordEntry pwd;

bool orphaned{false};
Expand Down Expand Up @@ -431,6 +432,29 @@ class TaskManager {
}
}

void AddSignalTimer_(int64_t secs, int signal_number) {
auto signal_handle = m_uvw_loop_->resource<uvw::timer_handle>();
signal_handle->on<uvw::timer_event>(
[this, signal_number, TaskIds = m_step_.GetTaskIds()](
const uvw::timer_event&, uvw::timer_handle& h) {
for (task_id_t task_id : TaskIds) {
auto* task = m_step_.GetTaskInstance(task_id);
if (task != nullptr && task->GetExecId().has_value()) {
task->Kill(signal_number);
}
}
});
signal_handle->start(std::chrono::seconds(secs), std::chrono::seconds(0));
m_step_.signal_timer = signal_handle;
}

void DelSignalTimer_() {
if (m_step_.signal_timer) {
m_step_.signal_timer->close();
m_step_.signal_timer.reset();
}
}

void TaskFinish_(task_id_t task_id, crane::grpc::TaskStatus new_status,
uint32_t exit_code, std::optional<std::string> reason);
CraneErrCode LaunchExecution_(ITaskInstance* task);
Expand Down
Loading