Skip to content

Commit

Permalink
fix quickload from cursor: prevent actually reprocessing the blocks a…
Browse files Browse the repository at this point in the history
…nd breaking the stores
  • Loading branch information
sduchesneau committed Feb 26, 2025
1 parent a94c138 commit e67a189
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
13 changes: 7 additions & 6 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,22 +215,23 @@ func (p *Pipeline) initStoresFromQuickload(ctx context.Context, reqPlan *plan.Re
return true
}

func (p *Pipeline) InitTier1StoresAndBackprocess(ctx context.Context, reqPlan *plan.RequestPlan, noopMode bool) (err error) {
if p.initStoresFromQuickload(ctx, reqPlan) {
return nil
func (p *Pipeline) InitTier1StoresAndBackprocess(ctx context.Context, reqPlan *plan.RequestPlan, noopMode bool) (bool, error) {
success := p.initStoresFromQuickload(ctx, reqPlan)
if success {
return true, nil
}

if reqPlan.RequiresParallelProcessing() {
storeMap, err := p.runParallelProcess(ctx, reqPlan, noopMode)
if err != nil {
return fmt.Errorf("run_parallel_process failed: %w", err)
return false, fmt.Errorf("run_parallel_process failed: %w", err)
}
p.stores.SetStoreMap(storeMap) // this is valid even if we don't have stores in the parallelProcessing but only a mapper
return nil
return false, nil
}

p.stores.SetStoreMap(p.setupEmptyStores(ctx))
return nil
return false, nil
}

func (p *Pipeline) GetStoreMap() store.Map {
Expand Down
20 changes: 11 additions & 9 deletions service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,8 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
if err := pipe.Init(ctx); err != nil {
return fmt.Errorf("error during pipeline init: %w", err)
}
if err := pipe.InitTier1StoresAndBackprocess(ctx, reqPlan, request.NoopMode); err != nil {
loadedFromQuicksave, err := pipe.InitTier1StoresAndBackprocess(ctx, reqPlan, request.NoopMode)
if err != nil {
return fmt.Errorf("error during init_stores_and_backprocess: %w", err)
}
if reqPlan.LinearPipeline == nil {
Expand All @@ -707,13 +708,14 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ

var streamErr error
cursor := requestDetails.ResolvedCursor
var cursorIsTarget bool
if requestDetails.ResolvedStartBlockNum != requestDetails.LinearHandoffBlockNum {
// FIXME(abourget): how is that different from reqPlan.LinearPipeline being set?
// and what does the cursor have to do here?
// This will also be true when we've done backprocessing.. is the cursor affected
// in that case?!
cursorIsTarget = true
var processBlocksBeforeCursor bool
if !loadedFromQuicksave &&
request.StartCursor != "" &&
requestDetails.ResolvedStartBlockNum != requestDetails.LinearHandoffBlockNum {
// if we have a cursor and our linearHandoff is NOT specifically set to our resolved startBlock,
// we ask the pipeline to process blocks before the cursor (between the linearHandoffBlockNum and the cursor)
// so that the stores are correctly populated from the last boundary to our cursor
processBlocksBeforeCursor = true
}
logger.Info("creating firehose stream",
zap.Uint64("handoff_block", requestDetails.LinearHandoffBlockNum),
Expand Down Expand Up @@ -744,7 +746,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
request.StopBlockNum,
cursor,
request.FinalBlocksOnly,
cursorIsTarget,
processBlocksBeforeCursor,
logger.Named("stream"),
bsstream.WithLiveSourceHandlerMiddleware(metering.LiveSourceMiddlewareHandlerFactory(ctx)),
bsstream.WithFileSourceHandlerMiddleware(metering.FileSourceMiddlewareHandlerFactory(ctx)),
Expand Down

0 comments on commit e67a189

Please sign in to comment.