Skip to content

Commit f4bd3f6

Browse files
committed
[fix][broker] Align bundles logic in getNamespacePoliciesAsync with sync method
Fixes #25128
1 parent 805c71d commit f4bd3f6

File tree

2 files changed

+50
-18
lines changed

2 files changed

+50
-18
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -341,31 +341,33 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {
341341
}
342342

343343
protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName namespaceName) {
344-
CompletableFuture<Policies> result = new CompletableFuture<>();
345-
namespaceResources().getPoliciesAsync(namespaceName)
346-
.thenCombine(getLocalPolicies().getLocalPoliciesAsync(namespaceName), (pl, localPolicies) -> {
347-
if (pl.isPresent()) {
348-
Policies policies = pl.get();
349-
if (localPolicies.isPresent()) {
350-
policies.bundles = localPolicies.get().bundles;
351-
policies.migrated = localPolicies.get().migrated;
352-
}
344+
return namespaceResources().getPoliciesAsync(namespaceName)
345+
.thenCompose(pl -> {
346+
if (pl.isEmpty()) {
347+
return FutureUtil.failedFuture(
348+
new RestException(Status.NOT_FOUND, "Namespace does not exist"));
349+
}
350+
Policies policies = pl.get();
351+
// fetch bundles from NamespaceBundleFactory (aligns with sync method)
352+
CompletableFuture<BundlesData> bundlesFuture = pulsar().getNamespaceService()
353+
.getNamespaceBundleFactory()
354+
.getBundlesAsync(namespaceName)
355+
.thenApply(bundles -> bundles.getBundlesData());
356+
CompletableFuture<Optional<LocalPolicies>> localPoliciesFuture =
357+
getLocalPolicies().getLocalPoliciesAsync(namespaceName);
358+
359+
return bundlesFuture.thenCombine(localPoliciesFuture, (bundleData, localPolicies) -> {
360+
policies.bundles = bundleData != null ? bundleData : policies.bundles;
361+
policies.migrated = localPolicies.isPresent() ? localPolicies.get().migrated : false;
353362
if (policies.is_allow_auto_update_schema == null) {
354363
// the type changed from boolean to Boolean. return
355364
// broker value here for keeping compatibility.
356365
policies.is_allow_auto_update_schema = pulsar().getConfig()
357366
.isAllowAutoUpdateSchemaEnabled();
358367
}
359-
result.complete(policies);
360-
} else {
361-
result.completeExceptionally(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
362-
}
363-
return null;
364-
}).exceptionally(ex -> {
365-
result.completeExceptionally(ex.getCause());
366-
return null;
368+
return policies;
369+
});
367370
});
368-
return result;
369371
}
370372

371373
protected BacklogQuota namespaceBacklogQuota(NamespaceName namespace,

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3429,6 +3429,36 @@ public void testCreateAndDeleteNamespaceWithBundles() throws Exception {
34293429
deleteNamespaceWithRetry(ns, false);
34303430
}
34313431

3432+
@Test
3433+
public void testGetNamespacePoliciesSyncAsyncBundlesConsistency() throws Exception {
3434+
// Test that sync (getPolicies) and async (getPoliciesAsync) methods return consistent bundles data.
3435+
String ns = BrokerTestUtil.newUniqueName("prop-xyz/ns");
3436+
int numBundles = 16;
3437+
3438+
admin.namespaces().createNamespace(ns, numBundles);
3439+
3440+
try {
3441+
// Get policies using sync method
3442+
Policies syncPolicies = admin.namespaces().getPolicies(ns);
3443+
3444+
// Get policies using async method
3445+
Policies asyncPolicies = admin.namespaces().getPoliciesAsync(ns).get();
3446+
3447+
// Verify bundles are consistent between sync and async
3448+
assertNotNull(syncPolicies.bundles, "Sync policies should have bundles");
3449+
assertNotNull(asyncPolicies.bundles, "Async policies should have bundles");
3450+
assertEquals(asyncPolicies.bundles.getNumBundles(), syncPolicies.bundles.getNumBundles(),
3451+
"Number of bundles should match between sync and async");
3452+
assertEquals(asyncPolicies.bundles.getBoundaries(), syncPolicies.bundles.getBoundaries(),
3453+
"Bundle boundaries should match between sync and async");
3454+
3455+
// Also verify the expected number of bundles
3456+
assertEquals(syncPolicies.bundles.getNumBundles(), numBundles);
3457+
} finally {
3458+
deleteNamespaceWithRetry(ns, false);
3459+
}
3460+
}
3461+
34323462
@Test
34333463
public void testBacklogSizeShouldBeZeroWhenConsumerAckedAllMessages() throws Exception {
34343464
final String topic = "persistent://prop-xyz/ns1/testBacklogSizeShouldBeZeroWhenConsumerAckedAllMessages";

0 commit comments

Comments
 (0)