Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,8 @@ public class ConfigOptions {
.stringType()
.defaultValue("PLAIN")
.withDescription(
"SASL mechanism to use for authentication.Currently, we only support plain.");
"SASL mechanism to use for authentication. "
+ "Currently, we only support PLAIN and GSSAPI (Kerberos).");

public static final ConfigOption<String> CLIENT_SASL_JAAS_CONFIG =
key("client.security.sasl.jaas.config")
Expand Down Expand Up @@ -1233,6 +1234,14 @@ public class ConfigOptions {
+ "This is used when the client connects to the Fluss cluster with SASL authentication enabled. "
+ "If not provided, the password will be read from the JAAS configuration string specified by `client.security.sasl.jaas.config`.");

public static final ConfigOption<String> CLIENT_KERBEROS_SERVICE_NAME =
key("client.security.kerberos.service.name")
.stringType()
.defaultValue("fluss")
.withDescription(
"The Kerberos principal name that the server runs as. This can be defined either in "
+ "Fluss's JAAS config or in Fluss's config.");

// ------------------------------------------------------------------------
// ConfigOptions for Fluss Table
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import org.apache.fluss.security.auth.sasl.plain.PlainSaslServer;

import javax.annotation.Nullable;
import javax.security.auth.Subject;
import javax.security.auth.login.LoginException;
import javax.security.sasl.SaslClient;

import java.security.PrivilegedExceptionAction;
import java.util.Map;

