diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 8b4664d70a..35cfa81631 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1653,6 +1653,42 @@ public class ConfigOptions { + "the CoordinatorServer) it is advisable to use a port range " + "like 9990-9999."); + // ------------------------------------------------------------------------ + // ConfigOptions for OpenTelemetry reporter + // ------------------------------------------------------------------------ + public static final ConfigOption METRICS_REPORTER_OPENTELEMETRY_ENDPOINT = + key("metrics.reporter.opentelemetry.endpoint") + .stringType() + .noDefaultValue() + .withDescription( + "Target to which the OpenTelemetry metric reporter is going to send metrics to."); + + public static final ConfigOption METRICS_REPORTER_OPENTELEMETRY_EXPORT_INTERVAL = + key("metrics.reporter.opentelemetry.export-interval") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription( + "Frequency of metric export by the OpenTelemetry metric reporter to the endpoint."); + + public static final ConfigOption METRICS_REPORTER_OPENTELEMETRY_EXPORT_TIMEOUT = + key("metrics.reporter.opentelemetry.export-timeout") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription( + "Maximum time the OpenTelemetry metric reporter will wait for each metric export."); + + public static final ConfigOption METRICS_REPORTER_OPENTELEMETRY_SERVICE_NAME = + key("metrics.reporter.opentelemetry.service.name") + .stringType() + .noDefaultValue() + .withDescription("Service name that is set in the OpenTelemetry Resource."); + + public static final ConfigOption METRICS_REPORTER_OPENTELEMETRY_SERVICE_VERSION = + key("metrics.reporter.opentelemetry.service.version") + .stringType() + .noDefaultValue() + .withDescription("Service version that is set in the OpenTelemetry Resource."); + // ------------------------------------------------------------------------ // ConfigOptions for lakehouse storage // ------------------------------------------------------------------------ diff --git a/fluss-dist/pom.xml b/fluss-dist/pom.xml index 243918a665..14c9212250 100644 --- a/fluss-dist/pom.xml +++ b/fluss-dist/pom.xml @@ -76,6 +76,13 @@ provided + + org.apache.fluss + fluss-metrics-opentelemetry + 0.8-SNAPSHOT + provided + + org.apache.fluss fluss-lake-paimon @@ -96,7 +103,7 @@ ${project.version} provided - + org.apache.flink flink-shaded-hadoop-2-uber diff --git a/fluss-dist/src/main/assemblies/plugins.xml b/fluss-dist/src/main/assemblies/plugins.xml index e41d79006c..9fb4fb5110 100644 --- a/fluss-dist/src/main/assemblies/plugins.xml +++ b/fluss-dist/src/main/assemblies/plugins.xml @@ -81,6 +81,15 @@ 0644 + + + ../fluss-metrics/fluss-metrics-opentelemetry/target/fluss-metrics-opentelemetry-${project.version}.jar + + plugins/opentelemetry/ + fluss-metrics-opentelemetry-${project.version}.jar + 0644 + + ../fluss-lake/fluss-lake-paimon/target/fluss-lake-paimon-${project.version}.jar diff --git a/fluss-metrics/fluss-metrics-opentelemetry/pom.xml b/fluss-metrics/fluss-metrics-opentelemetry/pom.xml new file mode 100644 index 0000000000..1b87c67819 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/pom.xml @@ -0,0 +1,117 @@ + + + + + 4.0.0 + + org.apache.fluss + fluss-metrics + 0.8-SNAPSHOT + + + fluss-metrics-opentelemetry + Fluss : Metrics : OpenTelemetry + + + 1.51.0 + 1.34.0 + + + + + org.apache.fluss + fluss-common + ${project.version} + provided + + + + io.opentelemetry + opentelemetry-sdk-metrics + ${opentelemetry.version} + + + + io.opentelemetry + opentelemetry-exporter-otlp + ${opentelemetry.version} + + + + io.opentelemetry.semconv + opentelemetry-semconv + ${opentelemetry-semconv.version} + + + + + org.apache.fluss + fluss-test-utils + + + + org.apache.fluss + fluss-common + ${project.version} + test + test-jar + + + + org.testcontainers + testcontainers + test + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-fluss + package + + shade + + + + + *:* + + + + + * + + okhttp3/internal/publicsuffix/NOTICE + + + + + + + + + + \ No newline at end of file diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/MetricMetadata.java b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/MetricMetadata.java new file mode 100644 index 0000000000..03dc39b722 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/MetricMetadata.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.opentelemetry; + +import org.apache.fluss.annotation.VisibleForTesting; + +import java.util.Map; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** + * Metadata associated with a given metric, stored alongside it so that it can be provided during + * reporting time. + */ +class MetricMetadata { + + @VisibleForTesting static final String NAME_SEPARATOR = "."; + + private final String name; + private final Map variables; + + public MetricMetadata(String name, Map variables) { + this.name = name; + this.variables = variables; + } + + public MetricMetadata subMetric(String suffix) { + return new MetricMetadata(name + NAME_SEPARATOR + suffix, variables); + } + + public String getName() { + return name; + } + + public Map getVariables() { + return variables; + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryAdapter.java b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryAdapter.java new file mode 100644 index 0000000000..cc2404ce8d --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryAdapter.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.opentelemetry; + +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.Histogram; +import org.apache.fluss.metrics.Meter; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.ValueAtQuantile; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableValueAtQuantile; +import io.opentelemetry.sdk.resources.Resource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** + * An adapter class which translates from Fluss metrics to OpenTelmetry metrics which can exported + * with the standard OpenTelemetry {@link io.opentelemetry.sdk.metrics.export.MetricExporter}s. + */ +class OpenTelemetryAdapter { + private static final Logger LOG = LoggerFactory.getLogger(OpenTelemetryAdapter.class); + static final double[] HISTOGRAM_QUANTILES = {0.5, 0.75, 0.95, 0.99}; + + static final InstrumentationScopeInfo INSTRUMENTATION_SCOPE_INFO = + InstrumentationScopeInfo.create("org.apache.fluss.metrics"); + + public static Optional convertCounter( + CollectionMetadata collectionMetadata, + Long count, + Long previousCount, + MetricMetadata metricMetadata) { + long delta = count - previousCount; + if (delta < 0) { + LOG.warn( + "Non-monotonic counter {}: current count {} is less than previous count {}", + metricMetadata.getName(), + count, + previousCount); + return Optional.empty(); + } + + boolean isMonotonic = true; + return Optional.of( + ImmutableMetricData.createLongSum( + collectionMetadata.getOpenTelemetryResource(), + INSTRUMENTATION_SCOPE_INFO, + metricMetadata.getName(), + "", + "", + ImmutableSumData.create( + isMonotonic, + AggregationTemporality.DELTA, + Collections.singleton( + ImmutableLongPointData.create( + collectionMetadata.getStartEpochNanos(), + collectionMetadata.getEpochNanos(), + convertVariables(metricMetadata.getVariables()), + delta))))); + } + + /** + * Converts a Fluss Gauge to a {@link MetricData}. + * + * @param collectionMetadata The common collection metadata + * @param gauge The Fluss Gauge to convert + * @param metricMetadata The metric metadata + * @return A {@link MetricData} if it's able to convert successfully + */ + public static Optional convertGauge( + CollectionMetadata collectionMetadata, Gauge gauge, MetricMetadata metricMetadata) { + if (!(gauge.getValue() instanceof Number)) { + LOG.debug( + "Couldn't adapt gauge {} with value {} and type {}", + metricMetadata.getName(), + gauge.getValue(), + gauge.getValue().getClass().getName()); + return Optional.empty(); + } + + Number number = (Number) gauge.getValue(); + if (number instanceof Long || number instanceof Integer) { + return Optional.of( + ImmutableMetricData.createLongGauge( + collectionMetadata.getOpenTelemetryResource(), + INSTRUMENTATION_SCOPE_INFO, + metricMetadata.getName(), + "", + "", + ImmutableGaugeData.create( + Collections.singleton( + ImmutableLongPointData.create( + collectionMetadata.getStartEpochNanos(), + collectionMetadata.getEpochNanos(), + convertVariables(metricMetadata.getVariables()), + number.longValue()))))); + } else { + return Optional.of( + ImmutableMetricData.createDoubleGauge( + collectionMetadata.getOpenTelemetryResource(), + INSTRUMENTATION_SCOPE_INFO, + metricMetadata.getName(), + "", + "", + ImmutableGaugeData.create( + Collections.singleton( + ImmutableDoublePointData.create( + collectionMetadata.getStartEpochNanos(), + collectionMetadata.getEpochNanos(), + convertVariables(metricMetadata.getVariables()), + number.doubleValue()))))); + } + } + + /** + * Converts a Fluss Meter to a {@link MetricData}. + * + * @param collectionMetadata The common collection metadata + * @param meter The Fluss Meter to convert + * @param metricMetadata The metric metadata + * @return A {@link MetricData} if it's able to convert successfully + */ + public static List convertMeter( + CollectionMetadata collectionMetadata, + Meter meter, + Long count, + Long previousCount, + MetricMetadata metricMetadata) { + List metricData = new ArrayList<>(); + convertCounter(collectionMetadata, count, previousCount, metricMetadata.subMetric("count")) + .ifPresent(metricData::add); + convertGauge(collectionMetadata, meter::getRate, metricMetadata.subMetric("rate")) + .ifPresent(metricData::add); + return metricData; + } + + /** + * Converts a Fluss Histogram to a list of {@link MetricData}s. + * + * @param collectionMetadata The common collection metadata + * @param histogram The Fluss Histogram to convert + * @param metricMetadata The metric metadata + * @return A list of {@link MetricData}s if it's able to convert successfully, or empty if not + */ + public static Optional convertHistogram( + CollectionMetadata collectionMetadata, + Histogram histogram, + MetricMetadata metricMetadata) { + List quantileList = new ArrayList<>(); + quantileList.add(ImmutableValueAtQuantile.create(0, histogram.getStatistics().getMin())); + for (double histogramQuantile : HISTOGRAM_QUANTILES) { + quantileList.add( + ImmutableValueAtQuantile.create( + histogramQuantile, + histogram.getStatistics().getQuantile(histogramQuantile))); + } + quantileList.add(ImmutableValueAtQuantile.create(1, histogram.getStatistics().getMax())); + quantileList.add(ImmutableValueAtQuantile.create(1, histogram.getStatistics().getMax())); + return Optional.of( + ImmutableMetricData.createDoubleSummary( + collectionMetadata.getOpenTelemetryResource(), + INSTRUMENTATION_SCOPE_INFO, + metricMetadata.getName(), + "", + "", + ImmutableSummaryData.create( + Collections.singleton( + ImmutableSummaryPointData.create( + collectionMetadata.getStartEpochNanos(), + collectionMetadata.getEpochNanos(), + convertVariables(metricMetadata.getVariables()), + histogram.getCount(), + histogram.getStatistics().getMean() + * histogram.getCount(), + quantileList))))); + } + + private static Attributes convertVariables(Map variables) { + AttributesBuilder builder = Attributes.builder(); + variables.forEach(builder::put); + return builder.build(); + } + + /** The common metadata associated with a collection of the metrics. */ + public static class CollectionMetadata { + + private final Resource openTelemetryResource; + private final long startEpochNanos; + private final long epochNanos; + + public CollectionMetadata( + Resource openTelemetryResource, long startEpochNanos, long epochNanos) { + this.openTelemetryResource = openTelemetryResource; + this.startEpochNanos = startEpochNanos; + this.epochNanos = epochNanos; + } + + public Resource getOpenTelemetryResource() { + return openTelemetryResource; + } + + public long getStartEpochNanos() { + return startEpochNanos; + } + + public long getEpochNanos() { + return epochNanos; + } + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporter.java b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporter.java new file mode 100644 index 0000000000..92b145eb2c --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporter.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.opentelemetry; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metrics.CharacterFilter; +import org.apache.fluss.metrics.Counter; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.Histogram; +import org.apache.fluss.metrics.Meter; +import org.apache.fluss.metrics.Metric; +import org.apache.fluss.metrics.groups.MetricGroup; +import org.apache.fluss.metrics.reporter.MetricReporter; +import org.apache.fluss.metrics.reporter.ScheduledMetricReporter; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.semconv.ServiceAttributes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** {@link MetricReporter} that exports {@link Metric Metrics} via OpenTelemetry. */ +public class OpenTelemetryReporter implements MetricReporter, ScheduledMetricReporter { + + private static final Logger LOG = LoggerFactory.getLogger(OpenTelemetryReporter.class); + + private static final char SCOPE_SEPARATOR = '.'; + private static final String SCOPE_PREFIX = "fluss" + SCOPE_SEPARATOR; + + @VisibleForTesting final Resource resource; + private final MetricExporter exporter; + private final Duration interval; + private final Clock clock; + + @VisibleForTesting final Map, MetricMetadata> gauges = new HashMap<>(); + @VisibleForTesting final Map counters = new HashMap<>(); + @VisibleForTesting final Map histograms = new HashMap<>(); + @VisibleForTesting final Map meters = new HashMap<>(); + + private Map lastValueSnapshots = Collections.emptyMap(); + private long lastCollectTimeNanos = 0; + private @Nullable CompletableResultCode lastResult; + + OpenTelemetryReporter( + String endpoint, + Duration interval, + Duration timeout, + String serviceName, + String serviceVersion) { + this.exporter = buildMetricExporter(endpoint, timeout); + this.resource = buildResource(serviceName, serviceVersion); + this.interval = interval; + this.clock = Clock.systemUTC(); + } + + private static MetricExporter buildMetricExporter(String endpoint, Duration timeout) { + OtlpGrpcMetricExporterBuilder grpcExporterBuilder = OtlpGrpcMetricExporter.builder(); + grpcExporterBuilder.setEndpoint(endpoint); + grpcExporterBuilder.setTimeout(timeout); + return grpcExporterBuilder.build(); + } + + private static Resource buildResource(String serviceName, String serviceVersion) { + Resource resource = Resource.getDefault(); + + if (serviceName != null) { + resource = + resource.merge( + Resource.create( + Attributes.of(ServiceAttributes.SERVICE_NAME, serviceName))); + } + + if (serviceVersion != null) { + resource = + resource.merge( + Resource.create( + Attributes.of( + ServiceAttributes.SERVICE_VERSION, serviceVersion))); + } + + return resource; + } + + @Override + public void open(Configuration config) { + // do nothing + } + + @Override + public void close() { + exporter.flush(); + + if (lastResult != null) { + LOG.debug("Waiting for up to 1 minute to export pending metrics."); + CompletableResultCode resultCode = lastResult.join(1, TimeUnit.MINUTES); + if (!resultCode.isSuccess()) { + LOG.warn( + "Failed to export pending metrics when closing reporter. Result code: {}. Details: {}", + resultCode, + resultCode.getFailureThrowable()); + } + } + + exporter.close(); + } + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = SCOPE_PREFIX + getLogicalScope(group) + SCOPE_SEPARATOR + metricName; + + Map variables = + group.getAllVariables().entrySet().stream() + .collect( + Collectors.toMap( + e -> removeEnclosingAngleBrackets(e.getKey()), + Map.Entry::getValue)); + LOG.debug("Adding metric {} with variables {}", metricName, variables); + + final MetricMetadata metricMetadata = new MetricMetadata(name, variables); + + synchronized (this) { + switch (metric.getMetricType()) { + case COUNTER: + this.counters.put((Counter) metric, metricMetadata); + break; + case GAUGE: + this.gauges.put((Gauge) metric, metricMetadata); + break; + case HISTOGRAM: + this.histograms.put((Histogram) metric, metricMetadata); + break; + case METER: + this.meters.put((Meter) metric, metricMetadata); + break; + default: + LOG.warn( + "Cannot add unknown metric type {}. This indicates that the reporter does not " + + "support this metric type.", + metric.getClass().getName()); + } + } + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + synchronized (this) { + // Make sure we are not caching the metric object + lastValueSnapshots.remove(metric); + + switch (metric.getMetricType()) { + case COUNTER: + this.counters.remove((Counter) metric); + break; + case GAUGE: + this.gauges.remove((Gauge) metric); + break; + case HISTOGRAM: + this.histograms.remove((Histogram) metric); + break; + case METER: + this.meters.remove((Meter) metric); + break; + default: + LOG.warn( + "Cannot remove unknown metric type {}. This indicates that the reporter does " + + "not support this metric type.", + metric.getClass().getName()); + } + } + } + + @Override + public void report() { + synchronized (this) { + Collection metricData = collectMetrics(); + try { + lastResult = exporter.export(metricData); + lastResult.whenComplete( + () -> { + if (lastResult.isSuccess()) { + LOG.debug( + "Exported {} metrics using {}", + metricData.size(), + exporter.getClass().getName()); + } else { + LOG.warn( + "Failed to export {} metrics using {}", + metricData.size(), + exporter.getClass().getName()); + } + }); + } catch (Exception e) { + LOG.error( + "Failed to call export for {} metrics using {}", + metricData.size(), + exporter.getClass().getName()); + } + } + } + + @Override + public Duration scheduleInterval() { + return interval; + } + + private static String getLogicalScope(MetricGroup group) { + return group.getLogicalScope(CharacterFilter.NO_OP_FILTER, SCOPE_SEPARATOR); + } + + @VisibleForTesting + static String removeEnclosingAngleBrackets(String str) { + if (str.length() >= 2 && str.charAt(0) == '<' && str.charAt(str.length() - 1) == '>') { + return str.substring(1, str.length() - 1); + } + return str; + } + + private long getCurrentTimeNanos() { + Instant now = clock.instant(); + return TimeUnit.SECONDS.toNanos(now.getEpochSecond()) + now.getNano(); + } + + private Collection collectMetrics() { + // This method is not thread-safe. + // The callee must hold the monitor on 'this' before calling it. + long currentTimeNanos = getCurrentTimeNanos(); + List data = new ArrayList<>(); + OpenTelemetryAdapter.CollectionMetadata collectionMetadata = + new OpenTelemetryAdapter.CollectionMetadata( + resource, lastCollectTimeNanos, currentTimeNanos); + + Map currentValueSnapshots = takeLastValueSnapshots(); + + for (Counter counter : counters.keySet()) { + Long count = currentValueSnapshots.get(counter); + Long lastCount = lastValueSnapshots.getOrDefault(counter, 0L); + MetricMetadata metricMetadata = counters.get(counter); + Optional metricData = + OpenTelemetryAdapter.convertCounter( + collectionMetadata, count, lastCount, metricMetadata); + metricData.ifPresent(data::add); + } + + for (Gauge gauge : gauges.keySet()) { + MetricMetadata metricMetadata = gauges.get(gauge); + Optional metricData = + OpenTelemetryAdapter.convertGauge(collectionMetadata, gauge, metricMetadata); + metricData.ifPresent(data::add); + } + + for (Meter meter : meters.keySet()) { + Long count = currentValueSnapshots.get(meter); + Long lastCount = lastValueSnapshots.getOrDefault(meter, 0L); + MetricMetadata metricMetadata = meters.get(meter); + List metricData = + OpenTelemetryAdapter.convertMeter( + collectionMetadata, meter, count, lastCount, metricMetadata); + data.addAll(metricData); + } + + for (Histogram histogram : histograms.keySet()) { + MetricMetadata metricMetadata = histograms.get(histogram); + Optional metricData = + OpenTelemetryAdapter.convertHistogram( + collectionMetadata, histogram, metricMetadata); + metricData.ifPresent(data::add); + } + + lastValueSnapshots = currentValueSnapshots; + lastCollectTimeNanos = currentTimeNanos; + + return data; + } + + private Map takeLastValueSnapshots() { + // This method is not thread-safe. + // The callee must hold the monitor on 'this' before calling it. + Map map = new HashMap<>(); + + for (Counter counter : counters.keySet()) { + map.put(counter, counter.getCount()); + } + + for (Meter meter : meters.keySet()) { + map.put(meter, meter.getCount()); + } + + return map; + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java new file mode 100644 index 0000000000..dced8c3b78 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPlugin.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.opentelemetry; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.IllegalConfigurationException; +import org.apache.fluss.metrics.reporter.MetricReporter; +import org.apache.fluss.metrics.reporter.MetricReporterPlugin; + +import java.time.Duration; + +/** {@link MetricReporterPlugin} for {@link OpenTelemetryReporter}. */ +public class OpenTelemetryReporterPlugin implements MetricReporterPlugin { + + private static final String PLUGIN_NAME = "opentelemetry"; + + @Override + public MetricReporter createMetricReporter(Configuration configuration) { + String endpoint = + configuration.getString(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_ENDPOINT); + if (endpoint == null || endpoint.trim().isEmpty()) { + throw new IllegalConfigurationException( + ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_ENDPOINT.key() + " must be set."); + } + Duration interval = + configuration.get(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_EXPORT_INTERVAL); + Duration timeout = + configuration.get(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_EXPORT_TIMEOUT); + String serviceName = + configuration.get(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_SERVICE_NAME); + String serviceVersion = + configuration.get(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_SERVICE_VERSION); + return new OpenTelemetryReporter(endpoint, interval, timeout, serviceName, serviceVersion); + } + + @Override + public String identifier() { + return PLUGIN_NAME; + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/NOTICE b/fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..b49864cf74 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/NOTICE @@ -0,0 +1,29 @@ +fluss-metrics-opentelemetry +Copyright 2025 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) + +- com.squareup.okhttp3:okhttp:4.12.0 +- com.squareup.okio:okio-jvm:3.6.0 +- com.squareup.okio:okio:3.6.0 +- io.opentelemetry.semconv:opentelemetry-semconv:1.34.0 +- io.opentelemetry:opentelemetry-api:1.51.0 +- io.opentelemetry:opentelemetry-context:1.51.0 +- io.opentelemetry:opentelemetry-exporter-common:1.51.0 +- io.opentelemetry:opentelemetry-exporter-otlp:1.51.0 +- io.opentelemetry:opentelemetry-exporter-otlp-common:1.51.0 +- io.opentelemetry:opentelemetry-exporter-sender-okhttp:1.51.0 +- io.opentelemetry:opentelemetry-sdk-common:1.51.0 +- io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi:1.51.0 +- io.opentelemetry:opentelemetry-sdk-logs:1.51.0 +- io.opentelemetry:opentelemetry-sdk-metrics:1.51.0 +- io.opentelemetry:opentelemetry-sdk-trace:1.51.0 +- io.opentelemetry:opentelemetry-sdk:1.51.0 +- org.jetbrains.kotlin:kotlin-stdlib-common:1.9.10 +- org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.8.21 +- org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.8.21 +- org.jetbrains.kotlin:kotlin-stdlib:1.8.21 +- org.jetbrains:annotations:17.0.0 diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/services/com.alibaba.fluss.metrics.reporter.MetricReporterPlugin b/fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/services/com.alibaba.fluss.metrics.reporter.MetricReporterPlugin new file mode 100644 index 0000000000..abcbf1742d --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/main/resources/META-INF/services/com.alibaba.fluss.metrics.reporter.MetricReporterPlugin @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +com.alibaba.fluss.metrics.opentelemetry.OpenTelemetryReporterPlugin diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/MetricMetadataTest.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/MetricMetadataTest.java new file mode 100644 index 0000000000..a99a2c7244 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/MetricMetadataTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.opentelemetry; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.HashMap; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link MetricMetadata}. */ +public class MetricMetadataTest { + @ParameterizedTest + @ValueSource(strings = {"fluss", "fluss.group1.", "fluss.group1.subgroup1"}) + public void testSubMetric(String metricName) { + MetricMetadata metricMetadata = + new MetricMetadata(metricName, new HashMap<>()).subMetric("mysuffix"); + assertThat(metricMetadata.getName().endsWith(MetricMetadata.NAME_SEPARATOR + "mysuffix")) + .isTrue(); + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryAdapterTest.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryAdapterTest.java new file mode 100644 index 0000000000..60b246145b --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryAdapterTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.opentelemetry; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +import org.apache.fluss.metrics.Counter; +import org.apache.fluss.metrics.MeterView; +import org.apache.fluss.metrics.SimpleCounter; +import org.apache.fluss.metrics.util.TestHistogram; +import org.apache.fluss.shaded.guava32.com.google.common.collect.ImmutableMap; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.metrics.data.SummaryPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; +import io.opentelemetry.sdk.resources.Resource; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** Tests for {@link OpenTelemetryAdapter}. */ +public class OpenTelemetryAdapterTest { + private static final OpenTelemetryAdapter.CollectionMetadata METADATA = + new OpenTelemetryAdapter.CollectionMetadata( + Resource.create(Attributes.empty()), 123L, 345L); + + private static final Map VARIABLES = ImmutableMap.of("k1", "v1", "k2", "v2"); + + @Test + void testCounter() { + Optional metricData = + OpenTelemetryAdapter.convertCounter( + METADATA, 50L, 3L, new MetricMetadata("foo.bar.count", VARIABLES)); + + assertThat(metricData.isPresent()).isTrue(); + assertThat(metricData.get().getName()).isEqualTo("foo.bar.count"); + assertThat(metricData.get().getLongSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.DELTA); + assertThat(metricData.get().getLongSumData().isMonotonic()).isEqualTo(true); + assertThat(metricData.get().getType()).isEqualTo(MetricDataType.LONG_SUM); + assertThat(metricData.get().getLongSumData().getPoints().size()).isEqualTo(1); + LongPointData data = metricData.get().getLongSumData().getPoints().iterator().next(); + assertThat(data.getValue()).isEqualTo(47L); + assertThat(asStringMap(data.getAttributes())).isEqualTo(VARIABLES); + assertThat(metricData.get().getDoubleSumData()).isEqualTo(ImmutableSumData.empty()); + assertThat(metricData.get().getLongGaugeData()).isEqualTo(ImmutableGaugeData.empty()); + assertThat(metricData.get().getDoubleGaugeData()).isEqualTo(ImmutableGaugeData.empty()); + assertThat(metricData.get().getHistogramData()).isEqualTo(ImmutableHistogramData.empty()); + } + + @Test + void testGaugeDouble() { + Optional metricData = + OpenTelemetryAdapter.convertGauge( + METADATA, () -> 123.456d, new MetricMetadata("foo.bar.value", VARIABLES)); + + assertThat(metricData.isPresent()).isTrue(); + assertThat(metricData.get().getName()).isEqualTo("foo.bar.value"); + assertThat(metricData.get().getType()).isEqualTo(MetricDataType.DOUBLE_GAUGE); + assertThat(metricData.get().getDoubleGaugeData().getPoints().size()).isEqualTo(1); + DoublePointData data = metricData.get().getDoubleGaugeData().getPoints().iterator().next(); + assertThat(data.getValue()).isEqualTo(123.456d); + assertThat(asStringMap(data.getAttributes())).isEqualTo(VARIABLES); + assertThat(metricData.get().getLongSumData()).isEqualTo(ImmutableSumData.empty()); + assertThat(metricData.get().getDoubleSumData()).isEqualTo(ImmutableSumData.empty()); + assertThat(metricData.get().getLongGaugeData()).isEqualTo(ImmutableGaugeData.empty()); + assertThat(metricData.get().getHistogramData()).isEqualTo(ImmutableHistogramData.empty()); + } + + @Test + void testGaugeLong() { + Optional metricData = + OpenTelemetryAdapter.convertGauge( + METADATA, () -> 125L, new MetricMetadata("foo.bar.value", VARIABLES)); + + assertThat(metricData.isPresent()).isTrue(); + assertThat(metricData.get().getName()).isEqualTo("foo.bar.value"); + assertThat(metricData.get().getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(metricData.get().getLongGaugeData().getPoints().size()).isEqualTo(1); + LongPointData data = metricData.get().getLongGaugeData().getPoints().iterator().next(); + assertThat(data.getValue()).isEqualTo(125L); + assertThat(asStringMap(data.getAttributes())).isEqualTo(VARIABLES); + assertThat(metricData.get().getLongSumData()).isEqualTo(ImmutableSumData.empty()); + assertThat(metricData.get().getDoubleSumData()).isEqualTo(ImmutableSumData.empty()); + assertThat(metricData.get().getDoubleGaugeData()).isEqualTo(ImmutableGaugeData.empty()); + assertThat(metricData.get().getHistogramData()).isEqualTo(ImmutableHistogramData.empty()); + } + + @Test + void testMeter() { + Counter counter = new SimpleCounter(); + MeterView meter = new MeterView(counter); + counter.inc(345L); + meter.update(); + List metricData = + OpenTelemetryAdapter.convertMeter( + METADATA, + meter, + counter.getCount(), + 20L, + new MetricMetadata("foo.bar.value", VARIABLES)); + + assertThat(metricData.size()).isEqualTo(2); + assertThat(metricData.get(0).getName()).isEqualTo("foo.bar.value.count"); + assertThat(metricData.get(0).getType()).isEqualTo(MetricDataType.LONG_SUM); + assertThat(metricData.get(0).getLongSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.DELTA); + assertThat(metricData.get(0).getLongSumData().getPoints().size()).isEqualTo(1); + LongPointData data = metricData.get(0).getLongSumData().getPoints().iterator().next(); + assertThat(data.getValue()).isEqualTo(325L); + assertThat(asStringMap(data.getAttributes())).isEqualTo(VARIABLES); + assertThat(metricData.get(0).getDoubleSumData()).isEqualTo(ImmutableSumData.empty()); + assertThat(metricData.get(0).getLongGaugeData()).isEqualTo(ImmutableGaugeData.empty()); + assertThat(metricData.get(0).getDoubleGaugeData()).isEqualTo(ImmutableGaugeData.empty()); + assertThat(metricData.get(0).getHistogramData()).isEqualTo(ImmutableHistogramData.empty()); + + assertThat(metricData.get(1).getName()).isEqualTo("foo.bar.value.rate"); + assertThat(metricData.get(1).getType()).isEqualTo(MetricDataType.DOUBLE_GAUGE); + assertThat(metricData.get(1).getDoubleGaugeData().getPoints().size()).isEqualTo(1); + DoublePointData data2 = + metricData.get(1).getDoubleGaugeData().getPoints().iterator().next(); + // 345L / 60 seconds + assertThat(data2.getValue()).isEqualTo(5.75d); + assertThat(asStringMap(data2.getAttributes())).isEqualTo(VARIABLES); + assertThat(metricData.get(1).getLongSumData()).isEqualTo(ImmutableSumData.empty()); + assertThat(metricData.get(1).getDoubleSumData()).isEqualTo(ImmutableSumData.empty()); + assertThat(metricData.get(1).getLongGaugeData()).isEqualTo(ImmutableGaugeData.empty()); + assertThat(metricData.get(1).getHistogramData()).isEqualTo(ImmutableHistogramData.empty()); + } + + @Test + void testHistogram() { + TestHistogram histogram = new TestHistogram(); + Optional metricData = + OpenTelemetryAdapter.convertHistogram( + METADATA, histogram, new MetricMetadata("foo.bar.histogram", VARIABLES)); + + assertThat(metricData.isPresent()).isTrue(); + assertThat(metricData.get().getName()).isEqualTo("foo.bar.histogram"); + assertThat(metricData.get().getType()).isEqualTo(MetricDataType.SUMMARY); + assertThat(metricData.get().getSummaryData().getPoints().size()).isEqualTo(1); + + SummaryPointData summaryPointData = + metricData.get().getSummaryData().getPoints().iterator().next(); + + assertThat(summaryPointData.getCount()).isEqualTo(1); + assertThat(summaryPointData.getSum()).isEqualTo(4d); + + assertThat(summaryPointData.getValues().get(0).getQuantile()).isEqualTo(0); + assertThat(summaryPointData.getValues().get(0).getValue()).isEqualTo(7); + + assertThat(summaryPointData.getValues().get(1).getQuantile()).isEqualTo(0.5); + assertThat(summaryPointData.getValues().get(1).getValue()).isEqualTo(0.5); + + assertThat(summaryPointData.getValues().get(2).getQuantile()).isEqualTo(0.75); + assertThat(summaryPointData.getValues().get(2).getValue()).isEqualTo(0.75); + + assertThat(summaryPointData.getValues().get(3).getQuantile()).isEqualTo(0.95); + assertThat(summaryPointData.getValues().get(3).getValue()).isEqualTo(0.95); + + assertThat(summaryPointData.getValues().get(4).getQuantile()).isEqualTo(0.99); + assertThat(summaryPointData.getValues().get(4).getValue()).isEqualTo(0.99); + + assertThat(summaryPointData.getValues().get(5).getQuantile()).isEqualTo(1); + assertThat(summaryPointData.getValues().get(5).getValue()).isEqualTo(6); + + assertThat(asStringMap(summaryPointData.getAttributes())).isEqualTo(VARIABLES); + } + + private Map asStringMap(Attributes attributes) { + Map map = new HashMap<>(); + for (Map.Entry, Object> entry : attributes.asMap().entrySet()) { + map.put(entry.getKey().getKey(), (String) entry.getValue()); + } + return map; + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java new file mode 100644 index 0000000000..690a4a2c49 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterITCase.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.opentelemetry; + +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.Histogram; +import org.apache.fluss.metrics.MeterView; +import org.apache.fluss.metrics.SimpleCounter; +import org.apache.fluss.metrics.groups.MetricGroup; +import org.apache.fluss.metrics.util.TestHistogram; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.testutils.common.TestLoggerExtension; + +import org.assertj.core.data.Percentage; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** Tests for {@link OpenTelemetryReporter}. */ +@ExtendWith(TestLoggerExtension.class) +public class OpenTelemetryReporterITCase extends OpenTelemetryReporterITCaseBase { + + private static final String LOGICAL_SCOPE = "logical.scope"; + + private MetricGroup group; + private final Histogram histogram = new TestHistogram(); + + @BeforeEach + public void setUpEach() { + group = TestUtils.createTestMetricGroup(LOGICAL_SCOPE, new HashMap<>()); + } + + @Test + public void testReport() throws Exception { + OpenTelemetryReporter reporter = createReporter(); + + SimpleCounter counter = new SimpleCounter(); + reporter.notifyOfAddedMetric(counter, "foo.counter", group); + + Gauge gauge = () -> 123.456d; + reporter.notifyOfAddedMetric(gauge, "foo.gauge", group); + + reporter.report(); + + MeterView meter = new MeterView(counter); + reporter.notifyOfAddedMetric(meter, "foo.meter", group); + + reporter.notifyOfAddedMetric(histogram, "foo.histogram", group); + + reporter.report(); + reporter.close(); + + eventuallyConsumeJson( + (json) -> { + JsonNode scopeMetrics = + json.findPath("resourceMetrics").findPath("scopeMetrics"); + assertThat(scopeMetrics.findPath("scope").findPath("name").asText()) + .isEqualTo("org.apache.fluss.metrics"); + JsonNode metrics = scopeMetrics.findPath("metrics"); + + List metricNames = extractMetricNames(json); + assertThat(metricNames) + .contains( + "fluss.logical.scope.foo.counter", + "fluss.logical.scope.foo.gauge", + "fluss.logical.scope.foo.meter.count", + "fluss.logical.scope.foo.meter.rate", + "fluss.logical.scope.foo.histogram"); + + metrics.forEach(OpenTelemetryReporterITCase::assertMetrics); + }); + } + + private static void assertMetrics(JsonNode metric) { + String name = metric.findPath("name").asText(); + if (name.equals("fluss.logical.scope.foo.counter")) { + assertThat(metric.at("/sum/dataPoints").findPath("asInt").asInt()).isEqualTo(0); + } else if (name.equals("fluss.logical.scope.foo.gauge")) { + assertThat(metric.at("/gauge/dataPoints").findPath("asDouble").asDouble()) + .isCloseTo(123.456, Percentage.withPercentage(1)); + } else if (name.equals("fluss.logical.scope.foo.meter.count")) { + assertThat(metric.at("/sum/dataPoints").findPath("asInt").asInt()).isEqualTo(0); + } else if (name.equals("fluss.logical.scope.foo.meter.rate")) { + assertThat(metric.at("/gauge/dataPoints").findPath("asDouble").asDouble()) + .isEqualTo(0.0); + } else if (name.equals("fluss.logical.scope.foo.histogram")) { + assertThat(metric.at("/summary/dataPoints").findPath("sum").asInt()).isEqualTo(4); + } + } + + @Test + public void testReportAfterUnregister() throws Exception { + OpenTelemetryReporter reporter = createReporter(); + + SimpleCounter counter1 = new SimpleCounter(); + SimpleCounter counter2 = new SimpleCounter(); + SimpleCounter counter3 = new SimpleCounter(); + reporter.notifyOfAddedMetric(counter1, "foo.counter1", group); + reporter.notifyOfAddedMetric(counter2, "foo.counter2", group); + reporter.notifyOfAddedMetric(counter3, "foo.counter3", group); + + reporter.notifyOfRemovedMetric(counter2, "foo.counter2", group); + + reporter.report(); + reporter.close(); + + eventuallyConsumeJson( + json -> { + List metricNames = extractMetricNames(json); + assertThat(metricNames) + .contains( + "fluss.logical.scope.foo.counter1", + "fluss.logical.scope.foo.counter3"); + }); + } + + @Test + public void testCounterDelta() throws Exception { + OpenTelemetryReporter reporter = createReporter(); + + SimpleCounter counter = new SimpleCounter(); + reporter.notifyOfAddedMetric(counter, "foo.counter", group); + + counter.inc(1234); + assertThat(counter.getCount()).isEqualTo(1234L); + reporter.report(); + + eventuallyConsumeJson( + json -> { + List metricNames = extractMetricNames(json); + assertThat(metricNames).contains("fluss.logical.scope.foo.counter"); + + JsonNode metrics = + json.findPath("resourceMetrics") + .findPath("scopeMetrics") + .findPath("metrics"); + + metrics.forEach( + metric -> { + assertThat(metric.at("/sum/dataPoints").findPath("asInt").asInt()) + .isEqualTo(1234); + }); + }); + + counter.inc(25); + assertThat(counter.getCount()).isEqualTo(1259L); + + reporter.report(); + reporter.close(); + + eventuallyConsumeJson( + json -> { + List metricNames = extractMetricNames(json); + assertThat(metricNames).contains("fluss.logical.scope.foo.counter"); + + JsonNode metrics = + json.findPath("resourceMetrics") + .findPath("scopeMetrics") + .findPath("metrics"); + + metrics.forEach( + metric -> { + assertThat(metric.at("/sum/dataPoints").findPath("asInt").asInt()) + .isEqualTo(1234); + }); + }); + } + + @Test + public void testOpenTelemetryAttributes() throws Exception { + String serviceName = "flink-bar"; + String serviceVersion = "v42"; + OpenTelemetryReporter reporter = createReporter(serviceName, serviceVersion); + + SimpleCounter counter = new SimpleCounter(); + reporter.notifyOfAddedMetric(counter, "foo.counter", group); + + reporter.report(); + reporter.close(); + + eventuallyConsumeJson( + json -> { + List metricNames = extractMetricNames(json); + assertThat(metricNames).contains("fluss.logical.scope.foo.counter"); + + JsonNode attributes = + json.findPath("resourceMetrics") + .findPath("resource") + .findPath("attributes"); + + List attributeKeys = + attributes.findValues("key").stream() + .map(JsonNode::asText) + .collect(Collectors.toList()); + + assertThat(attributeKeys).contains("service.name", "service.version"); + + attributes.forEach( + attribute -> { + if (attribute.get("key").asText().equals("service.name")) { + assertThat(attribute.at("/value/stringValue").asText()) + .isEqualTo(serviceName); + } else if (attribute + .get("key") + .asText() + .equals("service.version")) { + assertThat(attribute.at("/value/stringValue").asText()) + .isEqualTo(serviceVersion); + } + }); + }); + } + + private static OpenTelemetryReporter createReporter(String serviceName, String serviceVersion) { + String endpoint = + OpenTelemetryReporterITCaseBase.getOpenTelemetryContainer().getGrpcEndpoint(); + + return new OpenTelemetryReporter( + endpoint, + Duration.ofSeconds(10), + Duration.ofSeconds(10), + serviceName, + serviceVersion); + } + + private static OpenTelemetryReporter createReporter() { + return createReporter(null, null); + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterITCaseBase.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterITCaseBase.java new file mode 100644 index 0000000000..8dcba11101 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterITCaseBase.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.opentelemetry; + +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.fluss.testutils.common.AllCallbackWrapper; +import org.apache.fluss.testutils.common.TestContainerExtension; +import org.apache.fluss.testutils.common.TestLoggerExtension; +import org.apache.fluss.utils.function.ThrowingConsumer; +import org.apache.fluss.utils.function.ThrowingRunnable; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.BaseConsumer; +import org.testcontainers.containers.output.OutputFrame; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** Tests for {@link OpenTelemetryReporter}. */ +@ExtendWith(TestLoggerExtension.class) +public class OpenTelemetryReporterITCaseBase { + public static final Logger LOG = LoggerFactory.getLogger(OpenTelemetryReporterITCaseBase.class); + + private static final Duration TIME_OUT = Duration.ofMinutes(2); + + @RegisterExtension + @Order(1) + private static final AllCallbackWrapper> + OPENTELEMETRY_EXTENSION = + new AllCallbackWrapper<>( + new TestContainerExtension<>(OpenTelemetryTestContainer::new)); + + @BeforeEach + public void setup() { + Slf4jLevelLogConsumer logConsumer = new Slf4jLevelLogConsumer(LOG); + OPENTELEMETRY_EXTENSION.getCustomExtension().getTestContainer().followOutput(logConsumer); + } + + public static OpenTelemetryTestContainer getOpenTelemetryContainer() { + return OPENTELEMETRY_EXTENSION.getCustomExtension().getTestContainer(); + } + + public static void eventuallyConsumeJson(ThrowingConsumer jsonConsumer) + throws Exception { + eventually( + () -> { + // opentelemetry-collector dumps every report in a new line, so in order to + // re-use the + // same collector across multiple tests, let's read only the last line + getOpenTelemetryContainer() + .copyFileFromContainer( + getOpenTelemetryContainer().getOutputLogPath().toString(), + inputStream -> { + List lines = new ArrayList<>(); + BufferedReader input = + new BufferedReader( + new InputStreamReader(inputStream)); + String last = ""; + String line; + + while ((line = input.readLine()) != null) { + lines.add(line); + last = line; + } + + ObjectMapper mapper = new ObjectMapper(); + JsonNode json = mapper.readValue(last, JsonNode.class); + try { + jsonConsumer.accept(json); + } catch (Throwable t) { + throw new ConsumeDataLogException(t, lines); + } + return null; + }); + }); + } + + public static void eventually(ThrowingRunnable runnable) throws Exception { + eventually(Math.addExact(System.nanoTime(), TIME_OUT.toNanos()), runnable); + } + + static void eventually(long deadline, ThrowingRunnable runnable) throws Exception { + Thread.sleep(10); + while (true) { + try { + runnable.run(); + break; + } catch (Throwable e) { + if (System.nanoTime() >= deadline) { + if (e instanceof ConsumeDataLogException) { + LOG.error("Failure while the following data log:"); + ((ConsumeDataLogException) e).getDataLog().forEach(LOG::error); + } + throw e; + } + } + Thread.sleep(100); + } + } + + public static List extractMetricNames(JsonNode json) { + return json + .findPath("resourceMetrics") + .findPath("scopeMetrics") + .findPath("metrics") + .findValues("name") + .stream() + .map(JsonNode::asText) + .collect(Collectors.toList()); + } + + private static class ConsumeDataLogException extends Exception { + private final List dataLog; + + public ConsumeDataLogException(Throwable cause, List dataLog) { + super(cause); + this.dataLog = dataLog; + } + + public List getDataLog() { + return dataLog; + } + } + + /** + * Similar to {@link Slf4jLevelLogConsumer} but parses output lines and tries to log them with + * appropriate levels. + */ + private static class Slf4jLevelLogConsumer extends BaseConsumer { + + private final Logger logger; + + public Slf4jLevelLogConsumer(Logger logger) { + this.logger = logger; + } + + @Override + public void accept(OutputFrame outputFrame) { + final OutputFrame.OutputType outputType = outputFrame.getType(); + final String utf8String = outputFrame.getUtf8StringWithoutLineEnding(); + + String lowerCase = utf8String.toLowerCase(); + if (lowerCase.contains("error") || lowerCase.contains("exception")) { + logger.error("{}: {}", outputType, utf8String); + } else if (lowerCase.contains("warn") || lowerCase.contains("fail")) { + logger.warn("{}: {}", outputType, utf8String); + } else { + logger.info("{}: {}", outputType, utf8String); + } + } + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java new file mode 100644 index 0000000000..4e3d287290 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterPluginTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.opentelemetry; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.IllegalConfigurationException; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link OpenTelemetryReporterPlugin}. */ +public class OpenTelemetryReporterPluginTest { + + private final OpenTelemetryReporterPlugin openTelemetryReporterPlugin = + new OpenTelemetryReporterPlugin(); + + @Test + void testValidConfiguration() { + // mandatory options + Configuration configuration = new Configuration(); + configuration.setString( + ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_ENDPOINT, + "http://opentelemetry-metric-collector:4317"); + + // optional options + configuration.set( + ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_EXPORT_INTERVAL, + Duration.ofSeconds(5)); + configuration.set( + ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_EXPORT_TIMEOUT, Duration.ofSeconds(5)); + assertThatCode(() -> openTelemetryReporterPlugin.createMetricReporter(configuration)) + .doesNotThrowAnyException(); + + configuration.set(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_SERVICE_NAME, "fluss"); + configuration.set(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_SERVICE_NAME, "v42"); + assertThatCode(() -> openTelemetryReporterPlugin.createMetricReporter(configuration)) + .doesNotThrowAnyException(); + } + + @Test + void testInvalidConfiguration() { + Configuration configuration = new Configuration(); + + assertThatThrownBy(() -> openTelemetryReporterPlugin.createMetricReporter(configuration)) + .isInstanceOf(IllegalConfigurationException.class); + + configuration.setString(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_ENDPOINT, ""); + assertThatThrownBy(() -> openTelemetryReporterPlugin.createMetricReporter(configuration)) + .isInstanceOf(IllegalConfigurationException.class); + + configuration.setString(ConfigOptions.METRICS_REPORTER_OPENTELEMETRY_ENDPOINT, " "); + assertThatThrownBy(() -> openTelemetryReporterPlugin.createMetricReporter(configuration)) + .isInstanceOf(IllegalConfigurationException.class); + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java new file mode 100644 index 0000000000..5d1fdfd40e --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryReporterTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.opentelemetry; + +import org.apache.fluss.metrics.Counter; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.Histogram; +import org.apache.fluss.metrics.Meter; +import org.apache.fluss.metrics.SimpleCounter; +import org.apache.fluss.metrics.groups.MetricGroup; +import org.apache.fluss.metrics.util.TestHistogram; +import org.apache.fluss.metrics.util.TestMeter; + +import io.opentelemetry.semconv.ServiceAttributes; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.AbstractMap; +import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link OpenTelemetryReporter}. */ +public class OpenTelemetryReporterTest { + + private static final String LOGICAL_SCOPE = "logical.scope"; + private static final Map labels = + Collections.unmodifiableMap( + Stream.of( + new AbstractMap.SimpleEntry<>("label1", "value1"), + new AbstractMap.SimpleEntry<>("label2", "value2")) + .collect( + Collectors.toMap( + AbstractMap.SimpleEntry::getKey, + AbstractMap.SimpleEntry::getValue))); + + private MetricGroup metricGroup; + + @BeforeEach + void setupReporter() { + metricGroup = TestUtils.createTestMetricGroup(LOGICAL_SCOPE, labels); + } + + @Test + void testInvalidEndpoint() { + assertThatThrownBy( + () -> + new OpenTelemetryReporter( + "endpoint-with-missing-protocol", + Duration.ofSeconds(5), + Duration.ofSeconds(5), + null, + null)) + .isInstanceOf(IllegalArgumentException.class); + + assertThatThrownBy( + () -> + new OpenTelemetryReporter( + "invalid://protocol", + Duration.ofSeconds(5), + Duration.ofSeconds(5), + null, + null)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testOpenTelemetryResourceIsConstructedCorrectly() { + OpenTelemetryReporter reporter = + new OpenTelemetryReporter( + "http://opentelemetry-collector:4317", + Duration.ofSeconds(5), + Duration.ofSeconds(5), + "fluss", + "v42"); + assertThat(reporter.resource.getAttribute(ServiceAttributes.SERVICE_NAME)) + .isEqualTo("fluss"); + assertThat(reporter.resource.getAttribute(ServiceAttributes.SERVICE_VERSION)) + .isEqualTo("v42"); + } + + @Test + void testAddAndRemoveCounter() { + OpenTelemetryReporter reporter = createReporter(); + Counter counter = new SimpleCounter(); + reporter.notifyOfAddedMetric(counter, "counter", metricGroup); + assertThat(reporter.counters.containsKey(counter)).isTrue(); + reporter.notifyOfRemovedMetric(counter, "counter", metricGroup); + assertThat(reporter.counters.containsKey(counter)).isFalse(); + } + + @Test + void testAddAndRemoveGauge() { + OpenTelemetryReporter reporter = createReporter(); + Gauge gauge = () -> 1; + reporter.notifyOfAddedMetric(gauge, "gauge", metricGroup); + assertThat(reporter.gauges.containsKey(gauge)).isTrue(); + reporter.notifyOfRemovedMetric(gauge, "meter", metricGroup); + assertThat(reporter.gauges.containsKey(gauge)).isFalse(); + } + + @Test + void testAddAndRemoveHistogram() { + OpenTelemetryReporter reporter = createReporter(); + Meter meter = new TestMeter(); + reporter.notifyOfAddedMetric(meter, "meter", metricGroup); + assertThat(reporter.meters.containsKey(meter)).isTrue(); + reporter.notifyOfRemovedMetric(meter, "meter", metricGroup); + assertThat(reporter.meters.containsKey(meter)).isFalse(); + } + + @Test + void testAddAndRemoveMeter() { + OpenTelemetryReporter reporter = createReporter(); + Histogram histogram = new TestHistogram(); + reporter.notifyOfAddedMetric(histogram, "histogram", metricGroup); + assertThat(reporter.histograms.containsKey(histogram)).isTrue(); + reporter.notifyOfRemovedMetric(histogram, "histogram", metricGroup); + assertThat(reporter.histograms.containsKey(histogram)).isFalse(); + } + + @Test + void testRemoveEnclosingAngleBrackets() { + assertThat(OpenTelemetryReporter.removeEnclosingAngleBrackets("")).isEqualTo("t"); + assertThat(OpenTelemetryReporter.removeEnclosingAngleBrackets("")).isEqualTo("t>"); + assertThat(OpenTelemetryReporter.removeEnclosingAngleBrackets("<")).isEqualTo("<"); + assertThat(OpenTelemetryReporter.removeEnclosingAngleBrackets(">")).isEqualTo(">"); + assertThat(OpenTelemetryReporter.removeEnclosingAngleBrackets("<>")).isEqualTo(""); + assertThat(OpenTelemetryReporter.removeEnclosingAngleBrackets("")).isEqualTo(""); + } + + private OpenTelemetryReporter createReporter() { + return new OpenTelemetryReporter( + "http://endpoint-must-not-be-called-in-unit-tests", + Duration.ofSeconds(5), + Duration.ofSeconds(5), + null, + null); + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java new file mode 100644 index 0000000000..c22a18b3b9 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/OpenTelemetryTestContainer.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.opentelemetry; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.images.builder.ImageFromDockerfile; +import org.testcontainers.images.builder.dockerfile.DockerfileBuilder; +import org.testcontainers.images.builder.dockerfile.statement.MultiArgsStatement; +import org.testcontainers.utility.Base58; +import org.testcontainers.utility.MountableFile; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Locale; + +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** {@link OpenTelemetryTestContainer} provides an OpenTelemetry test instance. */ +public class OpenTelemetryTestContainer extends GenericContainer { + private static final String ALPINE_DOCKER_IMAGE_TAG = "3.22.0"; + private static final String OPENTELEMETRY_COLLECTOR_DOCKER_IMAGE_TAG = "0.128.0"; + + private static final int DEFAULT_GRPC_PORT = 4317; + + // must be kept in sync with opentelemetry-config.yaml + private static final String DATA_DIR = "/data"; + // must be kept in sync with opentelemetry-config.yaml + private static final String LOG_FILE = "logs.json"; + + private static final Path CONFIG_PATH = + Paths.get("src/test/resources/").resolve("opentelemetry-config.yaml"); + + public OpenTelemetryTestContainer() { + super( + new ImageFromDockerfile("fluss/opentelemetry-collector-test-container") + .withDockerfileFromBuilder( + OpenTelemetryTestContainer::buildOpenTelemetryCollectorImage)); + withNetworkAliases(randomString("opentelemetry-collector", 6)); + addExposedPort(DEFAULT_GRPC_PORT); + withCopyFileToContainer( + MountableFile.forHostPath(CONFIG_PATH.toString()), "opentelemetry-config.yaml"); + withCommand("--config", "opentelemetry-config.yaml"); + } + + private static void buildOpenTelemetryCollectorImage(DockerfileBuilder builder) { + builder + // OpenTelemetry image doesn't have mkdir - use alpine instead. + .from(constructFullDockerImageName("alpine", ALPINE_DOCKER_IMAGE_TAG)) + // Create the output data directory - OpenTelemetry image doesn't have any directory + // to write to on its own. + .run("mkdir -p " + DATA_DIR) + .from( + constructFullDockerImageName( + "otel/opentelemetry-collector", + OPENTELEMETRY_COLLECTOR_DOCKER_IMAGE_TAG)) + // Copy the output data directory from alpine. It has to be owned by the + // OpenTelemetry user. + .withStatement( + new MultiArgsStatement("COPY --from=0 --chown=10001", DATA_DIR, DATA_DIR)) + .build(); + } + + public Path getOutputLogPath() { + return Paths.get(DATA_DIR, LOG_FILE); + } + + @Override + protected void containerIsStarted(InspectContainerResponse containerInfo) { + super.containerIsStarted(containerInfo); + } + + private static String randomString(String prefix, int length) { + return String.format("%s-%s", prefix, Base58.randomString(length).toLowerCase(Locale.ROOT)); + } + + public String getGrpcEndpoint() { + return String.format("http://%s:%s", getHost(), getMappedPort(DEFAULT_GRPC_PORT)); + } + + private static String constructFullDockerImageName(String name, String tag) { + return name + ":" + tag; + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/TestUtils.java b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/TestUtils.java new file mode 100644 index 0000000000..a617feb7f4 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/java/org/apache/fluss/metrics/opentelemetry/TestUtils.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.opentelemetry; + +import org.apache.fluss.metrics.groups.MetricGroup; +import org.apache.fluss.metrics.util.TestMetricGroup; + +import java.util.Map; + +/** Test utils. */ +class TestUtils { + static MetricGroup createTestMetricGroup(String logicalScope, Map variables) { + return TestMetricGroup.newBuilder() + .setLogicalScopeFunction( + (characterFilter, character) -> + characterFilter.filterCharacters(logicalScope)) + .setVariables(variables) + .build(); + } +} diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/resources/log4j2-test.properties b/fluss-metrics/fluss-metrics-opentelemetry/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000000..06379e065c --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/resources/log4j2-test.properties @@ -0,0 +1,30 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +# This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache +# Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. +# root logger level should be set to OFF to not flood build logs +# set manually to INFO for debugging purposes +# It is set for OpenTelemetry to WARN for easier debugging on CI +rootLogger.level=WARN +rootLogger.appenderRef.test.ref=TestLogger +appender.testlogger.name=TestLogger +appender.testlogger.type=CONSOLE +appender.testlogger.target=SYSTEM_ERR +appender.testlogger.layout.type=PatternLayout +appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/fluss-metrics/fluss-metrics-opentelemetry/src/test/resources/opentelemetry-config.yaml b/fluss-metrics/fluss-metrics-opentelemetry/src/test/resources/opentelemetry-config.yaml new file mode 100644 index 0000000000..d768fbbcb8 --- /dev/null +++ b/fluss-metrics/fluss-metrics-opentelemetry/src/test/resources/opentelemetry-config.yaml @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache +# Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. + +extensions: + health_check: + zpages: + endpoint: 0.0.0.0:55679 + +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + +processors: + batch: + +exporters: + debug: + verbosity: detailed + file: + path: /data/logs.json +service: + pipelines: + metrics: + receivers: [ otlp ] + exporters: [ file ] + + extensions: [ health_check, zpages ] diff --git a/fluss-metrics/pom.xml b/fluss-metrics/pom.xml index b6358bbb8c..2af10f75a0 100644 --- a/fluss-metrics/pom.xml +++ b/fluss-metrics/pom.xml @@ -34,6 +34,7 @@ fluss-metrics-prometheus fluss-metrics-jmx + fluss-metrics-opentelemetry + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> Fluss UTF-8