diff --git a/internal/alloycli/cmd_run.go b/internal/alloycli/cmd_run.go index f6970da38f..cd655b0960 100644 --- a/internal/alloycli/cmd_run.go +++ b/internal/alloycli/cmd_run.go @@ -32,6 +32,7 @@ import ( "github.com/grafana/alloy/internal/service" httpservice "github.com/grafana/alloy/internal/service/http" "github.com/grafana/alloy/internal/service/labelstore" + "github.com/grafana/alloy/internal/service/livedebugging" otel_service "github.com/grafana/alloy/internal/service/otel" remotecfgservice "github.com/grafana/alloy/internal/service/remotecfg" uiservice "github.com/grafana/alloy/internal/service/ui" @@ -272,8 +273,11 @@ func (fr *alloyRun) Run(configPath string) error { return fmt.Errorf("failed to create the remotecfg service: %w", err) } + liveDebuggingService := livedebugging.New() + uiService := uiservice.New(uiservice.Options{ - UIPrefix: fr.uiPrefix, + UIPrefix: fr.uiPrefix, + DebuggingStreamHandler: liveDebuggingService.Data().(livedebugging.DebugStreamHandler), }) otelService := otel_service.New(l) @@ -292,6 +296,7 @@ func (fr *alloyRun) Run(configPath string) error { MinStability: fr.minStability, Services: []service.Service{ httpService, + liveDebuggingService, uiService, clusterService, otelService, diff --git a/internal/service/ui/ui.go b/internal/service/ui/ui.go index a15aa8f4b2..39d0fedc8d 100644 --- a/internal/service/ui/ui.go +++ b/internal/service/ui/ui.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/service" http_service "github.com/grafana/alloy/internal/service/http" + "github.com/grafana/alloy/internal/service/livedebugging" "github.com/grafana/alloy/internal/web/api" "github.com/grafana/alloy/internal/web/ui" ) @@ -21,7 +22,8 @@ const ServiceName = "ui" // Options are used to configure the UI service. Options are constant for the // lifetime of the UI service. type Options struct { - UIPrefix string // Path prefix to host the UI at. + UIPrefix string // Path prefix to host the UI at. + DebuggingStreamHandler livedebugging.DebugStreamHandler // Debugging stream handler used for live debugging in the UI. } // Service implements the UI service. @@ -46,7 +48,7 @@ func (s *Service) Definition() service.Definition { return service.Definition{ Name: ServiceName, ConfigType: nil, // ui does not accept configuration - DependsOn: []string{http_service.ServiceName}, + DependsOn: []string{http_service.ServiceName, livedebugging.ServiceName}, Stability: featuregate.StabilityGenerallyAvailable, } } @@ -75,7 +77,7 @@ func (s *Service) Data() any { func (s *Service) ServiceHandler(host service.Host) (base string, handler http.Handler) { r := mux.NewRouter() - fa := api.NewAlloyAPI(host) + fa := api.NewAlloyAPI(host, s.opts.DebuggingStreamHandler) fa.RegisterRoutes(path.Join(s.opts.UIPrefix, "/api/v0/web"), r) ui.RegisterRoutes(s.opts.UIPrefix, r) diff --git a/internal/web/api/api.go b/internal/web/api/api.go index edc1328857..e1264fd565 100644 --- a/internal/web/api/api.go +++ b/internal/web/api/api.go @@ -6,24 +6,29 @@ package api import ( "encoding/json" + "math/rand" "net/http" "path" + "strconv" + "strings" "github.com/gorilla/mux" "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/service" "github.com/grafana/alloy/internal/service/cluster" + "github.com/grafana/alloy/internal/service/livedebugging" "github.com/prometheus/prometheus/util/httputil" ) // AlloyAPI is a wrapper around the component API. type AlloyAPI struct { - alloy service.Host + alloy service.Host + debuggingStreamHandler livedebugging.DebugStreamHandler } // NewAlloyAPI instantiates a new Alloy API. -func NewAlloyAPI(alloy service.Host) *AlloyAPI { - return &AlloyAPI{alloy: alloy} +func NewAlloyAPI(alloy service.Host, debuggingStreamHandler livedebugging.DebugStreamHandler) *AlloyAPI { + return &AlloyAPI{alloy: alloy, debuggingStreamHandler: debuggingStreamHandler} } // RegisterRoutes registers all the API's routes. @@ -36,6 +41,7 @@ func (a *AlloyAPI) RegisterRoutes(urlPrefix string, r *mux.Router) { r.Handle(path.Join(urlPrefix, "/components"), httputil.CompressionHandler{Handler: a.listComponentsHandler()}) r.Handle(path.Join(urlPrefix, "/components/{id:.+}"), httputil.CompressionHandler{Handler: a.getComponentHandler()}) r.Handle(path.Join(urlPrefix, "/peers"), httputil.CompressionHandler{Handler: a.getClusteringPeersHandler()}) + r.Handle(path.Join(urlPrefix, "/debug/{id:.+}"), a.startDebugStream()) } func (a *AlloyAPI) listComponentsHandler() http.HandlerFunc { @@ -107,3 +113,70 @@ func (a *AlloyAPI) getClusteringPeersHandler() http.HandlerFunc { _, _ = w.Write(bb) } } + +func (a *AlloyAPI) startDebugStream() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + componentID := vars["id"] + + // Buffer of 1000 entries to handle load spikes and prevent this functionality from eating up too much memory. + dataCh := make(chan string, 1000) + ctx := r.Context() + + sampleProb := setSampleProb(w, r.URL.Query().Get("sampleProb")) + + a.debuggingStreamHandler.SetStream(componentID, func(data string) { + select { + case <-ctx.Done(): + return + default: + if sampleProb < 1 && rand.Float64() > sampleProb { + return + } + // Avoid blocking the channel when the channel is full + select { + case dataCh <- data: + default: + } + } + }) + + stopStreaming := func() { + close(dataCh) + a.debuggingStreamHandler.DeleteStream(componentID) + } + + for { + select { + case data := <-dataCh: + var builder strings.Builder + builder.WriteString(data) + // |;| delimiter is added at the end of every chunk + builder.WriteString("|;|") + _, writeErr := w.Write([]byte(builder.String())) + if writeErr != nil { + stopStreaming() + return + } + // TODO: flushing at a regular interval might be better performance wise + w.(http.Flusher).Flush() + case <-ctx.Done(): + stopStreaming() + return + } + } + } +} + +func setSampleProb(w http.ResponseWriter, sampleProbParam string) (sampleProb float64) { + sampleProb = 1.0 + if sampleProbParam != "" { + var err error + sampleProb, err = strconv.ParseFloat(sampleProbParam, 64) + if err != nil || sampleProb < 0 || sampleProb > 1 { + http.Error(w, "Invalid sample probability", http.StatusBadRequest) + return 1.0 + } + } + return sampleProb +}