Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
public class MaestroWorkflowInstanceDao extends AbstractDatabaseDao {
private static final int PARAMS_PER_INSTANCE = 11;
private static final String VALUE_PLACE_HOLDER = "(?,?,?,?,?,?::jsonb,?,?,?,?::json,?)";
private static final String FROM_WORKFLOW_INSTANCE_TABLE = "FROM maestro_workflow_instance ";

private static final String CREATE_WORKFLOW_INSTANCE_QUERY_TEMPLATE =
"INSERT INTO maestro_workflow_instance "
Expand Down Expand Up @@ -159,6 +160,17 @@ public class MaestroWorkflowInstanceDao extends AbstractDatabaseDao {
private static final String GET_LATEST_WORKFLOW_INSTANCE_STATUS_QUERY =
String.format(GET_WORKFLOW_INSTANCE_FIELDS_TEMPLATE, STATUS_COLUMN, LATEST_RUN_CONDITION);

private static final String GET_WORKFLOW_INSTANCE_RUNS_QUERY =
"SELECT "
+ ALL_FIELDS
+ FROM_WORKFLOW_INSTANCE_TABLE
+ "WHERE workflow_id=? AND instance_id=? AND run_id>=? AND run_id<=? ORDER BY run_id DESC";

private static final String GET_MIN_MAX_RUN_IDS_QUERY =
"SELECT min(run_id) as min_run_id, max(run_id) as max_run_id "
+ FROM_WORKFLOW_INSTANCE_TABLE
+ "WHERE workflow_id=? AND instance_id=?";

private static final String UPDATE_INSTANCE_FAILED_STATUS =
"UPDATE maestro_workflow_instance SET status='FAILED_2' "
+ "WHERE workflow_id=? AND status='FAILED' AND instance_id>=? AND instance_id<=?";
Expand All @@ -175,7 +187,6 @@ public class MaestroWorkflowInstanceDao extends AbstractDatabaseDao {
+ INSTANCE_IN_SUBQUERY
+ "AND status='FAILED' order by instance_id ASC LIMIT ?)";

private static final String FROM_WORKFLOW_INSTANCE_TABLE = "FROM maestro_workflow_instance ";
private static final String FROM_INLINE_WORKFLOW_INSTANCE_TABLE = FROM_WORKFLOW_INSTANCE_TABLE;

private static final String ORDER_BY_INSTANCE_ID_RUN_ID_DESC =
Expand Down Expand Up @@ -227,6 +238,14 @@ public class MaestroWorkflowInstanceDao extends AbstractDatabaseDao {
"SELECT 1 FROM maestro_workflow_instance WHERE workflow_id=? LIMIT 1";

private static final String INSTANCE_ID_COLUMN = "instance_id";
private static final String RUN_ID_COLUMN = "run_id";
private static final String START_TS_COLUMN = "start_ts";
private static final String END_TS_COLUMN = "end_ts";
private static final String MIN_RUN_ID = "min_run_id";
private static final String MAX_RUN_ID = "max_run_id";
private static final String INSTANCE_COLUMN = "instance";
private static final String INSTANCE_COLUMN_NOT_NULL_ERR =
"workflow instance column cannot be null";

private static final String TERMINATION_MESSAGE_TEMPLATE =
"Workflow instance status becomes [%s] due to reason [%s]";
Expand Down Expand Up @@ -458,7 +477,7 @@ public int terminateRunningInstances(
while (result.next()) {
jobEvent.addOneRun(
result.getLong(INSTANCE_ID_COLUMN),
result.getLong("run_id"),
result.getLong(RUN_ID_COLUMN),
result.getString("uuid"));
}
return null;
Expand Down Expand Up @@ -861,6 +880,76 @@ public WorkflowInstance getLatestWorkflowInstanceRun(String workflowId, long wor
return getWorkflowInstanceRun(workflowId, workflowInstanceId, Constants.LATEST_ONE);
}

/**
* Get runs for a specific workflow instance within a run id range, ordered by run id ascending.
*
* @param workflowId workflow id
* @param workflowInstanceId workflow instance id
* @param startRunId start run id (inclusive)
* @param endRunId end run id (inclusive)
* @return list of workflow instances ordered by run id descending
*/
public List<WorkflowInstance> getWorkflowInstanceRuns(
String workflowId, long workflowInstanceId, long startRunId, long endRunId) {
return withMetricLogError(
() ->
withRetryableQuery(
GET_WORKFLOW_INSTANCE_RUNS_QUERY,
stmt -> {
int idx = 0;
stmt.setString(++idx, workflowId);
stmt.setLong(++idx, workflowInstanceId);
stmt.setLong(++idx, startRunId);
stmt.setLong(++idx, endRunId);
},
result -> {
List<WorkflowInstance> runs = new ArrayList<>();
while (result.next()) {
runs.add(workflowInstanceFromResult(result));
}
return runs;
}),
"getWorkflowInstanceRuns",
"Failed to get workflow instance runs for [{}][{}]",
workflowId,
workflowInstanceId);
}

/**
* Gets the min and max run id for a specific workflow instance.
*
* @param workflowId workflow id
* @param workflowInstanceId workflow instance id
* @return a long array of length 2 where the first element is the min run id and the second is
* the max run id, or null if no runs exist
*/
public long[] getMinMaxRunIds(String workflowId, long workflowInstanceId) {
return withMetricLogError(
() ->
withRetryableQuery(
GET_MIN_MAX_RUN_IDS_QUERY,
stmt -> {
int idx = 0;
stmt.setString(++idx, workflowId);
stmt.setLong(++idx, workflowInstanceId);
},
result -> {
if (result.next()) {
long min = result.getLong(MIN_RUN_ID);
long max = result.getLong(MAX_RUN_ID);
if (min == 0 && max == 0) {
return null;
}
return new long[] {min, max};
}
return null;
}),
"getMinMaxRunIds",
"Failed to get the min and max run ids for [{}][{}]",
workflowId,
workflowInstanceId);
}

/**
* Get the latest workflow instance run for a specific workflow id between an instance id range.
*
Expand Down Expand Up @@ -937,12 +1026,12 @@ public long[] getMinMaxWorkflowInstanceIds(String workflowId) {
private WorkflowInstance workflowInstanceFromResult(ResultSet rs) throws SQLException {
WorkflowInstance instance =
Checks.notNull(
getJsonObjectIfPresent(rs, "instance", WorkflowInstance.class),
"workflow instance column cannot be null");
getJsonObjectIfPresent(rs, INSTANCE_COLUMN, WorkflowInstance.class),
INSTANCE_COLUMN_NOT_NULL_ERR);
instance.setStatus(WorkflowInstance.Status.create(rs.getString(STATUS_COLUMN)));
instance.setExecutionId(rs.getString("execution_id"));
instance.setStartTime(getTimestampIfPresent(rs, "start_ts"));
instance.setEndTime(getTimestampIfPresent(rs, "end_ts"));
instance.setStartTime(getTimestampIfPresent(rs, START_TS_COLUMN));
instance.setEndTime(getTimestampIfPresent(rs, END_TS_COLUMN));
instance.setModifyTime(getTimestampIfPresent(rs, "modify_ts"));
instance.setRuntimeOverview(
getJsonObjectIfPresent(rs, "runtime_overview", WorkflowRuntimeOverview.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,45 @@ public void testGetLatestWorkflowInstanceRun() {
assertEquals(wfi, instanceRun);
}

@Test
public void testGetWorkflowInstanceRuns() {
// single run — only run 1 exists
long[] minMax = instanceDao.getMinMaxRunIds(wfi.getWorkflowId(), wfi.getWorkflowInstanceId());
assertNotNull(minMax);
assertEquals(1L, minMax[0]);
assertEquals(1L, minMax[1]);
List<WorkflowInstance> runs =
instanceDao.getWorkflowInstanceRuns(
wfi.getWorkflowId(), wfi.getWorkflowInstanceId(), minMax[0], minMax[1]);
assertEquals(1, runs.size());
assertEquals(1, runs.getFirst().getWorkflowRunId());
assertEquals(WorkflowInstance.Status.CREATED, runs.getFirst().getStatus());

// create run 2 by failing run 1 and restarting
instanceDao.tryTerminateQueuedInstance(wfi, WorkflowInstance.Status.FAILED, "kill the test");
wfi.setWorkflowUuid("test-uuid");
wfi.setWorkflowRunId(0L);
wfi.setRunConfig(new RunConfig());
wfi.getRunConfig().setPolicy(RunPolicy.RESTART_FROM_INCOMPLETE);
int res = runStrategyDao.startWithRunStrategy(wfi, Defaults.DEFAULT_RUN_STRATEGY);
assertEquals(1, res);
assertEquals(2, wfi.getWorkflowRunId());

// two runs — ordered by run_id descending
minMax = instanceDao.getMinMaxRunIds(wfi.getWorkflowId(), wfi.getWorkflowInstanceId());
assertNotNull(minMax);
assertEquals(1L, minMax[0]);
assertEquals(2L, minMax[1]);
runs =
instanceDao.getWorkflowInstanceRuns(
wfi.getWorkflowId(), wfi.getWorkflowInstanceId(), minMax[0], minMax[1]);
assertEquals(2, runs.size());
assertEquals(2, runs.get(0).getWorkflowRunId());
assertEquals(WorkflowInstance.Status.CREATED, runs.get(0).getStatus());
assertEquals(1, runs.get(1).getWorkflowRunId());
assertEquals(WorkflowInstance.Status.FAILED, runs.get(1).getStatus());
}

@Test
public void testWorkflowInstanceMetadataForWorkflowInstancesLatestRun() {
instanceDao.tryTerminateQueuedInstance(wfi, WorkflowInstance.Status.FAILED, "kill the test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
value = "/api/v3/workflows",
produces = MediaType.APPLICATION_JSON_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE)
@SuppressWarnings("PMD.AvoidDuplicateLiterals")
public class WorkflowInstanceController {
private static final int WORKFLOW_INSTANCE_MAX_BATCH_LIMIT = 200;
private static final int WORKFLOW_INSTANCE_MIN_BATCH_LIMIT = 1;
Expand Down Expand Up @@ -98,6 +99,93 @@ public WorkflowInstance getWorkflowInstanceView(
workflowId, workflowInstanceId, Constants.LATEST_INSTANCE_RUN, enriched, true);
}

@GetMapping(
value = "/{workflowId}/instances/{workflowInstanceId}/runs",
consumes = MediaType.ALL_VALUE)
@Operation(
summary = "Get all runs for a given workflow instance with pagination support",
description =
"Retrieves all runs for a given workflow instance with cursor-based pagination. "
+ "Use 'first' parameter for forward pagination or 'last' parameter for backward pagination, but not both. "
+ "The cursor parameter can be used to continue pagination from a specific run id.")
public PaginationResult<WorkflowInstance> getWorkflowInstanceRuns(
@Valid @NotNull @PathVariable("workflowId") String workflowId,
@PathVariable("workflowInstanceId") long workflowInstanceId,
@Parameter(
description =
"Number of results to return for forward pagination. Must be between 1 and 200 inclusive. "
+ "Cannot be used together with 'last' parameter.")
@RequestParam(name = "first", required = false)
@Max(WORKFLOW_INSTANCE_MAX_BATCH_LIMIT)
@Min(WORKFLOW_INSTANCE_MIN_BATCH_LIMIT)
Long first,
@Parameter(
description =
"Number of results to return for backward pagination. Must be between 1 and 200 inclusive. "
+ "Cannot be used together with 'first' parameter.")
@RequestParam(name = "last", required = false)
@Max(WORKFLOW_INSTANCE_MAX_BATCH_LIMIT)
@Min(WORKFLOW_INSTANCE_MIN_BATCH_LIMIT)
Long last,
@Parameter(
description =
"Cursor for pagination continuation. Should be a run id. "
+ "If not provided, pagination starts from the beginning (first) or end (last).")
@RequestParam(name = "cursor", required = false)
String cursor) {
return getWorkflowInstanceRunsResult(workflowId, workflowInstanceId, first, last, cursor);
}

private PaginationResult<WorkflowInstance> getWorkflowInstanceRunsResult(
String workflowId, long workflowInstanceId, Long first, Long last, String cursor) {
PaginationDirection direction = PaginationHelper.validateParamAndDeriveDirection(first, last);
long limit = (last == null) ? first : last;

// Early return for empty results
long[] minMaxRunIds = workflowInstanceDao.getMinMaxRunIds(workflowId, workflowInstanceId);
if (minMaxRunIds == null) {
return PaginationHelper.buildEmptyPaginationResult();
}

long earliestRunId = minMaxRunIds[0];
long latestRunId = minMaxRunIds[1];

// Calculate pagination range
PaginationHelper.PaginationRange paginationRange =
PaginationHelper.getPaginationRange(cursor, direction, earliestRunId, latestRunId, limit);

// Fetch runs
List<WorkflowInstance> runs =
workflowInstanceDao.getWorkflowInstanceRuns(
workflowId, workflowInstanceId, paginationRange.start(), paginationRange.end());

// Build pagination result
return PaginationHelper.buildPaginationResult(
runs, latestRunId, earliestRunId, this::extractRunIdRange);
}

/** Extracts the high and low run ids from a list of workflow instance runs. */
private long[] extractRunIdRange(List<WorkflowInstance> runs) {
if (runs == null || runs.isEmpty()) {
return new long[] {0L, 0L};
}

if (runs.size() == 1) {
long runId = runs.getFirst().getWorkflowRunId();
return new long[] {runId, runId};
}

long min = Long.MAX_VALUE;
long max = Long.MIN_VALUE;
for (WorkflowInstance run : runs) {
long runId = run.getWorkflowRunId();
min = Math.min(min, runId);
max = Math.max(max, runId);
}

return new long[] {max, min}; // high, low
}

@GetMapping(value = "/{workflowId}/instances", consumes = MediaType.ALL_VALUE)
@Operation(
summary = "Get workflow instances with pagination support",
Expand Down
Loading