From 7427cfca23a5c335e204677f120d91dd63cac50c Mon Sep 17 00:00:00 2001 From: Donald Pinckney Date: Tue, 21 Apr 2026 15:54:32 -0400 Subject: [PATCH 1/6] =?UTF-8?q?temporal-spring-ai:=20plan=20=E2=80=94=20si?= =?UTF-8?q?de-effect=20replay=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.7 (1M context) --- temporal-spring-ai/PLAN.md | 81 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 temporal-spring-ai/PLAN.md diff --git a/temporal-spring-ai/PLAN.md b/temporal-spring-ai/PLAN.md new file mode 100644 index 000000000..c71b42af4 --- /dev/null +++ b/temporal-spring-ai/PLAN.md @@ -0,0 +1,81 @@ +# Plan: Side-effect replay tests + +## Scope + +The existing `WorkflowDeterminismTest` runs a workflow, fetches its history, +and calls `WorkflowReplayer.replayWorkflowExecution` — this catches +non-determinism but does **not** verify that plugin-managed side effects +(ChatModel calls, tool methods, MCP calls, `@SideEffectTool` bodies) are not +re-executed on replay. The Temporal AI partner review standards specifically +call out "tests that make sure you're avoiding repeated side effects on +replays." + +This branch adds counting/assertion tests for each replayable surface the +plugin introduces. + +## Files to change / add + +- `src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java` + - Register a `ChatModel` that increments an `AtomicInteger` on every call. + - Drive a workflow that calls it once, capture the history, then replay + with `WorkflowReplayer`. Assert the counter is still 1 after replay. + +- `src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java` + - Build an `@ActivityInterface` whose `@Tool`-annotated method increments + a counter on the activity implementation. + - Use `ToolCallingStubChatModel`-style stub to produce a tool call, let + the workflow drive the tool via `ActivityToolCallback`. Replay and + assert the tool method counter is still 1. + +- `src/test/java/io/temporal/springai/replay/SideEffectToolReplayTest.java` + - Register a `@SideEffectTool` whose `@Tool` body increments a counter. + - Drive the workflow to call it via the stubbed model. Capture history. + - Replay and assert the inner body's counter did not advance + (Workflow.sideEffect is memoized, so this is the behavior we're + asserting). + +- `src/test/java/io/temporal/springai/replay/McpToolSideEffectTest.java` + - Register a fake `McpSyncClient` (or mock) that increments a counter on + `callTool`. Replay and assert it stays at 1. + - If mocking `McpSyncClient` is too heavy, skip this and cover MCP at the + activity level (counter on `McpClientActivityImpl.callTool`). + +All four tests share the pattern from `WorkflowDeterminismTest`: start +`TestWorkflowEnvironment`, register the counting impl, run, fetch history, +call `WorkflowReplayer.replayWorkflowExecution`, then +`assertEquals(1, counter.get())`. + +## Test plan + +- `./gradlew :temporal-spring-ai:test` locally and in CI. +- Verify the new tests actually fail if we regress (remove the activity + memoization intentionally in a scratch branch to confirm the counter + asserts catch it). + +## PR + +**Title:** `temporal-spring-ai: add side-effect replay tests for chat, tools, SideEffectTool, and MCP` + +**Body:** + +``` +## What was changed +Four new replay tests under `src/test/.../replay/`: +- ChatModelSideEffectTest +- ActivityToolSideEffectTest +- SideEffectToolReplayTest +- McpToolSideEffectTest + +Each one runs a workflow that drives the corresponding surface, captures +history, calls `WorkflowReplayer.replayWorkflowExecution`, and asserts that +the underlying side effect (counter increment) did not run a second time +during replay. + +## Why? +The existing determinism test catches history-vs-command mismatches but +doesn't prove the plugin isn't re-invoking user side effects on replay. +Temporal's AI partner review standards require side-effect safety tests, +and these regressions would be easy to introduce accidentally if someone +later added an in-workflow fallback or a cache path. Counter-based tests +make any such regression show up immediately. +``` From bfdd5b328f2bf7e313f66bb4039d63935b808cfe Mon Sep 17 00:00:00 2001 From: Donald Pinckney Date: Tue, 21 Apr 2026 18:29:48 -0400 Subject: [PATCH 2/6] temporal-spring-ai: add side-effect replay tests for chat, activity tools, and @SideEffectTool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three new tests under src/test/.../replay/: - ChatModelSideEffectTest: register a ChatModel with an AtomicInteger counter. Run a workflow that makes one chat call, assert counter=1. Replay the captured history, assert counter still 1 — the activity result comes from history, not from re-invoking the ChatModel. - ActivityToolSideEffectTest: activity-backed @Tool whose impl increments a counter. ToolCallingStubChatModel asks for the tool on the first call and returns final text on the second. Same assertion shape: counter=1 after run, counter=1 after replay. - SideEffectToolReplayTest: @SideEffectTool body increments a counter via a file-scope static. Workflow drives a tool call through ToolCallingStubChatModel. The assertion proves that Workflow.sideEffect's marker is what's consulted on replay rather than re-invoking the @Tool method. MCP is intentionally omitted — spring-ai-mcp is compileOnly and adding it just for one test isn't worth the dep weight. MCP tool calls go through the same Temporal activity machinery as ChatModel, which ChatModelSideEffectTest already covers. I verified the SideEffectToolReplayTest catches a real regression by temporarily dropping the Workflow.sideEffect wrap in SideEffectToolCallback; the test correctly failed with `expected: <1> but was: <2>`. Restored before this commit. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../replay/ActivityToolSideEffectTest.java | 150 ++++++++++++++++++ .../replay/ChatModelSideEffectTest.java | 116 ++++++++++++++ .../replay/SideEffectToolReplayTest.java | 139 ++++++++++++++++ 3 files changed, 405 insertions(+) create mode 100644 temporal-spring-ai/src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java create mode 100644 temporal-spring-ai/src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java create mode 100644 temporal-spring-ai/src/test/java/io/temporal/springai/replay/SideEffectToolReplayTest.java diff --git a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java new file mode 100644 index 000000000..718e89fb7 --- /dev/null +++ b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java @@ -0,0 +1,150 @@ +package io.temporal.springai.replay; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.activity.ActivityOptions; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.common.WorkflowExecutionHistory; +import io.temporal.springai.activity.ChatModelActivityImpl; +import io.temporal.springai.chat.TemporalChatClient; +import io.temporal.springai.model.ActivityChatModel; +import io.temporal.testing.TestWorkflowEnvironment; +import io.temporal.testing.WorkflowReplayer; +import io.temporal.worker.Worker; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.ai.chat.client.ChatClient; +import org.springframework.ai.chat.messages.AssistantMessage; +import org.springframework.ai.chat.model.ChatModel; +import org.springframework.ai.chat.model.ChatResponse; +import org.springframework.ai.chat.model.Generation; +import org.springframework.ai.chat.prompt.Prompt; +import org.springframework.ai.tool.annotation.Tool; + +/** + * Asserts that a workflow replay does not re-invoke activity-backed tools. The {@link AddActivity} + * impl holds a counter that increments on each tool call. After the initial run the counter is 1; + * after replaying the captured history, it must still be 1 — activity results for the tool call + * come from history, not from re-invoking the activity impl. + */ +class ActivityToolSideEffectTest { + + private static final String TASK_QUEUE = "test-spring-ai-activity-tool-side-effect"; + + private TestWorkflowEnvironment testEnv; + private WorkflowClient client; + private AddActivityImpl addActivity; + + @BeforeEach + void setUp() { + testEnv = TestWorkflowEnvironment.newInstance(); + client = testEnv.getWorkflowClient(); + addActivity = new AddActivityImpl(); + } + + @AfterEach + void tearDown() { + testEnv.close(); + } + + @Test + void activityTool_notReInvokedOnReplay() throws Exception { + Worker worker = testEnv.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes(ChatWithToolsWorkflowImpl.class); + worker.registerActivitiesImplementations( + new ChatModelActivityImpl(new ToolCallingStubChatModel()), addActivity); + testEnv.start(); + + ChatWithToolsWorkflow workflow = + client.newWorkflowStub( + ChatWithToolsWorkflow.class, + WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build()); + assertEquals("The answer is 5", workflow.chat("What is 2+3?")); + assertEquals( + 1, addActivity.callCount.get(), "Tool activity should run once during the initial run"); + + WorkflowExecutionHistory history = + client.fetchHistory(WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId()); + WorkflowReplayer.replayWorkflowExecution(history, ChatWithToolsWorkflowImpl.class); + + assertEquals( + 1, + addActivity.callCount.get(), + "Tool activity must not be re-invoked during replay — results come from history"); + } + + @WorkflowInterface + public interface ChatWithToolsWorkflow { + @WorkflowMethod + String chat(String message); + } + + @ActivityInterface + public interface AddActivity { + @Tool(description = "Add two numbers") + @ActivityMethod + int add(int a, int b); + } + + public static class AddActivityImpl implements AddActivity { + final AtomicInteger callCount = new AtomicInteger(0); + + @Override + public int add(int a, int b) { + callCount.incrementAndGet(); + return a + b; + } + } + + public static class ChatWithToolsWorkflowImpl implements ChatWithToolsWorkflow { + @Override + public String chat(String message) { + ActivityChatModel chatModel = ActivityChatModel.forDefault(); + AddActivity addTool = + Workflow.newActivityStub( + AddActivity.class, + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(30)).build()); + ChatClient chatClient = TemporalChatClient.builder(chatModel).defaultTools(addTool).build(); + return chatClient.prompt().user(message).call().content(); + } + } + + /** First call: request the "add" tool. Second call: return final text. */ + private static class ToolCallingStubChatModel implements ChatModel { + private final AtomicInteger callCount = new AtomicInteger(0); + + @Override + public ChatResponse call(Prompt prompt) { + if (callCount.getAndIncrement() == 0) { + AssistantMessage toolRequest = + AssistantMessage.builder() + .content("") + .toolCalls( + List.of( + new AssistantMessage.ToolCall( + "call_1", "function", "add", "{\"a\":2,\"b\":3}"))) + .build(); + return ChatResponse.builder().generations(List.of(new Generation(toolRequest))).build(); + } + return ChatResponse.builder() + .generations(List.of(new Generation(new AssistantMessage("The answer is 5")))) + .build(); + } + + @Override + public reactor.core.publisher.Flux stream(Prompt prompt) { + throw new UnsupportedOperationException(); + } + } +} diff --git a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java new file mode 100644 index 000000000..4b74afd50 --- /dev/null +++ b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java @@ -0,0 +1,116 @@ +package io.temporal.springai.replay; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.common.WorkflowExecutionHistory; +import io.temporal.springai.activity.ChatModelActivityImpl; +import io.temporal.springai.model.ActivityChatModel; +import io.temporal.testing.TestWorkflowEnvironment; +import io.temporal.testing.WorkflowReplayer; +import io.temporal.worker.Worker; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.ai.chat.client.ChatClient; +import org.springframework.ai.chat.messages.AssistantMessage; +import org.springframework.ai.chat.model.ChatModel; +import org.springframework.ai.chat.model.ChatResponse; +import org.springframework.ai.chat.model.Generation; +import org.springframework.ai.chat.prompt.Prompt; + +/** + * Asserts that a workflow replay does not re-invoke the underlying {@link ChatModel}. The counter + * lives on the activity's backing ChatModel, which is only reached when the {@code CallChatModel} + * activity is scheduled by the workflow. On replay, the activity result is fetched from history; + * the impl is not re-invoked. If we ever regress by dropping that guarantee — say by adding an + * in-workflow cache that falls back to invoking the model directly — the counter will advance to 2 + * and this test will fail. + */ +class ChatModelSideEffectTest { + + private static final String TASK_QUEUE = "test-spring-ai-chat-side-effect"; + + private TestWorkflowEnvironment testEnv; + private WorkflowClient client; + private CountingChatModel model; + + @BeforeEach + void setUp() { + testEnv = TestWorkflowEnvironment.newInstance(); + client = testEnv.getWorkflowClient(); + model = new CountingChatModel("pong"); + } + + @AfterEach + void tearDown() { + testEnv.close(); + } + + @Test + void chatModel_notReInvokedOnReplay() throws Exception { + Worker worker = testEnv.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes(ChatWorkflowImpl.class); + worker.registerActivitiesImplementations(new ChatModelActivityImpl(model)); + testEnv.start(); + + ChatWorkflow workflow = + client.newWorkflowStub( + ChatWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build()); + assertEquals("pong", workflow.chat("ping")); + assertEquals( + 1, model.callCount.get(), "ChatModel should be called once during the initial run"); + + WorkflowExecutionHistory history = + client.fetchHistory(WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId()); + WorkflowReplayer.replayWorkflowExecution(history, ChatWorkflowImpl.class); + + assertEquals( + 1, + model.callCount.get(), + "ChatModel must not be re-invoked during replay — activity results come from history"); + } + + @WorkflowInterface + public interface ChatWorkflow { + @WorkflowMethod + String chat(String message); + } + + public static class ChatWorkflowImpl implements ChatWorkflow { + @Override + public String chat(String message) { + ActivityChatModel chatModel = ActivityChatModel.forDefault(); + ChatClient chatClient = ChatClient.builder(chatModel).build(); + return chatClient.prompt().user(message).call().content(); + } + } + + private static class CountingChatModel implements ChatModel { + final AtomicInteger callCount = new AtomicInteger(0); + private final String response; + + CountingChatModel(String response) { + this.response = response; + } + + @Override + public ChatResponse call(Prompt prompt) { + callCount.incrementAndGet(); + return ChatResponse.builder() + .generations(List.of(new Generation(new AssistantMessage(response)))) + .build(); + } + + @Override + public reactor.core.publisher.Flux stream(Prompt prompt) { + throw new UnsupportedOperationException(); + } + } +} diff --git a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/SideEffectToolReplayTest.java b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/SideEffectToolReplayTest.java new file mode 100644 index 000000000..68e787611 --- /dev/null +++ b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/SideEffectToolReplayTest.java @@ -0,0 +1,139 @@ +package io.temporal.springai.replay; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.common.WorkflowExecutionHistory; +import io.temporal.springai.activity.ChatModelActivityImpl; +import io.temporal.springai.chat.TemporalChatClient; +import io.temporal.springai.model.ActivityChatModel; +import io.temporal.springai.tool.SideEffectTool; +import io.temporal.testing.TestWorkflowEnvironment; +import io.temporal.testing.WorkflowReplayer; +import io.temporal.worker.Worker; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.ai.chat.client.ChatClient; +import org.springframework.ai.chat.messages.AssistantMessage; +import org.springframework.ai.chat.model.ChatModel; +import org.springframework.ai.chat.model.ChatResponse; +import org.springframework.ai.chat.model.Generation; +import org.springframework.ai.chat.prompt.Prompt; +import org.springframework.ai.tool.annotation.Tool; + +/** + * Asserts that {@code Workflow.sideEffect(...)} memoization works for {@link SideEffectTool} + * bodies. Unlike activity-backed tools, the {@code SideEffectToolCallback} wrapper runs on the + * workflow side, so every workflow statement re-runs during replay — including the call + * into {@code SideEffectToolCallback.call()}. What must NOT re-run is the inner tool body (the + * lambda passed to {@code Workflow.sideEffect}); that result is fetched from the recorded marker. + * + *

