Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Comment thread
scholzj marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Added support for Strimzi Metrics Reporter to Kafka Connect, Mirror Maker 2 and Kafka Bridge.
* Add new feature gate `ServerSideApplyPhase1` (disabled by default) that adds support for Server Side Apply for `ConfigMap`, `Ingress`, `PVC`, `Service`, and `ServiceAccount` according to [Strimzi Proposal #105](https://github.com/strimzi/proposals/blob/main/105-server-side-apply-implementation-fg-timelines.md).
* Added distinction between changes of "cluster-wide" broker properties applied dynamically at cluster level, and "per-broker" broker properties applied dynamically at broker level.
* Extend the EntityOperator, Cruise Control and KafkaExporter deployment to support PDB via the template section in the CR spec.

### Major changes, deprecations and removals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.strimzi.api.kafka.model.common.UnknownPropertyPreserving;
import io.strimzi.api.kafka.model.common.template.ContainerTemplate;
import io.strimzi.api.kafka.model.common.template.DeploymentTemplate;
import io.strimzi.api.kafka.model.common.template.PodDisruptionBudgetTemplate;
import io.strimzi.api.kafka.model.common.template.PodTemplate;
import io.strimzi.api.kafka.model.common.template.ResourceTemplate;
import io.strimzi.crdgenerator.annotations.Description;
Expand All @@ -29,7 +30,7 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({"deployment", "pod", "topicOperatorContainer", "userOperatorContainer", "tlsSidecarContainer", "serviceAccount", "entityOperatorRole", "topicOperatorRoleBinding", "userOperatorRoleBinding"})
@JsonPropertyOrder({"deployment", "pod", "topicOperatorContainer", "userOperatorContainer", "tlsSidecarContainer", "serviceAccount", "podDisruptionBudget", "entityOperatorRole", "topicOperatorRoleBinding", "userOperatorRoleBinding"})
@EqualsAndHashCode
@ToString
public class EntityOperatorTemplate implements UnknownPropertyPreserving {
Expand All @@ -42,6 +43,7 @@ public class EntityOperatorTemplate implements UnknownPropertyPreserving {
private ContainerTemplate userOperatorContainer;
private ContainerTemplate tlsSidecarContainer;
private ResourceTemplate serviceAccount;
private PodDisruptionBudgetTemplate podDisruptionBudget;
private Map<String, Object> additionalProperties;

@Description("Template for Entity Operator `Deployment`.")
Expand Down Expand Up @@ -136,6 +138,15 @@ public void setServiceAccount(ResourceTemplate serviceAccount) {
this.serviceAccount = serviceAccount;
}

@Description("Template for the Entity Operator Pod Disruption Budget.")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public PodDisruptionBudgetTemplate getPodDisruptionBudget() {
return podDisruptionBudget;
}
public void setPodDisruptionBudget(PodDisruptionBudgetTemplate podDisruptionBudget) {
this.podDisruptionBudget = podDisruptionBudget;
}

@Override
public Map<String, Object> getAdditionalProperties() {
return this.additionalProperties != null ? this.additionalProperties : Map.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.strimzi.api.kafka.model.common.UnknownPropertyPreserving;
import io.strimzi.api.kafka.model.common.template.ContainerTemplate;
import io.strimzi.api.kafka.model.common.template.DeploymentTemplate;
import io.strimzi.api.kafka.model.common.template.PodDisruptionBudgetTemplate;
import io.strimzi.api.kafka.model.common.template.PodTemplate;
import io.strimzi.api.kafka.model.common.template.ResourceTemplate;
import io.strimzi.crdgenerator.annotations.Description;
Expand All @@ -30,7 +31,7 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({"deployment", "pod", "service", "container", "serviceAccount"})
@JsonPropertyOrder({"deployment", "pod", "service", "container", "serviceAccount", "podDisruptionBudget"})
@EqualsAndHashCode
@ToString
public class KafkaExporterTemplate implements UnknownPropertyPreserving {
Expand All @@ -39,6 +40,7 @@ public class KafkaExporterTemplate implements UnknownPropertyPreserving {
private ResourceTemplate service;
private ContainerTemplate container;
private ResourceTemplate serviceAccount;
private PodDisruptionBudgetTemplate podDisruptionBudget;
private Map<String, Object> additionalProperties;

@Description("Template for Kafka Exporter `Deployment`.")
Expand Down Expand Up @@ -94,6 +96,15 @@ public void setServiceAccount(ResourceTemplate serviceAccount) {
this.serviceAccount = serviceAccount;
}

@Description("Template for the Pod Disruption Budget for Kafka Exporter pods.")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public PodDisruptionBudgetTemplate getPodDisruptionBudget() {
return podDisruptionBudget;
}
public void setPodDisruptionBudget(PodDisruptionBudgetTemplate podDisruptionBudget) {
this.podDisruptionBudget = podDisruptionBudget;
}

@Override
public Map<String, Object> getAdditionalProperties() {
return this.additionalProperties != null ? this.additionalProperties : Map.of();
Expand All @@ -106,4 +117,5 @@ public void setAdditionalProperty(String name, Object value) {
}
this.additionalProperties.put(name, value);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyIngressRule;
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyPeer;
import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
import io.strimzi.api.kafka.model.common.JvmOptions;
import io.strimzi.api.kafka.model.common.metrics.JmxPrometheusExporterMetrics;
import io.strimzi.api.kafka.model.common.metrics.StrimziMetricsReporter;
import io.strimzi.api.kafka.model.common.template.DeploymentTemplate;
import io.strimzi.api.kafka.model.common.template.InternalServiceTemplate;
import io.strimzi.api.kafka.model.common.template.PodDisruptionBudgetTemplate;
import io.strimzi.api.kafka.model.common.template.PodTemplate;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec;
Expand Down Expand Up @@ -140,6 +142,7 @@ public class CruiseControl extends AbstractModel implements SupportsMetrics, Sup
private DeploymentTemplate templateDeployment;
private PodTemplate templatePod;
private InternalServiceTemplate templateService;
private PodDisruptionBudgetTemplate templatePodDisruptionBudget;

private static final Map<String, String> DEFAULT_POD_LABELS = new HashMap<>();
static {
Expand Down Expand Up @@ -235,6 +238,7 @@ public static CruiseControl fromCrd(
result.templateService = template.getApiService();
result.templateServiceAccount = template.getServiceAccount();
result.templateContainer = template.getCruiseControlContainer();
result.templatePodDisruptionBudget = template.getPodDisruptionBudget();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getPodDisruptionBudget() method is already present in Cruise Control, so is the current state that Cruise Control had it as part of the template, but we were ignoring it? @kyguy can you confirm whether PodDisruptionBudget is already supported for Cruise Control?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getPodDisruptionBudget() method is already present in Cruise Control, so is the current state that Cruise Control had it as part of the template, but we were ignoring it?

Yes, we were ignoring it. It looks like I added the template years ago but never added the spec or reconciliation bits.

can you confirm whether PodDisruptionBudget is already supported for Cruise Control?

I can confirm it is not currently supported for Cruise Control.

}

return result;
Expand Down Expand Up @@ -542,4 +546,19 @@ public ConfigMap generateConfigMap(MetricsAndLogging metricsAndLogging) {
configMapData
);
}
/**
* Generates the PodDisruptionBudget for Cruise Control
*
* @return The PodDisruptionBudget for Cruise Control
*/
public PodDisruptionBudget generatePodDisruptionBudget() {
return PodDisruptionBudgetUtils.createCustomControllerPodDisruptionBudget(
componentName,
namespace,
labels,
ownerReference,
templatePodDisruptionBudget,
1
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyIngressRule;
import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
import io.fabric8.kubernetes.api.model.rbac.ClusterRole;
import io.fabric8.kubernetes.api.model.rbac.PolicyRule;
import io.fabric8.kubernetes.api.model.rbac.Role;
import io.strimzi.api.kafka.model.common.Probe;
import io.strimzi.api.kafka.model.common.template.DeploymentTemplate;
import io.strimzi.api.kafka.model.common.template.PodDisruptionBudgetTemplate;
import io.strimzi.api.kafka.model.common.template.PodTemplate;
import io.strimzi.api.kafka.model.common.template.ResourceTemplate;
import io.strimzi.api.kafka.model.kafka.Kafka;
Expand Down Expand Up @@ -81,6 +83,7 @@ public class EntityOperator extends AbstractModel {
private ResourceTemplate templateRole;
private DeploymentTemplate templateDeployment;
private PodTemplate templatePod;
private PodDisruptionBudgetTemplate templatePodDisruptionBudget;

private static final Map<String, String> DEFAULT_POD_LABELS = new HashMap<>();
static {
Expand Down Expand Up @@ -135,6 +138,7 @@ public static EntityOperator fromCrd(Reconciliation reconciliation,
result.templateDeployment = template.getDeployment();
result.templatePod = template.getPod();
result.templateServiceAccount = template.getServiceAccount();
result.templatePodDisruptionBudget = template.getPodDisruptionBudget();

if (topicOperator != null) {
topicOperator.templateContainer = template.getTopicOperatorContainer();
Expand Down Expand Up @@ -305,4 +309,19 @@ public NetworkPolicy generateNetworkPolicy() {
rules
);
}
/**
* Generates the PodDisruptionBudget for the Entity Operator
*
* @return The PodDisruptionBudget for the Entity Operator
*/
public PodDisruptionBudget generatePodDisruptionBudget() {
return PodDisruptionBudgetUtils.createCustomControllerPodDisruptionBudget(
componentName,
namespace,
labels,
ownerReference,
templatePodDisruptionBudget,
1
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyIngressRule;
import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
import io.strimzi.api.kafka.model.common.Probe;
import io.strimzi.api.kafka.model.common.ProbeBuilder;
import io.strimzi.api.kafka.model.common.template.DeploymentTemplate;
import io.strimzi.api.kafka.model.common.template.PodDisruptionBudgetTemplate;
import io.strimzi.api.kafka.model.common.template.PodTemplate;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec;
Expand Down Expand Up @@ -83,6 +85,7 @@ public class KafkaExporter extends AbstractModel {

private DeploymentTemplate templateDeployment;
private PodTemplate templatePod;
private PodDisruptionBudgetTemplate templatePodDisruptionBudget;
private static final Map<String, String> DEFAULT_POD_LABELS = new HashMap<>();
static {
String value = System.getenv(CO_ENV_VAR_CUSTOM_KAFKA_EXPORTER_POD_LABELS);
Expand Down Expand Up @@ -148,6 +151,7 @@ public static KafkaExporter fromCrd(Reconciliation reconciliation, Kafka kafkaAs
result.templatePod = template.getPod();
result.templateServiceAccount = template.getServiceAccount();
result.templateContainer = template.getContainer();
result.templatePodDisruptionBudget = template.getPodDisruptionBudget();
}

result.version = versions.supportedVersion(kafkaAssembly.getSpec().getKafka().getVersion()).version();
Expand Down Expand Up @@ -313,4 +317,19 @@ public NetworkPolicy generateNetworkPolicy() {
rules
);
}
/**
* Generates the PodDisruptionBudget for Kafka Exporter
*
* @return The PodDisruptionBudget.
*/
public PodDisruptionBudget generatePodDisruptionBudget() {
return PodDisruptionBudgetUtils.createCustomControllerPodDisruptionBudget(
componentName,
namespace,
labels,
ownerReference,
templatePodDisruptionBudget,
1
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.strimzi.operator.cluster.operator.resource.kubernetes.ConfigMapOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.DeploymentOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.NetworkPolicyOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.PodDisruptionBudgetOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.SecretOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.ServiceAccountOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.ServiceOperator;
Expand Down Expand Up @@ -63,6 +64,9 @@ public class CruiseControlReconciler {
private final NetworkPolicyOperator networkPolicyOperator;
private final ConfigMapOperator configMapOperator;
private final PasswordGenerator passwordGenerator;
private final boolean isPodDisruptionBudgetGeneration;
private final PodDisruptionBudgetOperator podDisruptionBudgetOperator;


private String certificateHash = "";
private String serverConfigurationHash = "";
Expand Down Expand Up @@ -114,6 +118,8 @@ public CruiseControlReconciler(
this.serviceOperator = supplier.serviceOperations;
this.networkPolicyOperator = supplier.networkPolicyOperator;
this.configMapOperator = supplier.configMapOperations;
this.isPodDisruptionBudgetGeneration = config.isPodDisruptionBudgetGeneration();
this.podDisruptionBudgetOperator = supplier.podDisruptionBudgetOperator;
}

/**
Expand All @@ -135,6 +141,7 @@ public Future<Void> reconcile(boolean isOpenShift, ImagePullPolicy imagePullPoli
.compose(i -> certificatesSecret(clock))
.compose(i -> apiSecret())
.compose(i -> service())
.compose(i -> podDisruptionBudget())
Comment thread
katheris marked this conversation as resolved.
.compose(i -> deployment(isOpenShift, imagePullPolicy, imagePullSecrets))
.compose(i -> waitForDeploymentReadiness());
}
Expand Down Expand Up @@ -173,7 +180,24 @@ protected Future<Void> serviceAccount() {
cruiseControl != null ? cruiseControl.generateServiceAccount() : null
).mapEmpty();
}

/**
* Manages the Cruise Control Pod Disruption Budget
*
* @return Future which completes when the reconciliation is done
*/
protected Future<Void> podDisruptionBudget() {
if (isPodDisruptionBudgetGeneration) {
return podDisruptionBudgetOperator
.reconcile(
reconciliation,
reconciliation.namespace(),
CruiseControlResources.componentName(reconciliation.name()),
cruiseControl != null ? cruiseControl.generatePodDisruptionBudget() : null
).mapEmpty();
} else {
return Future.succeededFuture();
}
}
/**
* Manages the Cruise Control ConfigMap which contains the following:
* (1) Cruise Control server configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.strimzi.operator.cluster.operator.resource.kubernetes.ConfigMapOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.DeploymentOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.NetworkPolicyOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.PodDisruptionBudgetOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.RoleBindingOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.RoleOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.SecretOperator;
Expand Down Expand Up @@ -50,11 +51,13 @@ public class EntityOperatorReconciler {
private final SecretOperator secretOperator;
private final ServiceAccountOperator serviceAccountOperator;
private final boolean isNetworkPolicyGeneration;
private final boolean isPodDisruptionBudgetGeneration;
private final RoleOperator roleOperator;
private final RoleBindingOperator roleBindingOperator;
private final ConfigMapOperator configMapOperator;
private final NetworkPolicyOperator networkPolicyOperator;
private final boolean isCruiseControlEnabled;
private final PodDisruptionBudgetOperator podDistruptionBudgetOperator;

private String toCertificateHash = "";
private String uoCertificateHash = "";
Expand Down Expand Up @@ -83,6 +86,7 @@ public EntityOperatorReconciler(
this.maintenanceWindows = kafkaAssembly.getSpec().getMaintenanceTimeWindows();
this.isNetworkPolicyGeneration = config.isNetworkPolicyGeneration();
this.isCruiseControlEnabled = kafkaAssembly.getSpec().getCruiseControl() != null;
this.isPodDisruptionBudgetGeneration = config.isPodDisruptionBudgetGeneration();

this.deploymentOperator = supplier.deploymentOperations;
this.secretOperator = supplier.secretOperations;
Expand All @@ -91,6 +95,7 @@ public EntityOperatorReconciler(
this.roleBindingOperator = supplier.roleBindingOperations;
this.configMapOperator = supplier.configMapOperations;
this.networkPolicyOperator = supplier.networkPolicyOperator;
this.podDistruptionBudgetOperator = supplier.podDisruptionBudgetOperator;
}

/**
Expand All @@ -111,6 +116,7 @@ public Future<Void> reconcile(boolean isOpenShift, ImagePullPolicy imagePullPoli
.compose(i -> topicOperatorRole())
.compose(i -> userOperatorRole())
.compose(i -> networkPolicy())
.compose(i -> podDistruptionBudget())
.compose(i -> topicOperatorRoleBindings())
.compose(i -> userOperatorRoleBindings())
.compose(i -> topicOperatorConfigMap())
Expand Down Expand Up @@ -416,7 +422,24 @@ protected Future<Void> networkPolicy() {
return Future.succeededFuture();
}
}

/**
* Manages the Entity Operator Pod Disruption Budget
*
* @return Future which completes when the reconciliation is done
*/
protected Future<Void> podDistruptionBudget() {
Comment thread
katheris marked this conversation as resolved.
if (isPodDisruptionBudgetGeneration) {
return podDistruptionBudgetOperator
.reconcile(
reconciliation,
reconciliation.namespace(),
KafkaResources.entityOperatorDeploymentName(reconciliation.name()),
entityOperator != null ? entityOperator.generatePodDisruptionBudget() : null
).mapEmpty();
} else {
return Future.succeededFuture();
}
}
/**
* Manages the Entity Operator Deployment.
*
Expand Down
Loading
Loading