Commit

add alerts
sauljabin committed Jul 11, 2024
1 parent 32ed6c5 commit b4c0d16
Showing 34 changed files with 369 additions and 212 deletions.
15 changes: 7 additions & 8 deletions README.md
@@ -1,21 +1,20 @@
-[Kafka Sandbox](https://sauljabin.github.io/kafka-sandbox/) it's a markdown book designed to help you to deploy a kafka sandbox locally.
-It intends to be a simple way to get started with kafka and
+**Kafka Sandbox** helps you to deploy a kafka sandbox locally. It intends to be a simple way to get started with kafka and
 help you on your learning path. It provides you with a wide variety of tools from the kafka ecosystem and a simple way
-to run them all. It also includes a set of tools and tips to make it easier for you to use kafka. It does not include
-security since it is not a production system.
+to run them all. It also includes a set of tools and tips to make it easier for you to use kafka.
+This is not intended to replace official courses.
 
-You can access it at https://sauljabin.github.io/kafka-sandbox/.
-
-> This repository is for educational purposes. This book is power by [mdBook](https://rust-lang.github.io/mdBook/index.html).
+You can access this mdbook at https://sauljabin.github.io/kafka-sandbox and https://github.com/sauljabin/kafka-sandbox.
 
 ## Developing Commands
 
+> [!NOTE]
+> This book is powered by [mdBook](https://rust-lang.github.io/mdBook/index.html).
 > You must install [rust](https://www.rust-lang.org/tools/install) first.
 
 Install `mdbook`:
 
 ```bash
-cargo install mdbook
+cargo install mdbook mdbook-alerts
 ```
 
 Run local server:
4 changes: 3 additions & 1 deletion book.toml
@@ -6,4 +6,6 @@ src = "md"
 title = "Kafka Sandbox"
 
 [output.html]
-git-repository-url = "https://github.com/sauljabin/kafka-sandbox"
+git-repository-url = "https://github.com/sauljabin/kafka-sandbox"
+
+[preprocessor.alerts]
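
With this preprocessor registered (and `mdbook-alerts` installed alongside `mdbook`, per the README change above), GitHub-style alert quotes such as the `> [!NOTE]` block added to the README render as styled callouts rather than plain blockquotes.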
37 changes: 37 additions & 0 deletions kafka-protobuf-clients/build.gradle
@@ -0,0 +1,37 @@
plugins {
id 'java'
id 'application'
}

repositories {
mavenCentral()
maven {
url = 'https://packages.confluent.io/maven/'
}
}

dependencies {
implementation project(':kafka-protobuf')

implementation "org.apache.kafka:kafka-clients:${kafkaVersion}"
implementation "io.confluent:kafka-protobuf-serializer:${confluentVersion}"

implementation 'info.picocli:picocli:4.6.1'
implementation 'net.datafaker:datafaker:2.0.2'
implementation 'org.slf4j:slf4j-simple:1.7.30'

compileOnly "org.projectlombok:lombok:${lombokVersion}"
annotationProcessor "org.projectlombok:lombok:${lombokVersion}"

testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}"
testCompileOnly "org.projectlombok:lombok:${lombokVersion}"
testAnnotationProcessor "org.projectlombok:lombok:${lombokVersion}"
}

application {
mainClass = 'kafka.sandbox.App'
}

test {
useJUnitPlatform()
}
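
A note on this build script: the extra `maven { url = 'https://packages.confluent.io/maven/' }` repository is needed because Confluent artifacts such as `kafka-protobuf-serializer` are not published to Maven Central, and the `application` plugin plus `mainClass` make the module runnable through Gradle's `run` task.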
29 changes: 29 additions & 0 deletions kafka-protobuf-clients/src/main/java/kafka/sandbox/App.java
@@ -0,0 +1,29 @@
package kafka.sandbox;

import kafka.sandbox.cli.Consumer;
import kafka.sandbox.cli.KafkaClients;
import kafka.sandbox.cli.Producer;
import picocli.CommandLine;

import java.io.IOException;
import java.util.Properties;

public class App {

public static void main(String[] args) throws IOException {
Properties producerProps = getProperties("producer.properties");
Properties consumerProps = getProperties("consumer.properties");

CommandLine commandLine = new CommandLine(new KafkaClients())
.addSubcommand(new Producer(producerProps))
.addSubcommand(new Consumer(consumerProps));

System.exit(commandLine.execute(args));
}

private static Properties getProperties(String fileName) throws IOException {
Properties props = new Properties();
props.load(App.class.getClassLoader().getResourceAsStream(fileName));
return props;
}
}
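
`App` wires the two picocli subcommands together and loads each client's configuration from the classpath properties files shown further down. A hypothetical invocation through the Gradle `application` plugin (module name taken from the paths in this commit) might look like `./gradlew kafka-protobuf-clients:run --args="produce my-topic 100"`.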
103 changes: 103 additions & 0 deletions kafka-protobuf-clients/src/main/java/kafka/sandbox/cli/Consumer.java
@@ -0,0 +1,103 @@
package kafka.sandbox.cli;

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaJsonDeserializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
import picocli.CommandLine;
import picocli.CommandLine.Command;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

@Slf4j
@Command(name = "consume", description = "Consumes messages from topic")
public class Consumer implements Callable<Integer> {

private final Properties props;

@CommandLine.Parameters(
index = "0",
description = "Topic name"
)
private String topic;

@CommandLine.Option(names = "-s", description = "Use Schema Registry")
boolean useSchemaRegistry;

public Consumer(Properties props) {
this.props = props;
}

@Override
public Integer call() throws Exception {
if (useSchemaRegistry) {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonSchemaDeserializer.class);
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
} else {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class);
}

KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton(topic));

// attach a shutdown hook to catch Ctrl-C: wake up the consumer and release the latch
CountDownLatch latch = new CountDownLatch(1);
Runtime
.getRuntime()
.addShutdownHook(
new Thread("consumer-shutdown-hook") {
@Override
public void run() {
consumer.wakeup();
latch.countDown();
}
}
);

// consume on a separate thread so the shutdown hook can interrupt the blocking poll via wakeup()
Thread infiniteLoop = new Thread(
() -> {
try {
while (true) {
ConsumerRecords<String, User> records = consumer.poll(
Duration.ofMillis(500)
);
for (ConsumerRecord<String, User> record : records) {
log.info(
"Consumed message: topic = {}, partition = {}, offset = {}, key = {}, value = {}",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value()
);
}
consumer.commitSync();
}
} catch (RecordDeserializationException rde) {
log.warn("{}", rde.getMessage());
} catch (WakeupException we) {
log.info("Shutdown gracefully");
} finally {
consumer.close();
}
},
"consumer-thread"
);

infiniteLoop.start();
latch.await();

return CommandLine.ExitCode.OK;
}
}
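
One thing worth noting: although the module is `kafka-protobuf-clients`, the `-s` branch above wires in the JSON Schema deserializer. If the intent is to consume the protobuf messages defined later in this commit, the configuration might instead look like the following minimal sketch — assuming Confluent's `KafkaProtobufDeserializer` (already on the classpath via `kafka-protobuf-serializer`) and a generated `kafka.sandbox.proto.Invoice` type:

```java
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
import kafka.sandbox.proto.Invoice;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

class ProtobufConsumerSettings {

    // Sketch: switch the value deserializer to Confluent's protobuf deserializer
    static Properties apply(Properties props) {
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
        // Without a specific type the deserializer returns DynamicMessage instead of Invoice
        props.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, Invoice.class.getName());
        return props;
    }
}
```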
23 changes: 23 additions & 0 deletions kafka-protobuf-clients/src/main/java/kafka/sandbox/cli/KafkaClients.java
@@ -0,0 +1,23 @@
package kafka.sandbox.cli;

import picocli.CommandLine.Command;
import picocli.CommandLine.Model.CommandSpec;

import static picocli.CommandLine.ParameterException;
import static picocli.CommandLine.Spec;

@Command(
name = "kafka-protobuf-clients",
description = "Allows you either to produce or consume topic",
synopsisSubcommandLabel = "COMMAND"
)
public class KafkaClients implements Runnable {

@Spec
private CommandSpec spec;

@Override
public void run() {
throw new ParameterException(spec.commandLine(), "Missing required subcommand");
}
}
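
Throwing `ParameterException` from `run()` is the usual picocli idiom for a parent command that requires a subcommand: `execute()` catches the exception, prints the message together with the usage help, and returns a usage-error exit code instead of propagating it.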
94 changes: 94 additions & 0 deletions kafka-protobuf-clients/src/main/java/kafka/sandbox/cli/Producer.java
@@ -0,0 +1,94 @@
package kafka.sandbox.cli;

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaJsonSerializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import lombok.extern.slf4j.Slf4j;
import net.datafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;

@Slf4j
@Command(name = "produce", description = "Produces messages to topic")
public class Producer implements Callable<Integer> {

private final Properties props;
private final Faker faker = new Faker();

@Parameters(
index = "1",
description = "Total new messages to produce"
)
private int messages;

@Parameters(
index = "0",
description = "Topic name"
)
private String topic;


@Option(names = "-s", description = "Use Schema Registry")
boolean useSchemaRegistry;

public Producer(Properties props) {
this.props = props;
}

@Override
public Integer call() {
if (useSchemaRegistry) {
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
} else {
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
}

KafkaProducer<String, User> producer = new KafkaProducer<>(props);

for (int i = 0; i < messages; i++) {
User user = newMessage();
ProducerRecord<String, User> record = new ProducerRecord<>(
topic,
user.getId(),
user
);
producer.send(
record,
(metadata, exception) -> {
if (exception != null) {
log.error("Error producing {}", user, exception);
return;
}
log.info("Producing message: {}", user);
}
);
}

producer.flush();
producer.close();

return CommandLine.ExitCode.OK;
}

private User newMessage() {
return User.builder()
.id(UUID.randomUUID().toString())
.firstName(faker.name().firstName())
.lastName(faker.name().lastName())
.address(faker.address().streetAddress())
.age(faker.number().numberBetween(20, 40))
.build();
}
}
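
Two details in this producer loop: `send()` is asynchronous, so the logging callback runs later on the producer's I/O thread, while the final `flush()` blocks until every buffered record has been sent and acknowledged. Also note that the non-registry branch sets `ByteArraySerializer`, which only accepts `byte[]` values — sending a `User` value through it would fail at runtime, so a protobuf message would first need `toByteArray()` (or a protobuf-aware serializer, as in the consumer sketch above).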
7 changes: 7 additions & 0 deletions kafka-protobuf-clients/src/main/resources/consumer.properties
@@ -0,0 +1,7 @@
bootstrap.servers=kafka1:9092
group.id=client.consumer
enable.auto.commit=false
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
specific.avro.reader=true
client.id=client.consumer
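
Two of these defaults deserve a comment: `enable.auto.commit=false` pairs with the explicit `commitSync()` in the consumer loop, and `specific.avro.reader=true` is an Avro deserializer setting that has no effect on the JSON or protobuf deserializers used here (the protobuf analogue is `specific.protobuf.value.type`, as in the sketch above).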
4 changes: 4 additions & 0 deletions kafka-protobuf-clients/src/main/resources/producer.properties
@@ -0,0 +1,4 @@
bootstrap.servers=kafka1:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=all
client.id=client.producer
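
`acks=all` makes every send wait for acknowledgment from all in-sync replicas before it counts as successful, trading a little latency for durability.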
38 changes: 38 additions & 0 deletions kafka-protobuf/src/main/proto/Product.proto
@@ -0,0 +1,38 @@
syntax = "proto3";
import "google/protobuf/timestamp.proto";

option java_multiple_files = true;
option java_package = "kafka.sandbox.proto";

enum InvoiceStatus {
  PAID = 0;
  PENDING = 1;
}

message Invoice {
  string id = 1;
  google.protobuf.Timestamp created_at = 3;
  Customer customer = 4;
  repeated Product products = 5;
  InvoiceStatus status = 6;
}

message Product {
  string id = 1;
  string name = 2;
  string sku = 3;
  double price = 4;
}

message Customer {
  string id = 1;
  string firstName = 2;
  string lastName = 3;
  Address address = 4;
}

message Address {
  string city = 1;
  string zipCode = 2;
  string street = 3;
}
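
Since the file sets `java_multiple_files = true` and `java_package = "kafka.sandbox.proto"`, `protoc` generates one top-level Java class per message, each with an immutable builder. A minimal sketch of assembling an `Invoice` from these definitions — assuming `protobuf-java-util` is available for the `Timestamps` helper:

```java
import com.google.protobuf.util.Timestamps;
import kafka.sandbox.proto.Address;
import kafka.sandbox.proto.Customer;
import kafka.sandbox.proto.Invoice;
import kafka.sandbox.proto.InvoiceStatus;
import kafka.sandbox.proto.Product;

class InvoiceExample {

    // Sketch: build a sample Invoice using the generated protobuf builders
    static Invoice newInvoice() {
        Customer customer = Customer.newBuilder()
                .setId("customer-1")
                .setFirstName("Jane")
                .setLastName("Doe")
                .setAddress(Address.newBuilder()
                        .setCity("Quito")
                        .setZipCode("170101")
                        .setStreet("Av. Amazonas")
                        .build())
                .build();
        return Invoice.newBuilder()
                .setId("invoice-1")
                .setCreatedAt(Timestamps.fromMillis(System.currentTimeMillis()))
                .setCustomer(customer)
                .addProducts(Product.newBuilder()
                        .setId("product-1")
                        .setName("Keyboard")
                        .setSku("KB-001")
                        .setPrice(49.99)
                        .build())
                .setStatus(InvoiceStatus.PENDING)
                .build();
    }
}
```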
1 change: 1 addition & 0 deletions md/SUMMARY.md
@@ -3,6 +3,7 @@
 ---
 
 - [Quick Start](quick-start.md)
+- [Sandbox Environment](sandbox-environment.md)
 - [What is Kafka?](what-is-kafka.md)
 - [Consuming and Producing](consuming-and-producing.md)
 - [Native Consumer and Producer](producing-and-consuming-natives.md)