Skip to content

Fix connector state transitions from FAILED to PAUSED or STOPPED#12750

Open
guancioul wants to merge 6 commits into
strimzi:mainfrom
guancioul:fix-failed-connector-state-transition
Open

Fix connector state transitions from FAILED to PAUSED or STOPPED#12750
guancioul wants to merge 6 commits into
strimzi:mainfrom
guancioul:fix-failed-connector-state-transition

Conversation

@guancioul
Copy link
Copy Markdown
Contributor

@guancioul guancioul commented May 18, 2026

Type of change

  • Bugfix

Description

Close #12542

When a Kafka connector enters the FAILED state, the operator was previously unable to transition it to PAUSED or STOPPED. The state-machine switch in AbstractConnectOperator only handled transitions from RUNNING and PAUSED states, so any request to pause or stop a FAILED connector was silently ignored (falling through to the default branch).

This PR adds a FAILED case to the connector state transition logic so that:

  • FAILED → PAUSED: calls the Kafka Connect REST API pause endpoint
  • FAILED → STOPPED: calls the Kafka Connect REST API stop endpoint

Changes:

  • AbstractConnectOperator.java: Added case "FAILED" to the state-machine switch to handle transitions from FAILED to PAUSED or STOPPED.
  • ConnectorMockTest.java: Added two new tests (testConnectorFailedToPaused, testConnectorFailedToStopped) to verify the correct REST API calls are made. Also refactored ConnectorStatus record to use String instead of ConnectorState enum so that the mock can simulate arbitrary states (e.g., FAILED) that are not part of the target-state enum.

Checklist

  • Write tests
  • Make sure all tests pass
  • Try your changes from Pod inside your Kubernetes and OpenShift cluster, not just locally
  • Reference relevant issue(s) and close them after merging
  • Update CHANGELOG.md

guancioul added 2 commits May 18, 2026 16:59
Signed-off-by: guancioul <guancioul@gmail.com>
Signed-off-by: guancioul <guancioul@gmail.com>
@codecov
Copy link
Copy Markdown

codecov Bot commented May 18, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 75.16%. Comparing base (151b17d) to head (f60684a).

Additional details and impacted files
@@            Coverage Diff            @@
##               main   #12750   +/-   ##
=========================================
  Coverage     75.16%   75.16%           
- Complexity     6458     6460    +2     
=========================================
  Files           346      346           
  Lines         24329    24337    +8     
  Branches       3120     3122    +2     
=========================================
+ Hits          18287    18294    +7     
  Misses         4808     4808           
- Partials       1234     1235    +1     
Files with missing lines Coverage Δ
...ter/operator/assembly/AbstractConnectOperator.java 87.82% <100.00%> (+0.52%) ⬆️

... and 1 file with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@guancioul guancioul marked this pull request as ready for review May 18, 2026 11:40
@scholzj scholzj requested a review from katheris May 18, 2026 12:30
@scholzj scholzj added this to the 1.1.0 milestone May 18, 2026
@scholzj
Copy link
Copy Markdown
Member

scholzj commented May 18, 2026

/gha run pipeline=regression

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 18, 2026

⏳ System test verification started: link

The following 6 job(s) will be executed:

  • regression-brokers-and-security-amd64 (cncf-ubuntu-8-32-x86)
  • regression-operators-amd64 (cncf-ubuntu-8-32-x86)
  • regression-operands-amd64 (cncf-ubuntu-8-32-x86)
  • regression-brokers-and-security-arm64 (cncf-ubuntu-8-32-arm)
  • regression-operators-arm64 (cncf-ubuntu-8-32-arm)
  • regression-operands-arm64 (cncf-ubuntu-8-32-arm)

Tests will start after successful build completion.

@github-actions
Copy link
Copy Markdown

❌ System test verification failed: link

@github-actions
Copy link
Copy Markdown

🎉 System test verification passed: link

Copy link
Copy Markdown
Member

@scholzj scholzj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. LGTM. But should be reviewed by Kate as the SME.

Comment thread CHANGELOG.md Outdated

## 1.1.0

* Fix connector state transitions from `FAILED` to `PAUSED` or `STOPPED`
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this would be more clear and more positive (as the CHANGELOG is really about new features and major changes rather than bugfixes).

Suggested change
* Fix connector state transitions from `FAILED` to `PAUSED` or `STOPPED`
* Allow failed `KafkaConnectors` to be paused or stopped

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comment, I will make the CHANGELOG clearer and more positive.

Signed-off-by: guancioul <guancioul@gmail.com>
Copy link
Copy Markdown
Member

@katheris katheris left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @guancioul the changes look generally good, however I did a check and it looks like you can't pause a failed connector. When I tried with a local Kafka Connect I got back:

Failed to transition connector to target state (org.apache.kafka.connect.runtime.distributed.DistributedHerder:758)
org.apache.kafka.connect.errors.ConnectException: WorkerConnector{id=echo-sink} Cannot transition connector to PAUSED since it has failed

So I think to save the operator making a request that won't succeed we should instead in that case log a warning and add that to the conditions, but then carry on so that the connector can be restarted, so we would have something like:

case "FAILED" -> {
    if (targetState == ConnectorState.PAUSED) {
        String message = "Connector " + connectorName + "cannot be paused since it is in failed state.";
        LOGGER.warnCr(reconciliation, message);
        future = Future.succeededFuture(conditions);
    } else if (targetState == ConnectorState.STOPPED) {
        LOGGER.infoCr(reconciliation, "Stopping connector {}", connectorName);
        future = Future.fromCompletionStage(apiClient.stop(reconciliation, host, port, connectorName));
    }
}

