Skip to content

Commit 6142e5e

Browse files
authored
[runtime] Add comprehensive tests for ActionExecutionOperator manager classes (#659)
* [runtime] Add @VisibleForTesting seams for manager tests * [runtime] Add comprehensive tests for DurableExecutionManager * [runtime] Add comprehensive tests for EventRouter * [runtime] Add comprehensive tests for ActionTaskContextManager
1 parent 064341a commit 6142e5e

5 files changed

Lines changed: 464 additions & 2 deletions

File tree

runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,11 @@ ActionStateStore getActionStateStore() {
416416
return actionStateStore;
417417
}
418418

419+
@VisibleForTesting
420+
Map<Long, Map<Object, Long>> getCheckpointIdToSeqNums() {
421+
return checkpointIdToSeqNums;
422+
}
423+
419424
@Override
420425
public void close() throws Exception {
421426
if (actionStateStore != null) {

runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,13 @@ class EventRouter<IN, OUT> implements AutoCloseable {
8888
private BuiltInMetrics builtInMetrics;
8989

9090
EventRouter(AgentPlan agentPlan, boolean inputIsJava) {
91+
this(agentPlan, inputIsJava, createEventLogger(agentPlan));
92+
}
93+
94+
@VisibleForTesting
95+
EventRouter(AgentPlan agentPlan, boolean inputIsJava, EventLogger eventLogger) {
9196
this.inputIsJava = inputIsJava;
92-
this.eventLogger = createEventLogger(agentPlan);
97+
this.eventLogger = eventLogger;
9398
this.eventListeners = new ArrayList<>();
9499
}
95100

@@ -242,7 +247,12 @@ EventLogger getEventLogger() {
242247
return eventLogger;
243248
}
244249

245-
private EventLogger createEventLogger(AgentPlan agentPlan) {
250+
@VisibleForTesting
251+
void addEventListener(EventListener listener) {
252+
eventListeners.add(listener);
253+
}
254+
255+
private static EventLogger createEventLogger(AgentPlan agentPlan) {
246256
// Honor the EVENT_LOGGER_TYPE config, defaulting to SLF4J so events surface in the Flink
247257
// Web UI by default. An explicit baseLogDir forces the file logger for backward
248258
// compatibility with the existing file-based logging path.

runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionTaskContextManagerTest.java

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,29 @@
1818
package org.apache.flink.agents.runtime.operator;
1919

2020
import org.apache.flink.agents.api.InputEvent;
21+
import org.apache.flink.agents.plan.AgentPlan;
2122
import org.apache.flink.agents.plan.actions.Action;
23+
import org.apache.flink.agents.runtime.ResourceCache;
24+
import org.apache.flink.agents.runtime.actionstate.ActionState;
25+
import org.apache.flink.agents.runtime.actionstate.InMemoryActionStateStore;
2226
import org.apache.flink.agents.runtime.async.ContinuationContext;
27+
import org.apache.flink.agents.runtime.context.JavaRunnerContextImpl;
28+
import org.apache.flink.agents.runtime.context.RunnerContextImpl;
29+
import org.apache.flink.agents.runtime.memory.MemoryObjectImpl;
30+
import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
31+
import org.apache.flink.api.common.state.MapState;
2332
import org.junit.jupiter.api.Test;
2433

34+
import java.util.HashMap;
35+
2536
import static org.assertj.core.api.Assertions.assertThat;
2637
import static org.assertj.core.api.Assertions.assertThatThrownBy;
38+
import static org.mockito.ArgumentMatchers.any;
39+
import static org.mockito.ArgumentMatchers.eq;
40+
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
41+
import static org.mockito.Mockito.mock;
42+
import static org.mockito.Mockito.spy;
43+
import static org.mockito.Mockito.verify;
2744

2845
/** Contract tests for {@link ActionTaskContextManager}. */
2946
class ActionTaskContextManagerTest {
@@ -73,4 +90,166 @@ void createOrGetRunnerContextThrowsWhenPythonContextRequestedButNull() throws Ex
7390
.hasMessageContaining("PythonRunnerContextImpl has not been initialized");
7491
}
7592
}
93+
94+
@Test
95+
void createAndSetRunnerContextBuildsFreshMemoryContextOnFirstCall() throws Exception {
96+
try (ActionTaskContextManager mgr = new ActionTaskContextManager(1)) {
97+
ActionTask t = new JavaActionTask("k", new InputEvent(1L), TestActions.noopAction());
98+
invokeCreateAndSetRunnerContext(mgr, t);
99+
100+
// Production path: createAndSetRunnerContext at ActionTaskContextManager.java:210-218
101+
// — the else branch builds a fresh MemoryContext when the map has no entry.
102+
assertThat(t.getRunnerContext()).isInstanceOf(JavaRunnerContextImpl.class);
103+
assertThat(t.getRunnerContext().getMemoryContext()).isNotNull();
104+
}
105+
}
106+
107+
@Test
108+
void createAndSetRunnerContextReusesExistingMemoryContext() throws Exception {
109+
try (ActionTaskContextManager mgr = new ActionTaskContextManager(1)) {
110+
Action action = TestActions.noopAction();
111+
ActionTask from = new JavaActionTask("k", new InputEvent(1L), action);
112+
ActionTask to = new JavaActionTask("k", new InputEvent(2L), action);
113+
114+
// Step 1: createAndSetRunnerContext(from) — runner context now carries a fresh
115+
// MemoryContext, but the map (actionTaskMemoryContexts) is still empty (production
116+
// code at lines 210-218 only reads from the map, never writes).
117+
invokeCreateAndSetRunnerContext(mgr, from);
118+
RunnerContextImpl.MemoryContext fromMemCtx = from.getRunnerContext().getMemoryContext();
119+
assertThat(fromMemCtx).isNotNull();
120+
121+
// Step 2: transferContexts populates the map entry for `to` via the private
122+
// putMemoryContext (ActionTaskContextManager.java:266-286). DEM null is OK because
123+
// from has no DurableExecutionContext.
124+
mgr.transferContexts(from, to, new DurableExecutionManager(null));
125+
126+
// Step 3: createAndSetRunnerContext(to) — production code at lines 211-212 reads
127+
// the map for `to` and reuses fromMemCtx (the if-branch of the reuse check).
128+
invokeCreateAndSetRunnerContext(mgr, to);
129+
130+
// The runner context is shared (single Java JavaRunnerContextImpl instance), but
131+
// its memoryContext was switched to the entry that was in the map for `to`. Verify
132+
// the same MemoryContext instance is now wired on the runner context.
133+
assertThat(to.getRunnerContext().getMemoryContext()).isSameAs(fromMemCtx);
134+
}
135+
}
136+
137+
@Test
138+
void transferContextsCopiesMemoryAndContinuationToNewTask() throws Exception {
139+
try (ActionTaskContextManager mgr = new ActionTaskContextManager(1)) {
140+
Action action = TestActions.noopAction();
141+
ActionTask from = new JavaActionTask("k", new InputEvent(1L), action);
142+
ActionTask to = new JavaActionTask("k", new InputEvent(2L), action);
143+
144+
// Populate `from`'s runner context with a MemoryContext and ContinuationContext.
145+
invokeCreateAndSetRunnerContext(mgr, from);
146+
RunnerContextImpl.MemoryContext fromMemCtx = from.getRunnerContext().getMemoryContext();
147+
assertThat(fromMemCtx).isNotNull();
148+
149+
// transferContexts (ActionTaskContextManager.java:266-286) copies but does NOT
150+
// remove from source. The from-side continuation map is never populated (the
151+
// continuation lives on from's runner context until transfer copies it over for
152+
// `to`). Operator-side cleanup of `from`'s entries is the operator's
153+
// responsibility — see ActionExecutionOperator.java:366-369.
154+
mgr.transferContexts(from, to, new DurableExecutionManager(null));
155+
156+
// (a) The memory context entry for `to` is the same instance fromTask holds.
157+
RunnerContextImpl.MemoryContext toMemCtx = mgr.removeMemoryContext(to);
158+
assertThat(toMemCtx).isSameAs(fromMemCtx);
159+
160+
// After remove, the map no longer has `to`'s entry.
161+
assertThat(mgr.removeMemoryContext(to)).isNull();
162+
163+
// (b) Continuation context routed to `to`.
164+
assertThat(mgr.hasContinuationContext(to)).isTrue();
165+
166+
// (c) The `from`-side continuation map entry was never populated by the transfer
167+
// — the source carries its continuation on its runner context, not on the
168+
// manager's map.
169+
assertThat(mgr.hasContinuationContext(from)).isFalse();
170+
}
171+
}
172+
173+
@Test
174+
void transferContextsRoutesDurableContextThroughManager() throws Exception {
175+
try (ActionTaskContextManager mgr = new ActionTaskContextManager(1)) {
176+
Action action = TestActions.noopAction();
177+
InputEvent event = new InputEvent(1L);
178+
ActionTask from = new JavaActionTask("k", event, action);
179+
ActionTask to = new JavaActionTask("k", new InputEvent(2L), action);
180+
181+
invokeCreateAndSetRunnerContext(mgr, from);
182+
183+
// Spy on DEM backed by a real InMemoryActionStateStore so spied internals don't
184+
// NPE. The store doesn't really need to be exercised — we only verify the
185+
// putDurableContext call site at ActionTaskContextManager.java:271-273.
186+
DurableExecutionManager spyDem =
187+
spy(new DurableExecutionManager(new InMemoryActionStateStore(false)));
188+
189+
// Attach a DurableExecutionContext to `from`'s runner context. The persister is
190+
// the DEM itself (DurableExecutionManager implements ActionStatePersister at
191+
// DurableExecutionManager.java:78). ActionState ctor needs an Event so getCallResults()
192+
// returns a non-null list inside the DurableExecutionContext ctor.
193+
ActionState actionState = new ActionState(event);
194+
RunnerContextImpl.DurableExecutionContext durableCtx =
195+
new RunnerContextImpl.DurableExecutionContext(
196+
"k", 0L, action, event, actionState, spyDem);
197+
from.getRunnerContext().setDurableExecutionContext(durableCtx);
198+
199+
mgr.transferContexts(from, to, spyDem);
200+
201+
// The durable-context branch routes via the DEM's putDurableContext, satisfying
202+
// the no-manager-to-manager-references design constraint (DEM passed as a
203+
// parameter, not held as a field).
204+
verify(spyDem)
205+
.putDurableContext(
206+
eq(to), any(RunnerContextImpl.DurableExecutionContext.class));
207+
}
208+
}
209+
210+
@Test
211+
void closeIsIdempotent() throws Exception {
212+
// Not using try-with-resources here because we want to call close() explicitly twice.
213+
ActionTaskContextManager mgr = new ActionTaskContextManager(1);
214+
ActionTask t = new JavaActionTask("k", new InputEvent(1L), TestActions.noopAction());
215+
invokeCreateAndSetRunnerContext(mgr, t);
216+
217+
// First close() shuts down the runner context and the continuation executor
218+
// (ActionTaskContextManager.java:319-330). The second close() must be a no-op:
219+
// runnerContext is nulled and ContinuationActionExecutor.close() is backed by
220+
// ExecutorService.shutdownNow() which is itself idempotent.
221+
mgr.close();
222+
mgr.close();
223+
}
224+
225+
/**
226+
* Shared helper: install a runner context on {@code task} using mocked collaborators. Used by
227+
* tests that need a fully wired runner context but do not care about the collaborator details.
228+
*/
229+
@SuppressWarnings("unchecked")
230+
private static void invokeCreateAndSetRunnerContext(
231+
ActionTaskContextManager mgr, ActionTask task) {
232+
AgentPlan plan = newEmptyAgentPlan();
233+
ResourceCache cache = mock(ResourceCache.class);
234+
FlinkAgentsMetricGroupImpl metricGroup =
235+
mock(FlinkAgentsMetricGroupImpl.class, RETURNS_DEEP_STUBS);
236+
MapState<String, MemoryObjectImpl.MemoryItem> sensoryMem = mock(MapState.class);
237+
MapState<String, MemoryObjectImpl.MemoryItem> shortTermMem = mock(MapState.class);
238+
mgr.createAndSetRunnerContext(
239+
task,
240+
"k",
241+
plan,
242+
cache,
243+
metricGroup,
244+
"job",
245+
() -> {},
246+
sensoryMem,
247+
shortTermMem,
248+
/* pythonRunnerContext */ null,
249+
/* longTermMemory */ null);
250+
}
251+
252+
private static AgentPlan newEmptyAgentPlan() {
253+
return new AgentPlan(new HashMap<>(), new HashMap<>());
254+
}
76255
}

runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,23 @@
2525
import org.apache.flink.agents.runtime.actionstate.ActionState;
2626
import org.apache.flink.agents.runtime.actionstate.InMemoryActionStateStore;
2727
import org.apache.flink.agents.runtime.context.RunnerContextImpl;
28+
import org.apache.flink.api.common.state.ListState;
29+
import org.apache.flink.api.common.state.ListStateDescriptor;
30+
import org.apache.flink.api.common.state.ValueState;
31+
import org.apache.flink.runtime.state.KeyedStateBackend;
32+
import org.apache.flink.runtime.state.KeyedStateFunction;
33+
import org.apache.flink.runtime.state.OperatorStateBackend;
2834
import org.junit.jupiter.api.Test;
2935

36+
import java.util.HashMap;
3037
import java.util.List;
38+
import java.util.Map;
3139

3240
import static org.assertj.core.api.Assertions.assertThat;
41+
import static org.mockito.ArgumentMatchers.any;
42+
import static org.mockito.Mockito.doAnswer;
3343
import static org.mockito.Mockito.mock;
44+
import static org.mockito.Mockito.spy;
3445
import static org.mockito.Mockito.verify;
3546
import static org.mockito.Mockito.when;
3647

@@ -96,4 +107,112 @@ void withInjectedStorePersistsTaskResult() throws Exception {
96107

97108
dem.close();
98109
}
110+
111+
/**
112+
* Verifies {@code snapshotLastCompletedSequenceNumbers} captures the per-key sequence numbers
113+
* returned by the keyed state backend and records them under the given checkpoint id.
114+
*/
115+
@Test
116+
@SuppressWarnings("unchecked")
117+
void snapshotCapturesKeySequenceNumbers() throws Exception {
118+
InMemoryActionStateStore store = new InMemoryActionStateStore(false);
119+
DurableExecutionManager dem = new DurableExecutionManager(store);
120+
121+
KeyedStateBackend<Object> backend = mock(KeyedStateBackend.class);
122+
doAnswer(
123+
invocation -> {
124+
KeyedStateFunction<Object, ValueState<Long>> function =
125+
invocation.getArgument(3);
126+
ValueState<Long> state1 = mock(ValueState.class);
127+
when(state1.value()).thenReturn(10L);
128+
ValueState<Long> state2 = mock(ValueState.class);
129+
when(state2.value()).thenReturn(20L);
130+
function.process("k1", state1);
131+
function.process("k2", state2);
132+
return null;
133+
})
134+
.when(backend)
135+
.applyToAllKeys(any(), any(), any(), any());
136+
137+
dem.snapshotLastCompletedSequenceNumbers(backend, 1000L);
138+
139+
assertThat(dem.getCheckpointIdToSeqNums())
140+
.containsOnlyKeys(1000L)
141+
.extractingByKey(1000L)
142+
.asInstanceOf(
143+
org.assertj.core.api.InstanceOfAssertFactories.map(
144+
Object.class, Long.class))
145+
.containsEntry("k1", 10L)
146+
.containsEntry("k2", 20L)
147+
.hasSize(2);
148+
149+
dem.close();
150+
}
151+
152+
/**
153+
* Verifies {@code notifyCheckpointComplete} prunes every captured key for the notified
154+
* checkpoint, leaves entries captured for other checkpoints intact, and removes the notified
155+
* checkpoint's map entry. Combines the multi-key (loop body) and multi-checkpoint (selectivity)
156+
* concerns into one stronger test.
157+
*/
158+
@Test
159+
void notifyPrunesNotifiedCheckpointAndPreservesOthers() throws Exception {
160+
InMemoryActionStateStore store = new InMemoryActionStateStore(true);
161+
DurableExecutionManager dem = new DurableExecutionManager(store);
162+
163+
Action action = TestActions.noopAction();
164+
// Seed the store with state for three keys.
165+
dem.maybeInitActionState("k1", 10L, action, new InputEvent(1L));
166+
dem.maybeInitActionState("k2", 20L, action, new InputEvent(2L));
167+
dem.maybeInitActionState("k3", 30L, action, new InputEvent(3L));
168+
assertThat(store.getKeyedActionStates()).containsKeys("k1", "k2", "k3");
169+
170+
// Record captured sequence numbers: checkpoint 1000 covers {k1, k2}, checkpoint 1001
171+
// covers {k3}. Use the @VisibleForTesting seam to install the snapshot directly.
172+
Map<Object, Long> ckpt1000 = new HashMap<>();
173+
ckpt1000.put("k1", 10L);
174+
ckpt1000.put("k2", 20L);
175+
Map<Object, Long> ckpt1001 = new HashMap<>();
176+
ckpt1001.put("k3", 30L);
177+
dem.getCheckpointIdToSeqNums().put(1000L, ckpt1000);
178+
dem.getCheckpointIdToSeqNums().put(1001L, ckpt1001);
179+
180+
dem.notifyCheckpointComplete(1000L);
181+
182+
// InMemoryActionStateStore.pruneState (lines 67–71) ignores the seqNum and removes the
183+
// whole key. The notified checkpoint's keys (k1, k2) are pruned; k3 — captured under a
184+
// different checkpoint — must remain. The notified checkpoint's map entry is removed;
185+
// entries for other checkpoints stay.
186+
assertThat(store.getKeyedActionStates())
187+
.doesNotContainKey("k1")
188+
.doesNotContainKey("k2")
189+
.containsKey("k3");
190+
assertThat(dem.getCheckpointIdToSeqNums()).doesNotContainKey(1000L).containsKey(1001L);
191+
192+
dem.close();
193+
}
194+
195+
/**
196+
* Verifies {@code handleRecovery} reads markers from the operator state backend and forwards
197+
* them to {@code store.rebuildState(...)}.
198+
*/
199+
@Test
200+
@SuppressWarnings("unchecked")
201+
void handleRecoveryCallsRebuildState() throws Exception {
202+
InMemoryActionStateStore spyStore = spy(new InMemoryActionStateStore(true));
203+
DurableExecutionManager dem = new DurableExecutionManager(spyStore);
204+
205+
OperatorStateBackend opBackend = mock(OperatorStateBackend.class);
206+
ListState<Object> markerState = mock(ListState.class);
207+
when(opBackend.getUnionListState(any(ListStateDescriptor.class))).thenReturn(markerState);
208+
when(markerState.get()).thenReturn(List.of("test-marker"));
209+
210+
dem.handleRecovery(opBackend);
211+
212+
// InMemoryActionStateStore.rebuildState is a no-op (lines 62–64), so state mutation is
213+
// not observable here — the test verifies the call contract only.
214+
verify(spyStore).rebuildState(List.of("test-marker"));
215+
216+
dem.close();
217+
}
99218
}

0 commit comments

Comments
 (0)