
Track in-transit items in pipeline#584

Open
AltayAkkus wants to merge 2 commits into internetarchive:main from AltayAkkus:feat-itemflow

Conversation


@AltayAkkus AltayAkkus commented Mar 23, 2026

Refers to #471

@vbanos suggested tracking in-transit items per component to identify bottlenecks. We already expose similar `*Routines` metrics via Prometheus, but the current implementation requires separate increment/decrement methods per component and is somewhat clunky:

//////////////////////////
// PreprocessorRoutines //
//////////////////////////

// PreprocessorRoutinesIncr increments the PreprocessorRoutines counter by 1.
func PreprocessorRoutinesIncr() {
	globalStats.PreprocessorRoutines.incr(1)
	if globalPromStats != nil {
		globalPromStats.preprocessorRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc()
	}
}

// PreprocessorRoutinesDecr decrements the PreprocessorRoutines counter by 1.
func PreprocessorRoutinesDecr() {
	globalStats.PreprocessorRoutines.decr(1)
	if globalPromStats != nil {
		globalPromStats.preprocessorRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec()
	}
}

//////////////////////
// ArchiverRoutines //
//////////////////////

// ArchiverRoutinesIncr increments the ArchiverRoutines counter by 1.
func ArchiverRoutinesIncr() {
	globalStats.ArchiverRoutines.incr(1)
	if globalPromStats != nil {
		globalPromStats.archiverRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc()
	}
}

// ArchiverRoutinesDecr decrements the ArchiverRoutines counter by 1.
func ArchiverRoutinesDecr() {
	globalStats.ArchiverRoutines.decr(1)
	if globalPromStats != nil {
		globalPromStats.archiverRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec()
	}
}

///////////////////////////
// PostprocessorRoutines //
///////////////////////////

// PostprocessorRoutinesIncr increments the PostprocessorRoutines counter by 1.
func PostprocessorRoutinesIncr() {
	globalStats.PostprocessorRoutines.incr(1)
	if globalPromStats != nil {
		globalPromStats.postprocessorRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc()
	}
}

// PostprocessorRoutinesDecr decrements the PostprocessorRoutines counter by 1.
func PostprocessorRoutinesDecr() {
	globalStats.PostprocessorRoutines.decr(1)
	if globalPromStats != nil {
		globalPromStats.postprocessorRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec()
	}
}

//////////////////////
// FinisherRoutines //
//////////////////////

// FinisherRoutinesIncr increments the FinisherRoutines counter by 1.
func FinisherRoutinesIncr() {
	globalStats.FinisherRoutines.incr(1)
	if globalPromStats != nil {
		globalPromStats.finisherRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc()
	}
}

// FinisherRoutinesDecr decrements the FinisherRoutines counter by 1.
func FinisherRoutinesDecr() {
	globalStats.FinisherRoutines.decr(1)
	if globalPromStats != nil {
		globalPromStats.finisherRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec()
	}
}

GaugedCounter

In the first commit I refactored the existing *Routines to use GaugedCounter, a thin wrapper around counter and prometheus.GaugeVec with Add/Done semantics (similar to a sync.WaitGroup).
Adding counters with GaugedCounter involves far less friction than the previous approach.
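The GaugedCounter type itself isn't quoted in this thread; a minimal sketch of the Add/Done idea, with a local stand-in interface in place of the real *prometheus.GaugeVec, might look like:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// gauge is a stand-in for the subset of prometheus.Gauge this sketch needs
// (hypothetical; the real PR wires in a *prometheus.GaugeVec at runtime).
type gauge interface {
	Inc()
	Dec()
}

// GaugedCounter pairs an atomic counter with an optional Prometheus gauge,
// exposing WaitGroup-style Add/Done semantics.
type GaugedCounter struct {
	value     atomic.Int64
	promGauge gauge // may be nil until Prometheus is initialized
}

// Add increments the counter and mirrors the change to the gauge, if wired.
func (c *GaugedCounter) Add() {
	c.value.Add(1)
	if c.promGauge != nil {
		c.promGauge.Inc()
	}
}

// Done decrements the counter and mirrors the change to the gauge, if wired.
func (c *GaugedCounter) Done() {
	c.value.Add(-1)
	if c.promGauge != nil {
		c.promGauge.Dec()
	}
}

// Get returns the current value of the counter.
func (c *GaugedCounter) Get() int64 { return c.value.Load() }

func main() {
	var inTransit GaugedCounter
	inTransit.Add()
	inTransit.Add()
	inTransit.Done()
	fmt.Println(inTransit.Get()) // 1
}
```

The nil check mirrors the existing `if globalPromStats != nil` guard: the counter can exist and be used before the Prometheus gauge is wired in.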

In the second commit I added the stats

	PreprocessorInTransit  *GaugedCounter
	ArchiverInTransit      *GaugedCounter
	PostprocessorInTransit *GaugedCounter
	FinisherInTransit      *GaugedCounter

Each counter increments when an item enters a component and decrements when it leaves.

We can now monitor the queue pressure of each component in the pipeline 🥳

PostprocessorRoutines.promGauge = globalPromStats.postprocessorRoutines
FinisherRoutines.promGauge = globalPromStats.finisherRoutines

PreprocessorInTransit.promGauge = globalPromStats.preprocessorInTransit

We need to wire in the prometheus gauges at runtime. Hopefully this will not cause a race condition.

Comment thread: go.mod
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/klauspost/compress v1.18.4 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect

added due to prometheus/testutil in counter_test.go

@codecov-commenter

Codecov Report

❌ Patch coverage is 40.00000% with 48 lines in your changes missing coverage. Please review.
✅ Project coverage is 56.32%. Comparing base (6e932c2) to head (6978f43).

Files with missing lines Patch % Lines
internal/pkg/stats/stats.go 25.00% 24 Missing ⚠️
internal/pkg/stats/prometheus.go 0.00% 20 Missing ⚠️
internal/pkg/postprocessor/postprocessor.go 66.66% 2 Missing ⚠️
internal/pkg/archiver/worker.go 80.00% 1 Missing ⚠️
internal/pkg/preprocessor/preprocessor.go 80.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #584      +/-   ##
==========================================
- Coverage   56.42%   56.32%   -0.11%     
==========================================
  Files         133      133              
  Lines        6747     6777      +30     
==========================================
+ Hits         3807     3817      +10     
- Misses       2561     2587      +26     
+ Partials      379      373       -6     
Flag Coverage Δ
e2etests 41.68% <32.50%> (-0.13%) ⬇️
unittests 29.20% <22.50%> (+0.07%) ⬆️

Flags with carried forward coverage won't be shown.


@AltayAkkus

Additional refactoring of stats.go

The same pattern I refactored for *Routines exists for many other stats:
methods.go is full of methods that mutate an atomic int and sync it with its corresponding Prometheus gauge
(WarcWritingQueueSizeSet, MeanHTTPRespTimeAdd, MeanProcessBodyTimeAdd, MeanWaitOnFeedbackTimeAdd, CFMitigatedIncr, and many more).

I only refactored the Routine counters because their interfaces allow Incr and Decr; the other counters

  1. only allow increment
    // SeencheckFailuresIncr increments the SeencheckFailures counter by 1.
    func SeencheckFailuresIncr() {
    	globalStats.SeencheckFailures.Add(1)
    	if globalPromStats != nil {
    		globalPromStats.seencheckFailures.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc()
    	}
    }

    // CFMitigatedIncr increments the CFMitigated counter by 1.
    func CFMitigatedIncr() {
    	globalStats.cfMitigated.Add(1)
    	if globalPromStats != nil {
    		globalPromStats.cfMitigated.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc()
    	}
    }

    // AkamaiMitigatedIncr increments the AkamaiMitigated counter by 1.
    func AkamaiMitigatedIncr() {
    	globalStats.akamaiMitigated.Add(1)
    	if globalPromStats != nil {
    		globalPromStats.akamaiMitigated.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc()
    	}
    }
  2. only allow set
WarcWritingQueueSizeSet
WARCDataTotalBytesSet
WARCCDXDedupeTotalBytesSet
WARCDoppelgangerDedupeTotalBytesSet
WARCLocalDedupeTotalBytesSet
WARCCDXDedupeTotalSet
WARCDoppelgangerDedupeTotalSet
WARCLocalDedupeTotalSet
  3. only allow add

    // MeanHTTPRespTimeAdd adds the given value to the MeanHTTPRespTime.
    func MeanHTTPRespTimeAdd(value time.Duration) {
    	globalStats.MeanHTTPResponseTime.add(uint64(value.Milliseconds()))
    	if globalPromStats != nil {
    		globalPromStats.meanHTTPRespTime.WithLabelValues(config.Get().JobPrometheus, hostname, version).Observe(float64(value))
    	}
    }

    //////////////////////////
    // MeanProcessBodyTime //
    //////////////////////////

    // MeanProcessBodyTimeAdd adds the given value to the MeanProcessBodyTime.
    func MeanProcessBodyTimeAdd(value time.Duration) {
    	globalStats.MeanProcessBodyTime.add(uint64(value.Milliseconds()))
    	if globalPromStats != nil {
    		globalPromStats.meanProcessBodyTime.WithLabelValues(config.Get().JobPrometheus, hostname, version).Observe(float64(value))
    	}
    }

    ////////////////////////////
    // MeanWaitOnFeedbackTime //
    ////////////////////////////

    // MeanWaitOnFeedbackTimeAdd adds the given value to the MeanWaitOnFeedbackTime.
    func MeanWaitOnFeedbackTimeAdd(value time.Duration) {
    	globalStats.MeanWaitOnFeedbackTime.add(uint64(value.Milliseconds()))
    	if globalPromStats != nil {
    		globalPromStats.meanWaitOnFeedbackTime.WithLabelValues(config.Get().JobPrometheus, hostname, version).Observe(float64(value))
    	}
    }

Additionally, some of these methods use prometheus.HistogramVec or prometheus.CounterVec.

I think we could add additional interfaces to counter.go so that we can remove most of these Java-esque getter/setter methods and improve metric collection in Zeno. @yzqzss @NGTmeaty thoughts?
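As a rough sketch of what such an interface could cover (the type name and the gauge stand-in are illustrative assumptions, not code from this PR), one type could absorb both the increment-only and set-only cases:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// gauge is a stand-in for the subset of prometheus.Gauge this sketch needs
// (hypothetical; a real version would mirror to a *prometheus.GaugeVec).
type gauge interface {
	Inc()
	Set(float64)
}

// GaugedValue is a hypothetical sibling of GaugedCounter covering the
// increment-only methods (e.g. SeencheckFailuresIncr) and the set-only
// methods (e.g. WarcWritingQueueSizeSet) with one type.
type GaugedValue struct {
	value     atomic.Int64
	promGauge gauge // may be nil until Prometheus is initialized
}

// Incr increments the value by 1 and mirrors it to the gauge, if wired.
func (v *GaugedValue) Incr() {
	v.value.Add(1)
	if v.promGauge != nil {
		v.promGauge.Inc()
	}
}

// Set overwrites the value and mirrors it to the gauge, if wired.
func (v *GaugedValue) Set(n int64) {
	v.value.Store(n)
	if v.promGauge != nil {
		v.promGauge.Set(float64(n))
	}
}

// Get returns the current value.
func (v *GaugedValue) Get() int64 { return v.value.Load() }

func main() {
	var queueSize GaugedValue
	queueSize.Incr()
	queueSize.Set(42)
	fmt.Println(queueSize.Get()) // 42
}
```

The HistogramVec-backed methods would still need a separate wrapper, since Observe records samples rather than mirroring a single value.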

@AltayAkkus

AltayAkkus commented Mar 23, 2026

Identify components slowing down Zeno

Although we implemented the same endpoint as internetarchive/warcprox, we cannot really answer the question "which components are slowing down Zeno?"

Every 0.5s: curl -s http://localhost:9090/metrics | grep in_transit              me.local: Mon Mar 23 23:21:29 2026
# HELP zeno_archiver_in_transit Number of items currently being processed by the archiver
# TYPE zeno_archiver_in_transit gauge
zeno_archiver_in_transit{hostname="me.local",project="testjob3",version="unknown_version"} 16
# HELP zeno_finisher_in_transit Number of items currently being processed by the finisher
# TYPE zeno_finisher_in_transit gauge
zeno_finisher_in_transit{hostname="me.local",project="testjob3",version="unknown_version"} 0
# HELP zeno_postprocessor_in_transit Number of items currently being processed by the postprocessor
# TYPE zeno_postprocessor_in_transit gauge
zeno_postprocessor_in_transit{hostname="me.local",project="testjob3",version="unknown_version"} 0
# HELP zeno_preprocessor_in_transit Number of items currently being processed by the preprocessor
# TYPE zeno_preprocessor_in_transit gauge
zeno_preprocessor_in_transit{hostname="me.local",project="testjob3",version="unknown_version"} 0

At least on my machine and with my workloads (crawling https://france.fr), you have to be very lucky or sample very aggressively to catch any in-transit items outside the archiver, simply because everything else is so much faster.

Proposal: Add processing timestamps to model

type Item struct {
	id         string
	url        *URL
	seedVia    string
	status     ItemState
	source     ItemSource

	childrenMu sync.RWMutex
	children   []*Item
	parent     *Item
	err        error

    // new
	traceTime    bool                    // marks item for timestamping
	stageTimes   map[ItemState]time.Time // timestamps per stage
	stageTimesMu sync.RWMutex
}

// old SetStatus
// func (i *Item) SetStatus(status ItemState) { i.status = status }
func (i *Item) SetStatus(status ItemState) { 
	i.stageTimesMu.Lock()
	defer i.stageTimesMu.Unlock()

	// Always update state
	i.status = status

	// Only track timestamps if enabled
	if !i.traceTime {
		return
	}

	// Lazy init 
	if i.stageTimes == nil {
		i.stageTimes = make(map[ItemState]time.Time)
	}

	// Only set if not already set
	if _, exists := i.stageTimes[status]; !exists {
		i.stageTimes[status] = time.Now()
	}
}

To reduce load, we could control how many (and which) items are sampled (fractional, stochastic, etc.) by setting the traceTime bool when creating the item, and collect the timestamps in the finisher.

We could add a prometheus.HistogramVec for each stage, let the finisher calculate the time deltas between the stages, and .Observe these processing times, as we already do for

func MeanHTTPRespTimeAdd(value time.Duration) {
	globalStats.MeanHTTPResponseTime.add(uint64(value.Milliseconds()))
	if globalPromStats != nil {
		globalPromStats.meanHTTPRespTime.WithLabelValues(config.Get().JobPrometheus, hostname, version).Observe(float64(value))
	}
}

That would be quite the elegant solution for profiling Zeno's entire pipeline on actual workloads.
Just give me a thumbs up and I'll come back to it :)

@AltayAkkus AltayAkkus marked this pull request as ready for review March 24, 2026 17:43