Skip to content

Commit

Permalink
Improve cache observability
Browse files Browse the repository at this point in the history
  • Loading branch information
anbsky committed Nov 20, 2024
1 parent 83d72ef commit 2c04acf
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 47 deletions.
2 changes: 1 addition & 1 deletion app/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func Handle(w http.ResponseWriter, r *http.Request) {
}
monitor.ErrorToSentry(err, map[string]string{"request": fmt.Sprintf("%+v", rpcReq), "response": fmt.Sprintf("%+v", rpcRes)})
observeFailure(metrics.GetDuration(r), rpcReq.Method, metrics.FailureKindNet)
logger.Log().Errorf("error calling lbrynet: %v, request: %+v", err, rpcReq)
logger.Log().Errorf("error calling sdk method %s: %s", rpcReq.Method, err)
return
}

Expand Down
32 changes: 18 additions & 14 deletions app/query/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"time"

"github.com/OdyseeTeam/odysee-api/internal/metrics"
"github.com/OdyseeTeam/odysee-api/internal/monitor"
"github.com/OdyseeTeam/odysee-api/pkg/rpcerrors"

Expand All @@ -35,9 +34,9 @@ type QueryCache struct {
}

func NewQueryCache(store cache.CacheInterface[any]) *QueryCache {
m := marshaler.New(store)
marshal := marshaler.New(store)
return &QueryCache{
cache: m,
cache: marshal,
singleflight: &singleflight.Group{},
}
}
Expand All @@ -49,31 +48,34 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached
Params: query.Params(),
}

ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
defer cancel()

start := time.Now()

hit, err := c.cache.Get(ctx, cacheReq, &CachedResponse{})
if err != nil {
if !errors.Is(err, &store.NotFound{}) {
metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc()
ObserveQueryCacheOperation(CacheOperationGet, CacheResultError, cacheReq.Method, start)
return nil, fmt.Errorf("error during cache.get: %w", err)
}

metrics.SturdyQueryCacheMissCount.WithLabelValues(cacheReq.Method).Inc()
ObserveQueryCacheOperation(CacheOperationGet, CacheResultMiss, cacheReq.Method, start)

if getter == nil {
log.Warnf("nil getter provided for %s", cacheReq.Method)
metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc()
return nil, nil
}

log.Infof("cache miss for %s, key=%s, duration=%.2fs", cacheReq.Method, cacheReq.GetCacheKey(), time.Since(start).Seconds())
// Cold object retrieval after cache miss
log.Infof("cold object retrieval for %s [%s]", cacheReq.Method, cacheReq.GetCacheKey())
start := time.Now()
obj, err, _ := c.singleflight.Do(cacheReq.GetCacheKey(), getter)
if err != nil {
metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc()
ObserveQueryRetrievalDuration(CacheResultError, cacheReq.Method, start)
return nil, fmt.Errorf("error calling getter: %w", err)
}
ObserveQueryRetrievalDuration(CacheResultSuccess, cacheReq.Method, start)

res, ok := obj.(*jsonrpc.RPCResponse)
if !ok {
Expand All @@ -84,27 +86,29 @@ func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*Cached
if res.Error != nil {
log.Debugf("rpc error received (%s), not caching", cacheReq.Method)
} else {
start := time.Now()
err = c.cache.Set(
ctx, cacheReq, cacheResp,
store.WithExpiration(cacheReq.Expiration()),
store.WithTags(cacheReq.Tags()),
)
if err != nil {
metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc()
ObserveQueryCacheOperation(CacheOperationSet, CacheResultError, cacheReq.Method, start)
monitor.ErrorToSentry(fmt.Errorf("error during cache.set: %w", err), map[string]string{"method": cacheReq.Method})
return cacheResp, fmt.Errorf("error during cache.set: %w", err)
log.Warnf("error during cache.set: %s", err)
return cacheResp, nil
}
ObserveQueryCacheOperation(CacheOperationSet, CacheResultSuccess, cacheReq.Method, start)
}

return cacheResp, nil
}
log.Debugf("cache hit for %s [%s]", cacheReq.Method, cacheReq.GetCacheKey())
log.Infof("cache hit for %s, key=%s, duration=%.2fs", cacheReq.Method, cacheReq.GetCacheKey(), time.Since(start).Seconds())
ObserveQueryCacheOperation(CacheOperationGet, CacheResultHit, cacheReq.Method, start)
cacheResp, ok := hit.(*CachedResponse)
if !ok {
metrics.SturdyQueryCacheErrorCount.WithLabelValues(cacheReq.Method).Inc()
return nil, errors.New("unknown cache object retrieved")
}
metrics.SturdyQueryCacheHitCount.WithLabelValues(cacheReq.Method).Inc()
return cacheResp, nil
}

Expand Down
50 changes: 50 additions & 0 deletions app/query/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package query

import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
CacheOperationGet = "get"
CacheOperationSet = "set"

CacheResultHit = "hit"
CacheResultMiss = "miss"
CacheResultSuccess = "success"
CacheResultError = "error"
)

var (
queryRetrievalDurationBuckets = []float64{0.025, 0.05, 0.1, 0.25, 0.4, 1, 2.5, 5, 10, 25, 50, 100, 300}
cacheDurationBuckets = []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0}

QueryCacheOperationDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "query_cache",
Name: "operation_duration_seconds",
Help: "Cache operation latency",
Buckets: cacheDurationBuckets,
},
[]string{"operation", "result", "method"},
)
QueryRetrievalDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "query_cache",
Name: "retrieval_duration_seconds",
Help: "Latency for cold cache retrieval",
Buckets: queryRetrievalDurationBuckets,
},
[]string{"result", "method"},
)
)

func ObserveQueryCacheOperation(operation, result, method string, start time.Time) {
QueryCacheOperationDuration.WithLabelValues(operation, result, method).Observe(float64(time.Since(start).Seconds()))
}

func ObserveQueryRetrievalDuration(result, method string, start time.Time) {
QueryRetrievalDuration.WithLabelValues(result, method).Observe(float64(time.Since(start).Seconds()))
}
35 changes: 8 additions & 27 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
)

var (
callsSecondsBuckets = []float64{0.005, 0.025, 0.05, 0.1, 0.25, 0.4, 1, 2, 5, 10, 20, 60, 120, 300}
callDurationBuckets = []float64{0.005, 0.025, 0.05, 0.1, 0.25, 0.4, 1, 2, 5, 10, 20, 60, 120, 300}

IAPIAuthSuccessDurations = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: nsIAPI,
Expand Down Expand Up @@ -85,7 +85,7 @@ var (
Subsystem: "e2e_calls",
Name: "total_seconds",
Help: "End-to-end method call latency distributions",
Buckets: callsSecondsBuckets,
Buckets: callDurationBuckets,
},
[]string{"method"},
)
Expand All @@ -95,7 +95,7 @@ var (
Subsystem: "e2e_calls",
Name: "failed_seconds",
Help: "Failed end-to-end method call latency distributions",
Buckets: callsSecondsBuckets,
Buckets: callDurationBuckets,
},
[]string{"method", "kind"},
)
Expand Down Expand Up @@ -124,7 +124,7 @@ var (
Subsystem: "calls",
Name: "total_seconds",
Help: "Method call latency distributions",
Buckets: callsSecondsBuckets,
Buckets: callDurationBuckets,
},
[]string{"method", "endpoint", "origin"},
)
Expand All @@ -134,7 +134,7 @@ var (
Subsystem: "calls",
Name: "failed_seconds",
Help: "Failed method call latency distributions",
Buckets: callsSecondsBuckets,
Buckets: callDurationBuckets,
},
[]string{"method", "endpoint", "origin", "kind"},
)
Expand Down Expand Up @@ -176,25 +176,6 @@ var (
Help: "Total number of errors retrieving queries from the local cache",
}, []string{"method"})

SturdyQueryCacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: nsSturdyCache,
Subsystem: "query",
Name: "hit_count",
Help: "Total number of queries found in sturdycache",
}, []string{"method"})
SturdyQueryCacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: nsSturdyCache,
Subsystem: "query",
Name: "miss_count",
Help: "Total number of queries that were not in sturdycache",
}, []string{"method"})
SturdyQueryCacheErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: nsSturdyCache,
Subsystem: "query",
Name: "error_count",
Help: "Total number of errors retrieving queries from sturdycache",
}, []string{"method"})

LbrynetWalletsLoaded = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: nsLbrynet,
Subsystem: "wallets",
Expand Down Expand Up @@ -222,7 +203,7 @@ var (
Subsystem: "calls",
Name: "total_seconds",
Help: "How long do calls to lbrytv take (end-to-end)",
Buckets: callsSecondsBuckets,
Buckets: callDurationBuckets,
},
[]string{"path"},
)
Expand Down Expand Up @@ -278,7 +259,7 @@ var (
Subsystem: "calls",
Name: "total_seconds",
Help: "Method call latency distributions",
Buckets: callsSecondsBuckets,
Buckets: callDurationBuckets,
},
[]string{"method", "endpoint", "group"},
)
Expand All @@ -288,7 +269,7 @@ var (
Subsystem: "calls",
Name: "failed_seconds",
Help: "Failed method call latency distributions",
Buckets: callsSecondsBuckets,
Buckets: callDurationBuckets,
},
[]string{"method", "endpoint", "group", "kind"},
)
Expand Down
6 changes: 3 additions & 3 deletions internal/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,18 +110,18 @@ func StatusV2(w http.ResponseWriter, r *http.Request) {
c.Cache = query.CacheFromRequest(r)
}

rpcRes, err := c.Call(r.Context(), jsonrpc.NewRequest("resolve", map[string]interface{}{"urls": resolveURL}))
rpcRes, err := c.Call(r.Context(), jsonrpc.NewRequest(query.MethodResolve, map[string]interface{}{"urls": resolveURL}))

if err != nil {
srv.Error = err.Error()
srv.Status = statusOffline
failureDetected = true
logger.Log().Error("we're failing: ", err)
logger.Log().Errorf("status call resolve is failing: %s", err)
} else if rpcRes.Error != nil {
srv.Error = rpcRes.Error.Message
srv.Status = statusNotReady
failureDetected = true
logger.Log().Error("we're failing: ", err)
logger.Log().Errorf("status call resolve is failing: %s", err)
} else {
if user != nil {
response.User = &userData{
Expand Down
4 changes: 2 additions & 2 deletions pkg/sturdycache/sturdycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func NewReplicatedCache(
Addr: masterAddr,
Password: password,
DB: 0,
PoolSize: 10,
MinIdleConns: 5,
PoolSize: 200,
MinIdleConns: 10,
})

masterStore := redis_store.NewRedis(masterClient)
Expand Down

0 comments on commit 2c04acf

Please sign in to comment.