Wire up health monitoring in collector
jwilder committed Jan 24, 2025
1 parent f727c8a commit 0fecf1a
Showing 7 changed files with 135 additions and 21 deletions.
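This commit threads a single health checker through the collector's ingestion paths: the OTLP log handlers, the OTLP metric writer, and the scraper now consult it and shed load when the collector is backed up, and service.go builds the real cluster.Health that replaces the previous no-op fakeHealthChecker. Every consumer depends only on the one-method contract interface{ IsHealthy() bool }. A minimal sketch of the guard pattern the handlers adopt in the hunks below (names mirror the diff; the wrapper itself is illustrative, not part of the commit):

package sketch

import "net/http"

// HealthChecker is the one-method contract used throughout this diff.
type HealthChecker interface{ IsHealthy() bool }

// guardedHandler mirrors the checks added to the /logs and /v1/logs handlers:
// when the checker reports unhealthy, reject with 429 so clients retry later.
func guardedHandler(hc HealthChecker, next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if !hc.IsHealthy() {
			http.Error(w, "Overloaded. Retry later", http.StatusTooManyRequests)
			return
		}
		next(w, r)
	}
}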
9 changes: 9 additions & 0 deletions collector/otlp/logs_proxy.go
@@ -32,12 +32,14 @@ type LogsProxyServiceOpts struct {
LiftAttributes []string
Endpoint string
InsecureSkipVerify bool
HealthChecker interface{ IsHealthy() bool }
}

type LogsProxyService struct {
staticAttributes []*commonv1.KeyValue
clients map[string]logsv1connect.LogsServiceClient
liftAttributes map[string]struct{}
healthChecker interface{ IsHealthy() bool }
}

func NewLogsProxyService(opts LogsProxyServiceOpts) *LogsProxyService {
@@ -102,6 +104,7 @@ func NewLogsProxyService(opts LogsProxyServiceOpts) *LogsProxyService {
staticAttributes: staticAttributes,
liftAttributes: lift,
clients: rpcClients,
healthChecker: opts.HealthChecker,
}
}

@@ -117,6 +120,12 @@ func (s *LogsProxyService) Handler(w http.ResponseWriter, r *http.Request) {
m := metrics.RequestsReceived.MustCurryWith(prometheus.Labels{"path": "/logs"})
defer r.Body.Close()

if !s.healthChecker.IsHealthy() {
m.WithLabelValues(strconv.Itoa(http.StatusTooManyRequests)).Inc()
http.Error(w, "Overloaded. Retry later", http.StatusTooManyRequests)
return
}

switch r.Header.Get("Content-Type") {
case "application/x-protobuf":
// We're receiving an OTLP protobuf, so we just need to
32 changes: 31 additions & 1 deletion collector/otlp/logs_proxy_test.go
@@ -14,7 +14,9 @@ import (
)

func TestLogsProxyService(t *testing.T) {
s := NewLogsProxyService(LogsProxyServiceOpts{})
s := NewLogsProxyService(LogsProxyServiceOpts{
HealthChecker: fakeHealthChecker{true},
})
require.NoError(t, s.Open(context.Background()))
defer s.Close()

@@ -32,3 +34,31 @@ func TestLogsProxyService(t *testing.T) {
s.Handler(resp, req)
require.Equal(t, http.StatusOK, resp.Code)
}

func TestLogsProxyService_Overloaded(t *testing.T) {
s := NewLogsProxyService(LogsProxyServiceOpts{
HealthChecker: fakeHealthChecker{false},
})
require.NoError(t, s.Open(context.Background()))
defer s.Close()

var msg v1.ExportLogsServiceRequest
require.NoError(t, protojson.Unmarshal(rawlog, &msg))

b, err := proto.Marshal(&msg)
require.NoError(t, err)

req, err := http.NewRequest("POST", "/logs", bytes.NewReader(b))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/x-protobuf")

resp := httptest.NewRecorder()
s.Handler(resp, req)
require.Equal(t, http.StatusTooManyRequests, resp.Code)
}

type fakeHealthChecker struct {
healthy bool
}

func (f fakeHealthChecker) IsHealthy() bool { return f.healthy }
13 changes: 11 additions & 2 deletions collector/otlp/logs_transfer.go
@@ -23,11 +23,13 @@ import (
type LogsServiceOpts struct {
Store storage.Store
AddAttributes map[string]string
HealthChecker interface{ IsHealthy() bool }
}

type LogsService struct {
store storage.Store
logger *slog.Logger
store storage.Store
logger *slog.Logger
healthChecker interface{ IsHealthy() bool }

staticAttributes []*commonv1.KeyValue
}
@@ -55,6 +57,7 @@ func NewLogsService(opts LogsServiceOpts) *LogsService {
),
),
staticAttributes: add,
healthChecker: opts.HealthChecker,
}
}

@@ -70,6 +73,12 @@ func (s *LogsService) Handler(w http.ResponseWriter, r *http.Request) {
m := metrics.RequestsReceived.MustCurryWith(prometheus.Labels{"path": "/v1/logs"})
defer r.Body.Close()

if !s.healthChecker.IsHealthy() {
m.WithLabelValues(strconv.Itoa(http.StatusTooManyRequests)).Inc()
http.Error(w, "Overloaded. Retry later", http.StatusTooManyRequests)
return
}

switch r.Header.Get("Content-Type") {
case "application/x-protobuf":

40 changes: 39 additions & 1 deletion collector/otlp/logs_transfer_test.go
@@ -25,7 +25,8 @@ func TestLogsService(t *testing.T) {
require.NoError(t, store.Open(context.Background()))
defer store.Close()
s := NewLogsService(LogsServiceOpts{
Store: store,
Store: store,
HealthChecker: fakeHealthChecker{true},
})
require.NoError(t, s.Open(context.Background()))
defer s.Close()
@@ -50,3 +51,40 @@ func TestLogsService(t *testing.T) {
require.Equal(t, 1, len(keys))
require.Equal(t, "ADatabase_ATable", string(keys[0]))
}

func TestLogsService_Overloaded(t *testing.T) {
dir := t.TempDir()

store := storage.NewLocalStore(
storage.StoreOpts{
StorageDir: dir,
})

require.NoError(t, store.Open(context.Background()))
defer store.Close()
s := NewLogsService(LogsServiceOpts{
Store: store,
HealthChecker: fakeHealthChecker{false},
})
require.NoError(t, s.Open(context.Background()))
defer s.Close()

var msg v1.ExportLogsServiceRequest
require.NoError(t, protojson.Unmarshal(rawlog, &msg))

b, err := proto.Marshal(&msg)
require.NoError(t, err)

req, err := http.NewRequest("POST", "/v1/logs", bytes.NewReader(b))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/x-protobuf")

resp := httptest.NewRecorder()
s.Handler(resp, req)
require.Equal(t, http.StatusTooManyRequests, resp.Code)

require.NoError(t, store.Close())

keys := store.PrefixesByAge()
require.Equal(t, 0, len(keys))
}
9 changes: 8 additions & 1 deletion collector/otlp/metrics.go
@@ -86,14 +86,16 @@ type OltpMetricWriterOpts struct {
// MaxBatchSize is the maximum number of samples to send in a single batch.
MaxBatchSize int

Clients []remote.RemoteWriteClient
Clients []remote.RemoteWriteClient
HealthChecker interface{ IsHealthy() bool }
}

type OltpMetricWriter struct {
requestTransformer *transform.RequestTransformer
remoteClients []remote.RemoteWriteClient
maxBatchSize int
disableMetricsForwarding bool
healthChecker interface{ IsHealthy() bool }
}

func NewOltpMetricWriter(opts OltpMetricWriterOpts) *OltpMetricWriter {
@@ -102,11 +104,16 @@ func NewOltpMetricWriter(opts OltpMetricWriterOpts) *OltpMetricWriter {
remoteClients: opts.Clients,
maxBatchSize: opts.MaxBatchSize,
disableMetricsForwarding: opts.DisableMetricsForwarding,
healthChecker: opts.HealthChecker,
}
}

// Write takes an OTLP ExportMetricsServiceRequest and writes it to the configured endpoints.
func (t *OltpMetricWriter) Write(ctx context.Context, msg *v1.ExportMetricsServiceRequest) error {
if !t.healthChecker.IsHealthy() {
return errors.New("Overloaded. Retry later")
}

// Causes allocation. Like in prom remote receiver, create rather than using a bunch of space in pool.
wr := prompb.WriteRequestPool.Get()
defer prompb.WriteRequestPool.Put(wr)
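With this change OltpMetricWriter.Write refuses to buffer new samples while the collector is unhealthy and returns an "Overloaded. Retry later" error instead. The diff does not show how callers react to that error; below is a hypothetical fragment of how a caller could surface it as a retryable status (the exportMetrics wrapper and the string match are assumptions, not part of this commit; a sentinel error checked with errors.Is would be sturdier than matching the message):

// Hypothetical caller, not in this commit: map the writer's overloaded error
// to HTTP 429 so exporters back off and retry, and to 500 for anything else.
// Assumes context, net/http, and strings are imported alongside the package's
// existing dependencies.
func exportMetrics(ctx context.Context, w *OltpMetricWriter, msg *v1.ExportMetricsServiceRequest, rw http.ResponseWriter) {
	if err := w.Write(ctx, msg); err != nil {
		if strings.Contains(err.Error(), "Overloaded") {
			http.Error(rw, err.Error(), http.StatusTooManyRequests)
			return
		}
		http.Error(rw, err.Error(), http.StatusInternalServerError)
		return
	}
	rw.WriteHeader(http.StatusOK)
}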
8 changes: 8 additions & 0 deletions collector/scraper.go
@@ -65,6 +65,8 @@ type ScraperOpts struct {
MaxBatchSize int

RemoteClients []remote.RemoteWriteClient

HealthChecker interface{ IsHealthy() bool }
}

func (s *ScraperOpts) RequestTransformer() *transform.RequestTransformer {
@@ -111,6 +113,7 @@ type Scraper struct {
remoteClients []remote.RemoteWriteClient
scrapeClient *MetricsClient
seriesCreator *seriesCreator
healthChecker interface{ IsHealthy() bool }

wg sync.WaitGroup
cancel context.CancelFunc
@@ -126,6 +129,7 @@ func NewScraper(opts *ScraperOpts) *Scraper {
seriesCreator: &seriesCreator{},
requestTransformer: opts.RequestTransformer(),
remoteClients: opts.RemoteClients,
healthChecker: opts.HealthChecker,
targets: make(map[string]ScrapeTarget),
}
}
@@ -197,6 +201,10 @@ func (s *Scraper) scrape(ctx context.Context) {
case <-ctx.Done():
return
case <-t.C:
if !s.healthChecker.IsHealthy() {
logger.Warnf("Collector is unhealthy, skipping scrape")
continue
}
s.scrapeTargets(ctx)
case <-reconnectTimer.C:
for _, remoteClient := range s.remoteClients {
45 changes: 29 additions & 16 deletions collector/service.go
@@ -106,6 +106,9 @@ type ServiceOpts struct {
// will be rejected until space is freed. A value of 0 means no max usage.
MaxDiskUsage int64

// MaxSegmentCount is the maximum number of segment files allowed on disk before signaling back-pressure.
MaxSegmentCount int64

// StorageDir is the directory where the WAL will be stored
StorageDir string

@@ -183,6 +186,22 @@ func NewService(opts *ServiceOpts) (*Service, error) {
maxSegmentSize = opts.MaxSegmentSize
}

maxSegmentCount := int64(10000)
if opts.MaxSegmentCount > 0 {
maxSegmentCount = opts.MaxSegmentCount
}

maxDiskUsage := int64(10 * 1024 * 1024 * 1024) // 10 GB
if opts.MaxDiskUsage > 0 {
maxDiskUsage = opts.MaxDiskUsage
}

health := cluster.NewHealth(cluster.HealthOpts{
UnhealthyTimeout: time.Minute,
MaxSegmentCount: maxSegmentCount,
MaxDiskUsage: maxDiskUsage,
})

store := storage.NewLocalStore(storage.StoreOpts{
StorageDir: opts.StorageDir,
SegmentMaxAge: maxSegmentAge,
@@ -197,13 +216,15 @@ func NewService(opts *ServiceOpts) (*Service, error) {
logsSvc := otlp.NewLogsService(otlp.LogsServiceOpts{
Store: store,
AddAttributes: opts.AddAttributes,
HealthChecker: health,
})

logsProxySvc := otlp.NewLogsProxyService(otlp.LogsProxyServiceOpts{
LiftAttributes: opts.LiftAttributes,
AddAttributes: opts.AddAttributes,
Endpoint: opts.Endpoint,
InsecureSkipVerify: opts.InsecureSkipVerify,
HealthChecker: health,
})

var metricHttpHandlers []*http.HttpHandler
@@ -214,7 +235,7 @@ func NewService(opts *ServiceOpts) (*Service, error) {
Path: handlerOpts.Path,
RequestTransformer: handlerOpts.MetricOpts.RequestTransformer(),
RequestWriters: append(handlerOpts.MetricOpts.RemoteWriteClients, &StoreRequestWriter{store}),
HealthChecker: fakeHealthChecker{},
HealthChecker: health,
})
metricHttpHandlers = append(metricHttpHandlers, &http.HttpHandler{
Path: handlerOpts.Path,
@@ -228,6 +249,7 @@ func NewService(opts *ServiceOpts) (*Service, error) {
Clients: append(handlerOpts.MetricOpts.RemoteWriteClients, &StoreRemoteClient{store}),
MaxBatchSize: opts.MaxBatchSize,
DisableMetricsForwarding: handlerOpts.MetricOpts.DisableMetricsForwarding,
HealthChecker: health,
})
oltpMetricsService := otlp.NewMetricsService(writer, handlerOpts.Path, handlerOpts.GrpcPort)
if handlerOpts.Path != "" {
@@ -263,7 +285,7 @@ func NewService(opts *ServiceOpts) (*Service, error) {
r, err := cluster.NewReplicator(cluster.ReplicatorOpts{
Hostname: opts.NodeName,
Partitioner: partitioner,
Health: fakeHealthChecker{},
Health: health,
SegmentRemover: store,
InsecureSkipVerify: opts.InsecureSkipVerify,
})
@@ -291,13 +313,16 @@ func NewService(opts *ServiceOpts) (*Service, error) {
MinUploadSize: 4 * 1024 * 1024,
UploadQueue: transferQueue,
TransferQueue: transferQueue,
PeerHealthReporter: fakeHealthChecker{},
PeerHealthReporter: health,
})

health.QueueSizer = batcher

var scraper *Scraper
if opts.Scraper != nil {
scraperOpts := opts.Scraper
scraperOpts.RemoteClients = append(scraperOpts.RemoteClients, &StoreRemoteClient{store})
scraperOpts.HealthChecker = health

scraper = NewScraper(opts.Scraper)
}
@@ -314,7 +339,7 @@ func NewService(opts *ServiceOpts) (*Service, error) {
svc := &Service{
opts: opts,
metricsSvc: metrics.NewService(metrics.ServiceOpts{
PeerHealthReport: fakeHealthChecker{},
PeerHealthReport: health,
}),
store: store,
scraper: scraper,
@@ -494,18 +519,6 @@ func plaintextListenerFunc() func(addr string) (net.Listener, error) {
}
}

type fakeHealthChecker struct{}

func (f fakeHealthChecker) IsPeerHealthy(peer string) bool { return true }
func (f fakeHealthChecker) SetPeerUnhealthy(peer string) {}
func (f fakeHealthChecker) SetPeerHealthy(peer string) {}
func (f fakeHealthChecker) TransferQueueSize() int { return 0 }
func (f fakeHealthChecker) UploadQueueSize() int { return 0 }
func (f fakeHealthChecker) SegmentsTotal() int64 { return 0 }
func (f fakeHealthChecker) SegmentsSize() int64 { return 0 }
func (f fakeHealthChecker) IsHealthy() bool { return true }
func (f fakeHealthChecker) UnhealthyReason() string { return "" }

// remotePartitioner is a Partitioner that always returns the same owner that forces a remove transfer.
type remotePartitioner struct {
host, addr string
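NewService now constructs one cluster.Health from the new limits and hands it to the log handlers, the metric writer, the scraper, the replicator, the batcher, and the metrics service, so back-pressure engages everywhere once roughly 10,000 WAL segments or 10 GB of disk usage (the defaults above) accumulate. A hypothetical fragment of how an embedding program might tighten those thresholds via the new ServiceOpts fields (the import path, storage directory, and surrounding values are placeholders, not from this commit):

// Hypothetical wiring, not in this commit: only MaxSegmentCount and
// MaxDiskUsage are introduced here; the other fields stand in for whatever
// the caller already configures.
svc, err := collector.NewService(&collector.ServiceOpts{
	StorageDir:      "/var/lib/collector",    // placeholder
	MaxSegmentCount: 2500,                     // shed load beyond 2,500 WAL segments...
	MaxDiskUsage:    2 * 1024 * 1024 * 1024,   // ...or beyond 2 GiB on disk
})
if err != nil {
	log.Fatal(err)
}
_ = svc // start and stop the service however the embedding program normally does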
