Skip to content

Commit

Permalink
Merge pull request #844 from forta-network/kisel/forta-1404-receive-m…
Browse files Browse the repository at this point in the history
…etrics

Metrics from Health handlers for Bots V2
  • Loading branch information
canercidam authored Jan 29, 2024
2 parents acb524f + 07de81d commit b067fd8
Show file tree
Hide file tree
Showing 18 changed files with 267 additions and 206 deletions.
48 changes: 37 additions & 11 deletions clients/bothttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,42 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"

"github.com/forta-network/forta-core-go/utils/httpclient"
"github.com/hashicorp/go-multierror"
log "github.com/sirupsen/logrus"
)

var (
// responseSizeLimit is the maximum number of bytes to read from the response body.
// It is declared as a variable rather than a constant so that it can be
// reassigned; the package test lowers it to 1 to exercise the size-limit path.
responseSizeLimit = int64(2 << 20) // 2MB
)

// HealthResponse is the JSON body that Health decodes from the bot's
// health endpoint response.
type HealthResponse struct {
// Errors lists the error messages reported by the bot. When it is
// non-empty, Health returns them combined into a single error.
Errors []string `json:"errors"`
Errors []string `json:"errors"`
// Metrics lists the metrics reported by the bot; Health returns this
// slice to its caller.
Metrics []Metrics `json:"metrics"`
}

// Metrics is one entry of the "metrics" list in a HealthResponse: a set of
// data points tied to a single chain.
type Metrics struct {
// ChainID is the id of the chain the metrics are for
ChainID int64 `json:"chainId"`
// DataPoints maps a metric name to the list of values reported for it.
DataPoints map[string][]float64 `json:"dataPoints"`
}

// Client is the bot HTTP client interface.
type Client interface {
Health(ctx context.Context) error
// Health does a health check on the bot and returns the metrics found in
// the health response, along with any error.
Health(ctx context.Context) ([]Metrics, error)
}

// botClient is the Client implementation returned by NewClient.
type botClient struct {
// baseUrl is the "http://host:port" prefix that request URLs are built on.
baseUrl string
// httpClient is the HTTP client used to send the requests.
httpClient *http.Client
}

