diff --git a/kafka-protobuf-clients/src/main/java/kafka/sandbox/cli/Producer.java b/kafka-protobuf-clients/src/main/java/kafka/sandbox/cli/Producer.java
index 34f8a6a..0d1cae9 100644
--- a/kafka-protobuf-clients/src/main/java/kafka/sandbox/cli/Producer.java
+++ b/kafka-protobuf-clients/src/main/java/kafka/sandbox/cli/Producer.java
@@ -81,7 +81,7 @@ private Invoice newMessage() {
         return Invoice.newBuilder()
             .setId(faker.internet().uuid())
             .setCreatedAt(Timestamps.now())
-            .setStatus(InvoiceStatus.forNumber(faker.random().nextInt(InvoiceStatus.values().length - 1)))
+            .setStatus(faker.options().option(InvoiceStatus.PAID, InvoiceStatus.PENDING))
             .setCustomer(
                 Customer.newBuilder()
                     .setAddress(
diff --git a/kafka-protobuf-clients/src/main/java/kafka/sandbox/cli/ProtobufDeserializer.java b/kafka-protobuf-clients/src/main/java/kafka/sandbox/cli/ProtobufDeserializer.java
index 9ad81d8..72f08e2 100644
--- a/kafka-protobuf-clients/src/main/java/kafka/sandbox/cli/ProtobufDeserializer.java
+++ b/kafka-protobuf-clients/src/main/java/kafka/sandbox/cli/ProtobufDeserializer.java
@@ -14,6 +14,7 @@ public class ProtobufDeserializer
         implements Deserializer<T> {
 
     private Parser<T> parser;
 
     @Override
+    @SuppressWarnings("unchecked")
     public void configure(Map<String, ?> configs, boolean isKey) {
         parser = (Parser<T>) configs.get(PROTOBUF_PARSER);
diff --git a/kafka-protobuf-clients/src/main/resources/consumer.properties b/kafka-protobuf-clients/src/main/resources/consumer.properties
index b9d28f0..5b7215c 100644
--- a/kafka-protobuf-clients/src/main/resources/consumer.properties
+++ b/kafka-protobuf-clients/src/main/resources/consumer.properties
@@ -3,5 +3,5 @@ group.id=client.consumer
 enable.auto.commit=false
 auto.offset.reset=earliest
 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
-specific.avro.reader=true
+specific.protobuf.value.type=kafka.sandbox.proto.Invoice
 client.id=client.consumer
\ No newline at end of file
diff --git a/kafka-protobuf-oneof-clients/build.gradle b/kafka-protobuf-oneof-clients/build.gradle
new file mode 100644
index 0000000..65418c6
--- /dev/null
+++ b/kafka-protobuf-oneof-clients/build.gradle
@@ -0,0 +1,37 @@
+plugins {
+    id 'java'
+    id 'application'
+}
+
+repositories {
+    mavenCentral()
+    maven {
+        url = 'https://packages.confluent.io/maven/'
+    }
+}
+
+dependencies {
+    implementation project(':kafka-protobuf')
+
+    implementation "org.apache.kafka:kafka-clients:${kafkaVersion}"
+    implementation "io.confluent:kafka-protobuf-serializer:${confluentVersion}"
+
+    implementation 'info.picocli:picocli:4.6.1'
+    implementation 'net.datafaker:datafaker:2.0.2'
+    implementation 'org.slf4j:slf4j-simple:1.7.30'
+
+    compileOnly "org.projectlombok:lombok:${lombokVersion}"
+    annotationProcessor "org.projectlombok:lombok:${lombokVersion}"
+
+    testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}"
+    testCompileOnly "org.projectlombok:lombok:${lombokVersion}"
+    testAnnotationProcessor "org.projectlombok:lombok:${lombokVersion}"
+}
+
+application {
+    mainClass = 'kafka.sandbox.App'
+}
+
+test {
+    useJUnitPlatform()
+}
diff --git a/kafka-protobuf-oneof-clients/src/main/java/kafka/sandbox/App.java b/kafka-protobuf-oneof-clients/src/main/java/kafka/sandbox/App.java
new file mode 100644
index 0000000..75bcff7
--- /dev/null
+++ b/kafka-protobuf-oneof-clients/src/main/java/kafka/sandbox/App.java
@@ -0,0 +1,29 @@
+package kafka.sandbox;
+
+import kafka.sandbox.cli.Consumer;
+import kafka.sandbox.cli.KafkaClients;
+import kafka.sandbox.cli.Producer;
+import picocli.CommandLine;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class App {
+
+    public static void main(String[] args) throws IOException {
+        Properties producerProps = getProperties("producer.properties");
+        Properties consumerProps = getProperties("consumer.properties");
+
+        CommandLine commandLine = new CommandLine(new KafkaClients())
+                .addSubcommand(new Producer(producerProps))
+                .addSubcommand(new Consumer(consumerProps));
+
+        System.exit(commandLine.execute(args));
+    }
+
+    private static Properties getProperties(String fileName) throws IOException {
+        Properties props = new Properties();
+        props.load(App.class.getClassLoader().getResourceAsStream(fileName));
+        return props;
+    }
+}
diff --git a/kafka-protobuf-oneof-clients/src/main/java/kafka/sandbox/cli/Consumer.java b/kafka-protobuf-oneof-clients/src/main/java/kafka/sandbox/cli/Consumer.java
new file mode 100644
index 0000000..5bf4f2b
--- /dev/null
+++ b/kafka-protobuf-oneof-clients/src/main/java/kafka/sandbox/cli/Consumer.java
@@ -0,0 +1,92 @@
+package kafka.sandbox.cli;
+
+import kafka.sandbox.proto.Measurement;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.WakeupException;
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+
+@Slf4j
+@Command(name = "consume", description = "Consumes messages from a topic")
+public class Consumer implements Callable<Integer> {
+
+    private final Properties props;
+
+    @CommandLine.Parameters(
+            index = "0",
+            description = "Topic name"
+    )
+    private String topic;
+
+    public Consumer(Properties props) {
+        this.props = props;
+    }
+
+    @Override
+    public Integer call() throws Exception {
+        KafkaConsumer<String, Measurement> consumer = new KafkaConsumer<>(props);
+        consumer.subscribe(Collections.singleton(topic));
+
+        // attach a shutdown hook to catch Ctrl-C and create a latch
+        CountDownLatch latch = new CountDownLatch(1);
+        Runtime
+                .getRuntime()
+                .addShutdownHook(
+                        new Thread("consumer-shutdown-hook") {
+                            @Override
+                            public void run() {
+                                consumer.wakeup();
+                                latch.countDown();
+                            }
+                        }
+                );
+
+        // poll loop, runs until the shutdown hook wakes the consumer up
+        Thread infiniteLoop = new Thread(
+                () -> {
+                    try {
+                        while (true) {
+                            ConsumerRecords<String, Measurement> records = consumer.poll(
+                                    Duration.ofMillis(500)
+                            );
+                            for (ConsumerRecord<String, Measurement> record : records) {
+                                log.info(
+                                        "Consumed message ({}): topic = {}, partition = {}, offset = {}, key = {}, value = {}",
+                                        // this way we know which value is inside the oneof
+                                        record.value().getValueCase(),
+                                        record.topic(),
+                                        record.partition(),
+                                        record.offset(),
+                                        record.key(),
+                                        record.value()
+                                );
+                            }
+                            consumer.commitSync();
+                        }
+                    } catch (RecordDeserializationException rde) {
+                        log.warn("{}", rde.getMessage());
+                    } catch (WakeupException we) {
+                        log.info("Shutdown gracefully");
+                    } finally {
+                        consumer.close();
+                    }
+                },
+                "consumer-thread"
+        );
+
+        infiniteLoop.start();
+        latch.await();
+
+        return CommandLine.ExitCode.OK;
+    }
+}
diff --git a/kafka-protobuf-oneof-clients/src/main/java/kafka/sandbox/cli/KafkaClients.java b/kafka-protobuf-oneof-clients/src/main/java/kafka/sandbox/cli/KafkaClients.java
new file mode 100644
index 0000000..20eec6a
--- /dev/null
+++ b/kafka-protobuf-oneof-clients/src/main/java/kafka/sandbox/cli/KafkaClients.java
@@ -0,0 +1,23 @@
+package kafka.sandbox.cli;
+
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Model.CommandSpec;
+
+import static picocli.CommandLine.ParameterException;
+import static picocli.CommandLine.Spec;
+
+@Command(
+        name = "kafka-protobuf-oneof-clients",
+        description = "Allows you to either produce or consume a topic",
+        synopsisSubcommandLabel = "COMMAND"
+)
+public class KafkaClients implements Runnable {
+
+    @Spec
+    private CommandSpec spec;
+
+    @Override
+    public void run() {
+        throw new ParameterException(spec.commandLine(), "Missing required subcommand");
+    }
+}
diff --git a/kafka-protobuf-oneof-clients/src/main/java/kafka/sandbox/cli/Producer.java b/kafka-protobuf-oneof-clients/src/main/java/kafka/sandbox/cli/Producer.java
new file mode 100644
index 0000000..55d2af5
--- /dev/null
+++ b/kafka-protobuf-oneof-clients/src/main/java/kafka/sandbox/cli/Producer.java
@@ -0,0 +1,99 @@
+package kafka.sandbox.cli;
+
+import com.google.protobuf.util.Timestamps;
+import kafka.sandbox.proto.*;
+import lombok.extern.slf4j.Slf4j;
+import net.datafaker.Faker;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Parameters;
+
+import java.util.Properties;
+import java.util.concurrent.Callable;
+
+@Slf4j
+@Command(name = "produce", description = "Produces messages to a topic")
+public class Producer implements Callable<Integer> {
+
+    private final Properties props;
+    private final Faker faker = new Faker();
+    private final Sensor[] sensors = {
+            Sensor.newBuilder()
+                    .setId("1c107c7d-eb05-44f6-9fdd-4f38aedff16a")
+                    .setStatus(SensorStatus.UP)
+                    .build(),
+            Sensor.newBuilder()
+                    .setId("c86612bb-9647-43fd-b1fc-abed8c97091c")
+                    .setStatus(SensorStatus.UP)
+                    .build()
+    };
+
+    @Parameters(
+            index = "1",
+            description = "Total new messages to produce"
+    )
+    private int messages;
+    @Parameters(
+            index = "0",
+            description = "Topic name"
+    )
+    private String topic;
+
+    public Producer(Properties props) {
+        this.props = props;
+    }
+
+    @Override
+    public Integer call() {
+        KafkaProducer<String, Measurement> producer = new KafkaProducer<>(props);
+
+        for (int i = 0; i < messages; i++) {
+            Measurement measurement = newMessage();
+            ProducerRecord<String, Measurement> record = new ProducerRecord<>(
+                    topic,
+                    measurement.getSensor().getId(),
+                    measurement
+            );
+            producer.send(
+                    record,
+                    (metadata, exception) -> {
+                        if (exception != null) {
+                            log.error("Error producing {}", measurement, exception);
+                            return;
+                        }
+                        log.info("Producing message: {}", measurement);
+                    }
+            );
+        }
+
+        producer.flush();
+        producer.close();
+
+        return CommandLine.ExitCode.OK;
+    }
+
+    private Measurement newMessage() {
+        Measurement.Builder builder = Measurement.newBuilder()
+                .setCreatedAt(Timestamps.now());
+
+        if (faker.bool().bool()) {
+            builder.setSensor(sensors[0])
+                    .setEnvironment(
+                            Environment.newBuilder()
+                                    .setHumidity(faker.number().randomDouble(2, 10, 100))
+                                    .setTemperature(faker.number().randomDouble(2, 10, 30))
+                    );
+        } else {
+            builder.setSensor(sensors[1])
+                    .setSpeed(
+                            Speed.newBuilder()
+                                    .setSpeed(faker.number().randomDouble(2, 0, 120))
+                                    .setWheelRpm(faker.number().numberBetween(0, 5000))
+                    );
+        }
+
+        return builder.build();
+    }
+}
diff --git a/kafka-protobuf-oneof-clients/src/main/resources/consumer.properties b/kafka-protobuf-oneof-clients/src/main/resources/consumer.properties
new file mode 100644
index 0000000..8326ba9
--- /dev/null
+++ b/kafka-protobuf-oneof-clients/src/main/resources/consumer.properties
@@ -0,0 +1,9 @@
+bootstrap.servers=kafka1:9092
+group.id=client.consumer
+enable.auto.commit=false
+auto.offset.reset=earliest
+key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+client.id=client.consumer
+schema.registry.url=http://schema-registry:8081
+value.deserializer=io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
+specific.protobuf.value.type=kafka.sandbox.proto.Measurement
\ No newline at end of file
diff --git a/kafka-protobuf-oneof-clients/src/main/resources/producer.properties b/kafka-protobuf-oneof-clients/src/main/resources/producer.properties
new file mode 100644
index 0000000..78bfd39
--- /dev/null
+++ b/kafka-protobuf-oneof-clients/src/main/resources/producer.properties
@@ -0,0 +1,6 @@
+bootstrap.servers=kafka1:9092
+key.serializer=org.apache.kafka.common.serialization.StringSerializer
+acks=all
+client.id=client.producer
+schema.registry.url=http://schema-registry:8081
+value.serializer=io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
\ No newline at end of file
diff --git a/kafka-protobuf/src/main/proto/Sensor.proto b/kafka-protobuf/src/main/proto/Sensor.proto
new file mode 100644
index 0000000..5bde276
--- /dev/null
+++ b/kafka-protobuf/src/main/proto/Sensor.proto
@@ -0,0 +1,34 @@
+syntax = "proto3";
+import "google/protobuf/timestamp.proto";
+
+option java_multiple_files = true;
+option java_package = "kafka.sandbox.proto";
+
+enum SensorStatus {
+  UP = 0;
+  ERROR = 1;
+}
+
+message Sensor {
+  string id = 1;
+  SensorStatus status = 2;
+}
+
+message Environment {
+  double temperature = 1;
+  double humidity = 2;
+}
+
+message Speed {
+  int32 wheel_rpm = 1;
+  double speed = 2;
+}
+
+message Measurement {
+  oneof value {
+    Environment environment = 1;
+    Speed speed = 2;
+  }
+  google.protobuf.Timestamp created_at = 3;
+  Sensor sensor = 4;
+}
diff --git a/md/SUMMARY.md b/md/SUMMARY.md
index f77fbdd..e7c9fc1 100644
--- a/md/SUMMARY.md
+++ b/md/SUMMARY.md
@@ -19,6 +19,7 @@
   - [Avro Producer and Consumer](avro-producer-and-consumer.md)
   - [Avro Union](avro-union.md)
   - [Protobuf Producer and Consumer](protobuf-producer-and-consumer.md)
+  - [Protobuf Oneof](protobuf-oneof.md)
   - [Spring Boot](spring-boot.md)
 - [Kafka Streams](kafka-streams.md)
   - [Interactive Queries](interactive-queries.md)
diff --git a/md/protobuf-oneof.md b/md/protobuf-oneof.md
new file mode 100644
index 0000000..0f59de9
--- /dev/null
+++ b/md/protobuf-oneof.md
@@ -0,0 +1,36 @@
+# Protobuf Oneof
+
+More at [https://protobuf.dev/reference/java/java-generated/#oneof-fields](https://protobuf.dev/reference/java/java-generated/#oneof-fields).
+
+### Protobuf Schema
+
+Here you can see that the `Measurement` message has a `oneof` field:
+
+```protobuf
+{{#include ../kafka-protobuf/src/main/proto/Sensor.proto}}
+```
+
+### Setup
+
+Create a topic:
+
+```bash
+kafka-topics --create --bootstrap-server kafka1:9092 \
+  --replication-factor 3 \
+  --partitions 3 \
+  --topic client.measurements
+```
+
+### Produce
+
+```bash
+gradle kafka-protobuf-oneof-clients:run --args="produce client.measurements 100"
+```
+
+### Consume
+
+You can verify which value is inside the oneof (speed or environment) with `record.value().getValueCase()`.
+
+```bash
+gradle kafka-protobuf-oneof-clients:run --args="consume client.measurements"
+```
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 8da40b3..81f7ff2 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -2,6 +2,7 @@ rootProject.name = "kafka-sandbox"
 include("kafka-avro")
 include("kafka-protobuf")
 include("kafka-protobuf-clients")
+include("kafka-protobuf-oneof-clients")
 include("kafka-avro-clients")
 include("kafka-avro-union-clients")
 include("kafka-json-clients")
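A note on consuming the `oneof`: the `Consumer` above only logs `getValueCase()`, but the same enum supports type-safe branching. Below is a minimal sketch of what that could look like; the `MeasurementFormatter` class is hypothetical and not part of this change set, and it relies only on what protoc generates for `Sensor.proto` (a `Measurement.ValueCase` enum with one constant per `oneof` field plus `VALUE_NOT_SET`).

```java
package kafka.sandbox.cli;

import kafka.sandbox.proto.Measurement;

// Hypothetical helper (not part of the diff above): shows how a consumer can
// branch on the oneof instead of just logging it. protoc generates
// Measurement.ValueCase with ENVIRONMENT, SPEED, and VALUE_NOT_SET.
public class MeasurementFormatter {

    public static String describe(Measurement measurement) {
        switch (measurement.getValueCase()) {
            case ENVIRONMENT:
                // only getEnvironment() is populated in this case
                return String.format(
                        "environment: temperature=%.2f, humidity=%.2f",
                        measurement.getEnvironment().getTemperature(),
                        measurement.getEnvironment().getHumidity());
            case SPEED:
                // only getSpeed() is populated in this case
                return String.format(
                        "speed: %.2f at %d rpm",
                        measurement.getSpeed().getSpeed(),
                        measurement.getSpeed().getWheelRpm());
            case VALUE_NOT_SET:
            default:
                return "no value set";
        }
    }
}
```

Dispatching on `ValueCase` rather than on `hasEnvironment()`/`hasSpeed()` checks keeps the branching easy to audit: adding a new field to the `oneof` introduces a new enum constant, so every switch over it can be reviewed for the missing case.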