diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index bcbe16a5faa51..c4e8cf4885f74 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -204,83 +204,91 @@ public boolean isReady() { @Override public void onStartup(String connector) { statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.RUNNING, - workerId, generation())); + workerId, generation(), worker.connectorVersion(connector))); } @Override public void onStop(String connector) { statusBackingStore.put(new ConnectorStatus(connector, AbstractStatus.State.STOPPED, - workerId, generation())); + workerId, generation(), worker.connectorVersion(connector))); } @Override public void onPause(String connector) { statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.PAUSED, - workerId, generation())); + workerId, generation(), worker.connectorVersion(connector))); } @Override public void onResume(String connector) { statusBackingStore.put(new ConnectorStatus(connector, TaskStatus.State.RUNNING, - workerId, generation())); + workerId, generation(), worker.connectorVersion(connector))); } @Override public void onShutdown(String connector) { statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.UNASSIGNED, - workerId, generation())); + workerId, generation(), worker.connectorVersion(connector))); } @Override public void onFailure(String connector, Throwable cause) { statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.FAILED, - trace(cause), workerId, generation())); + trace(cause), workerId, generation(), worker.connectorVersion(connector))); } @Override public void onStartup(ConnectorTaskId id) { - statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation())); + statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation(), null, + worker.taskVersion(id))); } @Override public void onFailure(ConnectorTaskId id, Throwable cause) { - statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.FAILED, workerId, generation(), trace(cause))); + statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.FAILED, workerId, generation(), trace(cause), + worker.taskVersion(id))); } @Override public void onShutdown(ConnectorTaskId id) { - statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.UNASSIGNED, workerId, generation())); + statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.UNASSIGNED, workerId, generation(), null, + worker.taskVersion(id))); } @Override public void onResume(ConnectorTaskId id) { - statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation())); + statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation(), null, + worker.taskVersion(id))); } @Override public void onPause(ConnectorTaskId id) { - statusBackingStore.put(new TaskStatus(id, TaskStatus.State.PAUSED, workerId, generation())); + statusBackingStore.put(new TaskStatus(id, TaskStatus.State.PAUSED, workerId, generation(), null, + worker.taskVersion(id))); } @Override public void onDeletion(String connector) { for (TaskStatus status : statusBackingStore.getAll(connector)) onDeletion(status.id()); - statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.DESTROYED, workerId, generation())); + statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.DESTROYED, workerId, generation(), + worker.connectorVersion(connector))); } @Override public void onDeletion(ConnectorTaskId id) { - statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, workerId, generation())); + statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, workerId, generation(), null, + worker.taskVersion(id))); } public void onRestart(String connector) { statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.RESTARTING, - workerId, generation())); + workerId, generation(), worker.connectorVersion(connector))); } public void onRestart(ConnectorTaskId id) { - statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RESTARTING, workerId, generation())); + statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RESTARTING, workerId, generation(), null, + worker.taskVersion(id))); } @Override @@ -348,12 +356,12 @@ public ConnectorStateInfo connectorStatus(String connName) { Collection tasks = statusBackingStore.getAll(connName); ConnectorStateInfo.ConnectorState connectorState = new ConnectorStateInfo.ConnectorState( - connector.state().toString(), connector.workerId(), connector.trace()); + connector.state().toString(), connector.workerId(), connector.trace(), connector.version()); List taskStates = new ArrayList<>(); for (TaskStatus status : tasks) { taskStates.add(new ConnectorStateInfo.TaskState(status.id().task(), - status.state().toString(), status.workerId(), status.trace())); + status.state().toString(), status.workerId(), status.trace(), status.version())); } Collections.sort(taskStates); @@ -389,7 +397,7 @@ public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) { throw new NotFoundException("No status found for task " + id); return new ConnectorStateInfo.TaskState(id.task(), status.state().toString(), - status.workerId(), status.trace()); + status.workerId(), status.trace(), status.version()); } @Override @@ -627,7 +635,8 @@ public Optional buildRestartPlan(RestartRequest request) { ConnectorStateInfo.ConnectorState connectorInfoState = new ConnectorStateInfo.ConnectorState( connectorState.toString(), connectorStatus.workerId(), - connectorStatus.trace() + connectorStatus.trace(), + connectorStatus.version() ); // Collect the task states, If requested, mark the task as restarting @@ -639,7 +648,8 @@ public Optional buildRestartPlan(RestartRequest request) { taskStatus.id().task(), taskState.toString(), taskStatus.workerId(), - taskStatus.trace() + taskStatus.trace(), + taskStatus.version() ); }) .collect(Collectors.toList()); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java index 76036d610d738..fc8bc7ca05061 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java @@ -34,18 +34,29 @@ public enum State { private final State state; private final String trace; private final String workerId; + private final String version; private final int generation; public AbstractStatus(T id, State state, String workerId, int generation, - String trace) { + String trace, + String version) { this.id = id; this.state = state; this.workerId = workerId; this.generation = generation; this.trace = trace; + this.version = version; + } + + public AbstractStatus(T id, + State state, + String workerId, + int generation, + String trace) { + this(id, state, workerId, generation, trace, null); } public T id() { @@ -68,12 +79,17 @@ public int generation() { return generation; } + public String version() { + return version; + } + @Override public String toString() { return "Status{" + "id=" + id + ", state=" + state + ", workerId='" + workerId + '\'' + + ", version='" + version + '\'' + ", generation=" + generation + '}'; } @@ -89,7 +105,8 @@ public boolean equals(Object o) { && Objects.equals(id, that.id) && state == that.state && Objects.equals(trace, that.trace) - && Objects.equals(workerId, that.workerId); + && Objects.equals(workerId, that.workerId) + && Objects.equals(version, that.version); } @Override @@ -98,6 +115,7 @@ public int hashCode() { result = 31 * result + (state != null ? state.hashCode() : 0); result = 31 * result + (trace != null ? trace.hashCode() : 0); result = 31 * result + (workerId != null ? workerId.hashCode() : 0); + result = 31 * result + (version != null ? version.hashCode() : 0); result = 31 * result + generation; return result; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java index 683eb3abed0f2..9a74d81770fa7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java @@ -203,6 +203,7 @@ protected abstract void producerSendFailed( private final boolean topicTrackingEnabled; private final TopicCreation topicCreation; private final Executor closeExecutor; + private final String version; // Visible for testing List toSend; @@ -236,11 +237,12 @@ protected AbstractWorkerSourceTask(ConnectorTaskId id, StatusBackingStore statusBackingStore, Executor closeExecutor, Supplier>> errorReportersSupplier, + TaskPluginsMetadata pluginsMetadata, Function pluginLoaderSwapper) { super(id, statusListener, initialState, loader, connectMetrics, errorMetrics, retryWithToleranceOperator, transformationChain, errorReportersSupplier, - time, statusBackingStore, pluginLoaderSwapper); + time, statusBackingStore, pluginsMetadata, pluginLoaderSwapper); this.workerConfig = workerConfig; this.task = task; @@ -258,6 +260,7 @@ protected AbstractWorkerSourceTask(ConnectorTaskId id, this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics); this.topicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG); this.topicCreation = TopicCreation.newTopicCreation(workerConfig, topicGroups); + this.version = task.version(); } @Override @@ -391,6 +394,11 @@ public void execute() { finalOffsetCommit(false); } + @Override + public String taskVersion() { + return version; + } + /** * Try to send a batch of records. If a send fails and is retriable, this saves the remainder of the batch so it can * be retried after backing off. If a send fails and is not retriable, this will throw a ConnectException. diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java index 1d144440f2c20..496c838cf45f9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java @@ -37,6 +37,10 @@ public class ConnectMetricsRegistry { public static final String WORKER_GROUP_NAME = "connect-worker-metrics"; public static final String WORKER_REBALANCE_GROUP_NAME = "connect-worker-rebalance-metrics"; public static final String TASK_ERROR_HANDLING_GROUP_NAME = "task-error-metrics"; + public static final String TRANSFORMS_GROUP = "connector-transform-metrics"; + public static final String PREDICATES_GROUP = "connector-predicate-metrics"; + public static final String TRANSFORM_TAG_NAME = "transform"; + public static final String PREDICATE_TAG_NAME = "predicate"; private final List allTemplates = new ArrayList<>(); public final MetricNameTemplate connectorStatus; @@ -59,6 +63,17 @@ public class ConnectMetricsRegistry { public final MetricNameTemplate taskBatchSizeAvg; public final MetricNameTemplate taskCommitFailurePercentage; public final MetricNameTemplate taskCommitSuccessPercentage; + public final MetricNameTemplate taskConnectorClass; + public final MetricNameTemplate taskConnectorClassVersion; + public final MetricNameTemplate taskConnectorType; + public final MetricNameTemplate taskClass; + public final MetricNameTemplate taskVersion; + public final MetricNameTemplate taskKeyConverterClass; + public final MetricNameTemplate taskValueConverterClass; + public final MetricNameTemplate taskKeyConverterVersion; + public final MetricNameTemplate taskValueConverterVersion; + public final MetricNameTemplate taskHeaderConverterClass; + public final MetricNameTemplate taskHeaderConverterVersion; public final MetricNameTemplate sourceRecordPollRate; public final MetricNameTemplate sourceRecordPollTotal; public final MetricNameTemplate sourceRecordWriteRate; @@ -115,6 +130,10 @@ public class ConnectMetricsRegistry { public final MetricNameTemplate transactionSizeMin; public final MetricNameTemplate transactionSizeMax; public final MetricNameTemplate transactionSizeAvg; + public final MetricNameTemplate transformClass; + public final MetricNameTemplate transformVersion; + public final MetricNameTemplate predicateClass; + public final MetricNameTemplate predicateVersion; public Map connectorStatusMetrics; @@ -164,6 +183,43 @@ public ConnectMetricsRegistry(Set tags) { taskCommitSuccessPercentage = createTemplate("offset-commit-success-percentage", TASK_GROUP_NAME, "The average percentage of this task's offset commit attempts that succeeded.", workerTaskTags); + taskConnectorClass = createTemplate("connector-class", TASK_GROUP_NAME, "The name of the connector class.", workerTaskTags); + taskConnectorClassVersion = createTemplate("connector-version", TASK_GROUP_NAME, + "The version of the connector class, as reported by the connector.", workerTaskTags); + taskConnectorType = createTemplate("connector-type", TASK_GROUP_NAME, "The type of the connector. One of 'source' or 'sink'.", + workerTaskTags); + taskClass = createTemplate("task-class", TASK_GROUP_NAME, "The class name of the task.", workerTaskTags); + taskVersion = createTemplate("task-version", TASK_GROUP_NAME, "The version of the task.", workerTaskTags); + taskKeyConverterClass = createTemplate("key-converter-class", TASK_GROUP_NAME, + "The fully qualified class name from key.converter", workerTaskTags); + taskValueConverterClass = createTemplate("value-converter-class", TASK_GROUP_NAME, + "The fully qualified class name from value.converter", workerTaskTags); + taskKeyConverterVersion = createTemplate("key-converter-version", TASK_GROUP_NAME, + "The version instantiated for key.converter. May be undefined", workerTaskTags); + taskValueConverterVersion = createTemplate("value-converter-version", TASK_GROUP_NAME, + "The version instantiated for value.converter. May be undefined", workerTaskTags); + taskHeaderConverterClass = createTemplate("header-converter-class", TASK_GROUP_NAME, + "The fully qualified class name from header.converter", workerTaskTags); + taskHeaderConverterVersion = createTemplate("header-converter-version", TASK_GROUP_NAME, + "The version instantiated for header.converter. May be undefined", workerTaskTags); + + /* Transformation Metrics */ + Set transformTags = new LinkedHashSet<>(tags); + transformTags.addAll(workerTaskTags); + transformTags.add(TRANSFORM_TAG_NAME); + transformClass = createTemplate("transform-class", TRANSFORMS_GROUP, + "The class name of the transformation class", transformTags); + transformVersion = createTemplate("transform-version", TRANSFORMS_GROUP, + "The version of the transformation class", transformTags); + + /* Predicate Metrics */ + Set predicateTags = new LinkedHashSet<>(tags); + predicateTags.addAll(workerTaskTags); + predicateTags.add(PREDICATE_TAG_NAME); + predicateClass = createTemplate("predicate-class", PREDICATES_GROUP, + "The class name of the predicate class", predicateTags); + predicateVersion = createTemplate("predicate-version", PREDICATES_GROUP, + "The version of the predicate class", predicateTags); /* Source worker task level */ Set sourceTaskTags = new LinkedHashSet<>(tags); @@ -426,4 +482,20 @@ public String workerRebalanceGroupName() { public String taskErrorHandlingGroupName() { return TASK_ERROR_HANDLING_GROUP_NAME; } + + public String transformsGroupName() { + return TRANSFORMS_GROUP; + } + + public String transformsTagName() { + return TRANSFORM_TAG_NAME; + } + + public String predicatesGroupName() { + return PREDICATES_GROUP; + } + + public String predicateTagName() { + return PREDICATE_TAG_NAME; + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index db485146811a3..a144261d84224 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -374,7 +374,7 @@ public > List> transformationS final String versionConfig = prefix + WorkerConfig.PLUGIN_VERSION_SUFFIX; @SuppressWarnings("unchecked") final Transformation transformation = getTransformationOrPredicate(plugins, typeConfig, versionConfig); Map configs = originalsWithPrefix(prefix); - Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG); + String predicateAlias = (String) configs.remove(TransformationStage.PREDICATE_CONFIG); Object negate = configs.remove(TransformationStage.NEGATE_CONFIG); transformation.configure(configs); Plugin> transformationPlugin = metrics.wrap(transformation, connectorTaskId, alias); @@ -385,10 +385,24 @@ public > List> transformationS @SuppressWarnings("unchecked") Predicate predicate = getTransformationOrPredicate(plugins, predicateTypeConfig, predicateVersionConfig); predicate.configure(originalsWithPrefix(predicatePrefix)); - Plugin> predicatePlugin = metrics.wrap(predicate, connectorTaskId, (String) predicateAlias); - transformations.add(new TransformationStage<>(predicatePlugin, negate != null && Boolean.parseBoolean(negate.toString()), transformationPlugin, plugins.safeLoaderSwapper())); + Plugin> predicatePlugin = metrics.wrap(predicate, connectorTaskId, predicateAlias); + transformations.add(new TransformationStage<>( + predicatePlugin, + predicateAlias, + plugins.pluginVersion(predicate.getClass().getName(), predicate.getClass().getClassLoader(), PluginType.PREDICATE), + negate != null && Boolean.parseBoolean(negate.toString()), + transformationPlugin, + alias, + plugins.pluginVersion(transformation.getClass().getName(), transformation.getClass().getClassLoader(), PluginType.TRANSFORMATION), + plugins.safeLoaderSwapper()) + ); } else { - transformations.add(new TransformationStage<>(transformationPlugin, plugins.safeLoaderSwapper())); + transformations.add(new TransformationStage<>( + transformationPlugin, + alias, + plugins.pluginVersion(transformation.getClass().getName(), transformation.getClass().getClassLoader(), PluginType.TRANSFORMATION), + plugins.safeLoaderSwapper()) + ); } } catch (Exception e) { throw new ConnectException(e); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java index 10ed188cdf883..d704a3374e296 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java @@ -19,8 +19,12 @@ public class ConnectorStatus extends AbstractStatus { - public ConnectorStatus(String connector, State state, String msg, String workerUrl, int generation) { - super(connector, state, workerUrl, generation, msg); + public ConnectorStatus(String connector, State state, String msg, String workerUrl, int generation, String version) { + super(connector, state, workerUrl, generation, msg, version); + } + + public ConnectorStatus(String connector, State state, String workerUrl, int generation, String version) { + super(connector, state, workerUrl, generation, null, version); } public ConnectorStatus(String connector, State state, String workerUrl, int generation) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java index fafbdbbc3f484..222de3ff49c0d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java @@ -104,11 +104,12 @@ public ExactlyOnceWorkerSourceTask(ConnectorTaskId id, Runnable preProducerCheck, Runnable postProducerCheck, Supplier>> errorReportersSupplier, + TaskPluginsMetadata pluginsMetadata, Function pluginLoaderSwapper) { super(id, task, statusListener, initialState, configState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, transformationChain, buildTransactionContext(sourceConfig), producer, admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, errorMetrics, - loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier, pluginLoaderSwapper); + loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier, pluginsMetadata, pluginLoaderSwapper); this.transactionOpen = false; this.committableRecords = new LinkedHashMap<>(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskPluginsMetadata.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskPluginsMetadata.java new file mode 100644 index 0000000000000..14e6cb9b7a717 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskPluginsMetadata.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.runtime.isolation.PluginType; +import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; + +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +public class TaskPluginsMetadata { + + private final String connectorClass; + private final String connectorVersion; + private final ConnectorType connectorType; + private final String taskClass; + private final String taskVersion; + private final String keyConverterClass; + private final String keyConverterVersion; + private final String valueConverterClass; + private final String valueConverterVersion; + private final String headerConverterClass; + private final String headerConverterVersion; + private final Set transformations; + private final Set predicates; + + public TaskPluginsMetadata( + Class connectorClass, + Task task, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + List transformationStageInfo, + Plugins plugins + ) { + + assert connectorClass != null; + assert task != null; + assert keyConverter != null; + assert valueConverter != null; + assert headerConverter != null; + assert transformationStageInfo != null; + + this.connectorClass = connectorClass.getName(); + this.connectorVersion = plugins.pluginVersion(connectorClass.getName(), connectorClass.getClassLoader(), PluginType.SINK, PluginType.SOURCE); + this.connectorType = ConnectorType.from(connectorClass); + this.taskClass = task.getClass().getName(); + this.taskVersion = task.version(); + this.keyConverterClass = keyConverter.getClass().getName(); + this.keyConverterVersion = plugins.pluginVersion(keyConverter.getClass().getName(), keyConverter.getClass().getClassLoader(), PluginType.CONVERTER); + this.valueConverterClass = valueConverter.getClass().getName(); + this.valueConverterVersion = plugins.pluginVersion(valueConverter.getClass().getName(), valueConverter.getClass().getClassLoader(), PluginType.CONVERTER); + this.headerConverterClass = headerConverter.getClass().getName(); + this.headerConverterVersion = plugins.pluginVersion(headerConverter.getClass().getName(), headerConverter.getClass().getClassLoader(), PluginType.HEADER_CONVERTER); + this.transformations = transformationStageInfo.stream().map(TransformationStage.StageInfo::transform).collect(Collectors.toSet()); + this.predicates = transformationStageInfo.stream().map(TransformationStage.StageInfo::predicate).filter(Objects::nonNull).collect(Collectors.toSet()); + } + + public String connectorClass() { + return connectorClass; + } + + public String connectorVersion() { + return connectorVersion; + } + + public ConnectorType connectorType() { + return connectorType; + } + + public String taskClass() { + return taskClass; + } + + public String taskVersion() { + return taskVersion; + } + + public String keyConverterClass() { + return keyConverterClass; + } + + public String keyConverterVersion() { + return keyConverterVersion; + } + + public String valueConverterClass() { + return valueConverterClass; + } + + public String valueConverterVersion() { + return valueConverterVersion; + } + + public String headerConverterClass() { + return headerConverterClass; + } + + public String headerConverterVersion() { + return headerConverterVersion; + } + + public Set transformations() { + return transformations; + } + + public Set predicates() { + return predicates; + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java index e35efcafe2e91..45150ef7ef5a3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java @@ -20,8 +20,8 @@ public class TaskStatus extends AbstractStatus { - public TaskStatus(ConnectorTaskId id, State state, String workerUrl, int generation, String trace) { - super(id, state, workerUrl, generation, trace); + public TaskStatus(ConnectorTaskId id, State state, String workerUrl, int generation, String trace, String version) { + super(id, state, workerUrl, generation, trace, version); } public TaskStatus(ConnectorTaskId id, State state, String workerUrl, int generation) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java index f6b92697c443b..68d52f2c1ca3d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Objects; import java.util.StringJoiner; +import java.util.stream.Collectors; /** * Represents a chain of {@link Transformation}s to be applied to a {@link ConnectRecord} serially. @@ -89,4 +90,8 @@ public String toString() { } return chain.toString(); } + + public List transformationChainInfo() { + return transformationStages.stream().map(TransformationStage::transformationStageInfo).collect(Collectors.toList()); + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java index a86c4878ab37e..56293e0363206 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java @@ -24,6 +24,7 @@ import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.predicates.Predicate; +import java.util.Objects; import java.util.function.Function; /** @@ -39,18 +40,40 @@ public class TransformationStage> implements AutoClos private final Plugin> predicatePlugin; private final Plugin> transformationPlugin; private final boolean negate; + private final String transformAlias; + private final String predicateAlias; + private final String transformVersion; + private final String predicateVersion; private final Function pluginLoaderSwapper; - TransformationStage(Plugin> transformationPlugin, Function pluginLoaderSwapper) { - this(null, false, transformationPlugin, pluginLoaderSwapper); + TransformationStage( + Plugin> transformationPlugin, + String transformAlias, + String transformVersion, + Function pluginLoaderSwapper + ) { + this(null, null, null, false, transformationPlugin, transformAlias, transformVersion, pluginLoaderSwapper); } - TransformationStage(Plugin> predicatePlugin, boolean negate, Plugin> transformationPlugin, Function pluginLoaderSwapper) { + TransformationStage( + Plugin> predicatePlugin, + String predicateAlias, + String predicateVersion, + boolean negate, + Plugin> transformationPlugin, + String transformAlias, + String transformVersion, + Function pluginLoaderSwapper + ) { this.predicatePlugin = predicatePlugin; this.negate = negate; this.transformationPlugin = transformationPlugin; this.pluginLoaderSwapper = pluginLoaderSwapper; + this.transformAlias = transformAlias; + this.predicateAlias = predicateAlias; + this.transformVersion = transformVersion; + this.predicateVersion = predicateVersion; } public Class> transformClass() { @@ -89,4 +112,32 @@ public String toString() { ", negate=" + negate + '}'; } + + public record AliasedPluginInfo(String alias, String className, String version) { + public AliasedPluginInfo { + Objects.requireNonNull(alias, "alias cannot be null"); + Objects.requireNonNull(className, "className cannot be null"); + } + } + + + public record StageInfo(AliasedPluginInfo transform, AliasedPluginInfo predicate) { + public StageInfo { + Objects.requireNonNull(transform, "transform cannot be null"); + } + } + + + public StageInfo transformationStageInfo() { + AliasedPluginInfo transformInfo = new AliasedPluginInfo( + transformAlias, + transformationPlugin.get().getClass().getName(), + transformVersion + ); + AliasedPluginInfo predicateInfo = predicatePlugin != null ? new AliasedPluginInfo( + predicateAlias, + predicatePlugin.get().getClass().getName(), predicateVersion + ) : null; + return new StageInfo(transformInfo, predicateInfo); + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index a435d281a7ccd..9d959be1aef75 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -346,7 +346,7 @@ public void startConnector( } workerConnector = new WorkerConnector( connName, connector, connConfig, ctx, metrics, connectorStatusListener, offsetReader, offsetStore, connectorLoader); - log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass()); + log.info("Instantiated connector {} with version {} of type {}", connName, workerConnector.connectorVersion(), connector.getClass()); workerConnector.transitionTo(initialState, onConnectorStateChange); } } catch (Throwable t) { @@ -562,6 +562,22 @@ public boolean isRunning(String connName) { return workerConnector != null && workerConnector.isRunning(); } + public String connectorVersion(String connName) { + WorkerConnector conn = connectors.get(connName); + if (conn == null) { + return null; + } + return conn.connectorVersion(); + } + + public String taskVersion(ConnectorTaskId taskId) { + WorkerTask task = tasks.get(taskId); + if (task == null) { + return null; + } + return task.taskVersion(); + } + /** * Start a sink task managed by this worker. * @@ -714,7 +730,7 @@ private boolean startTask( .withKeyConverterPlugin(metrics.wrap(keyConverter, id, true)) .withValueConverterPlugin(metrics.wrap(valueConverter, id, false)) .withHeaderConverterPlugin(metrics.wrap(headerConverter, id)) - .withClassloader(connectorLoader) + .withClassLoader(connectorLoader) .build(); workerTask.initialize(taskConfig); @@ -1818,11 +1834,12 @@ public TaskBuilder withHeaderConverterPlugin(Plugin heade return this; } - public TaskBuilder withClassloader(ClassLoader classLoader) { + public TaskBuilder withClassLoader(ClassLoader classLoader) { this.classLoader = classLoader; return this; } + public WorkerTask build() { Objects.requireNonNull(task, "Task cannot be null"); Objects.requireNonNull(connectorConfig, "Connector config used by task cannot be null"); @@ -1840,10 +1857,13 @@ public WorkerTask build() { TransformationChain transformationChain = new TransformationChain<>(connectorConfig.transformationStages(plugins, id, metrics), retryWithToleranceOperator); log.info("Initializing: {}", transformationChain); + TaskPluginsMetadata taskPluginsMetadata = new TaskPluginsMetadata( + connectorClass, task, keyConverterPlugin.get(), valueConverterPlugin.get(), headerConverterPlugin.get(), transformationChain.transformationChainInfo(), plugins); + return doBuild(task, id, configState, statusListener, initialState, - connectorConfig, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, classLoader, - retryWithToleranceOperator, transformationChain, - errorHandlingMetrics, connectorClass); + connectorConfig, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, classLoader, + retryWithToleranceOperator, transformationChain, + errorHandlingMetrics, connectorClass, taskPluginsMetadata); } abstract WorkerTask doBuild( @@ -1860,7 +1880,8 @@ abstract WorkerTask doBuild( RetryWithToleranceOperator retryWithToleranceOperator, TransformationChain transformationChain, ErrorHandlingMetrics errorHandlingMetrics, - Class connectorClass + Class connectorClass, + TaskPluginsMetadata pluginsMetadata ); } @@ -1888,7 +1909,8 @@ public WorkerTask, SinkRecord> doBuild( RetryWithToleranceOperator> retryWithToleranceOperator, TransformationChain, SinkRecord> transformationChain, ErrorHandlingMetrics errorHandlingMetrics, - Class connectorClass + Class connectorClass, + TaskPluginsMetadata taskPluginsMetadata ) { SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings()); WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator, @@ -1902,7 +1924,7 @@ public WorkerTask, SinkRecord> doBuild( return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics, headerConverterPlugin, transformationChain, consumer, classLoader, time, retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore(), - () -> sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass), plugins.safeLoaderSwapper()); + () -> sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass), taskPluginsMetadata, plugins.safeLoaderSwapper()); } } @@ -1929,7 +1951,8 @@ public WorkerTask doBuild( RetryWithToleranceOperator retryWithToleranceOperator, TransformationChain transformationChain, ErrorHandlingMetrics errorHandlingMetrics, - Class connectorClass + Class connectorClass, + TaskPluginsMetadata pluginsMetadata ) { SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig.originalsStrings(), config.topicCreationEnable()); @@ -1962,7 +1985,7 @@ public WorkerTask doBuild( return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics, headerConverterPlugin, transformationChain, producer, topicAdmin, topicCreationGroups, offsetReader, offsetWriter, offsetStore, config, configState, metrics, classLoader, time, - retryWithToleranceOperator, herder.statusBackingStore(), executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics), plugins.safeLoaderSwapper()); + retryWithToleranceOperator, herder.statusBackingStore(), executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics), pluginsMetadata, plugins.safeLoaderSwapper()); } } @@ -1996,7 +2019,8 @@ public WorkerTask doBuild( RetryWithToleranceOperator retryWithToleranceOperator, TransformationChain transformationChain, ErrorHandlingMetrics errorHandlingMetrics, - Class connectorClass + Class connectorClass, + TaskPluginsMetadata pluginsMetadata ) { SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig.originalsStrings(), config.topicCreationEnable()); @@ -2027,7 +2051,7 @@ public WorkerTask doBuild( headerConverterPlugin, transformationChain, producer, topicAdmin, topicCreationGroups, offsetReader, offsetWriter, offsetStore, config, configState, metrics, errorHandlingMetrics, classLoader, time, retryWithToleranceOperator, herder.statusBackingStore(), sourceConfig, executor, preProducerCheck, postProducerCheck, - () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics), plugins.safeLoaderSwapper()); + () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics), pluginsMetadata, plugins.safeLoaderSwapper()); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java index e2473dbbf71e3..3faf70f898c7c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java @@ -78,6 +78,7 @@ private enum State { private volatile Throwable externalFailure; private volatile boolean stopping; // indicates whether the Worker has asked the connector to stop private volatile boolean cancelled; // indicates whether the Worker has cancelled the connector (e.g. because of slow shutdown) + private final String version; private State state; private final CloseableOffsetStorageReader offsetStorageReader; @@ -97,8 +98,9 @@ public WorkerConnector(String connName, this.loader = loader; this.ctx = ctx; this.connector = connector; + this.version = connector.version(); this.state = State.INIT; - this.metrics = new ConnectorMetricsGroup(connectMetrics, AbstractStatus.State.UNASSIGNED, statusListener); + this.metrics = new ConnectorMetricsGroup(connectMetrics, AbstractStatus.State.UNASSIGNED, this.version, statusListener); this.statusListener = this.metrics; this.offsetStorageReader = offsetStorageReader; this.offsetStore = offsetStore; @@ -418,6 +420,10 @@ public final boolean isSourceConnector() { return ConnectUtils.isSourceConnector(connector); } + public String connectorVersion() { + return version; + } + protected final String connectorType() { if (isSinkConnector()) return "sink"; @@ -450,7 +456,12 @@ class ConnectorMetricsGroup implements ConnectorStatus.Listener, AutoCloseable { private final MetricGroup metricGroup; private final ConnectorStatus.Listener delegate; - public ConnectorMetricsGroup(ConnectMetrics connectMetrics, AbstractStatus.State initialState, ConnectorStatus.Listener delegate) { + public ConnectorMetricsGroup( + ConnectMetrics connectMetrics, + AbstractStatus.State initialState, + String connectorVersion, + ConnectorStatus.Listener delegate + ) { Objects.requireNonNull(connectMetrics); Objects.requireNonNull(connector); Objects.requireNonNull(initialState); @@ -465,7 +476,7 @@ public ConnectorMetricsGroup(ConnectMetrics connectMetrics, AbstractStatus.State metricGroup.addImmutableValueMetric(registry.connectorType, connectorType()); metricGroup.addImmutableValueMetric(registry.connectorClass, connector.getClass().getName()); - metricGroup.addImmutableValueMetric(registry.connectorVersion, connector.version()); + metricGroup.addImmutableValueMetric(registry.connectorVersion, connectorVersion); metricGroup.addValueMetric(registry.connectorStatus, now -> state.toString().toLowerCase(Locale.getDefault())); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 4b8256115ed5d..14b093c91230b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -105,6 +105,7 @@ class WorkerSinkTask extends WorkerTask, SinkReco private boolean committing; private boolean taskStopped; private final WorkerErrantRecordReporter workerErrantRecordReporter; + private final String version; public WorkerSinkTask(ConnectorTaskId id, SinkTask task, @@ -125,9 +126,10 @@ public WorkerSinkTask(ConnectorTaskId id, WorkerErrantRecordReporter workerErrantRecordReporter, StatusBackingStore statusBackingStore, Supplier>>> errorReportersSupplier, + TaskPluginsMetadata pluginsMetadata, Function pluginLoaderSwapper) { super(id, statusListener, initialState, loader, connectMetrics, errorMetrics, - retryWithToleranceOperator, transformationChain, errorReportersSupplier, time, statusBackingStore, pluginLoaderSwapper); + retryWithToleranceOperator, transformationChain, errorReportersSupplier, time, statusBackingStore, pluginsMetadata, pluginLoaderSwapper); this.workerConfig = workerConfig; this.task = task; @@ -153,6 +155,7 @@ public WorkerSinkTask(ConnectorTaskId id, this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG); this.taskStopped = false; this.workerErrantRecordReporter = workerErrantRecordReporter; + this.version = task.version(); } @Override @@ -227,6 +230,11 @@ public void execute() { } } + @Override + public String taskVersion() { + return version; + } + protected void iteration() { final long offsetCommitIntervalMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 0806e8877355b..3ccd530be3900 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -94,12 +94,13 @@ public WorkerSourceTask(ConnectorTaskId id, StatusBackingStore statusBackingStore, Executor closeExecutor, Supplier>> errorReportersSupplier, + TaskPluginsMetadata pluginsMetadata, Function pluginLoaderSwapper) { super(id, task, statusListener, initialState, configState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, transformationChain, null, producer, admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, errorMetrics, loader, - time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier, pluginLoaderSwapper); + time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier, pluginsMetadata, pluginLoaderSwapper); this.committableOffsets = CommittableOffsets.EMPTY; this.submittedRecords = new SubmittedRecords(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index fa28a4e7b0ea9..1661d710a8659 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -41,6 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; import java.util.Locale; import java.util.concurrent.CountDownLatch; @@ -94,9 +95,10 @@ public WorkerTask(ConnectorTaskId id, Supplier>> errorReportersSupplier, Time time, StatusBackingStore statusBackingStore, + TaskPluginsMetadata pluginsMetadata, Function pluginLoaderSwapper) { this.id = id; - this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener); + this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener, pluginsMetadata); this.errorMetrics = errorMetrics; this.statusListener = taskMetricsGroup; this.loader = loader; @@ -196,6 +198,8 @@ void doStart() { protected abstract void close(); + protected abstract String taskVersion(); + protected boolean isFailed() { return failed; } @@ -397,14 +401,25 @@ TaskMetricsGroup taskMetricsGroup() { static class TaskMetricsGroup implements TaskStatus.Listener { private final TaskStatus.Listener delegateListener; private final MetricGroup metricGroup; + private final List transformationGroups = new ArrayList<>(); + private final List predicateGroups = new ArrayList<>(); private final Time time; private final StateTracker taskStateTimer; private final Sensor commitTime; private final Sensor batchSize; private final Sensor commitAttempts; + private final ConnectMetrics connectMetrics; + private final ConnectorTaskId id; public TaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics, TaskStatus.Listener statusListener) { + this(id, connectMetrics, statusListener, null); + } + + public TaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics, TaskStatus.Listener statusListener, TaskPluginsMetadata pluginsMetadata) { delegateListener = statusListener; + this.connectMetrics = connectMetrics; + this.id = id; + time = connectMetrics.time(); taskStateTimer = new StateTracker(); ConnectMetricsRegistry registry = connectMetrics.registry(); @@ -434,6 +449,7 @@ public TaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics, TaskS Frequencies commitFrequencies = Frequencies.forBooleanValues(offsetCommitFailures, offsetCommitSucceeds); commitAttempts = metricGroup.sensor("offset-commit-completion"); commitAttempts.add(commitFrequencies); + addPluginInfoMetric(pluginsMetadata); } private void addRatioMetric(final State matchingState, MetricNameTemplate template) { @@ -442,8 +458,52 @@ private void addRatioMetric(final State matchingState, MetricNameTemplate templa taskStateTimer.durationRatio(matchingState, now)); } + private void addPluginInfoMetric(TaskPluginsMetadata pluginsMetadata) { + if (pluginsMetadata == null) { + return; + } + ConnectMetricsRegistry registry = connectMetrics.registry(); + metricGroup.addValueMetric(registry.taskConnectorClass, now -> pluginsMetadata.connectorClass()); + metricGroup.addValueMetric(registry.taskConnectorClassVersion, now -> pluginsMetadata.connectorVersion()); + metricGroup.addValueMetric(registry.taskConnectorType, now -> pluginsMetadata.connectorType()); + metricGroup.addValueMetric(registry.taskClass, now -> pluginsMetadata.taskClass()); + metricGroup.addValueMetric(registry.taskVersion, now -> pluginsMetadata.taskVersion()); + metricGroup.addValueMetric(registry.taskKeyConverterClass, now -> pluginsMetadata.keyConverterClass()); + metricGroup.addValueMetric(registry.taskKeyConverterVersion, now -> pluginsMetadata.keyConverterVersion()); + metricGroup.addValueMetric(registry.taskValueConverterClass, now -> pluginsMetadata.valueConverterClass()); + metricGroup.addValueMetric(registry.taskValueConverterVersion, now -> pluginsMetadata.valueConverterVersion()); + metricGroup.addValueMetric(registry.taskHeaderConverterClass, now -> pluginsMetadata.headerConverterClass()); + metricGroup.addValueMetric(registry.taskHeaderConverterVersion, now -> pluginsMetadata.headerConverterVersion()); + + if (!pluginsMetadata.transformations().isEmpty()) { + for (TransformationStage.AliasedPluginInfo entry : pluginsMetadata.transformations()) { + MetricGroup transformationGroup = connectMetrics.group(registry.transformsGroupName(), + registry.connectorTagName(), id.connector(), + registry.taskTagName(), Integer.toString(id.task()), + registry.transformsTagName(), entry.alias()); + transformationGroup.addValueMetric(registry.transformClass, now -> entry.className()); + transformationGroup.addValueMetric(registry.transformVersion, now -> entry.version()); + this.transformationGroups.add(transformationGroup); + } + } + + if (!pluginsMetadata.predicates().isEmpty()) { + for (TransformationStage.AliasedPluginInfo entry : pluginsMetadata.predicates()) { + MetricGroup predicateGroup = connectMetrics.group(registry.predicatesGroupName(), + registry.connectorTagName(), id.connector(), + registry.taskTagName(), Integer.toString(id.task()), + registry.predicateTagName(), entry.alias()); + predicateGroup.addValueMetric(registry.predicateClass, now -> entry.className()); + predicateGroup.addValueMetric(registry.predicateVersion, now -> entry.version()); + this.predicateGroups.add(predicateGroup); + } + } + } + void close() { metricGroup.close(); + transformationGroups.forEach(MetricGroup::close); + predicateGroups.forEach(MetricGroup::close); } void recordCommit(long duration, boolean success) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java index 82d9957b40db1..8d94d56204534 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java @@ -65,11 +65,13 @@ public abstract static class AbstractState { private final String state; private final String trace; private final String workerId; + private final String version; - public AbstractState(String state, String workerId, String trace) { + public AbstractState(String state, String workerId, String trace, String version) { this.state = state; this.workerId = workerId; this.trace = trace; + this.version = version; } @JsonProperty @@ -87,14 +89,22 @@ public String workerId() { public String trace() { return trace; } + + @JsonProperty + @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = PluginInfo.NoVersionFilter.class) + public String version() { + return version; + } } public static class ConnectorState extends AbstractState { + @JsonCreator public ConnectorState(@JsonProperty("state") String state, @JsonProperty("worker_id") String worker, - @JsonProperty("msg") String msg) { - super(state, worker, msg); + @JsonProperty("msg") String msg, + @JsonProperty("version") String version) { + super(state, worker, msg, version); } } @@ -105,8 +115,9 @@ public static class TaskState extends AbstractState implements Comparable status) { struct.put(TRACE_KEY_NAME, status.trace()); struct.put(WORKER_ID_KEY_NAME, status.workerId()); struct.put(GENERATION_KEY_NAME, status.generation()); + struct.put(VERSION_KEY_NAME, status.version()); return converter.fromConnectData(statusTopic, STATUS_SCHEMA_V0, struct); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java index 704a1f1ff40c0..f300aa7e05724 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java @@ -967,7 +967,7 @@ private void createWorkerTask(Converter keyConverter, Converter valueConverter, taskId, sourceTask, statusListener, TargetState.STARTED, configState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, transformationChain, workerTransactionContext, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore, config, metrics, errorHandlingMetrics, plugins.delegatingLoader(), Time.SYSTEM, retryWithToleranceOperator, - statusBackingStore, Runnable::run, errorReportersSupplier, TestPlugins.noOpLoaderSwap()) { + statusBackingStore, Runnable::run, errorReportersSupplier, null, TestPlugins.noOpLoaderSwap()) { @Override protected void prepareToInitializeTask() { } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index 70edfb0f59877..340585f0fd66e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -426,9 +426,9 @@ private void createSinkTask(TargetState initialState, RetryWithToleranceOperator oo.put("schemas.enable", "false"); converter.configure(oo); - Plugin> transformationPlugin = metrics.wrap(new FaultyPassthrough(), taskId, ""); + Plugin> transformationPlugin = metrics.wrap(new FaultyPassthrough(), taskId, "test"); TransformationChain, SinkRecord> sinkTransforms = - new TransformationChain<>(singletonList(new TransformationStage<>(transformationPlugin, TestPlugins.noOpLoaderSwap())), retryWithToleranceOperator); + new TransformationChain<>(singletonList(new TransformationStage<>(transformationPlugin, "test", null, TestPlugins.noOpLoaderSwap())), retryWithToleranceOperator); Plugin keyConverterPlugin = metrics.wrap(converter, taskId, true); Plugin valueConverterPlugin = metrics.wrap(converter, taskId, false); @@ -438,7 +438,7 @@ private void createSinkTask(TargetState initialState, RetryWithToleranceOperator ClusterConfigState.EMPTY, metrics, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics, headerConverterPlugin, sinkTransforms, consumer, pluginLoader, time, retryWithToleranceOperator, workerErrantRecordReporter, - statusBackingStore, () -> errorReporters, TestPlugins.noOpLoaderSwap()); + statusBackingStore, () -> errorReporters, null, TestPlugins.noOpLoaderSwap()); } private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator, List> errorReporters) { @@ -462,9 +462,9 @@ private Converter badConverter() { private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator, List> errorReporters, Converter converter) { - Plugin> transformationPlugin = metrics.wrap(new FaultyPassthrough(), taskId, ""); + Plugin> transformationPlugin = metrics.wrap(new FaultyPassthrough(), taskId, "test"); TransformationChain sourceTransforms = new TransformationChain<>(singletonList( - new TransformationStage<>(transformationPlugin, TestPlugins.noOpLoaderSwap())), retryWithToleranceOperator); + new TransformationStage<>(transformationPlugin, "test", null, TestPlugins.noOpLoaderSwap())), retryWithToleranceOperator); Plugin keyConverterPlugin = metrics.wrap(converter, taskId, true); Plugin valueConverterPlugin = metrics.wrap(converter, taskId, false); @@ -477,7 +477,7 @@ private void createSourceTask(TargetState initialState, RetryWithToleranceOperat offsetReader, offsetWriter, offsetStore, workerConfig, ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator, - statusBackingStore, Runnable::run, () -> errorReporters, TestPlugins.noOpLoaderSwap())); + statusBackingStore, Runnable::run, () -> errorReporters, null, TestPlugins.noOpLoaderSwap())); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java index a6375398d292b..f45ec27b46e2d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java @@ -198,6 +198,7 @@ public void setup(boolean enableTopicCreation) throws Exception { Thread.sleep(10); return result; }); + when(sourceTask.version()).thenReturn(null); } @AfterEach @@ -222,8 +223,8 @@ public void teardown() throws Exception { } verify(statusBackingStore, MockitoUtils.anyTimes()).getTopic(any(), any()); - verify(offsetStore, MockitoUtils.anyTimes()).primaryOffsetsTopic(); + verify(sourceTask).version(); verifyNoMoreInteractions(statusListener, producer, sourceTask, admin, offsetWriter, statusBackingStore, offsetStore, preProducerCheck, postProducerCheck); if (metrics != null) metrics.stop(); @@ -284,7 +285,7 @@ private void createWorkerTask(TargetState initialState, Converter keyConverter, workerTask = new ExactlyOnceWorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore, config, clusterConfigState, metrics, errorHandlingMetrics, plugins.delegatingLoader(), time, RetryWithToleranceOperatorTest.noneOperator(), statusBackingStore, - sourceConfig, Runnable::run, preProducerCheck, postProducerCheck, Collections::emptyList, TestPlugins.noOpLoaderSwap()); + sourceConfig, Runnable::run, preProducerCheck, postProducerCheck, Collections::emptyList, null, TestPlugins.noOpLoaderSwap()); } @ParameterizedTest diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java index 8d6f54ce2581b..d0f3f974c635f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java @@ -35,17 +35,15 @@ public class RestartPlanTest { @Test public void testRestartPlan() { ConnectorStateInfo.ConnectorState state = new ConnectorStateInfo.ConnectorState( - AbstractStatus.State.RESTARTING.name(), - "foo", - null + AbstractStatus.State.RESTARTING.name(), "foo", null, null ); List tasks = new ArrayList<>(); - tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(), "worker1", null)); - tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(), "worker1", null)); - tasks.add(new TaskState(3, AbstractStatus.State.RESTARTING.name(), "worker1", null)); - tasks.add(new TaskState(4, AbstractStatus.State.DESTROYED.name(), "worker1", null)); - tasks.add(new TaskState(5, AbstractStatus.State.RUNNING.name(), "worker1", null)); - tasks.add(new TaskState(6, AbstractStatus.State.RUNNING.name(), "worker1", null)); + tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(), "worker1", null, null)); + tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(), "worker1", null, null)); + tasks.add(new TaskState(3, AbstractStatus.State.RESTARTING.name(), "worker1", null, null)); + tasks.add(new TaskState(4, AbstractStatus.State.DESTROYED.name(), "worker1", null, null)); + tasks.add(new TaskState(5, AbstractStatus.State.RUNNING.name(), "worker1", null, null)); + tasks.add(new TaskState(6, AbstractStatus.State.RUNNING.name(), "worker1", null, null)); ConnectorStateInfo connectorStateInfo = new ConnectorStateInfo(CONNECTOR_NAME, state, tasks, ConnectorType.SOURCE); RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true); @@ -61,13 +59,11 @@ public void testRestartPlan() { @Test public void testNoRestartsPlan() { ConnectorStateInfo.ConnectorState state = new ConnectorStateInfo.ConnectorState( - AbstractStatus.State.RUNNING.name(), - "foo", - null + AbstractStatus.State.RUNNING.name(), "foo", null, null ); List tasks = new ArrayList<>(); - tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(), "worker1", null)); - tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(), "worker1", null)); + tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(), "worker1", null, null)); + tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(), "worker1", null, null)); ConnectorStateInfo connectorStateInfo = new ConnectorStateInfo(CONNECTOR_NAME, state, tasks, ConnectorType.SOURCE); RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true); RestartPlan restartPlan = new RestartPlan(restartRequest, connectorStateInfo); @@ -81,13 +77,11 @@ public void testNoRestartsPlan() { @Test public void testRestartsOnlyConnector() { ConnectorStateInfo.ConnectorState state = new ConnectorStateInfo.ConnectorState( - AbstractStatus.State.RESTARTING.name(), - "foo", - null + AbstractStatus.State.RESTARTING.name(), "foo", null, null ); List tasks = new ArrayList<>(); - tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(), "worker1", null)); - tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(), "worker1", null)); + tasks.add(new TaskState(1, AbstractStatus.State.RUNNING.name(), "worker1", null, null)); + tasks.add(new TaskState(2, AbstractStatus.State.PAUSED.name(), "worker1", null, null)); ConnectorStateInfo connectorStateInfo = new ConnectorStateInfo(CONNECTOR_NAME, state, tasks, ConnectorType.SOURCE); RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, false, true); RestartPlan restartPlan = new RestartPlan(restartRequest, connectorStateInfo); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java index e2791a63f7b26..959c5d2ac01f8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java @@ -61,8 +61,12 @@ private void applyAndAssert(boolean predicateResult, boolean negate, SourceRecor } TransformationStage stage = new TransformationStage<>( predicatePlugin, + "testPredicate", + null, negate, transformationPlugin, + "testTransformation", + null, TestPlugins.noOpLoaderSwap() ); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 2607ee8b03b3c..539960badec03 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -229,7 +229,7 @@ private void createTask(ConnectorTaskId taskId, SinkTask task, TaskStatus.Listen taskId, task, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, connectMetrics, keyConverterPlugin, valueConverterPlugin, errorMetrics, headerConverterPlugin, transformationChain, consumer, loader, time, - retryWithToleranceOperator, null, statusBackingStore, errorReportersSupplier, TestPlugins.noOpLoaderSwap()); + retryWithToleranceOperator, null, statusBackingStore, errorReportersSupplier, null, TestPlugins.noOpLoaderSwap()); } @AfterEach diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index 740211180984a..6c2c593c35b8a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -183,7 +183,7 @@ public void setup() { taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics, headerConverterPlugin, transformationChain, consumer, pluginLoader, time, RetryWithToleranceOperatorTest.noneOperator(), null, statusBackingStore, - Collections::emptyList, TestPlugins.noOpLoaderSwap()); + Collections::emptyList, null, TestPlugins.noOpLoaderSwap()); recordsReturned = 0; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 23fb3618f8191..d2fd923fdb93b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -255,7 +255,7 @@ private void createWorkerTask(TargetState initialState, Converter keyConverter, workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics, headerConverterPlugin, transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM, - retryWithToleranceOperator, statusBackingStore, Runnable::run, Collections::emptyList, TestPlugins.noOpLoaderSwap()); + retryWithToleranceOperator, statusBackingStore, Runnable::run, Collections::emptyList, null, TestPlugins.noOpLoaderSwap()); } @ParameterizedTest diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java index eae9c96998b2c..fa445454fd088 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java @@ -300,7 +300,7 @@ public TestWorkerTask(ConnectorTaskId id, Listener statusListener, TargetState i Supplier>> errorReporterSupplier, Time time, StatusBackingStore statusBackingStore) { super(id, statusListener, initialState, loader, connectMetrics, errorHandlingMetrics, - retryWithToleranceOperator, transformationChain, errorReporterSupplier, time, statusBackingStore, TestPlugins.noOpLoaderSwap()); + retryWithToleranceOperator, transformationChain, errorReporterSupplier, time, statusBackingStore, null, TestPlugins.noOpLoaderSwap()); } @Override @@ -318,6 +318,11 @@ protected void execute() { @Override protected void close() { } + + @Override + protected String taskVersion() { + return null; + } } protected void assertFailedMetric(TaskMetricsGroup metricsGroup) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 2f7af629f0665..e169a952dc9cb 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -63,6 +63,7 @@ import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage; +import org.apache.kafka.connect.runtime.isolation.TestPlugins; import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; @@ -805,11 +806,11 @@ public void testTaskStatusMetricsStatuses(boolean enableTopicCreation) { // Each time we check the task metrics, the worker will call the herder when(herder.taskStatus(TASK_ID)).thenReturn( - new ConnectorStateInfo.TaskState(0, "RUNNING", "worker", "msg"), - new ConnectorStateInfo.TaskState(0, "PAUSED", "worker", "msg"), - new ConnectorStateInfo.TaskState(0, "FAILED", "worker", "msg"), - new ConnectorStateInfo.TaskState(0, "DESTROYED", "worker", "msg"), - new ConnectorStateInfo.TaskState(0, "UNASSIGNED", "worker", "msg") + new ConnectorStateInfo.TaskState(0, "RUNNING", "worker", "msg", null), + new ConnectorStateInfo.TaskState(0, "PAUSED", "worker", "msg", null), + new ConnectorStateInfo.TaskState(0, "FAILED", "worker", "msg", null), + new ConnectorStateInfo.TaskState(0, "DESTROYED", "worker", "msg", null), + new ConnectorStateInfo.TaskState(0, "UNASSIGNED", "worker", "msg", null) ); worker = new Worker(WORKER_ID, @@ -3072,6 +3073,7 @@ private void mockVersionedTaskIsolation(Class connectorClas when(plugins.pluginLoader(connectorClass.getName(), range)).thenReturn(pluginLoader); when(plugins.connectorClass(connectorClass.getName(), range)).thenReturn((Class) connectorClass); when(plugins.newTask(taskClass)).thenReturn(task); + when(plugins.safeLoaderSwapper()).thenReturn(TestPlugins.noOpLoaderSwap()); when(task.version()).thenReturn(range == null ? "unknown" : range.toString()); } @@ -3087,7 +3089,7 @@ private void verifyVersionedTaskIsolation(Class connectorCl verify(plugins).pluginLoader(connectorClass.getName(), range); verify(plugins).connectorClass(connectorClass.getName(), range); verify(plugins).newTask(taskClass); - verify(task).version(); + verify(task, times(2)).version(); } private void mockExecutorRealSubmit(Class runnableClass) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java index c91fa1017ce88..4131aa01dd2b3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java @@ -195,8 +195,12 @@ public static > TransformationChain buildTra when(transformationPlugin.get()).thenReturn(transformation); TransformationStage stage = new TransformationStage<>( predicatePlugin, + "testPredicate", + null, false, transformationPlugin, + "testTransformation", + null, TestPlugins.noOpLoaderSwap()); TransformationChain realTransformationChainRetriableException = new TransformationChain(List.of(stage), toleranceOperator); TransformationChain transformationChainRetriableException = Mockito.spy(realTransformationChainRetriableException); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 73eaf93961e41..18589d66855a5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -1348,6 +1348,7 @@ public void testDoRestartConnectorAndTasksOnlyConnector() { return true; }).when(worker).startConnector(eq(CONN1), any(), any(), eq(herder), any(), stateCallback.capture()); doNothing().when(member).wakeup(); + when(worker.connectorVersion(any())).thenReturn(null); herder.doRestartConnectorAndTasks(restartRequest); @@ -1378,6 +1379,7 @@ public void testDoRestartConnectorAndTasksOnlyTasks() { doNothing().when(statusBackingStore).put(eq(status)); when(worker.startSourceTask(eq(TASK0), any(), any(), any(), eq(herder), any())).thenReturn(true); + when(worker.taskVersion(any())).thenReturn(null); herder.doRestartConnectorAndTasks(restartRequest); @@ -1419,6 +1421,8 @@ public void testDoRestartConnectorAndTasksBoth() { doNothing().when(statusBackingStore).put(eq(taskStatus)); when(worker.startSourceTask(eq(TASK0), any(), any(), any(), eq(herder), any())).thenReturn(true); + when(worker.taskVersion(any())).thenReturn(null); + when(worker.connectorVersion(any())).thenReturn(null); herder.doRestartConnectorAndTasks(restartRequest); @@ -1670,6 +1674,7 @@ public void testConnectorConfigUpdateFailedTransformation() { when(member.memberId()).thenReturn("member"); when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); when(worker.isSinkConnector(CONN1)).thenReturn(Boolean.TRUE); + when(worker.connectorVersion(CONN1)).thenReturn(null); WorkerConfigTransformer configTransformer = mock(WorkerConfigTransformer.class); // join diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index 9dfead77220f6..e38cd2da60da6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -688,9 +688,7 @@ public void testRestartConnectorAndTasksRebalanceNeeded() { @Test public void testRestartConnectorAndTasksRequestAccepted() throws Throwable { ConnectorStateInfo.ConnectorState state = new ConnectorStateInfo.ConnectorState( - AbstractStatus.State.RESTARTING.name(), - "foo", - null + AbstractStatus.State.RESTARTING.name(), "foo", null, null ); ConnectorStateInfo connectorStateInfo = new ConnectorStateInfo(CONNECTOR_NAME, state, Collections.emptyList(), ConnectorType.SOURCE); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 95d48beac3faf..1c285f138001f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -280,6 +280,7 @@ public void testDestroyConnector() throws Exception { expectConfigValidation(SourceSink.SOURCE, config); when(statusBackingStore.getAll(CONNECTOR_NAME)).thenReturn(Collections.emptyList()); + when(worker.connectorVersion(CONNECTOR_NAME)).thenReturn(null); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); Herder.Created connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); @@ -533,6 +534,7 @@ public void testRestartConnectorAndTasksOnlyConnector() throws Exception { expectConfigValidation(SourceSink.SINK, connectorConfig); doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME); + when(worker.connectorVersion(CONNECTOR_NAME)).thenReturn(null); mockStartConnector(connectorConfig, null, TargetState.STARTED, null); @@ -563,6 +565,7 @@ public void testRestartConnectorAndTasksOnlyTasks() throws Exception { doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest); expectAdd(SourceSink.SINK); + when(worker.taskVersion(any())).thenReturn(null); Map connectorConfig = connectorConfig(SourceSink.SINK); expectConfigValidation(SourceSink.SINK, connectorConfig); @@ -616,6 +619,8 @@ public void testRestartConnectorAndTasksBoth() throws Exception { ArgumentCaptor taskStatus = ArgumentCaptor.forClass(TaskStatus.class); expectAdd(SourceSink.SINK, false); + when(worker.connectorVersion(any())).thenReturn(null); + when(worker.taskVersion(any())).thenReturn(null); Map connectorConfig = connectorConfig(SourceSink.SINK); expectConfigValidation(SourceSink.SINK, connectorConfig); @@ -1124,6 +1129,7 @@ private void expectAdd(SourceSink sourceSink, } when(worker.isRunning(CONNECTOR_NAME)).thenReturn(true); + if (sourceSink == SourceSink.SOURCE) { when(worker.isTopicCreationEnabled()).thenReturn(true); } @@ -1152,6 +1158,7 @@ private void expectAdd(SourceSink sourceSink, transformer); if (sourceSink.equals(SourceSink.SOURCE) && mockStartSourceTask) { + when(worker.taskVersion(any())).thenReturn(null); when(worker.startSourceTask(new ConnectorTaskId(CONNECTOR_NAME, 0), configState, connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED)).thenReturn(true); }