From e918e31de68b8dac088198d009bf5aebb0ea0ed1 Mon Sep 17 00:00:00 2001 From: Siqi Ding Date: Tue, 1 Apr 2025 16:33:31 -0500 Subject: [PATCH 1/4] Add Custom Auth Provider with support for gRPC, plus tests and exception handling Signed-off-by: Siqi Ding --- .../CustomAuthenticationExceptionHandler.java | 98 +++++++++++++ .../CustomAuthenticationConfig.java | 23 +++ .../CustomAuthenticationProvider.java | 25 ++++ .../CustomGrpcAuthenticationProvider.java | 77 ++++++++++ ...catedCustomGrpcAuthenticationProvider.java | 38 +++++ ...tomAuthenticationExceptionHandlerTest.java | 126 ++++++++++++++++ .../CustomAuthenticationProviderTest.java | 26 ++++ ...CustomBasicAuthenticationProviderTest.java | 137 ++++++++++++++++++ ...catedCustomAuthenticationProviderTest.java | 89 ++++++++++++ data-prepper-plugins/http-source/README.md | 2 +- 10 files changed, 640 insertions(+), 1 deletion(-) create mode 100644 data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/CustomAuthenticationExceptionHandler.java create mode 100644 data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/CustomAuthenticationConfig.java create mode 100644 data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/CustomAuthenticationProvider.java create mode 100644 data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/plugins/CustomGrpcAuthenticationProvider.java create mode 100644 data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/plugins/UnauthenticatedCustomGrpcAuthenticationProvider.java create mode 100644 data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/CustomAuthenticationExceptionHandlerTest.java create mode 100644 data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomAuthenticationProviderTest.java create mode 100644 data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomBasicAuthenticationProviderTest.java create mode 100644 data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/UnauthenticatedCustomAuthenticationProviderTest.java diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/CustomAuthenticationExceptionHandler.java b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/CustomAuthenticationExceptionHandler.java new file mode 100644 index 0000000000..5a7d41126b --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/CustomAuthenticationExceptionHandler.java @@ -0,0 +1,98 @@ +package org.opensearch.dataprepper; + +import com.google.protobuf.Any; +import com.linecorp.armeria.common.RequestContext; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.grpc.GoogleGrpcExceptionHandlerFunction; +import com.linecorp.armeria.server.RequestTimeoutException; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.micrometer.core.instrument.Counter; + +import org.opensearch.dataprepper.exceptions.BadRequestException; +import org.opensearch.dataprepper.exceptions.BufferWriteException; +import org.opensearch.dataprepper.exceptions.RequestCancelledException; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.SizeOverflowException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.concurrent.TimeoutException; + +public class CustomAuthenticationExceptionHandler implements GoogleGrpcExceptionHandlerFunction { + private static final Logger LOG = LoggerFactory.getLogger(CustomAuthenticationExceptionHandler.class); + private static final String TIMEOUT_MESSAGE = "Request timed out. Check buffer availability or processing delays."; + + public static final String REQUEST_TIMEOUTS = "customAuthRequestTimeouts"; + public static final String BAD_REQUESTS = "customAuthBadRequests"; + public static final String REQUESTS_TOO_LARGE = "customAuthRequestsTooLarge"; + public static final String INTERNAL_SERVER_ERROR = "customAuthInternalServerError"; + + private final Counter requestTimeoutsCounter; + private final Counter badRequestsCounter; + private final Counter requestsTooLargeCounter; + private final Counter internalServerErrorCounter; + private final GrpcRetryInfoCalculator retryInfoCalculator; + + public CustomAuthenticationExceptionHandler(final PluginMetrics pluginMetrics, + final Duration retryInfoMinDelay, + final Duration retryInfoMaxDelay) { + this.requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS); + this.badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS); + this.requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE); + this.internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR); + this.retryInfoCalculator = new GrpcRetryInfoCalculator(retryInfoMinDelay, retryInfoMaxDelay); + } + + @Override + public com.google.rpc.@Nullable Status applyStatusProto(RequestContext ctx, Throwable throwable, Metadata metadata) { + final Throwable actualCause = (throwable instanceof BufferWriteException) + ? throwable.getCause() : throwable; + return handleException(actualCause); + } + + private com.google.rpc.Status handleException(Throwable e) { + final String msg = e.getMessage(); + if (e instanceof RequestTimeoutException || e instanceof TimeoutException) { + requestTimeoutsCounter.increment(); + return buildStatus(e, Status.Code.RESOURCE_EXHAUSTED); + } else if (e instanceof SizeOverflowException) { + requestsTooLargeCounter.increment(); + return buildStatus(e, Status.Code.RESOURCE_EXHAUSTED); + } else if (e instanceof BadRequestException) { + badRequestsCounter.increment(); + return buildStatus(e, Status.Code.INVALID_ARGUMENT); + } else if ((e instanceof StatusRuntimeException) && + (msg.contains("Invalid protobuf byte sequence") || msg.contains("Can't decode compressed frame"))) { + badRequestsCounter.increment(); + return buildStatus(e, Status.Code.INVALID_ARGUMENT); + } else if (e instanceof RequestCancelledException) { + requestTimeoutsCounter.increment(); + return buildStatus(e, Status.Code.CANCELLED); + } + + internalServerErrorCounter.increment(); + LOG.error("CustomAuth gRPC handler caught unexpected exception", e); + return buildStatus(e, Status.Code.INTERNAL); + } + + private com.google.rpc.Status buildStatus(Throwable e, Status.Code code) { + com.google.rpc.Status.Builder builder = com.google.rpc.Status.newBuilder() + .setCode(code.value()); + + if (e instanceof RequestTimeoutException) { + builder.setMessage(TIMEOUT_MESSAGE); + } else { + builder.setMessage(e.getMessage() != null ? e.getMessage() : code.name()); + } + + if (code == Status.Code.RESOURCE_EXHAUSTED) { + builder.addDetails(Any.pack(retryInfoCalculator.createRetryInfo())); + } + + return builder.build(); + } +} + diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/CustomAuthenticationConfig.java b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/CustomAuthenticationConfig.java new file mode 100644 index 0000000000..120cffe060 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/CustomAuthenticationConfig.java @@ -0,0 +1,23 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.armeria.authentication; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class CustomAuthenticationConfig { + private final String customToken; + + @JsonCreator + public CustomAuthenticationConfig( + @JsonProperty("custom_token") String customToken) { + this.customToken = customToken; + } + + public String customToken() { + return customToken; + } +} diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/CustomAuthenticationProvider.java b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/CustomAuthenticationProvider.java new file mode 100644 index 0000000000..a8bbcf97de --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/CustomAuthenticationProvider.java @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.armeria.authentication; + +import com.linecorp.armeria.server.HttpService; +import io.grpc.ServerInterceptor; + +import java.util.Optional; +import java.util.function.Function; + +public interface CustomAuthenticationProvider { + + String UNAUTHENTICATED_PLUGIN_NAME = "unauthenticated"; + + + ServerInterceptor getAuthenticationInterceptor(); + + default Optional> getHttpAuthenticationService() { + return Optional.empty(); + } +} + diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/plugins/CustomGrpcAuthenticationProvider.java b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/plugins/CustomGrpcAuthenticationProvider.java new file mode 100644 index 0000000000..fb95ac00c7 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/plugins/CustomGrpcAuthenticationProvider.java @@ -0,0 +1,77 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins; + +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.server.HttpService; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import org.opensearch.dataprepper.armeria.authentication.CustomAuthenticationConfig; +import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; + +import java.util.Optional; +import java.util.function.Function; + +@DataPrepperPlugin( + name = "custom_auth", + pluginType = GrpcAuthenticationProvider.class, + pluginConfigurationType = CustomAuthenticationConfig.class +) +public class CustomGrpcAuthenticationProvider implements GrpcAuthenticationProvider { + private final String token; + private static final String AUTH_HEADER = "authentication"; + + + @DataPrepperPluginConstructor + public CustomGrpcAuthenticationProvider(final CustomAuthenticationConfig config) { + this.token = config.customToken(); + } + + @Override + public ServerInterceptor getAuthenticationInterceptor() { + return new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next) { + + String auth = headers.get(Metadata.Key.of("authentication", Metadata.ASCII_STRING_MARSHALLER)); + + if (auth == null || !auth.equals(token)) { + call.close(Status.UNAUTHENTICATED.withDescription("Invalid token"), new Metadata()); + return new ServerCall.Listener<>() {}; + } + + return next.startCall(call, headers); + } + }; + } + + @Override + public Optional> getHttpAuthenticationService() { + return Optional.of(delegate -> (ctx, req) -> { + final String auth = req.headers().get(AUTH_HEADER); + if (auth == null || !auth.equals(token)) { + return HttpResponse.of( + HttpStatus.UNAUTHORIZED, + MediaType.PLAIN_TEXT_UTF_8, + "Unauthorized: Invalid or missing token" + ); + } + return delegate.serve(ctx, req); + }); + } +} + + diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/plugins/UnauthenticatedCustomGrpcAuthenticationProvider.java b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/plugins/UnauthenticatedCustomGrpcAuthenticationProvider.java new file mode 100644 index 0000000000..42c4ed4e3e --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/plugins/UnauthenticatedCustomGrpcAuthenticationProvider.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins; + +import io.grpc.ServerInterceptor; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.Metadata; +import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; + + +/** + * Plugin that allows unauthenticated gRPC access. + */ +@DataPrepperPlugin( + name = GrpcAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME, + pluginType = GrpcAuthenticationProvider.class +) +public class UnauthenticatedCustomGrpcAuthenticationProvider implements GrpcAuthenticationProvider { + + @Override + public ServerInterceptor getAuthenticationInterceptor() { + return new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next) { + // No authentication is performed; allow the request to continue + return next.startCall(call, headers); + } + }; + } +} diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/CustomAuthenticationExceptionHandlerTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/CustomAuthenticationExceptionHandlerTest.java new file mode 100644 index 0000000000..940f32744e --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/CustomAuthenticationExceptionHandlerTest.java @@ -0,0 +1,126 @@ +package org.opensearch.dataprepper; + +import com.google.protobuf.Any; +import com.google.rpc.RetryInfo; +import com.linecorp.armeria.common.RequestContext; +import io.grpc.Metadata; +import io.grpc.Status; +import io.micrometer.core.instrument.Counter; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.exceptions.BadRequestException; +import org.opensearch.dataprepper.exceptions.BufferWriteException; +import org.opensearch.dataprepper.exceptions.RequestCancelledException; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.SizeOverflowException; + +import java.io.IOException; +import java.time.Duration; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class CustomAuthenticationExceptionHandlerTest { + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private Counter requestTimeoutsCounter; + + @Mock + private Counter badRequestsCounter; + + @Mock + private Counter requestsTooLargeCounter; + + @Mock + private Counter internalServerErrorCounter; + + @Mock + private RequestContext requestContext; + + @Mock + private Metadata metadata; + + private CustomAuthenticationExceptionHandler handler; + + @BeforeEach + public void setUp() { + when(pluginMetrics.counter(CustomAuthenticationExceptionHandler.REQUEST_TIMEOUTS)).thenReturn(requestTimeoutsCounter); + when(pluginMetrics.counter(CustomAuthenticationExceptionHandler.BAD_REQUESTS)).thenReturn(badRequestsCounter); + when(pluginMetrics.counter(CustomAuthenticationExceptionHandler.REQUESTS_TOO_LARGE)).thenReturn(requestsTooLargeCounter); + when(pluginMetrics.counter(CustomAuthenticationExceptionHandler.INTERNAL_SERVER_ERROR)).thenReturn(internalServerErrorCounter); + + handler = new CustomAuthenticationExceptionHandler(pluginMetrics, Duration.ofMillis(100), Duration.ofSeconds(2)); + } + + @Test + public void testBadRequestExceptionHandling() { + final String message = UUID.randomUUID().toString(); + BadRequestException exception = new BadRequestException(message, new IOException()); + + com.google.rpc.Status status = handler.applyStatusProto(requestContext, exception, metadata); + + assertThat(status.getCode(), equalTo(Status.Code.INVALID_ARGUMENT.value())); + assertThat(status.getMessage(), equalTo(message)); + verify(badRequestsCounter).increment(); + } + + @Test + public void testTimeoutExceptionHandling() { + TimeoutException timeout = new TimeoutException(); + BufferWriteException bufferWriteException = new BufferWriteException("timeout", timeout); + + com.google.rpc.Status status = handler.applyStatusProto(requestContext, bufferWriteException, metadata); + + assertThat(status.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED.value())); + verify(requestTimeoutsCounter).increment(); + Optional retryInfo = status.getDetailsList().stream().filter(d -> d.is(RetryInfo.class)).findFirst(); + assertTrue(retryInfo.isPresent()); + } + + @Test + public void testSizeOverflowExceptionHandling() { + SizeOverflowException overflow = new SizeOverflowException("Overflow"); + BufferWriteException bufferWriteException = new BufferWriteException("overflow", overflow); + + com.google.rpc.Status status = handler.applyStatusProto(requestContext, bufferWriteException, metadata); + + assertThat(status.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED.value())); + verify(requestsTooLargeCounter).increment(); + } + + @Test + public void testCancelledRequestHandling() { + String message = UUID.randomUUID().toString(); + RequestCancelledException exception = new RequestCancelledException(message); + + com.google.rpc.Status status = handler.applyStatusProto(requestContext, exception, metadata); + + assertThat(status.getCode(), equalTo(Status.Code.CANCELLED.value())); + assertThat(status.getMessage(), equalTo(message)); + verify(requestTimeoutsCounter).increment(); + } + + @Test + public void testInternalExceptionHandling() { + String message = UUID.randomUUID().toString(); + RuntimeException exception = new RuntimeException(message); + + com.google.rpc.Status status = handler.applyStatusProto(requestContext, exception, metadata); + + assertThat(status.getCode(), equalTo(Status.Code.INTERNAL.value())); + assertThat(status.getMessage(), equalTo(message)); + verify(internalServerErrorCounter).increment(); + } +} diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomAuthenticationProviderTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomAuthenticationProviderTest.java new file mode 100644 index 0000000000..c9801d23cf --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomAuthenticationProviderTest.java @@ -0,0 +1,26 @@ +package org.opensearch.dataprepper.plugins; + +import org.opensearch.dataprepper.armeria.authentication.CustomAuthenticationProvider; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class CustomAuthenticationProviderTest { + + @Mock + private CustomAuthenticationProvider customAuthenticationProvider; + + @Test + public void testCustomAuthenticationProvider() { + when(customAuthenticationProvider.getHttpAuthenticationService()).thenCallRealMethod(); + + var result = customAuthenticationProvider.getHttpAuthenticationService(); + Assertions.assertNotNull(result); + Assertions.assertFalse(result.isPresent()); + } +} diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomBasicAuthenticationProviderTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomBasicAuthenticationProviderTest.java new file mode 100644 index 0000000000..52ad1f4740 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomBasicAuthenticationProviderTest.java @@ -0,0 +1,137 @@ +package org.opensearch.dataprepper.plugins; + +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.HttpData; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.common.RequestHeaders; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.grpc.GrpcService; +import com.linecorp.armeria.server.grpc.GrpcServiceBuilder; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import io.grpc.ServerInterceptors; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthGrpc; +import io.grpc.stub.StreamObserver; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.opensearch.dataprepper.armeria.authentication.CustomAuthenticationConfig; +import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; + +import java.nio.charset.Charset; +import java.util.Collections; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CustomBasicAuthenticationProviderTest { + private static final String TOKEN = UUID.randomUUID().toString(); + private static GrpcAuthenticationProvider grpcAuthenticationProvider; + + @RegisterExtension + static ServerExtension server = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + CustomAuthenticationConfig config = mock(CustomAuthenticationConfig.class); + when(config.customToken()).thenReturn(TOKEN); + + grpcAuthenticationProvider = new CustomGrpcAuthenticationProvider(config); + + GrpcServiceBuilder grpcServiceBuilder = GrpcService.builder() + .enableUnframedRequests(true) + .addService(ServerInterceptors.intercept( + new SampleHealthGrpcService(), + Collections.singletonList(grpcAuthenticationProvider.getAuthenticationInterceptor()))); + + sb.service(grpcServiceBuilder.build()); + } + }; + + private static class SampleHealthGrpcService extends HealthGrpc.HealthImplBase { + @Override + public void check(HealthCheckRequest request, StreamObserver responseObserver) { + responseObserver.onNext( + HealthCheckResponse.newBuilder().setStatus(HealthCheckResponse.ServingStatus.SERVING).build()); + responseObserver.onCompleted(); + } + } + + @Nested + class ConstructorTests { + CustomAuthenticationConfig config; + + @BeforeEach + void setUp() { + config = mock(CustomAuthenticationConfig.class); + } + + @Test + void constructor_with_null_config_throws() { + assertThrows(NullPointerException.class, () -> new CustomGrpcAuthenticationProvider(null)); + } + } + + @Nested + class WithServer { + @Test + void request_without_token_responds_Unauthorized() { + WebClient client = WebClient.of(server.httpUri()); + HttpRequest request = HttpRequest.of(RequestHeaders.builder() + .method(HttpMethod.POST) + .path("/grpc.health.v1.Health/Check") + .contentType(MediaType.JSON_UTF_8) + .build()); + + final AggregatedHttpResponse httpResponse = client.execute(request).aggregate().join(); + + assertThat(httpResponse.status(), equalTo(HttpStatus.UNAUTHORIZED)); + } + + @Test + void request_with_invalid_token_responds_Unauthorized() { + WebClient client = WebClient.builder(server.httpUri()) + .addHeader("authentication", "invalid-token") + .build(); + + HttpRequest request = HttpRequest.of(RequestHeaders.builder() + .method(HttpMethod.POST) + .path("/grpc.health.v1.Health/Check") + .contentType(MediaType.JSON_UTF_8) + .build()); + + final AggregatedHttpResponse httpResponse = client.execute(request).aggregate().join(); + + assertThat(httpResponse.status(), equalTo(HttpStatus.UNAUTHORIZED)); + } + + @Test + void request_with_valid_token_responds_OK() { + WebClient client = WebClient.builder(server.httpUri()) + .addHeader("authentication", TOKEN) + .build(); + + HttpRequest request = HttpRequest.of(RequestHeaders.builder() + .method(HttpMethod.POST) + .path("/grpc.health.v1.Health/Check") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.of(Charset.defaultCharset(), "{\"healthCheckConfig\":{\"serviceName\": \"test\"} }")); + + + final AggregatedHttpResponse httpResponse = client.execute(request).aggregate().join(); + + assertThat(httpResponse.status(), equalTo(HttpStatus.OK)); + } + } +} + diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/UnauthenticatedCustomAuthenticationProviderTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/UnauthenticatedCustomAuthenticationProviderTest.java new file mode 100644 index 0000000000..ea7f750526 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/UnauthenticatedCustomAuthenticationProviderTest.java @@ -0,0 +1,89 @@ +package org.opensearch.dataprepper.plugins; + +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.HttpData; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.common.RequestHeaders; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.grpc.GrpcService; +import com.linecorp.armeria.server.grpc.GrpcServiceBuilder; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import io.grpc.ServerInterceptors; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthGrpc; +import io.grpc.stub.StreamObserver; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; + +import java.nio.charset.Charset; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +class UnauthenticatedCustomAuthenticationProviderTest { + private static GrpcAuthenticationProvider grpcAuthenticationProvider; + + @RegisterExtension + static ServerExtension server = new ServerExtension() { + @Override + protected void configure(final ServerBuilder sb) { + grpcAuthenticationProvider = new UnauthenticatedGrpcAuthenticationProvider(); + + final GrpcServiceBuilder grpcServiceBuilder = GrpcService + .builder() + .enableUnframedRequests(true) + .addService(ServerInterceptors.intercept(new SampleHealthGrpcService(), grpcAuthenticationProvider.getAuthenticationInterceptor())); + sb.service(grpcServiceBuilder.build()); + } + }; + + private static class SampleHealthGrpcService extends HealthGrpc.HealthImplBase { + @Override + public void check(final HealthCheckRequest request, final StreamObserver responseObserver) { + responseObserver.onNext( + HealthCheckResponse.newBuilder().setStatus(HealthCheckResponse.ServingStatus.SERVING).build()); + responseObserver.onCompleted(); + } + } + + @Test + void httpRequest_without_authentication_responds_OK() { + final WebClient client = WebClient.of(server.httpUri()); + + HttpRequest request = HttpRequest.of(RequestHeaders.builder() + .method(HttpMethod.POST) + .path("/grpc.health.v1.Health/Check") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.of(Charset.defaultCharset(), "{\"healthCheckConfig\":{\"serviceName\": \"test\"} }")); + + final AggregatedHttpResponse httpResponse = client.execute(request).aggregate().join(); + + assertThat(httpResponse.status(), equalTo(HttpStatus.OK)); + } + + @Test + void httpRequest_with_random_authentication_responds_OK() { + final WebClient client = WebClient.builder(server.httpUri()) + .addHeader("authorization", UUID.randomUUID().toString()) + .build(); + + HttpRequest request = HttpRequest.of(RequestHeaders.builder() + .method(HttpMethod.POST) + .path("/grpc.health.v1.Health/Check") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.of(Charset.defaultCharset(), "{\"healthCheckConfig\":{\"serviceName\": \"test\"} }")); + + final AggregatedHttpResponse httpResponse = client.execute(request).aggregate().join(); + + assertThat(httpResponse.status(), equalTo(HttpStatus.OK)); + } +} diff --git a/data-prepper-plugins/http-source/README.md b/data-prepper-plugins/http-source/README.md index 19d9a5a543..46141db03b 100644 --- a/data-prepper-plugins/http-source/README.md +++ b/data-prepper-plugins/http-source/README.md @@ -96,7 +96,7 @@ Make sure to replace the paths for the `ssl_certificate_file` and `ssl_key_file` Send a sample log with the following https curl command ``` -curl -k -XPOST -H "Content-Type: application/json" -d '[{"log": "sample log"}]' https://localhost:2021/log/ingest +curl -k -X POST -H "Content-Type: application/json" -d '[{"log": "sample log"}]' https://localhost:2021/log/ingest ``` # Metrics From be19db6e2c3a856d14d081d363c00f5977e3fa1d Mon Sep 17 00:00:00 2001 From: Siqi Ding Date: Tue, 8 Apr 2025 11:42:44 -0500 Subject: [PATCH 2/4] Making change on code based on comments. Refactor token avalidation logic, rename test classes, and move test into a new package 'testcustomauth' Signed-off-by: Siqi Ding --- .../CustomAuthenticationExceptionHandler.java | 98 -------------- .../CustomAuthenticationConfig.java | 23 ---- ...tomAuthenticationExceptionHandlerTest.java | 126 ------------------ .../CustomAuthenticationProviderTest.java | 26 ---- ...CustomBasicAuthenticationProviderTest.java | 19 +-- .../TestCustomAuthenticationProviderTest.java | 45 +++++++ .../TestCustomAuthenticationConfig.java | 30 +++++ .../TestCustomAuthenticationProvider.java} | 4 +- ...TestCustomGrpcAuthenticationProvider.java} | 33 +++-- 9 files changed, 109 insertions(+), 295 deletions(-) delete mode 100644 data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/CustomAuthenticationExceptionHandler.java delete mode 100644 data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/CustomAuthenticationConfig.java delete mode 100644 data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/CustomAuthenticationExceptionHandlerTest.java delete mode 100644 data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomAuthenticationProviderTest.java create mode 100644 data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/TestCustomAuthenticationProviderTest.java create mode 100644 data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationConfig.java rename data-prepper-plugins/armeria-common/src/{main/java/org/opensearch/dataprepper/armeria/authentication/CustomAuthenticationProvider.java => test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationProvider.java} (81%) rename data-prepper-plugins/armeria-common/src/{main/java/org/opensearch/dataprepper/plugins/CustomGrpcAuthenticationProvider.java => test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomGrpcAuthenticationProvider.java} (67%) diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/CustomAuthenticationExceptionHandler.java b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/CustomAuthenticationExceptionHandler.java deleted file mode 100644 index 5a7d41126b..0000000000 --- a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/CustomAuthenticationExceptionHandler.java +++ /dev/null @@ -1,98 +0,0 @@ -package org.opensearch.dataprepper; - -import com.google.protobuf.Any; -import com.linecorp.armeria.common.RequestContext; -import com.linecorp.armeria.common.annotation.Nullable; -import com.linecorp.armeria.common.grpc.GoogleGrpcExceptionHandlerFunction; -import com.linecorp.armeria.server.RequestTimeoutException; -import io.grpc.Metadata; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; -import io.micrometer.core.instrument.Counter; - -import org.opensearch.dataprepper.exceptions.BadRequestException; -import org.opensearch.dataprepper.exceptions.BufferWriteException; -import org.opensearch.dataprepper.exceptions.RequestCancelledException; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.buffer.SizeOverflowException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.util.concurrent.TimeoutException; - -public class CustomAuthenticationExceptionHandler implements GoogleGrpcExceptionHandlerFunction { - private static final Logger LOG = LoggerFactory.getLogger(CustomAuthenticationExceptionHandler.class); - private static final String TIMEOUT_MESSAGE = "Request timed out. Check buffer availability or processing delays."; - - public static final String REQUEST_TIMEOUTS = "customAuthRequestTimeouts"; - public static final String BAD_REQUESTS = "customAuthBadRequests"; - public static final String REQUESTS_TOO_LARGE = "customAuthRequestsTooLarge"; - public static final String INTERNAL_SERVER_ERROR = "customAuthInternalServerError"; - - private final Counter requestTimeoutsCounter; - private final Counter badRequestsCounter; - private final Counter requestsTooLargeCounter; - private final Counter internalServerErrorCounter; - private final GrpcRetryInfoCalculator retryInfoCalculator; - - public CustomAuthenticationExceptionHandler(final PluginMetrics pluginMetrics, - final Duration retryInfoMinDelay, - final Duration retryInfoMaxDelay) { - this.requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS); - this.badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS); - this.requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE); - this.internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR); - this.retryInfoCalculator = new GrpcRetryInfoCalculator(retryInfoMinDelay, retryInfoMaxDelay); - } - - @Override - public com.google.rpc.@Nullable Status applyStatusProto(RequestContext ctx, Throwable throwable, Metadata metadata) { - final Throwable actualCause = (throwable instanceof BufferWriteException) - ? throwable.getCause() : throwable; - return handleException(actualCause); - } - - private com.google.rpc.Status handleException(Throwable e) { - final String msg = e.getMessage(); - if (e instanceof RequestTimeoutException || e instanceof TimeoutException) { - requestTimeoutsCounter.increment(); - return buildStatus(e, Status.Code.RESOURCE_EXHAUSTED); - } else if (e instanceof SizeOverflowException) { - requestsTooLargeCounter.increment(); - return buildStatus(e, Status.Code.RESOURCE_EXHAUSTED); - } else if (e instanceof BadRequestException) { - badRequestsCounter.increment(); - return buildStatus(e, Status.Code.INVALID_ARGUMENT); - } else if ((e instanceof StatusRuntimeException) && - (msg.contains("Invalid protobuf byte sequence") || msg.contains("Can't decode compressed frame"))) { - badRequestsCounter.increment(); - return buildStatus(e, Status.Code.INVALID_ARGUMENT); - } else if (e instanceof RequestCancelledException) { - requestTimeoutsCounter.increment(); - return buildStatus(e, Status.Code.CANCELLED); - } - - internalServerErrorCounter.increment(); - LOG.error("CustomAuth gRPC handler caught unexpected exception", e); - return buildStatus(e, Status.Code.INTERNAL); - } - - private com.google.rpc.Status buildStatus(Throwable e, Status.Code code) { - com.google.rpc.Status.Builder builder = com.google.rpc.Status.newBuilder() - .setCode(code.value()); - - if (e instanceof RequestTimeoutException) { - builder.setMessage(TIMEOUT_MESSAGE); - } else { - builder.setMessage(e.getMessage() != null ? e.getMessage() : code.name()); - } - - if (code == Status.Code.RESOURCE_EXHAUSTED) { - builder.addDetails(Any.pack(retryInfoCalculator.createRetryInfo())); - } - - return builder.build(); - } -} - diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/CustomAuthenticationConfig.java b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/CustomAuthenticationConfig.java deleted file mode 100644 index 120cffe060..0000000000 --- a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/CustomAuthenticationConfig.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.armeria.authentication; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -public class CustomAuthenticationConfig { - private final String customToken; - - @JsonCreator - public CustomAuthenticationConfig( - @JsonProperty("custom_token") String customToken) { - this.customToken = customToken; - } - - public String customToken() { - return customToken; - } -} diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/CustomAuthenticationExceptionHandlerTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/CustomAuthenticationExceptionHandlerTest.java deleted file mode 100644 index 940f32744e..0000000000 --- a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/CustomAuthenticationExceptionHandlerTest.java +++ /dev/null @@ -1,126 +0,0 @@ -package org.opensearch.dataprepper; - -import com.google.protobuf.Any; -import com.google.rpc.RetryInfo; -import com.linecorp.armeria.common.RequestContext; -import io.grpc.Metadata; -import io.grpc.Status; -import io.micrometer.core.instrument.Counter; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.exceptions.BadRequestException; -import org.opensearch.dataprepper.exceptions.BufferWriteException; -import org.opensearch.dataprepper.exceptions.RequestCancelledException; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.buffer.SizeOverflowException; - -import java.io.IOException; -import java.time.Duration; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.TimeoutException; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -public class CustomAuthenticationExceptionHandlerTest { - @Mock - private PluginMetrics pluginMetrics; - - @Mock - private Counter requestTimeoutsCounter; - - @Mock - private Counter badRequestsCounter; - - @Mock - private Counter requestsTooLargeCounter; - - @Mock - private Counter internalServerErrorCounter; - - @Mock - private RequestContext requestContext; - - @Mock - private Metadata metadata; - - private CustomAuthenticationExceptionHandler handler; - - @BeforeEach - public void setUp() { - when(pluginMetrics.counter(CustomAuthenticationExceptionHandler.REQUEST_TIMEOUTS)).thenReturn(requestTimeoutsCounter); - when(pluginMetrics.counter(CustomAuthenticationExceptionHandler.BAD_REQUESTS)).thenReturn(badRequestsCounter); - when(pluginMetrics.counter(CustomAuthenticationExceptionHandler.REQUESTS_TOO_LARGE)).thenReturn(requestsTooLargeCounter); - when(pluginMetrics.counter(CustomAuthenticationExceptionHandler.INTERNAL_SERVER_ERROR)).thenReturn(internalServerErrorCounter); - - handler = new CustomAuthenticationExceptionHandler(pluginMetrics, Duration.ofMillis(100), Duration.ofSeconds(2)); - } - - @Test - public void testBadRequestExceptionHandling() { - final String message = UUID.randomUUID().toString(); - BadRequestException exception = new BadRequestException(message, new IOException()); - - com.google.rpc.Status status = handler.applyStatusProto(requestContext, exception, metadata); - - assertThat(status.getCode(), equalTo(Status.Code.INVALID_ARGUMENT.value())); - assertThat(status.getMessage(), equalTo(message)); - verify(badRequestsCounter).increment(); - } - - @Test - public void testTimeoutExceptionHandling() { - TimeoutException timeout = new TimeoutException(); - BufferWriteException bufferWriteException = new BufferWriteException("timeout", timeout); - - com.google.rpc.Status status = handler.applyStatusProto(requestContext, bufferWriteException, metadata); - - assertThat(status.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED.value())); - verify(requestTimeoutsCounter).increment(); - Optional retryInfo = status.getDetailsList().stream().filter(d -> d.is(RetryInfo.class)).findFirst(); - assertTrue(retryInfo.isPresent()); - } - - @Test - public void testSizeOverflowExceptionHandling() { - SizeOverflowException overflow = new SizeOverflowException("Overflow"); - BufferWriteException bufferWriteException = new BufferWriteException("overflow", overflow); - - com.google.rpc.Status status = handler.applyStatusProto(requestContext, bufferWriteException, metadata); - - assertThat(status.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED.value())); - verify(requestsTooLargeCounter).increment(); - } - - @Test - public void testCancelledRequestHandling() { - String message = UUID.randomUUID().toString(); - RequestCancelledException exception = new RequestCancelledException(message); - - com.google.rpc.Status status = handler.applyStatusProto(requestContext, exception, metadata); - - assertThat(status.getCode(), equalTo(Status.Code.CANCELLED.value())); - assertThat(status.getMessage(), equalTo(message)); - verify(requestTimeoutsCounter).increment(); - } - - @Test - public void testInternalExceptionHandling() { - String message = UUID.randomUUID().toString(); - RuntimeException exception = new RuntimeException(message); - - com.google.rpc.Status status = handler.applyStatusProto(requestContext, exception, metadata); - - assertThat(status.getCode(), equalTo(Status.Code.INTERNAL.value())); - assertThat(status.getMessage(), equalTo(message)); - verify(internalServerErrorCounter).increment(); - } -} diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomAuthenticationProviderTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomAuthenticationProviderTest.java deleted file mode 100644 index c9801d23cf..0000000000 --- a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomAuthenticationProviderTest.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.opensearch.dataprepper.plugins; - -import org.opensearch.dataprepper.armeria.authentication.CustomAuthenticationProvider; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -public class CustomAuthenticationProviderTest { - - @Mock - private CustomAuthenticationProvider customAuthenticationProvider; - - @Test - public void testCustomAuthenticationProvider() { - when(customAuthenticationProvider.getHttpAuthenticationService()).thenCallRealMethod(); - - var result = customAuthenticationProvider.getHttpAuthenticationService(); - Assertions.assertNotNull(result); - Assertions.assertFalse(result.isPresent()); - } -} diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomBasicAuthenticationProviderTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomBasicAuthenticationProviderTest.java index 52ad1f4740..7e45f720f3 100644 --- a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomBasicAuthenticationProviderTest.java +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomBasicAuthenticationProviderTest.java @@ -21,8 +21,9 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.opensearch.dataprepper.armeria.authentication.CustomAuthenticationConfig; +import org.opensearch.dataprepper.plugins.testcustomauth.TestCustomAuthenticationConfig; import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; +import org.opensearch.dataprepper.plugins.testcustomauth.TestCustomGrpcAuthenticationProvider; import java.nio.charset.Charset; import java.util.Collections; @@ -36,16 +37,18 @@ public class CustomBasicAuthenticationProviderTest { private static final String TOKEN = UUID.randomUUID().toString(); + private static final String HEADER_NAME = "x-" + UUID.randomUUID(); private static GrpcAuthenticationProvider grpcAuthenticationProvider; @RegisterExtension static ServerExtension server = new ServerExtension() { @Override protected void configure(ServerBuilder sb) { - CustomAuthenticationConfig config = mock(CustomAuthenticationConfig.class); + TestCustomAuthenticationConfig config = mock(TestCustomAuthenticationConfig.class); when(config.customToken()).thenReturn(TOKEN); + when(config.header()).thenReturn(HEADER_NAME); - grpcAuthenticationProvider = new CustomGrpcAuthenticationProvider(config); + grpcAuthenticationProvider = new TestCustomGrpcAuthenticationProvider(config); GrpcServiceBuilder grpcServiceBuilder = GrpcService.builder() .enableUnframedRequests(true) @@ -68,16 +71,16 @@ public void check(HealthCheckRequest request, StreamObserver new CustomGrpcAuthenticationProvider(null)); + assertThrows(NullPointerException.class, () -> new TestCustomGrpcAuthenticationProvider(null)); } } @@ -100,7 +103,7 @@ void request_without_token_responds_Unauthorized() { @Test void request_with_invalid_token_responds_Unauthorized() { WebClient client = WebClient.builder(server.httpUri()) - .addHeader("authentication", "invalid-token") + .addHeader(HEADER_NAME, "invalid-token") .build(); HttpRequest request = HttpRequest.of(RequestHeaders.builder() @@ -117,7 +120,7 @@ void request_with_invalid_token_responds_Unauthorized() { @Test void request_with_valid_token_responds_OK() { WebClient client = WebClient.builder(server.httpUri()) - .addHeader("authentication", TOKEN) + .addHeader(HEADER_NAME, TOKEN) .build(); HttpRequest request = HttpRequest.of(RequestHeaders.builder() diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/TestCustomAuthenticationProviderTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/TestCustomAuthenticationProviderTest.java new file mode 100644 index 0000000000..b5c813d0cb --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/TestCustomAuthenticationProviderTest.java @@ -0,0 +1,45 @@ +package org.opensearch.dataprepper.plugins; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.testcustomauth.TestCustomAuthenticationConfig; +import org.opensearch.dataprepper.plugins.testcustomauth.TestCustomGrpcAuthenticationProvider; + +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class TestCustomAuthenticationProviderTest { + + private static final String TOKEN = "test-token"; + private static final String HEADER = "authentication"; + + @Mock + private TestCustomAuthenticationConfig config; + + private TestCustomGrpcAuthenticationProvider provider; + + @BeforeEach + void setUp() { + when(config.customToken()).thenReturn(TOKEN); + when(config.header()).thenReturn(HEADER); + + provider = new TestCustomGrpcAuthenticationProvider(config); + } + + @Test + void testGetHttpAuthenticationService_shouldReturnValidOptional() { + var optionalService = provider.getHttpAuthenticationService(); + Assertions.assertTrue(optionalService.isPresent()); + } + + @Test + void testGetAuthenticationInterceptor_shouldReturnNonNull() { + var interceptor = provider.getAuthenticationInterceptor(); + Assertions.assertNotNull(interceptor); + } +} + diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationConfig.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationConfig.java new file mode 100644 index 0000000000..8da071b996 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationConfig.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.testcustomauth; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class TestCustomAuthenticationConfig { + private final String customToken; + private final String header; + + @JsonCreator + public TestCustomAuthenticationConfig( + @JsonProperty("custom_token") String customToken, + @JsonProperty("header") String header) { + this.customToken = customToken; + this.header = header != null ? header : "authentication"; + } + + public String customToken() { + return customToken; + } + + public String header() { + return header; + } +} diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/CustomAuthenticationProvider.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationProvider.java similarity index 81% rename from data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/CustomAuthenticationProvider.java rename to data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationProvider.java index a8bbcf97de..5e4acda4c4 100644 --- a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/CustomAuthenticationProvider.java +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationProvider.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.armeria.authentication; +package org.opensearch.dataprepper.plugins.testcustomauth; import com.linecorp.armeria.server.HttpService; import io.grpc.ServerInterceptor; @@ -11,7 +11,7 @@ import java.util.Optional; import java.util.function.Function; -public interface CustomAuthenticationProvider { +public interface TestCustomAuthenticationProvider { String UNAUTHENTICATED_PLUGIN_NAME = "unauthenticated"; diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/plugins/CustomGrpcAuthenticationProvider.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomGrpcAuthenticationProvider.java similarity index 67% rename from data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/plugins/CustomGrpcAuthenticationProvider.java rename to data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomGrpcAuthenticationProvider.java index fb95ac00c7..c35c013471 100644 --- a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/plugins/CustomGrpcAuthenticationProvider.java +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomGrpcAuthenticationProvider.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins; +package org.opensearch.dataprepper.plugins.testcustomauth; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; @@ -14,7 +14,6 @@ import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.Status; -import org.opensearch.dataprepper.armeria.authentication.CustomAuthenticationConfig; import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; @@ -23,18 +22,18 @@ import java.util.function.Function; @DataPrepperPlugin( - name = "custom_auth", + name = "test_custom_auth", pluginType = GrpcAuthenticationProvider.class, - pluginConfigurationType = CustomAuthenticationConfig.class + pluginConfigurationType = TestCustomAuthenticationConfig.class ) -public class CustomGrpcAuthenticationProvider implements GrpcAuthenticationProvider { +public class TestCustomGrpcAuthenticationProvider implements GrpcAuthenticationProvider { private final String token; - private static final String AUTH_HEADER = "authentication"; - + private final String header; @DataPrepperPluginConstructor - public CustomGrpcAuthenticationProvider(final CustomAuthenticationConfig config) { + public TestCustomGrpcAuthenticationProvider(final TestCustomAuthenticationConfig config) { this.token = config.customToken(); + this.header = config.header(); } @Override @@ -46,9 +45,9 @@ public ServerCall.Listener interceptCall( Metadata headers, ServerCallHandler next) { - String auth = headers.get(Metadata.Key.of("authentication", Metadata.ASCII_STRING_MARSHALLER)); + String auth = headers.get(Metadata.Key.of(header, Metadata.ASCII_STRING_MARSHALLER)); - if (auth == null || !auth.equals(token)) { + if (!isValid(auth)) { call.close(Status.UNAUTHENTICATED.withDescription("Invalid token"), new Metadata()); return new ServerCall.Listener<>() {}; } @@ -61,8 +60,8 @@ public ServerCall.Listener interceptCall( @Override public Optional> getHttpAuthenticationService() { return Optional.of(delegate -> (ctx, req) -> { - final String auth = req.headers().get(AUTH_HEADER); - if (auth == null || !auth.equals(token)) { + final String auth = req.headers().get(header); + if (!isValid(auth)) { return HttpResponse.of( HttpStatus.UNAUTHORIZED, MediaType.PLAIN_TEXT_UTF_8, @@ -72,6 +71,16 @@ public ServerCall.Listener interceptCall( return delegate.serve(ctx, req); }); } + + /** + * Checks if the provided authentication token is valid. + * + * @param authHeader the value of the authentication header + * @return true if valid, false otherwise + */ + private boolean isValid(final String authHeader) { + return authHeader != null && authHeader.equals(token); + } } From 741d81af30c6ebd9340b96eb7914d671d602461a Mon Sep 17 00:00:00 2001 From: Siqi Ding Date: Wed, 9 Apr 2025 10:22:07 -0500 Subject: [PATCH 3/4] Move UnauthenticatedCustomGrpcAuthenticationProvider and test to src/test for test-only use Signed-off-by: Siqi Ding --- .../TestCustomAuthenticationProviderTest.java | 4 +--- .../TestCustomBasicAuthenticationProviderTest.java} | 6 ++---- .../UnauthenticatedCustomAuthenticationProviderTest.java | 3 ++- .../UnauthenticatedCustomGrpcAuthenticationProvider.java | 2 +- 4 files changed, 6 insertions(+), 9 deletions(-) rename data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/{ => testcustomauth}/TestCustomAuthenticationProviderTest.java (84%) rename data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/{CustomBasicAuthenticationProviderTest.java => testcustomauth/TestCustomBasicAuthenticationProviderTest.java} (95%) rename data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/{ => testcustomauth}/UnauthenticatedCustomAuthenticationProviderTest.java (96%) rename data-prepper-plugins/armeria-common/src/{main/java/org/opensearch/dataprepper/plugins => test/java/org/opensearch/dataprepper/plugins/testcustomauth}/UnauthenticatedCustomGrpcAuthenticationProvider.java (95%) diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/TestCustomAuthenticationProviderTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationProviderTest.java similarity index 84% rename from data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/TestCustomAuthenticationProviderTest.java rename to data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationProviderTest.java index b5c813d0cb..4ab88dbe06 100644 --- a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/TestCustomAuthenticationProviderTest.java +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationProviderTest.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins; +package org.opensearch.dataprepper.plugins.testcustomauth; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -6,8 +6,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.plugins.testcustomauth.TestCustomAuthenticationConfig; -import org.opensearch.dataprepper.plugins.testcustomauth.TestCustomGrpcAuthenticationProvider; import static org.mockito.Mockito.when; diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomBasicAuthenticationProviderTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomBasicAuthenticationProviderTest.java similarity index 95% rename from data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomBasicAuthenticationProviderTest.java rename to data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomBasicAuthenticationProviderTest.java index 7e45f720f3..ea431971f5 100644 --- a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/CustomBasicAuthenticationProviderTest.java +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomBasicAuthenticationProviderTest.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins; +package org.opensearch.dataprepper.plugins.testcustomauth; import com.linecorp.armeria.client.WebClient; import com.linecorp.armeria.common.AggregatedHttpResponse; @@ -21,9 +21,7 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.opensearch.dataprepper.plugins.testcustomauth.TestCustomAuthenticationConfig; import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; -import org.opensearch.dataprepper.plugins.testcustomauth.TestCustomGrpcAuthenticationProvider; import java.nio.charset.Charset; import java.util.Collections; @@ -35,7 +33,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class CustomBasicAuthenticationProviderTest { +public class TestCustomBasicAuthenticationProviderTest { private static final String TOKEN = UUID.randomUUID().toString(); private static final String HEADER_NAME = "x-" + UUID.randomUUID(); private static GrpcAuthenticationProvider grpcAuthenticationProvider; diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/UnauthenticatedCustomAuthenticationProviderTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/UnauthenticatedCustomAuthenticationProviderTest.java similarity index 96% rename from data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/UnauthenticatedCustomAuthenticationProviderTest.java rename to data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/UnauthenticatedCustomAuthenticationProviderTest.java index ea7f750526..d574c125d8 100644 --- a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/UnauthenticatedCustomAuthenticationProviderTest.java +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/UnauthenticatedCustomAuthenticationProviderTest.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins; +package org.opensearch.dataprepper.plugins.testcustomauth; import com.linecorp.armeria.client.WebClient; import com.linecorp.armeria.common.AggregatedHttpResponse; @@ -20,6 +20,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; +import org.opensearch.dataprepper.plugins.UnauthenticatedGrpcAuthenticationProvider; import java.nio.charset.Charset; import java.util.UUID; diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/plugins/UnauthenticatedCustomGrpcAuthenticationProvider.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/UnauthenticatedCustomGrpcAuthenticationProvider.java similarity index 95% rename from data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/plugins/UnauthenticatedCustomGrpcAuthenticationProvider.java rename to data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/UnauthenticatedCustomGrpcAuthenticationProvider.java index 42c4ed4e3e..2cc9e908fc 100644 --- a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/plugins/UnauthenticatedCustomGrpcAuthenticationProvider.java +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/UnauthenticatedCustomGrpcAuthenticationProvider.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins; +package org.opensearch.dataprepper.plugins.testcustomauth; import io.grpc.ServerInterceptor; import io.grpc.ServerCall; From 57232dd2b35275804890f6ae41e7a5367d3d05fc Mon Sep 17 00:00:00 2001 From: Siqi Ding Date: Wed, 9 Apr 2025 10:22:54 -0500 Subject: [PATCH 4/4] Move UnauthenticatedCustomGrpcAuthenticationProvider and test to src/test for test-only use Signed-off-by: Siqi Ding --- ...=> TestUnauthenticatedCustomAuthenticationProviderTest.java} | 2 +- ...=> TestUnauthenticatedCustomGrpcAuthenticationProvider.java} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/{UnauthenticatedCustomAuthenticationProviderTest.java => TestUnauthenticatedCustomAuthenticationProviderTest.java} (98%) rename data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/{UnauthenticatedCustomGrpcAuthenticationProvider.java => TestUnauthenticatedCustomGrpcAuthenticationProvider.java} (91%) diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/UnauthenticatedCustomAuthenticationProviderTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestUnauthenticatedCustomAuthenticationProviderTest.java similarity index 98% rename from data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/UnauthenticatedCustomAuthenticationProviderTest.java rename to data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestUnauthenticatedCustomAuthenticationProviderTest.java index d574c125d8..ee69d6255f 100644 --- a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/UnauthenticatedCustomAuthenticationProviderTest.java +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestUnauthenticatedCustomAuthenticationProviderTest.java @@ -28,7 +28,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -class UnauthenticatedCustomAuthenticationProviderTest { +class TestUnauthenticatedCustomAuthenticationProviderTest { private static GrpcAuthenticationProvider grpcAuthenticationProvider; @RegisterExtension diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/UnauthenticatedCustomGrpcAuthenticationProvider.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestUnauthenticatedCustomGrpcAuthenticationProvider.java similarity index 91% rename from data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/UnauthenticatedCustomGrpcAuthenticationProvider.java rename to data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestUnauthenticatedCustomGrpcAuthenticationProvider.java index 2cc9e908fc..42a09cdb34 100644 --- a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/UnauthenticatedCustomGrpcAuthenticationProvider.java +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestUnauthenticatedCustomGrpcAuthenticationProvider.java @@ -20,7 +20,7 @@ name = GrpcAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME, pluginType = GrpcAuthenticationProvider.class ) -public class UnauthenticatedCustomGrpcAuthenticationProvider implements GrpcAuthenticationProvider { +public class TestUnauthenticatedCustomGrpcAuthenticationProvider implements GrpcAuthenticationProvider { @Override public ServerInterceptor getAuthenticationInterceptor() {