Skip to content

Commit 444f57b

Browse files
authored
Subscription: apply IoTConsensusV2 as cluster mode for integration test (#15546)
1 parent 565f1bf commit 444f57b

File tree

10 files changed

+183
-47
lines changed

10 files changed

+183
-47
lines changed

.github/workflows/pipe-it.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ jobs:
438438
matrix:
439439
java: [ 17 ]
440440
# StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet.
441-
cluster1: [ ScalableSingleNodeMode ]
441+
cluster1: [ ScalableSingleNodeMode, PipeConsensusBatchMode, PipeConsensusStreamMode ]
442442
cluster2: [ ScalableSingleNodeMode ]
443443
os: [ ubuntu-latest ]
444444
runs-on: ${{ matrix.os }}
@@ -606,7 +606,7 @@ jobs:
606606
matrix:
607607
java: [ 17 ]
608608
# do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal
609-
cluster1: [ ScalableSingleNodeMode ]
609+
cluster1: [ ScalableSingleNodeMode, PipeConsensusBatchMode, PipeConsensusStreamMode ]
610610
cluster2: [ ScalableSingleNodeMode ]
611611
os: [ ubuntu-latest ]
612612
runs-on: ${{ matrix.os }}
@@ -690,7 +690,7 @@ jobs:
690690
matrix:
691691
java: [ 17 ]
692692
# do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal
693-
cluster1: [ ScalableSingleNodeMode ]
693+
cluster1: [ ScalableSingleNodeMode, PipeConsensusBatchMode, PipeConsensusStreamMode ]
694694
cluster2: [ ScalableSingleNodeMode ]
695695
os: [ ubuntu-latest ]
696696
runs-on: ${{ matrix.os }}

integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,15 @@
1919

2020
package org.apache.iotdb.subscription.it;
2121

22+
import org.apache.iotdb.consensus.ConsensusFactory;
23+
import org.apache.iotdb.itbase.env.BaseEnv;
24+
import org.apache.iotdb.session.Session;
25+
2226
import org.awaitility.Awaitility;
2327
import org.awaitility.core.ConditionFactory;
2428

2529
import java.util.concurrent.TimeUnit;
30+
import java.util.function.Consumer;
2631

2732
public class IoTDBSubscriptionITConstant {
2833

@@ -40,4 +45,27 @@ public class IoTDBSubscriptionITConstant {
4045

4146
public static final long SLEEP_NS = 1_000_000_000L;
4247
public static final long POLL_TIMEOUT_MS = 10_000L;
48+
49+
@FunctionalInterface
50+
public interface WrappedVoidSupplier {
51+
void get() throws Throwable;
52+
}
53+
54+
public static void AWAIT_WITH_FLUSH(final Session session, final WrappedVoidSupplier assertions) {
55+
AWAIT.untilAsserted(
56+
() -> {
57+
session.executeNonQueryStatement("flush");
58+
assertions.get();
59+
});
60+
}
61+
62+
public static Consumer<BaseEnv> FORCE_SCALABLE_SINGLE_NODE_MODE =
63+
env ->
64+
env.getConfig()
65+
.getCommonConfig()
66+
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
67+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
68+
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
69+
.setSchemaReplicationFactor(1)
70+
.setDataReplicationFactor(1);
4371
}

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/AbstractSubscriptionTreeRegressionIT.java

Lines changed: 62 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
3030
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
3131
import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
32+
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.WrappedVoidSupplier;
3233
import org.apache.iotdb.subscription.it.triple.AbstractSubscriptionTripleIT;
3334

3435
import org.apache.thrift.TException;
@@ -57,6 +58,7 @@
5758
import java.util.Properties;
5859
import java.util.concurrent.atomic.AtomicInteger;
5960

61+
import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
6062
import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS;
6163

6264
public abstract class AbstractSubscriptionTreeRegressionIT extends AbstractSubscriptionTripleIT {
@@ -359,26 +361,6 @@ public List<Integer> consume_tsfile(SubscriptionTreePullConsumer consumer, List<
359361
return results;
360362
}
361363

362-
public static void consume_data_long(
363-
SubscriptionTreePullConsumer consumer, Session session, Long timeout)
364-
throws StatementExecutionException, InterruptedException, IoTDBConnectionException {
365-
timeout = System.currentTimeMillis() + timeout;
366-
while (System.currentTimeMillis() < timeout) {
367-
List<SubscriptionMessage> messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
368-
if (messages.isEmpty()) {
369-
Thread.sleep(1000);
370-
}
371-
for (final SubscriptionMessage message : messages) {
372-
for (final Iterator<Tablet> it = message.getSessionDataSetsHandler().tabletIterator();
373-
it.hasNext(); ) {
374-
final Tablet tablet = it.next();
375-
session.insertTablet(tablet);
376-
}
377-
}
378-
consumer.commitSync(messages);
379-
}
380-
}
381-
382364
public void consume_data(SubscriptionTreePullConsumer consumer)
383365
throws TException,
384366
IOException,
@@ -388,6 +370,66 @@ public void consume_data(SubscriptionTreePullConsumer consumer)
388370
consume_data(consumer, session_dest);
389371
}
390372

373+
public void consume_data_await(
374+
SubscriptionTreePullConsumer consumer,
375+
Session session,
376+
List<WrappedVoidSupplier> assertions) {
377+
AWAIT.untilAsserted(
378+
() -> {
379+
List<SubscriptionMessage> messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
380+
if (messages.isEmpty()) {
381+
session_src.executeNonQueryStatement("flush");
382+
}
383+
for (final SubscriptionMessage message : messages) {
384+
for (final Iterator<Tablet> it = message.getSessionDataSetsHandler().tabletIterator();
385+
it.hasNext(); ) {
386+
final Tablet tablet = it.next();
387+
session.insertTablet(tablet);
388+
}
389+
}
390+
consumer.commitSync(messages);
391+
for (final WrappedVoidSupplier assertion : assertions) {
392+
assertion.get();
393+
}
394+
});
395+
}
396+
397+
public void consume_tsfile_await(
398+
SubscriptionTreePullConsumer consumer, List<String> devices, List<Integer> expected) {
399+
final List<AtomicInteger> counters = new ArrayList<>(devices.size());
400+
for (int i = 0; i < devices.size(); i++) {
401+
counters.add(new AtomicInteger(0));
402+
}
403+
AWAIT.untilAsserted(
404+
() -> {
405+
List<SubscriptionMessage> messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
406+
if (messages.isEmpty()) {
407+
session_src.executeNonQueryStatement("flush");
408+
}
409+
for (final SubscriptionMessage message : messages) {
410+
final SubscriptionTsFileHandler tsFileHandler = message.getTsFileHandler();
411+
try (final TsFileReader tsFileReader = tsFileHandler.openReader()) {
412+
for (int i = 0; i < devices.size(); i++) {
413+
final Path path = new Path(devices.get(i), "s_0", true);
414+
final QueryDataSet dataSet =
415+
tsFileReader.query(
416+
QueryExpression.create(Collections.singletonList(path), null));
417+
while (dataSet.hasNext()) {
418+
dataSet.next();
419+
counters.get(i).addAndGet(1);
420+
}
421+
}
422+
} catch (IOException e) {
423+
throw new RuntimeException(e);
424+
}
425+
}
426+
consumer.commitSync(messages);
427+
for (int i = 0; i < devices.size(); i++) {
428+
assertEquals(counters.get(i).get(), expected.get(i));
429+
}
430+
});
431+
}
432+
391433
//////////////////////////// strict assertions ////////////////////////////
392434

393435
public static void assertEquals(int actual, int expected) {

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.iotdb.session.subscription.consumer.AckStrategy;
2727
import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
2828
import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer;
29+
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
2930
import org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
3031

3132
import org.apache.thrift.TException;
@@ -81,6 +82,15 @@ public void setUp() throws Exception {
8182
}
8283
}
8384

85+
@Override
86+
protected void setUpConfig() {
87+
super.setUpConfig();
88+
89+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
90+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
91+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
92+
}
93+
8494
@Override
8595
@After
8696
public void tearDown() throws Exception {
@@ -107,6 +117,7 @@ private void insert_data(long timestamp, String device)
107117
timestamp += 2000;
108118
}
109119
session_src.insertTablet(tablet);
120+
session_src.executeNonQueryStatement("flush");
110121
}
111122

112123
@Test

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.rpc.IoTDBConnectionException;
2525
import org.apache.iotdb.rpc.StatementExecutionException;
2626
import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
27+
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
2728
import org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
2829

2930
import org.apache.thrift.TException;
@@ -69,6 +70,15 @@ public void setUp() throws Exception {
6970
session_src.executeNonQueryStatement("create database root.RootPullConsumeTsfile");
7071
}
7172

73+
@Override
74+
protected void setUpConfig() {
75+
super.setUpConfig();
76+
77+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
78+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
79+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
80+
}
81+
7282
@Override
7383
@After
7484
public void tearDown() throws Exception {

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.rpc.IoTDBConnectionException;
2525
import org.apache.iotdb.rpc.StatementExecutionException;
2626
import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
27+
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
2728
import org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
2829

2930
import org.apache.thrift.TException;
@@ -83,6 +84,15 @@ public void setUp() throws Exception {
8384
assertTrue(subs.getTopic(topicName2).isPresent(), "Create show topics 2");
8485
}
8586

87+
@Override
88+
protected void setUpConfig() {
89+
super.setUpConfig();
90+
91+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
92+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
93+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
94+
}
95+
8696
@Override
8797
@After
8898
public void tearDown() throws Exception {

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
import java.io.IOException;
4343
import java.util.ArrayList;
44+
import java.util.Collections;
4445
import java.util.List;
4546

4647
@RunWith(IoTDBTestRunner.class)
@@ -115,6 +116,7 @@ private void insert_data(long timestamp)
115116
timestamp += row * 2000;
116117
}
117118
session_src.insertTablet(tablet);
119+
session_src.executeNonQueryStatement("flush");
118120
}
119121

120122
@Test
@@ -132,13 +134,18 @@ public void do_test()
132134
assertEquals(subs.getSubscriptions().size(), 1, "show subscriptions after subscription");
133135
insert_data(System.currentTimeMillis() - 30000L);
134136
// Consumption data
135-
consume_data(consumer, session_dest);
136137
String sql = "select count(s_0) from " + device;
137-
System.out.println("src: " + getCount(session_src, sql));
138-
check_count(8, sql, "Consumption data:" + pattern);
139-
check_count(8, "select count(s_1) from " + device, "Consumption data: s_1");
140-
check_count(0, "select count(s_0) from " + database + ".d_1", "Consumption data:d_1");
141-
check_count(0, "select count(s_0) from " + device2, "Consumption data:d_2");
138+
consume_data_await(
139+
consumer,
140+
session_dest,
141+
Collections.singletonList(
142+
() -> {
143+
System.out.println("src: " + getCount(session_src, sql));
144+
check_count(8, sql, "Consumption data:" + pattern);
145+
check_count(8, "select count(s_1) from " + device, "Consumption data: s_1");
146+
check_count(0, "select count(s_0) from " + database + ".d_1", "Consumption data:d_1");
147+
check_count(0, "select count(s_0) from " + device2, "Consumption data:d_2");
148+
}));
142149
insert_data(System.currentTimeMillis());
143150
// Unsubscribe
144151
consumer.unsubscribe(topicName);
@@ -149,8 +156,14 @@ public void do_test()
149156
System.out.println("src: " + getCount(session_src, sql));
150157
// Consumption data: Progress is not retained after unsubscribing and then re-subscribing. Full
151158
// synchronization.
152-
consume_data(consumer, session_dest);
153-
check_count(12, "select count(s_0) from " + device, "consume data again:s_0");
154-
check_count(12, "select count(s_1) from " + device, "Consumption data: s_1");
159+
consume_data_await(
160+
consumer,
161+
session_dest,
162+
Collections.singletonList(
163+
() -> {
164+
System.out.println("src: " + getCount(session_src, sql));
165+
check_count(12, "select count(s_0) from " + device, "consume data again:s_0");
166+
check_count(12, "select count(s_1) from " + device, "Consumption data: s_1");
167+
}));
155168
}
156169
}

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.rpc.IoTDBConnectionException;
2525
import org.apache.iotdb.rpc.StatementExecutionException;
2626
import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
27+
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
2728
import org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT;
2829

