Skip to content

Commit

Permalink
Prevent 'changing module hash' with a non-zero 'first-streamable-bloc…
Browse files Browse the repository at this point in the history
…k' (#512)

- Reverts commit bf079f8.
- Removed the 'first-streamable-block' flag from 'info' command
- Modules retain their initialBlock parameter in the code,
  executionGraph exposes the adjusted initialBlocks
- Added integration tests on tier1 and tier2
  • Loading branch information
sduchesneau authored Jul 31, 2024
1 parent b1a199c commit 6104050
Show file tree
Hide file tree
Showing 29 changed files with 386 additions and 124 deletions.
4 changes: 1 addition & 3 deletions cmd/substreams/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
func init() {
infoCmd.Flags().String("output-sinkconfig-files-path", "", "if non-empty, any sinkconfig field of type 'bytes' that was packed from a file will be written to that path")
infoCmd.Flags().Bool("skip-package-validation", false, "Do not perform any validation when reading substreams package")
infoCmd.Flags().Uint64("first-streamable-block", 0, "Apply a chain's 'first-streamable-block' to modules, possibly affecting their initialBlock and hashes")
infoCmd.Flags().Bool("used-modules-only", false, "When set, only modules that are used by the output module will be displayed (requires the output_module arg to be set)")
}

Expand Down Expand Up @@ -52,15 +51,14 @@ func runInfo(cmd *cobra.Command, args []string) error {
}

outputSinkconfigFilesPath := sflags.MustGetString(cmd, "output-sinkconfig-files-path")
firstStreamableBlock := sflags.MustGetUint64(cmd, "first-streamable-block")
skipPackageValidation := sflags.MustGetBool(cmd, "skip-package-validation")
onlyShowUsedModules := sflags.MustGetBool(cmd, "used-modules-only")

if onlyShowUsedModules && outputModule == "" {
return fmt.Errorf("used-modules-only flag requires the output_module arg to be set")
}

pkgInfo, err := info.Extended(manifestPath, outputModule, skipPackageValidation, firstStreamableBlock)
pkgInfo, err := info.Extended(manifestPath, outputModule, skipPackageValidation)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* Fixed error handling issue in 'backprocessing' causing high CPU usage in tier1 servers
* Fixed handling of packages referenced by `ipfs://` URL (now simply using /api/v0/cat?arg=...)
* Added `--used-modules-only` flag to `substreams info` to only show modules that are in execution tree for the given output_module
* Reverted the 'initialBlocks' changes from v1.9.1, because having module hashes change caused more trouble than it solved.

## v1.9.2

Expand Down
6 changes: 2 additions & 4 deletions info/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func Basic(pkg *pbsubstreams.Package, graph *manifest.ModuleGraph) (*BasicInfo,
return manifestInfo, nil
}

