Skip to content

Commit

Permalink
add headers to native producer
Browse files Browse the repository at this point in the history
  • Loading branch information
sauljabin committed Jul 12, 2024
1 parent 1b2b677 commit e048445
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public Integer call() {
Supplier supplier = newMessage();
ProducerRecord<String, Supplier> record = new ProducerRecord<>(
topic,
supplier.getId().toString(),
supplier.getId(),
supplier
);
producer.send(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.Serializer;

import java.util.List;
import java.util.Properties;

@Slf4j
Expand All @@ -25,8 +27,13 @@ public void produce(String topic, int message) {
V value = newMessage();
ProducerRecord<String, V> record = new ProducerRecord<>(
topic,
null,
String.valueOf(value),
value
value,
List.of(
new RecordHeader("native", value.getClass().getName().getBytes()),
new RecordHeader("value", String.valueOf(value).getBytes())
)
);
producer.send(
record,
Expand Down

0 comments on commit e048445

Please sign in to comment.