From ed41b8c5a36083677645d09c3e0eaf4954bf44d9 Mon Sep 17 00:00:00 2001 From: dkeysil Date: Fri, 15 Mar 2024 16:50:46 +0100 Subject: [PATCH] Metrics draft --- go.mod | 2 +- go.sum | 4 +- services/json-rpc/cache/errors.go | 43 ------- services/json-rpc/cache/json_rpc_cache.go | 119 +++++++++++++++--- .../json-rpc/cache/json_rpc_cache_test.go | 3 + services/json-rpc/cache/jsonrpc.go | 53 ++++++++ 6 files changed, 162 insertions(+), 62 deletions(-) delete mode 100644 services/json-rpc/cache/errors.go diff --git a/go.mod b/go.mod index a596cc7c..fd31eba8 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/cenkalti/backoff/v4 v4.1.3 github.com/docker/docker v1.6.2 github.com/docker/go-connections v0.4.0 - github.com/forta-network/forta-core-go v0.0.0-20240315095056-123146d68ce3 + github.com/forta-network/forta-core-go v0.0.0-20240315154515-c71e1f3bd5e2 github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_model v0.3.0 github.com/prometheus/common v0.39.0 diff --git a/go.sum b/go.sum index e9ae9d76..02c846d2 100644 --- a/go.sum +++ b/go.sum @@ -331,8 +331,8 @@ github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= -github.com/forta-network/forta-core-go v0.0.0-20240315095056-123146d68ce3 h1:XTYIVCvUamlyjdPlYkegbT+Fdvr0wmgt3g9Llu7BrKw= -github.com/forta-network/forta-core-go v0.0.0-20240315095056-123146d68ce3/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo= +github.com/forta-network/forta-core-go v0.0.0-20240315154515-c71e1f3bd5e2 h1:v+snSZVsMUPtPX6pI5oxULTWiAfJ1igeE0Iqilma7/Y= +github.com/forta-network/forta-core-go v0.0.0-20240315154515-c71e1f3bd5e2/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo= github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707 h1:f6I7K43i2m6AwHSsDxh0Mf3qFzYt8BKnabSl/zGFmh0= github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707/go.mod h1:nqTUF1REklpWLZ/M5HfzqhSHNz4dPVKzJvbLziqTZpw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= diff --git a/services/json-rpc/cache/errors.go b/services/json-rpc/cache/errors.go deleted file mode 100644 index 558e4612..00000000 --- a/services/json-rpc/cache/errors.go +++ /dev/null @@ -1,43 +0,0 @@ -package json_rpc_cache - -import ( - "encoding/json" - "net/http" - - log "github.com/sirupsen/logrus" -) - -func writeBadRequest(w http.ResponseWriter, req *jsonRpcReq, err error) { - if req == nil { - http.Error(w, "bad request", http.StatusBadRequest) - return - } - - w.WriteHeader(http.StatusBadRequest) - - if err := json.NewEncoder(w).Encode(&errorResponse{ - JSONRPC: "2.0", - ID: req.ID, - Error: jsonRpcError{ - Code: -32600, - Message: err.Error(), - }, - }); err != nil { - log.WithError(err).Error("failed to write jsonrpc error response body") - } -} - -func writeUnauthorized(w http.ResponseWriter, req *jsonRpcReq) { - w.WriteHeader(http.StatusUnauthorized) - - if err := json.NewEncoder(w).Encode(&errorResponse{ - JSONRPC: "2.0", - ID: req.ID, - Error: jsonRpcError{ - Code: -32000, - Message: "unauthorized", - }, - }); err != nil { - log.WithError(err).Error("failed to write jsonrpc error response body") - } -} diff --git a/services/json-rpc/cache/json_rpc_cache.go b/services/json-rpc/cache/json_rpc_cache.go index fad53fdc..d1a7b9f1 100644 --- a/services/json-rpc/cache/json_rpc_cache.go +++ b/services/json-rpc/cache/json_rpc_cache.go @@ -8,9 +8,12 @@ import ( "strconv" "time" + "github.com/forta-network/forta-core-go/domain" + "github.com/forta-network/forta-core-go/protocol" "github.com/forta-network/forta-core-go/utils" "github.com/forta-network/forta-node/clients" "github.com/forta-network/forta-node/clients/blocksdata" + "github.com/forta-network/forta-node/clients/messaging" "github.com/forta-network/forta-node/config" "github.com/forta-network/forta-node/services/components/registry" log "github.com/sirupsen/logrus" @@ -36,6 +39,7 @@ type JsonRpcCache struct { cfg config.JsonRpcCacheConfig botAuthenticator clients.IPAuthenticator botRegistry registry.BotRegistry + msgClient clients.MessageClient server *http.Server @@ -55,6 +59,7 @@ func NewJsonRpcCache(ctx context.Context, cfg config.JsonRpcCacheConfig, botRegi cfg: cfg, botAuthenticator: botAuthenticator, botRegistry: botRegistry, + msgClient: messaging.NewClient("json-rpc-cache", fmt.Sprintf("%s:%s", config.DockerNatsContainerName, config.DefaultNatsPort)), }, nil } @@ -88,6 +93,8 @@ func (p *JsonRpcCache) Name() string { func (c *JsonRpcCache) Handler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var err error + t := time.Now() req, err := decodeBody(r) if err != nil { writeBadRequest(w, req, err) @@ -100,6 +107,26 @@ func (c *JsonRpcCache) Handler() http.Handler { return } + defer func() { + if err != nil { + c.msgClient.PublishProto( + messaging.SubjectMetricAgent, &protocol.AgentMetricList{ + Metrics: []*protocol.AgentMetric{ + { + AgentId: agentConfig.ID, + Timestamp: time.Now().Format(time.RFC3339), + Name: domain.MetricJSONRPCCachePollError, + Value: 1, + Details: err.Error(), + ShardId: agentConfig.ShardID(), + ChainId: int64(agentConfig.ChainID), + }, + }, + }, + ) + } + }() + chainID, err := strconv.ParseInt(r.Header.Get(ChainIDHeader), 10, 64) if err != nil { writeBadRequest(w, req, fmt.Errorf("missing or invalid chain id header")) @@ -110,19 +137,22 @@ func (c *JsonRpcCache) Handler() http.Handler { result, ok := c.cache.Get(uint64(chainID), req.Method, string(req.Params)) if !ok { - resp := &jsonRpcResp{ - ID: req.ID, - JsonRPC: "2.0", - Result: nil, - Error: &jsonRpcError{ - Code: -32603, - Message: "result not found in cache", + c.msgClient.PublishProto( + messaging.SubjectMetricAgent, &protocol.AgentMetricList{ + Metrics: []*protocol.AgentMetric{ + { + AgentId: agentConfig.ID, + Timestamp: time.Now().Format(time.RFC3339), + Name: fmt.Sprintf("%s.%s", domain.MetricJSONRPCCacheMiss, req.Method), + Value: 1, + Details: cacheKey(uint64(chainID), req.Method, string(req.Params)), + ShardId: agentConfig.ShardID(), + ChainId: int64(agentConfig.ChainID), + }, + }, }, - } - - if err := json.NewEncoder(w).Encode(resp); err != nil { - log.WithError(err).Error("failed to write jsonrpc response body") - } + ) + writeNotFound(w, req) return } @@ -138,8 +168,32 @@ func (c *JsonRpcCache) Handler() http.Handler { Result: json.RawMessage(b), } - if err := json.NewEncoder(w).Encode(resp); err != nil { + if err = json.NewEncoder(w).Encode(resp); err != nil { log.WithError(err).Error("failed to write jsonrpc response body") + } else { + c.msgClient.PublishProto( + messaging.SubjectMetricAgent, &protocol.AgentMetricList{ + Metrics: []*protocol.AgentMetric{ + { + AgentId: agentConfig.ID, + Timestamp: time.Now().Format(time.RFC3339), + Name: fmt.Sprintf("%s.%s", domain.MetricJSONRPCCacheHit, req.Method), + Value: 1, + ShardId: agentConfig.ShardID(), + ChainId: chainID, + }, + { + AgentId: agentConfig.ID, + Timestamp: time.Now().Format(time.RFC3339), + Name: fmt.Sprintf("%s.%s", domain.MetricJSONRPCCacheLatency, req.Method), + Value: float64(time.Since(t).Milliseconds()), + Details: cacheKey(uint64(chainID), req.Method, string(req.Params)), + ShardId: agentConfig.ShardID(), + ChainId: chainID, + }, + }, + }, + ) } }) } @@ -162,14 +216,47 @@ func (c *JsonRpcCache) pollBlocksData() { // blocksDataClient internally retries on failure and to not block on the retry, we run it in a goroutine go func() { - events, err := c.blocksDataClient.GetBlocksData(bucket) + blocksData, err := c.blocksDataClient.GetBlocksData(bucket) if err != nil { + c.msgClient.PublishProto( + messaging.SubjectMetricAgent, &protocol.AgentMetricList{ + Metrics: []*protocol.AgentMetric{ + { + AgentId: "system", + Timestamp: time.Now().Format(time.RFC3339), + Name: domain.MetricJSONRPCCachePollError, + Value: 1, + Details: err.Error(), + }, + }, + }, + ) log.WithError(err).Errorf("Failed to get BlocksData from dispatcher. bucket: %d", bucket) return } - log.Infof("Added BlocksData to local cache. bucket: %d events: %d", bucket, len(events.Blocks)) - c.cache.Append(events) + c.msgClient.PublishProto( + messaging.SubjectMetricAgent, &protocol.AgentMetricList{ + Metrics: []*protocol.AgentMetric{ + { + AgentId: "system", + Timestamp: time.Now().Format(time.RFC3339), + Name: domain.MetricJSONRPCCachePollSuccess, + Value: float64(len(blocksData.Blocks)), + Details: fmt.Sprintf("%d", bucket), + }, + { + AgentId: "system", + Timestamp: time.Now().Format(time.RFC3339), + Name: domain.MetricJSONRPCCacheSize, + Value: float64(c.cache.cache.ItemCount()), + }, + }, + }, + ) + + log.Infof("Added BlocksData to local cache. bucket: %d blocksData: %d", bucket, len(blocksData.Blocks)) + c.cache.Append(blocksData) }() } diff --git a/services/json-rpc/cache/json_rpc_cache_test.go b/services/json-rpc/cache/json_rpc_cache_test.go index ca4063c0..ef162cc9 100644 --- a/services/json-rpc/cache/json_rpc_cache_test.go +++ b/services/json-rpc/cache/json_rpc_cache_test.go @@ -25,8 +25,10 @@ func TestJsonRpcCache(t *testing.T) { blocksDataClient := mock_clients.NewMockBlocksDataClient(ctrl) authenticator := mock_clients.NewMockIPAuthenticator(ctrl) botRegistry := mock_registry.NewMockBotRegistry(ctrl) + msgClient := mock_clients.NewMockMessageClient(ctrl) botRegistry.EXPECT().LoadAssignedBots().Return([]config.AgentConfig{*testBotCfg}, nil).AnyTimes() + msgClient.EXPECT().PublishProto(gomock.Any(), gomock.Any()).AnyTimes() count := 0 appended := make(chan struct{}) @@ -50,6 +52,7 @@ func TestJsonRpcCache(t *testing.T) { }, cache: NewCache(300 * time.Second), botRegistry: botRegistry, + msgClient: msgClient, } go jrpCache.pollBlocksData() diff --git a/services/json-rpc/cache/jsonrpc.go b/services/json-rpc/cache/jsonrpc.go index bf03485d..c9d2aed1 100644 --- a/services/json-rpc/cache/jsonrpc.go +++ b/services/json-rpc/cache/jsonrpc.go @@ -4,6 +4,8 @@ import ( "encoding/json" "fmt" "net/http" + + log "github.com/sirupsen/logrus" ) type jsonRpcReq struct { @@ -37,3 +39,54 @@ func decodeBody(req *http.Request) (*jsonRpcReq, error) { } return &decodedBody, nil } + +func writeBadRequest(w http.ResponseWriter, req *jsonRpcReq, err error) { + if req == nil { + http.Error(w, "bad request", http.StatusBadRequest) + return + } + + w.WriteHeader(http.StatusBadRequest) + + if err := json.NewEncoder(w).Encode(&errorResponse{ + JSONRPC: "2.0", + ID: req.ID, + Error: jsonRpcError{ + Code: -32600, + Message: err.Error(), + }, + }); err != nil { + log.WithError(err).Error("failed to write jsonrpc error response body") + } +} + +func writeUnauthorized(w http.ResponseWriter, req *jsonRpcReq) { + w.WriteHeader(http.StatusUnauthorized) + + if err := json.NewEncoder(w).Encode(&errorResponse{ + JSONRPC: "2.0", + ID: req.ID, + Error: jsonRpcError{ + Code: -32000, + Message: "unauthorized", + }, + }); err != nil { + log.WithError(err).Error("failed to write jsonrpc error response body") + } +} + +func writeNotFound(w http.ResponseWriter, req *jsonRpcReq) { + w.WriteHeader(http.StatusOK) + + if err := json.NewEncoder(w).Encode(&jsonRpcResp{ + ID: req.ID, + JsonRPC: "2.0", + Result: nil, + Error: &jsonRpcError{ + Code: -32603, + Message: "result not found in cache", + }, + }); err != nil { + log.WithError(err).Error("failed to write jsonrpc error response body") + } +}