Skip to content

Commit 745f2bf

Browse files
authored
runtime-v2: option for event batching for runner events (#949)
1 parent 7a7eaf0 commit 745f2bf

File tree

14 files changed

+631
-41
lines changed

14 files changed

+631
-41
lines changed

it/runtime-v2/src/test/java/com/walmartlabs/concord/it/runtime/v2/ProcessIT.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,4 +604,88 @@ public void metaAfterSuspend() throws Exception {
604604
Object myMetaValue = pe.getMeta().get("myMetaVar");
605605
assertEquals("myMetaVarValue", myMetaValue);
606606
}
607+
608+
/**
609+
* Tests process event batch flushing when a long-running task executes.
610+
*/
611+
@Test
612+
public void testEventBatchingShortTimer() throws Exception {
613+
Payload payload = new Payload()
614+
.activeProfiles("shortFlush")
615+
.archive(resource("eventBatchingTimer"));
616+
617+
ConcordProcess proc = concord.processes().start(payload);
618+
ProcessEntry pe = expectStatus(proc, ProcessEntry.StatusEnum.RUNNING);
619+
620+
// let it run at least long enough to report an event batch (1-second interval)
621+
Thread.sleep(1_500);
622+
623+
// At this point the process is still executing the sleep task.
624+
// We set a 1-second batch duration, so we can not expect a batch to have
625+
// been reported even though the max batch size (100) was not met.
626+
627+
// ---
628+
List<ProcessEventEntry> events = getProcessElementEvents(proc);
629+
630+
// clean up
631+
new ProcessApi(concord.apiClient()).kill(pe.getInstanceId());
632+
633+
// ---
634+
assertNotNull(events);
635+
assertFalse(events.isEmpty());
636+
assertEquals(1, events.size());
637+
638+
ProcessEventEntry sleepEvent = events.get(0);
639+
640+
assertEquals("sleep", sleepEvent.getData().get("name"));
641+
}
642+
643+
/**
644+
* Demonstrates what happens if process event batching flush timer is too long,
645+
* or effectively doesn't exist.
646+
*/
647+
@Test
648+
public void testEventBatchingLongTimer() throws Exception {
649+
Payload payload = new Payload()
650+
.activeProfiles("longFlush")
651+
.archive(resource("eventBatchingTimer"));
652+
653+
ConcordProcess proc = concord.processes().start(payload);
654+
ProcessEntry pe = expectStatus(proc, ProcessEntry.StatusEnum.RUNNING);
655+
656+
// let it run long enough to prove events aren't going to update any time soon
657+
Thread.sleep(1_500);
658+
659+
// ---
660+
List<ProcessEventEntry> events = getProcessElementEvents(proc);
661+
assertNotNull(events);
662+
// No events because batch is still waiting to get large enough to report
663+
assertTrue(events.isEmpty());
664+
665+
// clean up
666+
new ProcessApi(concord.apiClient()).kill(pe.getInstanceId());
667+
}
668+
669+
/**
670+
* Executes a flow that will over-fill process event queue if not properly synchronized
671+
*/
672+
@Test
673+
public void testEventBatchingParallel() throws Exception {
674+
Payload payload = new Payload()
675+
.archive(resource("eventBatchingParallel"));
676+
677+
ConcordProcess proc = concord.processes().start(payload);
678+
expectStatus(proc, ProcessEntry.StatusEnum.FINISHED);
679+
680+
// ---
681+
List<ProcessEventEntry> events = getProcessElementEvents(proc);
682+
assertNotNull(events);
683+
assertFalse(events.isEmpty());
684+
}
685+
686+
private List<ProcessEventEntry> getProcessElementEvents(ConcordProcess proc) throws Exception {
687+
ProcessEventsApi processEventsApi = new ProcessEventsApi(concord.apiClient());
688+
return processEventsApi.listProcessEvents(proc.instanceId(), "ELEMENT", null, null, null, "pre", null, null);
689+
}
690+
607691
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
configuration:
2+
runtime: concord-v2
3+
events:
4+
batchSize: 5
5+
6+
flows:
7+
default:
8+
- parallel:
9+
- call: doALoop
10+
- call: doALoop
11+
- call: doALoop
12+
- call: doALoop
13+
- call: doALoop
14+
- call: doALoop
15+
- call: doALoop
16+
- call: doALoop
17+
- call: doALoop
18+
- call: doALoop
19+
20+
doALoop:
21+
- call: logSomething
22+
in:
23+
something: "${item}"
24+
loop:
25+
items:
26+
- a
27+
- b
28+
- c
29+
30+
logSomething:
31+
- log: "hello ${something}"
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
configuration:
2+
runtime: concord-v2
3+
4+
profiles:
5+
shortFlush:
6+
configuration:
7+
events:
8+
batchSize: 100
9+
batchFlushInterval: 1
10+
11+
longFlush:
12+
configuration:
13+
events:
14+
batchSize: 100
15+
batchFlushInterval: 120
16+
17+
flows:
18+
default:
19+
- task: sleep
20+
in:
21+
duration: 120

runtime/v2/model/src/main/java/com/walmartlabs/concord/runtime/v2/model/EventConfiguration.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,26 @@ default boolean recordEvents() {
5757
return true;
5858
}
5959

60+
/**
61+
* Maximum number of process events to report per batch.
62+
*/
63+
@Value.Default
64+
default int batchSize() {
65+
return 1;
66+
}
67+
68+
/**
69+
* Interval, in seconds after which any queued process events will be reported.
70+
* <p>
71+
* Typically, batched events are reported on process termination or when
72+
* the queued number equals {@link #batchSize()}. A long-running task call
73+
* holds up event recording if a scheduled flush is not performed.
74+
*/
75+
@Value.Default
76+
default int batchFlushInterval() {
77+
return 15;
78+
}
79+
6080
/**
6181
* Enable/disable recording of IN variables in task calls.
6282
*/

runtime/v2/model/src/main/java/com/walmartlabs/concord/runtime/v2/parser/ConfigurationGrammar.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ public final class ConfigurationGrammar {
4848
betweenTokens(JsonToken.START_OBJECT, JsonToken.END_OBJECT,
4949
with(ImmutableEventConfiguration::builder,
5050
o -> options(
51+
optional("batchFlushInterval", intVal.map(o::batchFlushInterval)),
52+
optional("batchSize", intVal.map(o::batchSize)),
5153
optional("recordEvents", booleanVal.map(o::recordEvents)),
5254
optional("recordTaskInVars", booleanVal.map(o::recordTaskInVars)),
5355
optional("truncateInVars", booleanVal.map(o::truncateInVars)),

runtime/v2/model/src/test/java/com/walmartlabs/concord/project/runtime/v2/parser/YamlErrorParserTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2044,7 +2044,7 @@ public void test1317() throws Exception {
20442044
@Test
20452045
public void test1318() throws Exception {
20462046
String msg =
2047-
"(018.yml): Error @ line: 26, col: 11. Unknown options: ['trash' [NULL] @ line: 26, col: 11], expected: [inVarsBlacklist, metaBlacklist, outVarsBlacklist, recordEvents, recordTaskInVars, recordTaskMeta, recordTaskOutVars, truncateInVars, truncateMaxArrayLength, truncateMaxDepth, truncateMaxStringLength, truncateMeta, truncateOutVars, updateMetaOnAllEvents]. Remove invalid options and/or fix indentation\n" +
2047+
"(018.yml): Error @ line: 26, col: 11. Unknown options: ['trash' [NULL] @ line: 26, col: 11], expected: [batchFlushInterval, batchSize, inVarsBlacklist, metaBlacklist, outVarsBlacklist, recordEvents, recordTaskInVars, recordTaskMeta, recordTaskOutVars, truncateInVars, truncateMaxArrayLength, truncateMaxDepth, truncateMaxStringLength, truncateMeta, truncateOutVars, updateMetaOnAllEvents]. Remove invalid options and/or fix indentation\n" +
20482048
"\twhile processing steps:\n" +
20492049
"\t'events' @ line: 14, col: 3\n" +
20502050
"\t\t'configuration' @ line: 1, col: 1";

runtime/v2/model/src/test/resources/serializer/processDefinition.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ configuration:
44
entryPoint: "default"
55
events:
66
recordEvents: true
7+
batchSize: 1
8+
batchFlushInterval: 15
79
recordTaskInVars: false
810
truncateInVars: true
911
truncateMaxStringLength: 1024
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package com.walmartlabs.concord.runtime.v2.runner;
2+
3+
/*-
4+
* *****
5+
* Concord
6+
* -----
7+
* Copyright (C) 2017 - 2024 Walmart Inc.
8+
* -----
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* =====
21+
*/
22+
23+
import com.walmartlabs.concord.client2.ApiClient;
24+
import com.walmartlabs.concord.client2.ApiException;
25+
import com.walmartlabs.concord.client2.ProcessEventRequest;
26+
import com.walmartlabs.concord.client2.ProcessEventsApi;
27+
import com.walmartlabs.concord.runtime.common.injector.InstanceId;
28+
import com.walmartlabs.concord.runtime.v2.sdk.ProcessConfiguration;
29+
import com.walmartlabs.concord.svm.Frame;
30+
import com.walmartlabs.concord.svm.Runtime;
31+
import com.walmartlabs.concord.svm.State;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
import javax.inject.Inject;
36+
import java.util.ArrayList;
37+
import java.util.List;
38+
import java.util.TimerTask;
39+
import java.util.concurrent.BlockingQueue;
40+
import java.util.concurrent.Executors;
41+
import java.util.concurrent.LinkedBlockingQueue;
42+
import java.util.concurrent.ScheduledExecutorService;
43+
import java.util.concurrent.TimeUnit;
44+
45+
public class DefaultEventReportingService implements EventReportingService {
46+
47+
private static final Logger log = LoggerFactory.getLogger(DefaultEventReportingService.class);
48+
49+
private final InstanceId instanceId;
50+
private final ProcessEventsApi processEventsApi;
51+
private final BlockingQueue<ProcessEventRequest> eventQueue;
52+
private final int maxBatchSize;
53+
private final Object batchLock = new Object();
54+
private final ScheduledExecutorService flushScheduler;
55+
56+
@Inject
57+
public DefaultEventReportingService(InstanceId instanceId,
58+
ProcessConfiguration processConfiguration,
59+
ApiClient apiClient) {
60+
this.instanceId = instanceId;
61+
this.processEventsApi = new ProcessEventsApi(apiClient);
62+
this.maxBatchSize = processConfiguration.events().batchSize();
63+
this.eventQueue = initializeQueue(maxBatchSize);
64+
this.flushScheduler = Executors.newSingleThreadScheduledExecutor();
65+
66+
int period = processConfiguration.events().batchFlushInterval();
67+
flushScheduler.scheduleAtFixedRate(new FlushTimer(this), period, period, TimeUnit.SECONDS);
68+
}
69+
70+
private static BlockingQueue<ProcessEventRequest> initializeQueue(int maxBatchSize) {
71+
if (maxBatchSize <= 0) {
72+
throw new IllegalArgumentException("Invalid event batch size '" + maxBatchSize + "'. Must be greater than zero.");
73+
}
74+
75+
return new LinkedBlockingQueue<>();
76+
}
77+
78+
@Override
79+
public synchronized void report(ProcessEventRequest req) {
80+
if (req == null) {
81+
return;
82+
}
83+
84+
// avoid modification while a flush may be occurring from another thread
85+
synchronized (batchLock) {
86+
eventQueue.add(req);
87+
}
88+
89+
// Don't allow batch to grow larger than max batch size
90+
// This is why the method is synchronized. If not, another thread may add
91+
// one-too-many items to the batch before this flush finishes.
92+
if (eventQueue.size() >= maxBatchSize) {
93+
flush();
94+
}
95+
}
96+
97+
@Override
98+
public void afterProcessEnds(Runtime runtime, State state, Frame lastFrame) {
99+
flushScheduler.shutdown();
100+
flush();
101+
}
102+
103+
ProcessEventsApi getProcessEventsApi() {
104+
return processEventsApi;
105+
}
106+
107+
synchronized void flush() {
108+
List<ProcessEventRequest> eventBatch = takeBatch();
109+
110+
while (!eventBatch.isEmpty()) {
111+
send(eventBatch);
112+
eventBatch = takeBatch();
113+
}
114+
}
115+
116+
private void send(List<ProcessEventRequest> eventBatch) {
117+
if (eventBatch.size() == 1) {
118+
sendSingle(eventBatch.get(0));
119+
} else {
120+
sendBatch(eventBatch);
121+
}
122+
}
123+
124+
private void sendBatch(List<ProcessEventRequest> eventBatch) {
125+
try {
126+
getProcessEventsApi().batchEvent(instanceId.getValue(), eventBatch);
127+
} catch (ApiException e) {
128+
log.warn("Error while sending batch of {} event{} to the server: {}",
129+
eventBatch.size(), eventBatch.isEmpty() ? "" : "s", e.getMessage());
130+
}
131+
}
132+
133+
private void sendSingle(ProcessEventRequest req) {
134+
try {
135+
getProcessEventsApi().event(instanceId.getValue(), req);
136+
} catch (ApiException e) {
137+
log.warn("error while sending an event to the server: {}", e.getMessage());
138+
}
139+
}
140+
141+
/**
142+
* @return batch of up-to {@link #maxBatchSize} queued process events
143+
*/
144+
private List<ProcessEventRequest> takeBatch() {
145+
List<ProcessEventRequest> batch = new ArrayList<>(maxBatchSize);
146+
147+
synchronized (batchLock) { // avoid draining while an element may be added
148+
eventQueue.drainTo(batch, maxBatchSize);
149+
}
150+
151+
return batch;
152+
}
153+
154+
private static class FlushTimer extends TimerTask {
155+
private final DefaultEventReportingService reportingService;
156+
157+
public FlushTimer(DefaultEventReportingService reportingService) {
158+
this.reportingService = reportingService;
159+
}
160+
161+
@Override
162+
public void run() {
163+
reportingService.flush();
164+
}
165+
}
166+
167+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.walmartlabs.concord.runtime.v2.runner;
2+
3+
/*-
4+
* *****
5+
* Concord
6+
* -----
7+
* Copyright (C) 2017 - 2024 Walmart Inc.
8+
* -----
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* =====
21+
*/
22+
23+
import com.walmartlabs.concord.client2.ProcessEventRequest;
24+
import com.walmartlabs.concord.svm.ExecutionListener;
25+
26+
public interface EventReportingService extends ExecutionListener {
27+
28+
/**
29+
* Report a process event to the server.
30+
*/
31+
void report(ProcessEventRequest req);
32+
33+
}

0 commit comments

Comments
 (0)