#Demo Kafka Connect & Kafka Streams

##Prerequisites

  • Kafka 0.10+, available for example in the Confluent Platform tech preview
  • A running Kafka cluster (a single node on localhost is fine)
  • A running PostgreSQL database

##Create the postgres table

CREATE TABLE message(date TIMESTAMP NOT NULL,id SERIAL PRIMARY KEY, username VARCHAR(100), message TEXT);

##Kafka Connect

###Start the Kafka Connect cluster

  • In kafka-connect/start-distributed-connect.sh, set the Kafka path to the directory where you installed Kafka 0.10
  • Review the configuration in kafka-connect/conf/connect-distributed.properties.1 and kafka-connect/conf/connect-distributed.properties.2
  • Launch kafka-connect/start-distributed-connect.sh
  • Check the logs in kafka-connect/log
  • Check that both workers answer HTTP requests on localhost:8085 and localhost:8086
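The HTTP check can be scripted. The following is a minimal sketch, assuming `curl` is installed and that 8085 and 8086 are the REST ports set in the two properties files. The function names and the `CONNECT_PORTS` variable are illustrative and not part of this repository.

```shell
# Hypothetical helper, not part of the repository.
# Ports default to the two mentioned above; override with CONNECT_PORTS.
CONNECT_PORTS="${CONNECT_PORTS:-8085 8086}"

# Print the /connectors URL of each worker, one per line.
connect_worker_urls() {
  for port in $CONNECT_PORTS; do
    echo "http://localhost:${port}/connectors"
  done
}

# Query each worker and report whether it answered.
check_connect_workers() {
  for url in $(connect_worker_urls); do
    # -f: non-zero exit status on HTTP errors, -s: no progress output
    if curl -fs "$url" > /dev/null; then
      echo "OK   $url"
    else
      echo "FAIL $url"
    fi
  done
}
```

Running `check_connect_workers` once the workers have started should print one `OK` line per port; a `FAIL` line usually means the worker is still starting or the port differs from the one in the properties file.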

###JDBC connector registration
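In distributed mode, a connector is registered by POSTing a JSON configuration to the `/connectors` endpoint of one of the workers. The sketch below shows what such a request could look like for the Confluent JDBC source connector. Everything in it is an assumption rather than this repository's actual configuration: the connector name `jdbc-message-source`, the connection URL and credentials, and the choice of `incrementing` mode on the `id` column. `topic.prefix` is set to `table-` so that the `message` table is published to the `table-message` topic.

```shell
# Hypothetical connector configuration; adjust connection.url to your database.
jdbc_connector_config() {
  cat <<'EOF'
{
  "name": "jdbc-message-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/postgres?user=postgres&password=postgres",
    "table.whitelist": "message",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "table-"
  }
}
EOF
}

# POST the configuration to one worker (port 8085 unless another is given).
register_jdbc_connector() {
  jdbc_connector_config | curl -s -X POST \
    -H "Content-Type: application/json" \
    --data @- "http://localhost:${1:-8085}/connectors"
}
```

Calling `register_jdbc_connector` (or `register_jdbc_connector 8086`) sends the request; `curl -s http://localhost:8085/connectors` should then list the connector name.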

###Insert into postgres and check topic table-message

  • Open a console consumer
$KAFKA_HOME/bin/kafka-console-consumer --zookeeper localhost:2181 --topic table-message
  • Insert a record into the postgres message table
insert into message(date,username,message) values (now(),'Herve','Hello kafka User Group');
  • The record should appear in the console consumer

##Kafka Streams

  • Import the Maven project into your IDE (IntelliJ, Eclipse, ...)

###Insert into postgres and check topic table-message-kafka

  • Run the main class DemoKafkaStreams
  • Open a console consumer
$KAFKA_HOME/bin/kafka-console-consumer --zookeeper localhost:2181 --topic table-message-kafka
  • Insert records into the postgres message table. If a message contains "kafka user group", it should appear in the table-message-kafka topic.
  • To check the KTable, insert the same message twice. The username should then appear in the table-message-kafka topic.

###Check kafka-user-group-example-kafka_user_count-changelog topic

  • To inspect the changelog topic that backs the KTable, run:
$KAFKA_HOME/bin/kafka-console-consumer --zookeeper localhost:2181 \
  --topic kafka-user-group-example-kafka_user_count-changelog \
  --formatter kafka.tools.DefaultMessageFormatter \
  --property print.key=true \
  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer \
  --from-beginning
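
Given the String key and Long value deserializers, the changelog presumably holds a running count per username. The sketch below is an offline approximation of that behaviour, not the code of DemoKafkaStreams: it assumes the input arrives as `username|message` lines, that the "kafka user group" match is case-insensitive, and that only matching messages are counted. The function name `count_kafka_users` is illustrative.

```shell
# Offline approximation only; the real topology lives in DemoKafkaStreams.
# Reads "username|message" lines on stdin, keeps messages that contain
# "kafka user group" (case-insensitive, an assumption) and prints
# "username<TAB>running count" after every match, like a changelog update.
count_kafka_users() {
  awk -F'|' '
    tolower($2) ~ /kafka user group/ {
      count[$1]++
      print $1 "\t" count[$1]
    }'
}
```

For example, piping two matching messages from the same user through `count_kafka_users` yields two lines for that user, with counts 1 and 2, which is the shape of output to expect from the console consumer command above.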