From ff7c18776242b3a826c257e9166e4929f30eb94b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sa=C3=BAl=20Pi=C3=B1a?= Date: Thu, 11 Jul 2024 12:49:12 -0500 Subject: [PATCH] update protobuf --- gradle.properties | 5 ++- kafka-avro-clients/build.gradle | 2 +- kafka-avro-union-clients/build.gradle | 2 +- kafka-avro/build.gradle | 4 +- kafka-json-clients/build.gradle | 2 +- kafka-ksqldb-extensions/build.gradle | 2 +- kafka-native-clients/build.gradle | 2 +- kafka-protobuf/build.gradle | 42 +++++++++++++++++++ .../src/main/proto/CountService.proto | 5 +-- kafka-streams/build.gradle | 32 ++------------ .../main/java/kafka/sandbox/cli/Count.java | 8 ++-- .../main/java/kafka/sandbox/cli/Streams.java | 6 +-- ...{CounterService.java => CountService.java} | 8 ++-- settings.gradle | 1 + 14 files changed, 69 insertions(+), 52 deletions(-) create mode 100644 kafka-protobuf/build.gradle rename kafka-streams/src/main/proto/suppliers.proto => kafka-protobuf/src/main/proto/CountService.proto (73%) rename kafka-streams/src/main/java/kafka/sandbox/grpc/{CounterService.java => CountService.java} (83%) diff --git a/gradle.properties b/gradle.properties index 56489e3..4c48f9c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,6 @@ confluentVersion=7.6.1 kafkaVersion=3.6.1 -lombokVersion=1.18.26 \ No newline at end of file +lombokVersion=1.18.26 +junitVersion=5.10.2 +grpcVersion=1.65.0 +protoVersion=4.27.2 \ No newline at end of file diff --git a/kafka-avro-clients/build.gradle b/kafka-avro-clients/build.gradle index 9bd020c..1abea1f 100644 --- a/kafka-avro-clients/build.gradle +++ b/kafka-avro-clients/build.gradle @@ -11,7 +11,7 @@ repositories { } dependencies { - testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2' + testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}" implementation project(':kafka-avro') implementation "org.apache.kafka:kafka-clients:${kafkaVersion}" diff --git a/kafka-avro-union-clients/build.gradle b/kafka-avro-union-clients/build.gradle index 9bd020c..1abea1f 100644 --- a/kafka-avro-union-clients/build.gradle +++ b/kafka-avro-union-clients/build.gradle @@ -11,7 +11,7 @@ repositories { } dependencies { - testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2' + testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}" implementation project(':kafka-avro') implementation "org.apache.kafka:kafka-clients:${kafkaVersion}" diff --git a/kafka-avro/build.gradle b/kafka-avro/build.gradle index cac3698..5ad7b9c 100644 --- a/kafka-avro/build.gradle +++ b/kafka-avro/build.gradle @@ -9,8 +9,8 @@ repositories { } dependencies { - testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2' - implementation 'org.apache.avro:avro:1.11.+' + testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}" + implementation 'org.apache.avro:avro:1.11.3' } test { diff --git a/kafka-json-clients/build.gradle b/kafka-json-clients/build.gradle index c222ce8..3e464f2 100644 --- a/kafka-json-clients/build.gradle +++ b/kafka-json-clients/build.gradle @@ -11,7 +11,7 @@ repositories { } dependencies { - testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2' + testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}" implementation "org.apache.kafka:kafka-clients:${kafkaVersion}" implementation "io.confluent:kafka-json-serializer:${confluentVersion}" diff --git a/kafka-ksqldb-extensions/build.gradle b/kafka-ksqldb-extensions/build.gradle index 273c20c..97da8d5 100644 --- a/kafka-ksqldb-extensions/build.gradle +++ b/kafka-ksqldb-extensions/build.gradle @@ -16,7 +16,7 @@ java { } dependencies { - testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2' + testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}" implementation("io.confluent.ksql:ksqldb-udf:${confluentVersion}") { exclude group: 'io.confluent.observability', module: 'telemetry-client' diff --git a/kafka-native-clients/build.gradle b/kafka-native-clients/build.gradle index af8fc90..7b7ca7d 100644 --- a/kafka-native-clients/build.gradle +++ b/kafka-native-clients/build.gradle @@ -11,7 +11,7 @@ repositories { } dependencies { - testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2' + testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}" implementation "org.apache.kafka:kafka-clients:${kafkaVersion}" diff --git a/kafka-protobuf/build.gradle b/kafka-protobuf/build.gradle new file mode 100644 index 0000000..ee8d815 --- /dev/null +++ b/kafka-protobuf/build.gradle @@ -0,0 +1,42 @@ +plugins { + id 'java' + id 'java-library' + id 'com.google.protobuf' version '0.9.4' +} + +repositories { + mavenCentral() +} + +dependencies { + api "io.grpc:grpc-protobuf:${grpcVersion}" + api "io.grpc:grpc-stub:${grpcVersion}" + implementation "com.google.protobuf:protobuf-java:${protoVersion}" + compileOnly 'org.apache.tomcat:annotations-api:6.0.53' + + testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}" + testImplementation "io.grpc:grpc-testing:${grpcVersion}" +} + +test { + useJUnitPlatform() +} + +protobuf { + protoc { artifact = "com.google.protobuf:protoc:${protoVersion}" } + plugins { + grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" } + } + generateProtoTasks { + all()*.plugins { grpc {} } + } +} + +sourceSets { + main { + java { + srcDirs 'build/generated/source/proto/main/grpc' + srcDirs 'build/generated/source/proto/main/java' + } + } +} \ No newline at end of file diff --git a/kafka-streams/src/main/proto/suppliers.proto b/kafka-protobuf/src/main/proto/CountService.proto similarity index 73% rename from kafka-streams/src/main/proto/suppliers.proto rename to kafka-protobuf/src/main/proto/CountService.proto index 9b7246e..3987551 100644 --- a/kafka-streams/src/main/proto/suppliers.proto +++ b/kafka-protobuf/src/main/proto/CountService.proto @@ -2,11 +2,8 @@ syntax = "proto3"; option java_multiple_files = true; option java_package = "kafka.sandbox.proto"; -option java_outer_classname = "SuppliersProto"; -package suppliers; - -service CounterService { +service CountService { rpc GetCountByCountry (CountRequest) returns (CountReply) {} } diff --git a/kafka-streams/build.gradle b/kafka-streams/build.gradle index 8c6b389..8aacf35 100644 --- a/kafka-streams/build.gradle +++ b/kafka-streams/build.gradle @@ -1,7 +1,6 @@ plugins { id 'java' id 'application' - id 'com.google.protobuf' version '0.9.2' } repositories { @@ -11,18 +10,13 @@ repositories { } } -String grpcVersion = '1.54.0' -String protoVersion = '3.22.2' - dependencies { - implementation "io.grpc:grpc-protobuf:${grpcVersion}" - implementation "io.grpc:grpc-stub:${grpcVersion}" + implementation project(':kafka-protobuf') + runtimeOnly "io.grpc:grpc-netty:${grpcVersion}" implementation project(':kafka-avro') implementation "org.apache.kafka:kafka-streams:${kafkaVersion}" implementation "io.confluent:kafka-streams-avro-serde:${confluentVersion}" - compileOnly 'org.apache.tomcat:annotations-api:6.0.53' - runtimeOnly "io.grpc:grpc-netty-shaded:${grpcVersion}" implementation 'info.picocli:picocli:4.6.1' implementation 'ch.qos.logback:logback-classic:1.5.6' @@ -32,18 +26,7 @@ dependencies { testCompileOnly "org.projectlombok:lombok:${lombokVersion}" testAnnotationProcessor "org.projectlombok:lombok:${lombokVersion}" - testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2' - testImplementation "io.grpc:grpc-testing:${grpcVersion}" -} - -protobuf { - protoc { artifact = "com.google.protobuf:protoc:${protoVersion}" } - plugins { - grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" } - } - generateProtoTasks { - all()*.plugins { grpc {} } - } + testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}" } application { @@ -53,12 +36,3 @@ application { test { useJUnitPlatform() } - -sourceSets { - main { - java { - srcDirs 'build/generated/source/proto/main/grpc' - srcDirs 'build/generated/source/proto/main/java' - } - } -} diff --git a/kafka-streams/src/main/java/kafka/sandbox/cli/Count.java b/kafka-streams/src/main/java/kafka/sandbox/cli/Count.java index 6296708..5a01720 100644 --- a/kafka-streams/src/main/java/kafka/sandbox/cli/Count.java +++ b/kafka-streams/src/main/java/kafka/sandbox/cli/Count.java @@ -5,8 +5,8 @@ import io.grpc.ManagedChannel; import kafka.sandbox.proto.CountReply; import kafka.sandbox.proto.CountRequest; -import kafka.sandbox.proto.CounterServiceGrpc; -import kafka.sandbox.proto.CounterServiceGrpc.CounterServiceBlockingStub; +import kafka.sandbox.proto.CountServiceGrpc; +import kafka.sandbox.proto.CountServiceGrpc.CountServiceBlockingStub; import lombok.extern.slf4j.Slf4j; import picocli.CommandLine; import picocli.CommandLine.Command; @@ -22,11 +22,11 @@ public class Count implements Callable { private String country; @Override - public Integer call() throws Exception { + public Integer call() { ManagedChannel channel = Grpc.newChannelBuilder("localhost:5050", InsecureChannelCredentials.create()) .build(); - CounterServiceBlockingStub blockingStub = CounterServiceGrpc.newBlockingStub(channel); + CountServiceBlockingStub blockingStub = CountServiceGrpc.newBlockingStub(channel); CountReply countByCountry = blockingStub.getCountByCountry(CountRequest.newBuilder().setName(country).build()); System.out.println(countByCountry.getMessage()); diff --git a/kafka-streams/src/main/java/kafka/sandbox/cli/Streams.java b/kafka-streams/src/main/java/kafka/sandbox/cli/Streams.java index e542417..9dc5d1e 100644 --- a/kafka-streams/src/main/java/kafka/sandbox/cli/Streams.java +++ b/kafka-streams/src/main/java/kafka/sandbox/cli/Streams.java @@ -4,7 +4,7 @@ import io.grpc.InsecureServerCredentials; import io.grpc.Server; import kafka.sandbox.avro.Supplier; -import kafka.sandbox.grpc.CounterService; +import kafka.sandbox.grpc.CountService; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; @@ -52,7 +52,7 @@ public Integer call() throws Exception { // aggregate the new supplier counts by country KTable aggregated = suppliers // map the country as key - .map((key, value) -> new KeyValue<>(value.getCountry().toString(), value)) + .map((key, value) -> new KeyValue<>(value.getCountry(), value)) .groupByKey() // aggregate and materialize the store .count(Materialized.as("SupplierCountByCountry")); @@ -73,7 +73,7 @@ public Integer call() throws Exception { // GRPC Server Server server = Grpc.newServerBuilderForPort(5050, InsecureServerCredentials.create()) - .addService(new CounterService(streams)) + .addService(new CountService(streams)) .build(); // attach shutdown handler to catch control-c and creating a latch diff --git a/kafka-streams/src/main/java/kafka/sandbox/grpc/CounterService.java b/kafka-streams/src/main/java/kafka/sandbox/grpc/CountService.java similarity index 83% rename from kafka-streams/src/main/java/kafka/sandbox/grpc/CounterService.java rename to kafka-streams/src/main/java/kafka/sandbox/grpc/CountService.java index df7343f..acea860 100644 --- a/kafka-streams/src/main/java/kafka/sandbox/grpc/CounterService.java +++ b/kafka-streams/src/main/java/kafka/sandbox/grpc/CountService.java @@ -3,17 +3,17 @@ import io.grpc.stub.StreamObserver; import kafka.sandbox.proto.CountReply; import kafka.sandbox.proto.CountRequest; -import kafka.sandbox.proto.CounterServiceGrpc.CounterServiceImplBase; +import kafka.sandbox.proto.CountServiceGrpc.CountServiceImplBase; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -public class CounterService extends CounterServiceImplBase { +public class CountService extends CountServiceImplBase { - private KafkaStreams streams; + private final KafkaStreams streams; - public CounterService(KafkaStreams streams) { + public CountService(KafkaStreams streams) { this.streams = streams; } diff --git a/settings.gradle b/settings.gradle index d2fcc17..f98f085 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,5 +1,6 @@ rootProject.name = "kafka-sandbox" include("kafka-avro") +include("kafka-protobuf") include("kafka-avro-clients") include("kafka-avro-union-clients") include("kafka-json-clients")