Assumes Confluent Platform and MySQL are installed and running locally.
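If the stack isn't already up, start it first (this assumes the Confluent CLI is installed; its `local start` command brings up ZooKeeper, Kafka, Schema Registry, Kafka Connect, and KSQL in one go):
confluent local start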
Drop & repopulate data:
mysql demo -uroot < ~/git/demo-scene/mysql-debezium-ksql/customers.sql
screen -d -m -S CP_demo -t mysql mysql -uroot demo
screen -S CP_demo -p 0 -X screen -t topic-raw bash -c "kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic asgard.demo.customers-raw --from-beginning | jq '.'"
screen -S CP_demo -p 0 -X screen -t topic-flat bash -c "kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic asgard.demo.customers --from-beginning | jq '.'"
screen -S CP_demo -p 0 -X screen -t ksql ksql
screen -S CP_demo -p 0 -X screen -t datagen ~/git/ksql/bin/ksql-datagen quickstart=ratings format=avro topic=ratings maxInterval=500
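Reattach to the session to drive the demo (standard `screen` usage; Ctrl-a then " lists the named windows so you can jump between them):
screen -r CP_demo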
mysql> show tables;
+----------------+
| Tables_in_demo |
+----------------+
| customers |
+----------------+
1 row in set (0.00 sec)
mysql> select * from customers;
+----+------------+-----------+--------------------------------+--------+------------------------------------------------------+
| id | first_name | last_name | email | gender | comments |
+----+------------+-----------+--------------------------------+--------+------------------------------------------------------+
| 1 | Bibby | Argabrite | bargabrite0@google.com.hk | Female | Reactive exuding productivity |
| 2 | Auberon | Sulland | asulland1@slideshare.net | Male | Organized context-sensitive Graphical User Interface |
| 3 | Marv | Dalrymple | mdalrymple2@macromedia.com | Male | Versatile didactic pricing structure |
| 4 | Nolana | Yeeles | nyeeles3@drupal.org | Female | Adaptive real-time archive |
| 5 | Modestia | Coltart | mcoltart4@scribd.com | Female | Reverse-engineered non-volatile success |
| 6 | Bram | Acaster | bacaster5@pagesperso-orange.fr | Male | Robust systematic support |
| 7 | Marigold | Veld | mveld6@pinterest.com | Female | Sharable logistical installation |
| 8 | Ruperto | Matteotti | rmatteotti7@diigo.com | Male | Diverse client-server conglomeration |
+----+------------+-----------+--------------------------------+--------+------------------------------------------------------+
8 rows in set (0.00 sec)
Create the Debezium source connector, flattening the change-event envelope and stamping each message with its topic and source:
curl -i -X POST -H "Accept:application/json" \
     -H "Content-Type:application/json" http://localhost:8083/connectors/ \
     -d '{
  "name": "mysql-source-demo-customers",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "42",
    "database.server.name": "asgard",
    "table.whitelist": "demo.customers",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.demo",
    "include.schema.changes": "true",
    "transforms": "unwrap,InsertTopic,InsertSourceDetails",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTopic.topic.field": "messagetopic",
    "transforms.InsertSourceDetails.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertSourceDetails.static.field": "messagesource",
    "transforms.InsertSourceDetails.static.value": "Debezium CDC from MySQL on asgard"
  }
}'
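Optionally confirm the connector and its task are running before moving on (the status endpoint is part of the standard Kafka Connect REST API):
curl -s http://localhost:8083/connectors/mysql-source-demo-customers/status | jq '.'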
Create a second connector for the raw (unflattened) events, routed to a `-raw` topic - note the different `database.server.id` and its own database history topic:
curl -i -X POST -H "Accept:application/json" \
     -H "Content-Type:application/json" http://localhost:8083/connectors/ \
     -d '{
  "name": "mysql-source-demo-customers-raw",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "43",
    "database.server.name": "asgard",
    "table.whitelist": "demo.customers",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.demo-raw",
    "include.schema.changes": "true",
    "transforms": "addTopicSuffix",
    "transforms.addTopicSuffix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.addTopicSuffix.regex": "(.*)",
    "transforms.addTopicSuffix.replacement": "$1-raw"
  }
}'
confluent local status connectors | grep -v Writing | jq '.[]' | \
  xargs -I{connector} confluent local status {connector} | grep -v Writing | \
  jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")' | \
  column -s : -t | sed 's/\"//g' | sort
mysql-source-demo-customers | RUNNING | RUNNING
mysql-source-demo-customers-raw | RUNNING | RUNNING
[Optional - show that the topics have been created; `grep -v _` excludes the system topics prefixed with an underscore]
$ kafka-topics --zookeeper localhost:2181 --list|grep -v _
asgard
asgard.demo.customers
connect-configs
connect-offsets
connect-statuses
dbhistory.demo
Show contents:
$ kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic asgard.demo.customers --from-beginning --max-messages=1 | jq '.'
{
"id": 1,
"first_name": {
"string": "Bibby"
},
"last_name": {
"string": "Argabrite"
},
"email": {
"string": "[email protected]"
},
"gender": {
"string": "Female"
},
"comments": {
"string": "Reactive exuding productivity"
},
"messagetopic": {
"string": "asgard.demo.customers"
},
"messagesource": {
"string": "Debezium CDC from MySQL on asgard"
}
}
Processed a total of 1 messages
Run two consumers, one for the raw topic and one for the flattened one:
confluent local consume asgard.demo.customers-raw -- --from-beginning --value-format avro
confluent local consume asgard.demo.customers -- --from-beginning --value-format avro
insert into customers (id,first_name,last_name) values (42,'Rick','Astley');
update customers set first_name='Bob' where id=1;
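Optionally show a delete too (plain SQL; whether this surfaces on the flattened topic as a tombstone or gets dropped depends on the Debezium version and the unwrap transform's delete handling):
delete from customers where id=8;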
Point out the before/after records in the raw stream.
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM CUSTOMERS_SRC WITH (KAFKA_TOPIC='asgard.demo.customers', VALUE_FORMAT='AVRO');
-- check the output / wait a moment?
-- select * from CUSTOMERS_SRC;
CREATE STREAM CUSTOMERS_SRC_REKEY AS SELECT * FROM CUSTOMERS_SRC PARTITION BY ID;
CREATE TABLE CUSTOMERS WITH (KAFKA_TOPIC='CUSTOMERS_SRC_REKEY', VALUE_FORMAT ='AVRO', KEY='ID');
SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, MESSAGESOURCE FROM CUSTOMERS;
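To sanity-check the table's key, schema, and underlying topic, DESCRIBE it (built-in KSQL statement):
ksql> DESCRIBE EXTENDED CUSTOMERS;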
Note that the stream read straight from the source topic has no usable message key, whereas after the PARTITION BY the table's ROWKEY matches ID:
ksql> SELECT C.ROWKEY, C.ID FROM CUSTOMERS_SRC C LIMIT 3;
 | 1
 | 2
 | 3
ksql> SELECT C.ROWKEY, C.ID FROM CUSTOMERS C LIMIT 3;
1 | 1
2 | 2
3 | 3
PRINT 'ratings';
CREATE STREAM RATINGS WITH (KAFKA_TOPIC='ratings',VALUE_FORMAT='AVRO');
ksql> SELECT R.RATING_ID, R.CHANNEL, R.MESSAGE, C.ID, C.FIRST_NAME + ' ' + C.LAST_NAME FROM RATINGS R LEFT JOIN CUSTOMERS C ON R.USER_ID = C.ID WHERE C.FIRST_NAME IS NOT NULL;
241 | android | (expletive deleted) | 6 | Bram Acaster
245 | web | Exceeded all my expectations. Thank you ! | 7 | Marigold Veld
247 | android | airport refurb looks great, will fly outta here more! | 5 | Modestia Coltart
251 | iOS-test | why is it so difficult to keep the bathrooms clean ? | 1 | Bob Argabrite
252 | iOS | more peanuts please | 3 | Marv Dalrymple
254 | web | why is it so difficult to keep the bathrooms clean ? | 7 | Marigold Veld
255 | iOS-test | is this as good as it gets? really ? | 8 | Ruperto Matteotti
257 | web | is this as good as it gets? really ? | 7 | Marigold Veld
259 | iOS-test | your team here rocks! | 1 | Bob Argabrite
Persist this stream of data:
ksql> CREATE STREAM RATINGS_ENRICHED AS SELECT R.RATING_ID, R.CHANNEL, R.MESSAGE, C.ID, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME FROM RATINGS R LEFT JOIN CUSTOMERS C ON R.USER_ID = C.ID WHERE C.FIRST_NAME IS NOT NULL;
Customers is a KSQL table, which means that we have the latest value for a given key.
Check out the ratings for customer id 2 only:
ksql> SELECT * FROM RATINGS_ENRICHED WHERE ID=2;
In MySQL, make a change to ID 2:
mysql> UPDATE CUSTOMERS SET FIRST_NAME = 'Thomas', LAST_NAME ='Smith' WHERE ID=2;
Observe in the continuous KSQL query that the customer name has now changed.
Simple aggregation - count of ratings per person, per minute:
ksql> SELECT FULL_NAME,COUNT(*) FROM RATINGS_ENRICHED WINDOW TUMBLING (SIZE 1 MINUTE) GROUP BY FULL_NAME;
Persist this and show the timestamp:
CREATE TABLE RATINGS_PER_CUSTOMER_PER_MINUTE AS SELECT FULL_NAME,COUNT(*) AS RATINGS_COUNT FROM RATINGS_ENRICHED WINDOW TUMBLING (SIZE 1 MINUTE) GROUP BY FULL_NAME;
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , FULL_NAME, RATINGS_COUNT FROM RATINGS_PER_CUSTOMER_PER_MINUTE;
Tested on Elasticsearch 6.2.3.
brew install elasticsearch
brew install kibana
Run each in its own terminal:
elasticsearch
kibana
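Check both are up before continuing (plain REST calls against the default Elasticsearch and Kibana ports):
curl -s http://localhost:9200/ | jq '.version.number'
curl -s http://localhost:5601/api/status | jq '.status.overall.state'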
Wipe any existing Elasticsearch indices:
curl -XDELETE "http://localhost:9200/*"
PUT /_template/kafkaconnect/
{
  "index_patterns": "*",
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "dates": {
            "match": "EXTRACT_TS",
            "mapping": {
              "type": "date"
            }
          }
        },
        {
          "non_analysed_string_template": {
            "match": "*",
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword"
            }
          }
        }
      ]
    }
  }
}
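Verify the template was stored (same Dev Tools syntax):
GET /_template/kafkaconnect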
curl -X "POST" "http://localhost:8083/connectors/" \
-H "Content-Type: application/json" \
-d '{
"name": "es_sink_RATINGS_ENRICHED",
"config": {
"topics": "'RATINGS_ENRICHED'",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"key.ignore": "true",
"schema.ignore": "false",
"type.name": "type.name=kafkaconnect",
"topic.index.map": "'RATINGS_ENRICHED':'ratings_enriched'",
"connection.url": "http://localhost:9200",
"transforms": "ExtractTimestamp",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.ExtractTimestamp.timestamp.field" : "EXTRACT_TS"
}
}'
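Once the sink is running, check that documents are flowing into the index (standard Elasticsearch APIs):
curl -s "http://localhost:9200/ratings_enriched/_count"
curl -s "http://localhost:9200/ratings_enriched/_search?size=1" | jq '.'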
(Import kibana_objects.json for the pre-built dashboard & visualisations)
Without the index template, fields are sent through as text (and therefore not available for aggregate analysis). If `type.name` in the connector config doesn't match the mapping type (e.g. setting it to the literal string `type.name=kafkaconnect` rather than just `kafkaconnect`), the connector fails with
{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [ratings_enriched] as the final mapping would have more than 1 type: [type.name=kafkaconnect, kafkaconnect]"},
Or:
org.apache.kafka.connect.errors.ConnectException: Cannot create mapping {"type.name=kafkaconnect":{"properties":{"RATING_ID":{"type":"long"},"CHANNEL":{"type":"text"},"MESSAGE":{"type":"text"},"ID":{"type":"integer"},"FULL_NAME":{"type":"text"},"EXTRACT_TS":{"type":"date"}}}} -- {"root_cause":[{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [ratings_enriched] as the final mapping would have more than 1 type: [foo, type.name=kafkaconnect]"}],"type":"illegal_argument_exception","reason":"Rejecting mapping update to [ratings_enriched] as the final mapping would have more than 1 type: [foo, type.name=kafkaconnect]"}
`"index": "not_analyzed"` will cause failure in ES6 ( `{"type":"mapper_parsing_exception","reason":"failed to parse","caused_by":{"type":"illegal_argument_exception","reason":"Could not convert [CHANNEL.index] to boolean","caused_by":{"type":"illegal_argument_exception","reason":"Failed to parse value [not_analyzed] as only [true] or [false] are allowed."}}},`) - but simply removing it seems to work
This works for ES6 - it's exactly the template shown above.
N.b. it grumbles about deprecated options. Swapping the deprecated `template` setting for `"index_patterns": "*"` is fine, but changing the `_default_` mapping type name seems to screw things up (and may have been the cause of my problems above - `Rejecting mapping update to [ratings_enriched] as the final mapping would have more than 1 type: [type.name=kafkaconnect, kafkaconnect]"}]`)