Skip to content

Checkable#ProcessCheckResult(): discard🗑️ CR or delay its producers shutdown #10397

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ set(base_SOURCES
unixsocket.cpp unixsocket.hpp
utility.cpp utility.hpp
value.cpp value.hpp value-operators.cpp
wait-group.cpp wait-group.hpp
win32.hpp
workqueue.cpp workqueue.hpp
)
Expand Down
38 changes: 38 additions & 0 deletions lib/base/wait-group.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */

#include "base/wait-group.hpp"

using namespace icinga;

bool StoppableWaitGroup::try_lock_shared()
{
std::unique_lock lock (m_Mutex);

if (m_Stopped) {
return false;
}

++m_SharedLocks;
return true;
}

void StoppableWaitGroup::unlock_shared()
{
std::unique_lock lock (m_Mutex);

if (!--m_SharedLocks && m_Stopped) {
lock.unlock();
m_CV.notify_all();
}
}

/**
* Disallow new shared locks, wait for all existing ones.
*/
void StoppableWaitGroup::Join()
{
std::unique_lock lock (m_Mutex);

m_Stopped = true;
m_CV.wait(lock, [this] { return !m_SharedLocks; });
}
54 changes: 54 additions & 0 deletions lib/base/wait-group.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */

#pragma once

#include "base/object.hpp"
#include <condition_variable>
#include <cstdint>
#include <mutex>

namespace icinga
{

/**
* A synchronization interface that allows concurrent shared locking.
*
* @ingroup base
*/
class WaitGroup : public Object
{
public:
DECLARE_PTR_TYPEDEFS(WaitGroup);

virtual bool try_lock_shared() = 0;
virtual void unlock_shared() = 0;
};

/**
* A thread-safe wait group that can be stopped to prevent further shared locking.
*
* @ingroup base
*/
class StoppableWaitGroup : public WaitGroup
{
public:
DECLARE_PTR_TYPEDEFS(StoppableWaitGroup);

StoppableWaitGroup() = default;
StoppableWaitGroup(const StoppableWaitGroup&) = delete;
StoppableWaitGroup(StoppableWaitGroup&&) = delete;
StoppableWaitGroup& operator=(const StoppableWaitGroup&) = delete;
StoppableWaitGroup& operator=(StoppableWaitGroup&&) = delete;

bool try_lock_shared() override;
void unlock_shared() override;
void Join();

private:
std::mutex m_Mutex;
std::condition_variable m_CV;
uint_fast32_t m_SharedLocks = 0;
bool m_Stopped = false;
};

}
5 changes: 3 additions & 2 deletions lib/checker/checkercomponent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ void CheckerComponent::Stop(bool runtimeRemoved)
m_CV.notify_all();
}

m_WaitGroup->Join();
m_ResultTimer->Stop(true);
m_Thread.join();

