Skip to content

Commit

Permalink
Add S3 schema with buckets and objects
Browse files Browse the repository at this point in the history
  • Loading branch information
nineinchnick committed Sep 21, 2021
1 parent 2f9c022 commit 47e8d0e
Show file tree
Hide file tree
Showing 8 changed files with 471 additions and 80 deletions.
6 changes: 4 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,10 @@
<artifactId>duplicate-finder-maven-plugin</artifactId>
<configuration>
<ignoredResourcePatterns>
<ignoredResourcePattern>software/amazon/awssdk/global/handlers/execution.interceptors</ignoredResourcePattern>
<ignoredResourcePattern>about.html</ignoredResourcePattern>
<ignoredResourcePattern>software/amazon/awssdk/global/handlers/execution\.interceptors</ignoredResourcePattern>
<ignoredResourcePattern>about\.html</ignoredResourcePattern>
<ignoredResourcePattern>codegen-resources/.*\.config</ignoredResourcePattern>
<ignoredResourcePattern>codegen-resources/.*\.json</ignoredResourcePattern>
</ignoredResourcePatterns>
</configuration>
</plugin>
Expand Down
10 changes: 8 additions & 2 deletions trino-cloud-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,16 @@
</exclusion>
</exclusions>
</dependency>
<!--dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency-->
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
Expand Down
146 changes: 104 additions & 42 deletions trino-cloud-aws/src/main/java/pl/net/was/cloud/aws/AwsMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.TableColumnsMetadata;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeOperators;
import pl.net.was.cloud.aws.filters.BucketFilter;
import pl.net.was.cloud.aws.filters.FilterApplier;
import software.amazon.awssdk.core.SdkField;
import software.amazon.awssdk.core.protocol.MarshallingType;
import software.amazon.awssdk.core.traits.ListTrait;
Expand All @@ -61,14 +66,20 @@
import software.amazon.awssdk.services.ec2.model.VpcPeeringConnection;
import software.amazon.awssdk.services.ec2.model.VpnConnection;
import software.amazon.awssdk.services.ec2.model.VpnGateway;
import software.amazon.awssdk.services.s3.model.Bucket;
import software.amazon.awssdk.services.s3.model.DeleteMarkerEntry;
import software.amazon.awssdk.services.s3.model.ObjectVersion;
import software.amazon.awssdk.services.s3.model.S3Object;

