From 8e61f294eb0c6be1e03a41c0d8b355f5185bf2d7 Mon Sep 17 00:00:00 2001
From: rene <41963722+renaynay@users.noreply.github.com>
Date: Sun, 2 Nov 2025 13:55:08 +0100
Subject: [PATCH 1/2] trying to clean up the syncer.go

---
 block/internal/common/expected_interfaces.go |   3 +-
 block/internal/syncing/p2p_handler.go        |  24 ++-
 block/internal/syncing/syncer.go             | 176 ++++++++++++++-----
 go.mod                                       |   2 +-
 pkg/sync/sync_service.go                     |   4 +
 5 files changed, 146 insertions(+), 63 deletions(-)

diff --git a/block/internal/common/expected_interfaces.go b/block/internal/common/expected_interfaces.go
index e44845a62f..6f49c83120 100644
--- a/block/internal/common/expected_interfaces.go
+++ b/block/internal/common/expected_interfaces.go
@@ -7,8 +7,9 @@ import (
 	"github.com/celestiaorg/go-header"
 )
 
-// broadcaster interface for P2P broadcasting
+// Broadcaster interface for P2P broadcasting
 type Broadcaster[H header.Header[H]] interface {
 	WriteToStoreAndBroadcast(ctx context.Context, payload H, opts ...pubsub.PubOpt) error
 	Store() header.Store[H]
+	SyncHead() (H, error)
 }
diff --git a/block/internal/syncing/p2p_handler.go b/block/internal/syncing/p2p_handler.go
index 551254ff4b..01632d387d 100644
--- a/block/internal/syncing/p2p_handler.go
+++ b/block/internal/syncing/p2p_handler.go
@@ -55,11 +55,9 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei
 		default:
 		}
 
-		// Create a timeout context for each GetByHeight call to prevent blocking
-		timeoutCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
-		header, err := h.headerStore.GetByHeight(timeoutCtx, height)
-		cancel()
-
+		// TODO @tac0turtle: it is desirable to block here as go-header store automatically subscribes to the new height
+		// if it is not available, and returns it when it becomes available (appended)
+		header, err := h.headerStore.GetByHeight(ctx, height)
 		if err != nil {
 			if errors.Is(err, context.DeadlineExceeded) {
 				h.logger.Debug().Uint64("height", height).Msg("timeout waiting for header from store, will retry later")
@@ -79,10 +77,12 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei
 
 		// Get corresponding data (empty data are still broadcasted by peers)
 		var data *types.Data
-		timeoutCtx, cancel = context.WithTimeout(ctx, 500*time.Millisecond)
+
+		// TODO @tac0turtle: we can provide a sane timeout here bc we expect header + data not to
+		// drift too much
+		timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*2)
 		retrievedData, err := h.dataStore.GetByHeight(timeoutCtx, height)
 		cancel()
-
 		if err != nil {
 			if errors.Is(err, context.DeadlineExceeded) {
 				h.logger.Debug().Uint64("height", height).Msg("timeout waiting for data from store, will retry later")
@@ -141,11 +141,8 @@ func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeigh
 		default:
 		}
 
-		// Create a timeout context for each GetByHeight call to prevent blocking
-		timeoutCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
-		data, err := h.dataStore.GetByHeight(timeoutCtx, height)
-		cancel()
-
+		// TODO @tac0turtle: same here
+		data, err := h.dataStore.GetByHeight(ctx, height)
 		if err != nil {
 			if errors.Is(err, context.DeadlineExceeded) {
 				h.logger.Debug().Uint64("height", height).Msg("timeout waiting for data from store, will retry later")
@@ -158,10 +155,9 @@ func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeigh
 		}
 
 		// Get corresponding header with timeout
-		timeoutCtx, cancel = context.WithTimeout(ctx, 500*time.Millisecond)
+		timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*2)
 		header, err := h.headerStore.GetByHeight(timeoutCtx, height)
 		cancel()
-
 		if err != nil {
 			if errors.Is(err, context.DeadlineExceeded) {
 				h.logger.Debug().Uint64("height", height).Msg("timeout waiting for header from store, will retry later")
diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go
index b72001d8f7..a71795dde6 100644
--- a/block/internal/syncing/syncer.go
+++ b/block/internal/syncing/syncer.go
@@ -124,18 +124,14 @@ func (s *Syncer) Start(ctx context.Context) error {
 	s.p2pHandler = NewP2PHandler(s.headerStore.Store(), s.dataStore.Store(), s.cache, s.genesis, s.logger)
 
 	// Start main processing loop
-	s.wg.Add(1)
-	go func() {
-		defer s.wg.Done()
+	s.wg.Go(func() {
 		s.processLoop()
-	}()
+	})
 
-	// Start sync loop (DA and P2P retrieval)
-	s.wg.Add(1)
-	go func() {
-		defer s.wg.Done()
+	// Start P2p + DA syncing loop
+	s.wg.Go(func() {
 		s.syncLoop()
-	}()
+	})
 
 	s.logger.Info().Msg("syncer started")
 	return nil
@@ -244,11 +240,10 @@ func (s *Syncer) processLoop() {
 	}
 }
 
-// syncLoop handles synchronization from DA and P2P sources.
+// syncLoop handles the node's initial sync process from both
+// DA and P2P sources and then spawns tip-tracking routines
+// upon successful initial sync.
 func (s *Syncer) syncLoop() {
-	s.logger.Info().Msg("starting sync loop")
-	defer s.logger.Info().Msg("sync loop stopped")
-
 	if delay := time.Until(s.genesis.StartTime); delay > 0 {
 		s.logger.Info().Dur("delay", delay).Msg("waiting until genesis to start syncing")
 		select {
@@ -258,25 +253,104 @@ func (s *Syncer) syncLoop() {
 		}
 	}
 
-	// Backoff control when DA replies with errors
-	nextDARequestAt := &time.Time{}
+	s.logger.Info().Msg("starting sync loop")
+	defer s.logger.Info().Msg("sync loop stopped")
 
-	for {
-		select {
-		case <-s.ctx.Done():
+	// first flush any existing pending events
+	s.processPendingEvents()
+
+	// get starting height of node
+	ctx, cancel := context.WithTimeout(s.ctx, time.Second) // TODO @tac0turtle: you can adjust timeouts to realistic
+	startHeight, err := s.store.Height(ctx)
+	cancel()
+	if err != nil {
+		// TODO @tac0turtle: this would be fatal
+		s.logger.Error().Err(err).Msg("failed to get start height for sync loop")
+		return
+	}
+
+	wg := sync.WaitGroup{}
+	// check if p2p stores are behind the node's store. Note that processing
+	// header range requires the header to be available in the p2p store in
+	// order to proceed (regardless of height of data p2p store, so we only
+	// trigger processing for p2p header store
+	if s.headerStore.Store().Height() < startHeight {
+		// trigger sync job async
+		wg.Go(func() {
+			s.p2pHandler.ProcessHeaderRange(s.ctx, s.headerStore.Store().Height(), startHeight+1, s.heightInCh)
+		})
+	}
+
+	// check if DA is behind the node's store, and if so, sync.
+	if s.GetDAHeight() < startHeight {
+		// trigger sync job from DA async
+		wg.Go(func() {
+			s.syncDARange(startHeight)
+		})
+	}
+	wg.Wait()
+
+	// begin tip-tracking routines
+	s.wg.Go(func() {
+		s.waitForNewP2PHeights()
+	})
+	s.wg.Go(func() {
+		s.daRetrievalLoop()
+	})
+}
+
+func (s *Syncer) syncDARange(toHeight uint64) {
+	currentDAHeight := s.GetDAHeight()
+
+	for currentDAHeight <= toHeight {
+		events, err := s.daRetriever.RetrieveFromDA(s.ctx, currentDAHeight)
+		if err != nil {
+			if errors.Is(err, coreda.ErrBlobNotFound) {
+				// no data at this height, increase DA height
+				currentDAHeight++
+				s.SetDAHeight(currentDAHeight)
+				continue
+			}
+			s.logger.Error().Err(err).Uint64("da_height", currentDAHeight).Msg("failed to retrieve from DA during sync")
 			return
-		default:
 		}
 
-		s.processPendingEvents()
-		s.tryFetchFromP2P()
-		s.tryFetchFromDA(nextDARequestAt)
+		// Process DA events
+		for _, event := range events {
+			select {
+			case <-s.ctx.Done():
+				return
+			case s.heightInCh <- event:
+			default:
+				s.cache.SetPendingEvent(event.Header.Height(), &event)
+			}
+		}
+
+		// increase DA height
+		currentDAHeight++
+		s.SetDAHeight(currentDAHeight)
+	}
+}
+
+func (s *Syncer) daRetrievalLoop() {
+	s.logger.Info().Msg("starting DA retrieval loop")
+	defer s.logger.Info().Msg("DA retrieval loop stopped")
 
-		// Prevent busy-waiting when no events are processed
+	// Backoff control when DA replies with errors
+	nextDARequestAt := &time.Time{}
+
+	// TODO @tac0turtle, changed it to fire on Celestia block time
+	ticker := time.NewTicker(time.Second * 6)
+	defer ticker.Stop()
+
+	for {
 		select {
 		case <-s.ctx.Done():
 			return
-		case <-time.After(min(10*time.Millisecond, s.config.Node.BlockTime.Duration)):
+		case <-ticker.C:
+			s.tryFetchFromDA(nextDARequestAt)
+			s.processPendingEvents()
+		default:
 		}
 	}
 }
@@ -333,28 +407,39 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) {
 	s.SetDAHeight(daHeight + 1)
 }
 
-// tryFetchFromP2P attempts to fetch events from P2P stores.
-// It processes both header and data ranges when the block ticker fires.
-// Returns true if any events were successfully processed.
-func (s *Syncer) tryFetchFromP2P() {
-	currentHeight, err := s.store.Height(s.ctx)
-	if err != nil {
-		s.logger.Error().Err(err).Msg("failed to get current height")
-		return
-	}
+// waitForNewP2PHeights waits for new headers or data to appear in the p2p
+// header/data stores and processes them.
+func (s *Syncer) waitForNewP2PHeights() {
+	// TODO @tac0turtle: ev-node expected blocktime here
+	ticker := time.NewTicker(time.Millisecond * 100)
+	defer ticker.Stop()
 
-	// Process headers
-	newHeaderHeight := s.headerStore.Store().Height()
-	if newHeaderHeight > currentHeight {
-		s.p2pHandler.ProcessHeaderRange(s.ctx, currentHeight+1, newHeaderHeight, s.heightInCh)
-	}
+	for {
+		select {
+		case <-s.ctx.Done():
+			return
+		default:
+		}
+
+		syncHeadHeader, err := s.headerStore.SyncHead()
+		if err != nil {
+			s.logger.Error().Err(err).Msg("failed to get header p2p sync head")
+			continue
+		}
 
-	// Process data (if not already processed by headers)
-	newDataHeight := s.dataStore.Store().Height()
-	// TODO @MARKO: why only if newDataHeight != newHeaderHeight? why not process
-	// just if newDataHeight > currentHeight ?
-	if newDataHeight != newHeaderHeight && newDataHeight > currentHeight {
-		s.p2pHandler.ProcessDataRange(s.ctx, currentHeight+1, newDataHeight, s.heightInCh)
+		// try to process all headers between store's height and the syncer's head
+		s.p2pHandler.ProcessHeaderRange(s.ctx, s.headerStore.Store().Height()+1, syncHeadHeader.Height(), s.heightInCh)
+
+		// process data (if not already processed by headers)
+		syncHeadData, err := s.dataStore.SyncHead()
+		if err != nil {
+			s.logger.Error().Err(err).Msg("failed to get data p2p sync head")
+			continue
+		}
+
+		if s.dataStore.Store().Height() < syncHeadData.Height() {
+			s.p2pHandler.ProcessDataRange(s.ctx, s.dataStore.Store().Height()+1, syncHeadData.Height(), s.heightInCh)
+		}
 	}
 }
 
@@ -636,9 +721,6 @@ func (s *Syncer) processPendingEvents() {
 		case s.heightInCh <- heightEvent:
 			// Event was successfully sent and already removed by GetNextPendingEvent
 			s.logger.Debug().Uint64("height", nextHeight).Msg("sent pending event to processing")
-		case <-s.ctx.Done():
-			s.cache.SetPendingEvent(nextHeight, event)
-			return
 		default:
 			s.cache.SetPendingEvent(nextHeight, event)
 			return
diff --git a/go.mod b/go.mod
index 73a0a33966..a44523e679 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
 module github.com/evstack/ev-node
 
-go 1.24.6
+go 1.25.1
 
 retract v0.12.0 // Published by accident
 
diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go
index b8dc086a6a..9718a94902 100644
--- a/pkg/sync/sync_service.go
+++ b/pkg/sync/sync_service.go
@@ -116,6 +116,10 @@ func newSyncService[H header.Header[H]](
 	}, nil
 }
 
+func (syncService *SyncService[H]) SyncHead(ctx context.Context) (H, error) {
+	return syncService.syncer.Head(ctx)
+}
+
 // Store returns the store of the SyncService
 func (syncService *SyncService[H]) Store() header.Store[H] {
 	return syncService.store

From afb9469b422935c36e94a120b539f1a705189107 Mon Sep 17 00:00:00 2001
From: rene <41963722+renaynay@users.noreply.github.com>
Date: Sun, 2 Nov 2025 16:07:04 +0100
Subject: [PATCH 2/2] flush pending

---
 block/internal/syncing/syncer.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go
index a71795dde6..129799adaf 100644
--- a/block/internal/syncing/syncer.go
+++ b/block/internal/syncing/syncer.go
@@ -440,6 +440,9 @@ func (s *Syncer) waitForNewP2PHeights() {
 		if s.dataStore.Store().Height() < syncHeadData.Height() {
 			s.p2pHandler.ProcessDataRange(s.ctx, s.dataStore.Store().Height()+1, syncHeadData.Height(), s.heightInCh)
 		}
+
+		// and flush all pending
+		s.processPendingEvents()
 	}
 }
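Note (not part of the patches above): the go directive bump from 1.24.6 to 1.25.1 goes together with the switch from the manual Add/go/Done pattern to wg.Go(...), since sync.WaitGroup.Go only exists from Go 1.25 onward. A minimal, self-contained sketch of the equivalence follows; the printed strings are illustrative placeholders, not code from this repository:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	// Pre-1.25 pattern removed by the patch: the counter bookkeeping is manual.
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("processLoop placeholder")
	}()

	// Go 1.25+ equivalent used by the patch: Go adds one to the counter,
	// runs the function in a new goroutine, and calls Done when it returns.
	wg.Go(func() {
		fmt.Println("syncLoop placeholder")
	})

	wg.Wait()
}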