improve the refactor
package name -> filterrewrite
move tree traversal logic to new class
add documentation for important abstract methods
add sub class for composite aggregation bridge

Signed-off-by: bowenlan-amzn <[email protected]>
bowenlan-amzn committed Jul 17, 2024
1 parent 3a60a81 commit 094c25b
Showing 14 changed files with 402 additions and 311 deletions.

CompositeAggregator.java
@@ -76,8 +76,8 @@
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.internal.SearchContext;
-import org.opensearch.search.optimization.ranges.DateHistogramAggregatorBridge;
-import org.opensearch.search.optimization.ranges.OptimizationContext;
+import org.opensearch.search.optimization.filterrewrite.CompositeAggregatorBridge;
+import org.opensearch.search.optimization.filterrewrite.OptimizationContext;
import org.opensearch.search.searchafter.SearchAfterBuilder;
import org.opensearch.search.sort.SortAndFormats;

@@ -94,6 +94,7 @@
import java.util.stream.Collectors;

import static org.opensearch.search.aggregations.MultiBucketConsumerService.MAX_BUCKET_SETTING;
+import static org.opensearch.search.optimization.filterrewrite.DateHistogramAggregatorBridge.segmentMatchAll;

/**
* Main aggregator that aggregates docs from multiple aggregations
@@ -166,7 +167,7 @@ public final class CompositeAggregator extends BucketsAggregator {
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;

-optimizationContext = new OptimizationContext(new DateHistogramAggregatorBridge() {
+optimizationContext = new OptimizationContext(new CompositeAggregatorBridge() {
private RoundingValuesSource valuesSource;
private long afterKey = -1L;

@@ -217,14 +218,9 @@ protected int getSize() {
}

@Override
-protected Function<Object, Long> bucketOrdProducer() {
+protected Function<Long, Long> bucketOrdProducer() {
return (key) -> bucketOrds.add(0, getRoundingPrepared().round((long) key));
}

-@Override
-protected boolean segmentMatchAll(LeafReaderContext leaf) throws IOException {
-return segmentMatchAll(context, leaf);
-}
});
if (optimizationContext.canOptimize(parent, subAggregators.length, context)) {
optimizationContext.prepare();
@@ -563,7 +559,7 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) throws IOException {

@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
-boolean optimized = optimizationContext.tryOptimize(ctx, this::incrementBucketDocCount);
+boolean optimized = optimizationContext.tryOptimize(ctx, this::incrementBucketDocCount, segmentMatchAll(context, ctx));
if (optimized) throw new CollectionTerminatedException();

finishLeaf();

AutoDateHistogramAggregator.java
@@ -58,8 +58,8 @@
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
-import org.opensearch.search.optimization.ranges.DateHistogramAggregatorBridge;
-import org.opensearch.search.optimization.ranges.OptimizationContext;
+import org.opensearch.search.optimization.filterrewrite.DateHistogramAggregatorBridge;
+import org.opensearch.search.optimization.filterrewrite.OptimizationContext;

import java.io.IOException;
import java.util.Collections;
@@ -68,6 +68,8 @@
import java.util.function.Function;
import java.util.function.LongToIntFunction;

+import static org.opensearch.search.optimization.filterrewrite.DateHistogramAggregatorBridge.segmentMatchAll;

/**
* An aggregator for date values that attempts to return a specific number of
* buckets, reconfiguring how it rounds dates to buckets on the fly as new
@@ -198,14 +200,10 @@ protected Prepared getRoundingPrepared() {
}

@Override
-protected Function<Object, Long> bucketOrdProducer() {
+protected Function<Long, Long> bucketOrdProducer() {
return (key) -> getBucketOrds().add(0, preparedRounding.round((long) key));
}

-@Override
-protected boolean segmentMatchAll(LeafReaderContext leaf) throws IOException {
-return segmentMatchAll(context, leaf);
-}
});
if (optimizationContext.canOptimize(parent, subAggregators.length, context)) {
optimizationContext.prepare();
@@ -241,7 +239,7 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
return LeafBucketCollector.NO_OP_COLLECTOR;
}

-boolean optimized = optimizationContext.tryOptimize(ctx, this::incrementBucketDocCount);
+boolean optimized = optimizationContext.tryOptimize(ctx, this::incrementBucketDocCount, segmentMatchAll(context, ctx));
if (optimized) throw new CollectionTerminatedException();

final SortedNumericDocValues values = valuesSource.longValues(ctx);

DateHistogramAggregator.java
@@ -52,15 +52,17 @@
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
-import org.opensearch.search.optimization.ranges.DateHistogramAggregatorBridge;
-import org.opensearch.search.optimization.ranges.OptimizationContext;
+import org.opensearch.search.optimization.filterrewrite.DateHistogramAggregatorBridge;
+import org.opensearch.search.optimization.filterrewrite.OptimizationContext;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

+import static org.opensearch.search.optimization.filterrewrite.DateHistogramAggregatorBridge.segmentMatchAll;

/**
* An aggregator for date values. Every date is rounded down using a configured
* {@link Rounding}.
@@ -144,14 +146,10 @@ protected long[] processHardBounds(long[] bounds) {
}

@Override
-protected Function<Object, Long> bucketOrdProducer() {
+protected Function<Long, Long> bucketOrdProducer() {
return (key) -> bucketOrds.add(0, preparedRounding.round((long) key));
}

-@Override
-protected boolean segmentMatchAll(LeafReaderContext leaf) throws IOException {
-return segmentMatchAll(context, leaf);
-}
});
if (optimizationContext.canOptimize(parent, subAggregators.length, context)) {
optimizationContext.prepare();
@@ -172,7 +170,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
return LeafBucketCollector.NO_OP_COLLECTOR;
}

-boolean optimized = optimizationContext.tryOptimize(ctx, this::incrementBucketDocCount);
+boolean optimized = optimizationContext.tryOptimize(ctx, this::incrementBucketDocCount, segmentMatchAll(context, ctx));
if (optimized) throw new CollectionTerminatedException();

SortedNumericDocValues values = valuesSource.longValues(ctx);

RangeAggregator.java
@@ -58,8 +58,8 @@
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
-import org.opensearch.search.optimization.ranges.OptimizationContext;
-import org.opensearch.search.optimization.ranges.RangeAggregatorBridge;
+import org.opensearch.search.optimization.filterrewrite.OptimizationContext;
+import org.opensearch.search.optimization.filterrewrite.RangeAggregatorBridge;

import java.io.IOException;
import java.util.ArrayList;
@@ -284,12 +284,12 @@ public RangeAggregator(
optimizationContext = new OptimizationContext(new RangeAggregatorBridge() {
@Override
public boolean canOptimize() {
-return canOptimize(config, RangeAggregator.this.ranges);
+return canOptimize(config, ranges);
}

@Override
public void prepare() {
-buildRanges(RangeAggregator.this.ranges);
+buildRanges(ranges);
}

@Override
@@ -312,7 +312,7 @@ public ScoreMode scoreMode() {

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
-boolean optimized = optimizationContext.tryOptimize(ctx, this::incrementBucketDocCount);
+boolean optimized = optimizationContext.tryOptimize(ctx, this::incrementBucketDocCount, false);
if (optimized) throw new CollectionTerminatedException();

final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx);

AggregatorBridge.java
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.optimization.filterrewrite;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.opensearch.index.mapper.MappedFieldType;

import java.io.IOException;
import java.util.function.BiConsumer;

/**
 * This class provides a bridge between an aggregator and the optimization context, allowing
 * the aggregator to provide data and optimize the aggregation process.
 *
 * <p>The main purpose of this class is to encapsulate the aggregator-specific optimization
 * logic and provide access to the data in the aggregator that is required for optimization, while keeping
 * the optimization business logic separate from the aggregator implementation.
 *
 * <p>To use this class to optimize an aggregator, subclass it in this package
 * and put any specific optimization business logic in the subclass. Then implement the subclass in the
 * aggregator to provide the data needed for the optimization.
 *
 * @opensearch.internal
 */
public abstract class AggregatorBridge {

/**
* The optimization context associated with this aggregator bridge.
*/
OptimizationContext optimizationContext;

/**
* The field type associated with this aggregator bridge.
*/
MappedFieldType fieldType;

void setOptimizationContext(OptimizationContext context) {
this.optimizationContext = context;
}

/**
* Checks whether the aggregator can be optimized.
*
* @return {@code true} if the aggregator can be optimized, {@code false} otherwise.
* The result will be saved in the optimization context.
*/
public abstract boolean canOptimize();

/**
* Prepares the optimization at the shard level.
* For example, this is where the ranges used by the optimization are derived from the aggregation.
*/
public abstract void prepare() throws IOException;

/**
* Prepares the optimization for a specific segment, ignoring whatever was built at the shard level.
*
* @param leaf the leaf reader context for the segment
*/
public abstract void prepareFromSegment(LeafReaderContext leaf) throws IOException;

/**
* Attempts to build aggregation results for a segment.
*
* @param values the point values (index structure for numeric values) for a segment
* @param incrementDocCount a consumer to increment the document count for a range bucket. The first parameter is the bucket ordinal; the second is the document count
*/
public abstract void tryOptimize(PointValues values, BiConsumer<Long, Long> incrementDocCount) throws IOException;
}
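
To make the contract above concrete, here is a minimal, hypothetical bridge subclass. The class name, the gating condition, and all method bodies are illustrative assumptions, not code from this commit; it relies only on the package-private fieldType member and the abstract methods documented above.

package org.opensearch.search.optimization.filterrewrite;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;

import java.io.IOException;
import java.util.function.BiConsumer;

// Hypothetical example, for illustration only.
class ExampleAggregatorBridge extends AggregatorBridge {

    @Override
    public boolean canOptimize() {
        // Gate on whatever the concrete aggregation requires, e.g. a searchable
        // field with no script and no missing value configured.
        return fieldType != null && fieldType.isSearchable();
    }

    @Override
    public void prepare() throws IOException {
        // Build the shard-level ranges once and register them on the context,
        // e.g. optimizationContext.setRanges(...).
    }

    @Override
    public void prepareFromSegment(LeafReaderContext leaf) throws IOException {
        // Rebuild the ranges from segment-local bounds when the shard-level
        // ranges cannot be reused for this segment.
    }

    @Override
    public void tryOptimize(PointValues values, BiConsumer<Long, Long> incrementDocCount) throws IOException {
        // Traverse the point tree, count matching documents per range, then
        // report each bucket ordinal and its count through incrementDocCount.
    }
}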

CompositeAggregatorBridge.java
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.optimization.filterrewrite;

import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig;
import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource;

/**
* Allows a composite aggregation to use the optimization when it has only a single date histogram source
*/
public abstract class CompositeAggregatorBridge extends DateHistogramAggregatorBridge {
protected boolean canOptimize(CompositeValuesSourceConfig[] sourceConfigs) {
if (sourceConfigs.length != 1 || !(sourceConfigs[0].valuesSource() instanceof RoundingValuesSource)) return false;
return canOptimize(sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript(), sourceConfigs[0].fieldType());
}

private boolean canOptimize(boolean missing, boolean hasScript, MappedFieldType fieldType) {
if (!missing && !hasScript) {
if (fieldType instanceof DateFieldMapper.DateFieldType) {
if (fieldType.isSearchable()) {
this.fieldType = fieldType;
return true;
}
}
}
return false;
}
}
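
Taken together, the two canOptimize methods above reduce to a single eligibility predicate. The following standalone restatement is purely illustrative; the class, method, and parameter names are not from the commit.

final class CompositeRewriteEligibility {
    // Flattened restatement of the checks in CompositeAggregatorBridge.canOptimize above.
    static boolean isEligible(
        int sourceCount,               // number of composite value sources
        boolean isDateHistogramSource, // the single source wraps a RoundingValuesSource
        boolean missingBucket,
        boolean hasScript,
        boolean isDateFieldType,       // field type is a DateFieldMapper.DateFieldType
        boolean isSearchable
    ) {
        return sourceCount == 1
            && isDateHistogramSource
            && !missingBucket
            && !hasScript
            && isDateFieldType
            && isSearchable;
    }
}

Only when every condition holds does the bridge record the field type and report the aggregation as optimizable.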

DateHistogramAggregatorBridge.java
@@ -6,7 +6,7 @@
* compatible open source license.
*/

-package org.opensearch.search.optimization.ranges;
+package org.opensearch.search.optimization.filterrewrite;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReaderContext;
@@ -16,35 +16,22 @@
import org.opensearch.common.Rounding;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
-import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig;
-import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource;
import org.opensearch.search.aggregations.bucket.histogram.LongBounds;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.OptionalLong;
import java.util.function.BiConsumer;
import java.util.function.Function;

-import static org.opensearch.search.optimization.ranges.Helper.multiRangesTraverse;
+import static org.opensearch.search.optimization.filterrewrite.TreeTraversal.multiRangesTraverse;

/**
* For date histogram aggregation
*/
public abstract class DateHistogramAggregatorBridge extends AggregatorBridge {

-protected boolean canOptimize(boolean missing, boolean hasScript, MappedFieldType fieldType) {
-if (!missing && !hasScript) {
-if (fieldType instanceof DateFieldMapper.DateFieldType) {
-if (fieldType.isSearchable()) {
-this.fieldType = fieldType;
-return true;
-}
-}
-}
-return false;
-}

protected boolean canOptimize(ValuesSourceConfig config) {
if (config.script() == null && config.missing() == null) {
MappedFieldType fieldType = config.fieldType();
@@ -58,11 +45,6 @@ protected boolean canOptimize(ValuesSourceConfig config) {
return false;
}

-protected boolean canOptimize(CompositeValuesSourceConfig[] sourceConfigs) {
-if (sourceConfigs.length != 1 || !(sourceConfigs[0].valuesSource() instanceof RoundingValuesSource)) return false;
-return canOptimize(sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript(), sourceConfigs[0].fieldType());
-}

protected void buildRanges(SearchContext context) throws IOException {
long[] bounds = Helper.getDateHistoAggBounds(context, fieldType.name());
optimizationContext.setRanges(buildRanges(bounds));
@@ -165,7 +147,22 @@ private static long getBucketOrd(long bucketOrd) {
return bucketOrd;
}

-protected boolean segmentMatchAll(SearchContext ctx, LeafReaderContext leafCtx) throws IOException {
+/**
+ * Provides a function to produce bucket ordinals from the lower bound of the range
+ */
+protected abstract Function<Long, Long> bucketOrdProducer();
+
+/**
+ * Checks whether the top-level query matches all documents on the segment
+ *
+ * <p>This method creates a weight from the search context's query and checks whether the weight's
+ * document count matches the total number of documents in the leaf reader context.
+ *
+ * @param ctx the search context
+ * @param leafCtx the leaf reader context for the segment
+ * @return {@code true} if the segment matches all documents, {@code false} otherwise
+ */
+public static boolean segmentMatchAll(SearchContext ctx, LeafReaderContext leafCtx) throws IOException {
Weight weight = ctx.query().rewrite(ctx.searcher()).createWeight(ctx.searcher(), ScoreMode.COMPLETE_NO_SCORES, 1f);
return weight != null && weight.count(leafCtx) == leafCtx.reader().numDocs();
}
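
The new static segmentMatchAll helper leans on Lucene's Weight#count, which can return an exact per-segment hit count without visiting each document (or -1 when it cannot). Below is a self-contained sketch of that check, assuming a Lucene 9.x classpath; the field name and index contents are made up for illustration.

import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.ByteBuffersDirectory;

public class WeightCountDemo {
    public static void main(String[] args) throws Exception {
        try (ByteBuffersDirectory dir = new ByteBuffersDirectory()) {
            try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
                for (long v = 0; v < 100; v++) {
                    Document doc = new Document();
                    doc.add(new LongPoint("timestamp", v)); // hypothetical field
                    writer.addDocument(doc);
                }
            }
            try (DirectoryReader reader = DirectoryReader.open(dir)) {
                IndexSearcher searcher = new IndexSearcher(reader);
                // A range that covers every indexed value, so every doc matches.
                Query query = searcher.rewrite(LongPoint.newRangeQuery("timestamp", 0, 1000));
                Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1f);
                for (LeafReaderContext leaf : reader.leaves()) {
                    // Weight#count returns an exact segment-level hit count when it
                    // can be computed cheaply; equality with numDocs() is the
                    // "segment matches all documents" signal.
                    boolean matchAll = weight.count(leaf) == leaf.reader().numDocs();
                    System.out.println("segment match-all: " + matchAll);
                }
            }
        }
    }
}

When the count equals numDocs(), every live document in the segment matches the top-level query, so the optimization can skip per-document collection entirely; this is why the aggregators above now pass segmentMatchAll(context, ctx) into optimizationContext.tryOptimize.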

[The remaining 7 of the 14 changed files did not load.]
