Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ public class MaestroStepInstanceDao extends AbstractDatabaseDao {
+ GET_STEP_FIELD_QUERY_FROM
+ "AND workflow_run_id=?) SELECT * FROM inner_ranked WHERE rank=1";

private static final String GET_ALL_STEP_INSTANCE_VIEWS_QUERY =
INNER_RANK_QUERY_ALL_FIELD_WITH
+ ", ROW_NUMBER() OVER (PARTITION BY step_id ORDER BY workflow_run_id DESC, step_attempt_id DESC) AS rank"
+ GET_STEP_FIELD_QUERY_FROM
+ ") SELECT * FROM inner_ranked WHERE rank=1";
Copy link
Copy Markdown
Collaborator

@akashdw akashdw Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have any benchmarks or EXPLAIN / query plan results, could you share those as well?

Copy link
Copy Markdown
Collaborator Author

@anjujha anjujha Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have pasted the query plan below

QUERY PLAN

 Subquery Scan on inner_ranked  (cost=56.59..57.05 rows=1 width=1707) (actual time=1.327..1.486 rows=211 loops=1)                                                                                                                                                   
   Filter: (inner_ranked.rank = 1)                                                                                                                                                                                                                                  
   ->  WindowAgg  (cost=56.59..56.88 rows=13 width=1707) (actual time=1.326..1.471 rows=211 loops=1)                                                                                                                                                                
         Run Condition: (row_number() OVER (?) <= 1)                                                                                                                                                                                                                
         ->  Sort  (cost=56.59..56.62 rows=13 width=1699) (actual time=1.320..1.329 rows=211 loops=1)                                                                                                                                                               
               Sort Key: maestro_step_instance.step_id COLLATE "C", maestro_step_instance.workflow_run_id DESC, maestro_step_instance.step_attempt_id DESC                                                                                                          
               Sort Method: quicksort  Memory: 410kB                                                                                                                                                                                                                
               ->  Index Scan using maestro_step_instance_pkey on maestro_step_instance  (cost=0.42..56.35 rows=13 width=1699) (actual time=0.028..0.254 rows=211 loops=1)                                                                                          
                     Index Cond: ((workflow_id = '<redacted>_large_demo'::text) AND (workflow_instance_id = 1))                                                                                                                                                        
 Planning Time: 0.346 ms                                                                                                                                                                                                                                            
 Execution Time: 1.567 ms                                                                                                                                                                                                                                           
(11 rows)               
Query always hits the primary key index on (workflow_id, workflow_instance_id)


private final MaestroQueueSystem queueSystem;

/**
Expand Down Expand Up @@ -1229,6 +1235,37 @@ public Long getNextUniqueId() {
"Failed to get the next unique id");
}

/**
* Get the latest step attempts of all the steps across all runs for a given workflow instance
* (workflow id, instance id).
*
* @param workflowId workflow id
* @param workflowInstanceId workflow instance id
* @return latest step attempts of all the steps across all runs
*/
public List<StepInstance> getAllStepInstanceViews(String workflowId, long workflowInstanceId) {
return withMetricLogError(
() ->
withRetryableQuery(
GET_ALL_STEP_INSTANCE_VIEWS_QUERY,
stmt -> {
int idx = 0;
stmt.setString(++idx, workflowId);
stmt.setLong(++idx, workflowInstanceId);
},
result -> {
List<StepInstance> instances = new ArrayList<>();
while (result.next()) {
instances.add(maestroStepFromResult(result));
}
return instances;
}),
"getAllStepInstanceViews",
"Failed to get latest step attempts across all runs for workflow instance [{}][{}]",
workflowId,
workflowInstanceId);
}

/**
* Get the latest step attempts of all the steps for a given workflow instance run (workflow id,
* instance id, run id).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -321,6 +322,37 @@ public void testGetAllStepInstances() {
Assertions.assertThat(instance).usingRecursiveComparison().isEqualTo(si);
}

@Test
public void testGetAllStepInstanceViews() throws Exception {
// run 1: job1 (RUNNING) already seeded by setUp; add job2 (RUNNING) to the same run
StepInstance job2Run1 = loadObject(TEST_STEP_INSTANCE, StepInstance.class);
job2Run1.setStepId("job2");
job2Run1.setStepInstanceId(2);
stepDao.insertOrUpsertStepInstance(job2Run1, false, null);

// run 2 (restart from failure): only job1 ran again — job2 succeeded in run 1 and was skipped
StepInstance job1Run2 =
loadObject("fixtures/instances/sample-step-instance-finishing.json", StepInstance.class);
stepDao.insertOrUpsertStepInstance(job1Run2, false, null);

// should return one entry per step: job1 from run 2, job2 from run 1
List<StepInstance> instances = stepDao.getAllStepInstanceViews(TEST_WORKFLOW_ID, 1);
instances.sort(Comparator.comparingLong(StepInstance::getStepInstanceId));
assertEquals(2, instances.size());

StepInstance job1Result = instances.get(0);
assertEquals("job1", job1Result.getStepId());
assertEquals(2, job1Result.getWorkflowRunId());
assertEquals(StepInstance.Status.FINISHING, job1Result.getRuntimeState().getStatus());
assertEquals("ff4ccce2-0fda-4882-9cd8-12ff90cb5f02", job1Result.getStepUuid());

StepInstance job2Result = instances.get(1);
assertEquals("job2", job2Result.getStepId());
assertEquals(1, job2Result.getWorkflowRunId());
assertEquals(StepInstance.Status.RUNNING, job2Result.getRuntimeState().getStatus());
assertEquals("ff4ccce2-0fda-4882-9cd8-12ff90cb5f06", job2Result.getStepUuid());
}

@Test
public void testGetStepInstances() {
List<StepInstance> instances = stepDao.getStepInstances(TEST_WORKFLOW_ID, 1, 1, "job1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
value = "/api/v3/workflows",
produces = MediaType.APPLICATION_JSON_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE)
@SuppressWarnings("PMD.AvoidDuplicateLiterals")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify why this is needed?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this here because without this we will have to use constant for 'workflowId' and 'workflowInstanceId' in line 98 and 99 below
@Valid @NotNull @PathVariable("workflowId") String workflowId

Previously this file has a few such strings but with my new endpoint it crossed over the PMD threshold

Similar pattern is used in other controllers

public class StepInstanceController {

private final MaestroStepInstanceDao stepInstanceDao;
Expand Down Expand Up @@ -88,6 +89,20 @@ public StepInstance getStepInstance(
return instance;
}

@GetMapping(
value = "/{workflowId}/instances/{workflowInstanceId}/steps",
consumes = MediaType.ALL_VALUE)
@Operation(
summary = "Get the most recent step instance across all runs for a given workflow instance")
public List<StepInstance> getAllStepInstanceViews(
@Valid @NotNull @PathVariable("workflowId") String workflowId,
@PathVariable("workflowInstanceId") long workflowInstanceId) {
List<StepInstance> instances =
stepInstanceDao.getAllStepInstanceViews(workflowId, workflowInstanceId);
instances.sort(Comparator.comparingLong(StepInstance::getStepInstanceId));
return instances;
}

@GetMapping(
value = "/{workflowId}/instances/{workflowInstanceId}/runs/{workflowRunId}/steps",
consumes = MediaType.ALL_VALUE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,19 @@ public void testGetStepInstanceViews() {
assertEquals(1, ret.size());
assertEquals(instance2, ret.get(0));
}

@Test
public void testGetAllStepInstanceViews() {
StepInstance instance1 = mock(StepInstance.class);
StepInstance instance2 = mock(StepInstance.class);
when(instance1.getStepInstanceId()).thenReturn(2L);
when(instance2.getStepInstanceId()).thenReturn(1L);
when(mockInstanceDao.getAllStepInstanceViews("test-workflow", 1))
.thenReturn(Arrays.asList(instance1, instance2));
List<StepInstance> ret = stepInstanceController.getAllStepInstanceViews("test-workflow", 1);
verify(mockInstanceDao, times(1)).getAllStepInstanceViews("test-workflow", 1);
assertEquals(2, ret.size());
assertEquals(instance2, ret.get(0));
assertEquals(instance1, ret.get(1));
}
}
Loading