Skip to content

Commit f3d1fa3

Browse files
Merge branch 'main' into dependabot/github_actions/docker/setup-qemu-action-3
2 parents 1df50b4 + 82d4d12 commit f3d1fa3

1,376 files changed

Lines changed: 219823 additions & 11 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/publish.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,10 @@ jobs:
9292
uses: docker/setup-qemu-action@v4
9393

9494
- name: Set up Docker Buildx
95-
uses: docker/setup-buildx-action@v3
95+
uses: docker/setup-buildx-action@v4
9696

9797
- name: Build and push Server
98-
uses: docker/build-push-action@v3
98+
uses: docker/build-push-action@v7
9999
with:
100100
context: .
101101
file: docker/server/Dockerfile

.github/workflows/publish_build.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,10 @@ jobs:
102102
uses: docker/setup-qemu-action@v4
103103

104104
- name: Set up Docker Buildx
105-
uses: docker/setup-buildx-action@v3
105+
uses: docker/setup-buildx-action@v4
106106

107107
- name: Build and push Server
108-
uses: docker/build-push-action@v3
108+
uses: docker/build-push-action@v7
109109
with:
110110
context: .
111111
file: docker/server/Dockerfile

.github/workflows/ui-next-ci.yml

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
name: UI v2 CI
2+
3+
on:
4+
pull_request:
5+
branches:
6+
- main
7+
paths:
8+
- "ui-next/**"
9+
10+
permissions:
11+
contents: read
12+
13+
jobs:
14+
lint-format-test:
15+
name: Lint, Format & Test
16+
runs-on: ubuntu-latest
17+
defaults:
18+
run:
19+
working-directory: ui-next
20+
21+
steps:
22+
- uses: actions/checkout@v4
23+
24+
- name: Setup Node.js
25+
uses: actions/setup-node@v4
26+
with:
27+
node-version: 22
28+
29+
- name: Setup pnpm
30+
uses: pnpm/action-setup@v4
31+
with:
32+
version: 10.32.0
33+
34+
- name: Get pnpm store directory
35+
id: pnpm-cache
36+
run: echo "store=$(pnpm store path)" >> $GITHUB_OUTPUT
37+
38+
- name: Cache pnpm store
39+
uses: actions/cache@v4
40+
with:
41+
path: ${{ steps.pnpm-cache.outputs.store }}
42+
key: ${{ runner.os }}-pnpm-${{ hashFiles('ui-next/pnpm-lock.yaml') }}
43+
restore-keys: ${{ runner.os }}-pnpm-
44+
45+
- name: Install dependencies
46+
run: pnpm install --frozen-lockfile
47+
48+
- name: Prettier check
49+
run: pnpm prettier:check
50+
51+
- name: Lint
52+
run: pnpm lint
53+
54+
- name: Type check
55+
run: pnpm typecheck
56+
57+
- name: Test
58+
run: pnpm test
59+
60+
- name: Build
61+
run: pnpm build

