Skip to content

Commit 38395e1

Browse files
committed
[lake] support lake tiering service force to commit due to timeout
1 parent e542d6d commit 38395e1

File tree

13 files changed

+325
-139
lines changed

13 files changed

+325
-139
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
import static org.apache.fluss.flink.tiering.source.TieringSource.TIERING_SOURCE_TRANSFORMATION_UID;
3838
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
39+
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TABLE_TIERING_COMMIT_TIMEOUT;
3940
import static org.apache.fluss.utils.Preconditions.checkNotNull;
4041

4142
/** The builder to build Flink lake tiering job. */
@@ -84,6 +85,12 @@ public JobClient build() throws Exception {
8485
tieringSourceBuilder.withPollTieringTableIntervalMs(
8586
flussConfig.get(POLL_TIERING_TABLE_INTERVAL).toMillis());
8687
}
88+
89+
if (flussConfig.get(TABLE_TIERING_COMMIT_TIMEOUT) != null) {
90+
tieringSourceBuilder.withTieringCommitTimeout(
91+
flussConfig.get(TABLE_TIERING_COMMIT_TIMEOUT).toMillis());
92+
}
93+
8794
TieringSource<?> tieringSource = tieringSourceBuilder.build();
8895
DataStreamSource<?> source =
8996
env.fromSource(

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringTimeoutEvent.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919

2020
import org.apache.flink.api.connector.source.SourceEvent;
2121

22-
/** SourceEvent used to notify TieringSourceReader that a table has timed out and should be completed. */
22+
/**
23+
* SourceEvent used to notify TieringSourceReader that a table has timed out and should be
24+
* completed.
25+
*/
2326
public class TieringTimeoutEvent implements SourceEvent {
2427

2528
private static final long serialVersionUID = 1L;
@@ -34,4 +37,3 @@ public long getTableId() {
3437
return tableId;
3538
}
3639
}
37-

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,16 @@
3636
import org.apache.flink.api.connector.source.SourceReaderContext;
3737
import org.apache.flink.api.connector.source.SplitEnumerator;
3838
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
39+
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
40+
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
3941
import org.apache.flink.core.io.SimpleVersionedSerializer;
4042
import org.apache.flink.runtime.jobgraph.OperatorID;
4143
import org.apache.flink.streaming.api.graph.StreamGraphHasherV2;
4244

4345
import java.nio.charset.StandardCharsets;
4446

4547
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
48+
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.TABLE_TIERING_COMMIT_TIMEOUT;
4649

4750
/**
4851
* The flink source implementation for tiering data from Fluss to downstream lake.
@@ -61,14 +64,17 @@ public class TieringSource<WriteResult>
6164
private final Configuration flussConf;
6265
private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
6366
private final long pollTieringTableIntervalMs;
67+
private final long tieringCommitTimeoutMs;
6468

6569
public TieringSource(
6670
Configuration flussConf,
6771
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
68-
long pollTieringTableIntervalMs) {
72+
long pollTieringTableIntervalMs,
73+
long tieringCommitTimeoutMs) {
6974
this.flussConf = flussConf;
7075
this.lakeTieringFactory = lakeTieringFactory;
7176
this.pollTieringTableIntervalMs = pollTieringTableIntervalMs;
77+
this.tieringCommitTimeoutMs = tieringCommitTimeoutMs;
7278
}
7379

7480
@Override
@@ -78,19 +84,24 @@ public Boundedness getBoundedness() {
7884

7985
@Override
8086
public SplitEnumerator<TieringSplit, TieringSourceEnumeratorState> createEnumerator(
81-
SplitEnumeratorContext<TieringSplit> splitEnumeratorContext) throws Exception {
87+
SplitEnumeratorContext<TieringSplit> splitEnumeratorContext) {
8288
return new TieringSourceEnumerator(
83-
flussConf, splitEnumeratorContext, pollTieringTableIntervalMs);
89+
flussConf,
90+
splitEnumeratorContext,
91+
pollTieringTableIntervalMs,
92+
tieringCommitTimeoutMs);
8493
}
8594

8695
@Override
8796
public SplitEnumerator<TieringSplit, TieringSourceEnumeratorState> restoreEnumerator(
8897
SplitEnumeratorContext<TieringSplit> splitEnumeratorContext,
89-
TieringSourceEnumeratorState tieringSourceEnumeratorState)
90-
throws Exception {
98+
TieringSourceEnumeratorState tieringSourceEnumeratorState) {
9199
// stateless operator
92100
return new TieringSourceEnumerator(
93-
flussConf, splitEnumeratorContext, pollTieringTableIntervalMs);
101+
flussConf,
102+
splitEnumeratorContext,
103+
pollTieringTableIntervalMs,
104+
tieringCommitTimeoutMs);
94105
}
95106

96107
@Override
@@ -107,8 +118,11 @@ public SimpleVersionedSerializer<TieringSplit> getSplitSerializer() {
107118
@Override
108119
public SourceReader<TableBucketWriteResult<WriteResult>, TieringSplit> createReader(
109120
SourceReaderContext sourceReaderContext) {
121+
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
122+
elementsQueue = new FutureCompletingBlockingQueue<>();
110123
Connection connection = ConnectionFactory.createConnection(flussConf);
111-
return new TieringSourceReader<>(sourceReaderContext, connection, lakeTieringFactory);
124+
return new TieringSourceReader<>(
125+
elementsQueue, sourceReaderContext, connection, lakeTieringFactory);
112126
}
113127

114128
/** This follows the operator uid hash generation logic of flink {@link StreamGraphHasherV2}. */
@@ -126,6 +140,8 @@ public static class Builder<WriteResult> {
126140
private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
127141
private long pollTieringTableIntervalMs =
128142
POLL_TIERING_TABLE_INTERVAL.defaultValue().toMillis();
143+
private long tieringCommitTimeoutMs =
144+
TABLE_TIERING_COMMIT_TIMEOUT.defaultValue().toMillis();
129145

130146
public Builder(
131147
Configuration flussConf, LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
@@ -138,8 +154,17 @@ public Builder<WriteResult> withPollTieringTableIntervalMs(long pollTieringTable
138154
return this;
139155
}
140156

157+
public Builder<WriteResult> withTieringCommitTimeout(long tieringCommitTimeoutMs) {
158+
this.tieringCommitTimeoutMs = tieringCommitTimeoutMs;
159+
return this;
160+
}
161+
141162
public TieringSource<WriteResult> build() {
142-
return new TieringSource<>(flussConf, lakeTieringFactory, pollTieringTableIntervalMs);
163+
return new TieringSource<>(
164+
flussConf,
165+
lakeTieringFactory,
166+
pollTieringTableIntervalMs,
167+
tieringCommitTimeoutMs);
143168
}
144169
}
145170
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.flink.tiering.source;
20+
21+
import org.apache.fluss.flink.adapter.SingleThreadFetcherManagerAdapter;
22+
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
23+
24+
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
26+
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
27+
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
28+
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
29+
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
30+
31+
import java.util.Collection;
32+
import java.util.function.Consumer;
33+
import java.util.function.Supplier;
34+
35+
/**
36+
* The SplitFetcherManager for Fluss source. This class is needed to help notify a table reaches to
37+
* deadline of tiering to {@link TieringSplitReader}.
38+
*/
39+
public class TieringSourceFetcherManager<WriteResult>
40+
extends SingleThreadFetcherManagerAdapter<
41+
TableBucketWriteResult<WriteResult>, TieringSplit> {
42+
43+
public TieringSourceFetcherManager(
44+
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
45+
elementsQueue,
46+
Supplier<SplitReader<TableBucketWriteResult<WriteResult>, TieringSplit>>
47+
splitReaderSupplier,
48+
Configuration configuration,
49+
Consumer<Collection<String>> splitFinishedHook) {
50+
super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook);
51+
}
52+
53+
public void markTableAsTieringTimeOut(long tableId) {
54+
SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher =
55+
fetchers.get(0);
56+
if (splitFetcher != null) {
57+
// The fetcher thread is still running. This should be the majority of the cases.
58+
enqueueMarkTableTieringTimeOutTask(splitFetcher, tableId);
59+
} else {
60+
splitFetcher = createSplitFetcher();
61+
enqueueMarkTableTieringTimeOutTask(splitFetcher, tableId);
62+
startFetcher(splitFetcher);
63+
}
64+
}
65+
66+
private void enqueueMarkTableTieringTimeOutTask(
67+
SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher,
68+
long tieringTimeOutTable) {
69+
splitFetcher.enqueueTask(
70+
new SplitFetcherTask() {
71+
@Override
72+
public boolean run() {
73+
((TieringSplitReader<WriteResult>) splitFetcher.getSplitReader())
74+
.handleTableTimeout(tieringTimeOutTable);
75+
return true;
76+
}
77+
78+
@Override
79+
public void wakeUp() {
80+
// do nothing
81+
}
82+
});
83+
}
84+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceOptions.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,10 @@ public class TieringSourceOptions {
3434
.defaultValue(Duration.ofSeconds(30))
3535
.withDescription(
3636
"The fixed interval to request tiering table from Fluss cluster, by default 30 seconds.");
37+
38+
public static final ConfigOption<Duration> TABLE_TIERING_COMMIT_TIMEOUT =
39+
key("tiering.commit.interval")
40+
.durationType()
41+
.defaultValue(Duration.ofMinutes(10))
42+
.withDescription("todo");
3743
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.client.Connection;
22+
import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter;
2223
import org.apache.fluss.flink.tiering.event.TieringTimeoutEvent;
2324
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
2425
import org.apache.fluss.flink.tiering.source.state.TieringSplitState;
@@ -27,7 +28,8 @@
2728
import org.apache.flink.api.connector.source.SourceEvent;
2829
import org.apache.flink.api.connector.source.SourceReader;
2930
import org.apache.flink.api.connector.source.SourceReaderContext;
30-
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
31+
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
32+
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
3133

3234
import java.util.Collections;
3335
import java.util.List;
@@ -36,34 +38,31 @@
3638
/** A {@link SourceReader} that read records from Fluss and write to lake. */
3739
@Internal
3840
public final class TieringSourceReader<WriteResult>
39-
extends SingleThreadMultiplexSourceReaderBase<
41+
extends SingleThreadMultiplexSourceReaderBaseAdapter<
4042
TableBucketWriteResult<WriteResult>,
4143
TableBucketWriteResult<WriteResult>,
4244
TieringSplit,
4345
TieringSplitState> {
4446

4547
private final Connection connection;
46-
private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
47-
// Thread-local storage for split reader to handle timeout events
48-
private static final ThreadLocal<TieringSplitReader<?>> CURRENT_SPLIT_READER = new ThreadLocal<>();
4948

5049
public TieringSourceReader(
50+
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
51+
elementsQueue,
5152
SourceReaderContext context,
5253
Connection connection,
5354
LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
5455
super(
55-
() -> {
56-
TieringSplitReader<WriteResult> reader =
57-
new TieringSplitReader<>(connection, lakeTieringFactory);
58-
// Store reference in thread-local for timeout handling
59-
CURRENT_SPLIT_READER.set(reader);
60-
return reader;
61-
},
56+
elementsQueue,
57+
new TieringSourceFetcherManager<>(
58+
elementsQueue,
59+
() -> new TieringSplitReader<>(connection, lakeTieringFactory),
60+
context.getConfiguration(),
61+
(ignore) -> {}),
6262
new TableBucketWriteResultEmitter<>(),
6363
context.getConfiguration(),
6464
context);
6565
this.connection = connection;
66-
this.lakeTieringFactory = lakeTieringFactory;
6766
}
6867

6968
@Override
@@ -105,16 +104,14 @@ protected TieringSplit toSplitType(String splitId, TieringSplitState splitState)
105104
public void handleSourceEvents(SourceEvent sourceEvent) {
106105
if (sourceEvent instanceof TieringTimeoutEvent) {
107106
TieringTimeoutEvent timeoutEvent = (TieringTimeoutEvent) sourceEvent;
108-
TieringSplitReader<?> splitReader = CURRENT_SPLIT_READER.get();
109-
if (splitReader != null) {
110-
splitReader.handleTableTimeout(timeoutEvent.getTableId());
111-
}
107+
long tableId = timeoutEvent.getTableId();
108+
((TieringSourceFetcherManager<WriteResult>) splitFetcherManager)
109+
.markTableAsTieringTimeOut(tableId);
112110
}
113111
}
114112

115113
@Override
116114
public void close() throws Exception {
117-
CURRENT_SPLIT_READER.remove();
118115
super.close();
119116
connection.close();
120117
}

0 commit comments

Comments
 (0)