Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: send webhook on planned upgrade #43

Merged
merged 1 commit into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Features:
- Track the **staked amount** as well as the min seat price
- Track **pending proposals** and check if your validator has voted (including proposal end time)
- Expose **upgrade plan** to know when the next upgrade will happen (including pending proposals)
- Trigger webhook when an upgrade happens (soon)
- Trigger webhook when an upgrade happens

![Cosmos Validator Watcher Screenshot](assets/cosmos-validator-watcher-screenshot.jpg)

Expand Down Expand Up @@ -75,6 +75,7 @@ GLOBAL OPTIONS:
--denom value denom used in metrics label (eg. atom or uatom)
--denom-exponent value denom exponent (eg. 6 for atom, 1 for uatom) (default: 0)
--validator value [ --validator value ] validator address(es) to track (use :my-label to add a custom label in metrics & ouput)
--webhook-url value endpoint where to send upgrade webhooks
--x-gov value version of the gov module to use (v1|v1beta1) (default: "v1beta1")
--help, -h show help
--version, -v print the version
Expand Down
4 changes: 4 additions & 0 deletions pkg/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ var Flags = []cli.Flag{
Name: "validator",
Usage: "validator address(es) to track (use :my-label to add a custom label in metrics & ouput)",
},
&cli.StringFlag{
Name: "webhook-url",
Usage: "endpoint where to send upgrade webhooks (experimental)",
},
&cli.StringFlag{
Name: "x-gov",
Usage: "version of the gov module to use (v1|v1beta1)",
Expand Down
29 changes: 22 additions & 7 deletions pkg/app/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app
import (
"context"
"fmt"
"net/url"
"os"
"os/signal"
"syscall"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/kilnfi/cosmos-validator-watcher/pkg/metrics"
"github.com/kilnfi/cosmos-validator-watcher/pkg/rpc"
"github.com/kilnfi/cosmos-validator-watcher/pkg/watcher"
"github.com/kilnfi/cosmos-validator-watcher/pkg/webhook"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
Expand All @@ -40,6 +42,7 @@ func RunFunc(cCtx *cli.Context) error {
denom = cCtx.String("denom")
denomExpon = cCtx.Uint("denom-exponent")
validators = cCtx.StringSlice("validator")
webhookURL = cCtx.String("webhook-url")
xGov = cCtx.String("x-gov")
)

Expand Down Expand Up @@ -86,12 +89,6 @@ func RunFunc(cCtx *cli.Context) error {
errg.Go(func() error {
return statusWatcher.Start(ctx)
})
// Register watchers on nodes events
for _, node := range pool.Nodes {
node.OnStart(blockWatcher.OnNodeStart)
node.OnStatus(statusWatcher.OnNodeStatus)
node.OnEvent(rpc.EventNewBlock, blockWatcher.OnNewBlock)
}

//
// Pool watchers
Expand Down Expand Up @@ -121,13 +118,31 @@ func RunFunc(cCtx *cli.Context) error {
log.Warn().Msgf("unknown gov module version: %s", xGov)
}
}
upgradeWatcher := watcher.NewUpgradeWatcher(metrics, pool, watcher.UpgradeWatcherOptions{
var wh *webhook.Webhook
if webhookURL != "" {
whURL, err := url.Parse(webhookURL)
if err != nil {
return fmt.Errorf("failed to parse webhook endpoint: %w", err)
}
wh = webhook.New(*whURL)
}
upgradeWatcher := watcher.NewUpgradeWatcher(metrics, pool, wh, watcher.UpgradeWatcherOptions{
CheckPendingProposals: !noGov,
})
errg.Go(func() error {
return upgradeWatcher.Start(ctx)
})

//
// Register watchers on nodes events
//
for _, node := range pool.Nodes {
node.OnStart(blockWatcher.OnNodeStart)
node.OnStatus(statusWatcher.OnNodeStatus)
node.OnEvent(rpc.EventNewBlock, blockWatcher.OnNewBlock)
node.OnEvent(rpc.EventNewBlock, upgradeWatcher.OnNewBlock)
}

//
// Start Pool
//
Expand Down
74 changes: 73 additions & 1 deletion pkg/watcher/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,39 @@ import (
"fmt"
"time"

ctypes "github.com/cometbft/cometbft/rpc/core/types"
comettypes "github.com/cometbft/cometbft/types"
"github.com/cosmos/cosmos-sdk/client"
gov "github.com/cosmos/cosmos-sdk/x/gov/types/v1beta1"
"github.com/cosmos/cosmos-sdk/x/upgrade/types"
upgrade "github.com/cosmos/cosmos-sdk/x/upgrade/types"
"github.com/gogo/protobuf/codec"
"github.com/kilnfi/cosmos-validator-watcher/pkg/metrics"
"github.com/kilnfi/cosmos-validator-watcher/pkg/rpc"
"github.com/kilnfi/cosmos-validator-watcher/pkg/webhook"
"github.com/rs/zerolog/log"
)

type UpgradeWatcher struct {
metrics *metrics.Metrics
pool *rpc.Pool
webhook *webhook.Webhook
options UpgradeWatcherOptions

nextUpgradePlan *upgrade.Plan // known upgrade plan
latestBlockHeight int64 // latest block received
latestWebhookSent int64 // latest block for which webhook has been sent
}

type UpgradeWatcherOptions struct {
CheckPendingProposals bool
}

func NewUpgradeWatcher(metrics *metrics.Metrics, pool *rpc.Pool, options UpgradeWatcherOptions) *UpgradeWatcher {
func NewUpgradeWatcher(metrics *metrics.Metrics, pool *rpc.Pool, webhook *webhook.Webhook, options UpgradeWatcherOptions) *UpgradeWatcher {
return &UpgradeWatcher{
metrics: metrics,
pool: pool,
webhook: webhook,
options: options,
}
}
Expand All @@ -52,6 +61,68 @@ func (w *UpgradeWatcher) Start(ctx context.Context) error {
}
}

func (w *UpgradeWatcher) OnNewBlock(ctx context.Context, node *rpc.Node, evt *ctypes.ResultEvent) error {
// Ignore is webhook is not configured
if w.webhook == nil {
return nil
}

// Ignore if no upgrade plan
if w.nextUpgradePlan == nil {
return nil
}

// Ignore blocks if node is catching up
if !node.IsSynced() {
return nil
}

blockEvent := evt.Data.(comettypes.EventDataNewBlock)
block := blockEvent.Block

// Skip already processed blocks
if w.latestBlockHeight >= block.Height {
return nil
}

w.latestBlockHeight = block.Height

// Ignore if upgrade plan is for a future block
if block.Height < w.nextUpgradePlan.Height-1 {
return nil
}

// Ignore if webhook has already been sent
if w.latestWebhookSent >= w.nextUpgradePlan.Height {
return nil
}

// Upgrade plan is for this block
go w.triggerWebhook(ctx, node.ChainID(), *w.nextUpgradePlan)
w.nextUpgradePlan = nil
w.latestWebhookSent = w.nextUpgradePlan.Height

return nil
}

func (w *UpgradeWatcher) triggerWebhook(ctx context.Context, chainID string, plan upgrade.Plan) {
msg := struct {
Type string `json:"type"`
Block int64 `json:"block"`
ChainID string `json:"chain_id"`
Version string `json:"version"`
}{
Type: "upgrade",
Block: plan.Height,
ChainID: chainID,
Version: plan.Name,
}

if err := w.webhook.Send(ctx, msg); err != nil {
log.Error().Err(err).Msg("failed to send upgrade webhook")
}
}

func (w *UpgradeWatcher) fetchUpgrade(ctx context.Context, node *rpc.Node) error {
clientCtx := (client.Context{}).WithClient(node.Client)
queryClient := upgrade.NewQueryClient(clientCtx)
Expand Down Expand Up @@ -122,5 +193,6 @@ func (w *UpgradeWatcher) handleUpgradePlan(chainID string, plan *upgrade.Plan) {
return
}

w.nextUpgradePlan = plan
w.metrics.UpgradePlan.WithLabelValues(chainID, plan.Name).Set(float64(plan.Height))
}
1 change: 1 addition & 0 deletions pkg/watcher/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestUpgradeWatcher(t *testing.T) {
watcher := NewUpgradeWatcher(
metrics.New("cosmos_validator_watcher"),
nil,
nil,
UpgradeWatcherOptions{},
)

Expand Down
70 changes: 70 additions & 0 deletions pkg/webhook/webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package webhook

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"

"github.com/avast/retry-go/v4"
"github.com/rs/zerolog/log"
)

type Webhook struct {
endpoint url.URL
client *http.Client
}

func New(endpoint url.URL) *Webhook {
return &Webhook{
endpoint: endpoint,
client: &http.Client{},
}
}

func (w *Webhook) Send(ctx context.Context, message interface{}) error {
body, err := json.Marshal(message)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}

log.Info().Msgf("sending webhook: %s", body)

req, err := http.NewRequestWithContext(ctx, "POST", w.endpoint.String(), bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}

req.Header.Set("Content-Type", "application/json")

retryOpts := []retry.Option{
retry.Context(ctx),
retry.Delay(1 * time.Second),
retry.Attempts(3),
retry.OnRetry(func(_ uint, err error) {
log.Warn().Err(err).Msgf("retrying webhook on %s", w.endpoint.String())
}),
}

return retry.Do(func() error {
return w.postRequest(ctx, req)
}, retryOpts...)
}

func (w *Webhook) postRequest(ctx context.Context, req *http.Request) error {
resp, err := w.client.Do(req)
if err != nil {
return fmt.Errorf("failed to send request: %w", err)
}
defer resp.Body.Close()

// Check if response is not 4xx or 5xx
if resp.StatusCode >= 400 {
return fmt.Errorf("unexpected response status: %s", resp.Status)
}

return nil
}
Loading