Skip to content

Commit 7175e23

Browse files
authored
Merge branch 'master' into ib/tidy-up-more
2 parents a06601c + 4afe743 commit 7175e23

File tree

22 files changed

+1111
-53
lines changed

22 files changed

+1111
-53
lines changed

it/runtime-v1/src/test/java/com/walmartlabs/concord/it/runtime/v1/ProcessIT.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@
2727
import com.google.common.io.Resources;
2828
import com.walmartlabs.concord.client2.FormListEntry;
2929
import com.walmartlabs.concord.client2.FormSubmitResponse;
30+
import com.walmartlabs.concord.client2.ProcessApi;
3031
import com.walmartlabs.concord.client2.ProcessEntry;
3132
import com.walmartlabs.concord.client2.ProcessEntry.StatusEnum;
33+
import com.walmartlabs.concord.client2.ProcessEventEntry;
34+
import com.walmartlabs.concord.client2.ProcessEventsApi;
3235
import com.walmartlabs.concord.client2.ProcessListFilter;
3336
import com.walmartlabs.concord.it.common.JGitUtils;
3437
import com.walmartlabs.concord.sdk.Constants;
@@ -590,6 +593,71 @@ public void testTaskWithClient1() throws Exception {
590593
proc.assertLog(".*Works!.*");
591594
}
592595

