Skip to content

Commit

Permalink
[CALCITE-4233] In Elasticsearch adapter, support generating disjuncti…
Browse files Browse the repository at this point in the history
…on max (dis_max) queries (shlok7296)

close #2218
  • Loading branch information
shlok7296 authored and julianhyde committed Nov 3, 2020
1 parent e7c579f commit add837a
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.Objects;

/**
Expand Down Expand Up @@ -82,7 +85,19 @@ String translateMatch(RexNode condition) throws IOException,

StringWriter writer = new StringWriter();
JsonGenerator generator = mapper.getFactory().createGenerator(writer);
QueryBuilders.constantScoreQuery(PredicateAnalyzer.analyze(condition)).writeJson(generator);
boolean disMax = condition.isA(SqlKind.OR);
Iterator<RexNode> operands = ((RexCall) condition).getOperands().iterator();
while (operands.hasNext() && !disMax) {
if (operands.next().isA(SqlKind.OR)) {
disMax = true;
break;
}
}
if (disMax) {
QueryBuilders.disMaxQueryBuilder(PredicateAnalyzer.analyze(condition)).writeJson(generator);
} else {
QueryBuilders.constantScoreQuery(PredicateAnalyzer.analyze(condition)).writeJson(generator);
}
generator.flush();
generator.close();
return "{\"query\" : " + writer.toString() + "}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,16 @@ static ConstantScoreQueryBuilder constantScoreQuery(QueryBuilder queryBuilder) {
return new ConstantScoreQueryBuilder(queryBuilder);
}

/**
* Creates a disjunction max ({@code dis_max}) query that wraps the given query.
*
* <p>The wrapped query is emitted as the single entry of the {@code dis_max}
* query's {@code queries} array.
*
* @param queryBuilder The query to wrap in a dis_max query; must not be null
* @return a new {@link DisMaxQueryBuilder} wrapping {@code queryBuilder}
*/
static DisMaxQueryBuilder disMaxQueryBuilder(QueryBuilder queryBuilder) {
return new DisMaxQueryBuilder(queryBuilder);
}

/**
* A filter to filter only documents where a field exists in them.
*
Expand Down Expand Up @@ -540,6 +550,33 @@ private ConstantScoreQueryBuilder(final QueryBuilder builder) {
}
}

/**
 * Builder for an Elasticsearch disjunction max ({@code dis_max}) query around
 * a single wrapped query.
 *
 * <p>The generated JSON has the shape
 * <pre>
 * {"dis_max": {"queries": [ wrappedQuery ]}}
 * </pre>
 * where {@code wrappedQuery} is whatever the wrapped builder writes.
 */
static class DisMaxQueryBuilder extends QueryBuilder {

  /** Query written as the only element of the {@code queries} array. */
  private final QueryBuilder builder;

  /**
   * Creates a dis_max builder.
   *
   * @param builder query to wrap; rejected with a
   *     {@link NullPointerException} if null
   */
  private DisMaxQueryBuilder(final QueryBuilder builder) {
    this.builder = Objects.requireNonNull(builder, "builder");
  }

  /**
   * Writes this query to the generator as a {@code dis_max} object whose
   * {@code queries} array holds the wrapped query.
   *
   * @param generator destination for the JSON tokens
   * @throws IOException if the generator or the wrapped builder fails to write
   */
  @Override void writeJson(final JsonGenerator generator) throws IOException {
    // outer object: { "dis_max": { "queries": [
    generator.writeStartObject();
    generator.writeObjectFieldStart("dis_max");
    generator.writeArrayFieldStart("queries");

    builder.writeJson(generator);

    // close the array, the dis_max object, then the outer object
    generator.writeEndArray();
    generator.writeEndObject();
    generator.writeEndObject();
  }
}



