Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions internal/pkg/controler/channels.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
package controler

import "github.com/internetarchive/Zeno/pkg/models"
import (
"sync"

"github.com/internetarchive/Zeno/pkg/models"
)

// NamedChannel pairs a stage channel with a human-readable name so that the
// channel's current queue length can be reported for monitoring.
type NamedChannel struct {
// Name is the key under which GetChannelQueueSizes reports this channel.
Name string
// Channel is the stage channel whose buffered length is sampled.
Channel chan *models.Item
}

var (
// stageChannels is the slice that closeStageChannels ranges over, closing
// each channel in turn.
// NOTE(review): the declaration below appears twice — this looks like the
// before/after pair of a rendered diff rather than two real declarations;
// confirm against the actual file.
stageChannels []chan *models.Item
stageChannels []chan *models.Item
// namedChannels is the registry appended to by makeNamedStageChannel and
// read by GetChannelQueueSizes.
namedChannels []NamedChannel
// namedChannelsMutex guards namedChannels: writers take Lock, readers RLock.
namedChannelsMutex sync.RWMutex
)

func makeStageChannel(bufferSize ...int) chan *models.Item {
Expand All @@ -22,6 +34,32 @@ func makeStageChannel(bufferSize ...int) chan *models.Item {
return ch
}

// makeNamedStageChannel builds a stage channel through makeStageChannel and
// registers it under name in namedChannels, so that GetChannelQueueSizes can
// later report how many items it holds. The optional bufferSize is forwarded
// unchanged to makeStageChannel. The newly created channel is returned.
func makeNamedStageChannel(name string, bufferSize ...int) chan *models.Item {
	stage := makeStageChannel(bufferSize...)
	entry := NamedChannel{Name: name, Channel: stage}

	// Registration mutates the shared registry, so it needs the write lock.
	namedChannelsMutex.Lock()
	defer namedChannelsMutex.Unlock()

	namedChannels = append(namedChannels, entry)
	return stage
}

// GetChannelQueueSizes returns a snapshot of the number of items currently
// buffered in every channel registered through makeNamedStageChannel, keyed
// by channel name.
//
// A fresh map is allocated on each call, so callers may modify the result
// freely. If several channels were registered under the same name, the one
// registered last determines the reported value.
func GetChannelQueueSizes() map[string]int {
	namedChannelsMutex.RLock()
	defer namedChannelsMutex.RUnlock()

	// The number of entries is known up front; sizing the map avoids
	// incremental growth on a function that is polled periodically.
	queueSizes := make(map[string]int, len(namedChannels))
	for _, namedCh := range namedChannels {
		queueSizes[namedCh.Name] = len(namedCh.Channel)
	}
	return queueSizes
}

func closeStageChannels() {
for _, ch := range stageChannels {
close(ch)
Expand Down
73 changes: 73 additions & 0 deletions internal/pkg/controler/channels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package controler

import (
"testing"

"github.com/internetarchive/Zeno/pkg/models"
)

// TestMakeStageChannel checks that plain and named stage channels are created
// with the requested capacity and that a named channel shows up, empty, in
// GetChannelQueueSizes.
func TestMakeStageChannel(t *testing.T) {
	// Start from empty registries so channels created elsewhere do not count.
	stageChannels, namedChannels = nil, nil

	// A plain stage channel honours the requested buffer size.
	if plain := makeStageChannel(10); cap(plain) != 10 {
		t.Errorf("Expected channel capacity 10, got %d", cap(plain))
	}

	// So does a named one.
	if named := makeNamedStageChannel("test_channel", 5); cap(named) != 5 {
		t.Errorf("Expected named channel capacity 5, got %d", cap(named))
	}

	// Only the named channel is expected in the monitoring snapshot.
	snapshot := GetChannelQueueSizes()
	if count := len(snapshot); count != 1 {
		t.Errorf("Expected 1 named channel, got %d", count)
	}

	queued, found := snapshot["test_channel"]
	switch {
	case !found:
		t.Error("Expected 'test_channel' to exist in queue sizes")
	case queued != 0:
		t.Errorf("Expected queue size 0, got %d", queued)
	}
}

// TestGetChannelQueueSizes registers two named channels, queues one item on
// the first, and verifies that the reported sizes match.
func TestGetChannelQueueSizes(t *testing.T) {
	// Start from empty registries so channels created elsewhere do not count.
	stageChannels, namedChannels = nil, nil

	first := makeNamedStageChannel("channel1", 10)
	makeNamedStageChannel("channel2", 5)

	// Non-blocking send: the buffer has room, so falling into default means
	// the channel was not created as expected.
	select {
	case first <- &models.Item{}:
	default:
		t.Fatal("Failed to add item to ch1")
	}

	snapshot := GetChannelQueueSizes()

	if count := len(snapshot); count != 2 {
		t.Errorf("Expected 2 channels, got %d", count)
	}

	if queued := snapshot["channel1"]; queued != 1 {
		t.Errorf("Expected channel1 size 1, got %d", queued)
	}

	if queued := snapshot["channel2"]; queued != 0 {
		t.Errorf("Expected channel2 size 0, got %d", queued)
	}
}
19 changes: 11 additions & 8 deletions internal/pkg/controler/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func startPipeline() error {
return err
}

// Set the channel queue size getter for stats monitoring
stats.SetChannelQueueSizeGetter(GetChannelQueueSizes)

// Start the disk watcher
go watchers.WatchDiskSpace(config.Get().JobPath, 5*time.Second)

Expand All @@ -78,7 +81,7 @@ func startPipeline() error {
}

// Start the reactor that will receive
reactorOutputChan := makeStageChannel(config.Get().WorkersCount)
reactorOutputChan := makeNamedStageChannel("reactor_to_preprocessor", config.Get().WorkersCount)
err = reactor.Start(config.Get().WorkersCount, reactorOutputChan)
if err != nil {
logger.Error("error starting reactor", "err", err.Error())
Expand All @@ -94,36 +97,36 @@ func startPipeline() error {
}
}

preprocessorOutputChan := makeStageChannel(config.Get().WorkersCount)
preprocessorOutputChan := makeNamedStageChannel("preprocessor_to_archiver", config.Get().WorkersCount)
err = preprocessor.Start(reactorOutputChan, preprocessorOutputChan)
if err != nil {
logger.Error("error starting preprocessor", "err", err.Error())
return err
}

archiverOutputChan := makeStageChannel(config.Get().WorkersCount)
archiverOutputChan := makeNamedStageChannel("archiver_to_postprocessor", config.Get().WorkersCount)
err = archiver.Start(preprocessorOutputChan, archiverOutputChan)
if err != nil {
logger.Error("error starting archiver", "err", err.Error())
return err
}

// Used by optional 2nd HQ instance just to gather outlinks to a different project
hqOutlinksFinishChan := makeStageChannel(config.Get().WorkersCount)
hqOutlinksProduceChan := makeStageChannel(config.Get().WorkersCount)
hqOutlinksFinishChan := makeNamedStageChannel("hq_outlinks_finish", config.Get().WorkersCount)
hqOutlinksProduceChan := makeNamedStageChannel("hq_outlinks_produce", config.Get().WorkersCount)

// Start the WARC writing queue watcher
watchers.StartWatchWARCWritingQueue(1*time.Second, 2*time.Second, 250*time.Millisecond)

postprocessorOutputChan := makeStageChannel(config.Get().WorkersCount)
postprocessorOutputChan := makeNamedStageChannel("postprocessor_to_finisher", config.Get().WorkersCount)
err = postprocessor.Start(archiverOutputChan, postprocessorOutputChan, hqOutlinksProduceChan)
if err != nil {
logger.Error("error starting postprocessor", "err", err.Error())
return err
}

finisherFinishChan := makeStageChannel(config.Get().WorkersCount)
finisherProduceChan := makeStageChannel(config.Get().WorkersCount)
finisherFinishChan := makeNamedStageChannel("finisher_to_source", config.Get().WorkersCount)
finisherProduceChan := makeNamedStageChannel("source_to_finisher", config.Get().WorkersCount)

if config.Get().UseHQ {
hqSource := hq.New(config.Get().HQKey, config.Get().HQSecret, config.Get().HQProject, config.Get().HQAddress)
Expand Down
2 changes: 2 additions & 0 deletions internal/pkg/controler/watchers/warc.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func StartWatchWARCWritingQueue(pauseCheckInterval time.Duration, pauseTimeout t
case <-statsTicker.C:
s := archiver.GetStats()
stats.WarcWritingQueueSizeSet(s.WARCWritingQueueSize)
// Update component queue sizes for Prometheus
stats.ComponentQueueSizesUpdate()
// Update dedup WARC metrics
stats.WARCDataTotalBytesSet(s.WARCTotalBytesArchived)
stats.WARCCDXDedupeTotalBytesSet(s.CDXDedupeTotalBytes)
Expand Down
19 changes: 19 additions & 0 deletions internal/pkg/stats/channels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package stats

// ChannelQueueSizeGetter is the signature of a provider that reports queue
// sizes keyed by channel name.
type ChannelQueueSizeGetter func() map[string]int

// channelQueueSizeGetter is the provider installed through
// SetChannelQueueSizeGetter; it stays nil until one is set.
var channelQueueSizeGetter ChannelQueueSizeGetter

// SetChannelQueueSizeGetter installs getter as the provider consulted by
// GetChannelQueueSizes. Passing nil removes the current provider.
func SetChannelQueueSizeGetter(getter ChannelQueueSizeGetter) {
	channelQueueSizeGetter = getter
}

// GetChannelQueueSizes returns whatever the installed provider reports, or a
// new empty map when no provider has been installed.
func GetChannelQueueSizes() map[string]int {
	provider := channelQueueSizeGetter
	if provider == nil {
		return map[string]int{}
	}
	return provider()
}
40 changes: 40 additions & 0 deletions internal/pkg/stats/channels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package stats

import (
"testing"
)

// TestChannelQueueSizeGetter verifies that GetChannelQueueSizes yields an
// empty map while no getter is installed and forwards the getter's result
// once one has been set.
func TestChannelQueueSizeGetter(t *testing.T) {
	// Make sure no getter is installed when the test starts.
	channelQueueSizeGetter = nil

	// Without a getter the result must be empty.
	if got := GetChannelQueueSizes(); len(got) != 0 {
		t.Errorf("Expected empty map when no getter set, got %d items", len(got))
	}

	// Install a getter that returns a fixed fixture.
	fixture := map[string]int{
		"test_channel_1": 5,
		"test_channel_2": 10,
	}
	SetChannelQueueSizeGetter(func() map[string]int { return fixture })

	// The fixture must now be reported as-is.
	got := GetChannelQueueSizes()
	if count := len(got); count != 2 {
		t.Errorf("Expected 2 channels, got %d", count)
	}

	if v := got["test_channel_1"]; v != 5 {
		t.Errorf("Expected test_channel_1 size 5, got %d", v)
	}

	if v := got["test_channel_2"]; v != 10 {
		t.Errorf("Expected test_channel_2 size 10, got %d", v)
	}
}
14 changes: 14 additions & 0 deletions internal/pkg/stats/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,17 @@ func CFMitigatedIncr() {
globalPromStats.cfMitigated.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc()
}
}

//////////////////////////
// ComponentQueueSizes //
//////////////////////////

// ComponentQueueSizesUpdate copies every queue size reported by
// GetChannelQueueSizes into the componentQueueSizes gauge, one series per
// component name. It does nothing when globalPromStats is nil.
func ComponentQueueSizesUpdate() {
	if globalPromStats == nil {
		return
	}

	for component, size := range GetChannelQueueSizes() {
		gauge := globalPromStats.componentQueueSizes.WithLabelValues(config.Get().JobPrometheus, hostname, version, component)
		gauge.Set(float64(size))
	}
}
8 changes: 8 additions & 0 deletions internal/pkg/stats/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type prometheusStats struct {
warcWritingQueueSize *prometheus.GaugeVec
cfMitigated *prometheus.GaugeVec

// Component queue sizes
componentQueueSizes *prometheus.GaugeVec

// Dedup WARC metrics
dataTotalBytes *prometheus.GaugeVec
cdxDedupeTotalBytes *prometheus.GaugeVec
Expand Down Expand Up @@ -135,6 +138,10 @@ func newPrometheusStats() *prometheusStats {
prometheus.GaugeOpts{Name: config.Get().PrometheusPrefix + "cf_challenge_pages_seen", Help: "Total number of CF challenge pages seen"},
[]string{"project", "hostname", "version"},
),
componentQueueSizes: prometheus.NewGaugeVec(
prometheus.GaugeOpts{Name: config.Get().PrometheusPrefix + "component_queue_size", Help: "Size of component queues in the processing pipeline"},
[]string{"project", "hostname", "version", "component"},
),
}
}

Expand All @@ -155,6 +162,7 @@ func registerPrometheusMetrics() {
prometheus.MustRegister(globalPromStats.warcWritingQueueSize)
prometheus.MustRegister(globalPromStats.meanWaitOnFeedbackTime)
prometheus.MustRegister(globalPromStats.cfMitigated)
prometheus.MustRegister(globalPromStats.componentQueueSizes)

// Register dedup WARC metrics
prometheus.MustRegister(globalPromStats.dataTotalBytes)
Expand Down
6 changes: 6 additions & 0 deletions internal/pkg/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ func GetMapTUI() map[string]any {
"WARC data total (GB)": float64(globalStats.WARCDataTotalBytes.Load()) / 1e9,
}

// Add channel queue sizes
channelQueueSizes := GetChannelQueueSizes()
for name, size := range channelQueueSizes {
result["Queue: "+name] = size
}

// Only show CDX dedupe stats if activated and has data
if config.Get().CDXDedupeServer != "" {
if dedupeBytes := globalStats.WARCCDXDedupeTotalBytes.Load(); dedupeBytes > 0 {
Expand Down
Loading