Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ECDC-4457 add ev_stats, ev_logs, ev_throttle librdkafka counters #836

Merged
merged 20 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 36 additions & 59 deletions src/common/kafka/Producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,6 @@ RdKafka::Conf::ConfResult Producer::setConfig(const std::string &Key,
return configResult;
}

// void Producer::dr_cb(RdKafka::Message &message) {
// if (message.status() == RdKafka::Message::MSG_STATUS_NOT_PERSISTED) {
// std::cerr << "Message not persisted: " << message.errstr() << std::endl;
// } else if (message.status() ==
// RdKafka::Message::MSG_STATUS_POSSIBLY_PERSISTED) {
// std::cerr << "Message possibly persisted: " << message.errstr()
// << std::endl;
// } else if (message.status() == RdKafka::Message::MSG_STATUS_PERSISTED) {
// std::cerr << "Message persisted: " << message.errstr() << std::endl;
// }

// if (message.err()) {
// std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
// } else {
// std::cout << "Message delivered to topic " << message.topic_name() << " ["
// << message.partition() << "] at offset " << message.offset()
// << std::endl;
// }
// }

///
Producer::Producer(const std::string &Broker, const std::string &Topic,
std::vector<std::pair<std::string, std::string>> &Configs)
Expand All @@ -89,12 +69,13 @@ Producer::Producer(const std::string &Broker, const std::string &Topic,
setConfig(Config.first, Config.second);
}

// if (Config->set("event_cb", this, ErrorMessage) != RdKafka::Conf::CONF_OK)
// {
// LOG(KAFKA, Sev::Error, "Kafka: unable to set event_cb");
// }
if (Config->set("event_cb", &EventHandler, ErrorMessage) !=
RdKafka::Conf::CONF_OK) {
LOG(KAFKA, Sev::Error, "Kafka: unable to set event_cb");
}

