Skip to content

Commit

Permalink
understand the bucket ordinal
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Jan 31, 2025
1 parent 484d732 commit 2aaae33
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.bucket.filterrewrite;

import org.apache.lucene.search.DocIdSetIterator;

import java.io.IOException;
import java.util.Arrays;

/**
 * A composite (union / disjunction) view over multiple {@link DocIdSetIterator}s
 * belonging to a single segment. The composite is positioned on the minimum of
 * the current doc IDs of its live sub-iterators, so iterating it visits every
 * document matched by at least one sub-iterator, in doc ID order.
 *
 * <p>Each sub-iterator is identified by its bucket ordinal (its index in the
 * array passed to the constructor). After positioning the composite, callers can
 * ask which buckets match the current document via {@link #matches(int)} or
 * {@link #getMatchingBuckets()}.
 *
 * <p>Not thread-safe. Like all {@link DocIdSetIterator}s, this class assumes
 * forward-only iteration: {@code advance(target)} must be called with
 * {@code target > docID()}.
 */
public class CompositeDocIdSetIterator extends DocIdSetIterator {
    private final DocIdSetIterator[] iterators;
    // Last doc ID each sub-iterator was positioned on; -1 means not yet positioned.
    private final int[] currentDocs;
    // True once a sub-iterator has run out of docs (or was null to begin with).
    private final boolean[] exhausted;
    // Current doc of the composite view; -1 before the first advance.
    private int currentDoc = -1;
    private final int numIterators;

    /**
     * Creates a composite view of multiple DocIdSetIterators.
     *
     * @param ordinalToIterator mapping of bucket ordinal to its DocIdSetIterator;
     *                          entries may be null for buckets with no documents
     * @param maxOrdinal        the maximum bucket ordinal (exclusive); only the
     *                          first {@code maxOrdinal} entries are considered
     */
    public CompositeDocIdSetIterator(DocIdSetIterator[] ordinalToIterator, int maxOrdinal) {
        this.iterators = Arrays.copyOf(ordinalToIterator, maxOrdinal);
        this.numIterators = maxOrdinal;
        this.currentDocs = new int[maxOrdinal];
        this.exhausted = new boolean[maxOrdinal];

        // All sub-iterators start unpositioned.
        Arrays.fill(currentDocs, -1);

        // A missing iterator can never match; mark it exhausted once here
        // instead of re-checking for null on every advance() call.
        for (int i = 0; i < numIterators; i++) {
            if (iterators[i] == null) {
                exhausted[i] = true;
            }
        }
    }

    @Override
    public int docID() {
        return currentDoc;
    }

    @Override
    public int nextDoc() throws IOException {
        return advance(currentDoc + 1);
    }

    @Override
    public int advance(int target) throws IOException {
        if (target == NO_MORE_DOCS) {
            currentDoc = NO_MORE_DOCS;
            return NO_MORE_DOCS;
        }

        int minDoc = NO_MORE_DOCS;

        // Advance every live sub-iterator that is behind the target, then take
        // the minimum doc across all live sub-iterators as the composite doc.
        for (int i = 0; i < numIterators; i++) {
            if (exhausted[i]) {
                continue;
            }

            if (currentDocs[i] < target) {
                int doc = iterators[i].advance(target);
                if (doc == NO_MORE_DOCS) {
                    exhausted[i] = true;
                    continue;
                }
                currentDocs[i] = doc;
            }
            minDoc = Math.min(minDoc, currentDocs[i]);
        }

        currentDoc = minDoc;
        return currentDoc;
    }

    @Override
    public long cost() {
        // A union matches at most the sum of what its sub-iterators match, and
        // taking the max would underestimate it (this mirrors how Lucene costs
        // disjunctions, e.g. DisjunctionDISIApproximation). Saturate on overflow.
        long cost = 0;
        for (DocIdSetIterator iterator : iterators) {
            if (iterator != null) {
                cost += iterator.cost();
                if (cost < 0) { // overflowed past Long.MAX_VALUE
                    return Long.MAX_VALUE;
                }
            }
        }
        return cost;
    }

    /**
     * Checks if a specific bucket matches the current document.
     *
     * @param ordinal the bucket ordinal to check
     * @return true if the bucket's iterator is positioned on the current document;
     *         false for out-of-range ordinals, exhausted buckets, or when the
     *         composite itself is exhausted
     */
    public boolean matches(int ordinal) {
        if (ordinal < 0 || ordinal >= numIterators || currentDoc == NO_MORE_DOCS) {
            return false;
        }
        return !exhausted[ordinal] && currentDocs[ordinal] == currentDoc;
    }

