Skip to content

Commit 3f0791f

Browse files
[cuebot/pycue/proto] Add OpenCue render farm monitoring system
Implement event-driven monitoring infrastructure for collecting and storing render farm statistics with historical data access capabilities. Key components: - Define monitoring.proto with job/layer/frame/host lifecycle events - Add KafkaEventPublisher for async event publishing to Kafka topics - Create Elasticsearch client and consumer for historical data storage - Hook event publishing into FrameCompleteHandler and HostReportHandler - Extend PrometheusMetricsCollector with frame/job completion metrics - Add MonitoringInterface gRPC service for historical data queries - Create pycue monitoring wrapper with historical data API methods - Add applicationContext-monitoring.xml Spring configuration for monitoring beans - Update TestAppConfig to include monitoring context for tests Configuration: - All features disabled by default (opt-in via properties) - Kafka: monitoring.kafka.enabled, monitoring.kafka.bootstrap.servers - Elasticsearch: monitoring.elasticsearch.enabled, monitoring.elasticsearch.host This enables: - Extended memory prediction beyond the 3-day pycue API limit - Real-time farm monitoring via Prometheus/Grafana dashboards - Historical job/frame/layer analytics via Elasticsearch
1 parent b5959bb commit 3f0791f

23 files changed

+3007
-3
lines changed

VERSION.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.14
1+
1.15

