Skip to content

Commit

Permalink
docs: ๐Ÿ“ [Chapter 07] KafkaProducer ์ˆ˜์ •์ •
Browse files Browse the repository at this point in the history
  • Loading branch information
Shinminjin committed Jan 5, 2025
1 parent 2343f0f commit 189a15c
Showing 1 changed file with 35 additions and 4 deletions.
39 changes: 35 additions & 4 deletions _posts/kafka/2025-01-04-chapter07.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,11 @@ public class KafkaConsumer {
**`KafkaProducer`**

```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
Expand All @@ -198,23 +201,51 @@ public class KafkaProducer {
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;

// ProducerListener ์„ค์ •
kafkaTemplate.setProducerListener(new ProducerListener<String, String>() {

@Override
public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
// ์ „์†ก ์„ฑ๊ณต ์‹œ, RecordMetadata๋ฅผ ํ†ตํ•ด ์ „์†ก๋œ ๋ฐ์ดํ„ฐ์˜ ๋ฉ”ํƒ€ ์ •๋ณด๋ฅผ ํ™•์ธ
String topic = recordMetadata.topic();
int partition = recordMetadata.partition();
long offset = recordMetadata.offset();

// bootstrap.servers ์ถœ๋ ฅ
String bootstrapServers = kafkaTemplate.getProducerFactory().getConfigurationProperties().get("bootstrap.servers").toString();
System.out.println("Message successfully sent to topic: " + topic + ", partition: " + partition + ", offset: " + offset);
}

@Override
public void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {
// ์ „์†ก ์‹คํŒจ ์‹œ, ์˜ˆ์™ธ ๋ฉ”์‹œ์ง€ ์ถœ๋ ฅ
System.out.println("Failed to send message to topic: " + producerRecord.topic() + ". Exception: " + exception.getMessage());
System.out.println("Failed message: " + producerRecord.value()); // ์‹คํŒจํ•œ ๋ฉ”์‹œ์ง€ ์ถœ๋ ฅ
}
});
}

public void sendMessage(String message) {
long startTime = System.currentTimeMillis();
CompletableFuture<SendResult<String, String>> future = CompletableFuture.supplyAsync(() -> {
try {
return kafkaTemplate.send("my_topic", "key", message).get(); // blocking
// ๋ฉ”์‹œ์ง€ ์ „์†ก
return kafkaTemplate.send("my_topic", "key", message).get(); // blocking
} catch (InterruptedException | ExecutionException e) {
// ์‹คํŒจ ์‹œ ์˜ˆ์™ธ ์ฒ˜๋ฆฌ
throw new RuntimeException("Failed to send message", e);
}
});

// ๋น„๋™๊ธฐ ๊ฒฐ๊ณผ ์ฒ˜๋ฆฌ
future.whenComplete((result, ex) -> {
long endTime = System.currentTimeMillis();
if (ex == null) {
System.out.println("Message sent successfully: " + result.getProducerRecord().value());
System.out.println("Message sent successfully in " + (endTime - startTime) + " ms");
} else {
System.out.println("Failed to send message: " + ex.getMessage());
// ์ง€์—ฐ์ด๋‚˜ ์—ฐ๊ฒฐ ์‹คํŒจ ์‹œ ์˜ˆ์™ธ ์ฒ˜๋ฆฌ
System.out.println("Failed to send message in " + (endTime - startTime) + " ms: " + ex.getMessage());
// ์žฌ์‹œ๋„ ๋กœ์ง ์ถ”๊ฐ€
}
});
}
Expand Down

0 comments on commit 189a15c

Please sign in to comment.