    /**
     * Gets a bit set representing all buckets that match the current document.
     *
     * <p>NOTE(review): this representation only supports up to 64 buckets; with
     * more than 64 iterators (or when the composite is exhausted) it returns 0L,
     * silently reporting "no matches". Callers handling more than 64 buckets
     * must use {@link #matches(int)} instead — confirm call sites respect this.
     *
     * @return a long where bit {@code i} is set iff bucket {@code i} matches the
     *         current document
     */
    public long getMatchingBuckets() {
        if (currentDoc == NO_MORE_DOCS || numIterators > 64) {
            return 0L;
        }

        long result = 0L;
        for (int i = 0; i < numIterators; i++) {
            if (matches(i)) {
                result |= 1L << i;
            }
        }
        return result;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ final FilterRewriteOptimizationContext.DebugInfo tryOptimize(
incrementDocCount.accept(bucketOrd, (long) docCount);
};

Function<Integer, Long> getBucketOrd = (activeIndex) -> {
long rangeStart = LongPoint.decodeDimension(ranges.lowers[activeIndex], 0);
rangeStart = fieldType.convertNanosToMillis(rangeStart);
return getBucketOrd(bucketOrdProducer().apply(rangeStart));
};

Supplier<DocIdSetBuilder> disBuilderSupplier = () -> {
try {
logger.debug("create DocIdSetBuilder of max doc {}", maxDoc);
Expand All @@ -155,7 +161,7 @@ final FilterRewriteOptimizationContext.DebugInfo tryOptimize(
}
};

return multiRangesTraverse(values.getPointTree(), ranges, incrementFunc, size, disBuilderSupplier);
return multiRangesTraverse(values.getPointTree(), ranges, incrementFunc, size, disBuilderSupplier, getBucketOrd);
}

private static long getBucketOrd(long bucketOrd) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ public boolean tryOptimize(final LeafReaderContext leafCtx, final BiConsumer<Lon
logger.debug("Fast filter optimization applied to shard {} segment {}", shardId, leafCtx.ord);
logger.debug("Crossed leaf nodes: {}, inner nodes: {}", leafNodeVisited, innerNodeVisited);

// TODO refactor the tryOptimize to return a Result object which not only contains DebugInfo
// but also the DocIdSetIterator for sub aggregation
// At least 2 ways to do Iterating
// 1. List of Iterators per ranges
// 2. Composite iterator

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
import org.opensearch.common.CheckedRunnable;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
Expand Down Expand Up @@ -51,7 +54,8 @@ static FilterRewriteOptimizationContext.DebugInfo multiRangesTraverse(
final Ranges ranges,
final BiConsumer<Integer, Integer> incrementDocCount,
final int maxNumNonZeroRanges,
Supplier<DocIdSetBuilder> disBuilderSupplier
Supplier<DocIdSetBuilder> disBuilderSupplier,
Function<Integer, Long> getBucketOrd
) throws IOException {
FilterRewriteOptimizationContext.DebugInfo debugInfo = new FilterRewriteOptimizationContext.DebugInfo();
int activeIndex = ranges.firstRangeIndex(tree.getMinPackedValue(), tree.getMaxPackedValue());
Expand All @@ -64,8 +68,11 @@ static FilterRewriteOptimizationContext.DebugInfo multiRangesTraverse(
maxNumNonZeroRanges,
ranges,
activeIndex,
disBuilderSupplier
disBuilderSupplier,
getBucketOrd
);
// tree collector should have a function to help with recording bucket ordinal to disi
// At the end of a range, it should be able to calculate the bucket ordinal, and record the disi

PointValues.IntersectVisitor visitor = getIntersectVisitor(collector);
try {
Expand All @@ -91,6 +98,9 @@ static FilterRewriteOptimizationContext.DebugInfo multiRangesTraverse(
}
logger.debug("total count of documents from docIdSetBuilder: {}", totalCount);

Map<Long, DocIdSetBuilder> map = collector.bucketOrdinalToDocIdSetBuilder;
logger.debug("keys of bucketOrdinalToDocIdSetBuilder: {}", map.keySet());

return debugInfo;
}

Expand Down Expand Up @@ -207,6 +217,8 @@ private static class RangeCollectorForPointTree {
private final DocIdSetBuilder[] docIdSetBuilders;
private final Supplier<DocIdSetBuilder> disBuilderSupplier;
private DocIdSetBuilder.BulkAdder currentAdder;
private Function<Integer, Long> getBucketOrd;
private Map<Long, DocIdSetBuilder> bucketOrdinalToDocIdSetBuilder = new HashMap<>();

private int visitedRange = 0;
private final int maxNumNonZeroRange;
Expand All @@ -216,14 +228,16 @@ public RangeCollectorForPointTree(
int maxNumNonZeroRange,
Ranges ranges,
int activeIndex,
Supplier<DocIdSetBuilder> disBuilderSupplier
Supplier<DocIdSetBuilder> disBuilderSupplier,
Function<Integer, Long> getBucketOrd
) {
this.incrementRangeDocCount = incrementRangeDocCount;
this.maxNumNonZeroRange = maxNumNonZeroRange;
this.ranges = ranges;
this.activeIndex = activeIndex;
this.docIdSetBuilders = new DocIdSetBuilder[ranges.size];
this.disBuilderSupplier = disBuilderSupplier;
this.getBucketOrd = getBucketOrd;
}

private void count() {
Expand Down Expand Up @@ -267,6 +281,12 @@ private void finalizePreviousRange() {
incrementRangeDocCount.accept(activeIndex, counter);
counter = 0;
}

long bucketOrd = getBucketOrd.apply(activeIndex);
if (docIdSetBuilders[activeIndex] != null) {
logger.trace("finalize docIdSetBuilder[{}] with bucket ordinal {}", activeIndex, bucketOrd);
bucketOrdinalToDocIdSetBuilder.put(bucketOrd, docIdSetBuilders[activeIndex]);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ final FilterRewriteOptimizationContext.DebugInfo tryOptimize(
incrementDocCount.accept(bucketOrd, (long) docCount);
};

return multiRangesTraverse(values.getPointTree(), ranges, incrementFunc, size, null);
return multiRangesTraverse(values.getPointTree(), ranges, incrementFunc, size, null, null);
}

/**
Expand Down

0 comments on commit 2aaae33

Please sign in to comment.