
Commit 025ffd1

[lake] Avoid create/drop connection frequently in TieringSourceReader (apache#1828)
1 parent 9e27398 commit 025ffd1

4 files changed: +36 -17 lines changed


fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java
Lines changed: 5 additions & 2 deletions

@@ -17,6 +17,8 @@

 package org.apache.fluss.flink.tiering.source;

+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator;
 import org.apache.fluss.flink.tiering.source.split.TieringSplit;
@@ -104,8 +106,9 @@ public SimpleVersionedSerializer<TieringSplit> getSplitSerializer() {

     @Override
     public SourceReader<TableBucketWriteResult<WriteResult>, TieringSplit> createReader(
-            SourceReaderContext sourceReaderContext) throws Exception {
-        return new TieringSourceReader<>(sourceReaderContext, flussConf, lakeTieringFactory);
+            SourceReaderContext sourceReaderContext) {
+        Connection connection = ConnectionFactory.createConnection(flussConf);
+        return new TieringSourceReader<>(sourceReaderContext, connection, lakeTieringFactory);
     }

     /** This follows the operator uid hash generation logic of flink {@link StreamGraphHasherV2}. */

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java
Lines changed: 12 additions & 3 deletions

@@ -18,7 +18,7 @@
 package org.apache.fluss.flink.tiering.source;

 import org.apache.fluss.annotation.Internal;
-import org.apache.fluss.config.Configuration;
+import org.apache.fluss.client.Connection;
 import org.apache.fluss.flink.tiering.source.split.TieringSplit;
 import org.apache.fluss.flink.tiering.source.state.TieringSplitState;
 import org.apache.fluss.lake.writer.LakeTieringFactory;
@@ -40,15 +40,18 @@ public final class TieringSourceReader<WriteResult>
                 TieringSplit,
                 TieringSplitState> {

+    private final Connection connection;
+
     public TieringSourceReader(
             SourceReaderContext context,
-            Configuration flussConf,
+            Connection connection,
             LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
         super(
-                () -> new TieringSplitReader<>(flussConf, lakeTieringFactory),
+                () -> new TieringSplitReader<>(connection, lakeTieringFactory),
                 new TableBucketWriteResultEmitter<>(),
                 context.getConfiguration(),
                 context);
+        this.connection = connection;
     }

     @Override
@@ -85,4 +88,10 @@ protected TieringSplitState initializedState(TieringSplit split) {
     protected TieringSplit toSplitType(String splitId, TieringSplitState splitState) {
         return splitState.toSourceSplit();
     }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        connection.close();
+    }
 }

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
Lines changed: 5 additions & 7 deletions

@@ -18,12 +18,10 @@
 package org.apache.fluss.flink.tiering.source;

 import org.apache.fluss.client.Connection;
-import org.apache.fluss.client.ConnectionFactory;
 import org.apache.fluss.client.table.Table;
 import org.apache.fluss.client.table.scanner.ScanRecord;
 import org.apache.fluss.client.table.scanner.log.LogScanner;
 import org.apache.fluss.client.table.scanner.log.ScanRecords;
-import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.source.reader.BoundedSplitReader;
 import org.apache.fluss.flink.source.reader.RecordAndPos;
 import org.apache.fluss.flink.tiering.source.split.TieringLogSplit;
@@ -97,9 +95,10 @@ public class TieringSplitReader<WriteResult>
     private final Set<TieringLogSplit> currentTableEmptyLogSplits;

     public TieringSplitReader(
-            Configuration flussConf, LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
+            Connection connection, LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
         this.lakeTieringFactory = lakeTieringFactory;
-        this.connection = ConnectionFactory.createConnection(flussConf);
+        // owned by TieringSourceReader
+        this.connection = connection;
         this.pendingTieringTables = new ArrayDeque<>();
         this.pendingTieringSplits = new HashMap<>();
         this.currentTableStoppingOffsets = new HashMap<>();
@@ -457,9 +456,8 @@ public void close() throws Exception {
         if (currentTable != null) {
             currentTable.close();
         }
-        if (connection != null) {
-            connection.close();
-        }
+
+        // don't need to close connection, will be closed by TieringSourceReader
     }

     private void subscribeLog(TieringLogSplit logSplit) {
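
Taken together, the three production changes above hand the Fluss Connection's lifecycle to the source reader: TieringSource#createReader creates it once, TieringSourceReader owns and eventually closes it, and every TieringSplitReader spawned by the fetcher only borrows it. A minimal sketch of that ownership flow, using only the constructors and methods visible in this diff (surrounding variable names are illustrative, not from the codebase):

    // In TieringSource#createReader: one connection per source reader, not per split reader.
    Connection connection = ConnectionFactory.createConnection(flussConf);
    TieringSourceReader<WriteResult> reader =
            new TieringSourceReader<>(sourceReaderContext, connection, lakeTieringFactory);

    // Each split reader created by the fetcher supplier reuses the same connection:
    //     () -> new TieringSplitReader<>(connection, lakeTieringFactory)

    // TieringSplitReader#close() no longer touches the connection; TieringSourceReader#close()
    // first stops the fetchers via super.close(), then closes the shared connection exactly once.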

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
Lines changed: 14 additions & 5 deletions

@@ -17,6 +17,8 @@

 package org.apache.fluss.flink.tiering.source;

+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
 import org.apache.fluss.client.table.Table;
 import org.apache.fluss.client.table.writer.AppendWriter;
 import org.apache.fluss.client.table.writer.TableWriter;
@@ -61,7 +63,11 @@ class TieringSplitReaderTest extends FlinkTestBase {
     void testTieringTable() throws Exception {
         TablePath tablePath = TablePath.of("fluss", "fluss_test_tiering_one_table");
         long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR);
-        try (TieringSplitReader<TestingWriteResult> tieringSplitReader = createTieringReader()) {
+        try (Connection connection =
+                        ConnectionFactory.createConnection(
+                                FLUSS_CLUSTER_EXTENSION.getClientConfig());
+                TieringSplitReader<TestingWriteResult> tieringSplitReader =
+                        createTieringReader(connection)) {
             // test empty splits
             SplitsAddition<TieringSplit> splitsAddition =
                     new SplitsAddition<>(
@@ -155,7 +161,11 @@ void testTieringMixTables() throws Exception {
         TablePath tablePath1 = TablePath.of("fluss", "tiering_table1");
         long tableId1 = createTable(tablePath1, DEFAULT_PK_TABLE_DESCRIPTOR);

-        try (TieringSplitReader<TestingWriteResult> tieringSplitReader = createTieringReader()) {
+        try (Connection connection =
+                        ConnectionFactory.createConnection(
+                                FLUSS_CLUSTER_EXTENSION.getClientConfig());
+                TieringSplitReader<TestingWriteResult> tieringSplitReader =
+                        createTieringReader(connection)) {
             Map<TableBucket, List<InternalRow>> table0Rows = putRows(tableId0, tablePath0, 10);
             Map<TableBucket, List<InternalRow>> table1Rows = putRows(tableId1, tablePath1, 10);
             waitUntilSnapshot(tableId0, 0);
@@ -266,9 +276,8 @@ void testTieringMixTables() throws Exception {
         }
     }

-    private TieringSplitReader<TestingWriteResult> createTieringReader() {
-        return new TieringSplitReader<>(
-                FLUSS_CLUSTER_EXTENSION.getClientConfig(), new TestingLakeTieringFactory());
+    private TieringSplitReader<TestingWriteResult> createTieringReader(Connection connection) {
+        return new TieringSplitReader<>(connection, new TestingLakeTieringFactory());
     }

     private void verifyTieringRows(
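
Since TieringSplitReader no longer creates its own connection, the updated tests open one themselves and declare it first in the try-with-resources block. Because try-with-resources closes resources in reverse declaration order, the split reader is closed before the connection it borrows, matching the new ownership model. A minimal sketch of that pattern, reusing the names from the diff above (the body is elided):

    try (Connection connection =
                    ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig());
            TieringSplitReader<TestingWriteResult> tieringSplitReader =
                    createTieringReader(connection)) {
        // ... exercise the split reader ...
    } // tieringSplitReader.close() runs first, then connection.close()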
