Skip to content

Commit bbd9d38

Browse files
committed
refactor tiering options
1 parent 3b4b781 commit bbd9d38

File tree

6 files changed

+33
-35
lines changed

6 files changed

+33
-35
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1811,6 +1811,23 @@ public class ConfigOptions {
18111811
+ ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT
18121812
+ " is false.");
18131813

1814+
public static final ConfigOption<Duration> LAKE_TIERING_TABLE_DURATION_MAX =
1815+
key("lake.tiering.table.duration.max")
1816+
.durationType()
1817+
.defaultValue(Duration.ofMinutes(30))
1818+
.withDescription(
1819+
"The maximum duration for tiering a single table. If tiering a table exceeds this duration, "
1820+
+ "it will be force completed: the tiering will be finalized and committed to the data lake "
1821+
+ "(e.g., Paimon) immediately, even if they haven't reached their desired stopping offsets.");
1822+
1823+
public static final ConfigOption<Duration> LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL =
1824+
key("lake.tiering.table.duration.detect-interval")
1825+
.durationType()
1826+
.defaultValue(Duration.ofSeconds(30))
1827+
.withDescription(
1828+
"The interval to check if a table tiering operation has reached the maximum duration. "
1829+
+ "The enumerator will periodically check tiering tables and force complete those that exceed the maximum duration.");
1830+
18141831
// ------------------------------------------------------------------------
18151832
// ConfigOptions for fluss kafka
18161833
// ------------------------------------------------------------------------

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@
3434
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3535
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
3636

37+
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL;
38+
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX;
3739
import static org.apache.fluss.flink.tiering.source.TieringSource.TIERING_SOURCE_TRANSFORMATION_UID;
3840
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
39-
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_DETECT_INTERVAL;
40-
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_MAX;
4141
import static org.apache.fluss.utils.Preconditions.checkNotNull;
4242

4343
/** The builder to build Flink lake tiering job. */
@@ -92,14 +92,14 @@ public JobClient build() throws Exception {
9292
flussConfig.get(POLL_TIERING_TABLE_INTERVAL).toMillis());
9393
}
9494

95-
if (flussConfig.get(TIERING_TABLE_DURATION_MAX) != null) {
95+
if (lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_MAX) != null) {
9696
tieringSourceBuilder.withTieringTableDurationMax(
97-
flussConfig.get(TIERING_TABLE_DURATION_MAX).toMillis());
97+
lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_MAX).toMillis());
9898
}
9999

100-
if (flussConfig.get(TIERING_TABLE_DURATION_DETECT_INTERVAL) != null) {
100+
if (lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL) != null) {
101101
tieringSourceBuilder.withTieringTableDurationDetectInterval(
102-
flussConfig.get(TIERING_TABLE_DURATION_DETECT_INTERVAL).toMillis());
102+
flussConfig.get(LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL).toMillis());
103103
}
104104

105105
TieringSource<?> tieringSource = tieringSourceBuilder.build();

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@
4444

4545
import java.nio.charset.StandardCharsets;
4646

47+
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL;
48+
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX;
4749
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
48-
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_DETECT_INTERVAL;
49-
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_MAX;
5050

5151
/**
5252
* The flink source implementation for tiering data from Fluss to downstream lake.
@@ -147,9 +147,9 @@ public static class Builder<WriteResult> {
147147
private long pollTieringTableIntervalMs =
148148
POLL_TIERING_TABLE_INTERVAL.defaultValue().toMillis();
149149
private long tieringTableDurationMaxMs =
150-
TIERING_TABLE_DURATION_MAX.defaultValue().toMillis();
150+
LAKE_TIERING_TABLE_DURATION_MAX.defaultValue().toMillis();
151151
private long tieringTableDurationDetectIntervalMs =
152-
TIERING_TABLE_DURATION_DETECT_INTERVAL.defaultValue().toMillis();
152+
LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL.defaultValue().toMillis();
153153

154154
public Builder(
155155
Configuration flussConf, LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceOptions.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,4 @@ public class TieringSourceOptions {
3434
.defaultValue(Duration.ofSeconds(30))
3535
.withDescription(
3636
"The fixed interval to request tiering table from Fluss cluster, by default 30 seconds.");
37-
38-
public static final ConfigOption<Duration> TIERING_TABLE_DURATION_MAX =
39-
key("tiering.table.duration.max")
40-
.durationType()
41-
.defaultValue(Duration.ofMinutes(30))
42-
.withDescription(
43-
"The maximum duration for tiering a single table. If tiering a table exceeds this duration, "
44-
+ "it will be force completed: the tiering will be finalized and committed to the data lake "
45-
+ "(e.g., Paimon) immediately, even if they haven't reached their desired stopping offsets.");
46-
47-
public static final ConfigOption<Duration> TIERING_TABLE_DURATION_DETECT_INTERVAL =
48-
key("tiering.table.duration.detect-interval")
49-
.durationType()
50-
.defaultValue(Duration.ofSeconds(30))
51-
.withDescription(
52-
"The interval to check if a table tiering operation has reached the maximum duration. "
53-
+ "The enumerator will periodically check tiering tables and force complete those that exceed the maximum duration.");
5437
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@
5858
import java.util.Map;
5959
import java.util.Optional;
6060

61-
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_DETECT_INTERVAL;
62-
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_MAX;
61+
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL;
62+
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX;
6363
import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
6464
import static org.assertj.core.api.Assertions.assertThat;
6565

@@ -233,15 +233,13 @@ private void writeRows(TablePath tablePath, List<InternalRow> rows, boolean appe
233233

234234
private JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception {
235235
Configuration lakeTieringConfig = new Configuration();
236-
lakeTieringConfig.set(TIERING_TABLE_DURATION_MAX, Duration.ofSeconds(1));
237-
lakeTieringConfig.set(TIERING_TABLE_DURATION_DETECT_INTERVAL, Duration.ofMillis(100));
236+
lakeTieringConfig.set(LAKE_TIERING_TABLE_DURATION_MAX, Duration.ofSeconds(1));
237+
lakeTieringConfig.set(LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL, Duration.ofMillis(100));
238238

239239
Configuration flussConfig = new Configuration();
240240
flussConfig.setString(
241241
ConfigOptions.BOOTSTRAP_SERVERS.key(),
242242
FLUSS_CLUSTER_EXTENSION.getBootstrapServers());
243-
flussConfig.set(TIERING_TABLE_DURATION_MAX, Duration.ofSeconds(1));
244-
flussConfig.set(TIERING_TABLE_DURATION_DETECT_INTERVAL, Duration.ofMillis(100));
245243
return LakeTieringJobBuilder.newBuilder(
246244
execEnv,
247245
flussConfig,

fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,14 +160,14 @@ public TestPaimonCommittable deserialize(int version, byte[] serialized)
160160

161161
private static class TestingPaimonWriter implements LakeWriter<TestingPaimonWriteResult> {
162162

163-
static ConfigOption<Duration> WRITE_PAUSE =
163+
static ConfigOption<Duration> writePauseOption =
164164
key("write-pause").durationType().noDefaultValue();
165165

166166
private int writtenRecords = 0;
167167
private final Duration writePause;
168168

169169
private TestingPaimonWriter(TableInfo tableInfo) {
170-
this.writePause = tableInfo.getCustomProperties().get(WRITE_PAUSE);
170+
this.writePause = tableInfo.getCustomProperties().get(writePauseOption);
171171
}
172172

173173
@Override

0 commit comments

Comments
 (0)