Skip to content

Commit 4ab4b33

Browse files
committed
Serialize fields before queueing the event to the workqueue
1 parent a589b87 commit 4ab4b33

8 files changed

+134
-198
lines changed

lib/perfdata/elasticsearchwriter.cpp

Lines changed: 20 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -236,15 +236,6 @@ void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, co
236236
if (IsPaused())
237237
return;
238238

239-
m_WorkQueue.Enqueue([this, checkable, cr]() { InternalCheckResultHandler(checkable, cr); });
240-
}
241-
242-
void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
243-
{
244-
AssertOnWorkQueue();
245-
246-
CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'");
247-
248239
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
249240
return;
250241

@@ -277,23 +268,18 @@ void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& check
277268
AddCheckResult(fields, checkable, cr);
278269
AddTemplateTags(fields, checkable, cr);
279270

280-
Enqueue(checkable, "checkresult", fields, cr->GetExecutionEnd());
271+
m_WorkQueue.Enqueue([this, checkable, fields = std::move(fields), ts = cr->GetExecutionEnd()]() {
272+
CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'");
273+
274+
Enqueue(checkable, "checkresult", fields, ts);
275+
});
281276
}
282277

283278
void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
284279
{
285280
if (IsPaused())
286281
return;
287282

288-
m_WorkQueue.Enqueue([this, checkable, cr]() { StateChangeHandlerInternal(checkable, cr); });
289-
}
290-
291-
void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
292-
{
293-
AssertOnWorkQueue();
294-
295-
CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'");
296-
297283
Host::Ptr host;
298284
Service::Ptr service;
299285
tie(host, service) = GetHostService(checkable);
@@ -320,7 +306,11 @@ void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& check
320306
AddCheckResult(fields, checkable, cr);
321307
AddTemplateTags(fields, checkable, cr);
322308

323-
Enqueue(checkable, "statechange", fields, cr->GetExecutionEnd());
309+
m_WorkQueue.Enqueue([this, checkable, fields = std::move(fields), ts = cr->GetExecutionEnd()]() {
310+
CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'");
311+
312+
Enqueue(checkable, "statechange", fields, ts);
313+
});
324314
}
325315

326316
void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
@@ -329,21 +319,6 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Checkable::Ptr
329319
if (IsPaused())
330320
return;
331321

332-
m_WorkQueue.Enqueue([this, checkable, users, type, cr, author, text]() {
333-
NotificationSentToAllUsersHandlerInternal(checkable, users, type, cr, author, text);
334-
});
335-
}
336-
337-
void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
338-
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text)
339-
{
340-
AssertOnWorkQueue();
341-
342-
CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'");
343-
344-
Log(LogDebug, "ElasticsearchWriter")
345-
<< "Processing notification for '" << checkable->GetName() << "'";
346-
347322
Host::Ptr host;
348323
Service::Ptr service;
349324
tie(host, service) = GetHostService(checkable);
@@ -386,12 +361,21 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Checka
386361

387362
AddTemplateTags(fields, checkable, cr);
388363

389-
Enqueue(checkable, "notification", fields, ts);
364+
m_WorkQueue.Enqueue([this, checkable, ts, fields = std::move(fields)]() {
365+
CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'");
366+
367+
Log(LogDebug, "ElasticsearchWriter")
368+
<< "Processing notification for '" << checkable->GetName() << "'";
369+
370+
Enqueue(checkable, "notification", fields, ts);
371+
});
390372
}
391373

392374
void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& type,
393375
const Dictionary::Ptr& fields, double ts)
394376
{
377+
AssertOnWorkQueue();
378+
395379
/* Atomically buffer the data point. */
396380
std::unique_lock<std::mutex> lock(m_DataBufferMutex);
397381

lib/perfdata/elasticsearchwriter.hpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,9 @@ class ElasticsearchWriter final : public ObjectImpl<ElasticsearchWriter>
4343
void AddTemplateTags(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
4444

4545
void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
46-
void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
4746
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
48-
void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
4947
void NotificationSentToAllUsersHandler(const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
5048
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text);
51-
void NotificationSentToAllUsersHandlerInternal(const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
52-
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text);
5349

5450
void Enqueue(const Checkable::Ptr& checkable, const String& type,
5551
const Dictionary::Ptr& fields, double ts);

lib/perfdata/gelfwriter.cpp

Lines changed: 27 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -268,18 +268,6 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check
268268
if (IsPaused())
269269
return;
270270

271-
m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerInternal(checkable, cr); });
272-
}
273-
274-
void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
275-
{
276-
AssertOnWorkQueue();
277-
278-
CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'");
279-
280-
Log(LogDebug, "GelfWriter")
281-
<< "Processing check result for '" << checkable->GetName() << "'";
282-
283271
Host::Ptr host;
284272
Service::Ptr service;
285273
tie(host, service) = GetHostService(checkable);
@@ -359,29 +347,21 @@ void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, con
359347
}
360348
}
361349

