From c09fc39b69f28b7ff8ff837f2bba08257839aac9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B1=A4=E9=9B=A8?= Date: Fri, 20 Mar 2026 12:49:39 +0800 Subject: [PATCH] [runtime] Filter resumed currentProcessingKeys by subtask ownership after rescaling --- .../operator/ActionExecutionOperator.java | 16 +++++ .../operator/ActionExecutionOperatorTest.java | 68 +++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java index f4e58efe9..82dac3f01 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java @@ -68,6 +68,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.python.env.PythonDependencyInfo; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.state.VoidNamespace; @@ -916,6 +917,9 @@ private void tryResumeProcessActionTasks() throws Exception { Iterable keys = currentProcessingKeysOpState.get(); if (keys != null) { for (Object key : keys) { + if (!isKeyOwnedByCurrentSubtask(key)) { + continue; + } keySegmentQueue.addKeyToLastSegment(key); mailboxExecutor.submit( () -> tryProcessActionTaskForKey(key), "process action task"); @@ -1122,6 +1126,18 @@ private void maybeInitActionStateStore() { } } + private boolean isKeyOwnedByCurrentSubtask(Object key) { + int maxParallelism = getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks(); + int parallelism = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(); + int subtaskIndex = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); + + int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism); + int owner = + KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup( + maxParallelism, parallelism, keyGroup); + return owner == subtaskIndex; + } + /** Failed to execute Action task. */ public static class ActionTaskExecutionException extends Exception { public ActionTaskExecutionException(String message, Throwable cause) { diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java index 646d1e633..3aa6fea36 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java @@ -33,9 +33,12 @@ import org.apache.flink.agents.runtime.eventlog.FileEventLogger; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.util.ExceptionUtils; import org.junit.jupiter.api.Test; @@ -151,6 +154,71 @@ void testDifferentKeyDataCanRunConcurrently() throws Exception { } } + @Test + void testRestoreOnlyResumesKeysOwnedByCurrentSubtask() throws Exception { + final int maxParallelism = 4; + final int oldParallelism = 1; + final int newParallelism = 2; + final long key = 1L; + + OperatorSubtaskState snapshot; + try (KeyedOneInputStreamOperatorTestHarness testHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + new ActionExecutionOperatorFactory(TestAgent.getAgentPlan(false), true), + (KeySelector) value -> value, + TypeInformation.of(Long.class), + maxParallelism, + oldParallelism, + 0)) { + testHarness.open(); + testHarness.processElement(new StreamRecord<>(key)); + assertThat(testHarness.getTaskMailbox().size()).isEqualTo(1); + snapshot = testHarness.snapshot(1L, 1L); + } + + int ownerSubtask = + KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup( + maxParallelism, + newParallelism, + KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism)); + int nonOwnerSubtask = 1 - ownerSubtask; + + OperatorSubtaskState ownerState = + AbstractStreamOperatorTestHarness.repartitionOperatorState( + snapshot, maxParallelism, oldParallelism, newParallelism, ownerSubtask); + OperatorSubtaskState nonOwnerState = + AbstractStreamOperatorTestHarness.repartitionOperatorState( + snapshot, maxParallelism, oldParallelism, newParallelism, nonOwnerSubtask); + + try (KeyedOneInputStreamOperatorTestHarness ownerHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + new ActionExecutionOperatorFactory( + TestAgent.getAgentPlan(false), true), + (KeySelector) value -> value, + TypeInformation.of(Long.class), + maxParallelism, + newParallelism, + ownerSubtask); + KeyedOneInputStreamOperatorTestHarness nonOwnerHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + new ActionExecutionOperatorFactory( + TestAgent.getAgentPlan(false), true), + (KeySelector) value -> value, + TypeInformation.of(Long.class), + maxParallelism, + newParallelism, + nonOwnerSubtask)) { + ownerHarness.initializeState(ownerState); + nonOwnerHarness.initializeState(nonOwnerState); + + ownerHarness.open(); + nonOwnerHarness.open(); + + assertThat(ownerHarness.getTaskMailbox().size()).isEqualTo(1); + assertThat(nonOwnerHarness.getTaskMailbox().size()).isZero(); + } + } + @Test void testMemoryAccessProhibitedOutsideMailboxThread() throws Exception { try (KeyedOneInputStreamOperatorTestHarness testHarness =