Skip to content

Commit

Permalink
[Star Tree] [Search] Keyword & Numeric Terms Aggregation (#17165)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Sandesh Kumar <[email protected]>
  • Loading branch information
sandeshkr419 authored Mar 4, 2025
1 parent 21f69ca commit 218f353
Show file tree
Hide file tree
Showing 12 changed files with 986 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-3.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Arrow Flight RPC plugin with Flight server bootstrap logic and client for internode communication ([#16962](https://github.com/opensearch-project/OpenSearch/pull/16962))
- Added offset management for the pull-based Ingestion ([#17354](https://github.com/opensearch-project/OpenSearch/pull/17354))
- Add filter function for AbstractQueryBuilder, BoolQueryBuilder, ConstantScoreQueryBuilder ([#17409](https://github.com/opensearch-project/OpenSearch/pull/17409))
- [Star Tree] [Search] Resolving keyword & numeric bucket aggregation with metric aggregation using star-tree ([#17165](https://github.com/opensearch-project/OpenSearch/pull/17165))

### Dependencies
- Update Apache Lucene to 10.1.0 ([#16366](https://github.com/opensearch-project/OpenSearch/pull/16366))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do
*/
public final void collectStarTreeBucket(StarTreeBucketCollector collector, long docCount, long bucketOrd, int entryBit)
throws IOException {
if (bucketOrd < 0) {
bucketOrd = -1 - bucketOrd;
} else {
grow(bucketOrd + 1);
}

if (docCounts.increment(bucketOrd, docCount) == docCount) {
multiBucketConsumer.accept(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,14 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.common.Nullable;
import org.opensearch.common.Rounding;
import org.opensearch.common.lease.Releasables;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.compositeindex.datacube.DateDimension;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils;
import org.opensearch.index.compositeindex.datacube.startree.utils.date.DateTimeUnitAdapter;
import org.opensearch.index.compositeindex.datacube.startree.utils.date.DateTimeUnitRounding;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
Expand Down Expand Up @@ -192,9 +188,9 @@ public ScoreMode scoreMode() {
/**
 * Tries to answer this leaf from pre-computed data instead of collecting its documents one by one.
 * <p>
 * When {@code getSupportedStarTree} yields a star-tree for the current query, the buckets are filled from it and
 * the method reports success; otherwise the decision is delegated to
 * {@code filterRewriteOptimizationContext.tryOptimize}.
 *
 * @param ctx the leaf (segment) being aggregated
 * @return {@code true} whenever a supported star-tree was found, otherwise the result of {@code tryOptimize}
 * @throws IOException if any of the calls made here throws it
 */
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context.getQueryShardContext());
if (supportedStarTree != null) {
// NOTE(review): this view looks like a diff hunk whose +/- markers were stripped. The preComputeWithStarTree
// call and the StarTreeBucketCollector path right after it are presumably the old and the new form of the same
// step -- confirm against the real file before assuming both are meant to be present.
if (preComputeWithStarTree(ctx, supportedStarTree) == true) {
return true;
}
// Build the top-level collector (no parent) and let the shared helper feed it the matching star-tree entries.
StarTreeBucketCollector starTreeBucketCollector = getStarTreeBucketCollector(ctx, supportedStarTree, null);
StarTreeQueryHelper.preComputeBucketsWithStarTree(starTreeBucketCollector);
return true;
}
// No usable star-tree: fall back to the filter-rewrite optimization.
return filterRewriteOptimizationContext.tryOptimize(ctx, this::incrementBucketDocCount, segmentMatchAll(context, ctx));
}
Expand Down Expand Up @@ -268,6 +264,10 @@ public StarTreeBucketCollector getStarTreeBucketCollector(
) throws IOException {
assert parentCollector == null;
StarTreeValues starTreeValues = StarTreeQueryHelper.getStarTreeValues(ctx, starTree);
SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
.getDimensionValuesIterator(starTreeDateDimension);
SortedNumericStarTreeValuesIterator docCountsIterator = StarTreeQueryHelper.getDocCountsIterator(starTreeValues, starTree);

return new StarTreeBucketCollector(
starTreeValues,
StarTreeTraversalUtil.getStarTreeResult(
Expand All @@ -287,17 +287,6 @@ public void setSubCollectors() throws IOException {
}
}

SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
.getDimensionValuesIterator(starTreeDateDimension);

String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(
starTree.getField(),
"_doc_count",
MetricStat.DOC_COUNT.getTypeName()
);
SortedNumericStarTreeValuesIterator docCountsIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
.getMetricValuesIterator(metricName);

@Override
public void collectStarTreeEntry(int starTreeEntry, long owningBucketOrd) throws IOException {
if (!valuesIterator.advanceExact(starTreeEntry)) {
Expand All @@ -311,15 +300,8 @@ public void collectStarTreeEntry(int starTreeEntry, long owningBucketOrd) throws

if (docCountsIterator.advanceExact(starTreeEntry)) {
long metricValue = docCountsIterator.nextValue();

long bucketOrd = bucketOrds.add(owningBucketOrd, dimensionValue);
if (bucketOrd < 0) {
bucketOrd = -1 - bucketOrd;
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
} else {
grow(bucketOrd + 1);
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
}
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
}
}
}
Expand Down Expand Up @@ -393,20 +375,4 @@ public double bucketSize(long bucket, Rounding.DateTimeUnit unitSize) {
return 1.0;
}
}

/**
 * Feeds every matching star-tree entry of this leaf into a freshly built top-level bucket collector.
 * <p>
 * The entries are the set bits of the collector's matching-docs bit set; each one is collected under owning
 * bucket ordinal {@code 0}.
 *
 * @param ctx      the leaf (segment) being aggregated
 * @param starTree the star-tree to read from
 * @return always {@code true}
 * @throws IOException if building the collector or collecting an entry throws it
 */
private boolean preComputeWithStarTree(LeafReaderContext ctx, CompositeIndexFieldInfo starTree) throws IOException {
    final StarTreeBucketCollector collector = getStarTreeBucketCollector(ctx, starTree, null);
    final FixedBitSet matchingEntries = collector.getMatchingDocsBitSet();
    final int limit = matchingEntries.length();

    // An empty bit set must not be probed at all, so start at the sentinel in that case.
    int entry = limit > 0 ? matchingEntries.nextSetBit(0) : DocIdSetIterator.NO_MORE_DOCS;
    while (entry != DocIdSetIterator.NO_MORE_DOCS) {
        collector.collectStarTreeEntry(entry, 0);
        // Never ask for a bit at or beyond the set's length; stop instead.
        final int next = entry + 1;
        entry = next < limit ? matchingEntries.nextSetBit(next) : DocIdSetIterator.NO_MORE_DOCS;
    }
    return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
import org.opensearch.common.util.LongHash;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedSetStarTreeValuesIterator;
import org.opensearch.index.mapper.DocCountFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.AggregationExecutionException;
Expand All @@ -63,14 +67,20 @@
import org.opensearch.search.aggregations.InternalOrder;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.StarTreeBucketCollector;
import org.opensearch.search.aggregations.StarTreePreComputeCollector;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes;
import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.startree.StarTreeQueryHelper;
import org.opensearch.search.startree.StarTreeTraversalUtil;
import org.opensearch.search.startree.filter.DimensionFilter;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
Expand All @@ -85,18 +95,19 @@
*
* @opensearch.internal
*/
public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggregator {
public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggregator implements StarTreePreComputeCollector {
protected final ResultStrategy<?, ?, ?> resultStrategy;
protected final ValuesSource.Bytes.WithOrdinals valuesSource;

private final LongPredicate acceptedGlobalOrdinals;
private final long valueCount;
private final String fieldName;
protected final String fieldName;
private Weight weight;
protected final CollectionStrategy collectionStrategy;
private final SetOnce<SortedSetDocValues> dvs = new SetOnce<>();
protected int segmentsWithSingleValuedOrds = 0;
protected int segmentsWithMultiValuedOrds = 0;
LongUnaryOperator globalOperator;

/**
* Lookup global ordinals
Expand Down Expand Up @@ -219,6 +230,9 @@ boolean tryCollectFromTermFrequencies(LeafReaderContext ctx, SortedSetDocValues
@Override
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
SortedSetDocValues globalOrds = valuesSource.globalOrdinalsValues(ctx);
if (tryStarTreePrecompute(ctx) == true) {
return true;
}
if (collectionStrategy instanceof DenseGlobalOrds
&& this.resultStrategy instanceof StandardTermsResults
&& subAggregators.length == 0) {
Expand All @@ -231,6 +245,17 @@ protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws
return false;
}

/**
 * Fills this aggregation's buckets for one leaf from a star-tree, when the query has a supported one.
 * <p>
 * The segment-to-global ordinal mapping is stored in {@code globalOperator} before the collector is built, because
 * the collector reads that field while translating star-tree ordinals.
 *
 * @param ctx the leaf (segment) being aggregated
 * @return {@code true} if a supported star-tree was found and used, {@code false} otherwise
 * @throws IOException if any of the calls made here throws it
 */
protected boolean tryStarTreePrecompute(LeafReaderContext ctx) throws IOException {
    final CompositeIndexFieldInfo starTree = StarTreeQueryHelper.getSupportedStarTree(context.getQueryShardContext());
    if (starTree == null) {
        return false;
    }
    globalOperator = valuesSource.globalOrdinalsMapping(ctx);
    StarTreeQueryHelper.preComputeBucketsWithStarTree(getStarTreeBucketCollector(ctx, starTree, null));
    return true;
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
SortedSetDocValues globalOrds = valuesSource.globalOrdinalsValues(ctx);
Expand Down Expand Up @@ -307,6 +332,56 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
});
}

/**
 * Builds the collector that fills this keyword terms aggregation's buckets for one segment from star-tree entries
 * rather than from individual documents.
 * <p>
 * The entries to visit come from {@code StarTreeTraversalUtil.getStarTreeResult}, using the base query's star-tree
 * filter combined (via {@code mergeDimensionFilterIfNotExists}) with a match-all filter on {@code fieldName}.
 * {@code globalOperator} must already hold this segment's ordinal mapping when entries are collected.
 *
 * @param ctx      the leaf (segment) being aggregated
 * @param starTree the star-tree to read from
 * @param parent   must be {@code null}; this aggregator only acts as the top-level star-tree collector
 * @return a collector whose sub-collectors are obtained from this aggregator's sub-aggregators
 * @throws IOException if reading the star-tree values throws it
 */
@Override
public StarTreeBucketCollector getStarTreeBucketCollector(
    LeafReaderContext ctx,
    CompositeIndexFieldInfo starTree,
    StarTreeBucketCollector parent
) throws IOException {
    assert parent == null;
    StarTreeValues starTreeValues = StarTreeQueryHelper.getStarTreeValues(ctx, starTree);
    SortedSetStarTreeValuesIterator valuesIterator = (SortedSetStarTreeValuesIterator) starTreeValues.getDimensionValuesIterator(
        fieldName
    );
    SortedNumericStarTreeValuesIterator docCountsIterator = StarTreeQueryHelper.getDocCountsIterator(starTreeValues, starTree);

    return new StarTreeBucketCollector(
        starTreeValues,
        StarTreeTraversalUtil.getStarTreeResult(
            starTreeValues,
            StarTreeQueryHelper.mergeDimensionFilterIfNotExists(
                context.getQueryShardContext().getStarTreeQueryContext().getBaseQueryStarTreeFilter(),
                fieldName,
                List.of(DimensionFilter.MATCH_ALL_DEFAULT)
            ),
            context
        )
    ) {
        @Override
        public void setSubCollectors() throws IOException {
            // Every sub-aggregator is cast to StarTreePreComputeCollector and asked for a child of this collector.
            for (Aggregator aggregator : subAggregators) {
                this.subCollectors.add(((StarTreePreComputeCollector) aggregator).getStarTreeBucketCollector(ctx, starTree, this));
            }
        }

        @Override
        public void collectStarTreeEntry(int starTreeEntry, long owningBucketOrd) throws IOException {
            if (valuesIterator.advanceExact(starTreeEntry) == false) {
                return;
            }
            // The doc count belongs to the entry, not to an individual value, so read it once up front
            // instead of re-positioning the iterator on every pass of the loop below.
            if (docCountsIterator.advanceExact(starTreeEntry) == false) {
                return;
            }
            long docCount = docCountsIterator.nextValue();

            for (int i = 0, count = valuesIterator.docValueCount(); i < count; i++) {
                long ord = globalOperator.applyAsLong(valuesIterator.value());
                // NOTE(review): the owning bucket is hard-coded to 0 rather than owningBucketOrd; presumably fine
                // because parent is asserted null above -- confirm against callers.
                long bucketOrd = collectionStrategy.globalOrdToBucketOrd(0, ord);
                collectStarTreeBucket(this, docCount, bucketOrd, starTreeEntry);
            }
        }
    };
}

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return resultStrategy.buildAggregations(owningBucketOrds);
Expand Down Expand Up @@ -444,7 +519,7 @@ protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws
(ord, docCount) -> incrementBucketDocCount(mapping.applyAsLong(ord), docCount)
);
}
return false;
return tryStarTreePrecompute(ctx);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.LongArray;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
import org.opensearch.index.fielddata.FieldData;
import org.opensearch.index.mapper.NumberFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
Expand All @@ -52,6 +56,8 @@
import org.opensearch.search.aggregations.InternalOrder;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.StarTreeBucketCollector;
import org.opensearch.search.aggregations.StarTreePreComputeCollector;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.bucket.terms.IncludeExclude.LongFilter;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum;
Expand All @@ -60,6 +66,9 @@
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.startree.StarTreeQueryHelper;
import org.opensearch.search.startree.StarTreeTraversalUtil;
import org.opensearch.search.startree.filter.DimensionFilter;

import java.io.IOException;
import java.math.BigInteger;
Expand All @@ -79,11 +88,12 @@
*
* @opensearch.internal
*/
public class NumericTermsAggregator extends TermsAggregator {
public class NumericTermsAggregator extends TermsAggregator implements StarTreePreComputeCollector {
private final ResultStrategy<?, ?> resultStrategy;
private final ValuesSource.Numeric valuesSource;
private final LongKeyedBucketOrds bucketOrds;
private final LongFilter longFilter;
private final String fieldName;

public NumericTermsAggregator(
String name,
Expand All @@ -105,6 +115,9 @@ public NumericTermsAggregator(
this.valuesSource = valuesSource;
this.longFilter = longFilter;
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);
this.fieldName = (this.valuesSource instanceof ValuesSource.Numeric.FieldData)
? ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName()
: null;
}

@Override
Expand Down Expand Up @@ -146,6 +159,73 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
});
}

/**
 * Fills this numeric terms aggregation's buckets for one leaf from a star-tree, when the query has a supported one.
 *
 * @param ctx the leaf (segment) being aggregated
 * @return {@code true} if a supported star-tree was found and used, {@code false} otherwise
 * @throws IOException if any of the calls made here throws it
 */
@Override
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
    CompositeIndexFieldInfo supportedStarTree = StarTreeQueryHelper.getSupportedStarTree(this.context.getQueryShardContext());
    if (supportedStarTree == null) {
        return false;
    }
    // Top-level collector (no parent); the shared helper feeds it the matching star-tree entries.
    StarTreeBucketCollector starTreeBucketCollector = getStarTreeBucketCollector(ctx, supportedStarTree, null);
    StarTreeQueryHelper.preComputeBucketsWithStarTree(starTreeBucketCollector);
    return true;
}

/**
 * Builds the collector that fills this numeric terms aggregation's buckets for one segment from star-tree entries
 * rather than from individual documents.
 * <p>
 * The entries to visit come from {@code StarTreeTraversalUtil.getStarTreeResult}, using the base query's star-tree
 * filter combined (via {@code mergeDimensionFilterIfNotExists}) with a match-all filter on {@code fieldName}.
 *
 * @param ctx      the leaf (segment) being aggregated
 * @param starTree the star-tree to read from
 * @param parent   must be {@code null}; this aggregator only acts as the top-level star-tree collector
 * @return a collector whose sub-collectors are obtained from this aggregator's sub-aggregators
 * @throws IOException if reading the star-tree values throws it
 */
@Override
public StarTreeBucketCollector getStarTreeBucketCollector(
    LeafReaderContext ctx,
    CompositeIndexFieldInfo starTree,
    StarTreeBucketCollector parent
) throws IOException {
    assert parent == null;
    StarTreeValues starTreeValues = StarTreeQueryHelper.getStarTreeValues(ctx, starTree);
    SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
        .getDimensionValuesIterator(fieldName);
    SortedNumericStarTreeValuesIterator docCountsIterator = StarTreeQueryHelper.getDocCountsIterator(starTreeValues, starTree);

    return new StarTreeBucketCollector(
        starTreeValues,
        StarTreeTraversalUtil.getStarTreeResult(
            starTreeValues,
            StarTreeQueryHelper.mergeDimensionFilterIfNotExists(
                context.getQueryShardContext().getStarTreeQueryContext().getBaseQueryStarTreeFilter(),
                fieldName,
                List.of(DimensionFilter.MATCH_ALL_DEFAULT)
            ),
            context
        )
    ) {
        @Override
        public void setSubCollectors() throws IOException {
            // Every sub-aggregator is cast to StarTreePreComputeCollector and asked for a child of this collector.
            for (Aggregator aggregator : subAggregators) {
                this.subCollectors.add(((StarTreePreComputeCollector) aggregator).getStarTreeBucketCollector(ctx, starTree, this));
            }
        }

        @Override
        public void collectStarTreeEntry(int starTreeEntry, long owningBucketOrd) throws IOException {
            if (valuesIterator.advanceExact(starTreeEntry) == false) {
                return;
            }
            // The doc count belongs to the entry, not to an individual value, so read it once up front.
            if (docCountsIterator.advanceExact(starTreeEntry) == false) {
                return;
            }
            long docCount = docCountsIterator.nextValue();

            // Read one value per pass so that an entry carrying several values buckets each of them,
            // instead of crediting the first value once per value.
            for (int i = 0, count = valuesIterator.entryValueCount(); i < count; i++) {
                long dimensionValue = valuesIterator.nextValue();
                // Only numeric & floating points are supported as of now in star-tree
                // TODO: Add support for isBigInteger() when it gets supported in star-tree
                if (valuesSource.isFloatingPoint()) {
                    double doubleValue = ((NumberFieldMapper.NumberFieldType) context.mapperService().fieldType(fieldName)).toDoubleValue(
                        dimensionValue
                    );
                    dimensionValue = NumericUtils.doubleToSortableLong(doubleValue);
                }
                long bucketOrd = bucketOrds.add(owningBucketOrd, dimensionValue);
                collectStarTreeBucket(this, docCount, bucketOrd, starTreeEntry);
            }
        }
    };
}

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return resultStrategy.buildAggregations(owningBucketOrds);
Expand Down
Loading

0 comments on commit 218f353

Please sign in to comment.