From 5d8bcf61344f136434263f328fb420f91ce4af05 Mon Sep 17 00:00:00 2001 From: liangrui Date: Fri, 20 Mar 2026 16:52:02 +0800 Subject: [PATCH 1/2] TM It is impossible to bypass the KDC login process, yet the TOKEN issued by JM has not been actually utilized. --- .../deployment/security/security-kerberos.md | 7 ++- .../deployment/security/security-kerberos.md | 3 +- .../flink/configuration/SecurityOptions.java | 12 ++++ .../security/modules/HadoopModule.java | 4 +- .../security/modules/HadoopModuleTest.java | 55 +++++++++++++++++++ 5 files changed, 78 insertions(+), 3 deletions(-) diff --git a/docs/content.zh/docs/deployment/security/security-kerberos.md b/docs/content.zh/docs/deployment/security/security-kerberos.md index 9f670ffab579e..2d13454f8d255 100644 --- a/docs/content.zh/docs/deployment/security/security-kerberos.md +++ b/docs/content.zh/docs/deployment/security/security-kerberos.md @@ -66,7 +66,12 @@ Flink 安全内部架构是建立在安全模块(实现 `org.apache.flink.runt ### Hadoop Security 模块 该模块使用 Hadoop UserGroupInformation(UGI)类来建立进程范围的 *登录用户* 上下文。然后,登录用户用于与 Hadoop 组件的所有交互,包括 HDFS、HBase 和 YARN。 -如果启用了 Hadoop security(在 `core-site.xml` 中),登录用户将拥有所有配置的 Kerberos 凭据。否则,登录用户仅继承启动集群的操作系统帐户的用户身份。 +如果启用了 Hadoop security(在 `core-site.xml` 中),登录用户将拥有所有配置的 Kerberos 凭据。否则,登录用户仅继承启动集群的操作系统帐户的用户身份。具体来说,登录过程的优先级如下: +* 当 `hadoop.security.authentication` 设置为 `kerberos` 时 + * 当 `security.kerberos.login.keytab-login.enabled` 为 `true`(默认值)且配置了 `security.kerberos.login.keytab` 和 `security.kerberos.login.principal` 时,执行 keytab 登录 + * 当 `security.kerberos.login.keytab-login.enabled` 设置为 `false` 时,跳过 keytab 登录,进程将依赖通过 `HADOOP_TOKEN_FILE_LOCATION` 分发的 delegation token。这对于 YARN/Kubernetes 部署中的 TaskManager 容器很有用,可以避免在容器启动上下文中已有 delegation token 时发起不必要的 KDC 请求。 + * 当配置了 `security.kerberos.login.use-ticket-cache` 时,执行 credential cache 登录 +* 其他情况使用启动集群的操作系统账户的用户身份 diff --git a/docs/content/docs/deployment/security/security-kerberos.md b/docs/content/docs/deployment/security/security-kerberos.md index 8d7d3bfd55c30..66be5a501c3f9 100644 --- a/docs/content/docs/deployment/security/security-kerberos.md +++ b/docs/content/docs/deployment/security/security-kerberos.md @@ -77,7 +77,8 @@ If Hadoop security is enabled (in `core-site.xml`), the login user will have wha Otherwise, the login user conveys only the user identity of the OS account that launched the cluster. In order to be specific the login process has the following order of precedence: * When `hadoop.security.authentication` is set to `kerberos` - * When `security.kerberos.login.keytab` and `security.kerberos.login.principal` configured then keytab login performed + * When `security.kerberos.login.keytab-login.enabled` is set to `true` (default) and `security.kerberos.login.keytab` and `security.kerberos.login.principal` configured then keytab login performed + * When `security.kerberos.login.keytab-login.enabled` is set to `false`, keytab login is skipped and the process relies on delegation tokens distributed via `HADOOP_TOKEN_FILE_LOCATION` instead. This is useful for TaskManager containers in YARN/Kubernetes deployments to avoid unnecessary KDC requests when delegation tokens are already available from the container launch context. * When `security.kerberos.login.use-ticket-cache` configured then credential cache login performed * All other cases user identity of the OS account used diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java index 7ec79f0c80a5b..6440250705265 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java @@ -183,6 +183,18 @@ public class SecurityOptions { + "possible to disable that behavior if it somehow conflicts " + "with the application being run."); + @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS) + public static final ConfigOption KERBEROS_LOGIN_KEYTAB_LOGIN_ENABLED = + key("security.kerberos.login.keytab-login.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to perform Kerberos keytab login during process startup. " + + "When set to false, the process will rely on delegation tokens " + + "distributed via HADOOP_TOKEN_FILE_LOCATION instead of performing " + + "a direct KDC login. This is useful for TaskManager containers in " + + "YARN/Kubernetes where delegation tokens are already available."); + /** * Returns a view over the given configuration via which options can be set/retrieved for the * given provider. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java index 37ef38a9457fe..faaba0645bde7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java @@ -71,7 +71,9 @@ public void install() throws SecurityInstallException { try { KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(securityConfig); - if (kerberosLoginProvider.isLoginPossible(true)) { + boolean keytabLoginEnabled = securityConfig.getFlinkConfig(). + get(SecurityOptions.KERBEROS_LOGIN_KEYTAB_LOGIN_ENABLED); + if (keytabLoginEnabled && kerberosLoginProvider.isLoginPossible(true)) { kerberosLoginProvider.doLogin(true); loginUser = UserGroupInformation.getLoginUser(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java index ee26e3c6e4d04..921a31c4ffa9f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java @@ -29,10 +29,13 @@ import java.io.IOException; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -79,4 +82,56 @@ public void hadoopProxyUserSetWithDelegationTokensEnabledShouldThrow() { assertTrue(exception.getCause() instanceof UnsupportedOperationException); } } + + @Test + public void keytabLoginDisabledShouldSkipKdcLogin() { + try (MockedStatic ugi = mockStatic(UserGroupInformation.class)) { + UserGroupInformation userGroupInformation = mock(UserGroupInformation.class); + ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(true); + ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); + ugi.when(UserGroupInformation::getLoginUser).thenReturn(userGroupInformation); + when(userGroupInformation.getAuthenticationMethod()) + .thenReturn(UserGroupInformation.AuthenticationMethod.SIMPLE); + + Configuration flinkConf = new Configuration(); + flinkConf.set(SecurityOptions.KERBEROS_LOGIN_KEYTAB_LOGIN_ENABLED, false); + SecurityConfiguration securityConf = new SecurityConfiguration(flinkConf); + org.apache.hadoop.conf.Configuration hadoopConf = + new org.apache.hadoop.conf.Configuration(); + HadoopModule hadoopModule = new HadoopModule(securityConf, hadoopConf); + + assertDoesNotThrow(hadoopModule::install); + + // loginUserFromKeytab should never be called when keytab login is disabled + ugi.verify( + () -> UserGroupInformation.loginUserFromKeytab(anyString(), anyString()), + never()); + } + } + + @Test + public void keytabLoginEnabledByDefaultShouldPerformKdcLogin() { + try (MockedStatic ugi = mockStatic(UserGroupInformation.class)) { + UserGroupInformation userGroupInformation = mock(UserGroupInformation.class); + ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(true); + ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); + ugi.when(UserGroupInformation::getLoginUser).thenReturn(userGroupInformation); + when(userGroupInformation.getAuthenticationMethod()) + .thenReturn(UserGroupInformation.AuthenticationMethod.KERBEROS); + when(userGroupInformation.isFromKeytab()).thenReturn(true); + when(userGroupInformation.hasKerberosCredentials()).thenReturn(true); + + Configuration flinkConf = new Configuration(); + // default is true, not setting KERBEROS_LOGIN_KEYTAB_LOGIN_ENABLED explicitly + SecurityConfiguration securityConf = new SecurityConfiguration(flinkConf); + org.apache.hadoop.conf.Configuration hadoopConf = + new org.apache.hadoop.conf.Configuration(); + HadoopModule hadoopModule = new HadoopModule(securityConf, hadoopConf); + + // Without keytab/principal configured, isLoginPossible returns false (no principal), + // so it falls through to the else branch regardless. + // This test verifies the default config value is true and doesn't cause errors. + assertDoesNotThrow(hadoopModule::install); + } + } } From a832ea4ecbea2c70e35d4162b6b1fa41339e406d Mon Sep 17 00:00:00 2001 From: liangrui Date: Mon, 23 Mar 2026 17:50:28 +0800 Subject: [PATCH 2/2] fix spotless-check --- .../apache/flink/runtime/security/modules/HadoopModule.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java index faaba0645bde7..7ce43d10b7e98 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java @@ -71,8 +71,10 @@ public void install() throws SecurityInstallException { try { KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(securityConfig); - boolean keytabLoginEnabled = securityConfig.getFlinkConfig(). - get(SecurityOptions.KERBEROS_LOGIN_KEYTAB_LOGIN_ENABLED); + boolean keytabLoginEnabled = + securityConfig + .getFlinkConfig() + .get(SecurityOptions.KERBEROS_LOGIN_KEYTAB_LOGIN_ENABLED); if (keytabLoginEnabled && kerberosLoginProvider.isLoginPossible(true)) { kerberosLoginProvider.doLogin(true); loginUser = UserGroupInformation.getLoginUser();