|
15 | 15 | import org.junit.jupiter.api.BeforeEach; |
16 | 16 | import org.junit.jupiter.api.Test; |
17 | 17 |
|
| 18 | +import java.util.Collections; |
18 | 19 | import java.util.Map; |
| 20 | +import java.util.concurrent.ExecutionException; |
19 | 21 |
|
20 | 22 | import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; |
21 | 23 | import static com.github.tomakehurst.wiremock.client.WireMock.get; |
|
26 | 28 | import static org.hamcrest.Matchers.allOf; |
27 | 29 | import static org.hamcrest.Matchers.containsString; |
28 | 30 | import static org.hamcrest.Matchers.hasEntry; |
| 31 | +import static org.hamcrest.Matchers.instanceOf; |
29 | 32 | import static org.hamcrest.Matchers.is; |
30 | 33 | import static org.junit.jupiter.api.Assertions.assertThrows; |
31 | 34 |
|
@@ -86,6 +89,45 @@ public void testFeatureCompletionWithWellFormattedError() { |
86 | 89 | .whenComplete((res, ex) -> assertThat(ex.getMessage(), containsString("This is the error")))::join); |
87 | 90 | } |
88 | 91 |
|
| 92 | + @Test |
| 93 | + public void testStatusDoesNotTreatOkBodyAsErrorMessage() throws Exception { |
| 94 | + // Poll /status for 404 only; 200 with connector status is normal until removal. |
| 95 | + String statusBody = "{\"name\":\"my-connector\",\"connector\":{\"state\":\"RUNNING\"}," |
| 96 | + + "\"tasks\":[]}"; |
| 97 | + server.stubFor(get(urlPathMatching(".*/connectors/my-connector/status")) |
| 98 | + .willReturn(aResponse() |
| 99 | + .withStatus(200) |
| 100 | + .withBody(statusBody))); |
| 101 | + |
| 102 | + KafkaConnectApi api = new KafkaConnectApiImpl(); |
| 103 | + |
| 104 | + ExecutionException ex = assertThrows(ExecutionException.class, |
| 105 | + () -> api.status( |
| 106 | + Reconciliation.DUMMY_RECONCILIATION, "127.0.0.1", server.port(), |
| 107 | + "my-connector", Collections.singleton(404)) |
| 108 | + .get()); |
| 109 | + assertThat(ex.getCause(), instanceOf(ConnectRestException.class)); |
| 110 | + assertThat(ex.getCause().getMessage(), containsString("Unexpected HTTP status code 200")); |
| 111 | + } |
| 112 | + |
| 113 | + @Test |
| 114 | + public void testStatusWithValidErrorBody() throws Exception { |
| 115 | + server.stubFor(get(urlPathMatching(".*/connectors/c/status")) |
| 116 | + .willReturn(aResponse() |
| 117 | + .withStatus(500) |
| 118 | + .withBody("{\"message\": \"Connect cluster error\"}"))); |
| 119 | + |
| 120 | + KafkaConnectApi api = new KafkaConnectApiImpl(); |
| 121 | + |
| 122 | + ExecutionException ex = assertThrows(ExecutionException.class, |
| 123 | + () -> api.status( |
| 124 | + Reconciliation.DUMMY_RECONCILIATION, "127.0.0.1", server.port(), |
| 125 | + "c", Collections.singleton(200)) |
| 126 | + .get()); |
| 127 | + assertThat(ex.getCause(), instanceOf(ConnectRestException.class)); |
| 128 | + assertThat(ex.getCause().getMessage(), containsString("Connect cluster error")); |
| 129 | + } |
| 130 | + |
89 | 131 | @Test |
90 | 132 | public void testListConnectLoggersWithLevel() throws Exception { |
91 | 133 | server.stubFor(get(urlPathMatching(".*")) |
|
0 commit comments