From 8ce58311d2d62068b221dc4bb82a10cccf141884 Mon Sep 17 00:00:00 2001 From: Michael Koepf Date: Mon, 23 Jun 2025 13:13:47 +0200 Subject: [PATCH 01/10] Add fluss-metric-opentelemetry subproject --- .../fluss-metrics-opentelemetry/pom.xml | 89 +++++++++++++++++++ .../src/main/resources/META-INF/NOTICE | 11 +++ fluss-metrics/pom.xml | 1 + 3 files changed, 101 insertions(+) create mode 100644 fluss-metrics/fluss-metrics-opentelemetry/pom.xml create mode 100644 fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/NOTICE diff --git a/fluss-metrics/fluss-metrics-opentelemetry/pom.xml b/fluss-metrics/fluss-metrics-opentelemetry/pom.xml new file mode 100644 index 0000000000..a7266d52b8 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/pom.xml @@ -0,0 +1,89 @@ + + + + + 4.0.0 + + com.alibaba.fluss + fluss-metrics + 0.8-SNAPSHOT + + + fluss-metrics-opentelemetry + Fluss : Metrics : OpenTelemetry + + + 1.51.0 + 1.34.0 + + + + + com.alibaba.fluss + fluss-common + ${project.version} + provided + + + + io.opentelemetry + opentelemetry-sdk-metrics + ${opentelemetry.version} + + + + io.opentelemetry + opentelemetry-exporter-otlp + ${opentelemetry.version} + + + + io.opentelemetry.semconv + opentelemetry-semconv + ${opentelemetry-semconv.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-fluss + package + + shade + + + + + *:* + + + + + + + + + \ No newline at end of file diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/NOTICE b/fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..d533b2403d --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/NOTICE @@ -0,0 +1,11 @@ +fluss-metrics-opentelemetry +Copyright 2025 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) + +- io.opentelemetry:opentelemetry-sdk-metrics:1.51.0 +- io.opentelemetry:opentelemetry-semconv:1.34.0 +- io.opentelemetry:opentelemetry-exporter-otlp:1.51.0 diff --git a/fluss-metrics/pom.xml b/fluss-metrics/pom.xml index b6358bbb8c..2af10f75a0 100644 --- a/fluss-metrics/pom.xml +++ b/fluss-metrics/pom.xml @@ -34,6 +34,7 @@ fluss-metrics-prometheus fluss-metrics-jmx + fluss-metrics-opentelemetry ../fluss-lake/fluss-lake-paimon/target/fluss-lake-paimon-${project.version}.jar From 240ef584c9e0c0cb7ad6b8d116e881e3e20b3911 Mon Sep 17 00:00:00 2001 From: Michael Koepf Date: Mon, 23 Jun 2025 13:15:12 +0200 Subject: [PATCH 03/10] Add ConfigOptions for fluss-metrics-opentelemetry --- .../apache/fluss/config/ConfigOptions.java | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 8b4664d70a..2ff7aefedd 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1653,6 +1653,61 @@ public class ConfigOptions { + "the CoordinatorServer) it is advisable to use a port range " + "like 9990-9999."); + // ------------------------------------------------------------------------ + // ConfigOptions for OpenTelemetry reporter + // ------------------------------------------------------------------------ + /** OpenTelemetry protocol. */ + public enum OpenTelemetryExporter { + GRPC, + HTTP + } + + public static final ConfigOption METRICS_REPORTER_OPENTELEMETRY_ENDPOINT = + key("metrics.reporter.opentelemetry.endpoint") + .stringType() + .noDefaultValue() + .withDescription( + "Target to which the OpenTelemetry metric reporter is going to send metrics to."); + + public static final ConfigOption + METRICS_REPORTER_OPENTELEMETRY_EXPORTER = + key("metrics.reporter.opentelemetry.exporter") + .enumType(OpenTelemetryExporter.class) + .defaultValue(OpenTelemetryExporter.GRPC) + .withDescription( + "The type of exporter that is used by the OpenTelemetry metric exporter to send metrics to the configured endpoint. " + + "The endpoint must accept connections for the given exporter type. Supported exporters: " + + OpenTelemetryExporter.GRPC.name() + + ", " + + OpenTelemetryExporter.HTTP.name() + + "."); + + public static final ConfigOption METRICS_REPORTER_OPENTELEMETRY_EXPORT_INTERVAL = + key("metrics.reporter.opentelemetry.export-interval") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription( + "Frequency of metric export by the OpenTelemetry metric reporter to the endpoint."); + + public static final ConfigOption METRICS_REPORTER_OPENTELEMETRY_EXPORT_TIMEOUT = + key("metrics.reporter.opentelemetry.export-timeout") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription( + "Maximum time the OpenTelemetry metric reporter will wait for each metric export."); + + public static final ConfigOption METRICS_REPORTER_OPENTELEMETRY_SERVICE_NAME = + key("metrics.reporter.opentelemetry.service.name") + .stringType() + .noDefaultValue() + .withDescription("Service name that is set in the OpenTelemetry Resource."); + + public static final ConfigOption METRICS_REPORTER_OPENTELEMETRY_SERVICE_VERSION = + key("metrics.reporter.opentelemetry.service.version") + .stringType() + .noDefaultValue() + .withDescription("Service version that is set in the OpenTelemetry Resource."); + // ------------------------------------------------------------------------ // ConfigOptions for lakehouse storage // ------------------------------------------------------------------------ From 556e3cec1564266febfb1b8cb47d4ee0dab17a5c Mon Sep 17 00:00:00 2001 From: Michael Koepf Date: Mon, 23 Jun 2025 13:17:00 +0200 Subject: [PATCH 04/10] Add fluss-metrics-opentelemetry implementation --- .../metrics/opentelemetry/MetricMetadata.java | 55 +++ .../opentelemetry/OpenTelemetryAdapter.java | 245 ++++++++++++ .../opentelemetry/OpenTelemetryReporter.java | 349 ++++++++++++++++++ .../OpenTelemetryReporterPlugin.java | 67 ++++ ...luss.metrics.reporter.MetricReporterPlugin | 20 + 5 files changed, 736 insertions(+) create mode 100644 fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/MetricMetadata.java create mode 100644 fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryAdapter.java create mode 100644 fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporter.java create mode 100644 fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java create mode 100644 fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/services/com.alibaba.fluss.metrics.reporter.MetricReporterPlugin diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/MetricMetadata.java b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/MetricMetadata.java new file mode 100644 index 0000000000..aa712ce3f9 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/MetricMetadata.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.metrics.opentelemetry; + +import com.alibaba.fluss.annotation.VisibleForTesting; + +import java.util.Map; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** + * Metadata associated with a given metric, stored alongside it so that it can be provided during + * reporting time. + */ +class MetricMetadata { + + @VisibleForTesting static final String NAME_SEPARATOR = "."; + + private final String name; + private final Map variables; + + public MetricMetadata(String name, Map variables) { + this.name = name; + this.variables = variables; + } + + public MetricMetadata subMetric(String suffix) { + return new MetricMetadata(name + NAME_SEPARATOR + suffix, variables); + } + + public String getName() { + return name; + } + + public Map getVariables() { + return variables; + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryAdapter.java b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryAdapter.java new file mode 100644 index 0000000000..140c0b4447 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryAdapter.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.metrics.opentelemetry; + +import com.alibaba.fluss.metrics.Gauge; +import com.alibaba.fluss.metrics.Histogram; +import com.alibaba.fluss.metrics.Meter; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.ValueAtQuantile; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableValueAtQuantile; +import io.opentelemetry.sdk.resources.Resource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** + * An adapter class which translates from Fluss metrics to OpenTelmetry metrics which can exported + * with the standard OpenTelemetry {@link io.opentelemetry.sdk.metrics.export.MetricExporter}s. + */ +class OpenTelemetryAdapter { + private static final Logger LOG = LoggerFactory.getLogger(OpenTelemetryAdapter.class); + static final double[] HISTOGRAM_QUANTILES = {0.5, 0.75, 0.95, 0.99}; + + static final InstrumentationScopeInfo INSTRUMENTATION_SCOPE_INFO = + InstrumentationScopeInfo.create("com.alibaba.fluss.metrics"); + + public static Optional convertCounter( + CollectionMetadata collectionMetadata, + Long count, + Long previousCount, + MetricMetadata metricMetadata) { + long delta = count - previousCount; + if (delta < 0) { + LOG.warn( + "Non-monotonic counter {}: current count {} is less than previous count {}", + metricMetadata.getName(), + count, + previousCount); + return Optional.empty(); + } + + boolean isMonotonic = true; + return Optional.of( + ImmutableMetricData.createLongSum( + collectionMetadata.getOpenTelemetryResource(), + INSTRUMENTATION_SCOPE_INFO, + metricMetadata.getName(), + "", + "", + ImmutableSumData.create( + isMonotonic, + AggregationTemporality.DELTA, + Collections.singleton( + ImmutableLongPointData.create( + collectionMetadata.getStartEpochNanos(), + collectionMetadata.getEpochNanos(), + convertVariables(metricMetadata.getVariables()), + delta))))); + } + + /** + * Converts a Fluss Gauge to a {@link MetricData}. + * + * @param collectionMetadata The common collection metadata + * @param gauge The Fluss Gauge to convert + * @param metricMetadata The metric metadata + * @return A {@link MetricData} if it's able to convert successfully + */ + public static Optional convertGauge( + CollectionMetadata collectionMetadata, Gauge gauge, MetricMetadata metricMetadata) { + if (!(gauge.getValue() instanceof Number)) { + LOG.debug( + "Couldn't adapt gauge {} with value {} and type {}", + metricMetadata.getName(), + gauge.getValue(), + gauge.getValue().getClass().getName()); + return Optional.empty(); + } + + Number number = (Number) gauge.getValue(); + if (number instanceof Long || number instanceof Integer) { + return Optional.of( + ImmutableMetricData.createLongGauge( + collectionMetadata.getOpenTelemetryResource(), + INSTRUMENTATION_SCOPE_INFO, + metricMetadata.getName(), + "", + "", + ImmutableGaugeData.create( + Collections.singleton( + ImmutableLongPointData.create( + collectionMetadata.getStartEpochNanos(), + collectionMetadata.getEpochNanos(), + convertVariables(metricMetadata.getVariables()), + number.longValue()))))); + } else { + return Optional.of( + ImmutableMetricData.createDoubleGauge( + collectionMetadata.getOpenTelemetryResource(), + INSTRUMENTATION_SCOPE_INFO, + metricMetadata.getName(), + "", + "", + ImmutableGaugeData.create( + Collections.singleton( + ImmutableDoublePointData.create( + collectionMetadata.getStartEpochNanos(), + collectionMetadata.getEpochNanos(), + convertVariables(metricMetadata.getVariables()), + number.doubleValue()))))); + } + } + + /** + * Converts a Fluss Meter to a {@link MetricData}. + * + * @param collectionMetadata The common collection metadata + * @param meter The Fluss Meter to convert + * @param metricMetadata The metric metadata + * @return A {@link MetricData} if it's able to convert successfully + */ + public static List convertMeter( + CollectionMetadata collectionMetadata, + Meter meter, + Long count, + Long previousCount, + MetricMetadata metricMetadata) { + List metricData = new ArrayList<>(); + convertCounter(collectionMetadata, count, previousCount, metricMetadata.subMetric("count")) + .ifPresent(metricData::add); + convertGauge(collectionMetadata, meter::getRate, metricMetadata.subMetric("rate")) + .ifPresent(metricData::add); + return metricData; + } + + /** + * Converts a Fluss Histogram to a list of {@link MetricData}s. + * + * @param collectionMetadata The common collection metadata + * @param histogram The Fluss Histogram to convert + * @param metricMetadata The metric metadata + * @return A list of {@link MetricData}s if it's able to convert successfully, or empty if not + */ + public static Optional convertHistogram( + CollectionMetadata collectionMetadata, + Histogram histogram, + MetricMetadata metricMetadata) { + List quantileList = new ArrayList<>(); + quantileList.add(ImmutableValueAtQuantile.create(0, histogram.getStatistics().getMin())); + for (double histogramQuantile : HISTOGRAM_QUANTILES) { + quantileList.add( + ImmutableValueAtQuantile.create( + histogramQuantile, + histogram.getStatistics().getQuantile(histogramQuantile))); + } + quantileList.add(ImmutableValueAtQuantile.create(1, histogram.getStatistics().getMax())); + quantileList.add(ImmutableValueAtQuantile.create(1, histogram.getStatistics().getMax())); + return Optional.of( + ImmutableMetricData.createDoubleSummary( + collectionMetadata.getOpenTelemetryResource(), + INSTRUMENTATION_SCOPE_INFO, + metricMetadata.getName(), + "", + "", + ImmutableSummaryData.create( + Collections.singleton( + ImmutableSummaryPointData.create( + collectionMetadata.getStartEpochNanos(), + collectionMetadata.getEpochNanos(), + convertVariables(metricMetadata.getVariables()), + histogram.getCount(), + histogram.getStatistics().getMean() + * histogram.getCount(), + quantileList))))); + } + + private static Attributes convertVariables(Map variables) { + AttributesBuilder builder = Attributes.builder(); + variables.forEach(builder::put); + return builder.build(); + } + + /** The common metadata associated with a collection of the metrics. */ + public static class CollectionMetadata { + + private final Resource openTelemetryResource; + private final long startEpochNanos; + private final long epochNanos; + + public CollectionMetadata( + Resource openTelemetryResource, long startEpochNanos, long epochNanos) { + this.openTelemetryResource = openTelemetryResource; + this.startEpochNanos = startEpochNanos; + this.epochNanos = epochNanos; + } + + public Resource getOpenTelemetryResource() { + return openTelemetryResource; + } + + public long getStartEpochNanos() { + return startEpochNanos; + } + + public long getEpochNanos() { + return epochNanos; + } + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporter.java b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporter.java new file mode 100644 index 0000000000..58e0922776 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporter.java @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.metrics.opentelemetry; + +import com.alibaba.fluss.annotation.VisibleForTesting; +import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.exception.FlussRuntimeException; +import com.alibaba.fluss.metrics.CharacterFilter; +import com.alibaba.fluss.metrics.Counter; +import com.alibaba.fluss.metrics.Gauge; +import com.alibaba.fluss.metrics.Histogram; +import com.alibaba.fluss.metrics.Meter; +import com.alibaba.fluss.metrics.Metric; +import com.alibaba.fluss.metrics.groups.MetricGroup; +import com.alibaba.fluss.metrics.reporter.MetricReporter; +import com.alibaba.fluss.metrics.reporter.ScheduledMetricReporter; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; +import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.semconv.ServiceAttributes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** {@link MetricReporter} that exports {@link Metric Metrics} via OpenTelemetry. */ +public class OpenTelemetryReporter implements MetricReporter, ScheduledMetricReporter { + + private static final Logger LOG = LoggerFactory.getLogger(OpenTelemetryReporter.class); + + private static final char SCOPE_SEPARATOR = '.'; + private static final String SCOPE_PREFIX = "fluss" + SCOPE_SEPARATOR; + + @VisibleForTesting final Resource resource; + private final MetricExporter exporter; + private final Duration interval; + private final Clock clock; + + @VisibleForTesting final Map, MetricMetadata> gauges = new HashMap<>(); + @VisibleForTesting final Map counters = new HashMap<>(); + @VisibleForTesting final Map histograms = new HashMap<>(); + @VisibleForTesting final Map meters = new HashMap<>(); + + private Map lastValueSnapshots = Collections.emptyMap(); + private long lastCollectTimeNanos = 0; + private @Nullable CompletableResultCode lastResult; + + OpenTelemetryReporter( + String endpoint, + ConfigOptions.OpenTelemetryExporter exporterType, + Duration interval, + Duration timeout, + String serviceName, + String serviceVersion) { + this.exporter = buildMetricExporter(exporterType, endpoint, timeout); + this.resource = buildResource(serviceName, serviceVersion); + this.interval = interval; + this.clock = Clock.systemUTC(); + } + + private static MetricExporter buildMetricExporter( + ConfigOptions.OpenTelemetryExporter exporterType, String endpoint, Duration timeout) { + switch (exporterType) { + case GRPC: + OtlpGrpcMetricExporterBuilder grpcExporterBuilder = + OtlpGrpcMetricExporter.builder(); + grpcExporterBuilder.setEndpoint(endpoint); + grpcExporterBuilder.setTimeout(timeout); + return grpcExporterBuilder.build(); + case HTTP: + OtlpHttpMetricExporterBuilder httpExporterBuilder = + OtlpHttpMetricExporter.builder(); + httpExporterBuilder.setEndpoint(endpoint); + httpExporterBuilder.setTimeout(timeout); + return httpExporterBuilder.build(); + default: + LOG.error("Unsupported OpenTelemetry exporter type: {}", exporterType); + throw new FlussRuntimeException("OpenTelemetry exporter type: " + exporterType); + } + } + + private static Resource buildResource(String serviceName, String serviceVersion) { + Resource resource = Resource.getDefault(); + + if (serviceName != null) { + resource = + resource.merge( + Resource.create( + Attributes.of(ServiceAttributes.SERVICE_NAME, serviceName))); + } + + if (serviceVersion != null) { + resource = + resource.merge( + Resource.create( + Attributes.of( + ServiceAttributes.SERVICE_VERSION, serviceVersion))); + } + + return resource; + } + + @Override + public void open(Configuration config) { + // do nothing + } + + @Override + public void close() { + exporter.flush(); + + if (lastResult != null) { + LOG.debug("Waiting for up to 1 minute to export pending metrics."); + CompletableResultCode resultCode = lastResult.join(1, TimeUnit.MINUTES); + if (!resultCode.isSuccess()) { + LOG.warn( + "Failed to export pending metrics when closing reporter. Result code: {}. Details: {}", + resultCode, + resultCode.getFailureThrowable()); + } + } + + exporter.close(); + } + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = SCOPE_PREFIX + getLogicalScope(group) + SCOPE_SEPARATOR + metricName; + + Map variables = + group.getAllVariables().entrySet().stream() + .collect( + Collectors.toMap( + e -> removeEnclosingAngleBrackets(e.getKey()), + Map.Entry::getValue)); + LOG.debug("Adding metric {} with variables {}", metricName, variables); + + final MetricMetadata metricMetadata = new MetricMetadata(name, variables); + + synchronized (this) { + switch (metric.getMetricType()) { + case COUNTER: + this.counters.put((Counter) metric, metricMetadata); + break; + case GAUGE: + this.gauges.put((Gauge) metric, metricMetadata); + break; + case HISTOGRAM: + this.histograms.put((Histogram) metric, metricMetadata); + break; + case METER: + this.meters.put((Meter) metric, metricMetadata); + break; + default: + LOG.warn( + "Cannot add unknown metric type {}. This indicates that the reporter does not " + + "support this metric type.", + metric.getClass().getName()); + } + } + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + synchronized (this) { + // Make sure we are not caching the metric object + lastValueSnapshots.remove(metric); + + switch (metric.getMetricType()) { + case COUNTER: + this.counters.remove((Counter) metric); + break; + case GAUGE: + this.gauges.remove((Gauge) metric); + break; + case HISTOGRAM: + this.histograms.remove((Histogram) metric); + break; + case METER: + this.meters.remove((Meter) metric); + break; + default: + LOG.warn( + "Cannot remove unknown metric type {}. This indicates that the reporter does " + + "not support this metric type.", + metric.getClass().getName()); + } + } + } + + @Override + public void report() { + synchronized (this) { + Collection metricData = collectMetrics(); + try { + lastResult = exporter.export(metricData); + lastResult.whenComplete( + () -> { + if (lastResult.isSuccess()) { + LOG.debug( + "Exported {} metrics using {}", + metricData.size(), + exporter.getClass().getName()); + } else { + LOG.warn( + "Failed to export {} metrics using {}", + metricData.size(), + exporter.getClass().getName()); + } + }); + } catch (Exception e) { + LOG.error( + "Failed to call export for {} metrics using {}", + metricData.size(), + exporter.getClass().getName()); + } + } + } + + @Override + public Duration scheduleInterval() { + return interval; + } + + private static String getLogicalScope(MetricGroup group) { + return group.getLogicalScope(CharacterFilter.NO_OP_FILTER, SCOPE_SEPARATOR); + } + + @VisibleForTesting + static String removeEnclosingAngleBrackets(String str) { + if (str.length() >= 2 && str.charAt(0) == '<' && str.charAt(str.length() - 1) == '>') { + return str.substring(1, str.length() - 1); + } + return str; + } + + private long getCurrentTimeNanos() { + Instant now = clock.instant(); + return TimeUnit.SECONDS.toNanos(now.getEpochSecond()) + now.getNano(); + } + + private Collection collectMetrics() { + // This method is not thread-safe. + // The callee must hold the monitor on 'this' before calling it. + long currentTimeNanos = getCurrentTimeNanos(); + List data = new ArrayList<>(); + OpenTelemetryAdapter.CollectionMetadata collectionMetadata = + new OpenTelemetryAdapter.CollectionMetadata( + resource, lastCollectTimeNanos, currentTimeNanos); + + Map currentValueSnapshots = takeLastValueSnapshots(); + + for (Counter counter : counters.keySet()) { + Long count = currentValueSnapshots.get(counter); + Long lastCount = lastValueSnapshots.getOrDefault(counter, 0L); + MetricMetadata metricMetadata = counters.get(counter); + Optional metricData = + OpenTelemetryAdapter.convertCounter( + collectionMetadata, count, lastCount, metricMetadata); + metricData.ifPresent(data::add); + } + + for (Gauge gauge : gauges.keySet()) { + MetricMetadata metricMetadata = gauges.get(gauge); + Optional metricData = + OpenTelemetryAdapter.convertGauge(collectionMetadata, gauge, metricMetadata); + metricData.ifPresent(data::add); + } + + for (Meter meter : meters.keySet()) { + Long count = currentValueSnapshots.get(meter); + Long lastCount = lastValueSnapshots.getOrDefault(meter, 0L); + MetricMetadata metricMetadata = meters.get(meter); + List metricData = + OpenTelemetryAdapter.convertMeter( + collectionMetadata, meter, count, lastCount, metricMetadata); + data.addAll(metricData); + } + + for (Histogram histogram : histograms.keySet()) { + MetricMetadata metricMetadata = histograms.get(histogram); + Optional metricData = + OpenTelemetryAdapter.convertHistogram( + collectionMetadata, histogram, metricMetadata); + metricData.ifPresent(data::add); + } + + lastValueSnapshots = currentValueSnapshots; + lastCollectTimeNanos = currentTimeNanos; + + return data; + } + + private Map takeLastValueSnapshots() { + // This method is not thread-safe. + // The callee must hold the monitor on 'this' before calling it. + Map map = new HashMap<>(); + + for (Counter counter : counters.keySet()) { + map.put(counter, counter.getCount()); + } + + for (Meter meter : meters.keySet()) { + map.put(meter, meter.getCount()); + } + + return map; + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java new file mode 100644 index 0000000000..0f924d9197 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.metrics.opentelemetry; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.exception.IllegalConfigurationException; +import com.alibaba.fluss.metrics.reporter.MetricReporter; +import com.alibaba.fluss.metrics.reporter.MetricReporterPlugin; + +import java.time.Duration; + +/** {@link MetricReporterPlugin} for {@link OpenTelemetryReporter}. */ +public class OpenTelemetryReporterPlugin implements MetricReporterPlugin { + + private static final String PLUGIN_NAME = "opentelemetry"; + + @Override + public MetricReporter createMetricReporter(Configuration configuration) { + String endpoint = + configuration.getString(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_ENDPOINT); + if (endpoint == null || endpoint.trim().isEmpty()) { + throw new IllegalConfigurationException( + ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_ENDPOINT.key() + " must be set."); + } + ConfigOptions.OpenTelemetryExporter exporterType = + configuration.get(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_EXPORTER); + if (exporterType == null) { + throw new IllegalConfigurationException( + ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_EXPORTER.key() + " must be set."); + } + Duration interval = + configuration.get(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_EXPORT_INTERVAL); + Duration timeout = + configuration.get(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_EXPORT_TIMEOUT); + String serviceName = + configuration.get(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_SERVICE_NAME); + String serviceVersion = + configuration.get(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_SERVICE_VERSION); + return new OpenTelemetryReporter( + endpoint, exporterType, interval, timeout, serviceName, serviceVersion); + } + + @Override + public String identifier() { + return PLUGIN_NAME; + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/services/com.alibaba.fluss.metrics.reporter.MetricReporterPlugin b/fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/services/com.alibaba.fluss.metrics.reporter.MetricReporterPlugin new file mode 100644 index 0000000000..abcbf1742d --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/services/com.alibaba.fluss.metrics.reporter.MetricReporterPlugin @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +com.alibaba.fluss.metrics.opentelemetry.OpenTelemetryReporterPlugin From 6de33b65da2cd38b83ad2c78f3ca6a34d82ad7d3 Mon Sep 17 00:00:00 2001 From: Michael Koepf Date: Mon, 23 Jun 2025 13:18:39 +0200 Subject: [PATCH 05/10] Add new test dependencies to parent pom and add new utils to fluss-test-utils --- fluss-test-utils/pom.xml | 6 ++ .../common/TestContainerExtension.java | 74 +++++++++++++++++++ pom.xml | 17 +++-- 3 files changed, 92 insertions(+), 5 deletions(-) create mode 100644 fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/TestContainerExtension.java diff --git a/fluss-test-utils/pom.xml b/fluss-test-utils/pom.xml index 1281079883..4e2b465482 100644 --- a/fluss-test-utils/pom.xml +++ b/fluss-test-utils/pom.xml @@ -41,5 +41,11 @@ assertj-core compile + + + org.testcontainers + testcontainers + compile + \ No newline at end of file diff --git a/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/TestContainerExtension.java b/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/TestContainerExtension.java new file mode 100644 index 0000000000..38c7400eb6 --- /dev/null +++ b/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/TestContainerExtension.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.testutils.common; + +import org.junit.jupiter.api.extension.ExtensionContext; +import org.testcontainers.containers.GenericContainer; + +import javax.annotation.Nullable; + +import java.util.function.Supplier; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** + * {@code TestContainerExtension} provides common functionality for {@code TestContainer} + * implementations. + * + * @param The {@link GenericContainer} that shall be managed. + */ +public class TestContainerExtension> implements CustomExtension { + @Nullable private T testContainer; + + private final Supplier testContainerCreator; + + public TestContainerExtension(Supplier testContainerCreator) { + this.testContainerCreator = testContainerCreator; + } + + public T getTestContainer() { + assert testContainer != null; + return testContainer; + } + + private void terminateTestContainer() { + if (testContainer != null) { + testContainer.stop(); + testContainer = null; + } + } + + private void instantiateTestContainer() { + assert testContainer == null; + testContainer = testContainerCreator.get(); + testContainer.start(); + } + + @Override + public void after(ExtensionContext context) throws Exception { + terminateTestContainer(); + } + + @Override + public void before(ExtensionContext context) throws Exception { + terminateTestContainer(); + instantiateTestContainer(); + } +} diff --git a/pom.xml b/pom.xml index 9ae2a7527a..4ea81cd8a9 100644 --- a/pom.xml +++ b/pom.xml @@ -110,6 +110,7 @@ 5.9.1 3.4.6 3.23.1 + 1.21.2 3.25.5 1.7.2 @@ -123,13 +124,13 @@ /* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -440,6 +441,12 @@ ${assertj.version} test + + + org.testcontainers + testcontainers + ${testcontainers.version} + @@ -909,10 +916,10 @@ + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> Fluss UTF-8 From f035a3888b1a1979e3c61b683bdb0109ff5b90c5 Mon Sep 17 00:00:00 2001 From: Michael Koepf Date: Mon, 23 Jun 2025 13:19:50 +0200 Subject: [PATCH 06/10] Add test dependencies to fluss-metrics-opentelemetry --- .../fluss-metrics-opentelemetry/pom.xml | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/fluss-metrics/fluss-metrics-opentelemetry/pom.xml b/fluss-metrics/fluss-metrics-opentelemetry/pom.xml index a7266d52b8..894b53aed6 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/pom.xml +++ b/fluss-metrics/fluss-metrics-opentelemetry/pom.xml @@ -60,6 +60,26 @@ opentelemetry-semconv ${opentelemetry-semconv.version} + + + + com.alibaba.fluss + fluss-test-utils + + + + com.alibaba.fluss + fluss-common + ${project.version} + test + test-jar + + + + org.testcontainers + testcontainers + test + From 98e0d6e27a3d04e81c31805ba315c3d3c7711bfa Mon Sep 17 00:00:00 2001 From: Michael Koepf Date: Mon, 23 Jun 2025 13:20:21 +0200 Subject: [PATCH 07/10] Add tests to fluss-metrics-opentelemetry --- .../opentelemetry/MetricMetadataTest.java | 37 +++ .../OpenTelemetryAdapterTest.java | 211 +++++++++++++ .../OpenTelemetryReporterITCase.java | 284 ++++++++++++++++++ .../OpenTelemetryReporterITCaseBase.java | 177 +++++++++++ .../OpenTelemetryReporterPluginTest.java | 86 ++++++ .../OpenTelemetryReporterTest.java | 171 +++++++++++ .../OpenTelemetryTestContainer.java | 107 +++++++ .../metrics/opentelemetry/TestUtils.java | 35 +++ .../src/test/resources/log4j2-test.properties | 30 ++ .../test/resources/opentelemetry-config.yaml | 47 +++ 10 files changed, 1185 insertions(+) create mode 100644 fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/MetricMetadataTest.java create mode 100644 fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryAdapterTest.java create mode 100644 fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java create mode 100644 fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterITCaseBase.java create mode 100644 fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java create mode 100644 fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java create mode 100644 fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java create mode 100644 fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/TestUtils.java create mode 100644 fluss-metrics/fluss-metrics-opentelemetry/src/test/resources/log4j2-test.properties create mode 100644 fluss-metrics/fluss-metrics-opentelemetry/src/test/resources/opentelemetry-config.yaml diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/MetricMetadataTest.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/MetricMetadataTest.java new file mode 100644 index 0000000000..59774a216f --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/MetricMetadataTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.metrics.opentelemetry; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.HashMap; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link MetricMetadata}. */ +public class MetricMetadataTest { + @ParameterizedTest + @ValueSource(strings = {"fluss", "fluss.group1.", "fluss.group1.subgroup1"}) + public void testSubMetric(String metricName) { + MetricMetadata metricMetadata = + new MetricMetadata(metricName, new HashMap<>()).subMetric("mysuffix"); + assertThat(metricMetadata.getName().endsWith(MetricMetadata.NAME_SEPARATOR + "mysuffix")) + .isTrue(); + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryAdapterTest.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryAdapterTest.java new file mode 100644 index 0000000000..c1041ee994 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryAdapterTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.metrics.opentelemetry; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +import com.alibaba.fluss.metrics.Counter; +import com.alibaba.fluss.metrics.MeterView; +import com.alibaba.fluss.metrics.SimpleCounter; +import com.alibaba.fluss.metrics.util.TestHistogram; +import com.alibaba.fluss.shaded.guava32.com.google.common.collect.ImmutableMap; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.metrics.data.SummaryPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; +import io.opentelemetry.sdk.resources.Resource; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** Tests for {@link OpenTelemetryAdapter}. */ +public class OpenTelemetryAdapterTest { + private static final OpenTelemetryAdapter.CollectionMetadata METADATA = + new OpenTelemetryAdapter.CollectionMetadata( + Resource.create(Attributes.empty()), 123L, 345L); + + private static final Map VARIABLES = ImmutableMap.of("k1", "v1", "k2", "v2"); + + @Test + void testCounter() { + Optional metricData = + OpenTelemetryAdapter.convertCounter( + METADATA, 50L, 3L, new MetricMetadata("foo.bar.count", VARIABLES)); + + assertThat(metricData.isPresent()).isTrue(); + assertThat(metricData.get().getName()).isEqualTo("foo.bar.count"); + assertThat(metricData.get().getLongSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.DELTA); + assertThat(metricData.get().getLongSumData().isMonotonic()).isEqualTo(true); + assertThat(metricData.get().getType()).isEqualTo(MetricDataType.LONG_SUM); + assertThat(metricData.get().getLongSumData().getPoints().size()).isEqualTo(1); + LongPointData data = metricData.get().getLongSumData().getPoints().iterator().next(); + assertThat(data.getValue()).isEqualTo(47L); + assertThat(asStringMap(data.getAttributes())).isEqualTo(VARIABLES); + assertThat(metricData.get().getDoubleSumData()).isEqualTo(ImmutableSumData.empty()); + assertThat(metricData.get().getLongGaugeData()).isEqualTo(ImmutableGaugeData.empty()); + assertThat(metricData.get().getDoubleGaugeData()).isEqualTo(ImmutableGaugeData.empty()); + assertThat(metricData.get().getHistogramData()).isEqualTo(ImmutableHistogramData.empty()); + } + + @Test + void testGaugeDouble() { + Optional metricData = + OpenTelemetryAdapter.convertGauge( + METADATA, () -> 123.456d, new MetricMetadata("foo.bar.value", VARIABLES)); + + assertThat(metricData.isPresent()).isTrue(); + assertThat(metricData.get().getName()).isEqualTo("foo.bar.value"); + assertThat(metricData.get().getType()).isEqualTo(MetricDataType.DOUBLE_GAUGE); + assertThat(metricData.get().getDoubleGaugeData().getPoints().size()).isEqualTo(1); + DoublePointData data = metricData.get().getDoubleGaugeData().getPoints().iterator().next(); + assertThat(data.getValue()).isEqualTo(123.456d); + assertThat(asStringMap(data.getAttributes())).isEqualTo(VARIABLES); + assertThat(metricData.get().getLongSumData()).isEqualTo(ImmutableSumData.empty()); + assertThat(metricData.get().getDoubleSumData()).isEqualTo(ImmutableSumData.empty()); + assertThat(metricData.get().getLongGaugeData()).isEqualTo(ImmutableGaugeData.empty()); + assertThat(metricData.get().getHistogramData()).isEqualTo(ImmutableHistogramData.empty()); + } + + @Test + void testGaugeLong() { + Optional metricData = + OpenTelemetryAdapter.convertGauge( + METADATA, () -> 125L, new MetricMetadata("foo.bar.value", VARIABLES)); + + assertThat(metricData.isPresent()).isTrue(); + assertThat(metricData.get().getName()).isEqualTo("foo.bar.value"); + assertThat(metricData.get().getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(metricData.get().getLongGaugeData().getPoints().size()).isEqualTo(1); + LongPointData data = metricData.get().getLongGaugeData().getPoints().iterator().next(); + assertThat(data.getValue()).isEqualTo(125L); + assertThat(asStringMap(data.getAttributes())).isEqualTo(VARIABLES); + assertThat(metricData.get().getLongSumData()).isEqualTo(ImmutableSumData.empty()); + assertThat(metricData.get().getDoubleSumData()).isEqualTo(ImmutableSumData.empty()); + assertThat(metricData.get().getDoubleGaugeData()).isEqualTo(ImmutableGaugeData.empty()); + assertThat(metricData.get().getHistogramData()).isEqualTo(ImmutableHistogramData.empty()); + } + + @Test + void testMeter() { + Counter counter = new SimpleCounter(); + MeterView meter = new MeterView(counter); + counter.inc(345L); + meter.update(); + List metricData = + OpenTelemetryAdapter.convertMeter( + METADATA, + meter, + counter.getCount(), + 20L, + new MetricMetadata("foo.bar.value", VARIABLES)); + + assertThat(metricData.size()).isEqualTo(2); + assertThat(metricData.get(0).getName()).isEqualTo("foo.bar.value.count"); + assertThat(metricData.get(0).getType()).isEqualTo(MetricDataType.LONG_SUM); + assertThat(metricData.get(0).getLongSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.DELTA); + assertThat(metricData.get(0).getLongSumData().getPoints().size()).isEqualTo(1); + LongPointData data = metricData.get(0).getLongSumData().getPoints().iterator().next(); + assertThat(data.getValue()).isEqualTo(325L); + assertThat(asStringMap(data.getAttributes())).isEqualTo(VARIABLES); + assertThat(metricData.get(0).getDoubleSumData()).isEqualTo(ImmutableSumData.empty()); + assertThat(metricData.get(0).getLongGaugeData()).isEqualTo(ImmutableGaugeData.empty()); + assertThat(metricData.get(0).getDoubleGaugeData()).isEqualTo(ImmutableGaugeData.empty()); + assertThat(metricData.get(0).getHistogramData()).isEqualTo(ImmutableHistogramData.empty()); + + assertThat(metricData.get(1).getName()).isEqualTo("foo.bar.value.rate"); + assertThat(metricData.get(1).getType()).isEqualTo(MetricDataType.DOUBLE_GAUGE); + assertThat(metricData.get(1).getDoubleGaugeData().getPoints().size()).isEqualTo(1); + DoublePointData data2 = + metricData.get(1).getDoubleGaugeData().getPoints().iterator().next(); + // 345L / 60 seconds + assertThat(data2.getValue()).isEqualTo(5.75d); + assertThat(asStringMap(data2.getAttributes())).isEqualTo(VARIABLES); + assertThat(metricData.get(1).getLongSumData()).isEqualTo(ImmutableSumData.empty()); + assertThat(metricData.get(1).getDoubleSumData()).isEqualTo(ImmutableSumData.empty()); + assertThat(metricData.get(1).getLongGaugeData()).isEqualTo(ImmutableGaugeData.empty()); + assertThat(metricData.get(1).getHistogramData()).isEqualTo(ImmutableHistogramData.empty()); + } + + @Test + void testHistogram() { + TestHistogram histogram = new TestHistogram(); + Optional metricData = + OpenTelemetryAdapter.convertHistogram( + METADATA, histogram, new MetricMetadata("foo.bar.histogram", VARIABLES)); + + assertThat(metricData.isPresent()).isTrue(); + assertThat(metricData.get().getName()).isEqualTo("foo.bar.histogram"); + assertThat(metricData.get().getType()).isEqualTo(MetricDataType.SUMMARY); + assertThat(metricData.get().getSummaryData().getPoints().size()).isEqualTo(1); + + SummaryPointData summaryPointData = + metricData.get().getSummaryData().getPoints().iterator().next(); + + assertThat(summaryPointData.getCount()).isEqualTo(1); + assertThat(summaryPointData.getSum()).isEqualTo(4d); + + assertThat(summaryPointData.getValues().get(0).getQuantile()).isEqualTo(0); + assertThat(summaryPointData.getValues().get(0).getValue()).isEqualTo(7); + + assertThat(summaryPointData.getValues().get(1).getQuantile()).isEqualTo(0.5); + assertThat(summaryPointData.getValues().get(1).getValue()).isEqualTo(0.5); + + assertThat(summaryPointData.getValues().get(2).getQuantile()).isEqualTo(0.75); + assertThat(summaryPointData.getValues().get(2).getValue()).isEqualTo(0.75); + + assertThat(summaryPointData.getValues().get(3).getQuantile()).isEqualTo(0.95); + assertThat(summaryPointData.getValues().get(3).getValue()).isEqualTo(0.95); + + assertThat(summaryPointData.getValues().get(4).getQuantile()).isEqualTo(0.99); + assertThat(summaryPointData.getValues().get(4).getValue()).isEqualTo(0.99); + + assertThat(summaryPointData.getValues().get(5).getQuantile()).isEqualTo(1); + assertThat(summaryPointData.getValues().get(5).getValue()).isEqualTo(6); + + assertThat(asStringMap(summaryPointData.getAttributes())).isEqualTo(VARIABLES); + } + + private Map asStringMap(Attributes attributes) { + Map map = new HashMap<>(); + for (Map.Entry, Object> entry : attributes.asMap().entrySet()) { + map.put(entry.getKey().getKey(), (String) entry.getValue()); + } + return map; + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java new file mode 100644 index 0000000000..45f61d2000 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.metrics.opentelemetry; + +import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.metrics.Gauge; +import com.alibaba.fluss.metrics.Histogram; +import com.alibaba.fluss.metrics.MeterView; +import com.alibaba.fluss.metrics.SimpleCounter; +import com.alibaba.fluss.metrics.groups.MetricGroup; +import com.alibaba.fluss.metrics.util.TestHistogram; +import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import com.alibaba.fluss.testutils.common.TestLoggerExtension; + +import org.assertj.core.data.Percentage; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** Tests for {@link OpenTelemetryReporter}. */ +@ExtendWith(TestLoggerExtension.class) +public class OpenTelemetryReporterITCase extends OpenTelemetryReporterITCaseBase { + + private static final String LOGICAL_SCOPE = "logical.scope"; + + private MetricGroup group; + private final Histogram histogram = new TestHistogram(); + + @BeforeEach + public void setUpEach() { + group = TestUtils.createTestMetricGroup(LOGICAL_SCOPE, new HashMap<>()); + } + + @ParameterizedTest + @EnumSource(ConfigOptions.OpenTelemetryExporter.class) + public void testReport(ConfigOptions.OpenTelemetryExporter exporterType) throws Exception { + OpenTelemetryReporter reporter = createReporter(exporterType); + + SimpleCounter counter = new SimpleCounter(); + reporter.notifyOfAddedMetric(counter, "foo.counter", group); + + Gauge gauge = () -> 123.456d; + reporter.notifyOfAddedMetric(gauge, "foo.gauge", group); + + reporter.report(); + + MeterView meter = new MeterView(counter); + reporter.notifyOfAddedMetric(meter, "foo.meter", group); + + reporter.notifyOfAddedMetric(histogram, "foo.histogram", group); + + reporter.report(); + reporter.close(); + + eventuallyConsumeJson( + (json) -> { + JsonNode scopeMetrics = + json.findPath("resourceMetrics").findPath("scopeMetrics"); + assertThat(scopeMetrics.findPath("scope").findPath("name").asText()) + .isEqualTo("com.alibaba.fluss.metrics"); + JsonNode metrics = scopeMetrics.findPath("metrics"); + + List metricNames = extractMetricNames(json); + assertThat(metricNames) + .contains( + "fluss.logical.scope.foo.counter", + "fluss.logical.scope.foo.gauge", + "fluss.logical.scope.foo.meter.count", + "fluss.logical.scope.foo.meter.rate", + "fluss.logical.scope.foo.histogram"); + + metrics.forEach(OpenTelemetryReporterITCase::assertMetrics); + }); + } + + private static void assertMetrics(JsonNode metric) { + String name = metric.findPath("name").asText(); + if (name.equals("fluss.logical.scope.foo.counter")) { + assertThat(metric.at("/sum/dataPoints").findPath("asInt").asInt()).isEqualTo(0); + } else if (name.equals("fluss.logical.scope.foo.gauge")) { + assertThat(metric.at("/gauge/dataPoints").findPath("asDouble").asDouble()) + .isCloseTo(123.456, Percentage.withPercentage(1)); + } else if (name.equals("fluss.logical.scope.foo.meter.count")) { + assertThat(metric.at("/sum/dataPoints").findPath("asInt").asInt()).isEqualTo(0); + } else if (name.equals("fluss.logical.scope.foo.meter.rate")) { + assertThat(metric.at("/gauge/dataPoints").findPath("asDouble").asDouble()) + .isEqualTo(0.0); + } else if (name.equals("fluss.logical.scope.foo.histogram")) { + assertThat(metric.at("/summary/dataPoints").findPath("sum").asInt()).isEqualTo(4); + } + } + + @ParameterizedTest + @EnumSource(ConfigOptions.OpenTelemetryExporter.class) + public void testReportAfterUnregister(ConfigOptions.OpenTelemetryExporter exporterType) + throws Exception { + OpenTelemetryReporter reporter = createReporter(exporterType); + + SimpleCounter counter1 = new SimpleCounter(); + SimpleCounter counter2 = new SimpleCounter(); + SimpleCounter counter3 = new SimpleCounter(); + reporter.notifyOfAddedMetric(counter1, "foo.counter1", group); + reporter.notifyOfAddedMetric(counter2, "foo.counter2", group); + reporter.notifyOfAddedMetric(counter3, "foo.counter3", group); + + reporter.notifyOfRemovedMetric(counter2, "foo.counter2", group); + + reporter.report(); + reporter.close(); + + eventuallyConsumeJson( + json -> { + List metricNames = extractMetricNames(json); + assertThat(metricNames) + .contains( + "fluss.logical.scope.foo.counter1", + "fluss.logical.scope.foo.counter3"); + }); + } + + @ParameterizedTest + @EnumSource(ConfigOptions.OpenTelemetryExporter.class) + public void testCounterDelta(ConfigOptions.OpenTelemetryExporter exporterType) + throws Exception { + OpenTelemetryReporter reporter = createReporter(exporterType); + + SimpleCounter counter = new SimpleCounter(); + reporter.notifyOfAddedMetric(counter, "foo.counter", group); + + counter.inc(1234); + assertThat(counter.getCount()).isEqualTo(1234L); + reporter.report(); + + eventuallyConsumeJson( + json -> { + List metricNames = extractMetricNames(json); + assertThat(metricNames).contains("fluss.logical.scope.foo.counter"); + + JsonNode metrics = + json.findPath("resourceMetrics") + .findPath("scopeMetrics") + .findPath("metrics"); + + metrics.forEach( + metric -> { + assertThat(metric.at("/sum/dataPoints").findPath("asInt").asInt()) + .isEqualTo(1234); + }); + }); + + counter.inc(25); + assertThat(counter.getCount()).isEqualTo(1259L); + + reporter.report(); + reporter.close(); + + eventuallyConsumeJson( + json -> { + List metricNames = extractMetricNames(json); + assertThat(metricNames).contains("fluss.logical.scope.foo.counter"); + + JsonNode metrics = + json.findPath("resourceMetrics") + .findPath("scopeMetrics") + .findPath("metrics"); + + metrics.forEach( + metric -> { + assertThat(metric.at("/sum/dataPoints").findPath("asInt").asInt()) + .isEqualTo(1234); + }); + }); + } + + @ParameterizedTest + @EnumSource(ConfigOptions.OpenTelemetryExporter.class) + public void testOpenTelemetryAttributes(ConfigOptions.OpenTelemetryExporter exporterType) + throws Exception { + String serviceName = "flink-bar"; + String serviceVersion = "v42"; + OpenTelemetryReporter reporter = createReporter(exporterType, serviceName, serviceVersion); + + SimpleCounter counter = new SimpleCounter(); + reporter.notifyOfAddedMetric(counter, "foo.counter", group); + + reporter.report(); + reporter.close(); + + eventuallyConsumeJson( + json -> { + List metricNames = extractMetricNames(json); + assertThat(metricNames).contains("fluss.logical.scope.foo.counter"); + + JsonNode attributes = + json.findPath("resourceMetrics") + .findPath("resource") + .findPath("attributes"); + + List attributeKeys = + attributes.findValues("key").stream() + .map(JsonNode::asText) + .collect(Collectors.toList()); + + assertThat(attributeKeys).contains("service.name", "service.version"); + + attributes.forEach( + attribute -> { + if (attribute.get("key").asText().equals("service.name")) { + assertThat(attribute.at("/value/stringValue").asText()) + .isEqualTo(serviceName); + } else if (attribute + .get("key") + .asText() + .equals("service.version")) { + assertThat(attribute.at("/value/stringValue").asText()) + .isEqualTo(serviceVersion); + } + }); + }); + } + + private static OpenTelemetryReporter createReporter( + ConfigOptions.OpenTelemetryExporter exporterType, + String serviceName, + String serviceVersion) { + String endpoint; + switch (exporterType) { + case GRPC: + endpoint = + OpenTelemetryReporterITCaseBase.getOpenTelemetryContainer() + .getGrpcEndpoint(); + break; + case HTTP: + endpoint = + OpenTelemetryReporterITCaseBase.getOpenTelemetryContainer() + .getHttpEndpoint(); + break; + default: + throw new IllegalStateException("OpenTelemetry exporter type: " + exporterType); + } + + OpenTelemetryReporter reporter = + new OpenTelemetryReporter( + endpoint, + exporterType, + Duration.ofSeconds(10), + Duration.ofSeconds(10), + serviceName, + serviceVersion); + return reporter; + } + + private static OpenTelemetryReporter createReporter( + ConfigOptions.OpenTelemetryExporter exporterType) { + return createReporter(exporterType, null, null); + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterITCaseBase.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterITCaseBase.java new file mode 100644 index 0000000000..2d92450567 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterITCaseBase.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.metrics.opentelemetry; + +import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import com.alibaba.fluss.testutils.common.AllCallbackWrapper; +import com.alibaba.fluss.testutils.common.TestContainerExtension; +import com.alibaba.fluss.testutils.common.TestLoggerExtension; +import com.alibaba.fluss.utils.function.ThrowingConsumer; +import com.alibaba.fluss.utils.function.ThrowingRunnable; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.BaseConsumer; +import org.testcontainers.containers.output.OutputFrame; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** Tests for {@link OpenTelemetryReporter}. */ +@ExtendWith(TestLoggerExtension.class) +public class OpenTelemetryReporterITCaseBase { + public static final Logger LOG = LoggerFactory.getLogger(OpenTelemetryReporterITCaseBase.class); + + private static final Duration TIME_OUT = Duration.ofMinutes(2); + + @RegisterExtension + @Order(1) + private static final AllCallbackWrapper> + OPENTELEMETRY_EXTENSION = + new AllCallbackWrapper<>( + new TestContainerExtension<>(OpenTelemetryTestContainer::new)); + + @BeforeEach + public void setup() { + Slf4jLevelLogConsumer logConsumer = new Slf4jLevelLogConsumer(LOG); + OPENTELEMETRY_EXTENSION.getCustomExtension().getTestContainer().followOutput(logConsumer); + } + + public static OpenTelemetryTestContainer getOpenTelemetryContainer() { + return OPENTELEMETRY_EXTENSION.getCustomExtension().getTestContainer(); + } + + public static void eventuallyConsumeJson(ThrowingConsumer jsonConsumer) + throws Exception { + eventually( + () -> { + // opentelemetry-collector dumps every report in a new line, so in order to + // re-use the + // same collector across multiple tests, let's read only the last line + getOpenTelemetryContainer() + .copyFileFromContainer( + getOpenTelemetryContainer().getOutputLogPath().toString(), + inputStream -> { + List lines = new ArrayList<>(); + BufferedReader input = + new BufferedReader( + new InputStreamReader(inputStream)); + String last = ""; + String line; + + while ((line = input.readLine()) != null) { + lines.add(line); + last = line; + } + + ObjectMapper mapper = new ObjectMapper(); + JsonNode json = mapper.readValue(last, JsonNode.class); + try { + jsonConsumer.accept(json); + } catch (Throwable t) { + throw new ConsumeDataLogException(t, lines); + } + return null; + }); + }); + } + + public static void eventually(ThrowingRunnable runnable) throws Exception { + eventually(Math.addExact(System.nanoTime(), TIME_OUT.toNanos()), runnable); + } + + static void eventually(long deadline, ThrowingRunnable runnable) throws Exception { + Thread.sleep(10); + while (true) { + try { + runnable.run(); + break; + } catch (Throwable e) { + if (System.nanoTime() >= deadline) { + if (e instanceof ConsumeDataLogException) { + LOG.error("Failure while the following data log:"); + ((ConsumeDataLogException) e).getDataLog().forEach(LOG::error); + } + throw e; + } + } + Thread.sleep(100); + } + } + + public static List extractMetricNames(JsonNode json) { + return json.findPath("resourceMetrics").findPath("scopeMetrics").findPath("metrics") + .findValues("name").stream() + .map(JsonNode::asText) + .collect(Collectors.toList()); + } + + private static class ConsumeDataLogException extends Exception { + private final List dataLog; + + public ConsumeDataLogException(Throwable cause, List dataLog) { + super(cause); + this.dataLog = dataLog; + } + + public List getDataLog() { + return dataLog; + } + } + + /** + * Similar to {@link Slf4jLevelLogConsumer} but parses output lines and tries to log them with + * appropriate levels. + */ + private static class Slf4jLevelLogConsumer extends BaseConsumer { + + private final Logger logger; + + public Slf4jLevelLogConsumer(Logger logger) { + this.logger = logger; + } + + @Override + public void accept(OutputFrame outputFrame) { + final OutputFrame.OutputType outputType = outputFrame.getType(); + final String utf8String = outputFrame.getUtf8StringWithoutLineEnding(); + + String lowerCase = utf8String.toLowerCase(); + if (lowerCase.contains("error") || lowerCase.contains("exception")) { + logger.error("{}: {}", outputType, utf8String); + } else if (lowerCase.contains("warn") || lowerCase.contains("fail")) { + logger.warn("{}: {}", outputType, utf8String); + } else { + logger.info("{}: {}", outputType, utf8String); + } + } + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java new file mode 100644 index 0000000000..290b5743cc --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.metrics.opentelemetry; + +import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.exception.IllegalConfigurationException; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link OpenTelemetryReporterPlugin}. */ +public class OpenTelemetryReporterPluginTest { + + private final OpenTelemetryReporterPlugin openTelemetryReporterPlugin = + new OpenTelemetryReporterPlugin(); + + @ParameterizedTest + @EnumSource(ConfigOptions.OpenTelemetryExporter.class) + void testValidConfiguration(ConfigOptions.OpenTelemetryExporter exporterType) { + // mandatory options + Configuration configuration = new Configuration(); + configuration.setString( + ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_ENDPOINT, + "http://opentelemetry-metric-collector:4317"); + configuration.set(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_EXPORTER, exporterType); + assertThatCode(() -> openTelemetryReporterPlugin.createMetricReporter(configuration)) + .doesNotThrowAnyException(); + + // optional options + configuration.set( + ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_EXPORT_INTERVAL, + Duration.ofSeconds(5)); + configuration.set( + ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_EXPORT_TIMEOUT, Duration.ofSeconds(5)); + assertThatCode(() -> openTelemetryReporterPlugin.createMetricReporter(configuration)) + .doesNotThrowAnyException(); + + configuration.set(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_SERVICE_NAME, "fluss"); + configuration.set(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_SERVICE_NAME, "v42"); + assertThatCode(() -> openTelemetryReporterPlugin.createMetricReporter(configuration)) + .doesNotThrowAnyException(); + } + + @ParameterizedTest + @EnumSource(ConfigOptions.OpenTelemetryExporter.class) + void testInvalidConfiguration(ConfigOptions.OpenTelemetryExporter exporterType) { + Configuration configuration = new Configuration(); + // invalid endpoint and no exporter type + assertThatThrownBy(() -> openTelemetryReporterPlugin.createMetricReporter(configuration)) + .isInstanceOf(IllegalConfigurationException.class); + + configuration.setString(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_ENDPOINT, ""); + assertThatThrownBy(() -> openTelemetryReporterPlugin.createMetricReporter(configuration)) + .isInstanceOf(IllegalConfigurationException.class); + + configuration.setString(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_ENDPOINT, " "); + assertThatThrownBy(() -> openTelemetryReporterPlugin.createMetricReporter(configuration)) + .isInstanceOf(IllegalConfigurationException.class); + + // endpoint is still invalid + configuration.set(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_EXPORTER, exporterType); + assertThatThrownBy(() -> openTelemetryReporterPlugin.createMetricReporter(configuration)) + .isInstanceOf(IllegalConfigurationException.class); + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java new file mode 100644 index 0000000000..0a008698bc --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.metrics.opentelemetry; + +import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.metrics.Counter; +import com.alibaba.fluss.metrics.Gauge; +import com.alibaba.fluss.metrics.Histogram; +import com.alibaba.fluss.metrics.Meter; +import com.alibaba.fluss.metrics.SimpleCounter; +import com.alibaba.fluss.metrics.groups.MetricGroup; +import com.alibaba.fluss.metrics.util.TestHistogram; +import com.alibaba.fluss.metrics.util.TestMeter; + +import io.opentelemetry.semconv.ServiceAttributes; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.time.Duration; +import java.util.AbstractMap; +import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link OpenTelemetryReporter}. */ +public class OpenTelemetryReporterTest { + + private static final String LOGICAL_SCOPE = "logical.scope"; + private static final Map labels = + Collections.unmodifiableMap( + Stream.of( + new AbstractMap.SimpleEntry<>("label1", "value1"), + new AbstractMap.SimpleEntry<>("label2", "value2")) + .collect( + Collectors.toMap( + AbstractMap.SimpleEntry::getKey, + AbstractMap.SimpleEntry::getValue))); + + private MetricGroup metricGroup; + + @BeforeEach + void setupReporter() { + metricGroup = TestUtils.createTestMetricGroup(LOGICAL_SCOPE, labels); + } + + @ParameterizedTest + @EnumSource(ConfigOptions.OpenTelemetryExporter.class) + void testInvalidEndpoint(ConfigOptions.OpenTelemetryExporter exporterType) { + assertThatThrownBy( + () -> + new OpenTelemetryReporter( + "endpoint-with-missing-protocol", + exporterType, + Duration.ofSeconds(5), + Duration.ofSeconds(5), + null, + null)) + .isInstanceOf(IllegalArgumentException.class); + + assertThatThrownBy( + () -> + new OpenTelemetryReporter( + "invalid://protocol", + exporterType, + Duration.ofSeconds(5), + Duration.ofSeconds(5), + null, + null)) + .isInstanceOf(IllegalArgumentException.class); + } + + @ParameterizedTest + @EnumSource(ConfigOptions.OpenTelemetryExporter.class) + void testOpenTelemetryResourceIsConstructedCorrectly( + ConfigOptions.OpenTelemetryExporter exporterType) { + OpenTelemetryReporter reporter = + new OpenTelemetryReporter( + "http://opentelemetry-collector:4317", + exporterType, + Duration.ofSeconds(5), + Duration.ofSeconds(5), + "fluss", + "v42"); + assertThat(reporter.resource.getAttribute(ServiceAttributes.SERVICE_NAME)) + .isEqualTo("fluss"); + assertThat(reporter.resource.getAttribute(ServiceAttributes.SERVICE_VERSION)) + .isEqualTo("v42"); + } + + @Test + void testAddAndRemoveCounter() { + OpenTelemetryReporter reporter = createReporter(); + Counter counter = new SimpleCounter(); + reporter.notifyOfAddedMetric(counter, "counter", metricGroup); + assertThat(reporter.counters.containsKey(counter)).isTrue(); + reporter.notifyOfRemovedMetric(counter, "counter", metricGroup); + assertThat(reporter.counters.containsKey(counter)).isFalse(); + } + + @Test + void testAddAndRemoveGauge() { + OpenTelemetryReporter reporter = createReporter(); + Gauge gauge = () -> 1; + reporter.notifyOfAddedMetric(gauge, "gauge", metricGroup); + assertThat(reporter.gauges.containsKey(gauge)).isTrue(); + reporter.notifyOfRemovedMetric(gauge, "meter", metricGroup); + assertThat(reporter.gauges.containsKey(gauge)).isFalse(); + } + + @Test + void testAddAndRemoveHistogram() { + OpenTelemetryReporter reporter = createReporter(); + Meter meter = new TestMeter(); + reporter.notifyOfAddedMetric(meter, "meter", metricGroup); + assertThat(reporter.meters.containsKey(meter)).isTrue(); + reporter.notifyOfRemovedMetric(meter, "meter", metricGroup); + assertThat(reporter.meters.containsKey(meter)).isFalse(); + } + + @Test + void testAddAndRemoveMeter() { + OpenTelemetryReporter reporter = createReporter(); + Histogram histogram = new TestHistogram(); + reporter.notifyOfAddedMetric(histogram, "histogram", metricGroup); + assertThat(reporter.histograms.containsKey(histogram)).isTrue(); + reporter.notifyOfRemovedMetric(histogram, "histogram", metricGroup); + assertThat(reporter.histograms.containsKey(histogram)).isFalse(); + } + + @Test + void testRemoveEnclosingAngleBrackets() { + assertThat(OpenTelemetryReporter.removeEnclosingAngleBrackets("")).isEqualTo("t"); + assertThat(OpenTelemetryReporter.removeEnclosingAngleBrackets("")).isEqualTo("t>"); + assertThat(OpenTelemetryReporter.removeEnclosingAngleBrackets("<")).isEqualTo("<"); + assertThat(OpenTelemetryReporter.removeEnclosingAngleBrackets(">")).isEqualTo(">"); + assertThat(OpenTelemetryReporter.removeEnclosingAngleBrackets("<>")).isEqualTo(""); + assertThat(OpenTelemetryReporter.removeEnclosingAngleBrackets("")).isEqualTo(""); + } + + private OpenTelemetryReporter createReporter() { + return new OpenTelemetryReporter( + "http://endpoint-must-not-be-called-in-unit-tests", + ConfigOptions.OpenTelemetryExporter.GRPC, + Duration.ofSeconds(5), + Duration.ofSeconds(5), + null, + null); + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java new file mode 100644 index 0000000000..04e5be058b --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.metrics.opentelemetry; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.images.builder.ImageFromDockerfile; +import org.testcontainers.images.builder.dockerfile.DockerfileBuilder; +import org.testcontainers.images.builder.dockerfile.statement.MultiArgsStatement; +import org.testcontainers.utility.Base58; +import org.testcontainers.utility.MountableFile; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Locale; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** {@link OpenTelemetryTestContainer} provides an OpenTelemetry test instance. */ +public class OpenTelemetryTestContainer extends GenericContainer { + private static final String ALPINE_DOCKER_IMAGE_TAG = "3.22.0"; + private static final String OPENTELEMETRY_COLLECTOR_DOCKER_IMAGE_TAG = "0.128.0"; + + private static final int DEFAULT_HTTP_PORT = 4318; + private static final int DEFAULT_GRPC_PORT = 4317; + + // must be kept in sync with opentelemetry-config.yaml + private static final String DATA_DIR = "/data"; + // must be kept in sync with opentelemetry-config.yaml + private static final String LOG_FILE = "logs.json"; + + private static final Path CONFIG_PATH = + Paths.get("src/test/resources/").resolve("opentelemetry-config.yaml"); + + public OpenTelemetryTestContainer() { + super( + new ImageFromDockerfile("fluss/opentelemetry-collector-test-container") + .withDockerfileFromBuilder( + OpenTelemetryTestContainer::buildOpenTelemetryCollectorImage)); + withNetworkAliases(randomString("opentelemetry-collector", 6)); + addExposedPort(DEFAULT_HTTP_PORT); + addExposedPort(DEFAULT_GRPC_PORT); + withCopyFileToContainer( + MountableFile.forHostPath(CONFIG_PATH.toString()), "opentelemetry-config.yaml"); + withCommand("--config", "opentelemetry-config.yaml"); + } + + private static void buildOpenTelemetryCollectorImage(DockerfileBuilder builder) { + builder + // OpenTelemetry image doesn't have mkdir - use alpine instead. + .from(constructFullDockerImageName("alpine", ALPINE_DOCKER_IMAGE_TAG)) + // Create the output data directory - OpenTelemetry image doesn't have any directory + // to write to on its own. + .run("mkdir -p " + DATA_DIR) + .from( + constructFullDockerImageName( + "otel/opentelemetry-collector", + OPENTELEMETRY_COLLECTOR_DOCKER_IMAGE_TAG)) + // Copy the output data directory from alpine. It has to be owned by the + // OpenTelemetry user. + .withStatement( + new MultiArgsStatement("COPY --from=0 --chown=10001", DATA_DIR, DATA_DIR)) + .build(); + } + + public Path getOutputLogPath() { + return Paths.get(DATA_DIR, LOG_FILE); + } + + @Override + protected void containerIsStarted(InspectContainerResponse containerInfo) { + super.containerIsStarted(containerInfo); + } + + private static String randomString(String prefix, int length) { + return String.format("%s-%s", prefix, Base58.randomString(length).toLowerCase(Locale.ROOT)); + } + + public String getHttpEndpoint() { + return String.format("http://%s:%s", getHost(), getMappedPort(DEFAULT_HTTP_PORT)); + } + + public String getGrpcEndpoint() { + return String.format("http://%s:%s", getHost(), getMappedPort(DEFAULT_GRPC_PORT)); + } + + private static String constructFullDockerImageName(String name, String tag) { + return name + ":" + tag; + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/TestUtils.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/TestUtils.java new file mode 100644 index 0000000000..0cacce31c0 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/TestUtils.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.metrics.opentelemetry; + +import com.alibaba.fluss.metrics.groups.MetricGroup; +import com.alibaba.fluss.metrics.util.TestMetricGroup; + +import java.util.Map; + +/** Test utils. */ +class TestUtils { + static MetricGroup createTestMetricGroup(String logicalScope, Map variables) { + return TestMetricGroup.newBuilder() + .setLogicalScopeFunction( + (characterFilter, character) -> + characterFilter.filterCharacters(logicalScope)) + .setVariables(variables) + .build(); + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/resources/log4j2-test.properties b/fluss-metrics/fluss-metrics-opentelemetry/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000000..06379e065c --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/resources/log4j2-test.properties @@ -0,0 +1,30 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +# This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache +# Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. +# root logger level should be set to OFF to not flood build logs +# set manually to INFO for debugging purposes +# It is set for OpenTelemetry to WARN for easier debugging on CI +rootLogger.level=WARN +rootLogger.appenderRef.test.ref=TestLogger +appender.testlogger.name=TestLogger +appender.testlogger.type=CONSOLE +appender.testlogger.target=SYSTEM_ERR +appender.testlogger.layout.type=PatternLayout +appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/resources/opentelemetry-config.yaml b/fluss-metrics/fluss-metrics-opentelemetry/src/test/resources/opentelemetry-config.yaml new file mode 100644 index 0000000000..db143791e9 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/resources/opentelemetry-config.yaml @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache +# Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. + +extensions: + health_check: + zpages: + endpoint: 0.0.0.0:55679 + +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +processors: + batch: + +exporters: + debug: + verbosity: detailed + file: + path: /data/logs.json +service: + pipelines: + metrics: + receivers: [ otlp ] + exporters: [ file ] + + extensions: [ health_check, zpages ] From f3031a0f3946b699229a6e7af77b5077519c6fd7 Mon Sep 17 00:00:00 2001 From: Michael Koepf Date: Tue, 24 Jun 2025 17:34:05 +0200 Subject: [PATCH 08/10] Fix fluss-metrics-opentelemetry NOTICE file and license check --- .../fluss-metrics-opentelemetry/pom.xml | 8 +++++++ .../src/main/resources/META-INF/NOTICE | 22 +++++++++++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/fluss-metrics/fluss-metrics-opentelemetry/pom.xml b/fluss-metrics/fluss-metrics-opentelemetry/pom.xml index 894b53aed6..6ec0c7250f 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/pom.xml +++ b/fluss-metrics/fluss-metrics-opentelemetry/pom.xml @@ -100,6 +100,14 @@ *:* + + + * + + okhttp3/internal/publicsuffix/NOTICE + + + diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/NOTICE b/fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/NOTICE index d533b2403d..b49864cf74 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/NOTICE +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/NOTICE @@ -6,6 +6,24 @@ The Apache Software Foundation (http://www.apache.org/). This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) -- io.opentelemetry:opentelemetry-sdk-metrics:1.51.0 -- io.opentelemetry:opentelemetry-semconv:1.34.0 +- com.squareup.okhttp3:okhttp:4.12.0 +- com.squareup.okio:okio-jvm:3.6.0 +- com.squareup.okio:okio:3.6.0 +- io.opentelemetry.semconv:opentelemetry-semconv:1.34.0 +- io.opentelemetry:opentelemetry-api:1.51.0 +- io.opentelemetry:opentelemetry-context:1.51.0 +- io.opentelemetry:opentelemetry-exporter-common:1.51.0 - io.opentelemetry:opentelemetry-exporter-otlp:1.51.0 +- io.opentelemetry:opentelemetry-exporter-otlp-common:1.51.0 +- io.opentelemetry:opentelemetry-exporter-sender-okhttp:1.51.0 +- io.opentelemetry:opentelemetry-sdk-common:1.51.0 +- io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi:1.51.0 +- io.opentelemetry:opentelemetry-sdk-logs:1.51.0 +- io.opentelemetry:opentelemetry-sdk-metrics:1.51.0 +- io.opentelemetry:opentelemetry-sdk-trace:1.51.0 +- io.opentelemetry:opentelemetry-sdk:1.51.0 +- org.jetbrains.kotlin:kotlin-stdlib-common:1.9.10 +- org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.8.21 +- org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.8.21 +- org.jetbrains.kotlin:kotlin-stdlib:1.8.21 +- org.jetbrains:annotations:17.0.0 From e540c11bbd83f6f241df4c613c2853c396861ce5 Mon Sep 17 00:00:00 2001 From: Michael Koepf Date: Fri, 29 Aug 2025 19:46:49 +0200 Subject: [PATCH 09/10] Renamed packages from com.alibaba to org.apache --- fluss-dist/pom.xml | 4 +-- .../fluss-metrics-opentelemetry/pom.xml | 8 ++--- .../metrics/opentelemetry/MetricMetadata.java | 4 +-- .../opentelemetry/OpenTelemetryAdapter.java | 10 +++---- .../opentelemetry/OpenTelemetryReporter.java | 30 +++++++++---------- .../OpenTelemetryReporterPlugin.java | 12 ++++---- .../opentelemetry/MetricMetadataTest.java | 2 +- .../OpenTelemetryAdapterTest.java | 12 ++++---- .../OpenTelemetryReporterITCase.java | 24 +++++++-------- .../OpenTelemetryReporterITCaseBase.java | 24 ++++++++------- .../OpenTelemetryReporterPluginTest.java | 8 ++--- .../OpenTelemetryReporterTest.java | 22 +++++++------- .../OpenTelemetryTestContainer.java | 2 +- .../metrics/opentelemetry/TestUtils.java | 6 ++-- .../common/TestContainerExtension.java | 7 ++--- 15 files changed, 89 insertions(+), 86 deletions(-) rename fluss-metrics/fluss-metrics-opentelemetry/src/main/java/{com/alibaba => org/apache}/fluss/metrics/opentelemetry/MetricMetadata.java (94%) rename fluss-metrics/fluss-metrics-opentelemetry/src/main/java/{com/alibaba => org/apache}/fluss/metrics/opentelemetry/OpenTelemetryAdapter.java (97%) rename fluss-metrics/fluss-metrics-opentelemetry/src/main/java/{com/alibaba => org/apache}/fluss/metrics/opentelemetry/OpenTelemetryReporter.java (95%) rename fluss-metrics/fluss-metrics-opentelemetry/src/main/java/{com/alibaba => org/apache}/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java (89%) rename fluss-metrics/fluss-metrics-opentelemetry/src/test/java/{com/alibaba => org/apache}/fluss/metrics/opentelemetry/MetricMetadataTest.java (96%) rename fluss-metrics/fluss-metrics-opentelemetry/src/test/java/{com/alibaba => org/apache}/fluss/metrics/opentelemetry/OpenTelemetryAdapterTest.java (97%) rename fluss-metrics/fluss-metrics-opentelemetry/src/test/java/{com/alibaba => org/apache}/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java (95%) rename fluss-metrics/fluss-metrics-opentelemetry/src/test/java/{com/alibaba => org/apache}/fluss/metrics/opentelemetry/OpenTelemetryReporterITCaseBase.java (90%) rename fluss-metrics/fluss-metrics-opentelemetry/src/test/java/{com/alibaba => org/apache}/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java (95%) rename fluss-metrics/fluss-metrics-opentelemetry/src/test/java/{com/alibaba => org/apache}/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java (93%) rename fluss-metrics/fluss-metrics-opentelemetry/src/test/java/{com/alibaba => org/apache}/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java (98%) rename fluss-metrics/fluss-metrics-opentelemetry/src/test/java/{com/alibaba => org/apache}/fluss/metrics/opentelemetry/TestUtils.java (88%) diff --git a/fluss-dist/pom.xml b/fluss-dist/pom.xml index 1d9df4ad3d..14c9212250 100644 --- a/fluss-dist/pom.xml +++ b/fluss-dist/pom.xml @@ -77,9 +77,9 @@ - com.alibaba.fluss + org.apache.fluss fluss-metrics-opentelemetry - ${project.version} + 0.8-SNAPSHOT provided diff --git a/fluss-metrics/fluss-metrics-opentelemetry/pom.xml b/fluss-metrics/fluss-metrics-opentelemetry/pom.xml index 6ec0c7250f..1b87c67819 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/pom.xml +++ b/fluss-metrics/fluss-metrics-opentelemetry/pom.xml @@ -22,7 +22,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - com.alibaba.fluss + org.apache.fluss fluss-metrics 0.8-SNAPSHOT @@ -37,7 +37,7 @@ - com.alibaba.fluss + org.apache.fluss fluss-common ${project.version} provided @@ -63,12 +63,12 @@ - com.alibaba.fluss + org.apache.fluss fluss-test-utils - com.alibaba.fluss + org.apache.fluss fluss-common ${project.version} test diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/MetricMetadata.java b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/MetricMetadata.java similarity index 94% rename from fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/MetricMetadata.java rename to fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/MetricMetadata.java index aa712ce3f9..03dc39b722 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/MetricMetadata.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/MetricMetadata.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package com.alibaba.fluss.metrics.opentelemetry; +package org.apache.fluss.metrics.opentelemetry; -import com.alibaba.fluss.annotation.VisibleForTesting; +import org.apache.fluss.annotation.VisibleForTesting; import java.util.Map; diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryAdapter.java b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryAdapter.java similarity index 97% rename from fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryAdapter.java rename to fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryAdapter.java index 140c0b4447..cc2404ce8d 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryAdapter.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryAdapter.java @@ -15,11 +15,11 @@ * limitations under the License. */ -package com.alibaba.fluss.metrics.opentelemetry; +package org.apache.fluss.metrics.opentelemetry; -import com.alibaba.fluss.metrics.Gauge; -import com.alibaba.fluss.metrics.Histogram; -import com.alibaba.fluss.metrics.Meter; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.Histogram; +import org.apache.fluss.metrics.Meter; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; @@ -58,7 +58,7 @@ class OpenTelemetryAdapter { static final double[] HISTOGRAM_QUANTILES = {0.5, 0.75, 0.95, 0.99}; static final InstrumentationScopeInfo INSTRUMENTATION_SCOPE_INFO = - InstrumentationScopeInfo.create("com.alibaba.fluss.metrics"); + InstrumentationScopeInfo.create("org.apache.fluss.metrics"); public static Optional convertCounter( CollectionMetadata collectionMetadata, diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporter.java b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporter.java similarity index 95% rename from fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporter.java rename to fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporter.java index 58e0922776..9c280f57d2 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporter.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporter.java @@ -15,21 +15,21 @@ * limitations under the License. */ -package com.alibaba.fluss.metrics.opentelemetry; - -import com.alibaba.fluss.annotation.VisibleForTesting; -import com.alibaba.fluss.config.ConfigOptions; -import com.alibaba.fluss.config.Configuration; -import com.alibaba.fluss.exception.FlussRuntimeException; -import com.alibaba.fluss.metrics.CharacterFilter; -import com.alibaba.fluss.metrics.Counter; -import com.alibaba.fluss.metrics.Gauge; -import com.alibaba.fluss.metrics.Histogram; -import com.alibaba.fluss.metrics.Meter; -import com.alibaba.fluss.metrics.Metric; -import com.alibaba.fluss.metrics.groups.MetricGroup; -import com.alibaba.fluss.metrics.reporter.MetricReporter; -import com.alibaba.fluss.metrics.reporter.ScheduledMetricReporter; +package org.apache.fluss.metrics.opentelemetry; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.metrics.CharacterFilter; +import org.apache.fluss.metrics.Counter; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.Histogram; +import org.apache.fluss.metrics.Meter; +import org.apache.fluss.metrics.Metric; +import org.apache.fluss.metrics.groups.MetricGroup; +import org.apache.fluss.metrics.reporter.MetricReporter; +import org.apache.fluss.metrics.reporter.ScheduledMetricReporter; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java similarity index 89% rename from fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java rename to fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java index 0f924d9197..27373d4e13 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java @@ -15,17 +15,17 @@ * limitations under the License. */ -package com.alibaba.fluss.metrics.opentelemetry; +package org.apache.fluss.metrics.opentelemetry; /* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for * additional information regarding copyright ownership. */ -import com.alibaba.fluss.config.ConfigOptions; -import com.alibaba.fluss.config.Configuration; -import com.alibaba.fluss.exception.IllegalConfigurationException; -import com.alibaba.fluss.metrics.reporter.MetricReporter; -import com.alibaba.fluss.metrics.reporter.MetricReporterPlugin; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.IllegalConfigurationException; +import org.apache.fluss.metrics.reporter.MetricReporter; +import org.apache.fluss.metrics.reporter.MetricReporterPlugin; import java.time.Duration; diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/MetricMetadataTest.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/MetricMetadataTest.java similarity index 96% rename from fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/MetricMetadataTest.java rename to fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/MetricMetadataTest.java index 59774a216f..a99a2c7244 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/MetricMetadataTest.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/MetricMetadataTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package com.alibaba.fluss.metrics.opentelemetry; +package org.apache.fluss.metrics.opentelemetry; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryAdapterTest.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryAdapterTest.java similarity index 97% rename from fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryAdapterTest.java rename to fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryAdapterTest.java index c1041ee994..60b246145b 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryAdapterTest.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryAdapterTest.java @@ -15,17 +15,17 @@ * limitations under the License. */ -package com.alibaba.fluss.metrics.opentelemetry; +package org.apache.fluss.metrics.opentelemetry; /* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for * additional information regarding copyright ownership. */ -import com.alibaba.fluss.metrics.Counter; -import com.alibaba.fluss.metrics.MeterView; -import com.alibaba.fluss.metrics.SimpleCounter; -import com.alibaba.fluss.metrics.util.TestHistogram; -import com.alibaba.fluss.shaded.guava32.com.google.common.collect.ImmutableMap; +import org.apache.fluss.metrics.Counter; +import org.apache.fluss.metrics.MeterView; +import org.apache.fluss.metrics.SimpleCounter; +import org.apache.fluss.metrics.util.TestHistogram; +import org.apache.fluss.shaded.guava32.com.google.common.collect.ImmutableMap; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java similarity index 95% rename from fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java rename to fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java index 45f61d2000..5a2fd66cad 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java @@ -15,17 +15,17 @@ * limitations under the License. */ -package com.alibaba.fluss.metrics.opentelemetry; - -import com.alibaba.fluss.config.ConfigOptions; -import com.alibaba.fluss.metrics.Gauge; -import com.alibaba.fluss.metrics.Histogram; -import com.alibaba.fluss.metrics.MeterView; -import com.alibaba.fluss.metrics.SimpleCounter; -import com.alibaba.fluss.metrics.groups.MetricGroup; -import com.alibaba.fluss.metrics.util.TestHistogram; -import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; -import com.alibaba.fluss.testutils.common.TestLoggerExtension; +package org.apache.fluss.metrics.opentelemetry; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.Histogram; +import org.apache.fluss.metrics.MeterView; +import org.apache.fluss.metrics.SimpleCounter; +import org.apache.fluss.metrics.groups.MetricGroup; +import org.apache.fluss.metrics.util.TestHistogram; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.testutils.common.TestLoggerExtension; import org.assertj.core.data.Percentage; import org.junit.jupiter.api.BeforeEach; @@ -84,7 +84,7 @@ public void testReport(ConfigOptions.OpenTelemetryExporter exporterType) throws JsonNode scopeMetrics = json.findPath("resourceMetrics").findPath("scopeMetrics"); assertThat(scopeMetrics.findPath("scope").findPath("name").asText()) - .isEqualTo("com.alibaba.fluss.metrics"); + .isEqualTo("org.apache.fluss.metrics"); JsonNode metrics = scopeMetrics.findPath("metrics"); List metricNames = extractMetricNames(json); diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterITCaseBase.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterITCaseBase.java similarity index 90% rename from fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterITCaseBase.java rename to fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterITCaseBase.java index 2d92450567..8dcba11101 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterITCaseBase.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterITCaseBase.java @@ -15,15 +15,15 @@ * limitations under the License. */ -package com.alibaba.fluss.metrics.opentelemetry; +package org.apache.fluss.metrics.opentelemetry; -import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; -import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import com.alibaba.fluss.testutils.common.AllCallbackWrapper; -import com.alibaba.fluss.testutils.common.TestContainerExtension; -import com.alibaba.fluss.testutils.common.TestLoggerExtension; -import com.alibaba.fluss.utils.function.ThrowingConsumer; -import com.alibaba.fluss.utils.function.ThrowingRunnable; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.fluss.testutils.common.AllCallbackWrapper; +import org.apache.fluss.testutils.common.TestContainerExtension; +import org.apache.fluss.testutils.common.TestLoggerExtension; +import org.apache.fluss.utils.function.ThrowingConsumer; +import org.apache.fluss.utils.function.ThrowingRunnable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Order; @@ -128,8 +128,12 @@ static void eventually(long deadline, ThrowingRunnable runnable) thro } public static List extractMetricNames(JsonNode json) { - return json.findPath("resourceMetrics").findPath("scopeMetrics").findPath("metrics") - .findValues("name").stream() + return json + .findPath("resourceMetrics") + .findPath("scopeMetrics") + .findPath("metrics") + .findValues("name") + .stream() .map(JsonNode::asText) .collect(Collectors.toList()); } diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java similarity index 95% rename from fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java rename to fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java index 290b5743cc..4c8365b981 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java @@ -15,11 +15,11 @@ * limitations under the License. */ -package com.alibaba.fluss.metrics.opentelemetry; +package org.apache.fluss.metrics.opentelemetry; -import com.alibaba.fluss.config.ConfigOptions; -import com.alibaba.fluss.config.Configuration; -import com.alibaba.fluss.exception.IllegalConfigurationException; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.IllegalConfigurationException; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java similarity index 93% rename from fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java rename to fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java index 0a008698bc..4d2134a6dc 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java @@ -15,17 +15,17 @@ * limitations under the License. */ -package com.alibaba.fluss.metrics.opentelemetry; - -import com.alibaba.fluss.config.ConfigOptions; -import com.alibaba.fluss.metrics.Counter; -import com.alibaba.fluss.metrics.Gauge; -import com.alibaba.fluss.metrics.Histogram; -import com.alibaba.fluss.metrics.Meter; -import com.alibaba.fluss.metrics.SimpleCounter; -import com.alibaba.fluss.metrics.groups.MetricGroup; -import com.alibaba.fluss.metrics.util.TestHistogram; -import com.alibaba.fluss.metrics.util.TestMeter; +package org.apache.fluss.metrics.opentelemetry; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.metrics.Counter; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.Histogram; +import org.apache.fluss.metrics.Meter; +import org.apache.fluss.metrics.SimpleCounter; +import org.apache.fluss.metrics.groups.MetricGroup; +import org.apache.fluss.metrics.util.TestHistogram; +import org.apache.fluss.metrics.util.TestMeter; import io.opentelemetry.semconv.ServiceAttributes; import org.junit.jupiter.api.BeforeEach; diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java similarity index 98% rename from fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java rename to fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java index 04e5be058b..19671dce89 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package com.alibaba.fluss.metrics.opentelemetry; +package org.apache.fluss.metrics.opentelemetry; import com.github.dockerjava.api.command.InspectContainerResponse; import org.testcontainers.containers.GenericContainer; diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/TestUtils.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/TestUtils.java similarity index 88% rename from fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/TestUtils.java rename to fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/TestUtils.java index 0cacce31c0..a617feb7f4 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/com/alibaba/fluss/metrics/opentelemetry/TestUtils.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/TestUtils.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package com.alibaba.fluss.metrics.opentelemetry; +package org.apache.fluss.metrics.opentelemetry; -import com.alibaba.fluss.metrics.groups.MetricGroup; -import com.alibaba.fluss.metrics.util.TestMetricGroup; +import org.apache.fluss.metrics.groups.MetricGroup; +import org.apache.fluss.metrics.util.TestMetricGroup; import java.util.Map; diff --git a/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/TestContainerExtension.java b/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/TestContainerExtension.java index 38c7400eb6..14575dae6a 100644 --- a/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/TestContainerExtension.java +++ b/fluss-test-utils/src/main/java/org/apache/fluss/testutils/common/TestContainerExtension.java @@ -15,13 +15,11 @@ * limitations under the License. */ -package com.alibaba.fluss.testutils.common; +package org.apache.fluss.testutils.common; import org.junit.jupiter.api.extension.ExtensionContext; import org.testcontainers.containers.GenericContainer; -import javax.annotation.Nullable; - import java.util.function.Supplier; /* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache @@ -35,7 +33,8 @@ * @param The {@link GenericContainer} that shall be managed. */ public class TestContainerExtension> implements CustomExtension { - @Nullable private T testContainer; + + private T testContainer; private final Supplier testContainerCreator; From c758d27a99ade1c84790a914cdf3228462c37185 Mon Sep 17 00:00:00 2001 From: Michael Koepf Date: Fri, 29 Aug 2025 21:42:07 +0200 Subject: [PATCH 10/10] Remove HTTP exporter (there was a bug which was not caught by the IT test setup) --- .../apache/fluss/config/ConfigOptions.java | 19 ----- .../opentelemetry/OpenTelemetryReporter.java | 31 ++------ .../OpenTelemetryReporterPlugin.java | 9 +-- .../OpenTelemetryReporterITCase.java | 77 ++++++------------- .../OpenTelemetryReporterPluginTest.java | 23 ++---- .../OpenTelemetryReporterTest.java | 18 +---- .../OpenTelemetryTestContainer.java | 6 -- .../test/resources/opentelemetry-config.yaml | 2 - 8 files changed, 41 insertions(+), 144 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 2ff7aefedd..35cfa81631 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1656,12 +1656,6 @@ public class ConfigOptions { // ------------------------------------------------------------------------ // ConfigOptions for OpenTelemetry reporter // ------------------------------------------------------------------------ - /** OpenTelemetry protocol. */ - public enum OpenTelemetryExporter { - GRPC, - HTTP - } - public static final ConfigOption METRICS_REPORTER_OPENTELEMETRY_ENDPOINT = key("metrics.reporter.opentelemetry.endpoint") .stringType() @@ -1669,19 +1663,6 @@ public enum OpenTelemetryExporter { .withDescription( "Target to which the OpenTelemetry metric reporter is going to send metrics to."); - public static final ConfigOption - METRICS_REPORTER_OPENTELEMETRY_EXPORTER = - key("metrics.reporter.opentelemetry.exporter") - .enumType(OpenTelemetryExporter.class) - .defaultValue(OpenTelemetryExporter.GRPC) - .withDescription( - "The type of exporter that is used by the OpenTelemetry metric exporter to send metrics to the configured endpoint. " - + "The endpoint must accept connections for the given exporter type. Supported exporters: " - + OpenTelemetryExporter.GRPC.name() - + ", " - + OpenTelemetryExporter.HTTP.name() - + "."); - public static final ConfigOption METRICS_REPORTER_OPENTELEMETRY_EXPORT_INTERVAL = key("metrics.reporter.opentelemetry.export-interval") .durationType() diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporter.java b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporter.java index 9c280f57d2..92b145eb2c 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporter.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporter.java @@ -18,9 +18,7 @@ package org.apache.fluss.metrics.opentelemetry; import org.apache.fluss.annotation.VisibleForTesting; -import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; -import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.metrics.CharacterFilter; import org.apache.fluss.metrics.Counter; import org.apache.fluss.metrics.Gauge; @@ -32,8 +30,6 @@ import org.apache.fluss.metrics.reporter.ScheduledMetricReporter; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; -import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder; import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder; import io.opentelemetry.sdk.common.CompletableResultCode; @@ -87,36 +83,21 @@ public class OpenTelemetryReporter implements MetricReporter, ScheduledMetricRep OpenTelemetryReporter( String endpoint, - ConfigOptions.OpenTelemetryExporter exporterType, Duration interval, Duration timeout, String serviceName, String serviceVersion) { - this.exporter = buildMetricExporter(exporterType, endpoint, timeout); + this.exporter = buildMetricExporter(endpoint, timeout); this.resource = buildResource(serviceName, serviceVersion); this.interval = interval; this.clock = Clock.systemUTC(); } - private static MetricExporter buildMetricExporter( - ConfigOptions.OpenTelemetryExporter exporterType, String endpoint, Duration timeout) { - switch (exporterType) { - case GRPC: - OtlpGrpcMetricExporterBuilder grpcExporterBuilder = - OtlpGrpcMetricExporter.builder(); - grpcExporterBuilder.setEndpoint(endpoint); - grpcExporterBuilder.setTimeout(timeout); - return grpcExporterBuilder.build(); - case HTTP: - OtlpHttpMetricExporterBuilder httpExporterBuilder = - OtlpHttpMetricExporter.builder(); - httpExporterBuilder.setEndpoint(endpoint); - httpExporterBuilder.setTimeout(timeout); - return httpExporterBuilder.build(); - default: - LOG.error("Unsupported OpenTelemetry exporter type: {}", exporterType); - throw new FlussRuntimeException("OpenTelemetry exporter type: " + exporterType); - } + private static MetricExporter buildMetricExporter(String endpoint, Duration timeout) { + OtlpGrpcMetricExporterBuilder grpcExporterBuilder = OtlpGrpcMetricExporter.builder(); + grpcExporterBuilder.setEndpoint(endpoint); + grpcExporterBuilder.setTimeout(timeout); + return grpcExporterBuilder.build(); } private static Resource buildResource(String serviceName, String serviceVersion) { diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java index 27373d4e13..dced8c3b78 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java @@ -42,12 +42,6 @@ public MetricReporter createMetricReporter(Configuration configuration) { throw new IllegalConfigurationException( ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_ENDPOINT.key() + " must be set."); } - ConfigOptions.OpenTelemetryExporter exporterType = - configuration.get(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_EXPORTER); - if (exporterType == null) { - throw new IllegalConfigurationException( - ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_EXPORTER.key() + " must be set."); - } Duration interval = configuration.get(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_EXPORT_INTERVAL); Duration timeout = @@ -56,8 +50,7 @@ public MetricReporter createMetricReporter(Configuration configuration) { configuration.get(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_SERVICE_NAME); String serviceVersion = configuration.get(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_SERVICE_VERSION); - return new OpenTelemetryReporter( - endpoint, exporterType, interval, timeout, serviceName, serviceVersion); + return new OpenTelemetryReporter(endpoint, interval, timeout, serviceName, serviceVersion); } @Override diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java index 5a2fd66cad..690a4a2c49 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java @@ -17,7 +17,6 @@ package org.apache.fluss.metrics.opentelemetry; -import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.metrics.Gauge; import org.apache.fluss.metrics.Histogram; import org.apache.fluss.metrics.MeterView; @@ -29,9 +28,8 @@ import org.assertj.core.data.Percentage; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; import java.time.Duration; import java.util.HashMap; @@ -58,10 +56,9 @@ public void setUpEach() { group = TestUtils.createTestMetricGroup(LOGICAL_SCOPE, new HashMap<>()); } - @ParameterizedTest - @EnumSource(ConfigOptions.OpenTelemetryExporter.class) - public void testReport(ConfigOptions.OpenTelemetryExporter exporterType) throws Exception { - OpenTelemetryReporter reporter = createReporter(exporterType); + @Test + public void testReport() throws Exception { + OpenTelemetryReporter reporter = createReporter(); SimpleCounter counter = new SimpleCounter(); reporter.notifyOfAddedMetric(counter, "foo.counter", group); @@ -117,11 +114,9 @@ private static void assertMetrics(JsonNode metric) { } } - @ParameterizedTest - @EnumSource(ConfigOptions.OpenTelemetryExporter.class) - public void testReportAfterUnregister(ConfigOptions.OpenTelemetryExporter exporterType) - throws Exception { - OpenTelemetryReporter reporter = createReporter(exporterType); + @Test + public void testReportAfterUnregister() throws Exception { + OpenTelemetryReporter reporter = createReporter(); SimpleCounter counter1 = new SimpleCounter(); SimpleCounter counter2 = new SimpleCounter(); @@ -145,11 +140,9 @@ public void testReportAfterUnregister(ConfigOptions.OpenTelemetryExporter export }); } - @ParameterizedTest - @EnumSource(ConfigOptions.OpenTelemetryExporter.class) - public void testCounterDelta(ConfigOptions.OpenTelemetryExporter exporterType) - throws Exception { - OpenTelemetryReporter reporter = createReporter(exporterType); + @Test + public void testCounterDelta() throws Exception { + OpenTelemetryReporter reporter = createReporter(); SimpleCounter counter = new SimpleCounter(); reporter.notifyOfAddedMetric(counter, "foo.counter", group); @@ -199,13 +192,11 @@ public void testCounterDelta(ConfigOptions.OpenTelemetryExporter exporterType) }); } - @ParameterizedTest - @EnumSource(ConfigOptions.OpenTelemetryExporter.class) - public void testOpenTelemetryAttributes(ConfigOptions.OpenTelemetryExporter exporterType) - throws Exception { + @Test + public void testOpenTelemetryAttributes() throws Exception { String serviceName = "flink-bar"; String serviceVersion = "v42"; - OpenTelemetryReporter reporter = createReporter(exporterType, serviceName, serviceVersion); + OpenTelemetryReporter reporter = createReporter(serviceName, serviceVersion); SimpleCounter counter = new SimpleCounter(); reporter.notifyOfAddedMetric(counter, "foo.counter", group); @@ -246,39 +237,19 @@ public void testOpenTelemetryAttributes(ConfigOptions.OpenTelemetryExporter expo }); } - private static OpenTelemetryReporter createReporter( - ConfigOptions.OpenTelemetryExporter exporterType, - String serviceName, - String serviceVersion) { - String endpoint; - switch (exporterType) { - case GRPC: - endpoint = - OpenTelemetryReporterITCaseBase.getOpenTelemetryContainer() - .getGrpcEndpoint(); - break; - case HTTP: - endpoint = - OpenTelemetryReporterITCaseBase.getOpenTelemetryContainer() - .getHttpEndpoint(); - break; - default: - throw new IllegalStateException("OpenTelemetry exporter type: " + exporterType); - } + private static OpenTelemetryReporter createReporter(String serviceName, String serviceVersion) { + String endpoint = + OpenTelemetryReporterITCaseBase.getOpenTelemetryContainer().getGrpcEndpoint(); - OpenTelemetryReporter reporter = - new OpenTelemetryReporter( - endpoint, - exporterType, - Duration.ofSeconds(10), - Duration.ofSeconds(10), - serviceName, - serviceVersion); - return reporter; + return new OpenTelemetryReporter( + endpoint, + Duration.ofSeconds(10), + Duration.ofSeconds(10), + serviceName, + serviceVersion); } - private static OpenTelemetryReporter createReporter( - ConfigOptions.OpenTelemetryExporter exporterType) { - return createReporter(exporterType, null, null); + private static OpenTelemetryReporter createReporter() { + return createReporter(null, null); } } diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java index 4c8365b981..4e3d287290 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java @@ -21,8 +21,7 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.IllegalConfigurationException; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.api.Test; import java.time.Duration; @@ -35,17 +34,13 @@ public class OpenTelemetryReporterPluginTest { private final OpenTelemetryReporterPlugin openTelemetryReporterPlugin = new OpenTelemetryReporterPlugin(); - @ParameterizedTest - @EnumSource(ConfigOptions.OpenTelemetryExporter.class) - void testValidConfiguration(ConfigOptions.OpenTelemetryExporter exporterType) { + @Test + void testValidConfiguration() { // mandatory options Configuration configuration = new Configuration(); configuration.setString( ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_ENDPOINT, "http://opentelemetry-metric-collector:4317"); - configuration.set(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_EXPORTER, exporterType); - assertThatCode(() -> openTelemetryReporterPlugin.createMetricReporter(configuration)) - .doesNotThrowAnyException(); // optional options configuration.set( @@ -62,11 +57,10 @@ void testValidConfiguration(ConfigOptions.OpenTelemetryExporter exporterType) { .doesNotThrowAnyException(); } - @ParameterizedTest - @EnumSource(ConfigOptions.OpenTelemetryExporter.class) - void testInvalidConfiguration(ConfigOptions.OpenTelemetryExporter exporterType) { + @Test + void testInvalidConfiguration() { Configuration configuration = new Configuration(); - // invalid endpoint and no exporter type + assertThatThrownBy(() -> openTelemetryReporterPlugin.createMetricReporter(configuration)) .isInstanceOf(IllegalConfigurationException.class); @@ -77,10 +71,5 @@ void testInvalidConfiguration(ConfigOptions.OpenTelemetryExporter exporterType) configuration.setString(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_ENDPOINT, " "); assertThatThrownBy(() -> openTelemetryReporterPlugin.createMetricReporter(configuration)) .isInstanceOf(IllegalConfigurationException.class); - - // endpoint is still invalid - configuration.set(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_EXPORTER, exporterType); - assertThatThrownBy(() -> openTelemetryReporterPlugin.createMetricReporter(configuration)) - .isInstanceOf(IllegalConfigurationException.class); } } diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java index 4d2134a6dc..5d1fdfd40e 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java @@ -17,7 +17,6 @@ package org.apache.fluss.metrics.opentelemetry; -import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.metrics.Counter; import org.apache.fluss.metrics.Gauge; import org.apache.fluss.metrics.Histogram; @@ -30,8 +29,6 @@ import io.opentelemetry.semconv.ServiceAttributes; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; import java.time.Duration; import java.util.AbstractMap; @@ -64,14 +61,12 @@ void setupReporter() { metricGroup = TestUtils.createTestMetricGroup(LOGICAL_SCOPE, labels); } - @ParameterizedTest - @EnumSource(ConfigOptions.OpenTelemetryExporter.class) - void testInvalidEndpoint(ConfigOptions.OpenTelemetryExporter exporterType) { + @Test + void testInvalidEndpoint() { assertThatThrownBy( () -> new OpenTelemetryReporter( "endpoint-with-missing-protocol", - exporterType, Duration.ofSeconds(5), Duration.ofSeconds(5), null, @@ -82,7 +77,6 @@ void testInvalidEndpoint(ConfigOptions.OpenTelemetryExporter exporterType) { () -> new OpenTelemetryReporter( "invalid://protocol", - exporterType, Duration.ofSeconds(5), Duration.ofSeconds(5), null, @@ -90,14 +84,11 @@ void testInvalidEndpoint(ConfigOptions.OpenTelemetryExporter exporterType) { .isInstanceOf(IllegalArgumentException.class); } - @ParameterizedTest - @EnumSource(ConfigOptions.OpenTelemetryExporter.class) - void testOpenTelemetryResourceIsConstructedCorrectly( - ConfigOptions.OpenTelemetryExporter exporterType) { + @Test + void testOpenTelemetryResourceIsConstructedCorrectly() { OpenTelemetryReporter reporter = new OpenTelemetryReporter( "http://opentelemetry-collector:4317", - exporterType, Duration.ofSeconds(5), Duration.ofSeconds(5), "fluss", @@ -162,7 +153,6 @@ void testRemoveEnclosingAngleBrackets() { private OpenTelemetryReporter createReporter() { return new OpenTelemetryReporter( "http://endpoint-must-not-be-called-in-unit-tests", - ConfigOptions.OpenTelemetryExporter.GRPC, Duration.ofSeconds(5), Duration.ofSeconds(5), null, diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java index 19671dce89..c22a18b3b9 100644 --- a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java @@ -38,7 +38,6 @@ public class OpenTelemetryTestContainer extends GenericContainer