Skip to content

Commit 8d85609

Browse files
authored
Otel logs source http service (#6250)
Introduce HTTP/protobuf and HTTP/JSON support for OTel Logs source. Adds endpoint to receive OTLP data over HTTP. Aligns with similar support for OTelTraceSource. * [WIP] Integrate http service and make sure it works properly Signed-off-by: Tomas Longo <tlongo@sternad.de> * Integrate grpc and http service into a single server Signed-off-by: Tomas Longo <tlongo@sternad.de> * Extract tests that assert grpc requests Signed-off-by: Tomas Longo <tlongo@sternad.de> * Fix return value of http service Signed-off-by: Tomas Longo <tlongo@sternad.de> * Re-introduce unframed request for the grpc service Signed-off-by: Tomas Longo <tlongo@sternad.de> * Add E2E test Signed-off-by: Tomas Longo <tlongo@sternad.de> * Add E2E test for gRPC Signed-off-by: Tomas Longo <tlongo@sternad.de> * Add test for unframed requests Signed-off-by: Tomas Longo <tlongo@sternad.de> * Add e2e test for unframed requests Signed-off-by: Tomas Longo <tlongo@sternad.de> * Add e2e test for protobuf requests Signed-off-by: Tomas Longo <tlongo@sternad.de> * Fix media type for protobuf payload Signed-off-by: Tomas Longo <tlongo@sternad.de> * Update license headers Signed-off-by: Tomas Longo <tlongo@sternad.de> * Adhere to config when it comes to chose a codec Signed-off-by: Tomas Longo <tlongo@sternad.de> * Inject OtelProtoCodec into ArmeriaHttpService Signed-off-by: Tomas Longo <tlongo@sternad.de> --------- Signed-off-by: Tomas Longo <tlongo@sternad.de>
1 parent 6b76b41 commit 8d85609

File tree

22 files changed

+1890
-489
lines changed

22 files changed

+1890
-489
lines changed

data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/CreateServer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ public Server createHTTPServer(final Buffer<Record<Log>> buffer,
268268
if (serverConfiguration.getMaxRequestLength() != null) {
269269
sb.maxRequestLength(serverConfiguration.getMaxRequestLength().getBytes());
270270
}
271+
271272
final int threads = serverConfiguration.getThreadCount();
272273
final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads);
273274
sb.blockingTaskExecutor(blockingTaskExecutor, true);

data-prepper-plugins/otel-logs-source/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ dependencies {
1313
implementation project(':data-prepper-plugins:blocking-buffer')
1414
implementation project(':data-prepper-plugins:otel-proto-common')
1515
implementation project(':data-prepper-plugins:http-common')
16+
implementation project(':data-prepper-plugins:http-source-common' )
1617
implementation libs.commons.codec
1718
implementation project(':data-prepper-plugins:armeria-common')
1819
testImplementation project(':data-prepper-api').sourceSets.test.output
@@ -22,6 +23,8 @@ dependencies {
2223
implementation 'software.amazon.awssdk:auth'
2324
implementation 'software.amazon.awssdk:regions'
2425
implementation 'software.amazon.awssdk:s3'
26+
implementation 'org.projectlombok:lombok:1.18.26'
27+
annotationProcessor 'org.projectlombok:lombok:1.18.26'
2528
implementation libs.protobuf.util
2629
implementation libs.armeria.core
2730
implementation libs.armeria.grpc

data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/ConvertConfiguration.java

Lines changed: 0 additions & 26 deletions
This file was deleted.

data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java

Lines changed: 193 additions & 28 deletions
Large diffs are not rendered by default.

data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,21 @@
99
import com.fasterxml.jackson.annotation.JsonProperty;
1010
import jakarta.validation.constraints.AssertTrue;
1111
import jakarta.validation.constraints.Size;
12+
import lombok.AllArgsConstructor;
13+
import lombok.Builder;
14+
import lombok.Getter;
15+
import lombok.NoArgsConstructor;
16+
1217
import org.apache.commons.lang3.StringUtils;
1318
import org.opensearch.dataprepper.model.types.ByteCount;
1419
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
1520
import org.opensearch.dataprepper.model.configuration.PluginModel;
1621
import org.opensearch.dataprepper.plugins.otel.codec.OTelOutputFormat;
1722
import org.opensearch.dataprepper.plugins.server.RetryInfoConfig;
1823

24+
@Builder
25+
@NoArgsConstructor
26+
@AllArgsConstructor
1927
public class OTelLogsSourceConfig {
2028
static final String REQUEST_TIMEOUT = "request_timeout";
2129
static final String PORT = "port";
@@ -57,6 +65,11 @@ public class OTelLogsSourceConfig {
5765
@Size(min = 1, message = "path length should be at least 1")
5866
private String path;
5967

68+
@Getter
69+
@JsonProperty("http_path")
70+
@Size(min = 1, message = "path length should be at least 1")
71+
private String httpPath;
72+
6073
@JsonProperty(HEALTH_CHECK_SERVICE)
6174
private boolean healthCheck = DEFAULT_HEALTH_CHECK;
6275

data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/certificate/CertificateProviderFactory.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@ public CertificateProvider getCertificateProvider() {
5151
.overrideConfiguration(metricPublisher -> metricPublisher.addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics)))
5252
.build();
5353

54-
return new ACMCertificateProvider(awsCertificateManager, oTelLogsSourceConfig.getAcmCertificateArn(),
55-
oTelLogsSourceConfig.getAcmCertIssueTimeOutMillis(), oTelLogsSourceConfig.getAcmPrivateKeyPassword());
54+
return new ACMCertificateProvider(awsCertificateManager,
55+
oTelLogsSourceConfig.getAcmCertificateArn(),
56+
oTelLogsSourceConfig.getAcmCertIssueTimeOutMillis(),
57+
oTelLogsSourceConfig.getAcmPrivateKeyPassword());
5658
} else if (oTelLogsSourceConfig.isSslCertAndKeyFileInS3()) {
5759
LOG.info("Using S3 to fetch certificate and private key for SSL/TLS.");
5860
final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.otellogs.http;
12+
13+
import java.time.Instant;
14+
import java.util.List;
15+
import java.util.stream.Collectors;
16+
17+
import org.opensearch.dataprepper.exceptions.BadRequestException;
18+
import org.opensearch.dataprepper.exceptions.BufferWriteException;
19+
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
20+
import org.opensearch.dataprepper.metrics.PluginMetrics;
21+
import org.opensearch.dataprepper.model.buffer.Buffer;
22+
import org.opensearch.dataprepper.model.log.OpenTelemetryLog;
23+
import org.opensearch.dataprepper.model.record.Record;
24+
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import com.linecorp.armeria.server.ServiceRequestContext;
29+
import com.linecorp.armeria.server.annotation.ConsumesJson;
30+
import com.linecorp.armeria.server.annotation.ConsumesProtobuf;
31+
import com.linecorp.armeria.server.annotation.Post;
32+
33+
import io.micrometer.core.instrument.Counter;
34+
import io.micrometer.core.instrument.DistributionSummary;
35+
import io.micrometer.core.instrument.Timer;
36+
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
37+
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
38+
39+
public class ArmeriaHttpService {
40+
private static final Logger LOG = LoggerFactory.getLogger(ArmeriaHttpService.class);
41+
42+
public static final String REQUESTS_RECEIVED = "requestsReceived";
43+
public static final String SUCCESS_REQUESTS = "successRequests";
44+
public static final String PAYLOAD_SIZE = "payloadSize";
45+
public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration";
46+
47+
private final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder;
48+
private final Buffer<Record<Object>> buffer;
49+
50+
private final int bufferWriteTimeoutInMillis;
51+
52+
private final Counter requestsReceivedCounter;
53+
private final Counter successRequestsCounter;
54+
private final DistributionSummary payloadSizeSummary;
55+
private final Timer requestProcessDuration;
56+
57+
public ArmeriaHttpService(
58+
Buffer<Record<Object>> buffer,
59+
final PluginMetrics pluginMetrics,
60+
final int bufferWriteTimeoutInMillis,
61+
final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder
62+
) {
63+
this.buffer = buffer;
64+
this.oTelProtoDecoder = oTelProtoDecoder;
65+
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;
66+
67+
requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);
68+
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS);
69+
payloadSizeSummary = pluginMetrics.summary(PAYLOAD_SIZE);
70+
requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION);
71+
}
72+
73+
// no path provided. Will be set by config.
74+
@Post("")
75+
@ConsumesJson
76+
@ConsumesProtobuf
77+
public ExportLogsServiceResponse exportLog(ExportLogsServiceRequest request) {
78+
requestsReceivedCounter.increment();
79+
payloadSizeSummary.record(request.getSerializedSize());
80+
81+
requestProcessDuration.record(() -> processRequest(request));
82+
83+
return ExportLogsServiceResponse.newBuilder().build();
84+
}
85+
86+
private void processRequest(final ExportLogsServiceRequest request) {
87+
final List<OpenTelemetryLog> logs;
88+
89+
try {
90+
logs = oTelProtoDecoder.parseExportLogsServiceRequest(request, Instant.now());
91+
} catch (Exception e) {
92+
LOG.warn(DataPrepperMarkers.SENSITIVE, "Failed to parse the request with error {}. Request body: {}", e, request);
93+
throw new BadRequestException(e.getMessage(), e);
94+
}
95+
96+
try {
97+
if (buffer.isByteBuffer()) {
98+
buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis);
99+
} else {
100+
final List<Record<Object>> records = logs.stream().map(log -> new Record<Object>(log)).collect(Collectors.toList());
101+
buffer.writeAll(records, bufferWriteTimeoutInMillis);
102+
}
103+
} catch (Exception e) {
104+
if (ServiceRequestContext.current().isTimedOut()) {
105+
LOG.warn("Exception writing to buffer but request already timed out.", e);
106+
return;
107+
}
108+
109+
LOG.error("Failed to write the request of size {} due to:", request.toString().length(), e);
110+
throw new BufferWriteException(e.getMessage(), e);
111+
}
112+
113+
if (ServiceRequestContext.current().isTimedOut()) {
114+
LOG.warn("Buffer write completed successfully but request already timed out.");
115+
return;
116+
}
117+
118+
successRequestsCounter.increment();
119+
}
120+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.otellogs.http;
12+
13+
14+
import java.time.Duration;
15+
import java.util.concurrent.TimeoutException;
16+
17+
import org.opensearch.dataprepper.RetryInfoCalculator;
18+
import org.opensearch.dataprepper.exceptions.BadRequestException;
19+
import org.opensearch.dataprepper.exceptions.BufferWriteException;
20+
import org.opensearch.dataprepper.exceptions.RequestCancelledException;
21+
import org.opensearch.dataprepper.metrics.PluginMetrics;
22+
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import com.google.protobuf.Any;
27+
import com.google.protobuf.InvalidProtocolBufferException;
28+
import com.google.protobuf.util.JsonFormat;
29+
import com.google.rpc.RetryInfo;
30+
import com.linecorp.armeria.common.ContentTooLargeException;
31+
import com.linecorp.armeria.common.HttpRequest;
32+
import com.linecorp.armeria.common.HttpResponse;
33+
import com.linecorp.armeria.common.HttpStatus;
34+
import com.linecorp.armeria.common.MediaType;
35+
import com.linecorp.armeria.server.HttpStatusException;
36+
import com.linecorp.armeria.server.RequestTimeoutException;
37+
import com.linecorp.armeria.server.ServiceRequestContext;
38+
import com.linecorp.armeria.server.annotation.ExceptionHandlerFunction;
39+
40+
import io.grpc.Status;
41+
import io.grpc.StatusRuntimeException;
42+
import io.micrometer.core.instrument.Counter;
43+
44+
public class HttpExceptionHandler implements ExceptionHandlerFunction {
45+
private static final Logger LOG = LoggerFactory.getLogger(HttpExceptionHandler.class);
46+
47+
static final String ARMERIA_REQUEST_TIMEOUT_MESSAGE = "Timeout waiting for request to be served. This is usually due to the buffer being full.";
48+
public static final String REQUEST_TIMEOUTS = "requestTimeouts";
49+
public static final String BAD_REQUESTS = "badRequests";
50+
public static final String REQUESTS_TOO_LARGE = "requestsTooLarge";
51+
public static final String INTERNAL_SERVER_ERROR = "internalServerError";
52+
53+
private final Counter requestTimeoutsCounter;
54+
private final Counter badRequestsCounter;
55+
private final Counter requestsTooLargeCounter;
56+
private final Counter internalServerErrorCounter;
57+
private final RetryInfoCalculator retryInfoCalculator;
58+
59+
public HttpExceptionHandler(final PluginMetrics pluginMetrics, Duration retryInfoMinDelay, Duration retryInfoMaxDelay) {
60+
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS);
61+
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS);
62+
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE);
63+
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR);
64+
this.retryInfoCalculator = new RetryInfoCalculator(retryInfoMinDelay, retryInfoMaxDelay);
65+
}
66+
67+
@Override
68+
public HttpResponse handleException(final ServiceRequestContext ctx,
69+
final HttpRequest req,
70+
final Throwable e) {
71+
final Throwable exceptionCause;
72+
if (e instanceof BufferWriteException) {
73+
exceptionCause = e.getCause();
74+
} else if (e instanceof HttpStatusException) {
75+
exceptionCause = e.getCause();
76+
} else {
77+
exceptionCause = e;
78+
}
79+
80+
StatusHolder statusHolder = createStatus(exceptionCause);
81+
82+
try {
83+
JsonFormat.TypeRegistry typeRegistry = JsonFormat.TypeRegistry.newBuilder()
84+
.add(RetryInfo.getDescriptor())
85+
.build();
86+
87+
JsonFormat.Printer printer = JsonFormat.printer().usingTypeRegistry(typeRegistry);
88+
return HttpResponse.of(statusHolder.getHttpStatus(), MediaType.JSON, printer.print(statusHolder.getStatus()));
89+
} catch (InvalidProtocolBufferException ipbe) {
90+
throw new RuntimeException(ipbe);
91+
}
92+
}
93+
94+
private StatusHolder createStatus(Throwable e) {
95+
if (e instanceof RequestTimeoutException || e instanceof TimeoutException) {
96+
requestTimeoutsCounter.increment();
97+
return new StatusHolder(createStatus(e, Status.Code.RESOURCE_EXHAUSTED), createHttpStatusFromProtoBufStatus(Status.Code.RESOURCE_EXHAUSTED));
98+
} else if (e instanceof SizeOverflowException || e instanceof ContentTooLargeException) {
99+
requestsTooLargeCounter.increment();
100+
return new StatusHolder(createStatus(e, Status.Code.RESOURCE_EXHAUSTED), createHttpStatusFromProtoBufStatus(Status.Code.RESOURCE_EXHAUSTED));
101+
} else if (e instanceof BadRequestException) {
102+
badRequestsCounter.increment();
103+
return new StatusHolder(createStatus(e, Status.Code.INVALID_ARGUMENT), createHttpStatusFromProtoBufStatus(Status.Code.INVALID_ARGUMENT));
104+
} else if ((e instanceof StatusRuntimeException) && (e.getMessage().contains("Invalid protobuf byte sequence") || e.getMessage().contains("Can't decode compressed frame"))) {
105+
badRequestsCounter.increment();
106+
return new StatusHolder(createStatus(e, Status.Code.INVALID_ARGUMENT), createHttpStatusFromProtoBufStatus(Status.Code.INVALID_ARGUMENT));
107+
} else if (e instanceof RequestCancelledException) {
108+
requestTimeoutsCounter.increment();
109+
return new StatusHolder(createStatus(e, Status.Code.CANCELLED), createHttpStatusFromProtoBufStatus(Status.Code.CANCELLED));
110+
} else {
111+
LOG.error("Unexpected exception handling http request", e);
112+
internalServerErrorCounter.increment();
113+
return new StatusHolder(createStatus(e, Status.Code.INTERNAL), createHttpStatusFromProtoBufStatus(Status.Code.INTERNAL));
114+
}
115+
}
116+
117+
private HttpStatus createHttpStatusFromProtoBufStatus(Status.Code status) {
118+
if (status == Status.Code.RESOURCE_EXHAUSTED) {
119+
return HttpStatus.INSUFFICIENT_STORAGE;
120+
} else if (status == Status.Code.INVALID_ARGUMENT) {
121+
return HttpStatus.BAD_REQUEST;
122+
} else {
123+
return HttpStatus.INTERNAL_SERVER_ERROR;
124+
}
125+
}
126+
127+
private com.google.rpc.Status createStatus(final Throwable e, final Status.Code code) {
128+
com.google.rpc.Status.Builder builder = com.google.rpc.Status.newBuilder().setCode(code.value());
129+
if (e instanceof RequestTimeoutException) {
130+
builder.setMessage(ARMERIA_REQUEST_TIMEOUT_MESSAGE);
131+
} else {
132+
builder.setMessage(e.getMessage() == null ? code.name() :e.getMessage());
133+
}
134+
if (code == Status.Code.RESOURCE_EXHAUSTED) {
135+
builder.addDetails(Any.pack(retryInfoCalculator.createRetryInfo()));
136+
}
137+
return builder.build();
138+
}
139+
140+
private static class StatusHolder {
141+
private final HttpStatus httpStatus;
142+
private final com.google.rpc.Status status;
143+
144+
public StatusHolder(com.google.rpc.Status status, HttpStatus httpStatus) {
145+
this.httpStatus = httpStatus;
146+
this.status = status;
147+
}
148+
149+
public HttpStatus getHttpStatus() {
150+
return httpStatus;
151+
}
152+
153+
public com.google.rpc.Status getStatus() {
154+
return status;
155+
}
156+
}
157+
158+
}

data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsGrpcServiceTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
49
*/
510

611
package org.opensearch.dataprepper.plugins.source.otellogs;

0 commit comments

Comments
 (0)