Skip to content

Commit

Permalink
Metrics draft
Browse files Browse the repository at this point in the history
  • Loading branch information
dkeysil committed Mar 21, 2024
1 parent 2f3b2a9 commit ed41b8c
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 62 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/cenkalti/backoff/v4 v4.1.3
github.com/docker/docker v1.6.2
github.com/docker/go-connections v0.4.0
github.com/forta-network/forta-core-go v0.0.0-20240315095056-123146d68ce3
github.com/forta-network/forta-core-go v0.0.0-20240315154515-c71e1f3bd5e2
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.39.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,8 @@ github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe
github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ=
github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/forta-network/forta-core-go v0.0.0-20240315095056-123146d68ce3 h1:XTYIVCvUamlyjdPlYkegbT+Fdvr0wmgt3g9Llu7BrKw=
github.com/forta-network/forta-core-go v0.0.0-20240315095056-123146d68ce3/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo=
github.com/forta-network/forta-core-go v0.0.0-20240315154515-c71e1f3bd5e2 h1:v+snSZVsMUPtPX6pI5oxULTWiAfJ1igeE0Iqilma7/Y=
github.com/forta-network/forta-core-go v0.0.0-20240315154515-c71e1f3bd5e2/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo=
github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707 h1:f6I7K43i2m6AwHSsDxh0Mf3qFzYt8BKnabSl/zGFmh0=
github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707/go.mod h1:nqTUF1REklpWLZ/M5HfzqhSHNz4dPVKzJvbLziqTZpw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
Expand Down
43 changes: 0 additions & 43 deletions services/json-rpc/cache/errors.go

This file was deleted.

