Skip to content

Commit

Permalink
Add support for Cassandra tuple type
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Sep 21, 2021
1 parent bcbcb9f commit 060e78f
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 41 deletions.
1 change: 1 addition & 0 deletions docs/src/main/sphinx/connector/cassandra.rst
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ TEXT VARCHAR
TIMESTAMP TIMESTAMP(3) WITH TIME ZONE
TIMEUUID VARCHAR
TINYINT TINYINT
TUPLE ROW with anonymous fields
VARCHAR VARCHAR
VARIANT VARCHAR
================ ======
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import io.airlift.json.JsonBinder;
import io.airlift.json.JsonCodec;
import io.airlift.security.pem.PemReader;
import io.trino.spi.TrinoException;
Expand Down Expand Up @@ -76,9 +75,18 @@
public class CassandraClientModule
implements Module
{
private final TypeManager typeManager;

/**
 * Creates the module with the type manager that {@code configure} later binds
 * as the {@code TypeManager} instance.
 *
 * @param typeManager type manager to bind; rejected by {@code requireNonNull} when null
 */
public CassandraClientModule(TypeManager typeManager)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}

@Override
public void configure(Binder binder)
{
binder.bind(TypeManager.class).toInstance(typeManager);

binder.bind(CassandraConnector.class).in(Scopes.SINGLETON);
binder.bind(CassandraMetadata.class).in(Scopes.SINGLETON);
binder.bind(CassandraSplitManager.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ private static String toCqlLiteral(CassandraColumnHandle columnHandle, Object va

private static String translateRangeIntoCql(CassandraColumnHandle columnHandle, Range range)
{
if (columnHandle.getCassandraType().getKind() == CassandraType.Kind.TUPLE) {
// Building CQL literals for TUPLE type is not supported
return null;
}

if (range.isAll()) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
Bootstrap app = new Bootstrap(
new MBeanModule(),
new JsonModule(),
new CassandraClientModule(),
new CassandraClientModule(context.getTypeManager()),
new MBeanServerModule());

Injector injector = app.doNotInitializeLogging()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,15 @@ public Slice getSlice(int i)
}

@Override
public Object getObject(int field)
public Object getObject(int i)
{
throw new UnsupportedOperationException();
CassandraType cassandraType = cassandraTypes.get(i);
switch (cassandraType.getKind()) {
case TUPLE:
return cassandraType.getColumnValue(currentRow, i).getValue();
default:
throw new IllegalArgumentException("getObject cannot be called for " + cassandraType);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,30 @@
package io.trino.plugin.cassandra;

import com.datastax.driver.core.DataType;
import com.datastax.driver.core.GettableByIndexData;
import com.datastax.driver.core.LocalDate;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.TupleType;
import com.datastax.driver.core.TupleValue;
import com.datastax.driver.core.utils.Bytes;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.net.InetAddresses;
import io.airlift.slice.Slice;
import io.trino.spi.block.Block;
import io.trino.spi.block.RowBlockBuilder;
import io.trino.spi.block.SingleRowBlockWriter;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.DateType;
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.RealType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.SmallintType;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.TimestampWithTimeZoneType;
Expand All @@ -48,8 +55,11 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.net.InetAddresses.toAddrString;
import static io.airlift.slice.Slices.utf8Slice;
Expand All @@ -58,6 +68,7 @@
import static io.trino.plugin.cassandra.util.CassandraCqlUtils.quoteStringLiteralForJson;
import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static io.trino.spi.type.TypeUtils.writeNativeValue;
import static java.lang.Float.floatToRawIntBits;
import static java.lang.Float.intBitsToFloat;
import static java.lang.Math.toIntExact;
Expand Down Expand Up @@ -90,6 +101,7 @@ public enum Kind
LIST,
SET,
MAP,
TUPLE,
}

private final Kind kind;
Expand Down Expand Up @@ -180,6 +192,8 @@ public static Optional<CassandraType> toCassandraType(DataType dataType)
return Optional.of(CassandraTypes.TIMEUUID);
case TINYINT:
return Optional.of(CassandraTypes.TINYINT);
case TUPLE:
return createTypeForTuple(dataType);
case UUID:
return Optional.of(CassandraTypes.UUID);
case VARCHAR:
Expand All @@ -191,7 +205,35 @@ public static Optional<CassandraType> toCassandraType(DataType dataType)
}
}

/**
 * Maps a Cassandra tuple data type to a {@code TUPLE} kind {@link CassandraType}
 * backed by an anonymous Trino row type.
 *
 * @param dataType driver data type; cast to {@code TupleType} without a prior check
 * @return the tuple type, or empty when any component type has no Cassandra type mapping
 */
private static Optional<CassandraType> createTypeForTuple(DataType dataType)
{
    // Resolve every component first; each one may itself be unsupported.
    List<Optional<CassandraType>> resolvedComponents = ((TupleType) dataType).getComponentTypes().stream()
            .map(CassandraType::toCassandraType)
            .collect(toImmutableList());

    // A single unmapped component makes the whole tuple unmapped.
    boolean everyComponentMapped = resolvedComponents.stream().allMatch(Optional::isPresent);
    if (!everyComponentMapped) {
        return Optional.empty();
    }

    List<CassandraType> componentTypes = resolvedComponents.stream()
            .map(Optional::get)
            .collect(toImmutableList());

    // Tuple components carry no names, hence the anonymous row type.
    RowType rowType = RowType.anonymous(componentTypes.stream()
            .map(CassandraType::getTrinoType)
            .collect(toImmutableList()));

    CassandraType tupleType = new CassandraType(Kind.TUPLE, rowType, componentTypes);
    return Optional.of(tupleType);
}

/**
 * Reads the value at {@code position} of a result-set row by delegating to the
 * three-argument overload.
 *
 * The Cassandra data type is passed as a supplier that looks it up in the row's
 * column definitions, so the lookup only happens if the overload asks for it.
 *
 * @param row result-set row to read from
 * @param position index of the column within {@code row}
 * @return whatever the three-argument overload returns for this column
 */
public NullableValue getColumnValue(Row row, int position)
{
return getColumnValue(row, position, () -> row.getColumnDefinitions().getType(position));
}

public NullableValue getColumnValue(GettableByIndexData row, int position, Supplier<DataType> dataTypeSupplier)
{
if (row.isNull(position)) {
return NullableValue.asNull(trinoType);
Expand Down Expand Up @@ -235,19 +277,20 @@ public NullableValue getColumnValue(Row row, int position)
return NullableValue.of(trinoType, wrappedBuffer(row.getBytesUnsafe(position)));
case SET:
case LIST:
return NullableValue.of(trinoType, utf8Slice(buildArrayValue(row, position)));
return NullableValue.of(trinoType, utf8Slice(buildArrayValue(row, position, dataTypeSupplier.get())));
case MAP:
return NullableValue.of(trinoType, utf8Slice(buildMapValue(row, position)));
return NullableValue.of(trinoType, utf8Slice(buildMapValue(row, position, dataTypeSupplier.get())));
case TUPLE:
return NullableValue.of(trinoType, buildTupleValue(row, position));
}
throw new IllegalStateException("Handling of type " + this + " is not implemented");
}

private static String buildMapValue(Row row, int position)
private static String buildMapValue(GettableByIndexData row, int position, DataType dataType)
{
DataType type = row.getColumnDefinitions().getType(position);
checkArgument(type.getTypeArguments().size() == 2, "Expected two type arguments, got: %s", type.getTypeArguments());
DataType keyType = type.getTypeArguments().get(0);
DataType valueType = type.getTypeArguments().get(1);
checkArgument(dataType.getTypeArguments().size() == 2, "Expected two type arguments, got: %s", dataType.getTypeArguments());
DataType keyType = dataType.getTypeArguments().get(0);
DataType valueType = dataType.getTypeArguments().get(1);
return buildMapValue((Map<?, ?>) row.getObject(position), keyType, valueType);
}

Expand All @@ -267,10 +310,9 @@ private static String buildMapValue(Map<?, ?> cassandraMap, DataType keyType, Da
return sb.toString();
}

private static String buildArrayValue(Row row, int position)
private static String buildArrayValue(GettableByIndexData row, int position, DataType dataType)
{
DataType type = row.getColumnDefinitions().getType(position);
DataType elementType = getOnlyElement(type.getTypeArguments());
DataType elementType = getOnlyElement(dataType.getTypeArguments());
return buildArrayValue((Collection<?>) row.getObject(position), elementType);
}

Expand All @@ -289,6 +331,24 @@ static String buildArrayValue(Collection<?> cassandraCollection, DataType elemen
return sb.toString();
}

/**
 * Converts the tuple stored at {@code position} into a single-row Trino block,
 * writing one field per argument type of this tuple type.
 *
 * @param row container holding the tuple value
 * @param position index of the tuple within {@code row}
 * @return the row object read back from the block builder at entry 0
 */
private Block buildTupleValue(GettableByIndexData row, int position)
{
    verify(kind == Kind.TUPLE, "Not a TUPLE type");
    TupleValue tuple = row.getTupleValue(position);
    RowBlockBuilder rowBuilder = (RowBlockBuilder) trinoType.createBlockBuilder(null, 1);
    SingleRowBlockWriter fieldWriter = rowBuilder.beginBlockEntry();

    int nextField = 0;
    for (CassandraType fieldType : getArgumentTypes()) {
        // Capture the index in a fresh local so the lambda below can reference it.
        int fieldIndex = nextField++;
        NullableValue fieldValue = fieldType.getColumnValue(
                tuple,
                fieldIndex,
                () -> tuple.getType().getComponentTypes().get(fieldIndex));
        writeNativeValue(fieldType.getTrinoType(), fieldWriter, fieldValue.getValue());
    }

    // Close the entry, then read the finished row back out of the builder
    // instead of returning the in-progress writer.
    rowBuilder.closeEntry();
    return (Block) trinoType.getObject(rowBuilder, 0);
}

// TODO unify with toCqlLiteral
public String getColumnValueForCql(Row row, int position)
{
Expand Down Expand Up @@ -336,6 +396,7 @@ public String getColumnValueForCql(Row row, int position)
case LIST:
case SET:
case MAP:
case TUPLE:
// unsupported
break;
}
Expand Down Expand Up @@ -389,6 +450,7 @@ private static String objectToJson(Object cassandraValue, DataType dataType)
case DATE:
case INET:
case VARINT:
case TUPLE:
return quoteStringLiteralForJson(cassandraValue.toString());

case BLOB:
Expand Down Expand Up @@ -449,6 +511,7 @@ public Object getJavaValue(Object trinoNativeValue)
return java.util.UUID.fromString(((Slice) trinoNativeValue).toStringUtf8());
case BLOB:
case CUSTOM:
case TUPLE:
return ((Slice) trinoNativeValue).toStringUtf8();
case VARINT:
return new BigInteger(((Slice) trinoNativeValue).toStringUtf8());
Expand Down Expand Up @@ -486,6 +549,7 @@ public boolean isSupportedPartitionKey()
case SET:
case LIST:
case MAP:
case TUPLE:
default:
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public final class CassandraTestingUtils
public static final String TABLE_ALL_TYPES = "table_all_types";
public static final String TABLE_ALL_TYPES_INSERT = "table_all_types_insert";
public static final String TABLE_ALL_TYPES_PARTITION_KEY = "table_all_types_partition_key";
public static final String TABLE_TUPLE_TYPE = "table_tuple_type";
public static final String TABLE_CLUSTERING_KEYS = "table_clustering_keys";
public static final String TABLE_CLUSTERING_KEYS_LARGE = "table_clustering_keys_large";
public static final String TABLE_MULTI_PARTITION_CLUSTERING_KEYS = "table_multi_partition_clustering_keys";
Expand All @@ -51,6 +52,7 @@ public static void createTestTables(CassandraSession cassandraSession, String ke
createTableAllTypes(cassandraSession, new SchemaTableName(keyspace, TABLE_ALL_TYPES), date, 9);
createTableAllTypes(cassandraSession, new SchemaTableName(keyspace, TABLE_ALL_TYPES_INSERT), date, 0);
createTableAllTypesPartitionKey(cassandraSession, new SchemaTableName(keyspace, TABLE_ALL_TYPES_PARTITION_KEY), date);
createTableTupleType(cassandraSession, new SchemaTableName(keyspace, TABLE_TUPLE_TYPE));
createTableClusteringKeys(cassandraSession, new SchemaTableName(keyspace, TABLE_CLUSTERING_KEYS), 9);
createTableClusteringKeys(cassandraSession, new SchemaTableName(keyspace, TABLE_CLUSTERING_KEYS_LARGE), 1000);
createTableMultiPartitionClusteringKeys(cassandraSession, new SchemaTableName(keyspace, TABLE_MULTI_PARTITION_CLUSTERING_KEYS));
Expand Down Expand Up @@ -233,6 +235,20 @@ public static void createTableAllTypesPartitionKey(CassandraSession session, Sch
insertTestData(session, table, date, 9);
}

/**
 * Recreates the tuple test table and fills it with two rows.
 *
 * The table has an int primary key and a frozen {@code tuple<int, text, float>}
 * column. After the inserts the row count is asserted to be 2.
 *
 * @param session session used to execute the CQL statements
 * @param table name of the table to drop, create and populate
 */
public static void createTableTupleType(CassandraSession session, SchemaTableName table)
{
    String dropStatement = "DROP TABLE IF EXISTS " + table;
    session.execute(dropStatement);

    String createStatement = "CREATE TABLE " + table + " (" +
            " key int PRIMARY KEY, " +
            " typetuple frozen<tuple<int, text, float>>" +
            ")";
    session.execute(createStatement);

    String firstInsert = format("INSERT INTO %s (key, typetuple) VALUES (1, (1, 'text-1', 1.11))", table);
    String secondInsert = format("INSERT INTO %s (key, typetuple) VALUES (2, (2, 'text-2', 2.22))", table);
    session.execute(firstInsert);
    session.execute(secondInsert);

    // Sanity check: both rows must be visible before tests rely on the table.
    String countQuery = "SELECT COUNT(*) FROM " + table;
    long rowCount = session.execute(countQuery).all().get(0).getLong(0);
    assertEquals(rowCount, 2);
}

private static void insertTestData(CassandraSession session, SchemaTableName table, Date date, int rowsCount)
{
for (int rowNumber = 1; rowNumber <= rowsCount; rowNumber++) {
Expand Down
Loading

0 comments on commit 060e78f

Please sign in to comment.