import javax.inject.Inject;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND;
import static io.trino.spi.type.BigintType.BIGINT;
Expand All @@ -85,8 +96,6 @@
public class AwsMetadata
implements ConnectorMetadata
{
public static final String SCHEMA_NAME = "ec2";

// all types must be handled in AwsRecordSetProvider.encode()
// missing: Void, BigDecimal, SdkBytes, Document
// SdkPojo, List<?> and Map<String, ?> handled separately
Expand All @@ -102,37 +111,76 @@ public class AwsMetadata
.put(MarshallingType.LIST, new ArrayType(VARCHAR))
.build();

public final Map<String, List<ColumnMetadata>> columns;
public final Map<SchemaTableName, List<ColumnMetadata>> columns;
public final Map<String, Map<String, ColumnHandle>> columnHandles;

public final Map<String, ? extends FilterApplier> filterAppliers = new ImmutableMap.Builder<String, FilterApplier>()
.put("s3.objects", new BucketFilter())
.put("s3.object_versions", new BucketFilter())
.put("s3.deleted_objects", new BucketFilter())
.build();

@Inject
public AwsMetadata()
{
// must match AwsRecordSetProvider.rowGetters
columns = new ImmutableMap.Builder<String, List<ColumnMetadata>>()
.put("availability_zones", fieldsToColumns(AvailabilityZone.builder().sdkFields()))
.put("images", fieldsToColumns(Image.builder().sdkFields()))
.put("instance_types", fieldsToColumns(InstanceTypeInfo.builder().sdkFields()))
.put("instances", fieldsToColumns(Instance.builder().sdkFields()))
.put("key_pairs", fieldsToColumns(KeyPairInfo.builder().sdkFields()))
.put("launch_templates", fieldsToColumns(LaunchTemplate.builder().sdkFields()))
.put("nat_gateways", fieldsToColumns(NatGateway.builder().sdkFields()))
.put("network_interfaces", fieldsToColumns(NetworkInterface.builder().sdkFields()))
.put("placement_groups", fieldsToColumns(PlacementGroup.builder().sdkFields()))
.put("prefix_lists", fieldsToColumns(PrefixList.builder().sdkFields()))
.put("public_ipv4_pools", fieldsToColumns(PublicIpv4Pool.builder().sdkFields()))
.put("regions", fieldsToColumns(Region.builder().sdkFields()))
.put("route_tables", fieldsToColumns(RouteTable.builder().sdkFields()))
.put("snapshots", fieldsToColumns(Snapshot.builder().sdkFields()))
.put("security_groups", fieldsToColumns(SecurityGroup.builder().sdkFields()))
.put("subnets", fieldsToColumns(Subnet.builder().sdkFields()))
.put("tags", fieldsToColumns(Tag.builder().sdkFields()))
.put("volumes", fieldsToColumns(Volume.builder().sdkFields()))
.put("vpc_endpoints", fieldsToColumns(VpcEndpoint.builder().sdkFields()))
.put("vpc_peering_connections", fieldsToColumns(VpcPeeringConnection.builder().sdkFields()))
.put("vpcs", fieldsToColumns(Vpc.builder().sdkFields()))
.put("vpn_connections", fieldsToColumns(VpnConnection.builder().sdkFields()))
.put("vpn_gateways", fieldsToColumns(VpnGateway.builder().sdkFields()))
columns = new ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>>()
.put(new SchemaTableName("ec2", "availability_zones"), fieldsToColumns(AvailabilityZone.builder().sdkFields()))
.put(new SchemaTableName("ec2", "images"), fieldsToColumns(Image.builder().sdkFields()))
.put(new SchemaTableName("ec2", "instance_types"), fieldsToColumns(InstanceTypeInfo.builder().sdkFields()))
.put(new SchemaTableName("ec2", "instances"), fieldsToColumns(Instance.builder().sdkFields()))
.put(new SchemaTableName("ec2", "key_pairs"), fieldsToColumns(KeyPairInfo.builder().sdkFields()))
.put(new SchemaTableName("ec2", "launch_templates"), fieldsToColumns(LaunchTemplate.builder().sdkFields()))
.put(new SchemaTableName("ec2", "nat_gateways"), fieldsToColumns(NatGateway.builder().sdkFields()))
.put(new SchemaTableName("ec2", "network_interfaces"), fieldsToColumns(NetworkInterface.builder().sdkFields()))
.put(new SchemaTableName("ec2", "placement_groups"), fieldsToColumns(PlacementGroup.builder().sdkFields()))
.put(new SchemaTableName("ec2", "prefix_lists"), fieldsToColumns(PrefixList.builder().sdkFields()))
.put(new SchemaTableName("ec2", "public_ipv4_pools"), fieldsToColumns(PublicIpv4Pool.builder().sdkFields()))
.put(new SchemaTableName("ec2", "regions"), fieldsToColumns(Region.builder().sdkFields()))
.put(new SchemaTableName("ec2", "route_tables"), fieldsToColumns(RouteTable.builder().sdkFields()))
.put(new SchemaTableName("ec2", "snapshots"), fieldsToColumns(Snapshot.builder().sdkFields()))
.put(new SchemaTableName("ec2", "security_groups"), fieldsToColumns(SecurityGroup.builder().sdkFields()))
.put(new SchemaTableName("ec2", "subnets"), fieldsToColumns(Subnet.builder().sdkFields()))
.put(new SchemaTableName("ec2", "tags"), fieldsToColumns(Tag.builder().sdkFields()))
.put(new SchemaTableName("ec2", "volumes"), fieldsToColumns(Volume.builder().sdkFields()))
.put(new SchemaTableName("ec2", "vpc_endpoints"), fieldsToColumns(VpcEndpoint.builder().sdkFields()))
.put(new SchemaTableName("ec2", "vpc_peering_connections"), fieldsToColumns(VpcPeeringConnection.builder().sdkFields()))
.put(new SchemaTableName("ec2", "vpcs"), fieldsToColumns(Vpc.builder().sdkFields()))
.put(new SchemaTableName("ec2", "vpn_connections"), fieldsToColumns(VpnConnection.builder().sdkFields()))
.put(new SchemaTableName("ec2", "vpn_gateways"), fieldsToColumns(VpnGateway.builder().sdkFields()))
.put(new SchemaTableName("s3", "buckets"), fieldsToColumns(Bucket.builder().sdkFields()))
.put(new SchemaTableName("s3", "objects"), fieldsToColumns(
List.of(
new ColumnMetadata("bucket_name", VARCHAR)),
S3Object.builder().sdkFields()))
.put(new SchemaTableName("s3", "object_versions"), fieldsToColumns(
List.of(
new ColumnMetadata("bucket_name", VARCHAR)),
ObjectVersion.builder().sdkFields()))
.put(new SchemaTableName("s3", "deleted_objects"), fieldsToColumns(
List.of(
new ColumnMetadata("bucket_name", VARCHAR)),
DeleteMarkerEntry.builder().sdkFields()))
.build();

columnHandles = columns
.entrySet()
.stream()
.map(e -> Map.entry(
e.getKey().toString(),
e.getValue()
.stream()
.collect(Collectors.toMap(
ColumnMetadata::getName,
c -> (ColumnHandle) new AwsColumnHandle(c.getName(), c.getType())))))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

/**
 * Builds a table's column list by placing the given extra columns in front of
 * the columns derived from the SDK model fields.
 *
 * @param extraColumns columns that are not part of the SDK model (e.g. a synthetic bucket name column); emitted first
 * @param sdkFields SDK model fields, converted by the single-argument {@code fieldsToColumns} overload
 * @return a new list containing {@code extraColumns} followed by the SDK-derived columns
 */
private List<ColumnMetadata> fieldsToColumns(List<ColumnMetadata> extraColumns, List<SdkField<?>> sdkFields)
{
    // Convert the SDK fields first, then assemble the result in declaration order.
    List<ColumnMetadata> sdkColumns = fieldsToColumns(sdkFields);
    List<ColumnMetadata> combined = new ArrayList<>();
    combined.addAll(extraColumns);
    combined.addAll(sdkColumns);
    return combined;
}

private List<ColumnMetadata> fieldsToColumns(List<SdkField<?>> sdkFields)
Expand Down Expand Up @@ -161,13 +209,13 @@ private List<ColumnMetadata> fieldsToColumns(List<SdkField<?>> sdkFields)
@Override
public List<String> listSchemaNames(ConnectorSession connectorSession)
{
return List.of(SCHEMA_NAME);
return List.of("ec2", "s3");
}

@Override
public ConnectorTableHandle getTableHandle(ConnectorSession connectorSession, SchemaTableName schemaTableName)
{
if (!schemaTableName.getSchemaName().equals(SCHEMA_NAME)) {
if (!listSchemaNames(connectorSession).contains(schemaTableName.getSchemaName())) {
return null;
}
return new AwsTableHandle(
Expand All @@ -188,10 +236,10 @@ public ConnectorTableMetadata getTableMetadata(
SchemaTableName schemaTableName = tableHandle.getSchemaTableName();
return new ConnectorTableMetadata(
schemaTableName,
getColumns(schemaTableName.getTableName()));
getColumns(schemaTableName));
}

private List<ColumnMetadata> getColumns(String tableName)
private List<ColumnMetadata> getColumns(SchemaTableName tableName)
{
if (!columns.containsKey(tableName)) {
throw new TrinoException(TABLE_NOT_FOUND, "Invalid table name: " + tableName);
Expand All @@ -202,11 +250,7 @@ private List<ColumnMetadata> getColumns(String tableName)
@Override
public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
{
return columns
.keySet()
.stream()
.map(table -> new SchemaTableName(SCHEMA_NAME, table))
.collect(toList());
return new ArrayList<>(columns.keySet());
}

@Override
Expand Down Expand Up @@ -241,15 +285,13 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con
}

@Override
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(
ConnectorSession connectorSession,
SchemaTablePrefix schemaTablePrefix)
public Stream<TableColumnsMetadata> streamTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
{
return columns.entrySet()
.stream()
.collect(Collectors.toMap(
e -> new SchemaTableName(schemaTablePrefix.getSchema().orElse(""), e.getKey()),
Map.Entry::getValue));
.filter(e -> e.getKey().getSchemaName().startsWith(prefix.getSchema().orElse(""))
&& e.getKey().getTableName().startsWith(prefix.getTable().orElse("")))
.map(e -> TableColumnsMetadata.forTable(e.getKey(), e.getValue()));
}

@Override
Expand All @@ -268,4 +310,24 @@ public Optional<ConnectorOutputMetadata> finishInsert(
{
return Optional.empty();
}

/**
 * Delegates filter pushdown to the {@code FilterApplier} registered for the table, if any.
 *
 * @param session the current connector session (not used here)
 * @param table the table handle; cast to {@code AwsTableHandle}
 * @param constraint the constraint whose summary is handed to the filter applier
 * @return whatever the registered applier returns, or {@code Optional.empty()} when
 *         no applier is registered for the table
 */
@Override
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(
        ConnectorSession session,
        ConnectorTableHandle table,
        Constraint constraint)
{
    AwsTableHandle handle = (AwsTableHandle) table;
    // Both filterAppliers and columnHandles are keyed by the string form of the schema-qualified table name.
    String key = handle.getSchemaTableName().toString();

    FilterApplier applier = filterAppliers.get(key);
    if (applier != null) {
        Map<String, ColumnHandle> tableColumnHandles = columnHandles.get(key);
        return applier.applyFilter(
                handle,
                tableColumnHandles,
                applier.getSupportedFilters(),
                constraint.getSummary());
    }
    // No pushdown support for this table.
    return Optional.empty();
}
}
Loading

0 comments on commit 47e8d0e

Please sign in to comment.