Skip to content

Commit 4366a13

Browse files
authored
[fix][broker][branch-3.1] Fix broker not starting when both transactions and the Extensible Load Manager are enabled (#22194)
1 parent 7e28e84 commit 4366a13

File tree

9 files changed

+350
-121
lines changed

9 files changed

+350
-121
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.commons.lang3.tuple.MutablePair;
5959
import org.apache.pulsar.broker.ServiceConfiguration;
6060
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
61+
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
6162
import org.apache.pulsar.broker.service.AbstractSubscription;
6263
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
6364
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -155,7 +156,8 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
155156
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
156157
? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties);
157158
if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
158-
&& !isEventSystemTopic(TopicName.get(topicName))) {
159+
&& !isEventSystemTopic(TopicName.get(topicName))
160+
&& !ExtensibleLoadManagerImpl.isInternalTopic(topicName)) {
159161
this.pendingAckHandle = new PendingAckHandleImpl(this);
160162
} else {
161163
this.pendingAckHandle = new PendingAckHandleDisabled();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import org.apache.pulsar.broker.PulsarServerException;
8484
import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
8585
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
86+
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
8687
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
8788
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
8889
import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -306,7 +307,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
306307
TopicName topicName = TopicName.get(topic);
307308
if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
308309
&& !isEventSystemTopic(topicName)
309-
&& !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
310+
&& !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())
311+
&& !ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
310312
this.transactionBuffer = brokerService.getPulsar()
311313
.getTransactionBufferProvider().newTransactionBuffer(this);
312314
} else {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions;
20+
21+
import static org.mockito.Mockito.reset;
22+
import static org.mockito.Mockito.spy;
23+
import com.google.common.collect.Sets;
24+
import java.util.concurrent.CompletableFuture;
25+
import org.apache.commons.lang3.reflect.FieldUtils;
26+
import org.apache.commons.lang3.tuple.Pair;
27+
import org.apache.pulsar.broker.PulsarService;
28+
import org.apache.pulsar.broker.ServiceConfiguration;
29+
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
30+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
31+
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
32+
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
33+
import org.apache.pulsar.client.admin.PulsarAdminException;
34+
import org.apache.pulsar.client.impl.LookupService;
35+
import org.apache.pulsar.common.naming.NamespaceBundle;
36+
import org.apache.pulsar.common.naming.SystemTopicNames;
37+
import org.apache.pulsar.common.naming.TopicName;
38+
import org.apache.pulsar.common.policies.data.ClusterData;
39+
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
40+
import org.apache.pulsar.common.policies.data.TopicType;
41+
import org.testng.annotations.AfterClass;
42+
import org.testng.annotations.BeforeClass;
43+
import org.testng.annotations.BeforeMethod;
44+
45+
public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServiceBaseTest {
46+
47+
protected PulsarService pulsar1;
48+
protected PulsarService pulsar2;
49+
50+
protected PulsarTestContext additionalPulsarTestContext;
51+
52+
protected ExtensibleLoadManagerImpl primaryLoadManager;
53+
54+
protected ExtensibleLoadManagerImpl secondaryLoadManager;
55+
56+
protected ServiceUnitStateChannelImpl channel1;
57+
protected ServiceUnitStateChannelImpl channel2;
58+
59+
protected final String defaultTestNamespace;
60+
61+
protected LookupService lookupService;
62+
63+
protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) {
64+
this.defaultTestNamespace = defaultTestNamespace;
65+
}
66+
67+
protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
68+
conf.setForceDeleteNamespaceAllowed(true);
69+
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
70+
conf.setAllowAutoTopicCreation(true);
71+
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
72+
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
73+
conf.setLoadBalancerSheddingEnabled(false);
74+
conf.setLoadBalancerDebugModeEnabled(true);
75+
conf.setTopicLevelPoliciesEnabled(true);
76+
return conf;
77+
}
78+
79+
@Override
80+
@BeforeClass(alwaysRun = true)
81+
protected void setup() throws Exception {
82+
initConfig(conf);
83+
super.internalSetup(conf);
84+
pulsar1 = pulsar;
85+
var conf2 = initConfig(getDefaultConf());
86+
additionalPulsarTestContext = createAdditionalPulsarTestContext(conf2);
87+
pulsar2 = additionalPulsarTestContext.getPulsarService();
88+
89+
setPrimaryLoadManager();
90+
setSecondaryLoadManager();
91+
92+
admin.clusters().createCluster(this.conf.getClusterName(),
93+
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
94+
admin.tenants().createTenant("public",
95+
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
96+
Sets.newHashSet(this.conf.getClusterName())));
97+
admin.namespaces().createNamespace("public/default");
98+
admin.namespaces().setNamespaceReplicationClusters("public/default",
99+
Sets.newHashSet(this.conf.getClusterName()));
100+
101+
admin.namespaces().createNamespace(defaultTestNamespace, 128);
102+
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
103+
Sets.newHashSet(this.conf.getClusterName()));
104+
lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
105+
}
106+
107+
@Override
108+
@AfterClass(alwaysRun = true)
109+
protected void cleanup() throws Exception {
110+
this.additionalPulsarTestContext.close();
111+
super.internalCleanup();
112+
}
113+
114+
@BeforeMethod(alwaysRun = true)
115+
protected void initializeState() throws PulsarAdminException, IllegalAccessException {
116+
admin.namespaces().unload(defaultTestNamespace);
117+
reset(primaryLoadManager, secondaryLoadManager);
118+
FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true);
119+
}
120+
121+
protected void setPrimaryLoadManager() throws IllegalAccessException {
122+
ExtensibleLoadManagerWrapper wrapper =
123+
(ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
124+
primaryLoadManager = spy((ExtensibleLoadManagerImpl)
125+
FieldUtils.readField(wrapper, "loadManager", true));
126+
FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager, true);
127+
channel1 = (ServiceUnitStateChannelImpl)
128+
FieldUtils.readField(primaryLoadManager, "serviceUnitStateChannel", true);
129+
}
130+
131+
private void setSecondaryLoadManager() throws IllegalAccessException {
132+
ExtensibleLoadManagerWrapper wrapper =
133+
(ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
134+
secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
135+
FieldUtils.readField(wrapper, "loadManager", true));
136+
FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager, true);
137+
channel2 = (ServiceUnitStateChannelImpl)
138+
FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true);
139+
}
140+
141+
protected CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) {
142+
return pulsar.getNamespaceService().getBundleAsync(topic);
143+
}
144+
145+
protected Pair<TopicName, NamespaceBundle> getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
146+
throws Exception {
147+
TopicName changeEventsTopicName =
148+
TopicName.get(defaultTestNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
149+
NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, changeEventsTopicName).get();
150+
int i = 0;
151+
while(true) {
152+
TopicName topicName = TopicName.get(defaultTestNamespace + "/" + topicNamePrefix + "-" + i);
153+
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
154+
if (!bundle.equals(changeEventsBundle)) {
155+
return Pair.of(topicName, bundle);
156+
}
157+
i++;
158+
}
159+
}
160+
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java

