
Conversation

@tac0turtle
Contributor

Overview

@claude
Contributor

claude bot commented Nov 4, 2025

Claude finished @tac0turtle's task


Comprehensive Code Review: P2P Refactoring

I've completed a thorough review of this PR, which removes the notifier pattern and simplifies the P2P synchronization architecture. This is a significant refactoring, with +246/-1351 lines across 14 files.


Executive Summary

Overall Assessment: Strong refactoring that significantly simplifies the codebase while maintaining functionality. The removal of 663 lines from the notifier package and the consolidation of logic are a clear improvement. However, several areas need attention around error handling, concurrency safety, and resource management.

Key Changes:

  • Removed pkg/sync/notifier package (663 lines)
  • Simplified P2P handler to use direct polling instead of event-driven notifications
  • Consolidated DA fetching logic with improved backoff handling
  • Reduced mock complexity significantly

1. Code Quality & Architecture ⚠️

Strengths

Significant simplification: Removal of the notifier pattern reduces cognitive overhead
Clear separation of concerns: daRetriever and p2pHandler interfaces cleanly separate responsibilities (syncer.go:27-34)
Idiomatic Go: Good use of atomic operations, context cancellation, and goroutine management
Improved testability: Simplified mocking with focused interfaces

Issues

Critical: Resource Leak in P2P Worker Loop
Location: syncer.go:311-368

func (s *Syncer) p2pWorkerLoop() {
    // ...
    for {
        // ...
        waitCtx, cancel := context.WithCancel(s.ctx)
        s.setP2PWaitState(targetHeight, cancel)
        
        err = s.p2pHandler.ProcessHeight(waitCtx, targetHeight, s.heightInCh)
        s.clearP2PWaitState(targetHeight)
        cancel()  // ⚠️ Good: cancel is called
        
        // BUT: if ProcessHeight returns an early error, cancel may not be set properly

Issue: While cancel() is called after ProcessHeight, the context creation and cancellation pattern could leak if a panic occurs between lines 339-344. Consider using defer cancel() immediately after context.WithCancel().

Recommendation:

waitCtx, cancel := context.WithCancel(s.ctx)
defer cancel()  // Ensure cleanup even on panic
s.setP2PWaitState(targetHeight, cancel)

Medium: Potential Race in processedHeight
Location: p2p_handler.go:49-63

func (h *P2PHandler) ProcessHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error {
    h.mu.Lock()
    shouldProcess := height > h.processedHeight
    h.mu.Unlock()  // Lock released here
    
    if !shouldProcess {
        return nil
    }
    
    // ... fetch header and data (can take time) ...
    
    h.mu.Lock()
    if height > h.processedHeight {
        h.processedHeight = height  // ⚠️ Check-then-act race
    }
    h.mu.Unlock()

Issue: Between the first check and the final update, another goroutine could process the same height. While not catastrophic (idempotent operation), it wastes resources.

Recommendation: Consider using atomic operations, holding the lock across the whole operation, or accepting the occasional duplicate work as tolerable given the polling nature.
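
A minimal sketch of the atomic variant, assuming processedHeight were converted to an atomic.Uint64 (a hypothetical change, not the current field type). The compare-and-swap loop claims the height up front so two goroutines cannot both proceed with it:

for {
    prev := h.processedHeight.Load()
    if height <= prev {
        return nil // already processed or claimed by another goroutine
    }
    if h.processedHeight.CompareAndSwap(prev, height) {
        break // this goroutine owns the height; proceed to fetch
    }
}

Note the trade-off: claiming before fetching means a failed fetch either has to roll the value back or rely on later polling to recover, whereas holding the mutex across the whole operation avoids that at the cost of serializing callers.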


Medium: Magic Number for Channel Buffer Size
Location: syncer.go:111

heightInCh:  make(chan common.DAHeightEvent, 10_000),

Issue: 10_000 is a large buffer with no explanation. This could hide backpressure issues.

Recommendation: Either document why this size was chosen or make it configurable. Consider whether queueing this deeply is the right design, or whether backpressure should instead slow down DA polling.


Low: Unused sendNonBlockingSignal Method
Location: syncer.go:689-696

func (s *Syncer) sendNonBlockingSignal(ch chan struct{}, name string) {
    // ... implementation
}

Issue: This method appears to be leftover from the old notifier pattern and may not be used anymore. Tests exist but no production usage found.

Recommendation: Remove if truly unused, or document its purpose if it's for future use.


2. Error Handling & Resilience ⚠️

Strengths

Improved DA backoff logic: Better handling of ErrHeightFromFuture and ErrBlobNotFound (syncer.go:386-439)
Retry logic for executor: Solid implementation with configurable retries (syncer.go:631-658)
Critical error propagation: Good use of errorCh for halting conditions

Issues

Medium: Silent Error Handling in initializeState
Location: syncer.go:129-133

if currentHeight, err := s.store.Height(s.ctx); err != nil {
    s.logger.Error().Err(err).Msg("failed to set initial processed height for p2p handler")
} else {
    s.p2pHandler.SetProcessedHeight(currentHeight)
}

Issue: Failure to get the initial height is logged but not returned, which could leave the P2P handler with an incorrect initial state (defaulting to 0).

Recommendation: Return this error from Start() or set a safe default explicitly.
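
A minimal sketch of the first option, assuming the enclosing function is changed to return an error that Start() propagates:

currentHeight, err := s.store.Height(s.ctx)
if err != nil {
    return fmt.Errorf("get initial processed height for p2p handler: %w", err)
}
s.p2pHandler.SetProcessedHeight(currentHeight)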


Medium: Swallowed Context Errors in P2P Loop
Location: syncer.go:346-359

if err != nil {
    if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
        continue  // ⚠️ Silently continues on context cancellation
    }
    
    if waitCtx.Err() == nil {
        logger.Warn().Err(err).Uint64("height", targetHeight).Msg("P2P handler failed to process height")
    }

Issue: Context cancellation is expected during shutdown, but it can also happen during normal operation if DA completes first. The logic appears correct, but a comment would help clarify the intent.

Recommendation: Add comment explaining when context cancellation is expected.


Low: Inconsistent Error Wrapping
Location: Throughout syncer.go

Some errors use fmt.Errorf("... : %w", err) (good), others use errors.Join() (also good), but mixing both patterns without clear rationale reduces consistency.

Recommendation: Document when to use Join vs %w wrapping. Generally: %w for single error chains, Join for multiple concurrent errors.
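
Illustrative only; the identifiers below are placeholders rather than code from this PR:

// single causal chain: wrap with %w so errors.Is / errors.As still match
if err := store.Save(ctx, block); err != nil {
    return fmt.Errorf("save block %d: %w", height, err)
}

// multiple independent failures that should all surface: errors.Join
var errs []error
for _, c := range closers {
    if err := c.Close(); err != nil {
        errs = append(errs, err)
    }
}
return errors.Join(errs...)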


3. Security 🔒

Strengths

Signature verification: Maintained via SetCustomVerifierForSyncNode (syncer.go:666)
Proposer validation: P2P handler validates proposer address (p2p_handler.go:120-125)
No credential leaks: Reviewed logs and error messages

Issues

Low: Potential for Invalid Block Acceptance via Race
Location: syncer.go:458-462

if height <= currentHeight || s.cache.IsHeaderSeen(headerHash) {
    s.logger.Debug().Uint64("height", height).Msg("height already processed")
    return
}

Issue: If two goroutines process the same height concurrently, both could pass this check before either commits. However, the batch commit would likely fail for one, so this is protected by the store layer.

Recommendation: Document that duplicate prevention relies on store-level uniqueness constraints.


4. Performance & Resource Efficiency ⚙️

Strengths

Reduced goroutine overhead: Simpler worker model with just 3 background goroutines
Efficient DA polling: Uses backoff to avoid hammering DA layer
Atomic operations: Good use of atomic.Pointer and atomic.Uint64 for lock-free reads

Issues

High: Busy-Wait in DA Worker
Location: syncer.go:278-285

for {
    s.tryFetchFromDA(nextDARequestAt)
    select {
    case <-s.ctx.Done():
        return
    case <-time.After(pollInterval):  // ⚠️ Always 10ms
    }
}

Issue: pollInterval is hardcoded to 10ms (fastDAPollInterval). This creates a busy-wait loop that will execute 100 times per second even when backoff is active. The backoff is only checked inside tryFetchFromDA, but the loop still wakes up every 10ms.

Impact: Unnecessary CPU usage and potential battery drain on mobile nodes.

Recommendation: Make the sleep duration dynamic based on nextDARequestAt:

sleepDuration := fastDAPollInterval
if !nextDARequestAt.IsZero() {
    if delay := time.Until(*nextDARequestAt); delay > 0 {
        sleepDuration = delay
    }
}
select {
case <-s.ctx.Done():
    return
case <-time.After(sleepDuration):
}

Medium: P2P Worker Inefficient Retry
Location: syncer.go:332-336

if err != nil {
    logger.Error().Err(err).Msg("failed to get current height for P2P worker")
    if !s.sleepOrDone(50 * time.Millisecond) {
        return
    }
    continue
}

Issue: Fixed 50ms retry on store errors. If the store is consistently unavailable, this creates a tight retry loop.

Recommendation: Add exponential backoff for persistent store failures.
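
A sketch of what that could look like, extracted into a hypothetical helper (the helper name and 5s cap are assumptions):

// heightWithBackoff retries store.Height with exponential backoff instead of
// a fixed 50ms sleep; it returns false when the syncer is shutting down.
func (s *Syncer) heightWithBackoff() (uint64, bool) {
    delay := 50 * time.Millisecond
    for {
        height, err := s.store.Height(s.ctx)
        if err == nil {
            return height, true
        }
        s.logger.Error().Err(err).Msg("failed to get current height for P2P worker")
        if !s.sleepOrDone(delay) {
            return 0, false
        }
        if delay *= 2; delay > 5*time.Second {
            delay = 5 * time.Second // cap the backoff
        }
    }
}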


Low: Allocation in GetLastState
Location: syncer.go:161-172

func (s *Syncer) GetLastState() types.State {
    state := s.lastState.Load()
    if state == nil {
        return types.State{}
    }
    
    stateCopy := *state
    stateCopy.AppHash = bytes.Clone(state.AppHash)          // Allocation
    stateCopy.LastHeaderHash = bytes.Clone(state.LastHeaderHash)  // Allocation
    
    return stateCopy
}

Issue: Every call to GetLastState() allocates new byte slices. If called frequently in hot paths, this creates GC pressure.

Recommendation: Profile usage. If called frequently, consider returning a pointer with a contract that callers must not mutate it, or add a GetLastStateRef() for performance-critical code.
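
A possible shape for that accessor (hypothetical; callers must treat the result as read-only):

// GetLastStateRef returns the current state without copying; the caller must
// not mutate the returned value or its byte slices.
func (s *Syncer) GetLastStateRef() *types.State {
    return s.lastState.Load()
}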


5. Concurrency & Thread Safety 🔀

Strengths

Proper use of atomics: Good patterns for lastState and daHeight
Context cancellation: Well-structured shutdown with cancelP2PWait()
Wait group management: Clean goroutine lifecycle management

Issues

Medium: p2pWaitState Mutex Granularity
Location: syncer.go:774-799

func (s *Syncer) setP2PWaitState(height uint64, cancel context.CancelFunc) {
    s.p2pWaitMu.Lock()
    s.p2pWaitState = p2pWaitState{
        height: height,
        cancel: cancel,
    }
    s.p2pWaitMu.Unlock()
}

func (s *Syncer) cancelP2PWait(height uint64) {
    s.p2pWaitMu.Lock()
    defer s.p2pWaitMu.Unlock()
    
    if s.p2pWaitState.cancel != nil && (height == 0 || s.p2pWaitState.height == height) {
        s.p2pWaitState.cancel()  // ⚠️ Calling cancel while holding lock
        s.p2pWaitState = p2pWaitState{}
    }
}

Issue: Calling cancel() while holding the mutex could cause issues if cancel triggers callbacks that try to acquire the same lock. While unlikely here, it's a pattern to avoid.

Recommendation: Extract cancel function first, release lock, then call it:

func (s *Syncer) cancelP2PWait(height uint64) {
    s.p2pWaitMu.Lock()
    var toCancel context.CancelFunc
    if s.p2pWaitState.cancel != nil && (height == 0 || s.p2pWaitState.height == height) {
        toCancel = s.p2pWaitState.cancel
        s.p2pWaitState = p2pWaitState{}
    }
    s.p2pWaitMu.Unlock()
    
    if toCancel != nil {
        toCancel()
    }
}

Low: processPendingEvents Could Loop Forever
Location: syncer.go:709-735

for {
    event := s.cache.GetNextPendingEvent(nextHeight)
    if event == nil {
        return
    }
    // ... process event ...
    nextHeight++
}

Issue: If the cache holds a very long chain of pending events, this loop could block for an extended time without checking for context cancellation.

Recommendation: Add periodic context checks:

for {
    select {
    case <-s.ctx.Done():
        return
    default:
    }
    event := s.cache.GetNextPendingEvent(nextHeight)
    // ...
}

6. Testing & Reliability 🧪

Strengths

Good test coverage: Tests for validation, sequential sync, pending events
Mock simplification: Cleaner mocks with focused interfaces
P2P handler tests: Comprehensive coverage of edge cases (p2p_handler_test.go)

Issues

Medium: Missing Test for DA Backoff Logic
The new DA backoff implementation (syncer.go:386-439) has complex logic for different error types but lacks a dedicated test verifying all branches.

Recommendation: Add TestSyncer_tryFetchFromDA_BackoffScenarios covering the following cases (a sketch follows the list):

  • ErrBlobNotFound (should increment height)
  • ErrHeightFromFuture (should backoff 6s)
  • Other errors (should backoff by DA block time)
  • Successful fetch (should reset backoff)
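
A possible table-driven skeleton (the error sentinels and daBlockTime below are local stand-ins rather than the project's real identifiers, and the mock wiring is left as comments):

func TestSyncer_tryFetchFromDA_BackoffScenarios(t *testing.T) {
    // local stand-ins for the project's ErrBlobNotFound / ErrHeightFromFuture sentinels
    var (
        errBlobNotFound     = errors.New("blob not found")
        errHeightFromFuture = errors.New("height from future")
    )
    const daBlockTime = 6 * time.Second // placeholder for the configured DA block time

    cases := []struct {
        name        string
        daErr       error
        wantBackoff time.Duration // expected delay before the next DA request
    }{
        {"blob not found increments height", errBlobNotFound, 0},
        {"height from future backs off", errHeightFromFuture, 6 * time.Second},
        {"other errors back off by DA block time", errors.New("boom"), daBlockTime},
        {"successful fetch resets backoff", nil, 0},
    }
    for _, tc := range cases {
        t.Run(tc.name, func(t *testing.T) {
            // arrange: mock daRetriever to return tc.daErr for the target height
            // act:     call tryFetchFromDA with a zero nextDARequestAt
            // assert:  nextDARequestAt now reflects tc.wantBackoff
        })
    }
}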

Low: Reduced Coverage
Codecov reports 71.84% patch coverage with 29 lines missing coverage, particularly in syncer.go. While removing code improves the overall ratio, ensure that critical paths are tested.

Recommendation: Review uncovered lines in syncer.go and add targeted tests.


7. Documentation & Code Clarity 📝

Strengths

Clear function documentation: Most exported functions have good comments
Informative logging: Good use of structured logging with context

Issues

Medium: Missing Architecture Documentation
The removal of the notifier pattern is a significant architectural change, but there's no documentation explaining:

  • Why the notifier was removed
  • How the new polling approach works
  • Performance trade-offs

Recommendation: Add or update architecture documentation (ADR?) explaining this design decision.


Low: Unexplained Constants
Location: syncer.go:260-263

const (
    fastDAPollInterval  = 10 * time.Millisecond
    futureHeightBackoff = 6 * time.Second // current celestia block time
)

Issue: fastDAPollInterval lacks explanation. Why 10ms? How was this chosen?

Recommendation: Add comment explaining the rationale.
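
For example (the rationale stated in these comments is an assumption for the authors to confirm):

const (
    // fastDAPollInterval bounds how quickly the DA worker re-checks whether
    // its backoff window has elapsed; small enough to keep sync latency low
    // once new DA blobs are available.
    fastDAPollInterval = 10 * time.Millisecond

    // futureHeightBackoff matches the current Celestia block time, so a
    // "height from future" error waits roughly one DA block before retrying.
    futureHeightBackoff = 6 * time.Second
)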


8. Specific File Reviews

syncer.go

Lines of Concern:

  • Line 111: Large channel buffer (10,000) - needs justification
  • Line 278-285: Busy-wait loop - performance issue
  • Line 339-344: Context leak potential - use defer
  • Line 689-696: Dead code? (sendNonBlockingSignal)

p2p_handler.go

Lines of Concern:

  • Line 60-63: Check-then-act race (minor)
  • Line 109-113: Updates processedHeight after successful processing (good)

sync_service.go

Looks Good: No significant issues found. Clean separation of header/data sync services.


Recommendations Summary

Must Fix (Before Merge)

  1. Add defer cancel() in p2pWorkerLoop to prevent context leaks (syncer.go:339)
  2. Fix DA worker busy-wait - make sleep duration dynamic based on backoff (syncer.go:278-285)

Should Fix (High Priority)

  1. Handle store.Height() error in initializeState properly (syncer.go:129-133)
  2. Refactor cancelP2PWait to call cancel outside mutex (syncer.go:791-799)
  3. Add context check in processPendingEvents loop (syncer.go:709-735)

Consider Fixing (Medium Priority)

  1. Document channel buffer size rationale (syncer.go:111)
  2. Add exponential backoff for P2P worker store errors (syncer.go:332-336)
  3. Add test coverage for DA backoff scenarios
  4. Remove dead code if sendNonBlockingSignal is unused

Nice to Have (Low Priority)

  1. Add architecture documentation explaining notifier removal
  2. Add comments for timing constants
  3. Profile and optimize GetLastState() if it's a hot path

Positive Highlights 🌟

  1. Excellent simplification: Removing 1351 lines while maintaining functionality is impressive
  2. Clean interface design: The daRetriever and p2pHandler interfaces are well-designed
  3. Improved testability: The new architecture is easier to test and mock
  4. Better error classification: DA errors are now properly categorized (syncer.go:399-422)
  5. Solid concurrency patterns: Good use of atomics, context cancellation, and wait groups

Conclusion

This is a strong refactoring that significantly improves code maintainability. The removal of the notifier pattern is well-executed. However, there are several concurrency and performance issues that should be addressed before merging, particularly:

  • The context leak potential in p2pWorkerLoop
  • The busy-wait in DA worker consuming unnecessary CPU
  • Several mutex usage patterns that could be improved

With the recommended fixes, this PR will be production-ready and a clear improvement over the previous architecture.

Recommendation: Approve with requested changes ⚠️


@github-actions
Contributor

github-actions bot commented Nov 4, 2025

The latest Buf updates on your PR. Results from workflow CI and Release / buf-check (pull_request).

| Build | Format | Lint | Breaking | Updated (UTC) |
| --- | --- | --- | --- | --- |
| ✅ passed | ⏩ skipped | ✅ passed | ✅ passed | Nov 4, 2025, 10:51 AM |

@codecov

codecov bot commented Nov 4, 2025

Codecov Report

❌ Patch coverage is 82.25806% with 22 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.47%. Comparing base (b3f0a90) to head (2c965ea).

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| block/internal/syncing/syncer.go | 82.29% | 14 Missing and 3 partials ⚠️ |
| block/internal/syncing/p2p_handler.go | 85.71% | 3 Missing ⚠️ |
| pkg/sync/sync_service.go | 50.00% | 2 Missing ⚠️ |
Additional details and impacted files
@@                  Coverage Diff                   @@
##           marko/p2p_refactor    #2812      +/-   ##
======================================================
+ Coverage               64.33%   64.47%   +0.14%     
======================================================
  Files                      81       80       -1     
  Lines                    7250     7153      -97     
======================================================
- Hits                     4664     4612      -52     
+ Misses                   2044     2005      -39     
+ Partials                  542      536       -6     
| Flag | Coverage Δ |
| --- | --- |
| combined | 64.47% <82.25%> (+0.14%) ⬆️ |

Flags with carried forward coverage won't be shown.


@julienrbrt
Member

julienrbrt commented Nov 4, 2025

I generally prefer this over the notifier and current main. The simplification makes sense to me.

Contributor

@alpe alpe left a comment


Very nice work. Much cleaner code and easier to follow.
With the changes, I believe we can reduce the heightInCh size to save some memory.
The P2P loop can only run one message ahead of the consumer now.

}

if waitCtx.Err() == nil {
logger.Warn().Err(err).Uint64("height", targetHeight).Msg("P2P handler failed to process height")

I would assume that we run into this quite often when the height is not available yet. IMHO it is worth ignoring the not-found error.
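
For instance (ErrNotFound here stands in for whatever not-found sentinel the P2P retrieval actually returns; not verified against the codebase):

if err != nil && waitCtx.Err() == nil && !errors.Is(err, ErrNotFound) {
    logger.Warn().Err(err).Uint64("height", targetHeight).Msg("P2P handler failed to process height")
}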

s.p2pWaitMu.Lock()
defer s.p2pWaitMu.Unlock()

if s.p2pWaitState.cancel != nil && (height == 0 || s.p2pWaitState.height == height) {

IMHO it would be safe to cancel when p2pWaitState.height <= height, just in case P2P is falling behind.


func (s *Syncer) clearP2PWaitState(height uint64) {
s.p2pWaitMu.Lock()
if s.p2pWaitState.height == height {

Just wondering, is the height important? There should not be concurrent calls.


err = s.p2pHandler.ProcessHeight(waitCtx, targetHeight, s.heightInCh)
s.clearP2PWaitState(targetHeight)
cancel()

Personal preference: cancelP2PWait could be used to clear the state.

}

// Cancel any P2P wait that might still be blocked on this height
s.cancelP2PWait(height)

Would it also make sense to cancel in the error case? We have the block in the cache now. 🤔 But the p2p handler height was not updated, so it would return the block again.
