Skip to content

Commit 832dcfd

Browse files
committed
Subscription: apply IoTConsensusV2 as cluster mode for integration test (apache#15546)
1 parent 1a33299 commit 832dcfd

File tree

9 files changed

+178
-24
lines changed

9 files changed

+178
-24
lines changed

integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,15 @@
1919

2020
package org.apache.iotdb.subscription.it;
2121

22+
import org.apache.iotdb.consensus.ConsensusFactory;
23+
import org.apache.iotdb.itbase.env.BaseEnv;
24+
import org.apache.iotdb.session.Session;
25+
2226
import org.awaitility.Awaitility;
2327
import org.awaitility.core.ConditionFactory;
2428

2529
import java.util.concurrent.TimeUnit;
30+
import java.util.function.Consumer;
2631

2732
public class IoTDBSubscriptionITConstant {
2833

@@ -40,4 +45,27 @@ public class IoTDBSubscriptionITConstant {
4045

4146
public static final long SLEEP_NS = 1_000_000_000L;
4247
public static final long POLL_TIMEOUT_MS = 10_000L;
48+
49+
@FunctionalInterface
50+
public interface WrappedVoidSupplier {
51+
void get() throws Throwable;
52+
}
53+
54+
public static void AWAIT_WITH_FLUSH(final Session session, final WrappedVoidSupplier assertions) {
55+
AWAIT.untilAsserted(
56+
() -> {
57+
session.executeNonQueryStatement("flush");
58+
assertions.get();
59+
});
60+
}
61+
62+
public static Consumer<BaseEnv> FORCE_SCALABLE_SINGLE_NODE_MODE =
63+
env ->
64+
env.getConfig()
65+
.getCommonConfig()
66+
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
67+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
68+
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
69+
.setSchemaReplicationFactor(1)
70+
.setDataReplicationFactor(1);
4371
}

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
3030
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
3131
import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
32+
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.WrappedVoidSupplier;
3233
import org.apache.iotdb.subscription.it.triple.AbstractSubscriptionTripleIT;
3334

3435
import org.apache.thrift.TException;
@@ -57,6 +58,7 @@
5758
import java.util.Properties;
5859
import java.util.concurrent.atomic.AtomicInteger;
5960

61+
import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
6062
import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS;
6163

6264
public abstract class AbstractSubscriptionRegressionIT extends AbstractSubscriptionTripleIT {
@@ -388,6 +390,64 @@ public void consume_data(SubscriptionPullConsumer consumer)
388390
consume_data(consumer, session_dest);
389391
}
390392

393+
public void consume_data_await(
394+
SubscriptionPullConsumer consumer, Session session, List<WrappedVoidSupplier> assertions) {
395+
AWAIT.untilAsserted(
396+
() -> {
397+
List<SubscriptionMessage> messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
398+
if (messages.isEmpty()) {
399+
session_src.executeNonQueryStatement("flush");
400+
}
401+
for (final SubscriptionMessage message : messages) {
402+
for (final Iterator<Tablet> it = message.getSessionDataSetsHandler().tabletIterator();
403+
it.hasNext(); ) {
404+
final Tablet tablet = it.next();
405+
session.insertTablet(tablet);
406+
}
407+
}
408+
consumer.commitSync(messages);
409+
for (final WrappedVoidSupplier assertion : assertions) {
410+
assertion.get();
411+
}
412+
});
413+
}
414+
415+
public void consume_tsfile_await(
416+
SubscriptionPullConsumer consumer, List<String> devices, List<Integer> expected) {
417+
final List<AtomicInteger> counters = new ArrayList<>(devices.size());
418+
for (int i = 0; i < devices.size(); i++) {
419+
counters.add(new AtomicInteger(0));
420+
}
421+
AWAIT.untilAsserted(
422+
() -> {
423+
List<SubscriptionMessage> messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
424+
if (messages.isEmpty()) {
425+
session_src.executeNonQueryStatement("flush");
426+
}
427+
for (final SubscriptionMessage message : messages) {
428+
final SubscriptionTsFileHandler tsFileHandler = message.getTsFileHandler();
429+
try (final TsFileReader tsFileReader = tsFileHandler.openReader()) {
430+
for (int i = 0; i < devices.size(); i++) {
431+
final Path path = new Path(devices.get(i), "s_0", true);
432+
final QueryDataSet dataSet =
433+
tsFileReader.query(
434+
QueryExpression.create(Collections.singletonList(path), null));
435+
while (dataSet.hasNext()) {
436+
dataSet.next();
437+
counters.get(i).addAndGet(1);
438+
}
439+
}
440+
} catch (IOException e) {
441+
throw new RuntimeException(e);
442+
}
443+
}
444+
consumer.commitSync(messages);
445+
for (int i = 0; i < devices.size(); i++) {
446+
assertEquals(counters.get(i).get(), expected.get(i));
447+
}
448+
});
449+
}
450+
391451
//////////////////////////// strict assertions ////////////////////////////
392452

393453
public static void assertEquals(int actual, int expected) {

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.iotdb.session.subscription.consumer.AckStrategy;
2727
import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
2828
import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
29+
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
2930
import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
3031

3132
import org.apache.thrift.TException;
@@ -80,6 +81,15 @@ public void setUp() throws Exception {
8081
}
8182
}
8283

84+
@Override
85+
protected void setUpConfig() {
86+
super.setUpConfig();
87+
88+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
89+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
90+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
91+
}
92+
8393
@Override
8494
@After
8595
public void tearDown() throws Exception {
@@ -106,6 +116,7 @@ private void insert_data(long timestamp, String device)
106116
timestamp += 2000;
107117
}
108118
session_src.insertTablet(tablet);
119+
session_src.executeNonQueryStatement("flush");
109120
}
110121

111122
@Test

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.rpc.IoTDBConnectionException;
2525
import org.apache.iotdb.rpc.StatementExecutionException;
2626
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
27+
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
2728
import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
2829

2930
import org.apache.thrift.TException;
@@ -68,6 +69,15 @@ public void setUp() throws Exception {
6869
session_src.executeNonQueryStatement("create database root.RootPullConsumeTsfile");
6970
}
7071

72+
@Override
73+
protected void setUpConfig() {
74+
super.setUpConfig();
75+
76+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
77+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
78+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
79+
}
80+
7181
@Override
7282
@After
7383
public void tearDown() throws Exception {

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.rpc.IoTDBConnectionException;
2525
import org.apache.iotdb.rpc.StatementExecutionException;
2626
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
27+
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
2728
import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
2829

2930
import org.apache.thrift.TException;
@@ -82,6 +83,15 @@ public void setUp() throws Exception {
8283
assertTrue(subs.getTopic(topicName2).isPresent(), "Create show topics 2");
8384
}
8485

86+
@Override
87+
protected void setUpConfig() {
88+
super.setUpConfig();
89+
90+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
91+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
92+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
93+
}
94+
8595
@Override
8696
@After
8797
public void tearDown() throws Exception {

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import java.io.IOException;
4242
import java.util.ArrayList;
43+
import java.util.Collections;
4344
import java.util.List;
4445

4546
@RunWith(IoTDBTestRunner.class)
@@ -114,6 +115,7 @@ private void insert_data(long timestamp)
114115
timestamp += row * 2000;
115116
}
116117
session_src.insertTablet(tablet);
118+
session_src.executeNonQueryStatement("flush");
117119
}
118120

119121
@Test
@@ -131,13 +133,18 @@ public void do_test()
131133
assertEquals(subs.getSubscriptions().size(), 1, "show subscriptions after subscription");
132134
insert_data(System.currentTimeMillis() - 30000L);
133135
// Consumption data
134-
consume_data(consumer, session_dest);
135136
String sql = "select count(s_0) from " + device;
136-
System.out.println("src: " + getCount(session_src, sql));
137-
check_count(8, sql, "Consumption data:" + pattern);
138-
check_count(8, "select count(s_1) from " + device, "Consumption data: s_1");
139-
check_count(0, "select count(s_0) from " + database + ".d_1", "Consumption data:d_1");
140-
check_count(0, "select count(s_0) from " + device2, "Consumption data:d_2");
137+
consume_data_await(
138+
consumer,
139+
session_dest,
140+
Collections.singletonList(
141+
() -> {
142+
System.out.println("src: " + getCount(session_src, sql));
143+
check_count(8, sql, "Consumption data:" + pattern);
144+
check_count(8, "select count(s_1) from " + device, "Consumption data: s_1");
145+
check_count(0, "select count(s_0) from " + database + ".d_1", "Consumption data:d_1");
146+
check_count(0, "select count(s_0) from " + device2, "Consumption data:d_2");
147+
}));
141148
insert_data(System.currentTimeMillis());
142149
// Unsubscribe
143150
consumer.unsubscribe(topicName);
@@ -148,8 +155,14 @@ public void do_test()
148155
System.out.println("src: " + getCount(session_src, sql));
149156
// Consumption data: Progress is not retained after unsubscribing and then re-subscribing. Full
150157
// synchronization.
151-
consume_data(consumer, session_dest);
152-
check_count(12, "select count(s_0) from " + device, "consume data again:s_0");
153-
check_count(12, "select count(s_1) from " + device, "Consumption data: s_1");
158+
consume_data_await(
159+
consumer,
160+
session_dest,
161+
Collections.singletonList(
162+
() -> {
163+
System.out.println("src: " + getCount(session_src, sql));
164+
check_count(12, "select count(s_0) from " + device, "consume data again:s_0");
165+
check_count(12, "select count(s_1) from " + device, "Consumption data: s_1");
166+
}));
154167
}
155168
}

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.rpc.IoTDBConnectionException;
2525
import org.apache.iotdb.rpc.StatementExecutionException;
2626
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
27+
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
2728
import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
2829