362-
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), cr->GetExecutionEnd()));
363-
}
350+
m_WorkQueue.Enqueue([this, checkable, fields = std::move(fields), ts = cr->GetExecutionEnd()]() {
351+
CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'");
364352

365-
void GelfWriter::NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType,
366-
CheckResult::Ptr const& cr, const String& author, const String& commentText, const String& commandName)
367-
{
368-
if (IsPaused())
369-
return;
353+
Log(LogDebug, "GelfWriter")
354+
<< "Processing check result for '" << checkable->GetName() << "'";
370355

371-
m_WorkQueue.Enqueue([this, checkable, notificationType, cr, author, commentText, commandName]() {
372-
NotificationToUserHandlerInternal(checkable, notificationType, cr, author, commentText, commandName);
356+
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
373357
});
374358
}
375359

376-
void GelfWriter::NotificationToUserHandlerInternal(const Checkable::Ptr& checkable, NotificationType notificationType,
377-
CheckResult::Ptr const& cr, const String& author, const String& commentText, const String& commandName)
360+
void GelfWriter::NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType,
361+
const CheckResult::Ptr& cr, const String& author, const String& commentText, const String& commandName)
378362
{
379-
AssertOnWorkQueue();
380-
381-
CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'");
382-
383-
Log(LogDebug, "GelfWriter")
384-
<< "Processing notification for '" << checkable->GetName() << "'";
363+
if (IsPaused())
364+
return;
385365

386366
Host::Ptr host;
387367
Service::Ptr service;
@@ -423,26 +403,21 @@ void GelfWriter::NotificationToUserHandlerInternal(const Checkable::Ptr& checkab
423403
fields->Set("_comment", authorComment);
424404
fields->Set("_check_command", checkable->GetCheckCommand()->GetName());
425405

426-
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
406+
m_WorkQueue.Enqueue([this, checkable, ts, fields = std::move(fields)]() {
407+
CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'");
408+
409+
Log(LogDebug, "GelfWriter")
410+
<< "Processing notification for '" << checkable->GetName() << "'";
411+
412+
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
413+
});
427414
}
428415

429416
void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
430417
{
431418
if (IsPaused())
432419
return;
433420

434-
m_WorkQueue.Enqueue([this, checkable, cr]() { StateChangeHandlerInternal(checkable, cr); });
435-
}
436-
437-
void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
438-
{
439-
AssertOnWorkQueue();
440-
441-
CONTEXT("GELF Processing state change '" << checkable->GetName() << "'");
442-
443-
Log(LogDebug, "GelfWriter")
444-
<< "Processing state change for '" << checkable->GetName() << "'";
445-
446421
Host::Ptr host;
447422
Service::Ptr service;
448423
tie(host, service) = GetHostService(checkable);
@@ -470,7 +445,14 @@ void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, con
470445
fields->Set("full_message", cr->GetOutput());
471446
fields->Set("_check_source", cr->GetCheckSource());
472447

473-
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), cr->GetExecutionEnd()));
448+
m_WorkQueue.Enqueue([this, checkable, fields = std::move(fields), ts = cr->GetExecutionEnd()]() {
449+
CONTEXT("GELF Processing state change '" << checkable->GetName() << "'");
450+
451+
Log(LogDebug, "GelfWriter")
452+
<< "Processing state change for '" << checkable->GetName() << "'";
453+
454+
SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
455+
});
474456
}
475457

476458
String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts)
@@ -484,6 +466,8 @@ String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const Strin
484466

485467
void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage)
486468
{
469+
AssertOnWorkQueue();
470+
487471
std::ostringstream msgbuf;
488472
msgbuf << gelfMessage;
489473
msgbuf << '\0';

lib/perfdata/gelfwriter.hpp

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,9 @@ class GelfWriter final : public ObjectImpl<GelfWriter>
4040
Timer::Ptr m_ReconnectTimer;
4141

4242
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
43-
void CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
44-
void NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType,
45-
const CheckResult::Ptr& cr, const String& author, const String& commentText, const String& commandName);
46-
void NotificationToUserHandlerInternal(const Checkable::Ptr& checkable, NotificationType notification_type,
47-
const CheckResult::Ptr& cr, const String& author, const String& comment_text, const String& command_name);
43+
void NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType, const CheckResult::Ptr& cr,
44+
const String& author, const String& commentText, const String& commandName);
4845
void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
49-
void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
5046

5147
String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
5248
void SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage);

lib/perfdata/graphitewriter.cpp

Lines changed: 30 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -261,27 +261,6 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
261261
if (IsPaused())
262262
return;
263263

