From c35ba2ea0803771d676db4a42e674285a1666e06 Mon Sep 17 00:00:00 2001 From: "Maxim.Kolmakov" Date: Fri, 17 Nov 2023 22:39:46 +0100 Subject: [PATCH] Make analysis of branch results concurrent --- pkg/server/clickhouse.go | 76 ++++++++++++++++++++++++++++------------ 1 file changed, 53 insertions(+), 23 deletions(-) diff --git a/pkg/server/clickhouse.go b/pkg/server/clickhouse.go index 0ecdbd83..1069388e 100644 --- a/pkg/server/clickhouse.go +++ b/pkg/server/clickhouse.go @@ -10,6 +10,7 @@ import ( "github.com/valyala/bytebufferpool" "net/http" "strings" + "sync" ) func (t *StatsServer) openDatabaseConnection() (driver.Conn, error) { @@ -41,6 +42,12 @@ func toJSONBuffer(data interface{}) (*bytebufferpool.ByteBuffer, error) { return buffer, nil } +type responseItem struct { + Project string + MeasureName string + Median float64 +} + func (t *StatsServer) getBranchComparison(request *http.Request) (*bytebufferpool.ByteBuffer, bool, error) { type requestParams struct { @@ -86,32 +93,55 @@ func (t *StatsServer) getBranchComparison(request *http.Request) (*bytebufferpoo return nil, false, err } - type responseItem struct { - Project string - MeasureName string - Median float64 - } + response := getMedianValues(queryResults) + buffer, err := toJSONBuffer(response) + return buffer, true, err +} - response := make([]responseItem, len(queryResults)) - for i, result := range queryResults { - indexes := degradation_detector.GetChangePointIndexes(result.MeasureValues, 1) - var valuesAfterLastChangePoint []int - if len(indexes) == 0 { - valuesAfterLastChangePoint = result.MeasureValues - } else { - lastIndex := indexes[len(indexes)-1] - valuesAfterLastChangePoint = result.MeasureValues[lastIndex:] - } - median := degradation_detector.CalculateMedian(valuesAfterLastChangePoint) - response[i] = responseItem{ - Project: result.Project, - MeasureName: result.MeasureName, - Median: median, - } +func getMedianValues(queryResults []struct { + Project string + MeasureName string + MeasureValues []int +}) []responseItem { + + responseChan := make(chan responseItem, len(queryResults)) + var wg sync.WaitGroup + for _, result := range queryResults { + wg.Add(1) + go func(result struct { + Project string + MeasureName string + MeasureValues []int + }) { + defer wg.Done() + indexes := degradation_detector.GetChangePointIndexes(result.MeasureValues, 1) + var valuesAfterLastChangePoint []int + if len(indexes) == 0 { + valuesAfterLastChangePoint = result.MeasureValues + } else { + lastIndex := indexes[len(indexes)-1] + valuesAfterLastChangePoint = result.MeasureValues[lastIndex:] + } + median := degradation_detector.CalculateMedian(valuesAfterLastChangePoint) + + responseChan <- responseItem{ + Project: result.Project, + MeasureName: result.MeasureName, + Median: median, + } + }(result) } - buffer, err := toJSONBuffer(response) - return buffer, true, err + go func() { + wg.Wait() + close(responseChan) + }() + + response := make([]responseItem, 0, len(queryResults)) + for item := range responseChan { + response = append(response, item) + } + return response } func (t *StatsServer) getDistinctHighlightingPasses(request *http.Request) (*bytebufferpool.ByteBuffer, bool, error) {