Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/klauspost/compress v1.18.4 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added due to prometheus/testutil in counter_test.go

github.com/lucasb-eyer/go-colorful v1.3.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand Down
7 changes: 5 additions & 2 deletions internal/pkg/archiver/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ func (a *archiver) worker(workerID string) {
controlChans := pause.Subscribe()
defer pause.Unsubscribe(controlChans)

stats.ArchiverRoutinesIncr()
defer stats.ArchiverRoutinesDecr()
stats.ArchiverRoutines.Add(1)
defer stats.ArchiverRoutines.Done()

for {
select {
Expand All @@ -178,6 +178,7 @@ func (a *archiver) worker(workerID string) {
logger.Debug("received resume event")
case seed, ok := <-a.inputCh:
if ok {
stats.ArchiverInTransit.Add(1)
logger.Debug("received seed", "seed", seed.GetShortID(), "depth", seed.GetDepth(), "hops", seed.GetURL().GetHops())

if err := seed.CheckConsistency(); err != nil {
Expand All @@ -192,10 +193,12 @@ func (a *archiver) worker(workerID string) {

select {
case <-a.ctx.Done():
stats.ArchiverInTransit.Done()
logger.Debug("aborting seed due to stop", "seed", seed.GetShortID(), "depth", seed.GetDepth(), "hops", seed.GetURL().GetHops())
return
case a.outputCh <- seed:
}
stats.ArchiverInTransit.Done()
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions internal/pkg/finisher/finisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,11 @@ func (f *finisher) worker(workerID string) {
logger.Debug("received resume event")
case seed, ok := <-f.inputCh:
if ok {
stats.FinisherInTransit.Add(1)
if err := f.handleSeed(seed, workerID, logger); err != nil {
panic(err)
}
stats.FinisherInTransit.Done()
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions internal/pkg/postprocessor/postprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func (p *postprocessor) worker(workerID string) {
"worker_id": workerID,
})

stats.PostprocessorRoutinesIncr()
defer stats.PostprocessorRoutinesDecr()
stats.PostprocessorRoutines.Add(1)
defer stats.PostprocessorRoutines.Done()

// Subscribe to the pause controler
controlChans := pause.Subscribe()
Expand All @@ -90,6 +90,7 @@ func (p *postprocessor) worker(workerID string) {
logger.Debug("received resume event")
case seed, ok := <-p.inputCh:
if ok {
stats.PostprocessorInTransit.Add(1)
logger.Debug("received seed", "seed", seed.GetShortID())

if err := seed.CheckConsistency(); err != nil {
Expand All @@ -103,6 +104,7 @@ func (p *postprocessor) worker(workerID string) {
for i := range outlinks {
select {
case <-p.ctx.Done():
stats.PostprocessorInTransit.Done()
logger.Debug("aborting outlink feeding due to stop", "seed", outlinks[i].GetShortID())
return
case p.outputCh <- outlinks[i]:
Expand All @@ -115,10 +117,12 @@ func (p *postprocessor) worker(workerID string) {

select {
case <-p.ctx.Done():
stats.PostprocessorInTransit.Done()
logger.Debug("aborting seed due to stop", "seed", seed.GetShortID())
return
case p.outputCh <- seed:
}
stats.PostprocessorInTransit.Done()
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions internal/pkg/preprocessor/preprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ func (p *preprocessor) worker(workerID string) {
controlChans := pause.Subscribe()
defer pause.Unsubscribe(controlChans)

stats.PreprocessorRoutinesIncr()
defer stats.PreprocessorRoutinesDecr()
stats.PreprocessorRoutines.Add(1)
defer stats.PreprocessorRoutines.Done()

for {
select {
Expand All @@ -128,6 +128,7 @@ func (p *preprocessor) worker(workerID string) {
logger.Debug("received resume event")
case seed, ok := <-p.inputCh:
if ok {
stats.PreprocessorInTransit.Add(1)
logger.Debug("received seed", "seed", seed.GetShortID())

if err := seed.CheckConsistency(); err != nil {
Expand All @@ -144,10 +145,12 @@ func (p *preprocessor) worker(workerID string) {

select {
case <-p.ctx.Done():
stats.PreprocessorInTransit.Done()
logger.Debug("aborting seed due to stop", "seed", seed.GetShortID())
return
case p.outputCh <- seed:
}
stats.PreprocessorInTransit.Done()
}
}
}
Expand Down
34 changes: 33 additions & 1 deletion internal/pkg/stats/counter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package stats

import "sync/atomic"
import (
"sync/atomic"

"github.com/prometheus/client_golang/prometheus"

"github.com/internetarchive/Zeno/internal/pkg/config"
)

type counter struct {
count uint64
Expand All @@ -21,3 +27,29 @@ func (c *counter) get() uint64 {
func (c *counter) reset() {
atomic.StoreUint64(&c.count, 0)
}

// A GaugedCounter is a atomic counter + Prometheus gauge
// The promGauge needs to be wired in the Init, although it is optional
// Add(i) and Done() mutate the counter and Prometheus gauge in one call
type GaugedCounter struct {
counter
promGauge *prometheus.GaugeVec
}

func (gc *GaugedCounter) Add(n uint64) {
gc.incr(n)
if gc.promGauge != nil {
gc.promGauge.WithLabelValues(config.Get().JobPrometheus, hostname, version).Add(float64(n))
}
}

func (gc *GaugedCounter) Done() {
gc.decr(1)
if gc.promGauge != nil {
gc.promGauge.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec()
}
}

func (gc *GaugedCounter) Value() uint64 {
return gc.get()
}
94 changes: 94 additions & 0 deletions internal/pkg/stats/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package stats
import (
"sync/atomic"
"testing"

"github.com/internetarchive/Zeno/internal/pkg/config"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestCounter_Incr(t *testing.T) {
Expand Down Expand Up @@ -74,3 +78,93 @@ func TestCounter_Reset(t *testing.T) {
t.Errorf("expected count to be 0 after reset, got %d", atomic.LoadUint64(&c.count))
}
}

func TestGaugedCounter_AddDoneValue(t *testing.T) {
gc := &GaugedCounter{}

gc.Add(3)
if gc.Value() != 3 {
t.Errorf("expected 3, got %d", gc.Value())
}

gc.Done()
if gc.Value() != 2 {
t.Errorf("expected 2, got %d", gc.Value())
}

gc.Add(5)
gc.Done()
gc.Done()
if gc.Value() != 5 {
t.Errorf("expected 5, got %d", gc.Value())
}
}

func TestGaugedCounter_NilPrometheus(t *testing.T) {
gc := &GaugedCounter{}

gc.Add(1)
defer gc.Done()

if gc.Value() != 1 {
t.Errorf("expected 1, got %d", gc.Value())
}
}

func TestGaugedCounter_WithPrometheus(t *testing.T) {
config.Set(&config.Config{JobPrometheus: "testjob"})
hostname = "testhost"
version = "v0.0.0-test"

gauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{Name: "test_gauged_counter", Help: "test"},
[]string{"project", "hostname", "version"},
)

gc := &GaugedCounter{promGauge: gauge}

gc.Add(3)
if gc.Value() != 3 {
t.Errorf("atomic: expected 3, got %d", gc.Value())
}
promVal := testutil.ToFloat64(gauge.WithLabelValues("testjob", "testhost", "v0.0.0-test"))
if promVal != 3 {
t.Errorf("prometheus: expected 3, got %f", promVal)
}

gc.Done()
gc.Done()
gc.Done()
if gc.Value() != 0 {
t.Errorf("atomic: expected 0, got %d", gc.Value())
}
promVal = testutil.ToFloat64(gauge.WithLabelValues("testjob", "testhost", "v0.0.0-test"))
if promVal != 0 {
t.Errorf("prometheus: expected 0, got %f", promVal)
}
}

func TestGaugedCounter_PrometheusBatchAdd(t *testing.T) {
config.Set(&config.Config{JobPrometheus: "testjob"})
hostname = "testhost"
version = "v0.0.0-test"

gauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{Name: "test_gauged_counter_batch", Help: "test"},
[]string{"project", "hostname", "version"},
)

gc := &GaugedCounter{promGauge: gauge}

gc.Add(10)
gc.Done()
gc.Add(5)

if gc.Value() != 14 {
t.Errorf("atomic: expected 14, got %d", gc.Value())
}
promVal := testutil.ToFloat64(gauge.WithLabelValues("testjob", "testhost", "v0.0.0-test"))
if promVal != 14 {
t.Errorf("prometheus: expected 14, got %f", promVal)
}
}
80 changes: 0 additions & 80 deletions internal/pkg/stats/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,86 +31,6 @@ func SeedsFinishedIncr() {
}
}

//////////////////////////
// PreprocessorRoutines //
//////////////////////////

// PreprocessorRoutinesIncr increments the PreprocessorRoutines counter by 1.
func PreprocessorRoutinesIncr() {
globalStats.PreprocessorRoutines.incr(1)
if globalPromStats != nil {
globalPromStats.preprocessorRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc()
}
}

// PreprocessorRoutinesDecr decrements the PreprocessorRoutines counter by 1.
func PreprocessorRoutinesDecr() {
globalStats.PreprocessorRoutines.decr(1)
if globalPromStats != nil {
globalPromStats.preprocessorRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec()
}
}

//////////////////////////
// ArchiverRoutines //
//////////////////////////

// ArchiverRoutinesIncr increments the ArchiverRoutines counter by 1.
func ArchiverRoutinesIncr() {
globalStats.ArchiverRoutines.incr(1)
if globalPromStats != nil {
globalPromStats.archiverRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc()
}
}

// ArchiverRoutinesDecr decrements the ArchiverRoutines counter by 1.
func ArchiverRoutinesDecr() {
globalStats.ArchiverRoutines.decr(1)
if globalPromStats != nil {
globalPromStats.archiverRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec()
}
}

//////////////////////////
// PostprocessorRoutines //
//////////////////////////

// PostprocessorRoutinesIncr increments the PostprocessorRoutines counter by 1.
func PostprocessorRoutinesIncr() {
globalStats.PostprocessorRoutines.incr(1)
if globalPromStats != nil {
globalPromStats.postprocessorRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc()
}
}

// PostprocessorRoutinesDecr decrements the PostprocessorRoutines counter by 1.
func PostprocessorRoutinesDecr() {
globalStats.PostprocessorRoutines.decr(1)
if globalPromStats != nil {
globalPromStats.postprocessorRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec()
}
}

//////////////////////////
// FinisherRoutines //
//////////////////////////

// FinisherRoutinesIncr increments the FinisherRoutines counter by 1.
func FinisherRoutinesIncr() {
globalStats.FinisherRoutines.incr(1)
if globalPromStats != nil {
globalPromStats.finisherRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc()
}
}

// FinisherRoutinesDecr decrements the FinisherRoutines counter by 1.
func FinisherRoutinesDecr() {
globalStats.FinisherRoutines.decr(1)
if globalPromStats != nil {
globalPromStats.finisherRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec()
}
}

//////////////////////////
// Paused //
//////////////////////////
Expand Down
Loading
Loading