1212import io .strimzi .api .kafka .model .kafka .PersistentClaimStorage ;
1313import io .strimzi .api .kafka .model .kafka .SingleVolumeStorage ;
1414import io .strimzi .api .kafka .model .kafka .Storage ;
15+ import io .strimzi .api .kafka .model .kafka .cruisecontrol .BrokerCapacity ;
1516import io .strimzi .api .kafka .model .kafka .cruisecontrol .BrokerCapacityOverride ;
1617import io .strimzi .api .kafka .model .kafka .cruisecontrol .CruiseControlSpec ;
1718import io .strimzi .operator .cluster .model .NodeRef ;
2021import io .strimzi .operator .cluster .model .VolumeUtils ;
2122import io .strimzi .operator .common .Reconciliation ;
2223import io .strimzi .operator .common .ReconciliationLogger ;
24+ import io .strimzi .operator .common .model .resourcerequirements .ResourceRequirementsUtils ;
2325import io .vertx .core .json .JsonArray ;
2426import io .vertx .core .json .JsonObject ;
2527
2931import java .util .Set ;
3032import java .util .TreeMap ;
3133
34+ import static io .strimzi .operator .cluster .model .cruisecontrol .CapacityResourceType .CPU ;
35+ import static io .strimzi .operator .cluster .model .cruisecontrol .CapacityResourceType .DISK ;
36+ import static io .strimzi .operator .cluster .model .cruisecontrol .CapacityResourceType .INBOUND_NETWORK ;
37+ import static io .strimzi .operator .cluster .model .cruisecontrol .CapacityResourceType .OUTBOUND_NETWORK ;
38+
3239/**
3340 * Uses information in a Kafka Custom Resource to generate a capacity configuration file to be used for
3441 * Cruise Control's Broker Capacity File Resolver.
125132public class Capacity {
126133 protected static final ReconciliationLogger LOGGER = ReconciliationLogger .create (Capacity .class .getName ());
127134 private final Reconciliation reconciliation ;
128- private final TreeMap <Integer , BrokerCapacity > capacityEntries ;
135+ private final TreeMap <Integer , BrokerCapacityEntry > capacityEntries ;
129136
130137 /**
131138 * Broker capacities key
@@ -137,52 +144,11 @@ public class Capacity {
137144 */
138145 public static final String CAPACITY_KEY = "capacity" ;
139146
140- /**
141- * Disk key
142- */
143- public static final String DISK_KEY = "DISK" ;
144-
145- /**
146- * CPU key
147- */
148- public static final String CPU_KEY = "CPU" ;
149-
150- /**
151- * Inbound network key
152- */
153- public static final String INBOUND_NETWORK_KEY = "NW_IN" ;
154-
155- /**
156- * Outbound network key
157- */
158- public static final String OUTBOUND_NETWORK_KEY = "NW_OUT" ;
159-
160- /**
161- * Resource type
162- */
163- public static final String RESOURCE_TYPE = "cpu" ;
164-
165147 private static final String KAFKA_MOUNT_PATH = "/var/lib/kafka" ;
166148 private static final String KAFKA_LOG_DIR = "kafka-log" ;
167149 private static final String BROKER_ID_KEY = "brokerId" ;
168150 private static final String DOC_KEY = "doc" ;
169151
170- private enum ResourceRequirementType {
171- REQUEST ,
172- LIMIT ;
173-
174- private Quantity getQuantity (ResourceRequirements resources ) {
175- Map <String , Quantity > resourceRequirement = switch (this ) {
176- case REQUEST -> resources .getRequests ();
177- case LIMIT -> resources .getLimits ();
178- };
179- if (resourceRequirement != null ) {
180- return resourceRequirement .get (RESOURCE_TYPE );
181- }
182- return null ;
183- }
184- }
185-
186152 /**
187153 * Constructor
188154 *
@@ -205,26 +171,38 @@ public Capacity(
205171 processCapacityEntries (spec .getCruiseControl (), kafkaBrokerNodes , kafkaStorage , kafkaBrokerResources );
206172 }
207173
208- private static Integer getResourceRequirement (ResourceRequirements resources , ResourceRequirementType requirementType ) {
209- if (resources != null ) {
210- Quantity quantity = requirementType .getQuantity (resources );
211- if (quantity != null ) {
212- return Quantities .parseCpuAsMilliCpus (quantity .toString ());
174+ /**
175+ * Checks whether all Kafka broker pods have their CPU resource requests equal to their CPU limits.
176+ *
177+ * @param kafkaBrokerResources a map of broker pod names to their {@link ResourceRequirements}
178+ * @return {@code true} if all brokers have matching CPU requests and limits; {@code false} otherwise
179+ */
180+ public static boolean cpuRequestsMatchLimits (Map <String , ResourceRequirements > kafkaBrokerResources ) {
181+ if (kafkaBrokerResources == null ) {
182+ return false ;
183+ }
184+ for (ResourceRequirements resourceRequirements : kafkaBrokerResources .values ()) {
185+ Quantity request = ResourceRequirementsUtils .getCpuRequest (resourceRequirements );
186+ Quantity limit = ResourceRequirementsUtils .getCpuLimit (resourceRequirements );
187+ if (request == null || limit == null || request .compareTo (limit ) != 0 ) {
188+ return false ;
213189 }
214190 }
215- return null ;
191+ return true ;
216192 }
217193
218194 private static CpuCapacity getCpuBasedOnRequirements (ResourceRequirements resourceRequirements ) {
219- Integer request = getResourceRequirement (resourceRequirements , ResourceRequirementType . REQUEST );
220- Integer limit = getResourceRequirement (resourceRequirements , ResourceRequirementType . LIMIT );
195+ Quantity request = ResourceRequirementsUtils . getCpuRequest (resourceRequirements );
196+ Quantity limit = ResourceRequirementsUtils . getCpuLimit (resourceRequirements );
221197
222198 if (request != null ) {
223- return new CpuCapacity (CpuCapacity .milliCpuToCpu (request ));
199+ int milliCpus = Quantities .parseCpuAsMilliCpus (request .toString ());
200+ return new CpuCapacity (CpuCapacity .milliCpuToCpu (milliCpus ));
224201 } else if (limit != null ) {
225- return new CpuCapacity (CpuCapacity .milliCpuToCpu (limit ));
202+ int milliCpus = Quantities .parseCpuAsMilliCpus (limit .toString ());
203+ return new CpuCapacity (CpuCapacity .milliCpuToCpu (milliCpus ));
226204 } else {
227- return new CpuCapacity (BrokerCapacity .DEFAULT_CPU_CORE_CAPACITY );
205+ return new CpuCapacity (BrokerCapacityEntry .DEFAULT_CPU_CORE_CAPACITY );
228206 }
229207 }
230208
@@ -265,7 +243,7 @@ private static String processInboundNetwork(io.strimzi.api.kafka.model.kafka.cru
265243 } else if (bc != null && bc .getInboundNetwork () != null ) {
266244 return getThroughputInKiB (bc .getInboundNetwork ());
267245 } else {
268- return BrokerCapacity .DEFAULT_INBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND ;
246+ return BrokerCapacityEntry .DEFAULT_INBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND ;
269247 }
270248 }
271249
@@ -275,7 +253,7 @@ private static String processOutboundNetwork(io.strimzi.api.kafka.model.kafka.cr
275253 } else if (bc != null && bc .getOutboundNetwork () != null ) {
276254 return getThroughputInKiB (bc .getOutboundNetwork ());
277255 } else {
278- return BrokerCapacity .DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND ;
256+ return BrokerCapacityEntry .DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND ;
279257 }
280258 }
281259
@@ -317,7 +295,7 @@ private static DiskCapacity generateDiskCapacity(Storage storage) {
317295 if (((EphemeralStorage ) storage ).getSizeLimit () != null ) {
318296 return DiskCapacity .of (getSizeInMiB (((EphemeralStorage ) storage ).getSizeLimit ()));
319297 } else {
320- return DiskCapacity .of (BrokerCapacity .DEFAULT_DISK_CAPACITY_IN_MIB );
298+ return DiskCapacity .of (BrokerCapacityEntry .DEFAULT_DISK_CAPACITY_IN_MIB );
321299 }
322300 } else if (storage == null ) {
323301 throw new IllegalStateException ("The storage declaration is missing" );
@@ -335,7 +313,7 @@ private static DiskCapacity generateDiskCapacity(Storage storage) {
335313 */
336314 private static String getSizeInMiB (String size ) {
337315 if (size == null ) {
338- return BrokerCapacity .DEFAULT_DISK_CAPACITY_IN_MIB ;
316+ return BrokerCapacityEntry .DEFAULT_DISK_CAPACITY_IN_MIB ;
339317 }
340318 return String .valueOf (StorageUtils .convertTo (size , "Mi" ));
341319 }
@@ -353,7 +331,7 @@ public static String getThroughputInKiB(String throughput) {
353331 }
354332
355333 private void processCapacityEntries (CruiseControlSpec spec , Set <NodeRef > kafkaBrokerNodes , Map <String , Storage > kafkaStorage , Map <String , ResourceRequirements > kafkaBrokerResources ) {
356- io . strimzi . api . kafka . model . kafka . cruisecontrol . BrokerCapacity brokerCapacity = spec .getBrokerCapacity ();
334+ BrokerCapacity brokerCapacity = spec .getBrokerCapacity ();
357335
358336 String inboundNetwork = processInboundNetwork (brokerCapacity , null );
359337 String outboundNetwork = processOutboundNetwork (brokerCapacity , null );
@@ -363,7 +341,7 @@ private void processCapacityEntries(CruiseControlSpec spec, Set<NodeRef> kafkaBr
363341 DiskCapacity disk = processDisk (kafkaStorage .get (node .poolName ()), node .nodeId ());
364342 CpuCapacity cpu = processCpu (null , brokerCapacity , kafkaBrokerResources .get (node .poolName ()));
365343
366- BrokerCapacity broker = new BrokerCapacity (node .nodeId (), cpu , disk , inboundNetwork , outboundNetwork );
344+ BrokerCapacityEntry broker = new BrokerCapacityEntry (node .nodeId (), cpu , disk , inboundNetwork , outboundNetwork );
367345 capacityEntries .put (node .nodeId (), broker );
368346 }
369347
@@ -382,12 +360,12 @@ private void processCapacityEntries(CruiseControlSpec spec, Set<NodeRef> kafkaBr
382360 inboundNetwork = processInboundNetwork (brokerCapacity , override );
383361 outboundNetwork = processOutboundNetwork (brokerCapacity , override );
384362 for (int id : ids ) {
385- if (id == BrokerCapacity .DEFAULT_BROKER_ID ) {
363+ if (id == BrokerCapacityEntry .DEFAULT_BROKER_ID ) {
386364 LOGGER .warnCr (reconciliation , "Ignoring broker capacity override with illegal broker id -1." );
387365 } else {
388366 if (capacityEntries .containsKey (id )) {
389367 if (overrideIds .add (id )) {
390- BrokerCapacity brokerCapacityEntry = capacityEntries .get (id );
368+ BrokerCapacityEntry brokerCapacityEntry = capacityEntries .get (id );
391369 brokerCapacityEntry .setCpu (processCpu (override , brokerCapacity , kafkaBrokerResources .get (Integer .toString (id ))));
392370 brokerCapacityEntry .setInboundNetwork (inboundNetwork );
393371 brokerCapacityEntry .setOutboundNetwork (outboundNetwork );
@@ -411,14 +389,14 @@ private void processCapacityEntries(CruiseControlSpec spec, Set<NodeRef> kafkaBr
411389 * @param brokerCapacity Broker capacity object
412390 * @return Broker entry as a JsonObject
413391 */
414- private JsonObject generateBrokerCapacity ( BrokerCapacity brokerCapacity ) {
392+ private JsonObject generateBrokerCapacityEntry ( BrokerCapacityEntry brokerCapacity ) {
415393 return new JsonObject ()
416394 .put (BROKER_ID_KEY , brokerCapacity .getId ())
417395 .put (CAPACITY_KEY , new JsonObject ()
418- .put (DISK_KEY , brokerCapacity .getDisk ().getJson ())
419- .put (CPU_KEY , brokerCapacity .getCpu ().getJson ())
420- .put (INBOUND_NETWORK_KEY , brokerCapacity .getInboundNetwork ())
421- .put (OUTBOUND_NETWORK_KEY , brokerCapacity .getOutboundNetwork ())
396+ .put (DISK . getKey () , brokerCapacity .getDisk ().getJson ())
397+ .put (CPU . getKey () , brokerCapacity .getCpu ().getJson ())
398+ .put (INBOUND_NETWORK . getKey () , brokerCapacity .getInboundNetwork ())
399+ .put (OUTBOUND_NETWORK . getKey () , brokerCapacity .getOutboundNetwork ())
422400 )
423401 .put (DOC_KEY , brokerCapacity .getDoc ());
424402 }
@@ -430,8 +408,8 @@ private JsonObject generateBrokerCapacity(BrokerCapacity brokerCapacity) {
430408 */
431409 public JsonObject generateCapacityConfig () {
432410 JsonArray brokerList = new JsonArray ();
433- for (BrokerCapacity brokerCapacity : capacityEntries .values ()) {
434- JsonObject brokerEntry = generateBrokerCapacity (brokerCapacity );
411+ for (BrokerCapacityEntry brokerCapacity : capacityEntries .values ()) {
412+ JsonObject brokerEntry = generateBrokerCapacityEntry (brokerCapacity );
435413 brokerList .add (brokerEntry );
436414 }
437415
@@ -449,7 +427,7 @@ public String toString() {
449427 /**
450428 * @return Capacity entries
451429 */
452- public TreeMap <Integer , BrokerCapacity > getCapacityEntries () {
430+ public TreeMap <Integer , BrokerCapacityEntry > getCapacityEntries () {
453431 return capacityEntries ;
454432 }
455433}
0 commit comments