Skip to content

Commit 99695ae

Browse files
committed
spotless
1 parent 152bbdc commit 99695ae

6 files changed

Lines changed: 88 additions & 20 deletions

File tree

core/src/main/java/org/conductoross/conductor/core/execution/ExecutorUtils.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@
2626
@Slf4j
2727
public class ExecutorUtils {
2828

29+
private static boolean isActiveSubWorkflow(TaskModel taskModel) {
30+
return TaskType.TASK_TYPE_SUB_WORKFLOW.equals(taskModel.getTaskType())
31+
&& (taskModel.getStatus() == TaskModel.Status.SCHEDULED
32+
|| taskModel.getStatus() == TaskModel.Status.IN_PROGRESS);
33+
}
34+
2935
public static Duration computePostpone(
3036
WorkflowModel workflowModel,
3137
Duration workflowOffsetTimeout,
@@ -37,7 +43,13 @@ public static Duration computePostpone(
3743
Long postponeDurationSeconds = null;
3844
for (TaskModel taskModel : workflowModel.getTasks()) {
3945
Long candidateSeconds = null;
40-
if (taskModel.getStatus() == TaskModel.Status.IN_PROGRESS) {
46+
if (isActiveSubWorkflow(taskModel)) {
47+
// Sub-workflow progress is driven by Conductor's internal orchestration rather than
48+
// external worker polling or task-specific timeout signals. Revisit it on the
49+
// normal workflow offset so launch retries and child-completion observation
50+
// converge quickly.
51+
candidateSeconds = workflowOffsetTimeoutSeconds;
52+
} else if (taskModel.getStatus() == TaskModel.Status.IN_PROGRESS) {
4153
if (taskModel.getTaskType().equals(TASK_TYPE_WAIT)) {
4254
if (taskModel.getWaitTimeout() == 0) {
4355
candidateSeconds = workflowOffsetTimeoutSeconds;

core/src/test/java/com/netflix/conductor/core/dal/ExecutionDAOFacadeTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ public void testGetWorkflowModelFromExecutionDAODoesNotFallbackToIndex() {
118118
workflow.setWorkflowId("workflowId");
119119
when(executionDAO.getWorkflow("workflowId", false)).thenReturn(workflow);
120120

121-
WorkflowModel found = executionDAOFacade.getWorkflowModelFromExecutionDAO("workflowId", false);
121+
WorkflowModel found =
122+
executionDAOFacade.getWorkflowModelFromExecutionDAO("workflowId", false);
122123
assertSame(workflow, found);
123124
verify(indexDAO, never()).get(any(), any());
124125
}

core/src/test/java/org/conductoross/conductor/core/execution/ExecutorUtilsTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,52 @@ public void computePostponeUsesScheduledPollTimeout() {
8989
assertEquals(11, result.getSeconds());
9090
}
9191

92+
@Test
93+
public void computePostponeUsesWorkflowOffsetForScheduledSubWorkflow() {
94+
TaskModel scheduledSubWorkflow =
95+
newTask(TaskType.TASK_TYPE_SUB_WORKFLOW, TaskModel.Status.SCHEDULED);
96+
97+
WorkflowModel workflow = newWorkflow(Arrays.asList(scheduledSubWorkflow), 432000);
98+
99+
Duration result =
100+
ExecutorUtils.computePostpone(
101+
workflow, Duration.ofSeconds(30), Duration.ofSeconds(900));
102+
103+
assertEquals(30, result.getSeconds());
104+
}
105+
106+
@Test
107+
public void computePostponePrefersScheduledSubWorkflowOffsetOverWorkflowTimeout() {
108+
TaskModel scheduledSubWorkflow =
109+
newTask(TaskType.TASK_TYPE_SUB_WORKFLOW, TaskModel.Status.SCHEDULED);
110+
TaskModel scheduledSimple = newTask(TaskType.TASK_TYPE_SIMPLE, TaskModel.Status.SCHEDULED);
111+
112+
WorkflowModel workflow =
113+
newWorkflow(Arrays.asList(scheduledSimple, scheduledSubWorkflow), 432000);
114+
115+
Duration result =
116+
ExecutorUtils.computePostpone(
117+
workflow, Duration.ofSeconds(30), Duration.ofSeconds(900));
118+
119+
assertEquals(30, result.getSeconds());
120+
}
121+
122+
@Test
123+
public void computePostponeUsesWorkflowOffsetForInProgressSubWorkflow() {
124+
TaskModel inProgressSubWorkflow =
125+
newTask(TaskType.TASK_TYPE_SUB_WORKFLOW, TaskModel.Status.IN_PROGRESS);
126+
inProgressSubWorkflow.setResponseTimeoutSeconds(500);
127+
inProgressSubWorkflow.setStartTime(System.currentTimeMillis());
128+
129+
WorkflowModel workflow = newWorkflow(Arrays.asList(inProgressSubWorkflow), 432000);
130+
131+
Duration result =
132+
ExecutorUtils.computePostpone(
133+
workflow, Duration.ofSeconds(30), Duration.ofSeconds(900));
134+
135+
assertEquals(30, result.getSeconds());
136+
}
137+
92138
@Test
93139
public void computePostponeDefaultsToWorkflowOffsetWhenNoEligibleTasks() {
94140
TaskModel completed = newTask(TaskType.TASK_TYPE_SIMPLE, TaskModel.Status.COMPLETED);

mysql-persistence/src/main/java/com/netflix/conductor/mysql/dao/MySQLExecutionDAO.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -349,12 +349,15 @@ public String reserveSubWorkflowId(
349349

350350
String reservedSubWorkflowId =
351351
getWithRetriedTransactions(
352-
connection -> {
353-
addSubWorkflowIdReservation(
354-
connection, parentWorkflowId, parentWorkflowTaskId, subWorkflowId);
355-
return getSubWorkflowIdReservation(
356-
connection, parentWorkflowId, parentWorkflowTaskId);
357-
});
352+
connection -> {
353+
addSubWorkflowIdReservation(
354+
connection,
355+
parentWorkflowId,
356+
parentWorkflowTaskId,
357+
subWorkflowId);
358+
return getSubWorkflowIdReservation(
359+
connection, parentWorkflowId, parentWorkflowTaskId);
360+
});
358361
logger.debug(
359362
"Resolved sub-workflow reservation for workflow {} task {} to child workflow {} in MySQL",
360363
parentWorkflowId,

postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresExecutionDAO.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -385,12 +385,15 @@ public String reserveSubWorkflowId(
385385

386386
String reservedSubWorkflowId =
387387
getWithRetriedTransactions(
388-
connection -> {
389-
addSubWorkflowIdReservation(
390-
connection, parentWorkflowId, parentWorkflowTaskId, subWorkflowId);
391-
return getSubWorkflowIdReservation(
392-
connection, parentWorkflowId, parentWorkflowTaskId);
393-
});
388+
connection -> {
389+
addSubWorkflowIdReservation(
390+
connection,
391+
parentWorkflowId,
392+
parentWorkflowTaskId,
393+
subWorkflowId);
394+
return getSubWorkflowIdReservation(
395+
connection, parentWorkflowId, parentWorkflowTaskId);
396+
});
394397
logger.debug(
395398
"Resolved sub-workflow reservation for workflow {} task {} to child workflow {} in Postgres",
396399
parentWorkflowId,

sqlite-persistence/src/main/java/com/netflix/conductor/sqlite/dao/SqliteExecutionDAO.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -385,12 +385,15 @@ public String reserveSubWorkflowId(
385385

386386
String reservedSubWorkflowId =
387387
getWithRetriedTransactions(
388-
connection -> {
389-
addSubWorkflowIdReservation(
390-
connection, parentWorkflowId, parentWorkflowTaskId, subWorkflowId);
391-
return getSubWorkflowIdReservation(
392-
connection, parentWorkflowId, parentWorkflowTaskId);
393-
});
388+
connection -> {
389+
addSubWorkflowIdReservation(
390+
connection,
391+
parentWorkflowId,
392+
parentWorkflowTaskId,
393+
subWorkflowId);
394+
return getSubWorkflowIdReservation(
395+
connection, parentWorkflowId, parentWorkflowTaskId);
396+
});
394397
logger.debug(
395398
"Resolved sub-workflow reservation for workflow {} task {} to child workflow {} in SQLite",
396399
parentWorkflowId,

0 commit comments

Comments
 (0)