From 05f66463e7ff9fbfdcc999f30391974d0fbf7a31 Mon Sep 17 00:00:00 2001
From: Shiti <ssaxena.ece@gmail.com>
Date: Mon, 21 Mar 2016 14:45:15 +0530
Subject: [PATCH] fixed errors in timestamp support and added tests

---
 build.sbt                                     |  2 +-
 src/it/resources/setup.cql                    |  8 +++
 .../cassandra/CassandraSourceTaskSpec.scala   | 67 ++++++++++++++++++-
 .../cassandra/CassandraSourceTask.scala       |  4 +-
 .../connect/cassandra/Configuration.scala     | 20 ++++--
 5 files changed, 89 insertions(+), 12 deletions(-)

diff --git a/build.sbt b/build.sbt
index c5d1463..967647a 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,6 +1,6 @@
 name := "kafka-connect-cassandra"
 
-version := "0.0.4"
+version := "0.0.5"
 
 crossScalaVersions := Seq("2.11.7", "2.10.6")
 
diff --git a/src/it/resources/setup.cql b/src/it/resources/setup.cql
index 9ce8bfa..cf7b6d6 100644
--- a/src/it/resources/setup.cql
+++ b/src/it/resources/setup.cql
@@ -56,3 +56,11 @@ CREATE TABLE githubstats.monthly_commits (
   year INT,
   PRIMARY KEY ((user), year, month)
 ) WITH CLUSTERING ORDER BY (year DESC, month DESC);
+
+CREATE TABLE IF NOT EXISTS test.event_store(
+    app_id text,
+    event_type text,
+    subscription_type text,
+    event_ts timestamp,
+    PRIMARY KEY((app_id, event_type), event_ts)
+    );
\ No newline at end of file
diff --git a/src/it/scala/com/tuplejump/kafka/connect/cassandra/CassandraSourceTaskSpec.scala b/src/it/scala/com/tuplejump/kafka/connect/cassandra/CassandraSourceTaskSpec.scala
index dfde522..60eeae7 100644
--- a/src/it/scala/com/tuplejump/kafka/connect/cassandra/CassandraSourceTaskSpec.scala
+++ b/src/it/scala/com/tuplejump/kafka/connect/cassandra/CassandraSourceTaskSpec.scala
@@ -19,9 +19,12 @@
 
 package com.tuplejump.kafka.connect.cassandra
 
-import scala.collection.JavaConverters._
+import java.util.concurrent.TimeUnit
+
 import org.apache.kafka.connect.source.SourceTaskContext
 
+import scala.collection.JavaConverters._
+
 class CassandraSourceTaskSpec extends AbstractFlatSpec {
 
   val query = "SELECT * FROM test.playlists"
@@ -39,7 +42,7 @@ class CassandraSourceTaskSpec extends AbstractFlatSpec {
     sourceTask.stop()
   }
 
-  it should "fetch records from cassandra" in {
+  it should "fetch records from cassandra in bulk" in {
     val sourceTask = new CassandraSourceTask()
     val mockContext = mock[SourceTaskContext]
 
@@ -53,4 +56,64 @@ class CassandraSourceTaskSpec extends AbstractFlatSpec {
     sourceTask.stop()
   }
 
+  def insertStmt(time: Long): String = {
+    "INSERT INTO test.event_store(app_id,event_type,subscription_type,event_ts) " +
+      s"VALUES ('website','renewal','annual',$time)"
+  }
+
+  it should "fetch only new records from cassandra" in {
+    val timeBasedQuery =
+      """SELECT * FROM test.event_store WHERE app_id='website' AND event_type='renewal'
+        | AND event_ts >= previousTime()""".stripMargin
+
+    val topic = "events"
+    val cassandraSourceConfig = sourceConfig(timeBasedQuery, topic)
+
+    val sourceTask = new CassandraSourceTask()
+    val mockContext = mock[SourceTaskContext]
+
+    sourceTask.initialize(mockContext)
+    sourceTask.start(cassandraSourceConfig.asJava)
+
+    val oneHrAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1)
+    sourceTask.session.execute(insertStmt(oneHrAgo))
+
+    sourceTask.poll().size() should be(0)
+
+    val oneHrLater = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)
+    sourceTask.session.execute(insertStmt(oneHrLater))
+
+    val result = sourceTask.poll()
+
+    result.size() should be(1)
+
+    sourceTask.stop()
+  }
+
+  it should "fetch records from cassandra in given pollInterval" in {
+    val timeBasedQuery =
+      """SELECT * FROM test.event_store WHERE app_id='website' AND event_type='renewal'
+        | AND event_ts >= previousTime() AND event_ts <= currentTime()""".stripMargin
+
+    val topic = "events"
+    val cassandraSourceConfig = sourceConfig(timeBasedQuery, topic)
+
+    val sourceTask = new CassandraSourceTask()
+    val mockContext = mock[SourceTaskContext]
+
+    sourceTask.initialize(mockContext)
+    sourceTask.start(cassandraSourceConfig.asJava)
+
+    val oneHrLater = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)
+    sourceTask.session.execute(insertStmt(oneHrLater))
+
+    val fewSecLater = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2)
+    sourceTask.session.execute(insertStmt(fewSecLater))
+
+    val result = sourceTask.poll()
+
+    result.size() should be(1)
+
+    sourceTask.stop()
+  }
 }
