Skip to content

Commit 795fcbe

Browse files
authored
[FLINK-37554][state/forst] Add UT for state compatibility between ForStKeyeStateBackend and ForStSyncKeyeStateBackend (#26440)
1 parent 03bea48 commit 795fcbe

File tree

3 files changed

+259
-1
lines changed

3 files changed

+259
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.state.forst;
20+
21+
import org.apache.flink.api.common.operators.MailboxExecutor;
22+
import org.apache.flink.api.common.state.v2.MapState;
23+
import org.apache.flink.api.common.state.v2.MapStateDescriptor;
24+
import org.apache.flink.api.common.typeutils.base.IntSerializer;
25+
import org.apache.flink.api.common.typeutils.base.StringSerializer;
26+
import org.apache.flink.configuration.Configuration;
27+
import org.apache.flink.core.fs.FileSystem;
28+
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
29+
import org.apache.flink.runtime.asyncprocessing.RecordContext;
30+
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
31+
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
32+
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
33+
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
34+
import org.apache.flink.runtime.state.KeyedStateHandle;
35+
import org.apache.flink.runtime.state.SnapshotResult;
36+
import org.apache.flink.runtime.state.internal.InternalKvState;
37+
import org.apache.flink.runtime.state.v2.StateDescriptorUtils;
38+
import org.apache.flink.state.forst.sync.ForStSyncKeyedStateBackend;
39+
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
40+
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
41+
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
42+
import org.apache.flink.util.IOUtils;
43+
44+
import org.junit.jupiter.api.BeforeEach;
45+
import org.junit.jupiter.api.Test;
46+
import org.junit.jupiter.api.io.TempDir;
47+
48+
import java.io.File;
49+
import java.io.IOException;
50+
import java.util.Collection;
51+
import java.util.Collections;
52+
import java.util.concurrent.RunnableFuture;
53+
54+
import static org.apache.flink.state.forst.ForStStateTestBase.getMockEnvironment;
55+
import static org.apache.flink.state.forst.ForStTestUtils.createKeyedStateBackend;
56+
import static org.apache.flink.state.forst.ForStTestUtils.createSyncKeyedStateBackend;
57+
import static org.assertj.core.api.Assertions.assertThat;
58+
import static org.assertj.core.api.Assertions.fail;
59+
60+
/** Compatibility test for {@link ForStKeyedStateBackend} and {@link ForStSyncKeyedStateBackend}. */
61+
class ForStAsyncAndSyncCompatibilityTest {
62+
protected ForStStateBackend forStStateBackend;
63+
protected AsyncExecutionController<String> aec;
64+
protected MailboxExecutor mailboxExecutor;
65+
66+
protected RecordContext<String> context;
67+
68+
protected MockEnvironment env;
69+
70+
@BeforeEach
71+
public void setup(@TempDir File temporaryFolder) throws IOException {
72+
FileSystem.initialize(new Configuration(), null);
73+
Configuration configuration = new Configuration();
74+
configuration.set(ForStOptions.PRIMARY_DIRECTORY, temporaryFolder.toURI().toString());
75+
forStStateBackend = new ForStStateBackend().configure(configuration, null);
76+
77+
env = getMockEnvironment(temporaryFolder);
78+
79+
mailboxExecutor =
80+
new MailboxExecutorImpl(
81+
new TaskMailboxImpl(), 0, StreamTaskActionExecutor.IMMEDIATE);
82+
}
83+
84+
@Test
85+
void testForStTransFromAsyncToSync() throws Exception {
86+
ForStKeyedStateBackend<String> keyedBackend =
87+
setUpAsyncKeyedStateBackend(Collections.emptyList());
88+
MapStateDescriptor<Integer, String> descriptor =
89+
new MapStateDescriptor<>(
90+
"testState", IntSerializer.INSTANCE, StringSerializer.INSTANCE);
91+
92+
MapState<Integer, String> asyncMapState =
93+
keyedBackend.createState(1, IntSerializer.INSTANCE, descriptor);
94+
95+
context = aec.buildContext("testRecord", "testKey");
96+
context.retain();
97+
aec.setCurrentContext(context);
98+
asyncMapState.asyncPut(1, "1");
99+
context.release();
100+
aec.drainInflightRecords(0);
101+
102+
RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot =
103+
keyedBackend.snapshot(
104+
1L,
105+
System.currentTimeMillis(),
106+
env.getCheckpointStorageAccess()
107+
.resolveCheckpointStorageLocation(
108+
1L, CheckpointStorageLocationReference.getDefault()),
109+
CheckpointOptions.forCheckpointWithDefaultLocation());
110+
111+
if (!snapshot.isDone()) {
112+
snapshot.run();
113+
}
114+
SnapshotResult<KeyedStateHandle> snapshotResult = snapshot.get();
115+
KeyedStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
116+
IOUtils.closeQuietly(keyedBackend);
117+
ForStSyncKeyedStateBackend<String> syncKeyedStateBackend =
118+
createSyncKeyedStateBackend(
119+
forStStateBackend,
120+
env,
121+
StringSerializer.INSTANCE,
122+
Collections.singletonList(stateHandle));
123+
124+
try {
125+
org.apache.flink.api.common.state.MapState<Integer, String> syncMapState =
126+
syncKeyedStateBackend.getOrCreateKeyedState(
127+
IntSerializer.INSTANCE,
128+
StateDescriptorUtils.transformFromV2ToV1(descriptor));
129+
fail();
130+
131+
syncKeyedStateBackend.setCurrentKey("testKey");
132+
((InternalKvState) syncKeyedStateBackend).setCurrentNamespace(1);
133+
assertThat(syncMapState.get(1)).isEqualTo("1");
134+
} catch (Exception e) {
135+
// Currently, ForStStateBackend does not support switching from Async to Sync, so this
136+
// exception will be caught here
137+
assertThat(e).isInstanceOf(ClassCastException.class);
138+
assertThat(e.getMessage())
139+
.contains(
140+
"org.apache.flink.runtime.state.v2.RegisteredKeyAndUserKeyValueStateBackendMetaInfo cannot be cast to class org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo");
141+
142+
} finally {
143+
IOUtils.closeQuietly(syncKeyedStateBackend);
144+
}
145+
}
146+
147+
@Test
148+
void testForStTransFromSyncToAsync() throws Exception {
149+
ForStSyncKeyedStateBackend<String> keyedBackend =
150+
createSyncKeyedStateBackend(
151+
forStStateBackend, env, StringSerializer.INSTANCE, Collections.emptyList());
152+
org.apache.flink.api.common.state.MapStateDescriptor<Integer, String> descriptor =
153+
new org.apache.flink.api.common.state.MapStateDescriptor<>(
154+
"testState", IntSerializer.INSTANCE, StringSerializer.INSTANCE);
155+
org.apache.flink.api.common.state.MapState<Integer, String> mapState =
156+
keyedBackend.getOrCreateKeyedState(IntSerializer.INSTANCE, descriptor);
157+
keyedBackend.setCurrentKey("testKey");
158+
((InternalKvState) mapState).setCurrentNamespace(1);
159+
mapState.put(1, "1");
160+
161+
RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot =
162+
keyedBackend.snapshot(
163+
1L,
164+
System.currentTimeMillis(),
165+
env.getCheckpointStorageAccess()
166+
.resolveCheckpointStorageLocation(
167+
1L, CheckpointStorageLocationReference.getDefault()),
168+
CheckpointOptions.forCheckpointWithDefaultLocation());
169+
170+
if (!snapshot.isDone()) {
171+
snapshot.run();
172+
}
173+
SnapshotResult<KeyedStateHandle> snapshotResult = snapshot.get();
174+
KeyedStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
175+
IOUtils.closeQuietly(keyedBackend);
176+
177+
ForStKeyedStateBackend<String> asyncKeyedStateBackend =
178+
setUpAsyncKeyedStateBackend(Collections.singletonList(stateHandle));
179+
180+
MapStateDescriptor<Integer, String> newStateDescriptor =
181+
new MapStateDescriptor<>(
182+
"testState", IntSerializer.INSTANCE, StringSerializer.INSTANCE);
183+
try {
184+
MapState<Integer, String> asyncMapState =
185+
asyncKeyedStateBackend.createState(
186+
1, IntSerializer.INSTANCE, newStateDescriptor);
187+
fail();
188+
189+
context = aec.buildContext("testRecord", "testKey");
190+
context.retain();
191+
aec.setCurrentContext(context);
192+
asyncMapState
193+
.asyncGet(1)
194+
.thenAccept(
195+
value -> {
196+
assertThat(value).isEqualTo("1");
197+
});
198+
context.release();
199+
aec.drainInflightRecords(0);
200+
} catch (Exception e) {
201+
// Currently, ForStStateBackend does not support switching from Sync to Async, so this
202+
// exception will be caught here
203+
assertThat(e).isInstanceOf(ClassCastException.class);
204+
assertThat(e.getMessage())
205+
.contains(
206+
"org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo cannot be cast to class org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo");
207+
} finally {
208+
IOUtils.closeQuietly(asyncKeyedStateBackend);
209+
}
210+
}
211+
212+
private ForStKeyedStateBackend setUpAsyncKeyedStateBackend(
213+
Collection<KeyedStateHandle> stateHandles) throws IOException {
214+
ForStKeyedStateBackend<String> keyedStateBackend =
215+
createKeyedStateBackend(
216+
forStStateBackend, env, StringSerializer.INSTANCE, stateHandles);
217+
aec =
218+
new AsyncExecutionController<>(
219+
mailboxExecutor,
220+
(a, b) -> {},
221+
keyedStateBackend.createStateExecutor(),
222+
new DeclarationManager(),
223+
1,
224+
100,
225+
0,
226+
1,
227+
null,
228+
null);
229+
keyedStateBackend.setup(aec);
230+
return keyedStateBackend;
231+
}
232+
}

Diff for: flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateMigrationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
import static org.assertj.core.api.Assertions.assertThat;
5252
import static org.junit.Assert.fail;
5353

54-
/** Tests for {@link ForStListState}. */
54+
/** Tests for the State Migration of {@link ForStKeyedStateBackend}. */
5555
public class ForStStateMigrationTest extends ForStStateTestBase {
5656

5757
@Test

Diff for: flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStTestUtils.java

+26
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
2727
import org.apache.flink.runtime.state.KeyedStateHandle;
2828
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
29+
import org.apache.flink.state.forst.sync.ForStSyncKeyedStateBackend;
2930

3031
import java.io.IOException;
3132
import java.util.Collection;
@@ -58,6 +59,31 @@ public static <K> ForStKeyedStateBackend<K> createKeyedStateBackend(
5859
1.0));
5960
}
6061

62+
public static <K> ForStSyncKeyedStateBackend<K> createSyncKeyedStateBackend(
63+
ForStStateBackend forStStateBackend,
64+
Environment env,
65+
TypeSerializer<K> keySerializer,
66+
Collection<KeyedStateHandle> stateHandles)
67+
throws IOException {
68+
69+
return (ForStSyncKeyedStateBackend<K>)
70+
forStStateBackend.createKeyedStateBackend(
71+
new KeyedStateBackendParametersImpl<>(
72+
env,
73+
env.getJobID(),
74+
"test_op",
75+
keySerializer,
76+
1,
77+
new KeyGroupRange(0, 0),
78+
env.getTaskKvStateRegistry(),
79+
TtlTimeProvider.DEFAULT,
80+
new UnregisteredMetricsGroup(),
81+
(name, value) -> {},
82+
stateHandles,
83+
new CloseableRegistry(),
84+
1.0));
85+
}
86+
6187
public static <K> ForStKeyedStateBackend<K> createKeyedStateBackend(
6288
ForStStateBackend forStStateBackend, Environment env, TypeSerializer<K> keySerializer)
6389
throws IOException {

0 commit comments

Comments
 (0)