Skip to content

Commit 75dd0df

Browse files
committed
[flink] Replace CLIENT_SCANNER_IO_TMP_DIR with flink's TMP_DIRS in runtime rather then compile phase.
1 parent 3be2f3d commit 75dd0df

File tree

4 files changed

+53
-7
lines changed

4 files changed

+53
-7
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030

3131
import org.apache.flink.api.common.RuntimeExecutionMode;
3232
import org.apache.flink.configuration.ConfigOption;
33-
import org.apache.flink.configuration.CoreOptions;
3433
import org.apache.flink.configuration.ExecutionOptions;
3534
import org.apache.flink.configuration.ReadableConfig;
3635
import org.apache.flink.table.api.config.TableConfigOptions;
@@ -47,7 +46,6 @@
4746
import org.apache.flink.table.factories.FactoryUtil;
4847
import org.apache.flink.table.types.logical.RowType;
4948

50-
import java.io.File;
5149
import java.time.ZoneId;
5250
import java.util.ArrayList;
5351
import java.util.Arrays;
@@ -247,11 +245,6 @@ private static Configuration toFlussClientConfig(
247245
// retry. The option 'client.lookup.max-retries' is only for dealing with the
248246
// RetriableException return by server not all exceptions. Trace by:
249247
// https://github.com/apache/fluss/issues/2099
250-
251-
// pass flink io tmp dir to fluss client.
252-
flussConfig.setString(
253-
ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR,
254-
new File(flinkConfig.get(CoreOptions.TMP_DIRS), "/fluss").getAbsolutePath());
255248
return flussConfig;
256249
}
257250

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@
5050

5151
import javax.annotation.Nullable;
5252

53+
import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR;
54+
import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getClientScannerIoTmpDir;
55+
5356
/** Flink source for Fluss. */
5457
public class FlinkSource<OUT>
5558
implements Source<OUT, SourceSplitBase, SourceEnumeratorState>, ResultTypeQueryable {
@@ -181,6 +184,9 @@ public SourceReader<OUT, SourceSplitBase> createReader(SourceReaderContext conte
181184
FlinkSourceReaderMetrics flinkSourceReaderMetrics =
182185
new FlinkSourceReaderMetrics(context.metricGroup());
183186

187+
flussConf.set(
188+
CLIENT_SCANNER_IO_TMP_DIR,
189+
getClientScannerIoTmpDir(flussConf, context.getConfiguration()));
184190
deserializationSchema.open(
185191
new DeserializerInitContextImpl(
186192
context.metricGroup().addGroup("deserializer"),

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,17 @@
1717

1818
package org.apache.fluss.flink.utils;
1919

20+
import org.apache.fluss.config.Configuration;
2021
import org.apache.fluss.flink.FlinkConnectorOptions;
2122
import org.apache.fluss.flink.FlinkConnectorOptions.ScanStartupMode;
2223

24+
import org.apache.flink.configuration.CoreOptions;
2325
import org.apache.flink.configuration.ReadableConfig;
2426
import org.apache.flink.table.api.ValidationException;
2527
import org.apache.flink.table.api.config.TableConfigOptions;
2628
import org.apache.flink.table.types.logical.RowType;
2729

30+
import java.io.File;
2831
import java.time.LocalDateTime;
2932
import java.time.ZoneId;
3033
import java.time.format.DateTimeFormatter;
@@ -34,6 +37,8 @@
3437
import java.util.Optional;
3538
import java.util.stream.Collectors;
3639

40+
import static org.apache.flink.configuration.CoreOptions.TMP_DIRS;
41+
import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR;
3742
import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE;
3843
import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP;
3944
import static org.apache.fluss.flink.FlinkConnectorOptions.ScanStartupMode.TIMESTAMP;
@@ -148,6 +153,17 @@ public static long parseTimestamp(String timestampStr, String optionKey, ZoneId
148153
}
149154
}
150155

156+
public static String getClientScannerIoTmpDir(
157+
Configuration flussConf, org.apache.flink.configuration.Configuration flinkConfig) {
158+
if (!flussConf.contains(CLIENT_SCANNER_IO_TMP_DIR)) {
159+
if (flinkConfig.contains(TMP_DIRS)) {
160+
// pass flink io tmp dir to fluss client.
161+
return new File(flinkConfig.get(CoreOptions.TMP_DIRS), "/fluss").getAbsolutePath();
162+
}
163+
}
164+
return flussConf.getString(CLIENT_SCANNER_IO_TMP_DIR);
165+
}
166+
151167
/** Fluss startup options. * */
152168
public static class StartupOptions {
153169
public ScanStartupMode startupMode;

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtilTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@
1717

1818
package org.apache.fluss.flink.utils;
1919

20+
import org.apache.fluss.config.Configuration;
21+
2022
import org.apache.flink.table.api.ValidationException;
2123
import org.junit.jupiter.api.Test;
2224

2325
import java.time.ZoneId;
2426
import java.util.TimeZone;
2527

28+
import static org.apache.flink.configuration.CoreOptions.TMP_DIRS;
29+
import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR;
2630
import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP;
2731
import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.parseTimestamp;
2832
import static org.assertj.core.api.Assertions.assertThat;
@@ -58,4 +62,31 @@ void testParseTimestamp() {
5862
+ "or 'timestamp', but is '2023-12-09T23:09:12'. "
5963
+ "You can config like: '2023-12-09 23:09:12' or '1678883047356'.");
6064
}
65+
66+
@Test
67+
void testGetClientScannerIoTmpDir() {
68+
Configuration flussConfig =
69+
new Configuration().set(CLIENT_SCANNER_IO_TMP_DIR, "/fluss_tmp_dir");
70+
org.apache.flink.configuration.Configuration flinkConfig =
71+
new org.apache.flink.configuration.Configuration().set(TMP_DIRS, "/flink_tmp_dir");
72+
assertThat(
73+
FlinkConnectorOptionsUtils.getClientScannerIoTmpDir(
74+
flussConfig, new org.apache.flink.configuration.Configuration()))
75+
.isEqualTo("/fluss_tmp_dir");
76+
assertThat(FlinkConnectorOptionsUtils.getClientScannerIoTmpDir(flussConfig, flinkConfig))
77+
.isEqualTo("/fluss_tmp_dir");
78+
String property = System.getProperty("java.io.tmpdir");
79+
assertThat(
80+
FlinkConnectorOptionsUtils.getClientScannerIoTmpDir(
81+
new Configuration(),
82+
new org.apache.flink.configuration.Configuration()))
83+
.isEqualTo(property + "/fluss");
84+
85+
// only replace when flussConfig not contains CLIENT_SCANNER_IO_TMP_DIR while flinkConfig
86+
// contains TMP_DIRS.
87+
assertThat(
88+
FlinkConnectorOptionsUtils.getClientScannerIoTmpDir(
89+
new Configuration(), flinkConfig))
90+
.isEqualTo("/flink_tmp_dir/fluss");
91+
}
6192
}

0 commit comments

Comments
 (0)