From 9e94aec3dadf998a2fe760a18041f9f8353b549f Mon Sep 17 00:00:00 2001 From: Eray Arslan Date: Thu, 23 May 2024 11:34:32 +0300 Subject: [PATCH] fix: pass generic errors to sink handler if preset --- kafka/producer/producer_batch.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/kafka/producer/producer_batch.go b/kafka/producer/producer_batch.go index 2944a78..67b0606 100644 --- a/kafka/producer/producer_batch.go +++ b/kafka/producer/producer_batch.go @@ -138,6 +138,7 @@ func (b *Batch) FlushMessages() { b.handleMessageTooLargeError(e) return default: + b.handleResponseError(e) logger.Log.Error("batch producer flush error %v", err) return } @@ -180,6 +181,15 @@ func (b *Batch) handleWriteError(writeErrors kafka.WriteErrors) { } } +func (b *Batch) handleResponseError(err error) { + for _, msg := range b.messages { + b.sinkResponseHandler.OnError(&gKafka.SinkResponseHandlerContext{ + Message: convertKafkaMessage(msg), + Err: err, + }) + } +} + func (b *Batch) handleResponseSuccess() { for _, msg := range b.messages { b.sinkResponseHandler.OnSuccess(&gKafka.SinkResponseHandlerContext{