Skip to content
51 changes: 51 additions & 0 deletions core/src/main/java/com/netflix/conductor/metrics/Monitors.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -67,6 +69,7 @@ public static void addMeterRegistry(MeterRegistry meterRegistry) {
private static final Map<String, Timer> timers = new ConcurrentHashMap<>();
private static final Map<String, DistributionSummary> distributionSummaries =
new ConcurrentHashMap<>();
private static final Set<String> registeredWebhookQueueGauges = ConcurrentHashMap.newKeySet();

private Monitors() {}

Expand Down Expand Up @@ -478,6 +481,54 @@ public static void recordWorkflowArchived(String workflowType, WorkflowModel.Sta
counter("workflow_archived", "workflowName", workflowType, "workflowStatus", status.name());
}

public static void recordWebhookPublishSuccess(
String notificationType, String name, String status) {
counter(
"webhook_publish_success",
"notificationType",
notificationType,
"name",
StringUtils.defaultIfBlank(name, "unknown"),
"status",
StringUtils.defaultIfBlank(status, "unknown"));
}

public static void recordWebhookPublishFailure(
String notificationType, String name, String errorType) {
counter(
"webhook_publish_failure",
"notificationType",
notificationType,
"name",
StringUtils.defaultIfBlank(name, "unknown"),
"errorType",
StringUtils.defaultIfBlank(errorType, "unknown"));
}

public static void recordWebhookEnqueueFailure(String notificationType, String name) {
counter(
"webhook_enqueue_failure",
"notificationType",
notificationType,
"name",
StringUtils.defaultIfBlank(name, "unknown"));
}

/**
* Registers a pull-style gauge that reads {@link BlockingQueue#size()} at scrape time. Tagged
* by {@code notificationType} only — each publisher owns one shared queue, so per-name depth is
* not meaningful (unlike success/failure counters).
*/
public static void registerWebhookQueueDepthGauge(
BlockingQueue<?> queue, String notificationType) {
String key = "webhook_queue_depth:" + notificationType;
if (registeredWebhookQueueGauges.add(key)) {
Gauge.builder("webhook_queue_depth", queue, BlockingQueue::size)
.tag("notificationType", notificationType)
.register(registry);
}
}

public static void recordArchivalDelayQueueSize(int val) {
gauge("workflow_archival_delay_queue_size", val);
}
Expand Down
20 changes: 20 additions & 0 deletions core/src/test/java/com/netflix/conductor/metrics/MonitorsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package com.netflix.conductor.metrics;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
Expand Down Expand Up @@ -108,4 +110,22 @@ public void metersWithDifferentTagsAreDistinct() {
Monitors.getCounter("test_tag_distinct", "env", "prod"),
Monitors.getCounter("test_tag_distinct", "env", "staging"));
}

@Test
public void registerWebhookQueueDepthGauge_readsLiveQueueSize() {
SimpleMeterRegistry probe = new SimpleMeterRegistry();
Monitors.addMeterRegistry(probe);

BlockingQueue<String> queue = new LinkedBlockingDeque<>(2);
Monitors.registerWebhookQueueDepthGauge(queue, "WORKFLOW");

assertEquals(0.0, probe.find("webhook_queue_depth").gauge().value(), 0.001);

queue.offer("a");
queue.offer("b");
assertEquals(2.0, probe.find("webhook_queue_depth").gauge().value(), 0.001);

queue.poll();
assertEquals(1.0, probe.find("webhook_queue_depth").gauge().value(), 0.001);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@

import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.listener.TaskStatusListener;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;

@Singleton
public class TaskStatusPublisher implements TaskStatusListener {

private static final Logger LOGGER = LoggerFactory.getLogger(TaskStatusPublisher.class);
private static final String NOTIFICATION_TYPE = "TASK";
private static final Integer QDEPTH =
Integer.parseInt(
System.getenv().getOrDefault("ENV_TASK_NOTIFICATION_QUEUE_SIZE", "50"));
Expand Down Expand Up @@ -74,6 +76,8 @@ public void run() {
}
publishTaskNotification(taskNotification);
LOGGER.debug("Task {} publish is successful.", taskNotification.getTaskId());
Monitors.recordWebhookPublishSuccess(
NOTIFICATION_TYPE, task.getTaskDefName(), task.getStatus().name());
Thread.sleep(5);
} catch (Exception e) {
if (taskNotification != null) {
Expand All @@ -87,6 +91,12 @@ public void run() {
LOGGER.error("Failed to publish task: Task is NULL");
}
LOGGER.error("Error on publishing ", e);
if (task != null) {
Monitors.recordWebhookPublishFailure(
NOTIFICATION_TYPE,
task.getTaskDefName(),
e.getClass().getSimpleName());
}
}
}
}
Expand All @@ -101,6 +111,7 @@ public TaskStatusPublisher(
this.executionDAOFacade = executionDAOFacade;
this.subscribedTaskStatusList = subscribedTaskStatuses;
validateSubscribedTaskStatuses(subscribedTaskStatuses);
Monitors.registerWebhookQueueDepthGauge(blockingQueue, NOTIFICATION_TYPE);
ConsumerThread consumerThread = new ConsumerThread();
consumerThread.start();
}
Expand All @@ -116,16 +127,14 @@ private void validateSubscribedTaskStatuses(List<String> subscribedTaskStatuses)
}

