Skip to content

Commit 9eae4a8

Browse files
temporal-spring-ai: add side-effect replay tests for chat, activity tools, and @SideEffectTool (#2856)
* temporal-spring-ai: plan — side-effect replay tests Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * temporal-spring-ai: add side-effect replay tests for chat, activity tools, and @SideEffectTool Three new tests under src/test/.../replay/: - ChatModelSideEffectTest: register a ChatModel with an AtomicInteger counter. Run a workflow that makes one chat call, assert counter=1. Replay the captured history, assert counter still 1 — the activity result comes from history, not from re-invoking the ChatModel. - ActivityToolSideEffectTest: activity-backed @tool whose impl increments a counter. ToolCallingStubChatModel asks for the tool on the first call and returns final text on the second. Same assertion shape: counter=1 after run, counter=1 after replay. - SideEffectToolReplayTest: @SideEffectTool body increments a counter via a file-scope static. Workflow drives a tool call through ToolCallingStubChatModel. The assertion proves that Workflow.sideEffect's marker is what's consulted on replay rather than re-invoking the @tool method. MCP is intentionally omitted — spring-ai-mcp is compileOnly and adding it just for one test isn't worth the dep weight. MCP tool calls go through the same Temporal activity machinery as ChatModel, which ChatModelSideEffectTest already covers. I verified the SideEffectToolReplayTest catches a real regression by temporarily dropping the Workflow.sideEffect wrap in SideEffectToolCallback; the test correctly failed with `expected: <1> but was: <2>`. Restored before this commit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * temporal-spring-ai: drop PLAN.md Planning scratchpad — not part of the shipped artifact. Removed before merge. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * temporal-spring-ai: disable workflow caching in side-effect replay tests Configure TestWorkflowEnvironment with WorkflowCacheSize(0) so the worker replays from history on every workflow task instead of resuming from in-memory cached state. That is the regime in which side-effect safety actually has to hold: a missing Workflow.sideEffect wrap or an un-guarded in-workflow mutation would run on each replay, which cached resumes mask. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * temporal-spring-ai: reframe replay-test messages around plugin behavior Reviewer pointed out that "ChatModel must not be re-invoked during replay" reads as testing Temporal's activity replay guarantee rather than the plugin's behavior. Reword class-level javadoc and assertion messages so the property under test is explicit: the plugin places ChatModel calls, activity-stub tool calls, and @SideEffectTool bodies behind the correct Temporal boundary. Temporal's replay/memoization semantics are assumed correct. The counter-stays-at-1 observation is the signal we use to detect a plugin regression, not the thing being tested. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * temporal-spring-ai: assert on scheduled-activity history, not impl counters Reviewer noted that counting invocations of the backing impl can conflate two signals: the plugin inlining a call (what we want to catch) vs. Temporal re-delivering an activity task to the worker (which can legitimately happen with maxAttempts > 1, producing >1 impl executions for a single scheduled activity). Switch ChatModelSideEffectTest and ActivityToolSideEffectTest to scan workflow history for ActivityTaskScheduled events of the expected activity type. That count is invariant under activity-task redelivery and speaks directly to the plugin property under test ("the plugin scheduled the call as an activity"). Drops the impl counters and the WorkflowReplayer pass — the former is no longer needed, the latter is redundant with WorkflowDeterminismTest. SideEffectToolReplayTest intentionally keeps its counter: the Workflow.sideEffect path runs in workflow code under deterministic replay semantics, not as an activity, so the retry-induced flake mode doesn't apply. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 87c519e commit 9eae4a8

3 files changed

Lines changed: 458 additions & 0 deletions

File tree

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package io.temporal.springai.replay;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
5+
import io.temporal.activity.ActivityInterface;
6+
import io.temporal.activity.ActivityMethod;
7+
import io.temporal.activity.ActivityOptions;
8+
import io.temporal.api.history.v1.HistoryEvent;
9+
import io.temporal.client.WorkflowClient;
10+
import io.temporal.client.WorkflowOptions;
11+
import io.temporal.client.WorkflowStub;
12+
import io.temporal.springai.activity.ChatModelActivityImpl;
13+
import io.temporal.springai.chat.TemporalChatClient;
14+
import io.temporal.springai.model.ActivityChatModel;
15+
import io.temporal.testing.TestEnvironmentOptions;
16+
import io.temporal.testing.TestWorkflowEnvironment;
17+
import io.temporal.worker.Worker;
18+
import io.temporal.worker.WorkerFactoryOptions;
19+
import io.temporal.workflow.Workflow;
20+
import io.temporal.workflow.WorkflowInterface;
21+
import io.temporal.workflow.WorkflowMethod;
22+
import java.time.Duration;
23+
import java.util.List;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import org.junit.jupiter.api.AfterEach;
26+
import org.junit.jupiter.api.BeforeEach;
27+
import org.junit.jupiter.api.Test;
28+
import org.springframework.ai.chat.client.ChatClient;
29+
import org.springframework.ai.chat.messages.AssistantMessage;
30+
import org.springframework.ai.chat.model.ChatModel;
31+
import org.springframework.ai.chat.model.ChatResponse;
32+
import org.springframework.ai.chat.model.Generation;
33+
import org.springframework.ai.chat.prompt.Prompt;
34+
import org.springframework.ai.tool.annotation.Tool;
35+
36+
/**
37+
* Asserts that {@link TemporalChatClient#defaultTools} routes activity-stub tools through the
38+
* activity boundary the user already set up (via {@code Workflow.newActivityStub}) rather than
39+
* invoking the underlying impl directly from workflow code. This is a plugin property: the plugin
40+
* must recognize the stub as an activity-backed tool and invoke it as such, not unwrap it into a
41+
* plain in-workflow Java method call. Temporal's replay semantics for activities are assumed
42+
* correct.
43+
*
44+
* <p>We verify by scanning history for {@code ActivityTaskScheduled} events. Counting invocations
45+
* of the backing activity impl would conflate two different signals: the plugin inlining the tool
46+
* call (what we want to catch) vs. Temporal re-delivering the activity task to the worker (which
47+
* can legitimately happen with {@code maxAttempts > 1}).
48+
*/
49+
class ActivityToolSideEffectTest {
50+
51+
private static final String TASK_QUEUE = "test-spring-ai-activity-tool-side-effect";
52+
53+
private TestWorkflowEnvironment testEnv;
54+
private WorkflowClient client;
55+
56+
@BeforeEach
57+
void setUp() {
58+
testEnv =
59+
TestWorkflowEnvironment.newInstance(
60+
TestEnvironmentOptions.newBuilder()
61+
.setWorkerFactoryOptions(
62+
WorkerFactoryOptions.newBuilder().setWorkflowCacheSize(0).build())
63+
.build());
64+
client = testEnv.getWorkflowClient();
65+
}
66+
67+
@AfterEach
68+
void tearDown() {
69+
testEnv.close();
70+
}
71+
72+
@Test
73+
void activityTool_schedulesExactlyOneActivity() {
74+
Worker worker = testEnv.newWorker(TASK_QUEUE);
75+
worker.registerWorkflowImplementationTypes(ChatWithToolsWorkflowImpl.class);
76+
worker.registerActivitiesImplementations(
77+
new ChatModelActivityImpl(new ToolCallingStubChatModel()), new AddActivityImpl());
78+
testEnv.start();
79+
80+
ChatWithToolsWorkflow workflow =
81+
client.newWorkflowStub(
82+
ChatWithToolsWorkflow.class,
83+
WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
84+
assertEquals("The answer is 5", workflow.chat("What is 2+3?"));
85+
86+
List<HistoryEvent> events =
87+
client
88+
.fetchHistory(WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId())
89+
.getHistory()
90+
.getEventsList();
91+
long scheduled =
92+
events.stream()
93+
.filter(HistoryEvent::hasActivityTaskScheduledEventAttributes)
94+
.filter(
95+
e ->
96+
"Add"
97+
.equals(
98+
e.getActivityTaskScheduledEventAttributes()
99+
.getActivityType()
100+
.getName()))
101+
.count();
102+
assertEquals(
103+
1,
104+
scheduled,
105+
"TemporalChatClient must invoke activity-stub tools as activities; expected exactly one"
106+
+ " Add ActivityTaskScheduled event");
107+
}
108+
109+
@WorkflowInterface
110+
public interface ChatWithToolsWorkflow {
111+
@WorkflowMethod
112+
String chat(String message);
113+
}
114+
115+
@ActivityInterface
116+
public interface AddActivity {
117+
@Tool(description = "Add two numbers")
118+
@ActivityMethod
119+
int add(int a, int b);
120+
}
121+
122+
public static class AddActivityImpl implements AddActivity {
123+
@Override
124+
public int add(int a, int b) {
125+
return a + b;
126+
}
127+
}
128+
129+
public static class ChatWithToolsWorkflowImpl implements ChatWithToolsWorkflow {
130+
@Override
131+
public String chat(String message) {
132+
ActivityChatModel chatModel = ActivityChatModel.forDefault();
133+
AddActivity addTool =
134+
Workflow.newActivityStub(
135+
AddActivity.class,
136+
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(30)).build());
137+
ChatClient chatClient = TemporalChatClient.builder(chatModel).defaultTools(addTool).build();
138+
return chatClient.prompt().user(message).call().content();
139+
}
140+
}
141+
142+
/** First call: request the "add" tool. Second call: return final text. */
143+
private static class ToolCallingStubChatModel implements ChatModel {
144+
private final AtomicInteger callCount = new AtomicInteger(0);
145+
146+
@Override
147+
public ChatResponse call(Prompt prompt) {
148+
if (callCount.getAndIncrement() == 0) {
149+
AssistantMessage toolRequest =
150+
AssistantMessage.builder()
151+
.content("")
152+
.toolCalls(
153+
List.of(
154+
new AssistantMessage.ToolCall(
155+
"call_1", "function", "add", "{\"a\":2,\"b\":3}")))
156+
.build();
157+
return ChatResponse.builder().generations(List.of(new Generation(toolRequest))).build();
158+
}
159+
return ChatResponse.builder()
160+
.generations(List.of(new Generation(new AssistantMessage("The answer is 5"))))
161+
.build();
162+
}
163+
164+
@Override
165+
public reactor.core.publisher.Flux<ChatResponse> stream(Prompt prompt) {
166+
throw new UnsupportedOperationException();
167+
}
168+
}
169+
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package io.temporal.springai.replay;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
5+
import io.temporal.api.history.v1.HistoryEvent;
6+
import io.temporal.client.WorkflowClient;
7+
import io.temporal.client.WorkflowOptions;
8+
import io.temporal.client.WorkflowStub;
9+
import io.temporal.springai.activity.ChatModelActivityImpl;
10+
import io.temporal.springai.model.ActivityChatModel;
11+
import io.temporal.testing.TestEnvironmentOptions;
12+
import io.temporal.testing.TestWorkflowEnvironment;
13+
import io.temporal.worker.Worker;
14+
import io.temporal.worker.WorkerFactoryOptions;
15+
import io.temporal.workflow.WorkflowInterface;
16+
import io.temporal.workflow.WorkflowMethod;
17+
import java.util.List;
18+
import org.junit.jupiter.api.AfterEach;
19+
import org.junit.jupiter.api.BeforeEach;
20+
import org.junit.jupiter.api.Test;
21+
import org.springframework.ai.chat.client.ChatClient;
22+
import org.springframework.ai.chat.messages.AssistantMessage;
23+
import org.springframework.ai.chat.model.ChatModel;
24+
import org.springframework.ai.chat.model.ChatResponse;
25+
import org.springframework.ai.chat.model.Generation;
26+
import org.springframework.ai.chat.prompt.Prompt;
27+
28+
/**
29+
* Asserts that {@link ActivityChatModel} routes every {@link ChatModel} invocation through an
30+
* activity boundary rather than calling the underlying {@code ChatModel} directly from workflow
31+
* code. This is a property of the plugin, not of Temporal: Temporal guarantees activity results are
32+
* replayed from history, but that only helps if the plugin actually scheduled an activity in the
33+
* first place.
34+
*
35+
* <p>We verify by scanning history for {@code ActivityTaskScheduled} events. Counting invocations
36+
* of the backing {@code ChatModel} would conflate two different signals: the plugin inlining the
37+
* call (what we want to catch) vs. Temporal re-delivering an activity task to the worker (which can
38+
* legitimately happen with {@code maxAttempts > 1}). The scheduled-event count is invariant under
39+
* activity-task redelivery.
40+
*/
41+
class ChatModelSideEffectTest {
42+
43+
private static final String TASK_QUEUE = "test-spring-ai-chat-side-effect";
44+
45+
private TestWorkflowEnvironment testEnv;
46+
private WorkflowClient client;
47+
48+
@BeforeEach
49+
void setUp() {
50+
// WorkflowCacheSize(0) forces the worker to replay from history on every workflow task
51+
// instead of resuming from in-memory cached state, which is what we actually need to
52+
// assert side-effect safety: any un-wrapped side effect in workflow code would run on
53+
// each replay.
54+
testEnv =
55+
TestWorkflowEnvironment.newInstance(
56+
TestEnvironmentOptions.newBuilder()
57+
.setWorkerFactoryOptions(
58+
WorkerFactoryOptions.newBuilder().setWorkflowCacheSize(0).build())
59+
.build());
60+
client = testEnv.getWorkflowClient();
61+
}
62+
63+
@AfterEach
64+
void tearDown() {
65+
testEnv.close();
66+
}
67+
68+
@Test
69+
void chatCall_schedulesExactlyOneActivity() {
70+
Worker worker = testEnv.newWorker(TASK_QUEUE);
71+
worker.registerWorkflowImplementationTypes(ChatWorkflowImpl.class);
72+
worker.registerActivitiesImplementations(new ChatModelActivityImpl(new StubChatModel("pong")));
73+
testEnv.start();
74+
75+
ChatWorkflow workflow =
76+
client.newWorkflowStub(
77+
ChatWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
78+
assertEquals("pong", workflow.chat("ping"));
79+
80+
List<HistoryEvent> events =
81+
client
82+
.fetchHistory(WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId())
83+
.getHistory()
84+
.getEventsList();
85+
long scheduled =
86+
events.stream()
87+
.filter(HistoryEvent::hasActivityTaskScheduledEventAttributes)
88+
.filter(
89+
e ->
90+
"CallChatModel"
91+
.equals(
92+
e.getActivityTaskScheduledEventAttributes()
93+
.getActivityType()
94+
.getName()))
95+
.count();
96+
assertEquals(
97+
1,
98+
scheduled,
99+
"ActivityChatModel must place ChatModel calls behind an activity boundary; expected"
100+
+ " exactly one CallChatModel ActivityTaskScheduled event");
101+
}
102+
103+
@WorkflowInterface
104+
public interface ChatWorkflow {
105+
@WorkflowMethod
106+
String chat(String message);
107+
}
108+
109+
public static class ChatWorkflowImpl implements ChatWorkflow {
110+
@Override
111+
public String chat(String message) {
112+
ActivityChatModel chatModel = ActivityChatModel.forDefault();
113+
ChatClient chatClient = ChatClient.builder(chatModel).build();
114+
return chatClient.prompt().user(message).call().content();
115+
}
116+
}
117+
118+
private static class StubChatModel implements ChatModel {
119+
private final String response;
120+
121+
StubChatModel(String response) {
122+
this.response = response;
123+
}
124+
125+
@Override
126+
public ChatResponse call(Prompt prompt) {
127+
return ChatResponse.builder()
128+
.generations(List.of(new Generation(new AssistantMessage(response))))
129+
.build();
130+
}
131+
132+
@Override
133+
public reactor.core.publisher.Flux<ChatResponse> stream(Prompt prompt) {
134+
throw new UnsupportedOperationException();
135+
}
136+
}
137+
}

0 commit comments

Comments
 (0)