diff --git a/src/main/scala/com/tuplejump/kafka/connect/cassandra/CassandraSourceTask.scala b/src/main/scala/com/tuplejump/kafka/connect/cassandra/CassandraSourceTask.scala
index 85f2644..a25c611 100644
--- a/src/main/scala/com/tuplejump/kafka/connect/cassandra/CassandraSourceTask.scala
+++ b/src/main/scala/com/tuplejump/kafka/connect/cassandra/CassandraSourceTask.scala
@@ -57,8 +57,8 @@ class CassandraSourceTask extends SourceTask with TaskLifecycle {
         //TODO remove https://github.com/tuplejump/kafka-connector/issues/9
         Thread.sleep(source.pollInterval)
 
-        source slide timestamp
-        read(source)
+        val updatedSource = source slide timestamp
+        read(updatedSource)
       case source =>
         read(source)
     }
diff --git a/src/main/scala/com/tuplejump/kafka/connect/cassandra/Configuration.scala b/src/main/scala/com/tuplejump/kafka/connect/cassandra/Configuration.scala
index 636abd6..d71267f 100644
--- a/src/main/scala/com/tuplejump/kafka/connect/cassandra/Configuration.scala
+++ b/src/main/scala/com/tuplejump/kafka/connect/cassandra/Configuration.scala
@@ -31,7 +31,7 @@ import Configuration._
   *
   * TODO CassandraConnectionConfig
   */
-private[kafka] final class Configuration private(val config: immutable.Map[String,String],
+private[kafka] final class Configuration private(val config: immutable.Map[String, String],
                                                  val source: Option[SourceConfig],
                                                  val sink: immutable.List[SinkConfig]) {
 
@@ -70,8 +70,12 @@ object Configuration {
 
   final val PreviousTime = "previousTime()"
 
+  final val PreviousTimeRegex = "previousTime\\(\\)"
+
   final val CurrentTime = "currentTime()"
 
+  final val CurrentTimeRegex = "currentTime\\(\\)"
+
   val Empty = new Configuration(Map.empty, SourceConfig.Empty, Nil)
 
   /** Returns a new [[com.tuplejump.kafka.connect.cassandra.Configuration]]. */
@@ -127,8 +131,9 @@ object Configuration {
     def slide(now: Long): SourceConfig =
       if (timeseries) {
         val timestamp = now - pollInterval
-        copy(query = query.replaceAll(PreviousTime, s"$timestamp")
-          .replaceAll(CurrentTime, s"$now"))
+        val updatedQuery: Query = query.replaceAll(PreviousTimeRegex, s"$timestamp")
+          .replaceAll(CurrentTimeRegex, s"$now")
+        copy(query = updatedQuery)
       } else this
 
     def timeseries: Boolean =
@@ -140,20 +145,20 @@ object Configuration {
 
     val Empty = SourceConfig(Map.empty)
 
-    def apply(config: Map[String,String]): Option[SourceConfig] =
+    def apply(config: Map[String, String]): Option[SourceConfig] =
       for {
         topic <- get(config, TopicKey)
         query <- get(config, QueryKey)
       } yield SourceConfig(topic, query, pollInterval(config), None, None)
 
-    private def pollInterval(config: Map[String,String]): Long =
+    private def pollInterval(config: Map[String, String]): Long =
       get(config, PollInterval).map(_.toLong).getOrElse(DefaultPollInterval)
   }
 
   /** A Kafka [[CassandraSink]] and [[CassandraSinkTask]] configuration.
     * INTERNAL API.
     *
-    * @param topic the kafka `topic` name
+    * @param topic     the kafka `topic` name
     * @param namespace the cassandra `keyspace.table`
     */
   private[kafka] final case class SinkConfig(val topic: TopicName,
@@ -218,7 +223,7 @@ object Configuration {
     val ConsistencyLevelKey = "cassandra.output.consistency.level"
     val DefaultConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM
 
-    /**  The maximum total size of the batch in bytes. Overridden by BatchSizeRowsParam. */
+    /** The maximum total size of the batch in bytes. Overridden by BatchSizeRowsParam. */
     val BatchSizeBytesKey = "cassandra.output.batch.size.bytes"
     val BatchBufferSize = 1024
 
@@ -227,4 +232,5 @@ object Configuration {
     val DefaultParallelismLevel = "5"
 
   }
+
 }