Skip to content

Commit

Permalink
add protobuf oneof
Browse files Browse the repository at this point in the history
  • Loading branch information
sauljabin committed Jul 27, 2024
1 parent 9f2440e commit 9645724
Show file tree
Hide file tree
Showing 14 changed files with 370 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private Invoice newMessage() {
return Invoice.newBuilder()
.setId(faker.internet().uuid())
.setCreatedAt(Timestamps.now())
.setStatus(InvoiceStatus.forNumber(faker.random().nextInt(InvoiceStatus.values().length - 1)))
.setStatus(faker.options().option(InvoiceStatus.PAID, InvoiceStatus.PENDING))
.setCustomer(
Customer.newBuilder()
.setAddress(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class ProtobufDeserializer<P extends Message> implements Deserializer<Mes
private Parser<P> parser;

@Override
@SuppressWarnings("unchecked")
public void configure(Map<String, ?> configs, boolean isKey) {
parser = (Parser<P>) configs.get(PROTOBUF_PARSER);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ group.id=client.consumer
enable.auto.commit=false
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
specific.avro.reader=true
specific.protobuf.value.type=kafka.sandbox.proto.Invoice
client.id=client.consumer
37 changes: 37 additions & 0 deletions kafka-protobuf-oneof-clients/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
plugins {
id 'java'
id 'application'
}

repositories {
mavenCentral()
maven {
url = 'https://packages.confluent.io/maven/'
}
}

dependencies {
implementation project(':kafka-protobuf')

implementation "org.apache.kafka:kafka-clients:${kafkaVersion}"
implementation "io.confluent:kafka-protobuf-serializer:${confluentVersion}"

implementation 'info.picocli:picocli:4.6.1'
implementation 'net.datafaker:datafaker:2.0.2'
implementation 'org.slf4j:slf4j-simple:1.7.30'

compileOnly "org.projectlombok:lombok:${lombokVersion}"
annotationProcessor "org.projectlombok:lombok:${lombokVersion}"

testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}"
testCompileOnly "org.projectlombok:lombok:${lombokVersion}"
testAnnotationProcessor "org.projectlombok:lombok:${lombokVersion}"
}

application {
mainClass = 'kafka.sandbox.App'
}

test {
useJUnitPlatform()
}
29 changes: 29 additions & 0 deletions kafka-protobuf-oneof-clients/src/main/java/kafka/sandbox/App.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package kafka.sandbox;

import kafka.sandbox.cli.Consumer;
import kafka.sandbox.cli.KafkaClients;
import kafka.sandbox.cli.Producer;
import picocli.CommandLine;

import java.io.IOException;
import java.util.Properties;

public class App {

public static void main(String[] args) throws IOException {
Properties producerProps = getProperties("producer.properties");
Properties consumerProps = getProperties("consumer.properties");

CommandLine commandLine = new CommandLine(new KafkaClients())
.addSubcommand(new Producer(producerProps))
.addSubcommand(new Consumer(consumerProps));

System.exit(commandLine.execute(args));
}

private static Properties getProperties(String fileName) throws IOException {
Properties props = new Properties();
props.load(App.class.getClassLoader().getResourceAsStream(fileName));
return props;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package kafka.sandbox.cli;

import kafka.sandbox.proto.Measurement;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
import picocli.CommandLine;
import picocli.CommandLine.Command;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

@Slf4j
@Command(name = "consume", description = "Consumes messages from topic")
public class Consumer implements Callable<Integer> {

private final Properties props;

@CommandLine.Parameters(
index = "0",
description = "Topic name"
)
private String topic;

public Consumer(Properties props) {
this.props = props;
}

@Override
public Integer call() throws Exception {
KafkaConsumer<String, Measurement> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton(topic));

// attach shutdown handler to catch control-c and creating a latch
CountDownLatch latch = new CountDownLatch(1);
Runtime
.getRuntime()
.addShutdownHook(
new Thread("consumer-shutdown-hook") {
@Override
public void run() {
consumer.wakeup();
latch.countDown();
}
}
);

// infinite loop
Thread infiniteLoop = new Thread(
() -> {
try {
while (true) {
ConsumerRecords<String, Measurement> records = consumer.poll(
Duration.ofMillis(500)
);
for (ConsumerRecord<String, Measurement> record : records) {
log.info(
"Consumed message ({}) : topic = {}, partition = {}, offset = {}, key = {}, value = {}",
// this way we know what value is inside the oneof
record.value().getValueCase(),
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value()
);
}
consumer.commitSync();
}
} catch (RecordDeserializationException rde) {
log.warn("{}", rde.getMessage());
} catch (WakeupException we) {
log.info("Shutdown gracefully");
} finally {
consumer.close();
}
},
"consumer-thread"
);

infiniteLoop.start();
latch.await();

return CommandLine.ExitCode.OK;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package kafka.sandbox.cli;

import picocli.CommandLine.Command;
import picocli.CommandLine.Model.CommandSpec;

import static picocli.CommandLine.ParameterException;
import static picocli.CommandLine.Spec;

@Command(
name = "kafka-protobuf-oneof-clients",
description = "Allows you either to produce or consume topic",
synopsisSubcommandLabel = "COMMAND"
)
public class KafkaClients implements Runnable {

@Spec
private CommandSpec spec;

@Override
public void run() {
throw new ParameterException(spec.commandLine(), "Missing required subcommand");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package kafka.sandbox.cli;

import com.google.protobuf.util.Timestamps;
import kafka.sandbox.proto.*;
import lombok.extern.slf4j.Slf4j;
import net.datafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Parameters;

import java.util.Properties;
import java.util.concurrent.Callable;

@Slf4j
@Command(name = "produce", description = "Produces messages to topic")
public class Producer implements Callable<Integer> {

private final Properties props;
private final Faker faker = new Faker();
private final Sensor[] sensors = {
Sensor.newBuilder()
.setId("1c107c7d-eb05-44f6-9fdd-4f38aedff16a")
.setStatus(SensorStatus.UP)
.build(),
Sensor.newBuilder()
.setId("c86612bb-9647-43fd-b1fc-abed8c97091c")
.setStatus(SensorStatus.UP)
.build()
};

@Parameters(
index = "1",
description = "Total new messages to produce"
)
private int messages;
@Parameters(
index = "0",
description = "Topic name"
)
private String topic;

public Producer(Properties props) {
this.props = props;
}

@Override
public Integer call() {
KafkaProducer<String, Measurement> producer = new KafkaProducer<>(props);

for (int i = 0; i < messages; i++) {
Measurement measurement = newMessage();
ProducerRecord<String, Measurement> record = new ProducerRecord<>(
topic,
measurement.getSensor().getId(),
measurement
);
producer.send(
record,
(metadata, exception) -> {
if (exception != null) {
log.error("Error producing {}", measurement, exception);
return;
}
log.info("Producing message: {}", measurement);
}
);
}

producer.flush();
producer.close();

return CommandLine.ExitCode.OK;
}

private Measurement newMessage() {
Measurement.Builder builder = Measurement.newBuilder()
.setCreatedAt(Timestamps.now());

if (faker.bool().bool()) {
builder.setSensor(sensors[0])
.setEnvironment(
Environment.newBuilder()
.setHumidity(faker.number().randomDouble(2, 10, 100))
.setTemperature(faker.number().randomDouble(2, 10, 30))
);
} else {
builder.setSensor(sensors[1])
.setSpeed(
Speed.newBuilder()
.setSpeed(faker.number().randomDouble(2, 0, 120))
.setWheelRpm(faker.number().numberBetween(0, 5000))
);
}

return builder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
bootstrap.servers=kafka1:9092
group.id=client.consumer
enable.auto.commit=false
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
client.id=client.consumer
schema.registry.url=http://schema-registry:8081
value.deserializer=io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
specific.protobuf.value.type=kafka.sandbox.proto.Measurement
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
bootstrap.servers=kafka1:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=all
client.id=client.producer
schema.registry.url=http://schema-registry:8081
value.serializer=io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
34 changes: 34 additions & 0 deletions kafka-protobuf/src/main/proto/Sensor.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
syntax = "proto3";
import "google/protobuf/timestamp.proto";

option java_multiple_files = true;
option java_package = "kafka.sandbox.proto";

enum SensorStatus {
UP = 0;
ERROR = 1;
}

message Sensor {
string id = 1;
SensorStatus status = 2;
}

message Environment {
double temperature = 1;
double humidity = 2;
}

message Speed {
int32 wheel_rpm = 1;
double speed = 2;
}

message Measurement {
oneof value {
Environment environment = 1;
Speed speed = 2;
}
google.protobuf.Timestamp created_at = 3;
Sensor sensor = 4;
}
1 change: 1 addition & 0 deletions md/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
- [Avro Producer and Consumer](avro-producer-and-consumer.md)
- [Avro Union](avro-union.md)
- [Protobuf Producer and Consumer](protobuf-producer-and-consumer.md)
- [Protobuf Oneof](protobuf-oneof.md)
- [Spring Boot](spring-boot.md)
- [Kafka Streams](kafka-streams.md)
- [Interactive Queries](interactive-queries.md)
Expand Down
Loading

0 comments on commit 9645724

Please sign in to comment.