func Extended(manifestPath string, outputModule string, skipValidation bool, firstStreamableBlock uint64) (*ExtendedInfo, error) {
func Extended(manifestPath string, outputModule string, skipValidation bool) (*ExtendedInfo, error) {
var opts []manifest.Option
if skipValidation {
opts = append(opts, manifest.SkipPackageValidationReader())
Expand All @@ -234,8 +234,6 @@ func Extended(manifestPath string, outputModule string, skipValidation bool, fir
return nil, fmt.Errorf("read manifest %q: %w", manifestPath, err)
}

pbsubstreams.ApplyFirstStreamableBlockToModules(firstStreamableBlock, pkg.Modules.Modules)

return ExtendedWithPackage(pkg, graph, outputModule)
}

Expand All @@ -247,7 +245,7 @@ func ExtendedWithPackage(pkg *pbsubstreams.Package, graph *manifest.ModuleGraph,

var stages [][][]string
if outputModule != "" {
execGraph, err := exec.NewOutputModuleGraph(outputModule, true, pkg.Modules)
execGraph, err := exec.NewOutputModuleGraph(outputModule, true, pkg.Modules, 0)
if err != nil {
return nil, fmt.Errorf("creating output module graph: %w", err)
}
Expand Down
15 changes: 1 addition & 14 deletions info/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"

"github.com/streamingfast/substreams/manifest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand All @@ -27,23 +26,11 @@ func TestBasicInfo(t *testing.T) {
}

func TestExtendedInfo(t *testing.T) {
info, err := Extended("https://github.com/streamingfast/substreams-uniswap-v3/releases/download/v0.2.8/substreams.spkg", "graph_out", false, 0)
info, err := Extended("https://github.com/streamingfast/substreams-uniswap-v3/releases/download/v0.2.8/substreams.spkg", "graph_out", false)
require.NoError(t, err)

r, err := json.MarshalIndent(info, "", " ")
require.NoError(t, err)

assert.Equal(t, uint64(12369621), info.Modules[0].InitialBlock)

fmt.Println(string(r))
}

func TestExtendedInfoFirstStreamable(t *testing.T) {
info, err := Extended("https://github.com/streamingfast/substreams-uniswap-v3/releases/download/v0.2.8/substreams.spkg", "graph_out", false, 999999999)
require.NoError(t, err)

assert.Equal(t, uint64(999999999), info.Modules[0].InitialBlock)
assert.Equal(t, uint64(999999999), info.Modules[1].InitialBlock)
assert.Equal(t, uint64(999999999), info.Modules[2].InitialBlock)
// ...
}
3 changes: 2 additions & 1 deletion orchestrator/parallelprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func BuildParallelProcessor(

// no ReadExecOut if output type is an index
if requestedModule.GetKindMap() != nil {
execOutSegmenter := reqPlan.ReadOutSegmenter(requestedModule.InitialBlock)
initialBlock := execGraph.ModulesInitBlocks()[requestedModule.Name]
execOutSegmenter := reqPlan.ReadOutSegmenter(initialBlock)
walker := execoutStorage.NewFileWalker(requestedModule.Name, execOutSegmenter)

sched.ExecOutWalker = orchestratorExecout.NewWalker(
Expand Down
2 changes: 1 addition & 1 deletion orchestrator/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {
modules := s.Stages.StageModules(workUnit.Stage)

return loop.Batch(
worker.Work(s.ctx, workUnit, workRange, modules, s.stream),
worker.Work(s.ctx, workUnit, workRange.StartBlock, modules, s.stream),
work.CmdScheduleNextJob(),
)

Expand Down
7 changes: 4 additions & 3 deletions orchestrator/stage/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func NewStages(
logger := reqctx.Logger(ctx)

stagedModules := execGraph.StagedUsedModules()
modulesInitBlocks := execGraph.ModulesInitBlocks()
out = &Stages{
ctx: ctx,
logger: reqctx.Logger(ctx),
Expand Down Expand Up @@ -108,13 +109,13 @@ func NewStages(
}

var moduleStates []*StoreModuleState
stageLowestInitBlock := layer[0].InitialBlock
stageLowestInitBlock := modulesInitBlocks[layer[0].Name]
for _, mod := range layer {
modSegmenter := segmenter.WithInitialBlock(mod.InitialBlock)
modSegmenter := segmenter.WithInitialBlock(modulesInitBlocks[mod.Name])
modState := NewModuleState(logger, mod.Name, modSegmenter, storeConfigs[mod.Name])
moduleStates = append(moduleStates, modState)

stageLowestInitBlock = min(stageLowestInitBlock, mod.InitialBlock)
stageLowestInitBlock = min(stageLowestInitBlock, modulesInitBlocks[mod.Name])
}

stageSegmenter := segmenter.WithInitialBlock(stageLowestInitBlock)
Expand Down
21 changes: 10 additions & 11 deletions orchestrator/work/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc/codes"

"github.com/streamingfast/substreams/block"
"github.com/streamingfast/substreams/client"
"github.com/streamingfast/substreams/metrics"
"github.com/streamingfast/substreams/orchestrator/loop"
Expand All @@ -41,23 +40,23 @@ type Result struct {

type Worker interface {
ID() string
Work(ctx context.Context, unit stage.Unit, workRange *block.Range, moduleNames []string, upstream *response.Stream) loop.Cmd // *Result
Work(ctx context.Context, unit stage.Unit, startBlock uint64, moduleNames []string, upstream *response.Stream) loop.Cmd // *Result
}

func NewWorkerFactoryFromFunc(f func(ctx context.Context, unit stage.Unit, workRange *block.Range, moduleNames []string, upstream *response.Stream) loop.Cmd) *SimpleWorkerFactory {
func NewWorkerFactoryFromFunc(f func(ctx context.Context, unit stage.Unit, startBlock uint64, moduleNames []string, upstream *response.Stream) loop.Cmd) *SimpleWorkerFactory {
return &SimpleWorkerFactory{
f: f,
id: atomic.AddUint64(&lastWorkerID, 1),
}
}

type SimpleWorkerFactory struct {
f func(ctx context.Context, unit stage.Unit, workRange *block.Range, moduleNames []string, upstream *response.Stream) loop.Cmd
f func(ctx context.Context, unit stage.Unit, startBlock uint64, moduleNames []string, upstream *response.Stream) loop.Cmd
id uint64
}

func (f SimpleWorkerFactory) Work(ctx context.Context, unit stage.Unit, workRange *block.Range, moduleNames []string, upstream *response.Stream) loop.Cmd {
return f.f(ctx, unit, workRange, moduleNames, upstream)
func (f SimpleWorkerFactory) Work(ctx context.Context, unit stage.Unit, startBlock uint64, moduleNames []string, upstream *response.Stream) loop.Cmd {
return f.f(ctx, unit, startBlock, moduleNames, upstream)
}

func (f SimpleWorkerFactory) ID() string {
Expand Down Expand Up @@ -87,13 +86,13 @@ func (w *RemoteWorker) ID() string {
return fmt.Sprintf("%d", w.id)
}

func NewRequest(ctx context.Context, req *reqctx.RequestDetails, stageIndex int, workRange *block.Range) *pbssinternal.ProcessRangeRequest {
func NewRequest(ctx context.Context, req *reqctx.RequestDetails, stageIndex int, startBlock uint64) *pbssinternal.ProcessRangeRequest {
tier2ReqParams, ok := reqctx.GetTier2RequestParameters(ctx)
if !ok {
panic("unable to get tier2 request parameters")
}

segment := uint64(workRange.StartBlock) / tier2ReqParams.StateBundleSize
segment := startBlock / tier2ReqParams.StateBundleSize

return &pbssinternal.ProcessRangeRequest{
Modules: req.Modules,
Expand All @@ -111,8 +110,8 @@ func NewRequest(ctx context.Context, req *reqctx.RequestDetails, stageIndex int,
}
}

func (w *RemoteWorker) Work(ctx context.Context, unit stage.Unit, workRange *block.Range, moduleNames []string, upstream *response.Stream) loop.Cmd {
request := NewRequest(ctx, reqctx.Details(ctx), unit.Stage, workRange)
func (w *RemoteWorker) Work(ctx context.Context, unit stage.Unit, startBlock uint64, moduleNames []string, upstream *response.Stream) loop.Cmd {
request := NewRequest(ctx, reqctx.Details(ctx), unit.Stage, startBlock)
logger := reqctx.Logger(ctx)

return func() loop.Msg {
Expand Down Expand Up @@ -192,7 +191,7 @@ func (w *RemoteWorker) Work(ctx context.Context, unit stage.Unit, workRange *blo
}
}

func (w *RemoteWorker) work(ctx context.Context, request *pbssinternal.ProcessRangeRequest, moduleNames []string, upstream *response.Stream) *Result {
func (w *RemoteWorker) work(ctx context.Context, request *pbssinternal.ProcessRangeRequest, _ []string, upstream *response.Stream) *Result {
metrics.Tier1ActiveWorkerRequest.Inc()
metrics.Tier1WorkerRequestCounter.Inc()
defer metrics.Tier1ActiveWorkerRequest.Dec()
Expand Down
3 changes: 1 addition & 2 deletions orchestrator/work/workerpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/streamingfast/substreams/block"
"github.com/streamingfast/substreams/orchestrator/loop"
"github.com/streamingfast/substreams/orchestrator/response"
"github.com/streamingfast/substreams/orchestrator/stage"
Expand All @@ -17,7 +16,7 @@ import (
func Test_workerPoolPool_Borrow_Return(t *testing.T) {
ctx := context.Background()
pi := NewWorkerPool(ctx, 2, func(logger *zap.Logger) Worker {
return NewWorkerFactoryFromFunc(func(ctx context.Context, unit stage.Unit, workRange *block.Range, moduleNames []string, upstream *response.Stream) loop.Cmd {
return NewWorkerFactoryFromFunc(func(ctx context.Context, unit stage.Unit, startBlock uint64, moduleNames []string, upstream *response.Stream) loop.Cmd {
return func() loop.Msg {
return &Result{}
}
Expand Down
5 changes: 2 additions & 3 deletions pb/sf/substreams/intern/v2/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ func (r *ProcessRangeRequest) Validate() error {
return fmt.Errorf("merged blocks store is required in request")
case r.SegmentSize == 0:
return fmt.Errorf("a non-zero state bundle size is required in request")
case (r.SegmentNumber+1)*r.SegmentSize < r.FirstStreamableBlock:
return fmt.Errorf("requested segment is way below the first streamable block")

case ((r.SegmentNumber+1)*r.SegmentSize - 1) < r.FirstStreamableBlock:
return fmt.Errorf("segment is completely below the first streamable block")
}

seenStores := map[string]bool{}
Expand Down
8 changes: 0 additions & 8 deletions pb/sf/substreams/v1/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,6 @@ const (
ModuleKindBlockIndex
)

func ApplyFirstStreamableBlockToModules(firstStreamableBlock uint64, modules []*Module) {
for _, mod := range modules {
if mod.InitialBlock < firstStreamableBlock {
mod.InitialBlock = firstStreamableBlock
}
}
}

func (m *Module) BlockFilterQueryString() (string, error) {
if m.BlockFilter == nil {
return "", nil
Expand Down
34 changes: 22 additions & 12 deletions pipeline/exec/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,18 @@ func (g *Graph) LowestStoresInitBlock() *uint64 { return g.lowestStoresIni
func (g *Graph) ModulesInitBlocks() map[string]uint64 { return g.modulesInitBlocks }
func (g *Graph) OutputModuleStageIndex() int { return len(g.stagedUsedModules) - 1 }

func NewOutputModuleGraph(outputModule string, productionMode bool, modules *pbsubstreams.Modules) (out *Graph, err error) {
func NewOutputModuleGraph(outputModule string, productionMode bool, modules *pbsubstreams.Modules, firstStreamableBlock uint64) (out *Graph, err error) {
out = &Graph{
requestModules: modules,
}
if err := out.computeGraph(outputModule, productionMode, modules); err != nil {
if err := out.computeGraph(outputModule, productionMode, modules, firstStreamableBlock); err != nil {
return nil, fmt.Errorf("module graph: %w", err)
}

return out, nil
}

func (g *Graph) computeGraph(outputModule string, productionMode bool, modules *pbsubstreams.Modules) error {
func (g *Graph) computeGraph(outputModule string, productionMode bool, modules *pbsubstreams.Modules, firstStreamableBlock uint64) error {
graph, err := manifest.NewModuleGraph(modules.Modules)
if err != nil {
return fmt.Errorf("compute graph: %w", err)
Expand All @@ -92,16 +92,20 @@ func (g *Graph) computeGraph(outputModule string, productionMode bool, modules *
g.usedModules = processModules
g.modulesInitBlocks = map[string]uint64{}
for _, mod := range g.usedModules {
g.modulesInitBlocks[mod.Name] = mod.InitialBlock
initialBlock := mod.InitialBlock
if initialBlock < firstStreamableBlock {
initialBlock = firstStreamableBlock
}
g.modulesInitBlocks[mod.Name] = initialBlock
}

g.stagedUsedModules, err = computeStages(g.usedModules, g.modulesInitBlocks)
if err != nil {
return err
}

g.lowestInitBlock = computeLowestInitBlock(processModules)
g.lowestStoresInitBlock = computeLowestStoresInitBlock(processModules)
g.lowestInitBlock = computeLowestInitBlock(processModules, firstStreamableBlock)
g.lowestStoresInitBlock = computeLowestStoresInitBlock(processModules, firstStreamableBlock)
if err := g.hashModules(graph); err != nil {
return fmt.Errorf("cannot hash module: %w", err)
}
Expand All @@ -126,7 +130,7 @@ func (g *Graph) computeGraph(outputModule string, productionMode bool, modules *
}

// computeLowestStoresInitBlock finds the lowest initial block of all store modules,
// raised to firstStreamableBlock when it would otherwise fall below it.
func computeLowestStoresInitBlock(modules []*pbsubstreams.Module) (out *uint64) {
func computeLowestStoresInitBlock(modules []*pbsubstreams.Module, firstStreamableBlock uint64) (out *uint64) {
lowest := uint64(math.MaxUint64)
countStores := 0
for _, mod := range modules {
Expand All @@ -143,12 +147,15 @@ func computeLowestStoresInitBlock(modules []*pbsubstreams.Module) (out *uint64)
return nil
}

if lowest < firstStreamableBlock {
return &firstStreamableBlock
}
return &lowest
}

// computeLowestInitBlock finds the lowest initial block of all modules that are not block indexes,
// raised to firstStreamableBlock when it would otherwise fall below it.
// If there are only blockIndex types of modules, it returns firstStreamableBlock.
func computeLowestInitBlock(modules []*pbsubstreams.Module) (out uint64) {
func computeLowestInitBlock(modules []*pbsubstreams.Module, firstStreamableBlock uint64) (out uint64) {
var atLeastOneModuleThatIsNotAnIndex bool
lowest := uint64(math.MaxUint64)
for _, mod := range modules {
Expand All @@ -161,7 +168,10 @@ func computeLowestInitBlock(modules []*pbsubstreams.Module) (out uint64) {
}
}
if !atLeastOneModuleThatIsNotAnIndex {
return 0
return firstStreamableBlock
}
if lowest < firstStreamableBlock {
return firstStreamableBlock
}
return lowest
}
Expand Down Expand Up @@ -243,12 +253,12 @@ func computeStages(mods []*pbsubstreams.Module, initBlocks map[string]uint64) (s
continue
case *pbsubstreams.Module_Input_Map_:
depModName = input.Map.ModuleName
if mod.InitialBlock >= initBlocks[depModName] {
if initBlocks[mod.Name] >= initBlocks[depModName] {
validInputsAtInitialBlock = true
}
case *pbsubstreams.Module_Input_Store_:
depModName = input.Store.ModuleName
if mod.InitialBlock >= initBlocks[depModName] {
if initBlocks[mod.Name] >= initBlocks[depModName] {
validInputsAtInitialBlock = true
}
default:
Expand All @@ -264,7 +274,7 @@ func computeStages(mods []*pbsubstreams.Module, initBlocks map[string]uint64) (s
}

if !validInputsAtInitialBlock {
return nil, fmt.Errorf("module %q has no input available at its initial block %d", mod.Name, mod.InitialBlock)
return nil, fmt.Errorf("module %q has no input available at its initial block %d", mod.Name, initBlocks[mod.Name])
}

//Check block index dependence
Expand Down
7 changes: 4 additions & 3 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ func (p *Pipeline) BuildModuleExecutors(ctx context.Context) error {
}

p.loadedModules = loadedModules
modulesInitBlocks := p.execGraph.ModulesInitBlocks()

var stagedModuleExecutors [][]exec.ModuleExecutor
for _, stage := range p.executionStages {
Expand Down Expand Up @@ -516,7 +517,7 @@ func (p *Pipeline) BuildModuleExecutors(ctx context.Context) error {
baseExecutor := exec.NewBaseExecutor(
ctx,
module.Name,
module.InitialBlock,
modulesInitBlocks[module.Name],
mod,
p.wasmRuntime.InstanceCacheEnabled(),
inputs,
Expand All @@ -540,7 +541,7 @@ func (p *Pipeline) BuildModuleExecutors(ctx context.Context) error {
baseExecutor := exec.NewBaseExecutor(
ctx,
module.Name,
module.InitialBlock,
modulesInitBlocks[module.Name],
mod,
p.wasmRuntime.InstanceCacheEnabled(),
inputs,
Expand All @@ -558,7 +559,7 @@ func (p *Pipeline) BuildModuleExecutors(ctx context.Context) error {
baseExecutor := exec.NewBaseExecutor(
ctx,
module.Name,
module.InitialBlock,
modulesInitBlocks[module.Name],
mod,
p.wasmRuntime.InstanceCacheEnabled(),
inputs,
Expand Down
Loading

0 comments on commit 6104050

Please sign in to comment.