diff --git a/internal/component/prometheus/write/queue/e2e_bench_test.go b/internal/component/prometheus/write/queue/e2e_bench_test.go deleted file mode 100644 index 3ae7c36534..0000000000 --- a/internal/component/prometheus/write/queue/e2e_bench_test.go +++ /dev/null @@ -1,126 +0,0 @@ -package queue - -import ( - "context" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/grafana/alloy/internal/component" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/prompb" - "github.com/prometheus/prometheus/storage" - "github.com/stretchr/testify/require" -) - -func BenchmarkE2E(b *testing.B) { - // Around 120k ops if you look at profile roughly 20k are actual implementation with the rest being benchmark - // setup. - type e2eTest struct { - name string - maker func(index int, app storage.Appender) - tester func(samples []prompb.TimeSeries) - } - tests := []e2eTest{ - { - // This should be ~1200 allocs an op - name: "normal", - maker: func(index int, app storage.Appender) { - ts, v, lbls := makeSeries(index) - _, _ = app.Append(0, lbls, ts, v) - }, - tester: func(samples []prompb.TimeSeries) { - b.Helper() - for _, s := range samples { - require.True(b, len(s.Samples) == 1) - } - }, - }, - } - for _, test := range tests { - b.Run(test.name, func(t *testing.B) { - runBenchmark(t, test.maker, test.tester) - }) - } -} - -func runBenchmark(t *testing.B, add func(index int, appendable storage.Appender), _ func(samples []prompb.TimeSeries)) { - t.ReportAllocs() - l := log.NewNopLogger() - done := make(chan struct{}) - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - })) - expCh := make(chan Exports, 1) - c, err := newComponentBenchmark(t, l, srv.URL, expCh) - require.NoError(t, err) - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - go func() { - runErr := c.Run(ctx) - require.NoError(t, runErr) - }() - // Wait for export to spin up. - exp := <-expCh - - index := 0 - app := exp.Receiver.Appender(ctx) - - for i := 0; i < t.N; i++ { - index++ - add(index, app) - } - require.NoError(t, app.Commit()) - - tm := time.NewTimer(10 * time.Second) - select { - case <-done: - case <-tm.C: - } - cancel() -} - -func newComponentBenchmark(t *testing.B, l log.Logger, url string, exp chan Exports) (*Queue, error) { - return NewComponent(component.Options{ - ID: "test", - Logger: l, - DataPath: t.TempDir(), - OnStateChange: func(e component.Exports) { - exp <- e.(Exports) - }, - Registerer: fakeRegistry{}, - Tracer: nil, - }, Arguments{ - TTL: 2 * time.Hour, - Persistence: Persistence{ - MaxSignalsToBatch: 100_000, - BatchInterval: 1 * time.Second, - }, - Endpoints: []EndpointConfig{{ - Name: "test", - URL: url, - Timeout: 10 * time.Second, - RetryBackoff: 1 * time.Second, - MaxRetryAttempts: 0, - BatchCount: 50, - FlushInterval: 1 * time.Second, - Parallelism: 1, - }}, - }) -} - -var _ prometheus.Registerer = (*fakeRegistry)(nil) - -type fakeRegistry struct{} - -func (f fakeRegistry) Register(collector prometheus.Collector) error { - return nil -} - -func (f fakeRegistry) MustRegister(collector ...prometheus.Collector) { -} - -func (f fakeRegistry) Unregister(collector prometheus.Collector) bool { - return true -} diff --git a/internal/component/prometheus/write/queue/e2e_stats_test.go b/internal/component/prometheus/write/queue/e2e_stats_test.go deleted file mode 100644 index b6737d5be8..0000000000 --- a/internal/component/prometheus/write/queue/e2e_stats_test.go +++ /dev/null @@ -1,711 +0,0 @@ -package queue - -import ( - "context" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/grafana/alloy/internal/util" - "github.com/prometheus/client_golang/prometheus" - dto "github.com/prometheus/client_model/go" - "github.com/stretchr/testify/require" -) - -const remoteSamples = "prometheus_remote_storage_samples_total" -const remoteHistograms = "prometheus_remote_storage_histograms_total" -const remoteMetadata = "prometheus_remote_storage_metadata_total" - -const sentBytes = "prometheus_remote_storage_sent_bytes_total" -const sentMetadataBytes = "prometheus_remote_storage_metadata_bytes_total" - -const outTimestamp = "prometheus_remote_storage_queue_highest_sent_timestamp_seconds" -const inTimestamp = "prometheus_remote_storage_highest_timestamp_in_seconds" - -const failedSample = "prometheus_remote_storage_samples_failed_total" -const failedHistogram = "prometheus_remote_storage_histograms_failed_total" -const failedMetadata = "prometheus_remote_storage_metadata_failed_total" - -const retriedSamples = "prometheus_remote_storage_samples_retried_total" -const retriedHistogram = "prometheus_remote_storage_histograms_retried_total" -const retriedMetadata = "prometheus_remote_storage_metadata_retried_total" - -const prometheusDuration = "prometheus_remote_storage_queue_duration_seconds" - -const serializerIncoming = "alloy_queue_series_serializer_incoming_signals" -const alloySent = "alloy_queue_series_network_sent" -const alloySerializerIncoming = "alloy_queue_series_serializer_incoming_timestamp_seconds" -const alloyNetworkDuration = "alloy_queue_series_network_duration_seconds" -const alloyFailures = "alloy_queue_series_network_failed" -const alloyRetries = "alloy_queue_series_network_retried" -const alloy429 = "alloy_queue_series_network_retried_429" - -const alloyMetadataDuration = "alloy_queue_metadata_network_duration_seconds" -const alloyMetadataSent = "alloy_queue_metadata_network_sent" -const alloyMetadataFailed = "alloy_queue_metadata_network_failed" -const alloyMetadataRetried429 = "alloy_queue_metadata_network_retried_429" -const alloyMetadataRetried = "alloy_queue_metadata_network_retried" - -const alloyNetworkTimestamp = "alloy_queue_series_network_timestamp_seconds" - -const alloyDrift = "alloy_queue_series_timestamp_drift_seconds" - -// TestMetadata is the large end to end testing for the queue based wal, specifically for metadata. -func TestMetadata(t *testing.T) { - // Check assumes you are checking for any value that is not 0. - // The test at the end will see if there are any values that were not 0. - tests := []statsTest{ - // Metadata Tests - { - name: "metadata success", - returnStatusCode: http.StatusOK, - dtype: Metadata, - checks: []check{ - { - name: serializerIncoming, - value: 10, - }, - { - name: remoteMetadata, - value: 10, - }, - { - name: sentMetadataBytes, - valueFunc: greaterThenZero, - }, - { - name: alloyMetadataDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyMetadataSent, - value: 10, - }, - }, - }, - { - name: "metadata failure", - returnStatusCode: http.StatusBadRequest, - dtype: Metadata, - checks: []check{ - { - name: alloyMetadataFailed, - value: 10, - }, - { - name: serializerIncoming, - value: 10, - }, - { - name: failedMetadata, - value: 10, - }, - { - name: alloyMetadataDuration, - valueFunc: greaterThenZero, - }, - }, - }, - { - name: "metadata retry", - returnStatusCode: http.StatusTooManyRequests, - dtype: Metadata, - checks: []check{ - { - name: serializerIncoming, - value: 10, - }, - { - name: retriedMetadata, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: alloyMetadataDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyMetadataRetried, - valueFunc: greaterThenZero, - }, - { - name: alloyMetadataRetried429, - valueFunc: greaterThenZero, - }, - }, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - runE2eStats(t, test) - }) - } - -} - -// TestMetrics is the large end to end testing for the queue based wal. -func TestMetrics(t *testing.T) { - // Check assumes you are checking for any value that is not 0. - // The test at the end will see if there are any values that were not 0. - tests := []statsTest{ - // Sample Tests - { - name: "sample success", - returnStatusCode: http.StatusOK, - dtype: Sample, - checks: []check{ - { - name: serializerIncoming, - value: 10, - }, - { - name: remoteSamples, - value: 10, - }, - { - name: alloySent, - value: 10, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: sentBytes, - valueFunc: greaterThenZero, - }, - { - name: outTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: alloyNetworkTimestamp, - valueFunc: greaterThenZero, - }, - }, - }, - { - name: "sample failure", - returnStatusCode: http.StatusBadRequest, - dtype: Sample, - checks: []check{ - { - name: alloyFailures, - value: 10, - }, - { - name: serializerIncoming, - value: 10, - }, - { - name: failedSample, - value: 10, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: alloyDrift, - valueFunc: greaterThenZero, - }, - }, - }, - { - name: "sample retry", - returnStatusCode: http.StatusTooManyRequests, - dtype: Sample, - checks: []check{ - { - name: serializerIncoming, - value: 10, - }, - { - name: retriedSamples, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: alloyRetries, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: alloy429, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: alloyDrift, - valueFunc: greaterThenZero, - }, - }, - }, - // histograms - { - name: "histogram success", - returnStatusCode: http.StatusOK, - dtype: Histogram, - checks: []check{ - { - name: serializerIncoming, - value: 10, - }, - { - name: remoteHistograms, - value: 10, - }, - { - name: alloySent, - value: 10, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: sentBytes, - valueFunc: greaterThenZero, - }, - { - name: outTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: alloyNetworkTimestamp, - valueFunc: greaterThenZero, - }, - }, - }, - { - name: "histogram failure", - returnStatusCode: http.StatusBadRequest, - dtype: Histogram, - checks: []check{ - { - name: alloyFailures, - value: 10, - }, - { - name: serializerIncoming, - value: 10, - }, - { - name: failedHistogram, - value: 10, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: alloyDrift, - valueFunc: greaterThenZero, - }, - }, - }, - { - name: "histogram retry", - returnStatusCode: http.StatusTooManyRequests, - dtype: Histogram, - checks: []check{ - { - name: serializerIncoming, - value: 10, - }, - { - name: retriedHistogram, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: alloyRetries, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: alloy429, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: alloyDrift, - valueFunc: greaterThenZero, - }, - }, - }, - // TURNING OFF EXEMPLAR TESTS until underlying issue is resolved. - //exemplar, note that once it hits the appender exemplars are treated the same as series. - /*{ - name: "exemplar success", - returnStatusCode: http.StatusOK, - dtype: Exemplar, - checks: []check{ - { - name: serializerIncoming, - value: 10, - }, - { - name: remoteSamples, - value: 10, - }, - { - name: alloySent, - value: 10, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: sentBytes, - valueFunc: greaterThenZero, - }, - { - name: outTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - { - name: alloyNetworkTimestamp, - valueFunc: greaterThenZero, - }, - }, - }, - { - name: "exemplar failure", - returnStatusCode: http.StatusBadRequest, - dtype: Exemplar, - checks: []check{ - { - name: alloyFailures, - value: 10, - }, - { - name: serializerIncoming, - value: 10, - }, - { - name: failedSample, - value: 10, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - }, - }, - { - name: "exemplar retry", - returnStatusCode: http.StatusTooManyRequests, - dtype: Exemplar, - checks: []check{ - { - name: serializerIncoming, - value: 10, - }, - { - name: retriedSamples, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: alloyRetries, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: alloy429, - // This will be more than 10 since it retries in a loop. - valueFunc: greaterThenZero, - }, - { - name: prometheusDuration, - valueFunc: greaterThenZero, - }, - { - name: alloyNetworkDuration, - valueFunc: greaterThenZero, - }, - { - name: alloySerializerIncoming, - valueFunc: isReasonableTimeStamp, - }, - { - name: inTimestamp, - valueFunc: isReasonableTimeStamp, - }, - }, - },*/ - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - runE2eStats(t, test) - }) - } - -} - -func greaterThenZero(v float64) bool { - return v > 0 -} - -func isReasonableTimeStamp(v float64) bool { - if v < 0 { - return false - } - unixTime := time.Unix(int64(v), 0) - - return time.Since(unixTime) < 10*time.Second -} - -type dataType int - -const ( - Sample dataType = iota - Histogram - Exemplar - Metadata -) - -type check struct { - name string - value float64 - valueFunc func(v float64) bool -} -type statsTest struct { - name string - returnStatusCode int - // Only check for non zero values, once all checks are ran it will automatically ensure all remaining metrics are 0. - checks []check - dtype dataType -} - -func runE2eStats(t *testing.T, test statsTest) { - l := util.TestAlloyLogger(t) - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(test.returnStatusCode) - })) - expCh := make(chan Exports, 1) - - reg := prometheus.NewRegistry() - c, err := newComponent(t, l, srv.URL, expCh, reg) - require.NoError(t, err) - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - go func() { - runErr := c.Run(ctx) - require.NoError(t, runErr) - }() - // Wait for export to spin up. - exp := <-expCh - - index := 0 - - go func() { - app := exp.Receiver.Appender(ctx) - for j := 0; j < 10; j++ { - index++ - switch test.dtype { - case Sample: - ts, v, lbls := makeSeries(index) - _, errApp := app.Append(0, lbls, ts, v) - require.NoError(t, errApp) - case Histogram: - ts, lbls, h := makeHistogram(index) - _, errApp := app.AppendHistogram(0, lbls, ts, h, nil) - require.NoError(t, errApp) - case Exemplar: - ex := makeExemplar(index) - _, errApp := app.AppendExemplar(0, nil, ex) - require.NoError(t, errApp) - case Metadata: - md, lbls := makeMetadata(index) - _, errApp := app.UpdateMetadata(0, lbls, md) - require.NoError(t, errApp) - default: - require.True(t, false) - } - } - require.NoError(t, app.Commit()) - }() - time.Sleep(5 * time.Second) - require.Eventually(t, func() bool { - dtos, gatherErr := reg.Gather() - require.NoError(t, gatherErr) - // Check if we have some valid metrics. - found := 0 - for _, d := range dtos { - if getValue(d) > 0 { - found++ - } - } - // Make sure we have a few metrics. - return found > 1 - }, 10*time.Second, 1*time.Second) - - metrics := make(map[string]float64) - dtos, err := reg.Gather() - require.NoError(t, err) - // Cancel needs to be here since it will unregister the metrics. - cancel() - - // Get the value of metrics. - for _, d := range dtos { - metrics[*d.Name] = getValue(d) - } - - // Check for the metrics that matter. - for _, valChk := range test.checks { - // These check functions will return the list of metrics with the one checked for deleted. - // Ideally at the end we should only be left with metrics with a value of zero.s - if valChk.valueFunc != nil { - metrics = checkValueCondition(t, valChk.name, valChk.valueFunc, metrics) - } else { - metrics = checkValue(t, valChk.name, valChk.value, metrics) - } - } - // all other metrics should be zero. - for k, v := range metrics { - require.Zerof(t, v, "%s should be zero", k) - } -} - -func getValue(d *dto.MetricFamily) float64 { - switch *d.Type { - case dto.MetricType_COUNTER: - return d.Metric[0].Counter.GetValue() - case dto.MetricType_GAUGE: - return d.Metric[0].Gauge.GetValue() - case dto.MetricType_SUMMARY: - return d.Metric[0].Summary.GetSampleSum() - case dto.MetricType_UNTYPED: - return d.Metric[0].Untyped.GetValue() - case dto.MetricType_HISTOGRAM: - return d.Metric[0].Histogram.GetSampleSum() - case dto.MetricType_GAUGE_HISTOGRAM: - return d.Metric[0].Histogram.GetSampleSum() - default: - panic("unknown type " + d.Type.String()) - } -} - -func checkValue(t *testing.T, name string, value float64, metrics map[string]float64) map[string]float64 { - v, ok := metrics[name] - require.Truef(t, ok, "invalid metric name %s", name) - require.Equalf(t, value, v, "%s should be %f", name, value) - delete(metrics, name) - return metrics -} - -func checkValueCondition(t *testing.T, name string, chk func(float64) bool, metrics map[string]float64) map[string]float64 { - v, ok := metrics[name] - require.Truef(t, ok, "invalid metric name %s", name) - require.Truef(t, chk(v), "false test for metric name %s", name) - delete(metrics, name) - return metrics -} diff --git a/internal/component/prometheus/write/queue/e2e_test.go b/internal/component/prometheus/write/queue/e2e_test.go deleted file mode 100644 index 634f48f60a..0000000000 --- a/internal/component/prometheus/write/queue/e2e_test.go +++ /dev/null @@ -1,421 +0,0 @@ -//go:build !windows - -package queue - -import ( - "context" - "fmt" - "io" - "net/http" - "net/http/httptest" - "reflect" - "strings" - "sync" - "testing" - "time" - - "github.com/golang/snappy" - "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/runtime/logging" - "github.com/grafana/alloy/internal/util" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/model/exemplar" - "github.com/prometheus/prometheus/model/histogram" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/metadata" - "github.com/prometheus/prometheus/prompb" - "github.com/prometheus/prometheus/storage" - "github.com/stretchr/testify/require" - "go.uber.org/atomic" -) - -func TestE2E(t *testing.T) { - type e2eTest struct { - name string - maker func(index int, app storage.Appender) (float64, labels.Labels) - tester func(samples *safeSlice[prompb.TimeSeries]) - testMeta func(samples *safeSlice[prompb.MetricMetadata]) - } - tests := []e2eTest{ - { - name: "normal", - maker: func(index int, app storage.Appender) (float64, labels.Labels) { - ts, v, lbls := makeSeries(index) - _, errApp := app.Append(0, lbls, ts, v) - require.NoError(t, errApp) - return v, lbls - }, - tester: func(samples *safeSlice[prompb.TimeSeries]) { - t.Helper() - for i := 0; i < samples.Len(); i++ { - s := samples.Get(i) - require.True(t, len(s.Samples) == 1) - require.True(t, s.Samples[0].Timestamp > 0) - require.True(t, s.Samples[0].Value > 0) - require.True(t, len(s.Labels) == 1) - require.Truef(t, s.Labels[0].Name == fmt.Sprintf("name_%d", int(s.Samples[0].Value)), "%d name %s", int(s.Samples[0].Value), s.Labels[0].Name) - require.True(t, s.Labels[0].Value == fmt.Sprintf("value_%d", int(s.Samples[0].Value))) - } - }, - }, - { - name: "metadata", - maker: func(index int, app storage.Appender) (float64, labels.Labels) { - meta, lbls := makeMetadata(index) - _, errApp := app.UpdateMetadata(0, lbls, meta) - require.NoError(t, errApp) - return 0, lbls - }, - testMeta: func(samples *safeSlice[prompb.MetricMetadata]) { - for i := 0; i < samples.Len(); i++ { - s := samples.Get(i) - require.True(t, s.GetUnit() == "seconds") - require.True(t, s.Help == "metadata help") - require.True(t, s.Unit == "seconds") - require.True(t, s.Type == prompb.MetricMetadata_COUNTER) - require.True(t, strings.HasPrefix(s.MetricFamilyName, "name_")) - } - }, - }, - - { - name: "histogram", - maker: func(index int, app storage.Appender) (float64, labels.Labels) { - ts, lbls, h := makeHistogram(index) - _, errApp := app.AppendHistogram(0, lbls, ts, h, nil) - require.NoError(t, errApp) - return h.Sum, lbls - }, - tester: func(samples *safeSlice[prompb.TimeSeries]) { - t.Helper() - for i := 0; i < samples.Len(); i++ { - s := samples.Get(i) - require.True(t, len(s.Samples) == 0) - require.True(t, len(s.Labels) == 1) - histSame(t, hist(int(s.Histograms[0].Sum)), s.Histograms[0]) - } - }, - }, - { - name: "float histogram", - maker: func(index int, app storage.Appender) (float64, labels.Labels) { - ts, lbls, h := makeFloatHistogram(index) - _, errApp := app.AppendHistogram(0, lbls, ts, nil, h) - require.NoError(t, errApp) - return h.Sum, lbls - }, - tester: func(samples *safeSlice[prompb.TimeSeries]) { - t.Helper() - for i := 0; i < samples.Len(); i++ { - s := samples.Get(i) - require.True(t, len(s.Samples) == 0) - require.True(t, len(s.Labels) == 1) - histFloatSame(t, histFloat(int(s.Histograms[0].Sum)), s.Histograms[0]) - } - }, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - runTest(t, test.maker, test.tester, test.testMeta) - }) - } -} - -const ( - iterations = 10 - items = 100 -) - -func runTest(t *testing.T, add func(index int, appendable storage.Appender) (float64, labels.Labels), test func(samples *safeSlice[prompb.TimeSeries]), metaTest func(meta *safeSlice[prompb.MetricMetadata])) { - l := util.TestAlloyLogger(t) - done := make(chan struct{}) - var series atomic.Int32 - var meta atomic.Int32 - samples := newSafeSlice[prompb.TimeSeries]() - metaSamples := newSafeSlice[prompb.MetricMetadata]() - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - newSamples, newMetadata := handlePost(t, w, r) - series.Add(int32(len(newSamples))) - meta.Add(int32(len(newMetadata))) - samples.AddSlice(newSamples) - metaSamples.AddSlice(newMetadata) - if series.Load() == iterations*items { - done <- struct{}{} - } - if meta.Load() == iterations*items { - done <- struct{}{} - } - })) - expCh := make(chan Exports, 1) - c, err := newComponent(t, l, srv.URL, expCh, prometheus.NewRegistry()) - require.NoError(t, err) - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - - go func() { - runErr := c.Run(ctx) - require.NoError(t, runErr) - }() - // Wait for export to spin up. - exp := <-expCh - - index := atomic.NewInt64(0) - results := &safeMap{ - results: make(map[float64]labels.Labels), - } - - for i := 0; i < iterations; i++ { - go func() { - app := exp.Receiver.Appender(ctx) - for j := 0; j < items; j++ { - val := index.Add(1) - v, lbl := add(int(val), app) - results.Add(v, lbl) - } - require.NoError(t, app.Commit()) - }() - } - - // This is a weird use case to handle eventually. - // With race turned on this can take a long time. - tm := time.NewTimer(20 * time.Second) - select { - case <-done: - case <-tm.C: - require.Truef(t, false, "failed to collect signals in the appropriate time") - } - - cancel() - - for i := 0; i < samples.Len(); i++ { - s := samples.Get(i) - if len(s.Histograms) == 1 { - lbls, ok := results.Get(s.Histograms[0].Sum) - require.True(t, ok) - for i, sLbl := range s.Labels { - require.True(t, lbls[i].Name == sLbl.Name) - require.True(t, lbls[i].Value == sLbl.Value) - } - } else { - lbls, ok := results.Get(s.Samples[0].Value) - require.True(t, ok) - for i, sLbl := range s.Labels { - require.True(t, lbls[i].Name == sLbl.Name) - require.True(t, lbls[i].Value == sLbl.Value) - } - } - } - if test != nil { - test(samples) - } else { - metaTest(metaSamples) - } -} - -func handlePost(t *testing.T, _ http.ResponseWriter, r *http.Request) ([]prompb.TimeSeries, []prompb.MetricMetadata) { - defer r.Body.Close() - data, err := io.ReadAll(r.Body) - require.NoError(t, err) - - data, err = snappy.Decode(nil, data) - require.NoError(t, err) - - var req prompb.WriteRequest - err = req.Unmarshal(data) - require.NoError(t, err) - return req.GetTimeseries(), req.Metadata -} - -func makeSeries(index int) (int64, float64, labels.Labels) { - return time.Now().UTC().UnixMilli(), float64(index), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)) -} - -func makeMetadata(index int) (metadata.Metadata, labels.Labels) { - return metadata.Metadata{ - Type: "counter", - Unit: "seconds", - Help: "metadata help", - }, labels.FromStrings("__name__", fmt.Sprintf("name_%d", index)) -} - -func makeHistogram(index int) (int64, labels.Labels, *histogram.Histogram) { - return time.Now().UTC().UnixMilli(), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)), hist(index) -} - -func makeExemplar(index int) exemplar.Exemplar { - return exemplar.Exemplar{ - Labels: labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)), - Ts: time.Now().UnixMilli(), - HasTs: true, - Value: float64(index), - } -} - -func hist(i int) *histogram.Histogram { - return &histogram.Histogram{ - CounterResetHint: 1, - Schema: 2, - ZeroThreshold: 3, - ZeroCount: 4, - Count: 5, - Sum: float64(i), - PositiveSpans: []histogram.Span{ - { - Offset: 1, - Length: 2, - }, - }, - NegativeSpans: []histogram.Span{ - { - Offset: 3, - Length: 4, - }, - }, - PositiveBuckets: []int64{1, 2, 3}, - NegativeBuckets: []int64{1, 2, 3}, - } -} - -func histSame(t *testing.T, h *histogram.Histogram, pb prompb.Histogram) { - require.True(t, h.Sum == pb.Sum) - require.True(t, h.ZeroCount == pb.ZeroCount.(*prompb.Histogram_ZeroCountInt).ZeroCountInt) - require.True(t, h.Schema == pb.Schema) - require.True(t, h.Count == pb.Count.(*prompb.Histogram_CountInt).CountInt) - require.True(t, h.ZeroThreshold == pb.ZeroThreshold) - require.True(t, int32(h.CounterResetHint) == int32(pb.ResetHint)) - require.True(t, reflect.DeepEqual(h.PositiveBuckets, pb.PositiveDeltas)) - require.True(t, reflect.DeepEqual(h.NegativeBuckets, pb.NegativeDeltas)) - histSpanSame(t, h.PositiveSpans, pb.PositiveSpans) - histSpanSame(t, h.NegativeSpans, pb.NegativeSpans) -} - -func histSpanSame(t *testing.T, h []histogram.Span, pb []prompb.BucketSpan) { - require.True(t, len(h) == len(pb)) - for i := range h { - require.True(t, h[i].Length == pb[i].Length) - require.True(t, h[i].Offset == pb[i].Offset) - } -} - -func makeFloatHistogram(index int) (int64, labels.Labels, *histogram.FloatHistogram) { - return time.Now().UTC().UnixMilli(), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)), histFloat(index) -} - -func histFloat(i int) *histogram.FloatHistogram { - return &histogram.FloatHistogram{ - CounterResetHint: 1, - Schema: 2, - ZeroThreshold: 3, - ZeroCount: 4, - Count: 5, - Sum: float64(i), - PositiveSpans: []histogram.Span{ - { - Offset: 1, - Length: 2, - }, - }, - NegativeSpans: []histogram.Span{ - { - Offset: 3, - Length: 4, - }, - }, - PositiveBuckets: []float64{1.1, 2.2, 3.3}, - NegativeBuckets: []float64{1.2, 2.3, 3.4}, - } -} - -func histFloatSame(t *testing.T, h *histogram.FloatHistogram, pb prompb.Histogram) { - require.True(t, h.Sum == pb.Sum) - require.True(t, h.ZeroCount == pb.ZeroCount.(*prompb.Histogram_ZeroCountFloat).ZeroCountFloat) - require.True(t, h.Schema == pb.Schema) - require.True(t, h.Count == pb.Count.(*prompb.Histogram_CountFloat).CountFloat) - require.True(t, h.ZeroThreshold == pb.ZeroThreshold) - require.True(t, int32(h.CounterResetHint) == int32(pb.ResetHint)) - require.True(t, reflect.DeepEqual(h.PositiveBuckets, pb.PositiveCounts)) - require.True(t, reflect.DeepEqual(h.NegativeBuckets, pb.NegativeCounts)) - histSpanSame(t, h.PositiveSpans, pb.PositiveSpans) - histSpanSame(t, h.NegativeSpans, pb.NegativeSpans) -} - -func newComponent(t *testing.T, l *logging.Logger, url string, exp chan Exports, reg prometheus.Registerer) (*Queue, error) { - return NewComponent(component.Options{ - ID: "test", - Logger: l, - DataPath: t.TempDir(), - OnStateChange: func(e component.Exports) { - exp <- e.(Exports) - }, - Registerer: reg, - Tracer: nil, - }, Arguments{ - TTL: 2 * time.Hour, - Persistence: Persistence{ - MaxSignalsToBatch: 10, - BatchInterval: 1 * time.Second, - }, - Endpoints: []EndpointConfig{{ - Name: "test", - URL: url, - Timeout: 20 * time.Second, - RetryBackoff: 5 * time.Second, - MaxRetryAttempts: 1, - BatchCount: 5, - FlushInterval: 1 * time.Second, - Parallelism: 1, - }}, - }) -} - -func newSafeSlice[T any]() *safeSlice[T] { - return &safeSlice[T]{slice: make([]T, 0)} -} - -type safeSlice[T any] struct { - slice []T - mut sync.Mutex -} - -func (s *safeSlice[T]) Add(v T) { - s.mut.Lock() - defer s.mut.Unlock() - s.slice = append(s.slice, v) -} - -func (s *safeSlice[T]) AddSlice(v []T) { - s.mut.Lock() - defer s.mut.Unlock() - s.slice = append(s.slice, v...) -} - -func (s *safeSlice[T]) Len() int { - s.mut.Lock() - defer s.mut.Unlock() - return len(s.slice) -} - -func (s *safeSlice[T]) Get(i int) T { - s.mut.Lock() - defer s.mut.Unlock() - return s.slice[i] -} - -type safeMap struct { - mut sync.Mutex - results map[float64]labels.Labels -} - -func (s *safeMap) Add(v float64, ls labels.Labels) { - s.mut.Lock() - defer s.mut.Unlock() - s.results[v] = ls -} - -func (s *safeMap) Get(v float64) (labels.Labels, bool) { - s.mut.Lock() - defer s.mut.Unlock() - res, ok := s.results[v] - return res, ok -}