A regression that dropped the sideEffect wrapper (e.g., directly invoking the delegate) would + * bump this counter to 2 on replay. + */ +class SideEffectToolReplayTest { + + private static final String TASK_QUEUE = "test-spring-ai-side-effect-tool"; + + /** Lives at file scope so the counter is visible from workflow code via static reference. */ + static final AtomicInteger CALL_COUNT = new AtomicInteger(0); + + private TestWorkflowEnvironment testEnv; + private WorkflowClient client; + + @BeforeEach + void setUp() { + CALL_COUNT.set(0); + testEnv = TestWorkflowEnvironment.newInstance(); + client = testEnv.getWorkflowClient(); + } + + @AfterEach + void tearDown() { + testEnv.close(); + } + + @Test + void sideEffectTool_notReInvokedOnReplay() throws Exception { + Worker worker = testEnv.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes(TimestampWorkflowImpl.class); + worker.registerActivitiesImplementations( + new ChatModelActivityImpl(new ToolCallingStubChatModel())); + testEnv.start(); + + TimestampWorkflow workflow = + client.newWorkflowStub( + TimestampWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build()); + assertEquals("got: 2026-04-21T00:00:00Z", workflow.chat("what time is it?")); + assertEquals( + 1, CALL_COUNT.get(), "@SideEffectTool body should run once during the initial run"); + + WorkflowExecutionHistory history = + client.fetchHistory(WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId()); + WorkflowReplayer.replayWorkflowExecution(history, TimestampWorkflowImpl.class); + + assertEquals( + 1, + CALL_COUNT.get(), + "@SideEffectTool body must not re-run on replay — result comes from the recorded" + + " sideEffect marker"); + } + + @WorkflowInterface + public interface TimestampWorkflow { + @WorkflowMethod + String chat(String message); + } + + @SideEffectTool + public static class CountingTimestampTool { + @Tool(description = "Get the current timestamp") + public String now() { + CALL_COUNT.incrementAndGet(); + return "2026-04-21T00:00:00Z"; + } + } + + public static class TimestampWorkflowImpl implements TimestampWorkflow { + @Override + public String chat(String message) { + ActivityChatModel chatModel = ActivityChatModel.forDefault(); + ChatClient chatClient = + TemporalChatClient.builder(chatModel).defaultTools(new CountingTimestampTool()).build(); + return chatClient.prompt().user(message).call().content(); + } + } + + /** First call: request the "now" tool. Second call: return final text with the timestamp. */ + private static class ToolCallingStubChatModel implements ChatModel { + private final AtomicInteger callCount = new AtomicInteger(0); + + @Override + public ChatResponse call(Prompt prompt) { + if (callCount.getAndIncrement() == 0) { + AssistantMessage toolRequest = + AssistantMessage.builder() + .content("") + .toolCalls( + List.of(new AssistantMessage.ToolCall("call_1", "function", "now", "{}"))) + .build(); + return ChatResponse.builder().generations(List.of(new Generation(toolRequest))).build(); + } + return ChatResponse.builder() + .generations(List.of(new Generation(new AssistantMessage("got: 2026-04-21T00:00:00Z")))) + .build(); + } + + @Override + public reactor.core.publisher.Flux stream(Prompt prompt) { + throw new UnsupportedOperationException(); + } + } +} From f5058afc4ab81b433d3dd0e055b77333bc1f0996 Mon Sep 17 00:00:00 2001 From: Donald Pinckney Date: Wed, 22 Apr 2026 12:06:17 -0400 Subject: [PATCH 3/6] temporal-spring-ai: drop PLAN.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Planning scratchpad — not part of the shipped artifact. Removed before merge. Co-Authored-By: Claude Opus 4.7 (1M context) --- temporal-spring-ai/PLAN.md | 81 -------------------------------------- 1 file changed, 81 deletions(-) delete mode 100644 temporal-spring-ai/PLAN.md diff --git a/temporal-spring-ai/PLAN.md b/temporal-spring-ai/PLAN.md deleted file mode 100644 index c71b42af4..000000000 --- a/temporal-spring-ai/PLAN.md +++ /dev/null @@ -1,81 +0,0 @@ -# Plan: Side-effect replay tests - -## Scope - -The existing `WorkflowDeterminismTest` runs a workflow, fetches its history, -and calls `WorkflowReplayer.replayWorkflowExecution` — this catches -non-determinism but does **not** verify that plugin-managed side effects -(ChatModel calls, tool methods, MCP calls, `@SideEffectTool` bodies) are not -re-executed on replay. The Temporal AI partner review standards specifically -call out "tests that make sure you're avoiding repeated side effects on -replays." - -This branch adds counting/assertion tests for each replayable surface the -plugin introduces. - -## Files to change / add - -- `src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java` - - Register a `ChatModel` that increments an `AtomicInteger` on every call. - - Drive a workflow that calls it once, capture the history, then replay - with `WorkflowReplayer`. Assert the counter is still 1 after replay. - -- `src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java` - - Build an `@ActivityInterface` whose `@Tool`-annotated method increments - a counter on the activity implementation. - - Use `ToolCallingStubChatModel`-style stub to produce a tool call, let - the workflow drive the tool via `ActivityToolCallback`. Replay and - assert the tool method counter is still 1. - -- `src/test/java/io/temporal/springai/replay/SideEffectToolReplayTest.java` - - Register a `@SideEffectTool` whose `@Tool` body increments a counter. - - Drive the workflow to call it via the stubbed model. Capture history. - - Replay and assert the inner body's counter did not advance - (Workflow.sideEffect is memoized, so this is the behavior we're - asserting). - -- `src/test/java/io/temporal/springai/replay/McpToolSideEffectTest.java` - - Register a fake `McpSyncClient` (or mock) that increments a counter on - `callTool`. Replay and assert it stays at 1. - - If mocking `McpSyncClient` is too heavy, skip this and cover MCP at the - activity level (counter on `McpClientActivityImpl.callTool`). - -All four tests share the pattern from `WorkflowDeterminismTest`: start -`TestWorkflowEnvironment`, register the counting impl, run, fetch history, -call `WorkflowReplayer.replayWorkflowExecution`, then -`assertEquals(1, counter.get())`. - -## Test plan - -- `./gradlew :temporal-spring-ai:test` locally and in CI. -- Verify the new tests actually fail if we regress (remove the activity - memoization intentionally in a scratch branch to confirm the counter - asserts catch it). - -## PR - -**Title:** `temporal-spring-ai: add side-effect replay tests for chat, tools, SideEffectTool, and MCP` - -**Body:** - -``` -## What was changed -Four new replay tests under `src/test/.../replay/`: -- ChatModelSideEffectTest -- ActivityToolSideEffectTest -- SideEffectToolReplayTest -- McpToolSideEffectTest - -Each one runs a workflow that drives the corresponding surface, captures -history, calls `WorkflowReplayer.replayWorkflowExecution`, and asserts that -the underlying side effect (counter increment) did not run a second time -during replay. - -## Why? -The existing determinism test catches history-vs-command mismatches but -doesn't prove the plugin isn't re-invoking user side effects on replay. -Temporal's AI partner review standards require side-effect safety tests, -and these regressions would be easy to introduce accidentally if someone -later added an in-workflow fallback or a cache path. Counter-based tests -make any such regression show up immediately. -``` From f6b29d0f2c2888a6ebfe9398481ba99bb40aa003 Mon Sep 17 00:00:00 2001 From: Donald Pinckney Date: Thu, 23 Apr 2026 16:08:25 -0400 Subject: [PATCH 4/6] temporal-spring-ai: disable workflow caching in side-effect replay tests Configure TestWorkflowEnvironment with WorkflowCacheSize(0) so the worker replays from history on every workflow task instead of resuming from in-memory cached state. That is the regime in which side-effect safety actually has to hold: a missing Workflow.sideEffect wrap or an un-guarded in-workflow mutation would run on each replay, which cached resumes mask. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../springai/replay/ActivityToolSideEffectTest.java | 12 +++++++++++- .../springai/replay/ChatModelSideEffectTest.java | 13 ++++++++++++- .../springai/replay/SideEffectToolReplayTest.java | 12 +++++++++++- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java index 718e89fb7..9107c497f 100644 --- a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java +++ b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java @@ -12,9 +12,11 @@ import io.temporal.springai.activity.ChatModelActivityImpl; import io.temporal.springai.chat.TemporalChatClient; import io.temporal.springai.model.ActivityChatModel; +import io.temporal.testing.TestEnvironmentOptions; import io.temporal.testing.TestWorkflowEnvironment; import io.temporal.testing.WorkflowReplayer; import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactoryOptions; import io.temporal.workflow.Workflow; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; @@ -48,7 +50,15 @@ class ActivityToolSideEffectTest { @BeforeEach void setUp() { - testEnv = TestWorkflowEnvironment.newInstance(); + // WorkflowCacheSize(0) forces the worker to replay from history on every workflow task + // instead of resuming from in-memory cached state — the regime in which a missing + // Workflow.sideEffect wrap or an un-guarded in-workflow mutation would actually bite. + testEnv = + TestWorkflowEnvironment.newInstance( + TestEnvironmentOptions.newBuilder() + .setWorkerFactoryOptions( + WorkerFactoryOptions.newBuilder().setWorkflowCacheSize(0).build()) + .build()); client = testEnv.getWorkflowClient(); addActivity = new AddActivityImpl(); } diff --git a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java index 4b74afd50..5dd12653e 100644 --- a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java +++ b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java @@ -8,9 +8,11 @@ import io.temporal.common.WorkflowExecutionHistory; import io.temporal.springai.activity.ChatModelActivityImpl; import io.temporal.springai.model.ActivityChatModel; +import io.temporal.testing.TestEnvironmentOptions; import io.temporal.testing.TestWorkflowEnvironment; import io.temporal.testing.WorkflowReplayer; import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactoryOptions; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; import java.util.List; @@ -43,7 +45,16 @@ class ChatModelSideEffectTest { @BeforeEach void setUp() { - testEnv = TestWorkflowEnvironment.newInstance(); + // WorkflowCacheSize(0) forces the worker to replay from history on every workflow task + // instead of resuming from in-memory cached state, which is what we actually need to + // assert side-effect safety: any un-wrapped side effect in workflow code would run on + // each replay and bump the counter. + testEnv = + TestWorkflowEnvironment.newInstance( + TestEnvironmentOptions.newBuilder() + .setWorkerFactoryOptions( + WorkerFactoryOptions.newBuilder().setWorkflowCacheSize(0).build()) + .build()); client = testEnv.getWorkflowClient(); model = new CountingChatModel("pong"); } diff --git a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/SideEffectToolReplayTest.java b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/SideEffectToolReplayTest.java index 68e787611..68e93597d 100644 --- a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/SideEffectToolReplayTest.java +++ b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/SideEffectToolReplayTest.java @@ -10,9 +10,11 @@ import io.temporal.springai.chat.TemporalChatClient; import io.temporal.springai.model.ActivityChatModel; import io.temporal.springai.tool.SideEffectTool; +import io.temporal.testing.TestEnvironmentOptions; import io.temporal.testing.TestWorkflowEnvironment; import io.temporal.testing.WorkflowReplayer; import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactoryOptions; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; import java.util.List; @@ -51,7 +53,15 @@ class SideEffectToolReplayTest { @BeforeEach void setUp() { CALL_COUNT.set(0); - testEnv = TestWorkflowEnvironment.newInstance(); + // WorkflowCacheSize(0) forces the worker to replay from history on every workflow task. + // This is exactly the regime a missing Workflow.sideEffect wrap would fail in: every + // tick the @Tool body would run again and bump the counter past 1. + testEnv = + TestWorkflowEnvironment.newInstance( + TestEnvironmentOptions.newBuilder() + .setWorkerFactoryOptions( + WorkerFactoryOptions.newBuilder().setWorkflowCacheSize(0).build()) + .build()); client = testEnv.getWorkflowClient(); } From 3e43a7dd4249a7477106522b25996b7f4072f948 Mon Sep 17 00:00:00 2001 From: Donald Pinckney Date: Thu, 23 Apr 2026 16:19:41 -0400 Subject: [PATCH 5/6] temporal-spring-ai: reframe replay-test messages around plugin behavior Reviewer pointed out that "ChatModel must not be re-invoked during replay" reads as testing Temporal's activity replay guarantee rather than the plugin's behavior. Reword class-level javadoc and assertion messages so the property under test is explicit: the plugin places ChatModel calls, activity-stub tool calls, and @SideEffectTool bodies behind the correct Temporal boundary. Temporal's replay/memoization semantics are assumed correct. The counter-stays-at-1 observation is the signal we use to detect a plugin regression, not the thing being tested. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../replay/ActivityToolSideEffectTest.java | 21 +++++++++++----- .../replay/ChatModelSideEffectTest.java | 25 +++++++++++++------ .../replay/SideEffectToolReplayTest.java | 23 +++++++++-------- 3 files changed, 45 insertions(+), 24 deletions(-) diff --git a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java index 9107c497f..e70252c8c 100644 --- a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java +++ b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java @@ -35,10 +35,15 @@ import org.springframework.ai.tool.annotation.Tool; /** - * Asserts that a workflow replay does not re-invoke activity-backed tools. The {@link AddActivity} - * impl holds a counter that increments on each tool call. After the initial run the counter is 1; - * after replaying the captured history, it must still be 1 — activity results for the tool call - * come from history, not from re-invoking the activity impl. + * Asserts that {@link TemporalChatClient#defaultTools} routes activity-stub tools through the + * activity boundary the user already set up (via {@code Workflow.newActivityStub}) rather than + * invoking the underlying impl directly from workflow code. This is a plugin property: the plugin + * must recognize the stub as an activity-backed tool and invoke it as such, not unwrap it into a + * plain in-workflow Java method call. Temporal's replay semantics for activities are assumed + * correct. + * + *

