3 changes: 2 additions & 1 deletion block/internal/common/expected_interfaces.go
@@ -7,8 +7,9 @@ import (
"github.com/celestiaorg/go-header"
)

// broadcaster interface for P2P broadcasting
// Broadcaster interface for P2P broadcasting
type Broadcaster[H header.Header[H]] interface {
WriteToStoreAndBroadcast(ctx context.Context, payload H, opts ...pubsub.PubOpt) error
Store() header.Store[H]
SyncHead() (H, error)
Review comment (collaborator, author):
This will tell you what the latest view of the network head is, BTW (it's the same as listening to gossipsub).

}
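As an illustration of the comment above, here is a minimal hypothetical sketch of how a consumer of this interface might use the new method; the helper below is an assumption for illustration, not code from this change. SyncHead reports the network's latest view of the head, while Store().Height() reports what has already been appended to the local store.

```go
package common

import (
	"fmt"

	"github.com/celestiaorg/go-header"
)

// logSyncGap is a hypothetical helper: SyncHead returns the latest network
// head seen by the syncer (equivalent to listening on gossipsub), while
// Store().Height() returns the height already appended locally.
func logSyncGap[H header.Header[H]](b Broadcaster[H]) error {
	head, err := b.SyncHead()
	if err != nil {
		return fmt.Errorf("get sync head: %w", err)
	}
	fmt.Printf("network head: %d, local store height: %d\n",
		head.Height(), b.Store().Height())
	return nil
}
```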
24 changes: 10 additions & 14 deletions block/internal/syncing/p2p_handler.go
@@ -55,11 +55,9 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei
default:
}

// Create a timeout context for each GetByHeight call to prevent blocking
timeoutCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
header, err := h.headerStore.GetByHeight(timeoutCtx, height)
cancel()

// TODO @tac0turtle: it is desirable to block here as go-header store automatically subscribes to the new height
// if it is not available, and returns it when it becomes available (appended)
header, err := h.headerStore.GetByHeight(ctx, height)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
h.logger.Debug().Uint64("height", height).Msg("timeout waiting for header from store, will retry later")
@@ -79,10 +77,12 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei

// Get corresponding data (empty data are still broadcasted by peers)
var data *types.Data
timeoutCtx, cancel = context.WithTimeout(ctx, 500*time.Millisecond)

// TODO @tac0turtle: we can provide a sane timeout here because we expect header + data not to
// drift too much
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*2)
retrievedData, err := h.dataStore.GetByHeight(timeoutCtx, height)
cancel()

if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
h.logger.Debug().Uint64("height", height).Msg("timeout waiting for data from store, will retry later")
@@ -141,11 +141,8 @@ func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeigh
default:
}

// Create a timeout context for each GetByHeight call to prevent blocking
timeoutCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
data, err := h.dataStore.GetByHeight(timeoutCtx, height)
cancel()

// TODO @tac0turtle: same here
data, err := h.dataStore.GetByHeight(ctx, height)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
h.logger.Debug().Uint64("height", height).Msg("timeout waiting for data from store, will retry later")
@@ -158,10 +155,9 @@ func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeigh
}

// Get corresponding header with timeout
timeoutCtx, cancel = context.WithTimeout(ctx, 500*time.Millisecond)
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*2)
header, err := h.headerStore.GetByHeight(timeoutCtx, height)
cancel()

if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
h.logger.Debug().Uint64("height", height).Msg("timeout waiting for header from store, will retry later")
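The pattern behind these handler changes: the primary store lookup is allowed to block, because go-header's GetByHeight waits for the requested height to be appended, while the companion store lookup gets a short timeout since header and data heights should not drift far apart. A minimal sketch of that pattern follows, with a hypothetical helper and assumed store types, not the handler's actual code:

```go
package syncing

import (
	"context"
	"errors"
	"time"

	"github.com/celestiaorg/go-header"

	"github.com/evstack/ev-node/types"
)

// fetchPair is a hypothetical helper: block on the header store, bound the
// data lookup with a timeout, and let the caller retry the data later if it
// has not arrived yet.
func fetchPair(
	ctx context.Context,
	headers header.Store[*types.SignedHeader],
	data header.Store[*types.Data],
	height uint64,
) (*types.SignedHeader, *types.Data, error) {
	// go-header blocks here until the height is appended to the store.
	h, err := headers.GetByHeight(ctx, height)
	if err != nil {
		return nil, nil, err
	}

	dataCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()
	d, err := data.GetByHeight(dataCtx, height)
	if err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			// Data not seen yet; return the header and let the caller retry.
			return h, nil, nil
		}
		return nil, nil, err
	}
	return h, d, nil
}
```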
179 changes: 132 additions & 47 deletions block/internal/syncing/syncer.go
@@ -124,18 +124,14 @@ func (s *Syncer) Start(ctx context.Context) error {
s.p2pHandler = NewP2PHandler(s.headerStore.Store(), s.dataStore.Store(), s.cache, s.genesis, s.logger)

// Start main processing loop
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.wg.Go(func() {
s.processLoop()
}()
})

// Start sync loop (DA and P2P retrieval)
s.wg.Add(1)
go func() {
defer s.wg.Done()
// Start P2P + DA syncing loop
s.wg.Go(func() {
s.syncLoop()
}()
})

s.logger.Info().Msg("syncer started")
return nil
@@ -244,11 +240,10 @@ func (s *Syncer) processLoop() {
}
}

// syncLoop handles synchronization from DA and P2P sources.
// syncLoop handles the node's initial sync process from both
// DA and P2P sources and then spawns tip-tracking routines
// upon successful initial sync.
func (s *Syncer) syncLoop() {
s.logger.Info().Msg("starting sync loop")
defer s.logger.Info().Msg("sync loop stopped")

if delay := time.Until(s.genesis.StartTime); delay > 0 {
s.logger.Info().Dur("delay", delay).Msg("waiting until genesis to start syncing")
select {
@@ -258,25 +253,104 @@ func (s *Syncer) syncLoop() {
}
}

// Backoff control when DA replies with errors
nextDARequestAt := &time.Time{}
s.logger.Info().Msg("starting sync loop")
defer s.logger.Info().Msg("sync loop stopped")

for {
select {
case <-s.ctx.Done():
// first flush any existing pending events
s.processPendingEvents()

// get starting height of node
ctx, cancel := context.WithTimeout(s.ctx, time.Second) // TODO @tac0turtle: adjust timeouts to realistic values
startHeight, err := s.store.Height(ctx)
cancel()
if err != nil {
// TODO @tac0turtle: this would be fatal
s.logger.Error().Err(err).Msg("failed to get start height for sync loop")
return
}

wg := sync.WaitGroup{}
// check if p2p stores are behind the node's store. Note that processing a
// header range requires the header to be available in the p2p header store
// in order to proceed (regardless of the data p2p store's height), so we
// only trigger processing for the p2p header store
if s.headerStore.Store().Height() < startHeight {
// trigger sync job async
wg.Go(func() {
s.p2pHandler.ProcessHeaderRange(s.ctx, s.headerStore.Store().Height(), startHeight+1, s.heightInCh)
})
}

// check if DA is behind the node's store, and if so, sync.
if s.GetDAHeight() < startHeight {
// trigger sync job from DA async
wg.Go(func() {
s.syncDARange(startHeight)
})
}
wg.Wait()

// begin tip-tracking routines
s.wg.Go(func() {
s.waitForNewP2PHeights()
})
s.wg.Go(func() {
s.daRetrievalLoop()
})
}

func (s *Syncer) syncDARange(toHeight uint64) {
currentDAHeight := s.GetDAHeight()

for currentDAHeight <= toHeight {
events, err := s.daRetriever.RetrieveFromDA(s.ctx, currentDAHeight)
if err != nil {
if errors.Is(err, coreda.ErrBlobNotFound) {
// no data at this height, increase DA height
currentDAHeight++
s.SetDAHeight(currentDAHeight)
continue
}
s.logger.Error().Err(err).Uint64("da_height", currentDAHeight).Msg("failed to retrieve from DA during sync")
return
default:
}

s.processPendingEvents()
s.tryFetchFromP2P()
s.tryFetchFromDA(nextDARequestAt)
// Process DA events
for _, event := range events {
select {
case <-s.ctx.Done():
return
case s.heightInCh <- event:
default:
s.cache.SetPendingEvent(event.Header.Height(), &event)
}
}

// Prevent busy-waiting when no events are processed
// increase DA height
currentDAHeight++
s.SetDAHeight(currentDAHeight)
}
}

func (s *Syncer) daRetrievalLoop() {
s.logger.Info().Msg("starting DA retrieval loop")
defer s.logger.Info().Msg("DA retrieval loop stopped")

// Backoff control when DA replies with errors
nextDARequestAt := &time.Time{}

// TODO @tac0turtle, changed it to fire on Celestia block time
ticker := time.NewTicker(time.Second * 6)
defer ticker.Stop()

for {
select {
case <-s.ctx.Done():
return
case <-time.After(min(10*time.Millisecond, s.config.Node.BlockTime.Duration)):
case <-ticker.C:
s.tryFetchFromDA(nextDARequestAt)
s.processPendingEvents()
default:
}
}
}
@@ -333,28 +407,42 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) {
s.SetDAHeight(daHeight + 1)
}

// tryFetchFromP2P attempts to fetch events from P2P stores.
// It processes both header and data ranges when the block ticker fires.
// Returns true if any events were successfully processed.
func (s *Syncer) tryFetchFromP2P() {
currentHeight, err := s.store.Height(s.ctx)
if err != nil {
s.logger.Error().Err(err).Msg("failed to get current height")
return
}
// waitForNewP2PHeights waits for new headers or data to appear in the p2p
// header/data stores and processes them.
func (s *Syncer) waitForNewP2PHeights() {
// TODO @tac0turtle: ev-node expected blocktime here
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()

// Process headers
newHeaderHeight := s.headerStore.Store().Height()
if newHeaderHeight > currentHeight {
s.p2pHandler.ProcessHeaderRange(s.ctx, currentHeight+1, newHeaderHeight, s.heightInCh)
}
for {
select {
case <-s.ctx.Done():
return
default:
}

// Process data (if not already processed by headers)
newDataHeight := s.dataStore.Store().Height()
// TODO @MARKO: why only if newDataHeight != newHeaderHeight? why not process
// just if newDataHeight > currentHeight ?
if newDataHeight != newHeaderHeight && newDataHeight > currentHeight {
s.p2pHandler.ProcessDataRange(s.ctx, currentHeight+1, newDataHeight, s.heightInCh)
syncHeadHeader, err := s.headerStore.SyncHead()
if err != nil {
s.logger.Error().Err(err).Msg("failed to get header p2p sync head")
continue
}

// try to process all headers between store's height and the syncer's head
s.p2pHandler.ProcessHeaderRange(s.ctx, s.headerStore.Store().Height()+1, syncHeadHeader.Height(), s.heightInCh)

// process data (if not already processed by headers)
syncHeadData, err := s.dataStore.SyncHead()
if err != nil {
s.logger.Error().Err(err).Msg("failed to get data p2p sync head")
continue
}

if s.dataStore.Store().Height() < syncHeadData.Height() {
s.p2pHandler.ProcessDataRange(s.ctx, s.dataStore.Store().Height()+1, syncHeadData.Height(), s.heightInCh)
}

// and flush all pending
s.processPendingEvents()
}
}

@@ -636,9 +724,6 @@ func (s *Syncer) processPendingEvents() {
case s.heightInCh <- heightEvent:
// Event was successfully sent and already removed by GetNextPendingEvent
s.logger.Debug().Uint64("height", nextHeight).Msg("sent pending event to processing")
case <-s.ctx.Done():
s.cache.SetPendingEvent(nextHeight, event)
return
default:
s.cache.SetPendingEvent(nextHeight, event)
return
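The same hand-off pattern appears in syncDARange and processPendingEvents: attempt a non-blocking send of the event to the processing channel and, if the channel is full, park the event in the cache so a later pass can retry instead of stalling the loop. A minimal sketch with hypothetical stand-in types (the real event and cache types live in the internal packages):

```go
package syncing

import "context"

// event and eventCache are hypothetical stand-ins for the syncer's DA height
// event and cache types.
type event struct{ height uint64 }

type eventCache interface {
	SetPendingEvent(height uint64, ev *event)
}

// sendOrPark mirrors the pattern used in syncDARange: stop on context
// cancellation, otherwise attempt a non-blocking send and fall back to the
// pending cache when the channel is full.
func sendOrPark(ctx context.Context, out chan<- event, cache eventCache, ev event) {
	select {
	case <-ctx.Done():
		return
	case out <- ev:
	default:
		cache.SetPendingEvent(ev.height, &ev)
	}
}
```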
2 changes: 1 addition & 1 deletion go.mod
@@ -1,6 +1,6 @@
module github.com/evstack/ev-node

go 1.24.6
go 1.25.1

retract v0.12.0 // Published by accident

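The go directive bump lines up with the use of sync.WaitGroup.Go in syncer.go, which was added to the standard library in Go 1.25 and replaces the manual Add/Done/go pattern. A minimal, standard-library-only sketch of the old and new forms:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	// Before Go 1.25:
	//   wg.Add(1)
	//   go func() { defer wg.Done(); fmt.Println("process loop") }()
	wg.Go(func() { fmt.Println("process loop") })
	wg.Go(func() { fmt.Println("sync loop") })

	wg.Wait()
}
```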
4 changes: 4 additions & 0 deletions pkg/sync/sync_service.go
@@ -116,6 +116,10 @@ func newSyncService[H header.Header[H]](
}, nil
}

func (syncService *SyncService[H]) SyncHead(ctx context.Context) (H, error) {
return syncService.syncer.Head(ctx)
}

// Store returns the store of the SyncService
func (syncService *SyncService[H]) Store() header.Store[H] {
return syncService.store