diff --git a/bench/benchreporter/benchmark.go b/bench/benchreporter/benchmark.go new file mode 100644 index 00000000..5a69ca1f --- /dev/null +++ b/bench/benchreporter/benchmark.go @@ -0,0 +1,292 @@ +package benchreporter + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + "sync/atomic" + "time" + + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" + + "github.com/open-telemetry/opentelemetry-ebpf-profiler/libpf" + "github.com/open-telemetry/opentelemetry-ebpf-profiler/reporter" + "github.com/open-telemetry/opentelemetry-ebpf-profiler/util" +) + +// compile time check for interface implementation +var _ reporter.Reporter = (*BenchmarkReporter)(nil) + +type BenchmarkReporter struct { + saveInputsTo string + f *os.File + rep reporter.Reporter + uid int + gid int +} + +func (r *BenchmarkReporter) ReportFramesForTrace(trace *libpf.Trace) { + r.store("FramesForTrace", trace) + r.rep.ReportFramesForTrace(trace) +} + +type countForTrace struct { + TraceHash libpf.TraceHash + Meta *reporter.TraceEventMeta + Count uint16 +} + +func (r *BenchmarkReporter) ReportCountForTrace(traceHash libpf.TraceHash, + count uint16, meta *reporter.TraceEventMeta) { + r.store("CountForTrace", &countForTrace{ + TraceHash: traceHash, + Meta: meta, + Count: count, + }) + r.rep.ReportCountForTrace(traceHash, count, meta) +} + +type traceEvent struct { + Trace *libpf.Trace + Meta *reporter.TraceEventMeta +} + +func (r *BenchmarkReporter) ReportTraceEvent(trace *libpf.Trace, meta *reporter.TraceEventMeta) { + r.store("TraceEvent", &traceEvent{ + Trace: trace, + Meta: meta, + }) + r.rep.ReportTraceEvent(trace, meta) +} + +func (r *BenchmarkReporter) SupportsReportTraceEvent() bool { + return r.rep.SupportsReportTraceEvent() +} + +type fallbackSymbol struct { + FileID libpf.FileID + AddressOrLine libpf.AddressOrLineno + Symbol string +} + +func (r *BenchmarkReporter) ReportFallbackSymbol(frameID 
libpf.FrameID, symbol string) { + r.store("FallbackSymbol", &fallbackSymbol{ + FileID: frameID.FileID(), + AddressOrLine: frameID.AddressOrLine(), + Symbol: symbol, + }) + r.rep.ReportFallbackSymbol(frameID, symbol) +} + +type executableMetadata struct { + FileID libpf.FileID + FileName string + BuildID string + Interp libpf.InterpreterType +} + +func (r *BenchmarkReporter) ExecutableMetadata(fileID libpf.FileID, + fileName, buildID string, interp libpf.InterpreterType, open reporter.ExecutableOpener) { + r.store("ExecutableMetadata", &executableMetadata{ + FileID: fileID, + FileName: fileName, + BuildID: buildID, + Interp: interp, + }) + r.rep.ExecutableMetadata(fileID, fileName, buildID, interp, open) +} + +type frameMetadata struct { + FileID libpf.FileID + AddressOrLine libpf.AddressOrLineno + LineNumber util.SourceLineno + FunctionOffset uint32 + FunctionName string + FilePath string +} + +func (r *BenchmarkReporter) FrameMetadata(fileID libpf.FileID, addressOrLine libpf.AddressOrLineno, + lineNumber util.SourceLineno, functionOffset uint32, functionName, filePath string) { + r.store("FrameMetadata", &frameMetadata{ + FileID: fileID, + AddressOrLine: addressOrLine, + LineNumber: lineNumber, + FunctionOffset: functionOffset, + FunctionName: functionName, + FilePath: filePath, + }) + r.rep.FrameMetadata(fileID, addressOrLine, lineNumber, functionOffset, functionName, filePath) +} + +type hostMetadata struct { + Metadata map[string]string +} + +func (r *BenchmarkReporter) ReportHostMetadata(metadata map[string]string) { + r.store("HostMetadata", &hostMetadata{ + Metadata: metadata, + }) + r.rep.ReportHostMetadata(metadata) +} + +func (r *BenchmarkReporter) ReportHostMetadataBlocking(ctx context.Context, + metadataMap map[string]string, maxRetries int, waitRetry time.Duration) error { + return r.rep.ReportHostMetadataBlocking(ctx, metadataMap, maxRetries, waitRetry) +} + +type metrics struct { + Timestamp uint32 + IDs []uint32 + Values []int64 +} + +func (r 
*BenchmarkReporter) ReportMetrics(timestamp uint32, ids []uint32, values []int64) { + r.store("Metrics", &metrics{ + Timestamp: timestamp, + IDs: ids, + Values: values, + }) + r.rep.ReportMetrics(timestamp, ids, values) +} + +func (r *BenchmarkReporter) Stop() { + r.rep.Stop() + _ = r.f.Close() +} + +func (r *BenchmarkReporter) GetMetrics() reporter.Metrics { + return r.rep.GetMetrics() +} + +func NewBenchmarkReporter(saveInputsTo string, rep reporter.Reporter) (*BenchmarkReporter, error) { + r := &BenchmarkReporter{ + saveInputsTo: saveInputsTo, + rep: rep, + } + r.uid, r.gid = originUser() + + var err error + if r.f, err = os.OpenFile(saveInputsTo, + os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0o644); err != nil { + return nil, fmt.Errorf("failed to open file %s: %v", saveInputsTo, err) + } + + if err = r.f.Chown(r.uid, r.gid); err != nil { + return nil, fmt.Errorf("failed to change ownership of %s to %d:%d: %v", + saveInputsTo, r.uid, r.gid, err) + } + + return r, nil +} + +func originUser() (uid, gid int) { + if uidStr := os.Getenv("SUDO_UID"); uidStr != "" { + uid, _ = strconv.Atoi(uidStr) + } + if gidStr := os.Getenv("SUDO_GID"); gidStr != "" { + gid, _ = strconv.Atoi(gidStr) + } + return +} + +type metaInfo struct { + TS int64 `json:"ts"` + Name string `json:"name"` +} + +// store appends data as NDJSON to the output file. 
+func (r *BenchmarkReporter) store(name string, data any) { + meta := metaInfo{ + TS: time.Now().UnixNano(), + Name: name, + } + + // encode meta data to JSON + bytes, err := json.Marshal(meta) + if err != nil { + panic(err) + } + if err = appendToFile(r.f, bytes); err != nil { + panic(err) + } + + // encode reporter input to JSON + bytes, err = json.Marshal(data) + if err != nil { + panic(err) + } + if err = appendToFile(r.f, bytes); err != nil { + panic(err) + } +} + +func appendToFile(f *os.File, bytes []byte) error { + _, err := f.Write(append(bytes, '\n')) + return err +} + +func changeOwner(pathName string, uid, gid int) { + if err := os.Chown(pathName, uid, gid); err != nil { + log.Errorf("Failed to change ownership of %s to %d:%d: %v", + pathName, uid, gid, err) + } +} + +func changeDirOwner(dirName string, uid, gid int) { + dirs := strings.Split(dirName, "/") + for i := 1; i <= len(dirs); i++ { + dir := filepath.Join(dirs[:i]...) + changeOwner(dir, uid, gid) + } +} + +func GRPCInterceptor(saveDir string) grpc.UnaryClientInterceptor { + if saveDir != "" { + if err := os.MkdirAll(saveDir, 0o755); err != nil { + log.Errorf("Failed to create directory for storing protobuf messages: %v", err) + return nil + } + + uid, gid := originUser() + + if uid != 0 || gid != 0 { + changeDirOwner(saveDir, uid, gid) + } + + // return interceptor to write the uncompressed protobuf messages to disk. + return func(ctx context.Context, method string, req, reply any, + cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + storeProtobuf(saveDir, req, uid, gid) + return invoker(ctx, method, req, reply, cc, opts...) + } + } + + return nil +} + +var protoMsgID atomic.Uint64 + +func storeProtobuf(msgDir string, msg any, uid, gid int) { + protoMsgID.Add(1) + + // Get the wire format of the request message. 
+ msgBytes, err := proto.Marshal(msg.(proto.Message)) + if err != nil { + log.Errorf("failed to marshal request: %v", err) + return + } + + filePath := fmt.Sprintf("%s/%05X.proto", msgDir, protoMsgID.Load()) + if err = os.WriteFile(filePath, msgBytes, 0o600); err != nil { + log.Errorf("failed to write request: %v", err) + return + } + + changeOwner(filePath, uid, gid) +} diff --git a/bench/benchreporter/replay.go b/bench/benchreporter/replay.go new file mode 100644 index 00000000..83c933f2 --- /dev/null +++ b/bench/benchreporter/replay.go @@ -0,0 +1,107 @@ +package benchreporter + +import ( + "context" + "encoding/json" + "fmt" + "io" + "os" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/open-telemetry/opentelemetry-ebpf-profiler/libpf" + "github.com/open-telemetry/opentelemetry-ebpf-profiler/reporter" +) + +// Replay replays the stored data from replayInputsFrom. +// The argument r is the reporter that will receive the replayed data. +func Replay(ctx context.Context, replayInputsFrom string, rep reporter.Reporter) error { + stream, err := os.Open(replayInputsFrom) + if err != nil { + return fmt.Errorf("failed to open file %s: %v", replayInputsFrom, err) + } + decoder := json.NewDecoder(stream) + + var m metaInfo + var curTS int64 + + for { + if err = decoder.Decode(&m); err != nil { + // EOF is returned at the end of the stream. 
+ if err != io.EOF { + return err + } + break + } + + if curTS != 0 { + time.Sleep(time.Duration(m.TS-curTS) * time.Nanosecond) + } + curTS = m.TS + + switch m.Name { + case "TraceEvent": + var v traceEvent + if err = decodeTo(decoder, &v); err == nil { + rep.ReportTraceEvent(v.Trace, v.Meta) + } + case "CountForTrace": + var v countForTrace + if err = decodeTo(decoder, &v); err == nil { + rep.ReportCountForTrace(v.TraceHash, v.Count, v.Meta) + } + case "FramesForTrace": + var v libpf.Trace + if err = decodeTo[libpf.Trace](decoder, &v); err == nil { + rep.ReportFramesForTrace(&v) + } + case "FallbackSymbol": + var v fallbackSymbol + if err = decodeTo(decoder, &v); err == nil { + rep.ReportFallbackSymbol(libpf.NewFrameID(v.FileID, v.AddressOrLine), v.Symbol) + } + case "ExecutableMetadata": + var v executableMetadata + if err = decodeTo(decoder, &v); err == nil { + rep.ExecutableMetadata(v.FileID, v.FileName, v.BuildID, v.Interp, nil) + } + case "FrameMetadata": + var v frameMetadata + if err = decodeTo(decoder, &v); err == nil { + rep.FrameMetadata(v.FileID, v.AddressOrLine, v.LineNumber, v.FunctionOffset, + v.FunctionName, v.FilePath) + } + case "HostMetadata": + var v hostMetadata + if err = decodeTo(decoder, &v); err == nil { + rep.ReportHostMetadata(v.Metadata) + } + case "Metrics": + var v metrics + if err = decodeTo[metrics](decoder, &v); err == nil { + rep.ReportMetrics(v.Timestamp, v.IDs, v.Values) + } + default: + err = fmt.Errorf("unsupported function name in file %s: %s", replayInputsFrom, m.Name) + } + + if err != nil { + log.Errorf("Failed to replay data from file %s: %v", m.Name, err) + } + + if err = ctx.Err(); err != nil { + return err + } + } + + return nil +} + +func decodeTo[T any](decoder *json.Decoder, data *T) error { + if err := decoder.Decode(data); err != nil { + return fmt.Errorf("failed to decode JSON: %v", err) + } + + return nil +} diff --git a/cli_flags.go b/cli_flags.go index 2bf2c736..f66e8a55 100644 --- a/cli_flags.go +++ 
b/cli_flags.go @@ -21,13 +21,16 @@ import ( const ( // Default values for CLI flags - defaultArgSamplesPerSecond = 20 - defaultArgReporterInterval = 5.0 * time.Second - defaultArgMonitorInterval = 5.0 * time.Second - defaultClockSyncInterval = 3 * time.Minute - defaultProbabilisticThreshold = tracer.ProbabilisticThresholdMax - defaultProbabilisticInterval = 1 * time.Minute - defaultArgSendErrorFrames = false + defaultArgSamplesPerSecond = 20 + defaultArgReporterInterval = 5.0 * time.Second + defaultArgMonitorInterval = 5.0 * time.Second + defaultClockSyncInterval = 3 * time.Minute + defaultProbabilisticThreshold = tracer.ProbabilisticThresholdMax + defaultProbabilisticInterval = 1 * time.Minute + defaultArgSendErrorFrames = false + defaultArgReporterRecordsInputsTo = "" + defaultArgReporterReplayInputsFrom = "" + defaultArgReporterSaveOutputsTo = "" // This is the X in 2^(n + x) where n is the default hardcoded map size value defaultArgMapScaleFactor = 0 @@ -61,35 +64,42 @@ var ( tracer.ProbabilisticThresholdMax-1, tracer.ProbabilisticThresholdMax-1) probabilisticIntervalHelp = "Time interval for which probabilistic profiling will be " + "enabled or disabled." - pprofHelp = "Listening address (e.g. localhost:6060) to serve pprof information." + pprofHelp = "Listening address (e.g. localhost:6060) to serve" + + " pprof information." samplesPerSecondHelp = "Set the frequency (in Hz) of stack trace sampling." reporterIntervalHelp = "Set the reporter's interval in seconds." monitorIntervalHelp = "Set the monitor interval in seconds." clockSyncIntervalHelp = "Set the sync interval with the realtime clock. " + "If zero, monotonic-realtime clock sync will be performed once, " + "on agent startup, but not periodically." - sendErrorFramesHelp = "Send error frames (devfiler only, breaks Kibana)" + sendErrorFramesHelp = "Send error frames (devfiler only, breaks Kibana)" + reporterRecordInputsToHelp = "Record reporter input to given NDJSON file." 
+ reporterReplayInputsFromHelp = "Replay reporter input from given NDJSON file." + reporterSaveOutputsToHelp = "Directory to store raw protobuf wire messages." ) type arguments struct { - bpfVerifierLogLevel uint - bpfVerifierLogSize int - collAgentAddr string - copyright bool - disableTLS bool - mapScaleFactor uint - monitorInterval time.Duration - clockSyncInterval time.Duration - noKernelVersionCheck bool - pprofAddr string - probabilisticInterval time.Duration - probabilisticThreshold uint - reporterInterval time.Duration - samplesPerSecond int - sendErrorFrames bool - tracers string - verboseMode bool - version bool + reporterRecordInputsTo string + reporterSaveOutputsTo string + reporterReplayInputsFrom string + bpfVerifierLogLevel uint + bpfVerifierLogSize int + collAgentAddr string + copyright bool + disableTLS bool + mapScaleFactor uint + monitorInterval time.Duration + clockSyncInterval time.Duration + noKernelVersionCheck bool + pprofAddr string + probabilisticInterval time.Duration + probabilisticThreshold uint + reporterInterval time.Duration + samplesPerSecond int + sendErrorFrames bool + tracers string + verboseMode bool + version bool fs *flag.FlagSet } @@ -147,6 +157,13 @@ func parseArgs() (*arguments, error) { fs.BoolVar(&args.verboseMode, "verbose", false, verboseModeHelp) fs.BoolVar(&args.version, "version", false, versionHelp) + fs.StringVar(&args.reporterRecordInputsTo, "reporter-record-inputs-to", + defaultArgReporterRecordsInputsTo, reporterRecordInputsToHelp) + fs.StringVar(&args.reporterReplayInputsFrom, "reporter-replay-inputs-from", + defaultArgReporterReplayInputsFrom, reporterReplayInputsFromHelp) + fs.StringVar(&args.reporterSaveOutputsTo, "reporter-save-outputs-to", + defaultArgReporterSaveOutputsTo, reporterSaveOutputsToHelp) + fs.Usage = func() { fs.PrintDefaults() } diff --git a/go.mod b/go.mod index 1d8bd5e9..dc9e9d22 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( golang.org/x/sync v0.7.0 golang.org/x/sys v0.21.0 
google.golang.org/grpc v1.64.1 + google.golang.org/protobuf v1.34.1 ) require ( @@ -58,6 +59,5 @@ require ( golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240513163218-0867130af1f8 // indirect - google.golang.org/protobuf v1.34.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/libpf/libpf.go b/libpf/libpf.go index 47db7135..8c06984b 100644 --- a/libpf/libpf.go +++ b/libpf/libpf.go @@ -40,28 +40,38 @@ func NowAsUInt32() uint32 { // UnixTime64 represents nanoseconds or (reduced precision) seconds since epoch. type UnixTime64 uint64 -func (t UnixTime64) MarshalJSON() ([]byte, error) { - if t > math.MaxUint32 { +func (t *UnixTime64) MarshalJSON() ([]byte, error) { + if *t > math.MaxUint32 { // Nanoseconds, ES does not support 'epoch_nanoseconds' so // we have to pass it a value formatted as 'strict_date_optional_time_nanos'. out := []byte(fmt.Sprintf("%q", - time.Unix(0, int64(t)).UTC().Format(time.RFC3339Nano))) + time.Unix(0, int64(*t)).UTC().Format(time.RFC3339Nano))) return out, nil } // Reduced precision seconds-since-the-epoch, ES 'epoch_second' formatter will match these. - out := []byte(fmt.Sprintf("%d", t)) + out := []byte(fmt.Sprintf("%d", *t)) return out, nil } +func (t *UnixTime64) UnmarshalJSON(data []byte) error { + var ts time.Time + if err := ts.UnmarshalJSON(data); err != nil { + return err + } + + *t = UnixTime64(ts.UnixNano()) + return nil +} + // Unix returns the value as seconds since epoch. 
-func (t UnixTime64) Unix() int64 { - if t > math.MaxUint32 { +func (t *UnixTime64) Unix() int64 { + if *t > math.MaxUint32 { // Nanoseconds, convert to seconds-since-the-epoch - return time.Unix(0, int64(t)).Unix() + return time.Unix(0, int64(*t)).Unix() } - return int64(t) + return int64(*t) } // Compile-time interface checks diff --git a/libpf/libpf_test.go b/libpf/libpf_test.go index 1641c7ae..da0093b4 100644 --- a/libpf/libpf_test.go +++ b/libpf/libpf_test.go @@ -77,6 +77,14 @@ func TestUnixTime64_MarshalJSON(t *testing.T) { b, err := test.time.MarshalJSON() require.NoError(t, err) assert.Equal(t, test.want, b) + + // Unmarshal the value and check it is the same + if test.want[0] == '"' { + var u UnixTime64 + err = u.UnmarshalJSON(b) + require.NoError(t, err) + assert.Equal(t, test.time, u) + } }) } } diff --git a/main.go b/main.go index 6fa144a9..8cb1e55c 100644 --- a/main.go +++ b/main.go @@ -10,32 +10,27 @@ import ( "context" "fmt" "net/http" + _ "net/http/pprof" //nolint:gosec "os" "os/signal" "runtime" "time" - //nolint:gosec - _ "net/http/pprof" - - "github.com/open-telemetry/opentelemetry-ebpf-profiler/times" - tracertypes "github.com/open-telemetry/opentelemetry-ebpf-profiler/tracer/types" - "github.com/open-telemetry/opentelemetry-ebpf-profiler/util" - "github.com/open-telemetry/opentelemetry-ebpf-profiler/vc" + log "github.com/sirupsen/logrus" "github.com/tklauser/numcpus" "golang.org/x/sys/unix" + "github.com/open-telemetry/opentelemetry-ebpf-profiler/bench/benchreporter" "github.com/open-telemetry/opentelemetry-ebpf-profiler/host" - "github.com/open-telemetry/opentelemetry-ebpf-profiler/tracehandler" - "github.com/open-telemetry/opentelemetry-ebpf-profiler/hostmetadata" - "github.com/open-telemetry/opentelemetry-ebpf-profiler/metrics" "github.com/open-telemetry/opentelemetry-ebpf-profiler/reporter" - + "github.com/open-telemetry/opentelemetry-ebpf-profiler/times" + "github.com/open-telemetry/opentelemetry-ebpf-profiler/tracehandler" 
"github.com/open-telemetry/opentelemetry-ebpf-profiler/tracer" - - log "github.com/sirupsen/logrus" + tracertypes "github.com/open-telemetry/opentelemetry-ebpf-profiler/tracer/types" + "github.com/open-telemetry/opentelemetry-ebpf-profiler/util" + "github.com/open-telemetry/opentelemetry-ebpf-profiler/vc" ) // Short copyright / license text for eBPF code @@ -174,10 +169,8 @@ func mainWithExitCode() exitCode { metadataCollector.AddCustomData("host.name", hostname) metadataCollector.AddCustomData("host.ip", sourceIP) - // Network operations to CA start here - var rep reporter.Reporter - // Connect to the collection agent - rep, err = reporter.Start(mainCtx, &reporter.Config{ + // Network operations to the collector start here. + rep, err := reporter.Start(mainCtx, &reporter.Config{ CollAgentAddr: args.collAgentAddr, DisableTLS: args.disableTLS, MaxRPCMsgSize: 32 << 20, // 32 MiB @@ -185,17 +178,33 @@ func mainWithExitCode() exitCode { GRPCOperationTimeout: intervals.GRPCOperationTimeout(), GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(), GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(), + GRPCClientInterceptor: benchreporter.GRPCInterceptor(args.reporterSaveOutputsTo), ReportInterval: intervals.ReportInterval(), CacheSize: traceHandlerCacheSize, SamplesPerSecond: args.samplesPerSecond, KernelVersion: kernelVersion, HostName: hostname, IPAddress: sourceIP, + DisableJitter: args.reporterRecordInputsTo == "", }) if err != nil { return failure("Failed to start reporting: %v", err) } + if args.reporterRecordInputsTo != "" { + rep, err = benchreporter.NewBenchmarkReporter(args.reporterRecordInputsTo, rep) + if err != nil { + return failure("Failed to create benchmark reporter: %v", err) + } + } + + if args.reporterReplayInputsFrom != "" { + if err = benchreporter.Replay(mainCtx, args.reporterReplayInputsFrom, rep); err != nil { + return failure("Failed to replay benchmark data: %v", err) + } + return exitSuccess + } + metrics.SetReporter(rep) // Now that 
set the initial host metadata, start a goroutine to keep sending updates regularly. diff --git a/reporter/config.go b/reporter/config.go index 550fb722..eaa7b79a 100644 --- a/reporter/config.go +++ b/reporter/config.go @@ -25,6 +25,8 @@ type Config struct { // MaxRPCMsgSize defines the maximum size of a gRPC message. MaxRPCMsgSize int + // DisableJitter disables the jitter in the report interval. + DisableJitter bool // Disable secure communication with Collection Agent. DisableTLS bool // CacheSize defines the size of the reporter caches. diff --git a/reporter/otlp_reporter.go b/reporter/otlp_reporter.go index 5093be97..0808b1dd 100644 --- a/reporter/otlp_reporter.go +++ b/reporter/otlp_reporter.go @@ -357,6 +357,10 @@ func Start(mainCtx context.Context, cfg *Config) (Reporter, error) { r.client = otlpcollector.NewProfilesServiceClient(otlpGrpcConn) go func() { + jitter := 0.2 + if cfg.DisableJitter { + jitter = 0 + } tick := time.NewTicker(cfg.ReportInterval) defer tick.Stop() for { @@ -369,7 +373,7 @@ func Start(mainCtx context.Context, cfg *Config) (Reporter, error) { if err := r.reportOTLPProfile(ctx); err != nil { log.Errorf("Request failed: %v", err) } - tick.Reset(libpf.AddJitter(cfg.ReportInterval, 0.2)) + tick.Reset(libpf.AddJitter(cfg.ReportInterval, jitter)) } } }() diff --git a/tools/protobench/README.md b/tools/protobench/README.md new file mode 100644 index 00000000..7a94bea9 --- /dev/null +++ b/tools/protobench/README.md @@ -0,0 +1,57 @@ +## Introduction + +The idea is to record the wire messages of the profiling agent and see how well they compress using different +compressors and what the CPU impact is. + +To record the wire messages, you need to run the profiling agent with the `-reporter-save-outputs-to` flag. +This will write the wire messages into the given directory. The directory will be created if it does not exist. 
+
+You can then use the `protobench` tool to compress the wire messages and see how well they compress and how much
+CPU time it takes to compress them.
+
+### Recording wire messages
+
+Make sure you have a receiving endpoint, e.g. `devfiler`, listening on localhost:11000.
+Now run the profiling agent with the `-reporter-save-outputs-to` flag:
+```shell
+sudo ./opentelemetry-ebpf-profiler -reporter-save-outputs-to=/tmp/protobuf -collection-agent=127.0.0.1:11000 -disable-tls
+```
+The wire messages are written to `protobuf/`, one file per message.
+
+### Benchmark compression of wire messages
+
+In reality, the previously recorded wire messages are sent compressed over the network.
+Compression efficiency and CPU usage are important factors to consider when choosing a compression algorithm.
+
+The `protobench` tool helps to compare compression algorithms and their performance:
+- for realistic results, wire messages are compressed one-by-one
+- different compressors with different compression levels are used
+- the compression ratio and CPU usage are measured
+- the results are written as a bar chart or a CSV file
+
+To compress the wire messages and generate a bar chart, run the `protobench` tool with an output file ending in `.png`:
+```shell
+cd tools/protobench
+go run ./... -input-dir=/tmp/protobuf -output-file=results.png
+```
+If you don't see any errors, the tool will generate a PNG file with a bar chart showing the compression ratio and
+CPU usage for each compressor/level.
+The extension `.csv` can be used to generate a CSV file with the raw data instead of a PNG file.
+Omitting the `-output-file` flag will display the results in the terminal.
+
+Of course, you can also use the `protobench` tool to compare compression of any other files.
+
+### Reproducible reporter outputs
+
+The profiling agent supports recording and replaying reporter inputs with the `-reporter-record-inputs-to` and
+`-reporter-replay-inputs-from` flags.
+
+Replaying in combination with `-reporter-save-outputs-to` generates a (nearly) reproducible set of wire messages,
+which can be easily compared with the `protobench` tool. This can be useful for comparing different implementations of
+the reporter or the wire protocol.
+
+Generated inputs can be shared for CI, benchmarking, development, testing or debugging purposes.
+
+### Example PNG output
+
+![Example output](example.png)
diff --git a/tools/protobench/cli_flags.go b/tools/protobench/cli_flags.go
new file mode 100644
index 00000000..60ed2df1
--- /dev/null
+++ b/tools/protobench/cli_flags.go
@@ -0,0 +1,75 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Apache License 2.0.
+ * See the file "LICENSE" for details.
+ */
+
+package main
+
+import (
+	"errors"
+	"flag"
+	"os"
+	"path/filepath"
+
+	"github.com/peterbourgon/ff/v3"
+)
+
+const (
+	defaultArgInputDir   = ""
+	defaultArgOutputFile = ""
+)
+
+// Help strings for command line arguments
+var (
+	inputDirHelp   = "Directory to read and compress files from."
+	outputFileHelp = "Output file to store the benchmark results (*.csv or *.png)."
+)
+
+type arguments struct {
+	inputDir   string
+	outputFile string
+
+	fs *flag.FlagSet
+}
+
+func (args *arguments) SanityCheck() error {
+	if args.inputDir == "" {
+		return errors.New("no protobuf message directory specified")
+	}
+
+	if args.outputFile != "" {
+		switch filepath.Ext(args.outputFile) {
+		case ".csv", ".png":
+		default:
+			return errors.New("output file must be either a .csv or .png file")
+		}
+	}
+
+	return nil
+}
+
+// Package-scope variable, so that other, conditionally compiled components can refer
+// to the same flagset.
+ +func parseArgs() (*arguments, error) { + var args arguments + + fs := flag.NewFlagSet("protobench", flag.ExitOnError) + + fs.StringVar(&args.inputDir, "input-dir", defaultArgInputDir, inputDirHelp) + fs.StringVar(&args.outputFile, "output-file", defaultArgOutputFile, outputFileHelp) + + fs.Usage = func() { + fs.PrintDefaults() + } + + args.fs = fs + + return &args, ff.Parse(fs, os.Args[1:], + ff.WithEnvVarPrefix("OTEL_PROTOBENCH"), + ff.WithConfigFileFlag("config"), + ff.WithConfigFileParser(ff.PlainParser), + ff.WithAllowMissingConfigFile(true), + ) +} diff --git a/tools/protobench/compressor.go b/tools/protobench/compressor.go new file mode 100644 index 00000000..c1c6a376 --- /dev/null +++ b/tools/protobench/compressor.go @@ -0,0 +1,139 @@ +package main + +import ( + "bytes" + + "github.com/andybalholm/brotli" + "github.com/klauspost/compress/gzip" + "github.com/klauspost/compress/s2" + "github.com/klauspost/compress/zstd" + "github.com/pierrec/lz4/v4" +) + +type compressor interface { + // compress compresses the content and writes it to the pre-allocated buffer. 
+ compress([]byte, *bytes.Buffer) (int64, error) + id() string +} + +type noneCompressor struct { + name string +} + +func (n noneCompressor) id() string { return n.name } +func (noneCompressor) compress(content []byte, _ *bytes.Buffer) (int64, error) { + return int64(len(content)), nil +} + +type gzipCompressor struct { + name string + level int +} + +func (g gzipCompressor) id() string { return g.name } +func (g gzipCompressor) compress(content []byte, buf *bytes.Buffer) (int64, error) { + encoder, err := gzip.NewWriterLevel(buf, g.level) + if err != nil { + return 0, err + } + defer encoder.Close() + + if _, err = encoder.Write(content); err != nil { + return 0, err + } + if err := encoder.Close(); err != nil { + return 0, err + } + + encoder.Flush() + + return int64(buf.Len()), nil +} + +type zstdCompressor struct { + name string + level zstd.EncoderLevel +} + +func (z zstdCompressor) id() string { return z.name } + +func (z zstdCompressor) compress(content []byte, buf *bytes.Buffer) (int64, error) { + encoder, err := zstd.NewWriter(buf, zstd.WithEncoderLevel(z.level)) + if err != nil { + return 0, err + } + defer encoder.Close() + + if _, err = encoder.Write(content); err != nil { + return 0, err + } + + encoder.Flush() + + return int64(buf.Len()), nil +} + +type brotliCompressor struct { + name string + level int +} + +func (b brotliCompressor) id() string { return b.name } + +func (b brotliCompressor) compress(content []byte, buf *bytes.Buffer) (int64, error) { + encoder := brotli.NewWriterLevel(buf, b.level) + defer encoder.Close() + + if _, err := encoder.Write(content); err != nil { + return 0, err + } + + encoder.Flush() + + return int64(buf.Len()), nil +} + +type s2Compressor struct { + name string + level s2.WriterOption +} + +func (s s2Compressor) id() string { return s.name } + +func (s s2Compressor) compress(content []byte, buf *bytes.Buffer) (int64, error) { + encoder := s2.NewWriter(buf, s.level) + defer encoder.Close() + + if _, err := 
encoder.Write(content); err != nil { + return 0, err + } + + encoder.Flush() + + return int64(buf.Len()), nil +} + +type lz4Compressor struct { + name string + level lz4.CompressionLevel +} + +func (l lz4Compressor) id() string { return l.name } + +func (l lz4Compressor) compress(content []byte, buf *bytes.Buffer) (int64, error) { + encoder := lz4.NewWriter(buf) + defer encoder.Close() + + err := encoder.Apply(lz4.CompressionLevelOption(l.level)) + if err != nil { + return 0, err + } + + if _, err = encoder.Write(content); err != nil { + return 0, err + } + + encoder.Flush() + + return int64(buf.Len()), nil +} diff --git a/tools/protobench/example.png b/tools/protobench/example.png new file mode 100644 index 00000000..408cd575 Binary files /dev/null and b/tools/protobench/example.png differ diff --git a/tools/protobench/go.mod b/tools/protobench/go.mod new file mode 100644 index 00000000..65aa4853 --- /dev/null +++ b/tools/protobench/go.mod @@ -0,0 +1,24 @@ +module github.com/open-telemetry/opentelemetry-ebpf-profiler/tools/protobench + +go 1.22.6 + +require ( + github.com/andybalholm/brotli v1.1.0 + github.com/klauspost/compress v1.17.9 + github.com/peterbourgon/ff/v3 v3.4.0 + github.com/pierrec/lz4/v4 v4.1.21 + gonum.org/v1/plot v0.14.0 +) + +require ( + git.sr.ht/~sbinet/gg v0.5.0 // indirect + github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b // indirect + github.com/campoy/embedmd v1.0.0 // indirect + github.com/go-fonts/liberation v0.3.1 // indirect + github.com/go-latex/latex v0.0.0-20230307184459-12ec69307ad9 // indirect + github.com/go-pdf/fpdf v0.8.0 // indirect + github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/image v0.11.0 // indirect + golang.org/x/text v0.12.0 // indirect +) diff --git a/tools/protobench/go.sum b/tools/protobench/go.sum new file mode 100644 index 00000000..31314f59 --- /dev/null +++ b/tools/protobench/go.sum @@ -0,0 +1,90 @@ 
+git.sr.ht/~sbinet/cmpimg v0.1.0 h1:E0zPRk2muWuCqSKSVZIWsgtU9pjsw3eKHi8VmQeScxo= +git.sr.ht/~sbinet/cmpimg v0.1.0/go.mod h1:FU12psLbF4TfNXkKH2ZZQ29crIqoiqTZmeQ7dkp/pxE= +git.sr.ht/~sbinet/gg v0.5.0 h1:6V43j30HM623V329xA9Ntq+WJrMjDxRjuAB1LFWF5m8= +git.sr.ht/~sbinet/gg v0.5.0/go.mod h1:G2C0eRESqlKhS7ErsNey6HHrqU1PwsnCQlekFi9Q2Oo= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm/4RlzPXRlREEwqTHAN3T56Bv2ITsFT3gY= +github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk= +github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b h1:slYM766cy2nI3BwyRiyQj/Ud48djTMtMebDqepE95rw= +github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b/go.mod h1:1KcenG0jGWcpt8ov532z81sp/kMMUG485J2InIOyADM= +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/campoy/embedmd v1.0.0 h1:V4kI2qTJJLf4J29RzI/MAt2c3Bl4dQSYPuflzwFH2hY= +github.com/campoy/embedmd v1.0.0/go.mod h1:oxyr9RCiSXg0M3VJ3ks0UGfp98BpSSGr0kpiX3MzVl8= +github.com/go-fonts/dejavu v0.1.0 h1:JSajPXURYqpr+Cu8U9bt8K+XcACIHWqWrvWCKyeFmVQ= +github.com/go-fonts/dejavu v0.1.0/go.mod h1:4Wt4I4OU2Nq9asgDCteaAaWZOV24E+0/Pwo0gppep4g= +github.com/go-fonts/latin-modern v0.3.1 h1:/cT8A7uavYKvglYXvrdDw4oS5ZLkcOU22fa2HJ1/JVM= +github.com/go-fonts/latin-modern v0.3.1/go.mod h1:ysEQXnuT/sCDOAONxC7ImeEDVINbltClhasMAqEtRK0= +github.com/go-fonts/liberation v0.3.1 h1:9RPT2NhUpxQ7ukUvz3jeUckmN42T9D9TpjtQcqK/ceM= +github.com/go-fonts/liberation v0.3.1/go.mod h1:jdJ+cqF+F4SUL2V+qxBth8fvBpBDS7yloUL5Fi8GTGY= +github.com/go-latex/latex v0.0.0-20230307184459-12ec69307ad9 h1:NxXI5pTAtpEaU49bpLpQoDsu1zrteW/vxzTz8Cd2UAs= +github.com/go-latex/latex v0.0.0-20230307184459-12ec69307ad9/go.mod 
h1:gWuR/CrFDDeVRFQwHPvsv9soJVB/iqymhuZQuJ3a9OM= +github.com/go-pdf/fpdf v0.8.0 h1:IJKpdaagnWUeSkUFUjTcSzTppFxmv8ucGQyNPQWxYOQ= +github.com/go-pdf/fpdf v0.8.0/go.mod h1:gfqhcNwXrsd3XYKte9a7vM3smvU/jB4ZRDrmWSxpfdc= +github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g= +github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/peterbourgon/ff/v3 v3.4.0 h1:QBvM/rizZM1cB0p0lGMdmR7HxZeI/ZrBWB4DqLkMUBc= +github.com/peterbourgon/ff/v3 v3.4.0/go.mod h1:zjJVUhx+twciwfDl0zBcFzl4dW8axCRyXE/eKY9RztQ= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b h1:r+vk0EmXNmekl0S0BascoeeoHk/L7wmaW2QF90K+kYI= +golang.org/x/exp 
v0.0.0-20230801115018-d63ba01acd4b/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/image v0.11.0 h1:ds2RoQvBvYTiJkwpSFDwCcDFNX7DqjL2WsUgTNk0Ooo= +golang.org/x/image v0.11.0/go.mod h1:bglhjqbqVuEb9e9+eNR45Jfu7D+T4Qan+NhQk8Ck2P8= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 
+golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= 
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gonum.org/v1/gonum v0.14.0 h1:2NiG67LD1tEH0D7kM+ps2V+fXmsAnpUeec7n8tcr4S0= +gonum.org/v1/gonum v0.14.0/go.mod h1:AoWeoz0becf9QMWtE8iWXNXc27fK4fNeHNf/oMejGfU= +gonum.org/v1/plot v0.14.0 h1:+LBDVFYwFe4LHhdP8coW6296MBEY4nQ+Y4vuUpJopcE= +gonum.org/v1/plot v0.14.0/go.mod h1:MLdR9424SJed+5VqC6MsouEpig9pZX2VZ57H9ko2bXU= +honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= +rsc.io/pdf v0.1.1 h1:k1MczvYDUvJBe93bYd7wrZLLUEcLZAuF824/I4e5Xr4= +rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/tools/protobench/main.go b/tools/protobench/main.go new file mode 100644 index 00000000..dfe6a324 --- /dev/null +++ b/tools/protobench/main.go @@ -0,0 +1,172 @@ +package main + +import ( + "bytes" + "fmt" + "os" + "path/filepath" + "syscall" + + "github.com/andybalholm/brotli" + "github.com/klauspost/compress/gzip" + "github.com/klauspost/compress/s2" + "github.com/klauspost/compress/zstd" + "github.com/pierrec/lz4/v4" +) + +func main() { + err := mainWithError() + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } + os.Exit(0) +} + +func mainWithError() error { + args, err := parseArgs() + if err != nil { + return fmt.Errorf("failed to parse arguments: %v", err) + } + + if err = args.SanityCheck(); err != nil { + return err + } + + summary, err := benchmark(args.inputDir) + if err != nil { + return fmt.Errorf("benchmark failed: %v", err) + } + + switch filepath.Ext(args.outputFile) { + case ".csv": + if err = summary.toCSV(args.outputFile); err != nil { + return fmt.Errorf("failed to generate bar chart: %v", err) + } + case ".png": + if err = summary.toBarChart(args.outputFile); err != nil { + return fmt.Errorf("failed to generate bar chart: %v", err) + } + default: + summary.printCSV() + } + + return nil +} + +func benchmark(inputDir string) (*benchSummary, error) { + // scan directory 
for files + files, err := os.ReadDir(inputDir) + if err != nil { + return nil, fmt.Errorf("failed to read directory %s: %v", inputDir, err) + } + + summary := &benchSummary{ + totalFiles: len(files), + results: make([]benchResult, 0, len(compressors)+1), + } + + totalUncompressed, maxSize := sumFileSizes(files) + summary.totalUncompressed = totalUncompressed + + // pre-allocate buffer for compressed data to avoid reallocations + var buf = bytes.NewBuffer(make([]byte, 0, maxSize)) + + // Warm-up + _, _ = compressFiles(noneCompressor{name: "none"}, files, inputDir, buf) + + baseUsage := int64(0) + for _, c := range compressors { + var compressed int64 + + cpuUsage := getCPUUsage(func() { + compressed, err = compressFiles(c, files, inputDir, buf) + }) + + if err != nil { + return nil, fmt.Errorf("compression failed: %v", err) + } + + summary.results = append(summary.results, benchResult{ + name: c.id(), + totalCompressed: compressed, + cpuUsage: cpuUsage - baseUsage, + }) + + if baseUsage == 0 { + // The CPU usage of the noneCompressor is used as the base usage for all compressors. 
+ baseUsage = cpuUsage + } + } + + return summary, nil +} + +func getCPUUsage(f func()) int64 { + start := getCPUTime() + f() + return getCPUTime() - start +} + +func getCPUTime() int64 { + usage := new(syscall.Rusage) + err := syscall.Getrusage(syscall.RUSAGE_SELF, usage) + if err != nil { + panic(fmt.Errorf("failed to get CPU usage: %v", err)) + } + return usage.Utime.Nano() + usage.Stime.Nano() +} + +func sumFileSizes(files []os.DirEntry) (total, maxSize int64) { + for _, f := range files { + if fi, err := f.Info(); err == nil { + total += fi.Size() + maxSize = max(maxSize, fi.Size()) + } + } + return +} + +func compressFiles(c compressor, files []os.DirEntry, benchProtoDir string, + buf *bytes.Buffer) (int64, error) { + var totalCompressed int64 + for _, f := range files { + pathName := filepath.Join(benchProtoDir, f.Name()) + + // read file contents + content, err := os.ReadFile(pathName) + if err != nil { + return 0, fmt.Errorf("failed to read file %s: %v", pathName, err) + } + + buf.Reset() + + // compress content with compressor to memory + compressed, err := c.compress(content, buf) + if err != nil { + return 0, fmt.Errorf("failed to compress file %s with %s: %v", + pathName, c.id(), err) + } + + totalCompressed += compressed + } + return totalCompressed, nil +} + +var compressors = []compressor{ + noneCompressor{name: "none"}, + gzipCompressor{name: "gzip.BestSpeed", level: gzip.BestSpeed}, + gzipCompressor{name: "gzip.BestCompression", level: gzip.BestCompression}, + zstdCompressor{name: "zstd.SpeedFastest", level: zstd.SpeedFastest}, + zstdCompressor{name: "zstd.SpeedDefault", level: zstd.SpeedDefault}, + zstdCompressor{name: "zstd.SpeedBetterCompression", level: zstd.SpeedBetterCompression}, + brotliCompressor{name: "brotli.BestSpeed", level: brotli.BestSpeed}, + brotliCompressor{name: "brotli.DefaultCompression", level: brotli.DefaultCompression}, + // Removed due to extremely high CPU usage. 
+ // brotliCompressor{name: "brotli.BestCompression", level: brotli.BestCompression}, + s2Compressor{name: "s2.WriterBetterCompression", level: s2.WriterBetterCompression()}, + s2Compressor{name: "s2.WriterBestCompression", level: s2.WriterBestCompression()}, + lz4Compressor{name: "lz4.Level1", level: lz4.Level1}, + lz4Compressor{name: "lz4.Level6", level: lz4.Level6}, + lz4Compressor{name: "lz4.Level9", level: lz4.Level9}, +} diff --git a/tools/protobench/summary.go b/tools/protobench/summary.go new file mode 100644 index 00000000..7102bb14 --- /dev/null +++ b/tools/protobench/summary.go @@ -0,0 +1,117 @@ +package main + +import ( + "bytes" + "fmt" + "image/color" + "os" + + "gonum.org/v1/plot" + "gonum.org/v1/plot/plotter" + "gonum.org/v1/plot/vg" +) + +type benchSummary struct { + totalFiles int + totalUncompressed int64 + results []benchResult +} + +type benchResult struct { + name string + totalCompressed int64 + cpuUsage int64 +} + +func (s *benchSummary) printCSV() { + fmt.Printf("Total files: %d\n", s.totalFiles) + fmt.Printf("Total uncompressed: %d bytes\n", s.totalUncompressed) + fmt.Print("\n", string(s.csvBytes())) +} + +func (s *benchSummary) toCSV(filename string) error { + return os.WriteFile(filename, s.csvBytes(), 0o600) +} + +func (s *benchSummary) csvBytes() []byte { + buf := bytes.NewBuffer(make([]byte, 0, 1024)) + buf.WriteString("Compressor,Total compressed,CPU usage\n") + for _, r := range s.results { + _, _ = fmt.Fprintf(buf, "%s,%d,%d\n", r.name, r.totalCompressed, r.cpuUsage) + } + return buf.Bytes() +} + +func (s *benchSummary) toBarChart(filename string) error { + p := plot.New() + + p.Title.Text = "Compression Results" + p.Y.Label.Text = "Go Compressors" + p.X.Label.Text = "in % (smaller is better)" + + // Order results by compression rate, but put "none" last, so it is plotted on top. 
+ none := s.results[0] + for i := 1; i < len(s.results); i++ { + s.results[i-1] = s.results[i] + } + s.results[len(s.results)-1] = none + + // Calculate compression rates as percentages. + compressionRates := make(plotter.Values, len(s.results)) + for i, r := range s.results { + compressionRates[i] = percent(s.totalUncompressed, r.totalCompressed) + } + + // Find the maximum CPU usage and use it as "100%". + maxCPUUsage := int64(0) + for _, r := range s.results { + maxCPUUsage = max(maxCPUUsage, r.cpuUsage) + } + + // Calculate CPU usage as percentages. + cpuUsages := make(plotter.Values, len(s.results)) + for i, r := range s.results { + cpuUsages[i] = percent(maxCPUUsage, r.cpuUsage) + } + + labels := make([]string, len(s.results)) + for i, r := range s.results { + labels[i] = r.name + } + + bars, err := plotter.NewBarChart(compressionRates, vg.Points(20)) + if err != nil { + return err + } + + bars.Horizontal = true + bars.LineStyle.Width = vg.Length(0) + bars.Color = color.RGBA{R: 0, G: 0, B: 128, A: 255} + bars.Offset = -vg.Points(10) + p.Legend.Add("Compressed size", bars) + + bars2, err := plotter.NewBarChart(cpuUsages, vg.Points(20)) + if err != nil { + return err + } + + bars2.Horizontal = true + bars2.LineStyle.Width = vg.Length(0) + bars2.Color = color.RGBA{R: 128, G: 0, B: 0, A: 255} + bars2.Offset = vg.Points(10) + p.Legend.Add("CPU usage", bars2) + + p.Add(bars, bars2) + p.NominalY(labels...) + + // Save the plot to a PNG file. + if err := p.Save(10*vg.Inch, 10*vg.Inch, filename); err != nil { + return err + } + + return nil +} + +func percent(total, part int64) float64 { + return (float64(part) / float64(total)) * 100 +}