Skip to content

Commit 06fcd47

Browse files
committed
[lake] support lake tiering service force to commit due to timeout
1 parent e542d6d commit 06fcd47

File tree

13 files changed

+590
-242
lines changed

13 files changed

+590
-242
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636

3737
import static org.apache.fluss.flink.tiering.source.TieringSource.TIERING_SOURCE_TRANSFORMATION_UID;
3838
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
39+
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_DETECT_INTERVAL;
40+
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_MAX;
3941
import static org.apache.fluss.utils.Preconditions.checkNotNull;
4042

4143
/** The builder to build Flink lake tiering job. */
@@ -84,6 +86,17 @@ public JobClient build() throws Exception {
8486
tieringSourceBuilder.withPollTieringTableIntervalMs(
8587
flussConfig.get(POLL_TIERING_TABLE_INTERVAL).toMillis());
8688
}
89+
90+
if (flussConfig.get(TIERING_TABLE_DURATION_MAX) != null) {
91+
tieringSourceBuilder.withTieringTableDurationMax(
92+
flussConfig.get(TIERING_TABLE_DURATION_MAX).toMillis());
93+
}
94+
95+
if (flussConfig.get(TIERING_TABLE_DURATION_DETECT_INTERVAL) != null) {
96+
tieringSourceBuilder.withTieringTableDurationDetectInterval(
97+
flussConfig.get(TIERING_TABLE_DURATION_DETECT_INTERVAL).toMillis());
98+
}
99+
87100
TieringSource<?> tieringSource = tieringSourceBuilder.build();
88101
DataStreamSource<?> source =
89102
env.fromSource(

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringTimeoutEvent.java renamed to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringReachMaxDurationEvent.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,40 @@
1919

2020
import org.apache.flink.api.connector.source.SourceEvent;
2121

22-
/** SourceEvent used to notify TieringSourceReader that a table has timed out and should be completed. */
23-
public class TieringTimeoutEvent implements SourceEvent {
22+
import java.util.Objects;
23+
24+
/**
25+
* SourceEvent used to notify TieringSourceReader that a table has reached the maximum tiering
26+
* duration and should be force completed.
27+
*/
28+
public class TieringReachMaxDurationEvent implements SourceEvent {
2429

2530
private static final long serialVersionUID = 1L;
2631

2732
private final long tableId;
2833

29-
public TieringTimeoutEvent(long tableId) {
34+
public TieringReachMaxDurationEvent(long tableId) {
3035
this.tableId = tableId;
3136
}
3237

3338
public long getTableId() {
3439
return tableId;
3540
}
36-
}
3741

42+
@Override
43+
public boolean equals(Object o) {
44+
if (this == o) {
45+
return true;
46+
}
47+
if (!(o instanceof TieringReachMaxDurationEvent)) {
48+
return false;
49+
}
50+
TieringReachMaxDurationEvent that = (TieringReachMaxDurationEvent) o;
51+
return tableId == that.tableId;
52+
}
53+
54+
@Override
55+
public int hashCode() {
56+
return Objects.hashCode(tableId);
57+
}
58+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,17 @@
3636
import org.apache.flink.api.connector.source.SourceReaderContext;
3737
import org.apache.flink.api.connector.source.SplitEnumerator;
3838
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
39+
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
40+
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
3941
import org.apache.flink.core.io.SimpleVersionedSerializer;
4042
import org.apache.flink.runtime.jobgraph.OperatorID;
4143
import org.apache.flink.streaming.api.graph.StreamGraphHasherV2;
4244

4345
import java.nio.charset.StandardCharsets;
4446

4547
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
48+
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_DETECT_INTERVAL;
49+
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TIERING_TABLE_DURATION_MAX;
4650

4751
/**
4852
* The flink source implementation for tiering data from Fluss to downstream lake.
@@ -61,14 +65,20 @@ public class TieringSource<WriteResult>
6165
private final Configuration flussConf;
6266
private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
6367
private final long pollTieringTableIntervalMs;
68+
private final long tieringTableDurationMaxMs;
69+
private final long tieringTableDurationDetectIntervalMs;
6470

6571
public TieringSource(
6672
Configuration flussConf,
6773
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
68-
long pollTieringTableIntervalMs) {
74+
long pollTieringTableIntervalMs,
75+
long tieringTableDurationMaxMs,
76+
long tieringTableDurationDetectIntervalMs) {
6977
this.flussConf = flussConf;
7078
this.lakeTieringFactory = lakeTieringFactory;
7179
this.pollTieringTableIntervalMs = pollTieringTableIntervalMs;
80+
this.tieringTableDurationMaxMs = tieringTableDurationMaxMs;
81+
this.tieringTableDurationDetectIntervalMs = tieringTableDurationDetectIntervalMs;
7282
}
7383

7484
@Override
@@ -78,19 +88,26 @@ public Boundedness getBoundedness() {
7888

7989
@Override
8090
public SplitEnumerator<TieringSplit, TieringSourceEnumeratorState> createEnumerator(
81-
SplitEnumeratorContext<TieringSplit> splitEnumeratorContext) throws Exception {
91+
SplitEnumeratorContext<TieringSplit> splitEnumeratorContext) {
8292
return new TieringSourceEnumerator(
83-
flussConf, splitEnumeratorContext, pollTieringTableIntervalMs);
93+
flussConf,
94+
splitEnumeratorContext,
95+
pollTieringTableIntervalMs,
96+
tieringTableDurationMaxMs,
97+
tieringTableDurationDetectIntervalMs);
8498
}
8599

86100
@Override
87101
public SplitEnumerator<TieringSplit, TieringSourceEnumeratorState> restoreEnumerator(
88102
SplitEnumeratorContext<TieringSplit> splitEnumeratorContext,
89-
TieringSourceEnumeratorState tieringSourceEnumeratorState)
90-
throws Exception {
103+
TieringSourceEnumeratorState tieringSourceEnumeratorState) {
91104
// stateless operator
92105
return new TieringSourceEnumerator(
93-
flussConf, splitEnumeratorContext, pollTieringTableIntervalMs);
106+
flussConf,
107+
splitEnumeratorContext,
108+
pollTieringTableIntervalMs,
109+
tieringTableDurationMaxMs,
110+
tieringTableDurationDetectIntervalMs);
94111
}
95112

96113
@Override
@@ -107,8 +124,11 @@ public SimpleVersionedSerializer<TieringSplit> getSplitSerializer() {
107124
@Override
108125
public SourceReader<TableBucketWriteResult<WriteResult>, TieringSplit> createReader(
109126
SourceReaderContext sourceReaderContext) {
127+
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
128+
elementsQueue = new FutureCompletingBlockingQueue<>();
110129
Connection connection = ConnectionFactory.createConnection(flussConf);
111-
return new TieringSourceReader<>(sourceReaderContext, connection, lakeTieringFactory);
130+
return new TieringSourceReader<>(
131+
elementsQueue, sourceReaderContext, connection, lakeTieringFactory);
112132
}
113133

114134
/** This follows the operator uid hash generation logic of flink {@link StreamGraphHasherV2}. */
@@ -126,6 +146,10 @@ public static class Builder<WriteResult> {
126146
private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
127147
private long pollTieringTableIntervalMs =
128148
POLL_TIERING_TABLE_INTERVAL.defaultValue().toMillis();
149+
private long tieringTableDurationMaxMs =
150+
TIERING_TABLE_DURATION_MAX.defaultValue().toMillis();
151+
private long tieringTableDurationDetectIntervalMs =
152+
TIERING_TABLE_DURATION_DETECT_INTERVAL.defaultValue().toMillis();
129153

130154
public Builder(
131155
Configuration flussConf, LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
@@ -138,8 +162,24 @@ public Builder<WriteResult> withPollTieringTableIntervalMs(long pollTieringTable
138162
return this;
139163
}
140164

165+
public Builder<WriteResult> withTieringTableDurationMax(long tieringTableDurationMaxMs) {
166+
this.tieringTableDurationMaxMs = tieringTableDurationMaxMs;
167+
return this;
168+
}
169+
170+
public Builder<WriteResult> withTieringTableDurationDetectInterval(
171+
long tieringTableDurationDetectIntervalMs) {
172+
this.tieringTableDurationDetectIntervalMs = tieringTableDurationDetectIntervalMs;
173+
return this;
174+
}
175+
141176
public TieringSource<WriteResult> build() {
142-
return new TieringSource<>(flussConf, lakeTieringFactory, pollTieringTableIntervalMs);
177+
return new TieringSource<>(
178+
flussConf,
179+
lakeTieringFactory,
180+
pollTieringTableIntervalMs,
181+
tieringTableDurationMaxMs,
182+
tieringTableDurationDetectIntervalMs);
143183
}
144184
}
145185
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.flink.tiering.source;
20+
21+
import org.apache.fluss.flink.adapter.SingleThreadFetcherManagerAdapter;
22+
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
23+
24+
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
26+
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
27+
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
28+
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
29+
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
30+
31+
import java.util.Collection;
32+
import java.util.function.Consumer;
33+
import java.util.function.Supplier;
34+
35+
/**
36+
* The SplitFetcherManager for Fluss source. This class is needed to notify the
37+
* {@link TieringSplitReader} that a table has reached its tiering deadline.
38+
*/
39+
public class TieringSourceFetcherManager<WriteResult>
40+
extends SingleThreadFetcherManagerAdapter<
41+
TableBucketWriteResult<WriteResult>, TieringSplit> {
42+
43+
public TieringSourceFetcherManager(
44+
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
45+
elementsQueue,
46+
Supplier<SplitReader<TableBucketWriteResult<WriteResult>, TieringSplit>>
47+
splitReaderSupplier,
48+
Configuration configuration,
49+
Consumer<Collection<String>> splitFinishedHook) {
50+
super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook);
51+
}
52+
53+
public void markTableAsTieringTimeOut(long tableId) {
54+
SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher =
55+
fetchers.get(0);
56+
if (splitFetcher != null) {
57+
// The fetcher thread is still running. This should be the majority of the cases.
58+
enqueueMarkTableTieringTimeOutTask(splitFetcher, tableId);
59+
} else {
60+
splitFetcher = createSplitFetcher();
61+
enqueueMarkTableTieringTimeOutTask(splitFetcher, tableId);
62+
startFetcher(splitFetcher);
63+
}
64+
}
65+
66+
private void enqueueMarkTableTieringTimeOutTask(
67+
SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher,
68+
long tieringTimeOutTable) {
69+
splitFetcher.enqueueTask(
70+
new SplitFetcherTask() {
71+
@Override
72+
public boolean run() {
73+
((TieringSplitReader<WriteResult>) splitFetcher.getSplitReader())
74+
.handleTableTimeout(tieringTimeOutTable);
75+
return true;
76+
}
77+
78+
@Override
79+
public void wakeUp() {
80+
// do nothing
81+
}
82+
});
83+
}
84+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceOptions.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,21 @@ public class TieringSourceOptions {
3434
.defaultValue(Duration.ofSeconds(30))
3535
.withDescription(
3636
"The fixed interval to request tiering table from Fluss cluster, by default 30 seconds.");
37+
38+
public static final ConfigOption<Duration> TIERING_TABLE_DURATION_MAX =
39+
key("tiering.table.duration.max")
40+
.durationType()
41+
.defaultValue(Duration.ofMinutes(10))
42+
.withDescription(
43+
"The maximum duration for tiering a single table. If tiering a table exceeds this duration, "
44+
+ "it will be force completed: the tiering will be finalized and committed to the data lake "
45+
+ "(e.g., Paimon) immediately, even if they haven't reached their desired stopping offsets.");
46+
47+
public static final ConfigOption<Duration> TIERING_TABLE_DURATION_DETECT_INTERVAL =
48+
key("tiering.table.duration.detect-interval")
49+
.durationType()
50+
.defaultValue(Duration.ofSeconds(30))
51+
.withDescription(
52+
"The interval to check if a table tiering operation has reached the maximum duration. "
53+
+ "The enumerator will periodically check tiering tables and force complete those that exceed the maximum duration.");
3754
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,17 @@
1919

2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.client.Connection;
22-
import org.apache.fluss.flink.tiering.event.TieringTimeoutEvent;
22+
import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter;
23+
import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
2324
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
2425
import org.apache.fluss.flink.tiering.source.state.TieringSplitState;
2526
import org.apache.fluss.lake.writer.LakeTieringFactory;
2627

2728
import org.apache.flink.api.connector.source.SourceEvent;
2829
import org.apache.flink.api.connector.source.SourceReader;
2930
import org.apache.flink.api.connector.source.SourceReaderContext;
30-
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
31+
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
32+
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
3133

3234
import java.util.Collections;
3335
import java.util.List;
@@ -36,34 +38,31 @@
3638
/** A {@link SourceReader} that reads records from Fluss and writes them to the lake. */
3739
@Internal
3840
public final class TieringSourceReader<WriteResult>
39-
extends SingleThreadMultiplexSourceReaderBase<
41+
extends SingleThreadMultiplexSourceReaderBaseAdapter<
4042
TableBucketWriteResult<WriteResult>,
4143
TableBucketWriteResult<WriteResult>,
4244
TieringSplit,
4345
TieringSplitState> {
4446

4547
private final Connection connection;
46-
private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
47-
// Thread-local storage for split reader to handle timeout events
48-
private static final ThreadLocal<TieringSplitReader<?>> CURRENT_SPLIT_READER = new ThreadLocal<>();
4948

5049
public TieringSourceReader(
50+
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
51+
elementsQueue,
5152
SourceReaderContext context,
5253
Connection connection,
5354
LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
5455
super(
55-
() -> {
56-
TieringSplitReader<WriteResult> reader =
57-
new TieringSplitReader<>(connection, lakeTieringFactory);
58-
// Store reference in thread-local for timeout handling
59-
CURRENT_SPLIT_READER.set(reader);
60-
return reader;
61-
},
56+
elementsQueue,
57+
new TieringSourceFetcherManager<>(
58+
elementsQueue,
59+
() -> new TieringSplitReader<>(connection, lakeTieringFactory),
60+
context.getConfiguration(),
61+
(ignore) -> {}),
6262
new TableBucketWriteResultEmitter<>(),
6363
context.getConfiguration(),
6464
context);
6565
this.connection = connection;
66-
this.lakeTieringFactory = lakeTieringFactory;
6766
}
6867

6968
@Override
@@ -103,18 +102,17 @@ protected TieringSplit toSplitType(String splitId, TieringSplitState splitState)
103102

104103
@Override
105104
public void handleSourceEvents(SourceEvent sourceEvent) {
106-
if (sourceEvent instanceof TieringTimeoutEvent) {
107-
TieringTimeoutEvent timeoutEvent = (TieringTimeoutEvent) sourceEvent;
108-
TieringSplitReader<?> splitReader = CURRENT_SPLIT_READER.get();
109-
if (splitReader != null) {
110-
splitReader.handleTableTimeout(timeoutEvent.getTableId());
111-
}
105+
if (sourceEvent instanceof TieringReachMaxDurationEvent) {
106+
TieringReachMaxDurationEvent reachMaxDurationEvent =
107+
(TieringReachMaxDurationEvent) sourceEvent;
108+
long tableId = reachMaxDurationEvent.getTableId();
109+
((TieringSourceFetcherManager<WriteResult>) splitFetcherManager)
110+
.markTableAsTieringTimeOut(tableId);
112111
}
113112
}
114113

115114
@Override
116115
public void close() throws Exception {
117-
CURRENT_SPLIT_READER.remove();
118116
super.close();
119117
connection.close();
120118
}

0 commit comments

Comments
 (0)