Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>io.hops.hadoop</groupId>
<artifactId>hadoop-apache</artifactId>
<version>3.4.3.0-EE-RC0</version>
<version>3.4.3.0-EE-RC1</version>

<name>hadoop-apache</name>
<description>Shaded version of Apache Hadoop for Trino</description>
Expand Down Expand Up @@ -54,8 +54,8 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.targetJdk>21</project.build.targetJdk>
<shadeBase>io.trino.hadoop.\$internal</shadeBase>
<dep.slf4j.version>2.0.17</dep.slf4j.version>
<dep.hadoop.version>3.4.3.0-EE-RC0</dep.hadoop.version>
<dep.slf4j.version>1.7.36</dep.slf4j.version>
<dep.hadoop.version>3.4.3.0-EE-RC1</dep.hadoop.version>
</properties>
<dependencyManagement>
<dependencies>
Expand Down Expand Up @@ -663,6 +663,10 @@
<configuration>
<source>${project.build.targetJdk}</source>
<target>${project.build.targetJdk}</target>
<compilerArgs>
<arg>--add-exports</arg>
<arg>java.base/sun.security.x509=ALL-UNNAMED</arg>
</compilerArgs>
</configuration>
</plugin>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.ssl.SSLFactory;
import io.hops.security.HopsFileBasedKeyStoresFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenRenewer;
Expand Down Expand Up @@ -387,6 +388,10 @@ public KMSClientProvider(URI uri, Configuration conf) throws IOException {
canonicalService = SecurityUtil.buildTokenService(serviceUri);

if ("https".equalsIgnoreCase(kmsUrl.getProtocol())) {
if (conf.getBoolean(CommonConfigurationKeysPublic.IPC_SERVER_SSL_ENABLED,
CommonConfigurationKeysPublic.IPC_SERVER_SSL_ENABLED_DEFAULT)) {
conf.set(SSLFactory.KEYSTORES_FACTORY_CLASS_KEY, HopsFileBasedKeyStoresFactory.class.getCanonicalName());
}
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
try {
sslFactory.init();
Expand Down Expand Up @@ -1135,6 +1140,18 @@ private DelegationTokenAuthenticatedURL.Token generateDelegationToken(
return token;
}

private boolean containsKmsDt(UserGroupInformation ugi) throws IOException {
// Add existing credentials from the UGI, since provider is cached.
Credentials creds = ugi.getCredentials();
if (!creds.getAllTokens().isEmpty()) {
LOG.debug("Searching for KMS delegation token in user {}'s credentials",
ugi);
return clientTokenProvider.selectDelegationToken(creds) != null;
}

return false;
}

@VisibleForTesting
UserGroupInformation getActualUgi() throws IOException {
final UserGroupInformation currentUgi = UserGroupInformation
Expand All @@ -1148,6 +1165,15 @@ UserGroupInformation getActualUgi() throws IOException {
// Use real user for proxy user
actualUgi = currentUgi.getRealUser();
}
if (UserGroupInformation.isSecurityEnabled() &&
!containsKmsDt(actualUgi) && !actualUgi.shouldRelogin()) {
// Use login user is only necessary when Kerberos is enabled
// but the actual user does not have either
// Kerberos credential or KMS delegation token for KMS operations
LOG.debug("Using loginUser when Kerberos is enabled but the actual user" +
" does not have either KMS Delegation Token or Kerberos Credentials");
actualUgi = UserGroupInformation.getLoginUser();
}
return actualUgi;
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/apache/hadoop/fs/FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ static int cacheSize() {
* @param user to perform the get as
* @return the filesystem instance
* @throws IOException failure to load
* @throws InterruptedException If the {@code UGI.doAs()} call was
* @throws InterruptedException If the {@code UGI.callAs()} call was
* somehow interrupted.
*/
public static FileSystem get(final URI uri, final Configuration conf,
Expand Down Expand Up @@ -580,7 +580,7 @@ public static FileSystem get(URI uri, Configuration conf) throws IOException {
* @param user to perform the get as
* @return filesystem instance
* @throws IOException if the FileSystem cannot be instantiated.
* @throws InterruptedException If the {@code UGI.doAs()} call was
* @throws InterruptedException If the {@code UGI.callAs()} call was
* somehow interrupted.
*/
public static FileSystem newInstance(final URI uri, final Configuration conf,
Expand Down
117 changes: 97 additions & 20 deletions src/main/java/org/apache/hadoop/security/UserGroupInformation.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.hadoop.security;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN_DEFAULT;
Expand All @@ -26,7 +25,6 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_TOKEN_FILES;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_TOKENS;
import static org.apache.hadoop.security.UGIExceptionMessages.*;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.apache.hadoop.util.PlatformName.IBM_JAVA;
import static org.apache.hadoop.util.StringUtils.getTrimmedStringCollection;

Expand Down Expand Up @@ -84,21 +82,26 @@
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.hadoop.security.authentication.util.SubjectUtil;
import org.apache.hadoop.security.ssl.X509SecurityMaterial;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Time;

import io.hops.security.GroupAlreadyExistsException;
import io.hops.security.SuperuserKeystoresLoader;
import io.hops.security.UserAlreadyExistsException;
import io.hops.security.UserAlreadyInGroupException;
import io.hops.security.UsersGroups;

import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.security.x509.X500Name;

/**
* User and group information for Hadoop.
Expand Down Expand Up @@ -214,6 +217,28 @@ public boolean commit() throws LoginException {
}
user = envUser == null ? null : new User(envUser);
}
if (conf.getBoolean(CommonConfigurationKeysPublic.IPC_SERVER_SSL_ENABLED,
CommonConfigurationKeysPublic.IPC_SERVER_SSL_ENABLED_DEFAULT)
&& user == null) {
user = getCanonicalUser(KEYSTORE_PRINCIPAL_CLASS);
if (user != null) {
String subject = user.getName();
LOG.debug("X500 subject is " + subject);
try {
X500Name name = new X500Name(user.getName());
String username = name.getLocality();
if (username == null) {
username = name.getCommonName();
}
if (username != null) {
LOG.debug("New local user with username " + username);
user = new User(username);
}
} catch (IOException ex) {
throw (LoginException)(new LoginException(ex.toString()).initCause(ex));
}
}
}
// use the OS user
if (user == null) {
user = getCanonicalUser(OS_PRINCIPAL_CLASS);
Expand Down Expand Up @@ -430,6 +455,7 @@ static Optional<ExecutorService> getKerberosLoginRenewalExecutor() {

private static String OS_LOGIN_MODULE_NAME;
private static Class<? extends Principal> OS_PRINCIPAL_CLASS;
private static Class<? extends Principal> KEYSTORE_PRINCIPAL_CLASS;

private static final boolean windows =
System.getProperty("os.name").startsWith("Windows");
Expand Down Expand Up @@ -464,9 +490,22 @@ private static Class<? extends Principal> getOsPrincipalClass() {
}
return null;
}

@SuppressWarnings("unchecked")
private static Class<? extends Principal> getKeystorePrincipalClass() {
ClassLoader cl = ClassLoader.getSystemClassLoader();
try {
return (Class<? extends Principal>) cl.loadClass("javax.security.auth.x500.X500Principal");
} catch (ClassNotFoundException e) {
LOG.error("Unable to find JAAS classes:" + e.getMessage());
}
return null;
}

static {
OS_LOGIN_MODULE_NAME = getOSLoginModuleName();
OS_PRINCIPAL_CLASS = getOsPrincipalClass();
KEYSTORE_PRINCIPAL_CLASS = getKeystorePrincipalClass();
}

/**
Expand Down Expand Up @@ -2324,16 +2363,26 @@ private static class HadoopConfiguration
LoginModuleControlFlag.REQUIRED,
BASIC_JAAS_OPTIONS);

static final AppConfigurationEntry OPTIONAL_OS_SPECIFIC_LOGIN =
new AppConfigurationEntry(
OS_LOGIN_MODULE_NAME,
LoginModuleControlFlag.OPTIONAL,
BASIC_JAAS_OPTIONS);

static final AppConfigurationEntry HADOOP_LOGIN =
new AppConfigurationEntry(
HadoopLoginModule.class.getName(),
LoginModuleControlFlag.REQUIRED,
BASIC_JAAS_OPTIONS);

private static final String FILE_URL = "file://%s";
private final SuperuserKeystoresLoader keystoresLoader;

private final LoginParams params;

HadoopConfiguration(LoginParams params) {
this.params = params;
keystoresLoader = new SuperuserKeystoresLoader(conf);
}

@Override
Expand All @@ -2358,10 +2407,36 @@ public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
}
entries.add(getKerberosEntry());
}

// In kubernetes we might be running with an arbitrary Container User ID.
// The UnixLoginModule will fail to authenticate the user because the UID
// does not necessarily exist.
// Instead we rely on the provisioned Keystores and the KeystoreLoginModule
// User's username will be the Locality field of certificate's Subject
// See HadoopLoginModule#commit()
String envVariableUsername = System.getenv(HADOOP_USER_NAME);
if (amIRunningInKubernetes() && !Strings.isNullOrEmpty(envVariableUsername)) {
try {
AppConfigurationEntry keystoreEntry = getKeystoreEntry(envVariableUsername);
entries.clear();
entries.add(OPTIONAL_OS_SPECIFIC_LOGIN);
entries.add(keystoreEntry);
} catch (IOException ex) {
LOG.error("Could not construct path to keystore required for the Keystore LoginModule", ex);
throw new RuntimeException(ex);
}
}

entries.add(HADOOP_LOGIN);

return entries.toArray(new AppConfigurationEntry[0]);
}

private boolean amIRunningInKubernetes() {
String kubernetesApiServer = System.getenv("KUBERNETES_SERVICE_HOST");
return !Strings.isNullOrEmpty(kubernetesApiServer);
}

private AppConfigurationEntry getKerberosEntry() {
final Map<String,String> options = new HashMap<>(BASIC_JAAS_OPTIONS);
LoginModuleControlFlag controlFlag = LoginModuleControlFlag.OPTIONAL;
Expand Down Expand Up @@ -2417,31 +2492,33 @@ private AppConfigurationEntry getKerberosEntry() {
KRB5_LOGIN_MODULE, controlFlag, options);
}

private AppConfigurationEntry getKeystoreEntry(String username) throws IOException {
X509SecurityMaterial material = keystoresLoader.loadSuperUserMaterial(username);

String keystoreAlias = getKeystoreAlias();
Map<String, String> params = new HashMap<>();
params.put("keyStoreAlias", keystoreAlias);
params.put("keyStoreURL", String.format(FILE_URL, material.getKeyStoreLocation().toAbsolutePath()));
params.put("keyStorePasswordURL", String.format(FILE_URL, material.getPasswdLocation().toAbsolutePath()));
params.put("privateKeyPasswordURL", String.format(FILE_URL, material.getPasswdLocation().toAbsolutePath()));

return new AppConfigurationEntry("com.sun.security.auth.module.KeyStoreLoginModule",
LoginModuleControlFlag.OPTIONAL, params);
}

private String getKeystoreAlias() {
String keystoreAlias = System.getenv("UGI_KEYSTORE_ALIAS");
return keystoreAlias != null ? keystoreAlias
: System.getProperty("ugi.keystoreloginmodule.alias", "own");
}

private static String prependFileAuthority(String keytabPath) {
return keytabPath.startsWith("file://")
? keytabPath
: "file://" + keytabPath;
}
}

public static UserGroupInformation createUserGroupInformationForSubject(Subject subject) {
requireNonNull(subject, "subject is null");
Set<KerberosPrincipal> kerberosPrincipals = subject.getPrincipals(KerberosPrincipal.class);
if (kerberosPrincipals.isEmpty()) {
throw new IllegalArgumentException("subject must contain a KerberosPrincipal");
}
if (kerberosPrincipals.size() != 1) {
throw new IllegalArgumentException("subject must contain only a single KerberosPrincipal");
}

KerberosPrincipal principal = kerberosPrincipals.iterator().next();
User user = new User(principal.getName(), KERBEROS, null);
subject.getPrincipals().add(user);
UserGroupInformation userGroupInformation = new UserGroupInformation(subject);
userGroupInformation.setAuthenticationMethod(KERBEROS);
return userGroupInformation;
}

/**
* A test method to print out the current user's UGI.
* @param args if there are two arguments, read the user from the keytab
Expand Down
Loading