From 2c04acfe8ab966eb39f22a7f4b6e3e5458cf0aca Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Wed, 20 Nov 2024 17:30:10 +0700 Subject: [PATCH] Improve cache observability --- app/proxy/proxy.go | 2 +- app/query/cache.go | 32 ++++++++++++---------- app/query/metrics.go | 50 ++++++++++++++++++++++++++++++++++ internal/metrics/metrics.go | 35 ++++++------------------ internal/status/status.go | 6 ++-- pkg/sturdycache/sturdycache.go | 4 +-- 6 files changed, 82 insertions(+), 47 deletions(-) create mode 100644 app/query/metrics.go diff --git a/app/proxy/proxy.go b/app/proxy/proxy.go index b583513f..83d40d1f 100644 --- a/app/proxy/proxy.go +++ b/app/proxy/proxy.go @@ -149,7 +149,7 @@ func Handle(w http.ResponseWriter, r *http.Request) { } monitor.ErrorToSentry(err, map[string]string{"request": fmt.Sprintf("%+v", rpcReq), "response": fmt.Sprintf("%+v", rpcRes)}) observeFailure(metrics.GetDuration(r), rpcReq.Method, metrics.FailureKindNet) - logger.Log().Errorf("error calling lbrynet: %v, request: %+v", err, rpcReq) + logger.Log().Errorf("error calling sdk method %s: %s", rpcReq.Method, err) return } diff --git a/app/query/cache.go b/app/query/cache.go index 099d6932..ff569595 100644 --- a/app/query/cache.go +++ b/app/query/cache.go @@ -8,7 +8,6 @@ import ( "fmt" "time" - "github.com/OdyseeTeam/odysee-api/internal/metrics" "github.com/OdyseeTeam/odysee-api/internal/monitor" "github.com/OdyseeTeam/odysee-api/pkg/rpcerrors" @@ -35,9 +34,9 @@ type QueryCache struct { } func NewQueryCache(store cache.CacheInterface[any]) *QueryCache { - m := marshaler.New(store) + marshal := marshaler.New(store) return &QueryCache{ - cache: m, + cache: marshal, singleflight: &singleflight.Group{}, } } @@ -49,31 +48,34 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached Params: query.Params(), } - ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond) 
defer cancel() + start := time.Now() + hit, err := c.cache.Get(ctx, cacheReq, &CachedResponse{}) if err != nil { if !errors.Is(err, &store.NotFound{}) { - metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc() + ObserveQueryCacheOperation(CacheOperationGet, CacheResultError, cacheReq.Method, start) return nil, fmt.Errorf("error during cache.get: %w", err) } - metrics.SturdyQueryCacheMissCount.WithLabelValues(cacheReq.Method).Inc() + ObserveQueryCacheOperation(CacheOperationGet, CacheResultMiss, cacheReq.Method, start) if getter == nil { log.Warnf("nil getter provided for %s", cacheReq.Method) - metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc() return nil, nil } + log.Infof("cache miss for %s, key=%s, duration=%.2fs", cacheReq.Method, cacheReq.GetCacheKey(), time.Since(start).Seconds()) // Cold object retrieval after cache miss - log.Infof("cold object retrieval for %s [%s]", cacheReq.Method, cacheReq.GetCacheKey()) + start := time.Now() obj, err, _ := c.singleflight.Do(cacheReq.GetCacheKey(), getter) if err != nil { - metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc() + ObserveQueryRetrievalDuration(CacheResultError, cacheReq.Method, start) return nil, fmt.Errorf("error calling getter: %w", err) } + ObserveQueryRetrievalDuration(CacheResultSuccess, cacheReq.Method, start) res, ok := obj.(*jsonrpc.RPCResponse) if !ok { @@ -84,27 +86,29 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached if res.Error != nil { log.Debugf("rpc error received (%s), not caching", cacheReq.Method) } else { + start := time.Now() err = c.cache.Set( ctx, cacheReq, cacheResp, store.WithExpiration(cacheReq.Expiration()), store.WithTags(cacheReq.Tags()), ) if err != nil { - metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc() + ObserveQueryCacheOperation(CacheOperationSet, CacheResultError, cacheReq.Method, start) monitor.ErrorToSentry(fmt.Errorf("error during cache.set: %w", 
err), map[string]string{"method": cacheReq.Method}) - return cacheResp, fmt.Errorf("error during cache.set: %w", err) + log.Warnf("error during cache.set: %s", err) + return cacheResp, nil } + ObserveQueryCacheOperation(CacheOperationSet, CacheResultSuccess, cacheReq.Method, start) } return cacheResp, nil } - log.Debugf("cache hit for %s [%s]", cacheReq.Method, cacheReq.GetCacheKey()) + log.Infof("cache hit for %s, key=%s, duration=%.2fs", cacheReq.Method, cacheReq.GetCacheKey(), time.Since(start).Seconds()) + ObserveQueryCacheOperation(CacheOperationGet, CacheResultHit, cacheReq.Method, start) cacheResp, ok := hit.(*CachedResponse) if !ok { - metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc() return nil, errors.New("unknown cache object retrieved") } - metrics.SturdyQueryCacheHitCount.WithLabelValues(cacheReq.Method).Inc() return cacheResp, nil } diff --git a/app/query/metrics.go b/app/query/metrics.go new file mode 100644 index 00000000..659c8292 --- /dev/null +++ b/app/query/metrics.go @@ -0,0 +1,50 @@ +package query + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + CacheOperationGet = "get" + CacheOperationSet = "set" + + CacheResultHit = "hit" + CacheResultMiss = "miss" + CacheResultSuccess = "success" + CacheResultError = "error" +) + +var ( + queryRetrievalDurationBuckets = []float64{0.025, 0.05, 0.1, 0.25, 0.4, 1, 2.5, 5, 10, 25, 50, 100, 300} + cacheDurationBuckets = []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0} + + QueryCacheOperationDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "query_cache", + Name: "operation_duration_seconds", + Help: "Cache operation latency", + Buckets: cacheDurationBuckets, + }, + []string{"operation", "result", "method"}, + ) + QueryRetrievalDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "query_cache", + Name: 
"retrieval_duration_seconds", + Help: "Latency for cold cache retrieval", + Buckets: queryRetrievalDurationBuckets, + }, + []string{"result", "method"}, + ) +) + +func ObserveQueryCacheOperation(operation, result, method string, start time.Time) { + QueryCacheOperationDuration.WithLabelValues(operation, result, method).Observe(float64(time.Since(start).Seconds())) +} + +func ObserveQueryRetrievalDuration(result, method string, start time.Time) { + QueryRetrievalDuration.WithLabelValues(result, method).Observe(float64(time.Since(start).Seconds())) +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index bd49e7e3..6c7596ae 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -47,7 +47,7 @@ const ( ) var ( - callsSecondsBuckets = []float64{0.005, 0.025, 0.05, 0.1, 0.25, 0.4, 1, 2, 5, 10, 20, 60, 120, 300} + callDurationBuckets = []float64{0.005, 0.025, 0.05, 0.1, 0.25, 0.4, 1, 2, 5, 10, 20, 60, 120, 300} IAPIAuthSuccessDurations = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: nsIAPI, @@ -85,7 +85,7 @@ var ( Subsystem: "e2e_calls", Name: "total_seconds", Help: "End-to-end method call latency distributions", - Buckets: callsSecondsBuckets, + Buckets: callDurationBuckets, }, []string{"method"}, ) @@ -95,7 +95,7 @@ var ( Subsystem: "e2e_calls", Name: "failed_seconds", Help: "Failed end-to-end method call latency distributions", - Buckets: callsSecondsBuckets, + Buckets: callDurationBuckets, }, []string{"method", "kind"}, ) @@ -124,7 +124,7 @@ var ( Subsystem: "calls", Name: "total_seconds", Help: "Method call latency distributions", - Buckets: callsSecondsBuckets, + Buckets: callDurationBuckets, }, []string{"method", "endpoint", "origin"}, ) @@ -134,7 +134,7 @@ var ( Subsystem: "calls", Name: "failed_seconds", Help: "Failed method call latency distributions", - Buckets: callsSecondsBuckets, + Buckets: callDurationBuckets, }, []string{"method", "endpoint", "origin", "kind"}, ) @@ -176,25 +176,6 @@ var ( Help: 
"Total number of errors retrieving queries from the local cache", }, []string{"method"}) - SturdyQueryCacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: nsSturdyCache, - Subsystem: "query", - Name: "hit_count", - Help: "Total number of queries found in sturdycache", - }, []string{"method"}) - SturdyQueryCacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: nsSturdyCache, - Subsystem: "query", - Name: "miss_count", - Help: "Total number of queries that were not in sturdycache", - }, []string{"method"}) - SturdyQueryCacheErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: nsSturdyCache, - Subsystem: "query", - Name: "error_count", - Help: "Total number of errors retrieving queries from sturdycache", - }, []string{"method"}) - LbrynetWalletsLoaded = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: nsLbrynet, Subsystem: "wallets", @@ -222,7 +203,7 @@ var ( Subsystem: "calls", Name: "total_seconds", Help: "How long do calls to lbrytv take (end-to-end)", - Buckets: callsSecondsBuckets, + Buckets: callDurationBuckets, }, []string{"path"}, ) @@ -278,7 +259,7 @@ var ( Subsystem: "calls", Name: "total_seconds", Help: "Method call latency distributions", - Buckets: callsSecondsBuckets, + Buckets: callDurationBuckets, }, []string{"method", "endpoint", "group"}, ) @@ -288,7 +269,7 @@ var ( Subsystem: "calls", Name: "failed_seconds", Help: "Failed method call latency distributions", - Buckets: callsSecondsBuckets, + Buckets: callDurationBuckets, }, []string{"method", "endpoint", "group", "kind"}, ) diff --git a/internal/status/status.go b/internal/status/status.go index 950bdb78..514ef254 100644 --- a/internal/status/status.go +++ b/internal/status/status.go @@ -110,18 +110,18 @@ func StatusV2(w http.ResponseWriter, r *http.Request) { c.Cache = query.CacheFromRequest(r) } - rpcRes, err := c.Call(r.Context(), jsonrpc.NewRequest("resolve", map[string]interface{}{"urls": resolveURL})) + rpcRes, err := 
c.Call(r.Context(), jsonrpc.NewRequest(query.MethodResolve, map[string]interface{}{"urls": resolveURL})) if err != nil { srv.Error = err.Error() srv.Status = statusOffline failureDetected = true - logger.Log().Error("we're failing: ", err) + logger.Log().Errorf("status call resolve is failing: %s", err) } else if rpcRes.Error != nil { srv.Error = rpcRes.Error.Message srv.Status = statusNotReady failureDetected = true - logger.Log().Error("we're failing: ", err) + logger.Log().Errorf("status call resolve is failing: %s", rpcRes.Error.Message) } else { if user != nil { response.User = &userData{ diff --git a/pkg/sturdycache/sturdycache.go b/pkg/sturdycache/sturdycache.go index 0f73e104..8c755ecb 100644 --- a/pkg/sturdycache/sturdycache.go +++ b/pkg/sturdycache/sturdycache.go @@ -25,8 +25,8 @@ func NewReplicatedCache( Addr: masterAddr, Password: password, DB: 0, - PoolSize: 10, - MinIdleConns: 5, + PoolSize: 200, + MinIdleConns: 10, }) masterStore := redis_store.NewRedis(masterClient)