diff --git a/Changelog.md b/Changelog.md
index b86eb30..6e44ce5 100644
--- a/Changelog.md
+++ b/Changelog.md
@@ -1,3 +1,8 @@
+## 05.12.2024
+1. Removed the PublishedAlerts metric
+2. Added the NotifyChannels metric: `forwarder_notification_channel_error_total`
+3. `feeder_blocks_published_total` is now incremented for each unsuccessful network request
+
 ## 28.11.2024
 1. Field "uniqueKey" is required for getting quorum.
 
diff --git a/internal/app/feeder/feeder.go b/internal/app/feeder/feeder.go
index 38eef25..75de313 100644
--- a/internal/app/feeder/feeder.go
+++ b/internal/app/feeder/feeder.go
@@ -57,6 +57,7 @@ func (w *Feeder) Run(ctx context.Context, g *errgroup.Group) {
 			case <-ticker.C:
 				block, err := w.chainSrv.GetLatestBlock(ctx)
 				if err != nil {
+					w.metricsStore.PublishedBlocks.With(prometheus.Labels{metrics.Status: metrics.StatusFail}).Inc()
 					w.log.Error(fmt.Sprintf("GetLatestBlock error: %v", err))
 					continue
 				}
@@ -67,6 +68,7 @@ func (w *Feeder) Run(ctx context.Context, g *errgroup.Group) {
 
 				blockReceipts, err := w.chainSrv.GetBlockReceipts(ctx, block.Result.Hash)
 				if err != nil {
+					w.metricsStore.PublishedBlocks.With(prometheus.Labels{metrics.Status: metrics.StatusFail}).Inc()
 					w.log.Error(fmt.Sprintf("GetBlockReceipts error: %v", err))
 					continue
 				}
@@ -110,6 +112,7 @@ func (w *Feeder) Run(ctx context.Context, g *errgroup.Group) {
 
 				payload, marshalErr := json.Marshal(blockDto)
 				if marshalErr != nil {
+					w.metricsStore.PublishedBlocks.With(prometheus.Labels{metrics.Status: metrics.StatusFail}).Inc()
 					w.log.Error(fmt.Sprintf(`Could not marshal blockDto %s`, marshalErr))
 					continue
 				}
@@ -132,7 +135,7 @@ func (w *Feeder) Run(ctx context.Context, g *errgroup.Group) {
 				}
 
 				w.log.Info(fmt.Sprintf(`%d, %s`, blockDto.Number, blockDto.Hash), payloadSize)
-				w.metricsStore.PublishedAlerts.With(prometheus.Labels{metrics.Status: metrics.StatusOk}).Inc()
+				w.metricsStore.PublishedBlocks.With(prometheus.Labels{metrics.Status: metrics.StatusOk}).Inc()
 			}
 		}
 	})
diff --git a/internal/app/forwarder/forwarder.go b/internal/app/forwarder/forwarder.go
index e0ed971..4506110 100644
--- a/internal/app/forwarder/forwarder.go
+++ b/internal/app/forwarder/forwarder.go
@@ -8,8 +8,9 @@ import (
 
 	"golang.org/x/sync/errgroup"
 
-	"github.com/lidofinance/onchain-mon/internal/pkg/consumer"
 	"github.com/nats-io/nats.go/jetstream"
+
+	"github.com/lidofinance/onchain-mon/internal/pkg/consumer"
 )
 
 type worker struct {
diff --git a/internal/connectors/metrics/prometheus.go b/internal/connectors/metrics/prometheus.go
index dbdbbc7..acd86fc 100644
--- a/internal/connectors/metrics/prometheus.go
+++ b/internal/connectors/metrics/prometheus.go
@@ -11,11 +11,11 @@ import (
 type Store struct {
 	Prometheus      *prometheus.Registry
 	BuildInfo       prometheus.Counter
-	PublishedAlerts *prometheus.CounterVec
 	PublishedBlocks *prometheus.CounterVec
 	SentAlerts      *prometheus.CounterVec
 	RedisErrors     prometheus.Counter
 	SummaryHandlers *prometheus.HistogramVec
+	NotifyChannels  *prometheus.CounterVec
 }
 
 const Status = `status`
@@ -40,17 +40,13 @@ func New(promRegistry *prometheus.Registry, prefix, appName, env string) *Store
 				"version": runtime.Version(),
 			},
 		}),
