Skip to content

Commit

Permalink
Fixed a crash when eth_call batch is of length 0 and a retry is att…
Browse files Browse the repository at this point in the history
…empted.
  • Loading branch information
maoueh committed May 3, 2024
1 parent ca1c843 commit e88c0f6
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 13 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). See [MAINTAINERS.md](./MAINTAINERS.md)
for instructions to keep up to date.

## v2.4.9 (unreleased)
## v2.4.9

### Substreams

* Fixed a crash when `eth_call` batch is of length 0 and a retry is attempted.

* Allow stores to write to stores with out-of-order ordinals (they will be reordered at the end of the module execution for each block)

* Fix issue in substreams-tier2 causing some files to be written to the wrong place sometimes under load, resulting in some hanging requests

## v2.4.8
Expand Down
31 changes: 21 additions & 10 deletions substreams/rpccalls.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,22 +140,27 @@ func (e *RPCEngine) WASMExtensions() map[string]map[string]wasm.WASMExtension {
}

func (e *RPCEngine) ETHCall(ctx context.Context, traceID string, clock *pbsubstreams.Clock, in []byte) (out []byte, err error) {
// We set `alwaysRetry` parameter to `true` here so it means `deterministic` return value will always be `true` and we can safely ignore it
out, _, err = e.ethCall(ctx, true, traceID, clock, in)
// We set `retryCount` parameter to `-1` (infinite retry) here so it means `deterministic` return value will always be `true` and we can safely ignore it
out, _, err = e.ethCall(ctx, -1, traceID, clock, in)
return out, err
}