If the plugin regressed to invoking a stub's backing impl directly, replay would re-run that + * call and the counter would exceed 1. */ class ActivityToolSideEffectTest { @@ -82,7 +87,9 @@ void activityTool_notReInvokedOnReplay() throws Exception { WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build()); assertEquals("The answer is 5", workflow.chat("What is 2+3?")); assertEquals( - 1, addActivity.callCount.get(), "Tool activity should run once during the initial run"); + 1, + addActivity.callCount.get(), + "sanity check: the tool activity impl ran exactly once for one workflow invocation"); WorkflowExecutionHistory history = client.fetchHistory(WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId()); @@ -91,7 +98,9 @@ void activityTool_notReInvokedOnReplay() throws Exception { assertEquals( 1, addActivity.callCount.get(), - "Tool activity must not be re-invoked during replay — results come from history"); + "TemporalChatClient must invoke activity-stub tools as activities; a counter above 1" + + " means the plugin unwrapped the stub and called the impl directly from workflow" + + " code"); } @WorkflowInterface diff --git a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java index 5dd12653e..a72e8c7fb 100644 --- a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java +++ b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java @@ -28,12 +28,17 @@ import org.springframework.ai.chat.prompt.Prompt; /** - * Asserts that a workflow replay does not re-invoke the underlying {@link ChatModel}. The counter - * lives on the activity's backing ChatModel, which is only reached when the {@code CallChatModel} - * activity is scheduled by the workflow. On replay, the activity result is fetched from history; - * the impl is not re-invoked. If we ever regress by dropping that guarantee — say by adding an - * in-workflow cache that falls back to invoking the model directly — the counter will advance to 2 - * and this test will fail. + * Asserts that {@link ActivityChatModel} routes every {@link ChatModel} invocation through an + * activity boundary rather than calling the underlying {@code ChatModel} directly from workflow + * code. This is a property of the plugin, not of Temporal: Temporal guarantees activity results are + * replayed from history, but that only helps if the plugin actually scheduled an activity in the + * first place. + * + *

