Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/klauspost/compress v1.18.4 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added due to prometheus/testutil in counter_test.go

github.com/lucasb-eyer/go-colorful v1.3.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/archiver/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ func (a *archiver) worker(workerID string) {
controlChans := pause.Subscribe()
defer pause.Unsubscribe(controlChans)

stats.ArchiverRoutinesIncr()
defer stats.ArchiverRoutinesDecr()
stats.ArchiverRoutines.Add(1)
defer stats.ArchiverRoutines.Done()

for {
select {
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/postprocessor/postprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func (p *postprocessor) worker(workerID string) {
"worker_id": workerID,
})

stats.PostprocessorRoutinesIncr()
defer stats.PostprocessorRoutinesDecr()
stats.PostprocessorRoutines.Add(1)
defer stats.PostprocessorRoutines.Done()

// Subscribe to the pause controler
controlChans := pause.Subscribe()
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/preprocessor/preprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ func (p *preprocessor) worker(workerID string) {
controlChans := pause.Subscribe()
defer pause.Unsubscribe(controlChans)

stats.PreprocessorRoutinesIncr()
defer stats.PreprocessorRoutinesDecr()
stats.PreprocessorRoutines.Add(1)
defer stats.PreprocessorRoutines.Done()

for {
select {
Expand Down
34 changes: 33 additions & 1 deletion internal/pkg/stats/counter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package stats

import "sync/atomic"
import (
"sync/atomic"

"github.com/prometheus/client_golang/prometheus"

"github.com/internetarchive/Zeno/internal/pkg/config"
)

type counter struct {
count uint64
Expand All @@ -21,3 +27,29 @@ func (c *counter) get() uint64 {
func (c *counter) reset() {
atomic.StoreUint64(&c.count, 0)
}

// A GaugedCounter is a atomic counter + Prometheus gauge
// The promGauge needs to be wired in the Init, although it is optional
// Add(i) and Done() mutate the counter and Prometheus gauge in one call
type GaugedCounter struct {
counter
promGauge *prometheus.GaugeVec
}

func (gc *GaugedCounter) Add(n uint64) {
gc.incr(n)
if gc.promGauge != nil {
gc.promGauge.WithLabelValues(config.Get().JobPrometheus, hostname, version).Add(float64(n))
}
}

func (gc *GaugedCounter) Done() {
gc.decr(1)
if gc.promGauge != nil {
gc.promGauge.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec()
}
}

func (gc *GaugedCounter) Value() uint64 {
return gc.get()
}
94 changes: 94 additions & 0 deletions internal/pkg/stats/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package stats
import (
"sync/atomic"
"testing"

"github.com/internetarchive/Zeno/internal/pkg/config"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestCounter_Incr(t *testing.T) {
Expand Down Expand Up @@ -74,3 +78,93 @@ func TestCounter_Reset(t *testing.T) {
t.Errorf("expected count to be 0 after reset, got %d", atomic.LoadUint64(&c.count))
}
}

func TestGaugedCounter_AddDoneValue(t *testing.T) {
gc := &GaugedCounter{}

gc.Add(3)
if gc.Value() != 3 {
t.Errorf("expected 3, got %d", gc.Value())
}

gc.Done()
if gc.Value() != 2 {
t.Errorf("expected 2, got %d", gc.Value())
}

gc.Add(5)
gc.Done()
gc.Done()
if gc.Value() != 5 {
t.Errorf("expected 5, got %d", gc.Value())
}
}

func TestGaugedCounter_NilPrometheus(t *testing.T) {
gc := &GaugedCounter{}

gc.Add(1)
defer gc.Done()

if gc.Value() != 1 {
t.Errorf("expected 1, got %d", gc.Value())
}
}

func TestGaugedCounter_WithPrometheus(t *testing.T) {
config.Set(&config.Config{JobPrometheus: "testjob"})
hostname = "testhost"
version = "v0.0.0-test"

gauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{Name: "test_gauged_counter", Help: "test"},
[]string{"project", "hostname", "version"},
)

gc := &GaugedCounter{promGauge: gauge}

gc.Add(3)
if gc.Value() != 3 {
t.Errorf("atomic: expected 3, got %d", gc.Value())
}
promVal := testutil.ToFloat64(gauge.WithLabelValues("testjob", "testhost", "v0.0.0-test"))
if promVal != 3 {
t.Errorf("prometheus: expected 3, got %f", promVal)
}

gc.Done()
gc.Done()
gc.Done()
if gc.Value() != 0 {
t.Errorf("atomic: expected 0, got %d", gc.Value())
}
promVal = testutil.ToFloat64(gauge.WithLabelValues("testjob", "testhost", "v0.0.0-test"))
if promVal != 0 {
t.Errorf("prometheus: expected 0, got %f", promVal)
}
}

func TestGaugedCounter_PrometheusBatchAdd(t *testing.T) {
config.Set(&config.Config{JobPrometheus: "testjob"})
hostname = "testhost"
version = "v0.0.0-test"

gauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{Name: "test_gauged_counter_batch", Help: "test"},
[]string{"project", "hostname", "version"},
)

gc := &GaugedCounter{promGauge: gauge}

gc.Add(10)
gc.Done()
gc.Add(5)

if gc.Value() != 14 {
t.Errorf("atomic: expected 14, got %d", gc.Value())
}
promVal := testutil.ToFloat64(gauge.WithLabelValues("testjob", "testhost", "v0.0.0-test"))
if promVal != 14 {
t.Errorf("prometheus: expected 14, got %f", promVal)
}
}
80 changes: 0 additions & 80 deletions internal/pkg/stats/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,86 +31,6 @@ func SeedsFinishedIncr() {
}
}

//////////////////////////
// PreprocessorRoutines //
//////////////////////////

// PreprocessorRoutinesIncr increments the PreprocessorRoutines counter by 1.
func PreprocessorRoutinesIncr() {
globalStats.PreprocessorRoutines.incr(1)
if globalPromStats != nil {
globalPromStats.preprocessorRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc()
}
}

// PreprocessorRoutinesDecr decrements the PreprocessorRoutines counter by 1.
func PreprocessorRoutinesDecr() {
globalStats.PreprocessorRoutines.decr(1)
if globalPromStats != nil {
globalPromStats.preprocessorRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec()
}
}

//////////////////////////
// ArchiverRoutines //
//////////////////////////

// ArchiverRoutinesIncr increments the ArchiverRoutines counter by 1.
func ArchiverRoutinesIncr() {
globalStats.ArchiverRoutines.incr(1)
if globalPromStats != nil {
globalPromStats.archiverRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc()
}
}

// ArchiverRoutinesDecr decrements the ArchiverRoutines counter by 1.
func ArchiverRoutinesDecr() {
globalStats.ArchiverRoutines.decr(1)
if globalPromStats != nil {
globalPromStats.archiverRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec()
}
}

//////////////////////////
// PostprocessorRoutines //
//////////////////////////

// PostprocessorRoutinesIncr increments the PostprocessorRoutines counter by 1.
func PostprocessorRoutinesIncr() {
globalStats.PostprocessorRoutines.incr(1)
if globalPromStats != nil {
globalPromStats.postprocessorRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc()
}
}

// PostprocessorRoutinesDecr decrements the PostprocessorRoutines counter by 1.
func PostprocessorRoutinesDecr() {
globalStats.PostprocessorRoutines.decr(1)
if globalPromStats != nil {
globalPromStats.postprocessorRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec()
}
}

//////////////////////////
// FinisherRoutines //
//////////////////////////

// FinisherRoutinesIncr increments the FinisherRoutines counter by 1.
func FinisherRoutinesIncr() {
globalStats.FinisherRoutines.incr(1)
if globalPromStats != nil {
globalPromStats.finisherRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc()
}
}

// FinisherRoutinesDecr decrements the FinisherRoutines counter by 1.
func FinisherRoutinesDecr() {
globalStats.FinisherRoutines.decr(1)
if globalPromStats != nil {
globalPromStats.finisherRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec()
}
}

//////////////////////////
// Paused //
//////////////////////////
Expand Down
40 changes: 24 additions & 16 deletions internal/pkg/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ import (
type stats struct {
URLsCrawled *rate
SeedsFinished *rate
PreprocessorRoutines *counter
ArchiverRoutines *counter
PostprocessorRoutines *counter
FinisherRoutines *counter
Paused atomic.Bool
HTTPReturnCodes *rateBucket
SeencheckFailures atomic.Int64
Expand All @@ -42,6 +38,11 @@ var (
doOnce sync.Once
hostname string
version string

PreprocessorRoutines *GaugedCounter
ArchiverRoutines *GaugedCounter
PostprocessorRoutines *GaugedCounter
FinisherRoutines *GaugedCounter
)

func Init() error {
Expand All @@ -52,16 +53,17 @@ func Init() error {
globalStats = &stats{
URLsCrawled: &rate{},
SeedsFinished: &rate{},
PreprocessorRoutines: &counter{},
ArchiverRoutines: &counter{},
PostprocessorRoutines: &counter{},
FinisherRoutines: &counter{},
HTTPReturnCodes: newRateBucket(),
MeanHTTPResponseTime: &mean{},
MeanProcessBodyTime: &mean{},
MeanWaitOnFeedbackTime: &mean{},
}

PreprocessorRoutines = &GaugedCounter{}
ArchiverRoutines = &GaugedCounter{}
PostprocessorRoutines = &GaugedCounter{}
FinisherRoutines = &GaugedCounter{}

if config.Get() != nil && config.Get().Prometheus {
globalPromStats = newPrometheusStats()

Expand All @@ -75,6 +77,12 @@ func Init() error {
versionStruct := utils.GetVersion()
version = versionStruct.Version

// Wire prometheus gauges into the counters
PreprocessorRoutines.promGauge = globalPromStats.preprocessorRoutines
ArchiverRoutines.promGauge = globalPromStats.archiverRoutines
PostprocessorRoutines.promGauge = globalPromStats.postprocessorRoutines
FinisherRoutines.promGauge = globalPromStats.finisherRoutines

registerPrometheusMetrics()
}

Expand All @@ -95,10 +103,10 @@ func Init() error {
func Reset() {
globalStats.URLsCrawled.reset()
globalStats.SeedsFinished.reset()
globalStats.PreprocessorRoutines.reset()
globalStats.ArchiverRoutines.reset()
globalStats.PostprocessorRoutines.reset()
globalStats.FinisherRoutines.reset()
PreprocessorRoutines.reset()
ArchiverRoutines.reset()
PostprocessorRoutines.reset()
FinisherRoutines.reset()
globalStats.HTTPReturnCodes.resetAll()
globalStats.MeanHTTPResponseTime.reset()
globalStats.MeanProcessBodyTime.reset()
Expand All @@ -112,10 +120,10 @@ func GetMapTUI() map[string]any {
"URL/s": globalStats.URLsCrawled.get(),
"Total URL crawled": globalStats.URLsCrawled.getTotal(),
"Finished seeds": globalStats.SeedsFinished.getTotal(),
"Preprocessor routines": globalStats.PreprocessorRoutines.get(),
"Archiver routines": globalStats.ArchiverRoutines.get(),
"Postprocessor routines": globalStats.PostprocessorRoutines.get(),
"Finisher routines": globalStats.FinisherRoutines.get(),
"Preprocessor routines": PreprocessorRoutines.Value(),
"Archiver routines": ArchiverRoutines.Value(),
"Postprocessor routines": PostprocessorRoutines.Value(),
"Finisher routines": FinisherRoutines.Value(),
"Is paused?": globalStats.Paused.Load(),
"HTTP 2xx/s": bucketSum(globalStats.HTTPReturnCodes.getFiltered("2*")),
"HTTP 3xx/s": bucketSum(globalStats.HTTPReturnCodes.getFiltered("3*")),
Expand Down