From a377c689333139867559b645ef7d18eef429ca53 Mon Sep 17 00:00:00 2001
From: aashish
Date: Fri, 14 Dec 2018 14:29:31 -0500
Subject: [PATCH 1/2] allow custom type converters to be utilized for collection types

---
 .../connector/rdd/CassandraRDDSpec.scala      | 111 ++++++++++++++++++
 .../types/CollectionColumnType.scala          |  53 +++++----
 2 files changed, 138 insertions(+), 26 deletions(-)

diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala
index 007e86f94..424a96a31 100644
--- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala
+++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala
@@ -314,6 +314,14 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase {
     )
   }
 
+  val customIntConverter = new TypeConverter[java.lang.Integer] {
+    def targetTypeTag = typeTag[java.lang.Integer]
+
+    def convertPF = {
+      case x: String => x.toInt + 1
+    }
+  }
+
   "A CassandraRDD" should "allow to read a Cassandra table as Array of CassandraRow" in {
     val result = sc.cassandraTable(ks, "key_value").collect()
     result should have length 3
@@ -1490,4 +1498,107 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase {
       1L: java.lang.Long)
     }
   }
+
+  it should "allow for converting fields to map types by a user-defined TypeConverter" in {
+    conn.withSessionDo { session =>
+      val customConverter = new TypeConverter[Map[Int, Short]] {
+        def targetTypeTag = typeTag[Map[Int, Short]]
+
+        def convertPF = {
+          case x: String => Map(x.toInt -> x.toShort)
+        }
+      }
+      TypeConverter.registerConverter(customConverter)
+      session.execute(s"create table $ks.test_table_converter (id text primary key, test_map map<int, smallint>)")
+      sc.parallelize(Seq(("id", "5"))).saveToCassandra(ks, "test_table_converter")
+      val res = sc.cassandraTable(ks, "test_table_converter").collect
+      res.size should be (1)
+      res.head.getString(0) should be ("id")
+      res.head.getMap[Int, Int](1) should be (Map(5 -> 5))
+      TypeConverter.unregisterConverter(customConverter)
+      session.execute(s"""DROP TABLE IF EXISTS $ks.test_table_converter""")
+    }
+  }
+
+  it should "use the default map converters when no other converter exists" in {
+    conn.withSessionDo { session =>
+      TypeConverter.registerConverter(customIntConverter)
+      session.execute(s"create table $ks.test_table_converter2 (id text primary key, test_map map<int, int>)")
+      sc.parallelize(Seq(("id", Map("5" -> "5")))).saveToCassandra(ks, "test_table_converter2")
+      val res = sc.cassandraTable(ks, "test_table_converter2").collect
+      res.size should be (1)
+      res.head.getString(0) should be ("id")
+      res.head.getMap[Int, Int](1) should be (Map(6 -> 6))
+      TypeConverter.unregisterConverter(customIntConverter)
+      session.execute(s"""DROP TABLE IF EXISTS $ks.test_table_converter2""")
+    }
+  }
+
+  it should "allow for converting fields to list types by a user-defined TypeConverter" in {
+    conn.withSessionDo { session =>
+      val customConverter = new TypeConverter[Vector[Int]] {
+        def targetTypeTag = typeTag[Vector[Int]]
+        def convertPF = {
+          case x: String => Vector(x.toInt, x.toInt)
+        }
+      }
+      TypeConverter.registerConverter(customConverter)
+      session.execute(s"create table $ks.test_table_converter3 (id text primary key, test_list list<int>)")
+      sc.parallelize(Seq(("id", "7"))).saveToCassandra(ks, "test_table_converter3")
+      val res = sc.cassandraTable(ks, "test_table_converter3").collect
+      res.size should be (1)
+      res.head.getString(0) should be ("id")
+      res.head.getList[Int](1) should be (Vector(7, 7))
+      TypeConverter.unregisterConverter(customConverter)
+      session.execute(s"""DROP TABLE IF EXISTS $ks.test_table_converter3""")
+    }
+  }
+
+  it should "use the default list converters when no other converter exists" in {
+    conn.withSessionDo { session =>
+      TypeConverter.registerConverter(customIntConverter)
+      session.execute(s"create table $ks.test_table_converter4 (id text primary key, test_list list<int>)")
+      sc.parallelize(Seq(("id", Vector("5")))).saveToCassandra(ks, "test_table_converter4")
+      val res = sc.cassandraTable(ks, "test_table_converter4").collect
+      res.size should be (1)
+      res.head.getString(0) should be ("id")
+      res.head.getList[Int](1) should be (Vector(6))
+      TypeConverter.unregisterConverter(customIntConverter)
+      session.execute(s"""DROP TABLE IF EXISTS $ks.test_table_converter4""")
+    }
+  }
+
+  it should "allow for converting fields to set types by a user-defined TypeConverter" in {
+    conn.withSessionDo { session =>
+      val customConverter = new TypeConverter[Set[Int]] {
+        def targetTypeTag = typeTag[Set[Int]]
+        def convertPF = {
+          case x: String => Set(x.toInt, x.toInt + 1)
+        }
+      }
+      TypeConverter.registerConverter(customConverter)
+      session.execute(s"create table $ks.test_table_converter5 (id text primary key, test_set set<int>)")
+      sc.parallelize(Seq(("id", "7"))).saveToCassandra(ks, "test_table_converter5")
+      val res = sc.cassandraTable(ks, "test_table_converter5").collect
+      res.size should be (1)
+      res.head.getString(0) should be ("id")
+      res.head.getSet[Int](1) should be (Set(7, 8))
+      TypeConverter.unregisterConverter(customConverter)
+      session.execute(s"""DROP TABLE IF EXISTS $ks.test_table_converter5""")
+    }
+  }
+
+  it should "use the default set converters when no other converter exists" in {
+    conn.withSessionDo { session =>
+      TypeConverter.registerConverter(customIntConverter)
+      session.execute(s"create table $ks.test_table_converter6 (id text primary key, test_list set<int>)")
+      sc.parallelize(Seq(("id", Set("5")))).saveToCassandra(ks, "test_table_converter6")
+      val res = sc.cassandraTable(ks, "test_table_converter6").collect
+      res.size should be (1)
+      res.head.getString(0) should be ("id")
+      res.head.getSet[Int](1) should be (Set(6))
+      TypeConverter.unregisterConverter(customIntConverter)
+      session.execute(s"""DROP TABLE IF EXISTS $ks.test_table_converter6""")
+    }
+  }
 }

diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/CollectionColumnType.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/CollectionColumnType.scala
index 8826a3042..431ed2515 100644
--- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/CollectionColumnType.scala
+++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/CollectionColumnType.scala
@@ -2,6 +2,9 @@ package com.datastax.spark.connector.types
 
 import scala.language.existentials
 import scala.reflect.runtime.universe._
+import scala.reflect.ClassTag
+import com.datastax.spark.connector.types.TypeConverter.{MapConverter, VectorConverter, SetConverter}
+
 
 import org.apache.spark.sql.catalyst.ReflectionLock.SparkReflectionLock
 
@@ -10,47 +13,45 @@ trait CollectionColumnType[T] extends ColumnType[T] {
 }
 
 case class ListType[T](elemType: ColumnType[T]) extends CollectionColumnType[Vector[T]] {
-
-  @transient
-  lazy val scalaTypeTag = SparkReflectionLock.synchronized {
-    implicit val elemTypeTag = elemType.scalaTypeTag
-    implicitly[TypeTag[Vector[T]]]
-  }
-
+  @transient implicit lazy val elemTypeTag = SparkReflectionLock.synchronized { elemType.scalaTypeTag }
+  @transient lazy val scalaTypeTag = SparkReflectionLock.synchronized { implicitly[TypeTag[Vector[T]]] }
 
   def cqlTypeName = s"list<${elemType.cqlTypeName}>"
 
   override def converterToCassandra: TypeConverter[_ <: AnyRef] =
-    new TypeConverter.OptionToNullConverter(TypeConverter.listConverter(elemType.converterToCassandra))
+    TypeConverter.forType[Vector[T]] match {
+      case tc: VectorConverter[_] =>
+        new TypeConverter.OptionToNullConverter(TypeConverter.listConverter(elemType.converterToCassandra))
+      case tc => tc
+    }
 }
 
 case class SetType[T](elemType: ColumnType[T]) extends CollectionColumnType[Set[T]] {
-
-  @transient
-  lazy val scalaTypeTag = SparkReflectionLock.synchronized {
-    implicit val elemTypeTag = elemType.scalaTypeTag
-    implicitly[TypeTag[Set[T]]]
-  }
-
+  @transient implicit lazy val elemTypeTag = SparkReflectionLock.synchronized { elemType.scalaTypeTag }
+  @transient lazy val scalaTypeTag = SparkReflectionLock.synchronized { implicitly[TypeTag[Set[T]]] }
 
   def cqlTypeName = s"set<${elemType.cqlTypeName}>"
 
   override def converterToCassandra: TypeConverter[_ <: AnyRef] =
-    new TypeConverter.OptionToNullConverter(TypeConverter.setConverter(elemType.converterToCassandra))
+    TypeConverter.forType[Set[T]] match {
+      case tc: SetConverter[_] =>
+        new TypeConverter.OptionToNullConverter(
+          TypeConverter.setConverter(elemType.converterToCassandra))
+      case tc => tc
+    }
 }
 
 case class MapType[K, V](keyType: ColumnType[K], valueType: ColumnType[V])
   extends CollectionColumnType[Map[K, V]] {
-
-  @transient
-  lazy val scalaTypeTag = SparkReflectionLock.synchronized {
-    implicit val keyTypeTag = keyType.scalaTypeTag
-    implicit val valueTypeTag = valueType.scalaTypeTag
-    implicitly[TypeTag[Map[K, V]]]
-  }
-
+  @transient implicit lazy val keyTypeTag = SparkReflectionLock.synchronized { keyType.scalaTypeTag }
+  @transient implicit lazy val valueTypeTag = SparkReflectionLock.synchronized { valueType.scalaTypeTag }
+  @transient lazy val scalaTypeTag = SparkReflectionLock.synchronized { implicitly[TypeTag[Map[K, V]]] }
 
   def cqlTypeName = s"map<${keyType.cqlTypeName}, ${valueType.cqlTypeName}>"
 
   override def converterToCassandra: TypeConverter[_ <: AnyRef] =
-    new TypeConverter.OptionToNullConverter(
-      TypeConverter.mapConverter(keyType.converterToCassandra, valueType.converterToCassandra))
+    TypeConverter.forType[Map[K, V]] match {
+      case tc: MapConverter[_, _] =>
+        new TypeConverter.OptionToNullConverter(
+          TypeConverter.mapConverter(keyType.converterToCassandra, valueType.converterToCassandra))
+      case tc => tc
+    }
 }

From a4bdb32cb21b3f694d94b25a1e91c828f3187a76 Mon Sep 17 00:00:00 2001
From: aashish
Date: Fri, 14 Dec 2018 15:40:53 -0500
Subject: [PATCH 2/2] fix synchronization issue

---
 .../connector/types/CollectionColumnType.scala | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/CollectionColumnType.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/CollectionColumnType.scala
index c6b53c462..2bbc323c1 100644
--- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/CollectionColumnType.scala
+++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/CollectionColumnType.scala
@@ -11,8 +11,8 @@ trait CollectionColumnType[T] extends ColumnType[T] {
 }
 
 case class ListType[T](elemType: ColumnType[T]) extends CollectionColumnType[Vector[T]] {
-  @transient implicit lazy val elemTypeTag = SparkReflectionLock.synchronized { elemType.scalaTypeTag }
-  @transient lazy val scalaTypeTag = SparkReflectionLock.synchronized { implicitly[TypeTag[Vector[T]]] }
+  @transient implicit lazy val elemTypeTag = elemType.scalaTypeTag
+  @transient lazy val scalaTypeTag = implicitly[TypeTag[Vector[T]]]
 
   def cqlTypeName = s"list<${elemType.cqlTypeName}>"
 
   override def converterToCassandra: TypeConverter[_ <: AnyRef] =
@@ -24,8 +24,8 @@ case class ListType[T](elemType: ColumnType[T]) extends CollectionColumnType[Vec
 }
 
 case class SetType[T](elemType: ColumnType[T]) extends CollectionColumnType[Set[T]] {
-  @transient implicit lazy val elemTypeTag = SparkReflectionLock.synchronized { elemType.scalaTypeTag }
-  @transient lazy val scalaTypeTag = SparkReflectionLock.synchronized { implicitly[TypeTag[Set[T]]] }
+  @transient implicit lazy val elemTypeTag = elemType.scalaTypeTag
+  @transient lazy val scalaTypeTag = implicitly[TypeTag[Set[T]]]
 
   def cqlTypeName = s"set<${elemType.cqlTypeName}>"
 
   override def converterToCassandra: TypeConverter[_ <: AnyRef] =
@@ -39,9 +39,9 @@ case class SetType[T](elemType: ColumnType[T]) extends CollectionColumnType[Set[
 
 case class MapType[K, V](keyType: ColumnType[K], valueType: ColumnType[V])
   extends CollectionColumnType[Map[K, V]] {
-  @transient implicit lazy val keyTypeTag = SparkReflectionLock.synchronized { keyType.scalaTypeTag }
-  @transient implicit lazy val valueTypeTag = SparkReflectionLock.synchronized { valueType.scalaTypeTag }
-  @transient lazy val scalaTypeTag = SparkReflectionLock.synchronized { implicitly[TypeTag[Map[K, V]]] }
+  @transient implicit lazy val keyTypeTag = keyType.scalaTypeTag
+  @transient implicit lazy val valueTypeTag = valueType.scalaTypeTag
+  @transient lazy val scalaTypeTag = implicitly[TypeTag[Map[K, V]]]
 
   def cqlTypeName = s"map<${keyType.cqlTypeName}, ${valueType.cqlTypeName}>"
 
   override def converterToCassandra: TypeConverter[_ <: AnyRef] =
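
Usage sketch: with both patches applied, a user-registered TypeConverter
whose target type is a Scala collection (Vector, Set, or Map) is returned
by converterToCassandra for a matching Cassandra collection column, instead
of the built-in element-wise converter. The sketch below mirrors the
integration tests above; the converter name, the keyspace "test", the table
"events", and the comma-separated input format are illustrative assumptions,
and sc is assumed to be a SparkContext configured for Cassandra:

    import scala.reflect.runtime.universe._
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.types.TypeConverter

    // Hypothetical converter: parses a comma-separated String into a
    // Vector[Int]. Because its target type matches ListType's Scala type
    // (Vector[T] with T = Int here), ListType.converterToCassandra now
    // returns this converter rather than the default listConverter.
    val csvToIntVector = new TypeConverter[Vector[Int]] {
      def targetTypeTag = typeTag[Vector[Int]]
      def convertPF = {
        case s: String => s.split(",").map(_.trim.toInt).toVector
      }
    }

    TypeConverter.registerConverter(csvToIntVector)
    // Assumes: CREATE TABLE test.events (id text PRIMARY KEY, values list<int>)
    sc.parallelize(Seq(("a", "1, 2, 3"))).saveToCassandra("test", "events")
    TypeConverter.unregisterConverter(csvToIntVector)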