Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ECDC-4457 add ev_stats, ev_logs, ev_throttle librdkafka counters #836

Merged
merged 20 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 105 additions & 50 deletions src/common/kafka/Producer.cpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
// Copyright (C) 2016-2024 European Spallation Source, ERIC. See LICENSE file
// Copyright (C) 2016 - 2025 European Spallation Source, ERIC. See LICENSE file
//===----------------------------------------------------------------------===//
///
/// \file
/// \brief Kafka producer - wrapper for librdkafka
///
/// See https://github.com/edenhill/librdkafka
/// \note: See https://github.com/edenhill/librdkafka
///
/// For more information the avaialbe producer statistics see:
/// https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md
///
//===----------------------------------------------------------------------===//

#include <cassert>
#include <common/debug/Log.h>
#include <common/debug/Trace.h>
#include <common/kafka/Producer.h>
#include <common/system/gccintel.h>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <librdkafka/rdkafkacpp.h>
#include <nlohmann/json.hpp>

// #undef TRC_LEVEL
Expand All @@ -37,30 +45,6 @@ RdKafka::Conf::ConfResult Producer::setConfig(const std::string &Key,
return configResult;
}

///
void Producer::event_cb(RdKafka::Event &event) {
nlohmann::json res;
switch (event.type()) {
case RdKafka::Event::EVENT_STATS:
res = nlohmann::json::parse(event.str());
stats.librdkafka_msg_cnt += res["msg_cnt"].get<int64_t>();
stats.librdkafka_msg_size += res["msg_size"].get<int64_t>();
break;
case RdKafka::Event::EVENT_ERROR:
LOG(KAFKA, Sev::Warning, "Rdkafka::Event::EVENT_ERROR: {}",
RdKafka::err2str(event.err()).c_str());
XTRACE(KAFKA, WAR, "Rdkafka::Event::EVENT_ERROR: %s\n",
RdKafka::err2str(event.err()).c_str());
stats.ev_errors++;
break;
default:
XTRACE(KAFKA, INF, "RdKafka::Event:: %d: %s\n", event.type(),
RdKafka::err2str(event.err()).c_str());
stats.ev_others++;
break;
}
}

///
Producer::Producer(const std::string &Broker, const std::string &Topic,
std::vector<std::pair<std::string, std::string>> &Configs)
Expand All @@ -85,13 +69,24 @@ Producer::Producer(const std::string &Broker, const std::string &Topic,
setConfig(Config.first, Config.second);
}

