Skip to content

Commit 2e89278

Browse files
[NIFI-15162] Add Metrics Strategy parameter to Flow Metrics REST Resource (#10501)
Co-authored-by: David Handermann <[email protected]> Signed-off-by: David Handermann <[email protected]>
1 parent 28589e4 commit 2e89278

File tree

6 files changed

+74
-47
lines changed

6 files changed

+74
-47
lines changed

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/prometheusutil/PrometheusMetricsUtil.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.nifi.prometheusutil;
1919

2020
import io.prometheus.client.CollectorRegistry;
21-
import org.apache.nifi.components.AllowableValue;
2221
import org.apache.nifi.controller.status.ConnectionStatus;
2322
import org.apache.nifi.controller.status.PortStatus;
2423
import org.apache.nifi.controller.status.ProcessGroupStatus;
@@ -31,24 +30,24 @@
3130
import org.apache.nifi.metrics.jvm.JvmMetrics;
3231
import org.apache.nifi.processor.DataUnit;
3332
import org.apache.nifi.util.StringUtils;
33+
import org.apache.nifi.web.api.request.FlowMetricsReportingStrategy;
3434

3535
import java.util.Map;
3636
import java.util.concurrent.TimeUnit;
3737

38+
import static org.apache.nifi.web.api.request.FlowMetricsReportingStrategy.ALL_COMPONENTS;
39+
import static org.apache.nifi.web.api.request.FlowMetricsReportingStrategy.ALL_PROCESS_GROUPS;
40+
3841
public class PrometheusMetricsUtil {
3942

40-
public static final AllowableValue METRICS_STRATEGY_PG = new AllowableValue("All Process Groups", "All Process Groups",
41-
"Send metrics for each process group");
42-
public static final AllowableValue METRICS_STRATEGY_COMPONENTS = new AllowableValue("All Components", "All Components",
43-
"Send metrics for each component in the system, to include processors, connections, controller services, etc.");
4443

4544
protected static final String DEFAULT_LABEL_STRING = "";
4645
private static final double MAXIMUM_BACKPRESSURE = 1.0;
4746
private static final double UNDEFINED_BACKPRESSURE = -1.0;
4847
private static final double NANOS_PER_MILLI = 1000000.0;
4948

5049
public static CollectorRegistry createNifiMetrics(NiFiMetricsRegistry nifiMetricsRegistry, ProcessGroupStatus status,
51-
String instId, String parentProcessGroupId, String compType, String metricsStrategy) {
50+
String instId, String parentProcessGroupId, String compType, FlowMetricsReportingStrategy metricsStrategy) {
5251

5352
final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
5453
final String parentPGId = StringUtils.isEmpty(parentProcessGroupId) ? DEFAULT_LABEL_STRING : parentProcessGroupId;
@@ -90,11 +89,11 @@ public static CollectorRegistry createNifiMetrics(NiFiMetricsRegistry nifiMetric
9089
instanceId, componentType, componentName, componentId, parentPGId);
9190

9291
// Report metrics for child process groups if specified
93-
if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
92+
if (ALL_PROCESS_GROUPS == metricsStrategy || ALL_COMPONENTS == metricsStrategy) {
9493
status.getProcessGroupStatus().forEach((childGroupStatus) -> createNifiMetrics(nifiMetricsRegistry, childGroupStatus, instanceId, componentId, "ProcessGroup", metricsStrategy));
9594
}
9695

97-
if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
96+
if (ALL_COMPONENTS == metricsStrategy) {
9897
// Report metrics for all components
9998
for (ProcessorStatus processorStatus : status.getProcessorStatus()) {
10099
Map<String, Long> counters = processorStatus.getCounters();

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@
161161
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
162162
import org.apache.nifi.web.api.entity.VersionedReportingTaskImportResponseEntity;
163163
import org.apache.nifi.web.api.request.FlowMetricsRegistry;
164+
import org.apache.nifi.web.api.request.FlowMetricsReportingStrategy;
164165

165166
import java.io.IOException;
166167
import java.io.InputStream;
@@ -407,9 +408,10 @@ public interface NiFiServiceFacade {
407408
* Generate metrics for the flow and return selected registries
408409
*
409410
* @param includeRegistries Set of Flow Metrics Registries to be returned
411+
* @param flowMetricsStrategy Flow metrics reporting strategy limits collected metrics
410412
* @return Collector Registries
411413
*/
412-
Collection<CollectorRegistry> generateFlowMetrics(Set<FlowMetricsRegistry> includeRegistries);
414+
Collection<CollectorRegistry> generateFlowMetrics(Set<FlowMetricsRegistry> includeRegistries, FlowMetricsReportingStrategy flowMetricsStrategy);
413415

414416
/**
415417
* Updates the configuration for this controller.

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,7 @@
393393
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
394394
import org.apache.nifi.web.api.entity.VersionedReportingTaskImportResponseEntity;
395395
import org.apache.nifi.web.api.request.FlowMetricsRegistry;
396+
import org.apache.nifi.web.api.request.FlowMetricsReportingStrategy;
396397
import org.apache.nifi.web.controller.ControllerFacade;
397398
import org.apache.nifi.web.dao.AccessPolicyDAO;
398399
import org.apache.nifi.web.dao.ConnectionDAO;
@@ -6634,7 +6635,7 @@ public ProcessorDiagnosticsEntity getProcessorDiagnostics(final String id) {
66346635
return entityFactory.createProcessorDiagnosticsEntity(dto, revisionDto, permissionsDto, processorStatusDto, bulletins);
66356636
}
66366637

6637-
protected Collection<AbstractMetricsRegistry> populateFlowMetrics() {
6638+
protected Collection<AbstractMetricsRegistry> populateFlowMetrics(FlowMetricsReportingStrategy flowMetricsStrategy) {
66386639
// Include registries which are fully refreshed upon each invocation
66396640
NiFiMetricsRegistry nifiMetricsRegistry = new NiFiMetricsRegistry();
66406641
BulletinMetricsRegistry bulletinMetricsRegistry = new BulletinMetricsRegistry();
@@ -6644,8 +6645,7 @@ protected Collection<AbstractMetricsRegistry> populateFlowMetrics() {
66446645
final String instanceId = node == null ? instId : node.getId();
66456646
ProcessGroupStatus rootPGStatus = controllerFacade.getProcessGroupStatus("root");
66466647

6647-
PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootPGStatus, instanceId, "", ROOT_PROCESS_GROUP,
6648-
PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS.getValue());
6648+
PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootPGStatus, instanceId, "", ROOT_PROCESS_GROUP, flowMetricsStrategy);
66496649

66506650
// Add the total byte counts (read/written) to the NiFi metrics registry
66516651
FlowFileEventRepository flowFileEventRepository = controllerFacade.getFlowFileEventRepository();
@@ -6746,19 +6746,19 @@ protected Collection<AbstractMetricsRegistry> populateFlowMetrics() {
67466746
@Override
67476747
public Collection<CollectorRegistry> generateFlowMetrics() {
67486748

6749-
return populateFlowMetrics().stream().map(AbstractMetricsRegistry::getRegistry)
6749+
return populateFlowMetrics(FlowMetricsReportingStrategy.ALL_COMPONENTS).stream().map(AbstractMetricsRegistry::getRegistry)
67506750
.collect(Collectors.toList());
67516751
}
67526752

67536753
@Override
6754-
public Collection<CollectorRegistry> generateFlowMetrics(final Set<FlowMetricsRegistry> includeRegistries) {
6754+
public Collection<CollectorRegistry> generateFlowMetrics(final Set<FlowMetricsRegistry> includeRegistries, final FlowMetricsReportingStrategy flowMetricsStrategy) {
67556755
final Set<FlowMetricsRegistry> selectedRegistries = includeRegistries.isEmpty() ? new HashSet<>(Arrays.asList(FlowMetricsRegistry.values())) : includeRegistries;
67566756

67576757
final Set<Class<? extends AbstractMetricsRegistry>> registryClasses = selectedRegistries.stream()
67586758
.map(FlowMetricsRegistry::getRegistryClass)
67596759
.collect(Collectors.toSet());
67606760

6761-
Collection<AbstractMetricsRegistry> configuredRegistries = populateFlowMetrics();
6761+
Collection<AbstractMetricsRegistry> configuredRegistries = populateFlowMetrics(flowMetricsStrategy);
67626762

67636763
return configuredRegistries.stream()
67646764
.filter(configuredRegistry -> registryClasses.contains(configuredRegistry.getClass()))

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@
163163
import org.apache.nifi.web.api.request.BulletinBoardPatternParameter;
164164
import org.apache.nifi.web.api.request.DateTimeParameter;
165165
import org.apache.nifi.web.api.request.FlowMetricsProducer;
166+
import org.apache.nifi.web.api.request.FlowMetricsReportingStrategy;
166167
import org.apache.nifi.web.api.request.FlowMetricsRegistry;
167168
import org.apache.nifi.web.api.request.IntegerParameter;
168169
import org.apache.nifi.web.api.request.LongParameter;
@@ -579,13 +580,19 @@ public Response getFlowMetrics(
579580
@Parameter(
580581
description = "Name of the first field of JSON object. Applicable for JSON producer only."
581582
)
582-
@QueryParam("rootFieldName") final String rootFieldName
583-
) {
583+
@QueryParam("rootFieldName") final String rootFieldName,
584+
@Parameter(
585+
description = "Flow metrics reporting strategy limits collected metrics"
586+
)
587+
@DefaultValue("ALL_COMPONENTS")
588+
@QueryParam("flowMetricsReportingStrategy") final FlowMetricsReportingStrategy flowMetricsReportingStrategy
589+
) {
584590

585591
authorizeFlow();
586592

587593
final Set<FlowMetricsRegistry> selectedRegistries = includedRegistries == null ? Collections.emptySet() : includedRegistries;
588-
final Collection<CollectorRegistry> registries = serviceFacade.generateFlowMetrics(selectedRegistries);
594+
final FlowMetricsReportingStrategy selectedStrategy = flowMetricsReportingStrategy == null ? FlowMetricsReportingStrategy.ALL_COMPONENTS : flowMetricsReportingStrategy;
595+
final Collection<CollectorRegistry> registries = serviceFacade.generateFlowMetrics(selectedRegistries, selectedStrategy);
589596

590597
if (FlowMetricsProducer.PROMETHEUS.getProducer().equalsIgnoreCase(producer)) {
591598
final StreamingOutput response = (outputStream -> {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.web.api.request;
18+
19+
public enum FlowMetricsReportingStrategy {
20+
ALL_PROCESS_GROUPS,
21+
ALL_COMPONENTS;
22+
}

0 commit comments

Comments
 (0)