diff --git a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java index 011f129c05..37487f4068 100644 --- a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java +++ b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java @@ -16,6 +16,8 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -67,6 +69,7 @@ public static void addMeterRegistry(MeterRegistry meterRegistry) { private static final Map timers = new ConcurrentHashMap<>(); private static final Map distributionSummaries = new ConcurrentHashMap<>(); + private static final Set registeredWebhookQueueGauges = ConcurrentHashMap.newKeySet(); private Monitors() {} @@ -478,6 +481,54 @@ public static void recordWorkflowArchived(String workflowType, WorkflowModel.Sta counter("workflow_archived", "workflowName", workflowType, "workflowStatus", status.name()); } + public static void recordWebhookPublishSuccess( + String notificationType, String name, String status) { + counter( + "webhook_publish_success", + "notificationType", + notificationType, + "name", + StringUtils.defaultIfBlank(name, "unknown"), + "status", + StringUtils.defaultIfBlank(status, "unknown")); + } + + public static void recordWebhookPublishFailure( + String notificationType, String name, String errorType) { + counter( + "webhook_publish_failure", + "notificationType", + notificationType, + "name", + StringUtils.defaultIfBlank(name, "unknown"), + "errorType", + StringUtils.defaultIfBlank(errorType, "unknown")); + } + + public static void recordWebhookEnqueueFailure(String notificationType, String name) { + counter( + "webhook_enqueue_failure", + "notificationType", + notificationType, + "name", + StringUtils.defaultIfBlank(name, "unknown")); + } + + /** + * Registers a pull-style gauge that reads {@link BlockingQueue#size()} at scrape time. Tagged + * by {@code notificationType} only — each publisher owns one shared queue, so per-name depth is + * not meaningful (unlike success/failure counters). + */ + public static void registerWebhookQueueDepthGauge( + BlockingQueue queue, String notificationType) { + String key = "webhook_queue_depth:" + notificationType; + if (registeredWebhookQueueGauges.add(key)) { + Gauge.builder("webhook_queue_depth", queue, BlockingQueue::size) + .tag("notificationType", notificationType) + .register(registry); + } + } + public static void recordArchivalDelayQueueSize(int val) { gauge("workflow_archival_delay_queue_size", val); } diff --git a/core/src/test/java/com/netflix/conductor/metrics/MonitorsTest.java b/core/src/test/java/com/netflix/conductor/metrics/MonitorsTest.java index 84ce3a895d..d323043389 100644 --- a/core/src/test/java/com/netflix/conductor/metrics/MonitorsTest.java +++ b/core/src/test/java/com/netflix/conductor/metrics/MonitorsTest.java @@ -12,6 +12,8 @@ */ package com.netflix.conductor.metrics; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import org.junit.Test; @@ -108,4 +110,22 @@ public void metersWithDifferentTagsAreDistinct() { Monitors.getCounter("test_tag_distinct", "env", "prod"), Monitors.getCounter("test_tag_distinct", "env", "staging")); } + + @Test + public void registerWebhookQueueDepthGauge_readsLiveQueueSize() { + SimpleMeterRegistry probe = new SimpleMeterRegistry(); + Monitors.addMeterRegistry(probe); + + BlockingQueue queue = new LinkedBlockingDeque<>(2); + Monitors.registerWebhookQueueDepthGauge(queue, "WORKFLOW"); + + assertEquals(0.0, probe.find("webhook_queue_depth").gauge().value(), 0.001); + + queue.offer("a"); + queue.offer("b"); + assertEquals(2.0, probe.find("webhook_queue_depth").gauge().value(), 0.001); + + queue.poll(); + assertEquals(1.0, probe.find("webhook_queue_depth").gauge().value(), 0.001); + } } diff --git a/task-status-listener/src/main/java/com/netflix/conductor/contribs/listener/TaskStatusPublisher.java b/task-status-listener/src/main/java/com/netflix/conductor/contribs/listener/TaskStatusPublisher.java index 9cc8ea1973..05ac68f411 100644 --- a/task-status-listener/src/main/java/com/netflix/conductor/contribs/listener/TaskStatusPublisher.java +++ b/task-status-listener/src/main/java/com/netflix/conductor/contribs/listener/TaskStatusPublisher.java @@ -25,12 +25,14 @@ import com.netflix.conductor.core.dal.ExecutionDAOFacade; import com.netflix.conductor.core.listener.TaskStatusListener; +import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.model.TaskModel; @Singleton public class TaskStatusPublisher implements TaskStatusListener { private static final Logger LOGGER = LoggerFactory.getLogger(TaskStatusPublisher.class); + private static final String NOTIFICATION_TYPE = "TASK"; private static final Integer QDEPTH = Integer.parseInt( System.getenv().getOrDefault("ENV_TASK_NOTIFICATION_QUEUE_SIZE", "50")); @@ -74,6 +76,8 @@ public void run() { } publishTaskNotification(taskNotification); LOGGER.debug("Task {} publish is successful.", taskNotification.getTaskId()); + Monitors.recordWebhookPublishSuccess( + NOTIFICATION_TYPE, task.getTaskDefName(), task.getStatus().name()); Thread.sleep(5); } catch (Exception e) { if (taskNotification != null) { @@ -87,6 +91,12 @@ public void run() { LOGGER.error("Failed to publish task: Task is NULL"); } LOGGER.error("Error on publishing ", e); + if (task != null) { + Monitors.recordWebhookPublishFailure( + NOTIFICATION_TYPE, + task.getTaskDefName(), + e.getClass().getSimpleName()); + } } } } @@ -101,6 +111,7 @@ public TaskStatusPublisher( this.executionDAOFacade = executionDAOFacade; this.subscribedTaskStatusList = subscribedTaskStatuses; validateSubscribedTaskStatuses(subscribedTaskStatuses); + Monitors.registerWebhookQueueDepthGauge(blockingQueue, NOTIFICATION_TYPE); ConsumerThread consumerThread = new ConsumerThread(); consumerThread.start(); } @@ -116,16 +127,14 @@ private void validateSubscribedTaskStatuses(List subscribedTaskStatuses) } private void enqueueTask(TaskModel task) { - try { - blockingQueue.put(task); - } catch (Exception e) { - LOGGER.debug( - "Failed to enqueue task: Id {} Type {} of workflow {} ", - task.getTaskId(), - task.getTaskType(), - task.getWorkflowInstanceId()); - LOGGER.debug(e.toString()); + if (blockingQueue.offer(task)) { + return; } + LOGGER.warn( + "Webhook notification queue full, dropping TASK notification for task {} ({})", + task.getTaskId(), + task.getTaskDefName()); + Monitors.recordWebhookEnqueueFailure(NOTIFICATION_TYPE, task.getTaskDefName()); } @Override diff --git a/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/statuschange/StatusChangePublisher.java b/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/statuschange/StatusChangePublisher.java index 735dbcf1d3..a7f7e8d49a 100644 --- a/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/statuschange/StatusChangePublisher.java +++ b/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/statuschange/StatusChangePublisher.java @@ -27,12 +27,14 @@ import com.netflix.conductor.contribs.listener.RestClientManager; import com.netflix.conductor.core.dal.ExecutionDAOFacade; import com.netflix.conductor.core.listener.WorkflowStatusListener; +import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.model.WorkflowModel; @Singleton public class StatusChangePublisher implements WorkflowStatusListener { private static final Logger LOGGER = LoggerFactory.getLogger(StatusChangePublisher.class); + private static final String NOTIFICATION_TYPE = "WORKFLOW"; private static final Integer QDEPTH = Integer.parseInt( System.getenv().getOrDefault("ENV_WORKFLOW_NOTIFICATION_QUEUE_SIZE", "50")); @@ -72,6 +74,10 @@ public void run() { LOGGER.debug( "Workflow {} publish is successful.", statusChangeNotification.getWorkflowId()); + Monitors.recordWebhookPublishSuccess( + NOTIFICATION_TYPE, + workflow.getWorkflowName(), + workflow.getStatus().name()); Thread.sleep(5); } catch (Exception e) { if (statusChangeNotification != null) { @@ -85,6 +91,12 @@ public void run() { LOGGER.error("Failed to publish workflow: Workflow is NULL"); } LOGGER.error("Error on publishing workflow", e); + if (workflow != null) { + Monitors.recordWebhookPublishFailure( + NOTIFICATION_TYPE, + workflow.getWorkflowName(), + e.getClass().getSimpleName()); + } } } } @@ -102,6 +114,7 @@ public StatusChangePublisher( (subscribedWorkflowStatuses != null && !subscribedWorkflowStatuses.isEmpty()) ? subscribedWorkflowStatuses : Arrays.asList("COMPLETED", "TERMINATED"); + Monitors.registerWebhookQueueDepthGauge(blockingQueue, NOTIFICATION_TYPE); ConsumerThread consumerThread = new ConsumerThread(); consumerThread.start(); } @@ -194,15 +207,14 @@ private void enqueueWorkflow(WorkflowModel workflow) { workflow.getWorkflowId(), workflow.getWorkflowName(), workflow.getStatus()); - try { - blockingQueue.put(workflow); - } catch (Exception e) { - LOGGER.error( - "Failed to enqueue workflow: Id {} Name {}", - workflow.getWorkflowId(), - workflow.getWorkflowName()); - LOGGER.error(e.getMessage()); + if (blockingQueue.offer(workflow)) { + return; } + LOGGER.warn( + "Webhook notification queue full, dropping WORKFLOW notification for {} ({})", + workflow.getWorkflowId(), + workflow.getWorkflowName()); + Monitors.recordWebhookEnqueueFailure(NOTIFICATION_TYPE, workflow.getWorkflowName()); } private void publishStatusChangeNotification(StatusChangeNotification statusChangeNotification)