Skip to content

Commit 0f2523f

Browse files
[fix][fn] Throw 404 RestException when state key not found (#21921)
1 parent 22838ea commit 0f2523f

File tree

2 files changed

+20
-0
lines changed
  • pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api
  • tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions

2 files changed

+20
-0
lines changed

Diff for: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java

+2
Original file line numberDiff line numberDiff line change
@@ -1169,6 +1169,8 @@ public FunctionState getFunctionState(final String tenant,
11691169
} else {
11701170
return new FunctionState(key, null, buf.array(), number, null);
11711171
}
1172+
} catch (RestException e) {
1173+
throw e;
11721174
} catch (Throwable e) {
11731175
log.error("Error while getFunctionState request @ /{}/{}/{}/{}",
11741176
tenant, namespace, functionName, key, e);

Diff for: tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java

+18
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,15 @@
2525
import static org.testng.Assert.assertNotEquals;
2626
import static org.testng.Assert.assertNull;
2727
import static org.testng.Assert.assertTrue;
28+
import static org.testng.Assert.expectThrows;
2829
import static org.testng.Assert.fail;
2930
import com.google.common.base.Utf8;
3031
import java.util.Base64;
3132
import lombok.Cleanup;
3233
import lombok.extern.slf4j.Slf4j;
3334
import org.apache.commons.lang3.StringUtils;
3435
import org.apache.pulsar.client.admin.PulsarAdmin;
36+
import org.apache.pulsar.client.admin.PulsarAdminException;
3537
import org.apache.pulsar.client.api.Consumer;
3638
import org.apache.pulsar.client.api.Message;
3739
import org.apache.pulsar.client.api.Producer;
@@ -144,6 +146,14 @@ public void testSourceState() throws Exception {
144146
assertEquals(functionState.getStringValue(), "val1");
145147
}
146148

149+
// query a non-exist key should get a 404 error
150+
{
151+
PulsarAdminException e = expectThrows(PulsarAdminException.class, () -> {
152+
admin.functions().getFunctionState("public", "default", sourceName, "non-exist");
153+
});
154+
assertEquals(e.getStatusCode(), 404);
155+
}
156+
147157
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
148158
FunctionState functionState = admin.functions().getFunctionState("public", "default", sourceName, "now");
149159
assertTrue(functionState.getStringValue().matches("val1-.*"));
@@ -186,6 +196,14 @@ public void testSinkState() throws Exception {
186196
assertEquals(functionState.getStringValue(), "val1");
187197
}
188198

199+
// query a non-exist key should get a 404 error
200+
{
201+
PulsarAdminException e = expectThrows(PulsarAdminException.class, () -> {
202+
admin.functions().getFunctionState("public", "default", sinkName, "non-exist");
203+
});
204+
assertEquals(e.getStatusCode(), 404);
205+
}
206+
189207
for (int i = 0; i < numMessages; i++) {
190208
producer.send("foo");
191209
}

0 commit comments

Comments
 (0)