Skip to content

Commit

Permalink
Merge pull request #514 from getsentry/txiao/perf/move-calltree-gener…
Browse files Browse the repository at this point in the history
…ation-into-readjob

perf(flamegraph): Move calltree generation into readjob
  • Loading branch information
Zylphrex authored Sep 5, 2024
2 parents 67e9107 + b7cf4a7 commit 6e2c2aa
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
- Lift Intervals struct to utils ([#498](https://github.com/getsentry/vroom/pull/498))
- Fix flakey test for profile examples ([#504](https://github.com/getsentry/vroom/pull/504))
- Instrument flamegraph generation with spans ([#510](https://github.com/getsentry/vroom/pull/510)), ([#511](https://github.com/getsentry/vroom/pull/511))
- Move calltree generation into readjob ([#514](https://github.com/getsentry/vroom/pull/514))

## 23.12.0

Expand Down
47 changes: 47 additions & 0 deletions internal/chunk/readjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chunk
import (
"context"

"github.com/getsentry/vroom/internal/nodetree"
"github.com/getsentry/vroom/internal/storageutil"
"gocloud.dev/blob"
)
Expand Down Expand Up @@ -55,3 +56,49 @@ func (job ReadJob) Read() {
func (result ReadJobResult) Error() error {
return result.Err
}

type (
CallTreesReadJob ReadJob

CallTreesReadJobResult struct {
Err error
CallTrees map[string][]*nodetree.Node
Chunk Chunk
TransactionID string
ThreadID *string
Start uint64
End uint64
}
)

func (job CallTreesReadJob) Read() {
var chunk Chunk

err := storageutil.UnmarshalCompressed(
job.Ctx,
job.Storage,
StoragePath(job.OrganizationID, job.ProjectID, job.ProfilerID, job.ChunkID),
&chunk,
)

if err != nil {
job.Result <- CallTreesReadJobResult{Err: err}
return
}

callTrees, err := chunk.CallTrees(job.ThreadID)

job.Result <- CallTreesReadJobResult{
Err: err,
CallTrees: callTrees,
Chunk: chunk,
TransactionID: job.TransactionID,
ThreadID: job.ThreadID,
Start: job.Start,
End: job.End,
}
}

func (result CallTreesReadJobResult) Error() error {
return result.Err
}
31 changes: 9 additions & 22 deletions internal/flamegraph/flamegraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ func GetFlamegraphFromCandidates(
dispatchSpan.SetData("continuous_candidates", len(continuousProfileCandidates))

for _, candidate := range transactionProfileCandidates {
jobs <- profile.ReadJob{
jobs <- profile.CallTreesReadJob{
Ctx: ctx,
OrganizationID: organizationID,
ProjectID: candidate.ProjectID,
Expand All @@ -499,7 +499,7 @@ func GetFlamegraphFromCandidates(
}

for _, candidate := range continuousProfileCandidates {
jobs <- chunk.ReadJob{
jobs <- chunk.CallTreesReadJob{
Ctx: ctx,
OrganizationID: organizationID,
ProjectID: candidate.ProjectID,
Expand All @@ -513,6 +513,7 @@ func GetFlamegraphFromCandidates(
Result: results,
}
}

dispatchSpan.Finish()

var flamegraphTree []*nodetree.Node
Expand All @@ -536,42 +537,28 @@ func GetFlamegraphFromCandidates(
continue
}

if result, ok := res.(profile.ReadJobResult); ok {
if result, ok := res.(profile.CallTreesReadJobResult); ok {
transactionProfileSpan := span.StartChild("calltree")
transactionProfileSpan.Description = "transaction profile"

profileCallTrees, err := result.Profile.CallTrees()
if err != nil {
hub.CaptureException(err)
transactionProfileSpan.Finish()
continue
}

example := utils.NewExampleFromProfileID(result.Profile.ProjectID(), result.Profile.ID())
annotate := annotateWithProfileExample(example)

for _, callTree := range profileCallTrees {
for _, callTree := range result.CallTrees {
addCallTreeToFlamegraph(&flamegraphTree, callTree, annotate)
}
// if metrics aggregator is not null, while we're at it,
// compute the metrics as well
if ma != nil {
functions := metrics.CapAndFilterFunctions(metrics.ExtractFunctionsFromCallTrees(profileCallTrees), int(ma.MaxUniqueFunctions), true)
functions := metrics.CapAndFilterFunctions(metrics.ExtractFunctionsFromCallTrees(result.CallTrees), int(ma.MaxUniqueFunctions), true)
ma.AddFunctions(functions, example)
}

transactionProfileSpan.Finish()
} else if result, ok := res.(chunk.ReadJobResult); ok {
} else if result, ok := res.(chunk.CallTreesReadJobResult); ok {
chunkProfileSpan := span.StartChild("calltree")
chunkProfileSpan.Description = "continuous profile"

chunkCallTrees, err := result.Chunk.CallTrees(result.ThreadID)
if err != nil {
hub.CaptureException(err)
chunkProfileSpan.Finish()
continue
}

example := utils.NewExampleFromProfilerChunk(
result.Chunk.ProjectID,
result.Chunk.ProfilerID,
Expand All @@ -583,7 +570,7 @@ func GetFlamegraphFromCandidates(
)
annotate := annotateWithProfileExample(example)

for _, callTree := range chunkCallTrees {
for _, callTree := range result.CallTrees {
if result.Start > 0 && result.End > 0 {
interval := utils.Interval{
Start: result.Start,
Expand All @@ -596,7 +583,7 @@ func GetFlamegraphFromCandidates(
// if metrics aggregator is not null, while we're at it,
// compute the metrics as well
if ma != nil {
functions := metrics.CapAndFilterFunctions(metrics.ExtractFunctionsFromCallTrees(chunkCallTrees), int(ma.MaxUniqueFunctions), true)
functions := metrics.CapAndFilterFunctions(metrics.ExtractFunctionsFromCallTrees(result.CallTrees), int(ma.MaxUniqueFunctions), true)
ma.AddFunctions(functions, example)
}
chunkProfileSpan.Finish()
Expand Down
39 changes: 39 additions & 0 deletions internal/profile/readjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package profile
import (
"context"

"github.com/getsentry/vroom/internal/nodetree"
"github.com/getsentry/vroom/internal/storageutil"
"gocloud.dev/blob"
)
Expand Down Expand Up @@ -39,3 +40,41 @@ func (job ReadJob) Read() {
func (result ReadJobResult) Error() error {
return result.Err
}

type (
CallTreesReadJob ReadJob

CallTreesReadJobResult struct {
Err error
CallTrees map[uint64][]*nodetree.Node
Profile Profile
}
)

func (job CallTreesReadJob) Read() {
var profile Profile

err := storageutil.UnmarshalCompressed(
job.Ctx,
job.Storage,
StoragePath(job.OrganizationID, job.ProjectID, job.ProfileID),
&profile,
)

if err != nil {
job.Result <- CallTreesReadJobResult{Err: err}
return
}

callTrees, err := profile.CallTrees()

job.Result <- CallTreesReadJobResult{
CallTrees: callTrees,
Profile: profile,
Err: err,
}
}

func (result CallTreesReadJobResult) Error() error {
return result.Err
}

0 comments on commit 6e2c2aa

Please sign in to comment.