[Bug] maxMessagePublishBufferSizeInMB permits leak can stall and timeout connections #23921

Open
@lhotari

Description

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

master branch code analysis

Minimal reproduce step

Currently, org.apache.pulsar.broker.service.ServerCnx#completedSendOperation might not get called in some error cases.
As a result, message publishing could stop for all connections that share a particular IO thread.

The broker's maxMessagePublishBufferSizeInMB limit is divided by the number of IO threads to produce a per-thread maxPendingBytesPerThread limit:

this.maxPendingBytesPerThread = conf.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L
        / conf.getNumIOThreads();
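
To make the arithmetic concrete, here is a minimal sketch of the same division with sample inputs. The class name, the method name, and the values 512 MB and 8 IO threads are assumptions chosen for illustration; they are not Pulsar defaults or broker code.

```java
public class PerThreadLimitExample {
    // Mirrors the division quoted above. Parameter names are illustrative only.
    public static long maxPendingBytesPerThread(int maxMessagePublishBufferSizeInMB, int numIOThreads) {
        return maxMessagePublishBufferSizeInMB * 1024L * 1024L / numIOThreads;
    }

    public static void main(String[] args) {
        // Assumed example configuration: 512 MB buffer shared by 8 IO threads.
        long limit = maxPendingBytesPerThread(512, 8);
        System.out.println(limit); // prints 67108864 (64 MiB per IO thread)
    }
}
```

Because the budget is per IO thread, a leak on one thread exhausts only that thread's share, which matches the symptom of publishing stalling for the connections bound to that thread.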

The pending byte count is incremented when a message is sent:

PendingBytesPerThreadTracker.getInstance().incrementPublishBytes(msgSize, maxPendingBytesPerThread);

It is decremented in the ServerCnx#completedSendOperation method:

public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
    PendingBytesPerThreadTracker.getInstance().decrementPublishBytes(msgSize, resumeThresholdPendingBytesPerThread);

If the decrement call is skipped, the pending bytes are never released. The leaked bytes accumulate until the per-thread limit is reached, which eventually stops all message publishing for all connections using that IO thread.
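
The effect of a missing decrement can be illustrated with a simplified model. The sketch below is not Pulsar's PendingBytesPerThreadTracker: the `Tracker` class, its method names, and the thresholds are hypothetical stand-ins that only reuse the pendingBytes and limitExceeded field names mentioned in this issue.

```java
public class PendingBytesLeakSketch {
    // Hypothetical, simplified stand-in for a per-thread pending-bytes tracker.
    public static final class Tracker {
        private long pendingBytes;
        private boolean limitExceeded;

        public void increment(int msgSize, long maxPendingBytes) {
            pendingBytes += msgSize;
            if (pendingBytes > maxPendingBytes) {
                // In this model, an exceeded limit means publishing is paused.
                limitExceeded = true;
            }
        }

        public void decrement(int msgSize, long resumeThreshold) {
            pendingBytes -= msgSize;
            if (limitExceeded && pendingBytes <= resumeThreshold) {
                limitExceeded = false;
            }
        }

        public long pendingBytes() {
            return pendingBytes;
        }

        public boolean limitExceeded() {
            return limitExceeded;
        }
    }

    // Simulates failed publishes. When releaseOnFailure is false, the failure
    // path never decrements, which models the leak described above.
    public static Tracker simulate(int publishes, int msgSize, long max, long resume,
                                   boolean releaseOnFailure) {
        Tracker tracker = new Tracker();
        for (int i = 0; i < publishes; i++) {
            tracker.increment(msgSize, max);
            if (releaseOnFailure) {
                tracker.decrement(msgSize, resume);
            }
        }
        return tracker;
    }

    public static void main(String[] args) {
        Tracker leaked = simulate(10, 100, 500L, 250L, false);
        Tracker released = simulate(10, 100, 500L, 250L, true);
        System.out.println("leaky path:   pendingBytes=" + leaked.pendingBytes()
                + " limitExceeded=" + leaked.limitExceeded());
        System.out.println("correct path: pendingBytes=" + released.pendingBytes()
                + " limitExceeded=" + released.limitExceeded());
    }
}
```

In the leaky run the count only grows, so the limit flag is set and nothing ever clears it. In the run that releases on failure, the count returns to zero after every publish.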

The leak happens here:

public synchronized void addFailed(ManagedLedgerException exception, Object ctx) {
    /* If the topic is being transferred(in the Releasing bundle state),
     we don't want to forcefully close topic here.
     Instead, we will rely on the service unit state channel's bundle(topic) transfer protocol.
     At the end of the transfer protocol, at Owned state, the source broker should close the topic properly.
     */
    if (transferring) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Failed to persist msg in store: {} while transferring.",
                    topic, exception.getMessage(), exception);
        }
        return;
    }
    PublishContext callback = (PublishContext) ctx;
    if (exception instanceof ManagedLedgerFencedException) {
        // If the managed ledger has been fenced, we cannot continue using it. We need to close and reopen
        close();

There should be a call to MessagePublishContext#completed in all exception cases. On the exception path, MessagePublishContext#completed calls ServerCnx#completedSendOperation here:

public void completed(Exception exception, long ledgerId, long entryId) {
    if (exception != null) {
        final ServerError serverError = getServerError(exception);
        producer.cnx.execute(() -> {
            // if the topic is transferring, we don't send error code to the clients.
            if (producer.getTopic().isTransferring()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Received producer exception: {} while transferring.",
                            producer.getTopic().getName(), exception.getMessage(), exception);
                }
            } else if (!(exception instanceof TopicClosedException)) {
                // For TopicClosed exception there's no need to send explicit error, since the client was
                // already notified
                // For TopicClosingOrDeleting exception, a notification will be sent separately
                long callBackSequenceId = Math.max(highestSequenceId, sequenceId);
                producer.cnx.getCommandSender().sendSendError(producer.producerId, callBackSequenceId,
                        serverError, exception.getMessage());
            }
            producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize);

The other exception cases contain the required call to callback.completed, which in turn calls ServerCnx#completedSendOperation:

if (exception instanceof ManagedLedgerAlreadyClosedException) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
    }
    callback.completed(new TopicClosedException(exception), -1, -1);
    return;
} else {
    log.warn("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
}
if (exception instanceof ManagedLedgerTerminatedException && !isMigrated()) {
    // Signal the producer that this topic is no longer available
    callback.completed(new TopicTerminatedException(exception), -1, -1);
} else {
    // Use generic persistence exception
    callback.completed(new PersistenceException(exception), -1, -1);
}
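
The invariant described above, that every failure path ends in exactly one completion call, could be enforced structurally rather than branch by branch. The following is only a sketch of that idea under stated assumptions: `PublishCallback`, `FailureKind`, and this `addFailed` are hypothetical and do not reproduce the Pulsar classes or the fix that a PR would actually apply.

```java
public class CompleteOnEveryPathSketch {
    // Hypothetical callback with the same shape as the completed(...) call quoted above.
    public interface PublishCallback {
        void completed(Exception exception, long ledgerId, long entryId);
    }

    // Hypothetical classification of the failure branches discussed in this issue.
    public enum FailureKind { TRANSFERRING, FENCED, ALREADY_CLOSED, OTHER }

    // The finally block runs on every exit, including the early returns,
    // so no branch can skip the completion call.
    public static void addFailed(FailureKind kind, PublishCallback callback) {
        try {
            switch (kind) {
                case TRANSFERRING:
                    return; // early return, like the transferring branch
                case FENCED:
                    // a close-and-reopen step would go here in this model
                    return;
                default:
                    break;
            }
        } finally {
            callback.completed(new RuntimeException("publish failed: " + kind), -1, -1);
        }
    }

    // Helper that counts how many times the callback fires for one failure kind.
    public static int completionsFor(FailureKind kind) {
        int[] count = {0};
        addFailed(kind, (exception, ledgerId, entryId) -> count[0]++);
        return count[0];
    }

    public static void main(String[] args) {
        for (FailureKind kind : FailureKind.values()) {
            System.out.println(kind + " -> completions=" + completionsFor(kind));
        }
    }
}
```

Whether a try/finally suits the real method depends on details not shown here, such as which exception type each branch should report to the client. The point is only that a single exit makes a missing completedSendOperation call harder to introduce.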

What did you expect to see?

There shouldn't be a leak in maxPendingBytesPerThread permits which eventually leads to message publishing stopping for all connections using a particular IO thread.

What did you see instead?

Based on the analysis of the code, there's a leak.

Anything else?

This might be related to issue #23920

A heap dump could be used to check whether the issue applies. Search the heap dump for org.apache.pulsar.broker.service.ServerCnx$PendingBytesPerThreadTracker instances and check the values of their pendingBytes and limitExceeded fields.

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels

type/bug: The PR fixed a bug or issue reported a bug
