Skip to content

Commit

Permalink
refactor implementation
Browse files Browse the repository at this point in the history
Signed-off-by: odubajDT <[email protected]>
  • Loading branch information
odubajDT committed Jan 10, 2025
1 parent c9d37d6 commit d443535
Showing 1 changed file with 46 additions and 31 deletions.
77 changes: 46 additions & 31 deletions pkg/ottl/ottlfuncs/func_flatten.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ type FlattenArguments[K any] struct {
ResolveConflicts ottl.Optional[bool]
}

type flattenData struct {
result pcommon.Map
existingKeys map[string]int
resolveConflict bool
maxDepth int64
}

func NewFlattenFactory[K any]() ottl.Factory[K] {
return ottl.NewFactory("flatten", &FlattenArguments[K]{}, createFlattenFunction[K])
}
Expand Down Expand Up @@ -49,9 +56,9 @@ func flatten[K any](target ottl.PMapGetter[K], p ottl.Optional[string], d ottl.O
prefix = p.Get()
}

conflict := false
resolveConflict := false
if !c.IsEmpty() {
conflict = c.Get()
resolveConflict = c.Get()
}

return func(ctx context.Context, tCtx K) (any, error) {
Expand All @@ -60,68 +67,76 @@ func flatten[K any](target ottl.PMapGetter[K], p ottl.Optional[string], d ottl.O
return nil, err
}

result := pcommon.NewMap()
existingKeys := map[string]int{}
flattenMap(m, result, prefix, 0, depth, conflict, existingKeys)
result.MoveTo(m)
flattenData := initFlattenData(resolveConflict, depth)
flattenData.flattenMap(m, prefix, 0)
flattenData.result.MoveTo(m)

return nil, nil
}, nil
}

func flattenMap(m pcommon.Map, result pcommon.Map, prefix string, currentDepth, maxDepth int64, conflict bool, existingKeys map[string]int) {
func initFlattenData(resolveConflict bool, maxDepth int64) *flattenData {
return &flattenData{
result: pcommon.NewMap(),
existingKeys: map[string]int{},
resolveConflict: resolveConflict,
maxDepth: maxDepth,
}
}

func (f *flattenData) flattenMap(m pcommon.Map, prefix string, currentDepth int64) {
if len(prefix) > 0 {
prefix += "."
}
m.Range(func(k string, v pcommon.Value) bool {
return flattenValue(k, v, currentDepth, maxDepth, result, prefix, conflict, existingKeys)
return f.flattenValue(k, v, currentDepth, prefix)
})
}

func flattenSlice(s pcommon.Slice, result pcommon.Map, prefix string, currentDepth int64, maxDepth int64, conflict bool, existingKeys map[string]int) {
func (f *flattenData) flattenSlice(s pcommon.Slice, prefix string, currentDepth int64) {
for i := 0; i < s.Len(); i++ {
flattenValue(fmt.Sprintf("%d", i), s.At(i), currentDepth+1, maxDepth, result, prefix, conflict, existingKeys)
f.flattenValue(fmt.Sprintf("%d", i), s.At(i), currentDepth+1, prefix)
}
}

func flattenValue(k string, v pcommon.Value, currentDepth int64, maxDepth int64, result pcommon.Map, prefix string, conflict bool, existingKeys map[string]int) bool {
func (f *flattenData) flattenValue(k string, v pcommon.Value, currentDepth int64, prefix string) bool {
switch {
case v.Type() == pcommon.ValueTypeMap && currentDepth < maxDepth:
flattenMap(v.Map(), result, prefix+k, currentDepth+1, maxDepth, conflict, existingKeys)
case v.Type() == pcommon.ValueTypeSlice && currentDepth < maxDepth:
case v.Type() == pcommon.ValueTypeMap && currentDepth < f.maxDepth:
f.flattenMap(v.Map(), prefix+k, currentDepth+1)
case v.Type() == pcommon.ValueTypeSlice && currentDepth < f.maxDepth:
for i := 0; i < v.Slice().Len(); i++ {
switch {
case v.Slice().At(i).Type() == pcommon.ValueTypeMap && currentDepth+1 < maxDepth:
flattenMap(v.Slice().At(i).Map(), result, fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2, maxDepth, conflict, existingKeys)
case v.Slice().At(i).Type() == pcommon.ValueTypeSlice && currentDepth+1 < maxDepth:
flattenSlice(v.Slice().At(i).Slice(), result, fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2, maxDepth, conflict, existingKeys)
case v.Slice().At(i).Type() == pcommon.ValueTypeMap && currentDepth+1 < f.maxDepth:
f.flattenMap(v.Slice().At(i).Map(), fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2)
case v.Slice().At(i).Type() == pcommon.ValueTypeSlice && currentDepth+1 < f.maxDepth:
f.flattenSlice(v.Slice().At(i).Slice(), fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2)
default:
key := prefix + k
if conflict {
handleConflict(existingKeys, key, v.Slice().At(i), &result)
if f.resolveConflict {
f.handleConflict(key, v.Slice().At(i))
} else {
v.Slice().At(i).CopyTo(result.PutEmpty(fmt.Sprintf("%v.%v", key, i)))
v.Slice().At(i).CopyTo(f.result.PutEmpty(fmt.Sprintf("%v.%v", key, i)))
}
}
}
default:
key := prefix + k
if conflict {
handleConflict(existingKeys, key, v, &result)
if f.resolveConflict {
f.handleConflict(key, v)
} else {
v.CopyTo(result.PutEmpty(key))
v.CopyTo(f.result.PutEmpty(key))
}
}
return true
}

func handleConflict(existingKeys map[string]int, key string, v pcommon.Value, result *pcommon.Map) {
if _, exists := result.Get(key); exists {
newKey := key + "." + strconv.Itoa(existingKeys[key])
existingKeys[key]++
v.CopyTo(result.PutEmpty(newKey))
func (f *flattenData) handleConflict(key string, v pcommon.Value) {
if _, exists := f.result.Get(key); exists {
newKey := key + "." + strconv.Itoa(f.existingKeys[key])
f.existingKeys[key]++
v.CopyTo(f.result.PutEmpty(newKey))
} else {
existingKeys[key] = 0
v.CopyTo(result.PutEmpty(key))
f.existingKeys[key] = 0
v.CopyTo(f.result.PutEmpty(key))
}
}

0 comments on commit d443535

Please sign in to comment.