diff --git a/internal/pkg/controler/channels.go b/internal/pkg/controler/channels.go index 7d5a6984..68834ddf 100644 --- a/internal/pkg/controler/channels.go +++ b/internal/pkg/controler/channels.go @@ -1,9 +1,21 @@ package controler -import "github.com/internetarchive/Zeno/pkg/models" +import ( + "sync" + + "github.com/internetarchive/Zeno/pkg/models" +) + +// NamedChannel holds a channel with its associated name for monitoring +type NamedChannel struct { + Name string + Channel chan *models.Item +} var ( - stageChannels []chan *models.Item + stageChannels []chan *models.Item + namedChannels []NamedChannel + namedChannelsMutex sync.RWMutex ) func makeStageChannel(bufferSize ...int) chan *models.Item { @@ -22,6 +34,32 @@ func makeStageChannel(bufferSize ...int) chan *models.Item { return ch } +// makeNamedStageChannel creates a channel with a name for monitoring purposes +func makeNamedStageChannel(name string, bufferSize ...int) chan *models.Item { + ch := makeStageChannel(bufferSize...) + + namedChannelsMutex.Lock() + namedChannels = append(namedChannels, NamedChannel{ + Name: name, + Channel: ch, + }) + namedChannelsMutex.Unlock() + + return ch +} + +// GetChannelQueueSizes returns the current queue sizes for all named channels +func GetChannelQueueSizes() map[string]int { + namedChannelsMutex.RLock() + defer namedChannelsMutex.RUnlock() + + queueSizes := make(map[string]int) + for _, namedCh := range namedChannels { + queueSizes[namedCh.Name] = len(namedCh.Channel) + } + return queueSizes +} + func closeStageChannels() { for _, ch := range stageChannels { close(ch) diff --git a/internal/pkg/controler/channels_test.go b/internal/pkg/controler/channels_test.go new file mode 100644 index 00000000..11a0cbd1 --- /dev/null +++ b/internal/pkg/controler/channels_test.go @@ -0,0 +1,73 @@ +package controler + +import ( + "testing" + + "github.com/internetarchive/Zeno/pkg/models" +) + +func TestMakeStageChannel(t *testing.T) { + // Reset state + stageChannels = nil + namedChannels = nil + + // Test basic channel creation + ch := makeStageChannel(10) + if cap(ch) != 10 { + t.Errorf("Expected channel capacity 10, got %d", cap(ch)) + } + + // Test named channel creation + namedCh := makeNamedStageChannel("test_channel", 5) + if cap(namedCh) != 5 { + t.Errorf("Expected named channel capacity 5, got %d", cap(namedCh)) + } + + // Test that named channel is tracked + queueSizes := GetChannelQueueSizes() + if len(queueSizes) != 1 { + t.Errorf("Expected 1 named channel, got %d", len(queueSizes)) + } + + if size, exists := queueSizes["test_channel"]; !exists { + t.Error("Expected 'test_channel' to exist in queue sizes") + } else if size != 0 { + t.Errorf("Expected queue size 0, got %d", size) + } +} + +func TestGetChannelQueueSizes(t *testing.T) { + // Reset state + stageChannels = nil + namedChannels = nil + + // Create multiple named channels + ch1 := makeNamedStageChannel("channel1", 10) + makeNamedStageChannel("channel2", 5) + + // Add some items to test queue size tracking + item := &models.Item{} + + select { + case ch1 <- item: + // Successfully added item to ch1 + default: + t.Fatal("Failed to add item to ch1") + } + + // Get queue sizes + queueSizes := GetChannelQueueSizes() + + // Verify tracking + if len(queueSizes) != 2 { + t.Errorf("Expected 2 channels, got %d", len(queueSizes)) + } + + if size := queueSizes["channel1"]; size != 1 { + t.Errorf("Expected channel1 size 1, got %d", size) + } + + if size := queueSizes["channel2"]; size != 0 { + t.Errorf("Expected channel2 size 0, got %d", size) + } +} diff --git a/internal/pkg/controler/pipeline.go b/internal/pkg/controler/pipeline.go index 229becb1..06c853d9 100644 --- a/internal/pkg/controler/pipeline.go +++ b/internal/pkg/controler/pipeline.go @@ -60,6 +60,9 @@ func startPipeline() error { return err } + // Set the channel queue size getter for stats monitoring + stats.SetChannelQueueSizeGetter(GetChannelQueueSizes) + // Start the disk watcher go watchers.WatchDiskSpace(config.Get().JobPath, 5*time.Second) @@ -78,7 +81,7 @@ func startPipeline() error { } // Start the reactor that will receive - reactorOutputChan := makeStageChannel(config.Get().WorkersCount) + reactorOutputChan := makeNamedStageChannel("reactor_to_preprocessor", config.Get().WorkersCount) err = reactor.Start(config.Get().WorkersCount, reactorOutputChan) if err != nil { logger.Error("error starting reactor", "err", err.Error()) @@ -94,14 +97,14 @@ func startPipeline() error { } } - preprocessorOutputChan := makeStageChannel(config.Get().WorkersCount) + preprocessorOutputChan := makeNamedStageChannel("preprocessor_to_archiver", config.Get().WorkersCount) err = preprocessor.Start(reactorOutputChan, preprocessorOutputChan) if err != nil { logger.Error("error starting preprocessor", "err", err.Error()) return err } - archiverOutputChan := makeStageChannel(config.Get().WorkersCount) + archiverOutputChan := makeNamedStageChannel("archiver_to_postprocessor", config.Get().WorkersCount) err = archiver.Start(preprocessorOutputChan, archiverOutputChan) if err != nil { logger.Error("error starting archiver", "err", err.Error()) @@ -109,21 +112,21 @@ func startPipeline() error { } // Used by optional 2nd HQ instance just to gather outlinks to a different project - hqOutlinksFinishChan := makeStageChannel(config.Get().WorkersCount) - hqOutlinksProduceChan := makeStageChannel(config.Get().WorkersCount) + hqOutlinksFinishChan := makeNamedStageChannel("hq_outlinks_finish", config.Get().WorkersCount) + hqOutlinksProduceChan := makeNamedStageChannel("hq_outlinks_produce", config.Get().WorkersCount) // Start the WARC writing queue watcher watchers.StartWatchWARCWritingQueue(1*time.Second, 2*time.Second, 250*time.Millisecond) - postprocessorOutputChan := makeStageChannel(config.Get().WorkersCount) + postprocessorOutputChan := makeNamedStageChannel("postprocessor_to_finisher", config.Get().WorkersCount) err = postprocessor.Start(archiverOutputChan, postprocessorOutputChan, hqOutlinksProduceChan) if err != nil { logger.Error("error starting postprocessor", "err", err.Error()) return err } - finisherFinishChan := makeStageChannel(config.Get().WorkersCount) - finisherProduceChan := makeStageChannel(config.Get().WorkersCount) + finisherFinishChan := makeNamedStageChannel("finisher_to_source", config.Get().WorkersCount) + finisherProduceChan := makeNamedStageChannel("source_to_finisher", config.Get().WorkersCount) if config.Get().UseHQ { hqSource := hq.New(config.Get().HQKey, config.Get().HQSecret, config.Get().HQProject, config.Get().HQAddress) diff --git a/internal/pkg/controler/watchers/warc.go b/internal/pkg/controler/watchers/warc.go index feb3bc42..e57a8f9d 100644 --- a/internal/pkg/controler/watchers/warc.go +++ b/internal/pkg/controler/watchers/warc.go @@ -91,6 +91,8 @@ func StartWatchWARCWritingQueue(pauseCheckInterval time.Duration, pauseTimeout t case <-statsTicker.C: s := archiver.GetStats() stats.WarcWritingQueueSizeSet(s.WARCWritingQueueSize) + // Update component queue sizes for Prometheus + stats.ComponentQueueSizesUpdate() // Update dedup WARC metrics stats.WARCDataTotalBytesSet(s.WARCTotalBytesArchived) stats.WARCCDXDedupeTotalBytesSet(s.CDXDedupeTotalBytes) diff --git a/internal/pkg/stats/channels.go b/internal/pkg/stats/channels.go new file mode 100644 index 00000000..fe1df75d --- /dev/null +++ b/internal/pkg/stats/channels.go @@ -0,0 +1,19 @@ +package stats + +// ChannelQueueSizeGetter is a function type that returns channel queue sizes +type ChannelQueueSizeGetter func() map[string]int + +var channelQueueSizeGetter ChannelQueueSizeGetter + +// SetChannelQueueSizeGetter sets the function to get channel queue sizes +func SetChannelQueueSizeGetter(getter ChannelQueueSizeGetter) { + channelQueueSizeGetter = getter +} + +// GetChannelQueueSizes returns the current channel queue sizes +func GetChannelQueueSizes() map[string]int { + if channelQueueSizeGetter != nil { + return channelQueueSizeGetter() + } + return make(map[string]int) +} diff --git a/internal/pkg/stats/channels_test.go b/internal/pkg/stats/channels_test.go new file mode 100644 index 00000000..b164b901 --- /dev/null +++ b/internal/pkg/stats/channels_test.go @@ -0,0 +1,40 @@ +package stats + +import ( + "testing" +) + +func TestChannelQueueSizeGetter(t *testing.T) { + // Reset getter + channelQueueSizeGetter = nil + + // Test with no getter set + sizes := GetChannelQueueSizes() + if len(sizes) != 0 { + t.Errorf("Expected empty map when no getter set, got %d items", len(sizes)) + } + + // Set a test getter + testData := map[string]int{ + "test_channel_1": 5, + "test_channel_2": 10, + } + + SetChannelQueueSizeGetter(func() map[string]int { + return testData + }) + + // Test with getter set + sizes = GetChannelQueueSizes() + if len(sizes) != 2 { + t.Errorf("Expected 2 channels, got %d", len(sizes)) + } + + if sizes["test_channel_1"] != 5 { + t.Errorf("Expected test_channel_1 size 5, got %d", sizes["test_channel_1"]) + } + + if sizes["test_channel_2"] != 10 { + t.Errorf("Expected test_channel_2 size 10, got %d", sizes["test_channel_2"]) + } +} diff --git a/internal/pkg/stats/methods.go b/internal/pkg/stats/methods.go index 182fa7f3..7e09a2a1 100644 --- a/internal/pkg/stats/methods.go +++ b/internal/pkg/stats/methods.go @@ -275,3 +275,17 @@ func CFMitigatedIncr() { globalPromStats.cfMitigated.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc() } } + +////////////////////////// +// ComponentQueueSizes // +////////////////////////// + +// ComponentQueueSizesUpdate updates all component queue sizes in Prometheus metrics. +func ComponentQueueSizesUpdate() { + if globalPromStats != nil { + channelQueues := GetChannelQueueSizes() + for component, size := range channelQueues { + globalPromStats.componentQueueSizes.WithLabelValues(config.Get().JobPrometheus, hostname, version, component).Set(float64(size)) + } + } +} diff --git a/internal/pkg/stats/prometheus.go b/internal/pkg/stats/prometheus.go index 1179c1ac..1693ab84 100644 --- a/internal/pkg/stats/prometheus.go +++ b/internal/pkg/stats/prometheus.go @@ -27,6 +27,9 @@ type prometheusStats struct { warcWritingQueueSize *prometheus.GaugeVec cfMitigated *prometheus.GaugeVec + // Component queue sizes + componentQueueSizes *prometheus.GaugeVec + // Dedup WARC metrics dataTotalBytes *prometheus.GaugeVec cdxDedupeTotalBytes *prometheus.GaugeVec @@ -135,6 +138,10 @@ func newPrometheusStats() *prometheusStats { prometheus.GaugeOpts{Name: config.Get().PrometheusPrefix + "cf_challenge_pages_seen", Help: "Total number of CF challenge pages seen"}, []string{"project", "hostname", "version"}, ), + componentQueueSizes: prometheus.NewGaugeVec( + prometheus.GaugeOpts{Name: config.Get().PrometheusPrefix + "component_queue_size", Help: "Size of component queues in the processing pipeline"}, + []string{"project", "hostname", "version", "component"}, + ), } } @@ -155,6 +162,7 @@ func registerPrometheusMetrics() { prometheus.MustRegister(globalPromStats.warcWritingQueueSize) prometheus.MustRegister(globalPromStats.meanWaitOnFeedbackTime) prometheus.MustRegister(globalPromStats.cfMitigated) + prometheus.MustRegister(globalPromStats.componentQueueSizes) // Register dedup WARC metrics prometheus.MustRegister(globalPromStats.dataTotalBytes) diff --git a/internal/pkg/stats/stats.go b/internal/pkg/stats/stats.go index f508eafd..cc14977e 100644 --- a/internal/pkg/stats/stats.go +++ b/internal/pkg/stats/stats.go @@ -127,6 +127,12 @@ func GetMapTUI() map[string]any { "WARC data total (GB)": float64(globalStats.WARCDataTotalBytes.Load()) / 1e9, } + // Add channel queue sizes + channelQueueSizes := GetChannelQueueSizes() + for name, size := range channelQueueSizes { + result["Queue: "+name] = size + } + // Only show CDX dedupe stats if activated and has data if config.Get().CDXDedupeServer != "" { if dedupeBytes := globalStats.WARCCDXDedupeTotalBytes.Load(); dedupeBytes > 0 {