+5-118
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@
7575
import org.apache.commons.lang3.tuple.Pair;
7676
import org.apache.pulsar.broker.PulsarService;
7777
import org.apache.pulsar.broker.ServiceConfiguration;
78-
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
7978
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
8079
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
8180
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
@@ -101,107 +100,35 @@
101100
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
102101
import org.apache.pulsar.broker.namespace.NamespaceService;
103102
import org.apache.pulsar.broker.service.BrokerServiceException;
104-
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
105103
import org.apache.pulsar.client.admin.PulsarAdminException;
106104
import org.apache.pulsar.client.impl.TableViewImpl;
107105
import org.apache.pulsar.common.naming.NamespaceBundle;
108106
import org.apache.pulsar.common.naming.NamespaceName;
109107
import org.apache.pulsar.common.naming.ServiceUnitId;
110-
import org.apache.pulsar.common.naming.SystemTopicNames;
111108
import org.apache.pulsar.common.naming.TopicName;
112109
import org.apache.pulsar.common.naming.TopicVersion;
113110
import org.apache.pulsar.common.policies.data.BrokerAssignment;
114111
import org.apache.pulsar.common.policies.data.BundlesData;
115-
import org.apache.pulsar.common.policies.data.ClusterData;
116112
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
117-
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
118-
import org.apache.pulsar.common.policies.data.TopicType;
119113
import org.apache.pulsar.common.stats.Metrics;
120114
import org.apache.pulsar.common.util.FutureUtil;
121115
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
122116
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
123117
import org.awaitility.Awaitility;
124118
import org.mockito.MockedStatic;
125119
import org.testng.AssertJUnit;
126-
import org.testng.annotations.AfterClass;
127-
import org.testng.annotations.BeforeClass;
128-
import org.testng.annotations.BeforeMethod;
129120
import org.testng.annotations.Test;
130121

