Skip to content

Commit 6a4cff8

Browse files
committed
Upgrade/Downgrade test
1 parent 2da4ee8 commit 6a4cff8

File tree

7 files changed

+255
-16
lines changed

7 files changed

+255
-16
lines changed

.github/workflows/pulsar-ci.yaml

+15
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,9 @@ jobs:
603603
- name: Metrics
604604
group: METRICS
605605

606+
- name: Upgrade
607+
group: UPGRADE
608+
606609
steps:
607610
- name: checkout
608611
uses: actions/checkout@v4
@@ -648,6 +651,18 @@ jobs:
648651
run: |
649652
$GITHUB_WORKSPACE/build/pulsar_ci_tool.sh docker_load_image_from_github_actions_artifacts pulsar-java-test-image
650653
654+
- name: Pull apachepulsar/pulsar:2.10.6
655+
run: |
656+
docker pull apachepulsar/pulsar:2.10.6
657+
658+
- name: Pull apachepulsar/pulsar:3.0.5
659+
run: |
660+
docker pull apachepulsar/pulsar:3.0.5
661+
662+
- name: Pull alpine:3.20.1
663+
run: |
664+
docker pull alpine:3.20.1
665+
651666
- name: Run setup commands
652667
if: ${{ matrix.setup }}
653668
run: |

build/run_integration_group.sh

+4
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,10 @@ test_group_standalone() {
177177
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-standalone.xml -DintegrationTests
178178
}
179179

180+
test_group_upgrade() {
181+
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-upgrade.xml -DintegrationTests
182+
}
183+
180184
test_group_transaction() {
181185
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-transaction.xml -DintegrationTests
182186
}

tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java

+44-14
Original file line numberDiff line numberDiff line change
@@ -72,22 +72,28 @@ public class PulsarCluster {
7272
* @return the built pulsar cluster
7373
*/
7474
public static PulsarCluster forSpec(PulsarClusterSpec spec) {
75+
return forSpec(spec, Network.newNetwork());
76+
}
77+
78+
public static PulsarCluster forSpec(PulsarClusterSpec spec, Network network) {
79+
checkArgument(network != null, "Network should not be null");
7580
CSContainer csContainer = null;
7681
if (!spec.enableOxia) {
7782
csContainer = new CSContainer(spec.clusterName)
78-
.withNetwork(Network.newNetwork())
83+
.withNetwork(network)
7984
.withNetworkAliases(CSContainer.NAME);
8085
}
81-
return new PulsarCluster(spec, csContainer, false);
86+
return new PulsarCluster(spec, network, csContainer, false);
8287
}
8388

8489
public static PulsarCluster forSpec(PulsarClusterSpec spec, CSContainer csContainer) {
85-
return new PulsarCluster(spec, csContainer, true);
90+
return new PulsarCluster(spec, csContainer.getNetwork(), csContainer, true);
8691
}
8792

8893
@Getter
8994
private final PulsarClusterSpec spec;
9095

96+
public boolean closeNetworkOnExit = true;
9197
@Getter
9298
private final String clusterName;
9399
private final Network network;
@@ -108,19 +114,18 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec, CSContainer csContai
108114
private final String metadataStoreUrl;
109115
private final String configurationMetadataStoreUrl;
110116

111-
private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean sharedCsContainer) {
112-
117+
private PulsarCluster(PulsarClusterSpec spec, Network network, CSContainer csContainer, boolean sharedCsContainer) {
113118
this.spec = spec;
114119
this.sharedCsContainer = sharedCsContainer;
115120
this.clusterName = spec.clusterName();
116-
if (csContainer != null ) {
121+
if (network != null) {
122+
this.network = network;
123+
} else if (csContainer != null) {
117124
this.network = csContainer.getNetwork();
118125
} else {
119126
this.network = Network.newNetwork();
120127
}
121128

122-
123-
124129
if (spec.enableOxia) {
125130
this.zkContainer = null;
126131
this.oxiaContainer = new OxiaContainer(clusterName);
@@ -203,7 +208,9 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s
203208
.withEnv("PULSAR_PREFIX_diskUsageWarnThreshold", "0.95")
204209
.withEnv("diskUsageThreshold", "0.99")
205210
.withEnv("PULSAR_PREFIX_diskUsageLwmThreshold", "0.97")
206-
.withEnv("nettyMaxFrameSizeBytes", String.valueOf(spec.maxMessageSize));
211+
.withEnv("nettyMaxFrameSizeBytes", String.valueOf(spec.maxMessageSize))
212+
.withEnv("ledgerDirectories", "data/bookkeeper/" + name + "/ledgers")
213+
.withEnv("journalDirectory", "data/bookkeeper/" + name + "/journal");
207214
if (spec.bookkeeperEnvs != null) {
208215
bookieContainer.withEnv(spec.bookkeeperEnvs);
209216
}
@@ -262,10 +269,27 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s
262269
}
263270
));
264271

272+
if (spec.dataContainer != null) {
273+
if (!sharedCsContainer && csContainer != null) {
274+
csContainer.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE);
275+
}
276+
if (zkContainer != null) {
277+
zkContainer.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE);
278+
}
279+
proxyContainer.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE);
280+
281+
bookieContainers.values().forEach(c -> c.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE));
282+
brokerContainers.values().forEach(c -> c.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE));
283+
workerContainers.values().forEach(c -> c.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE));
284+
}
285+
265286
spec.classPathVolumeMounts.forEach((key, value) -> {
266287
if (zkContainer != null) {
267288
zkContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE);
268289
}
290+
if (!sharedCsContainer && csContainer != null) {
291+
csContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE);
292+
}
269293
proxyContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE);
270294

