Skip to content

Introduce authenticate in Fluss. #577

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 18, 2025

Conversation

loserwang1024
Copy link
Contributor

@loserwang1024 loserwang1024 commented Mar 11, 2025

Purpose

Linked issue: close #484 . This Pr is depended on #531 .

Brief change log

Introduce authenticate in Fluss.

Tests

com.alibaba.fluss.rpc.netty.authenticate.AuthenticationTest

API and Format

Config

  • server.authenticate.protocol.map: A map defining the authentication protocol for each listener. The format is 'listenerName1:protocol1,listenerName2:protocol2', e.g., 'INTERNAL:PLAINTEXT,CLIENT:GSSAPI'. Each listener can be associated with a specific authentication protocol. Listeners not included in the map will use PLAINTEXT by default, which does not require authentication.
  • client.authenticate.protocol:The authentication protocol used to authenticate the client.

The config of one protocol:

  • client.authenticate.${protocol}.xxx
  • server.authenticate.${protocol}.xxx

Interface

AuthenticationPlugin for client and server:

/**
 * The AuthenticationPlugin interface defines a contract for authentication mechanisms in the Fluss
 * RPC system. Implementations of this interface provide specific authentication protocols (e.g.,
 * PLAINTEXT, AK-SK) and must be registered via the Service Provider Interface (SPI) mechanism to be
 * discoverable by the system.
 *
 * <p>Authentication plugins are used by {@link AuthenticatorLoader} to create client and server
 * authenticators based on configuration settings such as {@link
 * com.alibaba.fluss.config.ConfigOptions#CLIENT_AUTHENTICATE_PROTOCOL} and {@link
 * com.alibaba.fluss.config.ConfigOptions#SERVER_AUTHENTICATE_PROTOCOL_MAP}.
 *
 * <p>To implement this interface:
 *
 * <ol>
 *   <li>Define the supported authentication protocol via {@link #authProtocol()}.
 *   <li>Implement the required logic for client/server authentication (via subclasses like {@link
 *       ClientAuthenticationPlugin}).
 *   <li>Register the plugin implementation in {@code
 *       META-INF/services/com.alibaba.fluss.rpc.authenticate.AuthenticationPlugin}.
 * </ol>
 *
 * @since 0.1
 */
public interface AuthenticationPlugin extends Plugin {
    /**
     * Returns the authentication protocol identifier for this plugin (e.g., "PLAINTEXT", "AK-SK").
     * This identifier is used by the system to match plugins with configuration settings and select
     * the appropriate authentication mechanism.
     *
     * <p>It is typically configured via:
     *
     * <ul>
     *   <li>{@link com.alibaba.fluss.config.ConfigOptions#CLIENT_AUTHENTICATE_PROTOCOL} for
     *       client-side authentication.
     *   <li>{@link com.alibaba.fluss.config.ConfigOptions#SERVER_AUTHENTICATE_PROTOCOL_MAP} for
     *       server listener-specific authentication.
     * </ul>
     *
     * @return The protocol name (e.g., "PLAINTEXT").
     */
    String authProtocol();
}


public interface ClientAuthenticationPlugin extends AuthenticationPlugin {

    ClientAuthenticator createClientAuthenticator(Configuration configuration);
}

public interface ServerAuthenticationPlugin extends AuthenticationPlugin {

    ServerAuthenticator createServerAuthenticator(Configuration configuration);
}

Authenticator for client and server:

public interface ServerAuthenticator {

    String protocol();

    byte[] evaluateResponse(byte[] token) throws AuthenticationException;

    /**
     * Create principal from authenticated token for later authorization.(this can only invoke if is
     * complete).
     */
    FlussPrincipal createPrincipal();

    boolean isComplete();
}
public interface ClientAuthenticator {

    String protocol();

    byte[] authenticate(byte[] data);

    boolean isComplete();
}

Principal to identify user:

