diff --git a/doc/multiple_connectors.md b/doc/multiple_connectors.md
new file mode 100644
index 000000000..bea9a6fed
--- /dev/null
+++ b/doc/multiple_connectors.md
@@ -0,0 +1,69 @@
+# Start the first connector with the usual configuration.
+
+```
+./getLatestRelease.sh
+```
+Set the sink connector version.
+```
+export CLICKHOUSE_SINK_CONNECTOR_LT_IMAGE=altinity/clickhouse-sink-connector:2.5.0-lt
+```
+
+Start the connector. (This will start the sink connector, MySQL and ClickHouse.)
+The name of the first connector is the service name `clickhouse-sink-connector-lt`.
+```
+docker-compose up
+```
+
+# Start the second connector
+
+Note: The second connector docker service (`clickhouse-sink-connector-lt-2`) is set up to use
+a different configuration file (`config-2.yml`) in which the following configuration variables differ.
+```
+name: "company-2"
+topic.prefix: "sink-connector-2"
+database.server.id: "22323"
+```
+
+The following commands start the second connector (docker service) with the configuration file `config-2.yml`.
+Also note that the host port numbers for the second connector are different.
+```
+docker-compose -f clickhouse-sink-connector-lt-service-2.yml create clickhouse-sink-connector-lt-2
+docker-compose -f clickhouse-sink-connector-lt-service-2.yml start clickhouse-sink-connector-lt-2
+```
+
+Run `docker ps` to verify that the second connector is running.
+```
+docker ps
+CONTAINER ID   IMAGE                                         COMMAND                  CREATED             STATUS             PORTS                                                                                                                             NAMES
+3c55644dddac   altinity/clickhouse-sink-connector:2.5.0-lt   "sh -c 'java -agentl…"   About an hour ago   Up About an hour   0.0.0.0:5006->5005/tcp, :::5006->5005/tcp, 0.0.0.0:7009->7000/tcp, :::7009->7000/tcp, 0.0.0.0:8089->8083/tcp, :::8089->8083/tcp   docker_clickhouse-sink-connector-lt-2_1
+d16b13ee6c27   docker_grafana                                "/run.sh /run.sh"        About an hour ago   Up About an hour   0.0.0.0:3000->3000/tcp, :::3000->3000/tcp                                                                                         docker_grafana_1
+c11525b3af94   altinity/clickhouse-sink-connector:2.5.0-lt   "sh -c 'java -agentl…"   About an hour ago   Up About an hour   0.0.0.0:5005->5005/tcp, :::5005->5005/tcp, 0.0.0.0:8083->8083/tcp, :::8083->8083/tcp, 0.0.0.0:39999->39999/tcp, :::39999->39999/tcp, 0.0.0.0:7001->7000/tcp, :::7001->7000/tcp   docker_clickhouse-sink-connector-lt_1
+
+```
+
+Verify the offset storage values in ClickHouse (to validate that they are not overwritten by the second connector).
+```
+docker exec -it clickhouse bash
+root@ec3bece132cc:/# clickhouse-client
+ClickHouse client version 23.8.5.16 (official build).
+Connecting to localhost:9000 as user root.
+Connected to ClickHouse server version 23.8.5 revision 54465.
+
+ec3bece132cc :) use altinity_sink_connector
+
+USE altinity_sink_connector
+
+SELECT *
+FROM replica_source_info
+FINAL
+
+Query id: 4a92b294-16d8-41ba-9f42-b95928debaf0
+
+┌─id───────────────────────────────────┬─offset_key──────────────────────────────────┬─offset_val────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────record_insert_ts─┬─record_insert_seq─┐
+│ a952b7c7-bfd2-4533-9dfb-d4f0778c7a22 │ ["company-1",{"server":"sink-connector-1"}] │ {"ts_sec":1734449681,"file":"mysql-bin.000003","pos":197,"gtids":"a7123a4c-bcbe-11ef-9f5c-0242ac180004:1-56","snapshot":true}   │ 2024-12-17 15:37:04 │               484 │
+│ cf37949f-47d1-4b17-aaa4-1b2236bb384a │ ["company-2",{"server":"sink-connector-2"}] │ {"ts_sec":1734449823,"file":"mysql-bin.000003","pos":197,"gtids":"a7123a4c-bcbe-11ef-9f5c-0242ac180004:1-56","snapshot":true}   │ 2024-12-17 15:37:04 │               483 │
+└──────────────────────────────────────┴─────────────────────────────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────────┴───────────────────┘
+
+2 rows in set. Elapsed: 0.001 sec.
+
+```
\ No newline at end of file
diff --git a/sink-connector-lightweight/docker/clickhouse-service.yml b/sink-connector-lightweight/docker/clickhouse-service.yml
index 610b7871b..977691831 100644
--- a/sink-connector-lightweight/docker/clickhouse-service.yml
+++ b/sink-connector-lightweight/docker/clickhouse-service.yml
@@ -12,7 +12,7 @@ services:
       - CLICKHOUSE_USER=root
       - CLICKHOUSE_PASSWORD=root
       - CLICKHOUSE_DB=test
-      - CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=0
+      - CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1
     ulimits:
       nofile:
         soft: "262144"
diff --git a/sink-connector-lightweight/docker/clickhouse-sink-connector-lt-service-2.yml b/sink-connector-lightweight/docker/clickhouse-sink-connector-lt-service-2.yml
new file mode 100644
index 000000000..d15874dbf
--- /dev/null
+++ b/sink-connector-lightweight/docker/clickhouse-sink-connector-lt-service-2.yml
@@ -0,0 +1,20 @@
+version: "3.4"
+
+services:
+  clickhouse-sink-connector-lt-2:
+    image: ${CLICKHOUSE_SINK_CONNECTOR_LT_IMAGE}
+    entrypoint: ["sh", "-c", "java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005 -Xms4g -Xmx4g -Dlog4j2.configurationFile=log4j2.xml -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=39999 -jar /app.jar /config.yml com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication"]
+    restart: "no"
+    ports:
+      - "8089:8083"
+      - "5006:5005"
+      - "7009:7000"
+    extra_hosts:
+      - "host.docker.internal:host-gateway"
+    volumes:
+      - ./log4j2.xml:/log4j2.xml
+      - ./config-2.yml:/config.yml
+    logging:
+      options:
+        max-size: "100m"
+        max-file: "5"
diff --git a/sink-connector-lightweight/docker/config-2.yml b/sink-connector-lightweight/docker/config-2.yml
new file mode 100644
index 000000000..fc71c7074
--- /dev/null
+++ b/sink-connector-lightweight/docker/config-2.yml
@@ -0,0 +1,185 @@
+#### Some of the properties are part of Debezium MYSQL Connector
+#### https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-connector-properties
+
+# Unique name for the connector. Attempting to register again with the same name will fail.
+name: "company-2"
+
+# Primary key used for state storage. Refer to State Storage documentation.
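+# The value appears as the "server" field of offset_key in replica_source_info, which is what keeps
+# each connector's offsets separate (see doc/multiple_connectors.md).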
+# If multiple connectors are writing to the same ClickHouse instance, this
+# value needs to be unique for a connector.
+topic.prefix: "sink-connector-2"
+
+# IP address or hostname of the MySQL database server.
+database.hostname: "mysql-master"
+
+# Integer port number of the MySQL database server listening for client connections.
+database.port: "3306"
+
+# Name of the MySQL database user to be used when connecting to the database.
+database.user: "root"
+
+# Password of the MySQL database user to be used when connecting to the database.
+database.password: "root"
+
+# Numeric ID of this database client; it must be unique across all connectors and other replication clients of the MySQL server.
+database.server.id: "22323"
+
+# Logical name that identifies the MySQL database server/cluster from which events are captured.
+database.server.name: "ER54"
+
+# database.include.list An optional list of regular expressions that match database names to be monitored;
+# any database name not included in the whitelist will be excluded from monitoring. By default all databases will be monitored.
+database.include.list: test
+
+# table.include.list An optional list of regular expressions that match fully-qualified table identifiers for tables to be monitored;
+table.include.list: ""
+
+# ClickHouse Server URL. Specify only the hostname.
+clickhouse.server.url: "clickhouse"
+
+# ClickHouse Server User
+clickhouse.server.user: "root"
+
+# ClickHouse Server Password
+clickhouse.server.password: "root"
+
+# ClickHouse Server Port
+clickhouse.server.port: "8123"
+
+# database.allowPublicKeyRetrieval: "true" https://rmoff.net/2019/10/23/debezium-mysql-v8-public-key-retrieval-is-not-allowed/
+database.allowPublicKeyRetrieval: "true"
+
+# snapshot.mode: Debezium can use different modes when it runs a snapshot. The snapshot mode is determined by the snapshot.mode configuration property.
+# The default value of the property is initial. You can customize the way that the connector creates snapshots by changing the value of the snapshot.mode property.
+snapshot.mode: "initial"
+
+# snapshot.locking.mode: Required for Debezium 2.7.0 and later. The snapshot.locking.mode configuration property specifies the mode that the connector uses to lock tables during snapshotting.
+#snapshot.locking.mode: "minimal"
+
+# offset.flush.interval.ms: The number of milliseconds to wait before flushing recent offsets to the offset store. This ensures that offsets are committed within the specified time interval.
+offset.flush.interval.ms: 5000
+
+# connector.class: The Java class for the connector. This must be set to io.debezium.connector.mysql.MySqlConnector.
+connector.class: "io.debezium.connector.mysql.MySqlConnector"
+
+# offset.storage: The Java class that implements the offset storage strategy. This must be set to io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore.
+offset.storage: "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"
+
+# offset.storage.jdbc.offset.table.name: The name of the database table where connector offsets are to be stored.
+offset.storage.jdbc.offset.table.name: "altinity_sink_connector.replica_source_info"
+
+# offset.storage.jdbc.url: The JDBC URL for the database where connector offsets are to be stored.
+offset.storage.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector"
+
+# offset.storage.jdbc.user: The name of the database user to be used when connecting to the database where connector offsets are to be stored.
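+# Note: both connectors can share the same offset database and user; their rows in replica_source_info
+# are kept separate by offset_key (see doc/multiple_connectors.md).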
+offset.storage.jdbc.user: "root"
+
+# offset.storage.jdbc.password: The password of the database user to be used when connecting to the database where connector offsets are to be stored.
+offset.storage.jdbc.password: "root"
+
+# offset.storage.jdbc.offset.table.ddl: The DDL statement used to create the database table where connector offsets are to be stored. (Advanced)
+offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s
+(
+    `id` String,
+    `offset_key` String,
+    `offset_val` String,
+    `record_insert_ts` DateTime,
+    `record_insert_seq` UInt64,
+    `_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9))
+)
+ENGINE = ReplacingMergeTree(_version) ORDER BY offset_key SETTINGS index_granularity = 8192"
+
+# offset.storage.jdbc.offset.table.delete: The DML statement used to delete the database table where connector offsets are to be stored. (Advanced)
+offset.storage.jdbc.offset.table.delete: "select * from %s"
+
+offset.storage.jdbc.offset.table.select: "SELECT id, offset_key, offset_val FROM %s FINAL ORDER BY record_insert_ts, record_insert_seq"
+
+# schema.history.internal: The Java class that implements the schema history strategy. This must be set to io.debezium.storage.jdbc.history.JdbcSchemaHistory.
+schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory"
+
+# schema.history.internal.jdbc.url: The JDBC URL for the database where connector schema history is to be stored.
+schema.history.internal.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector"
+
+# schema.history.internal.jdbc.user: The name of the database user to be used when connecting to the database where connector schema history is to be stored.
+schema.history.internal.jdbc.user: "root"
+
+# schema.history.internal.jdbc.password: The password of the database user to be used when connecting to the database where connector schema history is to be stored.
+schema.history.internal.jdbc.password: "root"
+
+# schema.history.internal.jdbc.schema.history.table.ddl: The DDL statement used to create the database table where connector schema history is to be stored. (Advanced)
+schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exists %s
+(`id` VARCHAR(36) NOT NULL, `history_data` VARCHAR(65000), `history_data_seq` INTEGER, `record_insert_ts` TIMESTAMP NOT NULL, `record_insert_seq` INTEGER NOT NULL) ENGINE=ReplacingMergeTree(record_insert_seq) order by id"
+
+# schema.history.internal.jdbc.schema.history.table.name: The name of the database table where connector schema history is to be stored.
+schema.history.internal.jdbc.schema.history.table.name: "altinity_sink_connector.replicate_schema_history"
+
+# enable.snapshot.ddl: If set to true, the connector will parse the DDL statements from the initial load.
+enable.snapshot.ddl: "true"
+
+# persist.raw.bytes: If set to true, the connector will persist raw bytes as received in a String column.
+persist.raw.bytes: "false"
+
+# auto.create.tables: If set to true, the connector will create tables in the target based on the schema received in the incoming message.
+auto.create.tables: "true"
+
+# auto.create.tables.replicated: If set to true, the connector will create tables with Engine set to ReplicatedReplacingMergeTree.
+#auto.create.tables.replicated: "true"
+
+# database.connectionTimeZone: The timezone of the MySQL database server used to correctly shift the commit transaction timestamp.
+database.connectionTimeZone: "UTC"
+
+# Configuration to override the clickhouse database name for a given MySQL database name. If this configuration is not
+# provided, the MySQL database name will be used as the ClickHouse database name.
+clickhouse.database.override.map: "test:ch_test"
+
+# clickhouse.datetime.timezone: This timezone will override the default timezone of ClickHouse server. Timezone columns will be set to this timezone.
+#clickhouse.datetime.timezone: "UTC"
+
+# skip_replica_start: If set to true, the connector will skip replication on startup. sink-connector-client start_replica will start replication.
+#skip_replica_start: "false"
+
+# binary.handling.mode: The mode for handling binary values. Possible values are bytes, base64, and decode. The default is bytes.
+#binary.handling.mode: "base64"
+
+# ignore_delete: If set to true, the connector will ignore delete events. The default is false.
+#ignore_delete: "true"
+
+#disable.ddl: If set to true, the connector will ignore DDL events. The default is false.
+#disable.ddl: "false"
+
+#ignore.ddl.regex: If set, the connector will ignore DDL events that match the regex.
+ignore.ddl.regex: "(?i)(ANALYZE PARTITION).*"
+
+#disable.drop.truncate: If set to true, the connector will ignore drop and truncate events. The default is false.
+#disable.drop.truncate: "false"
+
+#restart.event.loop: This will restart the CDC event loop if there are no messages received after
+#timeout specified in restart.event.loop.timeout.period.secs.
+#Workaround to restart debezium loop(in case of freeze)
+#restart.event.loop: "true"
+
+#restart.event.loop.timeout.period.secs: Defines the restart timeout period.
+#restart.event.loop.timeout.period.secs: "3000"
+
+# Flush time of the buffer in milliseconds. The buffer that is stored in memory before being flushed to ClickHouse.
+#buffer.flush.time.ms: "1000"
+
+# Max number of records for the flush buffer.
+#buffer.max.records: "10000"
+
+# ClickHouse JDBC configuration parameters, as a list of key-value pairs separated by commas.
+#clickhouse.jdbc.params: "max_buffer_size=1000000,socket_timeout=10000"
+
+# Maximum number of threads in the thread pool for processing CDC records.
+#thread.pool.size: 10
+
+# Sink Connector maximum queue size
+#sink.connector.max.queue.size: "100000"
+
+#Metrics (Prometheus target), required for Grafana Dashboard
+metrics.enable: "true"
+
+# Skip schema history capturing, use the following configuration
+# to reduce slow startup when replicating dbs with large number of tables
+#schema.history.internal.store.only.captured.tables.ddl: "true"
+#schema.history.internal.store.only.captured.databases.ddl: "true"
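The pattern above extends to additional connectors. The sketch below shows, under the same conventions, what a third connector would need; the file names, service name, host ports, and values (`clickhouse-sink-connector-lt-service-3.yml`, `config-3.yml`, `clickhouse-sink-connector-lt-3`, `company-3`, `sink-connector-3`, `22324`, ports 8090/5007/7010) are hypothetical examples following the same pattern, not files added by this change.
```
# clickhouse-sink-connector-lt-service-3.yml (hypothetical sketch)
version: "3.4"

services:
  clickhouse-sink-connector-lt-3:
    image: ${CLICKHOUSE_SINK_CONNECTOR_LT_IMAGE}
    # entrypoint: same java command as in clickhouse-sink-connector-lt-service-2.yml
    restart: "no"
    ports:
      - "8090:8083"   # host ports must not collide with the first two connectors
      - "5007:5005"
      - "7010:7000"
    extra_hosts:
      - "host.docker.internal:host-gateway"
    volumes:
      - ./log4j2.xml:/log4j2.xml
      - ./config-3.yml:/config.yml   # copy of config-2.yml with the overrides below

# config-3.yml (hypothetical): only these values need to differ from config-2.yml
# name: "company-3"
# topic.prefix: "sink-connector-3"
# database.server.id: "22324"
```
It would be started the same way as the second connector, e.g. `docker-compose -f clickhouse-sink-connector-lt-service-3.yml create clickhouse-sink-connector-lt-3` followed by `start`.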