Skip to content

Commit 663929c

Browse files
authored
[GOBBLIN-2209] Emit GaaS Executor Otel Metrics (#4118)
1 parent 85dbbc6 commit 663929c

File tree

25 files changed

+1159
-9
lines changed

25 files changed

+1159
-9
lines changed

gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -934,7 +934,10 @@ public class ConfigurationKeys {
934934

935935
// Opentelemetry based metrics reporting
936936
public static final String METRICS_REPORTING_OPENTELEMETRY_PREFIX = "metrics.reporting.opentelemetry.";
937+
public static final String METRICS_REPORTING_OPENTELEMETRY_CLASSNAME = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "className";
938+
public static final String DEFAULT_METRICS_REPORTING_OPENTELEMETRY_CLASSNAME = "org.apache.gobblin.metrics.OpenTelemetryMetrics";
937939
public static final String METRICS_REPORTING_OPENTELEMETRY_ENABLED = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "enabled";
940+
public static final Boolean DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED = false;
938941

939942
public static final String METRICS_REPORTING_OPENTELEMETRY_LOGEXPORTER_ENABLED = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "logexporter.enabled";
940943

@@ -943,14 +946,19 @@ public class ConfigurationKeys {
943946
public static final String METRICS_REPORTING_OPENTELEMETRY_LOGEXPORTER_CLASSNAME = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "logexporter.className";
944947

945948
public static final String METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "configs.";
946-
public static final Boolean DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED = false;
947949

948950
public static final String METRICS_REPORTING_OPENTELEMETRY_ENDPOINT = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "endpoint";
949951
public static final String METRICS_REPORTING_OPENTELEMETRY_FABRIC = METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX + "fabric";
950952
// Headers to add to the OpenTelemetry HTTP Exporter, formatted as a JSON String with string keys and values
951953
public static final String METRICS_REPORTING_OPENTELEMETRY_HEADERS = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "headers";
952954

953-
public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS = METRICS_CONFIGURATIONS_PREFIX + "interval.millis";
955+
public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "interval.millis";
956+
public static final String METRICS_REPORTING_OPENTELEMETRY_HISTOGRAM_MAX_BUCKETS = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "histogram.max.buckets";
957+
public static final String METRICS_REPORTING_OPENTELEMETRY_HISTOGRAM_MAX_SCALE = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "histogram.max.scale";
958+
// A comma-separated list of dimensions to add to the OpenTelemetry metrics
959+
public static final String METRICS_REPORTING_OPENTELEMETRY_DIMENSIONS = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "dimensions";
960+
public static final String METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "group.name";
961+
public static final String DEFAULT_METRICS_REPORTING_OPENTELEMETRY_GROUP_NAME = "org.apache.gobblin.metrics";
954962

955963
/**
956964
* Rest server configuration properties.

gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/OpenTelemetryMetrics.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@
3232
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
3333
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder;
3434
import io.opentelemetry.sdk.OpenTelemetrySdk;
35+
import io.opentelemetry.sdk.metrics.Aggregation;
36+
import io.opentelemetry.sdk.metrics.InstrumentSelector;
37+
import io.opentelemetry.sdk.metrics.InstrumentType;
38+
import io.opentelemetry.sdk.metrics.View;
39+
import io.opentelemetry.sdk.metrics.internal.view.Base2ExponentialHistogramAggregation;
3540
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
3641
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
3742
import io.opentelemetry.sdk.metrics.export.MetricExporter;
@@ -52,8 +57,10 @@
5257
@Slf4j
5358
public class OpenTelemetryMetrics extends OpenTelemetryMetricsBase {
5459

55-
private static OpenTelemetryMetrics GLOBAL_INSTANCE;
60+
private static volatile OpenTelemetryMetrics GLOBAL_INSTANCE;
5661
private static final Long DEFAULT_OPENTELEMETRY_REPORTING_INTERVAL_MILLIS = 10000L;
62+
private static final int DEFAULT_OPENTELEMETRY_HISTOGRAM_MAX_BUCKETS = 256;
63+
private static final int DEFAULT_OPENTELEMETRY_HISTOGRAM_MAX_SCALE = 3;
5764

5865
private OpenTelemetryMetrics(State state) {
5966
super(state);
@@ -94,7 +101,12 @@ protected MetricExporter initializeMetricExporter(State state) {
94101
public static OpenTelemetryMetrics getInstance(State state) {
95102
if (state.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
96103
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED) && GLOBAL_INSTANCE == null) {
97-
GLOBAL_INSTANCE = new OpenTelemetryMetrics(state);
104+
synchronized (OpenTelemetryMetrics.class) {
105+
if (GLOBAL_INSTANCE == null) {
106+
log.info("Creating OpenTelemetryMetrics instance");
107+
GLOBAL_INSTANCE = new OpenTelemetryMetrics(state);
108+
}
109+
}
98110
}
99111
return GLOBAL_INSTANCE;
100112
}
@@ -115,6 +127,13 @@ protected void initialize(State state) {
115127
}
116128
metricsResource = Resource.getDefault().merge(Resource.create(attributesBuilder.build()));
117129
}
130+
131+
Aggregation histogramAggregation = Base2ExponentialHistogramAggregation.create(
132+
state.getPropAsInt(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_HISTOGRAM_MAX_BUCKETS,
133+
DEFAULT_OPENTELEMETRY_HISTOGRAM_MAX_BUCKETS),
134+
state.getPropAsInt(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_HISTOGRAM_MAX_SCALE,
135+
DEFAULT_OPENTELEMETRY_HISTOGRAM_MAX_SCALE));
136+
118137
SdkMeterProvider meterProvider = SdkMeterProvider.builder()
119138
.setResource(metricsResource)
120139
.registerMetricReader(
@@ -123,6 +142,9 @@ protected void initialize(State state) {
123142
state.getPropAsLong(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS,
124143
DEFAULT_OPENTELEMETRY_REPORTING_INTERVAL_MILLIS)))
125144
.build())
145+
.registerView(
146+
InstrumentSelector.builder().setType(InstrumentType.HISTOGRAM).build(),
147+
View.builder().setAggregation(histogramAggregation).build())
126148
.build();
127149

128150
this.openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.metrics.opentelemetry;
19+
20+
import io.opentelemetry.api.common.Attributes;
21+
import io.opentelemetry.api.metrics.Meter;
22+
import lombok.AllArgsConstructor;
23+
import lombok.Getter;
24+
25+
26+
@Getter
27+
@AllArgsConstructor
28+
public enum GobblinOpenTelemetryMetrics {
29+
/**
30+
* Metric to track the count of Gobblin Jobs for each of its state (GenerateWorkUnit, ProcessWorkUnit, CommitStep).
31+
* Metric Unit: 1 represents each increment will add one data point to the counter.
32+
* */
33+
GOBBLIN_JOB_STATE("gobblin.job.state", "Gobblin job state counter", "1", OpenTelemetryMetricType.LONG_COUNTER),
34+
35+
/**
36+
* Metric to track the latency of each Gobblin Job state (GenerateWorkUnit, ProcessWorkUnit, CommitStep).
37+
* Metric Unit: seconds (s) represents the time taken for each state.
38+
* */
39+
GOBBLIN_JOB_STATE_LATENCY("gobblin.job.state.latency", "Gobblin job state latency", "s", OpenTelemetryMetricType.DOUBLE_HISTOGRAM);
40+
41+
private final String metricName;
42+
private final String metricDescription;
43+
private final String metricUnit;
44+
private final OpenTelemetryMetricType metricType;
45+
46+
@SuppressWarnings("unchecked")
47+
public <T extends OpenTelemetryMetric> T createMetric(Attributes attributes, Meter meter) {
48+
return (T) this.metricType.getFactory().newMetric(this.metricName, this.metricDescription, this.metricUnit, attributes, meter);
49+
}
50+
51+
@Override
52+
public String toString() {
53+
return String.format("Metric{name='%s', description='%s', unit='%s', type=%s}", metricName, metricDescription, metricUnit, metricType);
54+
}
55+
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.metrics.opentelemetry;
19+
20+
public class GobblinOpenTelemetryMetricsConstants {
21+
22+
public static class DimensionKeys {
23+
public static final String STATE = "state";
24+
public static final String CURR_STATE = "currState";
25+
}
26+
27+
public static class DimensionValues {
28+
public static final String GENERATE_WU = "generateWU";
29+
public static final String PROCESS_WU = "processWU";
30+
public static final String COMMIT_STEP = "commitStep";
31+
public static final String JOB_START = "jobStart";
32+
public static final String JOB_COMPLETE = "jobComplete";
33+
public static final String GENERATE_WU_START = "generateWUStart";
34+
public static final String GENERATE_WU_COMPLETE = "generateWUComplete";
35+
public static final String PROCESS_WU_START = "processWUStart";
36+
public static final String PROCESS_WU_COMPLETE = "processWUComplete";
37+
public static final String COMMIT_STEP_START = "commitStepStart";
38+
public static final String COMMIT_STEP_COMPLETE = "commitStepComplete";
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.metrics.opentelemetry;
19+
20+
import lombok.AllArgsConstructor;
21+
import lombok.extern.slf4j.Slf4j;
22+
23+
import io.opentelemetry.api.common.Attributes;
24+
import io.opentelemetry.api.metrics.DoubleHistogram;
25+
26+
27+
/**
28+
* Implementation of {@link OpenTelemetryMetric} that wraps an OpenTelemetry {@link DoubleHistogram}.
29+
*
30+
* <p>This class provides a histogram for recording the distribution of double values.
31+
* It supports recording values with optional additional attributes that can be merged with base attributes.</p>
32+
*
33+
*/
34+
@Slf4j
35+
@AllArgsConstructor
36+
public class OpenTelemetryDoubleHistogram implements OpenTelemetryMetric {
37+
private String name;
38+
private Attributes baseAttributes;
39+
private DoubleHistogram doubleHistogram;
40+
41+
/**
42+
* Records the specified value in the histogram with the base attributes.
43+
*
44+
* @param value the double value to record in the histogram
45+
*/
46+
public void record(double value) {
47+
log.debug("Emitting double histogram metric: {}, value: {}, attributes: {}", this.name, value, this.baseAttributes);
48+
this.doubleHistogram.record(value, this.baseAttributes);
49+
}
50+
51+
/**
52+
* Records the specified value in the histogram with a combination of base attributes and additional attributes.
53+
*
54+
* @param value the double value to record in the histogram
55+
* @param additionalAttributes the additional attributes to be merged with base attributes
56+
*/
57+
public void record(double value, Attributes additionalAttributes) {
58+
log.debug("Emitting double histogram metric: {}, value: {}, base attributes: {}, additional attributes: {}",
59+
this.name, value, this.baseAttributes, additionalAttributes);
60+
this.doubleHistogram.record(value, OpenTelemetryHelper.mergeAttributes(this.baseAttributes, additionalAttributes));
61+
}
62+
63+
/**
64+
* {@inheritDoc}
65+
*/
66+
@Override
67+
public String getMetricName() {
68+
return this.name;
69+
}
70+
71+
/**
72+
* {@inheritDoc}
73+
*/
74+
@Override
75+
public OpenTelemetryMetricType getMetricType() {
76+
return OpenTelemetryMetricType.DOUBLE_HISTOGRAM;
77+
}
78+
79+
/**
80+
* Returns a string representation of this histogram with its name.
81+
*/
82+
@Override
83+
public String toString() {
84+
return "OpenTelemetryDoubleHistogram{name='" + name + "'}";
85+
}
86+
87+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.metrics.opentelemetry;
19+
20+
import java.util.Map;
21+
22+
import lombok.experimental.UtilityClass;
23+
import org.apache.commons.lang3.StringUtils;
24+
25+
import io.opentelemetry.api.common.Attributes;
26+
import io.opentelemetry.api.common.AttributesBuilder;
27+
28+
/**
29+
* Utility class for OpenTelemetry related operations.
30+
*
31+
* <p>Provides methods to handle OpenTelemetry attributes, including merging multiple
32+
* {@link Attributes} instances and converting maps to {@link Attributes}.
33+
*/
34+
@UtilityClass
35+
public class OpenTelemetryHelper {
36+
37+
private static final String DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE = "UNKNOWN";
38+
39+
/**
40+
* Returns the provided attribute value when it is non-null and non-empty;
41+
* otherwise returns the default OpenTelemetry attribute placeholder.
42+
*
43+
* @param value candidate attribute value to check
44+
* @return the original value if not empty, or DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE otherwise
45+
*/
46+
public static String getOrDefaultOpenTelemetryAttrValue(String value) {
47+
return StringUtils.defaultIfBlank(value, DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE);
48+
}
49+
50+
/**
51+
* Merges multiple {@link Attributes} instances into a single {@link Attributes}.
52+
*
53+
* <p>Any {@code null} or empty ({@link Attributes#isEmpty()}) instances are ignored.
54+
* The resulting {@link Attributes} contains all key-value pairs from the
55+
* provided non-null, non-empty inputs in the order they are given.
56+
* For duplicate keys, the last occurrence in the array will take precedence.
57+
*
58+
* @param attributesArray array of {@link Attributes} to merge; may contain {@code null} or empty entries
59+
* @return a new {@link Attributes} instance containing all entries from the non-null,
60+
* non-empty inputs; never {@code null}
61+
*/
62+
public static Attributes mergeAttributes(Attributes... attributesArray) {
63+
AttributesBuilder builder = Attributes.builder();
64+
for (Attributes attrs : attributesArray) {
65+
if (attrs != null && !attrs.isEmpty()) {
66+
builder.putAll(attrs);
67+
}
68+
}
69+
return builder.build();
70+
}
71+
72+
/**
73+
* Converts a map of string attributes to an OpenTelemetry {@link Attributes} instance.
74+
*
75+
* <p>Each entry in the map is converted to an OpenTelemetry attribute, using
76+
* {@link #getOrDefaultOpenTelemetryAttrValue(String)} to handle empty values.
77+
*
78+
* @param attributes map of string attributes to convert; may be {@code null}
79+
* @return a new {@link Attributes} instance containing the converted attributes;
80+
* never {@code null}
81+
*/
82+
public static Attributes toOpenTelemetryAttributes(Map<String, String> attributes) {
83+
AttributesBuilder builder = Attributes.builder();
84+
if (attributes != null) {
85+
for (Map.Entry<String, String> entry : attributes.entrySet()) {
86+
String key = entry.getKey();
87+
String value = getOrDefaultOpenTelemetryAttrValue(entry.getValue());
88+
builder.put(key, value);
89+
}
90+
}
91+
return builder.build();
92+
}
93+
94+
}

0 commit comments

Comments
 (0)