diff --git a/internal/component/pyroscope/receive_http/ip_match.go b/internal/component/pyroscope/receive_http/ip_match.go new file mode 100644 index 0000000000..288488e3a8 --- /dev/null +++ b/internal/component/pyroscope/receive_http/ip_match.go @@ -0,0 +1,70 @@ +package receive_http + +import ( + "net/netip" + + "github.com/go-kit/log" + + "github.com/grafana/alloy/internal/component/discovery" + "github.com/grafana/alloy/internal/runtime/logging/level" +) + +const ( + labelMetaDockerNetworkIP = "__meta_docker_network_ip" + labelMetaKubernetesPodIP = "__meta_kubernetes_pod_ip" +) + +// buildIPLookupMap builds a map of IP addresses to discovery targets. When there are targets with the same IP address, only labels that have the same value will be kept. +func buildIPLookupMap(logger log.Logger, targets []discovery.Target) map[netip.Addr]discovery.Target { + result := make(map[netip.Addr]discovery.Target) + for _, t := range targets { + var addr netip.Addr + for _, key := range []string{labelMetaDockerNetworkIP, labelMetaKubernetesPodIP} { + ip, ok := t[key] + if !ok { + continue + } + + if a, err := netip.ParseAddr(ip); err != nil { + level.Warn(logger).Log("msg", "Unable to parse IP address", "ip", ip) + continue + } else { + addr = a + } + } + + if !addr.IsValid() { + continue + } + + // add the discovery target into the resultkey + for k, v := range t { + if _, ok := result[addr]; !ok { + result[addr] = make(discovery.Target) + } + + // check if the label already exists, if not add it and exit + vExisting, ok := result[addr][k] + if !ok { + result[addr][k] = v + continue + } + + // check if existing element is matching, if not set it to the empty string + if vExisting != v { + result[addr][k] = "" + } + } + } + + // go through all targets again and remove empty value labels + for keyIP := range result { + for name, value := range result[keyIP] { + if value == "" { + delete(result[keyIP], name) + } + } + } + + return result +} diff --git a/internal/component/pyroscope/receive_http/ip_match_test.go b/internal/component/pyroscope/receive_http/ip_match_test.go new file mode 100644 index 0000000000..1d4c9f020f --- /dev/null +++ b/internal/component/pyroscope/receive_http/ip_match_test.go @@ -0,0 +1,93 @@ +package receive_http + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/component/discovery" + "github.com/grafana/alloy/internal/util" +) + +func toJson(t testing.TB, o any) string { + t.Helper() + x, err := json.Marshal(o) + require.NoError(t, err) + return string(x) +} + +func TestBuildIPLookupMap(t *testing.T) { + logger := util.TestAlloyLogger(t) + + for _, tc := range []struct { + name string + targets []discovery.Target + result string + }{ + {name: "empty targets"}, + { + name: "valid ips, no overlap", + targets: []discovery.Target{ + map[string]string{ + labelMetaDockerNetworkIP: "1.2.3.4", + "my-label": "value", + "my-label2": "value2", + }, + map[string]string{ + labelMetaKubernetesPodIP: "1.2.3.5", + "my-pod": "pod2", + "my-namespace": "namespace2", + }, + }, + result: `{ + "1.2.3.4":{"` + labelMetaDockerNetworkIP + `":"1.2.3.4","my-label":"value","my-label2":"value2"}, + "1.2.3.5":{"` + labelMetaKubernetesPodIP + `":"1.2.3.5","my-pod":"pod2","my-namespace":"namespace2"} + }`, + }, + { + name: "valid overlapping ips, pod label overlaps", + targets: []discovery.Target{ + map[string]string{ + labelMetaKubernetesPodIP: "1.2.3.4", + "my-pod": "pod1", + "my-namespace": "namespace1", + }, + map[string]string{ + labelMetaKubernetesPodIP: "1.2.3.4", + "my-pod": "pod2", + "my-namespace": "namespace1", + }, + }, + result: `{ + "1.2.3.4":{"` + labelMetaKubernetesPodIP + `":"1.2.3.4","my-namespace":"namespace1"} + }`, + }, + { + name: "valid overlapping ipv6s, pod label overlaps", + targets: []discovery.Target{ + map[string]string{ + labelMetaKubernetesPodIP: "cafe::", + "my-pod": "pod1", + "my-namespace": "namespace1", + }, + map[string]string{ + labelMetaKubernetesPodIP: "cafe::0", // note: string is not overlapping + "my-pod": "pod2", + "my-namespace": "namespace1", + }, + }, + result: `{ + "cafe::":{"my-namespace":"namespace1"} + }`, + }, + } { + t.Run(tc.name, func(t *testing.T) { + if tc.result == "" { + tc.result = "{}" + } + got := buildIPLookupMap(logger, tc.targets) + require.JSONEq(t, tc.result, toJson(t, got)) + }) + } +} diff --git a/internal/component/pyroscope/receive_http/receive_http.go b/internal/component/pyroscope/receive_http/receive_http.go index c17868b427..2055b809b3 100644 --- a/internal/component/pyroscope/receive_http/receive_http.go +++ b/internal/component/pyroscope/receive_http/receive_http.go @@ -5,21 +5,31 @@ import ( "errors" "fmt" "io" + "net" "net/http" + "net/netip" + "net/url" "reflect" "sync" + "connectrpc.com/connect" "github.com/gorilla/mux" + "github.com/grafana/pyroscope/api/model/labelset" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" "golang.org/x/sync/errgroup" "github.com/grafana/alloy/internal/component" fnet "github.com/grafana/alloy/internal/component/common/net" + "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/component/pyroscope" "github.com/grafana/alloy/internal/component/pyroscope/write" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/util" + pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" + "github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect" + typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" ) const ( @@ -39,7 +49,17 @@ func init() { } type Arguments struct { - Server *fnet.ServerConfig `alloy:",squash"` + // Server is the configuration for the HTTP server. + Server *fnet.ServerConfig `alloy:",squash"` + + // Join takes a discovert.Target and will add information that can be matched with incoming profiles. + // + // The matching is taking place using + // __meta_docker_network_ip + // __meta_kubernetes_pod_ip + Join []discovery.Target + + // ForwardTo is a list of appendables to forward the received profiles to. ForwardTo []pyroscope.Appendable `alloy:"forward_to,attr"` } @@ -59,7 +79,8 @@ type Component struct { server *fnet.TargetServer uncheckedCollector *util.UncheckedCollector appendables []pyroscope.Appendable - mut sync.Mutex + ipLookup map[netip.Addr]discovery.Target + mut sync.RWMutex } func New(opts component.Options, args Arguments) (*Component, error) { @@ -94,9 +115,13 @@ func (c *Component) Run(ctx context.Context) error { func (c *Component) Update(args component.Arguments) error { newArgs := args.(Arguments) + // build new ip lookup, before acquiring lock + ipLookup := buildIPLookupMap(c.opts.Logger, newArgs.Join) + c.mut.Lock() defer c.mut.Unlock() + c.ipLookup = ipLookup c.appendables = newArgs.ForwardTo // if no server config provided, we'll use defaults @@ -137,14 +162,141 @@ func (c *Component) Update(args component.Arguments) error { c.server = srv return c.server.MountAndRun(func(router *mux.Router) { + // this mounts the og pyroscope ingest API, mostly used by SDKs router.HandleFunc("/ingest", c.handleIngest).Methods(http.MethodPost) + + // mount connect go pushv1 + pathPush, handlePush := pushv1connect.NewPusherServiceHandler(c) + router.PathPrefix(pathPush).Handler(handlePush).Methods(http.MethodPost) }) } -func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) { - c.mut.Lock() +func setLabelBuilderFromAPI(lb *labels.Builder, api []*typesv1.LabelPair) { + for i := range api { + lb.Set(api[i].Name, api[i].Value) + } +} + +func apiToAlloySamples(api []*pushv1.RawSample) []*pyroscope.RawSample { + var ( + alloy = make([]*pyroscope.RawSample, len(api)) + ) + for i := range alloy { + alloy[i] = &pyroscope.RawSample{ + RawProfile: api[i].RawProfile, + } + } + return alloy +} + +func (c *Component) Push(ctx context.Context, req *connect.Request[pushv1.PushRequest], +) (*connect.Response[pushv1.PushResponse], error) { + + // lookup extra labels to join + var extraLabels discovery.Target + if remoteIP := c.ipFromReq(req.Peer().Addr, req.Header()); remoteIP.IsValid() { + extraLabels = c.getIPLookup()[remoteIP] + } + + appendables := c.getAppendables() + + // Create an errgroup with the timeout context + g, ctx := errgroup.WithContext(ctx) + + // Start copying the request body to all pipes + for i := range appendables { + appendable := appendables[i].Appender() + g.Go(func() error { + var ( + errs error + lb = labels.NewBuilder(nil) + ) + + for idx := range req.Msg.Series { + lb.Reset(nil) + setLabelBuilderFromAPI(lb, req.Msg.Series[idx].Labels) + for k, v := range extraLabels { + lb.Set(k, v) + } + err := appendable.Append(ctx, lb.Labels(), apiToAlloySamples(req.Msg.Series[idx].Samples)) + if err != nil { + errs = errors.Join( + errs, + fmt.Errorf("unable to append series %s to appendable %d: %w", lb.Labels().String(), i, err), + ) + } + } + return errs + }) + } + if err := g.Wait(); err != nil { + level.Error(c.opts.Logger).Log("msg", "Failed to forward profiles requests", "err", err) + return nil, connect.NewError(connect.CodeInternal, err) + } + + level.Debug(c.opts.Logger).Log("msg", "Profiles successfully forwarded") + return connect.NewResponse(&pushv1.PushResponse{}), nil +} + +func (c *Component) getIPLookup() map[netip.Addr]discovery.Target { + c.mut.RLock() + defer c.mut.RUnlock() + ipLookup := c.ipLookup + return ipLookup +} + +func (c *Component) getAppendables() []pyroscope.Appendable { + c.mut.RLock() + defer c.mut.RUnlock() appendables := c.appendables - c.mut.Unlock() + return appendables +} + +// TODO: This is likely to simple we should also accept headers X-Forwarded-For, if it coming from an internal IP +func (c *Component) ipFromReq(remoteAddr string, _ http.Header) netip.Addr { + if remoteAddr != "" { + host, _, _ := net.SplitHostPort(remoteAddr) + addr, err := netip.ParseAddr(host) + if err == nil { + return addr + } + level.Error(c.opts.Logger).Log("msg", "Unable to parse remote IP address", "ip", host) + } + + return netip.Addr{} +} + +// Parse and rewrite labels/Series +// TODO: Keep labels out of url until pyroscope.write +// TODO: Investigate merging of appendables[0].Append() and AppendIngest again +func (c *Component) rewriteIngestURL(ip netip.Addr, u url.URL) url.URL { + ipLookup := c.getIPLookup() + + // loop up ip in ipLookup + extraLabels, found := ipLookup[ip] + if !found { + return u + } + + // parse existing labels + ls, err := labelset.Parse(u.Query().Get("name")) + if err != nil { + level.Warn(c.opts.Logger).Log("msg", "Failed to parse labelset", "err", err) + return u + } + + for k, v := range extraLabels { + ls.Add(k, v) + } + query := u.Query() + query.Set("name", ls.Normalized()) + u.RawQuery = query.Encode() + + return u +} + +func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) { + appendables := c.getAppendables() // Create a pipe for each appendable pipeWriters := make([]io.Writer, len(appendables)) @@ -170,6 +322,12 @@ func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) { return err }) + newURL := *r.URL + remoteIP := c.ipFromReq(r.RemoteAddr, r.Header) + if remoteIP.IsValid() { + newURL = c.rewriteIngestURL(remoteIP, *r.URL) + } + // Process each appendable for i, appendable := range appendables { g.Go(func() error { @@ -178,7 +336,7 @@ func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) { profile := &pyroscope.IncomingProfile{ Body: io.NopCloser(pipeReaders[i]), Headers: r.Header.Clone(), - URL: r.URL, + URL: &newURL, } err := appendable.Appender().AppendIngest(ctx, profile) diff --git a/internal/component/pyroscope/receive_http/receive_http_test.go b/internal/component/pyroscope/receive_http/receive_http_test.go index 5c507016cb..81dedeb0a6 100644 --- a/internal/component/pyroscope/receive_http/receive_http_test.go +++ b/internal/component/pyroscope/receive_http/receive_http_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/rand" + "errors" "fmt" "io" "net/http" @@ -11,6 +12,7 @@ import ( "testing" "time" + "connectrpc.com/connect" "github.com/phayes/freeport" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" @@ -18,15 +20,21 @@ import ( "github.com/grafana/alloy/internal/component" fnet "github.com/grafana/alloy/internal/component/common/net" + "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/component/pyroscope" "github.com/grafana/alloy/internal/util" + pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" + "github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect" + typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" ) -// TestForwardsProfiles verifies the behavior of the pyroscope.receive_http component -// under various scenarios. It tests different profile sizes, HTTP methods, paths, -// query parameters, and error conditions to ensure correct forwarding behavior -// and proper error handling. -func TestForwardsProfiles(t *testing.T) { +// TestForwardsProfilesIngest verifies the behavior of the +// pyroscope.receive_http component under various scenarios. It tests different +// profile sizes, HTTP methods, paths, query parameters, and error conditions +// to ensure correct forwarding behavior and proper error handling, when +// clients use the legacy OG Pyroscope /ingest API, which is predominentaly +// used by the SDKs. +func TestForwardsProfilesIngest(t *testing.T) { tests := []struct { name string profileSize int @@ -135,6 +143,138 @@ func TestForwardsProfiles(t *testing.T) { } } +func generatePushPayload(numSeries, numSamplesPerSeries, sampleSize int) *connect.Request[pushv1.PushRequest] { + var series []*pushv1.RawProfileSeries + for i := 0; i < numSeries; i++ { + var samples []*pushv1.RawSample + for j := 0; j < numSamplesPerSeries; j++ { + samples = append(samples, &pushv1.RawSample{ + RawProfile: bytes.Repeat([]byte{0xde, 0xad}, sampleSize/2), + }) + } + + series = append(series, &pushv1.RawProfileSeries{ + Labels: []*typesv1.LabelPair{ + {Name: "app", Value: fmt.Sprintf("app-%d", i)}, + }, + Samples: samples, + }) + } + + return connect.NewRequest(&pushv1.PushRequest{Series: series}) +} + +// TestForwardsProfilesPushV1 verifies the behavior of the +// pyroscope.receive_http using the connect pushv1 API. This is predominentaly +// used by other alloy components like pyrscope.ebpf. +func TestForwardsProfilesPushV1(t *testing.T) { + for _, tc := range []struct { + name string + clientOpts []connect.ClientOption + appendableErrors []error + + numSeries int + numSamplesPerSeries int + SampleSize int + + expectedSeries []string + expectedError error + }{ + { + name: "One series, one small profile, one appendables", + expectedSeries: []string{`{app="app-0"}`}, + }, + { + name: "One series, one small profile, one appendables using JSON", + expectedSeries: []string{`{app="app-0"}`}, + clientOpts: []connect.ClientOption{connect.WithProtoJSON()}, + }, + { + name: "One series, one small profile, one appendables using GRPC", + expectedSeries: []string{`{app="app-0"}`}, + clientOpts: []connect.ClientOption{connect.WithGRPC()}, + }, + { + name: "One series, one small profile, one appendables using GRPCWeb", + expectedSeries: []string{`{app="app-0"}`}, + clientOpts: []connect.ClientOption{connect.WithGRPCWeb()}, + }, + { + name: "Two series, one small profile, one appendables", + numSeries: 2, + expectedSeries: []string{ + `{app="app-0"}`, + `{app="app-1"}`, + }, + }, + { + name: "One series, two small profile, one appendable", + numSamplesPerSeries: 2, + expectedSeries: []string{`{app="app-0"}`}, + }, + { + name: "One series, two small profile, two appendable", + numSamplesPerSeries: 2, + appendableErrors: []error{nil, nil}, + expectedSeries: []string{`{app="app-0"}`}, + }, + { + name: "One series, two small profile, two appendable one with errors", + numSamplesPerSeries: 2, + appendableErrors: []error{nil, errors.New("wtf")}, + expectedSeries: []string{`{app="app-0"}`}, + expectedError: errors.New(`internal: unable to append series {app="app-0"} to appendable 1: wtf`), + }, + } { + t.Run(tc.name, func(t *testing.T) { + if tc.SampleSize == 0 { + tc.SampleSize = 1024 + } + if tc.numSeries == 0 { + tc.numSeries = 1 + } + if len(tc.appendableErrors) == 0 { + tc.appendableErrors = []error{nil} + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + appendables := createTestAppendables(tc.appendableErrors) + port := startComponent(t, appendables) + + c := pushv1connect.NewPusherServiceClient( + http.DefaultClient, + fmt.Sprintf("http://127.0.0.1:%d", port), + tc.clientOpts...) + + _, err := c.Push(ctx, generatePushPayload(tc.numSeries, tc.numSamplesPerSeries, tc.SampleSize)) + if tc.expectedError != nil { + require.ErrorContains(t, err, tc.expectedError.Error()) + } else { + require.NoError(t, err) + } + + for idx := range appendables { + a := appendables[idx].(*testAppender) + + // check series match + require.Equal(t, a.series(), tc.expectedSeries) + + // check number of samples is correct + require.Equal(t, tc.numSeries*tc.numSamplesPerSeries, a.samples()) + + // check samples are received in full + for _, samples := range a.pushedSamples { + for _, sample := range samples { + require.Len(t, sample.RawProfile, tc.SampleSize) + } + } + } + }) + } +} + func createTestAppendables(errors []error) []pyroscope.Appendable { var appendables []pyroscope.Appendable for _, err := range errors { @@ -153,7 +293,13 @@ func countForwardedProfiles(appendables []pyroscope.Appendable) int { return count } -func verifyForwardedProfiles(t *testing.T, appendables []pyroscope.Appendable, expectedProfile []byte, expectedHeaders map[string]string, expectedQueryParams string) { +func verifyForwardedProfiles( + t *testing.T, + appendables []pyroscope.Appendable, + expectedProfile []byte, + expectedHeaders map[string]string, + expectedQueryParams string, +) { for i, app := range appendables { testApp, ok := app.(*testAppender) require.True(t, ok, "Appendable is not a testAppender") @@ -166,7 +312,14 @@ func verifyForwardedProfiles(t *testing.T, appendables []pyroscope.Appendable, e // Verify headers for key, value := range expectedHeaders { - require.Equal(t, value, testApp.lastProfile.Headers.Get(key), "Header mismatch for key %s in appendable %d", key, i) + require.Equal( + t, + value, + testApp.lastProfile.Headers.Get(key), + "Header mismatch for key %s in appendable %d", + key, + i, + ) } // Verify query parameters @@ -208,7 +361,13 @@ func startComponent(t *testing.T, appendables []pyroscope.Appendable) int { return port } -func sendCustomRequest(t *testing.T, port int, method, path, queryParams string, headers map[string]string, profileSize int) ([]byte, *http.Response) { +func sendCustomRequest( + t *testing.T, + port int, + method, path, queryParams string, + headers map[string]string, + profileSize int, +) ([]byte, *http.Response) { t.Helper() testProfile := make([]byte, profileSize) _, err := rand.Read(testProfile) @@ -251,14 +410,42 @@ func testAppendable(appendErr error) pyroscope.Appendable { type testAppender struct { appendErr error lastProfile *pyroscope.IncomingProfile + + pushedLabels []labels.Labels + pushedSamples [][]*pyroscope.RawSample +} + +func (a *testAppender) reset(expectedErr error) { + a.appendErr = expectedErr + a.lastProfile = nil + a.pushedLabels = a.pushedLabels[:0] + a.pushedSamples = a.pushedSamples[:0] +} + +func (a *testAppender) samples() int { + var c = 0 + for _, x := range a.pushedSamples { + c += len(x) + } + return c +} + +func (a *testAppender) series() []string { + var series []string + for _, labels := range a.pushedLabels { + series = append(series, labels.String()) + } + return series } func (a *testAppender) Appender() pyroscope.Appender { return a } -func (a *testAppender) Append(_ context.Context, _ labels.Labels, _ []*pyroscope.RawSample) error { - return fmt.Errorf("Append method not implemented for test") +func (a *testAppender) Append(_ context.Context, lbls labels.Labels, samples []*pyroscope.RawSample) error { + a.pushedLabels = append(a.pushedLabels, lbls) + a.pushedSamples = append(a.pushedSamples, samples) + return a.appendErr } func (a *testAppender) AppendIngest(_ context.Context, profile *pyroscope.IncomingProfile) error { @@ -288,7 +475,7 @@ func testOptions(t *testing.T) component.Options { } } -// TestUpdateArgs verifies that the component can be updated with new arguments. This explictly also makes sure that the server is restarted when the server configuration changes. And there are no metric registration conflicts. +// TestUpdateArgs verifies that the component can be updated with new arguments. This explicitly also makes sure that the server is restarted when the server configuration changes. And there are no metric registration conflicts. func TestUpdateArgs(t *testing.T) { ports, err := freeport.GetFreePorts(2) require.NoError(t, err) @@ -329,3 +516,69 @@ func TestUpdateArgs(t *testing.T) { waitForServerReady(t, ports[1]) } + +// The join parameter allows to add extra labels onto the received profiles, based on IP address matching +func TestJoin(t *testing.T) { + // TODO: test for ingest endpoint + port, err := freeport.GetFreePort() + require.NoError(t, err) + + forwardTo := []pyroscope.Appendable{testAppendable(nil)} + + args := Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: "localhost", + ListenPort: port, + }, + }, + ForwardTo: forwardTo, + Join: []discovery.Target{ + {labelMetaKubernetesPodIP: "127.0.0.1", "pod": "my-pod-a", "namespace": "my-namespace"}, + }, + } + + comp, err := New(testOptions(t), args) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + go func() { + require.NoError(t, comp.Run(ctx)) + }() + + waitForServerReady(t, port) + + c := pushv1connect.NewPusherServiceClient( + http.DefaultClient, + fmt.Sprintf("http://127.0.0.1:%d", port)) + + _, err = c.Push(ctx, generatePushPayload(1, 1, 1024)) + require.NoError(t, err) + + appendable := forwardTo[0].(*testAppender) + require.Equal( + t, + []string{"{__meta_kubernetes_pod_ip=\"127.0.0.1\", app=\"app-0\", namespace=\"my-namespace\", pod=\"my-pod-a\"}"}, + appendable.series(), + ) + appendable.reset(nil) + + // update join info + args.Join = []discovery.Target{ + {labelMetaKubernetesPodIP: "127.0.0.1", "pod": "my-pod-b", "namespace": "my-namespace"}, + } + comp.Update(args) + + _, err = c.Push(ctx, generatePushPayload(1, 1, 1024)) + require.NoError(t, err) + + require.Equal( + t, + []string{"{__meta_kubernetes_pod_ip=\"127.0.0.1\", app=\"app-0\", namespace=\"my-namespace\", pod=\"my-pod-b\"}"}, + appendable.series(), + ) + appendable.reset(nil) + +}