func (e *RPCEngine) ethCall(ctx context.Context, alwaysRetry bool, traceID string, clock *pbsubstreams.Clock, in []byte) (out []byte, deterministic bool, err error) {
func (e *RPCEngine) ethCall(ctx context.Context, retryCount int, traceID string, clock *pbsubstreams.Clock, in []byte) (out []byte, deterministic bool, err error) {
calls := &pbethss.RpcCalls{}
if err := proto.Unmarshal(in, calls); err != nil {
return nil, false, fmt.Errorf("unmarshal rpc calls proto: %w", err)
}

if len(calls.Calls) == 0 {
// A empty byte slice is a valid output that will lead to 0 responses
return make([]byte, 0), false, nil
}

if err := e.validateCalls(calls); err != nil {
return nil, true, err
}

res, deterministic, err := e.rpcCalls(ctx, traceID, alwaysRetry, clock.Id, calls)
res, deterministic, err := e.rpcCalls(ctx, traceID, retryCount, clock.Id, calls)
if err != nil {
return nil, deterministic, err
}
Expand Down Expand Up @@ -196,10 +201,15 @@ func (e *RPCEngine) validateCalls(calls *pbethss.RpcCalls) (err error) {

var evmExecutionExecutionTimeoutRegex = regexp.MustCompile(`execution aborted \(timeout\s*=\s*[^\)]+\)`)

// rpcsCalls performs the RPC calls with full retry unless `alwaysRetry` is `false` in which case output is
// returned right away. If `alwaysRetry` is sets to `true` than `deterministic` will always return `true`
// and `err` will always be nil.
func (e *RPCEngine) rpcCalls(ctx context.Context, traceID string, alwaysRetry bool, blockHash string, calls *pbethss.RpcCalls) (out *pbethss.RpcResponses, deterministic bool, err error) {
// rpcsCalls performs the RPC calls retrying forever on error if `retryCount` is set to -1. If `retryCount`
// is sets to 0, no retry is attempted. If `retryCount` is > 0, it will retry `retryCount` times.
//
// If there is no retry or if partial retry, deterministic will be always `false`. Otherwise, it can only
// be `true` (since we retry either forever or until we hit a deterministic error).
//
// Note that the `retryCount` value should be set to something else than -1 only for testing purposes, production
// code paths should always set it to -1 (infinite retry).
func (e *RPCEngine) rpcCalls(ctx context.Context, traceID string, retryCount int, blockHash string, calls *pbethss.RpcCalls) (out *pbethss.RpcResponses, deterministic bool, err error) {
reqs := make([]*rpc.RPCRequest, len(calls.Calls))
for i, call := range calls.Calls {
reqs[i] = rpc.NewRawETHCall(rpc.CallParams{
Expand All @@ -222,7 +232,8 @@ func (e *RPCEngine) rpcCalls(ctx context.Context, traceID string, alwaysRetry bo

out, err := client.DoRequests(ctx, reqs)
if err != nil {
if !alwaysRetry {
// Never retry on retry attempted max count
if retryCount == 0 || (retryCount > 0 && attemptNumber > retryCount) {
return nil, false, err
}

Expand Down Expand Up @@ -254,7 +265,7 @@ func (e *RPCEngine) rpcCalls(ctx context.Context, traceID string, alwaysRetry bo
}
}

if !alwaysRetry {
if retryCount == 0 {
return toProtoResponses(out), deterministicResp, nil
}

Expand Down
76 changes: 74 additions & 2 deletions substreams/rpccalls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,79 @@ func TestRPCEngine_rpcCalls(t *testing.T) {
protoCalls, err := proto.Marshal(&pbethss.RpcCalls{Calls: []*pbethss.RpcCall{{ToAddr: address, Data: data}}})
require.NoError(t, err)

out, deterministic, err := engine.ethCall(context.Background(), false, traceID, clockBlock1, protoCalls)
out, deterministic, err := engine.ethCall(context.Background(), 0, traceID, clockBlock1, protoCalls)
require.NoError(t, err)
require.True(t, deterministic)

responses := &pbethss.RpcResponses{}
err = proto.Unmarshal(out, responses)
require.NoError(t, err)

assertProtoEqual(t, &pbethss.RpcResponses{
Responses: []*pbethss.RpcResponse{
{Raw: eth.MustNewBytes("0x0000000000000000000000000000000000000000000000000000000000000012"), Failed: false},
},
}, responses)
}

func TestRPCEngine_rpcCalls_noCallsInInput(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Fail(t, "The server should never been called")
}))

engine, err := NewRPCEngine([]string{server.URL}, 50_000_000)
require.NoError(t, err)

traceID := "someTraceID"

protoCalls, err := proto.Marshal(&pbethss.RpcCalls{Calls: []*pbethss.RpcCall{}})
require.NoError(t, err)

out, deterministic, err := engine.ethCall(context.Background(), 1, traceID, clockBlock1, protoCalls)
require.NoError(t, err)
require.False(t, deterministic)

responses := &pbethss.RpcResponses{}
err = proto.Unmarshal(out, responses)
require.NoError(t, err)

assertProtoEqual(t, &pbethss.RpcResponses{
Responses: []*pbethss.RpcResponse{},
}, responses)
}

func TestRPCEngine_rpcCalls_retry(t *testing.T) {
invokedCount := 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
buffer := bytes.NewBuffer(nil)
_, err := buffer.ReadFrom(r.Body)
require.NoError(t, err)

invokedCount++
if invokedCount == 1 {
// Error than retry
w.WriteHeader(http.StatusInternalServerError)
} else {
assert.Equal(t,
`[{"params":[{"to":"0xea674fdde714fd979de3edf0f56aa9716b898ec8","gas":"0x2faf080","data":"0x313ce567"},{"blockHash":"0x10155bcb0fab82ccdc5edc8577f0f608ae059f93720172d11ca0fc01438b08a5"}],"method":"eth_call","jsonrpc":"2.0","id":"0x1"}]`,
buffer.String(),
)

w.Write([]byte(`{"jsonrpc":"2.0","id":"0x1","result":"0x0000000000000000000000000000000000000000000000000000000000000012"}`))
}
}))

engine, err := NewRPCEngine([]string{server.URL}, 50_000_000)
require.NoError(t, err)

traceID := "someTraceID"
address := eth.MustNewAddress("0xea674fdde714fd979de3edf0f56aa9716b898ec8")
data := eth.MustNewMethodDef("decimals()").MethodID()

protoCalls, err := proto.Marshal(&pbethss.RpcCalls{Calls: []*pbethss.RpcCall{{ToAddr: address, Data: data}}})
require.NoError(t, err)

out, deterministic, err := engine.ethCall(context.Background(), 1, traceID, clockBlock1, protoCalls)
require.NoError(t, err)
require.True(t, deterministic)

Expand Down Expand Up @@ -159,7 +231,7 @@ func TestRPCEngine_rpcCalls_determisticErrorMessages(t *testing.T) {
protoCalls, err := proto.Marshal(&pbethss.RpcCalls{Calls: []*pbethss.RpcCall{tt.rpcCall}})
require.NoError(t, err)

out, deterministic, err := engine.ethCall(context.Background(), false, traceID, clockBlock1, protoCalls)
out, deterministic, err := engine.ethCall(context.Background(), 0, traceID, clockBlock1, protoCalls)
tt.expectedErr(t, err)
require.Equal(t, tt.wantOut.deterministic, deterministic)

Expand Down

0 comments on commit e88c0f6

Please sign in to comment.