Skip to content

Commit

Permalink
Derive predicate support from Trino type in ElasticSearch
Browse files Browse the repository at this point in the history
  • Loading branch information
striderarun authored and martint committed Jan 17, 2025
1 parent 2a36038 commit 7250b50
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.elasticsearch;

import io.trino.plugin.elasticsearch.client.IndexMetadata;
import io.trino.plugin.elasticsearch.decoders.IdColumnDecoder;
import io.trino.plugin.elasticsearch.decoders.ScoreColumnDecoder;
import io.trino.plugin.elasticsearch.decoders.SourceColumnDecoder;
Expand All @@ -31,22 +32,24 @@

enum BuiltinColumns
{
ID("_id", VARCHAR, new IdColumnDecoder.Descriptor(), true),
SOURCE("_source", VARCHAR, new SourceColumnDecoder.Descriptor(), false),
SCORE("_score", REAL, new ScoreColumnDecoder.Descriptor(), false);
ID("_id", VARCHAR, new IndexMetadata.PrimitiveType("text"), new IdColumnDecoder.Descriptor(), true),
SOURCE("_source", VARCHAR, new IndexMetadata.PrimitiveType("text"), new SourceColumnDecoder.Descriptor(), false),
SCORE("_score", REAL, new IndexMetadata.PrimitiveType("real"), new ScoreColumnDecoder.Descriptor(), false);

private static final Map<String, BuiltinColumns> COLUMNS_BY_NAME = stream(values())
.collect(toImmutableMap(BuiltinColumns::getName, identity()));

private final String name;
private final Type type;
private final IndexMetadata.Type elasticsearchType;
private final DecoderDescriptor decoderDescriptor;
private final boolean supportsPredicates;

BuiltinColumns(String name, Type type, DecoderDescriptor decoderDescriptor, boolean supportsPredicates)
BuiltinColumns(String name, Type type, IndexMetadata.Type elasticsearchType, DecoderDescriptor decoderDescriptor, boolean supportsPredicates)
{
this.name = name;
this.type = type;
this.elasticsearchType = elasticsearchType;
this.decoderDescriptor = decoderDescriptor;
this.supportsPredicates = supportsPredicates;
}
Expand Down Expand Up @@ -85,6 +88,7 @@ public ColumnHandle getColumnHandle()
return new ElasticsearchColumnHandle(
name,
type,
elasticsearchType,
decoderDescriptor,
supportsPredicates);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.elasticsearch;

import io.trino.plugin.elasticsearch.client.IndexMetadata;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.type.Type;

Expand All @@ -21,6 +22,7 @@
public record ElasticsearchColumnHandle(
String name,
Type type,
IndexMetadata.Type elasticsearchType,
DecoderDescriptor decoderDescriptor,
boolean supportsPredicates)
implements ColumnHandle
Expand All @@ -29,6 +31,7 @@ public record ElasticsearchColumnHandle(
{
requireNonNull(name, "name is null");
requireNonNull(type, "type is null");
requireNonNull(elasticsearchType, "elasticsearchType is null");
requireNonNull(decoderDescriptor, "decoderDescriptor is null");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,20 @@
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.RealType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.SmallintType;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TinyintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
import io.trino.spi.type.VarcharType;
import org.elasticsearch.client.ResponseException;

import java.util.ArrayList;
Expand Down Expand Up @@ -126,6 +135,7 @@ public class ElasticsearchMetadata
new ElasticsearchColumnHandle(
PASSTHROUGH_QUERY_RESULT_COLUMN_NAME,
VARCHAR,
new IndexMetadata.PrimitiveType("text"),
new VarcharDecoder.Descriptor(PASSTHROUGH_QUERY_RESULT_COLUMN_NAME),
false));

Expand Down Expand Up @@ -250,36 +260,14 @@ private Map<String, ColumnHandle> makeColumnHandles(List<IndexMetadata.Field> fi
result.put(field.name(), new ElasticsearchColumnHandle(
field.name(),
converted.type(),
field.type(),
converted.decoderDescriptor(),
supportsPredicates(field.type())));
supportsPredicates(field.type(), converted.type)));
}

return result.buildOrThrow();
}

