Skip to content

Commit e542d6d

Browse files
committed
tiering support commit by time
1 parent e91106c commit e542d6d

File tree

4 files changed

+180
-12
lines changed

4 files changed

+180
-12
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.tiering.event;
19+
20+
import org.apache.flink.api.connector.source.SourceEvent;
21+
22+
/** SourceEvent used to notify TieringSourceReader that a table has timed out and should be completed. */
23+
public class TieringTimeoutEvent implements SourceEvent {
24+
25+
private static final long serialVersionUID = 1L;
26+
27+
private final long tableId;
28+
29+
public TieringTimeoutEvent(long tableId) {
30+
this.tableId = tableId;
31+
}
32+
33+
public long getTableId() {
34+
return tableId;
35+
}
36+
}
37+

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919

2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.client.Connection;
22+
import org.apache.fluss.flink.tiering.event.TieringTimeoutEvent;
2223
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
2324
import org.apache.fluss.flink.tiering.source.state.TieringSplitState;
2425
import org.apache.fluss.lake.writer.LakeTieringFactory;
2526

27+
import org.apache.flink.api.connector.source.SourceEvent;
2628
import org.apache.flink.api.connector.source.SourceReader;
2729
import org.apache.flink.api.connector.source.SourceReaderContext;
2830
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
@@ -41,17 +43,27 @@ public final class TieringSourceReader<WriteResult>
4143
TieringSplitState> {
4244

4345
private final Connection connection;
46+
private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
47+
// Thread-local storage for split reader to handle timeout events
48+
private static final ThreadLocal<TieringSplitReader<?>> CURRENT_SPLIT_READER = new ThreadLocal<>();
4449

4550
public TieringSourceReader(
4651
SourceReaderContext context,
4752
Connection connection,
4853
LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
4954
super(
50-
() -> new TieringSplitReader<>(connection, lakeTieringFactory),
55+
() -> {
56+
TieringSplitReader<WriteResult> reader =
57+
new TieringSplitReader<>(connection, lakeTieringFactory);
58+
// Store reference in thread-local for timeout handling
59+
CURRENT_SPLIT_READER.set(reader);
60+
return reader;
61+
},
5162
new TableBucketWriteResultEmitter<>(),
5263
context.getConfiguration(),
5364
context);
5465
this.connection = connection;
66+
this.lakeTieringFactory = lakeTieringFactory;
5567
}
5668

5769
@Override
@@ -89,8 +101,20 @@ protected TieringSplit toSplitType(String splitId, TieringSplitState splitState)
89101
return splitState.toSourceSplit();
90102
}
91103

