Skip to content

Commit 46c293e

Browse files
committed
[lake] Support forcing the lake tiering service to commit on timeout
1 parent 06fcd47 commit 46c293e

File tree

11 files changed

+379
-54
lines changed

11 files changed

+379
-54
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,15 @@ public TieringSourceFetcherManager(
5151
}
5252

5353
public void markTableAsTieringTimeOut(long tableId) {
54-
SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher =
55-
fetchers.get(0);
56-
if (splitFetcher != null) {
54+
if (!fetchers.isEmpty()) {
5755
// The fetcher thread is still running. This should be the majority of the cases.
58-
enqueueMarkTableTieringTimeOutTask(splitFetcher, tableId);
56+
fetchers.values()
57+
.forEach(
58+
splitFetcher ->
59+
enqueueMarkTableTieringTimeOutTask(splitFetcher, tableId));
5960
} else {
60-
splitFetcher = createSplitFetcher();
61+
SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher =
62+
createSplitFetcher();
6163
enqueueMarkTableTieringTimeOutTask(splitFetcher, tableId);
6264
startFetcher(splitFetcher);
6365
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.flink.tiering.source;
1919

2020
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.annotation.VisibleForTesting;
2122
import org.apache.fluss.client.Connection;
2223
import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter;
2324
import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
@@ -31,10 +32,13 @@
3132
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
3233
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
3334

35+
import java.time.Duration;
3436
import java.util.Collections;
3537
import java.util.List;
3638
import java.util.Map;
3739

40+
import static org.apache.fluss.flink.tiering.source.TieringSplitReader.DEFAULT_POLL_TIMEOUT;
41+
3842
/** A {@link SourceReader} that reads records from Fluss and writes them to the lake. */
3943
@Internal
4044
public final class TieringSourceReader<WriteResult>
@@ -52,11 +56,22 @@ public TieringSourceReader(
5256
SourceReaderContext context,
5357
Connection connection,
5458
LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
59+
this(elementsQueue, context, connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT);
60+
}
61+
62+
@VisibleForTesting
63+
TieringSourceReader(
64+
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
65+
elementsQueue,
66+
SourceReaderContext context,
67+
Connection connection,
68+
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
69+
Duration pollTimeout) {
5570
super(
5671
elementsQueue,
5772
new TieringSourceFetcherManager<>(
5873
elementsQueue,
59-
() -> new TieringSplitReader<>(connection, lakeTieringFactory),
74+
() -> new TieringSplitReader<>(connection, lakeTieringFactory, pollTimeout),
6075
context.getConfiguration(),
6176
(ignore) -> {}),
6277
new TableBucketWriteResultEmitter<>(),

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 52 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class TieringSplitReader<WriteResult>
6464

6565
private static final Logger LOG = LoggerFactory.getLogger(TieringSplitReader.class);
6666

67-
private static final Duration POLL_TIMEOUT = Duration.ofMillis(10000L);
67+
public static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(10_000L);
6868

6969
// unknown bucket timestamp for empty split or snapshot split
7070
private static final long UNKNOWN_BUCKET_TIMESTAMP = -1;
@@ -73,11 +73,15 @@ public class TieringSplitReader<WriteResult>
7373

7474
private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
7575

76+
private final Duration pollTimeout;
77+
7678
// the ids of the pending tables to be tiered
7779
private final Queue<Long> pendingTieringTables;
7880
// maps each table_id to its pending splits
7981
private final Map<Long, Set<TieringSplit>> pendingTieringSplits;
8082

83+
private final Set<Long> timeoutTables;
84+
8185
private final Map<TableBucket, LakeWriter<WriteResult>> lakeWriters;
8286
private final Connection connection;
8387

@@ -99,11 +103,15 @@ public class TieringSplitReader<WriteResult>
99103

100104
private final Set<TieringSplit> currentEmptySplits;
101105

102-
// Flag to indicate if the current table has timed out and should be force completed
103-
private boolean currentTableTieringTimedOut;
104-
105106
public TieringSplitReader(
106107
Connection connection, LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
108+
this(connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT);
109+
}
110+
111+
protected TieringSplitReader(
112+
Connection connection,
113+
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
114+
Duration pollTimeout) {
107115
this.lakeTieringFactory = lakeTieringFactory;
108116
// owned by TieringSourceReader
109117
this.connection = connection;
@@ -115,7 +123,8 @@ public TieringSplitReader(
115123
this.currentTableSplitsByBucket = new HashMap<>();
116124
this.lakeWriters = new HashMap<>();
117125
this.currentPendingSnapshotSplits = new ArrayDeque<>();
118-
this.currentTableTieringTimedOut = false;
126+
this.timeoutTables = new HashSet<>();
127+
this.pollTimeout = pollTimeout;
119128
}
120129

121130
@Override
@@ -147,10 +156,10 @@ public RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> fetch() throws I
147156
}
148157
} else {
149158
if (currentLogScanner != null) {
150-
if (currentTableTieringTimedOut) {
159+
if (timeoutTables.contains(currentTableId)) {
151160
return forceCompleteTieringLogRecords();
152161
}
153-
ScanRecords scanRecords = currentLogScanner.poll(POLL_TIMEOUT);
162+
ScanRecords scanRecords = currentLogScanner.poll(pollTimeout);
154163
// force to complete records
155164
return forLogRecords(scanRecords);
156165
} else {
@@ -273,25 +282,37 @@ private void mayCreateLogScanner() {
273282
forceCompleteTieringLogRecords() throws IOException {
274283
Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
275284
Map<TableBucket, String> finishedSplitIds = new HashMap<>();
276-
for (Map.Entry<TableBucket, LakeWriter<WriteResult>> entry : lakeWriters.entrySet()) {
285+
286+
Iterator<Map.Entry<TableBucket, TieringSplit>> currentTieringSplitsIterator =
287+
currentTableSplitsByBucket.entrySet().iterator();
288+
while (currentTieringSplitsIterator.hasNext()) {
289+
Map.Entry<TableBucket, TieringSplit> entry = currentTieringSplitsIterator.next();
277290
TableBucket bucket = entry.getKey();
278-
TieringSplit split = currentTableSplitsByBucket.get(bucket);
291+
TieringSplit split = entry.getValue();
279292
if (split != null && split.isTieringLogSplit()) {
280293
LogOffsetAndTimestamp logOffsetAndTimestamp =
281294
currentTableTieredOffsetAndTimestamp.get(bucket);
295+
long logEndOffset =
296+
logOffsetAndTimestamp == null
297+
? UNKNOW_BUCKET_OFFSET
298+
: logOffsetAndTimestamp.logOffset + 1;
299+
long timestamp =
300+
logOffsetAndTimestamp == null
301+
? UNKNOWN_BUCKET_TIMESTAMP
302+
: logOffsetAndTimestamp.timestamp;
282303
TableBucketWriteResult<WriteResult> bucketWriteResult =
283304
completeLakeWriter(
284-
bucket,
285-
split.getPartitionName(),
286-
logOffsetAndTimestamp.logOffset,
287-
logOffsetAndTimestamp.timestamp);
305+
bucket, split.getPartitionName(), logEndOffset, timestamp);
288306
writeResults.put(bucket, bucketWriteResult);
289307
finishedSplitIds.put(bucket, split.splitId());
290308
LOG.info(
291309
"Split {} is forced to be finished due to tiering timeout.",
292310
split.splitId());
311+
currentTieringSplitsIterator.remove();
293312
}
294313
}
314+
timeoutTables.remove(this.currentTableId);
315+
mayFinishCurrentTable();
295316
return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds);
296317
}
297318

@@ -343,7 +364,11 @@ private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> forLogRecords(
343364
lastRecord.timestamp()));
344365
// put split of the bucket
345366
finishedSplitIds.put(bucket, currentSplitId);
346-
LOG.info("Split {} has been finished.", currentSplitId);
367+
LOG.info(
368+
"Finish tier bucket {} for table {}, split: {}.",
369+
bucket,
370+
currentTablePath,
371+
currentSplitId);
347372
}
348373
}
349374

@@ -377,8 +402,11 @@ private TableBucketWriteResult<WriteResult> completeLakeWriter(
377402
long maxTimestamp)
378403
throws IOException {
379404
LakeWriter<WriteResult> lakeWriter = lakeWriters.remove(bucket);
380-
WriteResult writeResult = lakeWriter.complete();
381-
lakeWriter.close();
405+
WriteResult writeResult = null;
406+
if (lakeWriter != null) {
407+
writeResult = lakeWriter.complete();
408+
lakeWriter.close();
409+
}
382410
return toTableBucketWriteResult(
383411
currentTablePath,
384412
bucket,
@@ -412,7 +440,6 @@ private TableBucketWriteResultWithSplitIds forEmptySplits(Set<TieringSplit> empt
412440
private void mayFinishCurrentTable() throws IOException {
413441
// no pending splits remain for the table, so just finish the table
414442
if (currentTableSplitsByBucket.isEmpty()) {
415-
LOG.info("Finish tier table {} of table id {}.", currentTablePath, currentTableId);
416443
finishCurrentTable();
417444
}
418445
}
@@ -427,6 +454,11 @@ private TableBucketWriteResultWithSplitIds finishCurrentSnapshotSplit() throws I
427454
currentSnapshotSplit.getPartitionName(),
428455
logEndOffset,
429456
UNKNOWN_BUCKET_TIMESTAMP);
457+
LOG.info(
458+
"Finish tier bucket {} for table {}, split: {}.",
459+
tableBucket,
460+
currentTablePath,
461+
splitId);
430462
closeCurrentSnapshotSplit();
431463
mayFinishCurrentTable();
432464
return new TableBucketWriteResultWithSplitIds(
@@ -496,11 +528,9 @@ private void finishCurrentTable() throws IOException {
496528
* be force completed in the next fetch cycle.
497529
*/
498530
public void handleTableTimeout(long tableId) {
499-
if (currentTableId != null && currentTableId.equals(tableId)) {
500-
LOG.debug(
501-
"Table {} tiering timeout event received, will try best to force complete after current processing.",
502-
tableId);
503-
currentTableTieringTimedOut = true;
531+
if ((currentTableId != null && currentTableId.equals(tableId)
532+
|| pendingTieringSplits.containsKey(tableId))) {
533+
timeoutTables.add(tableId);
504534
}
505535
}
506536

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,6 @@
1717

1818
package org.apache.fluss.flink.tiering.source.enumerator;
1919

20-
import org.apache.flink.api.connector.source.SourceEvent;
21-
import org.apache.flink.api.connector.source.SplitEnumerator;
22-
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
23-
import org.apache.flink.api.java.tuple.Tuple3;
24-
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
25-
import org.apache.flink.util.FlinkRuntimeException;
2620
import org.apache.fluss.annotation.VisibleForTesting;
2721
import org.apache.fluss.client.Connection;
2822
import org.apache.fluss.client.ConnectionFactory;
@@ -49,10 +43,18 @@
4943
import org.apache.fluss.utils.MapUtils;
5044
import org.apache.fluss.utils.clock.Clock;
5145
import org.apache.fluss.utils.clock.SystemClock;
46+
47+
import org.apache.flink.api.connector.source.SourceEvent;
48+
import org.apache.flink.api.connector.source.SplitEnumerator;
49+
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
50+
import org.apache.flink.api.java.tuple.Tuple3;
51+
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
52+
import org.apache.flink.util.FlinkRuntimeException;
5253
import org.slf4j.Logger;
5354
import org.slf4j.LoggerFactory;
5455

5556
import javax.annotation.Nullable;
57+
5658
import java.io.IOException;
5759
import java.util.ArrayList;
5860
import java.util.Collections;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringLogSplit.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,25 @@ public TieringLogSplit(
5454
long startingOffset,
5555
long stoppingOffset,
5656
int numberOfSplits) {
57-
super(tablePath, tableBucket, partitionName, numberOfSplits);
57+
this(
58+
tablePath,
59+
tableBucket,
60+
partitionName,
61+
startingOffset,
62+
stoppingOffset,
63+
numberOfSplits,
64+
false);
65+
}
66+
67+
public TieringLogSplit(
68+
TablePath tablePath,
69+
TableBucket tableBucket,
70+
@Nullable String partitionName,
71+
long startingOffset,
72+
long stoppingOffset,
73+
int numberOfSplits,
74+
boolean forceIgnore) {
75+
super(tablePath, tableBucket, partitionName, numberOfSplits, forceIgnore);
5876
this.startingOffset = startingOffset;
5977
this.stoppingOffset = stoppingOffset;
6078
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSnapshotSplit.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,16 @@ public TieringSnapshotSplit(
4343
TableBucket tableBucket,
4444
@Nullable String partitionName,
4545
long snapshotId,
46-
long logOffsetOfSnapshot) {
47-
super(tablePath, tableBucket, partitionName, UNKNOWN_NUMBER_OF_SPLITS);
48-
this.snapshotId = snapshotId;
49-
this.logOffsetOfSnapshot = logOffsetOfSnapshot;
46+
long logOffsetOfSnapshot,
47+
int numberOfSplits) {
48+
this(
49+
tablePath,
50+
tableBucket,
51+
partitionName,
52+
snapshotId,
53+
logOffsetOfSnapshot,
54+
numberOfSplits,
55+
false);
5056
}
5157

5258
public TieringSnapshotSplit(
@@ -55,8 +61,9 @@ public TieringSnapshotSplit(
5561
@Nullable String partitionName,
5662
long snapshotId,
5763
long logOffsetOfSnapshot,
58-
int numberOfSplits) {
59-
super(tablePath, tableBucket, partitionName, numberOfSplits);
64+
int numberOfSplits,
65+
boolean forceIgnore) {
66+
super(tablePath, tableBucket, partitionName, numberOfSplits, forceIgnore);
6067
this.snapshotId = snapshotId;
6168
this.logOffsetOfSnapshot = logOffsetOfSnapshot;
6269
}
@@ -103,7 +110,8 @@ public TieringSnapshotSplit copy(int numberOfSplits) {
103110
partitionName,
104111
snapshotId,
105112
logOffsetOfSnapshot,
106-
numberOfSplits);
113+
numberOfSplits,
114+
forceIgnore);
107115
}
108116

109117
@Override

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,14 @@ public abstract class TieringSplit implements SourceSplit {
4141
// the total number of splits in one round of tiering
4242
protected final int numberOfSplits;
4343

44-
protected boolean forceIgnore = false;
44+
protected boolean forceIgnore;
4545

4646
public TieringSplit(
4747
TablePath tablePath,
4848
TableBucket tableBucket,
4949
@Nullable String partitionName,
50-
int numberOfSplits) {
50+
int numberOfSplits,
51+
boolean forceIgnore) {
5152
this.tablePath = tablePath;
5253
this.tableBucket = tableBucket;
5354
this.partitionName = partitionName;
@@ -57,6 +58,7 @@ public TieringSplit(
5758
"Partition name and partition id must be both null or both not null.");
5859
}
5960
this.numberOfSplits = numberOfSplits;
61+
this.forceIgnore = forceIgnore;
6062
}
6163

6264
/** Checks whether this split is a primary key table split to tier. */

0 commit comments

Comments
 (0)