Skip to content

[improve][broker] Implementing delayed message cancellation in pulsar #23907

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -5493,4 +5493,151 @@ protected CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSub
return null;
}));
}

protected void internalCancelDelayedMessage(AsyncResponse asyncResponse, long ledgerId, long entryId,
List<String> subscriptionNames, boolean authoritative) {
CompletableFuture<Void> validationFuture = validateTopicOperationAsync(topicName,
TopicOperation.CANCEL_DELAYED_MESSAGE);
validationFuture = validationFuture.thenCompose(__ -> {
if (topicName.isGlobal()) {
return validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
return CompletableFuture.completedFuture(null);
}
});
validationFuture.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
.thenAccept(partitionMetadata -> {
if (topicName.isPartitioned()) {
internalCancelDelayedMessageForNonPartitionedTopic(asyncResponse, ledgerId, entryId,
subscriptionNames, authoritative);
} else {
if (partitionMetadata.partitions > 0) {
internalCancelDelayedMessageForPartitionedTopic(asyncResponse, partitionMetadata,
ledgerId, entryId, subscriptionNames);
} else {
internalCancelDelayedMessageForNonPartitionedTopic(asyncResponse, ledgerId, entryId,
subscriptionNames, authoritative);
}
}
}).exceptionally(ex -> {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to cancel delayed message {}-{} on topic {}: {}",
clientAppId(), ledgerId, entryId, topicName, ex.getMessage(), ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

private void internalCancelDelayedMessageForPartitionedTopic(AsyncResponse asyncResponse,
PartitionedTopicMetadata partitionMetadata,
long ledgerId, long entryId,
List<String> subscriptionNames) {
final List<CompletableFuture<Void>> futures = new ArrayList<>(partitionMetadata.partitions);
PulsarAdmin admin;
try {
admin = pulsar().getAdminClient();
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
}
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
futures.add(admin
.topics()
.cancelDelayedMessageAsync(topicNamePartition.toString(),
ledgerId, entryId, subscriptionNames));
}
FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = FutureUtil.unwrapCompletionException(exception);
log.warn("[{}] Failed to cancel delayed message {}-{} on some partitions of {}: {}",
clientAppId(), ledgerId, entryId, topicName, t.getMessage());
resumeAsyncResponseExceptionally(asyncResponse, t);
} else {
log.info("[{}] Successfully requested cancellation for delayed message {}-{} on"
+ " all partitions of topic {}",
clientAppId(), ledgerId, entryId, topicName);
asyncResponse.resume(Response.noContent().build());
}
return null;
});
}

private void internalCancelDelayedMessageForNonPartitionedTopic(AsyncResponse asyncResponse,
long ledgerId, long entryId,
List<String> subscriptionNames,
boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(optTopic -> {
if (!(optTopic instanceof PersistentTopic)) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Cancel delayed message on a non-persistent"
+ " topic is not allowed");
}
PersistentTopic persistentTopic = (PersistentTopic) optTopic;
List<String> subsToProcess;
if (subscriptionNames == null || subscriptionNames.isEmpty()) {
subsToProcess = persistentTopic.getSubscriptions().keySet().stream()
.filter(subName -> !subName.equals(Compactor.COMPACTION_SUBSCRIPTION)
&& !SystemTopicNames.isSystemTopic(TopicName.
get(persistentTopic.getName() + "/" + subName)))
.collect(Collectors.toList());
if (subsToProcess.isEmpty()) {
log.info("[{}] No user subscriptions found to process for cancelling delayed message on"
+ " topic {}.", clientAppId(), topicName);
return CompletableFuture.completedFuture(null);
}
log.info("[{}] Cancelling delayed message {}-{} for all non-system"
+ " subscriptions on topic {}",
clientAppId(), ledgerId, entryId, topicName);
} else {
subsToProcess = new ArrayList<>(subscriptionNames);
log.info("[{}] Cancelling delayed message {}-{} for subscriptions {} on"
+ " topic {}", clientAppId(), ledgerId, entryId, subsToProcess, topicName);
}
List<CompletableFuture<Void>> cancelFutures = subsToProcess.stream()
.map(subName -> internalCancelDelayedMessageForSubscriptionAsync(
persistentTopic, subName, ledgerId, entryId))
.collect(Collectors.toList());
return FutureUtil.waitForAll(cancelFutures);
})
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
Throwable t = FutureUtil.unwrapCompletionException(ex);
if (isNot307And404Exception(t)) {
log.error("[{}] Error in internalCancelDelayedMessageForNonPartitionedTopic for {}: {}",
clientAppId(), topicName, t.getMessage(), t);
}
resumeAsyncResponseExceptionally(asyncResponse, t);
return null;
});
}

