diff --git a/CHANGELOG.md b/CHANGELOG.md index 549529f3..d64be8dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -102,6 +102,7 @@ - Lift Intervals struct to utils ([#498](https://github.com/getsentry/vroom/pull/498)) - Fix flakey test for profile examples ([#504](https://github.com/getsentry/vroom/pull/504)) - Instrument flamegraph generation with spans ([#510](https://github.com/getsentry/vroom/pull/510)), ([#511](https://github.com/getsentry/vroom/pull/511)) +- Move calltree generation into readjob ([#514](https://github.com/getsentry/vroom/pull/514)) ## 23.12.0 diff --git a/internal/chunk/readjob.go b/internal/chunk/readjob.go index 80088fb8..b017d702 100644 --- a/internal/chunk/readjob.go +++ b/internal/chunk/readjob.go @@ -3,6 +3,7 @@ package chunk import ( "context" + "github.com/getsentry/vroom/internal/nodetree" "github.com/getsentry/vroom/internal/storageutil" "gocloud.dev/blob" ) @@ -55,3 +56,49 @@ func (job ReadJob) Read() { func (result ReadJobResult) Error() error { return result.Err } + +type ( + CallTreesReadJob ReadJob + + CallTreesReadJobResult struct { + Err error + CallTrees map[string][]*nodetree.Node + Chunk Chunk + TransactionID string + ThreadID *string + Start uint64 + End uint64 + } +) + +func (job CallTreesReadJob) Read() { + var chunk Chunk + + err := storageutil.UnmarshalCompressed( + job.Ctx, + job.Storage, + StoragePath(job.OrganizationID, job.ProjectID, job.ProfilerID, job.ChunkID), + &chunk, + ) + + if err != nil { + job.Result <- CallTreesReadJobResult{Err: err} + return + } + + callTrees, err := chunk.CallTrees(job.ThreadID) + + job.Result <- CallTreesReadJobResult{ + Err: err, + CallTrees: callTrees, + Chunk: chunk, + TransactionID: job.TransactionID, + ThreadID: job.ThreadID, + Start: job.Start, + End: job.End, + } +} + +func (result CallTreesReadJobResult) Error() error { + return result.Err +} diff --git a/internal/flamegraph/flamegraph.go b/internal/flamegraph/flamegraph.go index b3be9315..f60778a7 100644 --- a/internal/flamegraph/flamegraph.go +++ b/internal/flamegraph/flamegraph.go @@ -488,7 +488,7 @@ func GetFlamegraphFromCandidates( dispatchSpan.SetData("continuous_candidates", len(continuousProfileCandidates)) for _, candidate := range transactionProfileCandidates { - jobs <- profile.ReadJob{ + jobs <- profile.CallTreesReadJob{ Ctx: ctx, OrganizationID: organizationID, ProjectID: candidate.ProjectID, @@ -499,7 +499,7 @@ func GetFlamegraphFromCandidates( } for _, candidate := range continuousProfileCandidates { - jobs <- chunk.ReadJob{ + jobs <- chunk.CallTreesReadJob{ Ctx: ctx, OrganizationID: organizationID, ProjectID: candidate.ProjectID, @@ -513,6 +513,7 @@ func GetFlamegraphFromCandidates( Result: results, } } + dispatchSpan.Finish() var flamegraphTree []*nodetree.Node @@ -536,42 +537,28 @@ func GetFlamegraphFromCandidates( continue } - if result, ok := res.(profile.ReadJobResult); ok { + if result, ok := res.(profile.CallTreesReadJobResult); ok { transactionProfileSpan := span.StartChild("calltree") transactionProfileSpan.Description = "transaction profile" - profileCallTrees, err := result.Profile.CallTrees() - if err != nil { - hub.CaptureException(err) - transactionProfileSpan.Finish() - continue - } - example := utils.NewExampleFromProfileID(result.Profile.ProjectID(), result.Profile.ID()) annotate := annotateWithProfileExample(example) - for _, callTree := range profileCallTrees { + for _, callTree := range result.CallTrees { addCallTreeToFlamegraph(&flamegraphTree, callTree, annotate) } // if metrics aggregator is not null, while we're at it, // compute the metrics as well if ma != nil { - functions := metrics.CapAndFilterFunctions(metrics.ExtractFunctionsFromCallTrees(profileCallTrees), int(ma.MaxUniqueFunctions), true) + functions := metrics.CapAndFilterFunctions(metrics.ExtractFunctionsFromCallTrees(result.CallTrees), int(ma.MaxUniqueFunctions), true) ma.AddFunctions(functions, example) } transactionProfileSpan.Finish() - } else if result, ok := res.(chunk.ReadJobResult); ok { + } else if result, ok := res.(chunk.CallTreesReadJobResult); ok { chunkProfileSpan := span.StartChild("calltree") chunkProfileSpan.Description = "continuous profile" - chunkCallTrees, err := result.Chunk.CallTrees(result.ThreadID) - if err != nil { - hub.CaptureException(err) - chunkProfileSpan.Finish() - continue - } - example := utils.NewExampleFromProfilerChunk( result.Chunk.ProjectID, result.Chunk.ProfilerID, @@ -583,7 +570,7 @@ func GetFlamegraphFromCandidates( ) annotate := annotateWithProfileExample(example) - for _, callTree := range chunkCallTrees { + for _, callTree := range result.CallTrees { if result.Start > 0 && result.End > 0 { interval := utils.Interval{ Start: result.Start, @@ -596,7 +583,7 @@ func GetFlamegraphFromCandidates( // if metrics aggregator is not null, while we're at it, // compute the metrics as well if ma != nil { - functions := metrics.CapAndFilterFunctions(metrics.ExtractFunctionsFromCallTrees(chunkCallTrees), int(ma.MaxUniqueFunctions), true) + functions := metrics.CapAndFilterFunctions(metrics.ExtractFunctionsFromCallTrees(result.CallTrees), int(ma.MaxUniqueFunctions), true) ma.AddFunctions(functions, example) } chunkProfileSpan.Finish() diff --git a/internal/profile/readjob.go b/internal/profile/readjob.go index 8b45f5c6..8c0909d8 100644 --- a/internal/profile/readjob.go +++ b/internal/profile/readjob.go @@ -3,6 +3,7 @@ package profile import ( "context" + "github.com/getsentry/vroom/internal/nodetree" "github.com/getsentry/vroom/internal/storageutil" "gocloud.dev/blob" ) @@ -39,3 +40,41 @@ func (job ReadJob) Read() { func (result ReadJobResult) Error() error { return result.Err } + +type ( + CallTreesReadJob ReadJob + + CallTreesReadJobResult struct { + Err error + CallTrees map[uint64][]*nodetree.Node + Profile Profile + } +) + +func (job CallTreesReadJob) Read() { + var profile Profile + + err := storageutil.UnmarshalCompressed( + job.Ctx, + job.Storage, + StoragePath(job.OrganizationID, job.ProjectID, job.ProfileID), + &profile, + ) + + if err != nil { + job.Result <- CallTreesReadJobResult{Err: err} + return + } + + callTrees, err := profile.CallTrees() + + job.Result <- CallTreesReadJobResult{ + CallTrees: callTrees, + Profile: profile, + Err: err, + } +} + +func (result CallTreesReadJobResult) Error() error { + return result.Err +}