From 88772ea5bed517dbe2ac9998f4e67a92c0e8b0d1 Mon Sep 17 00:00:00 2001 From: Aviral Takkar <39969667+avtakkar@users.noreply.github.com> Date: Fri, 12 Apr 2024 12:34:04 -0700 Subject: [PATCH] refactor: move metrics and distribution to pkgs (#40) * refactor: move metrics to pkg * refactor: move distribution to pkgs --- build/ci/scripts/kind.sh | 2 +- cmd/proxy/main.go | 7 ++++ internal/context/context.go | 2 +- internal/files/cache/cache.go | 5 +-- internal/{ => files}/cache/syncmap.go | 17 +++++---- internal/{ => files}/cache/syncmap_test.go | 6 +-- internal/files/store/file_test.go | 15 ++------ internal/files/store/main_test.go | 6 +++ internal/files/store/store.go | 37 ++++++++++--------- internal/files/store/store_test.go | 9 ++--- internal/handlers/files/handler.go | 9 +++-- internal/handlers/files/handler_test.go | 20 +++++----- internal/handlers/root_test.go | 18 +++++---- internal/handlers/v2/handler.go | 16 ++++---- internal/handlers/v2/handler_test.go | 11 ++++-- internal/oci/registry.go | 2 +- internal/oci/registry_test.go | 2 +- internal/remote/reader.go | 15 +++++--- internal/remote/reader_test.go | 17 +++++---- {internal => pkg}/metrics/interface.go | 23 ++++++++++-- {internal => pkg}/metrics/prometheus.go | 23 +++++++----- {internal => pkg}/metrics/prometheus_test.go | 8 ++-- {internal => pkg}/oci/distribution/v2.go | 0 {internal => pkg}/oci/distribution/v2_test.go | 0 {internal => tests/random}/math/math.go | 0 {internal => tests/random}/math/math_test.go | 0 {internal => tests/random}/math/reverse.go | 0 .../random}/math/reverse_test.go | 0 tests/random/random.go | 2 +- 29 files changed, 160 insertions(+), 112 deletions(-) rename internal/{ => files}/cache/syncmap.go (71%) rename internal/{ => files}/cache/syncmap_test.go (96%) rename {internal => pkg}/metrics/interface.go (56%) rename {internal => pkg}/metrics/prometheus.go (80%) rename {internal => pkg}/metrics/prometheus_test.go (96%) rename {internal => pkg}/oci/distribution/v2.go (100%) rename {internal => pkg}/oci/distribution/v2_test.go (100%) rename {internal => tests/random}/math/math.go (100%) rename {internal => tests/random}/math/math_test.go (100%) rename {internal => tests/random}/math/reverse.go (100%) rename {internal => tests/random}/math/reverse_test.go (100%) diff --git a/build/ci/scripts/kind.sh b/build/ci/scripts/kind.sh index b0b2a64..b6b7313 100755 --- a/build/ci/scripts/kind.sh +++ b/build/ci/scripts/kind.sh @@ -124,7 +124,7 @@ print_p2p_metrics() { for pod in $( echo "$p" | tr -s " " "\012" ); do echo "checking pod '$pod' for metrics" kubectl --context=$KIND_CLUSTER_CONTEXT -n peerd-ns exec -i $pod -- bash -c "cat /var/log/peerdmetrics" - kubectl --context=$KIND_CLUSTER_CONTEXT -n peerd-ns exec -i $pod -- bash -c "curl http://localhost:5004/metrics/prometheus" + kubectl --context=$KIND_CLUSTER_CONTEXT -n peerd-ns exec -i $pod -- bash -c "curl http://localhost:5004/metrics/prometheus" | head -n 10 done } diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 9be88d4..b0eb90c 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -23,6 +23,7 @@ import ( "github.com/azure/peerd/pkg/discovery/routing" "github.com/azure/peerd/pkg/k8s" "github.com/azure/peerd/pkg/k8s/events" + "github.com/azure/peerd/pkg/metrics" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog" "github.com/spf13/afero" @@ -45,6 +46,12 @@ func main() { l := zerolog.New(os.Stdout).With().Timestamp().Str("self", p2pcontext.NodeName).Str("version", version).Logger() ctx := l.WithContext(context.Background()) + ctx, err = metrics.WithContext(ctx, p2pcontext.NodeName, "peerd") + if err != nil { + l.Error().Err(err).Msg("failed to initialize metrics") + os.Exit(1) + } + err = run(ctx, args) if err != nil { l.Error().Err(err).Msg("server error") diff --git a/internal/context/context.go b/internal/context/context.go index 16232c1..2dc8014 100644 --- a/internal/context/context.go +++ b/internal/context/context.go @@ -77,7 +77,7 @@ func Logger(c *gin.Context) zerolog.Logger { l = *ctxLog } - return l.With().Str("correlationid", c.GetString(CorrelationIdCtxKey)).Str("url", c.Request.URL.String()).Str("range", c.Request.Header.Get("Range")).Bool("p2p", IsRequestFromAPeer(c)).Str("ip", c.ClientIP()).Str("peer", c.Request.Header.Get(NodeHeaderKey)).Logger() + return l.With().Str("correlationid", c.GetString(CorrelationIdCtxKey)).Str("url", c.Request.URL.String()).Str("range", c.Request.Header.Get("Range")).Bool("requestfrompeer", IsRequestFromAPeer(c)).Str("clientip", c.ClientIP()).Str("clientname", c.Request.Header.Get(NodeHeaderKey)).Logger() } // BlobUrl extracts the blob URL from the incoming request URL. diff --git a/internal/files/cache/cache.go b/internal/files/cache/cache.go index c391671..fe4db73 100644 --- a/internal/files/cache/cache.go +++ b/internal/files/cache/cache.go @@ -13,7 +13,6 @@ import ( "sync/atomic" "time" - syncmap "github.com/azure/peerd/internal/cache" "github.com/azure/peerd/internal/files" "github.com/dgraph-io/ristretto" "github.com/rs/zerolog" @@ -22,7 +21,7 @@ import ( // fileCache implements FileCache. type fileCache struct { fileCache *ristretto.Cache - metadataCache *syncmap.SyncMap + metadataCache *SyncMap path string lock sync.RWMutex log zerolog.Logger @@ -162,7 +161,7 @@ func New(ctx context.Context) Cache { cache := &fileCache{ log: log, path: Path, - metadataCache: syncmap.MakeSyncMap(1e7), + metadataCache: NewSyncMap(1e7), } var err error diff --git a/internal/cache/syncmap.go b/internal/files/cache/syncmap.go similarity index 71% rename from internal/cache/syncmap.go rename to internal/files/cache/syncmap.go index 2b25b74..79094d4 100644 --- a/internal/cache/syncmap.go +++ b/internal/files/cache/syncmap.go @@ -6,9 +6,9 @@ import ( "sync" ) -const defaultEvictionPercentage int = 5 //The default eviction percentage used when map reaches its capacity at insertion +const defaultEvictionPercentage int = 5 // The default eviction percentage. Used when the map reaches its capacity at insertion. -// SyncMap is a map with synchronized access support +// SyncMap is a map that can be safely accessed concurrently. type SyncMap struct { mapObj *map[string]interface{} lock *sync.RWMutex @@ -32,14 +32,15 @@ func (sm *SyncMap) Get(key string) (entry interface{}, ok bool) { func (sm *SyncMap) Set(key string, entry interface{}) { sm.lock.Lock() defer sm.lock.Unlock() - if _, ok := (*sm.mapObj)[key]; !ok { //We will need to add an entry - if numEntries := len(*sm.mapObj); numEntries >= sm.capacity { //exceeding capacity, remove evictionPercentage of the entries + + if _, ok := (*sm.mapObj)[key]; !ok { + if numEntries := len(*sm.mapObj); numEntries >= sm.capacity { numToEvict := numEntries * sm.evictionPercentage / 100 - if numToEvict <= 1 { //We will evict one as the minimum + if numToEvict <= 1 { numToEvict = 1 } numEvicted := 0 - for k := range *sm.mapObj { // GO map iterator will randomize the order. We just delete the first in the iterator + for k := range *sm.mapObj { delete(*sm.mapObj, k) numEvicted++ if numEvicted >= numToEvict { @@ -60,9 +61,9 @@ func (sm *SyncMap) Delete(key string) { delete(*sm.mapObj, key) } -// MakeSyncMap creates a new SyncMap with the specified maximum number of entries. +// NewSyncMap creates a new SyncMap with the specified maximum number of entries. // If the maximum number of entries is less than or equal to 0, it will be set to 1. -func MakeSyncMap(maxEntries int) *SyncMap { +func NewSyncMap(maxEntries int) *SyncMap { if maxEntries <= 0 { maxEntries = 1 } diff --git a/internal/cache/syncmap_test.go b/internal/files/cache/syncmap_test.go similarity index 96% rename from internal/cache/syncmap_test.go rename to internal/files/cache/syncmap_test.go index e9051b3..4c3d6d3 100644 --- a/internal/cache/syncmap_test.go +++ b/internal/files/cache/syncmap_test.go @@ -9,7 +9,7 @@ import ( ) func TestSyncMapAddEvict(t *testing.T) { - sm := MakeSyncMap(100) + sm := NewSyncMap(100) sm.evictionPercentage = 10 var wg sync.WaitGroup addEntry := func(key string, value int) { @@ -34,7 +34,7 @@ func TestSyncMapAddEvict(t *testing.T) { } func TestSyncMapAddDelete(t *testing.T) { - sm := MakeSyncMap(10) + sm := NewSyncMap(10) var wg sync.WaitGroup addEntry := func(key string, value int) { @@ -67,7 +67,7 @@ func TestSyncMapAddDelete(t *testing.T) { } func TestSyncMapUpdate(t *testing.T) { - sm := MakeSyncMap(10) + sm := NewSyncMap(10) var wg sync.WaitGroup addEntry := func(key string, value int) { sm.Set(key, value) diff --git a/internal/files/store/file_test.go b/internal/files/store/file_test.go index 23808dd..a305f19 100644 --- a/internal/files/store/file_test.go +++ b/internal/files/store/file_test.go @@ -3,7 +3,6 @@ package store import ( - "context" "crypto/rand" "io" "os" @@ -17,12 +16,11 @@ import ( ) func TestReadAtWithChunkOffset(t *testing.T) { - ctx := context.Background() data := []byte("hello world") files.CacheBlockSize = 1 // 1 byte - s, err := NewFilesStore(ctx, tests.NewMockRouter(make(map[string][]string))) + s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string))) if err != nil { t.Fatal(err) } @@ -76,12 +74,11 @@ func TestReadAtWithChunkOffset(t *testing.T) { } func TestReadAt(t *testing.T) { - ctx := context.Background() data := []byte("hello world") files.CacheBlockSize = 1 // 1 byte - s, err := NewFilesStore(ctx, tests.NewMockRouter(make(map[string][]string))) + s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string))) if err != nil { t.Fatal(err) } @@ -129,11 +126,9 @@ func TestReadAt(t *testing.T) { } func TestSeek(t *testing.T) { - ctx := context.Background() - data := []byte("hello world") - s, err := NewFilesStore(ctx, tests.NewMockRouter(make(map[string][]string))) + s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string))) if err != nil { t.Fatal(err) } @@ -192,14 +187,12 @@ func TestSeek(t *testing.T) { } func TestFstat(t *testing.T) { - ctx := context.Background() - data, err := randomBytesN(100) if err != nil { t.Fatal(err) } - s, err := NewFilesStore(ctx, tests.NewMockRouter(make(map[string][]string))) + s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string))) if err != nil { t.Fatal(err) } diff --git a/internal/files/store/main_test.go b/internal/files/store/main_test.go index 263c3bc..21268e9 100644 --- a/internal/files/store/main_test.go +++ b/internal/files/store/main_test.go @@ -3,12 +3,18 @@ package store import ( + "context" "crypto/rand" "fmt" "os" "testing" "github.com/azure/peerd/internal/files/cache" + "github.com/azure/peerd/pkg/metrics" +) + +var ( + ctxWithMetrics, _ = metrics.WithContext(context.Background(), "test", "peerd") ) func TestMain(m *testing.M) { diff --git a/internal/files/store/store.go b/internal/files/store/store.go index 42d5957..7150669 100644 --- a/internal/files/store/store.go +++ b/internal/files/store/store.go @@ -14,6 +14,7 @@ import ( "github.com/azure/peerd/internal/files/cache" "github.com/azure/peerd/internal/remote" "github.com/azure/peerd/pkg/discovery/routing" + "github.com/azure/peerd/pkg/metrics" "github.com/azure/peerd/pkg/urlparser" "github.com/gin-gonic/gin" "github.com/opencontainers/go-digest" @@ -23,14 +24,15 @@ import ( // NewFilesStore creates a new store. func NewFilesStore(ctx context.Context, r routing.Router) (FilesStore, error) { fs := &store{ - cache: cache.New(ctx), - prefetchChan: make(chan prefetchableSegment, PrefetchWorkers), - prefetchable: PrefetchWorkers > 0, - router: r, - resolveRetries: ResolveRetries, - resolveTimeout: ResolveTimeout, - blobsChan: make(chan string, 1000), - parser: urlparser.New(), + metricsRecorder: metrics.FromContext(ctx), + cache: cache.New(ctx), + prefetchChan: make(chan prefetchableSegment, PrefetchWorkers), + prefetchable: PrefetchWorkers > 0, + router: r, + resolveRetries: ResolveRetries, + resolveTimeout: ResolveTimeout, + blobsChan: make(chan string, 1000), + parser: urlparser.New(), } go func() { @@ -61,14 +63,15 @@ type prefetchableSegment struct { // store describes a content store whose contents can come from disk or a remote source. type store struct { - cache cache.Cache - prefetchable bool - prefetchChan chan prefetchableSegment - router routing.Router - resolveRetries int - resolveTimeout time.Duration - blobsChan chan string - parser urlparser.Parser + metricsRecorder metrics.Metrics + cache cache.Cache + prefetchable bool + prefetchChan chan prefetchableSegment + router routing.Router + resolveRetries int + resolveTimeout time.Duration + blobsChan chan string + parser urlparser.Parser } var _ FilesStore = &store{} @@ -100,7 +103,7 @@ func (s *store) Open(c *gin.Context) (File, error) { store: s, cur: 0, size: 0, - reader: remote.NewReader(c, s.router, s.resolveRetries, s.resolveTimeout), + reader: remote.NewReader(c, s.router, s.resolveRetries, s.resolveTimeout, s.metricsRecorder), } if p2pcontext.IsRequestFromAPeer(c) { diff --git a/internal/files/store/store_test.go b/internal/files/store/store_test.go index b4dd469..8fc7694 100644 --- a/internal/files/store/store_test.go +++ b/internal/files/store/store_test.go @@ -3,7 +3,6 @@ package store import ( - "context" "fmt" "net/http" "net/http/httptest" @@ -44,7 +43,7 @@ func TestOpenP2p(t *testing.T) { ctx.Set(p2pcontext.FileChunkCtxKey, expK) PrefetchWorkers = 0 // turn off prefetching - s, err := NewFilesStore(context.Background(), tests.NewMockRouter(make(map[string][]string))) + s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string))) if err != nil { t.Fatal(err) } @@ -75,7 +74,7 @@ func TestOpenNonP2p(t *testing.T) { ctx.Set(p2pcontext.FileChunkCtxKey, expK) PrefetchWorkers = 0 // turn off prefetching - s, err := NewMockStore(context.Background(), tests.NewMockRouter(make(map[string][]string))) + s, err := NewMockStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string))) if err != nil { t.Fatal(err) } @@ -106,7 +105,7 @@ func TestKey(t *testing.T) { {Key: "url", Value: hostAndPath}, } - s, err := NewFilesStore(context.Background(), tests.NewMockRouter(make(map[string][]string))) + s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string))) if err != nil { t.Fatal(err) } @@ -126,7 +125,7 @@ func TestKey(t *testing.T) { } func TestSubscribe(t *testing.T) { - s, err := NewFilesStore(context.Background(), tests.NewMockRouter(make(map[string][]string))) + s, err := NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string))) if err != nil { t.Fatal(err) } diff --git a/internal/handlers/files/handler.go b/internal/handlers/files/handler.go index f7b41f0..35e081d 100644 --- a/internal/handlers/files/handler.go +++ b/internal/handlers/files/handler.go @@ -10,13 +10,14 @@ import ( p2pcontext "github.com/azure/peerd/internal/context" "github.com/azure/peerd/internal/files/store" - "github.com/azure/peerd/internal/metrics" + "github.com/azure/peerd/pkg/metrics" "github.com/gin-gonic/gin" ) // FilesHandler describes a handler for files. type FilesHandler struct { - store store.FilesStore + store store.FilesStore + metricsRecorder metrics.Metrics } var _ gin.HandlerFunc = (&FilesHandler{}).Handle @@ -28,7 +29,7 @@ func (h *FilesHandler) Handle(c *gin.Context) { s := time.Now() defer func() { dur := time.Since(s) - metrics.Global.RecordRequest(c.Request.Method, "files", float64(dur.Milliseconds())) + h.metricsRecorder.RecordRequest(c.Request.Method, "files", float64(dur.Milliseconds())) log.Debug().Dur("duration", dur).Msg("files handler stop") }() @@ -80,5 +81,5 @@ func (h *FilesHandler) fill(c *gin.Context) error { // New creates a new files handler. func New(ctx context.Context, fs store.FilesStore) *FilesHandler { - return &FilesHandler{fs} + return &FilesHandler{fs, metrics.FromContext(ctx)} } diff --git a/internal/handlers/files/handler_test.go b/internal/handlers/files/handler_test.go index 7c0fdbb..c66eb1e 100644 --- a/internal/handlers/files/handler_test.go +++ b/internal/handlers/files/handler_test.go @@ -14,13 +14,15 @@ import ( "github.com/azure/peerd/internal/files" "github.com/azure/peerd/internal/files/store" "github.com/azure/peerd/pkg/discovery/routing/tests" + "github.com/azure/peerd/pkg/metrics" "github.com/gin-gonic/gin" ) var ( - hostAndPath = "https://avtakkartest.blob.core.windows.net/d18c7a64c5158179-ff8cb2f639ff44879c12c94361a746d0-782b855128//docker/registry/v2/blobs/sha256/d1/d18c7a64c5158179bdee531a663c5b487de57ff17cff3af29a51c7e70b491d9d/data" - query = "?se=2023-09-20T01%3A14%3A49Z&sig=m4Cr%2BYTZHZQlN5LznY7nrTQ4LCIx2OqnDDM3Dpedbhs%3D&sp=r&spr=https&sr=b&sv=2018-03-28®id=01031d61e1024861afee5d512651eb9f" - u = hostAndPath + query + hostAndPath = "https://avtakkartest.blob.core.windows.net/d18c7a64c5158179-ff8cb2f639ff44879c12c94361a746d0-782b855128//docker/registry/v2/blobs/sha256/d1/d18c7a64c5158179bdee531a663c5b487de57ff17cff3af29a51c7e70b491d9d/data" + query = "?se=2023-09-20T01%3A14%3A49Z&sig=m4Cr%2BYTZHZQlN5LznY7nrTQ4LCIx2OqnDDM3Dpedbhs%3D&sp=r&spr=https&sr=b&sv=2018-03-28®id=01031d61e1024861afee5d512651eb9f" + u = hostAndPath + query + ctxWithMetrics, _ = metrics.WithContext(context.Background(), "test", "peerd") ) func TestPartialContentResponseInP2PMode(t *testing.T) { @@ -45,12 +47,12 @@ func TestPartialContentResponseInP2PMode(t *testing.T) { } store.PrefetchWorkers = 0 // turn off prefetching - s, err := store.NewMockStore(context.Background(), tests.NewMockRouter(make(map[string][]string))) + s, err := store.NewMockStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string))) if err != nil { t.Fatal(err) } - h := New(context.Background(), s) + h := New(ctxWithMetrics, s) // Write the chunk file. content := newRandomStringN(10) @@ -95,12 +97,12 @@ func TestNotFoundInP2PMode(t *testing.T) { } store.PrefetchWorkers = 0 // turn off prefetching - s, err := store.NewFilesStore(context.Background(), tests.NewMockRouter(make(map[string][]string))) + s, err := store.NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string))) if err != nil { t.Fatal(err) } - h := New(context.Background(), s) + h := New(ctxWithMetrics, s) h.Handle(ctx) if ctx.Writer.Status() != http.StatusNotFound { @@ -129,12 +131,12 @@ func TestFill(t *testing.T) { } store.PrefetchWorkers = 0 // turn off prefetching - s, err := store.NewFilesStore(context.Background(), tests.NewMockRouter(make(map[string][]string))) + s, err := store.NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string))) if err != nil { t.Fatal(err) } - h := New(context.Background(), s) + h := New(ctxWithMetrics, s) err = h.fill(ctx) if err != nil { diff --git a/internal/handlers/root_test.go b/internal/handlers/root_test.go index 933f013..1665fe5 100644 --- a/internal/handlers/root_test.go +++ b/internal/handlers/root_test.go @@ -9,12 +9,17 @@ import ( "github.com/azure/peerd/internal/files/store" "github.com/azure/peerd/pkg/containerd" "github.com/azure/peerd/pkg/discovery/routing/tests" + "github.com/azure/peerd/pkg/metrics" "github.com/gin-gonic/gin" ) -var simpleOKHandler = gin.HandlerFunc(func(c *gin.Context) { - c.Status(http.StatusOK) -}) +var ( + simpleOKHandler = gin.HandlerFunc(func(c *gin.Context) { + c.Status(http.StatusOK) + }) + + ctxWithMetrics, _ = metrics.WithContext(context.Background(), "test", "peerd") +) func TestV2RoutesRegistrations(t *testing.T) { recorder := httptest.NewRecorder() @@ -82,7 +87,7 @@ func TestV2RoutesRegistrations(t *testing.T) { } func TestNewEngine(t *testing.T) { - engine := newEngine(context.Background()) + engine := newEngine(ctxWithMetrics) if engine == nil { t.Fatal("Expected non-nil engine, got nil") } @@ -97,15 +102,14 @@ func TestNewEngine(t *testing.T) { } func TestHandler(t *testing.T) { - ctx := context.Background() mr := tests.NewMockRouter(map[string][]string{}) ms := containerd.NewMockContainerdStore(nil) - mfs, err := store.NewMockStore(ctx, mr) + mfs, err := store.NewMockStore(ctxWithMetrics, mr) if err != nil { t.Fatal(err) } - h, err := Handler(ctx, mr, ms, mfs) + h, err := Handler(ctxWithMetrics, mr, ms, mfs) if err != nil { t.Fatal(err) } diff --git a/internal/handlers/v2/handler.go b/internal/handlers/v2/handler.go index 87fa1a9..c70b6d0 100644 --- a/internal/handlers/v2/handler.go +++ b/internal/handlers/v2/handler.go @@ -9,18 +9,19 @@ import ( "time" p2pcontext "github.com/azure/peerd/internal/context" - "github.com/azure/peerd/internal/metrics" "github.com/azure/peerd/internal/oci" - "github.com/azure/peerd/internal/oci/distribution" "github.com/azure/peerd/pkg/containerd" "github.com/azure/peerd/pkg/discovery/routing" + "github.com/azure/peerd/pkg/metrics" + "github.com/azure/peerd/pkg/oci/distribution" "github.com/gin-gonic/gin" ) // V2Handler describes a handler for OCI content. type V2Handler struct { - mirror *oci.Mirror - registry *oci.Registry + mirror *oci.Mirror + registry *oci.Registry + metricsRecorder metrics.Metrics } var _ gin.HandlerFunc = (&V2Handler{}).Handle @@ -32,7 +33,7 @@ func (h *V2Handler) Handle(c *gin.Context) { s := time.Now() defer func() { dur := time.Since(s) - metrics.Global.RecordRequest(c.Request.Method, "oci", dur.Seconds()) + h.metricsRecorder.RecordRequest(c.Request.Method, "oci", dur.Seconds()) l.Debug().Dur("duration", dur).Str("ns", c.GetString(p2pcontext.NamespaceCtxKey)).Str("ref", c.GetString(p2pcontext.ReferenceCtxKey)).Str("digest", c.GetString(p2pcontext.DigestCtxKey)).Msg("v2 handler stop") }() @@ -89,7 +90,8 @@ func (h *V2Handler) fill(c *gin.Context) error { // New creates a new OCI content handler. func New(ctx context.Context, router routing.Router, containerdStore containerd.Store) (*V2Handler, error) { return &V2Handler{ - mirror: oci.NewMirror(router), - registry: oci.NewRegistry(containerdStore), + mirror: oci.NewMirror(router), + registry: oci.NewRegistry(containerdStore), + metricsRecorder: metrics.FromContext(ctx), }, nil } diff --git a/internal/handlers/v2/handler_test.go b/internal/handlers/v2/handler_test.go index c551c79..88e16bd 100644 --- a/internal/handlers/v2/handler_test.go +++ b/internal/handlers/v2/handler_test.go @@ -7,17 +7,22 @@ import ( "testing" p2pcontext "github.com/azure/peerd/internal/context" - "github.com/azure/peerd/internal/oci/distribution" "github.com/azure/peerd/pkg/containerd" "github.com/azure/peerd/pkg/discovery/routing/tests" + "github.com/azure/peerd/pkg/metrics" + "github.com/azure/peerd/pkg/oci/distribution" "github.com/gin-gonic/gin" ) +var ( + ctxWithMetrics, _ = metrics.WithContext(context.Background(), "test", "peerd") +) + func TestNew(t *testing.T) { mr := tests.NewMockRouter(nil) ms := containerd.NewMockContainerdStore(nil) - h, err := New(context.Background(), mr, ms) + h, err := New(ctxWithMetrics, mr, ms) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -31,7 +36,7 @@ func TestFillDefault(t *testing.T) { mr := tests.NewMockRouter(nil) ms := containerd.NewMockContainerdStore(nil) - h, err := New(context.Background(), mr, ms) + h, err := New(ctxWithMetrics, mr, ms) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/internal/oci/registry.go b/internal/oci/registry.go index 2e8c673..4a2e617 100644 --- a/internal/oci/registry.go +++ b/internal/oci/registry.go @@ -9,8 +9,8 @@ import ( "time" p2pcontext "github.com/azure/peerd/internal/context" - "github.com/azure/peerd/internal/oci/distribution" "github.com/azure/peerd/pkg/containerd" + "github.com/azure/peerd/pkg/oci/distribution" "github.com/gin-gonic/gin" "github.com/opencontainers/go-digest" ) diff --git a/internal/oci/registry_test.go b/internal/oci/registry_test.go index 2dd8f21..1a898f1 100644 --- a/internal/oci/registry_test.go +++ b/internal/oci/registry_test.go @@ -8,8 +8,8 @@ import ( "testing" p2pcontext "github.com/azure/peerd/internal/context" - "github.com/azure/peerd/internal/oci/distribution" "github.com/azure/peerd/pkg/containerd" + "github.com/azure/peerd/pkg/oci/distribution" "github.com/gin-gonic/gin" ) diff --git a/internal/remote/reader.go b/internal/remote/reader.go index 04a2ec1..80c3a09 100644 --- a/internal/remote/reader.go +++ b/internal/remote/reader.go @@ -13,8 +13,8 @@ import ( "time" p2pcontext "github.com/azure/peerd/internal/context" - "github.com/azure/peerd/internal/metrics" "github.com/azure/peerd/pkg/discovery/routing" + "github.com/azure/peerd/pkg/metrics" "github.com/gin-gonic/gin" "github.com/rs/zerolog" ) @@ -36,6 +36,8 @@ type reader struct { router routing.Router resolveRetries int defaultHttpClient *http.Client + + metricsRecorder metrics.Metrics } var _ Reader = &reader{} @@ -68,7 +70,7 @@ func (r *reader) PreadRemote(buf []byte, offset int64) (int, error) { count32 := int(0) defer func() { - metrics.Global.RecordUpstreamResponse(originReq.URL.Hostname(), key, "pread", time.Since(startTime).Seconds(), int64(count32)) + r.metricsRecorder.RecordUpstreamResponse(originReq.URL.Hostname(), key, "pread", time.Since(startTime).Seconds(), int64(count32)) }() count32, err = r.preadRemote(log, originReq, r.defaultHttpClient, buf) return count32, err @@ -90,7 +92,7 @@ func (r *reader) FstatRemote() (int64, error) { var count int64 defer func() { - metrics.Global.RecordUpstreamResponse(originReq.URL.Hostname(), key, "fstat", time.Since(startTime).Seconds(), count) + r.metricsRecorder.RecordUpstreamResponse(originReq.URL.Hostname(), key, "fstat", time.Since(startTime).Seconds(), count) }() count, err = r.fstatRemote(log, originReq, r.defaultHttpClient) return count, err @@ -139,7 +141,7 @@ peerLoop: if peerCount == 0 { // Only report the time it took to discover the first peer. - metrics.Global.RecordPeerDiscovery(peer.HttpHost, time.Since(startTime).Seconds()) + r.metricsRecorder.RecordPeerDiscovery(peer.HttpHost, time.Since(startTime).Seconds()) peerCount++ } @@ -172,7 +174,7 @@ peerLoop: if o == operationPreadRemote { op = "pread" } - metrics.Global.RecordPeerResponse(peer.HttpHost, fileChunkKey, op, time.Since(startTime).Seconds(), count) + r.metricsRecorder.RecordPeerResponse(peer.HttpHost, fileChunkKey, op, time.Since(startTime).Seconds(), count) return count, nil } } @@ -275,7 +277,7 @@ func (r *reader) remoteRequest(u string, start, end int64) (*http.Request, error } // NewReader creates a new remote reader. -func NewReader(c *gin.Context, router routing.Router, resolveRetries int, resolveTimeout time.Duration) Reader { +func NewReader(c *gin.Context, router routing.Router, resolveRetries int, resolveTimeout time.Duration, metricsRecorder metrics.Metrics) Reader { cc := c.Copy() return &reader{ context: cc, @@ -283,5 +285,6 @@ func NewReader(c *gin.Context, router routing.Router, resolveRetries int, resolv router: router, resolveRetries: resolveRetries, defaultHttpClient: router.Net().HTTPClientFor(""), + metricsRecorder: metricsRecorder, } } diff --git a/internal/remote/reader_test.go b/internal/remote/reader_test.go index a1fa9ed..09dc320 100644 --- a/internal/remote/reader_test.go +++ b/internal/remote/reader_test.go @@ -10,7 +10,9 @@ import ( p2pcontext "github.com/azure/peerd/internal/context" "github.com/azure/peerd/pkg/discovery/routing/tests" + "github.com/azure/peerd/pkg/metrics" "github.com/gin-gonic/gin" + "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog" ) @@ -18,6 +20,7 @@ var ( hostAndPath = "https://avtakkartest.blob.core.windows.net/d18c7a64c5158179-ff8cb2f639ff44879c12c94361a746d0-782b855128//docker/registry/v2/blobs/sha256/d1/d18c7a64c5158179bdee531a663c5b487de57ff17cff3af29a51c7e70b491d9d/data" query = "?se=2023-09-20T01%3A14%3A49Z&sig=m4Cr%2BYTZHZQlN5LznY7nrTQ4LCIx2OqnDDM3Dpedbhs%3D&sp=r&spr=https&sr=b&sv=2018-03-28®id=01031d61e1024861afee5d512651eb9f" u = hostAndPath + query + mr = metrics.NewPromMetrics(prometheus.DefaultRegisterer, "test", "test") ) func TestPreadRemoteUpstream(t *testing.T) { @@ -71,7 +74,7 @@ func TestPreadRemoteUpstream(t *testing.T) { c.Set(p2pcontext.BlobRangeCtxKey, "bytes=0-10") c.Set(p2pcontext.FileChunkCtxKey, key) - r := NewReader(c, router, 3, 500*time.Millisecond).(*reader) + r := NewReader(c, router, 3, 500*time.Millisecond, mr).(*reader) b := make([]byte, 10) // Test @@ -119,7 +122,7 @@ func TestFstatRemote(t *testing.T) { c.Set(p2pcontext.BlobUrlCtxKey, p2pcontext.BlobUrl(c)) c.Set(p2pcontext.BlobRangeCtxKey, "bytes=0-0") - r := NewReader(c, router, 3, 500*time.Millisecond).(*reader) + r := NewReader(c, router, 3, 500*time.Millisecond, mr).(*reader) got, err := r.FstatRemote() if err != nil { @@ -162,7 +165,7 @@ func TestFstatRemotePartialContent(t *testing.T) { c.Set(p2pcontext.BlobUrlCtxKey, p2pcontext.BlobUrl(c)) c.Set(p2pcontext.BlobRangeCtxKey, "bytes=0-0") - r := NewReader(c, router, 3, 500*time.Millisecond).(*reader) + r := NewReader(c, router, 3, 500*time.Millisecond, mr).(*reader) got, err := r.FstatRemote() if err != nil { @@ -202,7 +205,7 @@ func TestP2pRetries(t *testing.T) { router := tests.NewMockRouter(m) c, _ := gin.CreateTestContext(httptest.NewRecorder()) c.Request = req - r := NewReader(c, router, 3, 500*time.Millisecond).(*reader) + r := NewReader(c, router, 3, 500*time.Millisecond, mr).(*reader) b := make([]byte, 10) got, err := r.doP2p(l, key, 0, 10, operationPreadRemote, b) @@ -239,7 +242,7 @@ func TestP2pSuccess(t *testing.T) { router := tests.NewMockRouter(m) c, _ := gin.CreateTestContext(httptest.NewRecorder()) c.Request = req - r := NewReader(c, router, 3, 500*time.Millisecond).(*reader) + r := NewReader(c, router, 3, 500*time.Millisecond, mr).(*reader) b := make([]byte, 10) got, err := r.doP2p(l, key, 0, 10, operationPreadRemote, b) @@ -267,7 +270,7 @@ func TestP2pPeerNotFound(t *testing.T) { c, _ := gin.CreateTestContext(httptest.NewRecorder()) c.Request = req - r := NewReader(c, router, 3, 500*time.Millisecond).(*reader) + r := NewReader(c, router, 3, 500*time.Millisecond, mr).(*reader) b := make([]byte, 10) _, err = r.doP2p(l, "key", 0, 10, operationPreadRemote, b) @@ -297,7 +300,7 @@ func TestP2pNoInfiniteLoops(t *testing.T) { c.Request = req c.Request.Header.Add(p2pcontext.P2PHeaderKey, "true") - r := NewReader(c, router, 3, 500*time.Millisecond).(*reader) + r := NewReader(c, router, 3, 500*time.Millisecond, mr).(*reader) b := make([]byte, 10) _, err = r.doP2p(l, key, 0, 10, operationPreadRemote, b) diff --git a/internal/metrics/interface.go b/pkg/metrics/interface.go similarity index 56% rename from internal/metrics/interface.go rename to pkg/metrics/interface.go index 682517e..18fe58a 100644 --- a/internal/metrics/interface.go +++ b/pkg/metrics/interface.go @@ -2,7 +2,12 @@ // Licensed under the MIT License. package metrics -import "github.com/prometheus/client_golang/prometheus" +import ( + "context" + "errors" + + "github.com/prometheus/client_golang/prometheus" +) // Metrics defines an interface to collect p2p metrics. type Metrics interface { @@ -19,5 +24,17 @@ type Metrics interface { RecordUpstreamResponse(hostname, key, op string, duration float64, count int64) } -// Global is the global metrics collector. -var Global Metrics = NewPromMetrics(prometheus.DefaultRegisterer) +// WithContext returns a new context with an metrics recorder. +func WithContext(ctx context.Context, name, prefix string) (context.Context, error) { + pm := NewPromMetrics(prometheus.DefaultRegisterer, name, prefix) + if pm == nil { + return nil, errors.New("failed to create prometheus metrics") + } + + return context.WithValue(ctx, ctxKey{}, pm), nil +} + +// FromContext returns the metrics recorder from the context. +func FromContext(ctx context.Context) Metrics { + return ctx.Value(ctxKey{}).(*promMetrics) +} diff --git a/internal/metrics/prometheus.go b/pkg/metrics/prometheus.go similarity index 80% rename from internal/metrics/prometheus.go rename to pkg/metrics/prometheus.go index 0f33935..21b472d 100644 --- a/internal/metrics/prometheus.go +++ b/pkg/metrics/prometheus.go @@ -2,12 +2,14 @@ package metrics import ( - "github.com/azure/peerd/internal/context" "github.com/prometheus/client_golang/prometheus" ) +type ctxKey struct{} + // promMetrics is a metrics collector that stores metrics in Prometheus. type promMetrics struct { + name string requestDuration *prometheus.HistogramVec peerDiscoveryDuration *prometheus.HistogramVec peerResponseSpeed *prometheus.HistogramVec @@ -18,61 +20,62 @@ var _ Metrics = &promMetrics{} // RecordPeerDiscovery records the duration of peer discovery for a given IP address. func (m *promMetrics) RecordPeerDiscovery(ip string, duration float64) { - m.peerDiscoveryDuration.WithLabelValues(context.NodeName, ip).Observe(duration) + m.peerDiscoveryDuration.WithLabelValues(m.name, ip).Observe(duration) } // RecordPeerResponse records the response time and count of a peer's operation. // It calculates the speed (count/duration) and updates the Prometheus metric. func (m *promMetrics) RecordPeerResponse(ip string, key string, op string, duration float64, count int64) { bps := float64(count) / duration - m.peerResponseSpeed.WithLabelValues(context.NodeName, ip, op).Observe(bps / float64(1024*1024)) + m.peerResponseSpeed.WithLabelValues(m.name, ip, op).Observe(bps / float64(1024*1024)) } // RecordRequest records the duration of a request for a specific method and handler. // It updates the Prometheus metric for request duration. func (m *promMetrics) RecordRequest(method string, handler string, duration float64) { - m.requestDuration.WithLabelValues(context.NodeName, method, handler).Observe(duration) + m.requestDuration.WithLabelValues(m.name, method, handler).Observe(duration) } // RecordUpstreamResponse records the duration and count of an upstream response. // It calculates the speed of the response and updates the corresponding Prometheus metric. func (m *promMetrics) RecordUpstreamResponse(hostname string, key string, op string, duration float64, count int64) { bps := float64(count) / duration - m.upstreamResponseSpeed.WithLabelValues(context.NodeName, hostname, op).Observe(bps / float64(1024*1024)) + m.upstreamResponseSpeed.WithLabelValues(m.name, hostname, op).Observe(bps / float64(1024*1024)) } // NewPromMetrics creates a new instance of promMetrics. -func NewPromMetrics(reg prometheus.Registerer) *promMetrics { +func NewPromMetrics(reg prometheus.Registerer, name, prefix string) *promMetrics { requestDurationHist := prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "peerd_request_duration_seconds", + Name: prefix + "_request_duration_seconds", Help: "Duration of requests in seconds.", Buckets: prometheus.LinearBuckets(0.005, 0.025, 200), }, []string{"self", "method", "handler"}) reg.MustRegister(requestDurationHist) peerDiscoveryDurationHist := prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "peerd_peer_discovery_duration_seconds", + Name: prefix + "_peer_discovery_duration_seconds", Help: "Duration of peer discovery in seconds.", Buckets: prometheus.LinearBuckets(0.001, 0.002, 200), }, []string{"self", "ip"}) reg.MustRegister(peerDiscoveryDurationHist) peerResponseDurationHist := prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "peerd_peer_response_speed_mib_per_second", + Name: prefix + "_peer_response_speed_mib_per_second", Help: "Speed of peer response in Mib per second.", Buckets: prometheus.LinearBuckets(1, 15, 200), }, []string{"self", "ip", "op"}) reg.MustRegister(peerResponseDurationHist) upstreamResponseDurationHist := prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "peerd_upstream_response_speed_mib_per_second", + Name: prefix + "_upstream_response_speed_mib_per_second", Help: "Speed of upstream response in Mib per second.", Buckets: prometheus.LinearBuckets(1, 15, 200), }, []string{"self", "hostname", "op"}) reg.MustRegister(upstreamResponseDurationHist) return &promMetrics{ + name: name, requestDuration: requestDurationHist, peerDiscoveryDuration: peerDiscoveryDurationHist, peerResponseSpeed: peerResponseDurationHist, diff --git a/internal/metrics/prometheus_test.go b/pkg/metrics/prometheus_test.go similarity index 96% rename from internal/metrics/prometheus_test.go rename to pkg/metrics/prometheus_test.go index 9bc25f1..6086ff6 100644 --- a/internal/metrics/prometheus_test.go +++ b/pkg/metrics/prometheus_test.go @@ -10,7 +10,7 @@ import ( func TestPromMetrics_RecordPeerDiscovery(t *testing.T) { reg := prometheus.NewPedanticRegistry() - m := NewPromMetrics(reg) + m := NewPromMetrics(reg, "test", "peerd") ip := "192.168.0.1" duration := 0.001 @@ -47,7 +47,7 @@ func TestPromMetrics_RecordPeerDiscovery(t *testing.T) { func TestPromMetrics_RecordPeerResponse(t *testing.T) { reg := prometheus.NewPedanticRegistry() - m := NewPromMetrics(reg) + m := NewPromMetrics(reg, "test", "peerd") ip := "192.168.0.1" key := "key" @@ -90,7 +90,7 @@ func TestPromMetrics_RecordPeerResponse(t *testing.T) { func TestPromMetrics_RecordRequest(t *testing.T) { reg := prometheus.NewPedanticRegistry() - m := NewPromMetrics(reg) + m := NewPromMetrics(reg, "test", "peerd") method := "GET" handler := "files" @@ -129,7 +129,7 @@ func TestPromMetrics_RecordRequest(t *testing.T) { func TestPromMetrics_RecordUpstreamResponse(t *testing.T) { reg := prometheus.NewPedanticRegistry() - m := NewPromMetrics(reg) + m := NewPromMetrics(reg, "test", "peerd") hostname := "localhost" key := "key" diff --git a/internal/oci/distribution/v2.go b/pkg/oci/distribution/v2.go similarity index 100% rename from internal/oci/distribution/v2.go rename to pkg/oci/distribution/v2.go diff --git a/internal/oci/distribution/v2_test.go b/pkg/oci/distribution/v2_test.go similarity index 100% rename from internal/oci/distribution/v2_test.go rename to pkg/oci/distribution/v2_test.go diff --git a/internal/math/math.go b/tests/random/math/math.go similarity index 100% rename from internal/math/math.go rename to tests/random/math/math.go diff --git a/internal/math/math_test.go b/tests/random/math/math_test.go similarity index 100% rename from internal/math/math_test.go rename to tests/random/math/math_test.go diff --git a/internal/math/reverse.go b/tests/random/math/reverse.go similarity index 100% rename from internal/math/reverse.go rename to tests/random/math/reverse.go diff --git a/internal/math/reverse_test.go b/tests/random/math/reverse_test.go similarity index 100% rename from internal/math/reverse_test.go rename to tests/random/math/reverse_test.go diff --git a/tests/random/random.go b/tests/random/random.go index b1db8ee..682069c 100644 --- a/tests/random/random.go +++ b/tests/random/random.go @@ -17,7 +17,7 @@ import ( "sync" "time" - "github.com/azure/peerd/internal/math" + "github.com/azure/peerd/tests/random/math" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" )