Skip to content

Commit

Permalink
support compression factor for percentile_approx
Browse files Browse the repository at this point in the history
  • Loading branch information
INNOCENT-BOY committed Dec 19, 2024
1 parent 43f125e commit c85d003
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 2 deletions.
30 changes: 29 additions & 1 deletion be/src/exprs/agg/percentile_approx.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,43 @@ namespace starrocks {
struct PercentileApproxState {
public:
PercentileApproxState() : percentile(new PercentileValue()) {}
explicit PercentileApproxState(double compression) : percentile(new PercentileValue(compression)) {}
~PercentileApproxState() = default;

int64_t mem_usage() const { return percentile->mem_usage(); }

std::unique_ptr<PercentileValue> percentile;
double targetQuantile = -1.0;
bool is_null = true;
bool is_init = false;
};

class PercentileApproxAggregateFunction final
: public AggregateFunctionBatchHelper<PercentileApproxState, PercentileApproxAggregateFunction> {
private:
// Compression used by get_compression_factor when no third argument is given
// or when the supplied value falls outside the accepted range.
static constexpr double DEFAULT_COMPRESSION_FACTOR = 10000;

public:
// Resolves the t-digest compression factor for this aggregate call.
//
// @param ctx  function context; its argument count and constant column 2 are read.
// @return     the constant DOUBLE passed as the third argument when it lies in
//             [2048, 10000]; DEFAULT_COMPRESSION_FACTOR when there is no third
//             argument or when the supplied value is out of range or NaN.
double get_compression_factor(FunctionContext* ctx) const {
    // Accepted bounds for a caller-supplied compression factor.
    constexpr double MIN_COMPRESSION_FACTOR = 2048;
    constexpr double MAX_COMPRESSION_FACTOR = 10000;

    if (ctx->get_num_args() <= 2) {
        return DEFAULT_COMPRESSION_FACTOR;
    }

    const double requested = ColumnHelper::get_const_value<TYPE_DOUBLE>(ctx->get_constant_column(2));

    // Positive range test on purpose: NaN fails every ordered comparison, so a
    // "< min || > max" check would let NaN through to the digest constructor.
    const bool in_range = requested >= MIN_COMPRESSION_FACTOR && requested <= MAX_COMPRESSION_FACTOR;
    if (!in_range) {
        return DEFAULT_COMPRESSION_FACTOR;
    }
    return requested;
}

// Applies the compression factor to the state's sketch the first time the
// state is touched; later calls are no-ops.
//
// @param ctx    function context passed on to get_compression_factor.
// @param state  aggregation state whose is_init flag and percentile are updated.
void init_state_if_necessary(FunctionContext* ctx, AggDataPtr __restrict state) const {
    auto& agg_state = this->data(state);
    if (!agg_state.is_init) {
        // The flag is raised before the factor is resolved, as in the original ordering.
        agg_state.is_init = true;
        const double factor = get_compression_factor(ctx);
        agg_state.percentile->resetWithCompression(factor);
    }
}

void update(FunctionContext* ctx, const Column** columns, AggDataPtr state, size_t row_num) const override {
double column_value;
if (columns[0]->is_nullable()) {
Expand All @@ -57,6 +82,7 @@ class PercentileApproxAggregateFunction final

DCHECK(!columns[1]->is_null(0));

init_state_if_necessary(ctx, state);
int64_t prev_memory = data(state).percentile->mem_usage();
data(state).percentile->add(implicit_cast<float>(column_value));
data(state).targetQuantile = columns[1]->get(0).get_double();
Expand All @@ -79,9 +105,11 @@ class PercentileApproxAggregateFunction final
double quantile;
memcpy(&quantile, src.data, sizeof(double));

PercentileApproxState src_percentile;
PercentileApproxState src_percentile(this->get_compression_factor(ctx));
src_percentile.targetQuantile = quantile;
src_percentile.percentile->deserialize((char*)src.data + sizeof(double));

init_state_if_necessary(ctx, state);
int64_t prev_memory = data(state).percentile->mem_usage();
data(state).percentile->merge(src_percentile.percentile.get());
data(state).targetQuantile = quantile;
Expand Down
8 changes: 8 additions & 0 deletions be/src/util/percentile_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class PercentileValue {
public:
PercentileValue() { _type = TDIGEST; }

// Builds a TDIGEST-typed value whose underlying t-digest is constructed with
// the given compression.
explicit PercentileValue(double compression) : _tdigest(compression) { _type = TDIGEST; }

explicit PercentileValue(const Slice& src) {
switch (*src.data) {
case PercentileDataType::TDIGEST:
Expand All @@ -32,6 +36,10 @@ class PercentileValue {
_tdigest.deserialize(src.data + 1);
}

void resetWithCompression(double compression) {
this->_tdigest = TDigest(compression);
}

// Forwards a single value to the underlying t-digest.
void add(float value) {
    _tdigest.add(value);
}

// Merges the t-digest held by `other` into this value's t-digest.
// `other` is dereferenced unconditionally, so callers must pass a non-null pointer.
void merge(const PercentileValue* other) {
    const auto* other_digest = &other->_tdigest;
    _tdigest.merge(other_digest);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/util/tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class TDigest {
using TDigestQueue = std::priority_queue<const TDigest*, std::vector<const TDigest*>, TDigestComparator>;

public:
TDigest() : TDigest(1000) {}
// Default construction delegates to the single-argument constructor with a
// compression of 10000.
TDigest() : TDigest(10000) {}
// Delegates to the two-argument constructor, passing 0 as the second argument.
explicit TDigest(Value compression) : TDigest(compression, 0) {}
// Populates the digest by deserializing from the buffer at `src`.
explicit TDigest(const char* src) { this->deserialize(src); }
// Populates the digest by deserializing from the slice's data pointer.
explicit TDigest(const Slice& src) { this->deserialize(src.data); }
Expand Down

0 comments on commit c85d003

Please sign in to comment.