annotations-processor/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ sourceSets {
55

66
dependencies {
77
implementation project(':conductor-annotations')
8-
api 'com.google.guava:guava:31.1-jre'
8+
api 'com.google.guava:guava:33.5.0-jre'
99
api 'com.squareup:javapoet:1.13.0'
1010
api 'com.github.jknack:handlebars:4.5.0'
1111
api "com.google.protobuf:protobuf-java:${revProtoBuf}"

common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Map;
2222
import java.util.Objects;
2323

24+
import com.netflix.conductor.annotations.protogen.ProtoEnum;
2425
import com.netflix.conductor.annotations.protogen.ProtoField;
2526
import com.netflix.conductor.annotations.protogen.ProtoMessage;
2627
import com.netflix.conductor.common.metadata.tasks.TaskDef;
@@ -165,6 +166,16 @@ public void setTasks(List<WorkflowTask> tasks) {
165166
@ProtoField(id = 32)
166167
private boolean permissive;
167168

169+
/** Controls whether a JOIN task is evaluated synchronously (no backoff) or asynchronously. */
170+
@ProtoEnum
171+
public enum JoinMode {
172+
SYNC,
173+
ASYNC
174+
}
175+
176+
@ProtoField(id = 34)
177+
private JoinMode joinMode;
178+
168179
/**
169180
* @return the name
170181
*/
@@ -601,6 +612,20 @@ public void setPermissive(boolean permissive) {
601612
this.permissive = permissive;
602613
}
603614

615+
/**
616+
* @return the join mode (SYNC or ASYNC)
617+
*/
618+
public JoinMode getJoinMode() {
619+
return joinMode;
620+
}
621+
622+
/**
623+
* @param joinMode the join mode to set
624+
*/
625+
public void setJoinMode(JoinMode joinMode) {
626+
this.joinMode = joinMode;
627+
}
628+
604629
private Collection<List<WorkflowTask>> children() {
605630
Collection<List<WorkflowTask>> workflowTaskLists = new LinkedList<>();
606631

core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.springframework.stereotype.Component;
2121

2222
import com.netflix.conductor.annotations.VisibleForTesting;
23+
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
2324
import com.netflix.conductor.common.utils.TaskUtils;
2425
import com.netflix.conductor.core.config.ConductorProperties;
2526
import com.netflix.conductor.core.execution.WorkflowExecutor;
@@ -125,6 +126,15 @@ public boolean execute(
125126

126127
@Override
127128
public Optional<Long> getEvaluationOffset(TaskModel taskModel, long maxOffset) {
129+
// Check if joinMode is set to SYNC — read directly from the workflow task definition
130+
// rather than from input data so the value is never duplicated into the task's payload.
131+
WorkflowTask workflowTask = taskModel.getWorkflowTask();
132+
if (workflowTask != null && WorkflowTask.JoinMode.SYNC == workflowTask.getJoinMode()) {
133+
// Synchronous mode: evaluate immediately every time (no backoff)
134+
return Optional.of(0L);
135+
}
136+
137+
// Asynchronous mode (default): use exponential backoff
128138
int pollCount = taskModel.getPollCount();
129139
// Assuming pollInterval = 50ms and evaluationOffsetThreshold = 200 this will cause
130140
// a JOIN task to be evaluated continuously during the first 10 seconds and the FORK/JOIN

core/src/test/java/com/netflix/conductor/core/execution/mapper/JoinTaskMapperTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import static org.junit.Assert.assertEquals;
3131
import static org.junit.Assert.assertNotNull;
32+
import static org.junit.Assert.assertNull;
3233

3334
public class JoinTaskMapperTest {
3435

@@ -59,4 +60,37 @@ public void getMappedTasks() {
5960
assertNotNull(mappedTasks);
6061
assertEquals(TASK_TYPE_JOIN, mappedTasks.get(0).getTaskType());
6162
}
63+
64+
@Test
65+
public void getMappedTasksWithJoinMode() {
66+
67+
WorkflowTask workflowTask = new WorkflowTask();
68+
workflowTask.setType(TaskType.JOIN.name());
69+
workflowTask.setJoinOn(Arrays.asList("task1", "task2"));
70+
workflowTask.setJoinMode(WorkflowTask.JoinMode.SYNC);
71+
72+
String taskId = new IDGenerator().generate();
73+
74+
WorkflowDef wd = new WorkflowDef();
75+
WorkflowModel workflow = new WorkflowModel();
76+
workflow.setWorkflowDefinition(wd);
77+
78+
TaskMapperContext taskMapperContext =
79+
TaskMapperContext.newBuilder()
80+
.withWorkflowModel(workflow)
81+
.withTaskDefinition(new TaskDef())
82+
.withWorkflowTask(workflowTask)
83+
.withRetryCount(0)
84+
.withTaskId(taskId)
85+
.build();
86+
87+
List<TaskModel> mappedTasks = new JoinTaskMapper().getMappedTasks(taskMapperContext);
88+
89+
assertNotNull(mappedTasks);
90+
assertEquals(TASK_TYPE_JOIN, mappedTasks.get(0).getTaskType());
91+
// joinMode is read directly from workflowTask, not injected into input data
92+
assertNull(mappedTasks.get(0).getInputData().get("joinMode"));
93+
assertEquals(
94+
WorkflowTask.JoinMode.SYNC, mappedTasks.get(0).getWorkflowTask().getJoinMode());
95+
}
6296
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright 2022 Conductor Authors.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package com.netflix.conductor.core.execution.tasks;
14+
15+
import java.util.Optional;
16+
17+
import org.junit.Test;
18+
19+
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
20+
import com.netflix.conductor.core.config.ConductorProperties;
21+
import com.netflix.conductor.model.TaskModel;
22+
23+
import static org.junit.Assert.assertEquals;
24+
import static org.junit.Assert.assertTrue;
25+
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.when;
27+
28+
public class JoinTest {
29+
30+
private static TaskModel taskWithJoinMode(WorkflowTask.JoinMode mode) {
31+
WorkflowTask workflowTask = new WorkflowTask();
32+
workflowTask.setJoinMode(mode);
33+
TaskModel task = new TaskModel();
34+
task.setWorkflowTask(workflowTask);
35+
return task;
36+
}
37+
38+
@Test
39+
public void testSynchronousJoinModeEvaluationOffset() {
40+
ConductorProperties properties = mock(ConductorProperties.class);
41+
when(properties.getSystemTaskPostponeThreshold()).thenReturn(200);
42+
43+
Join join = new Join(properties);
44+
TaskModel task = taskWithJoinMode(WorkflowTask.JoinMode.SYNC);
45+
46+
// Synchronous mode should always return 0 offset
47+
task.setPollCount(100);
48+
Optional<Long> offset = join.getEvaluationOffset(task, 10000L);
49+
assertTrue(offset.isPresent());
50+
assertEquals(0L, offset.get().longValue());
51+
52+
// Even with high poll count, SYNC mode returns 0
53+
task.setPollCount(500);
54+
offset = join.getEvaluationOffset(task, 10000L);
55+
assertTrue(offset.isPresent());
56+
assertEquals(0L, offset.get().longValue());
57+
}
58+
59+
@Test
60+
public void testAsynchronousJoinModeEvaluationOffset() {
61+
ConductorProperties properties = mock(ConductorProperties.class);
62+
when(properties.getSystemTaskPostponeThreshold()).thenReturn(200);
63+
64+
Join join = new Join(properties);
65+
TaskModel task = taskWithJoinMode(WorkflowTask.JoinMode.ASYNC);
66+
67+
// Low poll count should return 0
68+
task.setPollCount(100);
69+
Optional<Long> offset = join.getEvaluationOffset(task, 10000L);
70+
assertTrue(offset.isPresent());
71+
assertEquals(0L, offset.get().longValue());
72+
73+
// High poll count should use exponential backoff
74+
task.setPollCount(250);
75+
offset = join.getEvaluationOffset(task, 10000L);
76+
assertTrue(offset.isPresent());
77+
assertTrue(offset.get() > 0L);
78+
}
79+
80+
@Test
81+
public void testDefaultAsyncBehavior() {
82+
ConductorProperties properties = mock(ConductorProperties.class);
83+
when(properties.getSystemTaskPostponeThreshold()).thenReturn(200);
84+
85+
Join join = new Join(properties);
86+
87+
// No joinMode on workflowTask — should default to async behavior
88+
TaskModel task = new TaskModel();
89+
task.setWorkflowTask(new WorkflowTask());
90+
91+
// Low poll count should return 0
92+
task.setPollCount(100);
93+
Optional<Long> offset = join.getEvaluationOffset(task, 10000L);
94+
assertTrue(offset.isPresent());
95+
assertEquals(0L, offset.get().longValue());
96+
97+
// High poll count should use exponential backoff (default async behavior)
98+
task.setPollCount(250);
99+
offset = join.getEvaluationOffset(task, 10000L);
100+
assertTrue(offset.isPresent());
101+
assertTrue(offset.get() > 0L);
102+
}
103+
104+
@Test
105+
public void testNullWorkflowTaskDefaultsToAsync() {
106+
ConductorProperties properties = mock(ConductorProperties.class);
107+
when(properties.getSystemTaskPostponeThreshold()).thenReturn(200);
108+
109+
Join join = new Join(properties);
110+
111+
// No workflowTask at all — should default to async behavior
112+
TaskModel task = new TaskModel();
113+
114+
task.setPollCount(250);
115+
Optional<Long> offset = join.getEvaluationOffset(task, 10000L);
116+
assertTrue(offset.isPresent());
117+
assertTrue(offset.get() > 0L);
118+
}
119+
120+
@Test
121+
public void testIsAsync() {
122+
ConductorProperties properties = mock(ConductorProperties.class);
123+
Join join = new Join(properties);
124+
125+
// isAsync should always return true
126+
assertTrue(join.isAsync());
127+
}
128+
}

grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1721,6 +1721,9 @@ public WorkflowTaskPb.WorkflowTask toProto(WorkflowTask from) {
17211721
to.setCacheConfig( toProto( from.getCacheConfig() ) );
17221722
}
17231723
to.setPermissive( from.isPermissive() );
1724+
if (from.getJoinMode() != null) {
1725+
to.setJoinMode( toProto( from.getJoinMode() ) );
1726+
}
17241727
return to.build();
17251728
}
17261729

@@ -1772,6 +1775,27 @@ public WorkflowTask fromProto(WorkflowTaskPb.WorkflowTask from) {
17721775
to.setCacheConfig( fromProto( from.getCacheConfig() ) );
17731776
}
17741777
to.setPermissive( from.getPermissive() );
1778+
to.setJoinMode( fromProto( from.getJoinMode() ) );
1779+
return to;
1780+
}
1781+
1782+
public WorkflowTaskPb.WorkflowTask.JoinMode toProto(WorkflowTask.JoinMode from) {
1783+
WorkflowTaskPb.WorkflowTask.JoinMode to;
1784+
switch (from) {
1785+
case SYNC: to = WorkflowTaskPb.WorkflowTask.JoinMode.SYNC; break;
1786+
case ASYNC: to = WorkflowTaskPb.WorkflowTask.JoinMode.ASYNC; break;
1787+
default: throw new IllegalArgumentException("Unexpected enum constant: " + from);
1788+
}
1789+
return to;
1790+
}
1791+
1792+
public WorkflowTask.JoinMode fromProto(WorkflowTaskPb.WorkflowTask.JoinMode from) {
1793+
WorkflowTask.JoinMode to;
1794+
switch (from) {
1795+
case SYNC: to = WorkflowTask.JoinMode.SYNC; break;
1796+
case ASYNC: to = WorkflowTask.JoinMode.ASYNC; break;
1797+
default: throw new IllegalArgumentException("Unexpected enum constant: " + from);
1798+
}
17751799
return to;
17761800
}
17771801

grpc/src/main/proto/model/workflowtask.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ option java_outer_classname = "WorkflowTaskPb";
1111
option go_package = "github.com/netflix/conductor/client/gogrpc/conductor/model";
1212

1313
message WorkflowTask {
14+
enum JoinMode {
15+
SYNC = 0;
16+
ASYNC = 1;
17+
}
1418
message WorkflowTaskList {
1519
repeated WorkflowTask tasks = 1;
1620
}
@@ -46,4 +50,5 @@ message WorkflowTask {
4650
string join_status = 30;
4751
CacheConfig cache_config = 31;
4852
bool permissive = 32;
53+
WorkflowTask.JoinMode join_mode = 34;
4954
}

0 commit comments

Comments
 (0)