private CompletableFuture<Void> internalCancelDelayedMessageForSubscriptionAsync(
PersistentTopic topic, String subName, long ledgerId, long entryId) {
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topic.getName(), subName)));
}
return sub.cancelDelayedMessage(ledgerId, entryId)
.thenCompose(cancelled -> {
if (cancelled) {
log.info("[{}] Successfully requested cancellation for delayed message {}-{}"
+ " on subscription {} of topic {}",
clientAppId(), ledgerId, entryId, subName, topic.getName());
return CompletableFuture.completedFuture(null);
} else {
String errorMsg = String.format(
"Failed to cancel delayed message %d-%d on subscription %s"
+ " of topic %s. Message may not exist in tracker, already delivered/cancelled,"
+ " or tracker not available/initialized.",
ledgerId, entryId, subName, topic.getName());
log.warn("[{}] {}", clientAppId(), errorMsg);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, errorMsg));
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import javax.ws.rs.DELETE;
Expand Down Expand Up @@ -1134,4 +1135,53 @@ public void getReplicatedSubscriptionStatus(
internalGetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative);
}

@POST
@Path("/{property}/{cluster}/{namespace}/{topic}/cancelDelayedMessage")
@ApiOperation(hidden = true, value = "Cancel a delayed message on specified subscriptions"
+ " (or all if none specified).")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Operation successful"),
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace or topic or subscription does not exist"),
@ApiResponse(code = 405, message = "Operation not allowed on non-persistent topic"),
@ApiResponse(code = 412, message = "Failed to cancel delayed message or invalid parameters"),
@ApiResponse(code = 500, message = "Internal server error")})
public void cancelDelayedMessage(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the property (tenant)", required = true)
@PathParam("property") String property,
@ApiParam(value = "Specify the cluster", required = true)
@PathParam("cluster") String cluster,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Ledger ID of the target delayed message", required = true)
@QueryParam("ledgerId") long ledgerId,
@ApiParam(value = "Entry ID of the target delayed message", required = true)
@QueryParam("entryId") long entryId,
@ApiParam(value = "List of subscription names to cancel on (comma-separated, empty or null for"
+ " all subscriptions)")
@QueryParam("subscriptionNames") List<String> subscriptionNames) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
if (ledgerId < 0 || entryId < 0) {
asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED,
"ledgerId, entryId must be non-negative."));
return;
}
List<String> finalSubscriptionNames = (subscriptionNames == null || subscriptionNames.isEmpty())
? null : subscriptionNames;
internalCancelDelayedMessage(asyncResponse, ledgerId, entryId, finalSubscriptionNames, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5015,5 +5015,51 @@ public void removeAutoSubscriptionCreation(
});
}

@POST
@Path("/{tenant}/{namespace}/{topic}/cancelDelayedMessage")
@ApiOperation(value = "Cancel a delayed message on specified subscriptions (or all if none specified).")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Operation successful"),
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace or topic or subscription does not exist"),
@ApiResponse(code = 405, message = "Operation not allowed on non-persistent topic"),
@ApiResponse(code = 412, message = "Failed to cancel delayed message or invalid parameters"),
@ApiResponse(code = 500, message = "Internal server error")})
public void cancelDelayedMessage(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Ledger ID of the target delayed message", required = true)
@QueryParam("ledgerId") long ledgerId,
@ApiParam(value = "Entry ID of the target delayed message", required = true)
@QueryParam("entryId") long entryId,
@ApiParam(value = "List of subscription names to cancel on (empty or null for all subscriptions)")
@QueryParam("subscriptionNames") List<String> subscriptionNames) {
try {
validateTopicName(tenant, namespace, encodedTopic);
if (ledgerId < 0 || entryId < 0) {
asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED,
"ledgerId, entryId must be positive."));
return;
}
List<String> finalSubscriptionNames = (subscriptionNames == null || subscriptionNames.isEmpty())
? null : subscriptionNames;
internalCancelDelayedMessage(asyncResponse, ledgerId, entryId,
finalSubscriptionNames, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,19 @@ default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapsh

CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position> position);

/**
* Cancels a specific delayed message for subscriptions.
*
* @param ledgerId The ledger ID of the message to cancel.
* @param entryId The entry ID of the message to cancel.
* @return A CompletableFuture that resolves to true if the cancellation was successfully
* requested from the tracker, false otherwise (e.g., message not found in tracker, tracker not present,
* or already delivered/cancelled).
*/
default CompletableFuture<Boolean> cancelDelayedMessage(long ledgerId, long entryId) {
return CompletableFuture.completedFuture(false);
}

default int getNumberOfSameAddressConsumers(final String clientAddress) {
int count = 0;
if (clientAddress != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.ScanOutcome;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.MutablePair;
Expand Down Expand Up @@ -1547,6 +1548,18 @@ public CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, i
}
}

@Override
public CompletableFuture<Boolean> cancelDelayedMessage(long ledgerId, long entryId) {
if (Subscription.isCumulativeAckMode(getType())) {
return CompletableFuture.completedFuture(false);
}
Position position = PositionFactory.create(ledgerId, entryId);
List<Position> positions = Collections.singletonList(position);
Map<String, Long> properties = Collections.emptyMap();
acknowledgeMessage(positions, AckType.Individual, properties);
return CompletableFuture.completedFuture(true);
}

@VisibleForTesting
public ManagedCursor getCursor() {
return cursor;
Expand Down
Loading