Skip to content

Commit 88ee6d8

Browse files
[client] Fluss client shouldn't load plugin by thread context classloader (#1267)
1 parent c1710cc commit 88ee6d8

File tree

10 files changed

+141
-18
lines changed

10 files changed

+141
-18
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/token/SecurityTokenReceiverRepository.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ private Map<String, SecurityTokenReceiver> loadReceivers() {
6161
receiver.scheme());
6262
};
6363

64-
ServiceLoader.load(SecurityTokenReceiver.class).iterator().forEachRemaining(loadReceiver);
64+
ServiceLoader.load(
65+
SecurityTokenReceiver.class, SecurityTokenReceiver.class.getClassLoader())
66+
.iterator()
67+
.forEachRemaining(loadReceiver);
6568

6669
LOG.info("Security token receivers loaded successfully");
6770
return receivers;

fluss-common/src/main/java/com/alibaba/fluss/fs/FileSystem.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,11 @@ public static void initialize(Configuration config, @Nullable PluginManager plug
266266
Collection<Supplier<Iterator<FileSystemPlugin>>> pluginSuppliers =
267267
new ArrayList<>(2);
268268
pluginSuppliers.add(
269-
() -> ServiceLoader.load(FileSystemPlugin.class).iterator());
269+
() ->
270+
ServiceLoader.load(
271+
FileSystemPlugin.class,
272+
FileSystem.class.getClassLoader())
273+
.iterator());
270274

