Skip to content

Commit

Permalink
Rename CLI options and semantics
Browse files Browse the repository at this point in the history
  • Loading branch information
rockdaboot committed Aug 19, 2024
1 parent 53bb056 commit 41d3141
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 176 deletions.
78 changes: 47 additions & 31 deletions bench/benchreporter/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"strconv"
"strings"
Expand All @@ -25,7 +24,8 @@ import (
var _ reporter.Reporter = (*BenchmarkReporter)(nil)

type BenchmarkReporter struct {
benchDataDir string
saveInputsTo string
f *os.File
rep reporter.Reporter
uid int
gid int
Expand Down Expand Up @@ -70,14 +70,16 @@ func (r *BenchmarkReporter) SupportsReportTraceEvent() bool {
}

type fallbackSymbol struct {
FrameID libpf.FrameID
Symbol string
FileID libpf.FileID
AddressOrLine libpf.AddressOrLineno
Symbol string
}

func (r *BenchmarkReporter) ReportFallbackSymbol(frameID libpf.FrameID, symbol string) {
r.store("FallbackSymbol", &fallbackSymbol{
FrameID: frameID,
Symbol: symbol,
FileID: frameID.FileID(),
AddressOrLine: frameID.AddressOrLine(),
Symbol: symbol,
})
r.rep.ReportFallbackSymbol(frameID, symbol)
}
Expand Down Expand Up @@ -155,30 +157,31 @@ func (r *BenchmarkReporter) ReportMetrics(timestamp uint32, ids []uint32, values

func (r *BenchmarkReporter) Stop() {
r.rep.Stop()
_ = r.f.Close()
}

func (r *BenchmarkReporter) GetMetrics() reporter.Metrics {
return r.rep.GetMetrics()
}

func NewBenchmarkReporter(benchDataDir string, rep reporter.Reporter) (*BenchmarkReporter, error) {
func NewBenchmarkReporter(saveInputsTo string, rep reporter.Reporter) (*BenchmarkReporter, error) {
r := &BenchmarkReporter{
benchDataDir: benchDataDir,
saveInputsTo: saveInputsTo,
rep: rep,
}
r.uid, r.gid = originUser()

if err := os.MkdirAll(benchDataDir, 0o755); err != nil {
return nil, err
var err error
if r.f, err = os.OpenFile(saveInputsTo,
os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0o644); err != nil {
return nil, fmt.Errorf("failed to open file %s: %v", saveInputsTo, err)
}

if r.uid != 0 || r.gid != 0 {
changeDirOwner(benchDataDir, r.uid, r.gid)
if err = r.f.Chown(r.uid, r.gid); err != nil {
return nil, fmt.Errorf("failed to change ownership of %s to %d:%d: %v",
saveInputsTo, r.uid, r.gid, err)
}

// Just for storing the initial timestamp.
r.store("Start", libpf.Void{})

return r, nil
}

Expand All @@ -192,27 +195,40 @@ func originUser() (uid, gid int) {
return
}

var counter atomic.Uint64
type metaInfo struct {
TS int64 `json:"ts"`
Name string `json:"name"`
}

// store stores data as JSON.
// store appends data as NDJSON to the output file.
func (r *BenchmarkReporter) store(name string, data any) {
ts := time.Now().UnixNano()
id := counter.Add(1)
fileName := fmt.Sprintf("%d_%06x_%s.json", ts, id, name)
pathName := path.Join(r.benchDataDir, fileName)
meta := metaInfo{
TS: time.Now().UnixNano(),
Name: name,
}

// encode data to JSON
bytes, err := json.Marshal(data)
// encode meta data to JSON
bytes, err := json.Marshal(meta)
if err != nil {
panic(err)
}
if err = appendToFile(r.f, bytes); err != nil {
panic(err)
}

//nolint:gosec
if err = os.WriteFile(pathName, bytes, 0o644); err != nil {
// encode reporter input to JSON
bytes, err = json.Marshal(data)
if err != nil {
panic(err)
}
if err = appendToFile(r.f, bytes); err != nil {
panic(err)
}
}

changeOwner(pathName, r.uid, r.gid)
func appendToFile(f *os.File, bytes []byte) error {
_, err := f.Write(append(bytes, '\n'))
return err
}

func changeOwner(pathName string, uid, gid int) {
Expand All @@ -230,23 +246,23 @@ func changeDirOwner(dirName string, uid, gid int) {
}
}

func GRPCInterceptor(benchProtoDir string) grpc.UnaryClientInterceptor {
if benchProtoDir != "" {
if err := os.MkdirAll(benchProtoDir, 0o755); err != nil {
func GRPCInterceptor(saveDir string) grpc.UnaryClientInterceptor {
if saveDir != "" {
if err := os.MkdirAll(saveDir, 0o755); err != nil {
log.Errorf("Failed to create directory for storing protobuf messages: %v", err)
return nil
}

uid, gid := originUser()

if uid != 0 || gid != 0 {
changeDirOwner(benchProtoDir, uid, gid)
changeDirOwner(saveDir, uid, gid)
}

// return interceptor to write the uncompressed protobuf messages to disk.
return func(ctx context.Context, method string, req, reply any,
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
storeProtobuf(benchProtoDir, req, uid, gid)
storeProtobuf(saveDir, req, uid, gid)
return invoker(ctx, method, req, reply, cc, opts...)
}
}
Expand Down
113 changes: 34 additions & 79 deletions bench/benchreporter/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strings"
"time"

log "github.com/sirupsen/logrus"
Expand All @@ -16,117 +14,81 @@ import (
"github.com/open-telemetry/opentelemetry-ebpf-profiler/reporter"
)

type fileInfo struct {
name string
timestamp int64
id uint64
funcName string
}

// Replay replays the stored data from benchDataDir.
// Replay replays the stored data from replayInputsFrom.
// The argument r is the reporter that will receive the replayed data.
func Replay(ctx context.Context, benchDataDir string, rep reporter.Reporter) error {
files, err := os.ReadDir(benchDataDir)
func Replay(ctx context.Context, replayInputsFrom string, rep reporter.Reporter) error {
stream, err := os.Open(replayInputsFrom)
if err != nil {
return fmt.Errorf("failed to read directory %s: %v", benchDataDir, err)
return fmt.Errorf("failed to open file %s: %v", replayInputsFrom, err)
}
decoder := json.NewDecoder(stream)

fileInfos := make([]fileInfo, 0, len(files))
var m metaInfo
var curTS int64

for _, f := range files {
if !strings.HasSuffix(f.Name(), ".json") {
continue
for {
if err = decoder.Decode(&m); err != nil {
// EOF is returned at the end of the stream.
if err != io.EOF {
return err
}
break
}

name := f.Name()
// scan name for timestamp, counter and function name
var timestamp int64
var id uint64
var funcName string
if _, err = fmt.Sscanf(name, "%d_%x_%s", &timestamp, &id, &funcName); err != nil {
log.Errorf("Failed to parse file name %s: %v", name, err)
continue
if curTS != 0 {
time.Sleep(time.Duration(m.TS-curTS) * time.Nanosecond)
}
funcName = strings.TrimSuffix(funcName, ".json")

fileInfos = append(fileInfos, fileInfo{
name: name,
timestamp: timestamp,
id: id,
funcName: funcName,
})
}

if len(fileInfos) == 0 {
return nil
}

// Sort fileInfos ascending by ID.
sort.Slice(fileInfos, func(i, j int) bool {
return fileInfos[i].id < fileInfos[j].id
})
curTS = m.TS

if fileInfos[0].funcName != "Start" {
return fmt.Errorf("first function name must be \"Start\", instead it is \"%s\"",
fileInfos[0].funcName)
}

curTS := fileInfos[0].timestamp

// Replay the stored data
for _, fi := range fileInfos[1:] {
time.Sleep(time.Duration(fi.timestamp-curTS) * time.Nanosecond)
curTS = fi.timestamp

switch fi.funcName {
switch m.Name {
case "TraceEvent":
var v traceEvent
if err = dataFromFileInfo(benchDataDir, fi, &v); err == nil {
if err = decodeTo(decoder, &v); err == nil {
rep.ReportTraceEvent(v.Trace, v.Meta)
}
case "CountForTrace":
var v countForTrace
if err = dataFromFileInfo(benchDataDir, fi, &v); err == nil {
if err = decodeTo(decoder, &v); err == nil {
rep.ReportCountForTrace(v.TraceHash, v.Count, v.Meta)
}
case "FramesForTrace":
var v libpf.Trace
if err = dataFromFileInfo[libpf.Trace](benchDataDir, fi, &v); err == nil {
if err = decodeTo[libpf.Trace](decoder, &v); err == nil {
rep.ReportFramesForTrace(&v)
}
case "FallbackSymbol":
var v fallbackSymbol
if err = dataFromFileInfo(benchDataDir, fi, &v); err == nil {
rep.ReportFallbackSymbol(v.FrameID, v.Symbol)
if err = decodeTo(decoder, &v); err == nil {
rep.ReportFallbackSymbol(libpf.NewFrameID(v.FileID, v.AddressOrLine), v.Symbol)
}
case "ExectableMetadata":
case "ExecutableMetadata":
var v executableMetadata
if err = dataFromFileInfo(benchDataDir, fi, &v); err == nil {
if err = decodeTo(decoder, &v); err == nil {
rep.ExecutableMetadata(context.Background(), v.FileID, v.FileName, v.BuildID,
v.Interp, nil)
}
case "FrameMetadata":
var v frameMetadata
if err = dataFromFileInfo(benchDataDir, fi, &v); err == nil {
if err = decodeTo(decoder, &v); err == nil {
rep.FrameMetadata(v.FileID, v.AddressOrLine, v.LineNumber, v.FunctionOffset,
v.FunctionName, v.FilePath)
}
case "HostMetadata":
var v hostMetadata
if err = dataFromFileInfo(benchDataDir, fi, &v); err == nil {
if err = decodeTo(decoder, &v); err == nil {
rep.ReportHostMetadata(v.Metadata)
}
case "Metrics":
var v metrics
if err = dataFromFileInfo[metrics](benchDataDir, fi, &v); err == nil {
if err = decodeTo[metrics](decoder, &v); err == nil {
rep.ReportMetrics(v.Timestamp, v.IDs, v.Values)
}
default:
err = fmt.Errorf("unsupported function name in file %s: %s", fi.name, fi.funcName)
err = fmt.Errorf("unsupported function name in file %s: %s", replayInputsFrom, m.Name)
}

if err != nil {
log.Errorf("Failed to replay data from file %s: %v", fi.name, err)
log.Errorf("Failed to replay data from file %s: %v", m.Name, err)
}

if err = ctx.Err(); err != nil {
Expand All @@ -137,16 +99,9 @@ func Replay(ctx context.Context, benchDataDir string, rep reporter.Reporter) err
return nil
}

func dataFromFileInfo[T any](dir string, fi fileInfo, data *T) error {
pathName := filepath.Join(dir, fi.name)
f, err := os.Open(pathName)
if err != nil {
return fmt.Errorf("failed to open file %s: %v", pathName, err)
}
defer f.Close()

if err = json.NewDecoder(f).Decode(data); err != nil {
return fmt.Errorf("failed to decode JSON from file %s: %v", pathName, err)
func decodeTo[T any](decoder *json.Decoder, data *T) error {
if err := decoder.Decode(data); err != nil {
return fmt.Errorf("failed to decode JSON: %v", err)
}

return nil
Expand Down
Loading

0 comments on commit 41d3141

Please sign in to comment.