diff --git a/be/src/exprs/agg/percentile_approx.h b/be/src/exprs/agg/percentile_approx.h index 52c03e6edc5b93..0176175bc695d6 100644 --- a/be/src/exprs/agg/percentile_approx.h +++ b/be/src/exprs/agg/percentile_approx.h @@ -27,6 +27,7 @@ namespace starrocks { struct PercentileApproxState { public: PercentileApproxState() : percentile(new PercentileValue()) {} + explicit PercentileApproxState(double compression) : percentile(new PercentileValue(compression)) {} ~PercentileApproxState() = default; int64_t mem_usage() const { return percentile->mem_usage(); } @@ -34,11 +35,35 @@ struct PercentileApproxState { std::unique_ptr percentile; double targetQuantile = -1.0; bool is_null = true; + bool is_init = false; }; class PercentileApproxAggregateFunction final : public AggregateFunctionBatchHelper { +private: + static constexpr double DEFAULT_COMPRESSION_FACTOR = 10000; + public: + double get_compression_factor(FunctionContext* ctx) const { + double compression = DEFAULT_COMPRESSION_FACTOR; + if (ctx->get_num_args() > 2) { + compression = ColumnHelper::get_const_value(ctx->get_constant_column(2)); + if (compression < 2048 || compression > 10000) { + compression = DEFAULT_COMPRESSION_FACTOR; + } + } + return compression; + } + + void init_state_if_necessary(FunctionContext* ctx, AggDataPtr __restrict state) const { + if (this->data(state).is_init) { + return; + } + this->data(state).is_init = true; + const double compression = get_compression_factor(ctx); + this->data(state).percentile->resetWithCompression(compression); + } + void update(FunctionContext* ctx, const Column** columns, AggDataPtr state, size_t row_num) const override { double column_value; if (columns[0]->is_nullable()) { @@ -57,6 +82,7 @@ class PercentileApproxAggregateFunction final DCHECK(!columns[1]->is_null(0)); + init_state_if_necessary(ctx, state); int64_t prev_memory = data(state).percentile->mem_usage(); data(state).percentile->add(implicit_cast(column_value)); data(state).targetQuantile = columns[1]->get(0).get_double(); @@ -79,9 +105,11 @@ class PercentileApproxAggregateFunction final double quantile; memcpy(&quantile, src.data, sizeof(double)); - PercentileApproxState src_percentile; + PercentileApproxState src_percentile(this->get_compression_factor(ctx)); src_percentile.targetQuantile = quantile; src_percentile.percentile->deserialize((char*)src.data + sizeof(double)); + + init_state_if_necessary(ctx, state); int64_t prev_memory = data(state).percentile->mem_usage(); data(state).percentile->merge(src_percentile.percentile.get()); data(state).targetQuantile = quantile; diff --git a/be/src/util/percentile_value.h b/be/src/util/percentile_value.h index 3e94c92d3778d8..02bbec75f32c43 100644 --- a/be/src/util/percentile_value.h +++ b/be/src/util/percentile_value.h @@ -21,6 +21,10 @@ class PercentileValue { public: PercentileValue() { _type = TDIGEST; } + explicit PercentileValue(double compression) : _tdigest(compression) { + _type = TDIGEST; + } + explicit PercentileValue(const Slice& src) { switch (*src.data) { case PercentileDataType::TDIGEST: @@ -32,6 +36,10 @@ class PercentileValue { _tdigest.deserialize(src.data + 1); } + void resetWithCompression(double compression) { + this->_tdigest = TDigest(compression); + } + void add(float value) { _tdigest.add(value); } void merge(const PercentileValue* other) { _tdigest.merge(&other->_tdigest); } diff --git a/be/src/util/tdigest.h b/be/src/util/tdigest.h index 4ebdb5f18e1cfc..ee4e30b4b12490 100644 --- a/be/src/util/tdigest.h +++ b/be/src/util/tdigest.h @@ -124,7 +124,7 @@ class TDigest { using TDigestQueue = std::priority_queue, TDigestComparator>; public: - TDigest() : TDigest(1000) {} + TDigest() : TDigest(10000) {} explicit TDigest(Value compression) : TDigest(compression, 0) {} explicit TDigest(const char* src) { this->deserialize(src); } explicit TDigest(const Slice& src) { this->deserialize(src.data); }