Skip to content

Commit 070132b

Browse files
authored
[improve][client] Enhance connection and timeout logging (#539)
1 parent 2606df9 commit 070132b

File tree

5 files changed

+63
-7
lines changed

5 files changed

+63
-7
lines changed

lib/ClientConnection.cc

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ using namespace ASIO::ip;
5656

5757
namespace pulsar {
5858

59+
namespace {
60+
static std::ostream& operator<<(std::ostream& os, const tcp::resolver::results_type& results) {
61+
for (const auto& entry : results) {
62+
const auto& ep = entry.endpoint();
63+
os << ep.address().to_string() << ":" << ep.port() << " ";
64+
}
65+
return os;
66+
}
67+
} // anonymous namespace
68+
5969
using proto::BaseCommand;
6070

6171
static const uint32_t DefaultBufferSize = 64 * 1024;
@@ -486,7 +496,7 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const tcp::endp
486496
handleHandshake(ASIO_SUCCESS);
487497
}
488498
} else {
489-
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
499+
LOG_ERROR(cnxString_ << "Failed to establish connection to " << endpoint << ": " << err.message());
490500
if (err == ASIO::error::operation_aborted) {
491501
close();
492502
} else {
@@ -603,16 +613,25 @@ void ClientConnection::handleResolve(ASIO_ERROR err, const tcp::resolver::result
603613
return;
604614
}
605615

616+
if (!results.empty()) {
617+
LOG_DEBUG(cnxString_ << "Resolved " << results.size() << " endpoints");
618+
for (const auto& entry : results) {
619+
const auto& ep = entry.endpoint();
620+
LOG_DEBUG(cnxString_ << " " << ep.address().to_string() << ":" << ep.port());
621+
}
622+
}
623+
606624
auto weakSelf = weak_from_this();
607-
connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) {
625+
connectTimeoutTask_->setCallback([weakSelf, results = tcp::resolver::results_type(results)](
626+
const PeriodicTask::ErrorCode& ec) {
608627
ClientConnectionPtr ptr = weakSelf.lock();
609628
if (!ptr) {
610-
// Connection was already destroyed
629+
LOG_DEBUG("Connect timeout callback skipped: connection was already destroyed");
611630
return;
612631
}
613632

614633
if (ptr->state_ != Ready) {
615-
LOG_ERROR(ptr->cnxString_ << "Connection was not established in "
634+
LOG_ERROR(ptr->cnxString_ << "Connection to " << results << " was not established in "
616635
<< ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
617636
PeriodicTask::ErrorCode err;
618637
ptr->socket_->close(err);
@@ -1212,20 +1231,23 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuf
12121231
void ClientConnection::handleRequestTimeout(const ASIO_ERROR& ec,
12131232
const PendingRequestData& pendingRequestData) {
12141233
if (!ec && !pendingRequestData.hasGotResponse->load()) {
1234+
LOG_WARN(cnxString_ << "Network request timeout to broker, remote: " << physicalAddress_);
12151235
pendingRequestData.promise.setFailed(ResultTimeout);
12161236
}
12171237
}
12181238

12191239
void ClientConnection::handleLookupTimeout(const ASIO_ERROR& ec,
12201240
const LookupRequestData& pendingRequestData) {
12211241
if (!ec) {
1242+
LOG_WARN(cnxString_ << "Lookup request timeout to broker, remote: " << physicalAddress_);
12221243
pendingRequestData.promise->setFailed(ResultTimeout);
12231244
}
12241245
}
12251246

12261247
void ClientConnection::handleGetLastMessageIdTimeout(const ASIO_ERROR& ec,
12271248
const ClientConnection::LastMessageIdRequestData& data) {
12281249
if (!ec) {
1250+
LOG_WARN(cnxString_ << "GetLastMessageId request timeout to broker, remote: " << physicalAddress_);
12291251
data.promise->setFailed(ResultTimeout);
12301252
}
12311253
}

lib/ConsumerImpl.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,6 +1104,14 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
11041104
if (state_ != Ready) {
11051105
return ResultAlreadyClosed;
11061106
}
1107+
auto cnx = getCnx().lock();
1108+
if (cnx) {
1109+
LOG_WARN(getName() << " Receive timeout after " << timeout << " ms, connection: "
1110+
<< cnx->cnxString() << ", queue size: " << incomingMessages_.size());
1111+
} else {
1112+
LOG_WARN(getName() << " Receive timeout after " << timeout
1113+
<< " ms, no connection, queue size: " << incomingMessages_.size());
1114+
}
11071115
return ResultTimeout;
11081116
}
11091117
}

lib/MultiTopicsConsumerImpl.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <chrono>
2222
#include <stdexcept>
2323

24+
#include "ClientConnection.h"
2425
#include "ClientImpl.h"
2526
#include "ConsumerImpl.h"
2627
#include "ExecutorService.h"
@@ -600,6 +601,14 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
600601
if (state_ != Ready) {
601602
return ResultAlreadyClosed;
602603
}
604+
auto cnx = getCnx().lock();
605+
if (cnx) {
606+
LOG_WARN(getName() << " Receive timeout after " << timeout << " ms, connection: "
607+
<< cnx->cnxString() << ", queue size: " << incomingMessages_.size());
608+
} else {
609+
LOG_WARN(getName() << " Receive timeout after " << timeout
610+
<< " ms, no connection, queue size: " << incomingMessages_.size());
611+
}
603612
return ResultTimeout;
604613
}
605614
}

lib/ProducerImpl.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -868,6 +868,15 @@ void ProducerImpl::handleSendTimeout(const ASIO_ERROR& err) {
868868
}
869869

870870
lock.unlock();
871+
auto cnx = getCnx().lock();
872+
if (cnx) {
873+
LOG_WARN(getName() << "Send timeout due to queueing delay, connection: " << cnx->cnxString()
874+
<< ", pending messages: " << pendingMessages.size()
875+
<< ", queue size: " << pendingMessagesQueue_.size());
876+
} else {
877+
LOG_WARN(getName() << "Send timeout due to queueing delay, no connection, pending messages: "
878+
<< pendingMessages.size() << ", queue size: " << pendingMessagesQueue_.size());
879+
}
871880
for (const auto& op : pendingMessages) {
872881
op->complete(ResultTimeout, {});
873882
}

lib/UnAckedMessageTrackerEnabled.cc

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include <functional>
2222

23+
#include "ClientConnection.h"
2324
#include "ClientImpl.h"
2425
#include "ConsumerImplBase.h"
2526
#include "ExecutorService.h"
@@ -57,9 +58,16 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
5758

5859
std::set<MessageId> msgIdsToRedeliver;
5960
if (!headPartition.empty()) {
60-
LOG_INFO(consumerReference_.getName().c_str()
61-
<< ": " << headPartition.size() << " Messages were not acked within "
62-
<< timePartitions.size() * tickDurationInMs_ << " time");
61+
auto cnx = consumerReference_.getCnx().lock();
62+
if (cnx) {
63+
LOG_WARN(consumerReference_.getName()
64+
<< " Unacked messages timeout: " << headPartition.size() << " messages not acked within "
65+
<< timeoutMs_ << " ms, connection: " << cnx->cnxString());
66+
} else {
67+
LOG_WARN(consumerReference_.getName()
68+
<< " Unacked messages timeout: " << headPartition.size() << " messages not acked within "
69+
<< timeoutMs_ << " ms, no connection");
70+
}
6371
for (auto it = headPartition.begin(); it != headPartition.end(); it++) {
6472
msgIdsToRedeliver.insert(*it);
6573
messageIdPartitionMap.erase(*it);

0 commit comments

Comments
 (0)