@ppatierno @scholzj what do you think?

@guancioul
Copy link
Copy Markdown
Contributor Author

Sorry I missed this, I should have verified the actual Kafka Connect behavior before implementing. Thanks for catching it!
I've separated the FAILED case from RUNNING in the switch statement. When the connector is in FAILED state and the target state is PAUSED, instead of calling pause() (which would fail), we now log a warning and add it to the conditions so the user is aware, while still allowing the connector to be restarted. Stopping a failed connector still works as expected.
I also verified the behavior locally with Kafka Connect 4.2.0:

FAILED → STOP

[2026-05-22 02:38:47,314] INFO [0:0:0:0:0:0:0:1] - - [21/May/2026:18:38:47 +0000] "PUT /connectors/test-connector/stop HTTP/1.1" 204 0 "" "curl/8.14.1" 30 (org.apache.kafka.connect.runtime.rest.RestServer:57)

FAILED → PAUSE

[2026-05-22 02:34:22,450] INFO [0:0:0:0:0:0:0:1] - - [21/May/2026:18:34:22 +0000] "PUT /connectors/test-connector/pause HTTP/1.1" 202 0 "" "curl/8.14.1" 19 (org.apache.kafka.connect.runtime.rest.RestServer:57)
[2026-05-22 02:34:22,452] INFO Setting connector test-connector state to PAUSED (org.apache.kafka.connect.runtime.Worker:1192)
[2026-05-22 02:34:22,453] ERROR [test-connector|worker] [Worker clientId=connect-127.0.1.1:8083, groupId=connect-cluster] Failed to transition connector to target state (org.apache.kafka.connect.runtime.distributed.DistributedHerder:756)
org.apache.kafka.connect.errors.ConnectException: WorkerConnector{id=test-connector} Cannot transition connector to PAUSED since it has failed

As confirmed, FAILED → PAUSE behaves exactly as @katheris described — the request returns 202 but actually fails in the background with Cannot transition connector to PAUSED since it has failed. And FAILED → STOP works as expected with a 204 response.

Signed-off-by: guancioul <guancioul@gmail.com>
@guancioul guancioul requested a review from katheris May 23, 2026 05:37
String message = "Connector " + connectorName + " cannot be paused since it is in failed state.";
LOGGER.warnCr(reconciliation, message);
conditions.add(StatusUtils.buildWarningCondition("UpdateConnectorState", message));
return Future.succeededFuture(conditions);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine with this approach @katheris

@scholzj
Copy link
Copy Markdown
Member

scholzj commented May 25, 2026

TBH, I'm not sure I follow the Kafka Connect logic here:

  • It returns 20x but fails internally? Is that a Kafka bug? Should we treat it as success? What happens when the Connect cluster is restarted?
  • Assuming we want to refelct this, why is warning the right solution rather then failure @katheris?

@guancioul
Copy link
Copy Markdown
Contributor Author

guancioul commented May 26, 2026

After checking the Kafka Connect source code, restart and stop both use completeOrForwardRequest with a FutureCallback, which blocks until the operation completes. pause, however, calls herder.pauseConnector() and immediately returns Response.accepted() without waiting for the actual state transition. This is why the API returns 202 even when the internal transition fails silently. This looks like an API design inconsistency within Kafka Connect itself rather than a Strimzi issue. @katheris do you have more context on this?

    @POST
    @Path("/{connector}/restart")
    @Operation(summary = "Restart the specified connector")
    public Response restartConnector(...) throws Throwable {
        ...
        FutureCallback<ConnectorStateInfo> cb = new FutureCallback<>();
        ...
        ConnectorStateInfo stateInfo = requestHandler.completeOrForwardRequest(cb, forwardingPath, "POST", headers, queryParameters, null, new TypeReference<>() {
        }, new IdentityTranslator<>(), forward);
        return Response.accepted().entity(stateInfo).build();
    }

    @PUT
    @Path("/{connector}/stop")
    @Operation(summary = "Stop the specified connector",
               description = "This operation is idempotent and has no effects if the connector is already stopped")
    public void stopConnector(...) throws Throwable {
        FutureCallback<Void> cb = new FutureCallback<>();
        herder.stopConnector(connector, cb);
        requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/stop", "PUT", headers, null, forward);
    }

ref: https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L272

    @PUT
    @Path("/{connector}/pause")
    @Operation(summary = "Pause the specified connector",
               description = "This operation is idempotent and has no effects if the connector is already paused")
    public Response pauseConnector(@PathParam("connector") String connector, final @Context HttpHeaders headers) {
        herder.pauseConnector(connector);
        return Response.accepted().build();
    }

ref: https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L296

Signed-off-by: Kuan-Hao (Michael) Lai <42037371+guancioul@users.noreply.github.com>
@snyk-io
Copy link
Copy Markdown

snyk-io Bot commented May 26, 2026

Snyk checks have passed. No issues have been found so far.

Status Scan Engine Critical High Medium Low Total (0)
Open Source Security 0 0 0 0 0 issues
Licenses 0 0 0 0 0 issues
Code Security 0 0 0 0 0 issues

💻 Catch issues earlier using the plugins for VS Code, JetBrains IDEs, Visual Studio, and Eclipse.

Signed-off-by: Kuan-Hao (Michael) Lai <42037371+guancioul@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug]: KafkaConnector with state stopped is not stopped when current connector state is FAILED

4 participants