-		PublishedAlerts: promauto.NewCounterVec(prometheus.CounterOpts{
-			Name: fmt.Sprintf("%s_finding_published_total", prefix),
-			Help: "The total number of published findings",
-		}, []string{Status}),
 		PublishedBlocks: promauto.NewCounterVec(prometheus.CounterOpts{
 			Name: fmt.Sprintf("%s_blocks_published_total", prefix),
 			Help: "The total number of published blocks",
 		}, []string{Status}),
 		SentAlerts: promauto.NewCounterVec(prometheus.CounterOpts{
 			Name: fmt.Sprintf("%s_finding_sent_total", prefix),
fmt.Sprintf("%s_finding_sent_total", prefix), - Help: "The total number of set findings", + Help: "The total number of published findings", }, []string{ConsumerName, Status}), RedisErrors: promauto.NewCounter(prometheus.CounterOpts{ Name: fmt.Sprintf("%s_redis_error_total", prefix), @@ -61,11 +57,14 @@ func New(promRegistry *prometheus.Registry, prefix, appName, env string) *Store Help: "Time spent processing request to notification channel", Buckets: prometheus.DefBuckets, }, []string{Channel}), + NotifyChannels: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: fmt.Sprintf("%s_notification_channel_error_total", prefix), + Help: "The total number of network errors of telegram, discord, opsgenie channels", + }, []string{Channel, Status}), } store.Prometheus.MustRegister( store.BuildInfo, - store.PublishedAlerts, store.PublishedBlocks, store.SentAlerts, store.RedisErrors, diff --git a/internal/env/notify_config.go b/internal/env/notify_config.go index c9d41d8..0fd4207 100644 --- a/internal/env/notify_config.go +++ b/internal/env/notify_config.go @@ -59,7 +59,7 @@ type NotificationConfig struct { Consumers []*Consumer `mapstructure:"consumers"` } -func ReadNotificationConfig(env string, configPath string) (*NotificationConfig, error) { +func ReadNotificationConfig(env, configPath string) (*NotificationConfig, error) { v := viper.New() if env != `local` { @@ -179,7 +179,7 @@ func CollectNatsSubjects(cfg *NotificationConfig) []string { } out := make([]string, 0, len(natsSubjectsMap)) - for subject, _ := range natsSubjectsMap { + for subject := range natsSubjectsMap { out = append(out, subject) } diff --git a/internal/pkg/consumer/consumer.go b/internal/pkg/consumer/consumer.go index 89b11a0..9356be3 100644 --- a/internal/pkg/consumer/consumer.go +++ b/internal/pkg/consumer/consumer.go @@ -10,10 +10,10 @@ import ( "github.com/go-redis/redis/v8" "github.com/hashicorp/golang-lru/v2/expirable" - "github.com/lidofinance/onchain-mon/generated/databus" "github.com/nats-io/nats.go/jetstream" "github.com/prometheus/client_golang/prometheus" + "github.com/lidofinance/onchain-mon/generated/databus" "github.com/lidofinance/onchain-mon/internal/connectors/metrics" "github.com/lidofinance/onchain-mon/internal/env" "github.com/lidofinance/onchain-mon/internal/pkg/notifiler" @@ -111,7 +111,15 @@ func NewConsumers(log *slog.Logger, metrics *metrics.Store, redisClient *redis.C const LruSize = 125 cache := expirable.NewLRU[string, uint](LruSize, nil, time.Minute*10) - consumer := New(log, metrics, cache, redisClient, repo, consumerName, subject, consumerCfg.SeveritySet, consumerCfg.ByQuorum, quorumSize, notificationChannel) + consumer := New( + log, metrics, cache, redisClient, repo, + consumerName, + subject, + consumerCfg.SeveritySet, + consumerCfg.ByQuorum, + quorumSize, + notificationChannel, + ) consumers = append(consumers, consumer) } @@ -221,6 +229,7 @@ func (c *Consumer) GetConsumeHandler(ctx context.Context) func(msg jetstream.Msg if err != nil { c.log.Error(fmt.Sprintf(`Could not increase key value: %v`, err)) c.metrics.RedisErrors.Inc() + c.metrics.SentAlerts.With(prometheus.Labels{metrics.ConsumerName: c.name, metrics.Status: metrics.StatusFail}).Inc() c.nackMessage(msg) return } @@ -229,9 +238,11 @@ func (c *Consumer) GetConsumeHandler(ctx context.Context) func(msg jetstream.Msg if err := c.redisClient.Expire(ctx, countKey, TTLMins10).Err(); err != nil { c.log.Error(fmt.Sprintf(`Could not set expire time: %v`, err)) c.metrics.RedisErrors.Inc() + 
 
 			if _, err := c.redisClient.Decr(ctx, countKey).Result(); err != nil {
 				c.metrics.RedisErrors.Inc()
+				c.metrics.SentAlerts.With(prometheus.Labels{metrics.ConsumerName: c.name, metrics.Status: metrics.StatusFail}).Inc()
 				c.log.Error(fmt.Sprintf(`Could not decrease count key %s: %v`, countKey, err))
 			}
 
@@ -247,6 +258,7 @@ func (c *Consumer) GetConsumeHandler(ctx context.Context) func(msg jetstream.Msg
 		if err != nil {
 			c.log.Error(fmt.Sprintf(`Could not get key value: %v`, err))
 			c.metrics.RedisErrors.Inc()
+			c.metrics.SentAlerts.With(prometheus.Labels{metrics.ConsumerName: c.name, metrics.Status: metrics.StatusFail}).Inc()
 			c.nackMessage(msg)
 			return
 		}
@@ -287,13 +299,12 @@ func (c *Consumer) GetConsumeHandler(ctx context.Context) func(msg jetstream.Msg
 		if err != nil {
 			c.log.Error(fmt.Sprintf(`Could not get notification status: %v`, err))
 			c.metrics.RedisErrors.Inc()
+			c.metrics.SentAlerts.With(prometheus.Labels{metrics.ConsumerName: c.name, metrics.Status: metrics.StatusFail}).Inc()
 			c.nackMessage(msg)
 			return
 		}
 
 		if status == StatusSending {
-			c.metrics.SentAlerts.With(prometheus.Labels{metrics.ConsumerName: c.name, metrics.Status: metrics.StatusOk}).Inc()
-
 			c.log.Info(fmt.Sprintf("Another instance is sending finding: %s", finding.AlertId))
 			return
 		}
diff --git a/internal/pkg/notifiler/discord.go b/internal/pkg/notifiler/discord.go
index 88c997f..9cd7b8d 100644
--- a/internal/pkg/notifiler/discord.go
+++ b/internal/pkg/notifiler/discord.go
@@ -25,6 +25,8 @@ type MessagePayload struct {
 	Content string `json:"content"`
 }
 
+const DiscordLabel = `discord`
+
 func NewDiscord(webhookURL string, httpClient *http.Client, metricsStore *metrics.Store, source string) *Discord {
 	return &Discord{
 		webhookURL: webhookURL,
@@ -72,13 +74,15 @@ func (d *Discord) send(ctx context.Context, message string) error {
 
 	defer func() {
 		resp.Body.Close()
 		duration := time.Since(start).Seconds()
-		d.metrics.SummaryHandlers.With(prometheus.Labels{metrics.Channel: `discord`}).Observe(duration)
+		d.metrics.SummaryHandlers.With(prometheus.Labels{metrics.Channel: DiscordLabel}).Observe(duration)
 	}()
 
 	if resp.StatusCode != http.StatusNoContent {
+		d.metrics.NotifyChannels.With(prometheus.Labels{metrics.Channel: DiscordLabel, metrics.Status: metrics.StatusFail}).Inc()
 		return fmt.Errorf("received from Discord non-204 response code: %v", resp.Status)
 	}
 
+	d.metrics.NotifyChannels.With(prometheus.Labels{metrics.Channel: DiscordLabel, metrics.Status: metrics.StatusOk}).Inc()
 	return nil
 }
diff --git a/internal/pkg/notifiler/opsgenie.go b/internal/pkg/notifiler/opsgenie.go
index b0274b4..f7c984f 100644
--- a/internal/pkg/notifiler/opsgenie.go
+++ b/internal/pkg/notifiler/opsgenie.go
@@ -37,7 +37,9 @@ func NewOpsgenie(opsGenieKey string, httpClient *http.Client, metricsStore *metr
 	}
 }
 
-func (u *OpsGenie) SendFinding(ctx context.Context, alert *databus.FindingDtoJson) error {
+const OpsGenieLabel = `opsgenie`
+
+func (o *OpsGenie) SendFinding(ctx context.Context, alert *databus.FindingDtoJson) error {
 	opsGeniePriority := ""
 	switch alert.Severity {
 	case databus.SeverityCritical:
@@ -51,7 +53,7 @@ func (u *OpsGenie) SendFinding(ctx context.Context, alert *databus.FindingDtoJso
 		return nil
 	}
 
-	message := FormatAlert(alert, u.source)
+	message := FormatAlert(alert, o.source)
 
 	payload := AlertPayload{
 		Message:     alert.Name,
@@ -60,10 +62,10 @@ func (u *OpsGenie) SendFinding(ctx context.Context, alert *databus.FindingDtoJso
 		Priority:    opsGeniePriority,
 	}
 
-	return u.send(ctx, payload)
+	return o.send(ctx, payload)
 }
 
-func (u *OpsGenie) send(ctx context.Context, payload AlertPayload) error {
+func (o *OpsGenie) send(ctx context.Context, payload AlertPayload) error {
 	payloadBytes, err := json.Marshal(payload)
 	if err != nil {
 		return fmt.Errorf("could not marshal OpsGenie payload: %w", err)
 	}
@@ -78,26 +80,28 @@ func (u *OpsGenie) send(ctx context.Context, payload AlertPayload) error {
 	}
 
 	req.Header.Set("Content-Type", "application/json")
-	req.Header.Set("Authorization", "GenieKey "+u.opsGenieKey)
+	req.Header.Set("Authorization", "GenieKey "+o.opsGenieKey)
 
 	start := time.Now()
-	resp, err := u.httpClient.Do(req)
+	resp, err := o.httpClient.Do(req)
 	if err != nil {
 		return fmt.Errorf("could not send OpsGenie request: %w", err)
 	}
 	defer func() {
 		resp.Body.Close()
 		duration := time.Since(start).Seconds()
-		u.metrics.SummaryHandlers.With(prometheus.Labels{metrics.Channel: `opsgenie`}).Observe(duration)
+		o.metrics.SummaryHandlers.With(prometheus.Labels{metrics.Channel: OpsGenieLabel}).Observe(duration)
 	}()
 
 	if resp.StatusCode != http.StatusAccepted {
+		o.metrics.NotifyChannels.With(prometheus.Labels{metrics.Channel: OpsGenieLabel, metrics.Status: metrics.StatusFail}).Inc()
 		return fmt.Errorf("received from OpsGenie non-202 response code: %v", resp.Status)
 	}
 
+	o.metrics.NotifyChannels.With(prometheus.Labels{metrics.Channel: OpsGenieLabel, metrics.Status: metrics.StatusOk}).Inc()
 	return nil
 }
 
-func (d *OpsGenie) GetType() string {
+func (o *OpsGenie) GetType() string {
 	return "OpsGenie"
 }
diff --git a/internal/pkg/notifiler/telegram.go b/internal/pkg/notifiler/telegram.go
index b366afc..b501cf8 100644
--- a/internal/pkg/notifiler/telegram.go
+++ b/internal/pkg/notifiler/telegram.go
@@ -34,10 +34,11 @@ func NewTelegram(botToken, chatID string, httpClient *http.Client, metricsStore
 
 const MaxTelegramMessageLength = 4096
 const WarningTelegramMessage = "Warn: Msg >=4096, pls review description message"
+const TelegramLabel = `telegram`
 
-func (u *Telegram) SendFinding(ctx context.Context, alert *databus.FindingDtoJson) error {
+func (t *Telegram) SendFinding(ctx context.Context, alert *databus.FindingDtoJson) error {
 	message := TruncateMessageWithAlertID(
-		fmt.Sprintf("%s\n\n%s", alert.Name, FormatAlert(alert, u.source)),
+		fmt.Sprintf("%s\n\n%s", alert.Name, FormatAlert(alert, t.source)),
 		MaxTelegramMessageLength,
 		WarningTelegramMessage,
 	)
@@ -45,19 +46,19 @@ func (u *Telegram) SendFinding(ctx context.Context, alert *databus.FindingDtoJso
 
 	if alert.Severity != databus.SeverityUnknown {
 		m := escapeMarkdownV1(message)
-		if sendErr := u.send(ctx, m, true); sendErr != nil {
+		if sendErr := t.send(ctx, m, true); sendErr != nil {
 			message += "\n\nWarning: Could not send msg as markdown"
-			return u.send(ctx, message, false)
+			return t.send(ctx, message, false)
 		}
 
 		return nil
 	}
 
-	return u.send(ctx, message, false)
+	return t.send(ctx, message, false)
 }
 
-func (u *Telegram) send(ctx context.Context, message string, useMarkdown bool) error {
-	requestURL := fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage?disable_web_page_preview=true&disable_notification=true&chat_id=-%s&text=%s", u.botToken, u.chatID, url.QueryEscape(message))
+func (t *Telegram) send(ctx context.Context, message string, useMarkdown bool) error {
+	requestURL := fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage?disable_web_page_preview=true&disable_notification=true&chat_id=-%s&text=%s", t.botToken, t.chatID, url.QueryEscape(message))
 	if useMarkdown {
 		requestURL += `&parse_mode=markdown`
 	}
@@ -69,25 +70,26 @@ func (u *Telegram) send(ctx context.Context, message string, useMarkdown bool) e
 
 	start := time.Now()
 
-	resp, err := u.httpClient.Do(req)
+	resp, err := t.httpClient.Do(req)
 	if err != nil {
 		return fmt.Errorf("could not send telegram request: %w", err)
 	}
 
 	defer func() {
 		resp.Body.Close()
 		duration := time.Since(start).Seconds()
-		u.metrics.SummaryHandlers.With(prometheus.Labels{metrics.Channel: `telegram`}).Observe(duration)
+		t.metrics.SummaryHandlers.With(prometheus.Labels{metrics.Channel: TelegramLabel}).Observe(duration)
 	}()
 
 	if resp.StatusCode != http.StatusOK {
-		fmt.Println(resp)
+		t.metrics.NotifyChannels.With(prometheus.Labels{metrics.Channel: TelegramLabel, metrics.Status: metrics.StatusFail}).Inc()
 		return fmt.Errorf("received from telegram non-200 response code: %v", resp.Status)
 	}
 
+	t.metrics.NotifyChannels.With(prometheus.Labels{metrics.Channel: TelegramLabel, metrics.Status: metrics.StatusOk}).Inc()
 	return nil
 }
 
-func (d *Telegram) GetType() string {
+func (t *Telegram) GetType() string {
 	return "Telegram"
 }