Skip to content

Comments

fix(graph): Fixed checkpoint state merge when in resume mode#4194

Open
PeatBoy wants to merge 5 commits intoalibaba:mainfrom
PeatBoy:fix-resume-checkpoint-state-merge
Open

fix(graph): Fixed checkpoint state merge when in resume mode#4194
PeatBoy wants to merge 5 commits intoalibaba:mainfrom
PeatBoy:fix-resume-checkpoint-state-merge

Conversation

@PeatBoy
Copy link
Contributor

@PeatBoy PeatBoy commented Feb 4, 2026

Describe what this PR does / why we need it

Fix the parameter order issue in OverAllState.input during resume scenarios. When resuming from a checkpoint, the passed input represents the old state, while the current state holds the new value. The original implementation reversed these, causing incorrect merge behavior

graphrunnercontext.java 117

private void initializeFromResume(OverAllState initialState, RunnableConfig config) {
		log.trace("RESUME REQUEST");

		var saver = compiledGraph.compileConfig.checkpointSaver()
				.orElseThrow(() -> new IllegalStateException("Resume request without a configured checkpoint saver!"));
		var checkpoint = saver.get(config)
				.orElseThrow(() -> new IllegalStateException("Resume request without a valid checkpoint!"));

		var startCheckpointNextNodeAction = compiledGraph.getNodeAction(checkpoint.getNextNodeId());
		if (startCheckpointNextNodeAction instanceof ResumableSubGraphAction resumableAction) {
			// RESUME FORM SUBGRAPH DETECTED
			this.config = RunnableConfig.builder(config)
					.checkPointId(null) // Reset checkpoint id
					.addMetadata(resumableAction.getResumeSubGraphId(), true) // add metadata for
					// sub graph
					.build();
			this.config.clearContext();
		} else {
			// Reset checkpoint id
			this.config = config.withCheckPointId(null);
		}

		this.currentNodeId = null;
		this.nextNodeId = checkpoint.getNextNodeId();
		this.overallState = initialState.input(checkpoint.getState()); // !! Call the input function here !!
		this.resumeFrom = checkpoint.getNodeId();

		log.trace("RESUME FROM {}", checkpoint.getNodeId());
	}

overallstate.java 222

	public OverAllState input(Map<String, Object> input) {
		if (input == null) {
			return this;
		}

		if (CollectionUtils.isEmpty(input)) {
			return this;
		}

		Map<String, KeyStrategy> keyStrategies = keyStrategies();
		input.keySet().stream().filter(key -> keyStrategies.containsKey(key)).forEach(key -> {
			this.data.put(key, keyStrategies.get(key).apply(value(key, null), input.get(key))); // The semantics within the function are correct, but the input passed at the calling site is actually the old value of the checkpoint state.
		});
		return this;
	}

Does this pull request fix one issue?

Fixed checkpoint state merge when in resume mode

Describe how you did it

Changed the strategy call in OverAllState.input from keyStrategies.get(key).apply(value(key, null), input.get(key)) to keyStrategies.get(key).apply(input.get(key), value(key, null)), ensuring that during resume, oldValue is the checkpoint’s old value and newValue is the current state’s new value

Describe how to verify it

  • Add a unit test simulating resume: use a key with AppendStrategy, set a new value in the current state, then call input(checkpointState), and verify the merged list order is [oldValue, newValue].
  • Run the existing test suite to ensure no regression.
  • Enable checkpointing in a real workflow, perform interrupt and resume, and observe that state merging behaves as expected.

Special notes for reviews

  • This change only affects resume scenarios (initializeFromResume); normal startup is unaffected

GraphRunnerContext.java:115-118 。

  • Pay special attention to AppendStrategy and MergeStrategy to ensure the order adjustment preserves correct semantics.

@github-actions github-actions bot added the area/graph SAA Grpah module label Feb 4, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/graph SAA Grpah module

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant