Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1056,20 +1056,59 @@ private void processTaskConfigRecord(ConnectorTaskId taskId, SchemaAndValue valu
private void processTasksCommitRecord(String connectorName, SchemaAndValue value) {
List<ConnectorTaskId> updatedTasks = new ArrayList<>();
synchronized (lock) {
// KAFKA-17676 fix: Check if we have deferred task updates or if connector was previously applied
// This helps distinguish between:
// 1. Connector was truly deleted (no deferred updates, not previously applied)
// 2. Connector config was compacted but connector is still active (has deferred updates or was applied)
Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
boolean hasDeferredUpdates = deferred != null && !deferred.isEmpty();
boolean wasPreviouslyApplied = appliedConnectorConfigs.containsKey(connectorName);
AppliedConnectorConfig previousAppliedConfig = appliedConnectorConfigs.get(connectorName);

// Edge case: connector was deleted before these task configs were published,
// but compaction took place and both the original connector config and the
// tombstone message for it have been removed from the config topic
// We should ignore these task configs
Map<String, String> appliedConnectorConfig = connectorConfigs.get(connectorName);
if (appliedConnectorConfig == null) {
processConnectorRemoval(connectorName);
log.debug(
"Ignoring task configs for connector {}; it appears that the connector was deleted previously "
+ "and that log compaction has since removed any trace of its previous configurations "
+ "from the config topic",
connectorName
);
return;
// KAFKA-17676: Only ignore if connector was truly deleted (no deferred updates and not previously applied)
if (!hasDeferredUpdates && !wasPreviouslyApplied) {
processConnectorRemoval(connectorName);
log.debug(
"Ignoring task configs for connector {}; it appears that the connector was deleted previously "
+ "and that log compaction has since removed any trace of its previous configurations "
+ "from the config topic",
connectorName
);
return;
} else {
// KAFKA-17676: Connector config was compacted but connector is still active
// Use the previously applied config if available, otherwise mark as inconsistent
if (previousAppliedConfig != null) {
appliedConnectorConfig = previousAppliedConfig.transformedConfig(configTransformer);
if (appliedConnectorConfig != null) {
connectorConfigs.put(connectorName, appliedConnectorConfig);
log.warn(
"Connector config for {} was compacted but using previously applied config. "
+ "Connector may need to be reconfigured if config has changed.",
connectorName
);
}
}
if (appliedConnectorConfig == null) {
log.warn(
"Connector config for {} was compacted but connector appears to still be active "
+ "(has deferred updates: {}, was previously applied: {}). "
+ "Marking as inconsistent - connector will need to be reconfigured.",
connectorName, hasDeferredUpdates, wasPreviouslyApplied
);
inconsistent.add(connectorName);
if (deferred != null) {
deferred.clear();
}
return;
}
}
}

// Apply any outstanding deferred task updates for the given connector. Note that just because we
Expand Down Expand Up @@ -1097,7 +1136,7 @@ private void processTasksCommitRecord(String connectorName, SchemaAndValue value
log.error("Ignoring connector tasks configuration commit for connector '{}' because it is in the wrong format: {}", connectorName, className(value.value()));
return;
}
Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
// Note: 'deferred' already retrieved at top of method for KAFKA-17676 fix

@SuppressWarnings("unchecked")
int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
Expand All @@ -1124,14 +1163,16 @@ private void processTasksCommitRecord(String connectorName, SchemaAndValue value
connectorName,
new AppliedConnectorConfig(appliedConnectorConfig)
);

// Only update task count when we actually apply configs
// This prevents advertising tasks we don't have configs for
connectorTaskCounts.put(connectorName, newTaskCount);
}
// Always clear the deferred entries, even if we didn't apply them. If they represented an inconsistent
// update, then we need to see a completely fresh set of configs after this commit message, so we don't
// want any of these outdated configs
if (deferred != null)
deferred.clear();

connectorTaskCounts.put(connectorName, newTaskCount);
}

// If task configs appear after the latest task count record, the connector needs a new round of zombie fencing
Expand Down