Merge pull request #50 from lidofinance/devel
Devel
sergeyWh1te authored Dec 5, 2024
2 parents 0ab62d1 + 11e95cb commit 3138a52
Showing 13 changed files with 79 additions and 73 deletions.
8 changes: 8 additions & 0 deletions Changelog.md
@@ -1,3 +1,11 @@
## 05.12.2024
1. Removed the PublishedAlerts metric
2. Added the NotifyChannels metric: `forwarder_notification_channel_error_total`
3. Increment `feeder_blocks_published_total` on each unsuccessful network request

## 28.11.2024
1. Field "uniqueKey" is required for getting quorum.

## 16.10.2024
1. Added dynamic yaml notification config

2 changes: 1 addition & 1 deletion brief/databus/finding.dto.json
@@ -37,7 +37,7 @@
"type": "string"
}
},
"required": ["severity", "alertId", "name", "description", "botName", "team"],
"required": ["severity", "alertId", "name", "description", "botName", "team", "uniqueKey"],
"definitions": {
"Severity": {
"type": "string",
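With `uniqueKey` now listed among the required properties, every producing bot has to ship it with each finding. A minimal sketch of a payload that would pass the updated schema (all field values below are illustrative placeholders, not taken from a real bot):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Illustrative finding payload; every value is a placeholder.
	finding := map[string]any{
		"severity":    "High", // assumed to be one of the schema's Severity enum values
		"alertId":     "EXAMPLE-ALERT-ID",
		"name":        "Example finding",
		"description": "Something worth a notification",
		"botName":     "example-bot",
		"team":        "example-team",
		"uniqueKey":   "example-bot:EXAMPLE-ALERT-ID", // now required; used downstream for quorum counting
	}

	payload, err := json.Marshal(finding)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload))
}
```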
1 change: 1 addition & 0 deletions cmd/forwarder/main.go
@@ -81,6 +81,7 @@ func main() {
MaxAge: 10 * time.Minute,
Subjects: env.CollectNatsSubjects(notificationConfig),
MaxMsgSize: maxMsgSize,
Retention: jetstream.InterestPolicy,
})
if err != nil && !errors.Is(err, nats.ErrStreamNameAlreadyInUse) {
fmt.Printf("could not create %s stream error: %v\n", natsStreamName, err)
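The new `Retention: jetstream.InterestPolicy` setting switches the stream to interest-based retention: a message is removed once every consumer interested in its subject has acknowledged it (or once `MaxAge` passes), rather than accumulating up to the stream limits. A standalone sketch of the surrounding stream setup, with placeholder connection details, stream name, and subjects:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	// Placeholder connection; the real service reads its NATS URL from config.
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	js, err := jetstream.New(nc)
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Interest-based retention: acknowledged messages are dropped once no
	// consumer still needs them, keeping the stream small.
	_, err = js.CreateStream(ctx, jetstream.StreamConfig{
		Name:      "findings",             // placeholder stream name
		Subjects:  []string{"findings.>"}, // placeholder subjects
		MaxAge:    10 * time.Minute,
		Retention: jetstream.InterestPolicy,
	})
	if err != nil && !errors.Is(err, nats.ErrStreamNameAlreadyInUse) {
		fmt.Printf("could not create stream: %v\n", err)
	}
}
```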
6 changes: 2 additions & 4 deletions generated/databus/block.dto.go

Some generated files are not rendered by default.

13 changes: 7 additions & 6 deletions generated/databus/finding.dto.go

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion internal/app/feeder/feeder.go
@@ -57,6 +57,7 @@ func (w *Feeder) Run(ctx context.Context, g *errgroup.Group) {
case <-ticker.C:
block, err := w.chainSrv.GetLatestBlock(ctx)
if err != nil {
w.metricsStore.PublishedBlocks.With(prometheus.Labels{metrics.Status: metrics.StatusFail}).Inc()
w.log.Error(fmt.Sprintf("GetLatestBlock error: %v", err))
continue
}
@@ -67,6 +68,7 @@ func (w *Feeder) Run(ctx context.Context, g *errgroup.Group) {

blockReceipts, err := w.chainSrv.GetBlockReceipts(ctx, block.Result.Hash)
if err != nil {
w.metricsStore.PublishedBlocks.With(prometheus.Labels{metrics.Status: metrics.StatusFail}).Inc()
w.log.Error(fmt.Sprintf("GetBlockReceipts error: %v", err))
continue
}
@@ -110,6 +112,7 @@ func (w *Feeder) Run(ctx context.Context, g *errgroup.Group) {

payload, marshalErr := json.Marshal(blockDto)
if marshalErr != nil {
w.metricsStore.PublishedBlocks.With(prometheus.Labels{metrics.Status: metrics.StatusFail}).Inc()
w.log.Error(fmt.Sprintf(`Could not marshal blockDto %s`, marshalErr))
continue
}
@@ -132,7 +135,7 @@ func (w *Feeder) Run(ctx context.Context, g *errgroup.Group) {
}

w.log.Info(fmt.Sprintf(`%d, %s`, blockDto.Number, blockDto.Hash), payloadSize)
w.metricsStore.PublishedAlerts.With(prometheus.Labels{metrics.Status: metrics.StatusOk}).Inc()
w.metricsStore.PublishedBlocks.With(prometheus.Labels{metrics.Status: metrics.StatusOk}).Inc()
}
}
})
3 changes: 2 additions & 1 deletion internal/app/forwarder/forwarder.go
@@ -8,8 +8,9 @@ import (

"golang.org/x/sync/errgroup"

"github.com/lidofinance/onchain-mon/internal/pkg/consumer"
"github.com/nats-io/nats.go/jetstream"

"github.com/lidofinance/onchain-mon/internal/pkg/consumer"
)

type worker struct {
13 changes: 6 additions & 7 deletions internal/connectors/metrics/prometheus.go
@@ -11,11 +11,11 @@ import (
type Store struct {
Prometheus *prometheus.Registry
BuildInfo prometheus.Counter
PublishedAlerts *prometheus.CounterVec
PublishedBlocks *prometheus.CounterVec
SentAlerts *prometheus.CounterVec
RedisErrors prometheus.Counter
SummaryHandlers *prometheus.HistogramVec
NotifyChannels *prometheus.CounterVec
}

const Status = `status`
@@ -40,17 +40,13 @@ func New(promRegistry *prometheus.Registry, prefix, appName, env string) *Store
"version": runtime.Version(),
},
}),
PublishedAlerts: promauto.NewCounterVec(prometheus.CounterOpts{
Name: fmt.Sprintf("%s_finding_published_total", prefix),
Help: "The total number of published findings",
}, []string{Status}),
PublishedBlocks: promauto.NewCounterVec(prometheus.CounterOpts{
Name: fmt.Sprintf("%s_blocks_published_total", prefix),
Help: "The total number of published blocks",
}, []string{Status}),
SentAlerts: promauto.NewCounterVec(prometheus.CounterOpts{
Name: fmt.Sprintf("%s_finding_sent_total", prefix),
Help: "The total number of set findings",
Help: "The total number of published findings",
}, []string{ConsumerName, Status}),
RedisErrors: promauto.NewCounter(prometheus.CounterOpts{
Name: fmt.Sprintf("%s_redis_error_total", prefix),
@@ -61,11 +57,14 @@ func New(promRegistry *prometheus.Registry, prefix, appName, env string) *Store
Help: "Time spent processing request to notification channel",
Buckets: prometheus.DefBuckets,
}, []string{Channel}),
NotifyChannels: promauto.NewCounterVec(prometheus.CounterOpts{
Name: fmt.Sprintf("%s_notification_channel_error_total", prefix),
Help: "The total number of network errors of telegram, discord, opsgenie channels",
}, []string{Channel, Status}),
}

store.Prometheus.MustRegister(
store.BuildInfo,
store.PublishedAlerts,
store.PublishedBlocks,
store.SentAlerts,
store.RedisErrors,
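The replacement `NotifyChannels` counter is labelled by channel and status, so each notifier records both successful and failed deliveries under one metric name. A self-contained sketch of that pattern (the standalone setup and label values here are assumptions for illustration, not the project's actual wiring):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Same shape as the NotifyChannels metric above; created standalone for illustration.
	notifyChannels := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "forwarder_notification_channel_error_total",
		Help: "The total number of network errors of telegram, discord, opsgenie channels",
	}, []string{"channel", "status"})

	// A notifier increments one series per delivery attempt.
	notifyChannels.With(prometheus.Labels{"channel": "discord", "status": "ok"}).Inc()
	notifyChannels.With(prometheus.Labels{"channel": "discord", "status": "fail"}).Inc()

	fail := notifyChannels.With(prometheus.Labels{"channel": "discord", "status": "fail"})
	fmt.Println(testutil.ToFloat64(fail)) // 1
}
```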
4 changes: 2 additions & 2 deletions internal/env/notify_config.go
@@ -59,7 +59,7 @@ type NotificationConfig struct {
Consumers []*Consumer `mapstructure:"consumers"`
}

func ReadNotificationConfig(env string, configPath string) (*NotificationConfig, error) {
func ReadNotificationConfig(env, configPath string) (*NotificationConfig, error) {
v := viper.New()

if env != `local` {
@@ -179,7 +179,7 @@ func CollectNatsSubjects(cfg *NotificationConfig) []string {
}

out := make([]string, 0, len(natsSubjectsMap))
for subject, _ := range natsSubjectsMap {
for subject := range natsSubjectsMap {
out = append(out, subject)
}

47 changes: 16 additions & 31 deletions internal/pkg/consumer/consumer.go
@@ -1,10 +1,7 @@
package consumer

import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"log/slog"
@@ -13,10 +10,10 @@ import (

"github.com/go-redis/redis/v8"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/lidofinance/onchain-mon/generated/databus"
"github.com/nats-io/nats.go/jetstream"
"github.com/prometheus/client_golang/prometheus"

"github.com/lidofinance/onchain-mon/generated/databus"
"github.com/lidofinance/onchain-mon/internal/connectors/metrics"
"github.com/lidofinance/onchain-mon/internal/env"
"github.com/lidofinance/onchain-mon/internal/pkg/notifiler"
@@ -114,7 +111,15 @@ func NewConsumers(log *slog.Logger, metrics *metrics.Store, redisClient *redis.C
const LruSize = 125
cache := expirable.NewLRU[string, uint](LruSize, nil, time.Minute*10)

consumer := New(log, metrics, cache, redisClient, repo, consumerName, subject, consumerCfg.SeveritySet, consumerCfg.ByQuorum, quorumSize, notificationChannel)
consumer := New(
log, metrics, cache, redisClient, repo,
consumerName,
subject,
consumerCfg.SeveritySet,
consumerCfg.ByQuorum,
quorumSize,
notificationChannel,
)

consumers = append(consumers, consumer)
}
@@ -208,7 +213,7 @@ func (c *Consumer) GetConsumeHandler(ctx context.Context) func(msg jetstream.Msg
return
}

key := findingToUniqueHash(finding)
key := finding.UniqueKey
countKey := fmt.Sprintf(countTemplate, c.name, key)
statusKey := fmt.Sprintf(statusTemplate, c.name, key)

@@ -224,6 +229,7 @@ func (c *Consumer) GetConsumeHandler(ctx context.Context) func(msg jetstream.Msg
if err != nil {
c.log.Error(fmt.Sprintf(`Could not increase key value: %v`, err))
c.metrics.RedisErrors.Inc()
c.metrics.SentAlerts.With(prometheus.Labels{metrics.ConsumerName: c.name, metrics.Status: metrics.StatusFail}).Inc()
c.nackMessage(msg)
return
}
@@ -232,9 +238,11 @@ func (c *Consumer) GetConsumeHandler(ctx context.Context) func(msg jetstream.Msg
if err := c.redisClient.Expire(ctx, countKey, TTLMins10).Err(); err != nil {
c.log.Error(fmt.Sprintf(`Could not set expire time: %v`, err))
c.metrics.RedisErrors.Inc()
c.metrics.SentAlerts.With(prometheus.Labels{metrics.ConsumerName: c.name, metrics.Status: metrics.StatusFail}).Inc()

if _, err := c.redisClient.Decr(ctx, countKey).Result(); err != nil {
c.metrics.RedisErrors.Inc()
c.metrics.SentAlerts.With(prometheus.Labels{metrics.ConsumerName: c.name, metrics.Status: metrics.StatusFail}).Inc()
c.log.Error(fmt.Sprintf(`Could not decrease count key %s: %v`, countKey, err))
}

@@ -250,6 +258,7 @@ func (c *Consumer) GetConsumeHandler(ctx context.Context) func(msg jetstream.Msg
if err != nil {
c.log.Error(fmt.Sprintf(`Could not get key value: %v`, err))
c.metrics.RedisErrors.Inc()
c.metrics.SentAlerts.With(prometheus.Labels{metrics.ConsumerName: c.name, metrics.Status: metrics.StatusFail}).Inc()
c.nackMessage(msg)
return
}
@@ -290,13 +299,12 @@ func (c *Consumer) GetConsumeHandler(ctx context.Context) func(msg jetstream.Msg
if err != nil {
c.log.Error(fmt.Sprintf(`Could not get notification status: %v`, err))
c.metrics.RedisErrors.Inc()
c.metrics.SentAlerts.With(prometheus.Labels{metrics.ConsumerName: c.name, metrics.Status: metrics.StatusFail}).Inc()
c.nackMessage(msg)
return
}

if status == StatusSending {
c.metrics.SentAlerts.With(prometheus.Labels{metrics.ConsumerName: c.name, metrics.Status: metrics.StatusOk}).Inc()

c.log.Info(fmt.Sprintf("Another instance is sending finding: %s", finding.AlertId))
return
}
@@ -405,26 +413,3 @@ func (c *Consumer) ackMessage(msg jetstream.Msg) {
c.log.Error(fmt.Sprintf(`Could not ack msg: %v`, ackErr))
}
}

func computeSHA256Hash(data []byte) string {
hash := sha256.Sum256(data)
return hex.EncodeToString(hash[:])
}

func findingToUniqueHash(f *databus.FindingDtoJson) string {
var buffer bytes.Buffer

if f.UniqueKey != nil {
buffer.WriteString(f.Team)
buffer.WriteString(f.BotName)
buffer.WriteString(*f.UniqueKey)
} else {
buffer.WriteString(f.Team)
buffer.WriteString(f.BotName)
buffer.WriteString(f.AlertId)
buffer.WriteString(f.Name)
buffer.WriteString(string(f.Severity))
}

return computeSHA256Hash(buffer.Bytes())
}
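With the SHA-256 helpers gone, deduplication and quorum counting key directly on the producer-supplied `finding.UniqueKey`. A simplified sketch of the counting step (the key template, TTL handling, and `quorumSize` below are assumptions for illustration, not the exact consumer logic):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"
)

// reachedQuorum bumps a per-consumer counter for the finding's uniqueKey and
// reports whether enough bot instances have published the same finding.
func reachedQuorum(ctx context.Context, rdb *redis.Client, consumerName, uniqueKey string, quorumSize int64) (bool, error) {
	countKey := fmt.Sprintf("%s:count:%s", consumerName, uniqueKey) // hypothetical key template

	count, err := rdb.Incr(ctx, countKey).Result()
	if err != nil {
		return false, fmt.Errorf("could not increase key value: %w", err)
	}

	// Expire the counter so a stale finding does not block future quorums.
	if err := rdb.Expire(ctx, countKey, 10*time.Minute).Err(); err != nil {
		return false, fmt.Errorf("could not set expire time: %w", err)
	}

	return count >= quorumSize, nil
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // placeholder address
	ok, err := reachedQuorum(context.Background(), rdb, "example-consumer", "example-bot:EXAMPLE-ALERT-ID", 2)
	fmt.Println(ok, err)
}
```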
6 changes: 5 additions & 1 deletion internal/pkg/notifiler/discord.go
@@ -25,6 +25,8 @@ type MessagePayload struct {
Content string `json:"content"`
}

const DiscordLabel = `discord`

func NewDiscord(webhookURL string, httpClient *http.Client, metricsStore *metrics.Store, source string) *Discord {
return &Discord{
webhookURL: webhookURL,
@@ -72,13 +74,15 @@ func (d *Discord) send(ctx context.Context, message string) error {
defer func() {
resp.Body.Close()
duration := time.Since(start).Seconds()
d.metrics.SummaryHandlers.With(prometheus.Labels{metrics.Channel: `discord`}).Observe(duration)
d.metrics.SummaryHandlers.With(prometheus.Labels{metrics.Channel: DiscordLabel}).Observe(duration)
}()

if resp.StatusCode != http.StatusNoContent {
d.metrics.NotifyChannels.With(prometheus.Labels{metrics.Channel: DiscordLabel, metrics.Status: metrics.StatusFail}).Inc()
return fmt.Errorf("received from Discord non-204 response code: %v", resp.Status)
}

d.metrics.NotifyChannels.With(prometheus.Labels{metrics.Channel: DiscordLabel, metrics.Status: metrics.StatusOk}).Inc()
return nil
}

20 changes: 12 additions & 8 deletions internal/pkg/notifiler/opsgenie.go
@@ -37,7 +37,9 @@ func NewOpsgenie(opsGenieKey string, httpClient *http.Client, metricsStore *metr
}
}

func (u *OpsGenie) SendFinding(ctx context.Context, alert *databus.FindingDtoJson) error {
const OpsGenieLabel = `opsgenie`

func (o *OpsGenie) SendFinding(ctx context.Context, alert *databus.FindingDtoJson) error {
opsGeniePriority := ""
switch alert.Severity {
case databus.SeverityCritical:
@@ -51,7 +53,7 @@ func (u *OpsGenie) SendFinding(ctx context.Context, alert *databus.FindingDtoJso
return nil
}

message := FormatAlert(alert, u.source)
message := FormatAlert(alert, o.source)

payload := AlertPayload{
Message: alert.Name,
@@ -60,10 +62,10 @@ func (u *OpsGenie) SendFinding(ctx context.Context, alert *databus.FindingDtoJso
Priority: opsGeniePriority,
}

return u.send(ctx, payload)
return o.send(ctx, payload)
}

func (u *OpsGenie) send(ctx context.Context, payload AlertPayload) error {
func (o *OpsGenie) send(ctx context.Context, payload AlertPayload) error {
payloadBytes, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("could not marshal OpsGenie payload: %w", err)
@@ -78,26 +80,28 @@ func (u *OpsGenie) send(ctx context.Context, payload AlertPayload) error {
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "GenieKey "+u.opsGenieKey)
req.Header.Set("Authorization", "GenieKey "+o.opsGenieKey)

start := time.Now()
resp, err := u.httpClient.Do(req)
resp, err := o.httpClient.Do(req)
if err != nil {
return fmt.Errorf("could not send OpsGenie request: %w", err)
}
defer func() {
resp.Body.Close()
duration := time.Since(start).Seconds()
u.metrics.SummaryHandlers.With(prometheus.Labels{metrics.Channel: `opsgenie`}).Observe(duration)
o.metrics.SummaryHandlers.With(prometheus.Labels{metrics.Channel: OpsGenieLabel}).Observe(duration)
}()

if resp.StatusCode != http.StatusAccepted {
o.metrics.NotifyChannels.With(prometheus.Labels{metrics.Channel: OpsGenieLabel, metrics.Status: metrics.StatusFail}).Inc()
return fmt.Errorf("received from OpsGenie non-202 response code: %v", resp.Status)
}

o.metrics.NotifyChannels.With(prometheus.Labels{metrics.Channel: OpsGenieLabel, metrics.Status: metrics.StatusOk}).Inc()
return nil
}

func (d *OpsGenie) GetType() string {
func (o *OpsGenie) GetType() string {
return "OpsGenie"
}
