Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Force checkpoint before shutting down terminated shards #79

Merged
merged 2 commits into from
Feb 12, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.{
IRecordProcessor,
IShutdownNotificationAware
}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{ShutdownReason, Worker}
import com.amazonaws.services.kinesis.clientlibrary.types._
import com.amazonaws.services.kinesis.model.Record
import com.typesafe.scalalogging.LazyLogging
Expand All @@ -37,7 +37,6 @@ import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker.{
}
import com.weightwatchers.reactive.kinesis.models.{CompoundSequenceNumber, ConsumerEvent}
import org.joda.time.{DateTime, DateTimeZone}

import scala.collection.JavaConverters._
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{Await, ExecutionContext, Future}
Expand Down Expand Up @@ -129,9 +128,15 @@ private[consumer] class ConsumerProcessingManager(
}

override def shutdown(shutdownInput: ShutdownInput): Unit = {
logger.info(
s"Shutdown record processor for shard: $kinesisShardId. Reason: ${shutdownInput.getShutdownReason}"
)
if (shutdownInput.getShutdownReason == ShutdownReason.TERMINATE) {
logger.info(
s"Shutdown record processor for shard: $kinesisShardId. Reason: ${shutdownInput.getShutdownReason}. Forcing checkpoint."
)
shutdownInput.getCheckpointer.checkpoint()
} else
logger.info(
s"Shutdown record processor for shard: $kinesisShardId. Reason: ${shutdownInput.getShutdownReason}"
)
shutdown(shutdownInput.getCheckpointer)
}

Expand Down