Concretely, if a regression routed chat calls directly (e.g. an in-workflow cache whose miss + * path invokes the {@code ChatModel} inline), replay would re-run that inline call and the counter + * would advance past 1. Caching is disabled in {@link #setUp()} so the worker replays on every + * workflow task — making this failure mode observable during the initial run, not only via the + * explicit {@code WorkflowReplayer} pass. */ class ChatModelSideEffectTest { @@ -76,7 +81,9 @@ void chatModel_notReInvokedOnReplay() throws Exception { ChatWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build()); assertEquals("pong", workflow.chat("ping")); assertEquals( - 1, model.callCount.get(), "ChatModel should be called once during the initial run"); + 1, + model.callCount.get(), + "sanity check: the ChatModel ran exactly once for one workflow invocation"); WorkflowExecutionHistory history = client.fetchHistory(WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId()); @@ -85,7 +92,9 @@ void chatModel_notReInvokedOnReplay() throws Exception { assertEquals( 1, model.callCount.get(), - "ChatModel must not be re-invoked during replay — activity results come from history"); + "ActivityChatModel must place ChatModel calls behind an activity boundary; a counter" + + " above 1 means the plugin invoked the ChatModel directly from workflow code" + + " and replay re-ran it"); } @WorkflowInterface diff --git a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/SideEffectToolReplayTest.java b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/SideEffectToolReplayTest.java index 68e93597d..9f99ce947 100644 --- a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/SideEffectToolReplayTest.java +++ b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/SideEffectToolReplayTest.java @@ -31,14 +31,15 @@ import org.springframework.ai.tool.annotation.Tool; /** - * Asserts that {@code Workflow.sideEffect(...)} memoization works for {@link SideEffectTool} - * bodies. Unlike activity-backed tools, the {@code SideEffectToolCallback} wrapper runs on the - * workflow side, so every workflow statement re-runs during replay — including the call - * into {@code SideEffectToolCallback.call()}. What must NOT re-run is the inner tool body (the - * lambda passed to {@code Workflow.sideEffect}); that result is fetched from the recorded marker. + * Asserts that the plugin's {@link SideEffectTool} callback actually wraps the user's {@code @Tool} + * method body in {@code Workflow.sideEffect(...)}. This is a plugin property: the {@code + * SideEffectToolCallback} itself runs on the workflow side and gets re-entered on every replay, so + * if the callback were to invoke the tool body directly instead of via {@code Workflow.sideEffect}, + * the body would run again on each replay. Temporal's memoization of {@code sideEffect} markers is + * assumed correct. * - *

A regression that dropped the sideEffect wrapper (e.g., directly invoking the delegate) would - * bump this counter to 2 on replay. + *

A regression that drops the {@code Workflow.sideEffect} wrap (calling the delegate directly) + * would bump this counter past 1. */ class SideEffectToolReplayTest { @@ -83,7 +84,9 @@ void sideEffectTool_notReInvokedOnReplay() throws Exception { TimestampWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build()); assertEquals("got: 2026-04-21T00:00:00Z", workflow.chat("what time is it?")); assertEquals( - 1, CALL_COUNT.get(), "@SideEffectTool body should run once during the initial run"); + 1, + CALL_COUNT.get(), + "sanity check: the @SideEffectTool body ran exactly once for one workflow invocation"); WorkflowExecutionHistory history = client.fetchHistory(WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId()); @@ -92,8 +95,8 @@ void sideEffectTool_notReInvokedOnReplay() throws Exception { assertEquals( 1, CALL_COUNT.get(), - "@SideEffectTool body must not re-run on replay — result comes from the recorded" - + " sideEffect marker"); + "SideEffectToolCallback must wrap the @Tool body in Workflow.sideEffect; a counter above" + + " 1 means the plugin invoked the tool body directly and replay re-ran it"); } @WorkflowInterface From b27976d39f4a7033554c304d1ee03c3c30ed745d Mon Sep 17 00:00:00 2001 From: Donald Pinckney Date: Thu, 23 Apr 2026 16:28:17 -0400 Subject: [PATCH 6/6] temporal-spring-ai: assert on scheduled-activity history, not impl counters MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reviewer noted that counting invocations of the backing impl can conflate two signals: the plugin inlining a call (what we want to catch) vs. Temporal re-delivering an activity task to the worker (which can legitimately happen with maxAttempts > 1, producing >1 impl executions for a single scheduled activity). Switch ChatModelSideEffectTest and ActivityToolSideEffectTest to scan workflow history for ActivityTaskScheduled events of the expected activity type. That count is invariant under activity-task redelivery and speaks directly to the plugin property under test ("the plugin scheduled the call as an activity"). Drops the impl counters and the WorkflowReplayer pass — the former is no longer needed, the latter is redundant with WorkflowDeterminismTest. SideEffectToolReplayTest intentionally keeps its counter: the Workflow.sideEffect path runs in workflow code under deterministic replay semantics, not as an activity, so the retry-induced flake mode doesn't apply. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../replay/ActivityToolSideEffectTest.java | 52 ++++++++-------- .../replay/ChatModelSideEffectTest.java | 59 ++++++++++--------- 2 files changed, 56 insertions(+), 55 deletions(-) diff --git a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java index e70252c8c..28654585b 100644 --- a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java +++ b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ActivityToolSideEffectTest.java @@ -5,16 +5,15 @@ import io.temporal.activity.ActivityInterface; import io.temporal.activity.ActivityMethod; import io.temporal.activity.ActivityOptions; +import io.temporal.api.history.v1.HistoryEvent; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; -import io.temporal.common.WorkflowExecutionHistory; import io.temporal.springai.activity.ChatModelActivityImpl; import io.temporal.springai.chat.TemporalChatClient; import io.temporal.springai.model.ActivityChatModel; import io.temporal.testing.TestEnvironmentOptions; import io.temporal.testing.TestWorkflowEnvironment; -import io.temporal.testing.WorkflowReplayer; import io.temporal.worker.Worker; import io.temporal.worker.WorkerFactoryOptions; import io.temporal.workflow.Workflow; @@ -42,8 +41,10 @@ * plain in-workflow Java method call. Temporal's replay semantics for activities are assumed * correct. * - *

