diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala
index 3d60dc02c..56df29d16 100644
--- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala
+++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala
@@ -314,6 +314,14 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase {
     )
   }
 
+  val customIntConverter = new TypeConverter[java.lang.Integer] {
+    def targetTypeTag = typeTag[java.lang.Integer]
+
+    def convertPF = {
+      case x: String => x.toInt + 1
+    }
+  }
+
   "A CassandraRDD" should "allow to read a Cassandra table as Array of CassandraRow" in {
     val result = sc.cassandraTable(ks, "key_value").collect()
     result should have length 3
@@ -1490,4 +1498,107 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase {
       1L: java.lang.Long)
     }
   }
+
+  it should "allow for converting fields to map types by user-defined TypeConverter" in {
+    conn.withSessionDo { session =>
+      val customConverter = new TypeConverter[Map[Int, Short]] {
+        def targetTypeTag = typeTag[Map[Int, Short]]
+
+        def convertPF = {
+          case x: String => Map(x.toInt -> x.toShort)
+        }
+      }
+      TypeConverter.registerConverter(customConverter)
+      session.execute(s"create table $ks.test_table_converter (id text primary key, test_map map<int, smallint>)")
+      sc.parallelize(Seq(("id", "5"))).saveToCassandra(ks, "test_table_converter")
+      val res = sc.cassandraTable(ks, "test_table_converter").collect
+      res.size should be (1)
+      res.head.getString(0) should be ("id")
+      res.head.getMap[Int, Int](1) should be (Map(5 -> 5))
+      TypeConverter.unregisterConverter(customConverter)
+      session.execute(s"""DROP TABLE IF EXISTS $ks.test_table_converter""")
+    }
+  }
+
+  it should "use the default map converters when no other converter exists" in {
+    conn.withSessionDo { session =>
+      TypeConverter.registerConverter(customIntConverter)
+      session.execute(s"create table $ks.test_table_converter2 (id text primary key, test_map map<int, int>)")
+      sc.parallelize(Seq(("id", Map("5" -> "5")))).saveToCassandra(ks, "test_table_converter2")
+      val res = sc.cassandraTable(ks, "test_table_converter2").collect
+      res.size should be (1)
+      res.head.getString(0) should be ("id")
+      res.head.getMap[Int, Int](1) should be (Map(6 -> 6))
+      TypeConverter.unregisterConverter(customIntConverter)
+      session.execute(s"""DROP TABLE IF EXISTS $ks.test_table_converter2""")
+    }
+  }
+
+  it should "allow for converting fields to list types by user-defined TypeConverter" in {
+    conn.withSessionDo { session =>
+      val customConverter = new TypeConverter[Vector[Int]] {
+        def targetTypeTag = typeTag[Vector[Int]]
+        def convertPF = {
+          case x: String => Vector(x.toInt, x.toInt)
+        }
+      }
+      TypeConverter.registerConverter(customConverter)
+      session.execute(s"create table $ks.test_table_converter3 (id text primary key, test_list list<int>)")
+      sc.parallelize(Seq(("id", "7"))).saveToCassandra(ks, "test_table_converter3")
+      val res = sc.cassandraTable(ks, "test_table_converter3").collect
+      res.size should be (1)
+      res.head.getString(0) should be ("id")
+      res.head.getList[Int](1) should be (Vector(7, 7))
+      TypeConverter.unregisterConverter(customConverter)
+      session.execute(s"""DROP TABLE IF EXISTS $ks.test_table_converter3""")
+    }
+  }
+
+  it should "use the default list converters when no other converter exists" in {
+    conn.withSessionDo { session =>
+      TypeConverter.registerConverter(customIntConverter)
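+      // customIntConverter targets java.lang.Integer, not Vector[Int], so the
+      // default Vector converter must still be chosen for the column as a whole;
+      // only the per-element conversion should use it, turning "5" into 6 below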
+      session.execute(s"create table $ks.test_table_converter4 (id text primary key, test_list list<int>)")
+      sc.parallelize(Seq(("id", Vector("5")))).saveToCassandra(ks, "test_table_converter4")
+      val res = sc.cassandraTable(ks, "test_table_converter4").collect
+      res.size should be (1)
+      res.head.getString(0) should be ("id")
+      res.head.getList[Int](1) should be (Vector(6))
+      TypeConverter.unregisterConverter(customIntConverter)
+      session.execute(s"""DROP TABLE IF EXISTS $ks.test_table_converter4""")
+    }
+  }
+
+  it should "allow for converting fields to set types by user-defined TypeConverter" in {
+    conn.withSessionDo { session =>
+      val customConverter = new TypeConverter[Set[Int]] {
+        def targetTypeTag = typeTag[Set[Int]]
+        def convertPF = {
+          case x: String => Set(x.toInt, x.toInt + 1)
+        }
+      }
+      TypeConverter.registerConverter(customConverter)
+      session.execute(s"create table $ks.test_table_converter5 (id text primary key, test_set set<int>)")
+      sc.parallelize(Seq(("id", "7"))).saveToCassandra(ks, "test_table_converter5")
+      val res = sc.cassandraTable(ks, "test_table_converter5").collect
+      res.size should be (1)
+      res.head.getString(0) should be ("id")
+      res.head.getSet[Int](1) should be (Set(7, 8))
+      TypeConverter.unregisterConverter(customConverter)
+      session.execute(s"""DROP TABLE IF EXISTS $ks.test_table_converter5""")
+    }
+  }
+
+  it should "use the default set converters when no other converter exists" in {
+    conn.withSessionDo { session =>
+      TypeConverter.registerConverter(customIntConverter)
+      session.execute(s"create table $ks.test_table_converter6 (id text primary key, test_list set<int>)")
+      sc.parallelize(Seq(("id", Set("5")))).saveToCassandra(ks, "test_table_converter6")
+      val res = sc.cassandraTable(ks, "test_table_converter6").collect
+      res.size should be (1)
+      res.head.getString(0) should be ("id")
+      res.head.getSet[Int](1) should be (Set(6))
+      TypeConverter.unregisterConverter(customIntConverter)
+      session.execute(s"""DROP TABLE IF EXISTS $ks.test_table_converter6""")
+    }
+  }
 }
diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/CollectionColumnType.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/CollectionColumnType.scala
index 7cd7f847d..2bbc323c1 100644
--- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/CollectionColumnType.scala
+++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/CollectionColumnType.scala
@@ -2,53 +2,54 @@ package com.datastax.spark.connector.types
 
 import scala.language.existentials
 import scala.reflect.runtime.universe._
+import scala.reflect.ClassTag
+
+import com.datastax.spark.connector.types.TypeConverter.{MapConverter, VectorConverter, SetConverter}
 
 trait CollectionColumnType[T] extends ColumnType[T] {
   def isCollection = true
 }
 
 case class ListType[T](elemType: ColumnType[T]) extends CollectionColumnType[Vector[T]] {
-
-  @transient
-  lazy val scalaTypeTag = {
-    implicit val elemTypeTag = elemType.scalaTypeTag
-    implicitly[TypeTag[Vector[T]]]
-  }
-
+  @transient implicit lazy val elemTypeTag = elemType.scalaTypeTag
+  @transient lazy val scalaTypeTag = implicitly[TypeTag[Vector[T]]]
   def cqlTypeName = s"list<${elemType.cqlTypeName}>"
 
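+  // a converter registered by the user for Vector[T] takes precedence here; the
+  // default per-element list conversion is built only when forType still
+  // returns the built-in VectorConverter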
   override def converterToCassandra: TypeConverter[_ <: AnyRef] =
-    new TypeConverter.OptionToNullConverter(TypeConverter.listConverter(elemType.converterToCassandra))
+    TypeConverter.forType[Vector[T]] match {
+      case tc: VectorConverter[_] =>
+        new TypeConverter.OptionToNullConverter(TypeConverter.listConverter(elemType.converterToCassandra))
+      case tc => tc
+    }
 }
 
 case class SetType[T](elemType: ColumnType[T]) extends CollectionColumnType[Set[T]] {
-
-  @transient
-  lazy val scalaTypeTag = {
-    implicit val elemTypeTag = elemType.scalaTypeTag
-    implicitly[TypeTag[Set[T]]]
-  }
-
+  @transient implicit lazy val elemTypeTag = elemType.scalaTypeTag
+  @transient lazy val scalaTypeTag = implicitly[TypeTag[Set[T]]]
   def cqlTypeName = s"set<${elemType.cqlTypeName}>"
 
   override def converterToCassandra: TypeConverter[_ <: AnyRef] =
-    new TypeConverter.OptionToNullConverter(TypeConverter.setConverter(elemType.converterToCassandra))
+    TypeConverter.forType[Set[T]] match {
+      case tc: SetConverter[_] =>
+        new TypeConverter.OptionToNullConverter(
+          TypeConverter.setConverter(elemType.converterToCassandra))
+      case tc => tc
+    }
 }
 
 case class MapType[K, V](keyType: ColumnType[K], valueType: ColumnType[V])
   extends CollectionColumnType[Map[K, V]] {
-
-  @transient
-  lazy val scalaTypeTag = {
-    implicit val keyTypeTag = keyType.scalaTypeTag
-    implicit val valueTypeTag = valueType.scalaTypeTag
-    implicitly[TypeTag[Map[K, V]]]
-  }
-
+  @transient implicit lazy val keyTypeTag = keyType.scalaTypeTag
+  @transient implicit lazy val valueTypeTag = valueType.scalaTypeTag
+  @transient lazy val scalaTypeTag = implicitly[TypeTag[Map[K, V]]]
   def cqlTypeName = s"map<${keyType.cqlTypeName}, ${valueType.cqlTypeName}>"
 
   override def converterToCassandra: TypeConverter[_ <: AnyRef] =
-    new TypeConverter.OptionToNullConverter(
-      TypeConverter.mapConverter(keyType.converterToCassandra, valueType.converterToCassandra))
+    TypeConverter.forType[Map[K, V]] match {
+      case tc: MapConverter[_, _] =>
+        new TypeConverter.OptionToNullConverter(
+          TypeConverter.mapConverter(keyType.converterToCassandra, valueType.converterToCassandra))
+      case tc => tc
+    }
 }