Commits
26 commits
cb8a9a2
[cuebot/pycue/proto] Add OpenCue render farm monitoring system
ramonfigueiredo Nov 25, 2025
a51494a
[cuebot/sandbox] Add monitoring stack infrastructure and fix ElasticS…
ramonfigueiredo Nov 25, 2025
385561a
[docs] Add documentation for render farm monitoring system
ramonfigueiredo Nov 25, 2025
b40538f
[docs] Add render farm monitoring documentation and update nav ordering
ramonfigueiredo Nov 25, 2025
3285e41
[cuebot/docs/sandbox] Add Elasticsearch queue metric, enhance dashboa…
ramonfigueiredo Nov 26, 2025
0ed2b0f
[docs] Fix monitoring architecture diagram and remove non-existent Mo…
ramonfigueiredo Nov 26, 2025
5f763e6
[cuebot/sandbox] Remove redundant Kafka monitoring metrics from Prome…
ramonfigueiredo Nov 27, 2025
06fee7c
[cuebot/sandbox] Remove Elasticsearch queue metric from Prometheus
ramonfigueiredo Nov 27, 2025
6f17ca7
[sandbox] Add command-line parameters to load_test_jobs.py
ramonfigueiredo Nov 27, 2025
e0ec762
[cuebot] Add shot label to frame and job completion metrics
ramonfigueiredo Nov 28, 2025
1c68a97
[cuebot/sandbox] Add job core seconds histogram metric
ramonfigueiredo Nov 28, 2025
440ffa7
[cuebot/sandbox] Change frame histograms to layer-level max histograms
ramonfigueiredo Nov 28, 2025
f8c7ea9
[cuebot] Add Elasticsearch queries and complete event publishing for …
ramonfigueiredo Nov 28, 2025
b3cd2cf
[cuebot] Refactor Kafka event publisher setters for null safety
ramonfigueiredo Nov 28, 2025
3ea47e2
[cuebot] Remove try-catch blocks from monitoring code in FrameComplet…
ramonfigueiredo Nov 28, 2025
ae7e5b1
[cuebot] Refactor MonitoringEventBuilder to Spring-managed bean
ramonfigueiredo Nov 29, 2025
0a582c3
[cuebot/sandbox] Add pickup time tracking for frame dispatch metrics
ramonfigueiredo Nov 29, 2025
ba11dea
[sandbox] Reference guide for exploring OpenCue monitoring data in El…
ramonfigueiredo Nov 29, 2025
3c0686e
[cuebot] Fix exception handling in job completion metrics
ramonfigueiredo Nov 29, 2025
b8792e1
[cuebot/proto] Fix Elasticsearch timestamp mapping and use proto comp…
ramonfigueiredo Nov 29, 2025
2f26a91
[pycue] Make monitoring imports unconditional
ramonfigueiredo Nov 29, 2025
b690319
[monitoring] Remove HostReportEvent from Kafka/Elasticsearch pipeline
ramonfigueiredo Nov 29, 2025
b03689f
[cuebot] Add Kafka AdminClient for topic administration
ramonfigueiredo Nov 30, 2025
8f39cc3
[cuebot/rust/sandbox] Decouple Kafka consumer into standalone Rust in…
ramonfigueiredo Nov 30, 2025
56e7416
[docs] Update monitoring documentation
ramonfigueiredo Nov 30, 2025
eb0122c
[rust/docs] Add kafka-es-indexer sample config file and update docume…
ramonfigueiredo Dec 5, 2025
1 change: 1 addition & 0 deletions .gitignore
@@ -32,4 +32,5 @@ docs/bin/
sandbox/kafka-data
sandbox/zookeeper-data
sandbox/zookeeper-logs
sandbox/rqd/shots/
docs/_data/version.yml
2 changes: 1 addition & 1 deletion VERSION.in
@@ -1 +1 @@
1.14
1.15
11 changes: 11 additions & 0 deletions cuebot/build.gradle
@@ -26,6 +26,8 @@ repositories {
def grpcVersion = '1.47.0'
def protobufVersion = '3.21.2'
def activemqVersion = '5.12.0'
def kafkaVersion = '3.4.0'
def elasticsearchVersion = '8.8.0'

// Spring dependency versions are managed by the io.spring.dependency-management plugin.
// Appropriate versions will be pulled based on the spring boot version specified in the
@@ -52,6 +54,15 @@ dependencies {
implementation group: 'io.prometheus', name: 'simpleclient', version: '0.16.0'
implementation group: 'io.prometheus', name: 'simpleclient_servlet', version: '0.16.0'

// Kafka for event publishing
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: "${kafkaVersion}"
implementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.9.0'

// Elasticsearch for historical data storage
implementation group: 'co.elastic.clients', name: 'elasticsearch-java', version: "${elasticsearchVersion}"
implementation group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: "${elasticsearchVersion}"
implementation group: 'jakarta.json', name: 'jakarta.json-api', version: '2.1.1'

protobuf files("../proto/src/")

testImplementation group: 'junit', name: 'junit', version: '4.12'
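The three Elasticsearch artifacts work together: elasticsearch-java is the typed API, elasticsearch-rest-client is the low-level HTTP transport it rides on, and jakarta.json-api supplies the JSON-P types the client needs. A minimal wiring sketch using the official client follows; the host, port, and the Jackson mapper choice are assumptions, and this co.elastic.clients ElasticsearchClient is distinct from Cuebot's own com.imageworks.spcue.monitoring.ElasticsearchClient wrapper.

```java
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;

// Sketch only: builds the typed client on top of the low-level REST client.
// Host/port and the Jackson mapper are assumptions, not taken from this PR.
public class ElasticsearchWiringSketch {
    public static ElasticsearchClient build(String host, int port) {
        RestClient restClient = RestClient.builder(new HttpHost(host, port, "http")).build();
        return new ElasticsearchClient(
                new RestClientTransport(restClient, new JacksonJsonpMapper()));
    }
}
```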
CuebotApplication.java
@@ -24,8 +24,9 @@
import org.apache.logging.log4j.LogManager;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;

@SpringBootApplication
@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
public class CuebotApplication extends SpringApplication {
private static String[] checkArgs(String[] args) {
Optional<String> deprecatedFlag = Arrays.stream(args)
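Excluding KafkaAutoConfiguration keeps Spring Boot from creating producer beans from application.properties on its own, so the monitoring context has to wire them explicitly. A rough sketch of what that manual wiring typically looks like; the method shape, serializers, and bootstrap address are assumptions, not code from this PR.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

// Sketch only: explicit producer wiring in place of Spring Boot's Kafka auto-configuration.
public class KafkaProducerWiringSketch {
    public static KafkaTemplate<String, String> kafkaTemplate(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> factory =
                new DefaultKafkaProducerFactory<>(props);
        return new KafkaTemplate<>(factory);
    }
}
```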
PrometheusMetricsCollector.java
@@ -8,6 +8,7 @@
import com.imageworks.spcue.dispatcher.DispatchQueue;
import com.imageworks.spcue.dispatcher.HostReportHandler;
import com.imageworks.spcue.dispatcher.HostReportQueue;
import com.imageworks.spcue.monitoring.ElasticsearchClient;

import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
@@ -26,6 +27,9 @@ public class PrometheusMetricsCollector {

private HostReportQueue reportQueue;

@Autowired(required = false)
private ElasticsearchClient elasticsearchClient;

private boolean enabled;

// BookingQueue bookingQueue
Expand Down Expand Up @@ -119,6 +123,52 @@ public class PrometheusMetricsCollector {
.labelNames("env", "cuebot_host", "render_node", "job_name", "frame_name", "frame_id")
.register();

// Monitoring event metrics
private static final Counter monitoringEventsPublishedCounter =
Counter.build().name("cue_monitoring_events_published_total")
.help("Total number of monitoring events published to Kafka")
.labelNames("env", "cuebot_host", "event_type").register();

private static final Counter monitoringEventsDroppedCounter =
Counter.build().name("cue_monitoring_events_dropped_total")
.help("Total number of monitoring events dropped due to queue overflow")
.labelNames("env", "cuebot_host", "event_type").register();

private static final Gauge monitoringEventQueueSize =
Gauge.build().name("cue_monitoring_event_queue_size")
.help("Current size of the monitoring event publishing queue")
.labelNames("env", "cuebot_host").register();

private static final Gauge elasticsearchIndexQueueSize =
Gauge.build().name("cue_elasticsearch_index_queue_size")
.help("Current size of the Elasticsearch indexing queue")
.labelNames("env", "cuebot_host").register();

private static final Counter frameCompletedCounter = Counter.build()
.name("cue_frames_completed_total").help("Total number of frames completed")
.labelNames("env", "cuebot_host", "state", "show").register();

private static final Counter jobCompletedCounter =
Counter.build().name("cue_jobs_completed_total").help("Total number of jobs completed")
.labelNames("env", "cuebot_host", "state", "show").register();

private static final Histogram frameRuntimeHistogram = Histogram.build()
.name("cue_frame_runtime_seconds").help("Histogram of frame runtimes in seconds")
.labelNames("env", "cuebot_host", "show", "layer_type")
.buckets(60, 300, 600, 1800, 3600, 7200, 14400, 28800, 86400).register();

private static final Histogram frameMemoryHistogram = Histogram.build()
.name("cue_frame_memory_bytes").help("Histogram of frame peak memory usage in bytes")
.labelNames("env", "cuebot_host", "show", "layer_type")
.buckets(256L * 1024 * 1024, 512L * 1024 * 1024, 1024L * 1024 * 1024,
2048L * 1024 * 1024, 4096L * 1024 * 1024, 8192L * 1024 * 1024,
16384L * 1024 * 1024, 32768L * 1024 * 1024)
.register();

private static final Counter hostReportsReceivedCounter = Counter.build()
.name("cue_host_reports_received_total").help("Total number of host reports received")
.labelNames("env", "cuebot_host", "facility").register();

private String deployment_environment;
private String cuebot_host;

@@ -219,6 +269,12 @@ public void collectPrometheusMetrics() {
.set(reportQueue.getTaskCount());
reportQueueRejectedTotal.labels(this.deployment_environment, this.cuebot_host)
.set(reportQueue.getRejectedTaskCount());

// ElasticsearchClient queue
if (elasticsearchClient != null) {
elasticsearchIndexQueueSize.labels(this.deployment_environment, this.cuebot_host)
.set(elasticsearchClient.getPendingIndexCount());
}
}
}

@@ -269,6 +325,100 @@ public void incrementFrameKillFailureCounter(String hostname, String jobName, St
jobName, frameName, frameId).inc();
}

/**
* Increment the monitoring event published counter
*
* @param eventType type of event that was published
*/
public void incrementMonitoringEventPublished(String eventType) {
monitoringEventsPublishedCounter
.labels(this.deployment_environment, this.cuebot_host, eventType).inc();
}

/**
* Increment the monitoring event dropped counter
*
* @param eventType type of event that was dropped
*/
public void incrementMonitoringEventDropped(String eventType) {
monitoringEventsDroppedCounter
.labels(this.deployment_environment, this.cuebot_host, eventType).inc();
}

/**
* Set the monitoring event queue size
*
* @param size current queue size
*/
public void setMonitoringEventQueueSize(int size) {
monitoringEventQueueSize.labels(this.deployment_environment, this.cuebot_host).set(size);
}

/**
* Set the Elasticsearch indexing queue size
*
* @param size current queue size
*/
public void setElasticsearchIndexQueueSize(int size) {
elasticsearchIndexQueueSize.labels(this.deployment_environment, this.cuebot_host).set(size);
}

/**
* Record a frame completion
*
* @param state final state of the frame
* @param show show name
*/
public void recordFrameCompleted(String state, String show) {
frameCompletedCounter.labels(this.deployment_environment, this.cuebot_host, state, show)
.inc();
}

/**
* Record a job completion
*
* @param state final state of the job
* @param show show name
*/
public void recordJobCompleted(String state, String show) {
jobCompletedCounter.labels(this.deployment_environment, this.cuebot_host, state, show)
.inc();
}

/**
* Record frame runtime for histogramming
*
* @param runtimeSeconds runtime in seconds
* @param show show name
* @param layerType layer type
*/
public void recordFrameRuntime(double runtimeSeconds, String show, String layerType) {
frameRuntimeHistogram.labels(this.deployment_environment, this.cuebot_host, show, layerType)
.observe(runtimeSeconds);
}

/**
* Record frame peak memory usage for histogramming
*
* @param memoryBytes peak memory in bytes
* @param show show name
* @param layerType layer type
*/
public void recordFrameMemory(double memoryBytes, String show, String layerType) {
frameMemoryHistogram.labels(this.deployment_environment, this.cuebot_host, show, layerType)
.observe(memoryBytes);
}

/**
* Record a host report received
*
* @param facility facility name
*/
public void recordHostReport(String facility) {
hostReportsReceivedCounter.labels(this.deployment_environment, this.cuebot_host, facility)
.inc();
}

// Setters used for dependency injection
public void setBookingQueue(BookingQueue bookingQueue) {
this.bookingQueue = bookingQueue;
@@ -285,4 +435,8 @@ public void setDispatchQueue(DispatchQueue dispatchQueue) {
public void setReportQueue(HostReportQueue reportQueue) {
this.reportQueue = reportQueue;
}

public void setElasticsearchClient(ElasticsearchClient elasticsearchClient) {
this.elasticsearchClient = elasticsearchClient;
}
}
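For context, an illustrative caller of the new recorder methods might look like the following. The handler shape, parameter sources, and placement alongside PrometheusMetricsCollector are assumptions; only the collector methods themselves come from this diff.

```java
// Illustrative caller only; assumes it lives in the same package as
// PrometheusMetricsCollector, whose recorder methods are added in this PR.
public class FrameCompletionMetricsSketch {
    private final PrometheusMetricsCollector metrics;

    public FrameCompletionMetricsSketch(PrometheusMetricsCollector metrics) {
        this.metrics = metrics;
    }

    public void onFrameComplete(String state, String show, String layerType,
            double runtimeSeconds, double peakRssBytes) {
        // Counter labelled by final frame state and show.
        metrics.recordFrameCompleted(state, show);
        // Histograms bucketed per show and layer type.
        metrics.recordFrameRuntime(runtimeSeconds, show, layerType);
        metrics.recordFrameMemory(peakRssBytes, show, layerType);
    }
}
```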
AppConfig.java
@@ -38,7 +38,8 @@
"classpath:conf/spring/applicationContext-grpcServer.xml",
"classpath:conf/spring/applicationContext-service.xml",
"classpath:conf/spring/applicationContext-jms.xml",
"classpath:conf/spring/applicationContext-criteria.xml"})
"classpath:conf/spring/applicationContext-criteria.xml",
"classpath:conf/spring/applicationContext-monitoring.xml"})
@EnableConfigurationProperties
@PropertySource({"classpath:opencue.properties"})
public class AppConfig {
31 changes: 31 additions & 0 deletions cuebot/src/main/java/com/imageworks/spcue/dao/HistoricalDao.java
@@ -18,6 +18,12 @@
import java.util.List;

import com.imageworks.spcue.JobInterface;
import com.imageworks.spcue.grpc.job.FrameState;
import com.imageworks.spcue.grpc.job.JobState;
import com.imageworks.spcue.grpc.monitoring.HistoricalFrame;
import com.imageworks.spcue.grpc.monitoring.HistoricalJob;
import com.imageworks.spcue.grpc.monitoring.HistoricalLayer;
import com.imageworks.spcue.grpc.monitoring.LayerMemoryRecord;

public interface HistoricalDao {

@@ -36,4 +42,29 @@ public interface HistoricalDao {
*/
void transferJob(JobInterface job);

/**
* Query historical job records from the job_history table.
*/
List<HistoricalJob> getJobHistory(List<String> shows, List<String> users, List<String> shots,
List<String> jobNameRegex, List<JobState> states, long startTime, long endTime,
int page, int pageSize, int maxResults);

/**
* Query historical frame records from the frame_history table.
*/
List<HistoricalFrame> getFrameHistory(String jobId, String jobName, List<String> layerNames,
List<FrameState> states, long startTime, long endTime, int page, int pageSize);

/**
* Query historical layer records from the layer_history table.
*/
List<HistoricalLayer> getLayerHistory(String jobId, String jobName, long startTime,
long endTime, int page, int pageSize);

/**
* Query historical memory usage for a layer type.
*/
List<LayerMemoryRecord> getLayerMemoryHistory(String layerName, List<String> shows,
long startTime, long endTime, int maxResults);

}
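As an illustration, a caller of the new getJobHistory method might look like the following. The epoch-seconds timestamps, the JobState value, and the paging numbers are assumptions; only the method signature comes from this diff.

```java
import java.util.Collections;
import java.util.List;

import com.imageworks.spcue.dao.HistoricalDao;
import com.imageworks.spcue.grpc.job.JobState;
import com.imageworks.spcue.grpc.monitoring.HistoricalJob;

// Illustrative caller; treating empty lists as "no filter" is an assumption.
public class JobHistoryQuerySketch {
    public List<HistoricalJob> finishedJobsForShow(HistoricalDao dao, String show) {
        long now = System.currentTimeMillis() / 1000L;
        long oneDayAgo = now - 86400L;
        return dao.getJobHistory(
                Collections.singletonList(show),              // shows
                Collections.emptyList(),                      // users
                Collections.emptyList(),                      // shots
                Collections.emptyList(),                      // jobNameRegex
                Collections.singletonList(JobState.FINISHED), // states
                oneDayAgo, now,                               // startTime, endTime
                0, 100,                                       // page, pageSize
                1000);                                        // maxResults
    }
}
```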
HistoricalDaoJdbc.java
@@ -15,13 +15,19 @@

package com.imageworks.spcue.dao.postgres;

import java.util.Collections;
import java.util.List;

import org.springframework.jdbc.core.support.JdbcDaoSupport;

import com.imageworks.spcue.JobInterface;
import com.imageworks.spcue.dao.HistoricalDao;
import com.imageworks.spcue.grpc.job.FrameState;
import com.imageworks.spcue.grpc.job.JobState;
import com.imageworks.spcue.grpc.monitoring.HistoricalFrame;
import com.imageworks.spcue.grpc.monitoring.HistoricalJob;
import com.imageworks.spcue.grpc.monitoring.HistoricalLayer;
import com.imageworks.spcue.grpc.monitoring.LayerMemoryRecord;

public class HistoricalDaoJdbc extends JdbcDaoSupport implements HistoricalDao {

@@ -40,4 +46,34 @@ public void transferJob(JobInterface job) {
*/
getJdbcTemplate().update("DELETE FROM job WHERE pk_job=?", job.getJobId());
}

@Override
public List<HistoricalJob> getJobHistory(List<String> shows, List<String> users,
List<String> shots, List<String> jobNameRegex, List<JobState> states, long startTime,
long endTime, int page, int pageSize, int maxResults) {
// Historical queries are handled via Elasticsearch when enabled
return Collections.emptyList();
}

@Override
public List<HistoricalFrame> getFrameHistory(String jobId, String jobName,
List<String> layerNames, List<FrameState> states, long startTime, long endTime,
int page, int pageSize) {
// Historical queries are handled via Elasticsearch when enabled
return Collections.emptyList();
}

@Override
public List<HistoricalLayer> getLayerHistory(String jobId, String jobName, long startTime,
long endTime, int page, int pageSize) {
// Historical queries are handled via Elasticsearch when enabled
return Collections.emptyList();
}

@Override
public List<LayerMemoryRecord> getLayerMemoryHistory(String layerName, List<String> shows,
long startTime, long endTime, int maxResults) {
// Historical queries are handled via Elasticsearch when enabled
return Collections.emptyList();
}
}
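Since the JDBC stubs return empty lists, the real lookups presumably happen against Elasticsearch when monitoring is enabled. A rough sketch of what such a query could look like with the elasticsearch-java client added in build.gradle; the index name, field name, and use of Map documents are assumptions, not taken from this PR.

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.SearchResponse;

// Hypothetical search path; "opencue-job-events" and the "show" field are placeholder names.
public class JobHistorySearchSketch {
    private final ElasticsearchClient client;

    public JobHistorySearchSketch(ElasticsearchClient client) {
        this.client = client;
    }

    public List<Map> jobsForShow(String show, int pageSize) throws IOException {
        SearchResponse<Map> response = client.search(s -> s
                .index("opencue-job-events")
                .size(pageSize)
                .query(q -> q.term(t -> t
                        .field("show")
                        .value(v -> v.stringValue(show)))),
                Map.class);
        return response.hits().hits().stream()
                .map(hit -> hit.source())
                .collect(Collectors.toList());
    }
}
```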