/**
 * Represents a security principal in Fluss, defined by a {@code name} and {@code type}.
 *
 * <p>The principal type indicates the category of the principal (e.g., "User", "Group", "Role"),
 * while the name identifies the specific entity within that category. By default, the simple
 * authorizer uses "User" as the principal type, but custom authorizers can extend this to support
 * role-based or group-based access control lists (ACLs).
 *
 * <p>Example usage:
 *
 * <ul>
 *   <li>{@code new FlussPrincipal("admin", "User")} – A standard user principal.
 *   <li>{@code new FlussPrincipal("admins", "Group")} – A group-based principal for authorization.
 * </ul>
 *
 * @since 0.1
 */
public class FlussPrincipal implements Principal {
    public static final FlussPrincipal ANONYMOUS = new FlussPrincipal("ANONYMOUS", "User");

    private final String name;
    private final String type;

    public FlussPrincipal(String name, String type) {
        this.name = name;
        this.type = type;
    }

    @Override
    public String getName() {
        return name;
    }

    public String getType() {
        return type;
    }

    @Override
    public boolean equals(Object o) {
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        FlussPrincipal that = (FlussPrincipal) o;
        return Objects.equals(name, that.name) && Objects.equals(type, that.type);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, type);
    }
}

Documentation

@loserwang1024 loserwang1024 force-pushed the fluss-authenticate branch 3 times, most recently from 1d3d21f to b8dfc70 Compare March 25, 2025 11:50
@loserwang1024 loserwang1024 changed the title [Draft] Introduce authenticate in Fluss. Introduce authenticate in Fluss. Mar 26, 2025
@loserwang1024 loserwang1024 force-pushed the fluss-authenticate branch 2 times, most recently from 2b2191b to ecbd770 Compare March 26, 2025 05:47
@loserwang1024
Copy link
Contributor Author

@wuchong , I have rebased this pr into main, would you like to help review it?

@wuchong
Copy link
Member

wuchong commented Mar 27, 2025

Flink119CatalogITCase>FlinkCatalogITCase.testAuthentication:449 is failed.

@wuchong
Copy link
Member

wuchong commented Apr 1, 2025

I'm going to review this PR, could you rebase to the main branch?

@loserwang1024
Copy link
Contributor Author

could you rebase to the main branch

@wuchong , Done it

Copy link
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. I appended a commit to address the following minor comemtns.

byte[] token = authenticateRequest.getToken();
byte[] challenge;
if (!authenticator.isCompleted()
&& (challenge = authenticator.evaluateResponse(token)) != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should evaluateResponse first then check authenticator.isCompleted()?

* @since 0.7
*/
public class AuthenticationFactory {
private static final String SERVER_AUTHENTICATOR_PREFIX = "security.";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not used

*/
@SuppressWarnings("unchecked")
private static <T extends AuthenticationPlugin> T discoverPlugin(
Configuration configuration,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not used

final Iterator<AuthenticationPlugin> foundPlugins = pluginIteratorsSupplier.get();
while (foundPlugins.hasNext()) {
AuthenticationPlugin plugin = foundPlugins.next();
if (plugin.authProtocol().equals(protocol)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may use uppercase when developing the auth plugin(such as: RAM, SSL, SASL), but we can loose this to equalsIgnoreCase to allow users use lowercase in client side:

client.security.protocol: ram
client.security.ram.ak: xxxx

instead of having to use

client.security.protocol: RAM
client.security.RAM.ak: xxxx

@wuchong
Copy link
Member

wuchong commented Apr 17, 2025

@loserwang1024 please check the appended commit again.

@wuchong wuchong merged commit 4784b91 into apache:main Apr 18, 2025
5 of 7 checks passed
polyzos pushed a commit to polyzos/fluss that referenced this pull request Apr 27, 2025
@loserwang1024 loserwang1024 deleted the fluss-authenticate branch June 17, 2025 03:43
ZmmBigdata pushed a commit to ZmmBigdata/fluss that referenced this pull request Jun 20, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Fluss support authentication
2 participants