Skip to content

Commit f4fd738

Browse files
committed
prevent WorkflowMonitor OOM by projecting only metric fields
1 parent 1ca7ba5 commit f4fd738

11 files changed

Lines changed: 377 additions & 72 deletions

File tree

core/src/main/java/com/netflix/conductor/dao/MetadataDAO.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import java.util.List;
1616
import java.util.Optional;
17+
import java.util.stream.Collectors;
1718

1819
import com.netflix.conductor.common.metadata.tasks.TaskDef;
1920
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
@@ -86,4 +87,38 @@ public interface MetadataDAO {
8687
* @return List the latest versions of the workflow definitions
8788
*/
8889
List<WorkflowDef> getAllWorkflowDefsLatestVersions();
90+
91+
/**
92+
* Lightweight projection of the latest version of each workflow definition, used to publish
93+
* monitoring metrics without loading full definition bodies.
94+
*
95+
* <p>The default implementation projects from {@link #getAllWorkflowDefsLatestVersions()}.
96+
* Persistence modules backed by a query-capable store should override this to project the
97+
* fields at the database.
98+
*
99+
* @return one {@link WorkflowMetricInfo} per workflow name (latest version)
100+
*/
101+
default List<WorkflowMetricInfo> getWorkflowMetricInfo() {
102+
return getAllWorkflowDefsLatestVersions().stream()
103+
.map(def -> new WorkflowMetricInfo(def.getName(), def.getOwnerApp()))
104+
.collect(Collectors.toList());
105+
}
106+
107+
/**
108+
* Lightweight projection of task definitions, used to publish monitoring metrics without
109+
* loading full definition bodies.
110+
*
111+
* <p>The default implementation projects from {@link #getAllTaskDefs()}. Persistence modules
112+
* backed by a query-capable store should override this to project the fields at the database.
113+
*
114+
* @return one {@link TaskMetricInfo} per task definition
115+
*/
116+
default List<TaskMetricInfo> getTaskMetricInfo() {
117+
return getAllTaskDefs().stream()
118+
.map(
119+
def ->
120+
new TaskMetricInfo(
121+
def.getName(), def.getOwnerApp(), def.concurrencyLimit()))
122+
.collect(Collectors.toList());
123+
}
89124
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2025 Conductor Authors.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package com.netflix.conductor.dao;
14+
15+
/**
16+
* Lightweight projection of a task definition carrying only the fields needed to publish monitoring
17+
* metrics. Avoids deserializing the full {@code json_data} body when only the name, owner app and
18+
* concurrency limit are required.
19+
*/
20+
public record TaskMetricInfo(String name, String ownerApp, int concurrencyLimit) {}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2025 Conductor Authors.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package com.netflix.conductor.dao;
14+
15+
/**
16+
* Lightweight projection of a workflow definition carrying only the fields needed to publish
17+
* monitoring metrics. Avoids deserializing the full {@code json_data} body when only the name and
18+
* owner app are required.
19+
*/
20+
public record WorkflowMetricInfo(String name, String ownerApp) {}

core/src/main/java/com/netflix/conductor/metrics/WorkflowMonitor.java

Lines changed: 19 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,8 @@
1212
*/
1313
package com.netflix.conductor.metrics;
1414

15-
import java.util.ArrayList;
16-
import java.util.Comparator;
17-
import java.util.HashMap;
1815
import java.util.List;
19-
import java.util.Map;
20-
import java.util.NoSuchElementException;
2116
import java.util.Set;
22-
import java.util.stream.Collectors;
2317

2418
import org.slf4j.Logger;
2519
import org.slf4j.LoggerFactory;
@@ -29,12 +23,11 @@
2923
import org.springframework.scheduling.annotation.Scheduled;
3024
import org.springframework.stereotype.Component;
3125

32-
import com.netflix.conductor.annotations.VisibleForTesting;
33-
import com.netflix.conductor.common.metadata.tasks.TaskDef;
34-
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
3526
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
3627
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
3728
import com.netflix.conductor.dao.QueueDAO;
29+
import com.netflix.conductor.dao.TaskMetricInfo;
30+
import com.netflix.conductor.dao.WorkflowMetricInfo;
3831
import com.netflix.conductor.service.MetadataService;
3932

4033
import static com.netflix.conductor.core.execution.tasks.SystemTaskRegistry.ASYNC_SYSTEM_TASKS_QUALIFIER;
@@ -54,8 +47,8 @@ public class WorkflowMonitor {
5447
private final int metadataRefreshInterval;
5548
private final Set<WorkflowSystemTask> asyncSystemTasks;
5649

57-
private List<TaskDef> taskDefs;
58-
private List<WorkflowDef> workflowDefs;
50+
private List<TaskMetricInfo> taskMetricInfos;
51+
private List<WorkflowMetricInfo> workflowMetricInfos;
5952
private int refreshCounter = 0;
6053

6154
public WorkflowMonitor(
@@ -79,28 +72,27 @@ public WorkflowMonitor(
7972
public void reportMetrics() {
8073
try {
8174
if (refreshCounter <= 0) {
82-
workflowDefs = metadataService.getWorkflowDefs();
83-
taskDefs = new ArrayList<>(metadataService.getTaskDefs());
75+
workflowMetricInfos = metadataService.getWorkflowMetricInfo();
76+
taskMetricInfos = metadataService.getTaskMetricInfo();
8477
refreshCounter = metadataRefreshInterval;
8578
}
8679

87-
getPendingWorkflowToOwnerAppMap(workflowDefs)
88-
.forEach(
89-
(workflowName, ownerApp) -> {
90-
long count =
91-
executionDAOFacade.getPendingWorkflowCount(workflowName);
92-
Monitors.recordRunningWorkflows(count, workflowName, ownerApp);
93-
});
80+
workflowMetricInfos.forEach(
81+
workflow -> {
82+
long count = executionDAOFacade.getPendingWorkflowCount(workflow.name());
83+
Monitors.recordRunningWorkflows(
84+
count, workflow.name(), workflow.ownerApp());
85+
});
9486

95-
taskDefs.forEach(
96-
taskDef -> {
97-
long size = queueDAO.getSize(taskDef.getName());
87+
taskMetricInfos.forEach(
88+
task -> {
89+
long size = queueDAO.getSize(task.name());
9890
long inProgressCount =
99-
executionDAOFacade.getInProgressTaskCount(taskDef.getName());
100-
Monitors.recordQueueDepth(taskDef.getName(), size, taskDef.getOwnerApp());
101-
if (taskDef.concurrencyLimit() > 0) {
91+
executionDAOFacade.getInProgressTaskCount(task.name());
92+
Monitors.recordQueueDepth(task.name(), size, task.ownerApp());
93+
if (task.concurrencyLimit() > 0) {
10294
Monitors.recordTaskInProgress(
103-
taskDef.getName(), inProgressCount, taskDef.getOwnerApp());
95+
task.name(), inProgressCount, task.ownerApp());
10496
}
10597
});
10698

@@ -120,26 +112,4 @@ public void reportMetrics() {
120112
LOGGER.error("Error while publishing scheduled metrics", e);
121113
}
122114
}
123-
124-
/**
125-
* Pending workflow data does not contain information about version. We only need the owner app
126-
* and workflow name, and we only need to query for the workflow once.
127-
*/
128-
@VisibleForTesting
129-
Map<String, String> getPendingWorkflowToOwnerAppMap(List<WorkflowDef> workflowDefs) {
130-
final Map<String, List<WorkflowDef>> groupedWorkflowDefs =
131-
workflowDefs.stream().collect(Collectors.groupingBy(WorkflowDef::getName));
132-
133-
Map<String, String> workflowNameToOwnerMap = new HashMap<>();
134-
groupedWorkflowDefs.forEach(
135-
(key, value) -> {
136-
final WorkflowDef workflowDef =
137-
value.stream()
138-
.max(Comparator.comparing(WorkflowDef::getVersion))
139-
.orElseThrow(NoSuchElementException::new);
140-
141-
workflowNameToOwnerMap.put(key, workflowDef.getOwnerApp());
142-
});
143-
return workflowNameToOwnerMap;
144-
}
145115
}

core/src/main/java/com/netflix/conductor/service/MetadataService.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
2424
import com.netflix.conductor.common.metadata.workflow.WorkflowDefSummary;
2525
import com.netflix.conductor.common.model.BulkResponse;
26+
import com.netflix.conductor.dao.TaskMetricInfo;
27+
import com.netflix.conductor.dao.WorkflowMetricInfo;
2628

2729
import jakarta.validation.Valid;
2830
import jakarta.validation.constraints.NotEmpty;
@@ -156,4 +158,16 @@ List<EventHandler> getEventHandlersForEvent(
156158
boolean activeOnly);
157159

158160
List<WorkflowDef> getWorkflowDefsLatestVersions();
161+
162+
/**
163+
* @return Lightweight projection of the latest version of each workflow definition (name and
164+
* owner app only), used to publish monitoring metrics.
165+
*/
166+
List<WorkflowMetricInfo> getWorkflowMetricInfo();
167+
168+
/**
169+
* @return Lightweight projection of task definitions (name, owner app, concurrency limit), used
170+
* to publish monitoring metrics.
171+
*/
172+
List<TaskMetricInfo> getTaskMetricInfo();
159173
}

core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import com.netflix.conductor.core.exception.NotFoundException;
3434
import com.netflix.conductor.dao.EventHandlerDAO;
3535
import com.netflix.conductor.dao.MetadataDAO;
36+
import com.netflix.conductor.dao.TaskMetricInfo;
37+
import com.netflix.conductor.dao.WorkflowMetricInfo;
3638
import com.netflix.conductor.validations.ValidationContext;
3739

3840
@Service
@@ -225,6 +227,16 @@ public List<WorkflowDef> getWorkflowDefsLatestVersions() {
225227
return metadataDAO.getAllWorkflowDefsLatestVersions();
226228
}
227229

230+
@Override
231+
public List<WorkflowMetricInfo> getWorkflowMetricInfo() {
232+
return metadataDAO.getWorkflowMetricInfo();
233+
}
234+
235+
@Override
236+
public List<TaskMetricInfo> getTaskMetricInfo() {
237+
return metadataDAO.getTaskMetricInfo();
238+
}
239+
228240
public Map<String, ? extends Iterable<WorkflowDefSummary>> getWorkflowNamesAndVersions() {
229241
List<WorkflowDef> workflowDefs = metadataDAO.getAllWorkflowDefs();
230242

core/src/test/java/com/netflix/conductor/metrics/WorkflowMonitorTest.java

Lines changed: 55 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,26 @@
1313
package com.netflix.conductor.metrics;
1414

1515
import java.util.List;
16-
import java.util.Map;
1716
import java.util.Set;
1817

19-
import org.junit.Assert;
2018
import org.junit.Before;
2119
import org.junit.Test;
2220
import org.junit.runner.RunWith;
2321
import org.mockito.Mock;
2422
import org.springframework.test.context.junit4.SpringRunner;
2523

26-
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
2724
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
2825
import com.netflix.conductor.dao.QueueDAO;
26+
import com.netflix.conductor.dao.TaskMetricInfo;
27+
import com.netflix.conductor.dao.WorkflowMetricInfo;
2928
import com.netflix.conductor.service.MetadataService;
3029

30+
import static org.mockito.ArgumentMatchers.anyString;
31+
import static org.mockito.Mockito.never;
32+
import static org.mockito.Mockito.times;
33+
import static org.mockito.Mockito.verify;
34+
import static org.mockito.Mockito.when;
35+
3136
@RunWith(SpringRunner.class)
3237
public class WorkflowMonitorTest {
3338

@@ -43,32 +48,59 @@ public void beforeEach() {
4348
new WorkflowMonitor(metadataService, queueDAO, executionDAOFacade, 1000, Set.of());
4449
}
4550

46-
private WorkflowDef makeDef(String name, int version, String ownerApp) {
47-
WorkflowDef wd = new WorkflowDef();
48-
wd.setName(name);
49-
wd.setVersion(version);
50-
wd.setOwnerApp(ownerApp);
51-
return wd;
51+
@Test
52+
public void testReportMetricsQueriesPerWorkflowName() {
53+
when(metadataService.getWorkflowMetricInfo())
54+
.thenReturn(
55+
List.of(
56+
new WorkflowMetricInfo("workflow1", "owner1"),
57+
new WorkflowMetricInfo("workflow2", "owner2")));
58+
when(metadataService.getTaskMetricInfo()).thenReturn(List.of());
59+
60+
workflowMonitor.reportMetrics();
61+
62+
verify(executionDAOFacade).getPendingWorkflowCount("workflow1");
63+
verify(executionDAOFacade).getPendingWorkflowCount("workflow2");
64+
}
65+
66+
@Test
67+
public void testReportMetricsRecordsInProgressOnlyWhenConcurrencyLimited() {
68+
when(metadataService.getWorkflowMetricInfo()).thenReturn(List.of());
69+
when(metadataService.getTaskMetricInfo())
70+
.thenReturn(
71+
List.of(
72+
new TaskMetricInfo("limited", "owner", 5),
73+
new TaskMetricInfo("unlimited", "owner", 0)));
74+
75+
workflowMonitor.reportMetrics();
76+
77+
// Queue depth is recorded for every task; in-progress count is queried for both, but
78+
// only the concurrency-limited task should drive an in-progress metric.
79+
verify(queueDAO).getSize("limited");
80+
verify(queueDAO).getSize("unlimited");
81+
verify(executionDAOFacade).getInProgressTaskCount("limited");
5282
}
5383

5484
@Test
55-
public void testPendingWorkflowDataMap() {
56-
WorkflowDef test1_1 = makeDef("test1", 1, null);
57-
WorkflowDef test1_2 = makeDef("test1", 2, "name1");
85+
public void testRefreshHappensOncePerInterval() {
86+
when(metadataService.getWorkflowMetricInfo()).thenReturn(List.of());
87+
when(metadataService.getTaskMetricInfo()).thenReturn(List.of());
5888

59-
WorkflowDef test2_1 = makeDef("test2", 1, "first");
60-
WorkflowDef test2_2 = makeDef("test2", 2, "mid");
61-
WorkflowDef test2_3 = makeDef("test2", 3, "last");
89+
workflowMonitor.reportMetrics();
90+
workflowMonitor.reportMetrics();
6291

63-
final Map<String, String> mapping =
64-
workflowMonitor.getPendingWorkflowToOwnerAppMap(
65-
List.of(test1_1, test1_2, test2_1, test2_2, test2_3));
92+
// metadataRefreshInterval is 1000, so the cached defs are reused on the second call.
93+
verify(metadataService, times(1)).getWorkflowMetricInfo();
94+
verify(metadataService, times(1)).getTaskMetricInfo();
95+
}
96+
97+
@Test
98+
public void testNoMetricsWhenCatalogEmpty() {
99+
when(metadataService.getWorkflowMetricInfo()).thenReturn(List.of());
100+
when(metadataService.getTaskMetricInfo()).thenReturn(List.of());
66101

67-
Assert.assertEquals(2, mapping.keySet().size());
68-
Assert.assertTrue(mapping.containsKey("test1"));
69-
Assert.assertTrue(mapping.containsKey("test2"));
102+
workflowMonitor.reportMetrics();
70103

71-
Assert.assertEquals("name1", mapping.get("test1"));
72-
Assert.assertEquals("last", mapping.get("test2"));
104+
verify(executionDAOFacade, never()).getPendingWorkflowCount(anyString());
73105
}
74106
}

0 commit comments

Comments
 (0)