diff --git a/kafka/producer/producer_batch.go b/kafka/producer/producer_batch.go index 2944a78..67b0606 100644 --- a/kafka/producer/producer_batch.go +++ b/kafka/producer/producer_batch.go @@ -138,6 +138,7 @@ func (b *Batch) FlushMessages() { b.handleMessageTooLargeError(e) return default: + b.handleResponseError(e) logger.Log.Error("batch producer flush error %v", err) return } @@ -180,6 +181,15 @@ func (b *Batch) handleWriteError(writeErrors kafka.WriteErrors) { } } +func (b *Batch) handleResponseError(err error) { + for _, msg := range b.messages { + b.sinkResponseHandler.OnError(&gKafka.SinkResponseHandlerContext{ + Message: convertKafkaMessage(msg), + Err: err, + }) + } +} + func (b *Batch) handleResponseSuccess() { for _, msg := range b.messages { b.sinkResponseHandler.OnSuccess(&gKafka.SinkResponseHandlerContext{