Add support for range predicate pushdowns
This work is a complete refactor of how we generate Partitions in the planning/execution phases.
Instead of generating Partitions after planning, we now integrate their generation directly
into the optimization phase so that we can iterate on the plan based on the data returned
by the Partitions we discover. This not only opens up a number of optimization opportunities,
but also significantly reduces the cross-talk between split generation and planning.
As a result, it becomes much easier to build native range predicate pushdowns.
erichwang committed Nov 28, 2013
1 parent c8e3c08 commit 5b72787
Showing 62 changed files with 6,254 additions and 648 deletions.
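
At a glance, the new contract works like this: the engine hands the connector's split manager the query predicate as a TupleDomain, and the split manager answers with a PartitionResult, i.e. the Partitions it discovered plus whatever portion of the predicate it did not evaluate natively. The fragment below is a rough sketch of that flow, modeled on the example-connector change in this commit; the class name SketchSplitManager, the literal schema/table names, and the omission of the other ConnectorSplitManager methods are illustrative, not part of the diff.

import com.facebook.presto.example.ExamplePartition;
import com.facebook.presto.spi.Partition;
import com.facebook.presto.spi.PartitionResult;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.TupleDomain;
import com.google.common.collect.ImmutableList;

import java.util.List;

public class SketchSplitManager
{
    public PartitionResult getPartitions(TableHandle tableHandle, TupleDomain tupleDomain)
    {
        // Simplest possible answer: a single catch-all partition. A connector with
        // real partition metadata would prune this list against tupleDomain instead,
        // as HiveClient.getPartitions() does further down in this commit.
        List<Partition> partitions = ImmutableList.<Partition>of(
                new ExamplePartition("example_schema", "example_table"));

        // The second argument reports which part of the predicate was NOT handled
        // natively. Returning the whole TupleDomain means the engine must still
        // enforce all of it; a connector that fully evaluated a column's domain
        // would drop that column here (see remainingTupleDomain in HiveClient).
        return new PartitionResult(partitions, tupleDomain);
    }
}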
@@ -13,12 +13,9 @@
*/
package com.facebook.presto.example;

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.Partition;
import com.facebook.presto.spi.TupleDomain;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;

import java.util.Map;

import static com.google.common.base.Preconditions.checkNotNull;

@@ -51,9 +48,9 @@ public String getTableName()
}

@Override
public Map<ColumnHandle, Object> getKeys()
public TupleDomain getTupleDomain()
{
return ImmutableMap.of();
return TupleDomain.all();
}

@Override
@@ -13,11 +13,12 @@
*/
package com.facebook.presto.example;

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSplitManager;
import com.facebook.presto.spi.Partition;
import com.facebook.presto.spi.PartitionResult;
import com.facebook.presto.spi.Split;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.TupleDomain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

@@ -26,7 +27,6 @@
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -58,13 +58,15 @@ public boolean canHandle(TableHandle tableHandle)
}

@Override
public List<Partition> getPartitions(TableHandle tableHandle, Map<ColumnHandle, Object> bindings)
public PartitionResult getPartitions(TableHandle tableHandle, TupleDomain tupleDomain)
{
checkArgument(tableHandle instanceof ExampleTableHandle, "tableHandle is not an instance of ExampleTableHandle");
ExampleTableHandle exampleTableHandle = (ExampleTableHandle) tableHandle;

// example connector has only one partition
return ImmutableList.<Partition>of(new ExamplePartition(exampleTableHandle.getSchemaName(), exampleTableHandle.getTableName()));
List<Partition> partitions = ImmutableList.<Partition>of(new ExamplePartition(exampleTableHandle.getSchemaName(), exampleTableHandle.getTableName()));
// example connector does not do any additional processing/filtering with the TupleDomain, so just return the whole TupleDomain
return new PartitionResult(partitions, tupleDomain);
}

@Override
@@ -65,7 +65,7 @@ final class HiveBucketing

private HiveBucketing() {}

public static Optional<Integer> getBucketNumber(Table table, Map<ColumnHandle, Object> bindings)
public static Optional<Integer> getBucketNumber(Table table, Map<ColumnHandle, ?> bindings)
{
if (!table.getSd().isSetBucketCols() || table.getSd().getBucketCols().isEmpty() ||
!table.getSd().isSetNumBuckets() || table.getSd().getNumBuckets() <= 0 ||
@@ -99,7 +99,7 @@ public static Optional<Integer> getBucketNumber(Table table, Map<ColumnHandle, O

// Get bindings for bucket columns
Map<String, Object> bucketBindings = new HashMap<>();
for (Entry<ColumnHandle, Object> entry : bindings.entrySet()) {
for (Entry<ColumnHandle, ?> entry : bindings.entrySet()) {
HiveColumnHandle colHandle = (HiveColumnHandle) entry.getKey();
if (bucketColumns.contains(colHandle.getName())) {
bucketBindings.put(colHandle.getName(), entry.getValue());
74 changes: 51 additions & 23 deletions presto-hive/src/main/java/com/facebook/presto/hive/HiveClient.java
@@ -21,23 +21,29 @@
import com.facebook.presto.spi.ConnectorRecordSetProvider;
import com.facebook.presto.spi.ConnectorSplitManager;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Domain;
import com.facebook.presto.spi.Partition;
import com.facebook.presto.spi.PartitionResult;
import com.facebook.presto.spi.Range;
import com.facebook.presto.spi.RecordSet;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.Split;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.TupleDomain;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import io.airlift.log.Logger;
@@ -73,6 +79,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Predicates.in;
import static com.google.common.base.Predicates.not;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.transform;
import static java.lang.Boolean.parseBoolean;
@@ -329,10 +337,10 @@ public void dropTable(TableHandle tableHandle)
}

@Override
public List<Partition> getPartitions(TableHandle tableHandle, Map<ColumnHandle, Object> bindings)
public PartitionResult getPartitions(TableHandle tableHandle, TupleDomain tupleDomain)
{
checkNotNull(tableHandle, "tableHandle is null");
checkNotNull(bindings, "bindings is null");
checkNotNull(tupleDomain, "tupleDomain is null");
SchemaTableName tableName = getTableName(tableHandle);

List<FieldSchema> partitionKeys;
@@ -347,27 +355,32 @@ public List<Partition> getPartitions(TableHandle tableHandle, Map<ColumnHandle,
}

partitionKeys = table.getPartitionKeys();
bucket = getBucketNumber(table, bindings);
bucket = getBucketNumber(table, tupleDomain.extractFixedValues());
}
catch (NoSuchObjectException e) {
throw new TableNotFoundException(tableName);
}

ImmutableMap.Builder<String, ColumnHandle> partitionKeysByName = ImmutableMap.builder();
ImmutableMap.Builder<String, ColumnHandle> partitionKeysByNameBuilder = ImmutableMap.builder();
List<String> filterPrefix = new ArrayList<>();
for (int i = 0; i < partitionKeys.size(); i++) {
FieldSchema field = partitionKeys.get(i);

HiveColumnHandle columnHandle = new HiveColumnHandle(connectorId, field.getName(), i, getSupportedHiveType(field.getType()), -1, true);
partitionKeysByName.put(field.getName(), columnHandle);
partitionKeysByNameBuilder.put(field.getName(), columnHandle);

// only add to prefix if all previous keys have a value
if (filterPrefix.size() == i) {
Object value = bindings.get(columnHandle);
if (value != null) {
checkArgument(value instanceof Boolean || value instanceof String || value instanceof Double || value instanceof Long,
"Only Boolean, String, Double and Long partition keys are supported");
filterPrefix.add(value.toString());
if (filterPrefix.size() == i && !tupleDomain.isNone()) {
Domain domain = tupleDomain.getDomains().get(columnHandle);
if (domain != null && domain.getRanges().getRangeCount() == 1) {
// We intentionally ignore whether NULL is in the domain since partition keys can never be NULL
Range range = Iterables.getOnlyElement(domain.getRanges());
if (range.isSingleValue()) {
Comparable<?> value = range.getLow().getValue();
checkArgument(value instanceof Boolean || value instanceof String || value instanceof Double || value instanceof Long,
"Only Boolean, String, Double and Long partition keys are supported");
filterPrefix.add(value.toString());
}
}
}
}
@@ -390,8 +403,20 @@ else if (filterPrefix.isEmpty()) {
}

// do a final pass to filter based on fields that could not be used to build the prefix
Iterable<Partition> partitions = transform(partitionNames, toPartition(tableName, partitionKeysByName.build(), bucket));
return ImmutableList.copyOf(Iterables.filter(partitions, partitionMatches(bindings)));
Map<String, ColumnHandle> partitionKeysByName = partitionKeysByNameBuilder.build();
List<Partition> partitions = FluentIterable.from(partitionNames)
.transform(toPartition(tableName, partitionKeysByName, bucket))
.filter(partitionMatches(tupleDomain))
.filter(Partition.class)
.toList();

// All partition key domains will be fully evaluated, so we don't need to include those
TupleDomain remainingTupleDomain = TupleDomain.none();
if (!tupleDomain.isNone()) {
remainingTupleDomain = TupleDomain.withColumnDomains(Maps.filterKeys(tupleDomain.getDomains(), not(in(partitionKeysByName.values()))));
}

return new PartitionResult(partitions, remainingTupleDomain);
}

@Override
@@ -539,23 +564,23 @@ public String toString()
.toString();
}

private static Function<String, Partition> toPartition(
private static Function<String, HivePartition> toPartition(
final SchemaTableName tableName,
final Map<String, ColumnHandle> columnsByName,
final Optional<Integer> bucket)
{
return new Function<String, Partition>()
return new Function<String, HivePartition>()
{
@Override
public Partition apply(String partitionId)
public HivePartition apply(String partitionId)
{
try {
if (partitionId.equals(UNPARTITIONED_ID)) {
return new HivePartition(tableName);
}

LinkedHashMap<String, String> keys = Warehouse.makeSpecFromName(partitionId);
ImmutableMap.Builder<ColumnHandle, Object> builder = ImmutableMap.builder();
ImmutableMap.Builder<ColumnHandle, Comparable<?>> builder = ImmutableMap.builder();
for (Entry<String, String> entry : keys.entrySet()) {
ColumnHandle columnHandle = columnsByName.get(entry.getKey());
checkArgument(columnHandle != null, "Invalid partition key %s in partition %s", entry.getKey(), partitionId);
@@ -607,16 +632,19 @@ else if (hiveColumnHandle.getHiveType() == HiveType.TIMESTAMP) {
};
}

public static Predicate<Partition> partitionMatches(final Map<ColumnHandle, Object> filters)
public static Predicate<HivePartition> partitionMatches(final TupleDomain tupleDomain)
{
return new Predicate<Partition>()
return new Predicate<HivePartition>()
{
@Override
public boolean apply(Partition partition)
public boolean apply(HivePartition partition)
{
for (Map.Entry<ColumnHandle, Object> entry : partition.getKeys().entrySet()) {
Object filterValue = filters.get(entry.getKey());
if (filterValue != null && !entry.getValue().equals(filterValue)) {
if (tupleDomain.isNone()) {
return false;
}
for (Entry<ColumnHandle, Comparable<?>> entry : partition.getKeys().entrySet()) {
Domain allowedDomain = tupleDomain.getDomains().get(entry.getKey());
if (allowedDomain != null && !allowedDomain.includesValue(entry.getValue())) {
return false;
}
}
@@ -16,6 +16,7 @@
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.Partition;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TupleDomain;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;

@@ -31,7 +32,7 @@ public class HivePartition

private final SchemaTableName tableName;
private final String partitionId;
private final Map<ColumnHandle, Object> keys;
private final Map<ColumnHandle, Comparable<?>> keys;
private final Optional<Integer> bucket;

public HivePartition(SchemaTableName tableName)
@@ -42,7 +43,7 @@ public HivePartition(SchemaTableName tableName)
this.bucket = Optional.absent();
}

public HivePartition(SchemaTableName tableName, String partitionId, Map<ColumnHandle, Object> keys, Optional<Integer> bucket)
public HivePartition(SchemaTableName tableName, String partitionId, Map<ColumnHandle, Comparable<?>> keys, Optional<Integer> bucket)
{
this.tableName = checkNotNull(tableName, "tableName is null");
this.partitionId = checkNotNull(partitionId, "partitionId is null");
@@ -62,7 +63,12 @@ public String getPartitionId()
}

@Override
public Map<ColumnHandle, Object> getKeys()
public TupleDomain getTupleDomain()
{
return TupleDomain.withFixedValues(keys);
}

public Map<ColumnHandle, Comparable<?>> getKeys()
{
return keys;
}
(remaining changed files not shown)
