Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Support compression factor for percentile_approx #54115

Merged
merged 1 commit into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion be/src/exprs/agg/percentile_approx.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace starrocks {
struct PercentileApproxState {
public:
PercentileApproxState() : percentile(new PercentileValue()) {}
explicit PercentileApproxState(double compression) : percentile(new PercentileValue(compression)) {}
~PercentileApproxState() = default;

int64_t mem_usage() const { return percentile->mem_usage(); }
Expand All @@ -38,7 +39,30 @@ struct PercentileApproxState {

class PercentileApproxAggregateFunction final
: public AggregateFunctionBatchHelper<PercentileApproxState, PercentileApproxAggregateFunction> {
private:
static constexpr double MIN_COMPRESSION = 2048.0;
static constexpr double MAX_COMPRESSION = 10000.0;
static constexpr double DEFAULT_COMPRESSION_FACTOR = 10000.0;

public:
void create(FunctionContext* ctx, AggDataPtr __restrict ptr) const override {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you give a default compression if ctx is null because agg_state in common aggregate state it may be null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

double compression = (ctx == nullptr) ? DEFAULT_COMPRESSION_FACTOR : get_compression_factor(ctx);
new (ptr) PercentileApproxState(compression);
}

double get_compression_factor(FunctionContext* ctx) const {
double compression = DEFAULT_COMPRESSION_FACTOR;
if (ctx->get_num_args() > 2) {
compression = ColumnHelper::get_const_value<TYPE_DOUBLE>(ctx->get_constant_column(2));
if (compression < MIN_COMPRESSION || compression > MAX_COMPRESSION) {
LOG(WARNING) << "Compression factor out of range. Using default compression factor: "
<< DEFAULT_COMPRESSION_FACTOR;
compression = DEFAULT_COMPRESSION_FACTOR;
}
}
return compression;
}

void update(FunctionContext* ctx, const Column** columns, AggDataPtr state, size_t row_num) const override {
double column_value;
if (columns[0]->is_nullable()) {
Expand Down Expand Up @@ -79,9 +103,10 @@ class PercentileApproxAggregateFunction final
double quantile;
memcpy(&quantile, src.data, sizeof(double));

PercentileApproxState src_percentile;
PercentileApproxState src_percentile(get_compression_factor(ctx));
src_percentile.targetQuantile = quantile;
src_percentile.percentile->deserialize((char*)src.data + sizeof(double));

int64_t prev_memory = data(state).percentile->mem_usage();
data(state).percentile->merge(src_percentile.percentile.get());
data(state).targetQuantile = quantile;
INNOCENT-BOY marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
2 changes: 2 additions & 0 deletions be/src/util/percentile_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class PercentileValue {
public:
PercentileValue() { _type = TDIGEST; }

explicit PercentileValue(double compression) : _tdigest(compression) { _type = TDIGEST; }

explicit PercentileValue(const Slice& src) {
switch (*src.data) {
case PercentileDataType::TDIGEST:
Expand Down
24 changes: 20 additions & 4 deletions test/sql/test_agg_function/R/test_percentile_approx
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,37 @@ BUCKETS 1
PROPERTIES ("replication_num" = "1");
-- result:
-- !result
insert into t1 select generate_series, generate_series from table(generate_series(1, 1000));
insert into t1 select generate_series, generate_series from table(generate_series(1, 50000, 3));
-- result:
-- !result
set pipeline_dop=1;
-- result:
-- !result
select percentile_approx(c2, 0.5) from t1;
-- result:
500.5
25000.0
-- !result
select percentile_approx(c2, 0.9) from t1;
-- result:
45000.3984375
-- !result
select percentile_approx(c2, 0.9, 2048) from t1;
-- result:
45000.40234375
-- !result
select percentile_approx(c2, 0.9, 5000) from t1;
-- result:
45000.40234375
-- !result
select percentile_approx(c2, 0.9, 10000) from t1;
-- result:
45000.3984375
-- !result
with tt as (select @v1 as v1, c1, c2 from t1) select /*+ set_user_variable(@v1 = 0.5) */ percentile_approx(c2, v1) from tt;
-- result:
500.5
25000.0
-- !result
with tt as (select @v1 as v1, @v2 as v2, c1, c2 from t1) select /*+ set_user_variable(@v1= 0.5, @v2 = 4096) */ percentile_approx(c2, v1, v2 + 1) from tt;
-- result:
500.5
25000.0
-- !result
7 changes: 6 additions & 1 deletion test/sql/test_agg_function/T/test_percentile_approx
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ DUPLICATE KEY(c1)
DISTRIBUTED BY HASH(c1)
BUCKETS 1
PROPERTIES ("replication_num" = "1");
insert into t1 select generate_series, generate_series from table(generate_series(1, 1000));
insert into t1 select generate_series, generate_series from table(generate_series(1, 50000, 3));
set pipeline_dop=1;

select percentile_approx(c2, 0.5) from t1;
select percentile_approx(c2, 0.9) from t1;
select percentile_approx(c2, 0.9, 2048) from t1;
select percentile_approx(c2, 0.9, 5000) from t1;
select percentile_approx(c2, 0.9, 10000) from t1;

with tt as (select @v1 as v1, c1, c2 from t1) select /*+ set_user_variable(@v1 = 0.5) */ percentile_approx(c2, v1) from tt;
with tt as (select @v1 as v1, @v2 as v2, c1, c2 from t1) select /*+ set_user_variable(@v1= 0.5, @v2 = 4096) */ percentile_approx(c2, v1, v2 + 1) from tt;
Loading