if (Config->set("dr_cb", this, ErrorMessage) != RdKafka::Conf::CONF_OK) {
if (Config->set("dr_cb", &DeliveryHandler, ErrorMessage) !=
RdKafka::Conf::CONF_OK) {
LOG(KAFKA, Sev::Error, "Kafka: unable to set dr_cb");
}

Expand Down Expand Up @@ -129,35 +110,17 @@ int Producer::produce(const nonstd::span<const std::uint8_t> &Buffer,
}

// non-blocking, copies the buffer to kafka thread for transfer
RdKafka::ErrorCode resp = KafkaProducer->produce(
TopicName, -1, RdKafka::Producer::RK_MSG_COPY,
const_cast<std::uint8_t *>(Buffer.data()), Buffer.size_bytes(), NULL, 0,
MessageTimestampMS, NULL);
KafkaProducer->produce(TopicName, -1, RdKafka::Producer::RK_MSG_COPY,
const_cast<std::uint8_t *>(Buffer.data()),
Buffer.size_bytes(), NULL, 0, MessageTimestampMS,
NULL);

stats.produce_calls++;

// polls for events in the event queue and triggers callbacks on them
KafkaProducer->poll(0);

/// \todo: this will probably not happen because we have an async producer
/// with Kafka
if (resp != RdKafka::ERR_NO_ERROR) {
if (resp == RdKafka::ERR__UNKNOWN_TOPIC) {
stats.err_unknown_topic++;
} else if (resp == RdKafka::ERR__QUEUE_FULL) {
stats.err_queue_full++;
} else {
stats.err_other++;
}

XTRACE(KAFKA, DEB, "produce: %s", RdKafka::err2str(resp).c_str());
stats.produce_bytes_error += Buffer.size_bytes();
stats.produce_errors++;
return resp;
} else {
stats.produce_bytes_ok += Buffer.size_bytes();
// stats.produce_no_errors++;
}
stats.produce_bytes_ok += Buffer.size_bytes();

return 0;
}
Expand All @@ -168,8 +131,8 @@ void KafkaEventHandler::event_cb(RdKafka::Event &event) {
switch (event.type()) {
case RdKafka::Event::EVENT_STATS:
res = nlohmann::json::parse(event.str());
NumberOfMessageInQueue += res["msg_cnt"].get<int64_t>();
SizeOfMessageInQueue += res["msg_size"].get<int64_t>();
NumberOfMsgInQueue += res["msg_cnt"].get<int64_t>();
SizeOfMsgInQueue += res["msg_size"].get<int64_t>();
for (auto &broker : res["brokers"].items()) {
const auto &broker_info = broker.value();
BytesTransmittedToBrokers += broker_info.value("txbytes", (int64_t)0);
Expand All @@ -179,32 +142,32 @@ void KafkaEventHandler::event_cb(RdKafka::Event &event) {
break;
case RdKafka::Event::EVENT_ERROR:

// First log the error and it's erro string.
// First log the error and its error string.
LOG(KAFKA, Sev::Warning, "Rdkafka::Event::EVENT_ERROR [{}]: {}",
event.err(), RdKafka::err2str(event.err()).c_str());
XTRACE(KAFKA, WAR, "Rdkafka::Event::EVENT_ERROR [%d]: %s\n", event.err(),
RdKafka::err2str(event.err()).c_str());

switch (event.err()) {
case RdKafka::ErrorCode::ERR__TIMED_OUT:
ErrTimeout++;
++ErrTimeout;
break;
case RdKafka::ErrorCode::ERR__TRANSPORT:
ErrTransport++;
++ErrTransport;
break;
case RdKafka::ErrorCode::ERR_BROKER_NOT_AVAILABLE:
ErrBrokerNotAvailable++;
++ErrBrokerNotAvailable;
break;
case RdKafka::ErrorCode::ERR__UNKNOWN_TOPIC:
ErrUnknownTopic++;
++ErrUnknownTopic;
break;
case RdKafka::ErrorCode::ERR__QUEUE_FULL:
ErrQueueFull++;
++ErrQueueFull;
break;
case RdKafka::ErrorCode::ERR_NO_ERROR:
break;
default:
ErrOther++;
++ErrOther;
break;
}
break;
Expand All @@ -215,11 +178,25 @@ void KafkaEventHandler::event_cb(RdKafka::Event &event) {

// Implementation of DeliveryReportHandler override
void DeliveryReportHandler::dr_cb(RdKafka::Message &message) {
++totalCount;
++TotalMessageDelivered;

switch (message.status()) {
case RdKafka::Message::MSG_STATUS_NOT_PERSISTED:
++MsgStatusNotPersisted;
break;
case RdKafka::Message::MSG_STATUS_POSSIBLY_PERSISTED:
++MsgStatusPossiblyPersisted;
break;
case RdKafka::Message::MSG_STATUS_PERSISTED:
++MsgStatusPersisted;
break;
default:
break;
}

if (message.err() != RdKafka::ERR_NO_ERROR) {
++errorCount;
} else {
++successCount;
}
// ...existing code...
}
42 changes: 14 additions & 28 deletions src/common/kafka/Producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,32 @@ class KafkaEventHandler : public RdKafka::EventCb {

void event_cb(RdKafka::Event &event) override;

uint64_t ErrTimeout{0};
uint64_t ErrTransport{0};
uint64_t ErrBrokerNotAvailable{0};
uint64_t ErrUnknownTopic{0};
uint64_t ErrQueueFull{0};
uint64_t ErrOther{0};
int64_t ErrTimeout{0};
int64_t ErrTransport{0};
int64_t ErrBrokerNotAvailable{0};
int64_t ErrUnknownTopic{0};
int64_t ErrQueueFull{0};
int64_t ErrOther{0};

// Counters for statistics
int64_t NumberOfMessageInQueue{0};
int64_t SizeOfMessageInQueue{0};
int64_t NumberOfMsgInQueue{0};
int64_t SizeOfMsgInQueue{0};
int64_t BytesTransmittedToBrokers{0};
int64_t TransmissionErrors{0};
int64_t TxRequestRetries{0};
};

class DeliveryReportHandler : public RdKafka::DeliveryReportCb {
public:
DeliveryReportHandler() : totalCount(0), errorCount(0), successCount(0) {}
DeliveryReportHandler() : TotalMessageDelivered(0), errorCount(0), successCount(0),
MsgStatusNotPersisted(0), MsgStatusPossiblyPersisted(0), MsgStatusPersisted(0) {}
void dr_cb(RdKafka::Message &message) override;
uint64_t totalCount; // Total delivery reports received
uint64_t TotalMessageDelivered; // Total delivery reports received
uint64_t errorCount; // Count of delivery reports with error
uint64_t successCount; // Count of successful delivery reports
uint64_t MsgStatusNotPersisted; // Count of messages not persisted
uint64_t MsgStatusPossiblyPersisted; // Count of messages possibly persisted
uint64_t MsgStatusPersisted; // Count of messages persisted
};

///
Expand Down Expand Up @@ -111,29 +115,11 @@ class Producer : public ProducerBase {
/// \brief Structure to hold producer statistics.
struct ProducerStats {
int64_t config_errors;
int64_t ev_stats;
int64_t ev_logs;
int64_t ev_throttle;
int64_t ev_errors;
int64_t ev_others;
int64_t dr_errors;
int64_t dr_noerrors;
int64_t produce_bytes_ok;
int64_t produce_bytes_error;
int64_t produce_calls;
int64_t produce_errors;
int64_t produce_no_errors;
int64_t err_timeout;
int64_t err_transport;
int64_t err_unknown_topic;
int64_t err_queue_full;
int64_t err_other;
int64_t librdkafka_msg_cnt;
int64_t librdkafka_msg_size;
int64_t librdkafka_tx_bytes;
int64_t librdkafka_brokers_waitresp = 0;
int64_t librdkafka_brokers_tx_byte = 0;
int64_t librdkafka_brokers_txerrors = 0;
} stats = {};

KafkaEventHandler EventHandler;
Expand Down
10 changes: 5 additions & 5 deletions src/modules/cbm/CbmBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ CbmBase::CbmBase(BaseSettings const &settings)
Stats.create("kafka.produce_calls", Counters.KafkaStats.produce_calls);
Stats.create("kafka.produce_no_errors", Counters.KafkaStats.produce_no_errors);
Stats.create("kafka.produce_errors", Counters.KafkaStats.produce_errors);
Stats.create("kafka.err_unknown_topic", Counters.KafkaStats.err_unknown_topic);
Stats.create("kafka.err_queue_full", Counters.KafkaStats.err_queue_full);
Stats.create("kafka.err_timeout", Counters.KafkaStats.err_timeout);
Stats.create("kafka.err_transport", Counters.KafkaStats.err_transport);
Stats.create("kafka.err_other", Counters.KafkaStats.err_other);
Stats.create("kafka.error.unknown_topic", Counters.KafkaEventHandler.ErrUnknownTopic);
Stats.create("kafka.error.queue_full", Counters.KafkaStats.err_queue_full);
Stats.create("kafka.error.timeout", Counters.KafkaStats.err_timeout);
Stats.create("kafka.error.transport", Counters.KafkaStats.err_transport);
Stats.create("kafka.error.other", Counters.KafkaStats.err_other);
Stats.create("kafka.ev_stats", Counters.KafkaStats.ev_stats);
Stats.create("kafka.ev_throttle", Counters.KafkaStats.ev_throttle);
Stats.create("kafka.ev_logs", Counters.KafkaStats.ev_logs);
Expand Down
2 changes: 2 additions & 0 deletions src/modules/cbm/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ struct Counters {

// Kafka stats below are common to all detectors
struct Producer::ProducerStats KafkaStats;
KafkaEventHandler KafkaEventHandler;
DeliveryReportHandler KafkaDeliveryReportHandler;

} __attribute__((aligned(64)));

Expand Down