104+
@Override
105+
public void handleSourceEvents(SourceEvent sourceEvent) {
106+
if (sourceEvent instanceof TieringTimeoutEvent) {
107+
TieringTimeoutEvent timeoutEvent = (TieringTimeoutEvent) sourceEvent;
108+
TieringSplitReader<?> splitReader = CURRENT_SPLIT_READER.get();
109+
if (splitReader != null) {
110+
splitReader.handleTableTimeout(timeoutEvent.getTableId());
111+
}
112+
}
113+
}
114+
92115
@Override
93116
public void close() throws Exception {
117+
CURRENT_SPLIT_READER.remove();
94118
super.close();
95119
connection.close();
96120
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ public class TieringSplitReader<WriteResult>
9393
private final Map<TableBucket, TieringSplit> currentTableSplitsByBucket;
9494
private final Map<TableBucket, Long> currentTableStoppingOffsets;
9595
private final Set<TieringLogSplit> currentTableEmptyLogSplits;
96+
// Flag to indicate if the current table has timed out and should be force completed
97+
private boolean currentTableTimedOut;
9698

9799
public TieringSplitReader(
98100
Connection connection, LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
@@ -106,10 +108,24 @@ public TieringSplitReader(
106108
this.currentTableSplitsByBucket = new HashMap<>();
107109
this.lakeWriters = new HashMap<>();
108110
this.currentPendingSnapshotSplits = new ArrayDeque<>();
111+
this.currentTableTimedOut = false;
109112
}
110113

111114
@Override
112115
public RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> fetch() throws IOException {
116+
// Check if current table has timed out and should be force completed
117+
if (currentTableTimedOut && currentTableId != null) {
118+
LOG.warn(
119+
"Table {} has timed out, force completing. Remaining splits: {}",
120+
currentTableId,
121+
currentTableSplitsByBucket.size());
122+
mayFinishCurrentTable();
123+
// Return empty result after force completion
124+
if (currentTableId == null) {
125+
return emptyTableBucketWriteResultWithSplitIds();
126+
}
127+
}
128+
113129
// check empty splits
114130
if (!currentTableEmptyLogSplits.isEmpty()) {
115131
LOG.info("Empty split(s) {} finished.", currentTableEmptyLogSplits);
@@ -360,13 +376,54 @@ private TableBucketWriteResultWithSplitIds forEmptySplits(Set<TieringLogSplit> e
360376
}
361377

362378
private void mayFinishCurrentTable() throws IOException {
363-
// no any pending splits for the table, just finish the table
364-
if (currentTableSplitsByBucket.isEmpty()) {
365-
LOG.info("Finish tier table {} of table id {}.", currentTablePath, currentTableId);
379+
// Finish the table if:
380+
// 1. No pending splits, OR
381+
// 2. Table has timed out (force completion)
382+
if (currentTableSplitsByBucket.isEmpty() || currentTableTimedOut) {
383+
if (currentTableTimedOut) {
384+
LOG.warn(
385+
"Force finishing table {} of table id {} due to timeout. "
386+
+ "Remaining splits: {}",
387+
currentTablePath,
388+
currentTableId,
389+
currentTableSplitsByBucket.size());
390+
// Complete all remaining writers for buckets that have been processed
391+
forceCompleteRemainingWriters();
392+
} else {
393+
LOG.info("Finish tier table {} of table id {}.", currentTablePath, currentTableId);
394+
}
366395
finishCurrentTable();
367396
}
368397
}
369398

399+
/**
400+
* Force complete remaining lake writers for buckets that have been processed but splits are not
401+
* finished yet. This is called when a table times out.
402+
*/
403+
private void forceCompleteRemainingWriters() throws IOException {
404+
// Complete writers for all buckets that have been started but not yet completed
405+
// Create a copy of the entry set to avoid concurrent modification
406+
Set<Map.Entry<TableBucket, LakeWriter<WriteResult>>> writersToComplete =
407+
new HashSet<>(lakeWriters.entrySet());
408+
for (Map.Entry<TableBucket, LakeWriter<WriteResult>> entry : writersToComplete) {
409+
TableBucket bucket = entry.getKey();
410+
TieringSplit split = currentTableSplitsByBucket.get(bucket);
411+
if (split != null) {
412+
// This bucket has a pending split, complete the writer with current stopping offset
413+
Long stoppingOffset = currentTableStoppingOffsets.get(bucket);
414+
if (stoppingOffset == null) {
415+
// If no stopping offset, use a safe value (current log end or 0)
416+
stoppingOffset = 0L;
417+
}
418+
completeLakeWriter(
419+
bucket,
420+
split.getPartitionName(),
421+
stoppingOffset,
422+
UNKNOWN_BUCKET_TIMESTAMP);
423+
}
424+
}
425+
}
426+
370427
private TableBucketWriteResultWithSplitIds finishCurrentSnapshotSplit() throws IOException {
371428
TableBucket tableBucket = currentSnapshotSplit.getTableBucket();
372429
long logEndOffset = currentSnapshotSplit.getLogOffsetOfSnapshot();
@@ -439,6 +496,18 @@ private void finishCurrentTable() throws IOException {
439496
currentTableStoppingOffsets.clear();
440497
currentTableEmptyLogSplits.clear();
441498
currentTableSplitsByBucket.clear();
499+
currentTableTimedOut = false;
500+
}
501+
502+
/**
503+
* Handle timeout event for a table. This will mark the current table as timed out, and it will
504+
* be force completed in the next fetch cycle.
505+
*/
506+
public void handleTableTimeout(long tableId) {
507+
if (currentTableId != null && currentTableId.equals(tableId)) {
508+
LOG.warn("Table {} timeout event received, will force complete after current processing.", tableId);
509+
currentTableTimedOut = true;
510+
}
442511
}
443512

444513
@Override

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717

1818
package org.apache.fluss.flink.tiering.source.enumerator;
1919

20+
import org.apache.flink.api.connector.source.SourceEvent;
21+
import org.apache.flink.api.connector.source.SplitEnumerator;
22+
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
23+
import org.apache.flink.api.java.tuple.Tuple3;
24+
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
25+
import org.apache.flink.util.FlinkRuntimeException;
2026
import org.apache.fluss.annotation.VisibleForTesting;
2127
import org.apache.fluss.client.Connection;
2228
import org.apache.fluss.client.ConnectionFactory;
@@ -27,6 +33,7 @@
2733
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
2834
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
2935
import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
36+
import org.apache.fluss.flink.tiering.event.TieringTimeoutEvent;
3037
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
3138
import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator;
3239
import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState;
@@ -40,21 +47,16 @@
4047
import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo;
4148
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
4249
import org.apache.fluss.utils.MapUtils;
43-
44-
import org.apache.flink.api.connector.source.SourceEvent;
45-
import org.apache.flink.api.connector.source.SplitEnumerator;
46-
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
47-
import org.apache.flink.api.java.tuple.Tuple3;
48-
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
49-
import org.apache.flink.util.FlinkRuntimeException;
5050
import org.slf4j.Logger;
5151
import org.slf4j.LoggerFactory;
5252

5353
import javax.annotation.Nullable;
54-
5554
import java.io.IOException;
55+
import java.time.Duration;
5656
import java.util.ArrayList;
57+
import java.util.Collections;
5758
import java.util.HashMap;
59+
import java.util.HashSet;
5860
import java.util.List;
5961
import java.util.Map;
6062
import java.util.Set;
@@ -85,6 +87,8 @@
8587
public class TieringSourceEnumerator
8688
implements SplitEnumerator<TieringSplit, TieringSourceEnumeratorState> {
8789

90+
private static final long TIERING_TIMEOUT_MS = Duration.ofMinutes(10).toMillis();
91+
8892
private static final Logger LOG = LoggerFactory.getLogger(TieringSourceEnumerator.class);
8993

9094
private final Configuration flussConf;
@@ -93,6 +97,8 @@ public class TieringSourceEnumerator
9397
private final long pollTieringTableIntervalMs;
9498
private final List<TieringSplit> pendingSplits;
9599
private final Set<Integer> readersAwaitingSplit;
100+
101+
private final Map<Long, Long> tieringTableDeadline;
96102
private final Map<Long, Long> tieringTableEpochs;
97103
private final Map<Long, Long> failedTableEpochs;
98104
private final Map<Long, Long> finishedTableEpochs;
@@ -120,6 +126,7 @@ public TieringSourceEnumerator(
120126
this.tieringTableEpochs = MapUtils.newConcurrentHashMap();
121127
this.finishedTableEpochs = MapUtils.newConcurrentHashMap();
122128
this.failedTableEpochs = MapUtils.newConcurrentHashMap();
129+
this.tieringTableDeadline = MapUtils.newConcurrentHashMap();
123130
}
124131

125132
@Override
@@ -167,6 +174,9 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
167174
readersAwaitingSplit.add(subtaskId);
168175
this.context.callAsync(
169176
this::requestTieringTableSplitsViaHeartBeat, this::generateAndAssignSplits);
177+
178+
this.context.callAsync(
179+
this::checkTieringTimeoutTables, this::handleTieringTimeoutTables, 10_000L, 10_000);
170180
}
171181

172182
@Override
@@ -236,6 +246,26 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
236246
}
237247
}
238248

249+
private Set<Long> checkTieringTimeoutTables() {
250+
Set<Long> tieringTimeoutTables = new HashSet<>();
251+
252+
}
253+
254+
private void handleTieringTimeoutTables(Set<Long> tieringTimeOutTables, Throwable throwable) {
255+
if (throwable != null) {
256+
LOG.error("Fail to check tiering timeout tables.", throwable);
257+
return;
258+
}
259+
260+
for (Long tieringTimeOutTable : tieringTimeOutTables) {
261+
Set<Integer> readers = new HashSet<>(context.registeredReaders().keySet());
262+
for (int reader : readers) {
263+
context.sendEventToSourceReader(
264+
reader, new TieringTimeoutEvent(tieringTimeOutTable));
265+
}
266+
}
267+
}
268+
239269
private void generateAndAssignSplits(
240270
@Nullable Tuple3<Long, Long, TablePath> tieringTable, Throwable throwable) {
241271
if (throwable != null) {
@@ -260,6 +290,12 @@ private void assignSplits() {
260290
if (!pendingSplits.isEmpty()) {
261291
TieringSplit tieringSplit = pendingSplits.remove(0);
262292
context.assignSplit(tieringSplit, nextAwaitingReader);
293+
294+
long tableId = tieringSplit.getTableBucket().getTableId();
295+
if (!tieringTableDeadline.containsKey(tableId)) {
296+
tieringTableDeadline.put(
297+
tableId, System.currentTimeMillis() + TIERING_TIMEOUT_MS);
298+
}
263299
readersAwaitingSplit.remove(nextAwaitingReader);
264300
}
265301
}
@@ -324,6 +360,8 @@ private void generateTieringSplits(Tuple3<Long, Long, TablePath> tieringTable)
324360
List<TieringSplit> tieringSplits =
325361
populateNumberOfTieringSplits(
326362
splitGenerator.generateTableSplits(tieringTable.f2));
363+
// shuffle tiering split to avoid splits tiering skew
364+
Collections.shuffle(tieringSplits);
327365
LOG.info(
328366
"Generate Tiering {} splits for table {} with cost {}ms.",
329367
tieringSplits.size(),

0 commit comments

Comments
 (0)