Skip to content

Fix: Replace bdlmt::Throttle direct usage with BALL_LOGTHROTTLE #639

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions src/groups/bmq/bmqtsk/bmqtsk_alarmlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,22 @@
// 'BALL_LOG_ERROR_BLOCK', meaning that:
//: o log message is streamed into the macro
//: o the call must be ended by a terminal 'BMQTSK_ALARMLOG_END'
#define BMQTSK_ALARMLOG_ALARM_HEAD(CATEGORY) \
BALL_LOG_OUTPUT_STREAM << "ALARM [" << CATEGORY << "] "; \
BALL_LOG_RECORD->customFields().appendString("ALARM"); \
BALL_LOG_OUTPUT_STREAM
#define BMQTSK_ALARMLOG_PANIC_HEAD(CATEGORY) \
BALL_LOG_OUTPUT_STREAM << "PANIC [" << CATEGORY << "] "; \
BALL_LOG_RECORD->customFields().appendString("PANIC"); \
BALL_LOG_OUTPUT_STREAM
#define BMQTSK_ALARMLOG_ALARM(CATEGORY) \
BALL_LOG_ERROR_BLOCK \
{ \
BALL_LOG_OUTPUT_STREAM << "ALARM [" << CATEGORY << "] "; \
BALL_LOG_RECORD->customFields().appendString("ALARM"); \
BALL_LOG_OUTPUT_STREAM
BMQTSK_ALARMLOG_ALARM_HEAD(CATEGORY)
#define BMQTSK_ALARMLOG_PANIC(CATEGORY) \
BALL_LOG_ERROR_BLOCK \
{ \
BALL_LOG_OUTPUT_STREAM << "PANIC [" << CATEGORY << "] "; \
BALL_LOG_RECORD->customFields().appendString("PANIC"); \
BALL_LOG_OUTPUT_STREAM
BMQTSK_ALARMLOG_PANIC_HEAD(CATEGORY)
#define BMQTSK_ALARMLOG_END \
""; \
}
Expand Down
149 changes: 60 additions & 89 deletions src/groups/mqb/mqba/mqba_clientsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@

// BDE
#include <ball_log.h>
#include <ball_logthrottle.h>
#include <bdlb_nullablevalue.h>
#include <bdlb_scopeexit.h>
#include <bdld_datum.h>
Expand Down Expand Up @@ -359,20 +360,9 @@ ClientSessionState::ClientSessionState(
, d_schemaEventBuilder(blobSpPool, encodingType, allocator)
, d_pushBuilder(blobSpPool, allocator)
, d_ackBuilder(blobSpPool, allocator)
, d_throttledFailedAckMessages()
, d_throttledFailedPutMessages()
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(encodingType != bmqp::EncodingType::e_UNKNOWN);

d_throttledFailedAckMessages.initialize(
1,
5 * bdlt::TimeUnitRatio::k_NS_PER_S);
// One maximum log per 5 seconds
d_throttledFailedPutMessages.initialize(
1,
5 * bdlt::TimeUnitRatio::k_NS_PER_S);
// One maximum log per 5 seconds
}

// -------------------
Expand Down Expand Up @@ -595,15 +585,12 @@ void ClientSession::sendAck(bmqt::AckResult::Enum status,
// Throttle error log if this is a 'failed Ack': note that we log at
// INFO level in order not to overwhelm the dashboard, if a queue is
// full, every post will nack, which could be a lot.
if (d_state.d_throttledFailedAckMessages.requestPermission()) {
BALL_LOG_INFO << description() << ": failed Ack "
<< "[status: " << status << ", source: '" << source
<< "'"
<< ", correlationId: " << correlationId
<< ", GUID: " << messageGUID << ", queue: '" << uri
<< "' "
<< "(id: " << queueId << ")]";
}
BALL_LOGTHROTTLE_INFO(1, 5 * bdlt::TimeUnitRatio::k_NS_PER_S)
<< description() << ": failed Ack " << "[status: " << status
<< ", source: '" << source << "'"
<< ", correlationId: " << correlationId
<< ", GUID: " << messageGUID << ", queue: '" << uri << "' "
<< "(id: " << queueId << ")]";
}

// Always print at trace level
Expand Down Expand Up @@ -2375,18 +2362,15 @@ void ClientSession::onPutEvent(const mqbi::DispatcherPutEvent& event)
//
// In order to not lose the data, we do send the message
// upstream and here we just log the error.
if (d_state.d_throttledFailedPutMessages.requestPermission()) {
BALL_LOG_WARN
<< "#UNACKED_PUT_GUID_COLLISION " << description()
<< ": GUID collision detected "
<< "for a PUT message at first hop for queue ["
<< queueStatePtr->d_handle_p->queue()->uri()
<< "], correlationId: " << correlationId
<< ", message size: " << putIt.applicationDataSize()
<< ", GUID: " << insertRc.first->first << ". "
<< "This could be due to retransmitted PUT "
<< "message.";
}
BALL_LOGTHROTTLE_WARN(1, 5 * bdlt::TimeUnitRatio::k_NS_PER_S)
<< "#UNACKED_PUT_GUID_COLLISION " << description()
<< ": GUID collision detected "
<< "for a PUT message at first hop for queue ["
<< queueStatePtr->d_handle_p->queue()->uri()
<< "], correlationId: " << correlationId
<< ", message size: " << putIt.applicationDataSize()
<< ", GUID: " << insertRc.first->first
<< ". This could be due to retransmitted PUT message.";
}
}