if (Config->set("event_cb", this, ErrorMessage) != RdKafka::Conf::CONF_OK) {
if (Config->set("event_cb", &EventHandler, ErrorMessage) !=
RdKafka::Conf::CONF_OK) {
LOG(KAFKA, Sev::Error, "Kafka: unable to set event_cb");
}

// if (conf->set("dr_cb", this, ErrorMessage) != RdKafka::Conf::CONF_OK) {
// LOG(KAFKA, Sev::Error, "Kafka: unable to set dr_cb");
// }
if (Config->set("dr_cb", &DeliveryHandler, ErrorMessage) !=
RdKafka::Conf::CONF_OK) {
LOG(KAFKA, Sev::Error, "Kafka: unable to set dr_cb");
}

// Set message timeout to 5 seconds
Config->set("message.timeout.ms", "5000", ErrorMessage);

// Set retry backoff to 100ms
Config->set("retry.backoff.ms", "100", ErrorMessage);

// Set the number of retries to 3
Config->set("retries", "3", ErrorMessage);

KafkaProducer.reset(RdKafka::Producer::create(Config.get(), ErrorMessage));
if (!KafkaProducer) {
Expand All @@ -115,33 +110,93 @@ int Producer::produce(const nonstd::span<const std::uint8_t> &Buffer,
}

// non-blocking, copies the buffer to kafka thread for transfer
RdKafka::ErrorCode resp = KafkaProducer->produce(
TopicName, -1, RdKafka::Producer::RK_MSG_COPY,
const_cast<std::uint8_t *>(Buffer.data()), Buffer.size_bytes(), NULL, 0,
MessageTimestampMS, NULL);
KafkaProducer->produce(TopicName, -1, RdKafka::Producer::RK_MSG_COPY,
const_cast<std::uint8_t *>(Buffer.data()),
Buffer.size_bytes(), NULL, 0, MessageTimestampMS,
NULL);

stats.produce_calls++;

// poll for events in the event queue and triggers callbacks on them
KafkaProducer->poll(0);

if (resp != RdKafka::ERR_NO_ERROR) {
if (resp == RdKafka::ERR__UNKNOWN_TOPIC) {
stats.err_unknown_topic++;
} else if (resp == RdKafka::ERR__QUEUE_FULL) {
stats.err_queue_full++;
} else {
stats.err_other++;
stats.produce_bytes_ok += Buffer.size_bytes();

return 0;
}

// Implementation of KafkaEventHandler override
void KafkaEventHandler::event_cb(RdKafka::Event &event) {
nlohmann::json res;
switch (event.type()) {
case RdKafka::Event::EVENT_STATS:
res = nlohmann::json::parse(event.str());
NumberOfMsgInQueue += res["msg_cnt"].get<int64_t>();
SizeOfMsgInQueue += res["msg_size"].get<int64_t>();
for (auto &broker : res["brokers"].items()) {
const auto &broker_info = broker.value();
BytesTransmittedToBrokers += broker_info.value("txbytes", (int64_t)0);
TransmissionErrors += broker_info.value("txerrs", (int64_t)0);
TxRequestRetries += broker_info.value("txerrs", (int64_t)0);
}
break;
case RdKafka::Event::EVENT_ERROR:

XTRACE(KAFKA, DEB, "produce: %s", RdKafka::err2str(resp).c_str());
stats.produce_bytes_error += Buffer.size_bytes();
stats.produce_errors++;
return resp;
} else {
stats.produce_bytes_ok += Buffer.size_bytes();
stats.produce_no_errors++;
}
// First log the error and its error string.
LOG(KAFKA, Sev::Warning, "Rdkafka::Event::EVENT_ERROR [{}]: {}",
event.err(), RdKafka::err2str(event.err()).c_str());
XTRACE(KAFKA, WAR, "Rdkafka::Event::EVENT_ERROR [%d]: %s\n", event.err(),
RdKafka::err2str(event.err()).c_str());

return 0;
switch (event.err()) {
case RdKafka::ErrorCode::ERR__TIMED_OUT:
++ErrTimeout;
break;
case RdKafka::ErrorCode::ERR__TRANSPORT:
++ErrTransport;
break;
case RdKafka::ErrorCode::ERR_BROKER_NOT_AVAILABLE:
++ErrBrokerNotAvailable;
break;
case RdKafka::ErrorCode::ERR__UNKNOWN_TOPIC:
++ErrUnknownTopic;
break;
case RdKafka::ErrorCode::ERR__QUEUE_FULL:
++ErrQueueFull;
break;
case RdKafka::ErrorCode::ERR_NO_ERROR:
break;
default:
++ErrOther;
break;
}
break;
default:
break;
}
}

// Implementation of DeliveryReportHandler override
void DeliveryReportHandler::dr_cb(RdKafka::Message &message) {
++TotalMessageDelivered;

switch (message.status()) {
case RdKafka::Message::MSG_STATUS_NOT_PERSISTED:
++MsgStatusNotPersisted;
break;
case RdKafka::Message::MSG_STATUS_POSSIBLY_PERSISTED:
++MsgStatusPossiblyPersisted;
break;
case RdKafka::Message::MSG_STATUS_PERSISTED:
++MsgStatusPersisted;
break;
default:
break;
}

if (message.err() != RdKafka::ERR_NO_ERROR) {
++errorCount;
} else {
++successCount;
}
}
62 changes: 44 additions & 18 deletions src/common/kafka/Producer.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (C) 2016-2024 European Spallation Source, ERIC. See LICENSE file
// Copyright (C) 2016 - 2025 European Spallation Source, ERIC. See LICENSE file
//===----------------------------------------------------------------------===//
///
/// \file
Expand All @@ -9,6 +9,7 @@

#pragma once

#include <cstdint>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpedantic"
#include <librdkafka/rdkafkacpp.h>
Expand All @@ -18,9 +19,45 @@
#include <common/memory/span.hpp>
#include <functional>
#include <memory>
#include <unordered_map> // add if not already included
#include <utility>
#include <vector>

// New handler classes
class KafkaEventHandler : public RdKafka::EventCb {
public:
KafkaEventHandler() {}

void event_cb(RdKafka::Event &event) override;

int64_t ErrTimeout{0};
int64_t ErrTransport{0};
int64_t ErrBrokerNotAvailable{0};
int64_t ErrUnknownTopic{0};
int64_t ErrQueueFull{0};
int64_t ErrOther{0};

// Counters for statistics
int64_t NumberOfMsgInQueue{0};
int64_t SizeOfMsgInQueue{0};
int64_t BytesTransmittedToBrokers{0};
int64_t TransmissionErrors{0};
int64_t TxRequestRetries{0};
};

class DeliveryReportHandler : public RdKafka::DeliveryReportCb {
public:
DeliveryReportHandler() : TotalMessageDelivered(0), errorCount(0), successCount(0),
MsgStatusNotPersisted(0), MsgStatusPossiblyPersisted(0), MsgStatusPersisted(0) {}
void dr_cb(RdKafka::Message &message) override;
uint64_t TotalMessageDelivered; // Total delivery reports received
uint64_t errorCount; // Count of delivery reports with error
uint64_t successCount; // Count of successful delivery reports
uint64_t MsgStatusNotPersisted; // Count of messages not persisted
uint64_t MsgStatusPossiblyPersisted; // Count of messages possibly persisted
uint64_t MsgStatusPersisted; // Count of messages persisted
};

///
class ProducerBase {
public:
Expand All @@ -42,7 +79,7 @@ class ProducerBase {
/// sending them to the cluster.
///
/// It inherits from ProducerBase and RdKafka::EventCb.
class Producer : public ProducerBase, public RdKafka::EventCb {
class Producer : public ProducerBase {
public:
/// \brief Constructs a Producer object.
///
Expand Down Expand Up @@ -72,33 +109,22 @@ class Producer : public ProducerBase, public RdKafka::EventCb {
/// \param Key The configuration key.
/// \param Value The configuration value.
/// \return RdKafka::Conf::ConfResult The result of setting the configuration.
RdKafka::Conf::ConfResult setConfig(const std::string &Key, const std::string &Value);

/// \brief Callback function for Kafka to handle events like errors,
/// statistics.
///
/// \param event The Kafka event to be handled
void event_cb(RdKafka::Event &event) override;
RdKafka::Conf::ConfResult setConfig(const std::string &Key,
const std::string &Value);

/// \brief Structure to hold producer statistics.
struct ProducerStats {
int64_t config_errors;
int64_t ev_errors;
int64_t ev_others;
int64_t dr_errors;
int64_t dr_noerrors;
int64_t produce_bytes_ok;
int64_t produce_bytes_error;
int64_t produce_calls;
int64_t produce_errors;
int64_t produce_no_errors;
int64_t err_unknown_topic;
int64_t err_queue_full;
int64_t err_other;
int64_t librdkafka_msg_cnt;
int64_t librdkafka_msg_size;
} stats = {};

KafkaEventHandler EventHandler;
DeliveryReportHandler DeliveryHandler;

protected:
std::string ErrorMessage;
std::string TopicName;
Expand Down
5 changes: 4 additions & 1 deletion src/modules/caen/CaenBase.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (C) 2016 - 2024 European Spallation Source, ERIC. See LICENSE file
// Copyright (C) 2016 - 2025 European Spallation Source, ERIC. See LICENSE file
//===----------------------------------------------------------------------===//
///
/// \file
Expand Down Expand Up @@ -115,6 +115,9 @@ CaenBase::CaenBase(BaseSettings const &settings,
Stats.create("kafka.err_unknown_topic", Counters.KafkaStats.err_unknown_topic);
Stats.create("kafka.err_queue_full", Counters.KafkaStats.err_queue_full);
Stats.create("kafka.err_other", Counters.KafkaStats.err_other);
Stats.create("kafka.ev_stats", Counters.KafkaStats.ev_stats);
Stats.create("kafka.ev_throttle", Counters.KafkaStats.ev_throttle);
Stats.create("kafka.ev_logs", Counters.KafkaStats.ev_logs);
Stats.create("kafka.ev_errors", Counters.KafkaStats.ev_errors);
Stats.create("kafka.ev_others", Counters.KafkaStats.ev_others);
Stats.create("kafka.dr_errors", Counters.KafkaStats.dr_errors);
Expand Down
17 changes: 13 additions & 4 deletions src/modules/cbm/CbmBase.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (C) 2022 - 2024 European Spallation Source, see LICENSE file
// Copyright (C) 2022 - 2025 European Spallation Source, see LICENSE file
//===----------------------------------------------------------------------===//
///
/// \file
Expand Down Expand Up @@ -100,15 +100,24 @@ CbmBase::CbmBase(BaseSettings const &settings)
Stats.create("kafka.produce_calls", Counters.KafkaStats.produce_calls);
Stats.create("kafka.produce_no_errors", Counters.KafkaStats.produce_no_errors);
Stats.create("kafka.produce_errors", Counters.KafkaStats.produce_errors);
Stats.create("kafka.err_unknown_topic", Counters.KafkaStats.err_unknown_topic);
Stats.create("kafka.err_queue_full", Counters.KafkaStats.err_queue_full);
Stats.create("kafka.err_other", Counters.KafkaStats.err_other);
Stats.create("kafka.error.unknown_topic", Counters.KafkaEventHandler.ErrUnknownTopic);
Stats.create("kafka.error.queue_full", Counters.KafkaStats.err_queue_full);
Stats.create("kafka.error.timeout", Counters.KafkaStats.err_timeout);
Stats.create("kafka.error.transport", Counters.KafkaStats.err_transport);
Stats.create("kafka.error.other", Counters.KafkaStats.err_other);
Stats.create("kafka.ev_stats", Counters.KafkaStats.ev_stats);
Stats.create("kafka.ev_throttle", Counters.KafkaStats.ev_throttle);
Stats.create("kafka.ev_logs", Counters.KafkaStats.ev_logs);
Stats.create("kafka.ev_errors", Counters.KafkaStats.ev_errors);
Stats.create("kafka.ev_others", Counters.KafkaStats.ev_others);
Stats.create("kafka.dr_errors", Counters.KafkaStats.dr_errors);
Stats.create("kafka.dr_others", Counters.KafkaStats.dr_noerrors);
Stats.create("kafka.librdkafka_msg_cnt", Counters.KafkaStats.librdkafka_msg_cnt);
Stats.create("kafka.librdkafka_msg_size", Counters.KafkaStats.librdkafka_msg_size);
Stats.create("kafka.librdkafka_tx_bytes", Counters.KafkaStats.librdkafka_tx_bytes);
Stats.create("kafka.librdkafka_broker_tx_bytes", Counters.KafkaStats.librdkafka_brokers_tx_byte);
Stats.create("kafka.librdkafka_broker_tx_errors", Counters.KafkaStats.librdkafka_brokers_txerrors);
Stats.create("kafka.librdkafka_broker_waitresp", Counters.KafkaStats.librdkafka_brokers_waitresp);
// clang-format on
std::function<void()> inputFunc = [this]() { inputThread(); };
AddThreadFunction(inputFunc, "input");
Expand Down
2 changes: 2 additions & 0 deletions src/modules/cbm/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ struct Counters {

// Kafka stats below are common to all detectors
struct Producer::ProducerStats KafkaStats;
KafkaEventHandler KafkaEventHandler;
DeliveryReportHandler KafkaDeliveryReportHandler;

} __attribute__((aligned(64)));

Expand Down
5 changes: 4 additions & 1 deletion src/modules/dream/DreamBase.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (C) 2016 - 2024 European Spallation Source, ERIC. See LICENSE file
// Copyright (C) 2016 - 2025 European Spallation Source, ERIC. See LICENSE file
//===----------------------------------------------------------------------===//
///
/// \file
Expand Down Expand Up @@ -83,6 +83,9 @@ DreamBase::DreamBase(BaseSettings const &Settings) : Detector(Settings) {
Stats.create("kafka.err_unknown_topic", Counters.KafkaStats.err_unknown_topic);
Stats.create("kafka.err_queue_full", Counters.KafkaStats.err_queue_full);
Stats.create("kafka.err_other", Counters.KafkaStats.err_other);
Stats.create("kafka.ev_stats", Counters.KafkaStats.ev_stats);
Stats.create("kafka.ev_throttle", Counters.KafkaStats.ev_throttle);
Stats.create("kafka.ev_logs", Counters.KafkaStats.ev_logs);
Stats.create("kafka.ev_errors", Counters.KafkaStats.ev_errors);
Stats.create("kafka.ev_others", Counters.KafkaStats.ev_others);
Stats.create("kafka.dr_errors", Counters.KafkaStats.dr_errors);
Expand Down
Loading