From e977961e529a81eec44bc36e490e662e298eddbc Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Thu, 16 Jan 2025 17:20:17 +0000 Subject: [PATCH 1/2] feat(pyroscope.receive_http): Support pushv1.Push in receive_http /push.v1.PusherService/Push which is a connect API used by profilecli and pyroscope.write with pyroscope.ebpf and pyroscope.alloy. This is in addtion to the /ingest API the component already supports. --- .../pyroscope/receive_http/receive_http.go | 76 ++++++- .../receive_http/receive_http_test.go | 199 +++++++++++++++++- 2 files changed, 262 insertions(+), 13 deletions(-) diff --git a/internal/component/pyroscope/receive_http/receive_http.go b/internal/component/pyroscope/receive_http/receive_http.go index c17868b427..4e969694dc 100644 --- a/internal/component/pyroscope/receive_http/receive_http.go +++ b/internal/component/pyroscope/receive_http/receive_http.go @@ -9,8 +9,10 @@ import ( "reflect" "sync" + "connectrpc.com/connect" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" "golang.org/x/sync/errgroup" "github.com/grafana/alloy/internal/component" @@ -20,6 +22,9 @@ import ( "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/util" + pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" + "github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect" + typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" ) const ( @@ -137,14 +142,81 @@ func (c *Component) Update(args component.Arguments) error { c.server = srv return c.server.MountAndRun(func(router *mux.Router) { + // this mounts the og pyroscope ingest API, mostly used by SDKs router.HandleFunc("/ingest", c.handleIngest).Methods(http.MethodPost) + + // mount connect go pushv1 + pathPush, handlePush := pushv1connect.NewPusherServiceHandler(c) + router.PathPrefix(pathPush).Handler(handlePush).Methods(http.MethodPost) }) } -func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) { +func setLabelBuilderFromAPI(lb *labels.Builder, api []*typesv1.LabelPair) { + for i := range api { + lb.Set(api[i].Name, api[i].Value) + } +} + +func apiToAlloySamples(api []*pushv1.RawSample) []*pyroscope.RawSample { + var ( + alloy = make([]*pyroscope.RawSample, len(api)) + ) + for i := range alloy { + alloy[i] = &pyroscope.RawSample{ + RawProfile: api[i].RawProfile, + } + } + return alloy +} + +func (c *Component) Push(ctx context.Context, req *connect.Request[pushv1.PushRequest], +) (*connect.Response[pushv1.PushResponse], error) { + appendables := c.getAppendables() + + // Create an errgroup with the timeout context + g, ctx := errgroup.WithContext(ctx) + + // Start copying the request body to all pipes + for i := range appendables { + appendable := appendables[i].Appender() + g.Go(func() error { + var ( + errs error + lb = labels.NewBuilder(nil) + ) + + for idx := range req.Msg.Series { + lb.Reset(nil) + setLabelBuilderFromAPI(lb, req.Msg.Series[idx].Labels) + err := appendable.Append(ctx, lb.Labels(), apiToAlloySamples(req.Msg.Series[idx].Samples)) + if err != nil { + errs = errors.Join( + errs, + fmt.Errorf("unable to append series %s to appendable %d: %w", lb.Labels().String(), i, err), + ) + } + } + return errs + }) + } + if err := g.Wait(); err != nil { + level.Error(c.opts.Logger).Log("msg", "Failed to forward profiles requests", "err", err) + return nil, connect.NewError(connect.CodeInternal, err) + } + + level.Debug(c.opts.Logger).Log("msg", "Profiles successfully forwarded") + return connect.NewResponse(&pushv1.PushResponse{}), nil +} + +func (c *Component) getAppendables() []pyroscope.Appendable { c.mut.Lock() + defer c.mut.Unlock() appendables := c.appendables - c.mut.Unlock() + return appendables +} + +func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) { + appendables := c.getAppendables() // Create a pipe for each appendable pipeWriters := make([]io.Writer, len(appendables)) diff --git a/internal/component/pyroscope/receive_http/receive_http_test.go b/internal/component/pyroscope/receive_http/receive_http_test.go index 5c507016cb..255189d4a2 100644 --- a/internal/component/pyroscope/receive_http/receive_http_test.go +++ b/internal/component/pyroscope/receive_http/receive_http_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/rand" + "errors" "fmt" "io" "net/http" @@ -11,6 +12,7 @@ import ( "testing" "time" + "connectrpc.com/connect" "github.com/phayes/freeport" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" @@ -20,13 +22,18 @@ import ( fnet "github.com/grafana/alloy/internal/component/common/net" "github.com/grafana/alloy/internal/component/pyroscope" "github.com/grafana/alloy/internal/util" + pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" + "github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect" + typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" ) -// TestForwardsProfiles verifies the behavior of the pyroscope.receive_http component -// under various scenarios. It tests different profile sizes, HTTP methods, paths, -// query parameters, and error conditions to ensure correct forwarding behavior -// and proper error handling. -func TestForwardsProfiles(t *testing.T) { +// TestForwardsProfilesIngest verifies the behavior of the +// pyroscope.receive_http component under various scenarios. It tests different +// profile sizes, HTTP methods, paths, query parameters, and error conditions +// to ensure correct forwarding behavior and proper error handling, when +// clients use the legacy OG Pyroscope /ingest API, which is predominentaly +// used by the SDKs. +func TestForwardsProfilesIngest(t *testing.T) { tests := []struct { name string profileSize int @@ -135,6 +142,136 @@ func TestForwardsProfiles(t *testing.T) { } } +// TestForwardsProfilesPushV1 verifies the behavior of the +// pyroscope.receive_http using the connect pushv1 API. This is predominentaly +// used by other alloy components like pyrscope.ebpf. +func TestForwardsProfilesPushV1(t *testing.T) { + for _, tc := range []struct { + name string + clientOpts []connect.ClientOption + appendableErrors []error + + numSeries int + numSamplesPerSeries int + SampleSize int + + expectedSeries []string + expectedError error + }{ + { + name: "One series, one small profile, one appendables", + expectedSeries: []string{`{app="app-0"}`}, + }, + { + name: "One series, one small profile, one appendables using JSON", + expectedSeries: []string{`{app="app-0"}`}, + clientOpts: []connect.ClientOption{connect.WithProtoJSON()}, + }, + { + name: "One series, one small profile, one appendables using GRPC", + expectedSeries: []string{`{app="app-0"}`}, + clientOpts: []connect.ClientOption{connect.WithGRPC()}, + }, + { + name: "One series, one small profile, one appendables using GRPCWeb", + expectedSeries: []string{`{app="app-0"}`}, + clientOpts: []connect.ClientOption{connect.WithGRPCWeb()}, + }, + { + name: "Two series, one small profile, one appendables", + numSeries: 2, + expectedSeries: []string{ + `{app="app-0"}`, + `{app="app-1"}`, + }, + }, + { + name: "One series, two small profile, one appendable", + numSamplesPerSeries: 2, + expectedSeries: []string{`{app="app-0"}`}, + }, + { + name: "One series, two small profile, two appendable", + numSamplesPerSeries: 2, + appendableErrors: []error{nil, nil}, + expectedSeries: []string{`{app="app-0"}`}, + }, + { + name: "One series, two small profile, two appendable one with errors", + numSamplesPerSeries: 2, + appendableErrors: []error{nil, errors.New("wtf")}, + expectedSeries: []string{`{app="app-0"}`}, + expectedError: errors.New(`internal: unable to append series {app="app-0"} to appendable 1: wtf`), + }, + } { + t.Run(tc.name, func(t *testing.T) { + if tc.SampleSize == 0 { + tc.SampleSize = 1024 + } + if tc.numSeries == 0 { + tc.numSeries = 1 + } + if len(tc.appendableErrors) == 0 { + tc.appendableErrors = []error{nil} + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + appendables := createTestAppendables(tc.appendableErrors) + port := startComponent(t, appendables) + + c := pushv1connect.NewPusherServiceClient( + http.DefaultClient, + fmt.Sprintf("http://127.0.0.1:%d", port), + tc.clientOpts...) + + var series []*pushv1.RawProfileSeries + for i := 0; i < tc.numSeries; i++ { + var samples []*pushv1.RawSample + for j := 0; j < tc.numSamplesPerSeries; j++ { + samples = append(samples, &pushv1.RawSample{ + RawProfile: bytes.Repeat([]byte{0xde, 0xad}, tc.SampleSize/2), + }) + } + + series = append(series, &pushv1.RawProfileSeries{ + Labels: []*typesv1.LabelPair{ + {Name: "app", Value: fmt.Sprintf("app-%d", i)}, + }, + Samples: samples, + }) + } + + _, err := c.Push(ctx, connect.NewRequest(&pushv1.PushRequest{ + Series: series, + })) + if tc.expectedError != nil { + require.ErrorContains(t, err, tc.expectedError.Error()) + } else { + require.NoError(t, err) + } + + for idx := range appendables { + a := appendables[idx].(*testAppender) + + // check series match + require.Equal(t, a.series(), tc.expectedSeries) + + // check number of samples is correct + require.Equal(t, tc.numSeries*tc.numSamplesPerSeries, a.samples()) + + // check samples are received in full + for _, samples := range a.pushedSamples { + for _, sample := range samples { + require.Len(t, sample.RawProfile, tc.SampleSize) + } + } + } + }) + } +} + func createTestAppendables(errors []error) []pyroscope.Appendable { var appendables []pyroscope.Appendable for _, err := range errors { @@ -153,7 +290,13 @@ func countForwardedProfiles(appendables []pyroscope.Appendable) int { return count } -func verifyForwardedProfiles(t *testing.T, appendables []pyroscope.Appendable, expectedProfile []byte, expectedHeaders map[string]string, expectedQueryParams string) { +func verifyForwardedProfiles( + t *testing.T, + appendables []pyroscope.Appendable, + expectedProfile []byte, + expectedHeaders map[string]string, + expectedQueryParams string, +) { for i, app := range appendables { testApp, ok := app.(*testAppender) require.True(t, ok, "Appendable is not a testAppender") @@ -166,7 +309,14 @@ func verifyForwardedProfiles(t *testing.T, appendables []pyroscope.Appendable, e // Verify headers for key, value := range expectedHeaders { - require.Equal(t, value, testApp.lastProfile.Headers.Get(key), "Header mismatch for key %s in appendable %d", key, i) + require.Equal( + t, + value, + testApp.lastProfile.Headers.Get(key), + "Header mismatch for key %s in appendable %d", + key, + i, + ) } // Verify query parameters @@ -208,7 +358,13 @@ func startComponent(t *testing.T, appendables []pyroscope.Appendable) int { return port } -func sendCustomRequest(t *testing.T, port int, method, path, queryParams string, headers map[string]string, profileSize int) ([]byte, *http.Response) { +func sendCustomRequest( + t *testing.T, + port int, + method, path, queryParams string, + headers map[string]string, + profileSize int, +) ([]byte, *http.Response) { t.Helper() testProfile := make([]byte, profileSize) _, err := rand.Read(testProfile) @@ -251,14 +407,35 @@ func testAppendable(appendErr error) pyroscope.Appendable { type testAppender struct { appendErr error lastProfile *pyroscope.IncomingProfile + + pushedLabels []labels.Labels + pushedSamples [][]*pyroscope.RawSample +} + +func (a *testAppender) samples() int { + var c = 0 + for _, x := range a.pushedSamples { + c += len(x) + } + return c +} + +func (a *testAppender) series() []string { + var series []string + for _, labels := range a.pushedLabels { + series = append(series, labels.String()) + } + return series } func (a *testAppender) Appender() pyroscope.Appender { return a } -func (a *testAppender) Append(_ context.Context, _ labels.Labels, _ []*pyroscope.RawSample) error { - return fmt.Errorf("Append method not implemented for test") +func (a *testAppender) Append(_ context.Context, lbls labels.Labels, samples []*pyroscope.RawSample) error { + a.pushedLabels = append(a.pushedLabels, lbls) + a.pushedSamples = append(a.pushedSamples, samples) + return a.appendErr } func (a *testAppender) AppendIngest(_ context.Context, profile *pyroscope.IncomingProfile) error { @@ -288,7 +465,7 @@ func testOptions(t *testing.T) component.Options { } } -// TestUpdateArgs verifies that the component can be updated with new arguments. This explictly also makes sure that the server is restarted when the server configuration changes. And there are no metric registration conflicts. +// TestUpdateArgs verifies that the component can be updated with new arguments. This explicitly also makes sure that the server is restarted when the server configuration changes. And there are no metric registration conflicts. func TestUpdateArgs(t *testing.T) { ports, err := freeport.GetFreePorts(2) require.NoError(t, err) From 088f70707282a398f503d577dfad23aebe6ac02f Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Wed, 29 Jan 2025 08:24:03 +0000 Subject: [PATCH 2/2] Update changelog and docs --- CHANGELOG.md | 2 ++ .../reference/components/pyroscope/pyroscope.receive_http.md | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f5d830895..c097805fb7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,8 @@ Main (unreleased) - Bump snmp_exporter and embedded modules to 0.27.0. Add support for multi-module handling by comma separation and expose argument to increase SNMP polling concurrency for `prometheus.exporter.snmp`. (@v-zhuravlev) +- Add support for pushv1.PusherService Connect API in `pyroscope.receive_http`. (@simonswine) + v1.6.1 ----------------- diff --git a/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md b/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md index d5f50962f8..f8a38b1cf2 100644 --- a/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md +++ b/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md @@ -12,7 +12,7 @@ title: pyroscope.receive_http `pyroscope.receive_http` receives profiles over HTTP and forwards them to `pyroscope.*` components capable of receiving profiles. -The HTTP API exposed is compatible with the Pyroscope [HTTP ingest API](https://grafana.com/docs/pyroscope/latest/configure-server/about-server-api/). +The HTTP API exposed is compatible with both the Pyroscope [HTTP ingest API](https://grafana.com/docs/pyroscope/latest/configure-server/about-server-api/) and the [pushv1.PusherService](https://github.com/grafana/pyroscope/blob/main/api/push/v1/push.proto) Connect API. This allows `pyroscope.receive_http` to act as a proxy for Pyroscope profiles, enabling flexible routing and distribution of profile data. ## Usage @@ -30,6 +30,7 @@ pyroscope.receive_http "LABEL" { The component will start an HTTP server supporting the following endpoint. * `POST /ingest` - send profiles to the component, which will be forwarded to the receivers as configured in the `forward_to argument`. The request format must match the format of the Pyroscope ingest API. +* `POST /push.v1.PusherService/Push` - send profiles to the component, which will be forwarded to the receivers as configured in the `forward_to argument`. The request format must match the format of the Pyroscope pushv1.PusherService Connect API. ## Arguments