Retry sending messages only for retriable exceptions (#29)
* Fix futures array loop in `retrying_send`
  Nil values were removed from the futures array before looping, causing
  wrong indexes relative to the batch array (see the sketch after this list).

* Update producer exception logging

* Also retry Kafka `InterruptException`s to preserve existing behaviour.

* Document new retry logic and bump version to v10.5.0
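A minimal sketch (toy values, not plugin code) of the index misalignment fixed above: `compact` shifts every element after a `nil`, so position `i` in `futures` no longer corresponds to `batch[i]`.

```ruby
batch   = [:record_a, :record_b, :record_c]
# Suppose send(:record_a) raised before a future was created, leaving a nil:
futures = [nil, :future_b, :future_c]

# Buggy: compact drops the nil, so i == 0 now pairs :future_b with :record_a.
futures.compact.each_with_index do |future, i|
  # batch[i] is the wrong record for every future after the nil
end

# Fixed: keep the nils and skip them, so i stays aligned with batch.
futures.each_with_index do |future, i|
  next if future.nil?
  # batch[i] is exactly the record that produced futures[i]
end
```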
praseodym authored Jul 20, 2020
1 parent 7f95509 commit ba70405
Showing 5 changed files with 80 additions and 25 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,5 @@
+## 10.5.0
+- Changed: retry sending messages only for retriable exceptions [#29](https://github.com/logstash-plugins/logstash-integration-kafka/pull/29)
+
 ## 10.4.1
 - [DOC] Fixed formatting issues and made minor content edits [#43](https://github.com/logstash-plugins/logstash-integration-kafka/pull/43)
9 changes: 9 additions & 0 deletions docs/output-kafka.asciidoc
@@ -327,6 +327,15 @@ Kafka down, etc).
 
 A value less than zero is a configuration error.
 
+Starting with version 10.5.0, this plugin will only retry exceptions that are a subclass of
+https://kafka.apache.org/25/javadoc/org/apache/kafka/common/errors/RetriableException.html[RetriableException]
+or
+https://kafka.apache.org/25/javadoc/org/apache/kafka/common/errors/InterruptException.html[InterruptException].
+If producing a message throws any other exception, an error is logged and the message is dropped without retrying.
+This prevents the Logstash pipeline from hanging indefinitely.
+
+In versions prior to 10.5.0, any exception was retried indefinitely unless the `retries` option was configured.
+
 [id="plugins-{type}s-{plugin}-retry_backoff_ms"]
 ===== `retry_backoff_ms`
 
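For context, here is a hedged sketch of a pipeline configuration exercising the options discussed above; the option names (`retries`, `retry_backoff_ms`, `bootstrap_servers`, `topic_id`) are the plugin's real settings, while the values are placeholders, not recommendations:

```
output {
  kafka {
    bootstrap_servers => "localhost:9092"   # placeholder broker address
    topic_id          => "logs"             # placeholder topic
    retries           => 3                  # cap retry attempts instead of retrying retriable errors forever
    retry_backoff_ms  => 100                # wait 100 ms before each retry attempt
  }
}
```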
44 changes: 28 additions & 16 deletions lib/logstash/outputs/kafka.rb
@@ -236,7 +236,7 @@ def retrying_send(batch)
     remaining = @retries
 
     while batch.any?
-      if !remaining.nil?
+      unless remaining.nil?
         if remaining < 0
           # TODO(sissel): Offer to DLQ? Then again, if it's a transient fault,
           # DLQing would make things worse (you dlq data that would be successful
@@ -255,27 +255,39 @@
       begin
         # send() can throw an exception even before the future is created.
         @producer.send(record)
-      rescue org.apache.kafka.common.errors.TimeoutException => e
+      rescue org.apache.kafka.common.errors.InterruptException,
+             org.apache.kafka.common.errors.RetriableException => e
+        logger.info("producer send failed, will retry sending", :exception => e.class, :message => e.message)
         failures << record
         nil
-      rescue org.apache.kafka.common.errors.InterruptException => e
-        failures << record
-        nil
-      rescue org.apache.kafka.common.errors.SerializationException => e
-        # TODO(sissel): Retrying will fail because the data itself has a problem serializing.
-        # TODO(sissel): Let's add DLQ here.
-        failures << record
+      rescue org.apache.kafka.common.KafkaException => e
+        # This error is not retriable, drop event
+        # TODO: add DLQ support
+        logger.warn("producer send failed, dropping record", :exception => e.class, :message => e.message,
+                    :record_value => record.value)
         nil
       end
-    end.compact
+    end
 
     futures.each_with_index do |future, i|
-      begin
-        result = future.get()
-      rescue => e
-        # TODO(sissel): Add metric to count failures, possibly by exception type.
-        logger.warn("producer send failed", :exception => e.class, :message => e.message)
-        failures << batch[i]
+      # We cannot skip nils using `futures.compact` because then our index `i` will not align with `batch`
+      unless future.nil?
+        begin
+          future.get
+        rescue java.util.concurrent.ExecutionException => e
+          # TODO(sissel): Add metric to count failures, possibly by exception type.
+          if e.get_cause.is_a? org.apache.kafka.common.errors.RetriableException or
+              e.get_cause.is_a? org.apache.kafka.common.errors.InterruptException
+            logger.info("producer send failed, will retry sending", :exception => e.cause.class,
+                        :message => e.cause.message)
+            failures << batch[i]
+          elsif e.get_cause.is_a? org.apache.kafka.common.KafkaException
+            # This error is not retriable, drop event
+            # TODO: add DLQ support
+            logger.warn("producer send failed, dropping record", :exception => e.cause.class,
+                        :message => e.cause.message, :record_value => batch[i].value)
+          end
+        end
       end
     end
 
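To see why the second hunk inspects `e.get_cause`, here is a minimal JRuby sketch (assuming JRuby with the Kafka client jars on the classpath, the same setup the specs below rely on): `FutureTask#get` wraps whatever the task raised in a `java.util.concurrent.ExecutionException`, so the original Kafka exception is only reachable through its cause.

```ruby
require 'java'

# Simulate an asynchronous producer failure, as the specs below do.
future = java.util.concurrent.FutureTask.new do
  raise org.apache.kafka.common.errors.TimeoutException.new('simulated broker timeout')
end
future.run

begin
  future.get
rescue java.util.concurrent.ExecutionException => e
  e.cause.class  # => the original TimeoutException, not the ExecutionException wrapper
  e.cause.is_a?(org.apache.kafka.common.errors.RetriableException)  # => true, so the record is retried
end
```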
2 changes: 1 addition & 1 deletion logstash-integration-kafka.gemspec
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-integration-kafka'
-  s.version = '10.4.1'
+  s.version = '10.5.0'
   s.licenses = ['Apache-2.0']
   s.summary = "Integration with Kafka - input and output plugins"
   s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+
48 changes: 40 additions & 8 deletions spec/unit/outputs/kafka_spec.rb
@@ -56,14 +56,15 @@
       end
     end
 
-    context "when KafkaProducer#send() raises an exception" do
+    context "when KafkaProducer#send() raises a retriable exception" do
       let(:failcount) { (rand * 10).to_i }
       let(:sendcount) { failcount + 1 }
 
       let(:exception_classes) { [
         org.apache.kafka.common.errors.TimeoutException,
+        org.apache.kafka.common.errors.DisconnectException,
+        org.apache.kafka.common.errors.CoordinatorNotAvailableException,
         org.apache.kafka.common.errors.InterruptException,
-        org.apache.kafka.common.errors.SerializationException
       ] }
 
       before do
@@ -88,6 +89,37 @@
       end
     end
 
+    context "when KafkaProducer#send() raises a non-retriable exception" do
+      let(:failcount) { (rand * 10).to_i }
+
+      let(:exception_classes) { [
+        org.apache.kafka.common.errors.SerializationException,
+        org.apache.kafka.common.errors.RecordTooLargeException,
+        org.apache.kafka.common.errors.InvalidTopicException
+      ] }
+
+      before do
+        count = 0
+        expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+          .exactly(1).times
+          .and_wrap_original do |m, *args|
+          if count < failcount # fail 'failcount' times in a row.
+            count += 1
+            # Pick an exception at random
+            raise exception_classes.shuffle.first.new("injected exception for testing")
+          else
+            m.call(*args) # call original
+          end
+        end
+      end
+
+      it "should not retry" do
+        kafka = LogStash::Outputs::Kafka.new(simple_kafka_config)
+        kafka.register
+        kafka.multi_receive([event])
+      end
+    end
+
     context "when a send fails" do
       context "and the default retries behavior is used" do
         # Fail this many times and then finally succeed.
@@ -107,7 +139,7 @@
             # inject some failures.
 
             # Return a custom Future that will raise an exception to simulate a Kafka send() problem.
-            future = java.util.concurrent.FutureTask.new { raise "Failed" }
+            future = java.util.concurrent.FutureTask.new { raise org.apache.kafka.common.errors.TimeoutException.new("Failed") }
             future.run
             future
           else
@@ -129,7 +161,7 @@
           .once
           .and_wrap_original do |m, *args|
             # Always fail.
-            future = java.util.concurrent.FutureTask.new { raise "Failed" }
+            future = java.util.concurrent.FutureTask.new { raise org.apache.kafka.common.errors.TimeoutException.new("Failed") }
             future.run
             future
           end
@@ -143,7 +175,7 @@
           .once
           .and_wrap_original do |m, *args|
             # Always fail.
-            future = java.util.concurrent.FutureTask.new { raise "Failed" }
+            future = java.util.concurrent.FutureTask.new { raise org.apache.kafka.common.errors.TimeoutException.new("Failed") }
             future.run
             future
           end
@@ -164,7 +196,7 @@
           .at_most(max_sends).times
           .and_wrap_original do |m, *args|
             # Always fail.
-            future = java.util.concurrent.FutureTask.new { raise "Failed" }
+            future = java.util.concurrent.FutureTask.new { raise org.apache.kafka.common.errors.TimeoutException.new("Failed") }
             future.run
             future
           end
@@ -175,10 +207,10 @@
 
       it 'should only sleep retries number of times' do
         expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
-          .at_most(max_sends)
+          .at_most(max_sends).times
          .and_wrap_original do |m, *args|
            # Always fail.
-           future = java.util.concurrent.FutureTask.new { raise "Failed" }
+           future = java.util.concurrent.FutureTask.new { raise org.apache.kafka.common.errors.TimeoutException.new("Failed") }
            future.run
            future
          end
