
chore: fix incorrect logging in updateHead #4191


Open
wants to merge 3 commits into base: main
Changes from 1 commit
Update state.go
santamasa authored Apr 2, 2025
commit ec0b4980f5ca212454e87d00bc48dd8d5deb12e5
das/state.go: 64 changes (32 additions, 32 deletions)
@@ -8,34 +8,34 @@ import (
"github.com/celestiaorg/celestia-node/header"
)

-// coordinatorState represents the current state of sampling process.
+// coordinatorState represents the current state of sampling process
type coordinatorState struct {
-// sampleFrom is the height from which the DASer will start sampling.
+// sampleFrom is the height from which the DASer will start sampling
sampleFrom uint64
// samplingRange is the maximum amount of headers processed in one job.
samplingRange uint64

-// keeps track of running workers.
+// keeps track of running workers
inProgress map[int]func() workerState

-// retryStrategy implements retry backoff.
+// retryStrategy implements retry backoff
retryStrategy retryStrategy
-// stores heights of failed headers with amount of retry attempt as value.
+// stores heights of failed headers with amount of retry attempt as value
failed map[uint64]retryAttempt
// inRetry stores (height -> attempt count) of failed headers that are currently being retried by
-// workers.
+// workers
inRetry map[uint64]retryAttempt

-// nextJobID is a unique identifier that will be used for creation of next job.
+// nextJobID is a unique identifier that will be used for creation of next job
nextJobID int
-// all headers before next were sent to workers.
+// all headers before next were sent to workers
next uint64
-// networkHead is the height of the latest known network head.
+// networkHead is the height of the latest known network head
networkHead uint64

-// catchUpDone indicates if all headers are sampled.
+// catchUpDone indicates if all headers are sampled
catchUpDone atomic.Bool
-// catchUpDoneCh blocks until all headers are sampled.
+// catchUpDoneCh blocks until all headers are sampled
catchUpDoneCh chan struct{}
}

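A note on the inProgress field above: it stores a stats getter per running worker rather than the worker itself, and unsafeStats further down in this file polls those closures. The following is a minimal, self-contained sketch of that pattern; workerState and its fields are assumptions made only for this illustration and are not taken from the diff.

package main

import "fmt"

// workerState is a stand-in defined only for this sketch; the real type lives in the das package.
type workerState struct {
	curr, from, to uint64
}

func main() {
	// A job ID maps to a closure that snapshots the worker's progress,
	// mirroring the field shape map[int]func() workerState shown above.
	inProgress := map[int]func() workerState{}

	w := workerState{curr: 42, from: 40, to: 49}
	inProgress[1] = func() workerState { return w }

	// The coordinator can poll every running worker without holding a reference to it.
	for id, getStats := range inProgress {
		fmt.Printf("worker %d: %+v\n", id, getStats())
	}
}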
@@ -47,7 +47,7 @@ type retryAttempt struct {
after time.Time
}

-// newCoordinatorState initiates state for samplingCoordinator.
+// newCoordinatorState initiates state for samplingCoordinator
func newCoordinatorState(params Parameters) coordinatorState {
return coordinatorState{
sampleFrom: params.SampleFrom,
@@ -72,7 +72,7 @@ func (s *coordinatorState) resumeFromCheckpoint(c checkpoint) {
s.networkHead = c.NetworkHead

for h, count := range c.Failed {
-// resumed retries should start without backoff delay.
+// resumed retries should start without backoff delay
s.failed[h] = retryAttempt{
count: count,
after: time.Now(),
@@ -94,7 +94,7 @@ func (s *coordinatorState) handleResult(res result) {
}

func (s *coordinatorState) handleRecentOrCatchupResult(res result) {
-// check if the worker retried any of the previously failed heights.
+// check if the worker retried any of the previously failed heights
for h := range s.failed {
if h < res.from || h > res.to {
continue
@@ -105,7 +105,7 @@ func (s *coordinatorState) handleRecentOrCatchupResult(res result) {
}
}

-// update failed heights.
+// update failed heights
for h := range res.failed {
nextRetry, _ := s.retryStrategy.nextRetry(retryAttempt{}, time.Now())
s.failed[h] = nextRetry
@@ -114,10 +114,10 @@ func (s *coordinatorState) handleRecentOrCatchupResult(res result) {

func (s *coordinatorState) handleRetryResult(res result) {
// move heights that has failed again to failed with keeping retry count, they will be picked up by
-// retry workers later.
+// retry workers later
for h := range res.failed {
lastRetry := s.inRetry[h]
-// height will be retried after backoff.
+// height will be retried after backoff
nextRetry, retryExceeded := s.retryStrategy.nextRetry(lastRetry, time.Now())
if retryExceeded {
log.Warnw(
@@ -129,14 +129,14 @@ func (s *coordinatorState) handleRetryResult(res result) {
s.failed[h] = nextRetry
}

-// processed heights are either already moved to failed map or succeeded, cleanup inRetry.
+// processed heights are either already moved to failed map or succeeded, cleanup inRetry
for h := res.from; h <= res.to; h++ {
delete(s.inRetry, h)
}
}

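The retry flow above relies on retryStrategy.nextRetry(lastRetry, time.Now()) returning the next retryAttempt (a count plus an after timestamp) and whether the retry budget is exceeded. The strategy itself is not part of this diff; below is a minimal sketch of what a backoff-based implementation could look like. The delay values, the attempt cap, and the exact field types are assumptions.

package main

import (
	"fmt"
	"time"
)

// retryAttempt mirrors the fields visible in this diff (count and after); exact types are assumed.
type retryAttempt struct {
	count int
	after time.Time
}

// backoffStrategy is a sketch, not celestia-node's retryStrategy: it picks a delay
// per attempt and reports when a configured attempt cap is exceeded.
type backoffStrategy struct {
	delays      []time.Duration // delay for attempt 1, 2, ...; the last entry repeats
	maxAttempts int             // 0 means retry forever
}

func (b backoffStrategy) nextRetry(last retryAttempt, now time.Time) (retryAttempt, bool) {
	attempt := last.count + 1
	exceeded := b.maxAttempts > 0 && attempt > b.maxAttempts
	idx := attempt - 1
	if idx >= len(b.delays) {
		idx = len(b.delays) - 1 // clamp to the longest configured delay
	}
	return retryAttempt{count: attempt, after: now.Add(b.delays[idx])}, exceeded
}

func main() {
	b := backoffStrategy{delays: []time.Duration{time.Minute, 5 * time.Minute, 30 * time.Minute}, maxAttempts: 4}
	first, exceeded := b.nextRetry(retryAttempt{}, time.Now())
	fmt.Println(first.count, exceeded) // 1 false; first.after is one minute from now
}

The canRetry check used by retryJob later in this file presumably just compares after against the current time, which would also explain why resumeFromCheckpoint seeds resumed retries with after: time.Now().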
func (s *coordinatorState) isNewHead(newHead uint64) bool {
-// seen this header before.
+// seen this header before
if newHead <= s.networkHead {
log.Warnf(
"received head height: %v, which is lower or the same as previously known: %v",
@@ -162,7 +162,7 @@ func (s *coordinatorState) updateHead(newHead uint64) {

// recentJob creates a job to process a recent header.
func (s *coordinatorState) recentJob(header *header.ExtendedHeader) job {
-// move next, to prevent catchup job from processing same height.
+// move next, to prevent catchup job from processing same height
if s.next == header.Height() {
s.next++
}
@@ -176,18 +176,18 @@ func (s *coordinatorState) recentJob(header *header.ExtendedHeader) job {
}
}

-// nextJob will return next catchup or retry job according to priority (retry -> catchup).
+// nextJob will return next catchup or retry job according to priority (retry -> catchup)
func (s *coordinatorState) nextJob() (next job, found bool) {
-// check for if any retry jobs are available.
+// check for if any retry jobs are available
if job, found := s.retryJob(); found {
return job, found
}

-// if no retry jobs, make a catchup job.
+// if no retry jobs, make a catchup job
return s.catchupJob()
}

-// catchupJob creates a catchup job if catchup is not finished.
+// catchupJob creates a catchup job if catchup is not finished
func (s *coordinatorState) catchupJob() (next job, found bool) {
if s.next > s.networkHead {
return job{}, false
@@ -199,15 +199,15 @@ func (s *coordinatorState) catchupJob() (next job, found bool) {
return j, true
}

-// retryJob creates a job to retry previously failed header.
+// retryJob creates a job to retry previously failed header
func (s *coordinatorState) retryJob() (next job, found bool) {
for h, attempt := range s.failed {
if !attempt.canRetry() {
-// height will be retried later.
+// height will be retried later
continue
}

-// move header from failed into retry.
+// move header from failed into retry
delete(s.failed, h)
s.inRetry[h] = attempt
j := s.newJob(retryJob, h, h)
@@ -231,13 +231,13 @@ func (s *coordinatorState) newJob(jobType jobType, from, to uint64) job {
}
}

-// unsafeStats collects coordinator stats without thread-safety.
+// unsafeStats collects coordinator stats without thread-safety
func (s *coordinatorState) unsafeStats() SamplingStats {
workers := make([]WorkerStats, 0, len(s.inProgress))
lowestFailedOrInProgress := s.next
failed := make(map[uint64]int)

-// gather worker stats.
+// gather worker stats
for _, getStats := range s.inProgress {
wstats := getStats()
var errMsg string
@@ -264,7 +264,7 @@ func (s *coordinatorState) unsafeStats() SamplingStats {
}
}

-// set lowestFailedOrInProgress to minimum failed - 1.
+// set lowestFailedOrInProgress to minimum failed - 1
for h, retry := range s.failed {
failed[h] += retry.count
if h < lowestFailedOrInProgress {
@@ -297,13 +297,13 @@ func (s *coordinatorState) checkDone() {
}

if s.catchUpDone.Load() {
-// overwrite channel before storing done flag.
+// overwrite channel before storing done flag
s.catchUpDoneCh = make(chan struct{})
s.catchUpDone.Store(false)
}
}

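checkDone above re-creates catchUpDoneCh before clearing the flag, and waitCatchUp below uses the flag as a fast path before blocking. The following is a toy sketch of how this done-flag plus channel pairing can be wired; only the fast path appears in this diff, so the markDone side and the blocking select are assumptions.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// catchUpSignal is a toy stand-in for the catchUpDone/catchUpDoneCh pair described above.
type catchUpSignal struct {
	done   atomic.Bool
	doneCh chan struct{}
}

// markDone sketches the "all headers sampled" side: close the channel so current
// waiters unblock, then record the flag for fast-path checks. This side is not
// shown in the diff; the ordering here is only one reasonable choice.
func (c *catchUpSignal) markDone() {
	close(c.doneCh)
	c.done.Store(true)
}

// wait mirrors the shape of waitCatchUp: fast path on the flag, otherwise block on
// the channel or the context.
func (c *catchUpSignal) wait(ctx context.Context) error {
	if c.done.Load() {
		return nil
	}
	select {
	case <-c.doneCh:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	c := &catchUpSignal{doneCh: make(chan struct{})}
	go func() {
		time.Sleep(10 * time.Millisecond)
		c.markDone()
	}()
	fmt.Println(c.wait(context.Background())) // <nil>
}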
-// waitCatchUp waits for sampling process to indicate catchup is done.
+// waitCatchUp waits for sampling process to indicate catchup is done
func (s *coordinatorState) waitCatchUp(ctx context.Context) error {
if s.catchUpDone.Load() {
return nil