2930
import org.apache.thrift.TException;
@@ -40,6 +41,7 @@
4041

4142
import java.io.IOException;
4243
import java.util.ArrayList;
44+
import java.util.Arrays;
4345
import java.util.List;
4446

4547
/***
@@ -92,6 +94,16 @@ public void setUp() throws Exception {
9294
assertTrue(subs.getTopic(topicName).isPresent(), "Create show topics");
9395
}
9496

97+
// TODO: remove it later
98+
@Override
99+
protected void setUpConfig() {
100+
super.setUpConfig();
101+
102+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
103+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
104+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
105+
}
106+
95107
@Override
96108
@After
97109
public void tearDown() throws Exception {
@@ -149,11 +161,7 @@ public void do_test()
149161
devices.add(device);
150162
devices.add(device2);
151163
devices.add(database2 + ".d_2");
152-
153-
List<Integer> rowCounts = consume_tsfile(consumer, devices);
154-
assertEquals(rowCounts.get(0), 10);
155-
assertEquals(rowCounts.get(1), 1);
156-
assertEquals(rowCounts.get(2), 1);
164+
consume_tsfile_await(consumer, devices, Arrays.asList(10, 1, 1));
157165
// Unsubscribe
158166
consumer.unsubscribe(topicName);
159167
assertEquals(subs.getSubscriptions().size(), 0, "Show subscriptions after cancellation");
@@ -163,14 +171,6 @@ public void do_test()
163171
insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
164172
// Consumption data: Progress is not retained after canceling and re-subscribing. Full
165173
// synchronization.
166-
rowCounts = consume_tsfile(consumer, devices);
167-
168-
assertEquals(
169-
rowCounts.get(0),
170-
15,
171-
"Unsubscribe and resubscribe, progress is not retained. Full synchronization.");
172-
assertEquals(
173-
rowCounts.get(1), 1, "Cancel subscription and subscribe again," + database + ".d_1");
174-
assertEquals(rowCounts.get(2), 1, "Unsubscribe and resubscribe," + database2 + ".d_2");
174+
consume_tsfile_await(consumer, devices, Arrays.asList(15, 1, 1));
175175
}
176176
}

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
2929
import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
3030
import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
31+
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
3132
import org.apache.iotdb.subscription.it.Retry;
3233
import org.apache.iotdb.subscription.it.RetryRule;
3334
import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
@@ -104,6 +105,16 @@ public void setUp() throws Exception {
104105
assertTrue(subs.getTopic(topicName).isPresent(), "Create show topics");
105106
}
106107

108+
// TODO: remove it later
109+
@Override
110+
protected void setUpConfig() {
111+
super.setUpConfig();
112+
113+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
114+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
115+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
116+
}
117+
107118
@Override
108119
@After
109120
public void tearDown() throws Exception {
@@ -194,7 +205,8 @@ public void do_test()
194205

195206
// Consumption data: Progress is not retained when re-subscribing after cancellation. Full
196207
// synchronization.
197-
AWAIT.untilAsserted(
208+
IoTDBSubscriptionITConstant.AWAIT_WITH_FLUSH(
209+
session_src,
198210
() -> {
199211
check_count(12, "select count(s_0) from " + device, "consume data again:s_0 " + device);
200212
check_count(0, "select count(s_1) from " + device, "Consumption data: s_1 " + device);

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
2828
import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
2929
import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
30+
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
3031
import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
3132

3233
import org.apache.thrift.TException;
@@ -121,6 +122,15 @@ public void setUp() throws Exception {
121122
subs.getTopics().forEach(System.out::println);
122123
}
123124

125+
@Override
126+
protected void setUpConfig() {
127+
super.setUpConfig();
128+
129+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
130+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
131+
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
132+
}
133+
124134
@Override
125135
@After
126136
public void tearDown() throws Exception {

0 commit comments

Comments
 (0)