271275
if (pluginManager != null) {
272276
pluginSuppliers.add(

fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/LakeStoragePluginSetUp.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ public static LakeStoragePlugin fromDataLakeFormat(
5454
private static Iterator<LakeStoragePlugin> getAllLakeStoragePlugins(
5555
@Nullable PluginManager pluginManager) {
5656
final Iterator<LakeStoragePlugin> pluginIteratorSPI =
57-
ServiceLoader.load(LakeStoragePlugin.class).iterator();
57+
ServiceLoader.load(
58+
LakeStoragePlugin.class, LakeStoragePlugin.class.getClassLoader())
59+
.iterator();
5860
if (pluginManager == null) {
5961
return pluginIteratorSPI;
6062
} else {

fluss-common/src/main/java/com/alibaba/fluss/security/auth/AuthenticationFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,12 @@ private static <T extends AuthenticationPlugin> T discoverPlugin(
118118
String protocol, Class<T> pluginInterface, @Nullable PluginManager pluginManager) {
119119

120120
Collection<Supplier<Iterator<AuthenticationPlugin>>> pluginSuppliers = new ArrayList<>(2);
121-
pluginSuppliers.add(() -> ServiceLoader.load(AuthenticationPlugin.class).iterator());
121+
pluginSuppliers.add(
122+
() ->
123+
ServiceLoader.load(
124+
AuthenticationPlugin.class,
125+
AuthenticationPlugin.class.getClassLoader())
126+
.iterator());
122127
if (pluginManager != null) {
123128
pluginSuppliers.add(() -> pluginManager.load(AuthenticationPlugin.class));
124129
}

fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/jaas/DefaultLogin.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package com.alibaba.fluss.security.auth.sasl.jaas;
1919

20+
import com.alibaba.fluss.utils.TemporaryClassLoaderContext;
21+
2022
import org.slf4j.Logger;
2123
import org.slf4j.LoggerFactory;
2224

@@ -45,18 +47,21 @@ public void configure(String contextName, javax.security.auth.login.Configuratio
4547

4648
@Override
4749
public LoginContext login() throws LoginException {
48-
loginContext =
49-
new LoginContext(
50-
contextName,
51-
null,
52-
callbacks -> {
53-
// Nothing here until we support some mechanisms such as sasl/GSSAPI
54-
// later.
55-
throw new UnsupportedCallbackException(
56-
callbacks[0], "Unrecognized SASL mechanism.");
57-
},
58-
jaasConfig);
59-
loginContext.login();
50+
try (TemporaryClassLoaderContext ignored =
51+
TemporaryClassLoaderContext.of(DefaultLogin.class.getClassLoader())) {
52+
loginContext =
53+
new LoginContext(
54+
contextName,
55+
null,
56+
callbacks -> {
57+
// Nothing here until we support some mechanisms such as sasl/GSSAPI
58+
// later.
59+
throw new UnsupportedCallbackException(
60+
callbacks[0], "Unrecognized SASL mechanism.");
61+
},
62+
jaasConfig);
63+
loginContext.login();
64+
}
6065
LOG.info("Successfully logged in.");
6166
return loginContext;
6267
}

fluss-common/src/test/java/com/alibaba/fluss/security/auth/AuthenticationFactoryTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,13 @@
2121
import com.alibaba.fluss.metadata.ValidationException;
2222
import com.alibaba.fluss.security.auth.TestIdentifierAuthenticationPlugin.TestIdentifierClientAuthenticator;
2323
import com.alibaba.fluss.security.auth.TestIdentifierAuthenticationPlugin.TestIdentifierServerAuthenticator;
24+
import com.alibaba.fluss.utils.ParentResourceBlockingClassLoader;
25+
import com.alibaba.fluss.utils.TemporaryClassLoaderContext;
2426

2527
import org.junit.jupiter.api.Test;
2628

29+
import java.net.URL;
30+
2731
import static org.assertj.core.api.Assertions.assertThat;
2832
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2933

@@ -95,4 +99,16 @@ void testIdentifierCaseInsensitive() {
9599
.get())
96100
.isInstanceOf(TestIdentifierServerAuthenticator.class);
97101
}
102+
103+
@Test
104+
void testNotIncludedInThreadContextClassloader() {
105+
try (TemporaryClassLoaderContext ignored =
106+
TemporaryClassLoaderContext.of(new ParentResourceBlockingClassLoader(new URL[0]))) {
107+
Configuration configuration = new Configuration();
108+
configuration.setString("client.security.protocol", "SSL_TEST");
109+
configuration.setString("security.protocol.map", "FLUSS:SSL_TEST");
110+
assertThat(AuthenticationFactory.loadClientAuthenticatorSupplier(configuration).get())
111+
.isInstanceOf(TestIdentifierClientAuthenticator.class);
112+
}
113+
}
98114
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.utils;
19+
20+
import java.io.IOException;
21+
import java.net.URL;
22+
import java.net.URLClassLoader;
23+
import java.util.Enumeration;
24+
25+
import static org.apache.logging.log4j.util.LoaderUtil.findResources;
26+
27+
/**
28+
* A class loader that blocks resource loading from parent class loader. Designed to simulate SPI
29+
* loading behavior without delegation to parent.
30+
*/
31+
public class ParentResourceBlockingClassLoader extends URLClassLoader {
32+
public ParentResourceBlockingClassLoader(URL[] urls) {
33+
super(urls);
34+
}
35+
36+
@Override
37+
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
38+
synchronized (getClassLoadingLock(name)) {
39+
// First, check if the class has already been loaded
40+
Class<?> c = findLoadedClass(name);
41+
c = findClass(name);
42+
if (resolve) {
43+
resolveClass(c);
44+
}
45+
return c;
46+
}
47+
}
48+
49+
@Override
50+
public Enumeration<URL> getResources(String name) throws IOException {
51+
// Skip parent class loader resource loading during Service.load
52+
return findResources(name);
53+
}
54+
}

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@
2727
import com.alibaba.fluss.security.acl.FlussPrincipal;
2828
import com.alibaba.fluss.security.acl.OperationType;
2929
import com.alibaba.fluss.security.acl.Resource;
30+
import com.alibaba.fluss.security.auth.sasl.jaas.LoginManager;
3031
import com.alibaba.fluss.server.testutils.FlussClusterExtension;
32+
import com.alibaba.fluss.utils.ParentResourceBlockingClassLoader;
33+
import com.alibaba.fluss.utils.TemporaryClassLoaderContext;
3134

3235
import org.apache.commons.lang3.RandomUtils;
3336
import org.apache.flink.table.api.EnvironmentSettings;
@@ -43,6 +46,7 @@
4346
import org.junit.jupiter.params.ParameterizedTest;
4447
import org.junit.jupiter.params.provider.ValueSource;
4548

49+
import java.net.URL;
4650
import java.time.Duration;
4751
import java.util.Arrays;
4852
import java.util.Collections;
@@ -385,6 +389,28 @@ void testPutAndLookupKvTable() throws Exception {
385389
Arrays.asList("+I[1, beijing, zhangsan]", "+I[2, shanghai, lisi]"));
386390
}
387391

392+
// this test is to mock `--jar` which is not loaded by the app classloader.
393+
@Test
394+
void testNotIncludedInThreadContextClassloader() throws Exception {
395+
try (TemporaryClassLoaderContext ignored =
396+
TemporaryClassLoaderContext.of(new ParentResourceBlockingClassLoader(new URL[0]))) {
397+
// clear the cache of login context to make sure load the class again.
398+
LoginManager.closeAll();
399+
tEnv.executeSql(
400+
String.format(
401+
"create catalog test_classloader_catalog with ('type' = 'fluss', "
402+
+ "'bootstrap.servers' = '%s',"
403+
+ "'client.security.protocol' = 'sasl',"
404+
+ "'client.security.sasl.mechanism' = 'PLAIN', \n"
405+
+ "'client.security.sasl.username' = 'guest', \n"
406+
+ "'client.security.sasl.password' = 'password2' \n"
407+
+ ")",
408+
String.join(
409+
",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS))))
410+
.await();
411+
}
412+
}
413+
388414
void addAcl(Resource resource, OperationType operationType)
389415
throws ExecutionException, InterruptedException {
390416
tEnv.executeSql(

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,10 @@ private static NetworkProtocolPlugin loadProtocolPlugin(String protocolName) {
256256
protocol.name());
257257
protocols.put(protocol.name(), protocol);
258258
};
259-
ServiceLoader.load(NetworkProtocolPlugin.class).iterator().forEachRemaining(loadProtocol);
259+
ServiceLoader.load(
260+
NetworkProtocolPlugin.class, NetworkProtocolPlugin.class.getClassLoader())
261+
.iterator()
262+
.forEachRemaining(loadProtocol);
260263

261264
if (protocols.containsKey(protocolName)) {
262265
LOG.info("Protocol plugin {} loaded successfully", protocolName);

fluss-server/src/main/java/com/alibaba/fluss/server/authorizer/AuthorizerLoader.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,12 @@ public class AuthorizerLoader {
4949
}
5050
String authorizerType = configuration.get(AUTHORIZER_TYPE);
5151
Collection<Supplier<Iterator<AuthorizationPlugin>>> pluginSuppliers = new ArrayList<>(2);
52-
pluginSuppliers.add(() -> ServiceLoader.load(AuthorizationPlugin.class).iterator());
52+
pluginSuppliers.add(
53+
() ->
54+
ServiceLoader.load(
55+
AuthorizationPlugin.class,
56+
AuthorizationPlugin.class.getClassLoader())
57+
.iterator());
5358

5459
if (pluginManager != null) {
5560
pluginSuppliers.add(() -> pluginManager.load(AuthorizationPlugin.class));

0 commit comments

Comments
 (0)