private static boolean supportsPredicates(IndexMetadata.Type type)
{
if (type instanceof DateTimeType) {
return true;
}

if (type instanceof PrimitiveType) {
switch (((PrimitiveType) type).name().toLowerCase(ENGLISH)) {
case "boolean":
case "byte":
case "short":
case "integer":
case "long":
case "double":
case "float":
case "keyword":
return true;
}
}

return false;
}

private TypeAndDecoder toTrino(IndexMetadata.Field field)
{
return toTrino("", field);
Expand Down Expand Up @@ -679,6 +667,16 @@ public Optional<TableFunctionApplicationResult<ConnectorTableHandle>> applyTable
return Optional.of(new TableFunctionApplicationResult<>(tableHandle, columnHandles));
}

private static boolean supportsPredicates(IndexMetadata.Type type, Type trinoType)
{
return switch (trinoType) {
case TimestampType _, BooleanType _, TinyintType _, SmallintType _, IntegerType _, BigintType _, RealType _ -> true;
case DoubleType _ -> !(type instanceof ScaledFloatType);
case VarcharType _ when type instanceof PrimitiveType primitiveType && primitiveType.name().toLowerCase(ENGLISH).equals("keyword") -> true;
default -> false;
};
}

private record InternalTableMetadata(SchemaTableName tableName, List<ColumnMetadata> columnMetadata, Map<String, ColumnHandle> columnHandles) {}

private record TypeAndDecoder(Type type, DecoderDescriptor decoderDescriptor) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package io.trino.plugin.elasticsearch.client;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.ImmutableList;

import java.util.List;
Expand All @@ -39,6 +41,15 @@ public record Field(boolean asRawJson, boolean isArray, String name, Type type)
}
}

@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
property = "@type")
@JsonSubTypes({
@JsonSubTypes.Type(value = DateTimeType.class, name = "date_time"),
@JsonSubTypes.Type(value = ObjectType.class, name = "object"),
@JsonSubTypes.Type(value = PrimitiveType.class, name = "primitive"),
@JsonSubTypes.Type(value = ScaledFloatType.class, name = "scaled_float"),
})
public interface Type {}

public record PrimitiveType(String name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.plugin.elasticsearch.client.IndexMetadata;
import io.trino.plugin.elasticsearch.decoders.DoubleDecoder;
import io.trino.plugin.elasticsearch.decoders.IntegerDecoder;
import io.trino.plugin.elasticsearch.decoders.VarcharDecoder;
Expand All @@ -41,10 +42,10 @@

public class TestElasticsearchQueryBuilder
{
private static final ElasticsearchColumnHandle NAME = new ElasticsearchColumnHandle("name", VARCHAR, new VarcharDecoder.Descriptor("name"), true);
private static final ElasticsearchColumnHandle AGE = new ElasticsearchColumnHandle("age", INTEGER, new IntegerDecoder.Descriptor("age"), true);
private static final ElasticsearchColumnHandle SCORE = new ElasticsearchColumnHandle("score", DOUBLE, new DoubleDecoder.Descriptor("score"), true);
private static final ElasticsearchColumnHandle LENGTH = new ElasticsearchColumnHandle("length", DOUBLE, new DoubleDecoder.Descriptor("length"), true);
private static final ElasticsearchColumnHandle NAME = new ElasticsearchColumnHandle("name", VARCHAR, new IndexMetadata.PrimitiveType("text"), new VarcharDecoder.Descriptor("name"), true);
private static final ElasticsearchColumnHandle AGE = new ElasticsearchColumnHandle("age", INTEGER, new IndexMetadata.PrimitiveType("int"), new IntegerDecoder.Descriptor("age"), true);
private static final ElasticsearchColumnHandle SCORE = new ElasticsearchColumnHandle("score", DOUBLE, new IndexMetadata.PrimitiveType("double"), new DoubleDecoder.Descriptor("score"), true);
private static final ElasticsearchColumnHandle LENGTH = new ElasticsearchColumnHandle("length", DOUBLE, new IndexMetadata.PrimitiveType("double"), new DoubleDecoder.Descriptor("length"), true);

@Test
public void testMatchAll()
Expand Down

0 comments on commit 7250b50

Please sign in to comment.