Skip to content

Commit bbdb9ba

Browse files
committed
Add history hints to workflow task started attributes
1 parent 05ee827 commit bbdb9ba

2 files changed

Lines changed: 205 additions & 2 deletions

File tree

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowStoreImpl.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
import io.temporal.api.common.v1.WorkflowExecution;
1111
import io.temporal.api.enums.v1.EventType;
1212
import io.temporal.api.enums.v1.HistoryEventFilterType;
13+
import io.temporal.api.enums.v1.SuggestContinueAsNewReason;
1314
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
1415
import io.temporal.api.history.v1.History;
1516
import io.temporal.api.history.v1.HistoryEvent;
17+
import io.temporal.api.history.v1.WorkflowTaskStartedEventAttributes;
1618
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
1719
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
1820
import io.temporal.api.workflowservice.v1.*;
@@ -40,6 +42,8 @@
4042
class TestWorkflowStoreImpl implements TestWorkflowStore {
4143

4244
private static final Logger log = LoggerFactory.getLogger(TestWorkflowStoreImpl.class);
45+
private static final long HISTORY_SIZE_SUGGEST_CONTINUE_AS_NEW = 4L * 1024 * 1024;
46+
private static final long HISTORY_COUNT_SUGGEST_CONTINUE_AS_NEW = 4L * 1024;
4347

4448
private final Lock lock = new ReentrantLock();
4549
private final Map<ExecutionId, HistoryStore> histories = new HashMap<>();
@@ -50,12 +54,33 @@ class TestWorkflowStoreImpl implements TestWorkflowStore {
5054
private final Map<TaskQueueId, TaskQueue<NexusTask>> nexusTaskQueues = new HashMap<>();
5155
private final SelfAdvancingTimer selfAdvancingTimer;
5256

57+
private static void populateWorkflowTaskStartedEventAttributes(
58+
WorkflowTaskStartedEventAttributes.Builder attributes,
59+
long historySizeBytes,
60+
long historyCount) {
61+
// Size excludes the started event; count is the started event id.
62+
attributes.setHistorySizeBytes(historySizeBytes);
63+
if (historySizeBytes >= HISTORY_SIZE_SUGGEST_CONTINUE_AS_NEW) {
64+
attributes.setSuggestContinueAsNew(true);
65+
attributes.addSuggestContinueAsNewReasons(
66+
SuggestContinueAsNewReason.SUGGEST_CONTINUE_AS_NEW_REASON_HISTORY_SIZE_TOO_LARGE);
67+
}
68+
if (historyCount >= HISTORY_COUNT_SUGGEST_CONTINUE_AS_NEW) {
69+
attributes.setSuggestContinueAsNew(true);
70+
attributes.addSuggestContinueAsNewReasons(
71+
SuggestContinueAsNewReason.SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_HISTORY_EVENTS);
72+
}
73+
}
74+
5375
private static class HistoryStore {
5476

5577
private final ExecutionId id;
5678
private final Lock lock;
5779
private final Condition newEventsCondition;
5880
private final List<HistoryEvent> history = new ArrayList<>();
81+
82+
private long historySizeBytes;
83+
5984
private boolean completed;
6085

6186
private HistoryStore(ExecutionId id, Lock lock) {
@@ -91,8 +116,17 @@ List<HistoryEvent> addAllLocked(List<HistoryEvent> events, Timestamp eventTime)
91116
if (Timestamps.toMillis(eBuilder.getEventTime()) == 0) {
92117
eBuilder.setEventTime(eventTime);
93118
}
94-
history.add(eBuilder.build());
95-
completed = completed || WorkflowExecutionUtils.isWorkflowExecutionClosedEvent(eBuilder);
119+
if (EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED == eBuilder.getEventType()) {
120+
populateWorkflowTaskStartedEventAttributes(
121+
eBuilder.getWorkflowTaskStartedEventAttributesBuilder(),
122+
historySizeBytes,
123+
history.size() + 1L);
124+
}
125+
HistoryEvent historyEvent = eBuilder.build();
126+
history.add(historyEvent);
127+
historySizeBytes += historyEvent.getSerializedSize();
128+
completed =
129+
completed || WorkflowExecutionUtils.isWorkflowExecutionClosedEvent(historyEvent);
96130
}
97131
newEventsCondition.signalAll();
98132
return history.subList(currentSize, history.size());
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package io.temporal.testserver.functional;
2+
3+
import static io.temporal.internal.common.InternalUtils.createNormalTaskQueue;
4+
import static org.junit.Assert.assertEquals;
5+
import static org.junit.Assert.assertFalse;
6+
import static org.junit.Assert.assertTrue;
7+
8+
import com.google.protobuf.ByteString;
9+
import io.temporal.api.command.v1.Command;
10+
import io.temporal.api.command.v1.RecordMarkerCommandAttributes;
11+
import io.temporal.api.common.v1.Payload;
12+
import io.temporal.api.common.v1.Payloads;
13+
import io.temporal.api.enums.v1.CommandType;
14+
import io.temporal.api.enums.v1.EventType;
15+
import io.temporal.api.enums.v1.SuggestContinueAsNewReason;
16+
import io.temporal.api.history.v1.HistoryEvent;
17+
import io.temporal.api.history.v1.WorkflowTaskStartedEventAttributes;
18+
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
19+
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskCompletedRequest;
20+
import io.temporal.serviceclient.WorkflowServiceStubs;
21+
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
22+
import io.temporal.testing.internal.TestServiceUtils;
23+
import io.temporal.testserver.TestServer;
24+
import java.util.concurrent.TimeUnit;
25+
import org.junit.After;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
29+
public class WorkflowTaskStartedAttributesTest {
30+
31+
private static final long HISTORY_SIZE_SUGGEST_CONTINUE_AS_NEW = 4L * 1024 * 1024;
32+
private static final long HISTORY_COUNT_SUGGEST_CONTINUE_AS_NEW = 4L * 1024;
33+
private static final int MARKER_PAYLOAD_SIZE = 1024 * 1024;
34+
private static final int MARKER_ITERATIONS = 5;
35+
36+
private final String NAMESPACE = "namespace";
37+
private final String TASK_QUEUE = "taskQueue";
38+
private final String WORKFLOW_TYPE = "wfType";
39+
40+
private TestServer.InProcessTestServer testServer;
41+
private WorkflowServiceStubs workflowServiceStubs;
42+
43+
@Before
44+
public void setUp() {
45+
this.testServer = TestServer.createServer(true);
46+
this.workflowServiceStubs =
47+
WorkflowServiceStubs.newServiceStubs(
48+
WorkflowServiceStubsOptions.newBuilder()
49+
.setChannel(testServer.getChannel())
50+
.validateAndBuildWithDefaults());
51+
}
52+
53+
@After
54+
public void tearDown() {
55+
this.workflowServiceStubs.shutdownNow();
56+
this.workflowServiceStubs.awaitTermination(1, TimeUnit.SECONDS);
57+
this.testServer.close();
58+
}
59+
60+
@Test
61+
public void firstWorkflowTaskStartedReportsHistorySizeWithoutSuggestion() throws Exception {
62+
TestServiceUtils.startWorkflowExecution(
63+
NAMESPACE, TASK_QUEUE, WORKFLOW_TYPE, workflowServiceStubs);
64+
65+
PollWorkflowTaskQueueResponse response =
66+
TestServiceUtils.pollWorkflowTaskQueue(
67+
NAMESPACE, createNormalTaskQueue(TASK_QUEUE), workflowServiceStubs);
68+
69+
WorkflowTaskStartedEventAttributes startedAttributes = lastStartedAttributes(response);
70+
assertTrue(startedAttributes.getHistorySizeBytes() > 0);
71+
assertFalse(startedAttributes.getSuggestContinueAsNew());
72+
assertTrue(startedAttributes.getSuggestContinueAsNewReasonsList().isEmpty());
73+
}
74+
75+
@Test
76+
public void historySizeAboveThresholdSuggestsContinueAsNew() throws Exception {
77+
TestServiceUtils.startWorkflowExecution(
78+
NAMESPACE, TASK_QUEUE, WORKFLOW_TYPE, workflowServiceStubs);
79+
80+
PollWorkflowTaskQueueResponse response =
81+
TestServiceUtils.pollWorkflowTaskQueue(
82+
NAMESPACE, createNormalTaskQueue(TASK_QUEUE), workflowServiceStubs);
83+
84+
for (int i = 0; i < MARKER_ITERATIONS; i++) {
85+
workflowServiceStubs
86+
.blockingStub()
87+
.respondWorkflowTaskCompleted(
88+
RespondWorkflowTaskCompletedRequest.newBuilder()
89+
.setTaskToken(response.getTaskToken())
90+
.addCommands(newLargeMarkerCommand())
91+
.build());
92+
TestServiceUtils.signalWorkflow(
93+
response.getWorkflowExecution(), NAMESPACE, workflowServiceStubs);
94+
response =
95+
TestServiceUtils.pollWorkflowTaskQueue(
96+
NAMESPACE, createNormalTaskQueue(TASK_QUEUE), workflowServiceStubs);
97+
}
98+
99+
WorkflowTaskStartedEventAttributes startedAttributes = lastStartedAttributes(response);
100+
assertTrue(
101+
"Expected history >= 4 MiB but was " + startedAttributes.getHistorySizeBytes(),
102+
startedAttributes.getHistorySizeBytes() >= HISTORY_SIZE_SUGGEST_CONTINUE_AS_NEW);
103+
assertSuggestsContinueAsNew(
104+
startedAttributes,
105+
SuggestContinueAsNewReason.SUGGEST_CONTINUE_AS_NEW_REASON_HISTORY_SIZE_TOO_LARGE);
106+
}
107+
108+
@Test
109+
public void historyCountAboveThresholdSuggestsContinueAsNew() throws Exception {
110+
TestServiceUtils.startWorkflowExecution(
111+
NAMESPACE, TASK_QUEUE, WORKFLOW_TYPE, workflowServiceStubs);
112+
113+
PollWorkflowTaskQueueResponse response =
114+
TestServiceUtils.pollWorkflowTaskQueue(
115+
NAMESPACE, createNormalTaskQueue(TASK_QUEUE), workflowServiceStubs);
116+
117+
RespondWorkflowTaskCompletedRequest.Builder completedRequest =
118+
RespondWorkflowTaskCompletedRequest.newBuilder().setTaskToken(response.getTaskToken());
119+
for (int i = 0; i < HISTORY_COUNT_SUGGEST_CONTINUE_AS_NEW; i++) {
120+
completedRequest.addCommands(newMarkerCommand());
121+
}
122+
workflowServiceStubs.blockingStub().respondWorkflowTaskCompleted(completedRequest.build());
123+
TestServiceUtils.signalWorkflow(
124+
response.getWorkflowExecution(), NAMESPACE, workflowServiceStubs);
125+
126+
response =
127+
TestServiceUtils.pollWorkflowTaskQueue(
128+
NAMESPACE, createNormalTaskQueue(TASK_QUEUE), workflowServiceStubs);
129+
130+
WorkflowTaskStartedEventAttributes startedAttributes = lastStartedAttributes(response);
131+
assertSuggestsContinueAsNew(
132+
startedAttributes,
133+
SuggestContinueAsNewReason.SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_HISTORY_EVENTS);
134+
}
135+
136+
private static WorkflowTaskStartedEventAttributes lastStartedAttributes(
137+
PollWorkflowTaskQueueResponse response) {
138+
HistoryEvent last = response.getHistory().getEvents(response.getHistory().getEventsCount() - 1);
139+
assertEquals(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED, last.getEventType());
140+
return last.getWorkflowTaskStartedEventAttributes();
141+
}
142+
143+
private static void assertSuggestsContinueAsNew(
144+
WorkflowTaskStartedEventAttributes startedAttributes, SuggestContinueAsNewReason reason) {
145+
assertTrue(startedAttributes.getSuggestContinueAsNew());
146+
assertTrue(startedAttributes.getSuggestContinueAsNewReasonsList().contains(reason));
147+
}
148+
149+
private static Command newLargeMarkerCommand() {
150+
ByteString markerData = ByteString.copyFrom(new byte[MARKER_PAYLOAD_SIZE]);
151+
Payloads markerPayloads =
152+
Payloads.newBuilder().addPayloads(Payload.newBuilder().setData(markerData)).build();
153+
return newMarkerCommand(markerPayloads);
154+
}
155+
156+
private static Command newMarkerCommand() {
157+
return newMarkerCommand(Payloads.getDefaultInstance());
158+
}
159+
160+
private static Command newMarkerCommand(Payloads markerPayloads) {
161+
return Command.newBuilder()
162+
.setCommandType(CommandType.COMMAND_TYPE_RECORD_MARKER)
163+
.setRecordMarkerCommandAttributes(
164+
RecordMarkerCommandAttributes.newBuilder()
165+
.setMarkerName("large-history-marker")
166+
.putDetails("payload", markerPayloads))
167+
.build();
168+
}
169+
}

0 commit comments

Comments
 (0)