From 211b8c2f4f1309ace9ec9ad05c36633c6317ba03 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 7 Mar 2025 21:31:20 +0100 Subject: [PATCH] Flatten stacks in ServerTransportFilter via promise pattern Follow up to previous stack-flattening in authz, keeping things a little simpler here. --- ...ossClusterAccessServerTransportFilter.java | 17 +++--- .../transport/ServerTransportFilter.java | 54 ++++++++++++------- 2 files changed, 43 insertions(+), 28 deletions(-) diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/CrossClusterAccessServerTransportFilter.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/CrossClusterAccessServerTransportFilter.java index e3cd1d2f123d6..3ccdb896b4208 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/CrossClusterAccessServerTransportFilter.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/CrossClusterAccessServerTransportFilter.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.DestructiveOperations; +import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; @@ -71,27 +72,25 @@ final class CrossClusterAccessServerTransportFilter extends ServerTransportFilte } @Override - protected void authenticate( - final String securityAction, - final TransportRequest request, - final ActionListener authenticationListener - ) { + protected ListenableFuture authenticate(final String securityAction, final TransportRequest request) { + final ListenableFuture listener = new ListenableFuture<>(); if (false == Security.ADVANCED_REMOTE_CLUSTER_SECURITY_FEATURE.check(licenseState)) { onFailureWithDebugLog( securityAction, request, - authenticationListener, + listener, LicenseUtils.newComplianceException(Security.ADVANCED_REMOTE_CLUSTER_SECURITY_FEATURE.getName()) ); } else { try { validateHeaders(); } catch (Exception ex) { - onFailureWithDebugLog(securityAction, request, authenticationListener, ex); - return; + onFailureWithDebugLog(securityAction, request, listener, ex); + return listener; } - crossClusterAccessAuthcService.authenticate(securityAction, request, authenticationListener); + crossClusterAccessAuthcService.authenticate(securityAction, request, listener); } + return listener; } private void validateHeaders() { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java index 9eac5512520b2..bab0a48a830be 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction; import org.elasticsearch.action.admin.indices.open.OpenIndexAction; import org.elasticsearch.action.support.DestructiveOperations; +import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.transport.TaskTransportChannel; import org.elasticsearch.transport.TcpChannel; @@ -102,29 +103,44 @@ requests from all the nodes are attached with a user (either a serialize } TransportVersion version = transportChannel.getVersion(); - authenticate(securityAction, request, listener.delegateFailureAndWrap((l, authentication) -> { - if (authentication != null) { - if (securityAction.equals(TransportService.HANDSHAKE_ACTION_NAME) - && SystemUser.is(authentication.getEffectiveSubject().getUser()) == false) { - securityContext.executeAsSystemUser(version, original -> { - final Authentication replaced = securityContext.getAuthentication(); - authzService.authorize(replaced, securityAction, request, l); - }); - } else { - authzService.authorize(authentication, securityAction, request, l); - } + var authFuture = authenticate(securityAction, request); + if (authFuture.isSuccess()) { + handleAuthentication(request, listener, authFuture.result(), securityAction, version); + } else { + authFuture.addListener( + listener.delegateFailureAndWrap( + (l, authentication) -> handleAuthentication(request, l, authentication, securityAction, version) + ) + ); + } + } + + private void handleAuthentication( + TransportRequest request, + ActionListener listener, + Authentication authentication, + String securityAction, + TransportVersion version + ) { + if (authentication != null) { + if (securityAction.equals(TransportService.HANDSHAKE_ACTION_NAME) + && SystemUser.is(authentication.getEffectiveSubject().getUser()) == false) { + securityContext.executeAsSystemUser(version, original -> { + final Authentication replaced = securityContext.getAuthentication(); + authzService.authorize(replaced, securityAction, request, listener); + }); } else { - l.onFailure(new IllegalStateException("no authentication present but auth is allowed")); + authzService.authorize(authentication, securityAction, request, listener); } - })); + } else { + listener.onFailure(new IllegalStateException("no authentication present but auth is allowed")); + } } - protected void authenticate( - final String securityAction, - final TransportRequest request, - final ActionListener authenticationListener - ) { - authcService.authenticate(securityAction, request, true, authenticationListener); + protected ListenableFuture authenticate(final String securityAction, final TransportRequest request) { + final ListenableFuture listener = new ListenableFuture<>(); + authcService.authenticate(securityAction, request, true, listener); + return listener; } protected final ThreadContext getThreadContext() {