diff --git a/cloudevents/README.md b/cloudevents/README.md index ac9c8a8c..9a2b8487 100644 --- a/cloudevents/README.md +++ b/cloudevents/README.md @@ -65,7 +65,7 @@ curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" docker run --rm --tty \ --network cloudevents-network \ quay.io/debezium/tooling:1.2 \ - kafkacat -b kafka:9092 -C -o beginning -q -s value=avro -r http://schema-registry:8080 \ + kafkacat -b kafka:9092 -C -o beginning -q \ -t dbserver3.inventory.customers | jq . ``` @@ -76,8 +76,8 @@ The same stream processing application writes out that data to the `customers3` docker run --rm --tty \ --network cloudevents-network \ quay.io/debezium/tooling:1.2 \ - kafkacat -b kafka:9092 -C -o beginning -q -s value=avro -r http://schema-registry:8081 \ - -t customers2 | jq . + kafkacat -b kafka:9092 -C -o beginning -q \ + -t customers3 ``` ## CloudEvents Binary Mode diff --git a/cloudevents/avro-data-extractor/pom.xml b/cloudevents/avro-data-extractor/pom.xml index a907f2a9..c4f0165b 100755 --- a/cloudevents/avro-data-extractor/pom.xml +++ b/cloudevents/avro-data-extractor/pom.xml @@ -21,7 +21,7 @@ 3.0.0-M6 3.2.0 - 2.11.0.Final + 2.13.1.Final 2.0.0.CR1 7.2.0 @@ -98,7 +98,12 @@ io.confluent kafka-streams-avro-serde - 5.3.2 + 7.2.1 + + + io.apicurio + apicurio-registry-utils-serde + 1.3.2.Final io.quarkus @@ -125,7 +130,6 @@ kafka-connect-avro-converter ${version.kafka.avro} - diff --git a/cloudevents/avro-data-extractor/src/main/docker/Dockerfile.jvm b/cloudevents/avro-data-extractor/src/main/docker/Dockerfile.jvm index 4588e318..abfa5e53 100644 --- a/cloudevents/avro-data-extractor/src/main/docker/Dockerfile.jvm +++ b/cloudevents/avro-data-extractor/src/main/docker/Dockerfile.jvm @@ -86,7 +86,7 @@ COPY --chown=185 target/quarkus-app/*.jar /deployments/ COPY --chown=185 target/quarkus-app/app/ /deployments/app/ COPY --chown=185 target/quarkus-app/quarkus/ /deployments/quarkus/ -EXPOSE 8080 +EXPOSE 8079 USER 185 ENV AB_JOLOKIA_OFF="" ENV JAVA_OPTS="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager" diff --git a/cloudevents/avro-data-extractor/src/main/java/io/debezium/examples/cloudevents/dataextractor/StreamsPipelineManager.java b/cloudevents/avro-data-extractor/src/main/java/io/debezium/examples/cloudevents/dataextractor/StreamsPipelineManager.java index 763b120c..7c7d7040 100644 --- a/cloudevents/avro-data-extractor/src/main/java/io/debezium/examples/cloudevents/dataextractor/StreamsPipelineManager.java +++ b/cloudevents/avro-data-extractor/src/main/java/io/debezium/examples/cloudevents/dataextractor/StreamsPipelineManager.java @@ -7,13 +7,13 @@ import java.nio.ByteBuffer; import java.util.Collections; -import java.util.Map; import javax.enterprise.context.ApplicationScoped; import javax.enterprise.inject.Produces; import javax.inject.Inject; import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -24,10 +24,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde; +import io.apicurio.registry.client.CompatibleClient; +import io.apicurio.registry.client.RegistryService; +import io.apicurio.registry.utils.serde.AvroKafkaDeserializer; +import io.apicurio.registry.utils.serde.AvroKafkaSerializer; + import io.debezium.examples.cloudevents.dataextractor.model.CloudEvent; import io.debezium.serde.DebeziumSerdes; + + /** * Starts up the KStreams pipeline once the source topics have been created. * @@ -73,9 +78,13 @@ Topology createStreamTopology() { .mapValues(ce -> ce.data) .to(jsonAvroExtractedTopic, Produced.with(longKeySerde, Serdes.ByteArray())); - Serde genericAvroSerde = new GenericAvroSerde(); - Map config = Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - genericAvroSerde.configure(config, false); + + RegistryService service = CompatibleClient.createCompatible(schemaRegistryUrl); + Deserializer deserializer = new AvroKafkaDeserializer<>(service); + Serde genericAvroSerde = Serdes.serdeFrom( + new AvroKafkaSerializer<>(service), + deserializer + ); builder.stream(avroAvroCustomersTopic, Consumed.with(longKeySerde, genericAvroSerde)) .mapValues(ce -> ((ByteBuffer) ce.get("data")).array()) diff --git a/cloudevents/avro-data-extractor/src/main/java/io/debezium/examples/cloudevents/dataextractor/model/CloudEvent.java b/cloudevents/avro-data-extractor/src/main/java/io/debezium/examples/cloudevents/dataextractor/model/CloudEvent.java index ec847ef2..f633eb7d 100644 --- a/cloudevents/avro-data-extractor/src/main/java/io/debezium/examples/cloudevents/dataextractor/model/CloudEvent.java +++ b/cloudevents/avro-data-extractor/src/main/java/io/debezium/examples/cloudevents/dataextractor/model/CloudEvent.java @@ -14,12 +14,16 @@ public class CloudEvent { public String iodebeziumconnector; public String iodebeziumname; public String iodebeziumtsms; - public boolean iodebeziumsnapshot; + public String iodebeziumsnapshot; public String iodebeziumdb; + public String iodebeziumsequence; public String iodebeziumschema; public String iodebeziumtable; public String iodebeziumtxId; + public String iodebeziumtxid; public String iodebeziumlsn; public String iodebeziumxmin; + public String iodebeziumtxtotalorder; + public String iodebeziumtxdatacollectionorder; public byte[] data; } diff --git a/cloudevents/avro-data-extractor/src/main/resources/application.properties b/cloudevents/avro-data-extractor/src/main/resources/application.properties index e2f4ac2f..bfc8efd3 100644 --- a/cloudevents/avro-data-extractor/src/main/resources/application.properties +++ b/cloudevents/avro-data-extractor/src/main/resources/application.properties @@ -4,9 +4,9 @@ json.avro.extracted.topic=customers2 avro.avro.customers.topic=dbserver3.inventory.customers avro.avro.extracted.topic=customers3 -schema.registry.url=http://schema-registry:8081 +schema.registry.url=http://schema-registry:8080/api/ -quarkus.kafka-streams.bootstrap-servers=localhost:9092 +quarkus.kafka-streams.bootstrap-servers=kafka:9092 quarkus.kafka-streams.application-id=cloudevents-data-extractor quarkus.kafka-streams.topics=${json.avro.customers.topic},${avro.avro.customers.topic} diff --git a/cloudevents/docker-compose.yaml b/cloudevents/docker-compose.yaml index 8cc4d995..c9914ff5 100644 --- a/cloudevents/docker-compose.yaml +++ b/cloudevents/docker-compose.yaml @@ -48,7 +48,7 @@ services: networks: - my-network schema-registry: - image: apicurio/apicurio-registry-mem:2.2.5.Final + image: apicurio/apicurio-registry-mem:2.3.1.Final ports: - 8080:8080 networks: