Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Kinesis stream how to call shutdown on source? #60

Closed
MeiSign opened this issue Apr 17, 2018 · 1 comment
Closed

Kinesis stream how to call shutdown on source? #60

MeiSign opened this issue Apr 17, 2018 · 1 comment

Comments

@MeiSign
Copy link

MeiSign commented Apr 17, 2018

We are using the Kinesis object to create a kinesis stream source:

Kinesis
  .source(StreamConfigName)
  .via(flow)
  .runWith(Sink.foreach[CommittableEvent[ConsumerEvent]](_.commit()))

When we shutdown the application in which this stream is processed we produce this exception:

2018-04-17 09:59:28.934 ERROR c.w.r.k.c.ConsumerProcessingManager - Unexpected exception on shutdown, final checkpoint attempt may have failed
akka.pattern.AskTimeoutException: Recipient[Actor[akka://user/consumer-worker-c28d4f4c-4214-11e8-a2d7-f70fbc38ec42#-1898192529]] had already been terminated. Sender[null] sent the message of type "com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker$GracefulShutdown".
at akka.pattern.AskableActorRef$.internalAsk$extension(AskSupport.scala:290)
at akka.pattern.AskableActorRef$.$qmark$extension1(AskSupport.scala:282)
at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManager$$anonfun$shutdown$2.apply(ConsumerProcessingManager.scala:143)
at scala.util.Try$.apply(Try.scala:192)
at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManager.shutdown(ConsumerProcessingManager.scala:142)
at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManager.shutdownRequested(ConsumerProcessingManager.scala:128)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotificationTask.call(ShutdownNotificationTask.java:44)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

How can I call the stop method on the KinesisConsumer to shutdown gracefully? If I understood the documentation correctly, the KinesisConsumer is used inside the Source. But The source does not return any handle so I could trigger a graceful shutdown, does it?

@markglh
Copy link
Contributor

markglh commented Apr 23, 2018

Fixed in #61

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Development

No branches or pull requests

2 participants