2930
import org.apache.thrift.TException;
@@ -41,6 +42,7 @@
4142

4243
import java.io.IOException;
4344
import java.util.ArrayList;
45+
import java.util.Arrays;
4446
import java.util.List;
4547

4648
/***
@@ -94,6 +96,16 @@ public void setUp() throws Exception {
9496
assertTrue(subs.getTopic(topicName).isPresent(), "Create show topics");
9597
}
9698

99+
// TODO: remove it later
100+
@Override
101+
protected void setUpConfig() {
102+
super.setUpConfig();
103+
104+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
105+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
106+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
107+
}
108+
97109
@Override
98110
@After
99111
public void tearDown() throws Exception {
@@ -151,11 +163,7 @@ public void do_test()
151163
devices.add(device);
152164
devices.add(device2);
153165
devices.add(database2 + ".d_2");
154-
155-
List<Integer> rowCounts = consume_tsfile(consumer, devices);
156-
assertEquals(rowCounts.get(0), 10);
157-
assertEquals(rowCounts.get(1), 1);
158-
assertEquals(rowCounts.get(2), 1);
166+
consume_tsfile_await(consumer, devices, Arrays.asList(10, 1, 1));
159167
// Unsubscribe
160168
consumer.unsubscribe(topicName);
161169
assertEquals(subs.getSubscriptions().size(), 0, "Show subscriptions after cancellation");
@@ -165,14 +173,6 @@ public void do_test()
165173
insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
166174
// Consumption data: Progress is not retained after canceling and re-subscribing. Full
167175
// synchronization.
168-
rowCounts = consume_tsfile(consumer, devices);
169-
170-
assertEquals(
171-
rowCounts.get(0),
172-
15,
173-
"Unsubscribe and resubscribe, progress is not retained. Full synchronization.");
174-
assertEquals(
175-
rowCounts.get(1), 1, "Cancel subscription and subscribe again," + database + ".d_1");
176-
assertEquals(rowCounts.get(2), 1, "Unsubscribe and resubscribe," + database2 + ".d_2");
176+
consume_tsfile_await(consumer, devices, Arrays.asList(15, 1, 1));
177177
}
178178
}

0 commit comments

Comments
 (0)