596+
/**
597+
* Tests process event batch flushing when a long-running task executes.
598+
*/
599+
@Test
600+
public void testEventBatchingShortTimer() throws Exception {
601+
Payload payload = new Payload()
602+
.activeProfiles("shortFlush")
603+
.archive(resource("eventBatchingTimer"));
604+
605+
ConcordProcess proc = concord.processes().start(payload);
606+
ProcessEntry pe = proc.expectStatus(StatusEnum.RUNNING);
607+
608+
// let it run at least long enough to report an event batch (1-second interval)
609+
Thread.sleep(1_500);
610+
611+
ProcessEventsApi processEventsApi = new ProcessEventsApi(concord.apiClient());
612+
613+
// At this point the process is still executing the sleep task.
614+
// We set a 1-second batch duration, so we can not expect a batch to have
615+
// been reported even though the max batch size (100) was not met.
616+
617+
// ---
618+
List<ProcessEventEntry> events = processEventsApi.listProcessEvents(proc.instanceId(), "ELEMENT", null, null, null, "pre", null, null);
619+
620+
// clean up
621+
new ProcessApi(concord.apiClient()).kill(pe.getInstanceId());
622+
623+
// ---
624+
assertNotNull(events);
625+
assertFalse(events.isEmpty());
626+
assertEquals(1, events.size());
627+
628+
ProcessEventEntry sleepEvent = events.get(0);
629+
630+
assertEquals("sleep", sleepEvent.getData().get("name"));
631+
}
632+
633+
/**
634+
* Demonstrates what happens if process event batching flush timer is too long,
635+
* or effectively doesn't exist.
636+
*/
637+
@Test
638+
public void testEventBatchingLongTimer() throws Exception {
639+
Payload payload = new Payload()
640+
.activeProfiles("longFlush")
641+
.archive(resource("eventBatchingTimer"));
642+
643+
ConcordProcess proc = concord.processes().start(payload);
644+
ProcessEntry pe = proc.expectStatus(StatusEnum.RUNNING);
645+
646+
// let it run long enough to prove events aren't going to update any time soon
647+
Thread.sleep(1_500);
648+
649+
ProcessEventsApi processEventsApi = new ProcessEventsApi(concord.apiClient());
650+
651+
// ---
652+
List<ProcessEventEntry> events = processEventsApi.listProcessEvents(proc.instanceId(), "ELEMENT", null, null, null, "pre", null, null);
653+
assertNotNull(events);
654+
// No events because batch is still waiting to get large enough to report
655+
assertTrue(events.isEmpty());
656+
657+
// clean up
658+
new ProcessApi(concord.apiClient()).kill(pe.getInstanceId());
659+
}
660+
593661
@SuppressWarnings("unchecked")
594662
private static void assertProcessErrorMessage(ProcessEntry p, String expected) {
595663
assertNotNull(p);
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
profiles:
2+
shortFlush:
3+
configuration:
4+
runner:
5+
events:
6+
batchSize: 100
7+
batchFlushInterval: 1
8+
9+
longFlush:
10+
configuration:
11+
runner:
12+
events:
13+
batchSize: 100
14+
batchFlushInterval: 120
15+
16+
flows:
17+
default:
18+
- task: sleep
19+
in:
20+
duration: 120

it/runtime-v2/src/test/java/com/walmartlabs/concord/it/runtime/v2/ProcessIT.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,4 +604,88 @@ public void metaAfterSuspend() throws Exception {
604604
Object myMetaValue = pe.getMeta().get("myMetaVar");
605605
assertEquals("myMetaVarValue", myMetaValue);
606606
}
607+
608+
/**
609+
* Tests process event batch flushing when a long-running task executes.
610+
*/
611+
@Test
612+
public void testEventBatchingShortTimer() throws Exception {
613+
Payload payload = new Payload()
614+
.activeProfiles("shortFlush")
615+
.archive(resource("eventBatchingTimer"));
616+
617+
ConcordProcess proc = concord.processes().start(payload);
618+
ProcessEntry pe = expectStatus(proc, ProcessEntry.StatusEnum.RUNNING);
619+
620+
// let it run at least long enough to report an event batch (1-second interval)
621+
Thread.sleep(1_500);
622+
623+
// At this point the process is still executing the sleep task.
624+
// We set a 1-second batch duration, so we can not expect a batch to have
625+
// been reported even though the max batch size (100) was not met.
626+
627+
// ---
628+
List<ProcessEventEntry> events = getProcessElementEvents(proc);
629+
630+
// clean up
631+
new ProcessApi(concord.apiClient()).kill(pe.getInstanceId());
632+
633+
// ---
634+
assertNotNull(events);
635+
assertFalse(events.isEmpty());
636+
assertEquals(1, events.size());
637+
638+
ProcessEventEntry sleepEvent = events.get(0);
639+
640+
assertEquals("sleep", sleepEvent.getData().get("name"));
641+
}
642+
643+
/**
644+
* Demonstrates what happens if process event batching flush timer is too long,
645+
* or effectively doesn't exist.
646+
*/
647+
@Test
648+
public void testEventBatchingLongTimer() throws Exception {
649+
Payload payload = new Payload()
650+
.activeProfiles("longFlush")
651+
.archive(resource("eventBatchingTimer"));
652+
653+
ConcordProcess proc = concord.processes().start(payload);
654+
ProcessEntry pe = expectStatus(proc, ProcessEntry.StatusEnum.RUNNING);
655+
656+
// let it run long enough to prove events aren't going to update any time soon
657+
Thread.sleep(1_500);
658+
659+
// ---
660+
List<ProcessEventEntry> events = getProcessElementEvents(proc);
661+
assertNotNull(events);
662+
// No events because batch is still waiting to get large enough to report
663+
assertTrue(events.isEmpty());
664+
665+
// clean up
666+
new ProcessApi(concord.apiClient()).kill(pe.getInstanceId());
667+
}
668+
669+
/**
670+
* Executes a flow that will over-fill process event queue if not properly synchronized
671+
*/
672+
@Test
673+
public void testEventBatchingParallel() throws Exception {
674+
Payload payload = new Payload()
675+
.archive(resource("eventBatchingParallel"));
676+
677+
ConcordProcess proc = concord.processes().start(payload);
678+
expectStatus(proc, ProcessEntry.StatusEnum.FINISHED);
679+
680+
// ---
681+
List<ProcessEventEntry> events = getProcessElementEvents(proc);
682+
assertNotNull(events);
683+
assertFalse(events.isEmpty());
684+
}
685+
686+
private List<ProcessEventEntry> getProcessElementEvents(ConcordProcess proc) throws Exception {
687+
ProcessEventsApi processEventsApi = new ProcessEventsApi(concord.apiClient());
688+
return processEventsApi.listProcessEvents(proc.instanceId(), "ELEMENT", null, null, null, "pre", null, null);
689+
}
690+
607691
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
configuration:
2+
runtime: concord-v2
3+
events:
4+
batchSize: 5
5+
6+
flows:
7+
default:
8+
- parallel:
9+
- call: doALoop
10+
- call: doALoop
11+
- call: doALoop
12+
- call: doALoop
13+
- call: doALoop
14+
- call: doALoop
15+
- call: doALoop
16+
- call: doALoop
17+
- call: doALoop
18+
- call: doALoop
19+
20+
doALoop:
21+
- call: logSomething
22+
in:
23+
something: "${item}"
24+
loop:
25+
items:
26+
- a
27+
- b
28+
- c
29+
30+
logSomething:
31+
- log: "hello ${something}"
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
configuration:
2+
runtime: concord-v2
3+
4+
profiles:
5+
shortFlush:
6+
configuration:
7+
events:
8+
batchSize: 100
9+
batchFlushInterval: 1
10+
11+
longFlush:
12+
configuration:
13+
events:
14+
batchSize: 100
15+
batchFlushInterval: 120
16+
17+
flows:
18+
default:
19+
- task: sleep
20+
in:
21+
duration: 120

runtime/v1/impl/src/main/java/com/walmartlabs/concord/runner/engine/DefaultElementEventProcessor.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@
2020
* =====
2121
*/
2222

23-
import com.walmartlabs.concord.client2.ApiClientConfiguration;
24-
import com.walmartlabs.concord.client2.ApiClientFactory;
2523
import com.walmartlabs.concord.client2.ProcessEventRequest;
26-
import com.walmartlabs.concord.client2.ProcessEventsApi;
2724
import io.takari.bpm.ProcessDefinitionProvider;
2825
import io.takari.bpm.ProcessDefinitionUtils;
2926
import io.takari.bpm.api.ExecutionException;
@@ -45,12 +42,13 @@ public class DefaultElementEventProcessor implements ElementEventProcessor {
4542

4643
private static final Logger log = LoggerFactory.getLogger(DefaultElementEventProcessor.class);
4744

48-
private final ApiClientFactory apiClientFactory;
4945
private final ProcessDefinitionProvider processDefinitionProvider;
46+
private final EventReportingService eventReportingService;
5047

51-
public DefaultElementEventProcessor(ApiClientFactory apiClientFactory, ProcessDefinitionProvider processDefinitionProvider) {
52-
this.apiClientFactory = apiClientFactory;
48+
public DefaultElementEventProcessor(ProcessDefinitionProvider processDefinitionProvider,
49+
EventReportingService eventReportingService) {
5350
this.processDefinitionProvider = processDefinitionProvider;
51+
this.eventReportingService = eventReportingService;
5452
}
5553

5654
@Override
@@ -96,11 +94,7 @@ public void process(ElementEvent event, EventParamsBuilder builder, Predicate<Ab
9694
req.setData(e);
9795
req.setEventDate(Instant.now().atOffset(ZoneOffset.UTC));
9896

99-
ProcessEventsApi client = new ProcessEventsApi(apiClientFactory.create(
100-
ApiClientConfiguration.builder()
101-
.sessionToken(event.getSessionToken())
102-
.build()));
103-
client.event(UUID.fromString(event.getInstanceId()), req);
97+
eventReportingService.report(req, UUID.fromString(event.getInstanceId()), event.getSessionToken());
10498
} catch (Exception e) {
10599
log.warn("process ['{}'] -> transfer error: {}", event.getInstanceId(), e.getMessage());
106100
}

0 commit comments

Comments
 (0)