264-
m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerInternal(checkable, cr); });
265-
}
266-
267-
/**
268-
* Check result event handler, prepares metadata and perfdata values and calls Send*()
269-
*
270-
* Called inside the WQ.
271-
*
272-
* @param checkable Host/Service object
273-
* @param cr Check result including performance data
274-
*/
275-
void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
276-
{
277-
AssertOnWorkQueue();
278-
279-
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
280-
281-
/* TODO: Deal with missing connection here. Needs refactoring
282-
* into parsing the actual performance data and then putting it
283-
* into a queue for re-inserting. */
284-
285264
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
286265
return;
287266

@@ -306,29 +285,34 @@ void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable,
306285
});
307286
}
308287

309-
String prefixPerfdata = prefix + ".perfdata";
310-
String prefixMetadata = prefix + ".metadata";
288+
std::vector<std::pair<String, double>> metadata;
289+
if (GetEnableSendMetadata()) {
290+
metadata = {
291+
{"state", service ? service->GetState() : host->GetState()},
292+
{"current_attempt", checkable->GetCheckAttempt()},
293+
{"max_check_attempts", checkable->GetMaxCheckAttempts()},
294+
{"state_type", checkable->GetStateType()},
295+
{"reachable", checkable->IsReachable()},
296+
{"downtime_depth", checkable->GetDowntimeDepth()},
297+
{"acknowledgement", checkable->GetAcknowledgement()},
298+
{"latency", cr->CalculateLatency()},
299+
{"execution_time", cr->CalculateExecutionTime()}
300+
};
301+
}
311302

312-
double ts = cr->GetExecutionEnd();
303+
m_WorkQueue.Enqueue([this, checkable, cr, prefix = std::move(prefix), metadata = std::move(metadata)]() {
304+
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
313305

314-
if (GetEnableSendMetadata()) {
315-
if (service) {
316-
SendMetric(checkable, prefixMetadata, "state", service->GetState(), ts);
317-
} else {
318-
SendMetric(checkable, prefixMetadata, "state", host->GetState(), ts);
319-
}
306+
/* TODO: Deal with missing connection here. Needs refactoring
307+
* into parsing the actual performance data and then putting it
308+
* into a queue for re-inserting. */
320309

321-
SendMetric(checkable, prefixMetadata, "current_attempt", checkable->GetCheckAttempt(), ts);
322-
SendMetric(checkable, prefixMetadata, "max_check_attempts", checkable->GetMaxCheckAttempts(), ts);
323-
SendMetric(checkable, prefixMetadata, "state_type", checkable->GetStateType(), ts);
324-
SendMetric(checkable, prefixMetadata, "reachable", checkable->IsReachable(), ts);
325-
SendMetric(checkable, prefixMetadata, "downtime_depth", checkable->GetDowntimeDepth(), ts);
326-
SendMetric(checkable, prefixMetadata, "acknowledgement", checkable->GetAcknowledgement(), ts);
327-
SendMetric(checkable, prefixMetadata, "latency", cr->CalculateLatency(), ts);
328-
SendMetric(checkable, prefixMetadata, "execution_time", cr->CalculateExecutionTime(), ts);
329-
}
310+
for (auto& [name, val] : metadata) {
311+
SendMetric(checkable, prefix + ".metadata", name, val, cr->GetExecutionEnd());
312+
}
330313

331-
SendPerfdata(checkable, prefixPerfdata, cr, ts);
314+
SendPerfdata(checkable, prefix + ".perfdata", cr);
315+
});
332316
}
333317

334318
/**
@@ -337,10 +321,11 @@ void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable,
337321
* @param checkable Host/service object
338322
* @param prefix Metric prefix string
339323
* @param cr Check result including performance data
340-
* @param ts Timestamp when the check result was created
341324
*/
342-
void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts)
325+
void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr)
343326
{
327+
AssertOnWorkQueue();
328+
344329
Array::Ptr perfdata = cr->GetPerformanceData();
345330

346331
if (!perfdata)
@@ -367,6 +352,7 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
367352
}
368353

369354
String escapedKey = EscapeMetricLabel(pdv->GetLabel());
355+
double ts = cr->GetExecutionEnd();
370356

371357
SendMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts);
372358

@@ -394,6 +380,8 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
394380
*/
395381
void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
396382
{
383+
AssertOnWorkQueue();
384+
397385
namespace asio = boost::asio;
398386

399387
std::ostringstream msgbuf;

lib/perfdata/graphitewriter.hpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,8 @@ class GraphiteWriter final : public ObjectImpl<GraphiteWriter>
4545
Timer::Ptr m_ReconnectTimer;
4646

4747
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
48-
void CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
4948
void SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts);
50-
void SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts);
49+
void SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr);
5150
static String EscapeMetric(const String& str);
5251
static String EscapeMetricLabel(const String& str);
5352
static Value EscapeMacroMetric(const Value& value);

0 commit comments

Comments
 (0)