Skip to content

Commit

Permalink
Cherry pick tls staleness (#2464)
Browse files Browse the repository at this point in the history
* update changelog for rc (#2360)

* update changelog for rc

* update changelog for rc

Signed-off-by: matt durham <[email protected]>

* fix conversion

* Fanout: reduce allocation related to staleness tracking (#2405)

(cherry picked from commit c58c114)

* Actually use tls alloy and not yaml. (#2461)

* Actually use tls alloy and not yaml.

* remove bad test

(cherry picked from commit 419e997)

* Cherry pick staleness and tls.

---------

Signed-off-by: matt durham <[email protected]>
Co-authored-by: Piotr <[email protected]>
  • Loading branch information
mattdurham and thampiotr authored Jan 21, 2025
1 parent 954130d commit c0b8217
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 39 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This document contains a historical list of changes between releases. Only
changes that impact end-user behavior are listed; changes to documentation or
internal API changes are not present.

v1.6.0-rc.2
v1.6.0-rc.3
-----------------

### Breaking changes
Expand Down Expand Up @@ -45,6 +45,8 @@ v1.6.0-rc.2

### Enhancements

- Improved performance by reducing allocation in Prometheus write pipelines by ~30% (@thampiotr)

- Update `prometheus.write.queue` to support v2 for cpu performance. (@mattdurham)

- (_Experimental_) Add health reporting to `database_observability.mysql` component (@cristiangreco)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,12 @@ Name | Type | Description | Default

### tls_config block

{{< docs/shared lookup="reference/components/tls-config-block.md" source="alloy" version="<ALLOY_VERSION>" >}}
Name | Type | Description | Default | Required
-----------------------|----------|----------------------------------------------------------|---------|---------
`ca_pem` | `string` | CA PEM-encoded text to validate the server with. | | no
`cert_pem` | `string` | Certificate PEM-encoded text for client authentication. | | no
`insecure_skip_verify` | `bool` | Disables validation of the server certificate. | | no
`key_pem` | `secret` | Key PEM-encoded text for client authentication. | | no

## Exported fields

Expand Down
39 changes: 20 additions & 19 deletions internal/component/prometheus/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/service/labelstore"
)
Expand All @@ -29,6 +30,10 @@ type Fanout struct {
writeLatency prometheus.Histogram
samplesCounter prometheus.Counter
ls labelstore.LabelStore

// lastSeriesCount stores the number of series that were sent through the last appender. It helps to estimate how
// much memory to allocate for the staleness trackers.
lastSeriesCount atomic.Int64
}

// NewFanout creates a fanout appendable.
Expand Down Expand Up @@ -77,11 +82,8 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender {

app := &appender{
children: make([]storage.Appender, 0),
componentID: f.componentID,
writeLatency: f.writeLatency,
samplesCounter: f.samplesCounter,
ls: f.ls,
stalenessTrackers: make([]labelstore.StalenessTracker, 0),
fanout: f,
stalenessTrackers: make([]labelstore.StalenessTracker, 0, f.lastSeriesCount.Load()),
}

for _, x := range f.children {
Expand All @@ -95,12 +97,9 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender {

type appender struct {
children []storage.Appender
componentID string
writeLatency prometheus.Histogram
samplesCounter prometheus.Counter
start time.Time
ls labelstore.LabelStore
stalenessTrackers []labelstore.StalenessTracker
fanout *Fanout
}

var _ storage.Appender = (*appender)(nil)
Expand All @@ -111,7 +110,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
a.start = time.Now()
}
if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
ref = storage.SeriesRef(a.fanout.ls.GetOrAddGlobalRefID(l))
}
a.stalenessTrackers = append(a.stalenessTrackers, labelstore.StalenessTracker{
GlobalRefID: uint64(ref),
Expand All @@ -129,7 +128,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
}
}
if updated {
a.samplesCounter.Inc()
a.fanout.samplesCounter.Inc()
}
return ref, multiErr
}
Expand All @@ -138,7 +137,8 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
func (a *appender) Commit() error {
defer a.recordLatency()
var multiErr error
a.ls.TrackStaleness(a.stalenessTrackers)
a.fanout.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
a.fanout.ls.TrackStaleness(a.stalenessTrackers)
for _, x := range a.children {
err := x.Commit()
if err != nil {
Expand All @@ -151,7 +151,8 @@ func (a *appender) Commit() error {
// Rollback satisfies the Appender interface.
func (a *appender) Rollback() error {
defer a.recordLatency()
a.ls.TrackStaleness(a.stalenessTrackers)
a.fanout.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
a.fanout.ls.TrackStaleness(a.stalenessTrackers)
var multiErr error
for _, x := range a.children {
err := x.Rollback()
Expand All @@ -167,7 +168,7 @@ func (a *appender) recordLatency() {
return
}
duration := time.Since(a.start)
a.writeLatency.Observe(duration.Seconds())
a.fanout.writeLatency.Observe(duration.Seconds())
}

// AppendExemplar satisfies the Appender interface.
Expand All @@ -176,7 +177,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem
a.start = time.Now()
}
if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
ref = storage.SeriesRef(a.fanout.ls.GetOrAddGlobalRefID(l))
}
var multiErr error
for _, x := range a.children {
Expand All @@ -194,7 +195,7 @@ func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m meta
a.start = time.Now()
}
if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
ref = storage.SeriesRef(a.fanout.ls.GetOrAddGlobalRefID(l))
}
var multiErr error
for _, x := range a.children {
Expand All @@ -211,7 +212,7 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
a.start = time.Now()
}
if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
ref = storage.SeriesRef(a.fanout.ls.GetOrAddGlobalRefID(l))
}
var multiErr error
for _, x := range a.children {
Expand All @@ -228,7 +229,7 @@ func (a *appender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t,
a.start = time.Now()
}
if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
ref = storage.SeriesRef(a.fanout.ls.GetOrAddGlobalRefID(l))
}
var multiErr error
for _, x := range a.children {
Expand All @@ -244,7 +245,7 @@ func (a *appender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t,
type NoopMetadataStore map[string]scrape.MetricMetadata

// GetMetadata implements the MetricMetadataStore interface.
func (ms NoopMetadataStore) GetMetadata(familyName string) (scrape.MetricMetadata, bool) {
func (ms NoopMetadataStore) GetMetadata(_ string) (scrape.MetricMetadata, bool) {
return scrape.MetricMetadata{}, false
}

Expand Down
9 changes: 8 additions & 1 deletion internal/component/prometheus/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/service/labelstore"
)
Expand All @@ -26,6 +27,10 @@ type Interceptor struct {
next storage.Appendable

ls labelstore.LabelStore

// lastSeriesCount stores the number of series that were sent through the last interceptappender. It helps to estimate how
// much memory to allocate for the staleness trackers.
lastSeriesCount atomic.Int64
}

var _ storage.Appendable = (*Interceptor)(nil)
Expand Down Expand Up @@ -91,7 +96,7 @@ func (f *Interceptor) Appender(ctx context.Context) storage.Appender {
app := &interceptappender{
interceptor: f,
ls: f.ls,
stalenessTrackers: make([]labelstore.StalenessTracker, 0),
stalenessTrackers: make([]labelstore.StalenessTracker, 0, f.lastSeriesCount.Load()),
}
if f.next != nil {
app.child = f.next.Appender(ctx)
Expand Down Expand Up @@ -130,6 +135,7 @@ func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int

// Commit satisfies the Appender interface.
func (a *interceptappender) Commit() error {
a.interceptor.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
a.ls.TrackStaleness(a.stalenessTrackers)
if a.child == nil {
return nil
Expand All @@ -139,6 +145,7 @@ func (a *interceptappender) Commit() error {

// Rollback satisfies the Appender interface.
func (a *interceptappender) Rollback() error {
a.interceptor.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
a.ls.TrackStaleness(a.stalenessTrackers)
if a.child == nil {
return nil
Expand Down
13 changes: 8 additions & 5 deletions internal/component/prometheus/write/queue/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/grafana/alloy/syntax/alloytypes"
"github.com/grafana/walqueue/types"
common "github.com/prometheus/common/config"
"github.com/prometheus/common/version"
"github.com/prometheus/prometheus/storage"
)
Expand Down Expand Up @@ -67,9 +66,6 @@ func (r *Arguments) Validate() error {
if conn.FlushInterval < 1*time.Second {
return fmt.Errorf("flush_interval must be greater or equal to 1s, the internal timers resolution is 1s")
}
if conn.BasicAuth != nil && conn.TLSConfig != nil {
return fmt.Errorf("endpoint %s cannot have both BasicAuth and TLSConfig set", conn.Name)
}
}

return nil
Expand All @@ -93,10 +89,17 @@ type EndpointConfig struct {
// How many concurrent queues to have.
Parallelism uint `alloy:"parallelism,attr,optional"`
ExternalLabels map[string]string `alloy:"external_labels,attr,optional"`
TLSConfig *common.TLSConfig `alloy:"tls_config,block,optional"`
TLSConfig *TLSConfig `alloy:"tls_config,block,optional"`
RoundRobin bool `alloy:"enable_round_robin,attr,optional"`
}

type TLSConfig struct {
CA string `alloy:"ca_pem,attr,optional"`
Cert string `alloy:"cert_pem,attr,optional"`
Key alloytypes.Secret `alloy:"key_pem,attr,optional"`
InsecureSkipVerify bool `alloy:"insecure_skip_verify,attr,optional"`
}

var UserAgent = fmt.Sprintf("Alloy/%s", version.Version)

func (cc EndpointConfig) ToNativeType() types.ConnectionConfig {
Expand Down
30 changes: 18 additions & 12 deletions internal/component/prometheus/write/queue/types_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
package queue

import (
common "github.com/prometheus/common/config"
"github.com/grafana/alloy/syntax"
"github.com/stretchr/testify/require"
"strings"
"testing"
)

func TestBasicAuthAndTLSBothSetError(t *testing.T) {
args := defaultArgs()
args.Endpoints = make([]EndpointConfig, 1)
args.Endpoints[0] = defaultEndpointConfig()
args.Endpoints[0].Name = "test"
args.Endpoints[0].TLSConfig = &common.TLSConfig{}
args.Endpoints[0].BasicAuth = &BasicAuth{}
err := args.Validate()
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "cannot have both BasicAuth and TLSConfig"))
func TestParsingTLSConfig(t *testing.T) {
var args Arguments
err := syntax.Unmarshal([]byte(`
endpoint "cloud" {
url = "http://example.com"
basic_auth {
username = 12345
password = "password"
}
tls_config {
insecure_skip_verify = true
}
}
`), &args)

require.NoError(t, err)
}

0 comments on commit c0b8217

Please sign in to comment.