diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 6221dd64..462e6c7d 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -40,5 +40,13 @@ jobs: - name: Checkout awm-relayer repository uses: actions/checkout@v4 - - name: Run E2E Tests - run: AVALANCHEGO_BUILD_PATH=/tmp/e2e-test/avalanchego DATA_DIR=/tmp/e2e-test/data ./scripts/e2e_test.sh + - name: Install Forge and Run E2E Tests + # Forge installs to BASE_DIR, but updates the PATH definition in $HOME/.bashrc + run: | + BASE_DIR=${XDG_CONFIG_HOME:-$HOME} + curl -L https://foundry.paradigm.xyz | bash + source $HOME/.bashrc + $BASE_DIR/.foundry/bin/foundryup + export PATH="$PATH:$BASE_DIR/.foundry/bin" + export PATH="$PATH:$GOPATH/bin" + AVALANCHEGO_BUILD_PATH=/tmp/e2e-test/avalanchego DATA_DIR=/tmp/e2e-test/data ./scripts/e2e_test.sh diff --git a/config/config.go b/config/config.go index 783e11be..483b08d2 100644 --- a/config/config.go +++ b/config/config.go @@ -22,6 +22,9 @@ import ( "github.com/spf13/viper" ) +// global config singleton +var globalConfig Config + const ( relayerPrivateKeyBytes = 32 accountPrivateKeyEnvVarName = "ACCOUNT_PRIVATE_KEY" @@ -77,7 +80,7 @@ func SetDefaultConfigValues(v *viper.Viper) { v.SetDefault(StorageLocationKey, "./.awm-relayer-storage") } -// BuildConfig constructs the relayer config using Viper. +// BuildConfig constructs the relayer config using Viper. Also sets the global Config singleton // The following precedence order is used. Each item takes precedence over the item below it: // 1. Flags // 2. Environment variables @@ -152,6 +155,8 @@ func BuildConfig(v *viper.Viper) (Config, bool, error) { } cfg.PChainAPIURL = pChainapiUrl + globalConfig = cfg + return cfg, optionOverwritten, nil } @@ -232,6 +237,8 @@ func (s *SourceSubnet) Validate() error { return fmt.Errorf("invalid message contract address in EVM source subnet: %s", messageContractAddress) } } + case EVM_BLOCKHASH: + // No additional validation required default: return fmt.Errorf("unsupported VM type for source subnet: %v", s.VM) } @@ -373,3 +380,8 @@ func (s *DestinationSubnet) GetRelayerAccountInfo() (*ecdsa.PrivateKey, common.A pkBytes = append(pkBytes, pk.PublicKey.Y.Bytes()...) return pk, common.BytesToAddress(crypto.Keccak256(pkBytes)), nil } + +// Global Config singleton getters +func GetNetworkID() uint32 { + return globalConfig.NetworkID +} diff --git a/config/types.go b/config/types.go index 1edd98ad..2c32babd 100644 --- a/config/types.go +++ b/config/types.go @@ -9,12 +9,15 @@ type VM int const ( UNKNOWN_VM VM = iota EVM + EVM_BLOCKHASH ) func (vm VM) String() string { switch vm { case EVM: return "evm" + case EVM_BLOCKHASH: + return "evm_blockhash" default: return "unknown" } @@ -25,6 +28,8 @@ func ParseVM(vm string) VM { switch vm { case "evm": return EVM + case "evm_blockhash": + return EVM_BLOCKHASH default: return UNKNOWN_VM } @@ -36,12 +41,15 @@ type MessageProtocol int const ( UNKNOWN_MESSAGE_PROTOCOL MessageProtocol = iota TELEPORTER + BLOCK_HASH_PUBLISHER ) func (msg MessageProtocol) String() string { switch msg { case TELEPORTER: return "teleporter" + case BLOCK_HASH_PUBLISHER: + return "block_hash_publisher" default: return "unknown" } @@ -52,6 +60,8 @@ func ParseMessageProtocol(msg string) MessageProtocol { switch msg { case "teleporter": return TELEPORTER + case "block_hash_publisher": + return BLOCK_HASH_PUBLISHER default: return UNKNOWN_MESSAGE_PROTOCOL } diff --git a/go.mod b/go.mod index 1c7200ac..8087dea7 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/ava-labs/avalanche-network-runner v1.7.2 github.com/ava-labs/avalanchego v1.10.10 github.com/ava-labs/subnet-evm v0.5.6 - github.com/ava-labs/teleporter v0.0.0-20231004221622-b655bfe85981 + github.com/ava-labs/teleporter v0.0.0-20231019213726-050fbd0d6992 github.com/ethereum/go-ethereum v1.12.0 github.com/onsi/ginkgo/v2 v2.12.0 github.com/onsi/gomega v1.27.10 diff --git a/go.sum b/go.sum index 02a63318..9feeb939 100644 --- a/go.sum +++ b/go.sum @@ -69,12 +69,18 @@ github.com/ava-labs/coreth v0.12.5-rc.6 h1:OajGUyKkO5Q82XSuMa8T5UD6QywtCHUiZ4Tv3 github.com/ava-labs/coreth v0.12.5-rc.6/go.mod h1:s5wVyy+5UCCk2m0Tq3jVmy0UqOpKBDYqRE13gInCJVs= github.com/ava-labs/subnet-evm v0.5.6 h1:u+xBvEExOa362Up02hgSgeF+aqDona57whhRIeEIim8= github.com/ava-labs/subnet-evm v0.5.6/go.mod h1:desGY3ghT+Ner+oqxeovwF37eM4dmMQbYZECONPQU9w= -github.com/ava-labs/teleporter v0.0.0-20231002210825-d5f6d7f8583a h1:KOj9SYdVCK736YyklPqBgOrlMP7JfN5L88mOl2dPj88= -github.com/ava-labs/teleporter v0.0.0-20231002210825-d5f6d7f8583a/go.mod h1:nyjuYCBefAGCkKDhmJIxh8iTAczapQxwxj/7FC4g9sU= -github.com/ava-labs/teleporter v0.0.0-20231004214459-caa08b68d3b4 h1:1gYjG8pi1EnHBZbuXPZKwlzZ6UztlCcaif9o9+CG6K0= -github.com/ava-labs/teleporter v0.0.0-20231004214459-caa08b68d3b4/go.mod h1:nyjuYCBefAGCkKDhmJIxh8iTAczapQxwxj/7FC4g9sU= -github.com/ava-labs/teleporter v0.0.0-20231004221622-b655bfe85981 h1:beCzXayPAc9obFYmPyF7WOfyPCIYWThUIB9Jcxbtgq0= -github.com/ava-labs/teleporter v0.0.0-20231004221622-b655bfe85981/go.mod h1:nyjuYCBefAGCkKDhmJIxh8iTAczapQxwxj/7FC4g9sU= +github.com/ava-labs/teleporter v0.0.0-20231005141349-feb8fe1523b6 h1:0QOZ/xhAxbP/tTrsTTnufJgC/ZKZPGGzY0SrziM9txQ= +github.com/ava-labs/teleporter v0.0.0-20231005141349-feb8fe1523b6/go.mod h1:nyjuYCBefAGCkKDhmJIxh8iTAczapQxwxj/7FC4g9sU= +github.com/ava-labs/teleporter v0.0.0-20231005194159-a8e949bd37c9 h1:Rwrhf+GXl9MX6VX1WQXVGl6JQ6OD4xOI7RMdAQSlzGc= +github.com/ava-labs/teleporter v0.0.0-20231005194159-a8e949bd37c9/go.mod h1:nyjuYCBefAGCkKDhmJIxh8iTAczapQxwxj/7FC4g9sU= +github.com/ava-labs/teleporter v0.0.0-20231005202233-cab06cef8e05 h1:qqTnNm4RHxHiErd1LGfJO7lcTgDJYDdnQOofJMEWX/E= +github.com/ava-labs/teleporter v0.0.0-20231005202233-cab06cef8e05/go.mod h1:nyjuYCBefAGCkKDhmJIxh8iTAczapQxwxj/7FC4g9sU= +github.com/ava-labs/teleporter v0.0.0-20231013203557-0810228e758d h1:BoxSc7CGlddZuX/DrE6aB3RjaqtP0baRAk0noI3HGbA= +github.com/ava-labs/teleporter v0.0.0-20231013203557-0810228e758d/go.mod h1:nyjuYCBefAGCkKDhmJIxh8iTAczapQxwxj/7FC4g9sU= +github.com/ava-labs/teleporter v0.0.0-20231019213504-d5b8203b4952 h1:ovJAh7Mzs/VGKSkzOFeb+lVF7sp5QmEty44HiWUcDv0= +github.com/ava-labs/teleporter v0.0.0-20231019213504-d5b8203b4952/go.mod h1:nyjuYCBefAGCkKDhmJIxh8iTAczapQxwxj/7FC4g9sU= +github.com/ava-labs/teleporter v0.0.0-20231019213726-050fbd0d6992 h1:T5DdAjXJ5tubynurSkjLVasdFCfSbGUishqhtkK9Xsg= +github.com/ava-labs/teleporter v0.0.0-20231019213726-050fbd0d6992/go.mod h1:nyjuYCBefAGCkKDhmJIxh8iTAczapQxwxj/7FC4g9sU= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= diff --git a/main/main.go b/main/main.go index c9ef56d5..333c3603 100644 --- a/main/main.go +++ b/main/main.go @@ -229,7 +229,7 @@ func runRelayer(logger logging.Logger, select { case txLog := <-subscriber.Logs(): logger.Info( - "Handling Teleporter submit message log.", + "Handling message log.", zap.String("txId", hex.EncodeToString(txLog.SourceTxID)), zap.String("originChainId", sourceSubnetInfo.ChainID), zap.String("destinationChainId", txLog.DestinationChainID.String()), diff --git a/messages/block_hash_publisher/config.go b/messages/block_hash_publisher/config.go new file mode 100644 index 00000000..fb813f6e --- /dev/null +++ b/messages/block_hash_publisher/config.go @@ -0,0 +1,83 @@ +package block_hash_publisher + +import ( + "encoding/hex" + "fmt" + "strconv" + "strings" + + "github.com/ava-labs/avalanchego/ids" + "github.com/pkg/errors" +) + +type destinationInfo struct { + ChainID string `json:"chain-id"` + Address string `json:"address"` + Interval string `json:"interval"` + + useTimeInterval bool + blockInterval uint64 + timeIntervalSeconds uint64 +} + +type Config struct { + DestinationChains []destinationInfo `json:"destination-chains"` +} + +func (c *Config) Validate() error { + for i, destinationInfo := range c.DestinationChains { + // Check if the chainID is valid + if _, err := ids.FromString(destinationInfo.ChainID); err != nil { + return errors.Wrap(err, fmt.Sprintf("invalid chainID in block hash publisher configuration. Provided ID: %s", destinationInfo.ChainID)) + } + + // Check if the address is valid + addr := destinationInfo.Address + if addr == "" { + return errors.New("empty address in block hash publisher configuration") + } + addr = strings.TrimPrefix(addr, "0x") + _, err := hex.DecodeString(addr) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("invalid address in block hash publisher configuration. Provided address: %s", destinationInfo.Address)) + } + + // Intervals must be either a positive integer, or a positive integer followed by "s" + interval, isSeconds, err := parseIntervalWithSuffix(destinationInfo.Interval) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("invalid interval in block hash publisher configuration. Provided interval: %s", destinationInfo.Interval)) + } + if isSeconds { + c.DestinationChains[i].timeIntervalSeconds = interval + } else { + c.DestinationChains[i].blockInterval = interval + } + c.DestinationChains[i].useTimeInterval = isSeconds + } + return nil +} + +func parseIntervalWithSuffix(input string) (uint64, bool, error) { + // Check if the input string is empty + if input == "" { + return 0, false, fmt.Errorf("empty string") + } + + // Check if the string ends with "s" + hasSuffix := strings.HasSuffix(input, "s") + + // If it has the "s" suffix, remove it + if hasSuffix { + input = input[:len(input)-1] + } + + // Parse the string as an integer + intValue, err := strconv.Atoi(input) + + // Check if the parsed value is a positive integer + if err != nil || intValue < 0 { + return 0, false, err + } + + return uint64(intValue), hasSuffix, nil +} diff --git a/messages/block_hash_publisher/config_test.go b/messages/block_hash_publisher/config_test.go new file mode 100644 index 00000000..8a0e8eeb --- /dev/null +++ b/messages/block_hash_publisher/config_test.go @@ -0,0 +1,128 @@ +package block_hash_publisher + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +type testResult struct { + isTimeInterval bool + blockInterval uint64 + timeIntervalSeconds uint64 +} + +func TestConfigValidate(t *testing.T) { + testCases := []struct { + name string + destinationChains []destinationInfo + isError bool + testResults []testResult // indexes correspond to destinationChains + }{ + { + name: "valid", + destinationChains: []destinationInfo{ + { + ChainID: "9asUA3QckLh7vGnFQiiUJGPTx8KE4nFtP8c1wTWJuP8XiWW75", + Address: "0x50A46AA7b2eCBe2B1AbB7df865B9A87f5eed8635", + Interval: "10", + }, + { + ChainID: "9asUA3QckLh7vGnFQiiUJGPTx8KE4nFtP8c1wTWJuP8XiWW75", + Address: "0x50A46AA7b2eCBe2B1AbB7df865B9A87f5eed8635", + Interval: "10s", + }, + }, + isError: false, + testResults: []testResult{ + { + isTimeInterval: false, + blockInterval: 10, + }, + { + isTimeInterval: true, + timeIntervalSeconds: 10, + }, + }, + }, + { + name: "invalid chainID", + destinationChains: []destinationInfo{ + { + ChainID: "9asUA3QckLh7vGnFQiiUJGPTx8KE4nFtP8c1wTWJuP8XiWW7", + Address: "0x50A46AA7b2eCBe2B1AbB7df865B9A87f5eed8635", + Interval: "10", + }, + }, + isError: true, + }, + { + name: "invalid interval 1", + destinationChains: []destinationInfo{ + { + ChainID: "9asUA3QckLh7vGnFQiiUJGPTx8KE4nFtP8c1wTWJuP8XiWW75", + Address: "0x50A46AA7b2eCBe2B1AbB7df865B9A87f5eed8635", + Interval: "4r", + }, + }, + isError: true, + }, + { + name: "invalid interval 2", + destinationChains: []destinationInfo{ + { + ChainID: "9asUA3QckLh7vGnFQiiUJGPTx8KE4nFtP8c1wTWJuP8XiWW75", + Address: "0x50A46AA7b2eCBe2B1AbB7df865B9A87f5eed8635", + Interval: "l", + }, + }, + isError: true, + }, + { + name: "invalid interval 3", + destinationChains: []destinationInfo{ + { + ChainID: "9asUA3QckLh7vGnFQiiUJGPTx8KE4nFtP8c1wTWJuP8XiWW75", + Address: "0x50A46AA7b2eCBe2B1AbB7df865B9A87f5eed8635", + Interval: "", + }, + }, + isError: true, + }, + { + name: "invalid address", + destinationChains: []destinationInfo{ + { + ChainID: "9asUA3QckLh7vGnFQiiUJGPTx8KE4nFtP8c1wTWJuP8XiWW75", + Address: "0x50A46AA7b2eCBe2B1AbB7df865B9A87f5eed863", + Interval: "10", + }, + }, + isError: true, + }, + } + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + c := &Config{ + DestinationChains: test.destinationChains, + } + err := c.Validate() + fmt.Println(c) + if test.isError { + require.Error(t, err) + } else { + require.NoError(t, err) + for i, result := range test.testResults { + require.Equal(t, result.isTimeInterval, c.DestinationChains[i].useTimeInterval) + if result.isTimeInterval { + require.Equal(t, result.timeIntervalSeconds, c.DestinationChains[i].timeIntervalSeconds) + } else { + require.Equal(t, result.blockInterval, c.DestinationChains[i].blockInterval) + } + } + } + }) + } +} diff --git a/messages/block_hash_publisher/message_manager.go b/messages/block_hash_publisher/message_manager.go new file mode 100644 index 00000000..1bf484fb --- /dev/null +++ b/messages/block_hash_publisher/message_manager.go @@ -0,0 +1,170 @@ +package block_hash_publisher + +import ( + "encoding/json" + "time" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/vms/platformvm/warp" + "github.com/ava-labs/awm-relayer/config" + "github.com/ava-labs/awm-relayer/vms" + "github.com/ava-labs/awm-relayer/vms/vmtypes" + teleporter_block_hash "github.com/ava-labs/teleporter/abi-bindings/Teleporter/TeleporterBlockHashReceiver" + "github.com/ethereum/go-ethereum/common" + "go.uber.org/zap" +) + +const ( + publishBlockHashGasLimit = 275000 +) + +type destinationSenderInfo struct { + client vms.DestinationClient + address common.Address + + useTimeInterval bool + timeIntervalSeconds uint64 + blockInterval uint64 + + lastTimeSent uint64 + lastBlock uint64 +} + +func (d *destinationSenderInfo) shouldSend(blockTimestamp uint64, blockNumber uint64) bool { + if d.useTimeInterval { + interval := d.timeIntervalSeconds + if time.Unix(int64(blockTimestamp), 0).Sub(time.Unix(int64(d.lastTimeSent), 0)) < (time.Duration(interval) * time.Second) { + return false + } + } else { + interval := d.blockInterval + if blockNumber-d.lastBlock < interval { + return false + } + } + return true +} + +type messageManager struct { + destinations map[ids.ID]*destinationSenderInfo + logger logging.Logger +} + +func NewMessageManager( + logger logging.Logger, + messageProtocolAddress common.Hash, + messageProtocolConfig config.MessageProtocolConfig, + destinationClients map[ids.ID]vms.DestinationClient, +) (*messageManager, error) { + // Marshal the map and unmarshal into the Block Hash Publisher config + data, err := json.Marshal(messageProtocolConfig.Settings) + if err != nil { + logger.Error("Failed to marshal Block Hash Publisher config") + return nil, err + } + var messageConfig Config + if err := json.Unmarshal(data, &messageConfig); err != nil { + logger.Error("Failed to unmarshal Block Hash Publisher config") + return nil, err + } + + if err := messageConfig.Validate(); err != nil { + logger.Error( + "Invalid Block Hash Publisher config.", + zap.Error(err), + ) + return nil, err + } + + destinations := make(map[ids.ID]*destinationSenderInfo) + for _, destination := range messageConfig.DestinationChains { + destinationID, err := ids.FromString(destination.ChainID) + if err != nil { + logger.Error( + "Failed to decode base-58 encoded destination chain ID", + zap.Error(err), + ) + return nil, err + } + destinations[destinationID] = &destinationSenderInfo{ + useTimeInterval: destination.useTimeInterval, + timeIntervalSeconds: destination.timeIntervalSeconds, + blockInterval: destination.blockInterval, + address: common.HexToAddress(destination.Address), + client: destinationClients[destinationID], + } + } + + return &messageManager{ + destinations: destinations, + logger: logger, + }, nil +} + +// ShouldSendMessage returns true if the message should be sent to ANY of the configured destination chains +// This saves us from having to aggregate signatures in that case. Decisions about which destination chains to send to are made in SendMessage +func (m *messageManager) ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo, _ ids.ID) (bool, error) { + // TODO: Handle the primary network case. If it's the primary network, only check the passed in destinationChainID + for _, destination := range m.destinations { + if destination.shouldSend(warpMessageInfo.BlockTimestamp, warpMessageInfo.BlockNumber) { + return true, nil + } + } + return false, nil +} + +func (m *messageManager) SendMessage(signedMessage *warp.Message, warpMessageInfo *vmtypes.WarpMessageInfo, _ ids.ID) error { + // TODO: Handle the primary network case. If it's the primary network, only send to the passed in destinationChainID + for destinationChainID, destination := range m.destinations { + if !destination.shouldSend(warpMessageInfo.BlockTimestamp, warpMessageInfo.BlockNumber) { + m.logger.Debug( + "Not sending message to destination chain", + zap.String("destinationChainID", destinationChainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), + ) + continue + } + + m.logger.Info( + "Sending message to destination chain", + zap.String("destinationChainID", destinationChainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), + ) + // Construct the transaction call data to call the receive cross chain message method of the receiver precompile. + callData, err := teleporter_block_hash.PackReceiveBlockHash(0) + if err != nil { + m.logger.Error( + "Failed packing receiveCrossChainMessage call data", + zap.Error(err), + zap.Uint32("messageIndex", 0), + zap.String("sourceChainID", signedMessage.SourceChainID.String()), + zap.String("destinationChainID", destinationChainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), + ) + return err + } + + // Get the correct destination client from the global map + err = destination.client.SendTx(signedMessage, destination.address.Hex(), publishBlockHashGasLimit, callData) + if err != nil { + m.logger.Error( + "Failed to send tx.", + zap.String("destinationChainID", destinationChainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), + zap.Error(err), + ) + return err + } + + // Set the last sent block/time + destination.lastTimeSent = warpMessageInfo.BlockTimestamp + destination.lastBlock = warpMessageInfo.BlockNumber + m.logger.Info( + "Sent message to destination chain", + zap.String("destinationChainID", destinationChainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), + ) + } + return nil +} diff --git a/messages/block_hash_publisher/message_manager_test.go b/messages/block_hash_publisher/message_manager_test.go new file mode 100644 index 00000000..cc6407ca --- /dev/null +++ b/messages/block_hash_publisher/message_manager_test.go @@ -0,0 +1,120 @@ +package block_hash_publisher + +import ( + "testing" + "time" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/awm-relayer/vms/vmtypes" + "github.com/stretchr/testify/require" +) + +func TestShouldSendMessage(t *testing.T) { + testCases := []struct { + name string + chainID ids.ID + destination destinationSenderInfo + warpMessageInfo vmtypes.WarpMessageInfo + expectedError bool + expectedResult bool + }{ + { + name: "should send (time)", + chainID: ids.GenerateTestID(), + destination: destinationSenderInfo{ + useTimeInterval: true, + timeIntervalSeconds: 10, + lastTimeSent: uint64(time.Unix(100, 0).Unix()), + }, + warpMessageInfo: vmtypes.WarpMessageInfo{ + BlockTimestamp: uint64(time.Unix(111, 0).Unix()), + }, + expectedError: false, + expectedResult: true, + }, + { + name: "should send (block)", + chainID: ids.GenerateTestID(), + destination: destinationSenderInfo{ + useTimeInterval: false, + blockInterval: 5, + lastBlock: 100, + }, + warpMessageInfo: vmtypes.WarpMessageInfo{ + BlockNumber: 106, + }, + expectedError: false, + expectedResult: true, + }, + { + name: "should send (time) 2", + chainID: ids.GenerateTestID(), + destination: destinationSenderInfo{ + useTimeInterval: true, + timeIntervalSeconds: 10, + lastTimeSent: uint64(time.Unix(100, 0).Unix()), + }, + warpMessageInfo: vmtypes.WarpMessageInfo{ + BlockTimestamp: uint64(time.Unix(110, 0).Unix()), + }, + expectedError: false, + expectedResult: true, + }, + { + name: "should send (block) 2", + chainID: ids.GenerateTestID(), + destination: destinationSenderInfo{ + useTimeInterval: false, + blockInterval: 5, + lastBlock: 100, + }, + warpMessageInfo: vmtypes.WarpMessageInfo{ + BlockNumber: 105, + }, + expectedError: false, + expectedResult: true, + }, + { + name: "should not send (time)", + chainID: ids.GenerateTestID(), + destination: destinationSenderInfo{ + useTimeInterval: true, + timeIntervalSeconds: 10, + lastTimeSent: uint64(time.Unix(100, 0).Unix()), + }, + warpMessageInfo: vmtypes.WarpMessageInfo{ + BlockTimestamp: uint64(time.Unix(109, 0).Unix()), + }, + expectedError: false, + expectedResult: false, + }, + { + name: "should not send (block)", + chainID: ids.GenerateTestID(), + destination: destinationSenderInfo{ + useTimeInterval: false, + blockInterval: 5, + lastBlock: 100, + }, + warpMessageInfo: vmtypes.WarpMessageInfo{ + BlockNumber: 104, + }, + expectedError: false, + expectedResult: false, + }, + } + for _, testCase := range testCases { + messageManager := &messageManager{ + destinations: map[ids.ID]*destinationSenderInfo{ + testCase.chainID: &testCase.destination, + }, + } + result, err := messageManager.ShouldSendMessage(&testCase.warpMessageInfo, testCase.chainID) + if testCase.expectedError { + require.Error(t, err, testCase.name) + } else { + require.NoError(t, err, testCase.name) + require.Equal(t, testCase.expectedResult, result, testCase.name) + } + } +} diff --git a/messages/message_manager.go b/messages/message_manager.go index 00251353..93b201a8 100644 --- a/messages/message_manager.go +++ b/messages/message_manager.go @@ -12,6 +12,7 @@ import ( "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/vms/platformvm/warp" "github.com/ava-labs/awm-relayer/config" + "github.com/ava-labs/awm-relayer/messages/block_hash_publisher" "github.com/ava-labs/awm-relayer/messages/teleporter" "github.com/ava-labs/awm-relayer/vms" "github.com/ava-labs/awm-relayer/vms/vmtypes" @@ -26,7 +27,7 @@ type MessageManager interface { ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) (bool, error) // SendMessage sends the signed message to the destination chain. The payload parsed according to // the VM rules is also passed in, since MessageManager does not assume any particular VM - SendMessage(signedMessage *warp.Message, parsedVmPayload []byte, destinationChainID ids.ID) error + SendMessage(signedMessage *warp.Message, warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) error } // NewMessageManager constructs a MessageManager for a particular message protocol, defined by the message protocol address and config @@ -40,7 +41,15 @@ func NewMessageManager( format := messageProtocolConfig.MessageFormat switch config.ParseMessageProtocol(format) { case config.TELEPORTER: - return teleporter.NewMessageManager(logger, + return teleporter.NewMessageManager( + logger, + messageProtocolAddress, + messageProtocolConfig, + destinationClients, + ) + case config.BLOCK_HASH_PUBLISHER: + return block_hash_publisher.NewMessageManager( + logger, messageProtocolAddress, messageProtocolConfig, destinationClients, diff --git a/messages/teleporter/message_manager.go b/messages/teleporter/message_manager.go index d6334bbf..0f8259e8 100644 --- a/messages/teleporter/message_manager.go +++ b/messages/teleporter/message_manager.go @@ -206,7 +206,7 @@ func (m *messageManager) messageDelivered( // SendMessage extracts the gasLimit and packs the call data to call the receiveCrossChainMessage method of the Teleporter contract, // and dispatches transaction construction and broadcast to the destination client -func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayload []byte, destinationChainID ids.ID) error { +func (m *messageManager) SendMessage(signedMessage *warp.Message, warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) error { teleporterMessage, ok := m.teleporterMessageCache.Get(signedMessage.ID()) if !ok { m.logger.Debug( @@ -215,7 +215,7 @@ func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayloa zap.String("warpMessageID", signedMessage.ID().String()), ) var err error - teleporterMessage, err = UnpackTeleporterMessage(parsedVmPayload) + teleporterMessage, err = UnpackTeleporterMessage(warpMessageInfo.WarpPayload) if err != nil { m.logger.Error( "Failed unpacking teleporter message.", diff --git a/relayer/message_relayer.go b/relayer/message_relayer.go index a88ca365..ed1eb5cb 100644 --- a/relayer/message_relayer.go +++ b/relayer/message_relayer.go @@ -48,7 +48,7 @@ var ( // Each messageRelayer runs in its own goroutine. type messageRelayer struct { relayer *Relayer - warpMessage *warp.UnsignedMessage + warpMessageInfo *vmtypes.WarpMessageInfo destinationChainID ids.ID messageResponseChan chan message.InboundMessage logger logging.Logger @@ -60,14 +60,14 @@ func newMessageRelayer( logger logging.Logger, metrics *MessageRelayerMetrics, relayer *Relayer, - warpMessage *warp.UnsignedMessage, + warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID, messageResponseChan chan message.InboundMessage, messageCreator message.Creator, ) *messageRelayer { return &messageRelayer{ relayer: relayer, - warpMessage: warpMessage, + warpMessageInfo: warpMessageInfo, destinationChainID: destinationChainID, messageResponseChan: messageResponseChan, logger: logger, @@ -76,8 +76,8 @@ func newMessageRelayer( } } -func (r *messageRelayer) relayMessage(warpMessageInfo *vmtypes.WarpMessageInfo, requestID uint32, messageManager messages.MessageManager) error { - shouldSend, err := messageManager.ShouldSendMessage(warpMessageInfo, r.destinationChainID) +func (r *messageRelayer) relayMessage(requestID uint32, messageManager messages.MessageManager) error { + shouldSend, err := messageManager.ShouldSendMessage(r.warpMessageInfo, r.destinationChainID) if err != nil { r.logger.Error( "Failed to check if message should be sent", @@ -108,7 +108,7 @@ func (r *messageRelayer) relayMessage(warpMessageInfo *vmtypes.WarpMessageInfo, // create signed message latency (ms) r.setCreateSignedMessageLatencyMS(float64(time.Since(startCreateSignedMessageTime).Milliseconds())) - err = messageManager.SendMessage(signedMessage, warpMessageInfo.WarpPayload, r.destinationChainID) + err = messageManager.SendMessage(signedMessage, r.warpMessageInfo, r.destinationChainID) if err != nil { r.logger.Error( "Failed to send warp message", @@ -119,7 +119,6 @@ func (r *messageRelayer) relayMessage(warpMessageInfo *vmtypes.WarpMessageInfo, } r.logger.Info( "Finished relaying message to destination chain", - zap.String("destinationChainID", r.destinationChainID.String()), ) r.incSuccessfulRelayMessageCount() return nil @@ -129,9 +128,10 @@ func (r *messageRelayer) relayMessage(warpMessageInfo *vmtypes.WarpMessageInfo, func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, error) { r.logger.Info( "Starting relayer routine", - zap.String("destinationChainID", r.destinationChainID.String()), ) + sourceChainID := r.warpMessageInfo.WarpUnsignedMessage.SourceChainID + // Get the current canonical validator set of the source subnet. validatorSet, totalValidatorWeight, err := r.getCurrentCanonicalValidatorSet() if err != nil { @@ -174,19 +174,18 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e // Construct the request req := msg.SignatureRequest{ - MessageID: r.warpMessage.ID(), + MessageID: r.warpMessageInfo.WarpUnsignedMessage.ID(), } reqBytes, err := msg.RequestToBytes(codec, req) if err != nil { r.logger.Error( "Failed to marshal request bytes", - zap.String("destinationChainID", r.destinationChainID.String()), zap.Error(err), ) return nil, err } - outMsg, err := r.messageCreator.AppRequest(r.warpMessage.SourceChainID, requestID, peers.DefaultAppRequestTimeout, reqBytes) + outMsg, err := r.messageCreator.AppRequest(sourceChainID, requestID, peers.DefaultAppRequestTimeout, reqBytes) if err != nil { r.logger.Error( "Failed to create app request message", @@ -205,7 +204,6 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e r.logger.Debug( "Relayer collecting signatures from peers.", zap.Int("attempt", attempt), - zap.String("destinationChainID", r.destinationChainID.String()), zap.Int("validatorSetSize", len(validatorSet)), zap.Int("signatureMapSize", len(signatureMap)), zap.Int("responsesExpected", responsesExpected), @@ -229,8 +227,8 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e // Register a timeout response for each queried node reqID := ids.RequestID{ NodeID: nodeID, - SourceChainID: r.warpMessage.SourceChainID, - DestinationChainID: r.warpMessage.SourceChainID, + SourceChainID: sourceChainID, + DestinationChainID: sourceChainID, RequestID: requestID, Op: byte(message.AppResponseOp), } @@ -316,13 +314,12 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e if err != nil { r.logger.Error( "Failed to aggregate signature.", - zap.String("destinationChainID", r.destinationChainID.String()), zap.Error(err), ) return nil, err } - signedMsg, err := warp.NewMessage(r.warpMessage, &warp.BitSetSignature{ + signedMsg, err := warp.NewMessage(r.warpMessageInfo.WarpUnsignedMessage, &warp.BitSetSignature{ Signers: vdrBitSet.Bytes(), Signature: *(*[bls.SignatureLen]byte)(bls.SignatureToBytes(aggSig)), }) @@ -345,7 +342,6 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e if signedMsg != nil { r.logger.Info( "Created signed message.", - zap.String("destinationChainID", r.destinationChainID.String()), ) return signedMsg, nil } @@ -365,7 +361,6 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e r.logger.Warn( "Failed to collect a threshold of signatures", zap.Int("attempts", maxRelayerQueryAttempts), - zap.String("destinationChainID", r.destinationChainID.String()), ) return nil, errNotEnoughSignatures } @@ -381,7 +376,6 @@ func (r *messageRelayer) getCurrentCanonicalValidatorSet() ([]*warp.Validator, u if err != nil { r.logger.Error( "failed to get validating subnet for destination chain", - zap.String("destinationChainID", r.destinationChainID.String()), zap.Error(err), ) return nil, 0, err @@ -457,7 +451,6 @@ func (r *messageRelayer) isValidSignatureResponse( r.logger.Debug( "Response contained an empty signature", zap.String("nodeID", response.NodeID().String()), - zap.String("destinationChainID", r.destinationChainID.String()), ) return blsSignatureBuf{}, false } @@ -466,15 +459,13 @@ func (r *messageRelayer) isValidSignatureResponse( if err != nil { r.logger.Debug( "Failed to create signature from response", - zap.String("destinationChainID", r.destinationChainID.String()), ) return blsSignatureBuf{}, false } - if !bls.Verify(pubKey, sig, r.warpMessage.Bytes()) { + if !bls.Verify(pubKey, sig, r.warpMessageInfo.WarpUnsignedMessage.Bytes()) { r.logger.Debug( "Failed verification for signature", - zap.String("destinationChainID", r.destinationChainID.String()), ) return blsSignatureBuf{}, false } @@ -516,7 +507,6 @@ func (r *messageRelayer) aggregateSignatures(signatureMap map[int]blsSignatureBu func (r *messageRelayer) incSuccessfulRelayMessageCount() { r.metrics.successfulRelayMessageCount. WithLabelValues( - r.destinationChainID.String(), r.relayer.sourceChainID.String(), r.relayer.sourceSubnetID.String()).Inc() } @@ -524,7 +514,6 @@ func (r *messageRelayer) incSuccessfulRelayMessageCount() { func (r *messageRelayer) incFailedRelayMessageCount(failureReason string) { r.metrics.failedRelayMessageCount. WithLabelValues( - r.destinationChainID.String(), r.relayer.sourceChainID.String(), r.relayer.sourceSubnetID.String(), failureReason).Inc() @@ -533,7 +522,6 @@ func (r *messageRelayer) incFailedRelayMessageCount(failureReason string) { func (r *messageRelayer) setCreateSignedMessageLatencyMS(latency float64) { r.metrics.createSignedMessageLatencyMS. WithLabelValues( - r.destinationChainID.String(), r.relayer.sourceChainID.String(), r.relayer.sourceSubnetID.String()).Set(latency) } diff --git a/relayer/message_relayer_metrics.go b/relayer/message_relayer_metrics.go index 93a6fb89..bfe614a3 100644 --- a/relayer/message_relayer_metrics.go +++ b/relayer/message_relayer_metrics.go @@ -17,7 +17,7 @@ func NewMessageRelayerMetrics(registerer prometheus.Registerer) *MessageRelayerM Name: "successful_relay_message_count", Help: "Number of messages that relayed successfully", }, - []string{"destination_chain_id", "source_chain_id", "source_subnet_id"}, + []string{"source_chain_id", "source_subnet_id"}, ) registerer.MustRegister(successfulRelayMessageCount) @@ -26,7 +26,7 @@ func NewMessageRelayerMetrics(registerer prometheus.Registerer) *MessageRelayerM Name: "create_signed_message_latency_ms", Help: "Latency of creating a signed message in milliseconds", }, - []string{"destination_chain_id", "source_chain_id", "source_subnet_id"}, + []string{"source_chain_id", "source_subnet_id"}, ) registerer.MustRegister(createSignedMessageLatencyMS) @@ -35,7 +35,7 @@ func NewMessageRelayerMetrics(registerer prometheus.Registerer) *MessageRelayerM Name: "failed_relay_message_count", Help: "Number of messages that failed to relay", }, - []string{"destination_chain_id", "source_chain_id", "source_subnet_id", "failure_reason"}, + []string{"source_chain_id", "source_subnet_id", "failure_reason"}, ) registerer.MustRegister(failedRelayMessageCount) diff --git a/relayer/relayer.go b/relayer/relayer.go index 6513f3c9..87f9cc2f 100644 --- a/relayer/relayer.go +++ b/relayer/relayer.go @@ -72,6 +72,7 @@ func NewRelayer( messageManagers := make(map[common.Hash]messages.MessageManager) for address, config := range sourceSubnetInfo.MessageContracts { addressHash := common.HexToHash(address) + // TODO: To handle the primary network case, the messageManager needs to know if its source subnet is the primary network messageManager, err := messages.NewMessageManager(logger, addressHash, config, destinationClients) if err != nil { logger.Error( @@ -172,14 +173,14 @@ func NewRelayer( } // RelayMessage relays a single warp message to the destination chain. Warp message relay requests from the same origin chain are processed serially -func (r *Relayer) RelayMessage(warpLogInfo *vmtypes.WarpLogInfo, metrics *MessageRelayerMetrics, messageCreator message.Creator) error { +func (r *Relayer) RelayMessage(warpMessageInfo *vmtypes.WarpMessageInfo, metrics *MessageRelayerMetrics, messageCreator message.Creator) error { r.logger.Info( "Relaying message", zap.String("chainID", r.sourceChainID.String()), ) // Unpack the VM message bytes into a Warp message - warpMessageInfo, err := r.contractMessage.UnpackWarpMessage(warpLogInfo.UnsignedMsgBytes) + err := r.contractMessage.UnpackWarpMessage(warpMessageInfo) if err != nil { r.logger.Error( "Failed to unpack sender message", @@ -194,20 +195,26 @@ func (r *Relayer) RelayMessage(warpLogInfo *vmtypes.WarpLogInfo, metrics *Messag zap.String("warpMessageID", warpMessageInfo.WarpUnsignedMessage.ID().String()), ) - // Check that the warp message is from a support message protocol contract address. - messageManager, supportedMessageProtocol := r.messageManagers[warpLogInfo.SourceAddress] + // Check that the warp message is from a supported message protocol contract address. + messageManager, supportedMessageProtocol := r.messageManagers[warpMessageInfo.SourceAddress] if !supportedMessageProtocol { // Do not return an error here because it is expected for there to be messages from other contracts // than just the ones supported by a single relayer instance. r.logger.Debug( "Warp message from unsupported message protocol address. Not relaying.", - zap.String("protocolAddress", warpLogInfo.SourceAddress.Hex()), + zap.String("protocolAddress", warpMessageInfo.SourceAddress.Hex()), ) return nil } + // TODO: To handle anycasting for the primary network, we need to call newMessageRelayer in a loop for each of the + // configured destinations. We can get the configured destinations from messageManage. + // This is necessary because in the primary network case, each destination will have a different aggregate signature. + // For anycasting from any other network, we only need to create a single aggregate signature, so we fan out in + // messageManager instead. + // Create and run the message relayer to attempt to deliver the message to the destination chain - messageRelayer := newMessageRelayer(r.logger, metrics, r, warpMessageInfo.WarpUnsignedMessage, warpLogInfo.DestinationChainID, r.responseChan, messageCreator) + messageRelayer := newMessageRelayer(r.logger, metrics, r, warpMessageInfo, warpMessageInfo.DestinationChainID, r.responseChan, messageCreator) if err != nil { r.logger.Error( "Failed to create message relayer", @@ -218,7 +225,7 @@ func (r *Relayer) RelayMessage(warpLogInfo *vmtypes.WarpLogInfo, metrics *Messag // Relay the message to the destination. Messages from a given source chain must be processed in serial in order to // guarantee that the previous block (n-1) is fully processed by the relayer when processing a given log from block n. - err = messageRelayer.relayMessage(warpMessageInfo, r.currentRequestID, messageManager) + err = messageRelayer.relayMessage(r.currentRequestID, messageManager) if err != nil { r.logger.Error( "Failed to run message relayer", @@ -233,7 +240,7 @@ func (r *Relayer) RelayMessage(warpLogInfo *vmtypes.WarpLogInfo, metrics *Messag r.currentRequestID++ // Update the database with the latest processed block height - err = r.db.Put(r.sourceChainID, []byte(database.LatestProcessedBlockKey), []byte(strconv.FormatUint(warpLogInfo.BlockNumber, 10))) + err = r.db.Put(r.sourceChainID, []byte(database.LatestProcessedBlockKey), []byte(strconv.FormatUint(warpMessageInfo.BlockNumber, 10))) if err != nil { r.logger.Error( fmt.Sprintf("failed to put %s into database", database.LatestProcessedBlockKey), diff --git a/tests/basic_relay.go b/tests/basic_relay.go index 58b76f59..441b364b 100644 --- a/tests/basic_relay.go +++ b/tests/basic_relay.go @@ -22,7 +22,7 @@ import ( predicateutils "github.com/ava-labs/subnet-evm/utils/predicate" warpPayload "github.com/ava-labs/subnet-evm/warp/payload" "github.com/ava-labs/subnet-evm/x/warp" - teleportermessenger "github.com/ava-labs/teleporter/abis/TeleporterMessenger" + teleportermessenger "github.com/ava-labs/teleporter/abi-bindings/Teleporter/TeleporterMessenger" teleporterTestUtils "github.com/ava-labs/teleporter/tests/utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" @@ -180,7 +180,7 @@ func BasicRelay() { log.Info("Sending teleporter transaction", "destinationChainID", subnetBInfo.BlockchainID, "txHash", signedTx.Hash()) receipt := teleporterTestUtils.SendTransactionAndWaitForAcceptance(ctx, subnetAInfo.ChainWSClient, signedTx) - bind, err := teleportermessenger.NewTeleportermessenger(teleporterContractAddress, subnetAInfo.ChainWSClient) + bind, err := teleportermessenger.NewTeleporterMessenger(teleporterContractAddress, subnetAInfo.ChainWSClient) Expect(err).Should(BeNil()) sendEvent, err := teleporterTestUtils.GetSendEventFromLogs(receipt.Logs, bind) Expect(err).Should(BeNil()) @@ -222,7 +222,7 @@ func BasicRelay() { Expect(receipt.Status).Should(Equal(types.ReceiptStatusSuccessful)) // Check that the transaction emits ReceiveCrossChainMessage - bind, err = teleportermessenger.NewTeleportermessenger(teleporterContractAddress, subnetBInfo.ChainWSClient) + bind, err = teleportermessenger.NewTeleporterMessenger(teleporterContractAddress, subnetBInfo.ChainWSClient) Expect(err).Should(BeNil()) receiveEvent, err := teleporterTestUtils.GetReceiveEventFromLogs(receipt.Logs, bind) diff --git a/tests/e2e_test.go b/tests/e2e_test.go index c596d35a..04b7c710 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -36,6 +36,7 @@ var _ = ginkgo.AfterSuite(teleporterTestUtils.TearDownNetwork) var _ = ginkgo.Describe("[AWM Relayer Integration Tests", func() { ginkgo.It("Basic Relay", BasicRelay) + ginkgo.It("Publish Block Hashes", PublishBlockHashes) }) // Sets up the warp-enabled network and deploys the teleporter contract to each of the subnets diff --git a/tests/publish_block_hashes.go b/tests/publish_block_hashes.go new file mode 100644 index 00000000..982bd218 --- /dev/null +++ b/tests/publish_block_hashes.go @@ -0,0 +1,336 @@ +package tests + +import ( + "bufio" + "context" + "crypto/ecdsa" + "encoding/hex" + "encoding/json" + "fmt" + "math/big" + "os" + "os/exec" + "sync" + "time" + + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/awm-relayer/config" + "github.com/ava-labs/awm-relayer/peers" + testUtils "github.com/ava-labs/awm-relayer/tests/utils" + relayerEvm "github.com/ava-labs/awm-relayer/vms/evm" + "github.com/ava-labs/subnet-evm/accounts/abi" + "github.com/ava-labs/subnet-evm/core/types" + "github.com/ava-labs/subnet-evm/interfaces" + teleporter_block_hash "github.com/ava-labs/teleporter/abi-bindings/Teleporter/TeleporterBlockHashReceiver" + deploymentUtils "github.com/ava-labs/teleporter/contract-deployment/utils" + teleporterTestUtils "github.com/ava-labs/teleporter/tests/utils" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/log" + + . "github.com/onsi/gomega" +) + +func deployBlockHashReceiver( + ctx context.Context, + subnetInfo teleporterTestUtils.SubnetTestInfo, + fundedAddress common.Address, + fundedKey *ecdsa.PrivateKey, + blockHashABI *abi.ABI, + blockHashReceiverByteCode string, +) common.Address { + nonce, err := subnetInfo.ChainWSClient.NonceAt(ctx, fundedAddress, nil) + Expect(err).Should(BeNil()) + + blockHashReceiverAddress, err := deploymentUtils.DeriveEVMContractAddress(fundedAddress, nonce) + Expect(err).Should(BeNil()) + + cmdOutput := make(chan string) + cmd := exec.Command( + "cast", + "send", + "--rpc-url", teleporterTestUtils.HttpToRPCURI(subnetInfo.ChainNodeURIs[0], subnetInfo.BlockchainID.String()), + "--private-key", hexutil.Encode(fundedKey.D.Bytes()), + "--create", blockHashReceiverByteCode, + ) + + // Set up a pipe to capture the command's output + cmdReader, err := cmd.StdoutPipe() + Expect(err).Should(BeNil()) + cmdStdErrReader, err := cmd.StderrPipe() + Expect(err).Should(BeNil()) + + // Start a goroutine to read and output the command's stdout + go func() { + scanner := bufio.NewScanner(cmdReader) + for scanner.Scan() { + log.Info(scanner.Text()) + } + cmdOutput <- "Command execution finished" + }() + go func() { + scanner := bufio.NewScanner(cmdStdErrReader) + for scanner.Scan() { + log.Error(scanner.Text()) + } + cmdOutput <- "Command execution finished" + }() + + err = cmd.Run() + Expect(err).Should(BeNil()) + + // Confirm successful deployment + deployedCode, err := subnetInfo.ChainWSClient.CodeAt(ctx, blockHashReceiverAddress, nil) + Expect(err).Should(BeNil()) + Expect(len(deployedCode)).Should(BeNumerically(">", 2)) // 0x is an EOA, contract returns the bytecode + + log.Info("Deployed block hash receiver contract", "address", blockHashReceiverAddress.Hex()) + + return blockHashReceiverAddress +} + +func receiveBlockHash( + ctx context.Context, + blockHashReceiverAddress common.Address, + newHeads chan *types.Header, + subnetInfo teleporterTestUtils.SubnetTestInfo, + blockHashABI *abi.ABI, expectedHashes []common.Hash) { + newHead := <-newHeads + log.Info("Fetching log from the newly produced block") + + blockHashB := newHead.Hash() + + logs, err := subnetInfo.ChainWSClient.FilterLogs(ctx, interfaces.FilterQuery{ + BlockHash: &blockHashB, + Addresses: []common.Address{blockHashReceiverAddress}, + Topics: [][]common.Hash{ + { + blockHashABI.Events["ReceiveBlockHash"].ID, + }, + }, + }) + Expect(err).Should(BeNil()) + + bind, err := teleporter_block_hash.NewTeleporterBlockHashReceiver(blockHashReceiverAddress, subnetInfo.ChainWSClient) + Expect(err).Should(BeNil()) + event, err := bind.ParseReceiveBlockHash(logs[0]) + Expect(err).Should(BeNil()) + + // The published block hash should match one of the ones sent on Subnet A + foundHash := false + for _, blockHash := range expectedHashes { + if hex.EncodeToString(blockHash[:]) == hex.EncodeToString(event.BlockHash[:]) { + foundHash = true + break + } + } + if !foundHash { + Expect(false).Should(BeTrue(), "published block hash does not match any of the sent block hashes") + } + log.Info( + "Received published block hash on destination", + "blockHash", hex.EncodeToString(event.BlockHash[:]), + "destinationChainID", subnetInfo.BlockchainID.String(), + ) + + // We shouldn't receive any more blocks, since the relayer is configured to publish once every 5 blocks on the source + log.Info("Waiting for 10s to ensure no new block confirmations on destination chain") + Consistently(newHeads, 10*time.Second, 500*time.Millisecond).ShouldNot(Receive()) +} + +func PublishBlockHashes() { + subnetAInfo := teleporterTestUtils.GetSubnetATestInfo() + subnetBInfo := teleporterTestUtils.GetSubnetBTestInfo() + subnetCInfo := teleporterTestUtils.GetSubnetCTestInfo() + fundedAddress, fundedKey := teleporterTestUtils.GetFundedAccountInfo() + + // + // Deploy block hash receiver on Subnet B + // + ctx := context.Background() + + blockHashABI, err := teleporter_block_hash.TeleporterBlockHashReceiverMetaData.GetAbi() + Expect(err).Should(BeNil()) + blockHashReceiverByteCode := testUtils.ReadHexTextFile("./tests/utils/BlockHashReceiverByteCode.txt") + + blockHashReceiverAddressB := deployBlockHashReceiver(ctx, subnetBInfo, fundedAddress, fundedKey, blockHashABI, blockHashReceiverByteCode) + blockHashReceiverAddressC := deployBlockHashReceiver(ctx, subnetCInfo, fundedAddress, fundedKey, blockHashABI, blockHashReceiverByteCode) + + // + // Setup relayer config + // + hostA, portA, err := teleporterTestUtils.GetURIHostAndPort(subnetAInfo.ChainNodeURIs[0]) + Expect(err).Should(BeNil()) + + hostB, portB, err := teleporterTestUtils.GetURIHostAndPort(subnetBInfo.ChainNodeURIs[0]) + Expect(err).Should(BeNil()) + + hostC, portC, err := teleporterTestUtils.GetURIHostAndPort(subnetCInfo.ChainNodeURIs[0]) + Expect(err).Should(BeNil()) + + log.Info( + "Setting up relayer config", + "hostA", hostA, + "portA", portA, + "blockChainA", subnetAInfo.BlockchainID.String(), + "subnetA", subnetAInfo.SubnetID.String(), + "hostB", hostB, + "portB", portB, + "blockChainB", subnetBInfo.BlockchainID.String(), + "subnetB", subnetBInfo.SubnetID.String(), + "hostC", hostC, + "portC", portC, + "blockChainC", subnetCInfo.BlockchainID.String(), + "subnetC", subnetCInfo.SubnetID.String(), + ) + + relayerConfig := config.Config{ + LogLevel: logging.Info.LowerString(), + NetworkID: peers.LocalNetworkID, + PChainAPIURL: subnetAInfo.ChainNodeURIs[0], + EncryptConnection: false, + StorageLocation: storageLocation, + SourceSubnets: []config.SourceSubnet{ + { + SubnetID: subnetAInfo.SubnetID.String(), + ChainID: subnetAInfo.BlockchainID.String(), + VM: config.EVM_BLOCKHASH.String(), + EncryptConnection: false, + APINodeHost: hostA, + APINodePort: portA, + MessageContracts: map[string]config.MessageProtocolConfig{ + "0x0000000000000000000000000000000000000000": { + MessageFormat: config.BLOCK_HASH_PUBLISHER.String(), + Settings: map[string]interface{}{ + "destination-chains": []struct { + ChainID string `json:"chain-id"` + Address string `json:"address"` + Interval string `json:"interval"` + }{ + { + ChainID: subnetBInfo.BlockchainID.String(), + Address: blockHashReceiverAddressB.String(), + Interval: "5", + }, + { + ChainID: subnetCInfo.BlockchainID.String(), + Address: blockHashReceiverAddressC.String(), + Interval: "5", + }, + }, + }, + }, + }, + }, + }, + DestinationSubnets: []config.DestinationSubnet{ + { + SubnetID: subnetBInfo.SubnetID.String(), + ChainID: subnetBInfo.BlockchainID.String(), + VM: config.EVM.String(), + EncryptConnection: false, + APINodeHost: hostB, + APINodePort: portB, + AccountPrivateKey: hex.EncodeToString(fundedKey.D.Bytes()), + }, + { + SubnetID: subnetCInfo.SubnetID.String(), + ChainID: subnetCInfo.BlockchainID.String(), + VM: config.EVM.String(), + EncryptConnection: false, + APINodeHost: hostC, + APINodePort: portC, + AccountPrivateKey: hex.EncodeToString(fundedKey.D.Bytes()), + }, + }, + } + + data, err := json.MarshalIndent(relayerConfig, "", "\t") + Expect(err).Should(BeNil()) + + f, err := os.CreateTemp(os.TempDir(), "relayer-config.json") + Expect(err).Should(BeNil()) + + _, err = f.Write(data) + Expect(err).Should(BeNil()) + relayerConfigPath := f.Name() + + log.Info("Created awm-relayer config", "configPath", relayerConfigPath, "config", string(data)) + + // + // Build Relayer + // + cmd := exec.Command("./scripts/build.sh") + out, err := cmd.CombinedOutput() + fmt.Println(string(out)) + Expect(err).Should(BeNil()) + + // + // Publish block hashes + // + relayerCmd, relayerCancel := testUtils.RunRelayerExecutable(ctx, relayerConfigPath) + + destinationAddress := common.HexToAddress("0x0000000000000000000000000000000000000000") + gasTipCapA, err := subnetAInfo.ChainWSClient.SuggestGasTipCap(context.Background()) + Expect(err).Should(BeNil()) + + baseFeeA, err := subnetAInfo.ChainWSClient.EstimateBaseFee(context.Background()) + Expect(err).Should(BeNil()) + gasFeeCapA := baseFeeA.Mul(baseFeeA, big.NewInt(relayerEvm.BaseFeeFactor)) + gasFeeCapA.Add(gasFeeCapA, big.NewInt(relayerEvm.MaxPriorityFeePerGas)) + + // Subscribe to the destination chain block published + newHeadsB := make(chan *types.Header, 10) + subB, err := subnetBInfo.ChainWSClient.SubscribeNewHead(ctx, newHeadsB) + Expect(err).Should(BeNil()) + defer subB.Unsubscribe() + + newHeadsC := make(chan *types.Header, 10) + subC, err := subnetCInfo.ChainWSClient.SubscribeNewHead(ctx, newHeadsC) + Expect(err).Should(BeNil()) + defer subC.Unsubscribe() + + // Send 5 transactions to produce 5 blocks on subnet A + // We expect exactly one of the block hashes to be published by the relayer + subnetAHashes := []common.Hash{} + for i := 0; i < 5; i++ { + nonceA, err := subnetAInfo.ChainWSClient.NonceAt(ctx, fundedAddress, nil) + Expect(err).Should(BeNil()) + value := big.NewInt(0).Mul(big.NewInt(1e18), big.NewInt(1)) // 1eth + txA := types.NewTx(&types.DynamicFeeTx{ + ChainID: subnetAInfo.ChainIDInt, + Nonce: nonceA, + To: &destinationAddress, + Gas: teleporterTestUtils.DefaultTeleporterTransactionGas, + GasFeeCap: gasFeeCapA, + GasTipCap: gasTipCapA, + Value: value, + }) + txSignerA := types.LatestSignerForChainID(subnetAInfo.ChainIDInt) + + triggerTxA, err := types.SignTx(txA, txSignerA, fundedKey) + Expect(err).Should(BeNil()) + + receipt := teleporterTestUtils.SendTransactionAndWaitForAcceptance(ctx, subnetAInfo.ChainWSClient, triggerTxA) + + log.Info("Sent block on destination", "blockHash", receipt.BlockHash) + subnetAHashes = append(subnetAHashes, receipt.BlockHash) + } + + // Listen on the destination chains for the published block hash + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + defer wg.Done() + receiveBlockHash(ctx, blockHashReceiverAddressB, newHeadsB, subnetBInfo, blockHashABI, subnetAHashes) + }() + go func() { + defer wg.Done() + receiveBlockHash(ctx, blockHashReceiverAddressC, newHeadsC, subnetCInfo, blockHashABI, subnetAHashes) + }() + wg.Wait() + + // Cancel the command and stop the relayer + relayerCancel() + _ = relayerCmd.Wait() +} diff --git a/tests/utils/BlockHashReceiverByteCode.txt b/tests/utils/BlockHashReceiverByteCode.txt new file mode 100644 index 00000000..6b222848 --- /dev/null +++ b/tests/utils/BlockHashReceiverByteCode.txt @@ -0,0 +1 @@ +0x608060405234801561001057600080fd5b506001600081905580556102a3806100296000396000f3fe608060405234801561001057600080fd5b50600436106100365760003560e01c8063303bb4251461003b578063b771b3bc14610050575b600080fd5b61004e6100493660046101ba565b61007a565b005b61005e6005600160991b0181565b6040516001600160a01b03909116815260200160405180910390f35b600180541461009c5760405163a815ca6b60e01b815260040160405180910390fd5b600260015560405163ce7f592960e01b815263ffffffff8216600482015260009081906005600160991b019063ce7f592990602401606060405180830381865afa1580156100ee573d6000803e3d6000fd5b505050506040513d601f19601f8201168201806040525081019061011291906101e7565b91509150806101815760405162461bcd60e51b815260206004820152603160248201527f54656c65706f72746572426c6f636b4861736852656365697665723a20696e76604482015270616c69642077617270206d65737361676560781b606482015260840160405180910390fd5b602082015182516040517f7770e5f72465e9b05c8076c3f2eac70898abe6a84f0259307d127c13e2a1a4e490600090a350506001805550565b6000602082840312156101cc57600080fd5b813563ffffffff811681146101e057600080fd5b9392505050565b60008082840360608112156101fb57600080fd5b604081121561020957600080fd5b506040516040810181811067ffffffffffffffff8211171561023b57634e487b7160e01b600052604160045260246000fd5b60409081528451825260208086015190830152840151909250801515811461026257600080fd5b80915050925092905056fea2646970667358221220bcb05beb3e88ca671f681162bf1f075f28fed86a56b8ae3feae71b034d9d6eb564736f6c63430008120033 \ No newline at end of file diff --git a/tests/utils/utils.go b/tests/utils/utils.go index 598ba545..0928ba25 100644 --- a/tests/utils/utils.go +++ b/tests/utils/utils.go @@ -27,11 +27,6 @@ func RunRelayerExecutable(ctx context.Context, relayerConfigPath string) (*exec. cmdStdErrReader, err := relayerCmd.StderrPipe() Expect(err).Should(BeNil()) - // Start the command - log.Info("Starting the relayer executable") - err = relayerCmd.Start() - Expect(err).Should(BeNil()) - // Start goroutines to read and output the command's stdout and stderr go func() { scanner := bufio.NewScanner(cmdStdOutReader) @@ -47,6 +42,12 @@ func RunRelayerExecutable(ctx context.Context, relayerConfigPath string) (*exec. } cmdOutput <- "Command execution finished" }() + + // Start the command + log.Info("Starting the relayer executable") + err = relayerCmd.Start() + Expect(err).Should(BeNil()) + return relayerCmd, relayerCancel } diff --git a/vms/contract_message.go b/vms/contract_message.go index 3cb9fdb7..160bc72a 100644 --- a/vms/contract_message.go +++ b/vms/contract_message.go @@ -7,18 +7,21 @@ import ( "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/awm-relayer/config" "github.com/ava-labs/awm-relayer/vms/evm" + "github.com/ava-labs/awm-relayer/vms/evm_block_hash" "github.com/ava-labs/awm-relayer/vms/vmtypes" ) type ContractMessage interface { // UnpackWarpMessage unpacks the warp message from the VM - UnpackWarpMessage(unsignedMsgBytes []byte) (*vmtypes.WarpMessageInfo, error) + UnpackWarpMessage(warpMessageInfo *vmtypes.WarpMessageInfo) error } func NewContractMessage(logger logging.Logger, subnetInfo config.SourceSubnet) ContractMessage { switch config.ParseVM(subnetInfo.VM) { case config.EVM: return evm.NewContractMessage(logger, subnetInfo) + case config.EVM_BLOCKHASH: + return evm_block_hash.NewContractMessage(logger, subnetInfo) default: return nil } diff --git a/vms/evm/contract_message.go b/vms/evm/contract_message.go index d4441352..c9ed95c3 100644 --- a/vms/evm/contract_message.go +++ b/vms/evm/contract_message.go @@ -22,14 +22,14 @@ func NewContractMessage(logger logging.Logger, subnetInfo config.SourceSubnet) * } } -func (m *contractMessage) UnpackWarpMessage(unsignedMsgBytes []byte) (*vmtypes.WarpMessageInfo, error) { - unsignedMsg, err := avalancheWarp.ParseUnsignedMessage(unsignedMsgBytes) +func (m *contractMessage) UnpackWarpMessage(warpMessageInfo *vmtypes.WarpMessageInfo) error { + unsignedMsg, err := avalancheWarp.ParseUnsignedMessage(warpMessageInfo.UnsignedMsgBytes) if err != nil { m.logger.Error( "Failed parsing unsigned message", zap.Error(err), ) - return nil, err + return err } err = unsignedMsg.Initialize() if err != nil { @@ -37,7 +37,7 @@ func (m *contractMessage) UnpackWarpMessage(unsignedMsgBytes []byte) (*vmtypes.W "Failed initializing unsigned message", zap.Error(err), ) - return nil, err + return err } warpPayload, err := warpPayload.ParseAddressedPayload(unsignedMsg.Payload) @@ -46,12 +46,11 @@ func (m *contractMessage) UnpackWarpMessage(unsignedMsgBytes []byte) (*vmtypes.W "Failed parsing addressed payload", zap.Error(err), ) - return nil, err + return err } - messageInfo := vmtypes.WarpMessageInfo{ - WarpUnsignedMessage: unsignedMsg, - WarpPayload: warpPayload.Payload, - } - return &messageInfo, nil + warpMessageInfo.WarpUnsignedMessage = unsignedMsg + warpMessageInfo.WarpPayload = warpPayload.Payload + + return nil } diff --git a/vms/evm/contract_message_test.go b/vms/evm/contract_message_test.go index fbc50c91..f5e78f84 100644 --- a/vms/evm/contract_message_test.go +++ b/vms/evm/contract_message_test.go @@ -11,6 +11,7 @@ import ( "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/vms/platformvm/warp" "github.com/ava-labs/awm-relayer/config" + "github.com/ava-labs/awm-relayer/vms/vmtypes" warpPayload "github.com/ava-labs/subnet-evm/warp/payload" "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/require" @@ -76,14 +77,17 @@ func TestUnpack(t *testing.T) { t.Run(testCase.name, func(t *testing.T) { input, err := hex.DecodeString(testCase.input) require.NoError(t, err) + msgInfo := vmtypes.WarpMessageInfo{ + UnsignedMsgBytes: input, + } mockLogger.EXPECT().Error(gomock.Any(), gomock.Any()).Times(testCase.errorLogTimes) - msg, err := m.UnpackWarpMessage(input) + err = m.UnpackWarpMessage(&msgInfo) if testCase.expectError { require.Error(t, err) } else { require.NoError(t, err) - require.Equal(t, testCase.networkID, msg.WarpUnsignedMessage.NetworkID) + require.Equal(t, testCase.networkID, msgInfo.WarpUnsignedMessage.NetworkID) } }) } diff --git a/vms/evm/subscriber.go b/vms/evm/subscriber.go index c42aac03..851da74d 100644 --- a/vms/evm/subscriber.go +++ b/vms/evm/subscriber.go @@ -55,7 +55,7 @@ type subscriber struct { nodeWSURL string nodeRPCURL string chainID ids.ID - logsChan chan vmtypes.WarpLogInfo + logsChan chan vmtypes.WarpMessageInfo evmLog <-chan types.Log sub interfaces.Subscription @@ -74,7 +74,7 @@ func NewSubscriber(logger logging.Logger, subnetInfo config.SourceSubnet, db dat return nil } - logs := make(chan vmtypes.WarpLogInfo, maxClientSubscriptionBuffer) + logs := make(chan vmtypes.WarpMessageInfo, maxClientSubscriptionBuffer) return &subscriber{ nodeWSURL: subnetInfo.GetNodeWSEndpoint(), @@ -86,7 +86,7 @@ func NewSubscriber(logger logging.Logger, subnetInfo config.SourceSubnet, db dat } } -func (s *subscriber) NewWarpLogInfo(log types.Log) (*vmtypes.WarpLogInfo, error) { +func (s *subscriber) NewWarpMessageInfo(log types.Log) (*vmtypes.WarpMessageInfo, error) { if len(log.Topics) != 4 { s.logger.Error( "Log did not have the correct number of topics", @@ -111,7 +111,7 @@ func (s *subscriber) NewWarpLogInfo(log types.Log) (*vmtypes.WarpLogInfo, error) return nil, ErrInvalidLog } - return &vmtypes.WarpLogInfo{ + return &vmtypes.WarpMessageInfo{ DestinationChainID: destinationChainID, DestinationAddress: log.Topics[2], SourceAddress: log.Topics[3], @@ -124,7 +124,7 @@ func (s *subscriber) NewWarpLogInfo(log types.Log) (*vmtypes.WarpLogInfo, error) // forward logs from the concrete log channel to the interface channel func (s *subscriber) forwardLogs() { for msgLog := range s.evmLog { - messageInfo, err := s.NewWarpLogInfo(msgLog) + messageInfo, err := s.NewWarpMessageInfo(msgLog) if err != nil { s.logger.Error( "Invalid log. Continuing.", @@ -211,7 +211,7 @@ func (s *subscriber) ProcessFromHeight(height *big.Int) error { zap.String("chainID", s.chainID.String()), ) for _, log := range logs { - messageInfo, err := s.NewWarpLogInfo(log) + messageInfo, err := s.NewWarpMessageInfo(log) if err != nil { s.logger.Error( "Invalid log when processing from height. Continuing.", @@ -321,7 +321,7 @@ func (s *subscriber) dialAndSubscribe() error { return nil } -func (s *subscriber) Logs() <-chan vmtypes.WarpLogInfo { +func (s *subscriber) Logs() <-chan vmtypes.WarpMessageInfo { return s.logsChan } diff --git a/vms/evm_block_hash/contract_message.go b/vms/evm_block_hash/contract_message.go new file mode 100644 index 00000000..eb86bc44 --- /dev/null +++ b/vms/evm_block_hash/contract_message.go @@ -0,0 +1,53 @@ +package evm_block_hash + +import ( + "github.com/ava-labs/avalanchego/utils/logging" + avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" + "github.com/ava-labs/awm-relayer/config" + "github.com/ava-labs/awm-relayer/vms/vmtypes" + warpPayload "github.com/ava-labs/subnet-evm/warp/payload" + "go.uber.org/zap" +) + +type contractMessage struct { + logger logging.Logger +} + +func NewContractMessage(logger logging.Logger, subnetInfo config.SourceSubnet) *contractMessage { + return &contractMessage{ + logger: logger, + } +} + +func (m *contractMessage) UnpackWarpMessage(warpMessageInfo *vmtypes.WarpMessageInfo) error { + unsignedMsg, err := avalancheWarp.ParseUnsignedMessage(warpMessageInfo.UnsignedMsgBytes) + if err != nil { + m.logger.Error( + "Failed parsing unsigned message", + zap.Error(err), + ) + return err + } + err = unsignedMsg.Initialize() + if err != nil { + m.logger.Error( + "Failed initializing unsigned message", + zap.Error(err), + ) + return err + } + + warpPayload, err := warpPayload.ParseBlockHashPayload(unsignedMsg.Payload) + if err != nil { + m.logger.Error( + "Failed parsing addressed payload", + zap.Error(err), + ) + return err + } + + warpMessageInfo.WarpUnsignedMessage = unsignedMsg + warpMessageInfo.WarpPayload = warpPayload.BlockHash.Bytes() + + return nil +} diff --git a/vms/evm_block_hash/subscriber.go b/vms/evm_block_hash/subscriber.go new file mode 100644 index 00000000..ed28f6d7 --- /dev/null +++ b/vms/evm_block_hash/subscriber.go @@ -0,0 +1,193 @@ +package evm_block_hash + +import ( + "context" + "errors" + "fmt" + "math/big" + "time" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/logging" + avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" + "github.com/ava-labs/awm-relayer/config" + "github.com/ava-labs/awm-relayer/database" + "github.com/ava-labs/awm-relayer/vms/vmtypes" + "github.com/ava-labs/subnet-evm/core/types" + "github.com/ava-labs/subnet-evm/ethclient" + "github.com/ava-labs/subnet-evm/interfaces" + "github.com/ava-labs/subnet-evm/warp/payload" + "go.uber.org/zap" +) + +const ( + // Max buffer size for ethereum subscription channels + maxClientSubscriptionBuffer = 20000 + subscribeRetryTimeout = 1 * time.Second + maxResubscribeAttempts = 10 +) + +var ( + // Errors + ErrInvalidLog = errors.New("invalid warp block hash log") +) + +// subscriber implements Subscriber +type subscriber struct { + nodeWSURL string + nodeRPCURL string + chainID ids.ID + logsChan chan vmtypes.WarpMessageInfo + blocks <-chan *types.Header + sub interfaces.Subscription + networkID uint32 + + logger logging.Logger + db database.RelayerDatabase +} + +// NewSubscriber returns a subscriber +func NewSubscriber(logger logging.Logger, subnetInfo config.SourceSubnet, db database.RelayerDatabase) *subscriber { + chainID, err := ids.FromString(subnetInfo.ChainID) + if err != nil { + logger.Error( + "Invalid chainID provided to subscriber", + zap.Error(err), + ) + return nil + } + + logs := make(chan vmtypes.WarpMessageInfo, maxClientSubscriptionBuffer) + + return &subscriber{ + nodeWSURL: subnetInfo.GetNodeWSEndpoint(), + nodeRPCURL: subnetInfo.GetNodeRPCEndpoint(), + chainID: chainID, + logger: logger, + db: db, + logsChan: logs, + networkID: config.GetNetworkID(), + } +} + +func (s *subscriber) Subscribe() error { + // Retry subscribing until successful. Attempt to resubscribe maxResubscribeAttempts times + for attempt := 0; attempt < maxResubscribeAttempts; attempt++ { + // Unsubscribe before resubscribing + // s.sub should only be nil on the first call to Subscribe + if s.sub != nil { + s.sub.Unsubscribe() + } + err := s.dialAndSubscribe() + if err == nil { + s.logger.Info( + "Successfully subscribed", + zap.String("chainID", s.chainID.String()), + ) + return nil + } + + s.logger.Warn( + "Failed to subscribe to node", + zap.Int("attempt", attempt), + zap.String("chainID", s.chainID.String()), + zap.Error(err), + ) + + if attempt != maxResubscribeAttempts-1 { + time.Sleep(subscribeRetryTimeout) + } + } + + return fmt.Errorf("failed to subscribe to node with all %d attempts", maxResubscribeAttempts) +} + +func (s *subscriber) dialAndSubscribe() error { + // Dial the configured source chain endpoint + // This needs to be a websocket + ethClient, err := ethclient.Dial(s.nodeWSURL) + if err != nil { + return err + } + + blocks := make(chan *types.Header, maxClientSubscriptionBuffer) + sub, err := ethClient.SubscribeNewHead(context.Background(), blocks) + if err != nil { + s.logger.Error( + "Failed to subscribe to logs", + zap.String("chainID", s.chainID.String()), + zap.Error(err), + ) + return err + } + s.blocks = blocks + s.sub = sub + + // Forward logs to the interface channel. Closed when the subscription is cancelled + go s.forwardLogs() + return nil +} + +func (s *subscriber) NewWarpMessageInfo(block *types.Header) (*vmtypes.WarpMessageInfo, error) { + blockHashPayload, err := payload.NewBlockHashPayload(block.Hash()) + if err != nil { + return nil, err + } + unsignedMessage, err := avalancheWarp.NewUnsignedMessage(s.networkID, s.chainID, blockHashPayload.Bytes()) + if err != nil { + return nil, err + } + err = unsignedMessage.Initialize() + if err != nil { + return nil, err + } + + return &vmtypes.WarpMessageInfo{ + UnsignedMsgBytes: unsignedMessage.Bytes(), + BlockNumber: block.Number.Uint64(), + BlockTimestamp: block.Time, + }, nil +} + +// forward logs from the concrete log channel to the interface channel +func (s *subscriber) forwardLogs() { + // TODO: When publishing anycast messages from the C-Chain, we need to create + // a separate aggregate signature for each destination chain. + // The easiest thing to do would be to fan out here and produce new logs for each of + // the destinations, but that may introduce a cyclic dependency between this package + // and the anycast message protocol package. + for block := range s.blocks { + messageInfo, err := s.NewWarpMessageInfo(block) + if err != nil { + s.logger.Error( + "Invalid log. Continuing.", + zap.Error(err), + ) + continue + } + s.logsChan <- *messageInfo + } +} + +func (s *subscriber) ProcessFromHeight(height *big.Int) error { + // TODO: Implement historical block processing + return nil +} + +func (s *subscriber) SetProcessedBlockHeightToLatest() error { + // TODO: Implement historical block processing + // We should distinguish the key from the value for the evm relayer: chainID_blockhash + return nil +} + +func (s *subscriber) Logs() <-chan vmtypes.WarpMessageInfo { + return s.logsChan +} + +func (s *subscriber) Err() <-chan error { + return s.sub.Err() +} + +func (s *subscriber) Cancel() { + // Nothing to do here, the ethclient manages both the log and err channels +} diff --git a/vms/subscriber.go b/vms/subscriber.go index 82682cca..5bb34eb3 100644 --- a/vms/subscriber.go +++ b/vms/subscriber.go @@ -10,6 +10,7 @@ import ( "github.com/ava-labs/awm-relayer/config" "github.com/ava-labs/awm-relayer/database" "github.com/ava-labs/awm-relayer/vms/evm" + "github.com/ava-labs/awm-relayer/vms/evm_block_hash" "github.com/ava-labs/awm-relayer/vms/vmtypes" ) @@ -29,7 +30,7 @@ type Subscriber interface { Subscribe() error // Logs returns the channel that the subscription writes events to - Logs() <-chan vmtypes.WarpLogInfo + Logs() <-chan vmtypes.WarpMessageInfo // Err returns the channel that the subscription writes errors to // If an error is sent to this channel, the subscription should be closed @@ -44,6 +45,8 @@ func NewSubscriber(logger logging.Logger, subnetInfo config.SourceSubnet, db dat switch config.ParseVM(subnetInfo.VM) { case config.EVM: return evm.NewSubscriber(logger, subnetInfo, db) + case config.EVM_BLOCKHASH: + return evm_block_hash.NewSubscriber(logger, subnetInfo, db) default: return nil } diff --git a/vms/vmtypes/message_info.go b/vms/vmtypes/message_info.go index b54da2b5..fd346f82 100644 --- a/vms/vmtypes/message_info.go +++ b/vms/vmtypes/message_info.go @@ -5,17 +5,21 @@ package vmtypes import ( "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/vms/platformvm/warp" "github.com/ethereum/go-ethereum/common" ) -// WarpLogInfo is provided by the subscription and +// WarpMessageInfo is provided by the subscription and // describes the transaction information emitted by the source chain, // including the Warp Message payload bytes -type WarpLogInfo struct { - SourceAddress common.Hash - DestinationChainID ids.ID - DestinationAddress common.Hash - SourceTxID []byte - UnsignedMsgBytes []byte - BlockNumber uint64 +type WarpMessageInfo struct { + SourceAddress common.Hash + DestinationChainID ids.ID + DestinationAddress common.Hash + SourceTxID []byte + UnsignedMsgBytes []byte + BlockNumber uint64 + BlockTimestamp uint64 + WarpUnsignedMessage *warp.UnsignedMessage + WarpPayload []byte } diff --git a/vms/vmtypes/types.go b/vms/vmtypes/types.go deleted file mode 100644 index c10ddf05..00000000 --- a/vms/vmtypes/types.go +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright (C) 2023, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package vmtypes - -import ( - "github.com/ava-labs/avalanchego/vms/platformvm/warp" -) - -// -// Types used in the vms interfaces -// - -// WarpMessageInfo is used internally to provide access to warp message info emitted by the sender -type WarpMessageInfo struct { - WarpUnsignedMessage *warp.UnsignedMessage - WarpPayload []byte -}