diff --git a/cmd/substreams/info.go b/cmd/substreams/info.go index ee09c973f..659c13c5b 100644 --- a/cmd/substreams/info.go +++ b/cmd/substreams/info.go @@ -17,7 +17,6 @@ import ( func init() { infoCmd.Flags().String("output-sinkconfig-files-path", "", "if non-empty, any sinkconfig field of type 'bytes' that was packed from a file will be written to that path") infoCmd.Flags().Bool("skip-package-validation", false, "Do not perform any validation when reading substreams package") - infoCmd.Flags().Uint64("first-streamable-block", 0, "Apply a chain's 'first-streamable-block' to modules, possibly affecting their initialBlock and hashes") infoCmd.Flags().Bool("used-modules-only", false, "When set, only modules that are used by the output module will be displayed (requires the output_module arg to be set)") } @@ -52,7 +51,6 @@ func runInfo(cmd *cobra.Command, args []string) error { } outputSinkconfigFilesPath := sflags.MustGetString(cmd, "output-sinkconfig-files-path") - firstStreamableBlock := sflags.MustGetUint64(cmd, "first-streamable-block") skipPackageValidation := sflags.MustGetBool(cmd, "skip-package-validation") onlyShowUsedModules := sflags.MustGetBool(cmd, "used-modules-only") @@ -60,7 +58,7 @@ func runInfo(cmd *cobra.Command, args []string) error { return fmt.Errorf("used-modules-only flag requires the output_module arg to be set") } - pkgInfo, err := info.Extended(manifestPath, outputModule, skipPackageValidation, firstStreamableBlock) + pkgInfo, err := info.Extended(manifestPath, outputModule, skipPackageValidation) if err != nil { return err } diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index 78c5e34bc..4337d2b6c 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), * Fixed error handling issue in 'backprocessing' causing high CPU usage in tier1 servers * Fixed handling of packages referenced by `ipfs://` URL (now simply using /api/v0/cat?arg=...) * Added `--used-modules-only` flag to `substreams info` to only show modules that are in execution tree for the given output_module +* Revert 'initialBlocks' changes from v1.9.1 because a 'changing module hash' causes more trouble. ## v1.9.2 diff --git a/info/info.go b/info/info.go index 4d243aa70..016c3929e 100644 --- a/info/info.go +++ b/info/info.go @@ -219,7 +219,7 @@ func Basic(pkg *pbsubstreams.Package, graph *manifest.ModuleGraph) (*BasicInfo, return manifestInfo, nil } -func Extended(manifestPath string, outputModule string, skipValidation bool, firstStreamableBlock uint64) (*ExtendedInfo, error) { +func Extended(manifestPath string, outputModule string, skipValidation bool) (*ExtendedInfo, error) { var opts []manifest.Option if skipValidation { opts = append(opts, manifest.SkipPackageValidationReader()) @@ -234,8 +234,6 @@ func Extended(manifestPath string, outputModule string, skipValidation bool, fir return nil, fmt.Errorf("read manifest %q: %w", manifestPath, err) } - pbsubstreams.ApplyFirstStreamableBlockToModules(firstStreamableBlock, pkg.Modules.Modules) - return ExtendedWithPackage(pkg, graph, outputModule) } @@ -247,7 +245,7 @@ func ExtendedWithPackage(pkg *pbsubstreams.Package, graph *manifest.ModuleGraph, var stages [][][]string if outputModule != "" { - execGraph, err := exec.NewOutputModuleGraph(outputModule, true, pkg.Modules) + execGraph, err := exec.NewOutputModuleGraph(outputModule, true, pkg.Modules, 0) if err != nil { return nil, fmt.Errorf("creating output module graph: %w", err) } diff --git a/info/info_test.go b/info/info_test.go index d14fea776..6c8cf9496 100644 --- a/info/info_test.go +++ b/info/info_test.go @@ -6,7 +6,6 @@ import ( "testing" "github.com/streamingfast/substreams/manifest" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -27,23 +26,11 @@ func TestBasicInfo(t *testing.T) { } func TestExtendedInfo(t *testing.T) { - info, err := Extended("https://github.com/streamingfast/substreams-uniswap-v3/releases/download/v0.2.8/substreams.spkg", "graph_out", false, 0) + info, err := Extended("https://github.com/streamingfast/substreams-uniswap-v3/releases/download/v0.2.8/substreams.spkg", "graph_out", false) require.NoError(t, err) r, err := json.MarshalIndent(info, "", " ") require.NoError(t, err) - assert.Equal(t, uint64(12369621), info.Modules[0].InitialBlock) - fmt.Println(string(r)) } - -func TestExtendedInfoFirstStreamable(t *testing.T) { - info, err := Extended("https://github.com/streamingfast/substreams-uniswap-v3/releases/download/v0.2.8/substreams.spkg", "graph_out", false, 999999999) - require.NoError(t, err) - - assert.Equal(t, uint64(999999999), info.Modules[0].InitialBlock) - assert.Equal(t, uint64(999999999), info.Modules[1].InitialBlock) - assert.Equal(t, uint64(999999999), info.Modules[2].InitialBlock) - // ... -} diff --git a/orchestrator/parallelprocessor.go b/orchestrator/parallelprocessor.go index e3bb5a2d3..f1920ba69 100644 --- a/orchestrator/parallelprocessor.go +++ b/orchestrator/parallelprocessor.go @@ -73,7 +73,8 @@ func BuildParallelProcessor( // no ReadExecOut if output type is an index if requestedModule.GetKindMap() != nil { - execOutSegmenter := reqPlan.ReadOutSegmenter(requestedModule.InitialBlock) + initialBlock := execGraph.ModulesInitBlocks()[requestedModule.Name] + execOutSegmenter := reqPlan.ReadOutSegmenter(initialBlock) walker := execoutStorage.NewFileWalker(requestedModule.Name, execOutSegmenter) sched.ExecOutWalker = orchestratorExecout.NewWalker( diff --git a/orchestrator/scheduler/scheduler.go b/orchestrator/scheduler/scheduler.go index 32ea3a112..a86936edc 100644 --- a/orchestrator/scheduler/scheduler.go +++ b/orchestrator/scheduler/scheduler.go @@ -124,7 +124,7 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd { modules := s.Stages.StageModules(workUnit.Stage) return loop.Batch( - worker.Work(s.ctx, workUnit, workRange, modules, s.stream), + worker.Work(s.ctx, workUnit, workRange.StartBlock, modules, s.stream), work.CmdScheduleNextJob(), ) diff --git a/orchestrator/stage/stages.go b/orchestrator/stage/stages.go index e28ef6714..288eddc87 100644 --- a/orchestrator/stage/stages.go +++ b/orchestrator/stage/stages.go @@ -70,6 +70,7 @@ func NewStages( logger := reqctx.Logger(ctx) stagedModules := execGraph.StagedUsedModules() + modulesInitBlocks := execGraph.ModulesInitBlocks() out = &Stages{ ctx: ctx, logger: reqctx.Logger(ctx), @@ -108,13 +109,13 @@ func NewStages( } var moduleStates []*StoreModuleState - stageLowestInitBlock := layer[0].InitialBlock + stageLowestInitBlock := modulesInitBlocks[layer[0].Name] for _, mod := range layer { - modSegmenter := segmenter.WithInitialBlock(mod.InitialBlock) + modSegmenter := segmenter.WithInitialBlock(modulesInitBlocks[mod.Name]) modState := NewModuleState(logger, mod.Name, modSegmenter, storeConfigs[mod.Name]) moduleStates = append(moduleStates, modState) - stageLowestInitBlock = min(stageLowestInitBlock, mod.InitialBlock) + stageLowestInitBlock = min(stageLowestInitBlock, modulesInitBlocks[mod.Name]) } stageSegmenter := segmenter.WithInitialBlock(stageLowestInitBlock) diff --git a/orchestrator/work/worker.go b/orchestrator/work/worker.go index 3b0d0baf3..cc3027f41 100644 --- a/orchestrator/work/worker.go +++ b/orchestrator/work/worker.go @@ -21,7 +21,6 @@ import ( "go.uber.org/zap" "google.golang.org/grpc/codes" - "github.com/streamingfast/substreams/block" "github.com/streamingfast/substreams/client" "github.com/streamingfast/substreams/metrics" "github.com/streamingfast/substreams/orchestrator/loop" @@ -41,10 +40,10 @@ type Result struct { type Worker interface { ID() string - Work(ctx context.Context, unit stage.Unit, workRange *block.Range, moduleNames []string, upstream *response.Stream) loop.Cmd // *Result + Work(ctx context.Context, unit stage.Unit, startBlock uint64, moduleNames []string, upstream *response.Stream) loop.Cmd // *Result } -func NewWorkerFactoryFromFunc(f func(ctx context.Context, unit stage.Unit, workRange *block.Range, moduleNames []string, upstream *response.Stream) loop.Cmd) *SimpleWorkerFactory { +func NewWorkerFactoryFromFunc(f func(ctx context.Context, unit stage.Unit, startBlock uint64, moduleNames []string, upstream *response.Stream) loop.Cmd) *SimpleWorkerFactory { return &SimpleWorkerFactory{ f: f, id: atomic.AddUint64(&lastWorkerID, 1), @@ -52,12 +51,12 @@ func NewWorkerFactoryFromFunc(f func(ctx context.Context, unit stage.Unit, workR } type SimpleWorkerFactory struct { - f func(ctx context.Context, unit stage.Unit, workRange *block.Range, moduleNames []string, upstream *response.Stream) loop.Cmd + f func(ctx context.Context, unit stage.Unit, startBlock uint64, moduleNames []string, upstream *response.Stream) loop.Cmd id uint64 } -func (f SimpleWorkerFactory) Work(ctx context.Context, unit stage.Unit, workRange *block.Range, moduleNames []string, upstream *response.Stream) loop.Cmd { - return f.f(ctx, unit, workRange, moduleNames, upstream) +func (f SimpleWorkerFactory) Work(ctx context.Context, unit stage.Unit, startBlock uint64, moduleNames []string, upstream *response.Stream) loop.Cmd { + return f.f(ctx, unit, startBlock, moduleNames, upstream) } func (f SimpleWorkerFactory) ID() string { @@ -87,13 +86,13 @@ func (w *RemoteWorker) ID() string { return fmt.Sprintf("%d", w.id) } -func NewRequest(ctx context.Context, req *reqctx.RequestDetails, stageIndex int, workRange *block.Range) *pbssinternal.ProcessRangeRequest { +func NewRequest(ctx context.Context, req *reqctx.RequestDetails, stageIndex int, startBlock uint64) *pbssinternal.ProcessRangeRequest { tier2ReqParams, ok := reqctx.GetTier2RequestParameters(ctx) if !ok { panic("unable to get tier2 request parameters") } - segment := uint64(workRange.StartBlock) / tier2ReqParams.StateBundleSize + segment := startBlock / tier2ReqParams.StateBundleSize return &pbssinternal.ProcessRangeRequest{ Modules: req.Modules, @@ -111,8 +110,8 @@ func NewRequest(ctx context.Context, req *reqctx.RequestDetails, stageIndex int, } } -func (w *RemoteWorker) Work(ctx context.Context, unit stage.Unit, workRange *block.Range, moduleNames []string, upstream *response.Stream) loop.Cmd { - request := NewRequest(ctx, reqctx.Details(ctx), unit.Stage, workRange) +func (w *RemoteWorker) Work(ctx context.Context, unit stage.Unit, startBlock uint64, moduleNames []string, upstream *response.Stream) loop.Cmd { + request := NewRequest(ctx, reqctx.Details(ctx), unit.Stage, startBlock) logger := reqctx.Logger(ctx) return func() loop.Msg { @@ -192,7 +191,7 @@ func (w *RemoteWorker) Work(ctx context.Context, unit stage.Unit, workRange *blo } } -func (w *RemoteWorker) work(ctx context.Context, request *pbssinternal.ProcessRangeRequest, moduleNames []string, upstream *response.Stream) *Result { +func (w *RemoteWorker) work(ctx context.Context, request *pbssinternal.ProcessRangeRequest, _ []string, upstream *response.Stream) *Result { metrics.Tier1ActiveWorkerRequest.Inc() metrics.Tier1WorkerRequestCounter.Inc() defer metrics.Tier1ActiveWorkerRequest.Dec() diff --git a/orchestrator/work/workerpool_test.go b/orchestrator/work/workerpool_test.go index 8c051f10b..dd2745ead 100644 --- a/orchestrator/work/workerpool_test.go +++ b/orchestrator/work/workerpool_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/zap" - "github.com/streamingfast/substreams/block" "github.com/streamingfast/substreams/orchestrator/loop" "github.com/streamingfast/substreams/orchestrator/response" "github.com/streamingfast/substreams/orchestrator/stage" @@ -17,7 +16,7 @@ import ( func Test_workerPoolPool_Borrow_Return(t *testing.T) { ctx := context.Background() pi := NewWorkerPool(ctx, 2, func(logger *zap.Logger) Worker { - return NewWorkerFactoryFromFunc(func(ctx context.Context, unit stage.Unit, workRange *block.Range, moduleNames []string, upstream *response.Stream) loop.Cmd { + return NewWorkerFactoryFromFunc(func(ctx context.Context, unit stage.Unit, startBlock uint64, moduleNames []string, upstream *response.Stream) loop.Cmd { return func() loop.Msg { return &Result{} } diff --git a/pb/sf/substreams/intern/v2/validate.go b/pb/sf/substreams/intern/v2/validate.go index 967c9378a..3680f8867 100644 --- a/pb/sf/substreams/intern/v2/validate.go +++ b/pb/sf/substreams/intern/v2/validate.go @@ -24,9 +24,8 @@ func (r *ProcessRangeRequest) Validate() error { return fmt.Errorf("merged blocks store is required in request") case r.SegmentSize == 0: return fmt.Errorf("a non-zero state bundle size is required in request") - case (r.SegmentNumber+1)*r.SegmentSize < r.FirstStreamableBlock: - return fmt.Errorf("requested segment is way below the first streamable block") - + case ((r.SegmentNumber+1)*r.SegmentSize - 1) < r.FirstStreamableBlock: + return fmt.Errorf("segment is completely below the first streamable block") } seenStores := map[string]bool{} diff --git a/pb/sf/substreams/v1/modules.go b/pb/sf/substreams/v1/modules.go index e95896906..4ef27357e 100644 --- a/pb/sf/substreams/v1/modules.go +++ b/pb/sf/substreams/v1/modules.go @@ -13,14 +13,6 @@ const ( ModuleKindBlockIndex ) -func ApplyFirstStreamableBlockToModules(firstStreamableBlock uint64, modules []*Module) { - for _, mod := range modules { - if mod.InitialBlock < firstStreamableBlock { - mod.InitialBlock = firstStreamableBlock - } - } -} - func (m *Module) BlockFilterQueryString() (string, error) { if m.BlockFilter == nil { return "", nil diff --git a/pipeline/exec/graph.go b/pipeline/exec/graph.go index 342713a62..1ab0db049 100644 --- a/pipeline/exec/graph.go +++ b/pipeline/exec/graph.go @@ -67,18 +67,18 @@ func (g *Graph) LowestStoresInitBlock() *uint64 { return g.lowestStoresIni func (g *Graph) ModulesInitBlocks() map[string]uint64 { return g.modulesInitBlocks } func (g *Graph) OutputModuleStageIndex() int { return len(g.stagedUsedModules) - 1 } -func NewOutputModuleGraph(outputModule string, productionMode bool, modules *pbsubstreams.Modules) (out *Graph, err error) { +func NewOutputModuleGraph(outputModule string, productionMode bool, modules *pbsubstreams.Modules, firstStreamableBlock uint64) (out *Graph, err error) { out = &Graph{ requestModules: modules, } - if err := out.computeGraph(outputModule, productionMode, modules); err != nil { + if err := out.computeGraph(outputModule, productionMode, modules, firstStreamableBlock); err != nil { return nil, fmt.Errorf("module graph: %w", err) } return out, nil } -func (g *Graph) computeGraph(outputModule string, productionMode bool, modules *pbsubstreams.Modules) error { +func (g *Graph) computeGraph(outputModule string, productionMode bool, modules *pbsubstreams.Modules, firstStreamableBlock uint64) error { graph, err := manifest.NewModuleGraph(modules.Modules) if err != nil { return fmt.Errorf("compute graph: %w", err) @@ -92,7 +92,11 @@ func (g *Graph) computeGraph(outputModule string, productionMode bool, modules * g.usedModules = processModules g.modulesInitBlocks = map[string]uint64{} for _, mod := range g.usedModules { - g.modulesInitBlocks[mod.Name] = mod.InitialBlock + initialBlock := mod.InitialBlock + if initialBlock < firstStreamableBlock { + initialBlock = firstStreamableBlock + } + g.modulesInitBlocks[mod.Name] = initialBlock } g.stagedUsedModules, err = computeStages(g.usedModules, g.modulesInitBlocks) @@ -100,8 +104,8 @@ func (g *Graph) computeGraph(outputModule string, productionMode bool, modules * return err } - g.lowestInitBlock = computeLowestInitBlock(processModules) - g.lowestStoresInitBlock = computeLowestStoresInitBlock(processModules) + g.lowestInitBlock = computeLowestInitBlock(processModules, firstStreamableBlock) + g.lowestStoresInitBlock = computeLowestStoresInitBlock(processModules, firstStreamableBlock) if err := g.hashModules(graph); err != nil { return fmt.Errorf("cannot hash module: %w", err) } @@ -126,7 +130,7 @@ func (g *Graph) computeGraph(outputModule string, productionMode bool, modules * } // computeLowestStoresInitBlock finds the lowest initial block of all store modules. -func computeLowestStoresInitBlock(modules []*pbsubstreams.Module) (out *uint64) { +func computeLowestStoresInitBlock(modules []*pbsubstreams.Module, firstStreamableBlock uint64) (out *uint64) { lowest := uint64(math.MaxUint64) countStores := 0 for _, mod := range modules { @@ -143,12 +147,15 @@ func computeLowestStoresInitBlock(modules []*pbsubstreams.Module) (out *uint64) return nil } + if lowest < firstStreamableBlock { + return &firstStreamableBlock + } return &lowest } // computeLowestInitBlock finds the lowest initial block of all modules that are not block indexes. // if there are only blockIndex types of modules, it returns 0, because blockIndex modules are always at 0. -func computeLowestInitBlock(modules []*pbsubstreams.Module) (out uint64) { +func computeLowestInitBlock(modules []*pbsubstreams.Module, firstStreamableBlock uint64) (out uint64) { var atLeastOneModuleThatIsNotAnIndex bool lowest := uint64(math.MaxUint64) for _, mod := range modules { @@ -161,7 +168,10 @@ func computeLowestInitBlock(modules []*pbsubstreams.Module) (out uint64) { } } if !atLeastOneModuleThatIsNotAnIndex { - return 0 + return firstStreamableBlock + } + if lowest < firstStreamableBlock { + return firstStreamableBlock } return lowest } @@ -243,12 +253,12 @@ func computeStages(mods []*pbsubstreams.Module, initBlocks map[string]uint64) (s continue case *pbsubstreams.Module_Input_Map_: depModName = input.Map.ModuleName - if mod.InitialBlock >= initBlocks[depModName] { + if initBlocks[mod.Name] >= initBlocks[depModName] { validInputsAtInitialBlock = true } case *pbsubstreams.Module_Input_Store_: depModName = input.Store.ModuleName - if mod.InitialBlock >= initBlocks[depModName] { + if initBlocks[mod.Name] >= initBlocks[depModName] { validInputsAtInitialBlock = true } default: @@ -264,7 +274,7 @@ func computeStages(mods []*pbsubstreams.Module, initBlocks map[string]uint64) (s } if !validInputsAtInitialBlock { - return nil, fmt.Errorf("module %q has no input available at its initial block %d", mod.Name, mod.InitialBlock) + return nil, fmt.Errorf("module %q has no input available at its initial block %d", mod.Name, initBlocks[mod.Name]) } //Check block index dependence diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 8d43fea7e..6be13e0b3 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -479,6 +479,7 @@ func (p *Pipeline) BuildModuleExecutors(ctx context.Context) error { } p.loadedModules = loadedModules + modulesInitBlocks := p.execGraph.ModulesInitBlocks() var stagedModuleExecutors [][]exec.ModuleExecutor for _, stage := range p.executionStages { @@ -516,7 +517,7 @@ func (p *Pipeline) BuildModuleExecutors(ctx context.Context) error { baseExecutor := exec.NewBaseExecutor( ctx, module.Name, - module.InitialBlock, + modulesInitBlocks[module.Name], mod, p.wasmRuntime.InstanceCacheEnabled(), inputs, @@ -540,7 +541,7 @@ func (p *Pipeline) BuildModuleExecutors(ctx context.Context) error { baseExecutor := exec.NewBaseExecutor( ctx, module.Name, - module.InitialBlock, + modulesInitBlocks[module.Name], mod, p.wasmRuntime.InstanceCacheEnabled(), inputs, @@ -558,7 +559,7 @@ func (p *Pipeline) BuildModuleExecutors(ctx context.Context) error { baseExecutor := exec.NewBaseExecutor( ctx, module.Name, - module.InitialBlock, + modulesInitBlocks[module.Name], mod, p.wasmRuntime.InstanceCacheEnabled(), inputs, diff --git a/service/live_back_filler.go b/service/live_back_filler.go index 79a4e7a17..1a5bcb410 100644 --- a/service/live_back_filler.go +++ b/service/live_back_filler.go @@ -6,7 +6,6 @@ import ( "io" "github.com/streamingfast/derr" - "github.com/streamingfast/substreams/block" "github.com/streamingfast/substreams/orchestrator/work" "github.com/streamingfast/substreams/reqctx" @@ -20,7 +19,7 @@ import ( const finalBlockDelay = 120 const backfillRetries = 999 // no point in failing "early". It may be failing because merged blocks are lagging behind a little bit. -type RequestBackProcessingFunc = func(ctx context.Context, logger *zap.Logger, blockRange *block.Range, stageToProcess int, clientFactory client.InternalClientFactory, jobCompleted chan error) +type RequestBackProcessingFunc = func(ctx context.Context, logger *zap.Logger, startBlock uint64, stageToProcess int, clientFactory client.InternalClientFactory, jobCompleted chan error) type LiveBackFiller struct { RequestBackProcessing RequestBackProcessingFunc @@ -57,8 +56,8 @@ func (l *LiveBackFiller) ProcessBlock(blk *pbbstream.Block, obj interface{}) (er return l.NextHandler.ProcessBlock(blk, obj) } -func RequestBackProcessing(ctx context.Context, logger *zap.Logger, blockRange *block.Range, stageToProcess int, clientFactory client.InternalClientFactory, jobResult chan error) { - liveBackFillerRequest := work.NewRequest(ctx, reqctx.Details(ctx), stageToProcess, blockRange) +func RequestBackProcessing(ctx context.Context, logger *zap.Logger, startBlock uint64, stageToProcess int, clientFactory client.InternalClientFactory, jobResult chan error) { + liveBackFillerRequest := work.NewRequest(ctx, reqctx.Details(ctx), stageToProcess, startBlock) err := derr.RetryContext(ctx, backfillRetries, func(ctx context.Context) error { err := requestBackProcessing(ctx, logger, liveBackFillerRequest, clientFactory) @@ -149,10 +148,8 @@ func (l *LiveBackFiller) Start(ctx context.Context) { if (targetSegment > l.currentSegment) && mergedBlockIsWritten { - liveBackFillerRange := block.NewRange(segmentStart, segmentEnd) - jobProcessing = true - go l.RequestBackProcessing(ctx, l.logger, liveBackFillerRange, l.stageToProcess, l.clientFactory, jobResult) + go l.RequestBackProcessing(ctx, l.logger, segmentStart, l.stageToProcess, l.clientFactory, jobResult) } } } diff --git a/service/live_back_filler_test.go b/service/live_back_filler_test.go index 04d0b4476..b83bd0f8f 100644 --- a/service/live_back_filler_test.go +++ b/service/live_back_filler_test.go @@ -6,8 +6,6 @@ import ( "testing" "time" - "github.com/streamingfast/substreams/block" - "github.com/streamingfast/bstream" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" @@ -90,13 +88,13 @@ func TestBackFiller(t *testing.T) { testLogger := zap.NewNop() segmentProcessed := make(chan uint64) - RequestBackProcessingTest := func(ctx context.Context, logger *zap.Logger, blockRange *block.Range, stageToProcess int, clientFactory client.InternalClientFactory, jobResult chan error) { + RequestBackProcessingTest := func(ctx context.Context, logger *zap.Logger, startBlock uint64, stageToProcess int, clientFactory client.InternalClientFactory, jobResult chan error) { var err error if c.errorBackProcessing { err = fmt.Errorf("fail") } - segmentNumber := blockRange.ExclusiveEndBlock / c.segmentSize + segmentNumber := startBlock/c.segmentSize + 1 segmentProcessed <- segmentNumber jobResult <- err diff --git a/service/testing.go b/service/testing.go index b107e2307..fe59192df 100644 --- a/service/testing.go +++ b/service/testing.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/streamingfast/bstream" "github.com/streamingfast/substreams/wasm" "github.com/streamingfast/substreams/reqctx" @@ -34,7 +35,7 @@ func TestNewService(runtimeConfig config.RuntimeConfig, linearHandoffBlockNum ui } func (s *Tier1Service) TestBlocks(ctx context.Context, isSubRequest bool, request *pbsubstreamsrpc.Request, respFunc substreams.ResponseFunc) error { - execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, request.ProductionMode, request.Modules) + execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, request.ProductionMode, request.Modules, bstream.GetProtocolFirstStreamableBlock) if err != nil { return stream.NewErrInvalidArg(err.Error()) } diff --git a/service/tier1.go b/service/tier1.go index 42e949fed..815296aba 100644 --- a/service/tier1.go +++ b/service/tier1.go @@ -228,10 +228,7 @@ func (s *Tier1Service) Blocks( return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err)) } - // this tweaks the actual request so all initialBlocks are correct with the given chain firstStreamableBlock - pbsubstreams.ApplyFirstStreamableBlockToModules(bstream.GetProtocolFirstStreamableBlock, request.Modules.Modules) - - execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, request.ProductionMode, request.Modules) + execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, request.ProductionMode, request.Modules, bstream.GetProtocolFirstStreamableBlock) if err != nil { return bsstream.NewErrInvalidArg(err.Error()) } @@ -450,12 +447,12 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ segmentSize := s.runtimeConfig.SegmentSize - execOutputConfigs, err := execout.NewConfigs(cacheStore, execGraph.UsedModules(), execGraph.ModuleHashes(), segmentSize, logger) + execOutputConfigs, err := execout.NewConfigs(cacheStore, execGraph.UsedModules(), execGraph.ModuleHashes(), segmentSize, chainFirstStreamableBlock, logger) if err != nil { return fmt.Errorf("new config map: %w", err) } - storeConfigs, err := store.NewConfigMap(cacheStore, execGraph.Stores(), execGraph.ModuleHashes()) + storeConfigs, err := store.NewConfigMap(cacheStore, execGraph.Stores(), execGraph.ModuleHashes(), chainFirstStreamableBlock) if err != nil { return fmt.Errorf("configuring stores: %w", err) } diff --git a/service/tier2.go b/service/tier2.go index a540f19f7..e505ab457 100644 --- a/service/tier2.go +++ b/service/tier2.go @@ -128,9 +128,6 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s metrics.Tier2RequestCounter.Inc() defer metrics.Tier2ActiveRequests.Dec() - // this tweaks the actual request so all initialBlocks are correct with the given chain firstStreamableBlock - pbsubstreams.ApplyFirstStreamableBlockToModules(request.FirstStreamableBlock, request.Modules.Modules) - // We keep `err` here as the unaltered error from `blocks` call, this is used in the EndSpan to record the full error // and not only the `grpcError` one which is a subset view of the full `err`. var err error @@ -174,6 +171,7 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s zap.Uint32("stage", request.Stage), zap.Strings("modules", moduleNames), zap.String("output_module", request.OutputModule), + zap.Uint64("first_streamable_block", request.FirstStreamableBlock), } if auth := dauth.FromContext(ctx); auth != nil { @@ -237,7 +235,7 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P return err } - execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, true, request.Modules) + execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, true, request.Modules, request.FirstStreamableBlock) if err != nil { return stream.NewErrInvalidArg(err.Error()) } @@ -265,18 +263,19 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P execGraph.UsedModulesUpToStage(int(request.Stage)), execGraph.ModuleHashes(), request.SegmentSize, + request.FirstStreamableBlock, logger) if err != nil { return fmt.Errorf("new config map: %w", err) } - storeConfigs, err := store.NewConfigMap(cacheStore, execGraph.Stores(), execGraph.ModuleHashes()) + storeConfigs, err := store.NewConfigMap(cacheStore, execGraph.Stores(), execGraph.ModuleHashes(), request.FirstStreamableBlock) if err != nil { return fmt.Errorf("configuring stores: %w", err) } // indexes are not metered: we want users to use them as much as possible - indexConfigs, err := index.NewConfigs(unmeteredCacheStore, execGraph.UsedIndexesModulesUpToStage(int(request.Stage)), execGraph.ModuleHashes(), logger) + indexConfigs, err := index.NewConfigs(unmeteredCacheStore, execGraph.UsedIndexesModulesUpToStage(int(request.Stage)), execGraph.ModuleHashes(), request.FirstStreamableBlock, logger) if err != nil { return fmt.Errorf("configuring indexes: %w", err) } diff --git a/service/validate_test.go b/service/validate_test.go index 4ce4084e9..fc161dcf9 100644 --- a/service/validate_test.go +++ b/service/validate_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/require" + pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2" pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" ) @@ -42,6 +43,72 @@ func Test_ValidateRequest(t *testing.T) { } } +func Test_ValidateTier2Request(t *testing.T) { + + //testStoreModule := &pbsubstreams.Module{ + // Name: "test", + // Kind: &pbsubstreams.Module_KindStore_{}, + //} + testMapModule := &pbsubstreams.Module{ + Name: "test", + Kind: &pbsubstreams.Module_KindMap_{}, + } + + // testOutputMap := withOutputModule("output_mod", "map") + // testOutputStore := withOutputModule("output_mod", "store") + + getPerfectRequest := func() *pbssinternal.ProcessRangeRequest { + return &pbssinternal.ProcessRangeRequest{ + Modules: &pbsubstreams.Modules{Modules: []*pbsubstreams.Module{testMapModule}}, + OutputModule: "test", + Stage: 0, + SegmentNumber: 10, + SegmentSize: 10, + FirstStreamableBlock: 0, + MeteringConfig: "metering", + BlockType: "block", + StateStore: "/tmp/state", + MergedBlocksStore: "/tmp/merged", + } + } + + tests := []struct { + name string + tweakReq func(*pbssinternal.ProcessRangeRequest) + expect error + }{ + {"negative start block num", func(req *pbssinternal.ProcessRangeRequest) { req.Modules = nil }, + fmt.Errorf("validate tier2 request: no modules found in request")}, + {"completely below first streamable block", func(req *pbssinternal.ProcessRangeRequest) { req.FirstStreamableBlock = 110 }, + fmt.Errorf("validate tier2 request: segment is completely below the first streamable block")}, + {"half below first streamable block", func(req *pbssinternal.ProcessRangeRequest) { req.FirstStreamableBlock = 109 }, + nil}, + {"old 'stopBlockNum' is set", func(req *pbssinternal.ProcessRangeRequest) { req.StopBlockNum = 123 }, + fmt.Errorf("validate tier2 request: invalid protocol: update your tier1")}, + {"no output module", func(req *pbssinternal.ProcessRangeRequest) { req.OutputModule = "" }, + fmt.Errorf("validate tier2 request: no output module defined in request")}, + {"no metering config", func(req *pbssinternal.ProcessRangeRequest) { req.MeteringConfig = "" }, + fmt.Errorf("validate tier2 request: metering config is required in request")}, + {"no blocktype", func(req *pbssinternal.ProcessRangeRequest) { req.BlockType = "" }, + fmt.Errorf("validate tier2 request: block type is required in request")}, + {"working", func(req *pbssinternal.ProcessRangeRequest) {}, + nil}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + req := getPerfectRequest() + test.tweakReq(req) + err := ValidateTier2Request(req) + if test.expect != nil { + require.EqualError(t, err, test.expect.Error()) + } else { + require.NoError(t, err) + } + }) + } +} + type reqOption func(*pbsubstreamsrpc.Request) *pbsubstreamsrpc.Request func withOutputModule(outputModule, kind string) reqOption { diff --git a/storage/execout/configs.go b/storage/execout/configs.go index 0bf53e627..f5d8a219e 100644 --- a/storage/execout/configs.go +++ b/storage/execout/configs.go @@ -16,12 +16,17 @@ type Configs struct { logger *zap.Logger } -func NewConfigs(baseObjectStore dstore.Store, allRequestedModules []*pbsubstreams.Module, moduleHashes *manifest.ModuleHashes, execOutputSaveInterval uint64, logger *zap.Logger) (*Configs, error) { +func NewConfigs(baseObjectStore dstore.Store, allRequestedModules []*pbsubstreams.Module, moduleHashes *manifest.ModuleHashes, execOutputSaveInterval uint64, firstStreamableBlock uint64, logger *zap.Logger) (*Configs, error) { out := make(map[string]*Config) for _, mod := range allRequestedModules { + + initialBlock := mod.InitialBlock + if initialBlock < firstStreamableBlock { + initialBlock = firstStreamableBlock + } conf, err := NewConfig( mod.Name, - mod.InitialBlock, + initialBlock, mod.ModuleKind(), moduleHashes.Get(mod.Name), baseObjectStore, diff --git a/storage/index/configs.go b/storage/index/configs.go index 9cc0d7563..c8f71a43e 100644 --- a/storage/index/configs.go +++ b/storage/index/configs.go @@ -14,12 +14,16 @@ type Configs struct { logger *zap.Logger } -func NewConfigs(baseObjectStore dstore.Store, allRequestedModules []*pbsubstreams.Module, moduleHashes *manifest.ModuleHashes, logger *zap.Logger) (*Configs, error) { +func NewConfigs(baseObjectStore dstore.Store, allRequestedModules []*pbsubstreams.Module, moduleHashes *manifest.ModuleHashes, firstStreamableBlock uint64, logger *zap.Logger) (*Configs, error) { out := make(map[string]*Config) for _, mod := range allRequestedModules { + initialBlock := mod.InitialBlock + if initialBlock < firstStreamableBlock { + initialBlock = firstStreamableBlock + } conf, err := NewConfig( mod.Name, - mod.InitialBlock, + initialBlock, moduleHashes.Get(mod.Name), baseObjectStore, logger, diff --git a/storage/store/configmap.go b/storage/store/configmap.go index e0a9d1801..cbb090dca 100644 --- a/storage/store/configmap.go +++ b/storage/store/configmap.go @@ -10,12 +10,16 @@ import ( type ConfigMap map[string]*Config -func NewConfigMap(baseObjectStore dstore.Store, storeModules []*pbsubstreams.Module, moduleHashes *manifest.ModuleHashes) (out ConfigMap, err error) { +func NewConfigMap(baseObjectStore dstore.Store, storeModules []*pbsubstreams.Module, moduleHashes *manifest.ModuleHashes, firstStreamableBlock uint64) (out ConfigMap, err error) { out = make(ConfigMap) for _, storeModule := range storeModules { + initialBlock := storeModule.InitialBlock + if initialBlock < firstStreamableBlock { + initialBlock = firstStreamableBlock + } c, err := NewConfig( storeModule.Name, - storeModule.InitialBlock, + initialBlock, moduleHashes.Get(storeModule.Name), storeModule.GetKindStore().UpdatePolicy, storeModule.GetKindStore().ValueType, diff --git a/test/complex_integration_test.go b/test/complex_integration_test.go index efa5983b0..a306e1287 100644 --- a/test/complex_integration_test.go +++ b/test/complex_integration_test.go @@ -62,7 +62,7 @@ func TestAllAssertionsInComplex(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { - run := newTestRun(t, int64(c.startBlock), c.linearHandoffBlock, c.exclusiveEndBlock, c.moduleName, "./testdata/complex_substreams/complex-substreams-v0.1.0.spkg") + run := newTestRun(t, int64(c.startBlock), c.linearHandoffBlock, c.exclusiveEndBlock, 0, c.moduleName, "./testdata/complex_substreams/complex-substreams-v0.1.0.spkg") err := run.Run(t, c.moduleName) if c.expectError { require.Error(t, err) diff --git a/test/integration_test.go b/test/integration_test.go index b85ea175b..d5d5286b4 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -128,7 +128,7 @@ func TestForkHandling(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - run := newTestRun(t, 1, 1, 7, test.module, "./testdata/simple_substreams/substreams-test-v0.1.0.spkg") + run := newTestRun(t, 1, 1, 7, 0, test.module, "./testdata/simple_substreams/substreams-test-v0.1.0.spkg") run.NewBlockGenerator = func(startBlock uint64, inclusiveStopBlock uint64) TestBlockGenerator { return &ForkBlockGenerator{ initialLIB: bstream.NewBlockRef("0a", 0), @@ -187,6 +187,7 @@ func TestOneStoreOneMap(t *testing.T) { startBlock int64 linearBlock uint64 stopBlock uint64 + firstStreamableBlock uint64 production bool preWork testPreWork expectedResponseCount int @@ -252,6 +253,23 @@ func TestOneStoreOneMap(t *testing.T) { assertTestStoreAddI64Hash + "/outputs/0000000020-0000000030.output", // map }, }, + { + name: "prod_mode_back_forward_to_stop_nonzero_first_streamable", + firstStreamableBlock: 16, + startBlock: 0, + linearBlock: 30, + stopBlock: 30, + production: true, + expectedResponseCount: 14, + expectFiles: []string{ + assertTestStoreAddI64Hash + "/outputs/0000000016-0000000020.output", // map + assertTestStoreAddI64Hash + "/outputs/0000000020-0000000030.output", // map + testStoreAddI64Hash + "/outputs/0000000016-0000000020.output", + testStoreAddI64Hash + "/outputs/0000000020-0000000030.output", + testStoreAddI64Hash + "/states/0000000020-0000000016.kv", + testStoreAddI64Hash + "/states/0000000030-0000000016.kv", + }, + }, { name: "prod_mode_back_forward_to_stop_passed_boundary", startBlock: 25, @@ -279,7 +297,7 @@ func TestOneStoreOneMap(t *testing.T) { stopBlock: 30, production: true, preWork: func(t *testing.T, run *testRun, workerFactory work.WorkerFactory) { - partialPreWork(t, 1, 10, 0, run, workerFactory) + partialPreWork(t, 1, 0, run, workerFactory) }, expectedResponseCount: 29, expectFiles: []string{ @@ -300,7 +318,9 @@ func TestOneStoreOneMap(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - run := newTestRun(t, test.startBlock, test.linearBlock, test.stopBlock, "assert_test_store_add_i64", "./testdata/simple_substreams/substreams-test-v0.1.0.spkg") + bstream.GetProtocolFirstStreamableBlock = test.firstStreamableBlock // set for tier1 request to grab + run := newTestRun(t, test.startBlock, test.linearBlock, test.stopBlock, test.firstStreamableBlock, "assert_test_store_add_i64", "./testdata/simple_substreams/substreams-test-v0.1.0.spkg") + run.ProductionMode = test.production run.ParallelSubrequests = 1 run.PreWork = test.preWork @@ -326,7 +346,7 @@ func TestOneStoreOneMap(t *testing.T) { } func TestStoreDeletePrefix(t *testing.T) { - run := newTestRun(t, 30, 40, 42, "assert_test_store_delete_prefix", "./testdata/simple_substreams/substreams-test-v0.1.0.spkg") + run := newTestRun(t, 30, 40, 42, 0, "assert_test_store_delete_prefix", "./testdata/simple_substreams/substreams-test-v0.1.0.spkg") run.BlockProcessedCallback = func(ctx *execContext) { if ctx.block.Number == 40 { s, storeFound := ctx.stores.Get("test_store_delete_prefix") @@ -340,7 +360,7 @@ func TestStoreDeletePrefix(t *testing.T) { func TestAllAssertions(t *testing.T) { // Relies on `assert_all_test` having modInit == 1, so - run := newTestRun(t, 1, 31, 31, "assert_all_test", "./testdata/simple_substreams/substreams-test-v0.1.0.spkg") + run := newTestRun(t, 1, 31, 31, 0, "assert_all_test", "./testdata/simple_substreams/substreams-test-v0.1.0.spkg") require.NoError(t, run.Run(t, "assert_all_test")) @@ -350,7 +370,7 @@ func TestAllAssertions(t *testing.T) { } func Test_SimpleMapModule(t *testing.T) { - run := newTestRun(t, 10000, 10001, 10001, "test_map", "./testdata/simple_substreams/substreams-test-v0.1.0.spkg") + run := newTestRun(t, 10000, 10001, 10001, 0, "test_map", "./testdata/simple_substreams/substreams-test-v0.1.0.spkg") run.Params = map[string]string{"test_map": "my test params"} run.NewBlockGenerator = func(startBlock uint64, inclusiveStopBlock uint64) TestBlockGenerator { return &LinearBlockGenerator{ @@ -365,7 +385,7 @@ func Test_SimpleMapModule(t *testing.T) { } func Test_WASMBindgenShims(t *testing.T) { - run := newTestRun(t, 12, 14, 14, "map_block", "./testdata/wasmbindgen_substreams/wasmbindgen-substreams-v0.1.0.spkg") + run := newTestRun(t, 12, 14, 14, 0, "map_block", "./testdata/wasmbindgen_substreams/wasmbindgen-substreams-v0.1.0.spkg") run.NewBlockGenerator = func(startBlock uint64, inclusiveStopBlock uint64) TestBlockGenerator { return &LinearBlockGenerator{ startBlock: startBlock, @@ -382,7 +402,7 @@ func Test_WASMBindgenShims(t *testing.T) { } func Test_Early(t *testing.T) { - run := newTestRun(t, 12, 14, 14, "test_map", "./testdata/simple_substreams/substreams-test-v0.1.0.spkg") + run := newTestRun(t, 12, 14, 14, 0, "test_map", "./testdata/simple_substreams/substreams-test-v0.1.0.spkg") run.Params = map[string]string{"test_map": "my test params"} run.ProductionMode = true run.NewBlockGenerator = func(startBlock uint64, inclusiveStopBlock uint64) TestBlockGenerator { @@ -397,7 +417,7 @@ func Test_Early(t *testing.T) { } func TestEarlyWithEmptyStore(t *testing.T) { - run := newTestRun(t, 2, 4, 4, "assert_test_store_delete_prefix", "./testdata/simple_substreams/substreams-test-v0.1.0.spkg") + run := newTestRun(t, 2, 4, 4, 0, "assert_test_store_delete_prefix", "./testdata/simple_substreams/substreams-test-v0.1.0.spkg") run.ProductionMode = true var foundBlock3 bool @@ -413,7 +433,7 @@ func TestEarlyWithEmptyStore(t *testing.T) { } func Test_SingleMapModule_FileWalker(t *testing.T) { - run := newTestRun(t, 200, 250, 300, "test_map", "./testdata/simple_substreams/substreams-test-v0.1.0.spkg") + run := newTestRun(t, 200, 250, 300, 0, "test_map", "./testdata/simple_substreams/substreams-test-v0.1.0.spkg") run.Params = map[string]string{"test_map": "my test params"} run.ProductionMode = true run.NewBlockGenerator = func(startBlock uint64, inclusiveStopBlock uint64) TestBlockGenerator { @@ -476,7 +496,7 @@ func assertFiles(t *testing.T, tempDir string, expectPartialSpkg bool, wantedFil assert.ElementsMatch(t, wantedFiles, actualFiles) } -func partialPreWork(t *testing.T, start, end uint64, stageIdx int, run *testRun, workerFactory work.WorkerFactory) { +func partialPreWork(t *testing.T, start uint64, stageIdx int, run *testRun, workerFactory work.WorkerFactory) { worker := workerFactory(zlog) // FIXME: use the new `Work` interface here, and validate that the @@ -484,7 +504,7 @@ func partialPreWork(t *testing.T, start, end uint64, stageIdx int, run *testRun, segmenter := block.NewSegmenter(10, 0, 0) unit := stage.Unit{Segment: segmenter.IndexForStartBlock(start), Stage: stageIdx} ctx := reqctx.WithRequest(run.Context, &reqctx.RequestDetails{Modules: run.Package.Modules, OutputModule: run.ModuleName}) - cmd := worker.Work(ctx, unit, block.NewRange(start, end), []string{run.ModuleName}, nil) + cmd := worker.Work(ctx, unit, start, []string{run.ModuleName}, nil) result := cmd() msg, ok := result.(work.MsgJobSucceeded) require.True(t, ok) diff --git a/test/runnable_test.go b/test/runnable_test.go index e54aa145d..c0c009728 100644 --- a/test/runnable_test.go +++ b/test/runnable_test.go @@ -46,6 +46,7 @@ type testRun struct { NewBlockGenerator BlockGeneratorFactory BlockProcessedCallback blockProcessedCallBack LinearHandoffBlockNum uint64 // defaults to the request's StopBlock, so no linear handoff, only backprocessing + FirstStreamableBlock uint64 ProductionMode bool // PreWork can be done to perform tier2 work in advance, to simulate when // pre-existing data is available in different conditions @@ -58,10 +59,10 @@ type testRun struct { TempDir string } -func newTestRun(t *testing.T, startBlock int64, linearHandoffBlock, exclusiveEndBlock uint64, moduleName string, manifestPath string) *testRun { +func newTestRun(t *testing.T, startBlock int64, linearHandoffBlock, exclusiveEndBlock, firstStreamableBlock uint64, moduleName string, manifestPath string) *testRun { pkg := manifest.TestReadManifest(t, manifestPath) - return &testRun{Package: pkg, StartBlock: startBlock, ExclusiveEndBlock: exclusiveEndBlock, ModuleName: moduleName, LinearHandoffBlockNum: linearHandoffBlock} + return &testRun{Package: pkg, StartBlock: startBlock, ExclusiveEndBlock: exclusiveEndBlock, FirstStreamableBlock: firstStreamableBlock, ModuleName: moduleName, LinearHandoffBlockNum: linearHandoffBlock} } func (f *testRun) Run(t *testing.T, testName string) error { @@ -135,6 +136,7 @@ func (f *testRun) Run(t *testing.T, testName string) error { blockProcessedCallBack: f.BlockProcessedCallback, testTempDir: testTempDir, id: workerID.Inc(), + firstStreamableBlock: f.FirstStreamableBlock, } } @@ -317,7 +319,7 @@ type TestRunner struct { generator TestBlockGenerator } -func (r *TestRunner) StreamFactory(_ context.Context, h bstream.Handler, startBlockNum int64, stopBlockNum uint64, _ string, _ bool, _ bool, _ *zap.Logger) (service.Streamable, error) { +func (r *TestRunner) StreamFactory(ctx context.Context, h bstream.Handler, startBlockNum int64, stopBlockNum uint64, _ string, _ bool, _ bool, _ *zap.Logger) (service.Streamable, error) { var liveBackFiller *service.LiveBackFiller if liveBackFillerHandler, ok := h.(*service.LiveBackFiller); ok { @@ -327,8 +329,17 @@ func (r *TestRunner) StreamFactory(_ context.Context, h bstream.Handler, startBl r.pipe = pipelineHandler } + firstStreamableBlock := bstream.GetProtocolFirstStreamableBlock + if tier2ReqParams, ok := reqctx.GetTier2RequestParameters(ctx); ok { + firstStreamableBlock = tier2ReqParams.FirstStreamableBlock + } + r.Shutter = shutter.New() - r.generator = r.blockGeneratorFactory(uint64(startBlockNum), stopBlockNum) + factoryFirstBlock := uint64(startBlockNum) + if factoryFirstBlock < firstStreamableBlock { + factoryFirstBlock = firstStreamableBlock + } + r.generator = r.blockGeneratorFactory(factoryFirstBlock, stopBlockNum) return r, nil } diff --git a/test/testdata/simple_substreams_init0/substreams-test-init0-v0.1.0.spkg b/test/testdata/simple_substreams_init0/substreams-test-init0-v0.1.0.spkg new file mode 100644 index 000000000..484d7e605 Binary files /dev/null and b/test/testdata/simple_substreams_init0/substreams-test-init0-v0.1.0.spkg differ diff --git a/test/testdata/simple_substreams_init0/substreams.yaml b/test/testdata/simple_substreams_init0/substreams.yaml new file mode 100644 index 000000000..c45b40880 --- /dev/null +++ b/test/testdata/simple_substreams_init0/substreams.yaml @@ -0,0 +1,62 @@ +specVersion: v0.1.0 +package: + name: "substreams_test_init0" + version: v0.1.0 + +protobuf: + importPaths: + - ../simple_substreams/proto + +binaries: + default: + type: wasm/rust-v1 + file: ../simple_substreams/target/wasm32-unknown-unknown/release/substreams_test.wasm + protoPackageMapping: + sf.ethereum.type.v2: "substreams_ethereum::pb::eth::v2" + sf.substreams.v1: "substreams::pb::substreams" + sf.substreams.v1.test: "pb::test" + my.types.v1: "pb::my_types_v1" + +modules: + - name: test_index + kind: blockIndex + inputs: + - source: sf.substreams.v1.test.Block + output: + type: proto:sf.substreams.index.v1.Keys + + - name: test_map + kind: map + inputs: + - params: string + - source: sf.substreams.v1.test.Block + output: + type: proto:sf.substreams.v1.test.MapResult + + - name: setup_test_store_add_i64 + kind: store + updatePolicy: add + valueType: int64 + inputs: + - source: sf.substreams.v1.test.Block + + - name: assert_test_store_add_i64 + kind: map + inputs: + - source: sf.substreams.v1.test.Block + - store: setup_test_store_add_i64 + output: + type: proto:sf.substreams.v1.test.Boolean + + - name: assert_test_store_add_i64_deltas + kind: map + inputs: + - source: sf.substreams.v1.test.Block + - store: setup_test_store_add_i64 + - store: setup_test_store_add_i64 + mode: deltas + output: + type: proto:sf.substreams.v1.test.Boolean + +params: + test_map: my test params diff --git a/test/tier2_integration_test.go b/test/tier2_integration_test.go index 6541558da..8b99d6474 100644 --- a/test/tier2_integration_test.go +++ b/test/tier2_integration_test.go @@ -23,7 +23,6 @@ import ( "github.com/streamingfast/substreams/manifest" - "github.com/streamingfast/substreams/block" "github.com/stretchr/testify/require" "github.com/streamingfast/substreams/orchestrator/work" @@ -59,7 +58,7 @@ func TestTier2Call(t *testing.T) { cases := []struct { name string startBlock uint64 - endBlock uint64 + firstStreamableBlock uint64 stage int moduleName string stateBundleSize uint64 @@ -79,7 +78,6 @@ func TestTier2Call(t *testing.T) { { name: "check full kv production in previous stages", startBlock: 50, - endBlock: 60, stage: 3, moduleName: "map_output_init_50", stateBundleSize: 10, @@ -105,6 +103,119 @@ func TestTier2Call(t *testing.T) { }, }, + // Simple substreams package with initialBlock==0 : "./testdata/simple_substreams_init0/substreams-test-init0-v0.1.0.spkg" + // Output module : test_map + //Stage 0: [["test_map"]] + { + name: "first streamble block", + startBlock: 10, + firstStreamableBlock: 18, + stage: 0, + moduleName: "test_map", + stateBundleSize: 10, + manifestPath: "./testdata/simple_substreams_init0/substreams-test-init0-v0.1.0.spkg", + preCreatedFiles: nil, + + expectRemainingFiles: []string{ + "746573745f6d6170/outputs/0000000018-0000000020.output", + }, + }, + + // Simple substreams package with initialBlock==0 : "./testdata/simple_substreams_init0/substreams-test-init0-v0.1.0.spkg" + // Output module : test_map + //Stage 0: [["test_store_add_i64"]] + //Stage 1: [["assert_test_store_add_i64"]] (not run) + { + name: "first streamble block with store", + startBlock: 10, + firstStreamableBlock: 18, + stage: 0, + moduleName: "assert_test_store_add_i64", + stateBundleSize: 10, + manifestPath: "./testdata/simple_substreams_init0/substreams-test-init0-v0.1.0.spkg", + preCreatedFiles: nil, + + expectRemainingFiles: []string{ + "73657475705f746573745f73746f72655f6164645f693634/outputs/0000000018-0000000020.output", + "73657475705f746573745f73746f72655f6164645f693634/states/0000000020-0000000018.partial", + }, + }, + + // Simple substreams package with initialBlock==0 : "./testdata/simple_substreams_init0/substreams-test-init0-v0.1.0.spkg" + // Output module : test_map + //Stage 0: [["test_store_add_i64"]] + //Stage 1: [["assert_test_store_add_i64"]] + { + name: "first streamble block with store all stages together", + startBlock: 10, + firstStreamableBlock: 18, + stage: 1, + moduleName: "assert_test_store_add_i64", + stateBundleSize: 10, + manifestPath: "./testdata/simple_substreams_init0/substreams-test-init0-v0.1.0.spkg", + preCreatedFiles: nil, + + expectRemainingFiles: []string{ + "73657475705f746573745f73746f72655f6164645f693634/outputs/0000000018-0000000020.output", + "73657475705f746573745f73746f72655f6164645f693634/states/0000000020-0000000018.kv", // kv store done directly + "6173736572745f746573745f73746f72655f6164645f693634/outputs/0000000018-0000000020.output", + }, + }, + + // Simple substreams package with initialBlock==0 : "./testdata/simple_substreams_init0/substreams-test-init0-v0.1.0.spkg" + // Output module : test_map + //Stage 0: [["test_store_add_i64"]] + //Stage 1: [["assert_test_store_add_i64"]] (not run) + { + name: "first streamble block with store second segment", + startBlock: 20, + firstStreamableBlock: 18, + stage: 0, + moduleName: "assert_test_store_add_i64", + stateBundleSize: 10, + manifestPath: "./testdata/simple_substreams_init0/substreams-test-init0-v0.1.0.spkg", + preCreatedFiles: []string{ + "73657475705f746573745f73746f72655f6164645f693634/outputs/0000000018-0000000020.output.zst", + "73657475705f746573745f73746f72655f6164645f693634/states/0000000020-0000000018.partial.zst", + }, + + expectRemainingFiles: []string{ + "73657475705f746573745f73746f72655f6164645f693634/outputs/0000000018-0000000020.output", + "73657475705f746573745f73746f72655f6164645f693634/states/0000000020-0000000018.partial", + + "73657475705f746573745f73746f72655f6164645f693634/outputs/0000000020-0000000030.output", + "73657475705f746573745f73746f72655f6164645f693634/states/0000000030-0000000020.partial", + }, + }, + + // Simple substreams package with initialBlock==0 : "./testdata/simple_substreams_init0/substreams-test-init0-v0.1.0.spkg" + // Output module : test_map + //Stage 0: [["test_store_add_i64"]] + //Stage 1: [["assert_test_store_add_i64"]] + { + name: "first streamble block with store second stage", + startBlock: 20, + firstStreamableBlock: 18, + stage: 1, + moduleName: "assert_test_store_add_i64", + stateBundleSize: 10, + manifestPath: "./testdata/simple_substreams_init0/substreams-test-init0-v0.1.0.spkg", + preCreatedFiles: []string{ + "73657475705f746573745f73746f72655f6164645f693634/outputs/0000000018-0000000020.output.zst", + "73657475705f746573745f73746f72655f6164645f693634/states/0000000020-0000000018.kv.zst", + }, + + expectRemainingFiles: []string{ + "73657475705f746573745f73746f72655f6164645f693634/outputs/0000000018-0000000020.output", + "73657475705f746573745f73746f72655f6164645f693634/states/0000000020-0000000018.kv", + + "73657475705f746573745f73746f72655f6164645f693634/outputs/0000000020-0000000030.output", + "73657475705f746573745f73746f72655f6164645f693634/states/0000000030-0000000018.kv", + + "6173736572745f746573745f73746f72655f6164645f693634/outputs/0000000020-0000000030.output", + }, + }, + // Complex substreams package : "./testdata/complex_substreams/complex-substreams-v0.1.0.spkg" // Output module : second_map_output_init_50 //Stage 0: [["first_store_init_20"]] @@ -114,7 +225,6 @@ func TestTier2Call(t *testing.T) { { name: "stores with different initial blocks on the same stage", startBlock: 50, - endBlock: 60, stage: 3, moduleName: "second_map_output_init_50", stateBundleSize: 10, @@ -149,7 +259,6 @@ func TestTier2Call(t *testing.T) { { name: "test index_init_60 with map_using_index_init_70 filtering through key 'even' with pre-existing random indices", startBlock: 70, - endBlock: 80, stage: 0, moduleName: "map_using_index_init_70", stateBundleSize: 10, @@ -175,7 +284,6 @@ func TestTier2Call(t *testing.T) { { name: "test index_init_60 with map_using_index_init_70 filtering through key 'even'", startBlock: 70, - endBlock: 80, stage: 0, moduleName: "map_using_index_init_70", stateBundleSize: 10, @@ -210,8 +318,11 @@ func TestTier2Call(t *testing.T) { ctx = reqctx.WithTier2RequestParameters(ctx, reqctx.Tier2RequestParameters{ BlockType: "sf.substreams.v1.test.Block", + FirstStreamableBlock: test.firstStreamableBlock, StateBundleSize: test.stateBundleSize, StateStoreURL: filepath.Join(testTempDir, "test.store"), + MeteringConfig: "some_metering_config", + MergedBlockStoreURL: "some_merged_block_store_url", StateStoreDefaultTag: "tag", }) @@ -224,9 +335,8 @@ func TestTier2Call(t *testing.T) { } } - workRange := block.NewRange(test.startBlock, test.endBlock) - - request := work.NewRequest(ctx, reqctx.Details(ctx), test.stage, workRange) + request := work.NewRequest(ctx, reqctx.Details(ctx), test.stage, test.startBlock) + require.NoError(t, request.Validate()) err = processInternalRequest(t, ctx, request, nil, newBlockGenerator, responseCollector, nil, testTempDir) require.NoError(t, err) diff --git a/test/worker_test.go b/test/worker_test.go index 2342ff7f4..26667e53d 100644 --- a/test/worker_test.go +++ b/test/worker_test.go @@ -9,7 +9,6 @@ import ( "go.uber.org/atomic" "go.uber.org/zap" - "github.com/streamingfast/substreams/block" "github.com/streamingfast/substreams/orchestrator/loop" "github.com/streamingfast/substreams/orchestrator/response" "github.com/streamingfast/substreams/orchestrator/stage" @@ -24,6 +23,7 @@ type TestWorker struct { blockProcessedCallBack blockProcessedCallBack testTempDir string id uint64 + firstStreamableBlock uint64 } var workerID atomic.Uint64 @@ -32,7 +32,7 @@ func (w *TestWorker) ID() string { return fmt.Sprintf("%d", w.id) } -func (w *TestWorker) Work(ctx context.Context, unit stage.Unit, workRange *block.Range, moduleNames []string, upstream *response.Stream) loop.Cmd { +func (w *TestWorker) Work(ctx context.Context, unit stage.Unit, startBlock uint64, moduleNames []string, upstream *response.Stream) loop.Cmd { w.t.Helper() ctx = reqctx.WithTier2RequestParameters(ctx, reqctx.Tier2RequestParameters{ @@ -40,8 +40,9 @@ func (w *TestWorker) Work(ctx context.Context, unit stage.Unit, workRange *block StateBundleSize: 10, StateStoreURL: filepath.Join(w.testTempDir, "test.store"), StateStoreDefaultTag: "tag", + FirstStreamableBlock: w.firstStreamableBlock, }) - request := work.NewRequest(ctx, reqctx.Details(ctx), unit.Stage, workRange) + request := work.NewRequest(ctx, reqctx.Details(ctx), unit.Stage, startBlock) logger := reqctx.Logger(ctx) logger = logger.With(zap.Uint64("workerId", w.id))