131122
/**
132123
* Unit test for {@link ExtensibleLoadManagerImpl}.
133124
*/
134125
@Slf4j
135126
@Test(groups = "broker")
136-
public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
127+
@SuppressWarnings("unchecked")
128+
public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBaseTest {
137129

138-
private PulsarService pulsar1;
139-
private PulsarService pulsar2;
140-
141-
private PulsarTestContext additionalPulsarTestContext;
142-
143-
private ExtensibleLoadManagerImpl primaryLoadManager;
144-
145-
private ExtensibleLoadManagerImpl secondaryLoadManager;
146-
147-
private ServiceUnitStateChannelImpl channel1;
148-
private ServiceUnitStateChannelImpl channel2;
149-
150-
private final String defaultTestNamespace = "public/test";
151-
152-
private static void initConfig(ServiceConfiguration conf){
153-
conf.setForceDeleteNamespaceAllowed(true);
154-
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
155-
conf.setAllowAutoTopicCreation(true);
156-
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
157-
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
158-
conf.setLoadBalancerSheddingEnabled(false);
159-
conf.setLoadBalancerDebugModeEnabled(true);
160-
conf.setTopicLevelPoliciesEnabled(true);
161-
}
162-
163-
@BeforeClass
164-
@Override
165-
public void setup() throws Exception {
166-
// Set the inflight state waiting time and ownership monitor delay time to 5 seconds to avoid
167-
// stuck when doing unload.
168-
initConfig(conf);
169-
super.internalSetup(conf);
170-
pulsar1 = pulsar;
171-
ServiceConfiguration defaultConf = getDefaultConf();
172-
initConfig(defaultConf);
173-
additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
174-
pulsar2 = additionalPulsarTestContext.getPulsarService();
175-
176-
setPrimaryLoadManager();
177-
178-
setSecondaryLoadManager();
179-
180-
admin.clusters().createCluster(this.conf.getClusterName(),
181-
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
182-
admin.tenants().createTenant("public",
183-
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
184-
Sets.newHashSet(this.conf.getClusterName())));
185-
admin.namespaces().createNamespace("public/default");
186-
admin.namespaces().setNamespaceReplicationClusters("public/default",
187-
Sets.newHashSet(this.conf.getClusterName()));
188-
189-
admin.namespaces().createNamespace(defaultTestNamespace);
190-
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
191-
Sets.newHashSet(this.conf.getClusterName()));
192-
}
193-
194-
@Override
195-
@AfterClass(alwaysRun = true)
196-
protected void cleanup() throws Exception {
197-
this.additionalPulsarTestContext.close();
198-
super.internalCleanup();
199-
}
200-
201-
@BeforeMethod(alwaysRun = true)
202-
protected void initializeState() throws PulsarAdminException {
203-
admin.namespaces().unload(defaultTestNamespace);
204-
reset(primaryLoadManager, secondaryLoadManager);
130+
public ExtensibleLoadManagerImplTest() {
131+
super("public/test");
205132
}
206133

207134
@Test
@@ -459,7 +386,7 @@ public boolean test(NamespaceBundle namespaceBundle) {
459386
public void testSplitBundleWithSpecificPositionAdminAPI() throws Exception {
460387
String namespace = defaultTestNamespace;
461388
String topic = "persistent://" + namespace + "/test-split-with-specific-position";
462-
admin.topics().createPartitionedTopic(topic, 10);
389+
admin.topics().createPartitionedTopic(topic, 1024);
463390
BundlesData bundles = admin.namespaces().getBundles(namespace);
464391
int numBundles = bundles.getNumBundles();
465392

@@ -1320,44 +1247,4 @@ public String name() {
13201247
}
13211248

13221249
}
1323-
1324-
private void setPrimaryLoadManager() throws IllegalAccessException {
1325-
ExtensibleLoadManagerWrapper wrapper =
1326-
(ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
1327-
primaryLoadManager = spy((ExtensibleLoadManagerImpl)
1328-
FieldUtils.readField(wrapper, "loadManager", true));
1329-
FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager, true);
1330-
channel1 = (ServiceUnitStateChannelImpl)
1331-
FieldUtils.readField(primaryLoadManager, "serviceUnitStateChannel", true);
1332-
}
1333-
1334-
private void setSecondaryLoadManager() throws IllegalAccessException {
1335-
ExtensibleLoadManagerWrapper wrapper =
1336-
(ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
1337-
secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
1338-
FieldUtils.readField(wrapper, "loadManager", true));
1339-
FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager, true);
1340-
channel2 = (ServiceUnitStateChannelImpl)
1341-
FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true);
1342-
}
1343-
1344-
private CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) {
1345-
return pulsar.getNamespaceService().getBundleAsync(topic);
1346-
}
1347-
1348-
private Pair<TopicName, NamespaceBundle> getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
1349-
throws Exception {
1350-
TopicName changeEventsTopicName =
1351-
TopicName.get(defaultTestNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
1352-
NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, changeEventsTopicName).get();
1353-
int i = 0;
1354-
while (true) {
1355-
TopicName topicName = TopicName.get(defaultTestNamespace + "/" + topicNamePrefix + "-" + i);
1356-
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
1357-
if (!bundle.equals(changeEventsBundle)) {
1358-
return Pair.of(topicName, bundle);
1359-
}
1360-
i++;
1361-
}
1362-
}
13631250
}

0 commit comments

Comments
 (0)