If the plugin regressed to invoking a stub's backing impl directly, replay would re-run that - * call and the counter would exceed 1. + *

We verify by scanning history for {@code ActivityTaskScheduled} events. Counting invocations + * of the backing activity impl would conflate two different signals: the plugin inlining the tool + * call (what we want to catch) vs. Temporal re-delivering the activity task to the worker (which + * can legitimately happen with {@code maxAttempts > 1}). */ class ActivityToolSideEffectTest { @@ -51,13 +52,9 @@ class ActivityToolSideEffectTest { private TestWorkflowEnvironment testEnv; private WorkflowClient client; - private AddActivityImpl addActivity; @BeforeEach void setUp() { - // WorkflowCacheSize(0) forces the worker to replay from history on every workflow task - // instead of resuming from in-memory cached state — the regime in which a missing - // Workflow.sideEffect wrap or an un-guarded in-workflow mutation would actually bite. testEnv = TestWorkflowEnvironment.newInstance( TestEnvironmentOptions.newBuilder() @@ -65,7 +62,6 @@ void setUp() { WorkerFactoryOptions.newBuilder().setWorkflowCacheSize(0).build()) .build()); client = testEnv.getWorkflowClient(); - addActivity = new AddActivityImpl(); } @AfterEach @@ -74,11 +70,11 @@ void tearDown() { } @Test - void activityTool_notReInvokedOnReplay() throws Exception { + void activityTool_schedulesExactlyOneActivity() { Worker worker = testEnv.newWorker(TASK_QUEUE); worker.registerWorkflowImplementationTypes(ChatWithToolsWorkflowImpl.class); worker.registerActivitiesImplementations( - new ChatModelActivityImpl(new ToolCallingStubChatModel()), addActivity); + new ChatModelActivityImpl(new ToolCallingStubChatModel()), new AddActivityImpl()); testEnv.start(); ChatWithToolsWorkflow workflow = @@ -86,21 +82,28 @@ void activityTool_notReInvokedOnReplay() throws Exception { ChatWithToolsWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build()); assertEquals("The answer is 5", workflow.chat("What is 2+3?")); - assertEquals( - 1, - addActivity.callCount.get(), - "sanity check: the tool activity impl ran exactly once for one workflow invocation"); - - WorkflowExecutionHistory history = - client.fetchHistory(WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId()); - WorkflowReplayer.replayWorkflowExecution(history, ChatWithToolsWorkflowImpl.class); + List events = + client + .fetchHistory(WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId()) + .getHistory() + .getEventsList(); + long scheduled = + events.stream() + .filter(HistoryEvent::hasActivityTaskScheduledEventAttributes) + .filter( + e -> + "Add" + .equals( + e.getActivityTaskScheduledEventAttributes() + .getActivityType() + .getName())) + .count(); assertEquals( 1, - addActivity.callCount.get(), - "TemporalChatClient must invoke activity-stub tools as activities; a counter above 1" - + " means the plugin unwrapped the stub and called the impl directly from workflow" - + " code"); + scheduled, + "TemporalChatClient must invoke activity-stub tools as activities; expected exactly one" + + " Add ActivityTaskScheduled event"); } @WorkflowInterface @@ -117,11 +120,8 @@ public interface AddActivity { } public static class AddActivityImpl implements AddActivity { - final AtomicInteger callCount = new AtomicInteger(0); - @Override public int add(int a, int b) { - callCount.incrementAndGet(); return a + b; } } diff --git a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java index a72e8c7fb..86f00256c 100644 --- a/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java +++ b/temporal-spring-ai/src/test/java/io/temporal/springai/replay/ChatModelSideEffectTest.java @@ -2,21 +2,19 @@ import static org.junit.jupiter.api.Assertions.assertEquals; +import io.temporal.api.history.v1.HistoryEvent; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; -import io.temporal.common.WorkflowExecutionHistory; import io.temporal.springai.activity.ChatModelActivityImpl; import io.temporal.springai.model.ActivityChatModel; import io.temporal.testing.TestEnvironmentOptions; import io.temporal.testing.TestWorkflowEnvironment; -import io.temporal.testing.WorkflowReplayer; import io.temporal.worker.Worker; import io.temporal.worker.WorkerFactoryOptions; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -34,11 +32,11 @@ * replayed from history, but that only helps if the plugin actually scheduled an activity in the * first place. * - *

Concretely, if a regression routed chat calls directly (e.g. an in-workflow cache whose miss - * path invokes the {@code ChatModel} inline), replay would re-run that inline call and the counter - * would advance past 1. Caching is disabled in {@link #setUp()} so the worker replays on every - * workflow task — making this failure mode observable during the initial run, not only via the - * explicit {@code WorkflowReplayer} pass. + *

We verify by scanning history for {@code ActivityTaskScheduled} events. Counting invocations + * of the backing {@code ChatModel} would conflate two different signals: the plugin inlining the + * call (what we want to catch) vs. Temporal re-delivering an activity task to the worker (which can + * legitimately happen with {@code maxAttempts > 1}). The scheduled-event count is invariant under + * activity-task redelivery. */ class ChatModelSideEffectTest { @@ -46,14 +44,13 @@ class ChatModelSideEffectTest { private TestWorkflowEnvironment testEnv; private WorkflowClient client; - private CountingChatModel model; @BeforeEach void setUp() { // WorkflowCacheSize(0) forces the worker to replay from history on every workflow task // instead of resuming from in-memory cached state, which is what we actually need to // assert side-effect safety: any un-wrapped side effect in workflow code would run on - // each replay and bump the counter. + // each replay. testEnv = TestWorkflowEnvironment.newInstance( TestEnvironmentOptions.newBuilder() @@ -61,7 +58,6 @@ void setUp() { WorkerFactoryOptions.newBuilder().setWorkflowCacheSize(0).build()) .build()); client = testEnv.getWorkflowClient(); - model = new CountingChatModel("pong"); } @AfterEach @@ -70,31 +66,38 @@ void tearDown() { } @Test - void chatModel_notReInvokedOnReplay() throws Exception { + void chatCall_schedulesExactlyOneActivity() { Worker worker = testEnv.newWorker(TASK_QUEUE); worker.registerWorkflowImplementationTypes(ChatWorkflowImpl.class); - worker.registerActivitiesImplementations(new ChatModelActivityImpl(model)); + worker.registerActivitiesImplementations(new ChatModelActivityImpl(new StubChatModel("pong"))); testEnv.start(); ChatWorkflow workflow = client.newWorkflowStub( ChatWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build()); assertEquals("pong", workflow.chat("ping")); - assertEquals( - 1, - model.callCount.get(), - "sanity check: the ChatModel ran exactly once for one workflow invocation"); - - WorkflowExecutionHistory history = - client.fetchHistory(WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId()); - WorkflowReplayer.replayWorkflowExecution(history, ChatWorkflowImpl.class); + List events = + client + .fetchHistory(WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId()) + .getHistory() + .getEventsList(); + long scheduled = + events.stream() + .filter(HistoryEvent::hasActivityTaskScheduledEventAttributes) + .filter( + e -> + "CallChatModel" + .equals( + e.getActivityTaskScheduledEventAttributes() + .getActivityType() + .getName())) + .count(); assertEquals( 1, - model.callCount.get(), - "ActivityChatModel must place ChatModel calls behind an activity boundary; a counter" - + " above 1 means the plugin invoked the ChatModel directly from workflow code" - + " and replay re-ran it"); + scheduled, + "ActivityChatModel must place ChatModel calls behind an activity boundary; expected" + + " exactly one CallChatModel ActivityTaskScheduled event"); } @WorkflowInterface @@ -112,17 +115,15 @@ public String chat(String message) { } } - private static class CountingChatModel implements ChatModel { - final AtomicInteger callCount = new AtomicInteger(0); + private static class StubChatModel implements ChatModel { private final String response; - CountingChatModel(String response) { + StubChatModel(String response) { this.response = response; } @Override public ChatResponse call(Prompt prompt) { - callCount.incrementAndGet(); return ChatResponse.builder() .generations(List.of(new Generation(new AssistantMessage(response)))) .build();