feat(query): override aggregation function if consolidateBy is passed (
mchrome authored Apr 26, 2024
1 parent b21b957 commit 9ce7d6e
Showing 5 changed files with 114 additions and 14 deletions.
2 changes: 2 additions & 0 deletions render/data/ch_response.go
@@ -17,6 +17,7 @@ type CHResponse struct {
Until int64
// if true, return points for all metrics, replacing empty results with list of NaN
AppendOutEmptySeries bool
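// functions that were already applied per target while building the response (in this commit: consolidateBy, when the request overrides the rollup aggregation)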
AppliedFunctions map[string][]string
}

// CHResponses is a slice of CHResponse
@@ -142,6 +143,7 @@ func (c *CHResponse) ToMultiFetchResponseV3() (*v3pb.MultiFetchResponse, error)
XFilesFactor: 0,
HighPrecisionTimestamps: false,
Values: values,
AppliedFunctions: c.AppliedFunctions[a.Target],
RequestStartTime: c.From,
RequestStopTime: c.Until,
}
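For context, a minimal sketch (not part of the commit) of how the new field surfaces in a carbonapi_v3_pb response; the metric name and map contents are made up, only the FetchResponse fields come from the diff above:

package main

import (
    "fmt"

    v3pb "github.com/go-graphite/protocol/carbonapi_v3_pb"
)

func main() {
    // Per-target bookkeeping mirroring CHResponse.AppliedFunctions:
    // target -> functions that graphite-clickhouse already applied.
    applied := map[string][]string{"app.requests.count": {"consolidateBy"}}

    // When the per-target FetchResponse is built, the matching entry is copied over,
    // so the caller knows the consolidation was already done server-side.
    resp := &v3pb.FetchResponse{
        Name:             "app.requests.count",
        AppliedFunctions: applied["app.requests.count"],
    }
    fmt.Println(resp.AppliedFunctions) // [consolidateBy]
}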
3 changes: 3 additions & 0 deletions render/data/multi_target.go
@@ -43,6 +43,9 @@ func MFRToMultiTarget(v3Request *v3pb.MultiFetchRequest) MultiTarget {
} else {
multiTarget[tf] = NewTargetsOne(m.PathExpression, len(v3Request.Metrics), alias.New())
}
if len(m.FilterFunctions) > 0 {
multiTarget[tf].SetFilteringFunctions(m.PathExpression, m.FilterFunctions)
}
}
}
return multiTarget
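A sketch of the request side under the same assumptions: a consolidateBy call arrives as a FilteringFunction on the FetchRequest, and MFRToMultiTarget now forwards it to the matching Targets. The import path for the data package and the request values are assumptions, not taken from the commit:

package main

import (
    v3pb "github.com/go-graphite/protocol/carbonapi_v3_pb"

    "github.com/lomik/graphite-clickhouse/render/data" // assumed import path for this package
)

func main() {
    // Hypothetical request: one path expression with consolidateBy('max')
    // forwarded through the FilterFunctions field.
    req := &v3pb.MultiFetchRequest{
        Metrics: []v3pb.FetchRequest{{
            PathExpression: "*.name.*",
            StartTime:      1714000000,
            StopTime:       1714003600,
            FilterFunctions: []*v3pb.FilteringFunction{
                {Name: "consolidateBy", Arguments: []string{"max"}},
            },
        }},
    }

    // Each Targets entry for the request's time frame now carries the
    // filtering functions registered for "*.name.*".
    multiTarget := data.MFRToMultiTarget(req)
    _ = multiTarget
}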
16 changes: 16 additions & 0 deletions render/data/query.go
@@ -15,6 +15,7 @@ import (

"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/helper/clickhouse"
"github.com/lomik/graphite-clickhouse/helper/rollup"
"github.com/lomik/graphite-clickhouse/metrics"
"github.com/lomik/graphite-clickhouse/pkg/dry"
"github.com/lomik/graphite-clickhouse/pkg/reverse"
@@ -88,6 +89,7 @@ type conditions struct {
metricsRequested []string
metricsUnreverse []string
metricsLookup []string
appliedFunctions map[string][]string
}

func newQuery(cfg *config.Config, targets int) *query {
@@ -239,6 +241,7 @@ func (q *query) getDataPoints(ctx context.Context, cond *conditions) error {
From: cond.From,
Until: cond.Until,
AppendOutEmptySeries: cond.appendEmptySeries,
AppliedFunctions: cond.appliedFunctions,
})
return nil
}
@@ -279,13 +282,26 @@ func (c *conditions) prepareMetricsLists() {
func (c *conditions) prepareLookup() {
age := uint32(dry.Max(0, time.Now().Unix()-c.From))
c.aggregations = make(map[string][]string)
c.appliedFunctions = make(map[string][]string)
c.extDataBodies = make(map[string]*strings.Builder)
c.steps = make(map[uint32][]string)
aggName := ""

for i := range c.metricsRequested {
step, agg := c.rollupRules.Lookup(c.metricsLookup[i], age)

// Override the aggregation with the argument of the consolidateBy function.
// consolidateBy and its argument are passed through the FilterFunctions field of the carbonapi_v3_pb protocol.
// Currently this just takes the first target that matches the metric,
// to avoid issuing a separate request for every aggregation type requested for the same metric.
for _, alias := range c.AM.Get(c.metricsUnreverse[i]) {
if requestedAgg := c.GetRequestedAggregation(alias.Target); requestedAgg != "" {
agg = rollup.AggrMap[requestedAgg]
c.appliedFunctions[alias.Target] = []string{graphiteConsolidationFunction}
break
}
}

if _, ok := c.steps[step]; !ok {
c.steps[step] = make([]string, 0)
}
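The override itself boils down to a single decision per metric; a self-contained sketch of that rule (pickAggregation is an illustrative name, not a function in this package):

package main

import "fmt"

// pickAggregation mirrors the override in prepareLookup: the aggregation chosen
// by the rollup rules for a metric is replaced by the consolidateBy argument,
// if the request carried one for a matching target.
func pickAggregation(rollupAgg, requestedAgg string) (agg string, applied []string) {
    if requestedAgg != "" {
        return requestedAgg, []string{"consolidateBy"}
    }
    return rollupAgg, nil
}

func main() {
    // The rollup rule says "avg" for this metric, but the request asked for
    // consolidateBy('max'): the data query aggregates with max and the
    // response reports consolidateBy in AppliedFunctions.
    agg, applied := pickAggregation("avg", "max")
    fmt.Println(agg, applied) // max [consolidateBy]
}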
51 changes: 51 additions & 0 deletions render/data/query_test.go
@@ -8,6 +8,7 @@ import (
"testing"
"time"

v3pb "github.com/go-graphite/protocol/carbonapi_v3_pb"
"github.com/lomik/graphite-clickhouse/finder"
"github.com/lomik/graphite-clickhouse/helper/date"
"github.com/lomik/graphite-clickhouse/helper/rollup"
@@ -304,6 +305,56 @@ func TestPrepareLookup(t *testing.T) {
assert.Equal(t, steps, cond.steps)
assert.Equal(t, aggregations, cond.aggregations)
})

t.Run("non-reverse query with overriden aggregation", func(t *testing.T) {
cond := newCondition(5400, 1800, 5)

cond.aggregated = true
cond.isReverse = false
cond.prepareMetricsLists()
sort.Strings(cond.metricsLookup)
sort.Strings(cond.metricsRequested)
sort.Strings(cond.metricsUnreverse)
var aggregations map[string][]string
for _, aggrStr := range []string{"avg", "min", "max", "sum"} {
cond.SetFilteringFunctions(
"*.name.*",
[]*v3pb.FilteringFunction{{Name: "consolidateBy", Arguments: []string{aggrStr}}},
)
cond.prepareLookup()
aggregations = map[string][]string{
aggrStr: {"10_min.name.any", "1_min.name.avg", "5_min.name.min", "5_sec.name.max"},
}
assert.Equal(t, aggregations, cond.aggregations)
}

// Steps saves only values, not the metrics list
steps := map[uint32][]string{
30: {},
60: {},
300: {},
1200: {},
}
assert.Equal(t, steps, cond.steps)
bodies := make(map[string]string)
for a, m := range aggregations {
bodies[a] = strings.Join(m, "\n") + "\n"
}
assert.Equal(t, bodies, extTableString(cond.extDataBodies))

cond.From = ageToTimestamp(1800)
cond.Until = ageToTimestamp(0)
cond.prepareLookup()
steps = map[uint32][]string{
30: {},
60: {},
300: {},
5: {},
}
assert.Equal(t, steps, cond.steps)
assert.Equal(t, aggregations, cond.aggregations)
assert.Equal(t, bodies, extTableString(cond.extDataBodies))
})
}

func TestSetStep(t *testing.T) {
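The new case can be run on its own with the standard Go test runner (the package path is taken from the file headers above):

go test ./render/data -run TestPrepareLookup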
56 changes: 42 additions & 14 deletions render/data/targets.go
@@ -4,12 +4,16 @@ import (
"fmt"
"time"

v3pb "github.com/go-graphite/protocol/carbonapi_v3_pb"
"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/helper/rollup"
"github.com/lomik/graphite-clickhouse/metrics"
"github.com/lomik/graphite-clickhouse/pkg/alias"
)

const graphiteConsolidationFunction = "consolidateBy"

type FilteringFunctionsByTarget map[string][]*v3pb.FilteringFunction

type Cache struct {
Cached bool
TS int64 // cached timestamp
@@ -26,37 +30,46 @@ type Targets struct {
Cache []Cache
Cached bool // all is cached
// AM stores found expanded metrics
AM *alias.Map
pointsTable string
isReverse bool
rollupRules *rollup.Rules
rollupUseReverted bool
queryMetrics *metrics.QueryMetrics
AM *alias.Map
filteringFunctionsByTarget FilteringFunctionsByTarget
pointsTable string
isReverse bool
rollupRules *rollup.Rules
rollupUseReverted bool
queryMetrics *metrics.QueryMetrics
}

func NewTargets(list []string, am *alias.Map) *Targets {
return &Targets{
List: list,
Cache: make([]Cache, len(list)),
AM: am,
targets := &Targets{
List: list,
Cache: make([]Cache, len(list)),
AM: am,
filteringFunctionsByTarget: make(FilteringFunctionsByTarget),
}
return targets
}

func NewTargetsOne(target string, capacity int, am *alias.Map) *Targets {
list := make([]string, 1, capacity)
list[0] = target
return &Targets{
List: list,
Cache: make([]Cache, 1, capacity),
AM: am,
targets := &Targets{
List: list,
Cache: make([]Cache, len(list)),
AM: am,
filteringFunctionsByTarget: make(FilteringFunctionsByTarget),
}
return targets
}

func (tt *Targets) Append(target string) {
tt.List = append(tt.List, target)
tt.Cache = append(tt.Cache, Cache{})
}

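// SetFilteringFunctions records the carbonapi filtering functions requested for a target.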
func (tt *Targets) SetFilteringFunctions(target string, filteringFunctions []*v3pb.FilteringFunction) {
tt.filteringFunctionsByTarget[target] = filteringFunctions
}

func (tt *Targets) selectDataTable(cfg *config.Config, tf *TimeFrame, context string) error {
now := time.Now().Unix()

@@ -117,3 +130,18 @@ TableLoop:

return fmt.Errorf("data tables is not specified for %v", tt.List[0])
}

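// GetRequestedAggregation returns the consolidateBy argument requested for a target,
// or an empty string if the request carried no such override.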
func (tt *Targets) GetRequestedAggregation(target string) string {
    ffs, ok := tt.filteringFunctionsByTarget[target]
    if !ok {
        return ""
    }
    for _, filteringFunc := range ffs {
        ffName := filteringFunc.GetName()
        ffArgs := filteringFunc.GetArguments()
        if ffName == graphiteConsolidationFunction && len(ffArgs) > 0 {
            return ffArgs[0]
        }
    }
    return ""
}
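A small usage sketch of the two new methods, assuming the import paths shown in this diff; the target and aggregation values are illustrative:

package main

import (
    "fmt"

    v3pb "github.com/go-graphite/protocol/carbonapi_v3_pb"

    "github.com/lomik/graphite-clickhouse/pkg/alias"
    "github.com/lomik/graphite-clickhouse/render/data" // assumed import path for this package
)

func main() {
    tt := data.NewTargetsOne("*.name.*", 1, alias.New())

    // consolidateBy('sum') as carbonapi would forward it.
    tt.SetFilteringFunctions("*.name.*", []*v3pb.FilteringFunction{
        {Name: "consolidateBy", Arguments: []string{"sum"}},
    })

    fmt.Println(tt.GetRequestedAggregation("*.name.*"))    // "sum"
    fmt.Println(tt.GetRequestedAggregation("other") == "") // true: no override requested
}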
