diff --git a/bench/benchreporter/benchmark.go b/bench/benchreporter/benchmark.go
index 224a2b0a..a6db99dd 100644
--- a/bench/benchreporter/benchmark.go
+++ b/bench/benchreporter/benchmark.go
@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"os"
-	"path"
 	"path/filepath"
 	"strconv"
 	"strings"
@@ -25,7 +24,8 @@ import (
 var _ reporter.Reporter = (*BenchmarkReporter)(nil)
 
 type BenchmarkReporter struct {
-	benchDataDir string
+	saveInputsTo string
+	f            *os.File
 	rep          reporter.Reporter
 	uid          int
 	gid          int
@@ -70,14 +70,16 @@ func (r *BenchmarkReporter) SupportsReportTraceEvent() bool {
 }
 
 type fallbackSymbol struct {
-	FrameID libpf.FrameID
-	Symbol  string
+	FileID        libpf.FileID
+	AddressOrLine libpf.AddressOrLineno
+	Symbol        string
 }
 
 func (r *BenchmarkReporter) ReportFallbackSymbol(frameID libpf.FrameID, symbol string) {
 	r.store("FallbackSymbol", &fallbackSymbol{
-		FrameID: frameID,
-		Symbol:  symbol,
+		FileID:        frameID.FileID(),
+		AddressOrLine: frameID.AddressOrLine(),
+		Symbol:        symbol,
 	})
 	r.rep.ReportFallbackSymbol(frameID, symbol)
 }
@@ -155,30 +157,31 @@ func (r *BenchmarkReporter) ReportMetrics(timestamp uint32, ids []uint32, values
 
 func (r *BenchmarkReporter) Stop() {
 	r.rep.Stop()
+	_ = r.f.Close()
 }
 
 func (r *BenchmarkReporter) GetMetrics() reporter.Metrics {
 	return r.rep.GetMetrics()
 }
 
-func NewBenchmarkReporter(benchDataDir string, rep reporter.Reporter) (*BenchmarkReporter, error) {
+func NewBenchmarkReporter(saveInputsTo string, rep reporter.Reporter) (*BenchmarkReporter, error) {
 	r := &BenchmarkReporter{
-		benchDataDir: benchDataDir,
+		saveInputsTo: saveInputsTo,
 		rep:          rep,
 	}
 	r.uid, r.gid = originUser()
 
-	if err := os.MkdirAll(benchDataDir, 0o755); err != nil {
-		return nil, err
+	var err error
+	if r.f, err = os.OpenFile(saveInputsTo,
+		os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0o644); err != nil {
+		return nil, fmt.Errorf("failed to open file %s: %v", saveInputsTo, err)
 	}
 
-	if r.uid != 0 || r.gid != 0 {
-		changeDirOwner(benchDataDir, r.uid, r.gid)
+	if err = r.f.Chown(r.uid, r.gid); err != nil {
+		return nil, fmt.Errorf("failed to change ownership of %s to %d:%d: %v",
+			saveInputsTo, r.uid, r.gid, err)
 	}
 
-	// Just for storing the initial timestamp.
-	r.store("Start", libpf.Void{})
-
 	return r, nil
 }
 
@@ -192,27 +195,40 @@ func originUser() (uid, gid int) {
 	return
 }
 
-var counter atomic.Uint64
+type metaInfo struct {
+	TS   int64  `json:"ts"`
+	Name string `json:"name"`
+}
 
-// store stores data as JSON.
+// store appends data as NDJSON to the output file.
 func (r *BenchmarkReporter) store(name string, data any) {
-	ts := time.Now().UnixNano()
-	id := counter.Add(1)
-	fileName := fmt.Sprintf("%d_%06x_%s.json", ts, id, name)
-	pathName := path.Join(r.benchDataDir, fileName)
+	meta := metaInfo{
+		TS:   time.Now().UnixNano(),
+		Name: name,
+	}
 
-	// encode data to JSON
-	bytes, err := json.Marshal(data)
+	// encode metadata to JSON
+	bytes, err := json.Marshal(meta)
 	if err != nil {
 		panic(err)
 	}
+	if err = appendToFile(r.f, bytes); err != nil {
+		panic(err)
+	}
 
-	//nolint:gosec
-	if err = os.WriteFile(pathName, bytes, 0o644); err != nil {
+	// encode reporter input to JSON
+	bytes, err = json.Marshal(data)
+	if err != nil {
+		panic(err)
+	}
+	if err = appendToFile(r.f, bytes); err != nil {
 		panic(err)
 	}
+}
 
-	changeOwner(pathName, r.uid, r.gid)
+func appendToFile(f *os.File, bytes []byte) error {
+	_, err := f.Write(append(bytes, '\n'))
+	return err
 }
 
 func changeOwner(pathName string, uid, gid int) {
@@ -230,9 +246,9 @@ func changeDirOwner(dirName string, uid, gid int) {
 	}
 }
 
-func GRPCInterceptor(benchProtoDir string) grpc.UnaryClientInterceptor {
-	if benchProtoDir != "" {
-		if err := os.MkdirAll(benchProtoDir, 0o755); err != nil {
+func GRPCInterceptor(saveDir string) grpc.UnaryClientInterceptor {
+	if saveDir != "" {
+		if err := os.MkdirAll(saveDir, 0o755); err != nil {
 			log.Errorf("Failed to create directory for storing protobuf messages: %v", err)
 			return nil
 		}
@@ -240,13 +256,13 @@ func GRPCInterceptor(benchProtoDir string) grpc.UnaryClientInterceptor {
 
 		uid, gid := originUser()
 		if uid != 0 || gid != 0 {
-			changeDirOwner(benchProtoDir, uid, gid)
+			changeDirOwner(saveDir, uid, gid)
 		}
 
 		// return interceptor to write the uncompressed protobuf messages to disk.
 		return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn,
 			invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
-			storeProtobuf(benchProtoDir, req, uid, gid)
+			storeProtobuf(saveDir, req, uid, gid)
 			return invoker(ctx, method, req, reply, cc, opts...)
 		}
 	}
diff --git a/bench/benchreporter/replay.go b/bench/benchreporter/replay.go
index 222cc3f3..ecc05c41 100644
--- a/bench/benchreporter/replay.go
+++ b/bench/benchreporter/replay.go
@@ -4,10 +4,8 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"io"
 	"os"
-	"path/filepath"
-	"sort"
-	"strings"
 	"time"
 
 	log "github.com/sirupsen/logrus"
@@ -16,117 +14,81 @@ import (
 	"github.com/open-telemetry/opentelemetry-ebpf-profiler/reporter"
 )
 
-type fileInfo struct {
-	name      string
-	timestamp int64
-	id        uint64
-	funcName  string
-}
-
-// Replay replays the stored data from benchDataDir.
+// Replay replays the stored data from replayInputsFrom.
 // The argument r is the reporter that will receive the replayed data.
-func Replay(ctx context.Context, benchDataDir string, rep reporter.Reporter) error {
-	files, err := os.ReadDir(benchDataDir)
+func Replay(ctx context.Context, replayInputsFrom string, rep reporter.Reporter) error {
+	stream, err := os.Open(replayInputsFrom)
 	if err != nil {
-		return fmt.Errorf("failed to read directory %s: %v", benchDataDir, err)
+		return fmt.Errorf("failed to open file %s: %v", replayInputsFrom, err)
 	}
+	decoder := json.NewDecoder(stream)
 
-	fileInfos := make([]fileInfo, 0, len(files))
+	var m metaInfo
+	var curTS int64
 
-	for _, f := range files {
-		if !strings.HasSuffix(f.Name(), ".json") {
-			continue
+	for {
+		if err = decoder.Decode(&m); err != nil {
+			// EOF is returned at the end of the stream.
+			if err != io.EOF {
+				return err
+			}
+			break
 		}
 
-		name := f.Name()
-		// scan name for timestamp, counter and function name
-		var timestamp int64
-		var id uint64
-		var funcName string
-		if _, err = fmt.Sscanf(name, "%d_%x_%s", &timestamp, &id, &funcName); err != nil {
-			log.Errorf("Failed to parse file name %s: %v", name, err)
-			continue
+		if curTS != 0 {
+			time.Sleep(time.Duration(m.TS-curTS) * time.Nanosecond)
 		}
-		funcName = strings.TrimSuffix(funcName, ".json")
-
-		fileInfos = append(fileInfos, fileInfo{
-			name:      name,
-			timestamp: timestamp,
-			id:        id,
-			funcName:  funcName,
-		})
-	}
-
-	if len(fileInfos) == 0 {
-		return nil
-	}
-
-	// Sort fileInfos ascending by ID.
-	sort.Slice(fileInfos, func(i, j int) bool {
-		return fileInfos[i].id < fileInfos[j].id
-	})
+		curTS = m.TS
 
-	if fileInfos[0].funcName != "Start" {
-		return fmt.Errorf("first function name must be \"Start\", instead it is \"%s\"",
-			fileInfos[0].funcName)
-	}
-
-	curTS := fileInfos[0].timestamp
-
-	// Replay the stored data
-	for _, fi := range fileInfos[1:] {
-		time.Sleep(time.Duration(fi.timestamp-curTS) * time.Nanosecond)
-		curTS = fi.timestamp
-
-		switch fi.funcName {
+		switch m.Name {
 		case "TraceEvent":
 			var v traceEvent
-			if err = dataFromFileInfo(benchDataDir, fi, &v); err == nil {
+			if err = decodeTo(decoder, &v); err == nil {
 				rep.ReportTraceEvent(v.Trace, v.Meta)
 			}
 		case "CountForTrace":
 			var v countForTrace
-			if err = dataFromFileInfo(benchDataDir, fi, &v); err == nil {
+			if err = decodeTo(decoder, &v); err == nil {
 				rep.ReportCountForTrace(v.TraceHash, v.Count, v.Meta)
 			}
 		case "FramesForTrace":
			var v libpf.Trace
-			if err = dataFromFileInfo[libpf.Trace](benchDataDir, fi, &v); err == nil {
+			if err = decodeTo[libpf.Trace](decoder, &v); err == nil {
 				rep.ReportFramesForTrace(&v)
 			}
 		case "FallbackSymbol":
 			var v fallbackSymbol
-			if err = dataFromFileInfo(benchDataDir, fi, &v); err == nil {
-				rep.ReportFallbackSymbol(v.FrameID, v.Symbol)
+			if err = decodeTo(decoder, &v); err == nil {
+				rep.ReportFallbackSymbol(libpf.NewFrameID(v.FileID, v.AddressOrLine), v.Symbol)
 			}
-		case "ExectableMetadata":
+		case "ExecutableMetadata":
 			var v executableMetadata
-			if err = dataFromFileInfo(benchDataDir, fi, &v); err == nil {
+			if err = decodeTo(decoder, &v); err == nil {
 				rep.ExecutableMetadata(context.Background(),
 					v.FileID, v.FileName, v.BuildID, v.Interp, nil)
 			}
 		case "FrameMetadata":
 			var v frameMetadata
-			if err = dataFromFileInfo(benchDataDir, fi, &v); err == nil {
+			if err = decodeTo(decoder, &v); err == nil {
 				rep.FrameMetadata(v.FileID, v.AddressOrLine,
 					v.LineNumber, v.FunctionOffset, v.FunctionName, v.FilePath)
 			}
 		case "HostMetadata":
 			var v hostMetadata
-			if err = dataFromFileInfo(benchDataDir, fi, &v); err == nil {
+			if err = decodeTo(decoder, &v); err == nil {
 				rep.ReportHostMetadata(v.Metadata)
 			}
 		case "Metrics":
 			var v metrics
-			if err = dataFromFileInfo[metrics](benchDataDir, fi, &v); err == nil {
+			if err = decodeTo[metrics](decoder, &v); err == nil {
 				rep.ReportMetrics(v.Timestamp, v.IDs, v.Values)
 			}
 		default:
-			err = fmt.Errorf("unsupported function name in file %s: %s", fi.name, fi.funcName)
+			err = fmt.Errorf("unsupported function name in file %s: %s", replayInputsFrom, m.Name)
 		}
 
 		if err != nil {
-			log.Errorf("Failed to replay data from file %s: %v", fi.name, err)
+			log.Errorf("Failed to replay %s data: %v", m.Name, err)
 		}
 
 		if err = ctx.Err(); err != nil {
@@ -137,16 +99,9 @@ func Replay(ctx context.Context, benchDataDir string, rep reporter.Reporter) err
 	return nil
 }
 
-func dataFromFileInfo[T any](dir string, fi fileInfo, data *T) error {
-	pathName := filepath.Join(dir, fi.name)
-	f, err := os.Open(pathName)
-	if err != nil {
-		return fmt.Errorf("failed to open file %s: %v", pathName, err)
-	}
-	defer f.Close()
-
-	if err = json.NewDecoder(f).Decode(data); err != nil {
-		return fmt.Errorf("failed to decode JSON from file %s: %v", pathName, err)
+func decodeTo[T any](decoder *json.Decoder, data *T) error {
+	if err := decoder.Decode(data); err != nil {
+		return fmt.Errorf("failed to decode JSON: %v", err)
 	}
 
 	return nil
diff --git a/cli_flags.go b/cli_flags.go
index 3fe486e0..54b0627d 100644
--- a/cli_flags.go
+++ b/cli_flags.go
@@ -21,15 +21,15 @@ import (
 
 const (
 	// Default values for CLI flags
-	defaultArgSamplesPerSecond    = 20
-	defaultArgReporterInterval    = 5.0 * time.Second
-	defaultArgMonitorInterval     = 5.0 * time.Second
-	defaultProbabilisticThreshold = tracer.ProbabilisticThresholdMax
-	defaultProbabilisticInterval  = 1 * time.Minute
-	defaultArgSendErrorFrames     = false
-	defaultArgBenchDataDir        = ""
-	defaultArgBenchProtoDir       = ""
-	defaultArgBenchReplay         = false
+	defaultArgSamplesPerSecond         = 20
+	defaultArgReporterInterval         = 5.0 * time.Second
+	defaultArgMonitorInterval          = 5.0 * time.Second
+	defaultProbabilisticThreshold      = tracer.ProbabilisticThresholdMax
+	defaultProbabilisticInterval       = 1 * time.Minute
+	defaultArgSendErrorFrames          = false
+	defaultArgReporterRecordInputsTo   = ""
+	defaultArgReporterReplayInputsFrom = ""
+	defaultArgReporterSaveOutputsTo    = ""
 
 	// This is the X in 2^(n + x) where n is the default hardcoded map size value
 	defaultArgMapScaleFactor = 0
@@ -64,38 +64,39 @@ var (
 		tracer.ProbabilisticThresholdMax-1, tracer.ProbabilisticThresholdMax-1)
 	probabilisticIntervalHelp = "Time interval for which probabilistic profiling will be " +
 		"enabled or disabled."
-	pprofHelp             = "Listening address (e.g. localhost:6060) to serve pprof information."
-	samplesPerSecondHelp  = "Set the frequency (in Hz) of stack trace sampling."
-	reporterIntervalHelp  = "Set the reporter's interval in seconds."
-	monitorIntervalHelp   = "Set the monitor interval in seconds."
-	sendErrorFramesHelp   = "Send error frames (devfiler only, breaks Kibana)"
-	benchDataDirHelp      = "Directory to store data for benchmarking."
-	benchProtoDirHelp     = "Directory to store raw protobuf wire messages."
-	benchReplayHelp       = "Replay data from -bench-data-dir directory."
+	pprofHelp = "Listening address (e.g. localhost:6060) to serve" +
+		" pprof information."
+	samplesPerSecondHelp         = "Set the frequency (in Hz) of stack trace sampling."
+	reporterIntervalHelp         = "Set the reporter's interval in seconds."
+	monitorIntervalHelp          = "Set the monitor interval in seconds."
+	sendErrorFramesHelp          = "Send error frames (devfiler only, breaks Kibana)"
+	reporterRecordInputsToHelp   = "Record reporter input to given NDJSON file."
+	reporterReplayInputsFromHelp = "Replay reporter input from given NDJSON file."
+	reporterSaveOutputsToHelp    = "Directory to store raw protobuf wire messages."
 )
 
 type arguments struct {
-	benchDataDir           string
-	benchProtoDir          string
-	benchReplay            bool
-	bpfVerifierLogLevel    uint
-	bpfVerifierLogSize     int
-	collAgentAddr          string
-	copyright              bool
-	disableTLS             bool
-	mapScaleFactor         uint
-	monitorInterval        time.Duration
-	noKernelVersionCheck   bool
-	pprofAddr              string
-	probabilisticInterval  time.Duration
-	probabilisticThreshold uint
-	reporterInterval       time.Duration
-	samplesPerSecond       int
-	secretToken            string
-	sendErrorFrames        bool
-	tracers                string
-	verboseMode            bool
-	version                bool
+	reporterRecordInputsTo   string
+	reporterSaveOutputsTo    string
+	reporterReplayInputsFrom string
+	bpfVerifierLogLevel      uint
+	bpfVerifierLogSize       int
+	collAgentAddr            string
+	copyright                bool
+	disableTLS               bool
+	mapScaleFactor           uint
+	monitorInterval          time.Duration
+	noKernelVersionCheck     bool
+	pprofAddr                string
+	probabilisticInterval    time.Duration
+	probabilisticThreshold   uint
+	reporterInterval         time.Duration
+	samplesPerSecond         int
+	secretToken              string
+	sendErrorFrames          bool
+	tracers                  string
+	verboseMode              bool
+	version                  bool
 
 	fs *flag.FlagSet
 }
@@ -153,12 +154,12 @@ func parseArgs() (*arguments, error) {
 	fs.BoolVar(&args.verboseMode, "verbose", false, verboseModeHelp)
 	fs.BoolVar(&args.version, "version", false, versionHelp)
 
-	fs.StringVar(&args.benchDataDir, "bench-data-dir", defaultArgBenchDataDir,
-		benchDataDirHelp)
-	fs.StringVar(&args.benchProtoDir, "bench-proto-dir", defaultArgBenchProtoDir,
-		benchProtoDirHelp)
-	fs.BoolVar(&args.benchReplay, "bench-replay", defaultArgBenchReplay,
-		benchReplayHelp)
+	fs.StringVar(&args.reporterRecordInputsTo, "reporter-record-inputs-to",
+		defaultArgReporterRecordInputsTo, reporterRecordInputsToHelp)
+	fs.StringVar(&args.reporterReplayInputsFrom, "reporter-replay-inputs-from",
+		defaultArgReporterReplayInputsFrom, reporterReplayInputsFromHelp)
+	fs.StringVar(&args.reporterSaveOutputsTo, "reporter-save-outputs-to",
+		defaultArgReporterSaveOutputsTo, reporterSaveOutputsToHelp)
 
 	fs.Usage = func() {
 		fs.PrintDefaults()
diff --git a/libpf/libpf.go b/libpf/libpf.go
index 47db7135..8c06984b 100644
--- a/libpf/libpf.go
+++ b/libpf/libpf.go
@@ -40,28 +40,38 @@ func NowAsUInt32() uint32 {
 
 // UnixTime64 represents nanoseconds or (reduced precision) seconds since epoch.
 type UnixTime64 uint64
 
-func (t UnixTime64) MarshalJSON() ([]byte, error) {
-	if t > math.MaxUint32 {
+func (t *UnixTime64) MarshalJSON() ([]byte, error) {
+	if *t > math.MaxUint32 {
 		// Nanoseconds, ES does not support 'epoch_nanoseconds' so
 		// we have to pass it a value formatted as 'strict_date_optional_time_nanos'.
 		out := []byte(fmt.Sprintf("%q",
-			time.Unix(0, int64(t)).UTC().Format(time.RFC3339Nano)))
+			time.Unix(0, int64(*t)).UTC().Format(time.RFC3339Nano)))
 		return out, nil
 	}
 
 	// Reduced precision seconds-since-the-epoch, ES 'epoch_second' formatter will match these.
-	out := []byte(fmt.Sprintf("%d", t))
+	out := []byte(fmt.Sprintf("%d", *t))
 	return out, nil
 }
 
+func (t *UnixTime64) UnmarshalJSON(data []byte) error {
+	var ts time.Time
+	if err := ts.UnmarshalJSON(data); err != nil {
+		return err
+	}
+
+	*t = UnixTime64(ts.UnixNano())
+	return nil
+}
+
 // Unix returns the value as seconds since epoch.
-func (t UnixTime64) Unix() int64 {
-	if t > math.MaxUint32 {
+func (t *UnixTime64) Unix() int64 {
+	if *t > math.MaxUint32 {
 		// Nanoseconds, convert to seconds-since-the-epoch
-		return time.Unix(0, int64(t)).Unix()
+		return time.Unix(0, int64(*t)).Unix()
 	}
-	return int64(t)
+	return int64(*t)
 }
 
 // Compile-time interface checks
diff --git a/libpf/libpf_test.go b/libpf/libpf_test.go
index 1641c7ae..da0093b4 100644
--- a/libpf/libpf_test.go
+++ b/libpf/libpf_test.go
@@ -77,6 +77,14 @@ func TestUnixTime64_MarshalJSON(t *testing.T) {
 			b, err := test.time.MarshalJSON()
 			require.NoError(t, err)
 			assert.Equal(t, test.want, b)
+
+			// Unmarshal the value and check it is the same
+			if test.want[0] == '"' {
+				var u UnixTime64
+				err = u.UnmarshalJSON(b)
+				require.NoError(t, err)
+				assert.Equal(t, test.time, u)
+			}
 		})
 	}
 }
diff --git a/main.go b/main.go
index f1b96ed6..44358f67 100644
--- a/main.go
+++ b/main.go
@@ -175,32 +175,33 @@ func mainWithExitCode() exitCode {
 		GRPCOperationTimeout:   intervals.GRPCOperationTimeout(),
 		GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(),
 		GRPCConnectionTimeout:  intervals.GRPCConnectionTimeout(),
-		GRPCClientInterceptor:  benchreporter.GRPCInterceptor(args.benchProtoDir),
+		GRPCClientInterceptor:  benchreporter.GRPCInterceptor(args.reporterSaveOutputsTo),
 		ReportInterval:         intervals.ReportInterval(),
 		CacheSize:              traceHandlerCacheSize,
 		SamplesPerSecond:       args.samplesPerSecond,
 		KernelVersion:          kernelVersion,
 		HostName:               hostname,
 		IPAddress:              sourceIP,
-		DisableJitter:          args.benchDataDir == "",
+		DisableJitter:          args.reporterRecordInputsTo == "",
 	})
 	if err != nil {
 		return failure("Failed to start reporting: %v", err)
 	}
 
-	if args.benchDataDir != "" {
-		if args.benchReplay {
-			if err = benchreporter.Replay(mainCtx, args.benchDataDir, rep); err != nil {
-				return failure("Failed to replay benchmark data: %v", err)
-			}
-			return exitSuccess
-		}
-		rep, err = benchreporter.NewBenchmarkReporter(args.benchDataDir, rep)
+	if args.reporterRecordInputsTo != "" {
+		rep, err = benchreporter.NewBenchmarkReporter(args.reporterRecordInputsTo, rep)
 		if err != nil {
 			return failure("Failed to create benchmark reporter: %v", err)
 		}
 	}
 
+	if args.reporterReplayInputsFrom != "" {
+		if err = benchreporter.Replay(mainCtx, args.reporterReplayInputsFrom, rep); err != nil {
+			return failure("Failed to replay benchmark data: %v", err)
+		}
+		return exitSuccess
+	}
+
 	metrics.SetReporter(rep)
 
 	// Now that set the initial host metadata, start a goroutine to keep sending updates regularly.
@@ -351,10 +352,6 @@ func sanityCheck(args *arguments) exitCode {
 
 		}
 	}
 
-	if args.benchReplay && args.benchDataDir == "" {
-		return failure("Replay requested but no data directory specified")
-	}
-
 	return exitSuccess
 }
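Note (not part of the patch, for orientation only): with this change the recording reporter appends every reporter call to the NDJSON file as two consecutive lines, a small metadata object carrying the timestamp and call name, followed by the JSON-encoded call arguments, and Replay consumes them in that order while sleeping for the recorded time deltas. Purely as an illustration, with made-up values (the exact JSON encoding of FileID depends on its marshaller), a recorded FallbackSymbol call might look like:

    {"ts":1700000000123456789,"name":"FallbackSymbol"}
    {"FileID":"...","AddressOrLine":4096,"Symbol":"runtime.main"}

A typical workflow would record one run with -reporter-record-inputs-to=reporter.ndjson (file name arbitrary), feed the same file back later with -reporter-replay-inputs-from=reporter.ndjson, and optionally point -reporter-save-outputs-to at a directory to keep the raw protobuf wire messages produced on the output side.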