Expand Down Expand Up @@ -231,7 +232,7 @@ void CheckerComponent::CheckThreadProc()
void CheckerComponent::ExecuteCheckHelper(const Checkable::Ptr& checkable)
{
try {
checkable->ExecuteCheck();
checkable->ExecuteCheck(m_WaitGroup);
} catch (const std::exception& ex) {
CheckResult::Ptr cr = new CheckResult();
cr->SetState(ServiceUnknown);
Expand All @@ -245,7 +246,7 @@ void CheckerComponent::ExecuteCheckHelper(const Checkable::Ptr& checkable)
cr->SetExecutionStart(now);
cr->SetExecutionEnd(now);

checkable->ProcessCheckResult(cr);
checkable->ProcessCheckResult(cr, m_WaitGroup);

Log(LogCritical, "checker", output);
}
Expand Down
2 changes: 2 additions & 0 deletions lib/checker/checkercomponent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "base/configobject.hpp"
#include "base/timer.hpp"
#include "base/utility.hpp"
#include "base/wait-group.hpp"
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/key_extractors.hpp>
Expand Down Expand Up @@ -77,6 +78,7 @@ class CheckerComponent final : public ObjectImpl<CheckerComponent>
CheckableSet m_IdleCheckables;
CheckableSet m_PendingCheckables;

StoppableWaitGroup::Ptr m_WaitGroup = new StoppableWaitGroup();
Timer::Ptr m_ResultTimer;

void CheckThreadProc();
Expand Down
4 changes: 3 additions & 1 deletion lib/compat/externalcommandlistener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ void ExternalCommandListener::Start(bool runtimeCreated)
*/
void ExternalCommandListener::Stop(bool runtimeRemoved)
{
m_WaitGroup->Join();

Log(LogInformation, "ExternalCommandListener")
<< "'" << GetName() << "' stopped.";

Expand Down Expand Up @@ -136,7 +138,7 @@ void ExternalCommandListener::CommandPipeThread(const String& commandPath)
Log(LogInformation, "ExternalCommandListener")
<< "Executing external command: " << command;

ExternalCommandProcessor::Execute(command);
ExternalCommandProcessor::Execute(m_WaitGroup, command);
} catch (const std::exception& ex) {
Log(LogWarning, "ExternalCommandListener")
<< "External command failed: " << DiagnosticInformation(ex, false);
Expand Down
3 changes: 3 additions & 0 deletions lib/compat/externalcommandlistener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "compat/externalcommandlistener-ti.hpp"
#include "base/objectlock.hpp"
#include "base/wait-group.hpp"
#include "base/timer.hpp"
#include "base/utility.hpp"
#include <thread>
Expand All @@ -29,6 +30,8 @@ class ExternalCommandListener final : public ObjectImpl<ExternalCommandListener>
void Stop(bool runtimeRemoved) override;

private:
StoppableWaitGroup::Ptr m_WaitGroup = new StoppableWaitGroup();

#ifndef _WIN32
std::thread m_CommandThread;

Expand Down
26 changes: 13 additions & 13 deletions lib/db_ido/idochecktask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

using namespace icinga;

REGISTER_FUNCTION_NONCONST(Internal, IdoCheck, &IdoCheckTask::ScriptFunc, "checkable:cr:resolvedMacros:useResolvedMacros");
REGISTER_FUNCTION_NONCONST(Internal, IdoCheck, &IdoCheckTask::ScriptFunc, "checkable:cr:producer:resolvedMacros:useResolvedMacros");

static void ReportIdoCheck(
const Checkable::Ptr& checkable, const CheckCommand::Ptr& commandObj,
const CheckResult::Ptr& cr, String output, ServiceState state = ServiceUnknown
const Checkable::Ptr& checkable, const CheckCommand::Ptr& commandObj, const CheckResult::Ptr& cr,
const WaitGroup::Ptr& producer, String output, ServiceState state = ServiceUnknown
)
{
if (Checkable::ExecuteCommandProcessFinishedHandler) {
Expand All @@ -36,12 +36,12 @@ static void ReportIdoCheck(
} else {
cr->SetState(state);
cr->SetOutput(output);
checkable->ProcessCheckResult(cr);
checkable->ProcessCheckResult(cr, producer);
}
}

void IdoCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr,
const Dictionary::Ptr& resolvedMacros, bool useResolvedMacros)
const WaitGroup::Ptr& producer, const Dictionary::Ptr& resolvedMacros, bool useResolvedMacros)
{
ServiceState state;
CheckCommand::Ptr commandObj = CheckCommand::ExecuteOverride ? CheckCommand::ExecuteOverride : checkable->GetCheckCommand();
Expand Down Expand Up @@ -88,19 +88,19 @@ void IdoCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckResult
return;

if (idoType.IsEmpty()) {
ReportIdoCheck(checkable, commandObj, cr, "Attribute 'ido_type' must be set.");
ReportIdoCheck(checkable, commandObj, cr, producer, "Attribute 'ido_type' must be set.");
return;
}

if (idoName.IsEmpty()) {
ReportIdoCheck(checkable, commandObj, cr, "Attribute 'ido_name' must be set.");
ReportIdoCheck(checkable, commandObj, cr, producer, "Attribute 'ido_name' must be set.");
return;
}

Type::Ptr type = Type::GetByName(idoType);

if (!type || !DbConnection::TypeInstance->IsAssignableFrom(type)) {
ReportIdoCheck(checkable, commandObj, cr, "DB IDO type '" + idoType + "' is invalid.");
ReportIdoCheck(checkable, commandObj, cr, producer, "DB IDO type '" + idoType + "' is invalid.");
return;
}

Expand All @@ -110,25 +110,25 @@ void IdoCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckResult
DbConnection::Ptr conn = static_pointer_cast<DbConnection>(dtype->GetObject(idoName));

if (!conn) {
ReportIdoCheck(checkable, commandObj, cr, "DB IDO connection '" + idoName + "' does not exist.");
ReportIdoCheck(checkable, commandObj, cr, producer, "DB IDO connection '" + idoName + "' does not exist.");
return;
}

double qps = conn->GetQueryCount(60) / 60.0;

if (conn->IsPaused()) {
ReportIdoCheck(checkable, commandObj, cr, "DB IDO connection is temporarily disabled on this cluster instance.", ServiceOK);
ReportIdoCheck(checkable, commandObj, cr, producer, "DB IDO connection is temporarily disabled on this cluster instance.", ServiceOK);
return;
}

double pendingQueries = conn->GetPendingQueryCount();

if (!conn->GetConnected()) {
if (conn->GetShouldConnect()) {
ReportIdoCheck(checkable, commandObj, cr, "Could not connect to the database server.", ServiceCritical);
ReportIdoCheck(checkable, commandObj, cr, producer, "Could not connect to the database server.", ServiceCritical);
} else {
ReportIdoCheck(
checkable, commandObj, cr,
checkable, commandObj, cr, producer,
"Not currently enabled: Another cluster instance is responsible for the IDO database.", ServiceOK
);
}
Expand Down Expand Up @@ -193,5 +193,5 @@ void IdoCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckResult
{ new PerfdataValue("pending_queries", pendingQueries, false, "", pendingQueriesWarning, pendingQueriesCritical) }
}));