Expand All @@ -2408,14 +2392,12 @@ void ClientSession::onPutEvent(const mqbi::DispatcherPutEvent& event)
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(rc < 0)) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;

if (d_state.d_throttledFailedPutMessages.requestPermission()) {
BALL_LOG_ERROR_BLOCK
{
BALL_LOG_OUTPUT_STREAM << "#CORRUPTED_EVENT " << description()
<< ": Invalid Put event [rc: " << rc
<< "]\n";
putIt.dumpBlob(BALL_LOG_OUTPUT_STREAM);
}
BALL_LOGTHROTTLE_ERROR_BLOCK(1, 5 * bdlt::TimeUnitRatio::k_NS_PER_S)
{
BALL_LOG_OUTPUT_STREAM << "#CORRUPTED_EVENT " << description()
<< ": Invalid Put event [rc: " << rc
<< "]\n";
putIt.dumpBlob(BALL_LOG_OUTPUT_STREAM);
}
}
}
Expand Down Expand Up @@ -2501,24 +2483,22 @@ bool ClientSession::validatePutMessage(QueueState** queueState,
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(rc != 0)) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;

if (d_state.d_throttledFailedPutMessages.requestPermission()) {
BALL_LOG_ERROR_BLOCK
{
BALL_LOG_OUTPUT_STREAM
<< "#CORRUPTED_EVENT " << description()
<< ": Failed to load PUT message payload [queueId: "
<< queueId;
BALL_LOGTHROTTLE_ERROR_BLOCK(1, 5 * bdlt::TimeUnitRatio::k_NS_PER_S)
{
BALL_LOG_OUTPUT_STREAM
<< "#CORRUPTED_EVENT " << description()
<< ": Failed to load PUT message payload [queueId: "
<< queueId;

if (isFirstHop && !d_isClientGeneratingGUIDs) {
BALL_LOG_OUTPUT_STREAM << ", correlationId: "
<< putIt.header().correlationId();
}
else {
BALL_LOG_OUTPUT_STREAM << ", GUID: "
<< putIt.header().messageGUID();
}
BALL_LOG_OUTPUT_STREAM << ", rc: " << rc << "]";
if (isFirstHop && !d_isClientGeneratingGUIDs) {
BALL_LOG_OUTPUT_STREAM << ", correlationId: "
<< putIt.header().correlationId();
}
else {
BALL_LOG_OUTPUT_STREAM << ", GUID: "
<< putIt.header().messageGUID();
}
BALL_LOG_OUTPUT_STREAM << ", rc: " << rc << "]";
}

ackStatus = bmqt::AckResult::e_INVALID_ARGUMENT;
Expand All @@ -2530,11 +2510,9 @@ bool ClientSession::validatePutMessage(QueueState** queueState,
// queue)
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;

if (d_state.d_throttledFailedPutMessages.requestPermission()) {
BALL_LOG_WARN << "#CLIENT_INVALID_PUT " << description()
<< ": PUT message for queue with unknown Id ("
<< queueId << ")";
}
BALL_LOGTHROTTLE_WARN(1, 5 * bdlt::TimeUnitRatio::k_NS_PER_S)
<< "#CLIENT_INVALID_PUT " << description()
<< ": PUT message for queue with unknown Id (" << queueId << ")";

ackStatus = bmqt::AckResult::e_INVALID_ARGUMENT;
ackDescription = "putEvent::unknownQueue";
Expand All @@ -2544,12 +2522,10 @@ bool ClientSession::validatePutMessage(QueueState** queueState,
// Has this client already sent the 'final' closeQueue request?
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;

if (d_state.d_throttledFailedPutMessages.requestPermission()) {
BALL_LOG_WARN << "#CLIENT_IMPROPER_BEHAVIOR " << description()
<< ": PUT message for queue Id (" << queueId
<< ") after final"
<< " closeQueue request.";
}
BALL_LOGTHROTTLE_WARN(1, 5 * bdlt::TimeUnitRatio::k_NS_PER_S)
<< "#CLIENT_IMPROPER_BEHAVIOR " << description()
<< ": PUT message for queue Id (" << queueId << ") after final"
<< " closeQueue request.";

ackStatus = bmqt::AckResult::e_REFUSED;
ackDescription = "putEvent::afterFinalCloseQueue";
Expand All @@ -2566,24 +2542,21 @@ bool ClientSession::validatePutMessage(QueueState** queueState,
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(rc != 0)) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;

if (d_state.d_throttledFailedPutMessages.requestPermission()) {
BALL_LOG_ERROR_BLOCK
{
BALL_LOG_OUTPUT_STREAM << "#CORRUPTED_EVENT "
<< description()
<< ": Failed to load PUT message "
<< "options [queueId: " << queueId;
if (isFirstHop && !d_isClientGeneratingGUIDs) {
BALL_LOG_OUTPUT_STREAM
<< ", correlationId: "
<< putIt.header().correlationId();
}
else {
BALL_LOG_OUTPUT_STREAM << ", GUID: "
<< putIt.header().messageGUID();
}
BALL_LOG_OUTPUT_STREAM << ", rc: " << rc << "]";
BALL_LOGTHROTTLE_ERROR_BLOCK(1,
5 * bdlt::TimeUnitRatio::k_NS_PER_S)
{
BALL_LOG_OUTPUT_STREAM << "#CORRUPTED_EVENT " << description()
<< ": Failed to load PUT message "
<< "options [queueId: " << queueId;
if (isFirstHop && !d_isClientGeneratingGUIDs) {
BALL_LOG_OUTPUT_STREAM << ", correlationId: "
<< putIt.header().correlationId();
}
else {
BALL_LOG_OUTPUT_STREAM << ", GUID: "
<< putIt.header().messageGUID();
}
BALL_LOG_OUTPUT_STREAM << ", rc: " << rc << "]";
}

ackStatus = bmqt::AckResult::e_INVALID_ARGUMENT;
Expand All @@ -2596,11 +2569,9 @@ bool ClientSession::validatePutMessage(QueueState** queueState,
// Check if SDK sent PUT with non-empty GUID
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;

if (d_state.d_throttledFailedPutMessages.requestPermission()) {
BALL_LOG_ERROR << "#CLIENT_INVALID_PUT " << description()
<< ": PUT message with unset GUID [queueId: "
<< queueId << "]";
}
BALL_LOGTHROTTLE_ERROR(1, 5 * bdlt::TimeUnitRatio::k_NS_PER_S)
<< "#CLIENT_INVALID_PUT " << description()
<< ": PUT message with unset GUID [queueId: " << queueId << "]";

ackStatus = bmqt::AckResult::e_INVALID_ARGUMENT;
ackDescription = "putEvent::unsetGUID";
Expand Down
6 changes: 0 additions & 6 deletions src/groups/mqb/mqba/mqba_clientsession.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,6 @@ struct ClientSessionState {
/// Builder for ack messages. To be used only in client dispatcher thread.
bmqp::AckEventBuilder d_ackBuilder;

/// Throttler for failed ACK messages.
bdlmt::Throttle d_throttledFailedAckMessages;

/// Throttler for failed PUT messages.
bdlmt::Throttle d_throttledFailedPutMessages;

/// Stats associated with an unknown queue, lazily created when the first
/// usage of an unknown queue is encountered
bdlb::NullableValue<mqbstat::QueueStatsClient> d_invalidQueueStats;
Expand Down
18 changes: 6 additions & 12 deletions src/groups/mqb/mqbblp/mqbblp_localqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <bmqu_memoutstream.h>

// BDE
#include <ball_logthrottle.h>
#include <bdlb_nullablevalue.h>
#include <bdlb_print.h>
#include <bdlma_localsequentialallocator.h>
Expand Down Expand Up @@ -66,18 +67,12 @@ LocalQueue::LocalQueue(QueueState* state, bslma::Allocator* allocator)
, d_state_p(state)
, d_queueEngine_mp(0)
, d_throttledFailedPutMessages(5000, 1) // 1 log per 5s interval
, d_throttledDuplicateMessages()
, d_haveStrongConsistency(false)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_state_p->id() == bmqp::QueueId::k_PRIMARY_QUEUE_ID);
// A LocalQueue has no 'upstream', hence no id

d_throttledDuplicateMessages.initialize(
1,
1 * bdlt::TimeUnitRatio::k_NS_PER_S);
// Maximum one per 1 seconds

d_state_p->setDescription(d_state_p->uri().asString());
}

Expand Down Expand Up @@ -517,12 +512,11 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;

if (res == mqbi::StorageResult::e_DUPLICATE) {
if (d_throttledDuplicateMessages.requestPermission()) {
BALL_LOG_WARN << "Duplicate PUT message queue ["
<< d_state_p->uri() << "] from client ["
<< source->client()->description() << "], GUID ["
<< putHeader.messageGUID() << "]";
}
// Maximum one per second
BALL_LOGTHROTTLE_WARN(1, 1 * bdlt::TimeUnitRatio::k_NS_PER_S)
<< "Duplicate PUT message queue [" << d_state_p->uri()
<< "] from client [" << source->client()->description()
<< "], GUID [" << putHeader.messageGUID() << "]";
}
else {
d_state_p->stats()
Expand Down
1 change: 0 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_localqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ class LocalQueue BSLS_CPP11_FINAL {
QueueState* d_state_p;
bslma::ManagedPtr<mqbi::QueueEngine> d_queueEngine_mp;
bmqu::ThrottledActionParams d_throttledFailedPutMessages;
bdlmt::Throttle d_throttledDuplicateMessages;
/// Throttler for duplicates.
bool d_haveStrongConsistency;

Expand Down
35 changes: 12 additions & 23 deletions src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -550,14 +550,6 @@ QueueHandle::QueueHandle(
BSLS_ASSERT_SAFE(d_queue_sp);
BSLS_ASSERT_SAFE(d_clientContext_sp->client());

d_throttledFailedAckMessages.initialize(
1,
5 * bdlt::TimeUnitRatio::k_NS_PER_S);
d_throttledDroppedPutMessages.initialize(
1,
5 * bdlt::TimeUnitRatio::k_NS_PER_S);
// One maximum log per 5 seconds

setHandleParameters(handleParameters);
}

Expand Down Expand Up @@ -859,10 +851,9 @@ void QueueHandle::deliverMessage(
// any sort of statistics increment or so; but at some point, if
// needed we way have to change this method to return a tri-value
// (delivered, non-delivered, skipped).
if (d_throttledDroppedPutMessages.requestPermission()) {
BALL_LOG_INFO << "Queue '" << d_queue_sp->description()
<< "' skipping duplicate PUSH " << msgGUID;
}
BALL_LOGTHROTTLE_INFO(1, 5 * bdlt::TimeUnitRatio::k_NS_PER_S)
<< "Queue '" << d_queue_sp->description()
<< "' skipping duplicate PUSH " << msgGUID;
continue; // CONTINUE
}

Expand Down Expand Up @@ -1172,17 +1163,15 @@ void QueueHandle::onAckMessage(const bmqp::AckMessage& ackMessage)
// check for it here.
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(!d_clientContext_sp)) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
if (d_throttledFailedAckMessages.requestPermission()) {
BALL_LOG_INFO << "#CLIENT_ACK_FAILURE: Received an ACK from "
<< "upstream for unavailable client "
<< "[queue: " << d_queue_sp->description()
<< ", upstream queueId: " << ackMessage.queueId()
<< ", downstream queueId: " << id()
<< ", guid: " << ackMessage.messageGUID()
<< ", status: " << ackMessage.status()
<< ", correlationId: " << ackMessage.correlationId()
<< "]";
}
BALL_LOGTHROTTLE_INFO(1, 5 * bdlt::TimeUnitRatio::k_NS_PER_S)
<< "#CLIENT_ACK_FAILURE: Received an ACK from "
<< "upstream for unavailable client "
<< "[queue: " << d_queue_sp->description()
<< ", upstream queueId: " << ackMessage.queueId()
<< ", downstream queueId: " << id()
<< ", guid: " << ackMessage.messageGUID()
<< ", status: " << ackMessage.status()
<< ", correlationId: " << ackMessage.correlationId() << "]";
return; // RETURN
}

Expand Down
5 changes: 0 additions & 5 deletions src/groups/mqb/mqbblp/mqbblp_queuehandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,6 @@ class QueueHandle : public mqbi::QueueHandle {
/// delivering a message to the client for efficiency.
bool d_isClientClusterMember;

/// Throttler for failed ACK messages.
bdlmt::Throttle d_throttledFailedAckMessages;

bdlmt::Throttle d_throttledDroppedPutMessages;

/// Mechanism to serialize execution of the substream deconfigure callbacks
/// and the caller callback invoked when all the substreams are
/// deconfigured.
Expand Down
Loading
Loading