diff --git a/pkg/app/flags.go b/pkg/app/flags.go index 1c4d3f9..3fb3f4d 100644 --- a/pkg/app/flags.go +++ b/pkg/app/flags.go @@ -58,6 +58,6 @@ var Flags = []cli.Flag{ &cli.StringFlag{ Name: "x-gov", Usage: "version of the gov module to use (v1|v1beta1)", - Value: "v1beta1", + Value: "v1", }, } diff --git a/pkg/app/run.go b/pkg/app/run.go index 31147df..68414ae 100644 --- a/pkg/app/run.go +++ b/pkg/app/run.go @@ -102,21 +102,17 @@ func RunFunc(cCtx *cli.Context) error { return validatorsWatcher.Start(ctx) }) } + if xGov != "v1beta1" && xGov != "v1" { + log.Warn().Msgf("unknown gov module version: %s (fallback to v1)", xGov) + xGov = "v1" + } if !noGov { - switch xGov { - case "v1beta1": - votesWatcher := watcher.NewVotesV1Beta1Watcher(trackedValidators, metrics, pool) - errg.Go(func() error { - return votesWatcher.Start(ctx) - }) - case "v1": - votesWatcher := watcher.NewVotesV1Watcher(trackedValidators, metrics, pool) - errg.Go(func() error { - return votesWatcher.Start(ctx) - }) - default: - log.Warn().Msgf("unknown gov module version: %s", xGov) - } + votesWatcher := watcher.NewVotesWatcher(trackedValidators, metrics, pool, watcher.VotesWatcherOptions{ + GovModuleVersion: xGov, + }) + errg.Go(func() error { + return votesWatcher.Start(ctx) + }) } var wh *webhook.Webhook if webhookURL != "" { @@ -128,6 +124,7 @@ func RunFunc(cCtx *cli.Context) error { } upgradeWatcher := watcher.NewUpgradeWatcher(metrics, pool, wh, watcher.UpgradeWatcherOptions{ CheckPendingProposals: !noGov, + GovModuleVersion: xGov, }) errg.Go(func() error { return upgradeWatcher.Start(ctx) diff --git a/pkg/watcher/upgrade.go b/pkg/watcher/upgrade.go index 6e055de..82847fe 100644 --- a/pkg/watcher/upgrade.go +++ b/pkg/watcher/upgrade.go @@ -8,7 +8,9 @@ import ( ctypes "github.com/cometbft/cometbft/rpc/core/types" comettypes "github.com/cometbft/cometbft/types" "github.com/cosmos/cosmos-sdk/client" - gov "github.com/cosmos/cosmos-sdk/x/gov/types/v1beta1" + codectypes "github.com/cosmos/cosmos-sdk/codec/types" + gov "github.com/cosmos/cosmos-sdk/x/gov/types/v1" + govbeta "github.com/cosmos/cosmos-sdk/x/gov/types/v1beta1" "github.com/cosmos/cosmos-sdk/x/upgrade/types" upgrade "github.com/cosmos/cosmos-sdk/x/upgrade/types" "github.com/gogo/protobuf/codec" @@ -31,6 +33,7 @@ type UpgradeWatcher struct { type UpgradeWatcherOptions struct { CheckPendingProposals bool + GovModuleVersion string } func NewUpgradeWatcher(metrics *metrics.Metrics, pool *rpc.Pool, webhook *webhook.Webhook, options UpgradeWatcherOptions) *UpgradeWatcher { @@ -135,7 +138,12 @@ func (w *UpgradeWatcher) fetchUpgrade(ctx context.Context, node *rpc.Node) error plan := resp.Plan if plan == nil && w.options.CheckPendingProposals { - plan, err = w.checkUpgradeProposals(ctx, node) + switch w.options.GovModuleVersion { + case "v1beta1": + plan, err = w.checkUpgradeProposalsV1Beta1(ctx, node) + default: // v1 + plan, err = w.checkUpgradeProposalsV1(ctx, node) + } if err != nil { log.Error().Err(err).Msg("failed to check upgrade proposals") } @@ -146,7 +154,7 @@ func (w *UpgradeWatcher) fetchUpgrade(ctx context.Context, node *rpc.Node) error return nil } -func (w *UpgradeWatcher) checkUpgradeProposals(ctx context.Context, node *rpc.Node) (*upgrade.Plan, error) { +func (w *UpgradeWatcher) checkUpgradeProposalsV1(ctx context.Context, node *rpc.Node) (*upgrade.Plan, error) { clientCtx := (client.Context{}).WithClient(node.Client) queryClient := gov.NewQueryClient(clientCtx) @@ -159,29 +167,68 @@ func (w *UpgradeWatcher) checkUpgradeProposals(ctx context.Context, node *rpc.No } for _, proposal := range proposalsResp.GetProposals() { - if proposal.Content == nil { - continue + for _, message := range proposal.Messages { + plan, err := extractUpgradePlan(message) + if err != nil { + return nil, fmt.Errorf("failed to extract upgrade plan: %w", err) + } + if plan != nil { + return plan, nil + } } + } - cdc := codec.New(1) + return nil, nil +} - switch proposal.Content.TypeUrl { - case "/cosmos.upgrade.v1beta1.SoftwareUpgradeProposal": - var upgrade types.SoftwareUpgradeProposal - err := cdc.Unmarshal(proposal.Content.Value, &upgrade) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal software upgrade proposal: %w", err) - } - return &upgrade.Plan, nil +func (w *UpgradeWatcher) checkUpgradeProposalsV1Beta1(ctx context.Context, node *rpc.Node) (*upgrade.Plan, error) { + clientCtx := (client.Context{}).WithClient(node.Client) + queryClient := govbeta.NewQueryClient(clientCtx) - case "/cosmos.upgrade.v1beta1.MsgSoftwareUpgrade": - var upgrade types.MsgSoftwareUpgrade - err := cdc.Unmarshal(proposal.Content.Value, &upgrade) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal software upgrade proposal: %w", err) - } - return &upgrade.Plan, nil + // Fetch all proposals in voting period + proposalsResp, err := queryClient.Proposals(ctx, &govbeta.QueryProposalsRequest{ + ProposalStatus: govbeta.StatusVotingPeriod, + }) + if err != nil { + return nil, fmt.Errorf("failed to get proposals: %w", err) + } + + for _, proposal := range proposalsResp.GetProposals() { + plan, err := extractUpgradePlan(proposal.Content) + if err != nil { + return nil, fmt.Errorf("failed to extract upgrade plan: %w", err) + } + if plan != nil { + return plan, nil + } + } + + return nil, nil +} + +func extractUpgradePlan(content *codectypes.Any) (*upgrade.Plan, error) { + if content == nil { + return nil, nil + } + + cdc := codec.New(1) + + switch content.TypeUrl { + case "/cosmos.upgrade.v1beta1.SoftwareUpgradeProposal": + var upgrade types.SoftwareUpgradeProposal + err := cdc.Unmarshal(content.Value, &upgrade) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal software upgrade proposal: %w", err) + } + return &upgrade.Plan, nil + + case "/cosmos.upgrade.v1beta1.MsgSoftwareUpgrade": + var upgrade types.MsgSoftwareUpgrade + err := cdc.Unmarshal(content.Value, &upgrade) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal software upgrade proposal: %w", err) } + return &upgrade.Plan, nil } return nil, nil diff --git a/pkg/watcher/votes.go b/pkg/watcher/votes.go new file mode 100644 index 0000000..7a33e2b --- /dev/null +++ b/pkg/watcher/votes.go @@ -0,0 +1,187 @@ +package watcher + +import ( + "context" + "fmt" + "time" + + "github.com/cosmos/cosmos-sdk/client" + gov "github.com/cosmos/cosmos-sdk/x/gov/types/v1" + govbeta "github.com/cosmos/cosmos-sdk/x/gov/types/v1beta1" + "github.com/kilnfi/cosmos-validator-watcher/pkg/metrics" + "github.com/kilnfi/cosmos-validator-watcher/pkg/rpc" + "github.com/rs/zerolog/log" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type VotesWatcher struct { + metrics *metrics.Metrics + validators []TrackedValidator + pool *rpc.Pool + options VotesWatcherOptions +} + +type VotesWatcherOptions struct { + GovModuleVersion string +} + +func NewVotesWatcher(validators []TrackedValidator, metrics *metrics.Metrics, pool *rpc.Pool, options VotesWatcherOptions) *VotesWatcher { + return &VotesWatcher{ + metrics: metrics, + validators: validators, + pool: pool, + options: options, + } +} + +func (w *VotesWatcher) Start(ctx context.Context) error { + ticker := time.NewTicker(1 * time.Minute) + + for { + node := w.pool.GetSyncedNode() + if node == nil { + log.Warn().Msg("no node available to fetch proposals") + } else if err := w.fetchProposalsV1(ctx, node); err != nil { + log.Error().Err(err).Msg("failed to fetch pending proposals") + } + + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + } + } +} + +func (w *VotesWatcher) fetchProposals(ctx context.Context, node *rpc.Node) error { + switch w.options.GovModuleVersion { + case "v1beta1": + return w.fetchProposalsV1Beta1(ctx, node) + default: // v1 + return w.fetchProposalsV1(ctx, node) + } +} +func (w *VotesWatcher) fetchProposalsV1(ctx context.Context, node *rpc.Node) error { + clientCtx := (client.Context{}).WithClient(node.Client) + queryClient := gov.NewQueryClient(clientCtx) + + // Fetch all proposals in voting period + proposalsResp, err := queryClient.Proposals(ctx, &gov.QueryProposalsRequest{ + ProposalStatus: gov.StatusVotingPeriod, + }) + if err != nil { + return fmt.Errorf("failed to get proposals: %w", err) + } + + chainID := node.ChainID() + + // For each proposal, fetch validators vote + for _, proposal := range proposalsResp.GetProposals() { + w.metrics.ProposalEndTime.WithLabelValues(chainID, fmt.Sprintf("%d", proposal.Id)).Set(float64(proposal.VotingEndTime.Unix())) + + for _, validator := range w.validators { + voter := validator.AccountAddress() + if voter == "" { + log.Warn().Str("validator", validator.Name).Msg("no account address for validator") + continue + } + voteResp, err := queryClient.Vote(ctx, &gov.QueryVoteRequest{ + ProposalId: proposal.Id, + Voter: voter, + }) + + w.metrics.Vote.Reset() + if isInvalidArgumentError(err) { + w.handleVoteV1(chainID, validator, proposal.Id, nil) + } else if err != nil { + return fmt.Errorf("failed to get validator vote for proposal %d: %w", proposal.Id, err) + } else { + vote := voteResp.GetVote() + w.handleVoteV1(chainID, validator, proposal.Id, vote.Options) + } + } + } + + return nil +} + +func (w *VotesWatcher) handleVoteV1(chainID string, validator TrackedValidator, proposalId uint64, votes []*gov.WeightedVoteOption) { + voted := false + for _, option := range votes { + if option.Option != gov.OptionEmpty { + voted = true + break + } + } + + w.metrics.Vote. + WithLabelValues(chainID, validator.Address, validator.Name, fmt.Sprintf("%d", proposalId)). + Set(metrics.BoolToFloat64(voted)) +} + +func (w *VotesWatcher) fetchProposalsV1Beta1(ctx context.Context, node *rpc.Node) error { + clientCtx := (client.Context{}).WithClient(node.Client) + queryClient := govbeta.NewQueryClient(clientCtx) + + // Fetch all proposals in voting period + proposalsResp, err := queryClient.Proposals(ctx, &govbeta.QueryProposalsRequest{ + ProposalStatus: govbeta.StatusVotingPeriod, + }) + if err != nil { + return fmt.Errorf("failed to get proposals: %w", err) + } + + chainID := node.ChainID() + + // For each proposal, fetch validators vote + for _, proposal := range proposalsResp.GetProposals() { + w.metrics.ProposalEndTime.WithLabelValues(chainID, fmt.Sprintf("%d", proposal.ProposalId)).Set(float64(proposal.VotingEndTime.Unix())) + + for _, validator := range w.validators { + voter := validator.AccountAddress() + if voter == "" { + log.Warn().Str("validator", validator.Name).Msg("no account address for validator") + continue + } + voteResp, err := queryClient.Vote(ctx, &govbeta.QueryVoteRequest{ + ProposalId: proposal.ProposalId, + Voter: voter, + }) + + w.metrics.Vote.Reset() + if isInvalidArgumentError(err) { + w.handleVoteV1Beta1(chainID, validator, proposal.ProposalId, nil) + } else if err != nil { + return fmt.Errorf("failed to get validator vote for proposal %d: %w", proposal.ProposalId, err) + } else { + vote := voteResp.GetVote() + w.handleVoteV1Beta1(chainID, validator, proposal.ProposalId, vote.Options) + } + } + } + + return nil +} + +func (w *VotesWatcher) handleVoteV1Beta1(chainID string, validator TrackedValidator, proposalId uint64, votes []govbeta.WeightedVoteOption) { + voted := false + for _, option := range votes { + if option.Option != govbeta.OptionEmpty { + voted = true + break + } + } + + w.metrics.Vote. + WithLabelValues(chainID, validator.Address, validator.Name, fmt.Sprintf("%d", proposalId)). + Set(metrics.BoolToFloat64(voted)) +} + +func isInvalidArgumentError(err error) bool { + st, ok := status.FromError(err) + if !ok { + return false + } + return st.Code() == codes.InvalidArgument +} diff --git a/pkg/watcher/votes_v1beta1_test.go b/pkg/watcher/votes_test.go similarity index 72% rename from pkg/watcher/votes_v1beta1_test.go rename to pkg/watcher/votes_test.go index 93dd198..3140f2b 100644 --- a/pkg/watcher/votes_v1beta1_test.go +++ b/pkg/watcher/votes_test.go @@ -22,16 +22,19 @@ func TestVotesWatcher(t *testing.T) { } ) - votesWatcher := NewVotesV1Beta1Watcher( + votesWatcher := NewVotesWatcher( validators, metrics.New("cosmos_validator_watcher"), nil, + VotesWatcherOptions{ + GovModuleVersion: "v1beta1", + }, ) t.Run("Handle Votes", func(t *testing.T) { - votesWatcher.handleVote(chainID, validators[0], 40, nil) - votesWatcher.handleVote(chainID, validators[0], 41, []gov.WeightedVoteOption{{Option: gov.OptionEmpty}}) - votesWatcher.handleVote(chainID, validators[0], 42, []gov.WeightedVoteOption{{Option: gov.OptionYes}}) + votesWatcher.handleVoteV1Beta1(chainID, validators[0], 40, nil) + votesWatcher.handleVoteV1Beta1(chainID, validators[0], 41, []gov.WeightedVoteOption{{Option: gov.OptionEmpty}}) + votesWatcher.handleVoteV1Beta1(chainID, validators[0], 42, []gov.WeightedVoteOption{{Option: gov.OptionYes}}) assert.Equal(t, float64(0), testutil.ToFloat64(votesWatcher.metrics.Vote.WithLabelValues(chainID, kilnAddress, kilnName, "40"))) assert.Equal(t, float64(0), testutil.ToFloat64(votesWatcher.metrics.Vote.WithLabelValues(chainID, kilnAddress, kilnName, "41"))) diff --git a/pkg/watcher/votes_v1.go b/pkg/watcher/votes_v1.go deleted file mode 100644 index ef7a73e..0000000 --- a/pkg/watcher/votes_v1.go +++ /dev/null @@ -1,104 +0,0 @@ -package watcher - -import ( - "context" - "fmt" - "time" - - "github.com/cosmos/cosmos-sdk/client" - gov "github.com/cosmos/cosmos-sdk/x/gov/types/v1" - "github.com/kilnfi/cosmos-validator-watcher/pkg/metrics" - "github.com/kilnfi/cosmos-validator-watcher/pkg/rpc" - "github.com/rs/zerolog/log" -) - -type VotesV1Watcher struct { - metrics *metrics.Metrics - validators []TrackedValidator - pool *rpc.Pool -} - -func NewVotesV1Watcher(validators []TrackedValidator, metrics *metrics.Metrics, pool *rpc.Pool) *VotesV1Watcher { - return &VotesV1Watcher{ - metrics: metrics, - validators: validators, - pool: pool, - } -} - -func (w *VotesV1Watcher) Start(ctx context.Context) error { - ticker := time.NewTicker(1 * time.Minute) - - for { - node := w.pool.GetSyncedNode() - if node == nil { - log.Warn().Msg("no node available to fetch proposals") - } else if err := w.fetchProposals(ctx, node); err != nil { - log.Error().Err(err).Msg("failed to fetch pending proposals") - } - - select { - case <-ctx.Done(): - return nil - case <-ticker.C: - } - } -} - -func (w *VotesV1Watcher) fetchProposals(ctx context.Context, node *rpc.Node) error { - clientCtx := (client.Context{}).WithClient(node.Client) - queryClient := gov.NewQueryClient(clientCtx) - - // Fetch all proposals in voting period - proposalsResp, err := queryClient.Proposals(ctx, &gov.QueryProposalsRequest{ - ProposalStatus: gov.StatusVotingPeriod, - }) - if err != nil { - return fmt.Errorf("failed to get proposals: %w", err) - } - - chainID := node.ChainID() - - // For each proposal, fetch validators vote - for _, proposal := range proposalsResp.GetProposals() { - w.metrics.ProposalEndTime.WithLabelValues(chainID, fmt.Sprintf("%d", proposal.Id)).Set(float64(proposal.VotingEndTime.Unix())) - - for _, validator := range w.validators { - voter := validator.AccountAddress() - if voter == "" { - log.Warn().Str("validator", validator.Name).Msg("no account address for validator") - continue - } - voteResp, err := queryClient.Vote(ctx, &gov.QueryVoteRequest{ - ProposalId: proposal.Id, - Voter: voter, - }) - - w.metrics.Vote.Reset() - if isInvalidArgumentError(err) { - w.handleVote(chainID, validator, proposal.Id, nil) - } else if err != nil { - return fmt.Errorf("failed to get validator vote for proposal %d: %w", proposal.Id, err) - } else { - vote := voteResp.GetVote() - w.handleVote(chainID, validator, proposal.Id, vote.Options) - } - } - } - - return nil -} - -func (w *VotesV1Watcher) handleVote(chainID string, validator TrackedValidator, proposalId uint64, votes []*gov.WeightedVoteOption) { - voted := false - for _, option := range votes { - if option.Option != gov.OptionEmpty { - voted = true - break - } - } - - w.metrics.Vote. - WithLabelValues(chainID, validator.Address, validator.Name, fmt.Sprintf("%d", proposalId)). - Set(metrics.BoolToFloat64(voted)) -} diff --git a/pkg/watcher/votes_v1beta1.go b/pkg/watcher/votes_v1beta1.go deleted file mode 100644 index 441be35..0000000 --- a/pkg/watcher/votes_v1beta1.go +++ /dev/null @@ -1,114 +0,0 @@ -package watcher - -import ( - "context" - "fmt" - "time" - - "github.com/cosmos/cosmos-sdk/client" - gov "github.com/cosmos/cosmos-sdk/x/gov/types/v1beta1" - "github.com/kilnfi/cosmos-validator-watcher/pkg/metrics" - "github.com/kilnfi/cosmos-validator-watcher/pkg/rpc" - "github.com/rs/zerolog/log" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -type VotesV1Beta1Watcher struct { - metrics *metrics.Metrics - validators []TrackedValidator - pool *rpc.Pool -} - -func NewVotesV1Beta1Watcher(validators []TrackedValidator, metrics *metrics.Metrics, pool *rpc.Pool) *VotesV1Beta1Watcher { - return &VotesV1Beta1Watcher{ - metrics: metrics, - validators: validators, - pool: pool, - } -} - -func (w *VotesV1Beta1Watcher) Start(ctx context.Context) error { - ticker := time.NewTicker(1 * time.Minute) - - for { - node := w.pool.GetSyncedNode() - if node == nil { - log.Warn().Msg("no node available to fetch proposals") - } else if err := w.fetchProposals(ctx, node); err != nil { - log.Error().Err(err).Msg("failed to fetch pending proposals") - } - - select { - case <-ctx.Done(): - return nil - case <-ticker.C: - } - } -} - -func (w *VotesV1Beta1Watcher) fetchProposals(ctx context.Context, node *rpc.Node) error { - clientCtx := (client.Context{}).WithClient(node.Client) - queryClient := gov.NewQueryClient(clientCtx) - - // Fetch all proposals in voting period - proposalsResp, err := queryClient.Proposals(ctx, &gov.QueryProposalsRequest{ - ProposalStatus: gov.StatusVotingPeriod, - }) - if err != nil { - return fmt.Errorf("failed to get proposals: %w", err) - } - - chainID := node.ChainID() - - // For each proposal, fetch validators vote - for _, proposal := range proposalsResp.GetProposals() { - w.metrics.ProposalEndTime.WithLabelValues(chainID, fmt.Sprintf("%d", proposal.ProposalId)).Set(float64(proposal.VotingEndTime.Unix())) - - for _, validator := range w.validators { - voter := validator.AccountAddress() - if voter == "" { - log.Warn().Str("validator", validator.Name).Msg("no account address for validator") - continue - } - voteResp, err := queryClient.Vote(ctx, &gov.QueryVoteRequest{ - ProposalId: proposal.ProposalId, - Voter: voter, - }) - - w.metrics.Vote.Reset() - if isInvalidArgumentError(err) { - w.handleVote(chainID, validator, proposal.ProposalId, nil) - } else if err != nil { - return fmt.Errorf("failed to get validator vote for proposal %d: %w", proposal.ProposalId, err) - } else { - vote := voteResp.GetVote() - w.handleVote(chainID, validator, proposal.ProposalId, vote.Options) - } - } - } - - return nil -} - -func (w *VotesV1Beta1Watcher) handleVote(chainID string, validator TrackedValidator, proposalId uint64, votes []gov.WeightedVoteOption) { - voted := false - for _, option := range votes { - if option.Option != gov.OptionEmpty { - voted = true - break - } - } - - w.metrics.Vote. - WithLabelValues(chainID, validator.Address, validator.Name, fmt.Sprintf("%d", proposalId)). - Set(metrics.BoolToFloat64(voted)) -} - -func isInvalidArgumentError(err error) bool { - st, ok := status.FromError(err) - if !ok { - return false - } - return st.Code() == codes.InvalidArgument -}