diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index c2b3be8c..cdf053cd 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -215,22 +215,23 @@ func (p *Pipeline) initStoresFromQuickload(ctx context.Context, reqPlan *plan.Re return true } -func (p *Pipeline) InitTier1StoresAndBackprocess(ctx context.Context, reqPlan *plan.RequestPlan, noopMode bool) (err error) { - if p.initStoresFromQuickload(ctx, reqPlan) { - return nil +func (p *Pipeline) InitTier1StoresAndBackprocess(ctx context.Context, reqPlan *plan.RequestPlan, noopMode bool) (bool, error) { + success := p.initStoresFromQuickload(ctx, reqPlan) + if success { + return true, nil } if reqPlan.RequiresParallelProcessing() { storeMap, err := p.runParallelProcess(ctx, reqPlan, noopMode) if err != nil { - return fmt.Errorf("run_parallel_process failed: %w", err) + return false, fmt.Errorf("run_parallel_process failed: %w", err) } p.stores.SetStoreMap(storeMap) // this is valid even if we don't have stores in the parallelProcessing but only a mapper - return nil + return false, nil } p.stores.SetStoreMap(p.setupEmptyStores(ctx)) - return nil + return false, nil } func (p *Pipeline) GetStoreMap() store.Map { diff --git a/service/tier1.go b/service/tier1.go index ceec5353..d1716838 100644 --- a/service/tier1.go +++ b/service/tier1.go @@ -698,7 +698,8 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ if err := pipe.Init(ctx); err != nil { return fmt.Errorf("error during pipeline init: %w", err) } - if err := pipe.InitTier1StoresAndBackprocess(ctx, reqPlan, request.NoopMode); err != nil { + loadedFromQuicksave, err := pipe.InitTier1StoresAndBackprocess(ctx, reqPlan, request.NoopMode) + if err != nil { return fmt.Errorf("error during init_stores_and_backprocess: %w", err) } if reqPlan.LinearPipeline == nil { @@ -707,13 +708,14 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ var streamErr error cursor := requestDetails.ResolvedCursor - var cursorIsTarget bool - if requestDetails.ResolvedStartBlockNum != requestDetails.LinearHandoffBlockNum { - // FIXME(abourget): how is that different from reqPlan.LinearPipeline being set? - // and what does the cursor have to do here? - // This will also be true when we've done backprocessing.. is the cursor affected - // in that case?! - cursorIsTarget = true + var processBlocksBeforeCursor bool + if !loadedFromQuicksave && + request.StartCursor != "" && + requestDetails.ResolvedStartBlockNum != requestDetails.LinearHandoffBlockNum { + // if we have a cursor and our linearHandoff is NOT specifically set to our resolved startBlock, + // we ask the pipeline to process blocks before the cursor (between the linearHandoffBlockNum and the cursor) + // so that the stores are correctly populated from the last boundary to our cursor + processBlocksBeforeCursor = true } logger.Info("creating firehose stream", zap.Uint64("handoff_block", requestDetails.LinearHandoffBlockNum), @@ -744,7 +746,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ request.StopBlockNum, cursor, request.FinalBlocksOnly, - cursorIsTarget, + processBlocksBeforeCursor, logger.Named("stream"), bsstream.WithLiveSourceHandlerMiddleware(metering.LiveSourceMiddlewareHandlerFactory(ctx)), bsstream.WithFileSourceHandlerMiddleware(metering.FileSourceMiddlewareHandlerFactory(ctx)),