Skip to content

Commit e0d362c

Browse files
committed
Improve SUB_WORKFLOW launches for dyn fork-joins
1 parent cc726a0 commit e0d362c

23 files changed

Lines changed: 1716 additions & 306 deletions

File tree

cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraBaseDAO.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,14 @@
3636
import static com.netflix.conductor.cassandra.util.Constants.EVENT_HANDLER_NAME_KEY;
3737
import static com.netflix.conductor.cassandra.util.Constants.HANDLERS_KEY;
3838
import static com.netflix.conductor.cassandra.util.Constants.MESSAGE_ID_KEY;
39+
import static com.netflix.conductor.cassandra.util.Constants.PARENT_WORKFLOW_ID_KEY;
40+
import static com.netflix.conductor.cassandra.util.Constants.PARENT_WORKFLOW_TASK_ID_KEY;
3941
import static com.netflix.conductor.cassandra.util.Constants.PAYLOAD_KEY;
4042
import static com.netflix.conductor.cassandra.util.Constants.SHARD_ID_KEY;
43+
import static com.netflix.conductor.cassandra.util.Constants.SUB_WORKFLOW_ID_KEY;
4144
import static com.netflix.conductor.cassandra.util.Constants.TABLE_EVENT_EXECUTIONS;
4245
import static com.netflix.conductor.cassandra.util.Constants.TABLE_EVENT_HANDLERS;
46+
import static com.netflix.conductor.cassandra.util.Constants.TABLE_SUB_WORKFLOW_ID_RESERVATIONS;
4347
import static com.netflix.conductor.cassandra.util.Constants.TABLE_TASK_DEFS;
4448
import static com.netflix.conductor.cassandra.util.Constants.TABLE_TASK_DEF_LIMIT;
4549
import static com.netflix.conductor.cassandra.util.Constants.TABLE_TASK_LOOKUP;
@@ -127,6 +131,7 @@ private void init() {
127131
session.execute(getCreateWorkflowsTableStatement());
128132
session.execute(getCreateTaskLookupTableStatement());
129133
session.execute(getCreateTaskDefLimitTableStatement());
134+
session.execute(getCreateSubWorkflowIdReservationsTableStatement());
130135
session.execute(getCreateWorkflowDefsTableStatement());
131136
session.execute(getCreateWorkflowDefsIndexTableStatement());
132137
session.execute(getCreateTaskDefsTableStatement());
@@ -186,6 +191,16 @@ private String getCreateTaskDefLimitTableStatement() {
186191
.getQueryString();
187192
}
188193

194+
private String getCreateSubWorkflowIdReservationsTableStatement() {
195+
return SchemaBuilder.createTable(
196+
properties.getKeyspace(), TABLE_SUB_WORKFLOW_ID_RESERVATIONS)
197+
.ifNotExists()
198+
.addPartitionKey(PARENT_WORKFLOW_ID_KEY, DataType.uuid())
199+
.addClusteringColumn(PARENT_WORKFLOW_TASK_ID_KEY, DataType.uuid())
200+
.addColumn(SUB_WORKFLOW_ID_KEY, DataType.uuid())
201+
.getQueryString();
202+
}
203+
189204
private String getCreateWorkflowDefsTableStatement() {
190205
return SchemaBuilder.createTable(properties.getKeyspace(), TABLE_WORKFLOW_DEFS)
191206
.ifNotExists()

cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.netflix.conductor.cassandra.util.Statements;
2424
import com.netflix.conductor.common.metadata.events.EventExecution;
2525
import com.netflix.conductor.common.metadata.tasks.TaskDef;
26+
import com.netflix.conductor.common.metadata.tasks.TaskType;
2627
import com.netflix.conductor.core.exception.NonTransientException;
2728
import com.netflix.conductor.core.exception.NotFoundException;
2829
import com.netflix.conductor.core.exception.TransientException;
@@ -50,6 +51,7 @@ public class CassandraExecutionDAO extends CassandraBaseDAO
5051
protected final PreparedStatement insertWorkflowStatement;
5152
protected final PreparedStatement insertTaskStatement;
5253
protected final PreparedStatement insertEventExecutionStatement;
54+
protected final PreparedStatement insertSubWorkflowIdReservationStatement;
5355

5456
protected final PreparedStatement selectTotalStatement;
5557
protected final PreparedStatement selectTaskStatement;
@@ -58,6 +60,7 @@ public class CassandraExecutionDAO extends CassandraBaseDAO
5860
protected final PreparedStatement selectTaskLookupStatement;
5961
protected final PreparedStatement selectTasksFromTaskDefLimitStatement;
6062
protected final PreparedStatement selectEventExecutionsStatement;
63+
protected final PreparedStatement selectSubWorkflowIdReservationStatement;
6164

6265
protected final PreparedStatement updateWorkflowStatement;
6366
protected final PreparedStatement updateTotalTasksStatement;
@@ -71,6 +74,8 @@ public class CassandraExecutionDAO extends CassandraBaseDAO
7174
protected final PreparedStatement deleteTaskLookupStatement;
7275
protected final PreparedStatement deleteTaskDefLimitStatement;
7376
protected final PreparedStatement deleteEventExecutionStatement;
77+
protected final PreparedStatement deleteSubWorkflowIdReservationStatement;
78+
protected final PreparedStatement deleteSubWorkflowIdReservationsStatement;
7479

7580
protected final int eventExecutionsTTL;
7681

@@ -92,6 +97,9 @@ public CassandraExecutionDAO(
9297
this.insertEventExecutionStatement =
9398
session.prepare(statements.getInsertEventExecutionStatement())
9499
.setConsistencyLevel(properties.getWriteConsistencyLevel());
100+
this.insertSubWorkflowIdReservationStatement =
101+
session.prepare(statements.getInsertSubWorkflowIdReservationStatement())
102+
.setConsistencyLevel(properties.getWriteConsistencyLevel());
95103

96104
this.selectTotalStatement =
97105
session.prepare(statements.getSelectTotalStatement())
@@ -116,6 +124,9 @@ public CassandraExecutionDAO(
116124
statements
117125
.getSelectAllEventExecutionsForMessageFromEventExecutionsStatement())
118126
.setConsistencyLevel(properties.getReadConsistencyLevel());
127+
this.selectSubWorkflowIdReservationStatement =
128+
session.prepare(statements.getSelectSubWorkflowIdReservationStatement())
129+
.setConsistencyLevel(properties.getReadConsistencyLevel());
119130

120131
this.updateWorkflowStatement =
121132
session.prepare(statements.getUpdateWorkflowStatement())
@@ -151,6 +162,12 @@ public CassandraExecutionDAO(
151162
this.deleteEventExecutionStatement =
152163
session.prepare(statements.getDeleteEventExecutionsStatement())
153164
.setConsistencyLevel(properties.getWriteConsistencyLevel());
165+
this.deleteSubWorkflowIdReservationStatement =
166+
session.prepare(statements.getDeleteSubWorkflowIdReservationStatement())
167+
.setConsistencyLevel(properties.getWriteConsistencyLevel());
168+
this.deleteSubWorkflowIdReservationsStatement =
169+
session.prepare(statements.getDeleteSubWorkflowIdReservationsStatement())
170+
.setConsistencyLevel(properties.getWriteConsistencyLevel());
154171
}
155172

156173
@Override
@@ -456,6 +473,7 @@ public boolean removeWorkflow(String workflowId) {
456473
deleteWorkflowStatement.bind(
457474
UUID.fromString(workflowId), DEFAULT_SHARD_ID));
458475
removed = resultSet.wasApplied();
476+
removeOwnedSubWorkflowIdReservations(workflow);
459477
} catch (DriverException e) {
460478
Monitors.error(CLASS_NAME, "removeWorkflow");
461479
String errorMsg = String.format("Failed to remove workflow: %s", workflowId);
@@ -555,6 +573,83 @@ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) {
555573
}
556574
}
557575

576+
@Override
577+
public String reserveSubWorkflowId(
578+
String parentWorkflowId, String parentWorkflowTaskId, String subWorkflowId) {
579+
UUID parentWorkflowUUID = toUUID(parentWorkflowId, "Invalid parent workflow id");
580+
UUID parentWorkflowTaskUUID =
581+
toUUID(parentWorkflowTaskId, "Invalid parent workflow task id");
582+
UUID subWorkflowUUID = toUUID(subWorkflowId, "Invalid sub workflow id");
583+
584+
try {
585+
session.execute(
586+
insertSubWorkflowIdReservationStatement.bind(
587+
parentWorkflowUUID, parentWorkflowTaskUUID, subWorkflowUUID));
588+
Row row =
589+
session.execute(
590+
selectSubWorkflowIdReservationStatement.bind(
591+
parentWorkflowUUID, parentWorkflowTaskUUID))
592+
.one();
593+
String reservedSubWorkflowId = row.getUUID(SUB_WORKFLOW_ID_KEY).toString();
594+
LOGGER.debug(
595+
"Resolved sub-workflow reservation for workflow {} task {} to child workflow {} in Cassandra",
596+
parentWorkflowId,
597+
parentWorkflowTaskId,
598+
reservedSubWorkflowId);
599+
return reservedSubWorkflowId;
600+
} catch (DriverException e) {
601+
Monitors.error(CLASS_NAME, "reserveSubWorkflowId");
602+
String errorMsg =
603+
String.format(
604+
"Failed to reserve sub workflow id for parent workflow task: %s",
605+
parentWorkflowTaskId);
606+
LOGGER.error(errorMsg, e);
607+
throw new TransientException(errorMsg);
608+
}
609+
}
610+
611+
@Override
612+
public void removeSubWorkflowIdReservation(String workflowId, String taskId) {
613+
UUID workflowUUID = toUUID(workflowId, "Invalid workflow id");
614+
UUID taskUUID = toUUID(taskId, "Invalid task id");
615+
616+
try {
617+
LOGGER.debug(
618+
"Removing owned sub-workflow reservation for workflow {} task {} from Cassandra",
619+
workflowId,
620+
taskId);
621+
session.execute(deleteSubWorkflowIdReservationStatement.bind(workflowUUID, taskUUID));
622+
} catch (DriverException e) {
623+
Monitors.error(CLASS_NAME, "removeSubWorkflowIdReservation");
624+
String errorMsg =
625+
String.format(
626+
"Failed to remove sub workflow id reservation for workflow task: %s",
627+
taskId);
628+
LOGGER.error(errorMsg, e);
629+
throw new TransientException(errorMsg);
630+
}
631+
}
632+
633+
@Override
634+
public void removeSubWorkflowIdReservations(String workflowId) {
635+
UUID workflowUUID = toUUID(workflowId, "Invalid workflow id");
636+
637+
try {
638+
LOGGER.debug(
639+
"Removing all owned sub-workflow reservations for workflow {} from Cassandra",
640+
workflowId);
641+
session.execute(deleteSubWorkflowIdReservationsStatement.bind(workflowUUID));
642+
} catch (DriverException e) {
643+
Monitors.error(CLASS_NAME, "removeSubWorkflowIdReservations");
644+
String errorMsg =
645+
String.format(
646+
"Failed to remove sub workflow id reservations for workflow: %s",
647+
workflowId);
648+
LOGGER.error(errorMsg, e);
649+
throw new TransientException(errorMsg);
650+
}
651+
}
652+
558653
/**
559654
* This is a dummy implementation and this feature is not implemented for Cassandra backed
560655
* Conductor
@@ -807,6 +902,18 @@ protected void removeTaskLookup(TaskModel task) {
807902
}
808903
}
809904

905+
private void removeOwnedSubWorkflowIdReservations(WorkflowModel workflow) {
906+
if (workflow.getTasks() == null
907+
|| workflow.getTasks().stream()
908+
.noneMatch(
909+
task ->
910+
TaskType.TASK_TYPE_SUB_WORKFLOW.equals(
911+
task.getTaskType()))) {
912+
return;
913+
}
914+
removeSubWorkflowIdReservations(workflow.getWorkflowId());
915+
}
916+
810917
@VisibleForTesting
811918
void validateTasks(List<TaskModel> tasks) {
812919
Preconditions.checkNotNull(tasks, "Tasks object cannot be null");

cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Constants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@ public interface Constants {
1919
String TABLE_WORKFLOWS = "workflows";
2020
String TABLE_TASK_LOOKUP = "task_lookup";
2121
String TABLE_TASK_DEF_LIMIT = "task_def_limit";
22+
String TABLE_SUB_WORKFLOW_ID_RESERVATIONS = "sub_workflow_id_reservations";
2223
String TABLE_WORKFLOW_DEFS = "workflow_definitions";
2324
String TABLE_WORKFLOW_DEFS_INDEX = "workflow_defs_index";
2425
String TABLE_TASK_DEFS = "task_definitions";
2526
String TABLE_EVENT_HANDLERS = "event_handlers";
2627
String TABLE_EVENT_EXECUTIONS = "event_executions";
2728

2829
String WORKFLOW_ID_KEY = "workflow_id";
30+
String PARENT_WORKFLOW_ID_KEY = "parent_workflow_id";
31+
String PARENT_WORKFLOW_TASK_ID_KEY = "parent_workflow_task_id";
32+
String SUB_WORKFLOW_ID_KEY = "sub_workflow_id";
2933
String SHARD_ID_KEY = "shard_id";
3034
String TASK_ID_KEY = "task_id";
3135
String ENTITY_KEY = "entity";

cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,14 @@
2222
import static com.netflix.conductor.cassandra.util.Constants.EVENT_HANDLER_NAME_KEY;
2323
import static com.netflix.conductor.cassandra.util.Constants.HANDLERS_KEY;
2424
import static com.netflix.conductor.cassandra.util.Constants.MESSAGE_ID_KEY;
25+
import static com.netflix.conductor.cassandra.util.Constants.PARENT_WORKFLOW_ID_KEY;
26+
import static com.netflix.conductor.cassandra.util.Constants.PARENT_WORKFLOW_TASK_ID_KEY;
2527
import static com.netflix.conductor.cassandra.util.Constants.PAYLOAD_KEY;
2628
import static com.netflix.conductor.cassandra.util.Constants.SHARD_ID_KEY;
29+
import static com.netflix.conductor.cassandra.util.Constants.SUB_WORKFLOW_ID_KEY;
2730
import static com.netflix.conductor.cassandra.util.Constants.TABLE_EVENT_EXECUTIONS;
2831
import static com.netflix.conductor.cassandra.util.Constants.TABLE_EVENT_HANDLERS;
32+
import static com.netflix.conductor.cassandra.util.Constants.TABLE_SUB_WORKFLOW_ID_RESERVATIONS;
2933
import static com.netflix.conductor.cassandra.util.Constants.TABLE_TASK_DEFS;
3034
import static com.netflix.conductor.cassandra.util.Constants.TABLE_TASK_DEF_LIMIT;
3135
import static com.netflix.conductor.cassandra.util.Constants.TABLE_TASK_LOOKUP;
@@ -413,6 +417,14 @@ public String getSelectTasksFromTaskDefLimitStatement() {
413417
.getQueryString();
414418
}
415419

420+
public String getSelectSubWorkflowIdReservationStatement() {
421+
return QueryBuilder.select(SUB_WORKFLOW_ID_KEY)
422+
.from(keyspace, TABLE_SUB_WORKFLOW_ID_RESERVATIONS)
423+
.where(eq(PARENT_WORKFLOW_ID_KEY, bindMarker()))
424+
.and(eq(PARENT_WORKFLOW_TASK_ID_KEY, bindMarker()))
425+
.getQueryString();
426+
}
427+
416428
/**
417429
* @return cql query statement to retrieve all event executions for a given message and event
418430
* handler from the "event_executions" table
@@ -488,6 +500,15 @@ public String getUpdateTaskDefLimitStatement() {
488500
.getQueryString();
489501
}
490502

503+
public String getInsertSubWorkflowIdReservationStatement() {
504+
return QueryBuilder.insertInto(keyspace, TABLE_SUB_WORKFLOW_ID_RESERVATIONS)
505+
.value(PARENT_WORKFLOW_ID_KEY, bindMarker())
506+
.value(PARENT_WORKFLOW_TASK_ID_KEY, bindMarker())
507+
.value(SUB_WORKFLOW_ID_KEY, bindMarker())
508+
.ifNotExists()
509+
.getQueryString();
510+
}
511+
491512
/**
492513
* @return cql query statement to update an event execution in the "event_executions" table
493514
*/
@@ -525,6 +546,21 @@ public String getDeleteTaskLookupStatement() {
525546
.getQueryString();
526547
}
527548

549+
public String getDeleteSubWorkflowIdReservationStatement() {
550+
return QueryBuilder.delete()
551+
.from(keyspace, TABLE_SUB_WORKFLOW_ID_RESERVATIONS)
552+
.where(eq(PARENT_WORKFLOW_ID_KEY, bindMarker()))
553+
.and(eq(PARENT_WORKFLOW_TASK_ID_KEY, bindMarker()))
554+
.getQueryString();
555+
}
556+
557+
public String getDeleteSubWorkflowIdReservationsStatement() {
558+
return QueryBuilder.delete()
559+
.from(keyspace, TABLE_SUB_WORKFLOW_ID_RESERVATIONS)
560+
.where(eq(PARENT_WORKFLOW_ID_KEY, bindMarker()))
561+
.getQueryString();
562+
}
563+
528564
/**
529565
* @return cql query statement to delete a task from the "workflows" table
530566
*/

core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.netflix.conductor.common.metadata.tasks.Task;
3232
import com.netflix.conductor.common.metadata.tasks.TaskDef;
3333
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
34+
import com.netflix.conductor.common.metadata.tasks.TaskType;
3435
import com.netflix.conductor.common.run.SearchResult;
3536
import com.netflix.conductor.common.run.TaskSummary;
3637
import com.netflix.conductor.common.run.Workflow;
@@ -139,6 +140,26 @@ public WorkflowModel getWorkflowModel(String workflowId, boolean includeTasks) {
139140
return workflowModel;
140141
}
141142

143+
/**
144+
* Fetches the {@link WorkflowModel} from the primary execution store only.
145+
*
146+
* <p>Unlike {@link #getWorkflowModel(String, boolean)}, this method does not fall back to
147+
* {@link IndexDAO}. Use it for control-flow decisions that must rely on the source of truth.
148+
*
149+
* @param workflowId the id of the workflow to be fetched
150+
* @param includeTasks if true, fetches the {@link Task} data in the workflow.
151+
* @return the {@link WorkflowModel} object from {@link ExecutionDAO}
152+
* @throws NotFoundException no such {@link WorkflowModel} is found in {@link ExecutionDAO}.
153+
*/
154+
public WorkflowModel getWorkflowModelFromExecutionDAO(String workflowId, boolean includeTasks) {
155+
WorkflowModel workflow = executionDAO.getWorkflow(workflowId, includeTasks);
156+
if (workflow == null) {
157+
throw new NotFoundException("No such workflow found by id: %s", workflowId);
158+
}
159+
populateWorkflowAndTaskPayloadData(workflow);
160+
return workflow;
161+
}
162+
142163
/**
143164
* Fetches the {@link Workflow} object from the data store given the id. Attempts to fetch from
144165
* {@link ExecutionDAO} first, if not found, attempts to fetch from {@link IndexDAO}.
@@ -264,6 +285,16 @@ public String createWorkflow(WorkflowModel workflowModel) {
264285
return workflowModel.getWorkflowId();
265286
}
266287

288+
public String reserveSubWorkflowId(
289+
String parentWorkflowId, String parentWorkflowTaskId, String subWorkflowId) {
290+
return executionDAO.reserveSubWorkflowId(
291+
parentWorkflowId, parentWorkflowTaskId, subWorkflowId);
292+
}
293+
294+
public void removeSubWorkflowIdReservation(String workflowId, String taskId) {
295+
executionDAO.removeSubWorkflowIdReservation(workflowId, taskId);
296+
}
297+
267298
private void externalizeTaskData(TaskModel taskModel) {
268299
externalPayloadStorageUtils.verifyAndUpload(
269300
taskModel, ExternalPayloadStorage.PayloadType.TASK_INPUT);
@@ -569,7 +600,38 @@ public void updateTasks(List<TaskModel> tasks) {
569600
}
570601

571602
public void removeTask(String taskId) {
572-
executionDAO.removeTask(taskId);
603+
TaskModel taskModel = executionDAO.getTask(taskId);
604+
boolean removed = executionDAO.removeTask(taskId);
605+
if (!removed) {
606+
LOGGER.warn(
607+
"Task {} was not removed; skipping owned sub-workflow reservation cleanup",
608+
taskId);
609+
return;
610+
}
611+
cleanupOwnedSubWorkflowReservation(taskModel);
612+
}
613+
614+
private void cleanupOwnedSubWorkflowReservation(TaskModel task) {
615+
if (task == null
616+
|| !TaskType.TASK_TYPE_SUB_WORKFLOW.equals(task.getTaskType())
617+
|| StringUtils.isBlank(task.getWorkflowInstanceId())
618+
|| StringUtils.isBlank(task.getTaskId())) {
619+
return;
620+
}
621+
LOGGER.debug(
622+
"Removing owned sub-workflow reservation for task {} in workflow {} during task deletion",
623+
task.getTaskId(),
624+
task.getWorkflowInstanceId());
625+
try {
626+
executionDAO.removeSubWorkflowIdReservation(
627+
task.getWorkflowInstanceId(), task.getTaskId());
628+
} catch (Exception e) {
629+
LOGGER.warn(
630+
"Unable to remove sub-workflow reservation owned by workflow {} task {}",
631+
task.getWorkflowInstanceId(),
632+
task.getTaskId(),
633+
e);
634+
}
573635
}
574636

575637
private void removeTaskIndex(WorkflowModel workflow, TaskModel task, boolean archiveTask)

0 commit comments

Comments
 (0)