From 2f3e0750f54b3e91c4a792012eac0baaffe60dca Mon Sep 17 00:00:00 2001 From: adonthi-fws Date: Wed, 3 Jun 2026 12:18:42 +0530 Subject: [PATCH 1/3] prometheus metrics for http webhook publisheres --- .../netflix/conductor/metrics/Monitors.java | 37 +++++++++++++++++++ .../listener/TaskStatusPublisher.java | 14 +++++++ .../statuschange/StatusChangePublisher.java | 14 +++++++ 3 files changed, 65 insertions(+) diff --git a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java index 011f129c05..09656885ab 100644 --- a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java +++ b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java @@ -478,6 +478,43 @@ public static void recordWorkflowArchived(String workflowType, WorkflowModel.Sta counter("workflow_archived", "workflowName", workflowType, "workflowStatus", status.name()); } + public static void recordWebhookPublishSuccess( + String notificationType, String name, String status) { + counter( + "webhook_publish_success", + "notificationType", + notificationType, + "name", + StringUtils.defaultIfBlank(name, "unknown"), + "status", + StringUtils.defaultIfBlank(status, "unknown")); + } + + public static void recordWebhookPublishFailure( + String notificationType, String name, String errorType) { + counter( + "webhook_publish_failure", + "notificationType", + notificationType, + "name", + StringUtils.defaultIfBlank(name, "unknown"), + "errorType", + StringUtils.defaultIfBlank(errorType, "unknown")); + } + + public static void recordWebhookEnqueueFailure(String notificationType, String name) { + counter( + "webhook_enqueue_failure", + "notificationType", + notificationType, + "name", + StringUtils.defaultIfBlank(name, "unknown")); + } + + public static void recordWebhookQueueDepth(String notificationType, int size) { + gauge("webhook_queue_depth", size, "notificationType", notificationType); + } + public static void recordArchivalDelayQueueSize(int val) { gauge("workflow_archival_delay_queue_size", val); } diff --git a/task-status-listener/src/main/java/com/netflix/conductor/contribs/listener/TaskStatusPublisher.java b/task-status-listener/src/main/java/com/netflix/conductor/contribs/listener/TaskStatusPublisher.java index 9cc8ea1973..4f9f45918a 100644 --- a/task-status-listener/src/main/java/com/netflix/conductor/contribs/listener/TaskStatusPublisher.java +++ b/task-status-listener/src/main/java/com/netflix/conductor/contribs/listener/TaskStatusPublisher.java @@ -25,12 +25,14 @@ import com.netflix.conductor.core.dal.ExecutionDAOFacade; import com.netflix.conductor.core.listener.TaskStatusListener; +import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.model.TaskModel; @Singleton public class TaskStatusPublisher implements TaskStatusListener { private static final Logger LOGGER = LoggerFactory.getLogger(TaskStatusPublisher.class); + private static final String NOTIFICATION_TYPE = "TASK"; private static final Integer QDEPTH = Integer.parseInt( System.getenv().getOrDefault("ENV_TASK_NOTIFICATION_QUEUE_SIZE", "50")); @@ -74,6 +76,10 @@ public void run() { } publishTaskNotification(taskNotification); LOGGER.debug("Task {} publish is successful.", taskNotification.getTaskId()); + Monitors.recordWebhookPublishSuccess( + NOTIFICATION_TYPE, + task.getTaskDefName(), + task.getStatus().name()); Thread.sleep(5); } catch (Exception e) { if (taskNotification != null) { @@ -87,6 +93,12 @@ public void run() { LOGGER.error("Failed to publish task: Task is NULL"); } LOGGER.error("Error on publishing ", e); + if (task != null) { + Monitors.recordWebhookPublishFailure( + NOTIFICATION_TYPE, + task.getTaskDefName(), + e.getClass().getSimpleName()); + } } } } @@ -118,6 +130,7 @@ private void validateSubscribedTaskStatuses(List subscribedTaskStatuses) private void enqueueTask(TaskModel task) { try { blockingQueue.put(task); + Monitors.recordWebhookQueueDepth(NOTIFICATION_TYPE, blockingQueue.size()); } catch (Exception e) { LOGGER.debug( "Failed to enqueue task: Id {} Type {} of workflow {} ", @@ -125,6 +138,7 @@ private void enqueueTask(TaskModel task) { task.getTaskType(), task.getWorkflowInstanceId()); LOGGER.debug(e.toString()); + Monitors.recordWebhookEnqueueFailure(NOTIFICATION_TYPE, task.getTaskDefName()); } } diff --git a/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/statuschange/StatusChangePublisher.java b/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/statuschange/StatusChangePublisher.java index 735dbcf1d3..f5e4d89b22 100644 --- a/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/statuschange/StatusChangePublisher.java +++ b/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/statuschange/StatusChangePublisher.java @@ -27,12 +27,14 @@ import com.netflix.conductor.contribs.listener.RestClientManager; import com.netflix.conductor.core.dal.ExecutionDAOFacade; import com.netflix.conductor.core.listener.WorkflowStatusListener; +import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.model.WorkflowModel; @Singleton public class StatusChangePublisher implements WorkflowStatusListener { private static final Logger LOGGER = LoggerFactory.getLogger(StatusChangePublisher.class); + private static final String NOTIFICATION_TYPE = "WORKFLOW"; private static final Integer QDEPTH = Integer.parseInt( System.getenv().getOrDefault("ENV_WORKFLOW_NOTIFICATION_QUEUE_SIZE", "50")); @@ -72,6 +74,10 @@ public void run() { LOGGER.debug( "Workflow {} publish is successful.", statusChangeNotification.getWorkflowId()); + Monitors.recordWebhookPublishSuccess( + NOTIFICATION_TYPE, + workflow.getWorkflowName(), + workflow.getStatus().name()); Thread.sleep(5); } catch (Exception e) { if (statusChangeNotification != null) { @@ -85,6 +91,12 @@ public void run() { LOGGER.error("Failed to publish workflow: Workflow is NULL"); } LOGGER.error("Error on publishing workflow", e); + if (workflow != null) { + Monitors.recordWebhookPublishFailure( + NOTIFICATION_TYPE, + workflow.getWorkflowName(), + e.getClass().getSimpleName()); + } } } } @@ -196,12 +208,14 @@ private void enqueueWorkflow(WorkflowModel workflow) { workflow.getStatus()); try { blockingQueue.put(workflow); + Monitors.recordWebhookQueueDepth(NOTIFICATION_TYPE, blockingQueue.size()); } catch (Exception e) { LOGGER.error( "Failed to enqueue workflow: Id {} Name {}", workflow.getWorkflowId(), workflow.getWorkflowName()); LOGGER.error(e.getMessage()); + Monitors.recordWebhookEnqueueFailure(NOTIFICATION_TYPE, workflow.getWorkflowName()); } } From 91d787ef9c2d6b98b54d2730b0c06b70bacdefb2 Mon Sep 17 00:00:00 2001 From: adonthi-fws Date: Mon, 15 Jun 2026 16:16:30 +0530 Subject: [PATCH 2/3] code review requested changes --- .../netflix/conductor/metrics/Monitors.java | 18 +++++++++++++++-- .../conductor/metrics/MonitorsTest.java | 20 +++++++++++++++++++ .../listener/TaskStatusPublisher.java | 19 ++++++++---------- .../statuschange/StatusChangePublisher.java | 18 ++++++++--------- 4 files changed, 52 insertions(+), 23 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java index 09656885ab..37487f4068 100644 --- a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java +++ b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java @@ -16,6 +16,8 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -67,6 +69,7 @@ public static void addMeterRegistry(MeterRegistry meterRegistry) { private static final Map timers = new ConcurrentHashMap<>(); private static final Map distributionSummaries = new ConcurrentHashMap<>(); + private static final Set registeredWebhookQueueGauges = ConcurrentHashMap.newKeySet(); private Monitors() {} @@ -511,8 +514,19 @@ public static void recordWebhookEnqueueFailure(String notificationType, String n StringUtils.defaultIfBlank(name, "unknown")); } - public static void recordWebhookQueueDepth(String notificationType, int size) { - gauge("webhook_queue_depth", size, "notificationType", notificationType); + /** + * Registers a pull-style gauge that reads {@link BlockingQueue#size()} at scrape time. Tagged + * by {@code notificationType} only — each publisher owns one shared queue, so per-name depth is + * not meaningful (unlike success/failure counters). + */ + public static void registerWebhookQueueDepthGauge( + BlockingQueue queue, String notificationType) { + String key = "webhook_queue_depth:" + notificationType; + if (registeredWebhookQueueGauges.add(key)) { + Gauge.builder("webhook_queue_depth", queue, BlockingQueue::size) + .tag("notificationType", notificationType) + .register(registry); + } } public static void recordArchivalDelayQueueSize(int val) { diff --git a/core/src/test/java/com/netflix/conductor/metrics/MonitorsTest.java b/core/src/test/java/com/netflix/conductor/metrics/MonitorsTest.java index 84ce3a895d..d323043389 100644 --- a/core/src/test/java/com/netflix/conductor/metrics/MonitorsTest.java +++ b/core/src/test/java/com/netflix/conductor/metrics/MonitorsTest.java @@ -12,6 +12,8 @@ */ package com.netflix.conductor.metrics; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import org.junit.Test; @@ -108,4 +110,22 @@ public void metersWithDifferentTagsAreDistinct() { Monitors.getCounter("test_tag_distinct", "env", "prod"), Monitors.getCounter("test_tag_distinct", "env", "staging")); } + + @Test + public void registerWebhookQueueDepthGauge_readsLiveQueueSize() { + SimpleMeterRegistry probe = new SimpleMeterRegistry(); + Monitors.addMeterRegistry(probe); + + BlockingQueue queue = new LinkedBlockingDeque<>(2); + Monitors.registerWebhookQueueDepthGauge(queue, "WORKFLOW"); + + assertEquals(0.0, probe.find("webhook_queue_depth").gauge().value(), 0.001); + + queue.offer("a"); + queue.offer("b"); + assertEquals(2.0, probe.find("webhook_queue_depth").gauge().value(), 0.001); + + queue.poll(); + assertEquals(1.0, probe.find("webhook_queue_depth").gauge().value(), 0.001); + } } diff --git a/task-status-listener/src/main/java/com/netflix/conductor/contribs/listener/TaskStatusPublisher.java b/task-status-listener/src/main/java/com/netflix/conductor/contribs/listener/TaskStatusPublisher.java index 4f9f45918a..b4bda286da 100644 --- a/task-status-listener/src/main/java/com/netflix/conductor/contribs/listener/TaskStatusPublisher.java +++ b/task-status-listener/src/main/java/com/netflix/conductor/contribs/listener/TaskStatusPublisher.java @@ -113,6 +113,7 @@ public TaskStatusPublisher( this.executionDAOFacade = executionDAOFacade; this.subscribedTaskStatusList = subscribedTaskStatuses; validateSubscribedTaskStatuses(subscribedTaskStatuses); + Monitors.registerWebhookQueueDepthGauge(blockingQueue, NOTIFICATION_TYPE); ConsumerThread consumerThread = new ConsumerThread(); consumerThread.start(); } @@ -128,18 +129,14 @@ private void validateSubscribedTaskStatuses(List subscribedTaskStatuses) } private void enqueueTask(TaskModel task) { - try { - blockingQueue.put(task); - Monitors.recordWebhookQueueDepth(NOTIFICATION_TYPE, blockingQueue.size()); - } catch (Exception e) { - LOGGER.debug( - "Failed to enqueue task: Id {} Type {} of workflow {} ", - task.getTaskId(), - task.getTaskType(), - task.getWorkflowInstanceId()); - LOGGER.debug(e.toString()); - Monitors.recordWebhookEnqueueFailure(NOTIFICATION_TYPE, task.getTaskDefName()); + if (blockingQueue.offer(task)) { + return; } + LOGGER.warn( + "Webhook notification queue full, dropping TASK notification for task {} ({})", + task.getTaskId(), + task.getTaskDefName()); + Monitors.recordWebhookEnqueueFailure(NOTIFICATION_TYPE, task.getTaskDefName()); } @Override diff --git a/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/statuschange/StatusChangePublisher.java b/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/statuschange/StatusChangePublisher.java index f5e4d89b22..a7f7e8d49a 100644 --- a/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/statuschange/StatusChangePublisher.java +++ b/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/statuschange/StatusChangePublisher.java @@ -114,6 +114,7 @@ public StatusChangePublisher( (subscribedWorkflowStatuses != null && !subscribedWorkflowStatuses.isEmpty()) ? subscribedWorkflowStatuses : Arrays.asList("COMPLETED", "TERMINATED"); + Monitors.registerWebhookQueueDepthGauge(blockingQueue, NOTIFICATION_TYPE); ConsumerThread consumerThread = new ConsumerThread(); consumerThread.start(); } @@ -206,17 +207,14 @@ private void enqueueWorkflow(WorkflowModel workflow) { workflow.getWorkflowId(), workflow.getWorkflowName(), workflow.getStatus()); - try { - blockingQueue.put(workflow); - Monitors.recordWebhookQueueDepth(NOTIFICATION_TYPE, blockingQueue.size()); - } catch (Exception e) { - LOGGER.error( - "Failed to enqueue workflow: Id {} Name {}", - workflow.getWorkflowId(), - workflow.getWorkflowName()); - LOGGER.error(e.getMessage()); - Monitors.recordWebhookEnqueueFailure(NOTIFICATION_TYPE, workflow.getWorkflowName()); + if (blockingQueue.offer(workflow)) { + return; } + LOGGER.warn( + "Webhook notification queue full, dropping WORKFLOW notification for {} ({})", + workflow.getWorkflowId(), + workflow.getWorkflowName()); + Monitors.recordWebhookEnqueueFailure(NOTIFICATION_TYPE, workflow.getWorkflowName()); } private void publishStatusChangeNotification(StatusChangeNotification statusChangeNotification) From 704b2ebed9aaf24b007c909c3aecabbf635dbe85 Mon Sep 17 00:00:00 2001 From: nthmost-orkes Date: Tue, 16 Jun 2026 12:31:41 -0700 Subject: [PATCH 3/3] style: apply spotless formatting to TaskStatusPublisher --- .../conductor/contribs/listener/TaskStatusPublisher.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/task-status-listener/src/main/java/com/netflix/conductor/contribs/listener/TaskStatusPublisher.java b/task-status-listener/src/main/java/com/netflix/conductor/contribs/listener/TaskStatusPublisher.java index b4bda286da..05ac68f411 100644 --- a/task-status-listener/src/main/java/com/netflix/conductor/contribs/listener/TaskStatusPublisher.java +++ b/task-status-listener/src/main/java/com/netflix/conductor/contribs/listener/TaskStatusPublisher.java @@ -77,9 +77,7 @@ public void run() { publishTaskNotification(taskNotification); LOGGER.debug("Task {} publish is successful.", taskNotification.getTaskId()); Monitors.recordWebhookPublishSuccess( - NOTIFICATION_TYPE, - task.getTaskDefName(), - task.getStatus().name()); + NOTIFICATION_TYPE, task.getTaskDefName(), task.getStatus().name()); Thread.sleep(5); } catch (Exception e) { if (taskNotification != null) {