From 8afed613955f92a54c8fbecb564b0f87e75eeece Mon Sep 17 00:00:00 2001 From: Aaruna Godthi Date: Wed, 13 Apr 2016 20:30:10 +0530 Subject: [PATCH] Fixed #16 Added option to configure mapping of fields in SinkRecord to CQL columns --- build.sbt | 18 +-- src/it/resources/setup.cql | 7 ++ .../cassandra/CassandraSinkTaskSpec.scala | 82 ++++++++++++- .../connect/cassandra/CassandraSinkTask.scala | 114 +++++++++++++++--- 4 files changed, 190 insertions(+), 31 deletions(-) diff --git a/build.sbt b/build.sbt index d7b24b4..19218f8 100644 --- a/build.sbt +++ b/build.sbt @@ -30,14 +30,16 @@ libraryDependencies ++= Seq( "com.datastax.cassandra" % "cassandra-driver-core" % cassandra, //was: 2.1.9 "org.scalatest" %% "scalatest" % "2.2.6" % "test,it", "org.mockito" % "mockito-core" % "2.0.34-beta" % "test,it", - "ch.qos.logback" % "logback-classic" % "1.0.7" % "test,it", - CrossVersion.partialVersion(scalaVersion.value) match { - case Some((2, minor)) if minor < 11 => - "org.slf4j" % "slf4j-api" % "1.7.13" - case _ => - "com.typesafe.scala-logging" %% "scala-logging" % "3.1.0" - } -) + "ch.qos.logback" % "logback-classic" % "1.0.7" % "test,it" +) ++ (CrossVersion.partialVersion(scalaVersion.value) match { + case Some((2, minor)) if minor < 11 => Seq( + "org.slf4j" % "slf4j-api" % "1.7.13" + ) + case _ => Seq( + "com.typesafe.scala-logging" %% "scala-logging" % "3.1.0", + "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4" + ) +}) publishMavenStyle := true diff --git a/src/it/resources/setup.cql b/src/it/resources/setup.cql index cf7b6d6..221be6c 100644 --- a/src/it/resources/setup.cql +++ b/src/it/resources/setup.cql @@ -26,6 +26,13 @@ CREATE TABLE IF NOT EXISTS test.kv ( value int, PRIMARY KEY (key)); +CREATE TABLE IF NOT EXISTS test.fieldmap ( + new_key text, + new_value int, + new_nested text, + new_dnested text, + PRIMARY KEY (new_key)); + CREATE TABLE test.playlists ( id bigint, song_order int, diff --git a/src/it/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTaskSpec.scala b/src/it/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTaskSpec.scala index 28e3a9a..3b4b23f 100644 --- a/src/it/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTaskSpec.scala +++ b/src/it/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTaskSpec.scala @@ -18,17 +18,19 @@ */ package com.tuplejump.kafka.connect.cassandra -import scala.collection.JavaConverters._ import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct} import org.apache.kafka.connect.sink.{SinkRecord, SinkTaskContext} -class CassandraSinkTaskSpec extends AbstractFlatSpec { +import scala.collection.JavaConverters._ +import scala.util.parsing.json.JSONObject - val topicName = "test_kv_topic" - val tableName = "test.kv" - val config = sinkConfig((topicName, tableName)) +class CassandraSinkTaskSpec extends AbstractFlatSpec { it should "start sink task" in { + val topicName = "test_kv_topic" + val tableName = "test.kv" + val config = sinkConfig((topicName, tableName)) + val sinkTask = new CassandraSinkTask() val mockContext = mock[SinkTaskContext] @@ -41,6 +43,7 @@ class CassandraSinkTaskSpec extends AbstractFlatSpec { it should "save records in cassandra" in { val topicName = "test_kv_topic" val tableName = "test.kv" + val config = sinkConfig((topicName, tableName)) val sinkTask = new CassandraSinkTask() val mockContext = mock[SinkTaskContext] @@ -67,4 +70,73 @@ class CassandraSinkTaskSpec extends AbstractFlatSpec { rowCount should be(2) } + + it should "save records in cassandra with custom field mapping" 
in { + val topicName = "test_fieldmap_topic" + val tableName = "test.fieldmap" + val config = sinkConfig((topicName, tableName)) + + val sinkTask = new CassandraSinkTask() + val mockContext = mock[SinkTaskContext] + + val fieldMapping: JSONObject = JSONObject(Map( + "key" -> "new_key", + "value" -> "new_value", + "nvalue" -> JSONObject(Map( + "blah1" -> "new_nested", + "blah2" -> JSONObject(Map( + "blah2" -> "new_dnested" + )) + )) + )) + + sinkTask.initialize(mockContext) + sinkTask.start((config + ("field.mapping" -> fieldMapping.toString())).asJava) + + val doubleNestedSchema = SchemaBuilder.struct.name("dnestedSchema").version(1) + .field("blah1", Schema.STRING_SCHEMA) + .field("blah2", Schema.STRING_SCHEMA).build + val nestedSchema = SchemaBuilder.struct.name("nestedSchema").version(1) + .field("blah1", Schema.STRING_SCHEMA) + .field("blah2", doubleNestedSchema).build + val valueSchema = SchemaBuilder.struct.name("record").version(1) + .field("key", Schema.STRING_SCHEMA) + .field("value", Schema.INT32_SCHEMA) + .field("nvalue", nestedSchema).build + + val dnestedValue1 = new Struct(doubleNestedSchema) + .put("blah1", "dnes_blah1_1") + .put("blah2", "dnes_blah2_1") + val nestedValue1 = new Struct(nestedSchema) + .put("blah1", "nes_blah1_1") + .put("blah2", dnestedValue1) + val value1 = new Struct(valueSchema) + .put("key", "pqr") + .put("value", 15) + .put("nvalue", nestedValue1) + + val dnestedValue2 = new Struct(doubleNestedSchema) + .put("blah1", "dnes_blah1_2") + .put("blah2", "dnes_blah2_2") + val nestedValue2 = new Struct(nestedSchema) + .put("blah1", "nes_blah1_2") + .put("blah2", dnestedValue2) + val value2 = new Struct(valueSchema) + .put("key", "abc") + .put("value", 17) + .put("nvalue", nestedValue2) + + val record1 = new SinkRecord(topicName, 1, SchemaBuilder.struct.build, "key", valueSchema, value1, 0) + val record2 = new SinkRecord(topicName, 1, SchemaBuilder.struct.build, "key", valueSchema, value2, 0) + + sinkTask.put(List(record1, record2).asJavaCollection) + + sinkTask.stop() + + val session = CassandraCluster.local.connect + val result = session.execute(s"select count(1) from $tableName").one() + val rowCount = result.getLong(0) + rowCount should be(2) + } + } diff --git a/src/main/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTask.scala b/src/main/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTask.scala index e8b02fb..d187b85 100644 --- a/src/main/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTask.scala +++ b/src/main/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTask.scala @@ -16,28 +16,32 @@ package com.tuplejump.kafka.connect.cassandra -import java.util.{Collection => JCollection, Map => JMap, Date => JDate} +import java.util.{Collection => JCollection, Date => JDate, Map => JMap} -import org.apache.kafka.connect.connector.Task - -import scala.collection.JavaConverters._ +import com.tuplejump.kafka.connect.cassandra.Configuration.Query import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.TopicPartition -import org.apache.kafka.connect.sink.{SinkRecord, SinkTask} -import org.apache.kafka.connect.errors.{ConnectException, DataException} -import org.apache.kafka.connect.data.{Schema, Struct, Timestamp} +import org.apache.kafka.connect.connector.Task import org.apache.kafka.connect.data.Schema.Type._ +import org.apache.kafka.connect.data.{Field, Schema, Struct, Timestamp} +import org.apache.kafka.connect.errors.{ConnectException, DataException} +import 
org.apache.kafka.connect.sink.{SinkRecord, SinkTask} + +import scala.collection.JavaConverters._ class CassandraSinkTask extends SinkTask with TaskLifecycle { + import CassandraSinkTask._ + private val FieldMappingParam = "field.mapping" + def taskClass: Class[_ <: Task] = classOf[CassandraSinkTask] override def put(records: JCollection[SinkRecord]): Unit = records.asScala.foreach { record => configuration.find(record.topic) match { case Some(topic) => - val query = convert(record, topic) + val query: Query = convert(record, topic, configuration.config.get(FieldMappingParam)) session.execute(query) case other => throw new ConnectException("Failed to get cassandra session.") @@ -51,33 +55,107 @@ class CassandraSinkTask extends SinkTask with TaskLifecycle { /** INTERNAL API. */ private[kafka] object CassandraSinkTask { + import Configuration._ + import scala.util.parsing.json._ + /* TODO: Use keySchema, partition and kafkaOffset TODO: Add which types are currently supported in README */ - def convert(record: SinkRecord, sink: SinkConfig): Query = { + def convert(record: SinkRecord, sink: SinkConfig, fieldMappingProperty: Option[String] = None): Query = { + val colNamesVsValues: Map[String, String] = { + fieldMappingProperty match { + case Some(fieldMappingString) => convertToCqlData(record, fieldMappingString) + case None => convertToCqlData(record) + } + } + colNamesVsValues.view.map(e => Vector(e._1, e._2)).transpose match { + case columnNames :: columnValues :: Nil => + s"INSERT INTO ${sink.namespace}(${columnNames.mkString(",")}) VALUES(${columnValues.mkString(",")})" + } + } + + def convertToCqlData(record: SinkRecord): (Map[String, String]) = { val valueSchema = record.valueSchema - val columnNames = valueSchema.fields.asScala.map(_.name).toSet - val columnValues = valueSchema.`type`() match { + valueSchema.`type`() match { case STRUCT => val struct: Struct = record.value.asInstanceOf[Struct] - columnNames.map(schema(valueSchema, struct, _)).mkString(",") + valueSchema.fields.asScala.map { field => + (field.name, schema(valueSchema, struct, field)) + }.toMap case other => throw new DataException( s"Unable to create insert statement with unsupported value schema type $other.") } - s"INSERT INTO ${sink.namespace}(${columnNames.mkString(",")}) VALUES($columnValues)" } /* TODO support all types. 
*/ - def schema(valueSchema: Schema, result: Struct, col: String): AnyRef = - valueSchema.field(col).schema match { + def schema(valueSchema: Schema, result: Struct, field: Field): String = { + field.schema match { case x if x.`type`() == Schema.STRING_SCHEMA.`type`() => - s"'${result.get(col).toString}'" + s"'${result.get(field).toString}'" case x if x.name() == Timestamp.LOGICAL_NAME => - val time = Timestamp.fromLogical(x, result.get(col).asInstanceOf[JDate]) + val time = Timestamp.fromLogical(x, result.get(field).asInstanceOf[JDate]) s"$time" case y => - result.get(col) + String.valueOf(result.get(field)) + } + } + + + // scalastyle:off + def convertToCqlData(record: SinkRecord, fieldMappingString: String): Map[String, String] = { + lazy val exception = new DataException(s"Invalid fieldMapping received - $fieldMappingString") + val result = scala.collection.mutable.Map.empty[String, String] + JSON.parseFull(fieldMappingString) match { + case Some(data) => + data match { + case map: Map[_, _] => + val valueSchema = record.valueSchema + valueSchema.`type`() match { + case STRUCT => + val struct: Struct = record.value.asInstanceOf[Struct] + populateResultsMap(result, struct, map.asInstanceOf[Map[String, Any]]) + case other => + throw new DataException( + s"Unable to create insert statement with unsupported value schema type $other.") + } + case other => + throw exception + } + case None => + throw exception } + result.toMap + } + + private def populateResultsMap(results: scala.collection.mutable.Map[String, String], struct: Struct, fieldMapping: Map[String, Any]): Unit = { + lazy val exception = new DataException(s"Mismatch between fieldMapping and Schema") + struct.schema.fields.asScala.foreach { field => + val fieldMappingValue = fieldMapping.get(field.name) + field.schema.`type`() match { + case STRUCT => + fieldMappingValue match { + case Some(value) => + value match { + case newMap: Map[_, _] => populateResultsMap(results, struct.get(field).asInstanceOf[Struct], newMap.asInstanceOf[Map[String, Any]]) + case _ => throw exception + } + case None => + } + case _ => + fieldMappingValue match { + case Some(value) => + value match { + case strValue: String => results.put(strValue, schema(field.schema, struct, field)) + case _ => throw exception + } + case None => + } + } + } + } + + // scalastyle:on + } \ No newline at end of file
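Usage note (not part of the patch): a minimal sketch of how the new "field.mapping" option introduced here is supplied to the sink task, reusing the table and the sinkConfig helper from the integration test above. The value is a JSON document whose keys are field names of the SinkRecord's value schema (a nested struct maps to a nested JSON object) and whose values are the target CQL column names; fields that do not appear in the mapping are simply skipped.

    import scala.collection.JavaConverters._
    import scala.util.parsing.json.JSONObject

    // Map record fields to columns of test.fieldmap; "nvalue" is a nested struct,
    // so its mapping is itself a JSON object keyed by the nested field names.
    val fieldMapping = JSONObject(Map(
      "key"    -> "new_key",
      "value"  -> "new_value",
      "nvalue" -> JSONObject(Map("blah1" -> "new_nested"))
    ))

    // sinkConfig is the test-suite helper used above to build the topic -> table properties.
    val config = sinkConfig(("test_fieldmap_topic", "test.fieldmap"))

    val task = new CassandraSinkTask()
    task.start((config + ("field.mapping" -> fieldMapping.toString())).asJava)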