Skip to content

Commit 4afe743

Browse files
authored
runtime-v1: option for event batching for runner events (#950)
1 parent 745f2bf commit 4afe743

File tree

8 files changed

+480
-12
lines changed

8 files changed

+480
-12
lines changed

it/runtime-v1/src/test/java/com/walmartlabs/concord/it/runtime/v1/ProcessIT.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@
2727
import com.google.common.io.Resources;
2828
import com.walmartlabs.concord.client2.FormListEntry;
2929
import com.walmartlabs.concord.client2.FormSubmitResponse;
30+
import com.walmartlabs.concord.client2.ProcessApi;
3031
import com.walmartlabs.concord.client2.ProcessEntry;
3132
import com.walmartlabs.concord.client2.ProcessEntry.StatusEnum;
33+
import com.walmartlabs.concord.client2.ProcessEventEntry;
34+
import com.walmartlabs.concord.client2.ProcessEventsApi;
3235
import com.walmartlabs.concord.client2.ProcessListFilter;
3336
import com.walmartlabs.concord.it.common.JGitUtils;
3437
import com.walmartlabs.concord.sdk.Constants;
@@ -590,6 +593,71 @@ public void testTaskWithClient1() throws Exception {
590593
proc.assertLog(".*Works!.*");
591594
}
592595

596+
/**
597+
* Tests process event batch flushing when a long-running task executes.
598+
*/
599+
@Test
600+
public void testEventBatchingShortTimer() throws Exception {
601+
Payload payload = new Payload()
602+
.activeProfiles("shortFlush")
603+
.archive(resource("eventBatchingTimer"));
604+
605+
ConcordProcess proc = concord.processes().start(payload);
606+
ProcessEntry pe = proc.expectStatus(StatusEnum.RUNNING);
607+
608+
// let it run at least long enough to report an event batch (1-second interval)
609+
Thread.sleep(1_500);
610+
611+
ProcessEventsApi processEventsApi = new ProcessEventsApi(concord.apiClient());
612+
613+
// At this point the process is still executing the sleep task.
614+
// We set a 1-second batch duration, so we can not expect a batch to have
615+
// been reported even though the max batch size (100) was not met.
616+
617+
// ---
618+
List<ProcessEventEntry> events = processEventsApi.listProcessEvents(proc.instanceId(), "ELEMENT", null, null, null, "pre", null, null);
619+
620+
// clean up
621+
new ProcessApi(concord.apiClient()).kill(pe.getInstanceId());
622+
623+
// ---
624+
assertNotNull(events);
625+
assertFalse(events.isEmpty());
626+
assertEquals(1, events.size());
627+
628+
ProcessEventEntry sleepEvent = events.get(0);
629+
630+
assertEquals("sleep", sleepEvent.getData().get("name"));
631+
}
632+
633+
/**
634+
* Demonstrates what happens if process event batching flush timer is too long,
635+
* or effectively doesn't exist.
636+
*/
637+
@Test
638+
public void testEventBatchingLongTimer() throws Exception {
639+
Payload payload = new Payload()
640+
.activeProfiles("longFlush")
641+
.archive(resource("eventBatchingTimer"));
642+
643+
ConcordProcess proc = concord.processes().start(payload);
644+
ProcessEntry pe = proc.expectStatus(StatusEnum.RUNNING);
645+
646+
// let it run long enough to prove events aren't going to update any time soon
647+
Thread.sleep(1_500);
648+
649+
ProcessEventsApi processEventsApi = new ProcessEventsApi(concord.apiClient());
650+
651+
// ---
652+
List<ProcessEventEntry> events = processEventsApi.listProcessEvents(proc.instanceId(), "ELEMENT", null, null, null, "pre", null, null);
653+
assertNotNull(events);
654+
// No events because batch is still waiting to get large enough to report
655+
assertTrue(events.isEmpty());
656+
657+
// clean up
658+
new ProcessApi(concord.apiClient()).kill(pe.getInstanceId());
659+
}
660+
593661
@SuppressWarnings("unchecked")
594662
private static void assertProcessErrorMessage(ProcessEntry p, String expected) {
595663
assertNotNull(p);
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
profiles:
2+
shortFlush:
3+
configuration:
4+
runner:
5+
events:
6+
batchSize: 100
7+
batchFlushInterval: 1
8+
9+
longFlush:
10+
configuration:
11+
runner:
12+
events:
13+
batchSize: 100
14+
batchFlushInterval: 120
15+
16+
flows:
17+
default:
18+
- task: sleep
19+
in:
20+
duration: 120

runtime/v1/impl/src/main/java/com/walmartlabs/concord/runner/engine/DefaultElementEventProcessor.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@
2020
* =====
2121
*/
2222

23-
import com.walmartlabs.concord.client2.ApiClientConfiguration;
24-
import com.walmartlabs.concord.client2.ApiClientFactory;
2523
import com.walmartlabs.concord.client2.ProcessEventRequest;
26-
import com.walmartlabs.concord.client2.ProcessEventsApi;
2724
import io.takari.bpm.ProcessDefinitionProvider;
2825
import io.takari.bpm.ProcessDefinitionUtils;
2926
import io.takari.bpm.api.ExecutionException;
@@ -45,12 +42,13 @@ public class DefaultElementEventProcessor implements ElementEventProcessor {
4542

4643
private static final Logger log = LoggerFactory.getLogger(DefaultElementEventProcessor.class);
4744

48-
private final ApiClientFactory apiClientFactory;
4945
private final ProcessDefinitionProvider processDefinitionProvider;
46+
private final EventReportingService eventReportingService;
5047

51-
public DefaultElementEventProcessor(ApiClientFactory apiClientFactory, ProcessDefinitionProvider processDefinitionProvider) {
52-
this.apiClientFactory = apiClientFactory;
48+
public DefaultElementEventProcessor(ProcessDefinitionProvider processDefinitionProvider,
49+
EventReportingService eventReportingService) {
5350
this.processDefinitionProvider = processDefinitionProvider;
51+
this.eventReportingService = eventReportingService;
5452
}
5553

5654
@Override
@@ -96,11 +94,7 @@ public void process(ElementEvent event, EventParamsBuilder builder, Predicate<Ab
9694
req.setData(e);
9795
req.setEventDate(Instant.now().atOffset(ZoneOffset.UTC));
9896

99-
ProcessEventsApi client = new ProcessEventsApi(apiClientFactory.create(
100-
ApiClientConfiguration.builder()
101-
.sessionToken(event.getSessionToken())
102-
.build()));
103-
client.event(UUID.fromString(event.getInstanceId()), req);
97+
eventReportingService.report(req, UUID.fromString(event.getInstanceId()), event.getSessionToken());
10498
} catch (Exception e) {
10599
log.warn("process ['{}'] -> transfer error: {}", event.getInstanceId(), e.getMessage());
106100
}
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
package com.walmartlabs.concord.runner.engine;
2+
3+
/*-
4+
* *****
5+
* Concord
6+
* -----
7+
* Copyright (C) 2017 - 2024 Walmart Inc.
8+
* -----
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* =====
21+
*/
22+
23+
import com.walmartlabs.concord.client2.ApiClientConfiguration;
24+
import com.walmartlabs.concord.client2.ApiClientFactory;
25+
import com.walmartlabs.concord.client2.ApiException;
26+
import com.walmartlabs.concord.client2.ProcessEventRequest;
27+
import com.walmartlabs.concord.client2.ProcessEventsApi;
28+
import io.takari.bpm.state.ProcessInstance;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.util.ArrayDeque;
33+
import java.util.ArrayList;
34+
import java.util.HashMap;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.Optional;
38+
import java.util.Queue;
39+
import java.util.TimerTask;
40+
import java.util.UUID;
41+
import java.util.concurrent.Executors;
42+
import java.util.concurrent.ScheduledExecutorService;
43+
import java.util.concurrent.TimeUnit;
44+
45+
public class DefaultEventReportingService implements EventReportingService {
46+
47+
private static final Logger log = LoggerFactory.getLogger(DefaultEventReportingService.class);
48+
49+
private final ApiClientFactory apiClientFactory;
50+
private final int maxBatchSize;
51+
private final Object batchLock = new Object();
52+
private final ScheduledExecutorService flushScheduler;
53+
private final HashMap<ReportingContext, Queue<ProcessEventRequest>> eventQueues;
54+
55+
public DefaultEventReportingService(ApiClientFactory apiClientFactory,
56+
int batchSize,
57+
int batchFlushInterval) {
58+
this.apiClientFactory = apiClientFactory;
59+
this.maxBatchSize = batchSize;
60+
this.eventQueues = new HashMap<>(1);
61+
this.flushScheduler = Executors.newSingleThreadScheduledExecutor();
62+
63+
flushScheduler.scheduleAtFixedRate(new FlushTimer(this), batchFlushInterval, batchFlushInterval, TimeUnit.SECONDS);
64+
}
65+
66+
@Override
67+
public ProcessInstance onFinalize(ProcessInstance state) {
68+
flushScheduler.shutdown();
69+
flush();
70+
return state;
71+
}
72+
73+
private class ReportingBatch {
74+
private final ReportingContext reportingContext;
75+
private final List<ProcessEventRequest> events;
76+
77+
public ReportingBatch(ReportingContext reportingContext, List<ProcessEventRequest> events) {
78+
this.reportingContext = reportingContext;
79+
this.events = events;
80+
}
81+
82+
public void send() {
83+
try {
84+
getProcessEventsApi(reportingContext.sessionToken).batchEvent(reportingContext.instanceId, events);
85+
} catch (ApiException e) {
86+
log.warn("Error while sending batch of {} event{} to the server: {}",
87+
events.size(), events.isEmpty() ? "" : "s", e.getMessage());
88+
}
89+
}
90+
}
91+
92+
ProcessEventsApi getProcessEventsApi(String sessionToken) {
93+
return new ProcessEventsApi(apiClientFactory.create(
94+
ApiClientConfiguration.builder()
95+
.sessionToken(sessionToken)
96+
.build()));
97+
}
98+
99+
/**
100+
* Event source context. Basically a combo of instance ID and session token.
101+
* Intended to be used as a key for a map of context -> queue of events
102+
*/
103+
static class ReportingContext {
104+
private final UUID instanceId;
105+
private final String sessionToken;
106+
107+
public ReportingContext(UUID instanceId, String sessionToken) {
108+
this.instanceId = instanceId;
109+
this.sessionToken = sessionToken;
110+
}
111+
112+
@Override
113+
public boolean equals(Object o) {
114+
if (this == o) return true;
115+
if (o == null || getClass() != o.getClass()) return false;
116+
117+
ReportingContext that = (ReportingContext) o;
118+
119+
if (!instanceId.equals(that.instanceId)) return false;
120+
return sessionToken.equals(that.sessionToken);
121+
}
122+
123+
@Override
124+
public int hashCode() {
125+
int result = instanceId.hashCode();
126+
result = 31 * result + sessionToken.hashCode();
127+
return result;
128+
}
129+
}
130+
131+
@Override
132+
public void report(ProcessEventRequest req, UUID instanceId, String sessionToken) {
133+
if (maxBatchSize > 1) {
134+
batch(req, instanceId, sessionToken);
135+
} else {
136+
sendSingle(req, instanceId, sessionToken);
137+
}
138+
}
139+
140+
void batch(ProcessEventRequest req, UUID instanceId, String sessionToken) {
141+
Queue<ProcessEventRequest> queue;
142+
143+
synchronized (batchLock) {
144+
queue = eventQueues.computeIfAbsent(new ReportingContext(instanceId, sessionToken), ctx -> new ArrayDeque<>(maxBatchSize));
145+
queue.add(req);
146+
}
147+
148+
if (queue.size() >= maxBatchSize) {
149+
flush();
150+
}
151+
}
152+
153+
void sendSingle(ProcessEventRequest req, UUID instanceId, String sessionToken) {
154+
try {
155+
ProcessEventsApi client = getProcessEventsApi(sessionToken);
156+
client.event(instanceId, req);
157+
} catch (ApiException e) {
158+
log.warn("error while sending an event to the server: {}", e.getMessage());
159+
}
160+
}
161+
162+
/**
163+
* Flushes all queued events across all reporting contexts.
164+
*/
165+
void flush() {
166+
ReportingBatch eventBatch = takeBatch();
167+
168+
while (eventBatch != null && !eventBatch.events.isEmpty()) {
169+
eventBatch.send();
170+
eventBatch = takeBatch();
171+
}
172+
}
173+
174+
/**
175+
* @return batch of up-to {@link #maxBatchSize} queued process events
176+
*/
177+
private ReportingBatch takeBatch() {
178+
ReportingContext ctx;
179+
List<ProcessEventRequest> batch = new ArrayList<>(maxBatchSize);
180+
181+
synchronized (batchLock) {
182+
if (eventQueues.isEmpty()) {
183+
return null; // nothing to report
184+
}
185+
186+
Optional<ReportingContext> optionalCtx = eventQueues.entrySet().stream()
187+
.findFirst()
188+
.map(Map.Entry::getKey);
189+
190+
if (optionalCtx.isEmpty()) {
191+
// that's odd, should've been cleaned up already
192+
eventQueues.clear();
193+
return null;
194+
}
195+
196+
ctx = optionalCtx.get();
197+
198+
Queue<ProcessEventRequest> eventQueue = eventQueues.get(ctx);
199+
200+
for (int i = 0; i < maxBatchSize; i++) {
201+
if (eventQueue.isEmpty()) {
202+
eventQueues.remove(ctx);
203+
break;
204+
}
205+
206+
batch.add(eventQueue.poll());
207+
}
208+
}
209+
210+
return new ReportingBatch(ctx, batch);
211+
}
212+
213+
private static class FlushTimer extends TimerTask {
214+
private final DefaultEventReportingService reportingService;
215+
216+
public FlushTimer(DefaultEventReportingService reportingService) {
217+
this.reportingService = reportingService;
218+
}
219+
220+
@Override
221+
public void run() {
222+
reportingService.flush();
223+
}
224+
}
225+
226+
}

runtime/v1/impl/src/main/java/com/walmartlabs/concord/runner/engine/EngineFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,10 @@ protected ELResolver createResolver(ExecutionContext ctx) {
139139
cfg.setWrapAllExceptionsAsBpmnErrors(true);
140140
cfg.setCopyAllCallActivityOutVariables(true);
141141

142+
EventReportingService evs = new DefaultEventReportingService(apiClientFactory, eventCfg.getBatchSize(), eventCfg.getBatchFlushInterval());
142143
ElementEventProcessor eventProcessor;
143144
if (eventCfg.isRecordEvents()) {
144-
eventProcessor = new DefaultElementEventProcessor(apiClientFactory, adapter.processes());
145+
eventProcessor = new DefaultElementEventProcessor(adapter.processes(), evs);
145146
} else {
146147
eventProcessor = new NopElementEventProcessor();
147148
}
@@ -165,6 +166,7 @@ protected ELResolver createResolver(ExecutionContext ctx) {
165166
.withConfiguration(cfg)
166167
.withListener(new ProcessOutVariablesListener(attachmentsDir, outVariables))
167168
.withListener(new VariablesSnapshotListener(stateDir))
169+
.withListener(evs)
168170
.withResourceResolver(new ResourceResolverImpl(baseDir))
169171
.build();
170172

0 commit comments

Comments
 (0)