import static org.apache.fluss.config.ConfigOptions.CLIENT_KERBEROS_SERVICE_NAME;
import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_CONFIG;
import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_PASSWORD;
import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_USERNAME;
Expand All @@ -43,12 +46,14 @@ public class SaslClientAuthenticator implements ClientAuthenticator {
private final String mechanism;
private final Map<String, String> pros;
private final String jaasConfig;
private final String serviceName;

private SaslClient saslClient;
private LoginManager loginManager;

public SaslClientAuthenticator(Configuration configuration) {
this.mechanism = configuration.get(CLIENT_SASL_MECHANISM).toUpperCase();
this.serviceName = configuration.get(CLIENT_KERBEROS_SERVICE_NAME);
String jaasConfigStr = configuration.getString(CLIENT_SASL_JAAS_CONFIG);
if (jaasConfigStr == null && mechanism.equals(PlainSaslServer.PLAIN_MECHANISM)) {
String username = configuration.get(CLIENT_SASL_JAAS_USERNAME);
Expand Down Expand Up @@ -77,7 +82,9 @@ public String protocol() {
@Override
public byte[] authenticate(byte[] data) throws AuthenticationException {
try {
return saslClient.evaluateChallenge(data);
return Subject.doAs(
loginManager.subject(),
(PrivilegedExceptionAction<byte[]>) () -> saslClient.evaluateChallenge(data));
} catch (Exception e) {
throw new AuthenticationException("Failed to evaluate SASL challenge", e);
}
Expand Down Expand Up @@ -105,7 +112,7 @@ public void initialize(AuthenticateContext context) throws AuthenticationExcepti
}

try {
saslClient = createSaslClient(mechanism, hostAddress, pros, loginManager);
saslClient = createSaslClient(mechanism, hostAddress, pros, loginManager, serviceName);
} catch (Exception e) {
throw new AuthenticationException("Failed to create SASL client", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.sasl.SaslException;
import javax.annotation.Nullable;
import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -46,6 +49,7 @@ public class SaslServerAuthenticator implements ServerAuthenticator {
private static final String SERVER_AUTHENTICATOR_PREFIX = "security.sasl.";
private final List<String> enabledMechanisms;
private SaslServer saslServer;
private LoginManager loginManager;
private final Map<String, String> configs;

public SaslServerAuthenticator(Configuration configuration) {
Expand Down Expand Up @@ -103,7 +107,7 @@ public void initialize(AuthenticateContext context) {
JaasContext jaasContext = JaasContext.loadServerContext(listenerName, dynamicJaasConfig);

try {
LoginManager loginManager = LoginManager.acquireLoginManager(jaasContext);
loginManager = LoginManager.acquireLoginManager(jaasContext);
saslServer =
createSaslServer(
mechanism,
Expand Down Expand Up @@ -131,13 +135,16 @@ public void matchProtocol(String protocol) {
}
}

@Nullable
@Override
public byte[] evaluateResponse(byte[] token) throws AuthenticationException {
try {
return saslServer.evaluateResponse(token);
} catch (SaslException e) {
return Subject.doAs(
loginManager.subject(),
(PrivilegedExceptionAction<byte[]>) () -> saslServer.evaluateResponse(token));
} catch (Exception e) {
throw new AuthenticationException(
String.format("Failed to evaluate SASL response,reason is %s", e.getMessage()));
String.format("Failed to evaluate SASL response: %s", e.getMessage()));
}
}

Expand All @@ -150,4 +157,11 @@ public boolean isCompleted() {
public FlussPrincipal createPrincipal() {
return new FlussPrincipal(saslServer.getAuthorizationID(), "User");
}

@Override
public void close() throws IOException {
if (loginManager != null) {
loginManager.release();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.security.auth.sasl.gssapi;

import org.apache.fluss.security.auth.sasl.jaas.AuthenticateCallbackHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.sasl.AuthorizeCallback;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

/**
* A server-side {@link javax.security.auth.callback.CallbackHandler} implementation for the
* SASL/GSSAPI (Kerberos) mechanism.
*
* <p>This handler does not perform credential validation (e.g., checking passwords or kerberos
* tickets) itself. That responsibility is handled by the underlying JAVA GSSAPI library and the
* JAAS configuration, which use the server's keytab to validate the clients' service tickets.
*
* <p>The primary role of this handler is to process the {@link AuthorizeCallback} , which is
* invoked after successful authentication to determine if the authenticated principal is permitted
* to act as the requested authorization identity.
*/
public class GssapiServerCallbackHandler implements AuthenticateCallbackHandler {
private static final Logger LOG = LoggerFactory.getLogger(GssapiServerCallbackHandler.class);

/**
* Configures this callback handler. For GSSAPI, this is often a no-op because the necessary
* principal and keytab information is already in the JAAS configuration entries and is used
* directly by the Krb5LoginModule.
*/
@Override
public void configure(String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
LOG.debug("Configuring GssapiServerCallbackHandler for mechanism: {}", saslMechanism);
}

/** Handles the callbacks provided by the SASL/GSSAPI mechanism. */
@Override
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
for (Callback callback : callbacks) {
if (callback instanceof AuthorizeCallback) {
AuthorizeCallback ac = (AuthorizeCallback) callback;
String authenticationId = ac.getAuthenticationID();
String authorizationId = ac.getAuthorizationID();

if (authenticationId == null || authenticationId.isEmpty()) {
throw new IOException("Authentication ID cannot be null or empty");
}
if (authorizationId == null || authorizationId.isEmpty()) {
// if authorizationId is not specified, use authenticationId
authorizationId = authenticationId;
}

LOG.info(
"Authorizing client: authenticationID='{}', authorizationID='{}'",
authenticationId,
authorizationId);

if (isAuthorizedActAs(authenticationId, authorizationId)) {
ac.setAuthorized(true);
// set the authorized ID to the short name (without realm)
String shortName = getShortName(authenticationId);
ac.setAuthorizedID(shortName);
LOG.info("Successfully authorized client: {}", shortName);
} else {
ac.setAuthorized(false);
LOG.warn(
"Authorization failed. Authenticated user '{}' is not authorized to act as '{}'",
authenticationId,
authorizationId);
}
} else {
throw new UnsupportedCallbackException(
callback, "Unsupported callback type: " + callback.getClass().getName());
}
}
}

/** Checks if the authenticated user has the permission to act as the authorization user. */
private boolean isAuthorizedActAs(String authnId, String authzId) {
// Default policy: allow the authenticated user to act as themselves.
// 1. Exact match
if (Objects.equals(authnId, authzId)) {
return true;
}

// 2. Check if authnId is "user@REALM" and authzId is "user" or "user@REALM"
String shortAuthnId = getShortName(authnId);
String shortAuthzId = getShortName(authzId);

return Objects.equals(shortAuthnId, shortAuthzId);
}

private String getShortName(String principal) {
if (principal == null) {
return null;
}
int realmIndex = principal.indexOf('@');
if (realmIndex > 0) {
return principal.substring(0, realmIndex);
}
return principal;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import javax.annotation.Nullable;
import javax.security.auth.Subject;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;

import java.util.Set;

/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. */
Expand Down Expand Up @@ -73,6 +76,23 @@ public Subject subject() {

@Override
public String serviceName() {
if (loginContext != null && loginContext.getSubject() != null) {
Set<KerberosPrincipal> principals =
loginContext.getSubject().getPrincipals(KerberosPrincipal.class);
if (!principals.isEmpty()) {
KerberosPrincipal principal = principals.iterator().next();
String name = principal.getName();
int slash = name.indexOf('/');
if (slash > 0) {
return name.substring(0, slash);
}
int at = name.indexOf('@');
if (at > 0) {
return name.substring(0, at);
}
return name;
}
}
return contextName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.fluss.security.auth.sasl.jaas;

import org.apache.fluss.security.auth.sasl.gssapi.GssapiServerCallbackHandler;
import org.apache.fluss.security.auth.sasl.plain.PlainServerCallbackHandler;

import org.slf4j.Logger;
Expand Down Expand Up @@ -56,6 +57,8 @@ public static SaslServer createSaslServer(
AuthenticateCallbackHandler callbackHandler;
if (mechanism.equals("PLAIN")) {
callbackHandler = new PlainServerCallbackHandler();
} else if (mechanism.equals("GSSAPI")) {
callbackHandler = new GssapiServerCallbackHandler();
} else {
throw new IllegalArgumentException("Unsupported mechanism: " + mechanism);
}
Expand All @@ -68,7 +71,7 @@ public static SaslServer createSaslServer(
() ->
Sasl.createSaslServer(
mechanism,
"fluss",
loginManager.serviceName(),
hostName,
props,
callbackHandler));
Expand All @@ -88,15 +91,18 @@ public static SaslServer createSaslServer(
}

public static SaslClient createSaslClient(
String mechanism, String hostAddress, Map<String, ?> props, LoginManager loginManager)
String mechanism,
String hostAddress,
Map<String, ?> props,
LoginManager loginManager,
String serviceName)
throws PrivilegedActionException {

return Subject.doAs(
loginManager.subject(),
(PrivilegedExceptionAction<SaslClient>)
() -> {
String[] mechs = {mechanism};
String serviceName = loginManager.serviceName();
LOG.debug(
"Creating SaslClient: service={};mechs={}",
serviceName,
Expand Down
Loading