Skip to content

Commit

Permalink
Force checkpoint before shutting down terminated shards
Browse files Browse the repository at this point in the history
  • Loading branch information
easel committed Oct 15, 2019
1 parent 6df65b0 commit 308d9bd
Showing 1 changed file with 10 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.{
IRecordProcessor,
IShutdownNotificationAware
}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{ShutdownReason, Worker}
import com.amazonaws.services.kinesis.clientlibrary.types._
import com.amazonaws.services.kinesis.model.Record
import com.typesafe.scalalogging.LazyLogging
Expand All @@ -37,7 +37,6 @@ import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker.{
}
import com.weightwatchers.reactive.kinesis.models.{CompoundSequenceNumber, ConsumerEvent}
import org.joda.time.{DateTime, DateTimeZone}

import scala.collection.JavaConverters._
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{Await, ExecutionContext, Future}
Expand Down Expand Up @@ -129,9 +128,15 @@ private[consumer] class ConsumerProcessingManager(
}

override def shutdown(shutdownInput: ShutdownInput): Unit = {
logger.info(
s"Shutdown record processor for shard: $kinesisShardId. Reason: ${shutdownInput.getShutdownReason}"
)
if (shutdownInput.getShutdownReason == ShutdownReason.TERMINATE) {
logger.info(
s"Shutdown record processor for shard: $kinesisShardId. Reason: ${shutdownInput.getShutdownReason}. Forcing checkpoint."
)
shutdownInput.getCheckpointer.checkpoint()
} else
logger.info(
s"Shutdown record processor for shard: $kinesisShardId. Reason: ${shutdownInput.getShutdownReason}"
)
shutdown(shutdownInput.getCheckpointer)
}

Expand Down

0 comments on commit 308d9bd

Please sign in to comment.