ReportIdoCheck(checkable, commandObj, cr, msgbuf.str(), state);
ReportIdoCheck(checkable, commandObj, cr, producer, msgbuf.str(), state);
}
2 changes: 1 addition & 1 deletion lib/db_ido/idochecktask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class IdoCheckTask
{
public:
static void ScriptFunc(const Checkable::Ptr& service, const CheckResult::Ptr& cr,
const Dictionary::Ptr& resolvedMacros, bool useResolvedMacros);
const WaitGroup::Ptr& producer, const Dictionary::Ptr& resolvedMacros, bool useResolvedMacros);

private:
IdoCheckTask();
Expand Down
5 changes: 3 additions & 2 deletions lib/icinga/apiactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ Dictionary::Ptr ApiActions::ProcessCheckResult(const ConfigObject::Ptr& object,
if (params->Contains("ttl"))
cr->SetTtl(HttpUtility::GetLastParameter(params, "ttl"));

Result result = checkable->ProcessCheckResult(cr);
Result result = checkable->ProcessCheckResult(cr, ApiListener::GetInstance()->GetWaitGroup());

switch (result) {
case Result::Ok:
return ApiActions::CreateResult(200, "Successfully processed check result for object '" + checkable->GetName() + "'.");
Expand Down Expand Up @@ -787,7 +788,7 @@ Dictionary::Ptr ApiActions::ExecuteCommand(const ConfigObject::Ptr& object, cons
Defer resetCheckCommandOverride([]() {
CheckCommand::ExecuteOverride = nullptr;
});
cmd->Execute(checkable, cr, execMacros, false);
cmd->Execute(checkable, cr, listener->GetWaitGroup(), execMacros, false);
}
} else if (command_type == "EventCommand") {
EventCommand::Ptr cmd = GetSingleObjectByNameUsingPermissions(EventCommand::GetTypeName(), resolved_command, ActionsHandler::AuthenticatedApiUser);
Expand Down
24 changes: 17 additions & 7 deletions lib/icinga/checkable-check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "base/convert.hpp"
#include "base/utility.hpp"
#include "base/context.hpp"
#include <shared_mutex>

using namespace icinga;

Expand Down Expand Up @@ -96,11 +97,12 @@ double Checkable::GetLastCheck() const
return schedule_end;
}

Checkable::ProcessingResult Checkable::ProcessCheckResult(const CheckResult::Ptr& cr, const MessageOrigin::Ptr& origin)
Checkable::ProcessingResult Checkable::ProcessCheckResult(const CheckResult::Ptr& cr, const WaitGroup::Ptr& producer, const MessageOrigin::Ptr& origin)
{
using Result = Checkable::ProcessingResult;

VERIFY(cr);
VERIFY(producer);

{
ObjectLock olock(this);
Expand Down Expand Up @@ -135,6 +137,14 @@ Checkable::ProcessingResult Checkable::ProcessCheckResult(const CheckResult::Ptr
cr->SetCheckSource(command_endpoint->GetName());
}

std::shared_lock producerLock (*producer, std::try_to_lock);

if (!producerLock) {
// Discard the check result to not delay the current reload.
// We'll re-run the check immediately after the reload.
return Result::CheckableInactive;
}

/* agent checks go through the api */
if (command_endpoint && GetExtension("agent_check")) {
ApiListener::Ptr listener = ApiListener::GetInstance();
Expand Down Expand Up @@ -544,7 +554,7 @@ Checkable::ProcessingResult Checkable::ProcessCheckResult(const CheckResult::Ptr
return Result::Ok;
}

void Checkable::ExecuteRemoteCheck(const Dictionary::Ptr& resolvedMacros)
void Checkable::ExecuteRemoteCheck(const WaitGroup::Ptr& producer, const Dictionary::Ptr& resolvedMacros)
{
CONTEXT("Executing remote check for object '" << GetName() << "'");

Expand All @@ -555,10 +565,10 @@ void Checkable::ExecuteRemoteCheck(const Dictionary::Ptr& resolvedMacros)
cr->SetScheduleStart(scheduled_start);
cr->SetExecutionStart(before_check);

GetCheckCommand()->Execute(this, cr, resolvedMacros, true);
GetCheckCommand()->Execute(this, cr, producer, resolvedMacros, true);
}

void Checkable::ExecuteCheck()
void Checkable::ExecuteCheck(const WaitGroup::Ptr& producer)
{
CONTEXT("Executing check for object '" << GetName() << "'");

Expand Down Expand Up @@ -599,10 +609,10 @@ void Checkable::ExecuteCheck()
bool local = !endpoint || endpoint == Endpoint::GetLocalEndpoint();

if (local) {
GetCheckCommand()->Execute(this, cr, nullptr, false);
GetCheckCommand()->Execute(this, cr, producer, nullptr, false);
} else {
Dictionary::Ptr macros = new Dictionary();
GetCheckCommand()->Execute(this, cr, macros, false);
GetCheckCommand()->Execute(this, cr, producer, macros, false);

if (endpoint->GetConnected()) {
/* perform check on remote endpoint */
Expand Down Expand Up @@ -663,7 +673,7 @@ void Checkable::ExecuteCheck()

cr->SetOutput(output);

ProcessCheckResult(cr);
ProcessCheckResult(cr, producer);
}

{
Expand Down
5 changes: 4 additions & 1 deletion lib/icinga/checkable-script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "base/function.hpp"
#include "base/functionwrapper.hpp"
#include "base/scriptframe.hpp"
#include "remote/apilistener.hpp"

using namespace icinga;

Expand All @@ -16,7 +17,9 @@ static void CheckableProcessCheckResult(const CheckResult::Ptr& cr)
REQUIRE_NOT_NULL(self);

if (cr) {
self->ProcessCheckResult(cr);
auto api (ApiListener::GetInstance());

self->ProcessCheckResult(cr, api ? api->GetWaitGroup() : new StoppableWaitGroup());
}
}

Expand Down
Loading
Loading