Skip to content

Commit cce6139

Browse files
authored
Add version field to KafkaConnector and KafkaMirrorMaker2 CR (#12002)
Signed-off-by: Kate Stanley <11195226+katheris@users.noreply.github.com>
1 parent 0254161 commit cce6139

14 files changed

Lines changed: 417 additions & 8 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
* New way of defining the target (`.spec.target`) and source clusters (`.spec.mirrors[].source`) in the `KafkaMirrorMaker2` custom resources.
1212
* Strimzi Access Operator installation files updated to version 0.2.0
1313
* New feature gate `UseConnectBuildWithBuildah` (disabled by default) for running the Connect Build feature with Buildah instead of Kaniko on Kubernetes - according to [Strimzi Proposal #114](https://github.com/strimzi/proposals/blob/main/114-use-buildah-instead-of-kaniko.md).
14+
* New field `spec.version` in the `KafkaConnecter` custom resource, and new `version` fields for each connector under `spec.mirrors[]` in the `KafkaMirrorMaker2` custom resource for configuring the desired version of a connector.
1415

1516
### Major changes, deprecations, and removals
1617

@@ -33,6 +34,8 @@
3334
* The `.spec.build.output.additionalKanikoOptions` field in the `KafkaConnect` custom resource is deprecated and will be removed in the future.
3435
* Use `.spec.build.output.additionalBuildOptions` field instead.
3536
* Kafka nodes are now configured with PEM certificates instead of P12/JKS for keystore and truststore.
37+
* Configuring `connector.plugin.version` under `spec.config` in the `KafkaConnector` custom resource, and under `spec.mirrors[].sourceConnector.config`, `spec.mirrors[].checkpointConnector.config`, and `spec.mirrors[].heartbeatConnector.config` in the `KafkaMirrorMaker2` custom resource is deprecated and will be forbidden in Strimzi 0.50.0.
38+
Instead, please use `spec.version` in the `KafkaConnecter` custom resource, and the connector specific `version` fields in the `KafkaMirrorMaker2` custom resource.
3639

3740
## 0.48.0
3841

api/src/main/java/io/strimzi/api/kafka/model/connector/AbstractConnectorSpec.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
builderPackage = Constants.FABRIC8_KUBERNETES_API
3030
)
3131
@JsonInclude(JsonInclude.Include.NON_NULL)
32-
@JsonPropertyOrder({"pause", "tasksMax", "config", "state", "listOffsets", "alterOffsets"})
32+
@JsonPropertyOrder({"pause", "tasksMax", "version", "config", "state", "listOffsets", "alterOffsets"})
3333
@EqualsAndHashCode(callSuper = true)
3434
@ToString(callSuper = true)
3535
public abstract class AbstractConnectorSpec extends Spec {
@@ -40,6 +40,7 @@ public abstract class AbstractConnectorSpec extends Spec {
4040

4141
private Integer tasksMax;
4242
private Boolean pause;
43+
private String version;
4344
private Map<String, Object> config = new HashMap<>(0);
4445
private ConnectorState state;
4546

@@ -66,6 +67,24 @@ public void setTasksMax(Integer tasksMax) {
6667
this.tasksMax = tasksMax;
6768
}
6869

70+
/**
71+
* @return Version or version range for the Kafka Connector
72+
*/
73+
@Description("Desired version or version range to respect when starting the Kafka Connector. This is only supported when using Kafka Connect version 4.1.0 and higher.")
74+
@JsonInclude(JsonInclude.Include.NON_EMPTY)
75+
public String getVersion() {
76+
return version;
77+
}
78+
79+
/**
80+
* Sets the plugin version string for the Kafka Connector
81+
*
82+
* @param version Version or version range
83+
*/
84+
public void setVersion(String version) {
85+
this.version = version;
86+
}
87+
6988
/**
7089
* @return Connector configuration
7190
*/

api/src/main/java/io/strimzi/api/kafka/model/connector/KafkaConnectorSpec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
builderPackage = Constants.FABRIC8_KUBERNETES_API
1919
)
2020
@JsonInclude(JsonInclude.Include.NON_NULL)
21-
@JsonPropertyOrder({"class", "tasksMax", "autoRestart", "config", "pause", "state", "listOffsets", "alterOffsets"})
21+
@JsonPropertyOrder({"class", "tasksMax", "autoRestart", "version", "config", "pause", "state", "listOffsets", "alterOffsets"})
2222
@EqualsAndHashCode(callSuper = true)
2323
@ToString(callSuper = true)
2424
public class KafkaConnectorSpec extends AbstractConnectorSpec {

api/src/main/java/io/strimzi/api/kafka/model/mirrormaker2/KafkaMirrorMaker2ConnectorSpec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
builderPackage = Constants.FABRIC8_KUBERNETES_API
1818
)
1919
@JsonInclude(JsonInclude.Include.NON_NULL)
20-
@JsonPropertyOrder({"tasksMax", "pause", "config", "state", "autoRestart", "listOffsets", "alterOffsets"})
20+
@JsonPropertyOrder({"tasksMax", "pause", "version", "config", "state", "autoRestart", "listOffsets", "alterOffsets"})
2121
@EqualsAndHashCode(callSuper = true)
2222
@ToString(callSuper = true)
2323
public class KafkaMirrorMaker2ConnectorSpec extends AbstractConnectorSpec { }

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ public List<KafkaConnector> generateConnectorDefinitions() {
125125
.withState(mm2ConnectorSpec.getState())
126126
.withAutoRestart(mm2ConnectorSpec.getAutoRestart())
127127
.withTasksMax(mm2ConnectorSpec.getTasksMax())
128+
.withVersion(mm2ConnectorSpec.getVersion())
128129
.withListOffsets(mm2ConnectorSpec.getListOffsets())
129130
.withAlterOffsets(mm2ConnectorSpec.getAlterOffsets())
130131
.endSpec()

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -435,13 +435,22 @@ protected Future<Void> connectPodDisruptionBudget(Reconciliation reconciliation,
435435
protected Future<ConnectorStatusAndConditions> maybeCreateOrUpdateConnector(Reconciliation reconciliation, String host, KafkaConnectApi apiClient,
436436
String connectorName, KafkaConnectorSpec connectorSpec, CustomResource resource) {
437437
KafkaConnectorConfiguration desiredConfig = new KafkaConnectorConfiguration(reconciliation, connectorSpec.getConfig().entrySet());
438+
// In Strimzi 0.50.0 connector.plugin.version will be added to forbidden list, for now add warning to conditions if specified
439+
// Work for removing this is tracked in https://github.com/strimzi/strimzi-kafka-operator/issues/12027
440+
List<Condition> initialConditions = new ArrayList<>();
441+
if (desiredConfig.getConfigOption("connector.plugin.version") != null) {
442+
String message = "Config option connector.plugin.version has been set under the config field. This is deprecated and will be forbidden in future. " +
443+
"Use version field instead.";
444+
LOGGER.warnCr(reconciliation, message);
445+
initialConditions.add(StatusUtils.buildWarningCondition("DeprecatedFields", message));
446+
}
438447

439448
return VertxUtil.completableFutureToVertxFuture(apiClient.getConnectorConfig(reconciliation, new BackOff(200L, 2, 6), host, port, connectorName)).compose(
440449
currentConfig -> {
441450
if (!needsReconfiguring(reconciliation, connectorName, connectorSpec, desiredConfig.asOrderedProperties().asMap(), currentConfig)) {
442451
LOGGER.debugCr(reconciliation, "Connector {} exists and has desired config, {}=={}", connectorName, desiredConfig.asOrderedProperties().asMap(), currentConfig);
443452
return VertxUtil.completableFutureToVertxFuture(apiClient.status(reconciliation, host, port, connectorName))
444-
.compose(status -> updateState(reconciliation, host, apiClient, connectorName, connectorSpec, status, new ArrayList<>()))
453+
.compose(status -> updateState(reconciliation, host, apiClient, connectorName, connectorSpec, status, initialConditions))
445454
.compose(conditions -> manageConnectorOffsets(reconciliation, host, apiClient, connectorName, resource, connectorSpec, conditions))
446455
.compose(conditions -> maybeRestartConnector(reconciliation, host, apiClient, connectorName, resource, conditions))
447456
.compose(conditions -> maybeRestartConnectorTask(reconciliation, host, apiClient, connectorName, resource, conditions))
@@ -452,7 +461,7 @@ protected Future<ConnectorStatusAndConditions> maybeCreateOrUpdateConnector(Reco
452461
} else {
453462
LOGGER.debugCr(reconciliation, "Connector {} exists but does not have desired config, {}!={}", connectorName, desiredConfig.asOrderedProperties().asMap(), currentConfig);
454463
return createOrUpdateConnector(reconciliation, host, apiClient, connectorName, connectorSpec, desiredConfig)
455-
.compose(createConnectorStatusAndConditions())
464+
.compose(createConnectorStatusAndConditions(initialConditions))
456465
.compose(status -> updateConnectorTopics(reconciliation, host, apiClient, connectorName, status));
457466
}
458467
},
@@ -461,7 +470,7 @@ protected Future<ConnectorStatusAndConditions> maybeCreateOrUpdateConnector(Reco
461470
&& ((ConnectRestException) error).getStatusCode() == 404) {
462471
LOGGER.debugCr(reconciliation, "Connector {} does not exist", connectorName);
463472
return createOrUpdateConnector(reconciliation, host, apiClient, connectorName, connectorSpec, desiredConfig)
464-
.compose(createConnectorStatusAndConditions())
473+
.compose(createConnectorStatusAndConditions(initialConditions))
465474
.compose(status -> autoRestartFailedConnectorAndTasks(reconciliation, host, apiClient, connectorName, connectorSpec, status, resource))
466475
.compose(status -> updateConnectorTopics(reconciliation, host, apiClient, connectorName, status));
467476
} else {
@@ -474,11 +483,14 @@ private boolean needsReconfiguring(Reconciliation reconciliation, String connect
474483
KafkaConnectorSpec connectorSpec,
475484
Map<String, String> desiredConfig,
476485
Map<String, String> actualConfig) {
477-
// The actual which comes from Connect API includes tasks.max, connector.class and name,
486+
// The actual which comes from Connect API includes tasks.max, connector.class, connector.plugin.version (if set) and name,
478487
// which connectorSpec.getConfig() does not
479488
if (connectorSpec.getTasksMax() != null) {
480489
desiredConfig.put("tasks.max", connectorSpec.getTasksMax().toString());
481490
}
491+
if (connectorSpec.getVersion() != null) {
492+
desiredConfig.put("connector.plugin.version", connectorSpec.getVersion());
493+
}
482494
desiredConfig.put("name", connectorName);
483495
desiredConfig.put("connector.class", connectorSpec.getClassName());
484496

@@ -1178,6 +1190,10 @@ private static JsonObject asJson(KafkaConnectorSpec spec, KafkaConnectorConfigur
11781190
connectorConfigJson.put("tasks.max", spec.getTasksMax());
11791191
}
11801192

1193+
if (spec.getVersion() != null) {
1194+
connectorConfigJson.put("connector.plugin.version", spec.getVersion());
1195+
}
1196+
11811197
return connectorConfigJson.put("connector.class", spec.getClassName());
11821198
}
11831199

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperator.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.fabric8.kubernetes.client.CustomResource;
1010
import io.fabric8.kubernetes.client.KubernetesClient;
1111
import io.strimzi.api.kafka.model.common.CertSecretSource;
12+
import io.strimzi.api.kafka.model.common.Condition;
1213
import io.strimzi.api.kafka.model.connector.AutoRestartStatus;
1314
import io.strimzi.api.kafka.model.connector.KafkaConnector;
1415
import io.strimzi.api.kafka.model.connector.KafkaConnectorSpec;
@@ -147,8 +148,12 @@ protected Future<KafkaMirrorMaker2Status> createOrUpdate(Reconciliation reconcil
147148
.compose(i -> hasZeroReplicas ? Future.succeededFuture() : reconcileConnectors(reconciliation, kafkaMirrorMaker2, mirrorMaker2Cluster, kafkaMirrorMaker2Status))
148149
.map((Void) null)
149150
.onComplete(reconciliationResult -> {
151+
// Extract warning conditions from reconciliation
152+
List<Condition> warningConditions = kafkaMirrorMaker2Status.getConditions();
150153
StatusUtils.setStatusConditionAndObservedGeneration(kafkaMirrorMaker2, kafkaMirrorMaker2Status, reconciliationResult.cause());
151-
154+
if (warningConditions != null && !warningConditions.isEmpty()) {
155+
kafkaMirrorMaker2Status.addConditions(warningConditions);
156+
}
152157
if (!hasZeroReplicas) {
153158
kafkaMirrorMaker2Status.setUrl(KafkaMirrorMaker2Resources.url(mirrorMaker2Cluster.getCluster(), namespace, KafkaMirrorMaker2Cluster.REST_API_PORT));
154159
}

0 commit comments

Comments
 (0)