Skip to content

Commit d4f8cae

Browse files
authored
Add KafkaNodePool resource count metric and fix dashboard defaults (#12281)
Signed-off-by: Azeez Syed <syedazeez337@gmail.com>
1 parent b670645 commit d4f8cae

File tree

4 files changed

+93
-0
lines changed

4 files changed

+93
-0
lines changed

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@
4747
import io.strimzi.operator.common.model.PasswordGenerator;
4848
import io.strimzi.operator.common.model.StatusDiff;
4949
import io.strimzi.operator.common.model.StatusUtils;
50+
import io.vertx.core.AsyncResult;
5051
import io.vertx.core.Future;
52+
import io.vertx.core.Handler;
5153
import io.vertx.core.Promise;
5254
import io.vertx.core.Vertx;
5355

@@ -133,6 +135,23 @@ public KafkaAssemblyOperator(Vertx vertx, PlatformFeaturesAvailability pfa,
133135
this.clock = Clock.systemUTC();
134136
}
135137

138+
@Override
139+
public void reconcileThese(String trigger, Set<NamespaceAndName> desiredNames, String namespace, Handler<AsyncResult<Void>> handler) {
140+
super.reconcileThese(trigger, desiredNames, namespace, ignore -> {
141+
nodePoolOperator.listAsync(namespace, selector())
142+
.onComplete(ar -> {
143+
if (ar.succeeded()) {
144+
metrics.resetNodePoolCounters(namespace);
145+
ar.result().forEach(nodePool ->
146+
metrics.nodePoolResourceCounter(nodePool.getMetadata().getNamespace()).incrementAndGet());
147+
handler.handle(Future.succeededFuture());
148+
} else {
149+
handler.handle(ar.map((Void) null));
150+
}
151+
});
152+
});
153+
}
154+
136155
@Override
137156
@SuppressWarnings({"checkstyle:NPathComplexity", "deprecation"}) // .status.kafkaMetadataState is deprecated
138157
public Future<KafkaStatus> createOrUpdate(Reconciliation reconciliation, Kafka kafkaAssembly) {

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorMetricsHolder.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
import io.micrometer.core.instrument.Tag;
88
import io.micrometer.core.instrument.Tags;
9+
import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
910
import io.strimzi.operator.common.MetricsProvider;
11+
import io.strimzi.operator.common.config.ConfigParameter;
1012
import io.strimzi.operator.common.metrics.CertificateMetricKey;
1113
import io.strimzi.operator.common.metrics.MetricKey;
1214
import io.strimzi.operator.common.metrics.MetricsUtils;
@@ -18,6 +20,7 @@
1820
import java.util.Map;
1921
import java.util.Optional;
2022
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.atomic.AtomicInteger;
2124
import java.util.concurrent.atomic.AtomicLong;
2225
import java.util.function.Predicate;
2326

@@ -31,6 +34,7 @@ public class KafkaAssemblyOperatorMetricsHolder extends OperatorMetricsHolder {
3134
public static final String METRICS_CERTIFICATE_EXPIRATION_MS = METRICS_PREFIX + "certificate.expiration.timestamp.ms";
3235

3336
protected final Map<MetricKey, AtomicLong> certificateExpirationMap = new ConcurrentHashMap<>(1);
37+
protected final Map<MetricKey, AtomicInteger> nodePoolResourceCounterMap = new ConcurrentHashMap<>(1);
3438

3539
/**
3640
* Constructs the operator metrics holder
@@ -97,4 +101,31 @@ public void removeMetricsForCertificates(Predicate<CertificateMetricKey> shouldD
97101

98102
removedKeys.forEach(certificateExpirationMap::remove);
99103
}
104+
105+
/**
106+
* Counter metric for number of KafkaNodePool resources.
107+
*
108+
* @param namespace Namespace of the resources being reconciled
109+
*
110+
* @return Metrics gauge
111+
*/
112+
public AtomicInteger nodePoolResourceCounter(String namespace) {
113+
return getGauge(new MetricKey(KafkaNodePool.RESOURCE_KIND, namespace), METRICS_RESOURCES,
114+
"Number of custom resources the operator sees",
115+
Optional.of(getLabelSelectorValues()), nodePoolResourceCounterMap);
116+
}
117+
118+
/**
119+
* Resets all values in the node pool resource counter map to 0. This is used to
120+
* handle removed node pool resources from various namespaces during the periodical reconciliation.
121+
*
122+
* @param namespace Namespace for which should the metrics be reset to 0
123+
*/
124+
public void resetNodePoolCounters(String namespace) {
125+
if (namespace.equals(ConfigParameter.ANY_NAMESPACE)) {
126+
nodePoolResourceCounterMap.forEach((key, counter) -> counter.set(0));
127+
} else {
128+
nodePoolResourceCounter(namespace).set(0);
129+
}
130+
}
100131
}

cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorMetricsHolderTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66

77
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
88
import io.strimzi.operator.common.MicrometerMetricsProvider;
9+
import io.strimzi.operator.common.config.ConfigParameter;
910
import io.strimzi.operator.common.metrics.CertificateMetricKey;
1011
import io.strimzi.operator.common.model.Labels;
1112
import org.junit.jupiter.api.BeforeEach;
1213
import org.junit.jupiter.api.DisplayName;
1314
import org.junit.jupiter.api.Test;
1415

1516
import java.util.Locale;
17+
import java.util.concurrent.atomic.AtomicInteger;
1618
import java.util.concurrent.atomic.AtomicLong;
1719
import java.util.function.Predicate;
1820

@@ -73,4 +75,37 @@ void shouldReturnCorrectExpirationTimeForClusterCaCertificate() {
7375
private boolean matchCaTypes(String actual, CertificateMetricKey.Type expected) {
7476
return actual.equals(expected.getDisplayName().toLowerCase(Locale.ROOT));
7577
}
78+
79+
@Test
80+
@DisplayName("Should return correct resource counter for node pool")
81+
void shouldReturnCorrectResourceCounterForNodePool() {
82+
AtomicInteger counter = metricsHolder.nodePoolResourceCounter("TestNamespace");
83+
84+
assertEquals(0, counter.get(), "Initial counter value should be 0");
85+
86+
counter.incrementAndGet();
87+
assertEquals(1, counter.get(), "Counter should be incremented to 1");
88+
}
89+
90+
@Test
91+
@DisplayName("Should reset node pool counters for specific namespace")
92+
void shouldResetNodePoolCountersForSpecificNamespace() {
93+
metricsHolder.nodePoolResourceCounter("TestNamespace").set(5);
94+
95+
metricsHolder.resetNodePoolCounters("TestNamespace");
96+
97+
assertEquals(0, metricsHolder.nodePoolResourceCounter("TestNamespace").get(), "Counter should be reset to 0");
98+
}
99+
100+
@Test
101+
@DisplayName("Should reset node pool counters for all namespaces")
102+
void shouldResetNodePoolCountersForAllNamespaces() {
103+
metricsHolder.nodePoolResourceCounter("Namespace1").set(3);
104+
metricsHolder.nodePoolResourceCounter("Namespace2").set(7);
105+
106+
metricsHolder.resetNodePoolCounters(ConfigParameter.ANY_NAMESPACE);
107+
108+
assertEquals(0, metricsHolder.nodePoolResourceCounter("Namespace1").get(), "Counter for Namespace1 should be reset to 0");
109+
assertEquals(0, metricsHolder.nodePoolResourceCounter("Namespace2").get(), "Counter for Namespace2 should be reset to 0");
110+
}
76111
}

cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithKRaftTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1586,6 +1586,10 @@ public void testReconcileMultipleKafkaInOneNamespace(VertxTestContext context) {
15861586
when(mockKafkaOps.getAsync(eq(NAMESPACE), eq("bar"))).thenReturn(Future.succeededFuture(bar));
15871587
when(mockKafkaOps.updateStatusAsync(any(), any(Kafka.class))).thenReturn(Future.succeededFuture());
15881588

1589+
CrdOperator<KubernetesClient, KafkaNodePool, KafkaNodePoolList> mockKafkaNodePoolOps = supplier.kafkaNodePoolOperator;
1590+
when(mockKafkaNodePoolOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of()));
1591+
when(mockKafkaNodePoolOps.listAsync(eq(NAMESPACE), isNull(LabelSelector.class))).thenReturn(Future.succeededFuture(List.of()));
1592+
15891593
AtomicBoolean fooReconciled = new AtomicBoolean(false);
15901594
AtomicBoolean barReconciled = new AtomicBoolean(false);
15911595

@@ -1658,6 +1662,10 @@ public void testReconcileAllNamespaces(VertxTestContext context) {
16581662
when(mockKafkaOps.getAsync(eq("namespace2"), eq("bar"))).thenReturn(Future.succeededFuture(bar));
16591663
when(mockKafkaOps.updateStatusAsync(any(), any(Kafka.class))).thenReturn(Future.succeededFuture());
16601664

1665+
CrdOperator<KubernetesClient, KafkaNodePool, KafkaNodePoolList> mockKafkaNodePoolOps = supplier.kafkaNodePoolOperator;
1666+
when(mockKafkaNodePoolOps.listAsync(eq("*"), any(Labels.class))).thenReturn(Future.succeededFuture(List.of()));
1667+
when(mockKafkaNodePoolOps.listAsync(eq("*"), isNull(LabelSelector.class))).thenReturn(Future.succeededFuture(List.of()));
1668+
16611669
AtomicBoolean fooReconciled = new AtomicBoolean(false);
16621670
AtomicBoolean barReconciled = new AtomicBoolean(false);
16631671

0 commit comments

Comments
 (0)