From 73a5b7285742ff2d2bedd4e832b2e71a2662dabc Mon Sep 17 00:00:00 2001 From: Leo Date: Mon, 20 Dec 2021 22:23:45 +0100 Subject: [PATCH] node: implement observation requests via gossip Limitations: - Only supported for Solana and for confirmation level Finalized, which the token/NFT bridges use. Need to take a close look before enabling it for both (since we're bypassing the tx fetcher and would fetch and process accounts of the "wrong" confirmation levels). - Rate limiting not implemented yet, will be done in a future release when things are not currently on fire. Test: https://gist.github.com/leoluk/bab3a18e922057109facea1cf1f26b2f commit-id:6a0d4c32 --- DEVELOP.md | 3 + node/cmd/guardiand/adminclient.go | 41 +++++++++++ node/cmd/guardiand/adminserver.go | 26 ++++--- node/cmd/guardiand/node.go | 14 ++-- node/cmd/spy/spy.go | 3 +- node/pkg/p2p/p2p.go | 111 +++++++++++++++++++++++++++++- node/pkg/solana/client.go | 14 ++++ proto/gossip/v1/gossip.proto | 21 ++++++ proto/node/v1/node.proto | 13 ++++ 9 files changed, 230 insertions(+), 16 deletions(-) diff --git a/DEVELOP.md b/DEVELOP.md index 42517941e8..f46eaca3ec 100644 --- a/DEVELOP.md +++ b/DEVELOP.md @@ -147,6 +147,9 @@ To Solana as CPI instruction: kubectl exec solana-devnet-0 -c setup -- client post-message --proxy CP1co2QMMoDPbsmV7PGcUTLFwyhgCgTXt25gLQ5LewE1 Bridge1p5gheXUvJ6jGWGeCsgPKgnE3YgdGKRVCMY9o 1 confirmed ffff +### Observation Requests + + kubectl exec -it guardian-0 -- /guardiand admin send-observation-request --socket /tmp/admin.sock 1 4636d8f7593c78a5092bed13dec765cc705752653db5eb1498168c92345cd389 ### IntelliJ Protobuf Autocompletion diff --git a/node/cmd/guardiand/adminclient.go b/node/cmd/guardiand/adminclient.go index 2dc4b3d675..10f007c199 100644 --- a/node/cmd/guardiand/adminclient.go +++ b/node/cmd/guardiand/adminclient.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "fmt" + gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1" "github.com/certusone/wormhole/node/pkg/vaa" "github.com/davecgh/go-spew/spew" @@ -43,12 +44,14 @@ func init() { AdminClientFindMissingMessagesCmd.Flags().AddFlagSet(pf) AdminClientListNodes.Flags().AddFlagSet(pf) DumpVAAByMessageID.Flags().AddFlagSet(pf) + SendObservationRequest.Flags().AddFlagSet(pf) AdminCmd.AddCommand(AdminClientInjectGuardianSetUpdateCmd) AdminCmd.AddCommand(AdminClientFindMissingMessagesCmd) AdminCmd.AddCommand(AdminClientGovernanceVAAVerifyCmd) AdminCmd.AddCommand(AdminClientListNodes) AdminCmd.AddCommand(DumpVAAByMessageID) + AdminCmd.AddCommand(SendObservationRequest) } var AdminCmd = &cobra.Command{ @@ -77,6 +80,13 @@ var DumpVAAByMessageID = &cobra.Command{ Args: cobra.ExactArgs(1), } +var SendObservationRequest = &cobra.Command{ + Use: "send-observation-request [CHAIN_ID] [TX_HASH_HEX]", + Short: "Broadcast an observation request for the given chain ID and chain-specific tx_hash", + Run: runSendObservationRequest, + Args: cobra.ExactArgs(2), +} + func getAdminClient(ctx context.Context, addr string) (*grpc.ClientConn, error, nodev1.NodePrivilegedServiceClient) { conn, err := grpc.DialContext(ctx, fmt.Sprintf("unix:///%s", addr), grpc.WithInsecure()) @@ -213,3 +223,34 @@ func runDumpVAAByMessageID(cmd *cobra.Command, args []string) { log.Printf("VAA with digest %s: %+v\n", v.HexDigest(), spew.Sdump(v)) fmt.Printf("Bytes:\n%s\n", hex.EncodeToString(resp.VaaBytes)) } + +func runSendObservationRequest(cmd *cobra.Command, args []string) { + chainID, err := strconv.Atoi(args[0]) + if err != nil { + log.Fatalf("invalid chain ID: %v", err) + } + + txHash, err := hex.DecodeString(args[1]) + if err != nil { + log.Fatalf("invalid transaction hash: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + conn, err, c := getAdminClient(ctx, *clientSocketPath) + defer conn.Close() + if err != nil { + log.Fatalf("failed to get admin client: %v", err) + } + + _, err = c.SendObservationRequest(ctx, &nodev1.SendObservationRequestRequest{ + ObservationRequest: &gossipv1.ObservationRequest{ + ChainId: uint32(chainID), + TxHash: txHash, + }, + }) + if err != nil { + log.Fatalf("failed to send observation request: %v", err) + } +} diff --git a/node/cmd/guardiand/adminserver.go b/node/cmd/guardiand/adminserver.go index 1e2b382529..475a68cc3a 100644 --- a/node/cmd/guardiand/adminserver.go +++ b/node/cmd/guardiand/adminserver.go @@ -30,10 +30,11 @@ import ( type nodePrivilegedService struct { nodev1.UnimplementedNodePrivilegedServiceServer - db *db.Database - injectC chan<- *vaa.VAA - logger *zap.Logger - signedInC chan *gossipv1.SignedVAAWithQuorum + db *db.Database + injectC chan<- *vaa.VAA + obsvReqSendC chan *gossipv1.ObservationRequest + logger *zap.Logger + signedInC chan *gossipv1.SignedVAAWithQuorum } // adminGuardianSetUpdateToVAA converts a nodev1.GuardianSetUpdate message to its canonical VAA representation. @@ -342,7 +343,7 @@ func (s *nodePrivilegedService) FindMissingMessages(ctx context.Context, req *no }, nil } -func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, signedInC chan *gossipv1.SignedVAAWithQuorum, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) { +func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, signedInC chan *gossipv1.SignedVAAWithQuorum, obsvReqSendC chan *gossipv1.ObservationRequest, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) { // Delete existing UNIX socket, if present. fi, err := os.Stat(socketPath) if err == nil { @@ -375,10 +376,11 @@ func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- logger.Info("admin server listening on", zap.String("path", socketPath)) nodeService := &nodePrivilegedService{ - injectC: injectC, - db: db, - logger: logger.Named("adminservice"), - signedInC: signedInC, + injectC: injectC, + obsvReqSendC: obsvReqSendC, + db: db, + logger: logger.Named("adminservice"), + signedInC: signedInC, } publicrpcService := publicrpc.NewPublicrpcServer(logger, db, gst) @@ -388,3 +390,9 @@ func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- publicrpcv1.RegisterPublicRPCServiceServer(grpcServer, publicrpcService) return supervisor.GRPCServer(grpcServer, l, false), nil } + +func (s *nodePrivilegedService) SendObservationRequest(ctx context.Context, req *nodev1.SendObservationRequestRequest) (*nodev1.SendObservationRequestResponse, error) { + s.obsvReqSendC <- req.ObservationRequest + s.logger.Info("sent observation request", zap.Any("request", req.ObservationRequest)) + return &nodev1.SendObservationRequestResponse{}, nil +} diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index fba7885266..c8943ae450 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -521,6 +521,12 @@ func runNode(cmd *cobra.Command, args []string) { // Inbound signed VAAs signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50) + // Inbound observation requests + obsvReqC := make(chan *gossipv1.ObservationRequest, 50) + + // Outbound observation requests + obsvReqSendC := make(chan *gossipv1.ObservationRequest) + // Injected VAAs (manually generated rather than created via observation) injectC := make(chan *vaa.VAA) @@ -599,7 +605,7 @@ func runNode(cmd *cobra.Command, args []string) { } // local admin service socket - adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, signedInC, db, gst) + adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, signedInC, obsvReqSendC, db, gst) if err != nil { logger.Fatal("failed to create admin service socket", zap.Error(err)) } @@ -613,7 +619,7 @@ func runNode(cmd *cobra.Command, args []string) { // Run supervisor. supervisor.New(rootCtx, logger, func(ctx context.Context) error { if err := supervisor.Run(ctx, "p2p", p2p.Run( - obsvC, sendC, signedInC, priv, gk, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, rootCtxCancel)); err != nil { + obsvC, obsvReqC, obsvReqSendC, sendC, signedInC, priv, gk, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, rootCtxCancel)); err != nil { return err } @@ -669,12 +675,12 @@ func runNode(cmd *cobra.Command, args []string) { } if err := supervisor.Run(ctx, "solwatch-confirmed", - solana.NewSolanaWatcher(*solanaWsRPC, *solanaRPC, solAddress, lockC, rpc.CommitmentConfirmed).Run); err != nil { + solana.NewSolanaWatcher(*solanaWsRPC, *solanaRPC, solAddress, lockC, nil, rpc.CommitmentConfirmed).Run); err != nil { return err } if err := supervisor.Run(ctx, "solwatch-finalized", - solana.NewSolanaWatcher(*solanaWsRPC, *solanaRPC, solAddress, lockC, rpc.CommitmentFinalized).Run); err != nil { + solana.NewSolanaWatcher(*solanaWsRPC, *solanaRPC, solAddress, lockC, obsvReqC, rpc.CommitmentFinalized).Run); err != nil { return err } diff --git a/node/cmd/spy/spy.go b/node/cmd/spy/spy.go index 3b315c2649..4579e82c38 100644 --- a/node/cmd/spy/spy.go +++ b/node/cmd/spy/spy.go @@ -298,8 +298,7 @@ func runSpy(cmd *cobra.Command, args []string) { // Run supervisor. supervisor.New(rootCtx, logger, func(ctx context.Context) error { - if err := supervisor.Run(ctx, "p2p", p2p.Run( - obsvC, sendC, signedInC, priv, nil, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, "", false, rootCtxCancel)); err != nil { + if err := supervisor.Run(ctx, "p2p", p2p.Run(obsvC, nil, nil, sendC, signedInC, priv, nil, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, "", false, rootCtxCancel)); err != nil { return err } diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index f8c7ec6c9b..2a5fe450dd 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -55,11 +55,17 @@ var ( var heartbeatMessagePrefix = []byte("heartbeat|") +var signedObservationRequestPrefix = []byte("signed_observation_request|") + func heartbeatDigest(b []byte) common.Hash { return ethcrypto.Keccak256Hash(append(heartbeatMessagePrefix, b...)) } -func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, signedInC chan *gossipv1.SignedVAAWithQuorum, priv crypto.PrivKey, gk *ecdsa.PrivateKey, gst *node_common.GuardianSetState, port uint, networkID string, bootstrapPeers string, nodeName string, disableHeartbeatVerify bool, rootCtxCancel context.CancelFunc) func(ctx context.Context) error { +func signedObservationRequestDigest(b []byte) common.Hash { + return ethcrypto.Keccak256Hash(append(signedObservationRequestPrefix, b...)) +} + +func Run(obsvC chan *gossipv1.SignedObservation, obsvReqC chan *gossipv1.ObservationRequest, obsvReqSendC chan *gossipv1.ObservationRequest, sendC chan []byte, signedInC chan *gossipv1.SignedVAAWithQuorum, priv crypto.PrivKey, gk *ecdsa.PrivateKey, gst *node_common.GuardianSetState, port uint, networkID string, bootstrapPeers string, nodeName string, disableHeartbeatVerify bool, rootCtxCancel context.CancelFunc) func(ctx context.Context) error { return func(ctx context.Context) (re error) { logger := supervisor.Logger(ctx) @@ -279,6 +285,44 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, signedInC ch if err != nil { logger.Error("failed to publish message from queue", zap.Error(err)) } + case msg := <-obsvReqSendC: + b, err := proto.Marshal(msg) + if err != nil { + panic(err) + } + + // Sign the observation request using our node's guardian key. + digest := signedObservationRequestDigest(b) + sig, err := ethcrypto.Sign(digest.Bytes(), gk) + if err != nil { + panic(err) + } + + sReq := &gossipv1.SignedObservationRequest{ + ObservationRequest: b, + Signature: sig, + GuardianAddr: ethcrypto.PubkeyToAddress(gk.PublicKey).Bytes(), + } + + envelope := &gossipv1.GossipMessage{ + Message: &gossipv1.GossipMessage_SignedObservationRequest{ + SignedObservationRequest: sReq}} + + b, err = proto.Marshal(envelope) + if err != nil { + panic(err) + } + + // Send to local observation request queue (the loopback message is ignored) + obsvReqC <- msg + + err = th.Publish(ctx, b) + p2pMessagesSent.Inc() + if err != nil { + logger.Error("failed to publish observation request", zap.Error(err)) + } else { + logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq)) + } } } }() @@ -342,6 +386,32 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, signedInC ch case *gossipv1.GossipMessage_SignedVaaWithQuorum: signedInC <- m.SignedVaaWithQuorum p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc() + case *gossipv1.GossipMessage_SignedObservationRequest: + s := m.SignedObservationRequest + gs := gst.Get() + if gs == nil { + logger.Debug("dropping SignedObservationRequest - no guardian set", + zap.Any("value", s), + zap.String("from", envelope.GetFrom().String())) + break + } + r, err := processSignedObservationRequest(s, gs) + if err != nil { + p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc() + logger.Debug("invalid signed observation request received", + zap.Error(err), + zap.Any("payload", msg.Message), + zap.Any("value", s), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } else { + p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc() + logger.Info("valid signed observation request received", + zap.Any("value", r), + zap.String("from", envelope.GetFrom().String())) + + obsvReqC <- r + } default: p2pMessagesReceived.WithLabelValues("unknown").Inc() logger.Warn("received unknown message type (running outdated software?)", @@ -392,3 +462,42 @@ func processSignedHeartbeat(from peer.ID, s *gossipv1.SignedHeartbeat, gs *node_ return &h, nil } + +func processSignedObservationRequest(s *gossipv1.SignedObservationRequest, gs *node_common.GuardianSet) (*gossipv1.ObservationRequest, error) { + envelopeAddr := common.BytesToAddress(s.GuardianAddr) + idx, ok := gs.KeyIndex(envelopeAddr) + var pk common.Address + if !ok { + return nil, fmt.Errorf("invalid message: %s not in guardian set", envelopeAddr) + } else { + pk = gs.Keys[idx] + } + + digest := signedObservationRequestDigest(s.ObservationRequest) + + pubKey, err := ethcrypto.Ecrecover(digest.Bytes(), s.Signature) + if err != nil { + return nil, errors.New("failed to recover public key") + } + + signerAddr := common.BytesToAddress(ethcrypto.Keccak256(pubKey[1:])[12:]) + if pk != signerAddr { + return nil, fmt.Errorf("invalid signer: %v", signerAddr) + } + + var h gossipv1.ObservationRequest + err = proto.Unmarshal(s.ObservationRequest, &h) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal observation request: %w", err) + } + + // For now, this supports Solana only. Once we add more chains, we'll have to add a + // multiplexer/router in node.go. + if h.ChainId != uint32(vaa.ChainIDSolana) { + return nil, fmt.Errorf("unsupported chain id: %d", h.ChainId) + } + + // TODO: implement per-guardian rate limiting + + return &h, nil +} diff --git a/node/pkg/solana/client.go b/node/pkg/solana/client.go index ade4df0a7f..8381f8fc8f 100644 --- a/node/pkg/solana/client.go +++ b/node/pkg/solana/client.go @@ -28,6 +28,7 @@ type SolanaWatcher struct { rpcUrl string commitment rpc.CommitmentType messageEvent chan *common.MessagePublication + obsvReqC chan *gossipv1.ObservationRequest rpcClient *rpc.Client } @@ -101,11 +102,13 @@ func NewSolanaWatcher( wsUrl, rpcUrl string, contractAddress solana.PublicKey, messageEvents chan *common.MessagePublication, + obsvReqC chan *gossipv1.ObservationRequest, commitment rpc.CommitmentType) *SolanaWatcher { return &SolanaWatcher{ contract: contractAddress, wsUrl: wsUrl, rpcUrl: rpcUrl, messageEvent: messageEvents, + obsvReqC: obsvReqC, commitment: commitment, rpcClient: rpc.New(rpcUrl), } @@ -155,6 +158,17 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { for _, acc := range accs { s.fetchMessageAccount(rCtx, logger, solana.MustPublicKeyFromBase58(acc), 0) } + case m := <-s.obsvReqC: + if m.ChainId != uint32(vaa.ChainIDSolana) { + panic("unexpected chain id") + } + + acc := solana.PublicKeyFromBytes(m.TxHash) + logger.Info("received observation request", zap.String("account", acc.String())) + + rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) + s.fetchMessageAccount(rCtx, logger, acc, 0) + cancel() case <-timer.C: // Get current slot height rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) diff --git a/proto/gossip/v1/gossip.proto b/proto/gossip/v1/gossip.proto index 32a715209c..4a040d4833 100644 --- a/proto/gossip/v1/gossip.proto +++ b/proto/gossip/v1/gossip.proto @@ -9,6 +9,7 @@ message GossipMessage { SignedObservation signed_observation = 2; SignedHeartbeat signed_heartbeat = 3; SignedVAAWithQuorum signed_vaa_with_quorum = 4; + SignedObservationRequest signed_observation_request = 5; } } @@ -90,3 +91,23 @@ message SignedObservation { message SignedVAAWithQuorum { bytes vaa = 1; } + +// Any guardian can send a SignedObservationRequest to the network to request +// all guardians to re-observe a given transaction. This is rate-limited to one +// request per second per guardian to prevent abuse. +// +// In the current implementation, this is only implemented for Solana. +// For Solana, the tx_hash is the account address of the transaction's message account. +message SignedObservationRequest { + // Serialized observation request. + bytes observation_request = 1; + + // Signature + bytes signature = 2; + bytes guardian_addr = 3; +} + +message ObservationRequest { + uint32 chain_id = 1; + bytes tx_hash = 2; +} diff --git a/proto/node/v1/node.proto b/proto/node/v1/node.proto index f207752aee..753e1add6e 100644 --- a/proto/node/v1/node.proto +++ b/proto/node/v1/node.proto @@ -4,6 +4,8 @@ package node.v1; option go_package = "github.com/certusone/wormhole/node/pkg/proto/node/v1;nodev1"; +import "gossip/v1/gossip.proto"; + // NodePrivilegedService exposes an administrative API. It runs on a UNIX socket and is authenticated // using Linux filesystem permissions. service NodePrivilegedService { @@ -21,6 +23,11 @@ service NodePrivilegedService { // // An error is returned if more than 1000 gaps are found. rpc FindMissingMessages (FindMissingMessagesRequest) returns (FindMissingMessagesResponse); + + // SendObservationRequest broadcasts a signed observation request to the gossip network + // using the node's guardian key. The network rate limits these requests to one per second. + // Requests at higher rates will fail silently. + rpc SendObservationRequest (SendObservationRequestRequest) returns (SendObservationRequestResponse); } message InjectGovernanceVAARequest { @@ -133,3 +140,9 @@ message FindMissingMessagesResponse { uint64 first_sequence = 2; uint64 last_sequence = 3; } + +message SendObservationRequestRequest { + gossip.v1.ObservationRequest observation_request = 1; +} + +message SendObservationRequestResponse {}