Skip to content

Commit

Permalink
Merge pull request #6994 from onflow/petera/fix-flakiness-execsync-in…
Browse files Browse the repository at this point in the history
…tegration

[Access] Fix flakiness in ExecutionStateSync integration test
  • Loading branch information
peterargue authored Feb 7, 2025
2 parents c7514cb + cd9dbf4 commit e833530
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (b *ExecutionDataBackend) GetExecutionDataByBlockID(ctx context.Context, bl

if err != nil {
// need custom not found handler due to blob not found error
if errors.Is(err, storage.ErrNotFound) || execution_data.IsBlobNotFoundError(err) {
if errors.Is(err, storage.ErrNotFound) || execution_data.IsBlobNotFoundError(err) || errors.Is(err, subscription.ErrBlockNotReady) {
return nil, status.Errorf(codes.NotFound, "could not find execution data: %v", err)
}

Expand Down
6 changes: 2 additions & 4 deletions integration/localnet/builder/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func prepareAccessService(container testnet.ContainerConfig, i int, n int) Servi
fmt.Sprintf("--secure-rpc-addr=%s:%s", container.ContainerName, testnet.GRPCSecurePort),
fmt.Sprintf("--http-addr=%s:%s", container.ContainerName, testnet.GRPCWebPort),
fmt.Sprintf("--rest-addr=%s:%s", container.ContainerName, testnet.RESTPort),
fmt.Sprintf("--state-stream-addr=%s:%s", container.ContainerName, testnet.ExecutionStatePort),
fmt.Sprintf("--state-stream-addr=%s:%s", container.ContainerName, testnet.GRPCPort),
fmt.Sprintf("--collection-ingress-port=%s", testnet.GRPCPort),
"--supports-observer=true",
fmt.Sprintf("--public-network-address=%s:%s", container.ContainerName, testnet.PublicNetworkPort),
Expand All @@ -443,7 +443,6 @@ func prepareAccessService(container testnet.ContainerConfig, i int, n int) Servi
testnet.GRPCSecurePort,
testnet.GRPCWebPort,
testnet.RESTPort,
testnet.ExecutionStatePort,
testnet.PublicNetworkPort,
)

Expand All @@ -466,7 +465,7 @@ func prepareObserverService(i int, observerName string, agPublicKey string) Serv
fmt.Sprintf("--secure-rpc-addr=%s:%s", observerName, testnet.GRPCSecurePort),
fmt.Sprintf("--http-addr=%s:%s", observerName, testnet.GRPCWebPort),
fmt.Sprintf("--rest-addr=%s:%s", observerName, testnet.RESTPort),
fmt.Sprintf("--state-stream-addr=%s:%s", observerName, testnet.ExecutionStatePort),
fmt.Sprintf("--state-stream-addr=%s:%s", observerName, testnet.GRPCPort),
"--execution-data-dir=/data/execution-data",
"--execution-data-sync-enabled=true",
"--execution-data-indexing-enabled=true",
Expand All @@ -479,7 +478,6 @@ func prepareObserverService(i int, observerName string, agPublicKey string) Serv
testnet.GRPCSecurePort,
testnet.GRPCWebPort,
testnet.RESTPort,
testnet.ExecutionStatePort,
)

// observer services rely on the access gateway
Expand Down
3 changes: 0 additions & 3 deletions integration/localnet/builder/ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ func (a *PortAllocator) Print() {
testnet.GRPCSecurePort,
testnet.GRPCWebPort,
testnet.RESTPort,
testnet.ExecutionStatePort,
testnet.PublicNetworkPort,
} {
if hostPort, ok := a.exposedPorts[node][containerPort]; ok {
Expand All @@ -165,8 +164,6 @@ func portName(containerPort string) string {
return "GRPC-Web"
case testnet.RESTPort:
return "REST"
case testnet.ExecutionStatePort:
return "Execution Data"
case testnet.AdminPort:
return "Admin"
case testnet.PublicNetworkPort:
Expand Down
11 changes: 3 additions & 8 deletions integration/testnet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ const (
DefaultProfilerDir = "/data/profiler"

// GRPCPort is the GRPC API port.
// Use this same port for the ExecutionDataAPI
GRPCPort = "9000"
// GRPCSecurePort is the secure GRPC API port.
GRPCSecurePort = "9001"
Expand All @@ -100,8 +101,6 @@ const (
MetricsPort = "8080"
// AdminPort is the admin server port
AdminPort = "9002"
// ExecutionStatePort is the execution state server port
ExecutionStatePort = "9003"
// PublicNetworkPort is the access node network port accessible from outside any docker container
PublicNetworkPort = "9876"
// DebuggerPort is the go debugger port
Expand Down Expand Up @@ -797,6 +796,7 @@ func (net *FlowNetwork) AddObserver(t *testing.T, conf ObserverConfig) *Containe

nodeContainer.exposePort(GRPCPort, testingdock.RandomPort(t))
nodeContainer.AddFlag("rpc-addr", nodeContainer.ContainerAddr(GRPCPort))
nodeContainer.AddFlag("state-stream-addr", nodeContainer.ContainerAddr(GRPCPort))

nodeContainer.exposePort(GRPCSecurePort, testingdock.RandomPort(t))
nodeContainer.AddFlag("secure-rpc-addr", nodeContainer.ContainerAddr(GRPCSecurePort))
Expand All @@ -810,9 +810,6 @@ func (net *FlowNetwork) AddObserver(t *testing.T, conf ObserverConfig) *Containe
nodeContainer.exposePort(RESTPort, testingdock.RandomPort(t))
nodeContainer.AddFlag("rest-addr", nodeContainer.ContainerAddr(RESTPort))

nodeContainer.exposePort(ExecutionStatePort, testingdock.RandomPort(t))
nodeContainer.AddFlag("state-stream-addr", nodeContainer.ContainerAddr(ExecutionStatePort))

nodeContainer.opts.HealthCheck = testingdock.HealthCheckCustom(nodeContainer.HealthcheckCallback())

suiteContainer := net.suite.Container(containerOpts)
Expand Down Expand Up @@ -910,6 +907,7 @@ func (net *FlowNetwork) AddNode(t *testing.T, bootstrapDir string, nodeConf Cont
case flow.RoleAccess:
nodeContainer.exposePort(GRPCPort, testingdock.RandomPort(t))
nodeContainer.AddFlag("rpc-addr", nodeContainer.ContainerAddr(GRPCPort))
nodeContainer.AddFlag("state-stream-addr", nodeContainer.ContainerAddr(GRPCPort))

nodeContainer.exposePort(GRPCSecurePort, testingdock.RandomPort(t))
nodeContainer.AddFlag("secure-rpc-addr", nodeContainer.ContainerAddr(GRPCSecurePort))
Expand All @@ -920,9 +918,6 @@ func (net *FlowNetwork) AddNode(t *testing.T, bootstrapDir string, nodeConf Cont
nodeContainer.exposePort(RESTPort, testingdock.RandomPort(t))
nodeContainer.AddFlag("rest-addr", nodeContainer.ContainerAddr(RESTPort))

nodeContainer.exposePort(ExecutionStatePort, testingdock.RandomPort(t))
nodeContainer.AddFlag("state-stream-addr", nodeContainer.ContainerAddr(ExecutionStatePort))

// uncomment line below to point the access node exclusively to a single collection node
// nodeContainer.AddFlag("static-collection-ingress-addr", "collection_1:9000")
nodeContainer.AddFlag("collection-ingress-port", GRPCPort)
Expand Down
109 changes: 64 additions & 45 deletions integration/tests/access/cohort3/execution_state_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,29 @@ import (
"fmt"
"path/filepath"
"testing"
"time"

"github.com/ipfs/go-datastore"
badgerds "github.com/ipfs/go-ds-badger2"
pebbleds "github.com/ipfs/go-ds-pebble"
sdk "github.com/onflow/flow-go-sdk"
sdkclient "github.com/onflow/flow-go-sdk/access/grpc"
"github.com/onflow/flow/protobuf/go/flow/entities"
"github.com/onflow/flow/protobuf/go/flow/executiondata"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/engine/ghost/client"
"github.com/onflow/flow-go/integration/testnet"
"github.com/onflow/flow-go/integration/tests/lib"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/blobs"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
"github.com/onflow/flow-go/module/metrics"
storage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/utils/unittest"
)

Expand Down Expand Up @@ -158,62 +164,75 @@ func (s *ExecutionStateSyncSuite) executionStateSyncTest() {
blockA := s.BlockState.WaitForHighestFinalizedProgress(s.T(), currentFinalized)
s.T().Logf("got block height %v ID %v", blockA.Header.Height, blockA.Header.ID())

// wait for the requested number of sealed blocks, then pause the network so we can inspect the dbs
s.BlockState.WaitForSealedHeight(s.T(), blockA.Header.Height+runBlocks)
s.net.StopContainers()

metrics := metrics.NewNoopCollector()

// start an execution data service using the Access Node's execution data db
an := s.net.ContainerByID(s.bridgeID)
anEds := s.nodeExecutionDataStore(an)

// setup storage objects needed to get the execution data id
anDB, err := an.DB()
require.NoError(s.T(), err, "could not open db")

anHeaders := storage.NewHeaders(metrics, anDB)
anResults := storage.NewExecutionResults(metrics, anDB)
// Loop through checkBlocks and verify the execution data was downloaded correctly
an := s.net.ContainerByName(testnet.PrimaryAN)
anClient, err := an.SDKClient()
require.NoError(s.T(), err, "could not get access node testnet client")

// start an execution data service using the Observer Node's execution data db
on := s.net.ContainerByName(s.observerName)
onEds := s.nodeExecutionDataStore(on)

// setup storage objects needed to get the execution data id
onDB, err := on.DB()
require.NoError(s.T(), err, "could not open db")
onClient, err := on.SDKClient()
require.NoError(s.T(), err, "could not get observer testnet client")

onHeaders := storage.NewHeaders(metrics, onDB)
onResults := storage.NewExecutionResults(metrics, onDB)
ctx, cancel := context.WithTimeout(s.ctx, 5*time.Minute)
defer cancel()

// Loop through checkBlocks and verify the execution data was downloaded correctly
for i := blockA.Header.Height; i <= blockA.Header.Height+checkBlocks; i++ {
// access node
header, err := anHeaders.ByHeight(i)
require.NoError(s.T(), err, "%s: could not get header", testnet.PrimaryAN)
anBED, err := s.executionDataForHeight(ctx, anClient, i)
require.NoError(s.T(), err, "could not get execution data from AN for height %v", i)

onBED, err := s.executionDataForHeight(ctx, onClient, i)
require.NoError(s.T(), err, "could not get execution data from ON for height %v", i)

result, err := anResults.ByBlockID(header.ID())
require.NoError(s.T(), err, "%s: could not get sealed result", testnet.PrimaryAN)
assert.Equal(s.T(), anBED.BlockID, onBED.BlockID)
}
}

ed, err := anEds.Get(s.ctx, result.ExecutionDataID)
if assert.NoError(s.T(), err, "%s: could not get execution data for height %v", testnet.PrimaryAN, i) {
s.T().Logf("%s: got execution data for height %d", testnet.PrimaryAN, i)
assert.Equal(s.T(), header.ID(), ed.BlockID)
// executionDataForHeight returns the execution data for the block at the given height, fetched
// from the given node over its gRPC API.
//
// The block header lookup and the execution data lookup are each retried every 200ms for as long
// as the node answers with a NotFound status, i.e. until the data becomes available or ctx is done.
// ctx is also passed to every RPC, so a single call cannot outlive the caller's deadline.
//
// Any failure fails the test through s.Require(), so the returned error is always nil; it is
// kept in the signature so existing callers continue to compile.
func (s *ExecutionStateSyncSuite) executionDataForHeight(ctx context.Context, nodeClient *sdkclient.Client, height uint64) (*execution_data.BlockExecutionData, error) {
	execDataClient := nodeClient.ExecutionDataRPCClient()

	var header *sdk.BlockHeader
	s.Require().NoError(retryNotFound(ctx, 200*time.Millisecond, func() error {
		var err error
		// use the caller's ctx (not s.ctx) so the request is bounded by the caller's timeout
		header, err = nodeClient.GetBlockHeaderByHeight(ctx, height)
		return err
	}), "could not get block header for block %d", height)

	var blockED *execution_data.BlockExecutionData
	s.Require().NoError(retryNotFound(ctx, 200*time.Millisecond, func() error {
		// use the caller's ctx (not s.ctx) so the request is bounded by the caller's timeout
		ed, err := execDataClient.GetExecutionDataByBlockID(ctx, &executiondata.GetExecutionDataByBlockIDRequest{
			BlockId:              header.ID[:],
			EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
		})
		if err != nil {
			return err
		}

		// a conversion failure is not retryable: fail the test right away with a specific message
		blockED, err = convert.MessageToBlockExecutionData(ed.GetBlockExecutionData(), flow.Localnet.Chain())
		s.Require().NoError(err, "could not convert execution data")

		return nil
	}), "could not get execution data for block %d", height)

	return blockED, nil
}

ed, err = onEds.Get(s.ctx, result.ExecutionDataID)
if assert.NoError(s.T(), err, "%s: could not get execution data for height %v", testnet.PrimaryON, i) {
s.T().Logf("%s: got execution data for height %d", testnet.PrimaryON, i)
assert.Equal(s.T(), header.ID(), ed.BlockID)
// retryNotFound invokes f repeatedly, waiting delay between attempts, for as long as f reports a
// gRPC NotFound status. The first result that is not NotFound (including nil) is returned as-is.
// If ctx is done before that happens, ctx.Err() is returned instead.
func retryNotFound(ctx context.Context, delay time.Duration, f func() error) error {
	for {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		result := f()
		if status.Code(result) != codes.NotFound {
			return result
		}

		// not available yet: wait before the next attempt, unless the context ends first
		select {
		case <-time.After(delay):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func (s *ExecutionStateSyncSuite) nodeExecutionDataStore(node *testnet.Container) execution_data.ExecutionDataStore {
Expand Down
26 changes: 6 additions & 20 deletions integration/tests/access/cohort4/execution_data_pruning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@ import (
"time"

badgerds "github.com/ipfs/go-ds-badger2"
sdk "github.com/onflow/flow-go-sdk"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

sdk "github.com/onflow/flow-go-sdk"

"github.com/onflow/flow-go/integration/testnet"
"github.com/onflow/flow-go/model/flow"
Expand Down Expand Up @@ -137,17 +134,6 @@ func (s *ExecutionDataPruningSuite) SetupTest() {
s.net.Start(s.ctx)
}

// getGRPCClient is a helper that creates an Access API client for the given address.
// It dials the address with insecure transport credentials and wraps the resulting
// connection in an AccessAPIClient. A dial error is returned unchanged with a nil client.
// NOTE(review): the connection is not returned to the caller, so this helper offers no way to close it.
func (s *ExecutionDataPruningSuite) getGRPCClient(address string) (accessproto.AccessAPIClient, error) {
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}

client := accessproto.NewAccessAPIClient(conn)
return client, nil
}

// TestHappyPath tests the execution data pruning process in a happy path scenario.
// The test follows these steps:
//
Expand Down Expand Up @@ -197,17 +183,17 @@ func (s *ExecutionDataPruningSuite) TestHappyPath() {
func (s *ExecutionDataPruningSuite) waitUntilExecutionDataForBlockIndexed(waitingBlockHeight uint64) {
observerNode := s.net.ContainerByName(s.observerNodeName)

grpcClient, err := s.getGRPCClient(observerNode.Addr(testnet.GRPCPort))
sdkClient, err := observerNode.SDKClient()
s.Require().NoError(err)

// creating execution data api client
client, err := getClient(fmt.Sprintf("localhost:%s", observerNode.Port(testnet.ExecutionStatePort)))
s.Require().NoError(err)
accessClient := sdkClient.RPCClient()
execClient := sdkClient.ExecutionDataRPCClient()

// pause until the observer node starts indexing blocks,
// getting events from the 1st block to make sure that it is already indexed and we can start subscribing
s.Require().Eventually(func() bool {
_, err := grpcClient.GetEventsForHeightRange(s.ctx, &accessproto.GetEventsForHeightRangeRequest{
_, err := accessClient.GetEventsForHeightRange(s.ctx, &accessproto.GetEventsForHeightRangeRequest{
Type: sdk.EventAccountCreated,
StartHeight: 1,
EndHeight: 1,
Expand All @@ -220,7 +206,7 @@ func (s *ExecutionDataPruningSuite) waitUntilExecutionDataForBlockIndexed(waitin
// subscribe to events up to waitingBlockHeight to make sure that execution data is indexed up to waitingBlockHeight and the pruner
// has pruned execution data at least once
// SubscribeEventsFromStartHeight is used as the subscription here because we need to make sure that execution data is already indexed
stream, err := client.SubscribeEventsFromStartHeight(s.ctx, &executiondata.SubscribeEventsFromStartHeightRequest{
stream, err := execClient.SubscribeEventsFromStartHeight(s.ctx, &executiondata.SubscribeEventsFromStartHeightRequest{
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
Filter: &executiondata.EventFilter{},
HeartbeatInterval: 1,
Expand Down
33 changes: 11 additions & 22 deletions integration/tests/access/cohort4/grpc_state_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,11 @@ import (
"sync"
"testing"

jsoncdc "github.com/onflow/cadence/encoding/json"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

jsoncdc "github.com/onflow/cadence/encoding/json"

"github.com/onflow/flow-go-sdk/test"

Expand Down Expand Up @@ -168,17 +165,14 @@ func (s *GrpcStateStreamSuite) Ghost() *client.GhostClient {
// TestHappyPath tests gRPC event streaming
func (s *GrpcStateStreamSuite) TestHappyPath() {
unittest.SkipUnless(s.T(), unittest.TEST_FLAKY, "flaky tests: https://github.com/onflow/flow-go/issues/5825")
testANURL := fmt.Sprintf("localhost:%s", s.net.ContainerByName(testnet.PrimaryAN).Port(testnet.ExecutionStatePort))
sdkClientTestAN, err := getClient(testANURL)
s.Require().NoError(err)
testAN := s.net.ContainerByName(testnet.PrimaryAN)
sdkClientTestAN := getClient(s.T(), testAN)

controlANURL := fmt.Sprintf("localhost:%s", s.net.ContainerByName("access_2").Port(testnet.ExecutionStatePort))
sdkClientControlAN, err := getClient(controlANURL)
s.Require().NoError(err)
controlAN := s.net.ContainerByName("access_2")
sdkClientControlAN := getClient(s.T(), controlAN)

testONURL := fmt.Sprintf("localhost:%s", s.net.ContainerByName(testnet.PrimaryON).Port(testnet.ExecutionStatePort))
sdkClientTestON, err := getClient(testONURL)
s.Require().NoError(err)
testON := s.net.ContainerByName(testnet.PrimaryON)
sdkClientTestON := getClient(s.T(), testON)

// get the first block height
currentFinalized := s.BlockState.HighestFinalizedHeight()
Expand Down Expand Up @@ -471,15 +465,10 @@ func compareEvents(t *testing.T, controlData, testData *SubscribeEventsResponse)
}
}

// TODO: switch to SDK versions once crypto library is fixed to support the latest SDK version

// getClient dials the given address with insecure transport credentials and returns an
// ExecutionDataAPI client backed by that connection. A dial error is returned unchanged
// with a nil client.
func getClient(address string) (executiondata.ExecutionDataAPIClient, error) {
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}

return executiondata.NewExecutionDataAPIClient(conn), nil
}
// getClient returns the ExecutionDataAPI client exposed by the node's SDK client.
// The test fails immediately if the SDK client cannot be obtained.
func getClient(t *testing.T, node *testnet.Container) executiondata.ExecutionDataAPIClient {
	sdkClient, sdkErr := node.SDKClient()
	require.NoError(t, sdkErr, "could not get access client")

	execDataClient := sdkClient.ExecutionDataRPCClient()
	return execDataClient
}

func SubscribeHandler[T any, V any](
Expand Down
Loading

0 comments on commit e833530

Please sign in to comment.