Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion etc/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Craned:
# ping interval in seconds
PingInterval: 15
# register operation timeout in seconds
CraneCtldTimeout: 5
CraneCtldTimeout: 30
# max size of craned log file
MaxLogFileSize: 50M
# max files of craned log file
Expand Down
14 changes: 14 additions & 0 deletions src/CraneCtld/EmbeddedDbClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,20 @@ EmbeddedDbClient::~EmbeddedDbClient() {
CRANE_ERROR("Error occurred when closing the embedded db of fixed data!");
}

if (m_step_var_db_) {
auto result = m_step_var_db_->Close();
if (!result)
CRANE_ERROR(
"Error occurred when closing the embedded db of step variable data!");
}

if (m_step_fixed_db_) {
auto result = m_step_fixed_db_->Close();
if (!result)
CRANE_ERROR(
"Error occurred when closing the embedded db of step fixed data!");
}

if (m_resv_db_) {
auto result = m_resv_db_->Close();
if (!result)
Expand Down
21 changes: 10 additions & 11 deletions src/CraneCtld/RpcService/CranedKeeper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,10 @@ crane::grpc::ExecInContainerTaskReply CranedStub::ExecInContainerTask(

void CranedStub::HandleGrpcErrorCode_(grpc::StatusCode code) {
if (code == grpc::UNAVAILABLE) {
CRANE_INFO("Craned {} reports service unavailable. Considering it down.",
m_craned_id_);
g_meta_container->CranedDown(m_craned_id_);
CRANE_INFO(
"Craned {} reports service unavailable. Maybe it is connecting to "
"CTLD.",
m_craned_id_);
}
}

Expand Down Expand Up @@ -538,15 +539,13 @@ CranedKeeper::CqTag *CranedKeeper::InitCranedStateMachine_(
ProtoTimestampToString(craned->m_token_.value()));

WriterLock lock(&m_connect_craned_mtx_);
util::lock_guard guard(m_unavail_craned_set_mtx_);
CRANE_ASSERT(
!m_connected_craned_id_stub_map_.contains(craned->m_craned_id_));
m_connected_craned_id_stub_map_.emplace(craned->m_craned_id_, craned);
craned->m_disconnected_ = false;
CRANE_ASSERT(m_unavail_craned_set_.contains(craned->m_craned_id_));
CRANE_ASSERT(m_connecting_craned_set_.contains(craned->m_craned_id_));
token = m_unavail_craned_set_.at(craned->m_craned_id_);
m_unavail_craned_set_.erase(craned->m_craned_id_);
auto it = m_connecting_craned_set_.find(craned->m_craned_id_);
CRANE_ASSERT(it != m_connecting_craned_set_.end());
token = it->second;
m_connecting_craned_set_.erase(craned->m_craned_id_);
}

Expand Down Expand Up @@ -872,7 +871,7 @@ void CranedKeeper::PeriodConnectCranedThreadFunc_() {
while (it != m_unavail_craned_set_.end() && fetch_num > 0) {
if (!m_connecting_craned_set_.contains(it->first) &&
!m_connected_craned_id_stub_map_.contains(it->first)) {
m_connecting_craned_set_.emplace(it->first);
m_connecting_craned_set_.emplace(*it);
g_thread_pool->detach_task(
[this, craned_id = it->first, token = it->second] {
ConnectCranedNode_(craned_id, token);
Expand All @@ -881,10 +880,10 @@ void CranedKeeper::PeriodConnectCranedThreadFunc_() {
} else {
CRANE_LOGGER_TRACE(g_runtime_status.conn_logger,
"Craned {} is already connecting or connected, "
"ignore new connection request. Token {}.",
"drop new connection request. Token {}.",
it->first, ProtoTimestampToString(it->second));
}
++it;
it = m_unavail_craned_set_.erase(it);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/CraneCtld/RpcService/CranedKeeper.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ class CranedKeeper {
Mutex m_unavail_craned_set_mtx_;
std::unordered_map<CranedId, RegToken> m_unavail_craned_set_
ABSL_GUARDED_BY(m_unavail_craned_set_mtx_);
std::unordered_set<CranedId> m_connecting_craned_set_
std::unordered_map<CranedId, RegToken> m_connecting_craned_set_
ABSL_GUARDED_BY(m_connect_craned_mtx_);

std::vector<grpc::CompletionQueue> m_cq_vec_;
Expand Down
29 changes: 27 additions & 2 deletions src/CraneCtld/TaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2525,8 +2525,7 @@ void TaskScheduler::CleanTaskStatusChangeQueueCb_() {
args.begin(), approximate_size);
if (actual_size == 0) return;
args.resize(actual_size);

CRANE_TRACE("Cleaning {} TaskStatusChanges...", actual_size);
auto begin_time = std::chrono::steady_clock::now();

StepStatusChangeContext context{};
context.rn_step_raw_ptrs.reserve(actual_size);
Expand Down Expand Up @@ -2837,6 +2836,7 @@ void TaskScheduler::CleanTaskStatusChangeQueueCb_() {

txn_id_t txn_id;

auto now = std::chrono::steady_clock::now();
// When store, write step info first, then job info.
auto ok = g_embedded_db_client->BeginStepVarDbTransaction(&txn_id);
if (!ok) {
Expand All @@ -2857,9 +2857,19 @@ void TaskScheduler::CleanTaskStatusChangeQueueCb_() {
"TaskScheduler failed to commit step transaction when clean step "
"status change.");
}
auto duration = std::chrono::steady_clock::now() - now;
CRANE_TRACE(
"Persist step status changes to embedded db cost {} ms",
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
now = std::chrono::steady_clock::now();

ProcessFinalSteps_(context.step_raw_ptrs);
duration = std::chrono::steady_clock::now() - now;
CRANE_TRACE(
"ProcessFinalSteps_ took {} ms",
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());

now = std::chrono::steady_clock::now();
ok = g_embedded_db_client->BeginVariableDbTransaction(&txn_id);
if (!ok) {
CRANE_ERROR(
Expand All @@ -2880,7 +2890,22 @@ void TaskScheduler::CleanTaskStatusChangeQueueCb_() {
"TaskScheduler failed to commit job transaction when clean step "
"status change.");
}
duration = std::chrono::steady_clock::now() - now;
CRANE_TRACE(
"Persist job status changes to embedded db cost {} ms",
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
now = std::chrono::steady_clock::now();
ProcessFinalTasks_(context.job_raw_ptrs);
duration = std::chrono::steady_clock::now() - now;
CRANE_TRACE(
"ProcessFinalTasks_ took {} ms",
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());

auto end_time = std::chrono::steady_clock::now();
CRANE_TRACE("Cleaning {} StepStatusChanges cost {} ms.", actual_size,
std::chrono::duration_cast<std::chrono::milliseconds>(end_time -
begin_time)
.count());
}

void TaskScheduler::QueryTasksInRam(
Expand Down
7 changes: 3 additions & 4 deletions src/Craned/Core/Craned.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -984,8 +984,9 @@ void GlobalVariableInit() {
PasswordEntry::InitializeEntrySize();

// It is always ok to create thread pool first.
g_thread_pool =
std::make_unique<BS::thread_pool>(std::thread::hardware_concurrency());
g_thread_pool = std::make_unique<BS::thread_pool>(
std::thread::hardware_concurrency(),
[] { util::SetCurrentThreadName("BsThreadPool"); });

g_supervisor_keeper = std::make_unique<Craned::SupervisorKeeper>();

Expand Down Expand Up @@ -1116,8 +1117,6 @@ void StartServer() {

GlobalVariableInit();

// Set FD_CLOEXEC on stdin, stdout, stderr
util::os::SetCloseOnExecOnFdRange(STDIN_FILENO, STDERR_FILENO + 1);
util::os::CheckProxyEnvironmentVariable();

g_ctld_client->StartGrpcCtldConnection();
Expand Down
41 changes: 25 additions & 16 deletions src/Craned/Core/CtldClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

namespace Craned {

using namespace std::chrono_literals;

CtldClientStateMachine::CtldClientStateMachine() {
m_logger_ = g_runtime_status.conn_logger;
m_uvw_loop_ = uvw::loop::create();
Expand Down Expand Up @@ -325,7 +327,11 @@ void CtldClient::Shutdown() {
m_stopping_ = true;
m_step_status_change_mtx_.Lock();
CRANE_INFO("Cleaning up status changes in CtldClient");
SendStatusChanges_();

std::list<StepStatusChangeQueueElem> changes;
changes.splice(changes.begin(), std::move(m_step_status_change_list_));
m_step_status_change_mtx_.Unlock();
SendStatusChanges_(std::move(changes));
}

CtldClient::CtldClient() {
Expand Down Expand Up @@ -661,6 +667,7 @@ bool CtldClient::CranedRegister_(
}

void CtldClient::AsyncSendThread_() {
util::SetCurrentThreadName("SendCtldThr");
// Wait Craned grpc server initialization.
m_connection_start_notification_.WaitForNotification();

Expand All @@ -677,12 +684,13 @@ void CtldClient::AsyncSendThread_() {
&m_step_status_change_list_);

while (true) {
{
if (m_stopping_) {
absl::MutexLock lock(&m_step_status_change_mtx_);
if (m_step_status_change_list_.empty() && m_stopping_) break;
if (m_step_status_change_list_.empty()) break;
}

grpc_state = m_ctld_channel_->GetState(true);
// Try to connect when not connected.
grpc_state = m_ctld_channel_->GetState(!prev_connected);
connected = prev_grpc_state == GRPC_CHANNEL_READY;

if (!connected) {
Expand Down Expand Up @@ -727,17 +735,20 @@ void CtldClient::AsyncSendThread_() {
cond, absl::Milliseconds(50));
if (!has_msg) {
m_step_status_change_mtx_.Unlock();
// No msg, sleep for a while to avoid busy loop.
std::this_thread::sleep_for(50ms);
} else {
SendStatusChanges_();
std::list<StepStatusChangeQueueElem> changes;
changes.splice(changes.begin(), std::move(m_step_status_change_list_));
m_step_status_change_mtx_.Unlock();
bool err = SendStatusChanges_(std::move(changes));
if (err) std::this_thread::sleep_for(100ms);
}
}
}

void CtldClient::SendStatusChanges_() {
std::list<StepStatusChangeQueueElem> changes;
changes.splice(changes.begin(), std::move(m_step_status_change_list_));
m_step_status_change_mtx_.Unlock();

bool CtldClient::SendStatusChanges_(
std::list<StepStatusChangeQueueElem>&& changes) {
while (!changes.empty()) {
grpc::ClientContext context;
context.set_deadline(std::chrono::system_clock::now() +
Expand Down Expand Up @@ -773,8 +784,8 @@ void CtldClient::SendStatusChanges_() {
if (m_stopping_) {
CRANE_INFO(
"Failed to send StepStatusChange but stopping, drop all status "
"change.");
return;
"change to send.");
return false;
}
// If some messages are not sent due to channel failure,
// put them back into m_task_status_change_list_
Expand All @@ -784,16 +795,14 @@ void CtldClient::SendStatusChanges_() {
std::move(changes));
m_step_status_change_mtx_.Unlock();
}
// Sleep for a while to avoid too many retries.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
break;

return false;
} else {
CRANE_TRACE("[Step #{}.{}] StepStatusChange sent. reply.ok={}",
status_change.job_id, status_change.step_id, reply.ok());
changes.pop_front();
}
}
return true;
}

bool CtldClient::Ping_() {
Expand Down
2 changes: 1 addition & 1 deletion src/Craned/Core/CtldClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class CtldClient {
void AsyncSendThread_();
bool Ping_();

void SendStatusChanges_();
bool SendStatusChanges_(std::list<StepStatusChangeQueueElem>&& changes);

absl::Mutex m_step_status_change_mtx_;

Expand Down
64 changes: 16 additions & 48 deletions src/Craned/Core/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -659,60 +659,28 @@ CraneErrCode JobManager::SpawnSupervisor_(JobInD* job, StepInstance* step) {
int supervisor_craned_fd = supervisor_craned_pipe[1];
close(supervisor_craned_pipe[0]);

// Message will send to stdin of Supervisor for its init.
for (int retry_count{0}; retry_count < 5; ++retry_count) {
if (-1 == dup2(craned_supervisor_fd, STDIN_FILENO)) {
if (errno == EINTR) {
fmt::print(
stderr,
"[Step #{}.{}] Retrying dup2 stdin: interrupted system call, "
"retry count: {}",
job_id, step_id, retry_count);
continue;
}
std::error_code ec(errno, std::generic_category());
fmt::print("[Step #{}.{}] Failed to dup2 stdin: {}.", job_id, step_id,
ec.message());
std::abort();
}

break;
}

for (int retry_count{0}; retry_count < 5; ++retry_count) {
if (-1 == dup2(supervisor_craned_fd, STDOUT_FILENO)) {
if (errno == EINTR) {
fmt::print(
stderr,
"[Step #{}.{}] Retrying dup2 stdout: interrupted system call, "
"retry count: {}",
job_id, step_id, retry_count);
continue;
}
std::error_code ec(errno, std::generic_category());
fmt::print("[Step #{}.{}] Failed to dup2 stdout: {}.", job_id, step_id,
ec.message());
std::abort();
}

break;
}
close(craned_supervisor_fd);
close(supervisor_craned_fd);

util::os::CloseFdFrom(3);
util::os::CloseFdFromExcept(3,
{craned_supervisor_fd, supervisor_craned_fd});

// Prepare the command line arguments.
std::vector<std::string> string_argv;
std::vector<const char*> argv;

// Argv[0] is the program name which can be anything.
auto supv_name = fmt::format("csupervisor: [{}.{}]", job_id, step_id);
argv.emplace_back(supv_name.c_str());
argv.push_back(nullptr);
fmt::print(
stderr, "[{}] [Step #{}.{}]: Executing supervisor\n",
std::chrono::current_zone()->to_local(std::chrono::system_clock::now()),
job_id, step_id);
string_argv.emplace_back(supv_name.c_str());
string_argv.push_back("--input-fd");
string_argv.push_back(std::to_string(craned_supervisor_fd));
string_argv.push_back("--output-fd");
string_argv.push_back(std::to_string(supervisor_craned_fd));
argv.reserve(string_argv.size());
for (auto& arg : string_argv) {
argv.push_back(arg.c_str());
}
argv.push_back(nullptr); // argv must be null-terminated.
fmt::print(stderr,
"[{:%Y-%m-%d %H:%M:%S}] [Step #{}.{}]: Executing supervisor\n",
std::chrono::system_clock::now(), job_id, step_id);

// Use execvp to search the kSupervisorPath in the PATH.
execvp(g_config.Supervisor.Path.c_str(),
Expand Down
1 change: 1 addition & 0 deletions src/Craned/Supervisor/CranedClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ void CranedClient::StepStatusChangeAsync(crane::grpc::TaskStatus new_status,
}

void CranedClient::AsyncSendThread_() {
util::SetCurrentThreadName("CrndClient");
while (true) {
{
absl::MutexLock lock(&m_mutex_);
Expand Down
Loading