table column name was not found for clickhouse sink connector #480

Open
raghav2flobiz opened this issue Feb 29, 2024 · 1 comment

Comments

@raghav2flobiz

I am trying to sink data from Kafka into ClickHouse using the Kafka ClickHouse sink connector.

This is my config for the sink connector:
config: {
  "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
  "clickhouse.url": "jdbc:clickhouse://:8123",
  "tasks.max": "1",
  "consumer.override.max.poll.records": "5000",
  "consumer.override.max.partition.fetch.bytes": "5242880",
  "database": "flobooksprod",
  "errors.retry.timeout": "60",
  "exactlyOnce": "false",
  "hostname": "",
  "port": "8123",
  "username": "",
  "password": "",
  "topics": "pincodes",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "clickhouseSettings": ""
}

I am getting an error saying that the table column name was not found.

In fact, I created the table with this command:

CREATE TABLE pincodes (
id UUID DEFAULT generateUUIDv4() NOT NULL,
pincode character varying,
state character varying,
district character varying,
is_active boolean DEFAULT true,
created_at DateTime NOT NULL,
updated_at DateTime NOT NULL
)ENGINE = MergeTree ORDER BY id;
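
One detail that may be worth checking alongside the column names: the table declares `created_at` and `updated_at` as `DateTime` (second precision), while the change event in this issue carries them as `io.debezium.time.MicroTimestamp`, which is an int64 count of microseconds since the Unix epoch. Whether the sink connector converts this on its own depends on the connector and version, and the thread does not confirm it. The sketch below only illustrates the arithmetic; `micros_to_datetime` is a hypothetical helper name, not part of any connector.

```python
from datetime import datetime, timezone


def micros_to_datetime(micros: int) -> datetime:
    """Convert epoch microseconds (Debezium MicroTimestamp) to a UTC datetime.

    Hypothetical helper for illustration; integer division avoids float rounding.
    """
    seconds, remainder = divmod(micros, 1_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=remainder)


# Value of created_at taken from the change event in this issue.
created_at = micros_to_datetime(1709195239954064)
print(created_at.isoformat())
```

Passing the raw value to a second-precision column without dividing by 1,000,000 would be read as a date far in the future, so a mismatch here tends to show up as wrong data rather than as a missing column.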

And this is the change event captured by Debezium from the pincodes topic:

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":false,"name":"io.debezium.data.Uuid","version":1,"default":"00000000-0000-0000-0000-000000000000","field":"id"},{"type":"string","optional":true,"field":"pincode"},{"type":"string","optional":true,"field":"state"},{"type":"string","optional":true,"field":"district"},{"type":"boolean","optional":true,"default":true,"field":"is_active"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"created_at"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"updated_at"}],"optional":true,"name":"debz.public.pincodes.Value","field":"before"},{"type":"struct","fields":[{"type":"string","optional":false,"name":"io.debezium.data.Uuid","version":1,"default":"00000000-0000-0000-0000-000000000000","field":"id"},{"type":"string","optional":true,"field":"pincode"},{"type":"string","optional":true,"field":"state"},{"type":"string","optional":true,"field":"district"},{"type":"boolean","optional":true,"default":true,"field":"is_active"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"created_at"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"updated_at"}],"optional":true,"name":"debz.public.pincodes.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"
},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"debz.public.pincodes.Envelope","version":1},"payload":{"before":null,"after":{"id":"adefd747-bc17-48fd-ac47-3a35d00a0568","pincode":"560068","state":"KA","district":"HSR","is_active":true,"created_at":1709195239954064,"updated_at":1709195239954064},"source":{"version":"2.5.0.Final","connector":"postgresql","name":"debz","ts_ms":1709195239954,"snapshot":"false","db":"flobooksprod","sequence":"[\"1621741744\",\"1622252848\"]","schema":"public","table":"pincodes","txId":13087999,"lsn":1622252848,"xmin":null},"op":"c","ts_ms":1709195240355,"transaction":null}}
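
A possible explanation, not confirmed anywhere in this thread: the event above is a full Debezium envelope, so the top-level payload fields are `before`, `after`, `source`, `op`, `ts_ms` and `transaction`, while the table columns (`id`, `pincode`, and so on) sit one level down inside `after`. A sink that matches top-level field names against table columns would then find none of them. The sketch below compares both views against the columns from the `CREATE TABLE` statement. The function names are hypothetical and the sample is a trimmed copy of the payload from this issue.

```python
# Column names taken from the CREATE TABLE statement in this issue.
TABLE_COLUMNS = {"id", "pincode", "state", "district",
                 "is_active", "created_at", "updated_at"}

# Trimmed copy of the event payload from this issue (schema part omitted).
sample_event = {
    "payload": {
        "before": None,
        "after": {
            "id": "adefd747-bc17-48fd-ac47-3a35d00a0568",
            "pincode": "560068",
            "state": "KA",
            "district": "HSR",
            "is_active": True,
            "created_at": 1709195239954064,
            "updated_at": 1709195239954064,
        },
        "op": "c",
        "ts_ms": 1709195240355,
        "transaction": None,
    }
}


def top_level_fields(event: dict) -> set:
    """Field names visible if the payload is read as-is (envelope not unwrapped)."""
    return set(event.get("payload", event).keys())


def unwrapped_fields(event: dict) -> set:
    """Field names visible after taking only the 'after' row image."""
    after = event.get("payload", event).get("after") or {}
    return set(after.keys())


def missing_columns(fields: set, columns: set = TABLE_COLUMNS) -> list:
    """Table columns that have no matching field name."""
    return sorted(columns - fields)


print("envelope as-is:", missing_columns(top_level_fields(sample_event)))
print("after unwrap:  ", missing_columns(unwrapped_fields(sample_event)))
```

If this is the cause, Debezium's `io.debezium.transforms.ExtractNewRecordState` single message transform is the usual way to flatten the envelope to the `after` row before it reaches a sink. Whether it is needed here depends on which sink connector and version is deployed.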

@subkanthi
Collaborator

Hi @raghav2flobiz, can you please share the logs where you see the missing column name? Also, does it work if the destination table is ReplacingMergeTree? And can you confirm the Docker tag or JAR you are using?
