diff --git a/examples/dlq_consumer.rb b/examples/dlq_consumer.rb new file mode 100644 index 00000000..6d27c73b --- /dev/null +++ b/examples/dlq_consumer.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +class DlqConsumer < Racecar::Consumer + subscribes_to "messages", start_from_beginning: false + dead_letter_queue topic: "dlq" + + def process(message) + # simulate failure + if message.key == 'key-10' + raise "oops" + end + + puts message.value + end +end diff --git a/lib/racecar/consumer.rb b/lib/racecar/consumer.rb index 84c8db0f..0905db98 100644 --- a/lib/racecar/consumer.rb +++ b/lib/racecar/consumer.rb @@ -10,6 +10,7 @@ class << self attr_accessor :max_wait_time attr_accessor :group_id attr_accessor :producer, :consumer, :parallel_workers, :fetch_messages + attr_reader :dlq_topic, :dlq_retries def subscriptions @subscriptions ||= [] @@ -38,6 +39,11 @@ def subscribes_to( end end + def dead_letter_queue(topic:, retries: 3) + @dlq_topic = topic + @dlq_retries = retries + end + # Rebalance hooks for subclasses to override def on_partitions_assigned(rebalance_event); end def on_partitions_revoked(rebalance_event); end diff --git a/lib/racecar/runner.rb b/lib/racecar/runner.rb index 4d75687f..4766c349 100644 --- a/lib/racecar/runner.rb +++ b/lib/racecar/runner.rb @@ -35,6 +35,16 @@ def setup_pauses raise ArgumentError, "Invalid value for pause_timeout: must be integer greater or equal -1" end + # why is it nested hash? + # its + # { + # "topic" => { + # "partition" => pause instance + # } + # "topic1" => { + # "partition" => pause instance + # } + # } @pauses = Hash.new {|h, k| h[k] = Hash.new {|h2, k2| h2[k2] = ::Racecar::Pause.new( @@ -180,17 +190,33 @@ def process(message) @instrumenter.instrument("start_process_message", instrumentation_payload) with_pause(message.topic, message.partition, message.offset..message.offset) do |pause| - begin - @instrumenter.instrument("process_message", instrumentation_payload) do - processor.process(Racecar::Message.new(message, retries_count: pause.pauses_count)) - processor.deliver! - consumer.store_offset(message) + if processor.class.dlq_topic && pause.pauses_count > processor.class.dlq_retries + begin + @instrumenter.instrument("dlq", instrumentation_payload) do + processor.send(:produce, message.payload, topic: processor.class.dlq_topic, headers: instrumentation_payload) + processor.deliver! + consumer.store_offset(message) + end + rescue => e + logger.error e.to_s + logger.error "Error moving msg to dlq" + instrumentation_payload[:dlq_produce_error] = true + config.error_handler.call(e, instrumentation_payload) + raise e + end + else + begin + @instrumenter.instrument("process_message", instrumentation_payload) do + processor.process(Racecar::Message.new(message, retries_count: pause.pauses_count)) + processor.deliver! + consumer.store_offset(message) + end + rescue => e + instrumentation_payload[:unrecoverable_delivery_error] = reset_producer_on_unrecoverable_delivery_errors(e) + instrumentation_payload[:retries_count] = pause.pauses_count + config.error_handler.call(e, instrumentation_payload) + raise e end - rescue => e - instrumentation_payload[:unrecoverable_delivery_error] = reset_producer_on_unrecoverable_delivery_errors(e) - instrumentation_payload[:retries_count] = pause.pauses_count - config.error_handler.call(e, instrumentation_payload) - raise e end end end diff --git a/spec/runner_spec.rb b/spec/runner_spec.rb index 3e823fb9..f76db54a 100644 --- a/spec/runner_spec.rb +++ b/spec/runner_spec.rb @@ -94,6 +94,11 @@ class TestNilConsumer < Racecar::Consumer subscribes_to "greetings" end +class TestDlqConsumer < TestConsumer + dead_letter_queue topic: "dlq", retries: 2 + subscribes_to "greetings" +end + class FakeConsumer def initialize(kafka, runner) @kafka = kafka @@ -469,6 +474,7 @@ def event_raised_errors?(event_name) runner.run end + context 'with instrumentation enabled' do let(:message_instrumentation) do { @@ -762,4 +768,37 @@ def process_batch(batch, hello); end end end end + + context "with dlq enabled" do + let(:processor) { TestDlqConsumer.new } + + it "delivers the bad messages to dlq topic", focus: true do + error = StandardError.new("surprise") + kafka.deliver_message(error, topic: "greetings") + + runner.run + + messages = kafka.messages_in("dlq") + + expect(messages.map(&:payload)).to eq [error] + end + + it "instruments dlq messages" do + allow(instrumenter).to receive(:instrument).and_call_original + error = StandardError.new("surprise") + kafka.deliver_message(error, topic: "greetings") + + runner.run + + expect(instrumenter).to have_received(:instrument).with("dlq", a_hash_including(:create_time, consumer_class: "TestDlqConsumer", topic: "greetings", value: error)) + end + + it "retries before producing to dlq" do + error = StandardError.new("surprise") + kafka.deliver_message(error, topic: "greetings") + + expect(config.error_handler).to receive(:call).exactly(3).times + runner.run + end + end end