Skip to content

Commit

Permalink
fieldsummary implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
kenrickyap committed Feb 10, 2025
1 parent 7692eb1 commit 77f1f6e
Show file tree
Hide file tree
Showing 15 changed files with 357 additions and 19 deletions.
58 changes: 56 additions & 2 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.math3.analysis.function.Exp;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.analysis.symbol.Namespace;
import org.opensearch.sql.analysis.symbol.Symbol;
Expand All @@ -40,7 +43,6 @@
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.Let;
import org.opensearch.sql.ast.expression.Literal;
import org.opensearch.sql.ast.expression.Map;
import org.opensearch.sql.ast.expression.ParseMethod;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
Expand All @@ -50,6 +52,7 @@
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.FetchCursor;
import org.opensearch.sql.ast.tree.FieldSummary;
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
Expand All @@ -72,6 +75,7 @@
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.data.model.ExprMissingValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.DSL;
Expand All @@ -80,7 +84,9 @@
import org.opensearch.sql.expression.LiteralExpression;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.aggregation.AggregationState;
import org.opensearch.sql.expression.aggregation.Aggregator;
import org.opensearch.sql.expression.aggregation.AvgAggregator;
import org.opensearch.sql.expression.aggregation.NamedAggregator;
import org.opensearch.sql.expression.function.BuiltinFunctionName;
import org.opensearch.sql.expression.function.BuiltinFunctionRepository;
Expand All @@ -93,6 +99,7 @@
import org.opensearch.sql.planner.logical.LogicalDedupe;
import org.opensearch.sql.planner.logical.LogicalEval;
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
import org.opensearch.sql.planner.logical.LogicalFieldSummary;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalML;
Expand Down Expand Up @@ -277,7 +284,7 @@ public LogicalPlan visitRename(Rename node, AnalysisContext context) {
LogicalPlan child = node.getChild().get(0).accept(this, context);
ImmutableMap.Builder<ReferenceExpression, ReferenceExpression> renameMapBuilder =
new ImmutableMap.Builder<>();
for (Map renameMap : node.getRenameList()) {
for (org.opensearch.sql.ast.expression.Map renameMap : node.getRenameList()) {
Expression origin = expressionAnalyzer.analyze(renameMap.getOrigin(), context);
// We should define the new target field in the context instead of analyze it.
if (renameMap.getTarget() instanceof Field) {
Expand Down Expand Up @@ -336,6 +343,53 @@ public LogicalPlan visitAggregation(Aggregation node, AnalysisContext context) {
return new LogicalAggregation(child, aggregators, groupBys);
}

/**
 * Build {@link LogicalFieldSummary}.
 *
 * <p>For every field found in the current type environment under {@link Namespace#FIELD_NAME},
 * a count and a distinct-count aggregator are created. Fields whose type is contained in {@link
 * ExprCoreType#numberTypes()} additionally get avg, max and min aggregators. Each aggregator is
 * named {@code <Prefix><fieldName>} and recorded in a map from aggregator name to field name.
 *
 * <p>After the aggregators are built, a new entry is pushed on the analysis context and the
 * output columns {@code Field}, {@code Count}, {@code Distinct}, {@code Avg}, {@code Max} and
 * {@code Min} are defined in it.
 *
 * <p>NOTE(review): {@code node.getIncludeFields()} is not consulted here, so every field in the
 * environment is summarized regardless of what the command listed - confirm this is intended.
 *
 * @param node field summary AST node; its first child is analyzed as the input plan
 * @param context analysis context holding the type environments
 * @return logical plan with no group-by expressions and the aggregator-to-field-name map
 */
@Override
public LogicalPlan visitFieldSummary(FieldSummary node, AnalysisContext context) {
  // get(0) instead of List.getFirst(): same element, matches the other visit methods, and does
  // not require the Java 21 SequencedCollection API.
  LogicalPlan child = node.getChild().get(0).accept(this, context);

  TypeEnvironment env = context.peek();
  Map<String, ExprType> fieldsMap = env.lookupAllFields(Namespace.FIELD_NAME);

  ImmutableList.Builder<NamedAggregator> aggregatorBuilder = new ImmutableList.Builder<>();
  Map<String, String> aggregatorToFieldNameMap = new HashMap<>();

  for (Map.Entry<String, ExprType> entry : fieldsMap.entrySet()) {
    String fieldName = entry.getKey();
    ExprType fieldType = entry.getValue();
    ReferenceExpression fieldExpression = DSL.ref(fieldName, fieldType);

    // Count and distinct count apply to every field, whatever its type.
    String countName = "Count" + fieldName;
    aggregatorBuilder.add(new NamedAggregator(countName, DSL.count(fieldExpression)));
    aggregatorToFieldNameMap.put(countName, fieldName);

    String distinctName = "Distinct" + fieldName;
    aggregatorBuilder.add(
        new NamedAggregator(distinctName, DSL.distinctCount(fieldExpression)));
    aggregatorToFieldNameMap.put(distinctName, fieldName);

    // Avg / max / min are only added for numeric field types.
    if (ExprCoreType.numberTypes().contains(fieldType)) {
      String avgName = "Avg" + fieldName;
      aggregatorBuilder.add(new NamedAggregator(avgName, DSL.avg(fieldExpression)));
      aggregatorToFieldNameMap.put(avgName, fieldName);

      String maxName = "Max" + fieldName;
      aggregatorBuilder.add(new NamedAggregator(maxName, DSL.max(fieldExpression)));
      aggregatorToFieldNameMap.put(maxName, fieldName);

      String minName = "Min" + fieldName;
      aggregatorBuilder.add(new NamedAggregator(minName, DSL.min(fieldExpression)));
      aggregatorToFieldNameMap.put(minName, fieldName);
    }
  }

  ImmutableList<NamedAggregator> aggregators = aggregatorBuilder.build();
  // Field summary never groups: the group-by list is always empty.
  ImmutableList<NamedExpression> groupBys = ImmutableList.of();

  // new context
  context.push();
  TypeEnvironment newEnv = context.peek();

  newEnv.define(new Symbol(Namespace.FIELD_NAME, "Field"), ExprCoreType.STRING);
  newEnv.define(new Symbol(Namespace.FIELD_NAME, "Count"), ExprCoreType.INTEGER);
  newEnv.define(new Symbol(Namespace.FIELD_NAME, "Distinct"), ExprCoreType.INTEGER);
  newEnv.define(new Symbol(Namespace.FIELD_NAME, "Avg"), ExprCoreType.DOUBLE);
  newEnv.define(new Symbol(Namespace.FIELD_NAME, "Max"), ExprCoreType.DOUBLE);
  newEnv.define(new Symbol(Namespace.FIELD_NAME, "Min"), ExprCoreType.DOUBLE);

  return new LogicalFieldSummary(child, aggregators, groupBys, aggregatorToFieldNameMap);
}

/** Build {@link LogicalRareTopN}. */
@Override
public LogicalPlan visitRareTopN(RareTopN node, AnalysisContext context) {
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.sql.ast.expression.Compare;
import org.opensearch.sql.ast.expression.EqualTo;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.FieldList;
import org.opensearch.sql.ast.expression.Function;
import org.opensearch.sql.ast.expression.HighlightFunction;
import org.opensearch.sql.ast.expression.In;
Expand Down Expand Up @@ -45,6 +46,7 @@
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.FetchCursor;
import org.opensearch.sql.ast.tree.FieldSummary;
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
Expand Down Expand Up @@ -326,4 +328,12 @@ public T visitCloseCursor(CloseCursor closeCursor, C context) {
/**
 * Visits a {@link FillNull} node. The default implementation hands the node to {@code
 * visitChildren} and returns whatever that call returns.
 */
public T visitFillNull(FillNull fillNull, C context) {
  final T result = visitChildren(fillNull, context);
  return result;
}

/**
 * Visits a {@link FieldSummary} node. The default implementation hands the node to {@code
 * visitChildren} and returns whatever that call returns.
 */
public T visitFieldSummary(FieldSummary fieldSummary, C context) {
  final T result = visitChildren(fieldSummary, context);
  return result;
}

/**
 * Visits a {@link FieldList} node. The default implementation hands the node to {@code
 * visitChildren} and returns whatever that call returns.
 */
public T visitFieldList(FieldList fieldList, C context) {
  final T result = visitChildren(fieldList, context);
  return result;
}
}
8 changes: 8 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.FieldSummary;
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
Expand Down Expand Up @@ -506,4 +507,11 @@ public static FillNull fillNull(
return new FillNull(
FillNull.ContainNullableFieldFill.ofVariousValue(replacementsBuilder.build()));
}

/**
 * Creates a {@link FieldSummary} node from {@code includeFields} and attaches {@code input} to
 * it as its child plan.
 *
 * @param input plan the field summary is attached to
 * @param includeFields expressions passed straight to the {@link FieldSummary} constructor
 * @return the node returned by {@code attach(input)}
 */
public static FieldSummary fieldSummary(
    UnresolvedPlan input, List<UnresolvedExpression> includeFields) {
  FieldSummary summary = new FieldSummary(includeFields);
  return summary.attach(input);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.expression;

import com.google.common.collect.ImmutableList;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;

import java.util.List;

/** Expression node that includes a list of fields nodes. */
@Getter
@ToString
@EqualsAndHashCode(callSuper = false)
@AllArgsConstructor
public class FieldList extends UnresolvedExpression {
private final List<Field> fieldList;

@Override
public List<UnresolvedExpression> getChild() {
return ImmutableList.copyOf(fieldList);
}

@Override
public <R, C> R accept(AbstractNodeVisitor<R, C> nodeVisitor, C context) {
return nodeVisitor.visitFieldList(this, context);
}
}
49 changes: 49 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/FieldSummary.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.AttributeList;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

import java.util.List;

@Getter
@ToString
@EqualsAndHashCode(callSuper = false)
public class FieldSummary extends UnresolvedPlan {
private List<UnresolvedExpression> includeFields;
private UnresolvedPlan child;

public FieldSummary(List<UnresolvedExpression> collect) {
collect.forEach(exp -> {
if (exp instanceof AttributeList) {
this.includeFields = ((AttributeList)exp).getAttrList();
}
});
}

@Override
public List<? extends Node> getChild() {
return child == null ? List.of() : List.of(child);
}

@Override
public <R, C> R accept(AbstractNodeVisitor<R, C> nodeVisitor, C context) {
return nodeVisitor.visitFieldSummary(this, context);
}

@Override
public FieldSummary attach(UnresolvedPlan child) {
this.child = child;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.opensearch.sql.planner.logical;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.aggregation.NamedAggregator;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Logical plan node for field summary: a {@link LogicalAggregation} that additionally carries
 * the map passed to its constructor. Getter, {@code toString}, {@code equals} and {@code
 * hashCode} (including the superclass state) are generated by Lombok.
 */
@Getter
@ToString
@EqualsAndHashCode(callSuper = true)
public class LogicalFieldSummary extends LogicalAggregation {

  /** Map supplied at construction; presumably aggregator name to source field name. */
  Map<String, String> aggregationToFieldNameMap;

  /**
   * Creates the node.
   *
   * @param child input plan, forwarded to {@link LogicalAggregation}
   * @param aggregatorList aggregators, forwarded to {@link LogicalAggregation}
   * @param groupByList group-by expressions, forwarded to {@link LogicalAggregation}
   * @param aggregationToFieldNameMap map stored on this node as-is (no copy is made)
   */
  public LogicalFieldSummary(
      LogicalPlan child,
      List<NamedAggregator> aggregatorList,
      List<NamedExpression> groupByList,
      Map<String, String> aggregationToFieldNameMap) {
    super(child, aggregatorList, groupByList);
    this.aggregationToFieldNameMap = aggregationToFieldNameMap;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -289,6 +290,19 @@ public void pushTypeMapping(Map<String, OpenSearchDataType> typeMapping) {
exprValueFactory.extendTypeMapping(typeMapping);
}

/**
 * Extends the value factory's type mapping with a fixed set of column names: {@code Field} as
 * text, {@code Count} and {@code Distinct} as integer, and {@code Avg}, {@code Min}, {@code
 * Max} and {@code Sum} as double.
 *
 * <p>NOTE(review): {@code Sum} is mapped here although no producer of a {@code Sum} column is
 * visible from this method - confirm whether the entry is needed.
 */
public void pushFieldSummaryTypeMapping() {
  exprValueFactory.extendTypeMapping(
      Map.of(
          "Field", OpenSearchDataType.of(OpenSearchDataType.MappingType.Text),
          "Count", OpenSearchDataType.of(OpenSearchDataType.MappingType.Integer),
          "Distinct", OpenSearchDataType.of(OpenSearchDataType.MappingType.Integer),
          "Avg", OpenSearchDataType.of(OpenSearchDataType.MappingType.Double),
          "Min", OpenSearchDataType.of(OpenSearchDataType.MappingType.Double),
          "Max", OpenSearchDataType.of(OpenSearchDataType.MappingType.Double),
          "Sum", OpenSearchDataType.of(OpenSearchDataType.MappingType.Double)));
}

private boolean isSortByDocOnly() {
List<SortBuilder<?>> sorts = sourceBuilder.sorts();
if (sorts != null) {
Expand Down
Loading

0 comments on commit 77f1f6e

Please sign in to comment.