
Conversation

@renaynay (Collaborator) commented Nov 2, 2025

Potential alternative to #2798

  • on start, blocks on catch-up jobs for DA --> sync.Store.Height() and the p2p stores --> sync.Store.Height(), in case either one somehow fell behind the sync.Store
  • after catch-up, 2 tip-tracking routines are launched:
    • daRetrievalLoop --> fires a job to fetch from DA every Celestia block time (6s)
    • waitForNewP2PHeights --> fires a job every ev-node block time (100ms) to process headers between headerStore.Height() --> headerSyncHead.Height(), and, if daStore.Height() is somehow behind daSyncHead.Height(), processes data in that range too
  • inside ProcessHeaderRange and ProcessDataRange, pass the syncer's context instead of artificially lowering the timeout on ctx, since the go-header store implicitly subscribes to the desired height and returns it when it's ready. This means that when the ev-node sync process expects a new height to be available, it will block and wait until it is returned (once it lands in the header store, it has already gone through all forms of verification, etc.). A rough sketch of this flow follows below.
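
Roughly, the start flow looks like this (sketch only; startSync and the atomic daHeight field are illustrative names and assumptions, not the exact code):

// Phase 1 blocks on catch-up, phase 2 launches the tip-tracking routines.
func (s *Syncer) startSync() error {
    target, err := s.store.Height(s.ctx)
    if err != nil {
        return err
    }

    // Phase 1: catch the DA view and the p2p stores up to sync.Store.Height().
    s.syncDARange(s.daHeight.Load(), target)
    s.p2pHandler.ProcessHeaderRange(s.ctx, s.headerStore.Store().Height(), target+1, s.heightInCh)

    // Phase 2: tip tracking.
    go s.daRetrievalLoop()      // fires every Celestia block time (~6s)
    go s.waitForNewP2PHeights() // fires every ev-node block time (~100ms)
    return nil
}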

@renaynay (Collaborator, Author) commented Nov 2, 2025

tests are failing

@github-actions bot (Contributor) commented Nov 2, 2025

The latest Buf updates on your PR. Results from workflow CI and Release / buf-check (pull_request).

Build: ✅ passed · Format: ⏩ skipped · Lint: ✅ passed · Breaking: ✅ passed · Updated (UTC): Nov 2, 2025, 3:07 PM

@claude bot (Contributor) commented Nov 2, 2025

Claude finished @renaynay's task.


Code Review Complete

I've completed a comprehensive review of this synchronization refactor. The tests are still running, but I've identified several issues that need attention.


Critical Issues

1. Interface Mismatch - SyncHead Method Signature

Location: block/internal/common/expected_interfaces.go:14 vs pkg/sync/sync_service.go:119

The Broadcaster interface defines:

SyncHead() (H, error)

But the actual implementation has:

func (syncService *SyncService[H]) SyncHead(ctx context.Context) (H, error)

Impact: This will cause compilation failures. The syncer calls s.headerStore.SyncHead() (syncer.go:424) and s.dataStore.SyncHead() (syncer.go:434) without a context parameter, but the implementation requires one.

Fix Required: Either:

  • Add a context.Context parameter to the Broadcaster.SyncHead() interface (sketched below), OR
  • Update the implementation to match the interface (remove the ctx parameter and use an internal context)
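
As a sketch, option 1 would make the interface match the implementation (the other methods are shown unchanged from the snippet later in this thread):

type Broadcaster[H header.Header[H]] interface {
    WriteToStoreAndBroadcast(ctx context.Context, payload H, opts ...pubsub.PubOpt) error
    Store() header.Store[H]
    SyncHead(ctx context.Context) (H, error)
}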

High Priority Issues

2. Missing Ticker Usage in waitForNewP2PHeights

Location: syncer.go:414-446

The ticker is created at line 414 but never used in the select statement:

ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()

for {
    select {
    case <-s.ctx.Done():
        return
    default:  // ← Problem: no ticker case
    }
    // ... processing logic
}

Impact: This creates a tight busy loop that will consume 100% CPU. The code will continuously process without any throttling.

Fix Required:

for {
    select {
    case <-s.ctx.Done():
        return
    case <-ticker.C:
        // Move processing logic here
    }
}

3. Wrong WaitGroup Type in syncLoop

Location: syncer.go:272-291

The code creates a sync.WaitGroup and launches goroutines with wg.Go(), but the type is wrong:

wg := sync.WaitGroup{}  // ← This is sync.WaitGroup, not errgroup.Group
// ...
wg.Go(func() {  // ← sync.WaitGroup doesn't have Go() method

Impact: Compilation error on Go versions before 1.25 (which added WaitGroup.Go). If errgroup.Group is the intended replacement, note that its Go() takes a func() error closure.

Fix Required:

wg := errgroup.Group{}

4. Missing Error Handling in syncDARange

Location: syncer.go:302-333

The syncDARange function logs errors but doesn't propagate them. If the DA sync fails during catch-up, the tip-tracking routines will start anyway, leading to inconsistent state.

Fix Required: Return errors from syncDARange and handle them in syncLoop before launching tip-tracking routines.
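
One possible shape, as a sketch (daStartHeight is a placeholder for whatever DA height the catch-up starts from):

// Sketch: abort start-up if DA catch-up fails, before tip-tracking launches.
if err := s.syncDARange(daStartHeight, startHeight); err != nil {
    s.logger.Error().Err(err).Msg("DA catch-up failed; not starting tip-tracking")
    return
}
go s.daRetrievalLoop()
go s.waitForNewP2PHeights()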


Concurrency & Race Conditions

5. Busy Loop in daRetrievalLoop

Location: syncer.go:335-356

The loop has both ticker.C and default cases:

for {
    select {
    case <-s.ctx.Done():
        return
    case <-ticker.C:
        s.tryFetchFromDA(nextDARequestAt)
        s.processPendingEvents()
    default:  // ← This creates a busy loop
    }
}

Impact: The default case makes this a busy loop that will consume excessive CPU. Remove the default case.

6. Atomic Operations Without Synchronization

Location: Throughout syncer.go

The code uses atomic operations for daHeight and lastState, but there's no synchronization between reading/writing the store height and these atomic values. For example:

// syncer.go:277-282
if s.headerStore.Store().Height() < startHeight {
    wg.Go(func() {
        s.p2pHandler.ProcessHeaderRange(s.ctx, s.headerStore.Store().Height(), startHeight+1, s.heightInCh)
    })
}

Concern: Height() is called twice without ensuring the two reads are consistent. Consider caching the value (see the sketch below).
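
A minimal sketch of the cached read, based on the snippet above:

// Read the height once so the comparison and the range start agree.
headerStoreHeight := s.headerStore.Store().Height()
if headerStoreHeight < startHeight {
    wg.Go(func() {
        s.p2pHandler.ProcessHeaderRange(s.ctx, headerStoreHeight, startHeight+1, s.heightInCh)
    })
}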


Architecture & Design

7. Blocking Catch-up May Timeout

Location: syncer.go:263-270

ctx, cancel := context.WithTimeout(s.ctx, time.Second) // TODO @tac0turtle: you can adjust timeouts to realistic
startHeight, err := s.store.Height(ctx)
cancel()

Concern: The 1-second timeout during catch-up initialization seems too short. If the store operation is slow (e.g., cold start, large DB), this will fail unnecessarily.

Recommendation: Either increase the timeout or use the parent context without timeout during initialization.
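
For example, a sketch with a named, more generous timeout (the exact value is an assumption):

const initHeightTimeout = 30 * time.Second // assumption: generous enough for a cold start

ctx, cancel := context.WithTimeout(s.ctx, initHeightTimeout)
startHeight, err := s.store.Height(ctx)
cancel()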

8. Context Propagation Change

Location: p2p_handler.go:60, 145

The PR description mentions "pass syncer's context in there instead of artificially lowering timeout on ctx". This is good, but there are still artificial timeouts in ProcessHeaderRange and ProcessDataRange:

timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*2)
retrievedData, err := h.dataStore.GetByHeight(timeoutCtx, height)

Concern: This is inconsistent with the stated goal. If the header store implicitly waits, why does the data store need a timeout?

Recommendation: Document why data needs a timeout but headers don't, or make them consistent.


Error Handling

9. Silent Failure in waitForNewP2PHeights

Location: syncer.go:424-442

When SyncHead() fails, the code logs and continues:

syncHeadHeader, err := s.headerStore.SyncHead()
if err != nil {
    s.logger.Error().Err(err).Msg("failed to get header p2p sync head")
    continue
}

Concern: With the busy loop issue (#2), this will spam logs continuously if SyncHead() fails persistently.

Recommendation: Add backoff or error counting to avoid log spam.
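
A rough sketch of error counting to throttle the log, assuming the ticker-driven loop from item 2 and a zerolog-style logger:

consecutiveFailures := 0
for {
    select {
    case <-s.ctx.Done():
        return
    case <-ticker.C:
        syncHeadHeader, err := s.headerStore.SyncHead()
        if err != nil {
            consecutiveFailures++
            if consecutiveFailures%10 == 1 { // log the 1st, 11th, 21st... failure
                s.logger.Error().Err(err).Int("failures", consecutiveFailures).Msg("failed to get header p2p sync head")
            }
            continue
        }
        consecutiveFailures = 0
        // ... process the header range as before
    }
}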

10. Context Cancellation Not Checked

Location: syncer.go:431, 441

ProcessHeaderRange and ProcessDataRange are called without checking if the context is still valid:

s.p2pHandler.ProcessHeaderRange(s.ctx, s.headerStore.Store().Height()+1, syncHeadHeader.Height(), s.heightInCh)

If s.ctx is cancelled, these methods will return immediately, but the loop will just call them again.


Testing & Reliability

11. Test Coverage for New Flow

The new two-phase approach (catch-up → tip-tracking) is a significant architectural change. Ensure tests cover:

  • ✓ Catch-up completes before tip-tracking starts
  • ✓ DA catch-up when DA is behind store
  • ✓ P2P catch-up when header store is behind
  • ✓ Simultaneous DA and P2P catch-up
  • ✓ Graceful shutdown during catch-up phase
  • ✓ Graceful shutdown during tip-tracking phase

12. Missing Integration Test

The interaction between daRetrievalLoop and waitForNewP2PHeights needs integration testing to ensure:

  • They don't fight over resources
  • Events are processed in order
  • No deadlocks occur

Performance

13. Channel Buffer Size

Location: syncer.go:107

heightInCh:  make(chan common.DAHeightEvent, 10_000),

Observation: 10,000 buffer size is quite large. Consider if this is necessary or if backpressure should be applied earlier.

14. Pending Events Processing

Location: syncer.go:699-734

processPendingEvents() is called in multiple places:

  • daRetrievalLoop (line 352)
  • waitForNewP2PHeights (line 445)
  • syncLoop (line 260)

Concern: If both tip-tracking routines call this simultaneously, there could be contention on the cache. Consider if this needs synchronization or if the cache is already thread-safe.


Code Quality

15. TODOs in Production Code

Locations: Multiple

// syncer.go:263
ctx, cancel := context.WithTimeout(s.ctx, time.Second) // TODO @tac0turtle: you can adjust timeouts to realistic

// syncer.go:342
// TODO @tac0turtle, changed it to fire on Celestia blocktime

// syncer.go:413
// TODO @tac0turtle: ev-node expected blocktime here

// p2p_handler.go:58, 81, 144
// TODO @tac0turtle: it is desirable to block here...

Recommendation: Either resolve these TODOs before merging or create tracking issues.

16. Magic Numbers

  • time.Second * 6 (line 343) - Celestia block time should be a constant
  • time.Millisecond * 100 (line 414) - ev-node block time should be a constant
  • time.Second * 2 (p2p_handler.go:83, 158) - timeout durations should be configurable (named constants are sketched below)
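
As a sketch, with values taken from this review (whether they belong in config instead is a separate decision):

const (
    celestiaBlockTime  = 6 * time.Second        // DA retrieval tick
    evNodeBlockTime    = 100 * time.Millisecond // p2p tip-tracking tick
    p2pStoreGetTimeout = 2 * time.Second        // GetByHeight timeout in p2p_handler
)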

Security

17. No Rate Limiting on DA Requests

While there's backoff on errors, there's no protection against rapid DA queries during normal operation. If Celestia block time is 6s but events arrive faster, the node might overload the DA layer.

Recommendation: Consider adding rate limiting or a minimum time between requests.
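
A sketch using golang.org/x/time/rate; the one-request-per-6s rate and burst of 1 are assumptions:

// Allow at most one DA request per Celestia block time, with a burst of 1.
daLimiter := rate.NewLimiter(rate.Every(6*time.Second), 1)

// Before each DA request in daRetrievalLoop:
if err := daLimiter.Wait(s.ctx); err != nil {
    return // context cancelled
}
s.tryFetchFromDA(nextDARequestAt)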


Documentation

18. Missing Function Documentation

Key functions lack documentation:

  • syncLoop() - Should explain the two-phase approach
  • syncDARange() - Should document behavior when DA is behind
  • waitForNewP2PHeights() - Should explain the polling approach
  • daRetrievalLoop() - Should document the backoff strategy

Positive Observations

  • Good separation of concerns - the P2P handler is cleanly separated
  • Atomic state management - using atomic pointers for concurrent access
  • Local-only P2P broadcasting - prevents spamming the network with old blocks (syncer.go:513, 518)
  • Pending event cache - smart reprocessing of out-of-order events
  • Data hash validation - critical security check (p2p_handler.go:100-108, 179-187)


Summary

Blockers (must fix):

  1. Interface mismatch - SyncHead signature
  2. Missing ticker usage - CPU busy loop
  3. Wrong WaitGroup type - compilation error

High Priority:

  4. Missing error handling in syncDARange
  5. Busy loop in daRetrievalLoop's default case
  6. Timeout consistency issues

Recommendations:

  • Add comprehensive integration tests for the new two-phase flow
  • Resolve or track all TODOs
  • Add rate limiting for DA requests
  • Improve documentation

The architectural direction is sound, but the implementation needs fixes before merging. Once the compilation issues are resolved, I recommend running the full test suite and benchmarks to validate performance characteristics.


type Broadcaster[H header.Header[H]] interface {
    WriteToStoreAndBroadcast(ctx context.Context, payload H, opts ...pubsub.PubOpt) error
    Store() header.Store[H]
    SyncHead() (H, error)

@renaynay (Collaborator, Author) commented on the SyncHead() line:

This will tell you what the latest view of the network head is, BTW (it's the same as listening from gossipsub).

@tac0turtle (Contributor) commented

The main takeaway from this PR is that we should have sync vs. following modes. This will be easier to handle when we have celestia-node subscriptions to a namespace. Currently the code difference between sync and follow isn't that far off. I'd hold off on the separation until we have the celestia-node API integrated.

I do tend to prefer the event-driven design in #2798. While I understand this is effectively what go-header is also doing under the hood when we use GetByHeight, the event-driven approach uses fewer resources, since we don't need to keep long-lasting requests open. I understand that this is a micro-optimization at this point as well.

Thanks for the PR. We will come back to it in the future, but for now I will close it.

@tac0turtle tac0turtle closed this Nov 3, 2025
@github-project-automation github-project-automation bot moved this to Done in Evolve Nov 3, 2025
@Wondertan (Member) commented

> While I understand this is effectively what go-header is also doing under the hood when we use GetByHeight, the event-driven approach uses fewer resources, since we don't need to keep long-lasting requests open.

@tac0turtle, can you clarify which resources the Get call would use that the new event system would not? For now, the justification for building another event-system layer that duplicates existing functionality seems like a stretch. The plain Get call is simpler and easier, with less code and complexity. While we are not code owners in this repo, it would still be great to have a benchmark that justifies another event-system layer before rejecting Rene's proposal.

Also, from a go-header design perspective, it would be great to know why this event system is useful, so that we could potentially incorporate it. The goal of this library is to minimize the amount of code users need to write and to solve problems for them instead of creating more.

@tac0turtle (Contributor) commented

The reason I didn't go with firing GetByHeight calls is that it was more complex to have a system that makes an indefinite GetByHeight call and then needs a watcher to see whether the height came in from somewhere else in order to cancel the call. I did that in one of the commits; the implementation was more complex because we were spinning up many GetByHeight calls and watchers. Doing one is less complex, but it still requires a watcher to cancel the context when the height comes in from a different source.

The design you guys have makes sense when p2p is the single source of syncing, but indefinite calls in our case need to be tracked and cancelled if the block comes in from elsewhere.

I still need to do further testing to verify the flow, so it may be that I need to go back to a polling-and-watcher design.
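
For illustration, a rough sketch of the watcher pattern being described, using go-header's GetByHeight; the generic helper and the arrivedFromElsewhere channel are assumptions:

// One long-lived GetByHeight call per height, cancelled if the height
// shows up from another source first.
func getOrWait[H header.Header[H]](parent context.Context, store header.Store[H], height uint64, arrivedFromElsewhere <-chan struct{}) (H, error) {
    ctx, cancel := context.WithCancel(parent)
    defer cancel()

    // Watcher: cancel the indefinite call when the block arrives elsewhere.
    go func() {
        select {
        case <-arrivedFromElsewhere:
            cancel()
        case <-ctx.Done():
        }
    }()

    return store.GetByHeight(ctx, height)
}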

@tac0turtle tac0turtle deleted the rene/try-refactor-sync branch December 1, 2025 10:17