271295
bookieContainers.values().forEach(c -> c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
@@ -323,6 +347,10 @@ public Map<String, GenericContainer<?>> getExternalServices() {
323347
}
324348

325349
public void start() throws Exception {
350+
start(true);
351+
}
352+
353+
public void start(boolean doInit) throws Exception {
326354

327355
if (!spec.enableOxia) {
328356
// start the local zookeeper
@@ -338,7 +366,7 @@ public void start() throws Exception {
338366
oxiaContainer.start();
339367
}
340368

341-
{
369+
if (doInit) {
342370
// Run cluster metadata initialization
343371
@Cleanup
344372
PulsarInitMetadataContainer init = new PulsarInitMetadataContainer(
@@ -453,10 +481,12 @@ public synchronized void stop() {
453481
oxiaContainer.stop();
454482
}
455483

456-
try {
457-
network.close();
458-
} catch (Exception e) {
459-
log.info("Failed to shutdown network for pulsar cluster {}", clusterName, e);
484+
if (closeNetworkOnExit) {
485+
try {
486+
network.close();
487+
} catch (Exception e) {
488+
log.info("Failed to shutdown network for pulsar cluster {}", clusterName, e);
489+
}
460490
}
461491
}
462492

tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java

+6
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ public class PulsarClusterSpec {
124124
@Builder.Default
125125
Map<String, String> classPathVolumeMounts = new TreeMap<>();
126126

127+
/**
128+
* Data container
129+
*/
130+
@Builder.Default
131+
GenericContainer<?> dataContainer = null;
132+
127133
/**
128134
* Pulsar Test Image Name
129135
*

tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ protected void beforeStartCluster() throws Exception {
142142
}
143143

144144
protected void setupCluster(PulsarClusterSpec spec) throws Exception {
145+
setupCluster(spec, true);
146+
}
147+
148+
protected void setupCluster(PulsarClusterSpec spec, boolean doInit) throws Exception {
145149
incrementSetupNumber();
146150
log.info("Setting up cluster {} with {} bookies, {} brokers",
147151
spec.clusterName(), spec.numBookies(), spec.numBrokers());
@@ -150,7 +154,7 @@ protected void setupCluster(PulsarClusterSpec spec) throws Exception {
150154

151155
beforeStartCluster();
152156

153-
pulsarCluster.start();
157+
pulsarCluster.start(doInit);
154158

155159
pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
156160

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.tests.integration.upgrade;
20+
21+
import com.github.dockerjava.api.model.Bind;
22+
import lombok.Cleanup;
23+
import lombok.extern.slf4j.Slf4j;
24+
import org.apache.pulsar.client.api.Consumer;
25+
import org.apache.pulsar.client.api.Message;
26+
import org.apache.pulsar.client.api.MessageId;
27+
import org.apache.pulsar.client.api.Producer;
28+
import org.apache.pulsar.client.api.PulsarClient;
29+
import org.apache.pulsar.client.api.Schema;
30+
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
31+
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
32+
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
33+
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
34+
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
35+
import org.testcontainers.containers.GenericContainer;
36+
import org.testcontainers.containers.Network;
37+
import org.testng.annotations.Test;
38+
import java.util.stream.Stream;
39+
import static java.util.stream.Collectors.joining;
40+
import static org.testng.Assert.assertEquals;
41+
42+
/**
43+
* Test upgrading/downgrading Pulsar cluster from major releases.
44+
*/
45+
@Slf4j
46+
public class PulsarUpgradeDowngradeTest extends PulsarClusterTestBase {
47+
48+
@Test(timeOut=600_000)
49+
public void upgradeFrom_2_10_6() throws Exception {
50+
testUpgradeDowngrade("apachepulsar/pulsar:2.10.6", PulsarContainer.DEFAULT_IMAGE_NAME);
51+
}
52+
53+
@Test(timeOut=600_000)
54+
public void upgradeFrom_3_0_5() throws Exception {
55+
testUpgradeDowngrade("apachepulsar/pulsar:3.0.5", PulsarContainer.DEFAULT_IMAGE_NAME);
56+
}
57+
58+
private void testUpgradeDowngrade(String imageOld, String imageNew) throws Exception {
59+
final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5))
60+
.filter(s -> !s.isEmpty())
61+
.collect(joining("-"));
62+
String topicName = generateTopicName("testupdown", true);
63+
64+
@Cleanup
65+
Network network = Network.newNetwork();
66+
@Cleanup
67+
GenericContainer<?> alpine = new GenericContainer<>("alpine:3.20.1")
68+
.withExposedPorts(80)
69+
.withNetwork(network)
70+
.withNetworkAliases("shared-storage")
71+
.withEnv("MAGIC_NUMBER", "42")
72+
.withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
73+
.getHostConfig()
74+
.withBinds(Bind.parse("/pulsar/data:/pulsar/data")))
75+
.withCommand("/bin/sh", "-c",
76+
"mkdir -p /pulsar/data && "
77+
+ "chmod -R ug+rwx /pulsar/data && "
78+
+ "chown -R 10000:0 /pulsar/data && "
79+
+ "rm -rf /pulsar/data/* && "
80+
+ "while true; do echo \"$MAGIC_NUMBER\" | nc -l -p 80; done");
81+
alpine.start();
82+
83+
PulsarClusterSpec specOld = PulsarClusterSpec.builder()
84+
.numBookies(2)
85+
.numBrokers(1)
86+
.clusterName(clusterName)
87+
.dataContainer(alpine)
88+
.pulsarTestImage(imageOld)
89+
.build();
90+
91+
PulsarClusterSpec specNew = PulsarClusterSpec.builder()
92+
.numBookies(2)
93+
.numBrokers(1)
94+
.clusterName(clusterName)
95+
.dataContainer(alpine)
96+
.pulsarTestImage(imageNew)
97+
.build();
98+
99+
log.info("Setting up OLD cluster {} with {} bookies, {} brokers using {}",
100+
specOld.clusterName(), specOld.numBookies(), specOld.numBrokers(), imageOld);
101+
102+
pulsarCluster = PulsarCluster.forSpec(specNew, network);
103+
pulsarCluster.closeNetworkOnExit = false;
104+
pulsarCluster.start(true);
105+
106+
try {
107+
log.info("setting retention");
108+
pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
109+
"set-retention", "--size", "100M", "--time", "100m", "public/default");
110+
111+
publishAndConsume(topicName, pulsarCluster.getPlainTextServiceUrl(), 10, 10);
112+
} finally {
113+
pulsarCluster.stop();
114+
}
115+
116+
log.info("Upgrading to NEW cluster {} with {} bookies, {} brokers using {}",
117+
specNew.clusterName(), specNew.numBookies(), specNew.numBrokers(), imageNew);
118+
119+
pulsarCluster = PulsarCluster.forSpec(specNew, network);
120+
pulsarCluster.closeNetworkOnExit = false;
121+
pulsarCluster.start(false);
122+
123+
try {
124+
publishAndConsume(topicName, pulsarCluster.getPlainTextServiceUrl(), 10, 20);
125+
} finally {
126+
pulsarCluster.stop();
127+
}
128+
129+
log.info("Downgrading to OLD cluster {} with {} bookies, {} brokers using {}",
130+
specOld.clusterName(), specOld.numBookies(), specOld.numBrokers(), imageOld);
131+
132+
pulsarCluster = PulsarCluster.forSpec(specOld, network);
133+
pulsarCluster.closeNetworkOnExit = false;
134+
pulsarCluster.start(false);
135+
136+
try {
137+
publishAndConsume(topicName, pulsarCluster.getPlainTextServiceUrl(), 10, 30);
138+
} finally {
139+
pulsarCluster.stop();
140+
alpine.stop();
141+
network.close();
142+
}
143+
}
144+
145+
private void publishAndConsume(String topicName, String serviceUrl, int numProduce, int numConsume) throws Exception {
146+
log.info("publishAndConsume: topic name: {}", topicName);
147+
148+
@Cleanup
149+
PulsarClient client = PulsarClient.builder()
150+
.serviceUrl(serviceUrl)
151+
.build();
152+
153+
@Cleanup
154+
Producer<String> producer = client.newProducer(Schema.STRING)
155+
.topic(topicName)
156+
.create();
157+
158+
log.info("Publishing {} messages", numProduce);
159+
for (int i = numConsume - numProduce; i < numConsume; i++) {
160+
log.info("Publishing message: {}", "smoke-message-" + i);
161+
producer.send("smoke-message-" + i);
162+
}
163+
164+
@Cleanup
165+
Consumer<String> consumer = client.newConsumer(Schema.STRING)
166+
.topic(topicName)
167+
.subscriptionName("my-sub")
168+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
169+
.subscribe();
170+
consumer.seek(MessageId.earliest);
171+
172+
log.info("Consuming {} messages", numConsume);
173+
for (int i = 0; i < numConsume; i++) {
174+
log.info("Waiting for message: {}", i);
175+
Message<String> m = consumer.receive();
176+
log.info("Received message: {}", m.getValue());
177+
assertEquals("smoke-message-" + i, m.getValue());
178+
}
179+
}
180+
}

tests/integration/src/test/resources/pulsar-upgrade.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<suite name="Pulsar Upgrade Integration Tests" verbose="2" annotations="JDK">
2323
<test name="pulsar-upgrade-test-suite" preserve-order="true" >
2424
<classes>
25-
<class name="org.apache.pulsar.tests.integration.upgrade.PulsarZKDowngradeTest" />
25+
<class name="org.apache.pulsar.tests.integration.upgrade.PulsarUpgradeDowngradeTest" />
2626
</classes>
2727
</test>
2828
</suite>

0 commit comments

Comments
 (0)