/**
* A query that matches on all documents.
* <pre>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,19 @@ private static Consumer<ResultSet> sortedResultSetChecker(String column,
.query("select _MAP['state'] from elastic.zips order by _MAP['city']")
.returnsCount(ZIPS_SIZE);

CalciteAssert.that()
.with(newConnectionFactory())
.query("select * from elastic.zips where _MAP['state'] = 'NY' or "
+ "_MAP['city'] = 'BROOKLYN'"
+ " order by _MAP['city']")
.queryContains(
ElasticsearchChecker.elasticsearchChecker(
"query:{'dis_max':{'queries':[{'bool':{'should':"
+ "[{'term':{'state':'NY'}},{'term':"
+ "{'city':'BROOKLYN'}}]}}]}},'sort':[{'city':'asc'}]",
String.format(Locale.ROOT, "size:%s",
ElasticsearchTransport.DEFAULT_FETCH_SIZE)));

CalciteAssert.that()
.with(newConnectionFactory())
.query("select _MAP['city'] from elastic.zips where _MAP['state'] = 'NY' "
Expand Down Expand Up @@ -421,6 +434,34 @@ private static Consumer<ResultSet> sortedResultSetChecker(String column,
.explainContains(explain);
}

/**
 * Checks that a filter whose top-level condition is an OR is sent to
 * Elasticsearch as a {@code dis_max} query wrapping a {@code bool/should}
 * clause, and that the plan keeps the filter, project and sort in the
 * Elasticsearch convention.
 */
@Test public void testDismaxQuery() {
  // Expected physical plan, as reported by EXPLAIN.
  final String expectedPlan = "PLAN=ElasticsearchToEnumerableConverter\n"
      + " ElasticsearchSort(sort0=[$4], sort1=[$3], dir0=[ASC], dir1=[ASC])\n"
      + " ElasticsearchProject(city=[CAST(ITEM($0, 'city')):VARCHAR(20)], longitude=[CAST(ITEM(ITEM($0, 'loc'), 0)):FLOAT], latitude=[CAST(ITEM(ITEM($0, 'loc'), 1)):FLOAT], pop=[CAST(ITEM($0, 'pop')):INTEGER], state=[CAST(ITEM($0, 'state')):VARCHAR(2)], id=[CAST(ITEM($0, 'id')):VARCHAR(5)])\n"
      + " ElasticsearchFilter(condition=[OR(=(CAST(ITEM($0, 'state')):VARCHAR(2), 'CA'), >=(CAST(ITEM($0, 'pop')):INTEGER, 94000))])\n"
      + " ElasticsearchTableScan(table=[[elastic, zips]])\n\n";

  // Fragments the generated Elasticsearch request is expected to contain.
  final String expectedQuery = "'query' : "
      + "{'dis_max':{'queries':[{bool:"
      + "{should:[{term:{state:'CA'}},"
      + "{range:{pop:{gte:94000}}}]}}]}}";
  final String expectedScriptFields =
      "'script_fields': {longitude:{script:'params._source.loc[0]'}, "
      + "latitude:{script:'params._source.loc[1]'}, "
      + "city:{script: 'params._source.city'}, "
      + "pop:{script: 'params._source.pop'}, "
      + "state:{script: 'params._source.state'}, "
      + "id:{script: 'params._source.id'}}";
  final String expectedSort = "sort: [ {state: 'asc'}, {pop: 'asc'}]";
  final String expectedSize = String.format(Locale.ROOT, "size:%s",
      ElasticsearchTransport.DEFAULT_FETCH_SIZE);

  calciteAssert()
      .query("select * from zips\n"
          + "where state = 'CA' or pop >= 94000\n"
          + "order by state, pop")
      .queryContains(
          ElasticsearchChecker.elasticsearchChecker(expectedQuery,
              expectedScriptFields, expectedSort, expectedSize))
      .explainContains(expectedPlan);
}

@Test void testFilterSortDesc() {
final String sql = "select * from zips\n"
+ "where pop BETWEEN 95000 AND 100000\n"
Expand Down

0 comments on commit add837a

Please sign in to comment.