Commit 3b4b781

add it
1 parent 15bc8f8 commit 3b4b781

5 files changed: +434 −6 lines

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceOptions.java

Lines changed: 1 addition & 1 deletion

@@ -38,7 +38,7 @@ public class TieringSourceOptions {
     public static final ConfigOption<Duration> TIERING_TABLE_DURATION_MAX =
             key("tiering.table.duration.max")
                     .durationType()
-                    .defaultValue(Duration.ofMinutes(10))
+                    .defaultValue(Duration.ofMinutes(30))
                     .withDescription(
                             "The maximum duration for tiering a single table. If tiering a table exceeds this duration, "
                                     + "it will be force completed: the tiering will be finalized and committed to the data lake "

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java

Lines changed: 2 additions & 2 deletions

@@ -158,8 +158,8 @@ public TieringSplit deserialize(int version, byte[] serialized) throws IOException {
                     partitionName,
                     startingOffset,
                     stoppingOffset,
-                    numberOfSplits,
-                    forceIgnore);
+                    forceIgnore,
+                    numberOfSplits);
         }
     }
 }
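This hunk swaps the last two constructor arguments so that the deserializer passes forceIgnore before numberOfSplits, matching the TieringLogSplit constructor order (the signature change itself is not shown in this excerpt). A minimal round-trip check of the corrected order, assuming the tablePath, tableBucket, and serializer fixtures set up in TieringSplitSerializerTest below:

// Round-trip a split and confirm each field is rebuilt from its own wire
// value; isForceIgnore() is the accessor the existing test already uses.
TieringLogSplit split =
        new TieringLogSplit(tablePath, tableBucket, null, 100, 200, true, 40);
byte[] bytes = serializer.serialize(split);
TieringLogSplit restored =
        (TieringLogSplit) serializer.deserialize(serializer.getVersion(), bytes);
assertThat(restored.isForceIgnore()).isTrue();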
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java (new file)

Lines changed: 260 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.fluss.flink.tiering;

import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.client.table.Table;
import org.apache.fluss.client.table.writer.AppendWriter;
import org.apache.fluss.client.table.writer.TableWriter;
import org.apache.fluss.client.table.writer.UpsertWriter;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.utils.ExceptionUtils;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_DETECT_INTERVAL;
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_MAX;
import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
import static org.assertj.core.api.Assertions.assertThat;

/** The IT case for tiering. */
class TieringITCase {

    @RegisterExtension
    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
            FlussClusterExtension.builder()
                    .setClusterConf(initConfig())
                    .setNumOfTabletServers(3)
                    .build();

    protected static String warehousePath;
    protected static Connection conn;
    protected static Admin admin;
    protected static StreamExecutionEnvironment execEnv;

    protected static Configuration initConfig() {
        Configuration conf = new Configuration();
        conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
                // do not clean up snapshots, for test purposes
                .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
        conf.setString("datalake.format", "paimon");
        conf.setString("datalake.paimon.metastore", "filesystem");
        try {
            warehousePath =
                    Files.createTempDirectory("fluss-testing-datalake-tiered")
                            .resolve("warehouse")
                            .toString();
        } catch (Exception e) {
            throw new FlussRuntimeException("Failed to create warehouse path");
        }
        conf.setString("datalake.paimon.warehouse", warehousePath);
        return conf;
    }

    @BeforeAll
    static void beforeAll() {
        conn = ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig());
        admin = conn.getAdmin();
        execEnv =
                StreamExecutionEnvironment.getExecutionEnvironment()
                        .setParallelism(1)
                        .setRuntimeMode(RuntimeExecutionMode.STREAMING);
    }

    @Test
    void testTieringReachMaxDuration() throws Exception {
        TablePath logTablePath = TablePath.of("fluss", "logtable");
        createTable(logTablePath, false);
        TablePath pkTablePath = TablePath.of("fluss", "pktable");
        long pkTableId = createTable(pkTablePath, true);

        // write some records to the log table
        List<InternalRow> rows = new ArrayList<>();
        int recordCount = 6;
        for (int i = 0; i < recordCount; i++) {
            rows.add(GenericRow.of(i, BinaryString.fromString("v" + i)));
        }
        writeRows(logTablePath, rows, true);

        rows = new ArrayList<>();
        // write 6 records to the primary key table; each bucket should contain only a few records
        for (int i = 0; i < recordCount; i++) {
            rows.add(GenericRow.of(i, BinaryString.fromString("v" + i)));
        }
        writeRows(pkTablePath, rows, false);

        waitUntilSnapshot(pkTableId, 3, 0);

        JobClient jobClient = buildTieringJob(execEnv);

        try {
            // verify that fewer records were tiered than the log table holds in
            // total, proving tiering was force-completed at the max duration
            LakeSnapshot logTableLakeSnapshot = waitLakeSnapshot(logTablePath);
            long tieredRecords = countTieredRecords(logTableLakeSnapshot);
            assertThat(tieredRecords).isLessThan(recordCount);

            // same verification for the primary key table
            LakeSnapshot pkTableLakeSnapshot = waitLakeSnapshot(pkTablePath);
            tieredRecords = countTieredRecords(pkTableLakeSnapshot);
            assertThat(tieredRecords).isLessThan(recordCount);
        } finally {
            jobClient.cancel();
        }
    }

    @AfterAll
    static void afterAll() throws Exception {
        if (admin != null) {
            admin.close();
            admin = null;
        }
        if (conn != null) {
            conn.close();
            conn = null;
        }
    }

    private long countTieredRecords(LakeSnapshot lakeSnapshot) throws Exception {
        return lakeSnapshot.getTableBucketsOffset().values().stream()
                .mapToLong(Long::longValue)
                .sum();
    }

    private LakeSnapshot waitLakeSnapshot(TablePath tablePath) {
        return waitValue(
                () -> {
                    try {
                        return Optional.of(admin.getLatestLakeSnapshot(tablePath).get());
                    } catch (Exception e) {
                        if (ExceptionUtils.stripExecutionException(e)
                                instanceof LakeTableSnapshotNotExistException) {
                            return Optional.empty();
                        }
                        throw e;
                    }
                },
                Duration.ofSeconds(30),
                "Failed to wait for one round of tiering to finish for table " + tablePath);
    }

    private long createTable(TablePath tablePath, boolean isPrimaryKeyTable) throws Exception {
        Schema.Builder schemaBuilder =
                Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING());
        if (isPrimaryKeyTable) {
            schemaBuilder.primaryKey("a");
        }
        TableDescriptor.Builder tableDescriptorBuilder =
                TableDescriptor.builder()
                        .schema(schemaBuilder.build())
                        .distributedBy(3, "a")
                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));

        // see TestingPaimonStoragePlugin#TestingPaimonWriter: we set write-pause
        // to 1s to make it easy to mock tiering reaching the max duration
        Map<String, String> customProperties = Collections.singletonMap("write-pause", "1s");
        tableDescriptorBuilder.customProperties(customProperties);
        return createTable(tablePath, tableDescriptorBuilder.build());
    }

    protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor)
            throws Exception {
        admin.createTable(tablePath, tableDescriptor, true).get();
        return admin.getTableInfo(tablePath).get().getTableId();
    }

    private void writeRows(TablePath tablePath, List<InternalRow> rows, boolean append)
            throws Exception {
        try (Table table = conn.getTable(tablePath)) {
            TableWriter tableWriter;
            if (append) {
                tableWriter = table.newAppend().createWriter();
            } else {
                tableWriter = table.newUpsert().createWriter();
            }
            for (InternalRow row : rows) {
                if (tableWriter instanceof AppendWriter) {
                    ((AppendWriter) tableWriter).append(row);
                } else {
                    ((UpsertWriter) tableWriter).upsert(row);
                }
            }
            tableWriter.flush();
        }
    }

    private JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception {
        Configuration lakeTieringConfig = new Configuration();
        lakeTieringConfig.set(TIERING_TABLE_DURATION_MAX, Duration.ofSeconds(1));
        lakeTieringConfig.set(TIERING_TABLE_DURATION_DETECT_INTERVAL, Duration.ofMillis(100));

        Configuration flussConfig = new Configuration();
        flussConfig.setString(
                ConfigOptions.BOOTSTRAP_SERVERS.key(),
                FLUSS_CLUSTER_EXTENSION.getBootstrapServers());
        flussConfig.set(TIERING_TABLE_DURATION_MAX, Duration.ofSeconds(1));
        flussConfig.set(TIERING_TABLE_DURATION_DETECT_INTERVAL, Duration.ofMillis(100));
        return LakeTieringJobBuilder.newBuilder(
                        execEnv,
                        flussConfig,
                        new Configuration(),
                        lakeTieringConfig,
                        DataLakeFormat.PAIMON.toString())
                .build();
    }

    protected void waitUntilSnapshot(long tableId, int bucketNum, long snapshotId) {
        for (int i = 0; i < bucketNum; i++) {
            TableBucket tableBucket = new TableBucket(tableId, i);
            FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, snapshotId);
        }
    }
}
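A note on why the assertions in testTieringReachMaxDuration hold: the testing Paimon writer pauses for 1s on each write (the "write-pause" custom property), while buildTieringJob shrinks TIERING_TABLE_DURATION_MAX to 1s, so a tiering round is force-completed after roughly one paused write. The arithmetic below is an illustrative model only, not part of the test:

import java.time.Duration;

class TieringTimingSketch {
    public static void main(String[] args) {
        Duration writePause = Duration.ofSeconds(1);  // "write-pause" custom property
        Duration maxDuration = Duration.ofSeconds(1); // TIERING_TABLE_DURATION_MAX in the test
        int recordCount = 6;                          // records written per table
        // At most ~1 paused write completes before force completion, so the
        // tiered record count must stay below the 6 records written.
        long writesBeforeForceComplete = maxDuration.toMillis() / writePause.toMillis();
        System.out.println(writesBeforeForceComplete < recordCount); // true
    }
}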

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializerTest.java

Lines changed: 2 additions & 2 deletions

@@ -121,7 +121,7 @@ void testForceIgnoreSerde() throws Exception {

         // Test TieringLogSplit with forceIgnore set at creation
         TieringLogSplit logSplitWithForceIgnore =
-                new TieringLogSplit(tablePath, tableBucket, null, 100, 200, 40, true);
+                new TieringLogSplit(tablePath, tableBucket, null, 100, 200, true, 40);
         serialized = serializer.serialize(logSplitWithForceIgnore);
         TieringLogSplit deserializedLogSplit =
                 (TieringLogSplit) serializer.deserialize(serializer.getVersion(), serialized);

@@ -141,7 +141,7 @@ void testForceIgnoreSerde() throws Exception {

         // Test TieringLogSplit with forceIgnore set after creation
         TieringLogSplit logSplit =
-                new TieringLogSplit(tablePath, tableBucket, null, 100, 200, 40, false);
+                new TieringLogSplit(tablePath, tableBucket, null, 100, 200, false, 40);
         assertThat(logSplit.isForceIgnore()).isFalse();
         logSplit.forceIgnore();
         assertThat(logSplit.isForceIgnore()).isTrue();
