
Commit bb3f75e

refactor tiering options
1 parent 3b4b781 commit bb3f75e

13 files changed (+92, -75 lines)


fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 17 additions & 0 deletions
@@ -1811,6 +1811,23 @@ public class ConfigOptions {
                                     + ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT
                                     + " is false.");
 
+    public static final ConfigOption<Duration> LAKE_TIERING_TABLE_DURATION_MAX =
+            key("lake.tiering.table.duration.max")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(30))
+                    .withDescription(
+                            "The maximum duration for tiering a single table. If tiering a table exceeds this duration, "
+                                    + "it will be force completed: the tiering will be finalized and committed to the data lake "
+                                    + "(e.g., Paimon) immediately, even if they haven't reached their desired stopping offsets.");
+
+    public static final ConfigOption<Duration> LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL =
+            key("lake.tiering.table.duration.detect-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The interval to check if a table tiering operation has reached the maximum duration. "
+                                    + "The enumerator will periodically check tiering tables and force complete those that exceed the maximum duration.");
+
     // ------------------------------------------------------------------------
     // ConfigOptions for fluss kafka
     // ------------------------------------------------------------------------
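For reference, a minimal sketch (not part of this commit) of setting the two new options on a Fluss Configuration before launching the tiering service. Only the option constants and their defaults come from the diff above; the set(...) calls assume Fluss follows the Flink-style Configuration API.

    import java.time.Duration;

    import org.apache.fluss.config.ConfigOptions;
    import org.apache.fluss.config.Configuration;

    public class TieringDurationConfigSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Force-complete any table whose tiering round runs longer than 10 minutes
            // (option default is 30 minutes).
            conf.set(ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX, Duration.ofMinutes(10));
            // Have the enumerator check for overdue tables every 15 seconds (default 30 seconds).
            conf.set(ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL, Duration.ofSeconds(15));
        }
    }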

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java

Lines changed: 6 additions & 6 deletions
@@ -34,10 +34,10 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
 
+import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL;
+import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX;
 import static org.apache.fluss.flink.tiering.source.TieringSource.TIERING_SOURCE_TRANSFORMATION_UID;
 import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
-import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_DETECT_INTERVAL;
-import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_MAX;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /** The builder to build Flink lake tiering job. */
@@ -92,14 +92,14 @@ public JobClient build() throws Exception {
                     flussConfig.get(POLL_TIERING_TABLE_INTERVAL).toMillis());
         }
 
-        if (flussConfig.get(TIERING_TABLE_DURATION_MAX) != null) {
+        if (lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_MAX) != null) {
             tieringSourceBuilder.withTieringTableDurationMax(
-                    flussConfig.get(TIERING_TABLE_DURATION_MAX).toMillis());
+                    lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_MAX).toMillis());
         }
 
-        if (flussConfig.get(TIERING_TABLE_DURATION_DETECT_INTERVAL) != null) {
+        if (lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL) != null) {
             tieringSourceBuilder.withTieringTableDurationDetectInterval(
-                    flussConfig.get(TIERING_TABLE_DURATION_DETECT_INTERVAL).toMillis());
+                    lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL).toMillis());
         }
 
         TieringSource<?> tieringSource = tieringSourceBuilder.build();

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java

Lines changed: 4 additions & 4 deletions
@@ -44,9 +44,9 @@
 
 import java.nio.charset.StandardCharsets;
 
+import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL;
+import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX;
 import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
-import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_DETECT_INTERVAL;
-import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_MAX;
 
 /**
  * The flink source implementation for tiering data from Fluss to downstream lake.
@@ -147,9 +147,9 @@ public static class Builder<WriteResult> {
         private long pollTieringTableIntervalMs =
                 POLL_TIERING_TABLE_INTERVAL.defaultValue().toMillis();
         private long tieringTableDurationMaxMs =
-                TIERING_TABLE_DURATION_MAX.defaultValue().toMillis();
+                LAKE_TIERING_TABLE_DURATION_MAX.defaultValue().toMillis();
         private long tieringTableDurationDetectIntervalMs =
-                TIERING_TABLE_DURATION_DETECT_INTERVAL.defaultValue().toMillis();
+                LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL.defaultValue().toMillis();
 
         public Builder(
                 Configuration flussConf, LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
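For orientation, a rough sketch of driving TieringSource.Builder directly, mirroring how LakeTieringJobBuilder uses it above. Only the builder constructor, the two with...() setters, and build() are visible in this commit; flussConf, lakeTieringFactory and the WriteResult type are assumed to come from the surrounding code.

    // Sketch only; without the setter calls the builder falls back to the
    // 30 min / 30 s defaults of the lake.tiering.table.duration.* options.
    TieringSource.Builder<WriteResult> sourceBuilder =
            new TieringSource.Builder<>(flussConf, lakeTieringFactory);
    sourceBuilder.withTieringTableDurationMax(Duration.ofMinutes(10).toMillis());
    sourceBuilder.withTieringTableDurationDetectInterval(Duration.ofSeconds(15).toMillis());
    TieringSource<WriteResult> tieringSource = sourceBuilder.build();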

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java

Lines changed: 7 additions & 7 deletions
@@ -33,8 +33,8 @@
 import java.util.function.Supplier;
 
 /**
- * The SplitFetcherManager for Fluss source. This class is needed to help notify a table reaches to
- * deadline of tiering to {@link TieringSplitReader}.
+ * The SplitFetcherManager for tiering source. This class is needed to help notify a table reaches
+ * to deadline of tiering to {@link TieringSplitReader}.
  */
 public class TieringSourceFetcherManager<WriteResult>
         extends SingleThreadFetcherManagerAdapter<
@@ -50,31 +50,31 @@ public TieringSourceFetcherManager(
         super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook);
     }
 
-    public void markTableReachTieringDeadline(long tableId) {
+    public void markTableReachTieringMaxDuration(long tableId) {
         if (!fetchers.isEmpty()) {
             // The fetcher thread is still running. This should be the majority of the cases.
             fetchers.values()
                     .forEach(
                             splitFetcher ->
-                                    enqueueMarkTableReachTieringDeadlineTask(
+                                    enqueueMarkTableReachTieringMaxDurationTask(
                                             splitFetcher, tableId));
         } else {
             SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher =
                     createSplitFetcher();
-            enqueueMarkTableReachTieringDeadlineTask(splitFetcher, tableId);
+            enqueueMarkTableReachTieringMaxDurationTask(splitFetcher, tableId);
             startFetcher(splitFetcher);
         }
     }
 
-    private void enqueueMarkTableReachTieringDeadlineTask(
+    private void enqueueMarkTableReachTieringMaxDurationTask(
            SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher,
            long reachTieringDeadlineTable) {
        splitFetcher.enqueueTask(
                new SplitFetcherTask() {
                    @Override
                    public boolean run() {
                        ((TieringSplitReader<WriteResult>) splitFetcher.getSplitReader())
-                                .handleTableReachTieringDeadline(reachTieringDeadlineTable);
+                                .handleTableReachTieringMaxDuration(reachTieringDeadlineTable);
                        return true;
                    }

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceOptions.java

Lines changed: 0 additions & 17 deletions
@@ -34,21 +34,4 @@ public class TieringSourceOptions {
                     .defaultValue(Duration.ofSeconds(30))
                     .withDescription(
                             "The fixed interval to request tiering table from Fluss cluster, by default 30 seconds.");
-
-    public static final ConfigOption<Duration> TIERING_TABLE_DURATION_MAX =
-            key("tiering.table.duration.max")
-                    .durationType()
-                    .defaultValue(Duration.ofMinutes(30))
-                    .withDescription(
-                            "The maximum duration for tiering a single table. If tiering a table exceeds this duration, "
-                                    + "it will be force completed: the tiering will be finalized and committed to the data lake "
-                                    + "(e.g., Paimon) immediately, even if they haven't reached their desired stopping offsets.");
-
-    public static final ConfigOption<Duration> TIERING_TABLE_DURATION_DETECT_INTERVAL =
-            key("tiering.table.duration.detect-interval")
-                    .durationType()
-                    .defaultValue(Duration.ofSeconds(30))
-                    .withDescription(
-                            "The interval to check if a table tiering operation has reached the maximum duration. "
-                                    + "The enumerator will periodically check tiering tables and force complete those that exceed the maximum duration.");
 }

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java

Lines changed: 1 addition & 1 deletion
@@ -122,7 +122,7 @@ public void handleSourceEvents(SourceEvent sourceEvent) {
                     (TieringReachMaxDurationEvent) sourceEvent;
             long tableId = reachMaxDurationEvent.getTableId();
             ((TieringSourceFetcherManager<WriteResult>) splitFetcherManager)
-                    .markTableReachTieringDeadline(tableId);
+                    .markTableReachTieringMaxDuration(tableId);
         }
     }

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 32 additions & 19 deletions
@@ -17,6 +17,10 @@
 
 package org.apache.fluss.flink.tiering.source;
 
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.fluss.client.Connection;
 import org.apache.fluss.client.table.Table;
 import org.apache.fluss.client.table.scanner.ScanRecord;
@@ -33,16 +37,10 @@
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.utils.CloseableIterator;
-
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
-import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayDeque;
@@ -57,6 +55,7 @@
 
 import static org.apache.fluss.utils.Preconditions.checkArgument;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
 
 /** The {@link SplitReader} implementation which will read Fluss and write to lake. */
 public class TieringSplitReader<WriteResult>
@@ -81,7 +80,7 @@ public class TieringSplitReader<WriteResult>
     // the table_id to the pending splits
     private final Map<Long, Set<TieringSplit>> pendingTieringSplits;
 
-    private final Set<Long> reachTieringDeadlineTables;
+    private final Set<Long> reachTieringMaxDurationTables;
 
     private final Map<TableBucket, LakeWriter<WriteResult>> lakeWriters;
     private final Connection connection;
@@ -124,7 +123,7 @@ protected TieringSplitReader(
         this.currentTableSplitsByBucket = new HashMap<>();
         this.lakeWriters = new HashMap<>();
         this.currentPendingSnapshotSplits = new ArrayDeque<>();
-        this.reachTieringDeadlineTables = new HashSet<>();
+        this.reachTieringMaxDurationTables = new HashSet<>();
         this.pollTimeout = pollTimeout;
     }
 
@@ -157,11 +156,11 @@ public RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> fetch() throws I
             }
         } else {
             if (currentLogScanner != null) {
-                if (reachTieringDeadlineTables.contains(currentTableId)) {
+                // force to complete records
+                if (reachTieringMaxDurationTables.contains(currentTableId)) {
                     return forceCompleteTieringLogRecords();
                 }
                 ScanRecords scanRecords = currentLogScanner.poll(pollTimeout);
-                // force to complete records
                 return forLogRecords(scanRecords);
             } else {
                 return emptyTableBucketWriteResultWithSplitIds();
@@ -182,6 +181,9 @@ public void handleSplitsChanges(SplitsChange<TieringSplit> splitsChange) {
             if (split.isForceIgnore()) {
                 // if the split is forced to ignore,
                 // mark it as empty
+                LOG.info(
+                        "ignore split {} since the split is set to force to ignore",
+                        split.splitId());
                 currentEmptySplits.add(split);
                 continue;
             }
@@ -300,7 +302,7 @@ private void mayCreateLogScanner() {
                 long logEndOffset =
                         logOffsetAndTimestamp == null
                                 ? UNKNOWN_BUCKET_OFFSET
-                                // logEngOffset is equal to offset tiered + 1
+                                // logEndOffset is equal to offset tiered + 1
                                 : logOffsetAndTimestamp.logOffset + 1;
                 long timestamp =
                         logOffsetAndTimestamp == null
@@ -309,15 +311,26 @@ private void mayCreateLogScanner() {
                 TableBucketWriteResult<WriteResult> bucketWriteResult =
                         completeLakeWriter(
                                 bucket, split.getPartitionName(), logEndOffset, timestamp);
+
+                if (logEndOffset == UNKNOWN_BUCKET_OFFSET) {
+                    // when the log end offset is unknown, the write result must be
+                    // null, otherwise, we should throw exception directly to avoid data
+                    // inconsistent
+                    checkState(
+                            bucketWriteResult.writeResult() == null,
+                            "bucketWriteResult must be null when log end offset is unknown when tiering "
+                                    + split);
+                }
+
                 writeResults.put(bucket, bucketWriteResult);
                 finishedSplitIds.put(bucket, split.splitId());
                 LOG.info(
-                        "Split {} is forced to be finished due to tiering timeout.",
+                        "Split {} is forced to be finished due to tiering reach max duration.",
                        split.splitId());
                 currentTieringSplitsIterator.remove();
            }
        }
-        reachTieringDeadlineTables.remove(this.currentTableId);
+        reachTieringMaxDurationTables.remove(this.currentTableId);
        mayFinishCurrentTable();
        return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds);
    }
@@ -530,13 +543,13 @@ private void finishCurrentTable() throws IOException {
     }
 
     /**
-     * Handle a table reach tiered deadline. This will mark the current table as timed out, and it
-     * will be force completed in the next fetch cycle.
+     * Handle a table reach max tiering duration. This will mark the current table as reaching max
+     * duration, and it will be force completed in the next fetch cycle.
      */
-    public void handleTableReachTieringDeadline(long tableId) {
-        if ((currentTableId != null && currentTableId.equals(tableId)
-                || pendingTieringSplits.containsKey(tableId))) {
-            reachTieringDeadlineTables.add(tableId);
+    public void handleTableReachTieringMaxDuration(long tableId) {
+        if ((currentTableId != null && currentTableId.equals(tableId))
+                || pendingTieringSplits.containsKey(tableId)) {
+            reachTieringMaxDurationTables.add(tableId);
         }
     }
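The checkState added in the force-complete path encodes an invariant: a bucket whose log end offset could not be resolved must not carry a lake write result, otherwise committing it could leave the lake inconsistent with Fluss (as the in-diff comment notes). A standalone sketch of that rule; the sentinel value and the helper name are assumptions, not code from this repository.

    import javax.annotation.Nullable;

    final class ForceCompleteInvariantSketch {
        // Assumed sentinel; the real UNKNOWN_BUCKET_OFFSET constant is defined elsewhere in the reader.
        static final long UNKNOWN_BUCKET_OFFSET = -1L;

        // Mirrors the guard added in this commit: an unknown end offset must not carry lake data.
        static <W> void checkForceCompletedBucket(
                long logEndOffset, @Nullable W writeResult, String splitId) {
            if (logEndOffset == UNKNOWN_BUCKET_OFFSET && writeResult != null) {
                throw new IllegalStateException(
                        "bucketWriteResult must be null when log end offset is unknown when tiering "
                                + splitId);
            }
        }

        private ForceCompleteInvariantSketch() {}
    }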

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringLogSplit.java

Lines changed: 1 addition & 1 deletion
@@ -72,7 +72,7 @@ public TieringLogSplit(
             long stoppingOffset,
             boolean forceIgnore,
             int numberOfSplits) {
-        super(tablePath, tableBucket, partitionName, numberOfSplits, forceIgnore);
+        super(tablePath, tableBucket, partitionName, forceIgnore, numberOfSplits);
         this.startingOffset = startingOffset;
         this.stoppingOffset = stoppingOffset;
     }

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSnapshotSplit.java

Lines changed: 1 addition & 1 deletion
@@ -63,7 +63,7 @@ public TieringSnapshotSplit(
             long logOffsetOfSnapshot,
             int numberOfSplits,
             boolean forceIgnore) {
-        super(tablePath, tableBucket, partitionName, numberOfSplits, forceIgnore);
+        super(tablePath, tableBucket, partitionName, forceIgnore, numberOfSplits);
         this.snapshotId = snapshotId;
         this.logOffsetOfSnapshot = logOffsetOfSnapshot;
     }

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java

Lines changed: 7 additions & 9 deletions
@@ -17,13 +17,11 @@
 
 package org.apache.fluss.flink.tiering.source.split;
 
+import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
 
-import org.apache.flink.api.connector.source.SourceSplit;
-
 import javax.annotation.Nullable;
-
 import java.util.Objects;
 
 /** The base table split for tiering service. */
@@ -38,17 +36,16 @@ public abstract class TieringSplit implements SourceSplit {
     protected final TableBucket tableBucket;
     @Nullable protected final String partitionName;
 
+    protected boolean forceIgnore;
     // the total number of splits in one round of tiering
     protected final int numberOfSplits;
 
-    protected boolean forceIgnore;
-
     public TieringSplit(
             TablePath tablePath,
             TableBucket tableBucket,
             @Nullable String partitionName,
-            int numberOfSplits,
-            boolean forceIgnore) {
+            boolean forceIgnore,
+            int numberOfSplits) {
         this.tablePath = tablePath;
         this.tableBucket = tableBucket;
         this.partitionName = partitionName;
@@ -57,8 +54,8 @@ public TieringSplit(
             throw new IllegalArgumentException(
                     "Partition name and partition id must be both null or both not null.");
         }
-        this.numberOfSplits = numberOfSplits;
         this.forceIgnore = forceIgnore;
+        this.numberOfSplits = numberOfSplits;
     }
 
     /** Checks whether this split is a primary key table split to tier. */
@@ -140,11 +137,12 @@ public boolean equals(Object object) {
         return Objects.equals(tablePath, that.tablePath)
                 && Objects.equals(tableBucket, that.tableBucket)
                 && Objects.equals(partitionName, that.partitionName)
+                && forceIgnore == that.forceIgnore
                 && numberOfSplits == that.numberOfSplits;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(tablePath, tableBucket, partitionName, numberOfSplits);
+        return Objects.hash(tablePath, tableBucket, partitionName, forceIgnore, numberOfSplits);
     }
 }
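One observable effect of adding forceIgnore to equals()/hashCode(): two otherwise identical splits that differ only in the force-ignore flag no longer compare equal. An illustrative sketch, assuming TieringLogSplit delegates to the base equality above; the table path, bucket, offsets, the TablePath.of/TableBucket calls, and the full TieringLogSplit constructor argument list are assumptions based on this diff.

    // Illustrative values only.
    TablePath path = TablePath.of("fluss", "orders");
    TableBucket bucket = new TableBucket(1001L, 0);

    // Same bucket, offsets and split count; only forceIgnore differs.
    TieringLogSplit normal = new TieringLogSplit(path, bucket, null, 0L, 100L, false, 4);
    TieringLogSplit ignored = new TieringLogSplit(path, bucket, null, 0L, 100L, true, 4);

    // Before this commit these two were equal; now forceIgnore is part of the split identity.
    assert !normal.equals(ignored);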
