Skip to content

Commit 9293810

Browse files
committed
add integration tests
1 parent cda12ef commit 9293810

File tree

2 files changed

+296
-0
lines changed

2 files changed

+296
-0
lines changed
Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.lookup;

import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.table.Table;
import org.apache.fluss.client.table.writer.UpsertWriter;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.MemorySize;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.RowType;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.apache.fluss.testutils.InternalRowAssert.assertThatRow;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
57+
/** End-to-end IT cases for @PrimaryKeyLookuper functionality. */
58+
class PrimaryKeyLookuperITCase {
59+
60+
@RegisterExtension
61+
public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
62+
FlussClusterExtension.builder()
63+
.setNumOfTabletServers(3)
64+
.setClusterConf(initClusterConfig())
65+
.build();
66+
67+
private Connection conn;
68+
private Admin admin;
69+
private Configuration clientConf;
70+
71+
private static Configuration initClusterConfig() {
72+
Configuration conf = new Configuration();
73+
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
74+
// faster snapshotting in tests (not strictly required by fullScan, but helps stability)
75+
conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
76+
// writer settings to keep buffers small
77+
conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, MemorySize.parse("1mb"));
78+
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb"));
79+
conf.set(ConfigOptions.MAX_PARTITION_NUM, 10);
80+
conf.set(ConfigOptions.MAX_BUCKET_NUM, 30);
81+
return conf;
82+
}
83+
84+
@BeforeEach
85+
void setUp() {
86+
clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
87+
conn = ConnectionFactory.createConnection(clientConf);
88+
admin = conn.getAdmin();
89+
}
90+
91+
@AfterEach
92+
void tearDown() throws Exception {
93+
if (admin != null) {
94+
admin.close();
95+
admin = null;
96+
}
97+
if (conn != null) {
98+
conn.close();
99+
conn = null;
100+
}
101+
}
102+
103+
@Test
104+
void testSnapshotAllNotPartitioned() throws Exception {
105+
TablePath tablePath = TablePath.of("pk_snapshot", "non_partitioned_pk");
106+
Schema schema =
107+
Schema.newBuilder()
108+
.column("a", DataTypes.INT())
109+
.column("b", DataTypes.STRING())
110+
.primaryKey("a")
111+
.build();
112+
113+
TableDescriptor desc =
114+
TableDescriptor.builder().schema(schema).distributedBy(3, "a").build();
115+
116+
// create db/table
117+
admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, true).get();
118+
admin.createTable(tablePath, desc, false).get();
119+
120+
try (Table table = conn.getTable(tablePath)) {
121+
// write 10 rows across 3 buckets
122+
UpsertWriter upsert = table.newUpsert().createWriter();
123+
List<InternalRow> expected = new ArrayList<>();
124+
for (int i = 1; i <= 10; i++) {
125+
InternalRow r = row(i, "v" + i);
126+
upsert.upsert(r);
127+
expected.add(r);
128+
}
129+
upsert.flush();
130+
131+
// run snapshotAll()
132+
Lookuper lookuper = table.newLookup().createLookuper();
133+
List<InternalRow> actual = lookuper.snapshotAll().get();
134+
135+
// verify count and contents (order-agnostic by sorting on primary key 'a')
136+
assertThat(actual).hasSize(10);
137+
RowType rowType = schema.getRowType();
138+
Comparator<InternalRow> byKey = Comparator.comparingInt(r -> r.getInt(0));
139+
List<InternalRow> sortedActual =
140+
actual.stream().sorted(byKey).collect(Collectors.toList());
141+
List<InternalRow> sortedExpected =
142+
expected.stream().sorted(byKey).collect(Collectors.toList());
143+
for (int i = 0; i < sortedActual.size(); i++) {
144+
assertThatRow(sortedActual.get(i))
145+
.withSchema(rowType)
146+
.isEqualTo(sortedExpected.get(i));
147+
}
148+
149+
// unhappy path: snapshotAllPartition on non-partitioned table
150+
assertThatThrownBy(() -> lookuper.snapshotAllPartition("p1").join())
151+
.isInstanceOf(CompletionException.class)
152+
.hasMessageContaining("Table is not partitioned");
153+
}
154+
}
155+
156+
@Test
157+
void testSnapshotAllPartitioned() throws Exception {
158+
TablePath tablePath = TablePath.of("pk_snapshot", "partitioned_pk");
159+
// Partition by column 'p', and include 'p' in PK so physical PK = [a, p]
160+
Schema schema =
161+
Schema.newBuilder()
162+
.column("a", DataTypes.INT())
163+
.column("b", DataTypes.STRING())
164+
.column("p", DataTypes.STRING())
165+
.primaryKey("a", "p")
166+
.build();
167+
TableDescriptor desc =
168+
TableDescriptor.builder()
169+
.schema(schema)
170+
.distributedBy(3, "a")
171+
.partitionedBy("p")
172+
.build();
173+
174+
// create db/table
175+
admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, true).get();
176+
admin.createTable(tablePath, desc, false).get();
177+
178+
// explicitly create two partitions: p=20240101 and p=20240102
179+
admin.createPartition(
180+
tablePath,
181+
new PartitionSpec(Collections.singletonMap("p", "20240101")),
182+
true)
183+
.get();
184+
admin.createPartition(
185+
tablePath,
186+
new PartitionSpec(Collections.singletonMap("p", "20240102")),
187+
true)
188+
.get();
189+
190+
try (Table table = conn.getTable(tablePath)) {
191+
UpsertWriter upsert = table.newUpsert().createWriter();
192+
List<InternalRow> expectedP1 = new ArrayList<>();
193+
List<InternalRow> expectedP2 = new ArrayList<>();
194+
for (int i = 1; i <= 10; i++) {
195+
InternalRow r1 = row(i, "v" + i, "20240101");
196+
upsert.upsert(r1);
197+
expectedP1.add(r1);
198+
InternalRow r2 = row(i, "w" + i, "20240102");
199+
upsert.upsert(r2);
200+
expectedP2.add(r2);
201+
}
202+
upsert.flush();
203+
204+
Lookuper lookuper = table.newLookup().createLookuper();
205+
206+
// valid partition snapshots
207+
List<InternalRow> p1Rows = lookuper.snapshotAllPartition("20240101").get();
208+
List<InternalRow> p2Rows = lookuper.snapshotAllPartition("20240102").get();
209+
210+
RowType rowType = schema.getRowType();
211+
Comparator<InternalRow> byKey = Comparator.comparingInt(r -> r.getInt(0));
212+
213+
// verify P1
214+
assertThat(p1Rows).hasSize(10);
215+
List<InternalRow> sortedP1 = p1Rows.stream().sorted(byKey).collect(Collectors.toList());
216+
List<InternalRow> sortedExpectedP1 =
217+
expectedP1.stream().sorted(byKey).collect(Collectors.toList());
218+
for (int i = 0; i < sortedP1.size(); i++) {
219+
assertThatRow(sortedP1.get(i))
220+
.withSchema(rowType)
221+
.isEqualTo(sortedExpectedP1.get(i));
222+
}
223+
224+
// verify P2
225+
assertThat(p2Rows).hasSize(10);
226+
List<InternalRow> sortedP2 = p2Rows.stream().sorted(byKey).collect(Collectors.toList());
227+
List<InternalRow> sortedExpectedP2 =
228+
expectedP2.stream().sorted(byKey).collect(Collectors.toList());
229+
for (int i = 0; i < sortedP2.size(); i++) {
230+
assertThatRow(sortedP2.get(i))
231+
.withSchema(rowType)
232+
.isEqualTo(sortedExpectedP2.get(i));
233+
}
234+
235+
// unhappy path: invalid partition name
236+
assertThatThrownBy(() -> lookuper.snapshotAllPartition("p=does_not_exist").join())
237+
.isInstanceOf(CompletionException.class)
238+
.hasMessageContaining("does not exist");
239+
240+
// unhappy path: snapshotAll on partitioned table
241+
assertThatThrownBy(() -> lookuper.snapshotAll().join())
242+
.isInstanceOf(CompletionException.class)
243+
.hasMessageContaining("Table is partitioned");
244+
}
245+
}
246+
247+
// This test is mainly used for checking the latency when the table is around 10k values
248+
// which is fairly too large for the particular use case of snapshotAll().
249+
@Test
250+
void testSnapshotAllFor10KTable() throws Exception {
251+
TablePath tablePath = TablePath.of("pk_snapshot", "non_partitioned_pk_10k");
252+
Schema schema =
253+
Schema.newBuilder()
254+
.column("a", DataTypes.INT())
255+
.column("b", DataTypes.STRING())
256+
.primaryKey("a")
257+
.build();
258+
TableDescriptor desc =
259+
TableDescriptor.builder().schema(schema).distributedBy(3, "a").build();
260+
261+
// create db/table
262+
admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, true).get();
263+
admin.createTable(tablePath, desc, false).get();
264+
265+
try (Table table = conn.getTable(tablePath)) {
266+
UpsertWriter upsert = table.newUpsert().createWriter();
267+
int total = 10_000;
268+
for (int i = 1; i <= total; i++) {
269+
upsert.upsert(row(i, "v" + i));
270+
if (i % 1000 == 0) {
271+
// periodic flush to keep memory stable in CI
272+
upsert.flush();
273+
}
274+
}
275+
upsert.flush();
276+
277+
// run snapshotAll
278+
Lookuper lookuper = table.newLookup().createLookuper();
279+
List<InternalRow> rows = lookuper.snapshotAll().get();
280+
281+
// verify size and key coverage
282+
assertThat(rows).hasSize(total);
283+
HashSet<Integer> keys = new HashSet<>(rows.size());
284+
for (InternalRow r : rows) {
285+
keys.add(r.getInt(0));
286+
}
287+
assertThat(keys).hasSize(total);
288+
// spot-check a few
289+
assertThat(keys.contains(1)).isTrue();
290+
assertThat(keys.contains(5000)).isTrue();
291+
assertThat(keys.contains(10000)).isTrue();
292+
}
293+
}
294+
}

fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,10 +1023,12 @@ public DefaultValueRecordBatch fullScan(long tableId, @Nullable Long partitionId
10231023
for (byte[] value : values) {
10241024
builder.append(value);
10251025
}
1026+
10261027
valueCount += values.size();
10271028
bucketCount++;
10281029
}
10291030
long elapsed = System.currentTimeMillis() - start;
1031+
10301032
if (bucketCount > 0) {
10311033
LOG.info(
10321034
"Full-scan success: tableId={}, partitionId={}, buckets_scanned={}, values={}, elapsed_ms={}",

0 commit comments

Comments
 (0)