Skip to content

Commit

Permalink
fix handling block execution timeout and returning error
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Aug 20, 2024
1 parent 9ba80f6 commit 850a4f8
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 8 deletions.
4 changes: 2 additions & 2 deletions pipeline/exec/baseexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ func (e *BaseExecutor) wasmCall(outputGetter execout.ExecutionOutputGetter) (cal
return nil, fmt.Errorf("block %d: module %q: general wasm execution panicked: %w: %s", clock.Number, e.moduleName, ErrWasmDeterministicExec, errExecutor.Error())
}
if err != nil {
if err := e.ctx.Err(); err != nil {
return nil, fmt.Errorf("block %d: module %q: general wasm execution failed: %w", clock.Number, e.moduleName, err)
if ctxErr := e.ctx.Err(); ctxErr != nil {
return nil, fmt.Errorf("block %d: module %q: general wasm execution failed: %w, %w", clock.Number, e.moduleName, err, ctxErr)
}
return nil, fmt.Errorf("block %d: module %q: general wasm execution failed: %w: %s", clock.Number, e.moduleName, ErrWasmDeterministicExec, err)
}
Expand Down
1 change: 1 addition & 0 deletions pipeline/exec/indexexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (i *IndexModuleExecutor) applyCachedOutput([]byte) error {
func (i *IndexModuleExecutor) run(ctx context.Context, reader execout.ExecutionOutputGetter) (out []byte, outForFiles []byte, moduleOutputData *pbssinternal.ModuleOutput, err error) {
_, span := reqctx.WithModuleExecutionSpan(ctx, "exec_index")
defer span.EndWithErr(&err)
i.ctx = ctx

var call *wasm.Call
if call, err = i.wasmCall(reader); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pipeline/exec/mapexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (e *MapperModuleExecutor) run(ctx context.Context, reader execout.Execution
_, span := reqctx.WithModuleExecutionSpan(ctx, "exec_map")
defer span.EndWithErr(&err)

e.ctx = ctx
var call *wasm.Call
if call, err = e.wasmCall(reader); err != nil {
return nil, nil, nil, fmt.Errorf("maps wasm call: %w", err)
Expand Down
1 change: 1 addition & 0 deletions pipeline/exec/storeexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (e *StoreModuleExecutor) applyCachedOutput(value []byte) error {
func (e *StoreModuleExecutor) run(ctx context.Context, reader execout.ExecutionOutputGetter) (out []byte, outForFiles []byte, moduleOutputData *pbssinternal.ModuleOutput, err error) {
_, span := reqctx.WithModuleExecutionSpan(ctx, "exec_store")
defer span.EndWithErr(&err)
e.ctx = ctx

if _, err := e.wasmCall(reader); err != nil {
return nil, nil, nil, fmt.Errorf("store wasm call: %w", err)
Expand Down
8 changes: 4 additions & 4 deletions pipeline/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (p *Pipeline) processBlock(
dmetering.GetBytesMeter(ctx).AddBytesRead(execOutput.Len())
err = p.handleStepNew(ctx, clock, cursor, execOutput)
if err != nil && err != io.EOF {
return fmt.Errorf("step new: handler step new: %w", err)
return err
}
if err == io.EOF {
eof = true
Expand All @@ -137,20 +137,20 @@ func (p *Pipeline) processBlock(
p.blockStepMap[bstream.StepNewIrreversible]++
err = p.handleStepNew(ctx, clock, cursor, execOutput)
if err != nil && err != io.EOF {
return fmt.Errorf("step new irr: handler step new: %w", err)
return err
}
if err == io.EOF {
eof = true
}
err = p.handleStepFinal(clock)
if err != nil {
return fmt.Errorf("handling step irreversible: %w", err)
return err
}
case bstream.StepIrreversible:
p.blockStepMap[bstream.StepIrreversible]++
err = p.handleStepFinal(clock)
if err != nil {
return fmt.Errorf("handling step irreversible: %w", err)
return err
}
}

Expand Down
2 changes: 1 addition & 1 deletion pipeline/terminate.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (p *Pipeline) OnStreamTerminated(ctx context.Context, err error) error {
}

if err := p.stores.flushStores(ctx, p.executionStages, reqDetails.StopBlockNum); err != nil {
return fmt.Errorf("step new irr: stores end of stream: %w", err)
return fmt.Errorf("flushing stores on termination: %w", err)
}

if reqctx.Details(ctx).IsTier2Request {
Expand Down
2 changes: 1 addition & 1 deletion service/failure_backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *Tier1Service) errorFromRecordedFailure(id string, isProductionMode bool
return nil
}

// Error: rpc error: code = InvalidArgument desc = step new irr: handler step new: execute modules: applying executor results ... store wasm call: block 300: module "store_eth_stats": wasm execution failed ...
// Error: rpc error: code = InvalidArgument desc = execute modules: applying executor results ... store wasm call: block 300: module "store_eth_stats": wasm execution failed ...
var blockFailureRE = regexp.MustCompile(`store wasm call: block ([0-9]*): module "([^"]*)"`)

func (s *Tier1Service) recordFailure(requestID string, err error) {
Expand Down

0 comments on commit 850a4f8

Please sign in to comment.