Skip to content

Commit

Permalink
create a racecar delivery handle wrapper to provide context in error …
Browse files Browse the repository at this point in the history
…messages
  • Loading branch information
breunigs committed Feb 24, 2021
1 parent 52948ce commit cb2a016
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 33 deletions.
27 changes: 14 additions & 13 deletions lib/racecar/consumer.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require "racecar/message_delivery_error"
require "racecar/message_delivery_handle"

module Racecar
class Consumer
Expand Down Expand Up @@ -36,7 +37,7 @@ def subscribes_to(*topics, start_from_beginning: true, max_bytes_per_partition:

def configure(producer:, consumer:, instrumenter: NullInstrumenter, config: Racecar.config)
@producer = producer
@delivery_handles = []
@message_delivery_handles = []

@consumer = consumer

Expand All @@ -52,12 +53,12 @@ def teardown; end
# (e.g. downtime, configuration issue) or specific to the message being sent. The
# caller must handle the latter cases or run into head of line blocking.
def deliver!
@delivery_handles ||= []
if @delivery_handles.any?
instrumentation_payload = { delivered_message_count: @delivery_handles.size }
@message_delivery_handles ||= []
if @message_delivery_handles.any?
instrumentation_payload = { delivered_message_count: @message_delivery_handles.size }

@instrumenter.instrument('deliver_messages', instrumentation_payload) do
@delivery_handles.each do |handle|
@message_delivery_handles.each do |handle|
# rdkafka-ruby checks every wait_timeout seconds if the message was
# successfully delivered, up to max_wait_timeout seconds before raising
# Rdkafka::AbstractHandle::WaitTimeoutError. librdkafka will (re)try to
Expand All @@ -69,25 +70,24 @@ def deliver!
# changing the interface).
handle.wait(max_wait_timeout: 60, wait_timeout: 0.1)
rescue Rdkafka::AbstractHandle::WaitTimeoutError => e
partition = MessageDeliveryError.partition_from_delivery_handle(handle)
# ideally we could use the logger passed to the Runner, but it is not
# available here. The runner sets it for Rdkafka, though, so we can use
# that instead.
@config.logger.debug "Still trying to deliver message to (partition #{partition})... (will try up to Racecar.config.message_timeout)"
@config.logger.debug "Still trying to deliver message to (partition #{handle.partition_text})... (will try up to Racecar.config.message_timeout)"
retry
rescue Rdkafka::RdkafkaError => e
raise MessageDeliveryError.new(e, handle)
end
end
end
@delivery_handles.clear
@message_delivery_handles.clear
end

protected

# https://github.com/appsignal/rdkafka-ruby#producing-messages
def produce(payload, topic:, key: nil, partition_key: nil, headers: nil, create_time: nil)
@delivery_handles ||= []
@message_delivery_handles ||= []
message_size = payload.respond_to?(:bytesize) ? payload.bytesize : 0
instrumentation_payload = {
value: payload,
Expand All @@ -97,18 +97,19 @@ def produce(payload, topic:, key: nil, partition_key: nil, headers: nil, create_
topic: topic,
message_size: message_size,
create_time: Time.now,
buffer_size: @delivery_handles.size,
buffer_size: @message_delivery_handles.size,
}

@instrumenter.instrument("produce_message", instrumentation_payload) do
@delivery_handles << @producer.produce(
params = {
topic: topic,
payload: payload,
key: key,
partition_key: partition_key,
timestamp: create_time,
headers: headers,
)
}
handle = @producer.produce(payload: payload, **params)
@message_delivery_handles << MessageDeliveryHandle.new(handle, **params)
end
end

Expand Down
4 changes: 3 additions & 1 deletion lib/racecar/ctl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require "racecar/rails_config_file_loader"
require "racecar/daemon"
require "racecar/message_delivery_error"
require "racecar/message_delivery_handle"

module Racecar
class Ctl
Expand Down Expand Up @@ -106,7 +107,8 @@ def produce(args)
begin
handle.wait(max_wait_timeout: Racecar.config.message_timeout)
rescue Rdkafka::RdkafkaError => e
raise MessageDeliveryError.new(e, handle)
wrapped_handle = MessageDeliveryHandle.new(handle, key: message.key, topic: message.topic, key: nil, partition_key: nil , timestamp: nil, headers: nil)
raise MessageDeliveryError.new(e, wrapped_handle)
end

$stderr.puts "=> Delivered message to Kafka cluster"
Expand Down
21 changes: 7 additions & 14 deletions lib/racecar/message_delivery_error.rb
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
# frozen_string_literal: true

require "racecar/message_delivery_handle"

module Racecar
# MessageDeliveryError wraps an Rdkafka error and tries to give
# specific hints on how to debug or resolve the error within the
# Racecar context.
class MessageDeliveryError < StandardError
# partition_from_delivery_handle takes an rdkafka delivery handle
# and returns a human readable version of the partition. It handles
# the case where the partition is unknown.
def self.partition_from_delivery_handle(delivery_handle)
partition = delivery_handle&.create_result&.partition
# -1 is rdkafka-ruby's default value, which gets eventually set by librdkafka
return "no yet known" if partition.nil? || partition == -1
partition.to_s
end

def initialize(rdkafka_error, delivery_handle)
raise rdkafka_error unless rdkafka_error.is_a?(Rdkafka::RdkafkaError)
if !delivery_handle.is_a?(Racecar::MessageDeliveryHandle)
raise TypeError, "expected a Racecar::MessageDeliveryHandle, got #{delivery_handle.class}"
end

@rdkafka_error = rdkafka_error
@delivery_handle = delivery_handle
Expand Down Expand Up @@ -66,10 +61,8 @@ def explain
EOM

when :unknown_topic_or_part # 3
partition = self.class.partition_from_delivery_handle(@delivery_handle)

<<~EOM
Could not deliver message, since the targeted topic or partition (#{partition}) does not exist.
Could not deliver message, since the targeted topic (#{@delivery_handle.topic}) or partition (#{@delivery_handle.partition_text}) does not exist.
Check that there are no typos, or that the broker's "auto.create.topics.enable" is enabled. For freshly created topics with auto create enabled, this may appear in the beginning (race condition on creation and publishing).
Expand All @@ -92,7 +85,7 @@ def explain

when :topic_authorization_failed # 29
<<~EOM
Failed to deliver message because of insufficient authorization to write into the topic.
Failed to deliver message because of insufficient authorization to write into the topic "#{@delivery_handle.topic}".
Double check that it is not a race condition on topic creation. If it isn't, verify the ACLs are correct.
Expand Down
48 changes: 48 additions & 0 deletions lib/racecar/message_delivery_handle.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# frozen_string_literal: true

require "forwardable"

module Racecar
# MessageDeliveryHandle is a wrapper around Rdkafka::Producer::DeliveryHandle, mostly
# to keep context around.
class MessageDeliveryHandle
extend Forwardable

attr_reader :rdkafka_handle, :topic, :key, :partition_key, :timestamp, :headers

def initialize(rdkafka_handle, topic:, key:, partition_key:, timestamp:, headers:)
if !rdkafka_handle.is_a?(Rdkafka::Producer::DeliveryHandle)
raise TypeError, "expected a Rdkafka::Producer::DeliveryHandle, got #{rdkafka_handle.class}"
end

@rdkafka_handle = rdkafka_handle
@topic = topic
@key = key
@partition_key = partition_key
@timestamp = timestamp
@headers = headers
end

def_delegators :@rdkafka_handle, :wait, :pending?

# offset returns the offset of the delivered message. If the offset is not yet
# known it will be set to -1.
def offset
@rdkafka_handle.create_result.offset || -1
end

# partition returns the assigned partition of the message. If the partition is not
# yet known it will be set to -1.
def partition
@rdkafka_handle.create_result.partition || -1
end

# partition text returns a string describing the partition of the message. If the
# partition is not yet known, it will return a readable message saying so.
def partition_text
part = partition
return "no yet known" if part == -1
part.to_s
end
end
end
21 changes: 17 additions & 4 deletions spec/message_delivery_error_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

require "racecar/message_delivery_handle"

RSpec.describe Racecar::MessageDeliveryError do
let(:rdkafka_msg_timed_out) { Rdkafka::RdkafkaError.new(-192) }
let(:rdkafka_unknown_topic_or_part) { Rdkafka::RdkafkaError.new(3) }
Expand All @@ -9,18 +11,29 @@
dh[:offset] = 42
end
end
let(:racecar_delivery_handle) do
Racecar::MessageDeliveryHandle.new(
rdkafka_delivery_handle,
topic: "a_topic",
key: "a_key",
partition_key: "a_partition_key",
timestamp: "a_timestamp",
headers: "some_headers"
)
end

it "passes through error code" do
error = described_class.new(rdkafka_msg_timed_out, rdkafka_delivery_handle)
error = described_class.new(rdkafka_msg_timed_out, racecar_delivery_handle)
expect(error.code).to eq rdkafka_msg_timed_out.code
end

it "includes partition of delivery handle" do
error = described_class.new(rdkafka_unknown_topic_or_part, rdkafka_delivery_handle)
error = described_class.new(rdkafka_unknown_topic_or_part, racecar_delivery_handle)
expect(error.to_s).to include "(37)"
end

it "handles delivery handle being nil" do
described_class.new(rdkafka_unknown_topic_or_part, nil).to_s
it "includes topic of delivery handle" do
error = described_class.new(rdkafka_unknown_topic_or_part, racecar_delivery_handle)
expect(error.to_s).to include "a_topic"
end
end
2 changes: 1 addition & 1 deletion spec/runner_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def close
end
end

class FakeDeliveryHandle
class FakeDeliveryHandle < Rdkafka::Producer::DeliveryHandle
def initialize(kafka, msg, delivery_callback)
@kafka = kafka
@msg = msg
Expand Down

0 comments on commit cb2a016

Please sign in to comment.