Skip to content

Commit 6b586f1

Browse files
committed
[fix][broker] Align bundles logic in getNamespacePoliciesAsync with sync method
Fixes #25128
1 parent 805c71d commit 6b586f1

File tree

1 file changed

+20
-18
lines changed

1 file changed

+20
-18
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -341,31 +341,33 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {
341341
}
342342

343343
protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName namespaceName) {
344-
CompletableFuture<Policies> result = new CompletableFuture<>();
345-
namespaceResources().getPoliciesAsync(namespaceName)
346-
.thenCombine(getLocalPolicies().getLocalPoliciesAsync(namespaceName), (pl, localPolicies) -> {
347-
if (pl.isPresent()) {
348-
Policies policies = pl.get();
349-
if (localPolicies.isPresent()) {
350-
policies.bundles = localPolicies.get().bundles;
351-
policies.migrated = localPolicies.get().migrated;
352-
}
344+
return namespaceResources().getPoliciesAsync(namespaceName)
345+
.thenCompose(pl -> {
346+
if (pl.isEmpty()) {
347+
return FutureUtil.failedFuture(
348+
new RestException(Status.NOT_FOUND, "Namespace does not exist"));
349+
}
350+
Policies policies = pl.get();
351+
// fetch bundles from NamespaceBundleFactory (aligns with sync method)
352+
CompletableFuture<BundlesData> bundlesFuture = pulsar().getNamespaceService()
353+
.getNamespaceBundleFactory()
354+
.getBundlesAsync(namespaceName)
355+
.thenApply(bundles -> bundles.getBundlesData());
356+
CompletableFuture<Optional<LocalPolicies>> localPoliciesFuture =
357+
getLocalPolicies().getLocalPoliciesAsync(namespaceName);
358+
359+
return bundlesFuture.thenCombine(localPoliciesFuture, (bundleData, localPolicies) -> {
360+
policies.bundles = bundleData != null ? bundleData : policies.bundles;
361+
policies.migrated = localPolicies.isPresent() ? localPolicies.get().migrated : false;
353362
if (policies.is_allow_auto_update_schema == null) {
354363
// the type changed from boolean to Boolean. return
355364
// broker value here for keeping compatibility.
356365
policies.is_allow_auto_update_schema = pulsar().getConfig()
357366
.isAllowAutoUpdateSchemaEnabled();
358367
}
359-
result.complete(policies);
360-
} else {
361-
result.completeExceptionally(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
362-
}
363-
return null;
364-
}).exceptionally(ex -> {
365-
result.completeExceptionally(ex.getCause());
366-
return null;
368+
return policies;
369+
});
367370
});
368-
return result;
369371
}
370372

371373
protected BacklogQuota namespaceBacklogQuota(NamespaceName namespace,

0 commit comments

Comments
 (0)