Skip to content

Commit ca53277

Browse files
authored
[client] Allow FileSystem implementations expose customized options via 'client.fs.*' prefix (#938)
1 parent 51333a0 commit ca53277

File tree

5 files changed

+51
-7
lines changed

5 files changed

+51
-7
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/FlussConnection.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.alibaba.fluss.config.ConfigOptions;
3232
import com.alibaba.fluss.config.Configuration;
3333
import com.alibaba.fluss.exception.FlussRuntimeException;
34+
import com.alibaba.fluss.fs.FileSystem;
3435
import com.alibaba.fluss.metadata.TableInfo;
3536
import com.alibaba.fluss.metadata.TablePath;
3637
import com.alibaba.fluss.metrics.registry.MetricRegistry;
@@ -40,9 +41,12 @@
4041
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
4142

4243
import java.time.Duration;
44+
import java.util.HashMap;
4345
import java.util.List;
4446

4547
import static com.alibaba.fluss.client.utils.MetadataUtils.getOneAvailableTabletServerNode;
48+
import static com.alibaba.fluss.config.FlussConfigUtils.CLIENT_PREFIX;
49+
import static com.alibaba.fluss.utils.PropertiesUtils.extractPrefix;
4650

4751
/** A connection to Fluss cluster, and holds the client session resources. */
4852
public final class FlussConnection implements Connection {
@@ -63,6 +67,12 @@ public final class FlussConnection implements Connection {
6367

6468
FlussConnection(Configuration conf, MetricRegistry metricRegistry) {
6569
this.conf = conf;
70+
// init Filesystem with configuration from FlussConnection,
71+
// only pass options with 'client.fs.' prefix
72+
FileSystem.initialize(
73+
Configuration.fromMap(
74+
extractPrefix(new HashMap<>(conf.toMap()), CLIENT_PREFIX + "fs.")),
75+
null);
6676
// for client metrics.
6777
setupClientMetricsConfiguration();
6878
String clientId = conf.getString(ConfigOptions.CLIENT_ID);

fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import com.alibaba.fluss.config.ConfigOptions;
3232
import com.alibaba.fluss.config.Configuration;
3333
import com.alibaba.fluss.config.MemorySize;
34+
import com.alibaba.fluss.fs.FsPath;
35+
import com.alibaba.fluss.fs.TestFileSystem;
3436
import com.alibaba.fluss.metadata.DataLakeFormat;
3537
import com.alibaba.fluss.metadata.KvFormat;
3638
import com.alibaba.fluss.metadata.LogFormat;
@@ -61,6 +63,7 @@
6163

6264
import java.time.Duration;
6365
import java.util.ArrayList;
66+
import java.util.Collections;
6467
import java.util.Iterator;
6568
import java.util.List;
6669
import java.util.concurrent.CompletableFuture;
@@ -1160,4 +1163,19 @@ void testMergeEngineWithVersion(boolean doProjection) throws Exception {
11601163
}
11611164
}
11621165
}
1166+
1167+
@Test
1168+
void testFileSystemRecognizeConnectionConf() throws Exception {
1169+
Configuration config = new Configuration(clientConf);
1170+
config.setString("client.fs.test.key", "fs_test_value");
1171+
config.setString("client.test.key", "client_test_value");
1172+
try (Connection ignore = ConnectionFactory.createConnection(config)) {
1173+
FsPath fsPath = new FsPath("test:///f1");
1174+
TestFileSystem testFileSystem = (TestFileSystem) fsPath.getFileSystem();
1175+
Configuration filesystemConf = testFileSystem.getConfiguration();
1176+
assertThat(filesystemConf.toMap())
1177+
.containsExactlyEntriesOf(
1178+
Collections.singletonMap("client.fs.test.key", "fs_test_value"));
1179+
}
1180+
}
11631181
}

fluss-common/src/test/java/com/alibaba/fluss/fs/TestFileSystem.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,16 @@ public class TestFileSystem extends LocalFileSystem {
4444
private static final Map<FsPath, Integer> currentUnclosedOutputStream =
4545
MapUtils.newConcurrentHashMap();
4646

47+
private final Configuration configuration;
48+
49+
public TestFileSystem(Configuration configuration) {
50+
this.configuration = configuration;
51+
}
52+
53+
public Configuration getConfiguration() {
54+
return configuration;
55+
}
56+
4757
public static int getNumtimeStreamOpened() {
4858
return streamOpenCounter.get();
4959
}
@@ -143,7 +153,7 @@ public String getScheme() {
143153

144154
@Override
145155
public FileSystem create(URI fsUri, Configuration configuration) throws IOException {
146-
return new TestFileSystem();
156+
return new TestFileSystem(configuration);
147157
}
148158
}
149159
}

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747

4848
import java.io.File;
4949
import java.time.ZoneId;
50+
import java.util.ArrayList;
5051
import java.util.Arrays;
5152
import java.util.Collections;
5253
import java.util.HashSet;
@@ -82,11 +83,9 @@ public DynamicTableSource createDynamicTableSource(Context context) {
8283
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
8384
final ReadableConfig tableOptions = helper.getOptions();
8485
Optional<DataLakeFormat> datalakeFormat = getDatalakeFormat(tableOptions);
85-
if (datalakeFormat.isPresent()) {
86-
helper.validateExcept("table.", "client.", datalakeFormat.get() + ".");
87-
} else {
88-
helper.validateExcept("table.", "client.");
89-
}
86+
List<String> prefixesToSkip = new ArrayList<>(Arrays.asList("table.", "client."));
87+
datalakeFormat.ifPresent(dataLakeFormat -> prefixesToSkip.add(dataLakeFormat + "."));
88+
helper.validateExcept(prefixesToSkip.toArray(new String[0]));
9089

9190
boolean isStreamingMode =
9291
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/FlinkTableSourceITCase.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,14 @@ void testPkTableReadOnlySnapshot() throws Exception {
145145
List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]", "+I[3, v3]");
146146

147147
assertResultsIgnoreOrder(
148-
tEnv.executeSql("select * from read_snapshot_test").collect(), expectedRows, true);
148+
tEnv.executeSql(
149+
// the options is just used to check option with prefix 'client.fs'
150+
// should
151+
// pass Flink validation
152+
"select * from read_snapshot_test /*+ OPTIONS('client.fs.oss.endpoint' = 'test') */")
153+
.collect(),
154+
expectedRows,
155+
true);
149156
}
150157

151158
@Test

0 commit comments

Comments
 (0)