Skip to content

Commit

Permalink
add invoice
Browse files Browse the repository at this point in the history
  • Loading branch information
sauljabin committed Jul 11, 2024
1 parent 0d9ed20 commit 4f76222
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package kafka.sandbox.cli;

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaJsonDeserializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import kafka.sandbox.proto.Invoice;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.BytesDeserializer;
import picocli.CommandLine;
import picocli.CommandLine.Command;

Expand Down Expand Up @@ -41,13 +42,13 @@ public Consumer(Properties props) {
@Override
public Integer call() throws Exception {
if (useSchemaRegistry) {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonSchemaDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
} else {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
}

KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
KafkaConsumer<String, Invoice> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton(topic));

// attach shutdown handler to catch control-c and creating a latch
Expand All @@ -69,10 +70,10 @@ public void run() {
() -> {
try {
while (true) {
ConsumerRecords<String, User> records = consumer.poll(
ConsumerRecords<String, Invoice> records = consumer.poll(
Duration.ofMillis(500)
);
for (ConsumerRecord<String, User> record : records) {
for (ConsumerRecord<String, Invoice> record : records) {
log.info(
"Consumed message: topic = {}, partition = {}, offset = {}, key = {}, value = {}",
record.topic(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
package kafka.sandbox.cli;

import com.google.protobuf.util.Timestamps;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaJsonSerializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import kafka.sandbox.proto.Invoice;
import lombok.extern.slf4j.Slf4j;
import net.datafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;

@Slf4j
Expand Down Expand Up @@ -52,26 +51,26 @@ public Integer call() {
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
} else {
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BytesSerializer.class);
}

KafkaProducer<String, User> producer = new KafkaProducer<>(props);
KafkaProducer<String, Invoice> producer = new KafkaProducer<>(props);

for (int i = 0; i < messages; i++) {
User user = newMessage();
ProducerRecord<String, User> record = new ProducerRecord<>(
Invoice invoice = newMessage();
ProducerRecord<String, Invoice> record = new ProducerRecord<>(
topic,
user.getId(),
user
invoice.getId(),
invoice
);
producer.send(
record,
(metadata, exception) -> {
if (exception != null) {
log.error("Error producing {}", user, exception);
log.error("Error producing {}", invoice, exception);
return;
}
log.info("Producing message: {}", user);
log.info("Producing message: {}", invoice);
}
);
}
Expand All @@ -82,13 +81,10 @@ public Integer call() {
return CommandLine.ExitCode.OK;
}

private User newMessage() {
return User.builder()
.id(UUID.randomUUID().toString())
.firstName(faker.name().firstName())
.lastName(faker.name().lastName())
.address(faker.address().streetAddress())
.age(faker.number().numberBetween(20, 40))
// Builds a fresh Invoice protobuf message to use as the payload of one
// produced record: a random UUID for the id and the current wall-clock
// time (as a protobuf Timestamp via com.google.protobuf.util.Timestamps).
// NOTE(review): `faker` is a field declared outside this fragment —
// presumably a net.datafaker.Faker instance; confirm in the full file.
private Invoice newMessage() {
return Invoice.newBuilder()
.setId(faker.internet().uuid())
.setCreatedAt(Timestamps.now())
.build();
}
}
3 changes: 2 additions & 1 deletion kafka-protobuf/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ repositories {
dependencies {
api "io.grpc:grpc-protobuf:${grpcVersion}"
api "io.grpc:grpc-stub:${grpcVersion}"
implementation "com.google.protobuf:protobuf-java:${protoVersion}"
api "com.google.protobuf:protobuf-java:${protoVersion}"
api "com.google.protobuf:protobuf-java-util:${protoVersion}"
compileOnly 'org.apache.tomcat:annotations-api:6.0.53'

testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}"
Expand Down
File renamed without changes.

0 comments on commit 4f76222

Please sign in to comment.