Skip to content

Commit

Permalink
add avro union example
Browse files Browse the repository at this point in the history
  • Loading branch information
sauljabin committed Jun 29, 2024
1 parent d8afe15 commit 5706110
Show file tree
Hide file tree
Showing 22 changed files with 403 additions and 573 deletions.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.util.concurrent.CountDownLatch;

@Slf4j
@Command(name = "consume", description = "Consumes supplier messages from the topic")
@Command(name = "consume", description = "Consumes messages from topic")
public class Consumer implements Callable<Integer> {

private final Properties props;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import static picocli.CommandLine.Spec;

@Command(
name = "kafka-clients",
description = "Allows you either to produce or consume form the supplier topic",
name = "kafka-avro-clients",
description = "Allows you either to produce or consume topic",
synopsisSubcommandLabel = "COMMAND"
)
public class KafkaClients implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
import java.util.concurrent.Callable;

@Slf4j
@Command(name = "produce", description = "Produces supplier messages to the topic")
@Command(name = "produce", description = "Produces messages to topic")
public class Producer implements Callable<Integer> {

private final Properties props;
private final Faker faker = new Faker();

@Parameters(
index = "1",
description = "Total new supplier messages to produce"
description = "Total new messages to produce"
)
private int messages;

Expand All @@ -41,15 +41,21 @@ public Integer call() {
KafkaProducer<String, Supplier> producer = new KafkaProducer<>(props);

for (int i = 0; i < messages; i++) {
Supplier supplier = createNewCustomer();
Supplier supplier = newMessage();
ProducerRecord<String, Supplier> record = new ProducerRecord<>(
topic,
supplier.getId().toString(),
supplier
);
producer.send(
record,
(metadata, exception) -> log.info("Producing message: {}", supplier)
(metadata, exception) -> {
if (exception != null) {
log.error("Error producing {}", supplier, exception);
return;
}
log.info("Producing message: {}", supplier);
}
);
}

Expand All @@ -59,7 +65,7 @@ public Integer call() {
return CommandLine.ExitCode.OK;
}

private Supplier createNewCustomer() {
private Supplier newMessage() {
return Supplier
.newBuilder()
.setId(UUID.randomUUID().toString())
Expand Down
37 changes: 37 additions & 0 deletions kafka-avro-union-clients/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Build script for the kafka-avro-union-clients example module.
plugins {
    id 'application'
    id 'java'
}

repositories {
    mavenCentral()
    // Confluent artifacts (kafka-avro-serializer) are not on Maven Central.
    maven {
        url = 'https://packages.confluent.io/maven/'
    }
}

dependencies {
    // Generated Avro classes shared across the sandbox modules.
    implementation project(':kafka-avro')

    // Kafka client + Confluent Avro (Schema Registry) serializer.
    implementation "org.apache.kafka:kafka-clients:${kafkaVersion}"
    implementation "io.confluent:kafka-avro-serializer:${confluentVersion}"

    // CLI parsing, fake data generation and simple logging.
    implementation 'info.picocli:picocli:4.6.1'
    implementation 'net.datafaker:datafaker:2.0.2'
    implementation 'org.slf4j:slf4j-simple:1.7.30'

    // Lombok is compile-time only (annotation processing).
    compileOnly 'org.projectlombok:lombok:1.18.26'
    annotationProcessor 'org.projectlombok:lombok:1.18.26'
    testCompileOnly 'org.projectlombok:lombok:1.18.26'
    testAnnotationProcessor 'org.projectlombok:lombok:1.18.26'

    testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2'
}

application {
    mainClass = 'kafka.sandbox.App'
}

test {
    useJUnitPlatform()
}
29 changes: 29 additions & 0 deletions kafka-avro-union-clients/src/main/java/kafka/sandbox/App.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package kafka.sandbox;

import kafka.sandbox.cli.Consumer;
import kafka.sandbox.cli.KafkaClients;
import kafka.sandbox.cli.Producer;
import picocli.CommandLine;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * CLI entry point: wires the producer/consumer subcommands into picocli
 * and loads their Kafka client configuration from classpath resources.
 */
public class App {

    public static void main(String[] args) throws IOException {
        Properties producerProps = getProperties("producer.properties");
        Properties consumerProps = getProperties("consumer.properties");

        CommandLine commandLine = new CommandLine(new KafkaClients())
                .addSubcommand(new Producer(producerProps))
                .addSubcommand(new Consumer(consumerProps));

        System.exit(commandLine.execute(args));
    }

    /**
     * Loads a properties file from the classpath.
     *
     * @param fileName resource name, e.g. {@code "producer.properties"}
     * @return the loaded properties
     * @throws IOException if the resource is missing or cannot be read
     */
    private static Properties getProperties(String fileName) throws IOException {
        Properties props = new Properties();
        // getResourceAsStream returns null (not an exception) when the resource
        // is absent; without this check Properties.load would throw an opaque NPE.
        // try-with-resources also guarantees the stream is closed.
        try (InputStream stream = App.class.getClassLoader().getResourceAsStream(fileName)) {
            if (stream == null) {
                throw new IOException("Properties file not found on classpath: " + fileName);
            }
            props.load(stream);
        }
        return props;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package kafka.sandbox.cli;

import kafka.sandbox.avro.Metric;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import picocli.CommandLine;
import picocli.CommandLine.Command;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

@Slf4j
@Command(name = "consume", description = "Consumes metrics from topic")
public class Consumer implements Callable<Integer> {

private final Properties props;

@CommandLine.Parameters(
index = "0",
description = "Topic name"
)
private String topic;

public Consumer(Properties props) {
this.props = props;
}

@Override
public Integer call() throws Exception {
KafkaConsumer<String, Metric> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton(topic));

// attach shutdown handler to catch control-c and creating a latch
CountDownLatch latch = new CountDownLatch(1);
Runtime
.getRuntime()
.addShutdownHook(
new Thread("consumer-shutdown-hook") {
@Override
public void run() {
consumer.wakeup();
latch.countDown();
}
}
);

// infinite loop
Thread infiniteLoop = new Thread(
() -> {
try {
while (true) {
ConsumerRecords<String, Metric> records = consumer.poll(
Duration.ofMillis(500)
);
for (ConsumerRecord<String, Metric> record : records) {
log.info(
"Consumed message: topic = {}, partition = {}, offset = {}, key = {}, value = {}, type = {}",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value(),
record.value().getMetric().getClass().getName()
);
}
consumer.commitSync();
}
} catch (WakeupException e) {
log.info("Shutdown gracefully");
} finally {
consumer.close();
}
},
"consumer-thread"
);

infiniteLoop.start();
latch.await();

return CommandLine.ExitCode.OK;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package kafka.sandbox.cli;

import picocli.CommandLine.Command;
import picocli.CommandLine.Model.CommandSpec;

import static picocli.CommandLine.ParameterException;
import static picocli.CommandLine.Spec;

@Command(
        name = "kafka-avro-union-clients",
        description = "Allows you either to produce or consume topic",
        synopsisSubcommandLabel = "COMMAND"
)
public class KafkaClients implements Runnable {

    @Spec
    private CommandSpec spec;

    /**
     * Invoked by picocli when no subcommand is supplied; this command is only
     * a container for its subcommands, so a bare invocation is always an error.
     */
    @Override
    public void run() {
        String message = "Missing required subcommand";
        throw new ParameterException(spec.commandLine(), message);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package kafka.sandbox.cli;

import kafka.sandbox.avro.CounterMetric;
import kafka.sandbox.avro.Metric;
import kafka.sandbox.avro.MetricType;
import kafka.sandbox.avro.TimerMetric;
import lombok.extern.slf4j.Slf4j;
import net.datafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Parameters;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;

@Slf4j
@Command(name = "produce", description = "Produces metrics to topic")
public class Producer implements Callable<Integer> {

    private final Properties props;
    private final Faker faker = new Faker();

    @Parameters(
            index = "1",
            description = "Total messages to produce"
    )
    private int messages;

    @Parameters(
            index = "0",
            description = "Topic name"
    )
    private String topic;

    public Producer(Properties props) {
        this.props = props;
    }

    /**
     * Produces {@code messages} random {@link Metric} records (randomly picking
     * the COUNTER or TIMER branch of the Avro union) keyed by metric id.
     *
     * @return {@link CommandLine.ExitCode#OK} after all sends are flushed
     */
    @Override
    public Integer call() {
        KafkaProducer<String, Metric> producer = new KafkaProducer<>(props);

        for (int i = 0; i < messages; i++) {
            Metric metric = newMetric();
            ProducerRecord<String, Metric> record = new ProducerRecord<>(
                    topic,
                    metric.getMetricId().toString(),
                    metric
            );
            producer.send(
                    record,
                    (metadata, exception) -> {
                        if (exception != null) {
                            log.error("Error producing {}", metric, exception);
                            return;
                        }
                        log.info("Producing message: {}", metric);
                    }
            );
        }

        producer.flush();
        producer.close();

        return CommandLine.ExitCode.OK;
    }

    /** Builds a random metric, choosing the union branch by a coin flip. */
    private Metric newMetric() {
        MetricType metricType = faker.bool().bool() ? MetricType.COUNTER : MetricType.TIMER;
        return Metric
                .newBuilder()
                .setMetricId(UUID.randomUUID().toString())
                .setMetricType(metricType)
                // enums are singletons: compare with == (null-safe, idiomatic)
                .setMetric(metricType == MetricType.COUNTER ? newCounterMetric() : newTimerMetric())
                .build();
    }

    /** Random counter payload (count in [10, 20)). */
    private CounterMetric newCounterMetric() {
        return CounterMetric
                .newBuilder()
                .setCount(faker.number().numberBetween(10, 20))
                .build();
    }

    /** Random timer payload (avg in [1, 2) with 2 decimals). */
    private TimerMetric newTimerMetric() {
        return TimerMetric
                .newBuilder()
                .setAvg(faker.number().randomDouble(2, 1, 2))
                .build();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Kafka consumer configuration for the avro-union example.
# Local sandbox brokers; presumably a 3-node docker-compose cluster — confirm.
bootstrap.servers=localhost:19092,localhost:29092,localhost:39092
schema.registry.url=http://localhost:8081
group.id=client.consumer
# Offsets are committed manually (commitSync in the consumer loop).
enable.auto.commit=false
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
# Deserialize into the generated SpecificRecord classes instead of GenericRecord.
specific.avro.reader=true
client.id=client.consumer
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Kafka producer configuration for the avro-union example.
# Local sandbox brokers; presumably a 3-node docker-compose cluster — confirm.
bootstrap.servers=localhost:19092,localhost:29092,localhost:39092
schema.registry.url=http://localhost:8081
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
# acks=1: leader-only acknowledgement — faster, but may lose messages on leader failure.
acks=1
client.id=client.producer
Loading

0 comments on commit 5706110

Please sign in to comment.