119 changes: 103 additions & 16 deletions services/json-rpc/cache/json_rpc_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import (
"strconv"
"time"

"github.com/forta-network/forta-core-go/domain"
"github.com/forta-network/forta-core-go/protocol"
"github.com/forta-network/forta-core-go/utils"
"github.com/forta-network/forta-node/clients"
"github.com/forta-network/forta-node/clients/blocksdata"
"github.com/forta-network/forta-node/clients/messaging"
"github.com/forta-network/forta-node/config"
"github.com/forta-network/forta-node/services/components/registry"
log "github.com/sirupsen/logrus"
Expand All @@ -36,6 +39,7 @@ type JsonRpcCache struct {
cfg config.JsonRpcCacheConfig
botAuthenticator clients.IPAuthenticator
botRegistry registry.BotRegistry
msgClient clients.MessageClient

server *http.Server

Expand All @@ -55,6 +59,7 @@ func NewJsonRpcCache(ctx context.Context, cfg config.JsonRpcCacheConfig, botRegi
cfg: cfg,
botAuthenticator: botAuthenticator,
botRegistry: botRegistry,
msgClient: messaging.NewClient("json-rpc-cache", fmt.Sprintf("%s:%s", config.DockerNatsContainerName, config.DefaultNatsPort)),
}, nil
}

Expand Down Expand Up @@ -88,6 +93,8 @@ func (p *JsonRpcCache) Name() string {

func (c *JsonRpcCache) Handler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var err error
t := time.Now()
req, err := decodeBody(r)
if err != nil {
writeBadRequest(w, req, err)
Expand All @@ -100,6 +107,26 @@ func (c *JsonRpcCache) Handler() http.Handler {
return
}

defer func() {
if err != nil {
c.msgClient.PublishProto(
messaging.SubjectMetricAgent, &protocol.AgentMetricList{
Metrics: []*protocol.AgentMetric{
{
AgentId: agentConfig.ID,
Timestamp: time.Now().Format(time.RFC3339),
Name: domain.MetricJSONRPCCachePollError,
Value: 1,
Details: err.Error(),
ShardId: agentConfig.ShardID(),
ChainId: int64(agentConfig.ChainID),
},
},
},
)
}
}()

chainID, err := strconv.ParseInt(r.Header.Get(ChainIDHeader), 10, 64)
if err != nil {
writeBadRequest(w, req, fmt.Errorf("missing or invalid chain id header"))
Expand All @@ -110,19 +137,22 @@ func (c *JsonRpcCache) Handler() http.Handler {

result, ok := c.cache.Get(uint64(chainID), req.Method, string(req.Params))
if !ok {
resp := &jsonRpcResp{
ID: req.ID,
JsonRPC: "2.0",
Result: nil,
Error: &jsonRpcError{
Code: -32603,
Message: "result not found in cache",
c.msgClient.PublishProto(
messaging.SubjectMetricAgent, &protocol.AgentMetricList{
Metrics: []*protocol.AgentMetric{
{
AgentId: agentConfig.ID,
Timestamp: time.Now().Format(time.RFC3339),
Name: fmt.Sprintf("%s.%s", domain.MetricJSONRPCCacheMiss, req.Method),
Value: 1,
Details: cacheKey(uint64(chainID), req.Method, string(req.Params)),
ShardId: agentConfig.ShardID(),
ChainId: int64(agentConfig.ChainID),
},
},
},
}

if err := json.NewEncoder(w).Encode(resp); err != nil {
log.WithError(err).Error("failed to write jsonrpc response body")
}
)
writeNotFound(w, req)
return
}

Expand All @@ -138,8 +168,32 @@ func (c *JsonRpcCache) Handler() http.Handler {
Result: json.RawMessage(b),
}

if err := json.NewEncoder(w).Encode(resp); err != nil {
if err = json.NewEncoder(w).Encode(resp); err != nil {
log.WithError(err).Error("failed to write jsonrpc response body")
} else {
c.msgClient.PublishProto(
messaging.SubjectMetricAgent, &protocol.AgentMetricList{
Metrics: []*protocol.AgentMetric{
{
AgentId: agentConfig.ID,
Timestamp: time.Now().Format(time.RFC3339),
Name: fmt.Sprintf("%s.%s", domain.MetricJSONRPCCacheHit, req.Method),
Value: 1,
ShardId: agentConfig.ShardID(),
ChainId: chainID,
},
{
AgentId: agentConfig.ID,
Timestamp: time.Now().Format(time.RFC3339),
Name: fmt.Sprintf("%s.%s", domain.MetricJSONRPCCacheLatency, req.Method),
Value: float64(time.Since(t).Milliseconds()),
Details: cacheKey(uint64(chainID), req.Method, string(req.Params)),
ShardId: agentConfig.ShardID(),
ChainId: chainID,
},
},
},
)
}
})
}
Expand All @@ -162,14 +216,47 @@ func (c *JsonRpcCache) pollBlocksData() {

// blocksDataClient internally retries on failure and to not block on the retry, we run it in a goroutine
go func() {
events, err := c.blocksDataClient.GetBlocksData(bucket)
blocksData, err := c.blocksDataClient.GetBlocksData(bucket)
if err != nil {
c.msgClient.PublishProto(
messaging.SubjectMetricAgent, &protocol.AgentMetricList{
Metrics: []*protocol.AgentMetric{
{
AgentId: "system",
Timestamp: time.Now().Format(time.RFC3339),
Name: domain.MetricJSONRPCCachePollError,
Value: 1,
Details: err.Error(),
},
},
},
)
log.WithError(err).Errorf("Failed to get BlocksData from dispatcher. bucket: %d", bucket)
return
}

log.Infof("Added BlocksData to local cache. bucket: %d events: %d", bucket, len(events.Blocks))
c.cache.Append(events)
c.msgClient.PublishProto(
messaging.SubjectMetricAgent, &protocol.AgentMetricList{
Metrics: []*protocol.AgentMetric{
{
AgentId: "system",
Timestamp: time.Now().Format(time.RFC3339),
Name: domain.MetricJSONRPCCachePollSuccess,
Value: float64(len(blocksData.Blocks)),
Details: fmt.Sprintf("%d", bucket),
},
{
AgentId: "system",
Timestamp: time.Now().Format(time.RFC3339),
Name: domain.MetricJSONRPCCacheSize,
Value: float64(c.cache.cache.ItemCount()),
},
},
},
)

log.Infof("Added BlocksData to local cache. bucket: %d blocksData: %d", bucket, len(blocksData.Blocks))
c.cache.Append(blocksData)
}()

}
Expand Down
3 changes: 3 additions & 0 deletions services/json-rpc/cache/json_rpc_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ func TestJsonRpcCache(t *testing.T) {
blocksDataClient := mock_clients.NewMockBlocksDataClient(ctrl)
authenticator := mock_clients.NewMockIPAuthenticator(ctrl)
botRegistry := mock_registry.NewMockBotRegistry(ctrl)
msgClient := mock_clients.NewMockMessageClient(ctrl)

botRegistry.EXPECT().LoadAssignedBots().Return([]config.AgentConfig{*testBotCfg}, nil).AnyTimes()
msgClient.EXPECT().PublishProto(gomock.Any(), gomock.Any()).AnyTimes()

count := 0
appended := make(chan struct{})
Expand All @@ -50,6 +52,7 @@ func TestJsonRpcCache(t *testing.T) {
},
cache: NewCache(300 * time.Second),
botRegistry: botRegistry,
msgClient: msgClient,
}

go jrpCache.pollBlocksData()
Expand Down
53 changes: 53 additions & 0 deletions services/json-rpc/cache/jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"
"net/http"

log "github.com/sirupsen/logrus"
)

type jsonRpcReq struct {
Expand Down Expand Up @@ -37,3 +39,54 @@ func decodeBody(req *http.Request) (*jsonRpcReq, error) {
}
return &decodedBody, nil
}

func writeBadRequest(w http.ResponseWriter, req *jsonRpcReq, err error) {
if req == nil {
http.Error(w, "bad request", http.StatusBadRequest)
return
}

w.WriteHeader(http.StatusBadRequest)

if err := json.NewEncoder(w).Encode(&errorResponse{
JSONRPC: "2.0",
ID: req.ID,
Error: jsonRpcError{
Code: -32600,
Message: err.Error(),
},
}); err != nil {
log.WithError(err).Error("failed to write jsonrpc error response body")
}
}

func writeUnauthorized(w http.ResponseWriter, req *jsonRpcReq) {
w.WriteHeader(http.StatusUnauthorized)

if err := json.NewEncoder(w).Encode(&errorResponse{
JSONRPC: "2.0",
ID: req.ID,
Error: jsonRpcError{
Code: -32000,
Message: "unauthorized",
},
}); err != nil {
log.WithError(err).Error("failed to write jsonrpc error response body")
}
}

func writeNotFound(w http.ResponseWriter, req *jsonRpcReq) {
w.WriteHeader(http.StatusOK)

if err := json.NewEncoder(w).Encode(&jsonRpcResp{
ID: req.ID,
JsonRPC: "2.0",
Result: nil,
Error: &jsonRpcError{
Code: -32603,
Message: "result not found in cache",
},
}); err != nil {
log.WithError(err).Error("failed to write jsonrpc error response body")
}
}

0 comments on commit ed41b8c

Please sign in to comment.