Merge pull request #49 from lidofinance/add-metrics
Add metrics
sergeyWh1te authored Dec 5, 2024
2 parents 8999d6b + 018baa1 commit 11e95cb
Showing 9 changed files with 64 additions and 35 deletions.
5 changes: 5 additions & 0 deletions Changelog.md
@@ -1,3 +1,8 @@
## 05.12.2024
1. Removed the `PublishedAlerts` metric
2. Added the `NotifyChannels` metric: `forwarder_notification_channel_error_total`
3. Increment `feeder_blocks_published_total` with a `fail` status for each unsuccessful network request

## 28.11.2024
1. Field "uniqueKey" is required for getting quorum.

5 changes: 4 additions & 1 deletion internal/app/feeder/feeder.go
@@ -57,6 +57,7 @@ func (w *Feeder) Run(ctx context.Context, g *errgroup.Group) {
case <-ticker.C:
block, err := w.chainSrv.GetLatestBlock(ctx)
if err != nil {
w.metricsStore.PublishedBlocks.With(prometheus.Labels{metrics.Status: metrics.StatusFail}).Inc()
w.log.Error(fmt.Sprintf("GetLatestBlock error: %v", err))
continue
}
@@ -67,6 +68,7 @@ func (w *Feeder) Run(ctx context.Context, g *errgroup.Group) {

blockReceipts, err := w.chainSrv.GetBlockReceipts(ctx, block.Result.Hash)
if err != nil {
w.metricsStore.PublishedBlocks.With(prometheus.Labels{metrics.Status: metrics.StatusFail}).Inc()
w.log.Error(fmt.Sprintf("GetBlockReceipts error: %v", err))
continue
}
@@ -110,6 +112,7 @@ func (w *Feeder) Run(ctx context.Context, g *errgroup.Group) {

payload, marshalErr := json.Marshal(blockDto)
if marshalErr != nil {
w.metricsStore.PublishedBlocks.With(prometheus.Labels{metrics.Status: metrics.StatusFail}).Inc()
w.log.Error(fmt.Sprintf(`Could not marshal blockDto %s`, marshalErr))
continue
}
@@ -132,7 +135,7 @@ func (w *Feeder) Run(ctx context.Context, g *errgroup.Group) {
}

w.log.Info(fmt.Sprintf(`%d, %s`, blockDto.Number, blockDto.Hash), payloadSize)
w.metricsStore.PublishedAlerts.With(prometheus.Labels{metrics.Status: metrics.StatusOk}).Inc()
w.metricsStore.PublishedBlocks.With(prometheus.Labels{metrics.Status: metrics.StatusOk}).Inc()
}
}
})
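Note the pattern these feeder changes settle on: the separate `PublishedAlerts` counter is gone, and every outcome of the publish loop — failed RPC call, failed marshal, successful publish — lands on the same `PublishedBlocks` counter vector, labelled by `status`. A minimal self-contained sketch of that pattern (the label values and the simulated fetch are placeholders, not the repository's `metrics` constants or wiring):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// One counter family, partitioned by outcome. The "ok"/"fail" label values
	// stand in for the repository's metrics.StatusOk / metrics.StatusFail constants.
	publishedBlocks := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "feeder_blocks_published_total",
		Help: "The total number of published blocks",
	}, []string{"status"})

	reg := prometheus.NewRegistry()
	reg.MustRegister(publishedBlocks)

	// Simulated block fetch: every failed network step counts as a failed
	// publish, mirroring the error branches added in feeder.go.
	getLatestBlock := func() error { return errors.New("rpc timeout") }

	if err := getLatestBlock(); err != nil {
		publishedBlocks.With(prometheus.Labels{"status": "fail"}).Inc()
		fmt.Println("counted a failed publish:", err)
		return
	}
	publishedBlocks.With(prometheus.Labels{"status": "ok"}).Inc()
	fmt.Println("counted a successful publish")
}
```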
3 changes: 2 additions & 1 deletion internal/app/forwarder/forwarder.go
@@ -8,8 +8,9 @@ import (

"golang.org/x/sync/errgroup"

"github.com/lidofinance/onchain-mon/internal/pkg/consumer"
"github.com/nats-io/nats.go/jetstream"

"github.com/lidofinance/onchain-mon/internal/pkg/consumer"
)

type worker struct {
13 changes: 6 additions & 7 deletions internal/connectors/metrics/prometheus.go
@@ -11,11 +11,11 @@ import (
type Store struct {
Prometheus *prometheus.Registry
BuildInfo prometheus.Counter
PublishedAlerts *prometheus.CounterVec
PublishedBlocks *prometheus.CounterVec
SentAlerts *prometheus.CounterVec
RedisErrors prometheus.Counter
SummaryHandlers *prometheus.HistogramVec
NotifyChannels *prometheus.CounterVec
}

const Status = `status`
@@ -40,17 +40,13 @@ func New(promRegistry *prometheus.Registry, prefix, appName, env string) *Store
"version": runtime.Version(),
},
}),
PublishedAlerts: promauto.NewCounterVec(prometheus.CounterOpts{
Name: fmt.Sprintf("%s_finding_published_total", prefix),
Help: "The total number of published findings",
}, []string{Status}),
PublishedBlocks: promauto.NewCounterVec(prometheus.CounterOpts{
Name: fmt.Sprintf("%s_blocks_published_total", prefix),
Help: "The total number of published blocks",
}, []string{Status}),
SentAlerts: promauto.NewCounterVec(prometheus.CounterOpts{
Name: fmt.Sprintf("%s_finding_sent_total", prefix),
Help: "The total number of set findings",
Help: "The total number of published findings",
}, []string{ConsumerName, Status}),
RedisErrors: promauto.NewCounter(prometheus.CounterOpts{
Name: fmt.Sprintf("%s_redis_error_total", prefix),
@@ -61,11 +57,14 @@ func New(promRegistry *prometheus.Registry, prefix, appName, env string) *Store
Help: "Time spent processing request to notification channel",
Buckets: prometheus.DefBuckets,
}, []string{Channel}),
NotifyChannels: promauto.NewCounterVec(prometheus.CounterOpts{
Name: fmt.Sprintf("%s_notification_channel_error_total", prefix),
Help: "The total number of network errors of telegram, discord, opsgenie channels",
}, []string{Channel, Status}),
}

store.Prometheus.MustRegister(
store.BuildInfo,
store.PublishedAlerts,
store.PublishedBlocks,
store.SentAlerts,
store.RedisErrors,
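The new `NotifyChannels` counter is keyed by both `channel` and `status`, so each notifier produces series such as `forwarder_notification_channel_error_total{channel="telegram",status="fail"}` (the `forwarder_` prefix is taken from the changelog entry). A hedged sketch of declaring an equivalent counter on a dedicated registry and exposing it for scraping — illustrative wiring only, not the repository's `New` constructor:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	reg := prometheus.NewRegistry()

	// Same shape as the new NotifyChannels metric: one counter family,
	// partitioned by notification channel and outcome. The name and label
	// values here are illustrative, not read from the repository's config.
	notifyChannels := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "forwarder_notification_channel_error_total",
		Help: "The total number of network errors of telegram, discord, opsgenie channels",
	}, []string{"channel", "status"})

	notifyChannels.With(prometheus.Labels{"channel": "telegram", "status": "fail"}).Inc()

	// Expose the registry so the new series can be scraped, e.g.:
	//   curl localhost:2112/metrics | grep notification_channel_error_total
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":2112", nil))
}
```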
4 changes: 2 additions & 2 deletions internal/env/notify_config.go
@@ -59,7 +59,7 @@ type NotificationConfig struct {
Consumers []*Consumer `mapstructure:"consumers"`
}

func ReadNotificationConfig(env string, configPath string) (*NotificationConfig, error) {
func ReadNotificationConfig(env, configPath string) (*NotificationConfig, error) {
v := viper.New()

if env != `local` {
@@ -179,7 +179,7 @@ func CollectNatsSubjects(cfg *NotificationConfig) []string {
}

out := make([]string, 0, len(natsSubjectsMap))
for subject, _ := range natsSubjectsMap {
for subject := range natsSubjectsMap {
out = append(out, subject)
}

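The loop fix above (`for subject := range natsSubjectsMap`) drops a redundant blank identifier from the usual set-then-flatten idiom for deduplicating NATS subjects. A small self-contained illustration of that idiom — the subject names and the map's value type are made up for the example:

```go
package main

import "fmt"

func main() {
	subjects := []string{"findings.protocol", "findings.protocol", "blocks.mainnet"}

	// Collect unique subjects into a set...
	set := make(map[string]struct{}, len(subjects))
	for _, s := range subjects {
		set[s] = struct{}{}
	}

	// ...then flatten; only the keys matter, so no blank identifier is needed.
	out := make([]string, 0, len(set))
	for subject := range set {
		out = append(out, subject)
	}

	fmt.Println(out) // map iteration order is not guaranteed
}
```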
19 changes: 15 additions & 4 deletions internal/pkg/consumer/consumer.go
@@ -10,10 +10,10 @@ import (

"github.com/go-redis/redis/v8"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/lidofinance/onchain-mon/generated/databus"
"github.com/nats-io/nats.go/jetstream"
"github.com/prometheus/client_golang/prometheus"

"github.com/lidofinance/onchain-mon/generated/databus"
"github.com/lidofinance/onchain-mon/internal/connectors/metrics"
"github.com/lidofinance/onchain-mon/internal/env"
"github.com/lidofinance/onchain-mon/internal/pkg/notifiler"
@@ -111,7 +111,15 @@ func NewConsumers(log *slog.Logger, metrics *metrics.Store, redisClient *redis.C
const LruSize = 125
cache := expirable.NewLRU[string, uint](LruSize, nil, time.Minute*10)

consumer := New(log, metrics, cache, redisClient, repo, consumerName, subject, consumerCfg.SeveritySet, consumerCfg.ByQuorum, quorumSize, notificationChannel)
consumer := New(
log, metrics, cache, redisClient, repo,
consumerName,
subject,
consumerCfg.SeveritySet,
consumerCfg.ByQuorum,
quorumSize,
notificationChannel,
)

consumers = append(consumers, consumer)
}
@@ -221,6 +229,7 @@ func (c *Consumer) GetConsumeHandler(ctx context.Context) func(msg jetstream.Msg
if err != nil {
c.log.Error(fmt.Sprintf(`Could not increase key value: %v`, err))
c.metrics.RedisErrors.Inc()
c.metrics.SentAlerts.With(prometheus.Labels{metrics.ConsumerName: c.name, metrics.Status: metrics.StatusFail}).Inc()
c.nackMessage(msg)
return
}
@@ -229,9 +238,11 @@ func (c *Consumer) GetConsumeHandler(ctx context.Context) func(msg jetstream.Msg
if err := c.redisClient.Expire(ctx, countKey, TTLMins10).Err(); err != nil {
c.log.Error(fmt.Sprintf(`Could not set expire time: %v`, err))
c.metrics.RedisErrors.Inc()
c.metrics.SentAlerts.With(prometheus.Labels{metrics.ConsumerName: c.name, metrics.Status: metrics.StatusFail}).Inc()

if _, err := c.redisClient.Decr(ctx, countKey).Result(); err != nil {
c.metrics.RedisErrors.Inc()
c.metrics.SentAlerts.With(prometheus.Labels{metrics.ConsumerName: c.name, metrics.Status: metrics.StatusFail}).Inc()
c.log.Error(fmt.Sprintf(`Could not decrease count key %s: %v`, countKey, err))
}

@@ -247,6 +258,7 @@ func (c *Consumer) GetConsumeHandler(ctx context.Context) func(msg jetstream.Msg
if err != nil {
c.log.Error(fmt.Sprintf(`Could not get key value: %v`, err))
c.metrics.RedisErrors.Inc()
c.metrics.SentAlerts.With(prometheus.Labels{metrics.ConsumerName: c.name, metrics.Status: metrics.StatusFail}).Inc()
c.nackMessage(msg)
return
}
@@ -287,13 +299,12 @@ func (c *Consumer) GetConsumeHandler(ctx context.Context) func(msg jetstream.Msg
if err != nil {
c.log.Error(fmt.Sprintf(`Could not get notification status: %v`, err))
c.metrics.RedisErrors.Inc()
c.metrics.SentAlerts.With(prometheus.Labels{metrics.ConsumerName: c.name, metrics.Status: metrics.StatusFail}).Inc()
c.nackMessage(msg)
return
}

if status == StatusSending {
c.metrics.SentAlerts.With(prometheus.Labels{metrics.ConsumerName: c.name, metrics.Status: metrics.StatusOk}).Inc()

c.log.Info(fmt.Sprintf("Another instance is sending finding: %s", finding.AlertId))
return
}
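With these changes, every Redis failure on the quorum path also counts as a failed send for the owning consumer. Since the same pair of increments now appears in several error branches, a small helper could keep each branch to a single call — a refactor sketch only, with placeholder metric and label names (the repository uses its `metrics.ConsumerName` / `metrics.Status` constants):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

type sendMetrics struct {
	redisErrors prometheus.Counter
	sentAlerts  *prometheus.CounterVec
}

// redisFailure records a Redis error together with a failed send for the
// given consumer, wrapping the two-line pattern repeated in consumer.go.
func (m *sendMetrics) redisFailure(consumer string) {
	m.redisErrors.Inc()
	m.sentAlerts.With(prometheus.Labels{"consumerName": consumer, "status": "fail"}).Inc()
}

func main() {
	m := &sendMetrics{
		redisErrors: prometheus.NewCounter(prometheus.CounterOpts{Name: "forwarder_redis_error_total"}),
		sentAlerts: prometheus.NewCounterVec(prometheus.CounterOpts{Name: "forwarder_finding_sent_total"},
			[]string{"consumerName", "status"}),
	}

	// e.g. called from an error branch such as:
	//   if err := redisClient.Incr(ctx, countKey).Err(); err != nil { m.redisFailure(name) }
	m.redisFailure("telegram-consumer")
	fmt.Println("recorded one redis error and one failed send")
}
```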
6 changes: 5 additions & 1 deletion internal/pkg/notifiler/discord.go
@@ -25,6 +25,8 @@ type MessagePayload struct {
Content string `json:"content"`
}

const DiscordLabel = `discord`

func NewDiscord(webhookURL string, httpClient *http.Client, metricsStore *metrics.Store, source string) *Discord {
return &Discord{
webhookURL: webhookURL,
@@ -72,13 +74,15 @@ func (d *Discord) send(ctx context.Context, message string) error {
defer func() {
resp.Body.Close()
duration := time.Since(start).Seconds()
d.metrics.SummaryHandlers.With(prometheus.Labels{metrics.Channel: `discord`}).Observe(duration)
d.metrics.SummaryHandlers.With(prometheus.Labels{metrics.Channel: DiscordLabel}).Observe(duration)
}()

if resp.StatusCode != http.StatusNoContent {
d.metrics.NotifyChannels.With(prometheus.Labels{metrics.Channel: DiscordLabel, metrics.Status: metrics.StatusFail}).Inc()
return fmt.Errorf("received from Discord non-204 response code: %v", resp.Status)
}

d.metrics.NotifyChannels.With(prometheus.Labels{metrics.Channel: DiscordLabel, metrics.Status: metrics.StatusOk}).Inc()
return nil
}

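The Discord notifier now records one `NotifyChannels` increment per request, keyed on whether Discord answered with 204. That also makes the behaviour easy to pin down in a test: stub the webhook, drive the same status-code branch, and read the counter back with `testutil.ToFloat64`. A hedged, self-contained sketch (it does not exercise the repository's `Discord` type or `NewDiscord` constructor):

```go
package notifiertest

import (
	"net/http"
	"net/http/httptest"
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestCountsFailureOnNon204(t *testing.T) {
	notifyChannels := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "notification_channel_error_total",
	}, []string{"channel", "status"})

	// Webhook stub that always rejects the message.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusTooManyRequests)
	}))
	defer srv.Close()

	resp, err := http.Post(srv.URL, "application/json", nil)
	if err != nil {
		t.Fatal(err)
	}
	resp.Body.Close()

	// Same branching as discord.go: anything but 204 is a failed delivery.
	status := "ok"
	if resp.StatusCode != http.StatusNoContent {
		status = "fail"
	}
	notifyChannels.With(prometheus.Labels{"channel": "discord", "status": status}).Inc()

	if got := testutil.ToFloat64(notifyChannels.WithLabelValues("discord", "fail")); got != 1 {
		t.Fatalf("expected 1 failed delivery, got %v", got)
	}
}
```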
20 changes: 12 additions & 8 deletions internal/pkg/notifiler/opsgenie.go
@@ -37,7 +37,9 @@ func NewOpsgenie(opsGenieKey string, httpClient *http.Client, metricsStore *metr
}
}

func (u *OpsGenie) SendFinding(ctx context.Context, alert *databus.FindingDtoJson) error {
const OpsGenieLabel = `opsgenie`

func (o *OpsGenie) SendFinding(ctx context.Context, alert *databus.FindingDtoJson) error {
opsGeniePriority := ""
switch alert.Severity {
case databus.SeverityCritical:
@@ -51,7 +53,7 @@ func (u *OpsGenie) SendFinding(ctx context.Context, alert *databus.FindingDtoJso
return nil
}

message := FormatAlert(alert, u.source)
message := FormatAlert(alert, o.source)

payload := AlertPayload{
Message: alert.Name,
@@ -60,10 +62,10 @@ func (u *OpsGenie) SendFinding(ctx context.Context, alert *databus.FindingDtoJso
Priority: opsGeniePriority,
}

return u.send(ctx, payload)
return o.send(ctx, payload)
}

func (u *OpsGenie) send(ctx context.Context, payload AlertPayload) error {
func (o *OpsGenie) send(ctx context.Context, payload AlertPayload) error {
payloadBytes, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("could not marshal OpsGenie payload: %w", err)
@@ -78,26 +80,28 @@ func (u *OpsGenie) send(ctx context.Context, payload AlertPayload) error {
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "GenieKey "+u.opsGenieKey)
req.Header.Set("Authorization", "GenieKey "+o.opsGenieKey)

start := time.Now()
resp, err := u.httpClient.Do(req)
resp, err := o.httpClient.Do(req)
if err != nil {
return fmt.Errorf("could not send OpsGenie request: %w", err)
}
defer func() {
resp.Body.Close()
duration := time.Since(start).Seconds()
u.metrics.SummaryHandlers.With(prometheus.Labels{metrics.Channel: `opsgenie`}).Observe(duration)
o.metrics.SummaryHandlers.With(prometheus.Labels{metrics.Channel: OpsGenieLabel}).Observe(duration)
}()

if resp.StatusCode != http.StatusAccepted {
o.metrics.NotifyChannels.With(prometheus.Labels{metrics.Channel: OpsGenieLabel, metrics.Status: metrics.StatusFail}).Inc()
return fmt.Errorf("received from OpsGenie non-202 response code: %v", resp.Status)
}

o.metrics.NotifyChannels.With(prometheus.Labels{metrics.Channel: OpsGenieLabel, metrics.Status: metrics.StatusOk}).Inc()
return nil
}

func (d *OpsGenie) GetType() string {
func (o *OpsGenie) GetType() string {
return "OpsGenie"
}
24 changes: 13 additions & 11 deletions internal/pkg/notifiler/telegram.go
@@ -34,30 +34,31 @@ func NewTelegram(botToken, chatID string, httpClient *http.Client, metricsStore

const MaxTelegramMessageLength = 4096
const WarningTelegramMessage = "Warn: Msg >=4096, pls review description message"
const TelegramLabel = `telegram`

func (u *Telegram) SendFinding(ctx context.Context, alert *databus.FindingDtoJson) error {
func (t *Telegram) SendFinding(ctx context.Context, alert *databus.FindingDtoJson) error {
message := TruncateMessageWithAlertID(
fmt.Sprintf("%s\n\n%s", alert.Name, FormatAlert(alert, u.source)),
fmt.Sprintf("%s\n\n%s", alert.Name, FormatAlert(alert, t.source)),
MaxTelegramMessageLength,
WarningTelegramMessage,
)

if alert.Severity != databus.SeverityUnknown {
m := escapeMarkdownV1(message)

if sendErr := u.send(ctx, m, true); sendErr != nil {
if sendErr := t.send(ctx, m, true); sendErr != nil {
message += "\n\nWarning: Could not send msg as markdown"
return u.send(ctx, message, false)
return t.send(ctx, message, false)
}

return nil
}

return u.send(ctx, message, false)
return t.send(ctx, message, false)
}

func (u *Telegram) send(ctx context.Context, message string, useMarkdown bool) error {
requestURL := fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage?disable_web_page_preview=true&disable_notification=true&chat_id=-%s&text=%s", u.botToken, u.chatID, url.QueryEscape(message))
func (t *Telegram) send(ctx context.Context, message string, useMarkdown bool) error {
requestURL := fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage?disable_web_page_preview=true&disable_notification=true&chat_id=-%s&text=%s", t.botToken, t.chatID, url.QueryEscape(message))
if useMarkdown {
requestURL += `&parse_mode=markdown`
}
@@ -69,25 +70,26 @@ func (u *Telegram) send(ctx context.Context, message string, useMarkdown bool) e

start := time.Now()

resp, err := u.httpClient.Do(req)
resp, err := t.httpClient.Do(req)
if err != nil {
return fmt.Errorf("could not send telegram request: %w", err)
}
defer func() {
resp.Body.Close()
duration := time.Since(start).Seconds()
u.metrics.SummaryHandlers.With(prometheus.Labels{metrics.Channel: `telegram`}).Observe(duration)
t.metrics.SummaryHandlers.With(prometheus.Labels{metrics.Channel: TelegramLabel}).Observe(duration)
}()

if resp.StatusCode != http.StatusOK {
fmt.Println(resp)
t.metrics.NotifyChannels.With(prometheus.Labels{metrics.Channel: TelegramLabel, metrics.Status: metrics.StatusFail}).Inc()
return fmt.Errorf("received from telegram non-200 response code: %v", resp.Status)
}

t.metrics.NotifyChannels.With(prometheus.Labels{metrics.Channel: TelegramLabel, metrics.Status: metrics.StatusOk}).Inc()
return nil
}

func (d *Telegram) GetType() string {
func (t *Telegram) GetType() string {
return "Telegram"
}