private void enqueueTask(TaskModel task) {
try {
blockingQueue.put(task);
} catch (Exception e) {
LOGGER.debug(
"Failed to enqueue task: Id {} Type {} of workflow {} ",
task.getTaskId(),
task.getTaskType(),
task.getWorkflowInstanceId());
LOGGER.debug(e.toString());
if (blockingQueue.offer(task)) {
return;
}
LOGGER.warn(
"Webhook notification queue full, dropping TASK notification for task {} ({})",
task.getTaskId(),
task.getTaskDefName());
Monitors.recordWebhookEnqueueFailure(NOTIFICATION_TYPE, task.getTaskDefName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import com.netflix.conductor.contribs.listener.RestClientManager;
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.WorkflowModel;

@Singleton
public class StatusChangePublisher implements WorkflowStatusListener {

private static final Logger LOGGER = LoggerFactory.getLogger(StatusChangePublisher.class);
private static final String NOTIFICATION_TYPE = "WORKFLOW";
private static final Integer QDEPTH =
Integer.parseInt(
System.getenv().getOrDefault("ENV_WORKFLOW_NOTIFICATION_QUEUE_SIZE", "50"));
Expand Down Expand Up @@ -72,6 +74,10 @@ public void run() {
LOGGER.debug(
"Workflow {} publish is successful.",
statusChangeNotification.getWorkflowId());
Monitors.recordWebhookPublishSuccess(
NOTIFICATION_TYPE,
workflow.getWorkflowName(),
workflow.getStatus().name());
Thread.sleep(5);
} catch (Exception e) {
if (statusChangeNotification != null) {
Expand All @@ -85,6 +91,12 @@ public void run() {
LOGGER.error("Failed to publish workflow: Workflow is NULL");
}
LOGGER.error("Error on publishing workflow", e);
if (workflow != null) {
Monitors.recordWebhookPublishFailure(
NOTIFICATION_TYPE,
workflow.getWorkflowName(),
e.getClass().getSimpleName());
}
}
}
}
Expand All @@ -102,6 +114,7 @@ public StatusChangePublisher(
(subscribedWorkflowStatuses != null && !subscribedWorkflowStatuses.isEmpty())
? subscribedWorkflowStatuses
: Arrays.asList("COMPLETED", "TERMINATED");
Monitors.registerWebhookQueueDepthGauge(blockingQueue, NOTIFICATION_TYPE);
ConsumerThread consumerThread = new ConsumerThread();
consumerThread.start();
}
Expand Down Expand Up @@ -194,15 +207,14 @@ private void enqueueWorkflow(WorkflowModel workflow) {
workflow.getWorkflowId(),
workflow.getWorkflowName(),
workflow.getStatus());
try {
blockingQueue.put(workflow);
} catch (Exception e) {
LOGGER.error(
"Failed to enqueue workflow: Id {} Name {}",
workflow.getWorkflowId(),
workflow.getWorkflowName());
LOGGER.error(e.getMessage());
if (blockingQueue.offer(workflow)) {
return;
}
LOGGER.warn(
"Webhook notification queue full, dropping WORKFLOW notification for {} ({})",
workflow.getWorkflowId(),
workflow.getWorkflowName());
Monitors.recordWebhookEnqueueFailure(NOTIFICATION_TYPE, workflow.getWorkflowName());
}

private void publishStatusChangeNotification(StatusChangeNotification statusChangeNotification)
Expand Down
Loading