
Commit c72407d

refactor tiering options
1 parent 3b4b781 · commit c72407d

13 files changed: +87 -65 lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 17 additions & 0 deletions
@@ -1811,6 +1811,23 @@ public class ConfigOptions {
                             + ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT
                             + " is false.");
 
+    public static final ConfigOption<Duration> LAKE_TIERING_TABLE_DURATION_MAX =
+            key("lake.tiering.table.duration.max")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(30))
+                    .withDescription(
+                            "The maximum duration for tiering a single table. If tiering a table exceeds this duration, "
+                                    + "it will be force completed: the tiering will be finalized and committed to the data lake "
+                                    + "(e.g., Paimon) immediately, even if they haven't reached their desired stopping offsets.");
+
+    public static final ConfigOption<Duration> LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL =
+            key("lake.tiering.table.duration.detect-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The interval to check if a table tiering operation has reached the maximum duration. "
+                                    + "The enumerator will periodically check tiering tables and force complete those that exceed the maximum duration.");
+
     // ------------------------------------------------------------------------
     // ConfigOptions for fluss kafka
     // ------------------------------------------------------------------------
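
For orientation (not part of the commit): a minimal sketch of overriding the two new options above, assuming Fluss's Configuration class lives alongside ConfigOptions and exposes a Flink-style set(ConfigOption, value) method; the values shown are illustrative, not the defaults.

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration; // assumed location of the configuration holder

import java.time.Duration;

public class TieringDurationConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Illustrative override of the 30-minute default: a slow table is force
        // completed and committed to the lake after at most 45 minutes.
        conf.set(ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX, Duration.ofMinutes(45));
        // Check for over-duration tables more often than the 30-second default.
        conf.set(ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL, Duration.ofSeconds(15));
    }
}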

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java

Lines changed: 6 additions & 6 deletions
@@ -34,10 +34,10 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
 
+import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL;
+import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX;
 import static org.apache.fluss.flink.tiering.source.TieringSource.TIERING_SOURCE_TRANSFORMATION_UID;
 import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
-import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_DETECT_INTERVAL;
-import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_MAX;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /** The builder to build Flink lake tiering job. */

@@ -92,14 +92,14 @@ public JobClient build() throws Exception {
                     flussConfig.get(POLL_TIERING_TABLE_INTERVAL).toMillis());
         }
 
-        if (flussConfig.get(TIERING_TABLE_DURATION_MAX) != null) {
+        if (lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_MAX) != null) {
             tieringSourceBuilder.withTieringTableDurationMax(
-                    flussConfig.get(TIERING_TABLE_DURATION_MAX).toMillis());
+                    lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_MAX).toMillis());
         }
 
-        if (flussConfig.get(TIERING_TABLE_DURATION_DETECT_INTERVAL) != null) {
+        if (lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL) != null) {
             tieringSourceBuilder.withTieringTableDurationDetectInterval(
-                    flussConfig.get(TIERING_TABLE_DURATION_DETECT_INTERVAL).toMillis());
+                    lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL).toMillis());
         }
 
         TieringSource<?> tieringSource = tieringSourceBuilder.build();
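
Note: after this refactor, lake.tiering.table.duration.max and lake.tiering.table.duration.detect-interval are resolved from lakeTieringConfig, the lake tiering configuration handed to the builder, rather than from flussConfig, matching their move from TieringSourceOptions into the cluster-level ConfigOptions.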

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java

Lines changed: 4 additions & 4 deletions
@@ -44,9 +44,9 @@
 
 import java.nio.charset.StandardCharsets;
 
+import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL;
+import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX;
 import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
-import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_DETECT_INTERVAL;
-import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_MAX;
 
 /**
  * The flink source implementation for tiering data from Fluss to downstream lake.

@@ -147,9 +147,9 @@ public static class Builder<WriteResult> {
         private long pollTieringTableIntervalMs =
                 POLL_TIERING_TABLE_INTERVAL.defaultValue().toMillis();
         private long tieringTableDurationMaxMs =
-                TIERING_TABLE_DURATION_MAX.defaultValue().toMillis();
+                LAKE_TIERING_TABLE_DURATION_MAX.defaultValue().toMillis();
         private long tieringTableDurationDetectIntervalMs =
-                TIERING_TABLE_DURATION_DETECT_INTERVAL.defaultValue().toMillis();
+                LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL.defaultValue().toMillis();
 
         public Builder(
                 Configuration flussConf, LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java

Lines changed: 7 additions & 7 deletions
@@ -33,8 +33,8 @@
 import java.util.function.Supplier;
 
 /**
- * The SplitFetcherManager for Fluss source. This class is needed to help notify a table reaches to
- * deadline of tiering to {@link TieringSplitReader}.
+ * The SplitFetcherManager for tiering source. This class is needed to help notify a table reaches
+ * to deadline of tiering to {@link TieringSplitReader}.
  */
 public class TieringSourceFetcherManager<WriteResult>
         extends SingleThreadFetcherManagerAdapter<

@@ -50,31 +50,31 @@ public TieringSourceFetcherManager(
         super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook);
     }
 
-    public void markTableReachTieringDeadline(long tableId) {
+    public void markTableReachTieringMaxDuration(long tableId) {
         if (!fetchers.isEmpty()) {
             // The fetcher thread is still running. This should be the majority of the cases.
             fetchers.values()
                     .forEach(
                             splitFetcher ->
-                                    enqueueMarkTableReachTieringDeadlineTask(
+                                    enqueueMarkTableReachTieringMaxDurationTask(
                                             splitFetcher, tableId));
         } else {
             SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher =
                     createSplitFetcher();
-            enqueueMarkTableReachTieringDeadlineTask(splitFetcher, tableId);
+            enqueueMarkTableReachTieringMaxDurationTask(splitFetcher, tableId);
             startFetcher(splitFetcher);
         }
     }
 
-    private void enqueueMarkTableReachTieringDeadlineTask(
+    private void enqueueMarkTableReachTieringMaxDurationTask(
            SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher,
            long reachTieringDeadlineTable) {
        splitFetcher.enqueueTask(
                new SplitFetcherTask() {
                    @Override
                    public boolean run() {
                        ((TieringSplitReader<WriteResult>) splitFetcher.getSplitReader())
-                                .handleTableReachTieringDeadline(reachTieringDeadlineTable);
+                                .handleTableReachTieringMaxDuration(reachTieringDeadlineTable);
                        return true;
                    }

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceOptions.java

Lines changed: 0 additions & 17 deletions
@@ -34,21 +34,4 @@ public class TieringSourceOptions {
                     .defaultValue(Duration.ofSeconds(30))
                     .withDescription(
                             "The fixed interval to request tiering table from Fluss cluster, by default 30 seconds.");
-
-    public static final ConfigOption<Duration> TIERING_TABLE_DURATION_MAX =
-            key("tiering.table.duration.max")
-                    .durationType()
-                    .defaultValue(Duration.ofMinutes(30))
-                    .withDescription(
-                            "The maximum duration for tiering a single table. If tiering a table exceeds this duration, "
-                                    + "it will be force completed: the tiering will be finalized and committed to the data lake "
-                                    + "(e.g., Paimon) immediately, even if they haven't reached their desired stopping offsets.");
-
-    public static final ConfigOption<Duration> TIERING_TABLE_DURATION_DETECT_INTERVAL =
-            key("tiering.table.duration.detect-interval")
-                    .durationType()
-                    .defaultValue(Duration.ofSeconds(30))
-                    .withDescription(
-                            "The interval to check if a table tiering operation has reached the maximum duration. "
-                                    + "The enumerator will periodically check tiering tables and force complete those that exceed the maximum duration.");
 }

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java

Lines changed: 1 addition & 1 deletion
@@ -122,7 +122,7 @@ public void handleSourceEvents(SourceEvent sourceEvent) {
                     (TieringReachMaxDurationEvent) sourceEvent;
             long tableId = reachMaxDurationEvent.getTableId();
             ((TieringSourceFetcherManager<WriteResult>) splitFetcherManager)
-                    .markTableReachTieringDeadline(tableId);
+                    .markTableReachTieringMaxDuration(tableId);
         }
     }
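
The enumerator-side periodic check described by lake.tiering.table.duration.detect-interval is not among the hunks shown on this page. As a rough, self-contained illustration of that kind of duration tracking (hypothetical class and method names, not the commit's code):

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch only: track when each table started tiering and report the
// tables whose tiering has run longer than the configured maximum duration.
public class TieringDurationTracker {
    private final long maxDurationMs;
    private final Map<Long, Long> tieringStartTimes = new ConcurrentHashMap<>();

    public TieringDurationTracker(Duration maxDuration) {
        this.maxDurationMs = maxDuration.toMillis();
    }

    public void onTableTieringStarted(long tableId) {
        tieringStartTimes.putIfAbsent(tableId, System.currentTimeMillis());
    }

    public void onTableTieringFinished(long tableId) {
        tieringStartTimes.remove(tableId);
    }

    /** Called once per detect interval; returns tables that should be force completed. */
    public List<Long> tablesExceedingMaxDuration() {
        long now = System.currentTimeMillis();
        List<Long> exceeded = new ArrayList<>();
        for (Map.Entry<Long, Long> entry : tieringStartTimes.entrySet()) {
            if (now - entry.getValue() >= maxDurationMs) {
                exceeded.add(entry.getKey());
            }
        }
        return exceeded;
    }
}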

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 28 additions & 13 deletions
@@ -57,6 +57,7 @@
 
 import static org.apache.fluss.utils.Preconditions.checkArgument;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
 
 /** The {@link SplitReader} implementation which will read Fluss and write to lake. */
 public class TieringSplitReader<WriteResult>

@@ -81,7 +82,7 @@ public class TieringSplitReader<WriteResult>
     // the table_id to the pending splits
     private final Map<Long, Set<TieringSplit>> pendingTieringSplits;
 
-    private final Set<Long> reachTieringDeadlineTables;
+    private final Set<Long> reachTieringMaxDurationTables;
 
     private final Map<TableBucket, LakeWriter<WriteResult>> lakeWriters;
     private final Connection connection;

@@ -124,7 +125,7 @@ protected TieringSplitReader(
         this.currentTableSplitsByBucket = new HashMap<>();
         this.lakeWriters = new HashMap<>();
         this.currentPendingSnapshotSplits = new ArrayDeque<>();
-        this.reachTieringDeadlineTables = new HashSet<>();
+        this.reachTieringMaxDurationTables = new HashSet<>();
         this.pollTimeout = pollTimeout;
     }

@@ -157,11 +158,11 @@ public RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> fetch() throws I
             }
         } else {
             if (currentLogScanner != null) {
-                if (reachTieringDeadlineTables.contains(currentTableId)) {
+                // force to complete records
+                if (reachTieringMaxDurationTables.contains(currentTableId)) {
                     return forceCompleteTieringLogRecords();
                 }
                 ScanRecords scanRecords = currentLogScanner.poll(pollTimeout);
-                // force to complete records
                 return forLogRecords(scanRecords);
             } else {
                 return emptyTableBucketWriteResultWithSplitIds();

@@ -182,6 +183,9 @@ public void handleSplitsChanges(SplitsChange<TieringSplit> splitsChange) {
             if (split.isForceIgnore()) {
                 // if the split is forced to ignore,
                 // mark it as empty
+                LOG.info(
+                        "ignore split {} since the split is set to force to ignore",
+                        split.splitId());
                 currentEmptySplits.add(split);
                 continue;
             }

@@ -300,7 +304,7 @@ private void mayCreateLogScanner() {
                 long logEndOffset =
                         logOffsetAndTimestamp == null
                                 ? UNKNOWN_BUCKET_OFFSET
-                                // logEngOffset is equal to offset tiered + 1
+                                // logEndOffset is equal to offset tiered + 1
                                 : logOffsetAndTimestamp.logOffset + 1;
                 long timestamp =
                         logOffsetAndTimestamp == null

@@ -309,15 +313,26 @@ private void mayCreateLogScanner() {
                 TableBucketWriteResult<WriteResult> bucketWriteResult =
                         completeLakeWriter(
                                 bucket, split.getPartitionName(), logEndOffset, timestamp);
+
+                if (logEndOffset == UNKNOWN_BUCKET_OFFSET) {
+                    // when the log end offset is unknown, the write result must be
+                    // null, otherwise, we should throw exception directly to avoid data
+                    // inconsistent
+                    checkState(
+                            bucketWriteResult.writeResult() == null,
+                            "bucketWriteResult must be null when log end offset is unknown when tiering "
+                                    + split);
+                }
+
                 writeResults.put(bucket, bucketWriteResult);
                 finishedSplitIds.put(bucket, split.splitId());
                 LOG.info(
-                        "Split {} is forced to be finished due to tiering timeout.",
+                        "Split {} is forced to be finished due to tiering reach max duration.",
                         split.splitId());
                 currentTieringSplitsIterator.remove();
             }
         }
-        reachTieringDeadlineTables.remove(this.currentTableId);
+        reachTieringMaxDurationTables.remove(this.currentTableId);
         mayFinishCurrentTable();
         return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds);
     }

@@ -530,13 +545,13 @@ private void finishCurrentTable() throws IOException {
     }
 
     /**
-     * Handle a table reach tiered deadline. This will mark the current table as timed out, and it
-     * will be force completed in the next fetch cycle.
+     * Handle a table reach max tiering duration. This will mark the current table as reaching max
+     * duration, and it will be force completed in the next fetch cycle.
      */
-    public void handleTableReachTieringDeadline(long tableId) {
-        if ((currentTableId != null && currentTableId.equals(tableId)
-                || pendingTieringSplits.containsKey(tableId))) {
-            reachTieringDeadlineTables.add(tableId);
+    public void handleTableReachTieringMaxDuration(long tableId) {
+        if ((currentTableId != null && currentTableId.equals(tableId))
+                || pendingTieringSplits.containsKey(tableId)) {
+            reachTieringMaxDurationTables.add(tableId);
         }
     }

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringLogSplit.java

Lines changed: 1 addition & 1 deletion
@@ -72,7 +72,7 @@ public TieringLogSplit(
             long stoppingOffset,
             boolean forceIgnore,
             int numberOfSplits) {
-        super(tablePath, tableBucket, partitionName, numberOfSplits, forceIgnore);
+        super(tablePath, tableBucket, partitionName, forceIgnore, numberOfSplits);
         this.startingOffset = startingOffset;
        this.stoppingOffset = stoppingOffset;
     }

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSnapshotSplit.java

Lines changed: 1 addition & 1 deletion
@@ -63,7 +63,7 @@ public TieringSnapshotSplit(
             long logOffsetOfSnapshot,
             int numberOfSplits,
             boolean forceIgnore) {
-        super(tablePath, tableBucket, partitionName, numberOfSplits, forceIgnore);
+        super(tablePath, tableBucket, partitionName, forceIgnore, numberOfSplits);
         this.snapshotId = snapshotId;
         this.logOffsetOfSnapshot = logOffsetOfSnapshot;
     }

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java

Lines changed: 6 additions & 6 deletions
@@ -38,17 +38,16 @@ public abstract class TieringSplit implements SourceSplit {
     protected final TableBucket tableBucket;
     @Nullable protected final String partitionName;
 
+    protected boolean forceIgnore;
     // the total number of splits in one round of tiering
     protected final int numberOfSplits;
 
-    protected boolean forceIgnore;
-
     public TieringSplit(
             TablePath tablePath,
             TableBucket tableBucket,
             @Nullable String partitionName,
-            int numberOfSplits,
-            boolean forceIgnore) {
+            boolean forceIgnore,
+            int numberOfSplits) {
         this.tablePath = tablePath;
         this.tableBucket = tableBucket;
         this.partitionName = partitionName;

@@ -57,8 +56,8 @@ public TieringSplit(
             throw new IllegalArgumentException(
                     "Partition name and partition id must be both null or both not null.");
         }
-        this.numberOfSplits = numberOfSplits;
         this.forceIgnore = forceIgnore;
+        this.numberOfSplits = numberOfSplits;
     }
 
     /** Checks whether this split is a primary key table split to tier. */

@@ -140,11 +139,12 @@ public boolean equals(Object object) {
         return Objects.equals(tablePath, that.tablePath)
                 && Objects.equals(tableBucket, that.tableBucket)
                 && Objects.equals(partitionName, that.partitionName)
+                && forceIgnore == that.forceIgnore
                 && numberOfSplits == that.numberOfSplits;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(tablePath, tableBucket, partitionName, numberOfSplits);
+        return Objects.hash(tablePath, tableBucket, partitionName, forceIgnore, numberOfSplits);
     }
 }
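
Note: with forceIgnore now included in equals() and hashCode(), two splits that differ only in the force-ignore flag are no longer treated as identical in comparisons or hash-based collections, and the constructor argument order (forceIgnore before numberOfSplits) is now consistent between TieringSplit and its TieringLogSplit and TieringSnapshotSplit subclasses.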
