Skip to content

Commit 15bc8f8

Browse files
committed
[lake] support lake tiering service force to commit due to timeout
1 parent b2b3691 commit 15bc8f8

File tree

12 files changed

+402
-91
lines changed

12 files changed

+402
-91
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,28 +50,31 @@ public TieringSourceFetcherManager(
5050
super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook);
5151
}
5252

53-
public void markTableAsTieringTimeOut(long tableId) {
54-
SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher =
55-
fetchers.get(0);
56-
if (splitFetcher != null) {
53+
public void markTableReachTieringDeadline(long tableId) {
54+
if (!fetchers.isEmpty()) {
5755
// The fetcher thread is still running. This should be the majority of the cases.
58-
enqueueMarkTableTieringTimeOutTask(splitFetcher, tableId);
56+
fetchers.values()
57+
.forEach(
58+
splitFetcher ->
59+
enqueueMarkTableReachTieringDeadlineTask(
60+
splitFetcher, tableId));
5961
} else {
60-
splitFetcher = createSplitFetcher();
61-
enqueueMarkTableTieringTimeOutTask(splitFetcher, tableId);
62+
SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher =
63+
createSplitFetcher();
64+
enqueueMarkTableReachTieringDeadlineTask(splitFetcher, tableId);
6265
startFetcher(splitFetcher);
6366
}
6467
}
6568

66-
private void enqueueMarkTableTieringTimeOutTask(
69+
private void enqueueMarkTableReachTieringDeadlineTask(
6770
SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher,
68-
long tieringTimeOutTable) {
71+
long reachTieringDeadlineTable) {
6972
splitFetcher.enqueueTask(
7073
new SplitFetcherTask() {
7174
@Override
7275
public boolean run() {
7376
((TieringSplitReader<WriteResult>) splitFetcher.getSplitReader())
74-
.handleTableTimeout(tieringTimeOutTable);
77+
.handleTableReachTieringDeadline(reachTieringDeadlineTable);
7578
return true;
7679
}
7780

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.flink.tiering.source;
1919

2020
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.annotation.VisibleForTesting;
2122
import org.apache.fluss.client.Connection;
2223
import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter;
2324
import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
@@ -31,10 +32,13 @@
3132
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
3233
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
3334

35+
import java.time.Duration;
3436
import java.util.Collections;
3537
import java.util.List;
3638
import java.util.Map;
3739

40+
import static org.apache.fluss.flink.tiering.source.TieringSplitReader.DEFAULT_POLL_TIMEOUT;
41+
3842
/** A {@link SourceReader} that read records from Fluss and write to lake. */
3943
@Internal
4044
public final class TieringSourceReader<WriteResult>
@@ -52,11 +56,22 @@ public TieringSourceReader(
5256
SourceReaderContext context,
5357
Connection connection,
5458
LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
59+
this(elementsQueue, context, connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT);
60+
}
61+
62+
@VisibleForTesting
63+
TieringSourceReader(
64+
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
65+
elementsQueue,
66+
SourceReaderContext context,
67+
Connection connection,
68+
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
69+
Duration pollTimeout) {
5570
super(
5671
elementsQueue,
5772
new TieringSourceFetcherManager<>(
5873
elementsQueue,
59-
() -> new TieringSplitReader<>(connection, lakeTieringFactory),
74+
() -> new TieringSplitReader<>(connection, lakeTieringFactory, pollTimeout),
6075
context.getConfiguration(),
6176
(ignore) -> {}),
6277
new TableBucketWriteResultEmitter<>(),
@@ -107,7 +122,7 @@ public void handleSourceEvents(SourceEvent sourceEvent) {
107122
(TieringReachMaxDurationEvent) sourceEvent;
108123
long tableId = reachMaxDurationEvent.getTableId();
109124
((TieringSourceFetcherManager<WriteResult>) splitFetcherManager)
110-
.markTableAsTieringTimeOut(tableId);
125+
.markTableReachTieringDeadline(tableId);
111126
}
112127
}
113128

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 64 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -64,20 +64,25 @@ public class TieringSplitReader<WriteResult>
6464

6565
private static final Logger LOG = LoggerFactory.getLogger(TieringSplitReader.class);
6666

67-
private static final Duration POLL_TIMEOUT = Duration.ofMillis(10000L);
67+
public static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(10_000L);
6868

6969
// unknown bucket timestamp for empty split or snapshot split
7070
private static final long UNKNOWN_BUCKET_TIMESTAMP = -1;
7171

72-
private static final long UNKNOW_BUCKET_OFFSET = -1;
72+
// unknown bucket offset for empty split or snapshot split
73+
private static final long UNKNOWN_BUCKET_OFFSET = -1;
7374

7475
private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
7576

77+
private final Duration pollTimeout;
78+
7679
// the id for the pending tables to be tiered
7780
private final Queue<Long> pendingTieringTables;
7881
// the table_id to the pending splits
7982
private final Map<Long, Set<TieringSplit>> pendingTieringSplits;
8083

84+
private final Set<Long> reachTieringDeadlineTables;
85+
8186
private final Map<TableBucket, LakeWriter<WriteResult>> lakeWriters;
8287
private final Connection connection;
8388

@@ -99,11 +104,15 @@ public class TieringSplitReader<WriteResult>
99104

100105
private final Set<TieringSplit> currentEmptySplits;
101106

102-
// Flag to indicate if the current table has timed out and should be force completed
103-
private boolean currentTableTieringTimedOut;
104-
105107
public TieringSplitReader(
106108
Connection connection, LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
109+
this(connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT);
110+
}
111+
112+
protected TieringSplitReader(
113+
Connection connection,
114+
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
115+
Duration pollTimeout) {
107116
this.lakeTieringFactory = lakeTieringFactory;
108117
// owned by TieringSourceReader
109118
this.connection = connection;
@@ -115,7 +124,8 @@ public TieringSplitReader(
115124
this.currentTableSplitsByBucket = new HashMap<>();
116125
this.lakeWriters = new HashMap<>();
117126
this.currentPendingSnapshotSplits = new ArrayDeque<>();
118-
this.currentTableTieringTimedOut = false;
127+
this.reachTieringDeadlineTables = new HashSet<>();
128+
this.pollTimeout = pollTimeout;
119129
}
120130

121131
@Override
@@ -147,10 +157,10 @@ public RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> fetch() throws I
147157
}
148158
} else {
149159
if (currentLogScanner != null) {
150-
if (currentTableTieringTimedOut) {
160+
if (reachTieringDeadlineTables.contains(currentTableId)) {
151161
return forceCompleteTieringLogRecords();
152162
}
153-
ScanRecords scanRecords = currentLogScanner.poll(POLL_TIMEOUT);
163+
ScanRecords scanRecords = currentLogScanner.poll(pollTimeout);
154164
// force to complete records
155165
return forLogRecords(scanRecords);
156166
} else {
@@ -170,6 +180,8 @@ public void handleSplitsChanges(SplitsChange<TieringSplit> splitsChange) {
170180
for (TieringSplit split : splitsChange.splits()) {
171181
LOG.info("add split {}", split.splitId());
172182
if (split.isForceIgnore()) {
183+
// if the split is forced to be ignored,
184+
// mark it as empty
173185
currentEmptySplits.add(split);
174186
continue;
175187
}
@@ -273,25 +285,40 @@ private void mayCreateLogScanner() {
273285
forceCompleteTieringLogRecords() throws IOException {
274286
Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
275287
Map<TableBucket, String> finishedSplitIds = new HashMap<>();
276-
for (Map.Entry<TableBucket, LakeWriter<WriteResult>> entry : lakeWriters.entrySet()) {
288+
289+
// force finish all splits
290+
Iterator<Map.Entry<TableBucket, TieringSplit>> currentTieringSplitsIterator =
291+
currentTableSplitsByBucket.entrySet().iterator();
292+
while (currentTieringSplitsIterator.hasNext()) {
293+
Map.Entry<TableBucket, TieringSplit> entry = currentTieringSplitsIterator.next();
277294
TableBucket bucket = entry.getKey();
278-
TieringSplit split = currentTableSplitsByBucket.get(bucket);
295+
TieringSplit split = entry.getValue();
279296
if (split != null && split.isTieringLogSplit()) {
297+
// get the current offset, timestamp that tiered so far
280298
LogOffsetAndTimestamp logOffsetAndTimestamp =
281299
currentTableTieredOffsetAndTimestamp.get(bucket);
300+
long logEndOffset =
301+
logOffsetAndTimestamp == null
302+
? UNKNOWN_BUCKET_OFFSET
303+
// logEndOffset is equal to offset tiered + 1
304+
: logOffsetAndTimestamp.logOffset + 1;
305+
long timestamp =
306+
logOffsetAndTimestamp == null
307+
? UNKNOWN_BUCKET_TIMESTAMP
308+
: logOffsetAndTimestamp.timestamp;
282309
TableBucketWriteResult<WriteResult> bucketWriteResult =
283310
completeLakeWriter(
284-
bucket,
285-
split.getPartitionName(),
286-
logOffsetAndTimestamp.logOffset,
287-
logOffsetAndTimestamp.timestamp);
311+
bucket, split.getPartitionName(), logEndOffset, timestamp);
288312
writeResults.put(bucket, bucketWriteResult);
289313
finishedSplitIds.put(bucket, split.splitId());
290314
LOG.info(
291315
"Split {} is forced to be finished due to tiering timeout.",
292316
split.splitId());
317+
currentTieringSplitsIterator.remove();
293318
}
294319
}
320+
reachTieringDeadlineTables.remove(this.currentTableId);
321+
mayFinishCurrentTable();
295322
return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds);
296323
}
297324

@@ -343,7 +370,11 @@ private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> forLogRecords(
343370
lastRecord.timestamp()));
344371
// put split of the bucket
345372
finishedSplitIds.put(bucket, currentSplitId);
346-
LOG.info("Split {} has been finished.", currentSplitId);
373+
LOG.info(
374+
"Finish tier bucket {} for table {}, split: {}.",
375+
bucket,
376+
currentTablePath,
377+
currentSplitId);
347378
}
348379
}
349380

@@ -377,8 +408,11 @@ private TableBucketWriteResult<WriteResult> completeLakeWriter(
377408
long maxTimestamp)
378409
throws IOException {
379410
LakeWriter<WriteResult> lakeWriter = lakeWriters.remove(bucket);
380-
WriteResult writeResult = lakeWriter.complete();
381-
lakeWriter.close();
411+
WriteResult writeResult = null;
412+
if (lakeWriter != null) {
413+
writeResult = lakeWriter.complete();
414+
lakeWriter.close();
415+
}
382416
return toTableBucketWriteResult(
383417
currentTablePath,
384418
bucket,
@@ -402,7 +436,7 @@ private TableBucketWriteResultWithSplitIds forEmptySplits(Set<TieringSplit> empt
402436
tableBucket,
403437
tieringSplit.getPartitionName(),
404438
null,
405-
UNKNOW_BUCKET_OFFSET,
439+
UNKNOWN_BUCKET_OFFSET,
406440
UNKNOWN_BUCKET_TIMESTAMP,
407441
tieringSplit.getNumberOfSplits()));
408442
}
@@ -412,7 +446,6 @@ private TableBucketWriteResultWithSplitIds forEmptySplits(Set<TieringSplit> empt
412446
private void mayFinishCurrentTable() throws IOException {
413447
// no any pending splits for the table, just finish the table
414448
if (currentTableSplitsByBucket.isEmpty()) {
415-
LOG.info("Finish tier table {} of table id {}.", currentTablePath, currentTableId);
416449
finishCurrentTable();
417450
}
418451
}
@@ -427,6 +460,11 @@ private TableBucketWriteResultWithSplitIds finishCurrentSnapshotSplit() throws I
427460
currentSnapshotSplit.getPartitionName(),
428461
logEndOffset,
429462
UNKNOWN_BUCKET_TIMESTAMP);
463+
LOG.info(
464+
"Finish tier bucket {} for table {}, split: {}.",
465+
tableBucket,
466+
currentTablePath,
467+
splitId);
430468
closeCurrentSnapshotSplit();
431469
mayFinishCurrentTable();
432470
return new TableBucketWriteResultWithSplitIds(
@@ -492,15 +530,13 @@ private void finishCurrentTable() throws IOException {
492530
}
493531

494532
/**
495-
* Handle timeout event for a table. This will mark the current table as timed out, and it will
496-
* be force completed in the next fetch cycle.
533+
* Handle a table reaching its tiering deadline. This will mark the current table as timed out, and it
534+
* will be force completed in the next fetch cycle.
497535
*/
498-
public void handleTableTimeout(long tableId) {
499-
if (currentTableId != null && currentTableId.equals(tableId)) {
500-
LOG.debug(
501-
"Table {} tiering timeout event received, will try best to force complete after current processing.",
502-
tableId);
503-
currentTableTieringTimedOut = true;
536+
public void handleTableReachTieringDeadline(long tableId) {
537+
if ((currentTableId != null && currentTableId.equals(tableId)
538+
|| pendingTieringSplits.containsKey(tableId))) {
539+
reachTieringDeadlineTables.add(tableId);
504540
}
505541
}
506542

@@ -624,6 +660,7 @@ public Set<String> finishedSplits() {
624660
}
625661

626662
private static final class LogOffsetAndTimestamp {
663+
627664
private final long logOffset;
628665
private final long timestamp;
629666

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,6 @@
1717

1818
package org.apache.fluss.flink.tiering.source.enumerator;
1919

20-
import org.apache.flink.api.connector.source.SourceEvent;
21-
import org.apache.flink.api.connector.source.SplitEnumerator;
22-
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
23-
import org.apache.flink.api.java.tuple.Tuple3;
24-
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
25-
import org.apache.flink.util.FlinkRuntimeException;
2620
import org.apache.fluss.annotation.VisibleForTesting;
2721
import org.apache.fluss.client.Connection;
2822
import org.apache.fluss.client.ConnectionFactory;
@@ -49,10 +43,18 @@
4943
import org.apache.fluss.utils.MapUtils;
5044
import org.apache.fluss.utils.clock.Clock;
5145
import org.apache.fluss.utils.clock.SystemClock;
46+
47+
import org.apache.flink.api.connector.source.SourceEvent;
48+
import org.apache.flink.api.connector.source.SplitEnumerator;
49+
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
50+
import org.apache.flink.api.java.tuple.Tuple3;
51+
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
52+
import org.apache.flink.util.FlinkRuntimeException;
5253
import org.slf4j.Logger;
5354
import org.slf4j.LoggerFactory;
5455

5556
import javax.annotation.Nullable;
57+
5658
import java.io.IOException;
5759
import java.util.ArrayList;
5860
import java.util.Collections;
@@ -347,11 +349,6 @@ private void assignSplits() {
347349
if (!pendingSplits.isEmpty()) {
348350
TieringSplit tieringSplit = pendingSplits.remove(0);
349351
context.assignSplit(tieringSplit, nextAwaitingReader);
350-
long tableId = tieringSplit.getTableBucket().getTableId();
351-
if (!tieringTablesDeadline.containsKey(tableId)) {
352-
tieringTablesDeadline.put(
353-
tableId, clock.milliseconds() + tieringTableDurationMaxMs);
354-
}
355352
readersAwaitingSplit.remove(nextAwaitingReader);
356353
}
357354
}
@@ -432,6 +429,8 @@ private void generateTieringSplits(Tuple3<Long, Long, TablePath> tieringTable)
432429
} else {
433430
tieringTableEpochs.put(tieringTable.f0, tieringTable.f1);
434431
pendingSplits.addAll(tieringSplits);
432+
tieringTablesDeadline.put(
433+
tieringTable.f0, clock.milliseconds() + tieringTableDurationMaxMs);
435434
}
436435
} catch (Exception e) {
437436
LOG.warn("Fail to generate Tiering splits for table {}.", tieringTable.f2, e);

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringLogSplit.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,25 @@ public TieringLogSplit(
5454
long startingOffset,
5555
long stoppingOffset,
5656
int numberOfSplits) {
57-
super(tablePath, tableBucket, partitionName, numberOfSplits);
57+
this(
58+
tablePath,
59+
tableBucket,
60+
partitionName,
61+
startingOffset,
62+
stoppingOffset,
63+
false,
64+
numberOfSplits);
65+
}
66+
67+
public TieringLogSplit(
68+
TablePath tablePath,
69+
TableBucket tableBucket,
70+
@Nullable String partitionName,
71+
long startingOffset,
72+
long stoppingOffset,
73+
boolean forceIgnore,
74+
int numberOfSplits) {
75+
super(tablePath, tableBucket, partitionName, numberOfSplits, forceIgnore);
5876
this.startingOffset = startingOffset;
5977
this.stoppingOffset = stoppingOffset;
6078
}
@@ -101,6 +119,7 @@ public TieringLogSplit copy(int numberOfSplits) {
101119
partitionName,
102120
startingOffset,
103121
stoppingOffset,
122+
forceIgnore,
104123
numberOfSplits);
105124
}
106125

0 commit comments

Comments
 (0)