Skip to content

Commit 8f79c09

Browse files
authored
[runtime] Lock null-store symmetry invariant in DurableExecutionManager (#666)
1 parent 8174771 commit 8f79c09

2 files changed

Lines changed: 37 additions & 0 deletions

File tree

runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,12 @@ void maybePruneState(Object key, long sequenceNum) throws Exception {
333333
* via {@link #snapshotLastCompletedSequenceNumbers}. After pruning, the entry for that
334334
* checkpoint is removed. No-op when durable execution is disabled.
335335
*
336+
* <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.remove} below and the {@code put} in
337+
* {@link #snapshotLastCompletedSequenceNumbers} MUST share the same {@code actionStateStore !=
338+
* null} guard. Dropping the guard on either side breaks the symmetry and reintroduces the
339+
* unbounded-map leak tracked by <a href="https://github.com/apache/flink-agents/issues/645">
340+
* issue #645</a>.
341+
*
336342
* @param checkpointId the id of the completed checkpoint.
337343
*/
338344
void notifyCheckpointComplete(long checkpointId) {
@@ -365,6 +371,12 @@ void snapshotRecoveryMarker() throws Exception {
365371
* strictly up to the sequence number that was committed by that checkpoint. No-op when durable
366372
* execution is disabled.
367373
*
374+
* <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.put} below and the {@code remove} in
375+
* {@link #notifyCheckpointComplete(long)} MUST share the same {@code actionStateStore != null}
376+
* guard. Dropping the guard on either side breaks the symmetry and reintroduces the
377+
* unbounded-map leak tracked by <a href="https://github.com/apache/flink-agents/issues/645">
378+
* issue #645</a>.
379+
*
368380
* @param keyedStateBackend the keyed state backend to scan.
369381
* @param checkpointId the id of the checkpoint being snapshotted.
370382
*/

runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import static org.mockito.Mockito.mock;
4444
import static org.mockito.Mockito.spy;
4545
import static org.mockito.Mockito.verify;
46+
import static org.mockito.Mockito.verifyNoInteractions;
4647
import static org.mockito.Mockito.when;
4748

4849
/** Contract tests for {@link DurableExecutionManager}. */
@@ -69,6 +70,30 @@ void noStoreModeMakesAllMaybeOperationsNoOp() throws Exception {
6970
dem.close();
7071
}
7172

73+
@Test
74+
@SuppressWarnings("unchecked")
75+
void noStoreModeSnapshotAndNotifyKeepCheckpointMapEmpty() throws Exception {
76+
DurableExecutionManager dem = new DurableExecutionManager(null);
77+
KeyedStateBackend<Object> backend = mock(KeyedStateBackend.class);
78+
79+
// Cycle 1: snapshot + notify with null store. The snapshot-side guard must short-circuit
80+
// before any backend access, and the cleanup-side guard must leave the map untouched.
81+
dem.snapshotLastCompletedSequenceNumbers(backend, 1L);
82+
assertThat(dem.getCheckpointIdToSeqNums()).isEmpty();
83+
verifyNoInteractions(backend);
84+
dem.notifyCheckpointComplete(1L);
85+
assertThat(dem.getCheckpointIdToSeqNums()).isEmpty();
86+
87+
// Cycle 2: confirm the invariant holds across multiple checkpoints.
88+
dem.snapshotLastCompletedSequenceNumbers(backend, 2L);
89+
assertThat(dem.getCheckpointIdToSeqNums()).isEmpty();
90+
verifyNoInteractions(backend);
91+
dem.notifyCheckpointComplete(2L);
92+
assertThat(dem.getCheckpointIdToSeqNums()).isEmpty();
93+
94+
dem.close();
95+
}
96+
7297
@Test
7398
void withInjectedStorePersistsTaskResult() throws Exception {
7499
InMemoryActionStateStore store = new InMemoryActionStateStore(false);

0 commit comments

Comments
 (0)