cuebot/build.gradle

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ repositories {
2626
def grpcVersion = '1.47.0'
2727
def protobufVersion = '3.21.2'
2828
def activemqVersion = '5.12.0'
29+
def kafkaVersion = '3.4.0'
30+
def elasticsearchVersion = '8.8.0'
2931

3032
// Spring dependency versions are managed by the io.spring.dependency-management plugin.
3133
// Appropriate versions will be pulled based on the spring boot version specified in the
@@ -52,6 +54,14 @@ dependencies {
5254
implementation group: 'io.prometheus', name: 'simpleclient', version: '0.16.0'
5355
implementation group: 'io.prometheus', name: 'simpleclient_servlet', version: '0.16.0'
5456

57+
// Kafka for event publishing
58+
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: "${kafkaVersion}"
59+
implementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.9.0'
60+
61+
// Elasticsearch for historical data storage
62+
implementation group: 'co.elastic.clients', name: 'elasticsearch-java', version: "${elasticsearchVersion}"
63+
implementation group: 'jakarta.json', name: 'jakarta.json-api', version: '2.1.1'
64+
5565
protobuf files("../proto/src/")
5666

5767
testImplementation group: 'junit', name: 'junit', version: '4.12'

cuebot/src/main/java/com/imageworks/spcue/PrometheusMetricsCollector.java

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,52 @@ public class PrometheusMetricsCollector {
119119
.labelNames("env", "cuebot_host", "render_node", "job_name", "frame_name", "frame_id")
120120
.register();
121121

122+
// Monitoring event metrics
123+
private static final Counter monitoringEventsPublishedCounter =
124+
Counter.build().name("cue_monitoring_events_published_total")
125+
.help("Total number of monitoring events published to Kafka")
126+
.labelNames("env", "cuebot_host", "event_type").register();
127+
128+
private static final Counter monitoringEventsDroppedCounter =
129+
Counter.build().name("cue_monitoring_events_dropped_total")
130+
.help("Total number of monitoring events dropped due to queue overflow")
131+
.labelNames("env", "cuebot_host", "event_type").register();
132+
133+
private static final Gauge monitoringEventQueueSize =
134+
Gauge.build().name("cue_monitoring_event_queue_size")
135+
.help("Current size of the monitoring event publishing queue")
136+
.labelNames("env", "cuebot_host").register();
137+
138+
private static final Gauge elasticsearchIndexQueueSize =
139+
Gauge.build().name("cue_elasticsearch_index_queue_size")
140+
.help("Current size of the Elasticsearch indexing queue")
141+
.labelNames("env", "cuebot_host").register();
142+
143+
private static final Counter frameCompletedCounter = Counter.build()
144+
.name("cue_frames_completed_total").help("Total number of frames completed")
145+
.labelNames("env", "cuebot_host", "state", "show").register();
146+
147+
private static final Counter jobCompletedCounter =
148+
Counter.build().name("cue_jobs_completed_total").help("Total number of jobs completed")
149+
.labelNames("env", "cuebot_host", "state", "show").register();
150+
151+
private static final Histogram frameRuntimeHistogram = Histogram.build()
152+
.name("cue_frame_runtime_seconds").help("Histogram of frame runtimes in seconds")
153+
.labelNames("env", "cuebot_host", "show", "layer_type")
154+
.buckets(60, 300, 600, 1800, 3600, 7200, 14400, 28800, 86400).register();
155+
156+
private static final Histogram frameMemoryHistogram = Histogram.build()
157+
.name("cue_frame_memory_bytes").help("Histogram of frame peak memory usage in bytes")
158+
.labelNames("env", "cuebot_host", "show", "layer_type")
159+
.buckets(256L * 1024 * 1024, 512L * 1024 * 1024, 1024L * 1024 * 1024,
160+
2048L * 1024 * 1024, 4096L * 1024 * 1024, 8192L * 1024 * 1024,
161+
16384L * 1024 * 1024, 32768L * 1024 * 1024)
162+
.register();
163+
164+
private static final Counter hostReportsReceivedCounter = Counter.build()
165+
.name("cue_host_reports_received_total").help("Total number of host reports received")
166+
.labelNames("env", "cuebot_host", "facility").register();
167+
122168
private String deployment_environment;
123169
private String cuebot_host;
124170

@@ -269,6 +315,100 @@ public void incrementFrameKillFailureCounter(String hostname, String jobName, St
269315
jobName, frameName, frameId).inc();
270316
}
271317

318+
/**
319+
* Increment the monitoring event published counter
320+
*
321+
* @param eventType type of event that was published
322+
*/
323+
public void incrementMonitoringEventPublished(String eventType) {
324+
monitoringEventsPublishedCounter
325+
.labels(this.deployment_environment, this.cuebot_host, eventType).inc();
326+
}
327+
328+
/**
329+
* Increment the monitoring event dropped counter
330+
*
331+
* @param eventType type of event that was dropped
332+
*/
333+
public void incrementMonitoringEventDropped(String eventType) {
334+
monitoringEventsDroppedCounter
335+
.labels(this.deployment_environment, this.cuebot_host, eventType).inc();
336+
}
337+
338+
/**
339+
* Set the monitoring event queue size
340+
*
341+
* @param size current queue size
342+
*/
343+
public void setMonitoringEventQueueSize(int size) {
344+
monitoringEventQueueSize.labels(this.deployment_environment, this.cuebot_host).set(size);
345+
}
346+
347+
/**
348+
* Set the Elasticsearch indexing queue size
349+
*
350+
* @param size current queue size
351+
*/
352+
public void setElasticsearchIndexQueueSize(int size) {
353+
elasticsearchIndexQueueSize.labels(this.deployment_environment, this.cuebot_host).set(size);
354+
}
355+
356+
/**
357+
* Record a frame completion
358+
*
359+
* @param state final state of the frame
360+
* @param show show name
361+
*/
362+
public void recordFrameCompleted(String state, String show) {
363+
frameCompletedCounter.labels(this.deployment_environment, this.cuebot_host, state, show)
364+
.inc();
365+
}
366+
367+
/**
368+
* Record a job completion
369+
*
370+
* @param state final state of the job
371+
* @param show show name
372+
*/
373+
public void recordJobCompleted(String state, String show) {
374+
jobCompletedCounter.labels(this.deployment_environment, this.cuebot_host, state, show)
375+
.inc();
376+
}
377+
378+
/**
379+
* Record frame runtime for histogramming
380+
*
381+
* @param runtimeSeconds runtime in seconds
382+
* @param show show name
383+
* @param layerType layer type
384+
*/
385+
public void recordFrameRuntime(double runtimeSeconds, String show, String layerType) {
386+
frameRuntimeHistogram.labels(this.deployment_environment, this.cuebot_host, show, layerType)
387+
.observe(runtimeSeconds);
388+
}
389+
390+
/**
391+
* Record frame peak memory usage for histogramming
392+
*
393+
* @param memoryBytes peak memory in bytes
394+
* @param show show name
395+
* @param layerType layer type
396+
*/
397+
public void recordFrameMemory(double memoryBytes, String show, String layerType) {
398+
frameMemoryHistogram.labels(this.deployment_environment, this.cuebot_host, show, layerType)
399+
.observe(memoryBytes);
400+
}
401+
402+
/**
403+
* Record a host report received
404+
*
405+
* @param facility facility name
406+
*/
407+
public void recordHostReport(String facility) {
408+
hostReportsReceivedCounter.labels(this.deployment_environment, this.cuebot_host, facility)
409+
.inc();
410+
}
411+
272412
// Setters used for dependency injection
273413
public void setBookingQueue(BookingQueue bookingQueue) {
274414
this.bookingQueue = bookingQueue;

cuebot/src/main/java/com/imageworks/spcue/config/AppConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
"classpath:conf/spring/applicationContext-grpcServer.xml",
3939
"classpath:conf/spring/applicationContext-service.xml",
4040
"classpath:conf/spring/applicationContext-jms.xml",
41-
"classpath:conf/spring/applicationContext-criteria.xml"})
41+
"classpath:conf/spring/applicationContext-criteria.xml",
42+
"classpath:conf/spring/applicationContext-monitoring.xml"})
4243
@EnableConfigurationProperties
4344
@PropertySource({"classpath:opencue.properties"})
4445
public class AppConfig {

cuebot/src/main/java/com/imageworks/spcue/dao/HistoricalDao.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@
1818
import java.util.List;
1919

2020
import com.imageworks.spcue.JobInterface;
21+
import com.imageworks.spcue.grpc.job.FrameState;
22+
import com.imageworks.spcue.grpc.job.JobState;
23+
import com.imageworks.spcue.grpc.monitoring.HistoricalFrame;
24+
import com.imageworks.spcue.grpc.monitoring.HistoricalJob;
25+
import com.imageworks.spcue.grpc.monitoring.HistoricalLayer;
26+
import com.imageworks.spcue.grpc.monitoring.LayerMemoryRecord;
2127

2228
public interface HistoricalDao {
2329

@@ -36,4 +42,29 @@ public interface HistoricalDao {
3642
*/
3743
void transferJob(JobInterface job);
3844

45+
/**
46+
* Query historical job records from the job_history table.
47+
*/
48+
List<HistoricalJob> getJobHistory(List<String> shows, List<String> users, List<String> shots,
49+
List<String> jobNameRegex, List<JobState> states, long startTime, long endTime,
50+
int page, int pageSize, int maxResults);
51+
52+
/**
53+
* Query historical frame records from the frame_history table.
54+
*/
55+
List<HistoricalFrame> getFrameHistory(String jobId, String jobName, List<String> layerNames,
56+
List<FrameState> states, long startTime, long endTime, int page, int pageSize);
57+
58+
/**
59+
* Query historical layer records from the layer_history table.
60+
*/
61+
List<HistoricalLayer> getLayerHistory(String jobId, String jobName, long startTime,
62+
long endTime, int page, int pageSize);
63+
64+
/**
65+
* Query historical memory usage for a layer type.
66+
*/
67+
List<LayerMemoryRecord> getLayerMemoryHistory(String layerName, List<String> shows,
68+
long startTime, long endTime, int maxResults);
69+
3970
}

cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HistoricalDaoJdbc.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,19 @@
1515

1616
package com.imageworks.spcue.dao.postgres;
1717

18+
import java.util.Collections;
1819
import java.util.List;
1920

2021
import org.springframework.jdbc.core.support.JdbcDaoSupport;
2122

2223
import com.imageworks.spcue.JobInterface;
2324
import com.imageworks.spcue.dao.HistoricalDao;
25+
import com.imageworks.spcue.grpc.job.FrameState;
2426
import com.imageworks.spcue.grpc.job.JobState;
27+
import com.imageworks.spcue.grpc.monitoring.HistoricalFrame;
28+
import com.imageworks.spcue.grpc.monitoring.HistoricalJob;
29+
import com.imageworks.spcue.grpc.monitoring.HistoricalLayer;
30+
import com.imageworks.spcue.grpc.monitoring.LayerMemoryRecord;
2531

2632
public class HistoricalDaoJdbc extends JdbcDaoSupport implements HistoricalDao {
2733

@@ -40,4 +46,34 @@ public void transferJob(JobInterface job) {
4046
*/
4147
getJdbcTemplate().update("DELETE FROM job WHERE pk_job=?", job.getJobId());
4248
}
49+
50+
@Override
51+
public List<HistoricalJob> getJobHistory(List<String> shows, List<String> users,
52+
List<String> shots, List<String> jobNameRegex, List<JobState> states, long startTime,
53+
long endTime, int page, int pageSize, int maxResults) {
54+
// Historical queries are handled via Elasticsearch when enabled
55+
return Collections.emptyList();
56+
}
57+
58+
@Override
59+
public List<HistoricalFrame> getFrameHistory(String jobId, String jobName,
60+
List<String> layerNames, List<FrameState> states, long startTime, long endTime,
61+
int page, int pageSize) {
62+
// Historical queries are handled via Elasticsearch when enabled
63+
return Collections.emptyList();
64+
}
65+
66+
@Override
67+
public List<HistoricalLayer> getLayerHistory(String jobId, String jobName, long startTime,
68+
long endTime, int page, int pageSize) {
69+
// Historical queries are handled via Elasticsearch when enabled
70+
return Collections.emptyList();
71+
}
72+
73+
@Override
74+
public List<LayerMemoryRecord> getLayerMemoryHistory(String layerName, List<String> shows,
75+
long startTime, long endTime, int maxResults) {
76+
// Historical queries are handled via Elasticsearch when enabled
77+
return Collections.emptyList();
78+
}
4379
}

cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@
5656
import com.imageworks.spcue.dao.ServiceDao;
5757
import com.imageworks.spcue.grpc.service.Service;
5858
import com.imageworks.spcue.grpc.service.ServiceOverride;
59+
import com.imageworks.spcue.monitoring.KafkaEventPublisher;
60+
import com.imageworks.spcue.monitoring.MonitoringEventBuilder;
61+
import com.imageworks.spcue.grpc.monitoring.EventType;
62+
import com.imageworks.spcue.grpc.monitoring.FrameEvent;
63+
import com.imageworks.spcue.grpc.monitoring.JobEvent;
5964

6065
/**
6166
* The FrameCompleteHandler encapsulates all logic necessary for processing FrameComplete reports
@@ -83,6 +88,8 @@ public class FrameCompleteHandler {
8388
private ServiceDao serviceDao;
8489
private ShowDao showDao;
8590
private Environment env;
91+
private KafkaEventPublisher kafkaEventPublisher;
92+
private MonitoringEventBuilder monitoringEventBuilder;
8693

8794
/*
8895
* The last time a proc was unbooked for subscription or job balancing. Since there are so many
@@ -255,6 +262,11 @@ public void handlePostFrameCompleteOperations(VirtualProc proc, FrameCompleteRep
255262
FrameDetail frameDetail) {
256263
try {
257264

265+
/*
266+
* Publish frame complete event to Kafka for monitoring
267+
*/
268+
publishFrameCompleteEvent(report, frame, frameDetail, newFrameState, proc);
269+
258270
/*
259271
* The default behavior is to keep the proc on the same job.
260272
*/
@@ -721,4 +733,34 @@ public void setShowDao(ShowDao showDao) {
721733
this.showDao = showDao;
722734
}
723735

736+
public KafkaEventPublisher getKafkaEventPublisher() {
737+
return kafkaEventPublisher;
738+
}
739+
740+
public void setKafkaEventPublisher(KafkaEventPublisher kafkaEventPublisher) {
741+
this.kafkaEventPublisher = kafkaEventPublisher;
742+
if (kafkaEventPublisher != null) {
743+
this.monitoringEventBuilder = new MonitoringEventBuilder(kafkaEventPublisher);
744+
}
745+
}
746+
747+
/**
748+
* Publishes a frame complete event to Kafka for monitoring purposes. This method is called
749+
* asynchronously to avoid blocking the dispatch thread.
750+
*/
751+
private void publishFrameCompleteEvent(FrameCompleteReport report, DispatchFrame frame,
752+
FrameDetail frameDetail, FrameState newFrameState, VirtualProc proc) {
753+
if (kafkaEventPublisher == null || !kafkaEventPublisher.isEnabled()) {
754+
return;
755+
}
756+
757+
try {
758+
FrameEvent event = monitoringEventBuilder.buildFrameCompleteEvent(report, newFrameState,
759+
frameDetail.state, frame, proc);
760+
kafkaEventPublisher.publishFrameEvent(event);
761+
} catch (Exception e) {
762+
logger.trace("Failed to publish frame complete event: {}", e.getMessage());
763+
}
764+
}
765+
724766
}

0 commit comments

Comments
 (0)