Skip to content

Commit c202e51

Browse files
Merge branch 'master' into spring-ai/readme-and-naming
2 parents 3f656c4 + 9eae4a8 commit c202e51

3 files changed

Lines changed: 458 additions & 0 deletions

File tree

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package io.temporal.springai.replay;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
5+
import io.temporal.activity.ActivityInterface;
6+
import io.temporal.activity.ActivityMethod;
7+
import io.temporal.activity.ActivityOptions;
8+
import io.temporal.api.history.v1.HistoryEvent;
9+
import io.temporal.client.WorkflowClient;
10+
import io.temporal.client.WorkflowOptions;
11+
import io.temporal.client.WorkflowStub;
12+
import io.temporal.springai.activity.ChatModelActivityImpl;
13+
import io.temporal.springai.chat.TemporalChatClient;
14+
import io.temporal.springai.model.ActivityChatModel;
15+
import io.temporal.testing.TestEnvironmentOptions;
16+
import io.temporal.testing.TestWorkflowEnvironment;
17+
import io.temporal.worker.Worker;
18+
import io.temporal.worker.WorkerFactoryOptions;
19+
import io.temporal.workflow.Workflow;
20+
import io.temporal.workflow.WorkflowInterface;
21+
import io.temporal.workflow.WorkflowMethod;
22+
import java.time.Duration;
23+
import java.util.List;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import org.junit.jupiter.api.AfterEach;
26+
import org.junit.jupiter.api.BeforeEach;
27+
import org.junit.jupiter.api.Test;
28+
import org.springframework.ai.chat.client.ChatClient;
29+
import org.springframework.ai.chat.messages.AssistantMessage;
30+
import org.springframework.ai.chat.model.ChatModel;
31+
import org.springframework.ai.chat.model.ChatResponse;
32+
import org.springframework.ai.chat.model.Generation;
33+
import org.springframework.ai.chat.prompt.Prompt;
34+
import org.springframework.ai.tool.annotation.Tool;
35+
36+
/**
37+
* Asserts that {@link TemporalChatClient#defaultTools} routes activity-stub tools through the
38+
* activity boundary the user already set up (via {@code Workflow.newActivityStub}) rather than
39+
* invoking the underlying impl directly from workflow code. This is a plugin property: the plugin
40+
* must recognize the stub as an activity-backed tool and invoke it as such, not unwrap it into a
41+
* plain in-workflow Java method call. Temporal's replay semantics for activities are assumed
42+
* correct.
43+
*
44+
* <p>We verify by scanning history for {@code ActivityTaskScheduled} events. Counting invocations
45+
* of the backing activity impl would conflate two different signals: the plugin inlining the tool
46+
* call (what we want to catch) vs. Temporal re-delivering the activity task to the worker (which
47+
* can legitimately happen with {@code maxAttempts > 1}).
48+
*/
49+
class ActivityToolSideEffectTest {
50+
51+
private static final String TASK_QUEUE = "test-spring-ai-activity-tool-side-effect";
52+
53+
private TestWorkflowEnvironment testEnv;
54+
private WorkflowClient client;
55+
56+
@BeforeEach
57+
void setUp() {
58+
testEnv =
59+
TestWorkflowEnvironment.newInstance(
60+
TestEnvironmentOptions.newBuilder()
61+
.setWorkerFactoryOptions(
62+
WorkerFactoryOptions.newBuilder().setWorkflowCacheSize(0).build())
63+
.build());
64+
client = testEnv.getWorkflowClient();
65+
}
66+
67+
@AfterEach
68+
void tearDown() {
69+
testEnv.close();
70+
}
71+
72+
@Test
73+
void activityTool_schedulesExactlyOneActivity() {
74+
Worker worker = testEnv.newWorker(TASK_QUEUE);
75+
worker.registerWorkflowImplementationTypes(ChatWithToolsWorkflowImpl.class);
76+
worker.registerActivitiesImplementations(
77+
new ChatModelActivityImpl(new ToolCallingStubChatModel()), new AddActivityImpl());
78+
testEnv.start();
79+
80+
ChatWithToolsWorkflow workflow =
81+
client.newWorkflowStub(
82+
ChatWithToolsWorkflow.class,
83+
WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
84+
assertEquals("The answer is 5", workflow.chat("What is 2+3?"));
85+
86+
List<HistoryEvent> events =
87+
client
88+
.fetchHistory(WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId())
89+
.getHistory()
90+
.getEventsList();
91+
long scheduled =
92+
events.stream()
93+
.filter(HistoryEvent::hasActivityTaskScheduledEventAttributes)
94+
.filter(
95+
e ->
96+
"Add"
97+
.equals(
98+
e.getActivityTaskScheduledEventAttributes()
99+
.getActivityType()
100+
.getName()))
101+
.count();
102+
assertEquals(
103+
1,
104+
scheduled,
105+
"TemporalChatClient must invoke activity-stub tools as activities; expected exactly one"
106+
+ " Add ActivityTaskScheduled event");
107+
}
108+
109+
@WorkflowInterface
110+
public interface ChatWithToolsWorkflow {
111+
@WorkflowMethod
112+
String chat(String message);
113+
}
114+
115+
@ActivityInterface
116+
public interface AddActivity {
117+
@Tool(description = "Add two numbers")
118+
@ActivityMethod
119+
int add(int a, int b);
120+
}
121+
122+
public static class AddActivityImpl implements AddActivity {
123+
@Override
124+
public int add(int a, int b) {
125+
return a + b;
126+
}
127+
}
128+
129+
public static class ChatWithToolsWorkflowImpl implements ChatWithToolsWorkflow {
130+
@Override
131+
public String chat(String message) {
132+
ActivityChatModel chatModel = ActivityChatModel.forDefault();
133+
AddActivity addTool =
134+
Workflow.newActivityStub(
135+
AddActivity.class,
136+
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(30)).build());
137+
ChatClient chatClient = TemporalChatClient.builder(chatModel).defaultTools(addTool).build();
138+
return chatClient.prompt().user(message).call().content();
139+
}
140+
}
141+
142+
/** First call: request the "add" tool. Second call: return final text. */
143+
private static class ToolCallingStubChatModel implements ChatModel {
144+
private final AtomicInteger callCount = new AtomicInteger(0);
145+
146+
@Override
147+
public ChatResponse call(Prompt prompt) {
148+
if (callCount.getAndIncrement() == 0) {
149+
AssistantMessage toolRequest =
150+
AssistantMessage.builder()
151+
.content("")
152+
.toolCalls(
153+
List.of(
154+
new AssistantMessage.ToolCall(
155+
"call_1", "function", "add", "{\"a\":2,\"b\":3}")))
156+
.build();
157+
return ChatResponse.builder().generations(List.of(new Generation(toolRequest))).build();
158+
}
159+
return ChatResponse.builder()
160+
.generations(List.of(new Generation(new AssistantMessage("The answer is 5"))))
161+
.build();
162+
}
163+
164+
@Override
165+
public reactor.core.publisher.Flux<ChatResponse> stream(Prompt prompt) {
166+
throw new UnsupportedOperationException();
167+
}
168+
}
169+
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package io.temporal.springai.replay;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
5+
import io.temporal.api.history.v1.HistoryEvent;
6+
import io.temporal.client.WorkflowClient;
7+
import io.temporal.client.WorkflowOptions;
8+
import io.temporal.client.WorkflowStub;
9+
import io.temporal.springai.activity.ChatModelActivityImpl;
10+
import io.temporal.springai.model.ActivityChatModel;
11+
import io.temporal.testing.TestEnvironmentOptions;
12+
import io.temporal.testing.TestWorkflowEnvironment;
13+
import io.temporal.worker.Worker;
14+
import io.temporal.worker.WorkerFactoryOptions;
15+
import io.temporal.workflow.WorkflowInterface;
16+
import io.temporal.workflow.WorkflowMethod;
17+
import java.util.List;
18+
import org.junit.jupiter.api.AfterEach;
19+
import org.junit.jupiter.api.BeforeEach;
20+
import org.junit.jupiter.api.Test;
21+
import org.springframework.ai.chat.client.ChatClient;
22+
import org.springframework.ai.chat.messages.AssistantMessage;
23+
import org.springframework.ai.chat.model.ChatModel;
24+
import org.springframework.ai.chat.model.ChatResponse;
25+
import org.springframework.ai.chat.model.Generation;
26+
import org.springframework.ai.chat.prompt.Prompt;
27+
28+
/**
29+
* Asserts that {@link ActivityChatModel} routes every {@link ChatModel} invocation through an
30+
* activity boundary rather than calling the underlying {@code ChatModel} directly from workflow
31+
* code. This is a property of the plugin, not of Temporal: Temporal guarantees activity results are
32+
* replayed from history, but that only helps if the plugin actually scheduled an activity in the
33+
* first place.
34+
*
35+
* <p>We verify by scanning history for {@code ActivityTaskScheduled} events. Counting invocations
36+
* of the backing {@code ChatModel} would conflate two different signals: the plugin inlining the
37+
* call (what we want to catch) vs. Temporal re-delivering an activity task to the worker (which can
38+
* legitimately happen with {@code maxAttempts > 1}). The scheduled-event count is invariant under
39+
* activity-task redelivery.
40+
*/
41+
class ChatModelSideEffectTest {
42+
43+
private static final String TASK_QUEUE = "test-spring-ai-chat-side-effect";
44+
45+
private TestWorkflowEnvironment testEnv;
46+
private WorkflowClient client;
47+
48+
@BeforeEach
49+
void setUp() {
50+
// WorkflowCacheSize(0) forces the worker to replay from history on every workflow task
51+
// instead of resuming from in-memory cached state, which is what we actually need to
52+
// assert side-effect safety: any un-wrapped side effect in workflow code would run on
53+
// each replay.
54+
testEnv =
55+
TestWorkflowEnvironment.newInstance(
56+
TestEnvironmentOptions.newBuilder()
57+
.setWorkerFactoryOptions(
58+
WorkerFactoryOptions.newBuilder().setWorkflowCacheSize(0).build())
59+
.build());
60+
client = testEnv.getWorkflowClient();
61+
}
62+
63+
@AfterEach
64+
void tearDown() {
65+
testEnv.close();
66+
}
67+
68+
@Test
69+
void chatCall_schedulesExactlyOneActivity() {
70+
Worker worker = testEnv.newWorker(TASK_QUEUE);
71+
worker.registerWorkflowImplementationTypes(ChatWorkflowImpl.class);
72+
worker.registerActivitiesImplementations(new ChatModelActivityImpl(new StubChatModel("pong")));
73+
testEnv.start();
74+
75+
ChatWorkflow workflow =
76+
client.newWorkflowStub(
77+
ChatWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
78+
assertEquals("pong", workflow.chat("ping"));
79+
80+
List<HistoryEvent> events =
81+
client
82+
.fetchHistory(WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId())
83+
.getHistory()
84+
.getEventsList();
85+
long scheduled =
86+
events.stream()
87+
.filter(HistoryEvent::hasActivityTaskScheduledEventAttributes)
88+
.filter(
89+
e ->
90+
"CallChatModel"
91+
.equals(
92+
e.getActivityTaskScheduledEventAttributes()
93+
.getActivityType()
94+
.getName()))
95+
.count();
96+
assertEquals(
97+
1,
98+
scheduled,
99+
"ActivityChatModel must place ChatModel calls behind an activity boundary; expected"
100+
+ " exactly one CallChatModel ActivityTaskScheduled event");
101+
}
102+
103+
@WorkflowInterface
104+
public interface ChatWorkflow {
105+
@WorkflowMethod
106+
String chat(String message);
107+
}
108+
109+
public static class ChatWorkflowImpl implements ChatWorkflow {
110+
@Override
111+
public String chat(String message) {
112+
ActivityChatModel chatModel = ActivityChatModel.forDefault();
113+
ChatClient chatClient = ChatClient.builder(chatModel).build();
114+
return chatClient.prompt().user(message).call().content();
115+
}
116+
}
117+
118+
private static class StubChatModel implements ChatModel {
119+
private final String response;
120+
121+
StubChatModel(String response) {
122+
this.response = response;
123+
}
124+
125+
@Override
126+
public ChatResponse call(Prompt prompt) {
127+
return ChatResponse.builder()
128+
.generations(List.of(new Generation(new AssistantMessage(response))))
129+
.build();
130+
}
131+
132+
@Override
133+
public reactor.core.publisher.Flux<ChatResponse> stream(Prompt prompt) {
134+
throw new UnsupportedOperationException();
135+
}
136+
}
137+
}

0 commit comments

Comments
 (0)