Skip to content

Commit 8025763

Browse files
jiangpengchengTechnoboy-
authored andcommitted
[fix][fn] Add missing version field back to querystate API (#21966)
1 parent f9e0237 commit 8025763

File tree

9 files changed

+165
-13
lines changed

9 files changed

+165
-13
lines changed

pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java

+27
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,31 @@ public interface ByteBufferStateStore extends StateStore {
7373
*/
7474
CompletableFuture<ByteBuffer> getAsync(String key);
7575

76+
/**
77+
* Retrieve the StateValue for the key.
78+
*
79+
* @param key name of the key
80+
* @return the StateValue.
81+
*/
82+
default StateValue getStateValue(String key) {
83+
return getStateValueAsync(key).join();
84+
}
85+
86+
/**
87+
* Retrieve the StateValue for the key, but don't wait for the operation to be completed.
88+
*
89+
* @param key name of the key
90+
* @return the StateValue.
91+
*/
92+
default CompletableFuture<StateValue> getStateValueAsync(String key) {
93+
return getAsync(key).thenApply(val -> {
94+
if (val != null && val.remaining() >= 0) {
95+
byte[] data = new byte[val.remaining()];
96+
val.get(data);
97+
return new StateValue(data, null, null);
98+
} else {
99+
return null;
100+
}
101+
});
102+
}
76103
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.functions.api.state;
20+
21+
import lombok.AllArgsConstructor;
22+
import lombok.Getter;
23+
24+
@Getter
25+
@AllArgsConstructor
26+
public class StateValue {
27+
private final byte[] value;
28+
private final Long version;
29+
private final Boolean isNumber;
30+
}

pulsar-functions/api-java/src/main/resources/findbugsExclude.xml

+9
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@
2929
<Method name="getSchema"/>
3030
<Bug pattern="EI_EXPOSE_REP"/>
3131
</Match>
32+
<Match>
33+
<Class name="org.apache.pulsar.functions.api.state.StateValue"/>
34+
<Method name="getValue"/>
35+
<Bug pattern="EI_EXPOSE_REP"/>
36+
</Match>
3237
<Match>
3338
<Class name="org.apache.pulsar.functions.api.utils.FunctionRecord$FunctionRecordBuilder"/>
3439
<Method name="properties"/>
@@ -39,4 +44,8 @@
3944
<Method name="schema"/>
4045
<Bug pattern="EI_EXPOSE_REP2"/>
4146
</Match>
47+
<Match>
48+
<Class name="org.apache.pulsar.functions.api.state.StateValue"/>
49+
<Bug pattern="EI_EXPOSE_REP2"/>
50+
</Match>
4251
</FindBugsFilter>

pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java

+30
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.bookkeeper.api.kv.Table;
2929
import org.apache.bookkeeper.api.kv.options.Options;
3030
import org.apache.pulsar.functions.api.StateStoreContext;
31+
import org.apache.pulsar.functions.api.state.StateValue;
3132
import org.apache.pulsar.functions.utils.FunctionCommon;
3233

3334
/**
@@ -190,4 +191,33 @@ public ByteBuffer get(String key) {
190191
throw new RuntimeException("Failed to retrieve the state value for key '" + key + "'", e);
191192
}
192193
}
194+
195+
@Override
196+
public StateValue getStateValue(String key) {
197+
try {
198+
return result(getStateValueAsync(key));
199+
} catch (Exception e) {
200+
throw new RuntimeException("Failed to retrieve the state value for key '" + key + "'", e);
201+
}
202+
}
203+
204+
@Override
205+
public CompletableFuture<StateValue> getStateValueAsync(String key) {
206+
return table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))).thenApply(
207+
data -> {
208+
try {
209+
if (data != null && data.value() != null && data.value().readableBytes() >= 0) {
210+
byte[] result = new byte[data.value().readableBytes()];
211+
data.value().readBytes(result);
212+
return new StateValue(result, data.version(), data.isNumber());
213+
}
214+
return null;
215+
} finally {
216+
if (data != null) {
217+
ReferenceCountUtil.safeRelease(data);
218+
}
219+
}
220+
}
221+
);
222+
}
193223
}

pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java

+15
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Optional;
2323
import java.util.concurrent.CompletableFuture;
2424
import org.apache.pulsar.functions.api.StateStoreContext;
25+
import org.apache.pulsar.functions.api.state.StateValue;
2526
import org.apache.pulsar.metadata.api.MetadataCache;
2627
import org.apache.pulsar.metadata.api.MetadataStore;
2728

@@ -111,6 +112,20 @@ public CompletableFuture<ByteBuffer> getAsync(String key) {
111112
.orElse(null));
112113
}
113114

115+
@Override
116+
public StateValue getStateValue(String key) {
117+
return getStateValueAsync(key).join();
118+
}
119+
120+
@Override
121+
public CompletableFuture<StateValue> getStateValueAsync(String key) {
122+
return store.get(getPath(key))
123+
.thenApply(optRes ->
124+
optRes.map(x ->
125+
new StateValue(x.getValue(), x.getStat().getVersion(), null))
126+
.orElse(null));
127+
}
128+
114129
@Override
115130
public void incrCounter(String key, long amount) {
116131
incrCounterAsync(key, amount).join();

pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java

+26
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
import org.apache.bookkeeper.api.kv.Table;
3636
import org.apache.bookkeeper.api.kv.options.Options;
3737
import org.apache.bookkeeper.api.kv.result.DeleteResult;
38+
import org.apache.bookkeeper.api.kv.result.KeyValue;
3839
import org.apache.bookkeeper.common.concurrent.FutureUtils;
40+
import org.apache.pulsar.functions.api.state.StateValue;
3941
import org.testng.annotations.BeforeMethod;
4042
import org.testng.annotations.Test;
4143

@@ -114,6 +116,24 @@ public void testGetValue() throws Exception {
114116
);
115117
}
116118

119+
@Test
120+
public void testGetStateValue() throws Exception {
121+
KeyValue returnedKeyValue = mock(KeyValue.class);
122+
ByteBuf returnedValue = Unpooled.copiedBuffer("test-value", UTF_8);
123+
when(returnedKeyValue.value()).thenReturn(returnedValue);
124+
when(returnedKeyValue.version()).thenReturn(1l);
125+
when(returnedKeyValue.isNumber()).thenReturn(false);
126+
when(mockTable.getKv(any(ByteBuf.class)))
127+
.thenReturn(FutureUtils.value(returnedKeyValue));
128+
StateValue result = stateContext.getStateValue("test-key");
129+
assertEquals("test-value", new String(result.getValue(), UTF_8));
130+
assertEquals(1l, result.getVersion().longValue());
131+
assertEquals(false, result.getIsNumber().booleanValue());
132+
verify(mockTable, times(1)).getKv(
133+
eq(Unpooled.copiedBuffer("test-key", UTF_8))
134+
);
135+
}
136+
117137
@Test
118138
public void testGetAmount() throws Exception {
119139
when(mockTable.getNumber(any(ByteBuf.class)))
@@ -132,6 +152,12 @@ public void testGetKeyNotPresent() throws Exception {
132152
assertTrue(result != null);
133153
assertEquals(result.get(), null);
134154

155+
when(mockTable.getKv(any(ByteBuf.class)))
156+
.thenReturn(FutureUtils.value(null));
157+
CompletableFuture<StateValue> stateValueResult = stateContext.getStateValueAsync("test-key");
158+
assertTrue(stateValueResult != null);
159+
assertEquals(stateValueResult.get(), null);
160+
135161
}
136162

137163
}

pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java

+5
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.testng.Assert.assertTrue;
2525
import java.nio.ByteBuffer;
2626
import java.util.concurrent.CompletableFuture;
27+
import org.apache.pulsar.functions.api.state.StateValue;
2728
import org.apache.pulsar.metadata.api.MetadataCache;
2829
import org.apache.pulsar.metadata.api.MetadataStore;
2930
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -101,6 +102,10 @@ public void testGetKeyNotPresent() throws Exception {
101102
CompletableFuture<ByteBuffer> result = stateContext.getAsync("test-key");
102103
assertTrue(result != null);
103104
assertEquals(result.get(), null);
105+
106+
CompletableFuture<StateValue> stateValueResult = stateContext.getStateValueAsync("test-key");
107+
assertTrue(stateValueResult != null);
108+
assertEquals(stateValueResult.get(), null);
104109
}
105110

106111
}

pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java

+16-9
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
7575
import org.apache.pulsar.common.util.Codec;
7676
import org.apache.pulsar.common.util.RestException;
77+
import org.apache.pulsar.functions.api.state.StateValue;
7778
import org.apache.pulsar.functions.instance.InstanceUtils;
7879
import org.apache.pulsar.functions.instance.state.DefaultStateStore;
7980
import org.apache.pulsar.functions.proto.Function;
@@ -1151,23 +1152,29 @@ public FunctionState getFunctionState(final String tenant,
11511152

11521153
try {
11531154
DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName);
1154-
ByteBuffer buf = store.get(key);
1155-
if (buf == null) {
1155+
StateValue value = store.getStateValue(key);
1156+
if (value == null) {
1157+
throw new RestException(Status.NOT_FOUND, "key '" + key + "' doesn't exist.");
1158+
}
1159+
byte[] data = value.getValue();
1160+
if (data == null) {
11561161
throw new RestException(Status.NOT_FOUND, "key '" + key + "' doesn't exist.");
11571162
}
11581163

1159-
// try to parse the state as a long
1160-
// but even if it can be parsed as a long, this number may not be the actual state,
1161-
// so we will always return a `stringValue` or `bytesValue` with the number value
1164+
ByteBuffer buf = ByteBuffer.wrap(data);
1165+
11621166
Long number = null;
11631167
if (buf.remaining() == Long.BYTES) {
11641168
number = buf.getLong();
11651169
}
1170+
if (Boolean.TRUE.equals(value.getIsNumber())) {
1171+
return new FunctionState(key, null, null, number, value.getVersion());
1172+
}
11661173

1167-
if (Utf8.isWellFormed(buf.array())) {
1168-
return new FunctionState(key, new String(buf.array(), UTF_8), null, number, null);
1174+
if (Utf8.isWellFormed(data)) {
1175+
return new FunctionState(key, new String(data, UTF_8), null, number, value.getVersion());
11691176
} else {
1170-
return new FunctionState(key, null, buf.array(), number, null);
1177+
return new FunctionState(key, null, data, number, value.getVersion());
11711178
}
11721179
} catch (RestException e) {
11731180
throw e;
@@ -1215,7 +1222,7 @@ public void putFunctionState(final String tenant,
12151222
try {
12161223
DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName);
12171224
ByteBuffer data;
1218-
if (StringUtils.isNotEmpty(state.getStringValue())) {
1225+
if (state.getStringValue() != null) {
12191226
data = ByteBuffer.wrap(state.getStringValue().getBytes(UTF_8));
12201227
} else if (state.getByteValue() != null) {
12211228
data = ByteBuffer.wrap(state.getByteValue());

tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,10 @@ private void doTestPythonWordCountFunction(String functionName) throws Exception
9797
getFunctionStatus(functionName, numMessages);
9898

9999
// get state
100-
queryState(functionName, "hello", numMessages);
101-
queryState(functionName, "test", numMessages);
100+
queryState(functionName, "hello", numMessages, numMessages - 1);
101+
queryState(functionName, "test", numMessages, numMessages - 1);
102102
for (int i = 0; i < numMessages; i++) {
103-
queryState(functionName, "message-" + i, 1);
103+
queryState(functionName, "message-" + i, 1, 0);
104104
}
105105

106106
// test put state
@@ -468,7 +468,7 @@ private void getFunctionStatus(String functionName, int numMessages) throws Exce
468468
assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\" : " + numMessages));
469469
}
470470

471-
private void queryState(String functionName, String key, int amount)
471+
private void queryState(String functionName, String key, int amount, long version)
472472
throws Exception {
473473
ContainerExecResult result = container.execCmd(
474474
PulsarCluster.ADMIN_SCRIPT,
@@ -480,6 +480,9 @@ private void queryState(String functionName, String key, int amount)
480480
"--key", key
481481
);
482482
assertTrue(result.getStdout().contains("\"numberValue\": " + amount));
483+
assertTrue(result.getStdout().contains("\"version\": " + version));
484+
assertFalse(result.getStdout().contains("stringValue"));
485+
assertFalse(result.getStdout().contains("byteValue"));
483486
}
484487

485488
private void putAndQueryState(String functionName, String key, String state, String expect)

0 commit comments

Comments
 (0)