// NewClient creates anew client.
// NewClient creates a new client.
func NewClient(host string, port int) Client {
return &botClient{
baseUrl: fmt.Sprintf("http://%s:%d", host, port),
Expand All @@ -34,31 +49,42 @@ func NewClient(host string, port int) Client {
}

// Health does a health check on the bot.
//
// It sends a GET request to "<baseUrl>/health" and decodes the JSON body into
// a HealthResponse, reading at most responseSizeLimit bytes of the body.
//
// It returns the metrics from the response. The error is non-nil when the
// request cannot be built or sent, when the status code is greater than 200,
// or when the response lists one or more errors (each message becomes one
// error in the combined result). A body that fails to decode is deliberately
// ignored: nil metrics and a nil error are returned in that case.
func (bc *botClient) Health(ctx context.Context) error {
func (bc *botClient) Health(ctx context.Context) ([]Metrics, error) {
healthUrl := fmt.Sprintf("%s/health", bc.baseUrl)
req, err := http.NewRequestWithContext(ctx, "GET", healthUrl, nil)
if err != nil {
return err
return nil, err
}

resp, err := bc.httpClient.Do(req)
if err != nil {
return err
return nil, err
}
defer resp.Body.Close()

// NOTE(review): only status codes above 200 are rejected here; presumably
// 200 is the only intended success code - confirm whether "!= 200" was meant.
if resp.StatusCode > 200 {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

var healthResp HealthResponse
if err := json.NewDecoder(resp.Body).Decode(&healthResp); err != nil {
return nil // ignore decoding errors

// Limit the response size to a certain number of bytes
limitedReader := io.LimitReader(resp.Body, responseSizeLimit)
if err := json.NewDecoder(limitedReader).Decode(&healthResp); err != nil {
// A decode error whose text contains "EOF" is taken to mean that the
// limited reader cut the body short.
// NOTE(review): this substring match presumably also fires for an empty
// or otherwise truncated body that never reached the limit - confirm
// whether the warning text is acceptable in that case.
if strings.Contains(err.Error(), "EOF") {
log.WithError(err).Warn("response size limit for health check is reached")
}

return nil, nil // ignore decoding errors
}

if len(healthResp.Errors) == 0 {
return nil
return healthResp.Metrics, nil
}

// err here is the function-level variable, which is nil at this point (the
// decode error above lives in its own shadowed variable), so the combined
// error contains only the messages reported by the bot.
for _, errMsg := range healthResp.Errors {
err = multierror.Append(err, errors.New(errMsg))
}
return err

// Metrics are returned together with the combined error.
return healthResp.Metrics, err
}
35 changes: 33 additions & 2 deletions clients/bothttp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"testing"

"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/require"
)

Expand All @@ -28,12 +29,42 @@ func TestHealth(t *testing.T) {
go server.ListenAndServe()

client := NewClient("localhost", 8183)
err := client.Health(context.Background())
_, err := client.Health(context.Background())
r.NoError(err)

respData.Errors = append(respData.Errors, "some error msg")
err = client.Health(context.Background())
_, err = client.Health(context.Background())
r.Error(err)

respData = HealthResponse{
Metrics: []Metrics{
{
ChainID: 1,
DataPoints: map[string][]float64{
"tx.success": {1, 2, 3},
},
},
{
ChainID: 2,
DataPoints: map[string][]float64{
"tx.success": {3},
},
},
},
}

hook := test.NewGlobal()

metrics, err := client.Health(context.Background())
r.NoError(err)
r.EqualValues(respData.Metrics, metrics)

responseSizeLimit = 1

_, err = client.Health(context.Background())
r.NoError(err)
r.Equal(1, len(hook.Entries))
r.Equal("response size limit for health check is reached", hook.LastEntry().Message)

server.Close()
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ replace github.com/docker/docker => github.com/moby/moby v20.10.25+incompatible
require (
github.com/docker/docker v1.6.2
github.com/docker/go-connections v0.4.0
github.com/forta-network/forta-core-go v0.0.0-20240129095537-dad5459b7283
github.com/forta-network/forta-core-go v0.0.0-20240129180226-af53540338f3
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.39.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwU
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/forta-network/forta-core-go v0.0.0-20240129095537-dad5459b7283 h1:MmvZ3so59eNLtsJgEnRS1cwy/uqI/PazAS0x9Xkl3+E=
github.com/forta-network/forta-core-go v0.0.0-20240129095537-dad5459b7283/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo=
github.com/forta-network/forta-core-go v0.0.0-20240129180226-af53540338f3 h1:tfuCghhFdyolM3CiapTxtdLVHcy7ssRUjo5JxwwJnGc=
github.com/forta-network/forta-core-go v0.0.0-20240129180226-af53540338f3/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo=
github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707 h1:f6I7K43i2m6AwHSsDxh0Mf3qFzYt8BKnabSl/zGFmh0=
github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707/go.mod h1:nqTUF1REklpWLZ/M5HfzqhSHNz4dPVKzJvbLziqTZpw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
Expand Down
27 changes: 22 additions & 5 deletions services/components/botio/bot_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func (bot *botClient) processTransaction(ctx context.Context, lg *log.Entry, req
// truncate findings
if len(resp.Findings) > MaxFindings {
dropped := len(resp.Findings) - MaxFindings
droppedMetric := metrics.CreateAgentMetric(botConfig, metrics.MetricFindingsDropped, float64(dropped))
droppedMetric := metrics.CreateAgentMetricV1(botConfig, domain.MetricFindingsDropped, float64(dropped))
bot.msgClient.PublishProto(
messaging.SubjectMetricAgent,
&protocol.AgentMetricList{Metrics: []*protocol.AgentMetric{droppedMetric}},
Expand Down Expand Up @@ -613,8 +613,8 @@ func (bot *botClient) processBlock(ctx context.Context, lg *log.Entry, request *
// truncate findings
if len(resp.Findings) > MaxFindings {
dropped := len(resp.Findings) - MaxFindings
droppedMetric := metrics.CreateAgentMetric(
botConfig, metrics.MetricFindingsDropped, float64(dropped),
droppedMetric := metrics.CreateAgentMetricV1(
botConfig, domain.MetricFindingsDropped, float64(dropped),
)
bot.msgClient.PublishProto(
messaging.SubjectMetricAgent,
Expand Down Expand Up @@ -706,7 +706,7 @@ func (bot *botClient) processCombinationAlert(ctx context.Context, lg *log.Entry
// truncate findings
if len(resp.Findings) > MaxFindings {
dropped := len(resp.Findings) - MaxFindings
droppedMetric := metrics.CreateAgentMetric(botConfig, metrics.MetricFindingsDropped, float64(dropped))
droppedMetric := metrics.CreateAgentMetricV1(botConfig, domain.MetricFindingsDropped, float64(dropped))
bot.msgClient.PublishProto(
messaging.SubjectMetricAgent, &protocol.AgentMetricList{Metrics: []*protocol.AgentMetric{droppedMetric}},
)
Expand Down Expand Up @@ -754,7 +754,24 @@ func (bot *botClient) doHealthCheck(ctx context.Context, lg *log.Entry) bool {

var err error
if botConfig.ProtocolVersion >= 2 {
err = bot.clientV2.Health(ctx)
var botMetrics []bothttp.Metrics
botMetrics, err = bot.clientV2.Health(ctx)

if len(botMetrics) != 0 {
agentMetrics := make([]*protocol.AgentMetric, 0, len(botMetrics))
for _, botMetric := range botMetrics {
for metricName, metricValues := range botMetric.DataPoints {
for _, metricValue := range metricValues {
agentMetrics = append(agentMetrics, metrics.CreateAgentMetricV2(botConfig, metricName, metricValue, botMetric.ChainID))
}
}
}

bot.msgClient.PublishProto(
messaging.SubjectMetricAgent,
&protocol.AgentMetricList{Metrics: agentMetrics},
)
}
} else {
err = botClient.DoHealthCheck(ctx)
}
Expand Down
11 changes: 7 additions & 4 deletions services/components/botio/bot_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,20 @@ func (s *BotClientSuite) TestStartProcessStop() {
s.lifecycleMetrics.EXPECT().ActionSubscribe(combinerSubscriptions)

// test health checks
healthCheckChan := make(chan interface{})
HealthCheckInterval = time.Second
s.botGrpc.EXPECT().DoHealthCheck(gomock.Any()).MinTimes(1)
s.botGrpc.EXPECT().DoHealthCheck(gomock.Any()).Return(nil).MinTimes(1)
s.lifecycleMetrics.EXPECT().HealthCheckAttempt(s.botClient.configUnsafe).MinTimes(1)
s.lifecycleMetrics.EXPECT().HealthCheckSuccess(s.botClient.configUnsafe).MinTimes(1)
s.lifecycleMetrics.EXPECT().HealthCheckSuccess(s.botClient.configUnsafe).Do(func(_ interface{}) {
close(healthCheckChan)
}).MinTimes(1)

s.msgClient.EXPECT().Publish(messaging.SubjectAgentsAlertSubscribe, combinerSubscriptions)
s.botClient.StartProcessing()
s.botClient.Initialize()

<-healthCheckChan

<-s.botClient.Initialized()

txReq := &protocol.EvaluateTxRequest{
Expand Down Expand Up @@ -193,8 +198,6 @@ func (s *BotClientSuite) TestStartProcessStop() {
s.lifecycleMetrics.EXPECT().ActionUnsubscribe(combinerSubscriptions)

s.r.NoError(s.botClient.Close())
// Using small sleep to allow goroutines to be executed (e.g. health check)
time.Sleep(30 * time.Millisecond)
}

func (s *BotClientSuite) TestCombinerBotSubscriptions() {
Expand Down
7 changes: 4 additions & 3 deletions services/components/botio/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/forta-network/forta-core-go/clients/health"
"github.com/forta-network/forta-core-go/domain"
"github.com/forta-network/forta-core-go/protocol"
"github.com/forta-network/forta-node/clients"
"github.com/forta-network/forta-node/clients/messaging"
Expand Down Expand Up @@ -115,7 +116,7 @@ func (rs *requestSender) SendEvaluateTxRequest(req *protocol.EvaluateTxRequest)
}:
default: // do not try to send if the buffer is full
lg.WithField("bot", botConfig.ID).Debug("agent tx request buffer is full - skipping")
metricsList = append(metricsList, metrics.CreateAgentMetric(botConfig, metrics.MetricTxDrop, 1))
metricsList = append(metricsList, metrics.CreateAgentMetricV1(botConfig, domain.MetricTxDrop, 1))
}
lg.WithFields(log.Fields{
"bot": botConfig.ID,
Expand Down Expand Up @@ -164,7 +165,7 @@ func (rs *requestSender) SendEvaluateBlockRequest(req *protocol.EvaluateBlockReq
}:
default: // do not try to send if the buffer is full
lg.WithField("bot", botConfig.ID).Warn("agent block request buffer is full - skipping")
metricsList = append(metricsList, metrics.CreateAgentMetric(botConfig, metrics.MetricBlockDrop, 1))
metricsList = append(metricsList, metrics.CreateAgentMetricV1(botConfig, domain.MetricBlockDrop, 1))
}
lg.WithFields(
log.Fields{
Expand Down Expand Up @@ -249,7 +250,7 @@ func (rs *requestSender) SendEvaluateAlertRequest(req *protocol.EvaluateAlertReq
}:
default: // do not try to send if the buffer is full
lg.WithField("bot", botConfig.ID).Warn("agent alert request buffer is full - skipping")
metricsList = append(metricsList, metrics.CreateAgentMetric(botConfig, metrics.MetricCombinerDrop, 1))
metricsList = append(metricsList, metrics.CreateAgentMetricV1(botConfig, domain.MetricCombinerDrop, 1))
}

lg.WithFields(
Expand Down
4 changes: 1 addition & 3 deletions services/components/lifecycle/bot_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,7 @@ func (blm *botLifecycleManager) ExitInactiveBots(ctx context.Context) error {
botConfig, found := blm.findBotConfigByID(inactiveBotID)
logger := log.WithField("bot", inactiveBotID)
if !found {
logger.Warn("could not find the config for inactive bot - skipping stop")
// send this metric by ID, because it doesn't have a shard ID (since it's not found)
blm.lifecycleMetrics.StatusInactive(config.AgentConfig{ID: inactiveBotID})
logger.Warn("could not find the config for inactive bot - skipping stop", inactiveBotID)
continue
}

Expand Down
3 changes: 2 additions & 1 deletion services/components/lifecycle/bot_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package lifecycle
import (
"sync"

"github.com/forta-network/forta-core-go/domain"
"github.com/forta-network/forta-core-go/protocol"
"github.com/forta-network/forta-node/services/components/metrics"
)
Expand Down Expand Up @@ -49,7 +50,7 @@ func (bm *botMonitor) UpdateWithMetrics(botMetrics *protocol.AgentMetricList) er
}

for _, botMetric := range botMetrics.Metrics {
if botMetric.Name == metrics.MetricStatusActive {
if botMetric.Name == domain.MetricStatusActive {
bm.saveBotActivity(botMetric.AgentId)
}
}
Expand Down
4 changes: 2 additions & 2 deletions services/components/lifecycle/bot_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"testing"
"time"

"github.com/forta-network/forta-core-go/domain"
"github.com/forta-network/forta-core-go/protocol"
"github.com/forta-network/forta-node/services/components/metrics"
mock_metrics "github.com/forta-network/forta-node/services/components/metrics/mocks"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -51,7 +51,7 @@ func TestBotMonitor(t *testing.T) {
r.NoError(botMonitor.UpdateWithMetrics(&protocol.AgentMetricList{
Metrics: []*protocol.AgentMetric{
{
Name: metrics.MetricStatusActive,
Name: domain.MetricStatusActive,
AgentId: testTrackerBotID2,
},
},
Expand Down
5 changes: 3 additions & 2 deletions services/components/metrics/activity.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"github.com/forta-network/forta-core-go/domain"
"github.com/forta-network/forta-core-go/protocol"
"github.com/forta-network/forta-node/config"
)
Expand All @@ -10,9 +11,9 @@ func FindActiveBotsFromMetrics(allBotMetrics []*protocol.AgentMetrics) (found []
for _, botMetrics := range allBotMetrics {
botID := botMetrics.AgentId
for _, botMetric := range botMetrics.Metrics {
if botMetric.Name == MetricHealthCheckSuccess {
if botMetric.Name == domain.MetricHealthCheckSuccess {
// copy over shardID value so metric will indicate shard
cfg := &config.AgentConfig{ID: botID}
cfg := &config.AgentConfig{ID: botID, ChainID: int(botMetric.ChainId)}
if botMetric.ShardId >= 0 {
cfg.ShardConfig = &config.ShardConfig{ShardID: uint(botMetric.ShardId)}
}
Expand Down
Loading

0 comments on commit b067fd8

Please sign in to comment.