Skip to content

Serialize fields before queueing it to the workqueue #10420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 34 additions & 73 deletions lib/perfdata/elasticsearchwriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ void ElasticsearchWriter::Resume()
CheckResultHandler(checkable, cr);
});
m_HandleStateChanges = Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable,
const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) {
StateChangeHandler(checkable, cr, type);
const CheckResult::Ptr& cr, StateType, const MessageOrigin::Ptr&) {
StateChangeHandler(checkable, cr);
});
m_HandleNotifications = Checkable::OnNotificationSentToAllUsers.connect([this](const Notification::Ptr& notification,
m_HandleNotifications = Checkable::OnNotificationSentToAllUsers.connect([this](const Notification::Ptr&,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, const NotificationType& type,
const CheckResult::Ptr& cr, const String& author, const String& text, const MessageOrigin::Ptr&) {
NotificationSentToAllUsersHandler(notification, checkable, users, type, cr, author, text);
NotificationSentToAllUsersHandler(checkable, users, type, cr, author, text);
});
}

Expand Down Expand Up @@ -236,15 +236,6 @@ void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, co
if (IsPaused())
return;

m_WorkQueue.Enqueue([this, checkable, cr]() { InternalCheckResultHandler(checkable, cr); });
}

void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();

CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'");

if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
return;

Expand Down Expand Up @@ -272,38 +263,24 @@ void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& check
fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());

fields->Set("reachable", checkable->IsReachable());
fields->Set("check_command", checkable->GetCheckCommand()->GetName());

CheckCommand::Ptr commandObj = checkable->GetCheckCommand();

if (commandObj)
fields->Set("check_command", commandObj->GetName());
AddTemplateTags(fields, checkable, cr);

double ts = Utility::GetTime();
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'");

if (cr) {
AddCheckResult(fields, checkable, cr);
ts = cr->GetExecutionEnd();
}

AddTemplateTags(fields, checkable, cr);

Enqueue(checkable, "checkresult", fields, ts);
Enqueue(checkable, "checkresult", fields, cr->GetExecutionEnd());
});
}

void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
return;

m_WorkQueue.Enqueue([this, checkable, cr, type]() { StateChangeHandlerInternal(checkable, cr, type); });
}

void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
AssertOnWorkQueue();

CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'");

Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
Expand All @@ -325,46 +302,25 @@ void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& check
fields->Set("last_hard_state", host->GetLastHardState());
}

CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
fields->Set("check_command", checkable->GetCheckCommand()->GetName());

if (commandObj)
fields->Set("check_command", commandObj->GetName());
AddTemplateTags(fields, checkable, cr);

double ts = Utility::GetTime();
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'");

if (cr) {
AddCheckResult(fields, checkable, cr);
ts = cr->GetExecutionEnd();
}

AddTemplateTags(fields, checkable, cr);

Enqueue(checkable, "statechange", fields, ts);
Enqueue(checkable, "statechange", fields, cr->GetExecutionEnd());
});
}

void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text)
void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text)
{
if (IsPaused())
return;

m_WorkQueue.Enqueue([this, notification, checkable, users, type, cr, author, text]() {
NotificationSentToAllUsersHandlerInternal(notification, checkable, users, type, cr, author, text);
});
}

void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text)
{
AssertOnWorkQueue();

CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'");

Log(LogDebug, "ElasticsearchWriter")
<< "Processing notification for '" << checkable->GetName() << "'";

Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
Expand Down Expand Up @@ -396,27 +352,32 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Notifi
fields->Set("notification_type", notificationTypeString);
fields->Set("author", author);
fields->Set("text", text);
fields->Set("check_command", checkable->GetCheckCommand()->GetName());

CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
AddTemplateTags(fields, checkable, cr);

if (commandObj)
fields->Set("check_command", commandObj->GetName());
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'");

double ts = Utility::GetTime();
Log(LogDebug, "ElasticsearchWriter")
<< "Processing notification for '" << checkable->GetName() << "'";

if (cr) {
AddCheckResult(fields, checkable, cr);
ts = cr->GetExecutionEnd();
}
double ts = Utility::GetTime();

AddTemplateTags(fields, checkable, cr);
if (cr) {
AddCheckResult(fields, checkable, cr);
ts = cr->GetExecutionEnd();
}

Enqueue(checkable, "notification", fields, ts);
Enqueue(checkable, "notification", fields, ts);
});
}

void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& type,
const Dictionary::Ptr& fields, double ts)
{
AssertOnWorkQueue();

/* Atomically buffer the data point. */
std::unique_lock<std::mutex> lock(m_DataBufferMutex);

Expand Down
12 changes: 3 additions & 9 deletions lib/perfdata/elasticsearchwriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,10 @@ class ElasticsearchWriter final : public ObjectImpl<ElasticsearchWriter>
void AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void AddTemplateTags(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);

void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text);
void NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text);
void NotificationSentToAllUsersHandler(const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text);

void Enqueue(const Checkable::Ptr& checkable, const String& type,
const Dictionary::Ptr& fields, double ts);
Expand Down
Loading
Loading