Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reporter: minor cleanup and #256

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 67 additions & 52 deletions reporter/otlp_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

const (
executableCacheLifetime = 1 * time.Hour
framesCacheLifetime = 1 * time.Hour
)

// Assert that we implement the full Reporter interface.
Expand Down Expand Up @@ -88,6 +89,19 @@ type attrKeyValue[T string | int64] struct {
value T
}

// attributeMap is a temporary cache that maps deduplicated attribute keys to an
// index into an attribute table mappings.
type attributeMap map[string]uint64

// stringMap is a temporary cache that maps deduplicated strings to a index.
type stringMap map[string]uint32

// funcMap is a temporary cache that maps deduplicated function information to a index.
type funcMap map[funcInfo]uint64

// fileIDMap is a temporary cache that maps deduplicated file IDs to a index.
type fileIDMap map[libpf.FileID]uint64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These mean the reporter can't handle concurrent exports.


// OTLPReporter receives and transforms information to be OTLP/profiles compliant.
type OTLPReporter struct {
config *Config
Expand Down Expand Up @@ -159,7 +173,7 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) {
if err != nil {
return nil, err
}
frames.SetLifetime(1 * time.Hour) // Allow GC to clean stale items.
frames.SetLifetime(framesCacheLifetime) // Allow GC to clean stale items.

cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CGroupCacheElements,
func(pid libpf.PID) uint32 { return uint32(pid) })
Expand Down Expand Up @@ -255,7 +269,7 @@ func (r *OTLPReporter) ReportCountForTrace(_ libpf.TraceHash, _ uint16, _ *Trace
// ExecutableKnown returns true if the metadata of the Executable specified by fileID is
// cached in the reporter.
func (r *OTLPReporter) ExecutableKnown(fileID libpf.FileID) bool {
_, known := r.executables.Get(fileID)
_, known := r.executables.GetAndRefresh(fileID, executableCacheLifetime)
return known
}

Expand All @@ -272,7 +286,8 @@ func (r *OTLPReporter) ExecutableMetadata(args *ExecutableMetadataArgs) {
// cached in the reporter.
func (r *OTLPReporter) FrameKnown(frameID libpf.FrameID) bool {
known := false
if frameMapLock, exists := r.frames.Get(frameID.FileID()); exists {
if frameMapLock, exists := r.frames.GetAndRefresh(frameID.FileID(),
framesCacheLifetime); exists {
frameMap := frameMapLock.RLock()
defer frameMapLock.RUnlock(&frameMap)
_, known = (*frameMap)[frameID.AddressOrLine()]
Expand Down Expand Up @@ -497,32 +512,32 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u
clear(*traceEvents)
r.traceEvents.WUnlock(&traceEvents)

// stringMap is a temporary helper that will build the StringTable.
// strMap is a temporary helper that will build the StringTable.
// By specification, the first element should be empty.
stringMap := make(map[string]uint32)
stringMap[""] = 0
strMap := make(stringMap)
strMap[""] = 0

// funcMap is a temporary helper that will build the Function array
// fnMap is a temporary helper that will build the Function array
// in profile and make sure information is deduplicated.
funcMap := make(map[funcInfo]uint64)
funcMap[funcInfo{name: "", fileName: ""}] = 0
fnMap := make(funcMap)
fnMap[funcInfo{name: "", fileName: ""}] = 0

// attributeMap is a temporary helper that maps attribute values to
// attrMap is a temporary helper that maps attribute values to
// their respective indices.
// This is to ensure that AttributeTable does not contain duplicates.
attributeMap := make(map[string]uint64)
attrMap := make(attributeMap)

numSamples := len(samples)
profile = &profiles.Profile{
// SampleType - Next step: Figure out the correct SampleType.
Sample: make([]*profiles.Sample, 0, numSamples),
SampleType: []*profiles.ValueType{{
Type: int64(getStringMapIndex(stringMap, "samples")),
Unit: int64(getStringMapIndex(stringMap, "count")),
Type: int64(getStringMapIndex(strMap, "samples")),
Unit: int64(getStringMapIndex(strMap, "count")),
}},
PeriodType: &profiles.ValueType{
Type: int64(getStringMapIndex(stringMap, "cpu")),
Unit: int64(getStringMapIndex(stringMap, "nanoseconds")),
Type: int64(getStringMapIndex(strMap, "cpu")),
Unit: int64(getStringMapIndex(strMap, "nanoseconds")),
},
Period: 1e9 / int64(r.samplesPerSecond),
// AttributeUnits - Optional element we do not use.
Expand All @@ -536,13 +551,13 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u
locationIndex := uint64(0)

// Temporary lookup to reference existing Mappings.
fileIDtoMapping := make(map[libpf.FileID]uint64)
fileIDtoMapping := make(fileIDMap)

for traceKey, traceInfo := range samples {
sample := &profiles.Sample{}
sample.LocationsStartIndex = locationIndex

sample.StacktraceIdIndex = getStringMapIndex(stringMap,
sample.StacktraceIdIndex = getStringMapIndex(strMap,
traceKey.hash.Base64())

slices.Sort(traceInfo.timestamps)
Expand All @@ -556,7 +571,7 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u
for i := range traceInfo.frameTypes {
frameAttributes := addProfileAttributes(profile, []attrKeyValue[string]{
{key: "profile.frame.type", value: traceInfo.frameTypes[i].String()},
}, attributeMap)
}, attrMap)

loc := &profiles.Location{
// Id - Optional element we do not use.
Expand Down Expand Up @@ -596,14 +611,14 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u
{key: "process.executable.build_id.gnu", value: execInfo.gnuBuildID},
{key: "process.executable.build_id.htlhash",
value: traceInfo.files[i].StringNoQuotes()},
}, attributeMap)
}, attrMap)

profile.Mapping = append(profile.Mapping, &profiles.Mapping{
// Id - Optional element we do not use.
MemoryStart: uint64(traceInfo.mappingStarts[i]),
MemoryLimit: uint64(traceInfo.mappingEnds[i]),
FileOffset: traceInfo.mappingFileOffsets[i],
Filename: int64(getStringMapIndex(stringMap, fileName)),
Filename: int64(getStringMapIndex(strMap, fileName)),
Attributes: mappingAttributes,
// HasFunctions - Optional element we do not use.
// HasFilenames - Optional element we do not use.
Expand All @@ -621,26 +636,27 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u
// Store interpreted frame information as a Line message:
line := &profiles.Line{}

fileIDInfoLock, exists := r.frames.Get(traceInfo.files[i])
fileIDInfoLock, exists := r.frames.GetAndRefresh(traceInfo.files[i],
framesCacheLifetime)
if !exists {
// At this point, we do not have enough information for the frame.
// Therefore, we report a dummy entry and use the interpreter as filename.
line.FunctionIndex = createFunctionEntry(funcMap,
line.FunctionIndex = createFunctionEntry(fnMap,
"UNREPORTED", frameKind.String())
} else {
fileIDInfo := fileIDInfoLock.RLock()
if si, exists := (*fileIDInfo)[traceInfo.linenos[i]]; exists {
line.Line = int64(si.lineNumber)

line.FunctionIndex = createFunctionEntry(funcMap,
line.FunctionIndex = createFunctionEntry(fnMap,
si.functionName, si.filePath)
} else {
// At this point, we do not have enough information for the frame.
// Therefore, we report a dummy entry and use the interpreter as filename.
// To differentiate this case from the case where no information about
// the file ID is available at all, we use a different name for reported
// function.
line.FunctionIndex = createFunctionEntry(funcMap,
line.FunctionIndex = createFunctionEntry(fnMap,
"UNRESOLVED", frameKind.String())
}
fileIDInfoLock.RUnlock(&fileIDInfo)
Expand All @@ -649,7 +665,7 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u

// To be compliant with the protocol, generate a dummy mapping entry.
loc.MappingIndex = getDummyMappingIndex(fileIDtoMapping,
stringMap, attributeMap, profile, traceInfo.files[i])
strMap, attrMap, profile, traceInfo.files[i])
}
profile.Location = append(profile.Location, loc)
}
Expand All @@ -658,9 +674,9 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u
{key: string(semconv.ContainerIDKey), value: traceKey.containerID},
{key: string(semconv.ThreadNameKey), value: traceKey.comm},
{key: string(semconv.ServiceNameKey), value: traceKey.apmServiceName},
}, attributeMap), addProfileAttributes(profile, []attrKeyValue[int64]{
}, attrMap), addProfileAttributes(profile, []attrKeyValue[int64]{
{key: string(semconv.ProcessPIDKey), value: traceKey.pid},
}, attributeMap)...)
}, attrMap)...)
sample.LocationsLength = uint64(len(traceInfo.frameTypes))
locationIndex += sample.LocationsLength

Expand All @@ -669,20 +685,20 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u
log.Debugf("Reporting OTLP profile with %d samples", len(profile.Sample))

// Populate the deduplicated functions into profile.
funcTable := make([]*profiles.Function, len(funcMap))
for v, idx := range funcMap {
funcTable := make([]*profiles.Function, len(fnMap))
for v, idx := range fnMap {
funcTable[idx] = &profiles.Function{
Name: int64(getStringMapIndex(stringMap, v.name)),
Filename: int64(getStringMapIndex(stringMap, v.fileName)),
Name: int64(getStringMapIndex(strMap, v.name)),
Filename: int64(getStringMapIndex(strMap, v.fileName)),
}
}
profile.Function = append(profile.Function, funcTable...)

// When ranging over stringMap, the order will be according to the
// When ranging over strMap, the order will be according to the
// hash value of the key. To get the correct order for profile.StringTable,
// put the values in stringMap, in the correct array order.
stringTable := make([]string, len(stringMap))
for v, idx := range stringMap {
// put the values in strMap, in the correct array order.
stringTable := make([]string, len(strMap))
for v, idx := range strMap {
stringTable[idx] = v
}
profile.StringTable = append(profile.StringTable, stringTable...)
Expand All @@ -700,39 +716,39 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u
return profile, startTS, endTS
}

// getStringMapIndex inserts or looks up the index for value in stringMap.
func getStringMapIndex(stringMap map[string]uint32, value string) uint32 {
if idx, exists := stringMap[value]; exists {
// getStringMapIndex inserts or looks up the index for value in strMap.
func getStringMapIndex(strMap stringMap, value string) uint32 {
if idx, exists := strMap[value]; exists {
return idx
}

idx := uint32(len(stringMap))
stringMap[value] = idx
idx := uint32(len(strMap))
strMap[value] = idx

return idx
}

// createFunctionEntry adds a new function and returns its reference index.
func createFunctionEntry(funcMap map[funcInfo]uint64,
func createFunctionEntry(fnMap funcMap,
name string, fileName string) uint64 {
key := funcInfo{
name: name,
fileName: fileName,
}
if idx, exists := funcMap[key]; exists {
if idx, exists := fnMap[key]; exists {
return idx
}

idx := uint64(len(funcMap))
funcMap[key] = idx
idx := uint64(len(fnMap))
fnMap[key] = idx

return idx
}

// addProfileAttributes adds attributes to Profile.attribute_table and returns
// the indices to these attributes.
func addProfileAttributes[T string | int64](profile *profiles.Profile,
attributes []attrKeyValue[T], attributeMap map[string]uint64) []uint64 {
attributes []attrKeyValue[T], attrMap attributeMap) []uint64 {
indices := make([]uint64, 0, len(attributes))

addAttr := func(attr attrKeyValue[T]) {
Expand All @@ -754,7 +770,7 @@ func addProfileAttributes[T string | int64](profile *profiles.Profile,
return
}

if attributeIndex, exists := attributeMap[attributeCompositeKey]; exists {
if attributeIndex, exists := attrMap[attributeCompositeKey]; exists {
indices = append(indices, attributeIndex)
return
}
Expand All @@ -764,7 +780,7 @@ func addProfileAttributes[T string | int64](profile *profiles.Profile,
Key: attr.key,
Value: &attributeValue,
})
attributeMap[attributeCompositeKey] = newIndex
attrMap[attributeCompositeKey] = newIndex
}

for i := range attributes {
Expand All @@ -775,9 +791,8 @@ func addProfileAttributes[T string | int64](profile *profiles.Profile,
}

// getDummyMappingIndex inserts or looks up an entry for interpreted FileIDs.
func getDummyMappingIndex(fileIDtoMapping map[libpf.FileID]uint64,
stringMap map[string]uint32, attributeMap map[string]uint64,
profile *profiles.Profile, fileID libpf.FileID) uint64 {
func getDummyMappingIndex(fileIDtoMapping fileIDMap, strMap stringMap,
attrMap attributeMap, profile *profiles.Profile, fileID libpf.FileID) uint64 {
if tmpMappingIndex, exists := fileIDtoMapping[fileID]; exists {
return tmpMappingIndex
}
Expand All @@ -786,10 +801,10 @@ func getDummyMappingIndex(fileIDtoMapping map[libpf.FileID]uint64,

mappingAttributes := addProfileAttributes(profile, []attrKeyValue[string]{
{key: "process.executable.build_id.htlhash",
value: fileID.StringNoQuotes()}}, attributeMap)
value: fileID.StringNoQuotes()}}, attrMap)

profile.Mapping = append(profile.Mapping, &profiles.Mapping{
Filename: int64(getStringMapIndex(stringMap, "")),
Filename: int64(getStringMapIndex(strMap, "")),
Attributes: mappingAttributes,
})
return idx
Expand Down
8 changes: 4 additions & 4 deletions reporter/otlp_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestGetSampleAttributes(t *testing.T) {
tests := map[string]struct {
profile *profiles.Profile
k []traceAndMetaKey
attributeMap map[string]uint64
attributeMap attributeMap
expectedIndices [][]uint64
expectedAttributeTable []*common.KeyValue
}{
Expand All @@ -29,7 +29,7 @@ func TestGetSampleAttributes(t *testing.T) {
pid: 0,
},
},
attributeMap: make(map[string]uint64),
attributeMap: make(attributeMap),
expectedIndices: [][]uint64{{0}},
expectedAttributeTable: []*common.KeyValue{
{
Expand Down Expand Up @@ -58,7 +58,7 @@ func TestGetSampleAttributes(t *testing.T) {
pid: 1234,
},
},
attributeMap: make(map[string]uint64),
attributeMap: make(attributeMap),
expectedIndices: [][]uint64{{0, 1, 2, 3}, {0, 1, 2, 3}},
expectedAttributeTable: []*common.KeyValue{
{
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestGetSampleAttributes(t *testing.T) {
pid: 6789,
},
},
attributeMap: make(map[string]uint64),
attributeMap: make(attributeMap),
expectedIndices: [][]uint64{{0, 1, 2, 3}, {4, 5, 6, 7}},
expectedAttributeTable: []*common.KeyValue{
{
Expand Down