Add steps to start a second connector. #959

Open · wants to merge 2 commits into base: develop
69 changes: 69 additions & 0 deletions doc/multiple_connectors.md
@@ -0,0 +1,69 @@
# Start the first connector with the usual configuration

```
./getLatestRelease.sh
```
Set the sink connector version.
```
export CLICKHOUSE_SINK_CONNECTOR_LT_IMAGE=altinity/clickhouse-sink-connector:2.5.0-lt
```

Start the connector. (This starts the sink connector, MySQL, and ClickHouse.)
The first connector runs as the Docker service `clickhouse-sink-connector-lt`.
```
docker-compose up
```

# Start the second connector

Note: the second connector's Docker service (`clickhouse-sink-connector-lt-2`) is set up to use
a different configuration file (`config-2.yml`), in which the following settings differ from the first connector's:
```
name: "company-2"
topic.prefix: "sink-connector-2"
database.server.id: "22323"
```
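These steps assume `config-2.yml` already exists in the repo. As an illustrative sketch only (the three key names are taken from `config-2.yml` in this PR, but the stand-in `config.yml` below is hypothetical), a second configuration can be derived from the first by rewriting just those settings:

```shell
# Sketch: derive a second connector config by overriding the per-connector
# settings. A minimal stand-in config.yml is created here so the commands are
# runnable as-is; in the repo you would start from the real docker/config.yml.
printf '%s\n' 'name: "company-1"' \
              'topic.prefix: "sink-connector-1"' \
              'database.server.id: "11111"' > config.yml
sed -e 's/^name: .*/name: "company-2"/' \
    -e 's/^topic\.prefix: .*/topic.prefix: "sink-connector-2"/' \
    -e 's/^database\.server\.id: .*/database.server.id: "22323"/' \
    config.yml > config-2.yml
cat config-2.yml
```

All other settings (ClickHouse URL, offset storage table, and so on) can stay identical between the two files.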

This starts the second connector (Docker service) with the configuration file `config-2.yml`.
Note also that the second connector is published on different host ports.
```
docker-compose -f clickhouse-sink-connector-lt-service-2.yml create clickhouse-sink-connector-lt-2
docker-compose -f clickhouse-sink-connector-lt-service-2.yml start clickhouse-sink-connector-lt-2
```
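The first connector already binds host ports 8083, 5005, and 7000, so the second service remaps them to 8089, 5006, and 7009 in `clickhouse-sink-connector-lt-service-2.yml`. A small sketch (using a stand-in fragment of that file, so the commands run anywhere) lists the host-side ports to confirm there is no collision:

```shell
# Sketch: extract the host-side ports from the second service's port mappings.
# The fragment is copied from clickhouse-sink-connector-lt-service-2.yml; the
# file name service-2-ports.yml is a stand-in for this illustration.
cat > service-2-ports.yml <<'EOF'
    ports:
      - "8089:8083"
      - "5006:5005"
      - "7009:7000"
EOF
grep -o '"[0-9]*:[0-9]*"' service-2-ports.yml | tr -d '"' | cut -d: -f1
```

None of 8089, 5006, or 7009 overlap with the first connector's 8083, 5005, and 7000, so both stacks can run side by side.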

Run `docker ps` to verify that the second connector is running.
```
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
3c55644dddac altinity/clickhouse-sink-connector:2.5.0-lt "sh -c 'java -agentl…" About an hour ago Up About an hour 0.0.0.0:5006->5005/tcp, :::5006->5005/tcp, 0.0.0.0:7009->7000/tcp, :::7009->7000/tcp, 0.0.0.0:8089->8083/tcp, :::8089->8083/tcp docker_clickhouse-sink-connector-lt-2_1
d16b13ee6c27 docker_grafana "/run.sh /run.sh" About an hour ago Up About an hour 0.0.0.0:3000->3000/tcp, :::3000->3000/tcp docker_grafana_1
c11525b3af94 altinity/clickhouse-sink-connector:2.5.0-lt "sh -c 'java -agentl…" About an hour ago Up About an hour 0.0.0.0:5005->5005/tcp, :::5005->5005/tcp, 0.0.0.0:8083->8083/tcp, :::8083->8083/tcp, 0.0.0.0:39999->39999/tcp, :::39999->39999/tcp, 0.0.0.0:7001->7000/tcp, :::7001->7000/tcp docker_clickhouse-sink-connector-lt_1

```

Verify the offset storage values in ClickHouse (to confirm they are not overwritten by the second connector):
```
docker exec -it clickhouse bash
root@ec3bece132cc:/# clickhouse-client
ClickHouse client version 23.8.5.16 (official build).
Connecting to localhost:9000 as user root.
Connected to ClickHouse server version 23.8.5 revision 54465.

ec3bece132cc :) use altinity_sink_connector

USE altinity_sink_connector

SELECT *
FROM replica_source_info
FINAL

Query id: 4a92b294-16d8-41ba-9f42-b95928debaf0

┌─id───────────────────────────────────┬─offset_key──────────────────────────────────┬─offset_val────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────record_insert_ts─┬─record_insert_seq─┐
│ a952b7c7-bfd2-4533-9dfb-d4f0778c7a22 │ ["company-1",{"server":"sink-connector-1"}] │ {"ts_sec":1734449681,"file":"mysql-bin.000003","pos":197,"gtids":"a7123a4c-bcbe-11ef-9f5c-0242ac180004:1-56","snapshot":true} │ 2024-12-17 15:37:04 │ 484 │
│ cf37949f-47d1-4b17-aaa4-1b2236bb384a │ ["company-2",{"server":"sink-connector-2"}] │ {"ts_sec":1734449823,"file":"mysql-bin.000003","pos":197,"gtids":"a7123a4c-bcbe-11ef-9f5c-0242ac180004:1-56","snapshot":true} │ 2024-12-17 15:37:04 │ 483 │
└──────────────────────────────────────┴─────────────────────────────────────────────┴───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────────┴───────────────────┘

2 rows in set. Elapsed: 0.001 sec.

```
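The two rows above can also be checked mechanically: each connector must write under its own `offset_key`, so the number of distinct keys should equal the number of connectors. A minimal sketch, with the keys pasted in literally from the query output above (in practice they would come from `clickhouse-client`):

```shell
# Sketch: count distinct offset keys. With two connectors sharing one offset
# table, anything less than 2 would mean one connector overwrote the other.
printf '%s\n' \
  '["company-1",{"server":"sink-connector-1"}]' \
  '["company-2",{"server":"sink-connector-2"}]' \
  | sort -u | wc -l
# expected: 2 distinct offset keys
```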
2 changes: 1 addition & 1 deletion sink-connector-lightweight/docker/clickhouse-service.yml
@@ -12,7 +12,7 @@ services:
- CLICKHOUSE_USER=root
- CLICKHOUSE_PASSWORD=root
- CLICKHOUSE_DB=test
-      - CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=0
+      - CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1
ulimits:
nofile:
soft: "262144"
20 changes: 20 additions & 0 deletions sink-connector-lightweight/docker/clickhouse-sink-connector-lt-service-2.yml
@@ -0,0 +1,20 @@
version: "3.4"

services:
clickhouse-sink-connector-lt-2:
image: ${CLICKHOUSE_SINK_CONNECTOR_LT_IMAGE}
entrypoint: ["sh", "-c", "java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005 -Xms4g -Xmx4g -Dlog4j2.configurationFile=log4j2.xml -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=39999 -jar /app.jar /config.yml com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication"]
restart: "no"
ports:
- "8089:8083"
- "5006:5005"
- "7009:7000"
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
- ./log4j2.xml:/log4j2.xml
- ./config-2.yml:/config.yml
logging:
options:
max-size: "100m"
max-file: "5"
185 changes: 185 additions & 0 deletions sink-connector-lightweight/docker/config-2.yml
@@ -0,0 +1,185 @@
#### Some of these properties are part of the Debezium MySQL connector:
#### https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-connector-properties

# Unique name for the connector. Attempting to register again with the same name will fail.
name: "company-2"

# Primary key used for state storage. Refer to State Storage documentation.
# If multiple connectors are writing to the same ClickHouse instance, this
# value needs to be unique for a connector.
topic.prefix: "sink-connector-2"

# IP address or hostname of the MySQL database server.
database.hostname: "mysql-master"

# Integer port number of the MySQL database server listening for client connections.
database.port: "3306"

# Name of the MySQL database user to be used when connecting to the database.
database.user: "root"

# Password of the MySQL database user to be used when connecting to the database.
database.password: "root"

# Numeric ID of this connector. Must be unique across all connectors and MySQL clients in the replication topology.
database.server.id: "22323"

# The name of the MySQL database from which events are to be captured when not using snapshot mode.
database.server.name: "ER54"

# database.include.list An optional list of regular expressions that match database names to be monitored;
# any database name not included in the whitelist will be excluded from monitoring. By default all databases will be monitored.
database.include.list: test

# table.include.list An optional list of regular expressions that match fully-qualified table identifiers for tables to be monitored;
table.include.list: ""

# ClickHouse server URL. Specify only the hostname.
clickhouse.server.url: "clickhouse"

# ClickHouse server user
clickhouse.server.user: "root"

# ClickHouse server password
clickhouse.server.password: "root"

# ClickHouse server port
clickhouse.server.port: "8123"

# database.allowPublicKeyRetrieval: "true" https://rmoff.net/2019/10/23/debezium-mysql-v8-public-key-retrieval-is-not-allowed/
database.allowPublicKeyRetrieval: "true"

# snapshot.mode: Debezium can use different modes when it runs a snapshot. The snapshot mode is determined by the snapshot.mode configuration property.
# The default value of the property is initial. You can customize the way that the connector creates snapshots by changing the value of the snapshot.mode property
snapshot.mode: "initial"

# snapshot.locking.mode: Required for Debezium 2.7.0 and later. Specifies the mode that the connector uses to lock tables during snapshotting.
#snapshot.locking.mode: "minimal"

# offset.flush.interval.ms: The number of milliseconds to wait before flushing recent offsets to Kafka. This ensures that offsets are committed within the specified time interval.
offset.flush.interval.ms: 5000

# connector.class: The Java class for the connector. This must be set to io.debezium.connector.mysql.MySqlConnector.
connector.class: "io.debezium.connector.mysql.MySqlConnector"

# offset.storage: The Java class that implements the offset storage strategy. This must be set to io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore.
offset.storage: "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"

# offset.storage.jdbc.offset.table.name: The name of the database table where connector offsets are to be stored.
offset.storage.jdbc.offset.table.name: "altinity_sink_connector.replica_source_info"

# offset.storage.jdbc.url: The JDBC URL for the database where connector offsets are to be stored.
offset.storage.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector"

# offset.storage.jdbc.user: The name of the database user to be used when connecting to the database where connector offsets are to be stored.
offset.storage.jdbc.user: "root"

# offset.storage.jdbc.password: The password of the database user to be used when connecting to the database where connector offsets are to be stored.
offset.storage.jdbc.password: "root"

# offset.storage.jdbc.offset.table.ddl: The DDL statement used to create the database table where connector offsets are to be stored.(Advanced)
offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s
(
`id` String,
`offset_key` String,
`offset_val` String,
`record_insert_ts` DateTime,
`record_insert_seq` UInt64,
`_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9))
)
ENGINE = ReplacingMergeTree(_version) ORDER BY offset_key SETTINGS index_granularity = 8192"

# offset.storage.jdbc.offset.table.delete: The DML statement used to delete the database table where connector offsets are to be stored.(Advanced)
offset.storage.jdbc.offset.table.delete: "select * from %s"

offset.storage.jdbc.offset.table.select: "SELECT id, offset_key, offset_val FROM %s FINAL ORDER BY record_insert_ts, record_insert_seq"

# schema.history.internal: The Java class that implements the schema history strategy. This must be set to io.debezium.storage.jdbc.history.JdbcSchemaHistory.
schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory"

# schema.history.internal.jdbc.url: The JDBC URL for the database where connector schema history is to be stored.
schema.history.internal.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector"

# schema.history.internal.jdbc.user: The name of the database user to be used when connecting to the database where connector schema history is to be stored.
schema.history.internal.jdbc.user: "root"

# schema.history.internal.jdbc.password: The password of the database user to be used when connecting to the database where connector schema history is to be stored.
schema.history.internal.jdbc.password: "root"

# schema.history.internal.jdbc.schema.history.table.ddl: The DDL statement used to create the database table where connector schema history is to be stored.(Advanced)
schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exists %s
(`id` VARCHAR(36) NOT NULL, `history_data` VARCHAR(65000), `history_data_seq` INTEGER, `record_insert_ts` TIMESTAMP NOT NULL, `record_insert_seq` INTEGER NOT NULL) ENGINE=ReplacingMergeTree(record_insert_seq) order by id"

# schema.history.internal.jdbc.schema.history.table.name: The name of the database table where connector schema history is to be stored.
schema.history.internal.jdbc.schema.history.table.name: "altinity_sink_connector.replicate_schema_history"

# enable.snapshot.ddl: If set to true, the connector will parse the DDL statements from the initial load
enable.snapshot.ddl: "true"

# persist.raw.bytes: If set to true, the connector will persist raw bytes as received in a String column.
persist.raw.bytes: "false"

# auto.create.tables: If set to true, the connector will create tables in the target based on the schema received in the incoming message.
auto.create.tables: "true"

# auto.create.tables.replicated: If set to true, the connector will create table with Engine set to ReplicatedReplacingMergeTree
#auto.create.tables.replicated: "true"

# database.connectionTimeZone: The timezone of the MySQL database server used to correctly shift the commit transaction timestamp.
database.connectionTimeZone: "UTC"

# Configuration to override the clickhouse database name for a given MySQL database name. If this configuration is not
# provided, the MySQL database name will be used as the ClickHouse database name.
clickhouse.database.override.map: "test:ch_test"

# clickhouse.datetime.timezone: This timezone will override the default timezone of ClickHouse server. Timezone columns will be set to this timezone.
#clickhouse.datetime.timezone: "UTC"

# skip_replica_start: If set to true, the connector will skip replication on startup. sink-connector-client start_replica will start replication.
#skip_replica_start: "false"

# binary.handling.mode: The mode for handling binary values. Possible values are bytes, base64, and decode. The default is bytes.
#binary.handling.mode: "base64"

# ignore_delete: If set to true, the connector will ignore delete events. The default is false.
#ignore_delete: "true"

#disable.ddl: If set to true, the connector will ignore DDL events. The default is false.
#disable.ddl: "false"

#ignore.ddl.regex: If set, the connector will ignore DDL events that match the regex.
ignore.ddl.regex: "(?i)(ANALYZE PARTITION).*"

#disable.drop.truncate: If set to true, the connector will ignore drop and truncate events. The default is false.
#disable.drop.truncate: "false"

#restart.event.loop: This will restart the CDC event loop if there are no messages received after
#timeout specified in restart.event.loop.timeout.period.secs.
#Workaround to restart debezium loop(in case of freeze)
#restart.event.loop: "true"

#restart.event.loop.timeout.period.secs: Defines the restart timeout period.
#restart.event.loop.timeout.period.secs: "3000"

# Flush time of the buffer in milliseconds. The buffer that is stored in memory before being flushed to ClickHouse.
#buffer.flush.time.ms: "1000"

# Max number of records for the flush buffer.
#buffer.max.records: "10000"

# ClickHouse JDBC configuration parameters, as a list of key-value pairs separated by commas.
#clickhouse.jdbc.params: "max_buffer_size=1000000,socket_timeout=10000"

# Maximum number of threads in the thread pool for processing CDC records.
#thread.pool.size: 10

# Sink Connector maximum queue size
#sink.connector.max.queue.size: "100000"

#Metrics (Prometheus target), required for Grafana Dashboard
metrics.enable: "true"

# Skip schema history capturing, use the following configuration
# to reduce slow startup when replicating dbs with large number of tables
#schema.history.internal.store.only.captured.tables.ddl: "true"
#schema.history.internal.store.only.captured.databases.ddl: "true"