diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md index a7e2fabfc65..8bea661c33d 100644 --- a/docs/en/connector-v2/sink/Doris.md +++ b/docs/en/connector-v2/sink/Doris.md @@ -49,17 +49,17 @@ The internal implementation of Doris sink connector is cached and imported by st | sink.label-prefix | String | Yes | - | The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel. | | sink.enable-2pc | bool | No | false | Whether to enable two-phase commit (2pc), the default is false. For two-phase commit, please refer to [here](https://doris.apache.org/docs/data-operate/transaction?_highlight=two&_highlight=phase#stream-load-2pc). | | sink.enable-delete | bool | No | - | Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Unique model. you can get more detail at this [link](https://doris.apache.org/docs/dev/data-operate/delete/batch-delete-manual/) | -| sink.check-interval | int | No | 10000 | check exception with the interval while loading | -| sink.max-retries | int | No | 3 | the max retry times if writing records to database failed | -| sink.buffer-size | int | No | 256 * 1024 | the buffer size to cache data for stream load. | -| sink.buffer-count | int | No | 3 | the buffer count to cache data for stream load. | -| doris.batch.size | int | No | 1024 | the batch size of the write to doris each http request, when the row reaches the size or checkpoint is executed, the data of cached will write to server. | -| needs_unsupported_type_casting | boolean | No | false | Whether to enable the unsupported type casting, such as Decimal64 to Double | -| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to `schema_save_mode` below | -| data_save_mode | Enum | no | APPEND_DATA | the data save mode, please refer to `data_save_mode` below | -| save_mode_create_template | string | no | see below | see below | -| custom_sql | String | no | - | When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be executed. SQL will be executed before synchronization tasks. | -| doris.config | map | yes | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql,and supported formats. | +| sink.check-interval | int | No | 10000 | check exception with the interval while loading | +| sink.max-retries | int | No | 3 | the max retry times if writing records to database failed | +| sink.buffer-size | int | No | 256 * 1024 | the buffer size to cache data for stream load. | +| sink.buffer-count | int | No | 3 | the buffer count to cache data for stream load. | +| doris.batch.size | int | No | 1024 | the batch size of the write to doris each http request, when the row reaches the size or checkpoint is executed, the data of cached will write to server. 
| +| needs_unsupported_type_casting | boolean | No | false | Whether to enable the unsupported type casting, such as Decimal64 to Double | +| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to `schema_save_mode` below | +| data_save_mode | Enum | no | APPEND_DATA | the data save mode, please refer to `data_save_mode` below | +| save_mode_create_template | string | no | see below | see below | +| custom_sql | String | no | - | When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be executed. SQL will be executed before synchronization tasks. | +| doris.config | map | yes | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql,and supported formats. | ### schema_save_mode[Enum] diff --git a/docs/zh/connector-v2/sink/Doris.md b/docs/zh/connector-v2/sink/Doris.md index f0504977aec..76e1d4f8db1 100644 --- a/docs/zh/connector-v2/sink/Doris.md +++ b/docs/zh/connector-v2/sink/Doris.md @@ -48,17 +48,17 @@ Doris Sink连接器的内部实现是通过stream load批量缓存和导入的 | sink.label-prefix | String | Yes | - | stream load导入使用的标签前缀。 在2pc场景下,需要全局唯一性来保证SeaTunnel的EOS语义。 | | sink.enable-2pc | bool | No | false | 是否启用两阶段提交(2pc),默认为 false。 对于两阶段提交,请参考[此处](https://doris.apache.org/docs/data-operate/transaction?_highlight=two&_highlight=phase#stream-load-2pc)。 | | sink.enable-delete | bool | No | - | 是否启用删除。 该选项需要Doris表开启批量删除功能(0.15+版本默认开启),且仅支持Unique模型。 您可以在此[link](https://doris.apache.org/docs/dev/data-operate/delete/batch-delete-manual/)获得更多详细信息 | -| sink.check-interval | int | No | 10000 | 加载过程中检查异常时间间隔。 | -| sink.max-retries | int | No | 3 | 向数据库写入记录失败时的最大重试次数。 | -| sink.buffer-size | int | No | 256 * 1024 | 用于缓存stream load数据的缓冲区大小。 | -| sink.buffer-count | int | No | 3 | 用于缓存stream load数据的缓冲区计数。 | -| doris.batch.size | int | No | 1024 | 每次http请求写入doris的批量大小,当row达到该大小或者执行checkpoint时,缓存的数据就会写入服务器。 | -| needs_unsupported_type_casting | boolean | No | false | 是否启用不支持的类型转换,例如 Decimal64 到 Double。 | -| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | schema保存模式,请参考下面的`schema_save_mode` | -| data_save_mode | Enum | no | APPEND_DATA | 数据保存模式,请参考下面的`data_save_mode`。 | -| save_mode_create_template | string | no | see below | 见下文。 | -| custom_sql | String | no | - | 当data_save_mode选择CUSTOM_PROCESSING时,需要填写CUSTOM_SQL参数。 该参数通常填写一条可以执行的SQL。 SQL将在同步任务之前执行。 | -| doris.config | map | yes | - | 该选项用于支持自动生成sql时的insert、delete、update等操作,以及支持的格式。 | +| sink.check-interval | int | No | 10000 | 加载过程中检查异常时间间隔。 | +| sink.max-retries | int | No | 3 | 向数据库写入记录失败时的最大重试次数。 | +| sink.buffer-size | int | No | 256 * 1024 | 用于缓存stream load数据的缓冲区大小。 | +| sink.buffer-count | int | No | 3 | 用于缓存stream load数据的缓冲区计数。 | +| doris.batch.size | int | No | 1024 | 每次http请求写入doris的批量大小,当row达到该大小或者执行checkpoint时,缓存的数据就会写入服务器。 | +| needs_unsupported_type_casting | boolean | No | false | 是否启用不支持的类型转换,例如 Decimal64 到 Double。 | +| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | schema保存模式,请参考下面的`schema_save_mode` | +| data_save_mode | Enum | no | APPEND_DATA | 数据保存模式,请参考下面的`data_save_mode`。 | +| save_mode_create_template | string | no | see below | 见下文。 | +| custom_sql | String | no | - | 当data_save_mode选择CUSTOM_PROCESSING时,需要填写CUSTOM_SQL参数。 该参数通常填写一条可以执行的SQL。 SQL将在同步任务之前执行。 | +| doris.config | map | yes | - | 该选项用于支持自动生成sql时的insert、delete、update等操作,以及支持的格式。 | ### schema_save_mode[Enum] diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/io/debezium/connector/mysql/GtidUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/io/debezium/connector/mysql/GtidUtils.java new file mode 100644 index 00000000000..e90ac24adf1 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/io/debezium/connector/mysql/GtidUtils.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.debezium.connector.mysql; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/* + * Utils for handling GTIDs. + */ +public class GtidUtils { + + /** + * This method corrects the GTID set that has been restored from a state or checkpoint using the + * GTID set fetched from the server via SHOW MASTER STATUS. During the correction process, the + * restored GTID set is adjusted according to the server's GTID set to ensure it does not exceed + * the latter. For each UUID in the restored GTID set, if it exists in the server's GTID set, + * then it will be adjusted according to the server's GTID set; if it does not exist in the + * server's GTID set, it will be directly added to the new GTID set. + */ + public static GtidSet fixRestoredGtidSet(GtidSet serverGtidSet, GtidSet restoredGtidSet) { + Map newSet = new HashMap<>(); + serverGtidSet.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet)); + for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) { + GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID()); + if (serverUuidSet != null) { + long restoredIntervalEnd = getIntervalEnd(uuidSet); + List newIntervals = + new ArrayList<>(); + for (GtidSet.Interval serverInterval : serverUuidSet.getIntervals()) { + if (serverInterval.getEnd() <= restoredIntervalEnd) { + newIntervals.add( + new com.github.shyiko.mysql.binlog.GtidSet.Interval( + serverInterval.getStart(), serverInterval.getEnd())); + } else if (serverInterval.getStart() <= restoredIntervalEnd + && serverInterval.getEnd() > restoredIntervalEnd) { + newIntervals.add( + new com.github.shyiko.mysql.binlog.GtidSet.Interval( + serverInterval.getStart(), restoredIntervalEnd)); + } + } + newSet.put( + uuidSet.getUUID(), + new GtidSet.UUIDSet( + new com.github.shyiko.mysql.binlog.GtidSet.UUIDSet( + uuidSet.getUUID(), newIntervals))); + } else { + newSet.put(uuidSet.getUUID(), uuidSet); + } + } + return new GtidSet(newSet); + } + + /** + * This method merges one GTID set (toMerge) into another (base), without overwriting the + * existing elements in the base GTID set. 
+ */ + public static GtidSet mergeGtidSetInto(GtidSet base, GtidSet toMerge) { + Map newSet = new HashMap<>(); + base.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet)); + for (GtidSet.UUIDSet uuidSet : toMerge.getUUIDSets()) { + if (!newSet.containsKey(uuidSet.getUUID())) { + newSet.put(uuidSet.getUUID(), uuidSet); + } + } + return new GtidSet(newSet); + } + + private static long getIntervalEnd(GtidSet.UUIDSet uuidSet) { + return uuidSet.getIntervals().stream() + .mapToLong(GtidSet.Interval::getEnd) + .max() + .getAsLong(); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java new file mode 100644 index 00000000000..39e3e1f3402 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -0,0 +1,1589 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.debezium.connector.mysql; + +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.ErrorMessageUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener; +import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData; +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventData; +import com.github.shyiko.mysql.binlog.event.EventHeader; +import com.github.shyiko.mysql.binlog.event.EventHeaderV4; +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.GtidEventData; +import com.github.shyiko.mysql.binlog.event.QueryEventData; +import com.github.shyiko.mysql.binlog.event.RotateEventData; +import com.github.shyiko.mysql.binlog.event.RowsQueryEventData; +import com.github.shyiko.mysql.binlog.event.TableMapEventData; +import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; +import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; +import com.github.shyiko.mysql.binlog.network.AuthenticationException; +import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory; +import com.github.shyiko.mysql.binlog.network.SSLMode; +import com.github.shyiko.mysql.binlog.network.SSLSocketFactory; +import com.github.shyiko.mysql.binlog.network.ServerException; +import io.debezium.DebeziumException; +import io.debezium.annotation.SingleThreadAccess; +import io.debezium.config.CommonConnectorConfig.EventProcessingFailureHandlingMode; +import io.debezium.config.Configuration; +import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition; +import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode; +import io.debezium.data.Envelope.Operation; +import io.debezium.function.BlockingConsumer; +import io.debezium.pipeline.ErrorHandler; +import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.source.spi.StreamingChangeEventSource; +import io.debezium.relational.TableId; +import io.debezium.schema.SchemaChangeEvent; +import io.debezium.util.Clock; +import io.debezium.util.Metronome; +import io.debezium.util.Strings; +import io.debezium.util.Threads; + +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.X509TrustManager; + +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.sql.SQLException; +import java.time.Duration; +import java.time.Instant; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; + +import static io.debezium.util.Strings.isNullOrEmpty; + +/** + * Copied from Debezium project(1.9.8.Final) to fix + * https://github.com/ververica/flink-cdc-connectors/issues/1944. + * + *
Line 1427-1433 : Adjust GTID merging logic to support recovering from a job which previously + * specified a starting offset on start. + * + *
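To make the adjustment described above concrete, here is a hedged, standalone sketch of how the two helpers introduced in GtidUtils behave. The sketch class itself is not part of the patch, and the UUIDs and transaction ranges are invented; the only APIs used are the Debezium GtidSet(String) constructor and the GtidUtils methods added alongside this file.

```java
import io.debezium.connector.mysql.GtidSet;
import io.debezium.connector.mysql.GtidUtils;

public class GtidMergeSketch {
    public static void main(String[] args) {
        // GTID set the server reports via SHOW MASTER STATUS (hypothetical values).
        GtidSet serverSet = new GtidSet("a1a1a1a1-0000-0000-0000-000000000001:1-500");
        // GTID set restored from a checkpoint of a job that started from a non-initial
        // offset, so it does not begin at transaction 1.
        GtidSet restoredSet = new GtidSet("a1a1a1a1-0000-0000-0000-000000000001:300-400");

        // fixRestoredGtidSet trims the server's intervals at the restored high-water mark,
        // so the result here covers a1a1...:1-400 rather than running ahead to 500.
        GtidSet fixed = GtidUtils.fixRestoredGtidSet(serverSet, restoredSet);
        System.out.println(fixed);

        // mergeGtidSetInto only adds UUIDs the base set does not know about yet; existing
        // entries in the base are left untouched.
        GtidSet other = new GtidSet("b2b2b2b2-0000-0000-0000-000000000002:1-42");
        System.out.println(GtidUtils.mergeGtidSetInto(fixed, other));
    }
}
```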
Line 1485 : Add more error details for some exceptions. + * + * @author Jiri Pechanec + */ +public class MySqlStreamingChangeEventSource + implements StreamingChangeEventSource { + + private static final Logger LOGGER = + LoggerFactory.getLogger(MySqlStreamingChangeEventSource.class); + + private static final String KEEPALIVE_THREAD_NAME = "blc-keepalive"; + + private final EnumMap> eventHandlers = + new EnumMap<>(EventType.class); + private final BinaryLogClient client; + private final MySqlStreamingChangeEventSourceMetrics metrics; + private final Clock clock; + private final EventProcessingFailureHandlingMode eventDeserializationFailureHandlingMode; + private final EventProcessingFailureHandlingMode inconsistentSchemaHandlingMode; + + private int startingRowNumber = 0; + private long initialEventsToSkip = 0L; + private boolean skipEvent = false; + private boolean ignoreDmlEventByGtidSource = false; + private final Predicate gtidDmlSourceFilter; + private final AtomicLong totalRecordCounter = new AtomicLong(); + private volatile Map lastOffset = null; + private com.github.shyiko.mysql.binlog.GtidSet gtidSet; + private final float heartbeatIntervalFactor = 0.8f; + private final Map binaryLogClientThreads = new ConcurrentHashMap<>(4); + private final MySqlTaskContext taskContext; + private final MySqlConnectorConfig connectorConfig; + private final MySqlConnection connection; + private final EventDispatcher eventDispatcher; + private final ErrorHandler errorHandler; + + @SingleThreadAccess("binlog client thread") + private Instant eventTimestamp; + + /** Describe binlog position. */ + public static class BinlogPosition { + final String filename; + final long position; + + public BinlogPosition(String filename, long position) { + assert filename != null; + + this.filename = filename; + this.position = position; + } + + public String getFilename() { + return filename; + } + + public long getPosition() { + return position; + } + + @Override + public String toString() { + return filename + "/" + position; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + filename.hashCode(); + result = prime * result + (int) (position ^ (position >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + BinlogPosition other = (BinlogPosition) obj; + if (!filename.equals(other.filename)) { + return false; + } + if (position != other.position) { + return false; + } + return true; + } + } + + @FunctionalInterface + private interface BinlogChangeEmitter { + void emit(TableId tableId, T data) throws InterruptedException; + } + + public MySqlStreamingChangeEventSource( + MySqlConnectorConfig connectorConfig, + MySqlConnection connection, + EventDispatcher dispatcher, + ErrorHandler errorHandler, + Clock clock, + MySqlTaskContext taskContext, + MySqlStreamingChangeEventSourceMetrics metrics) { + + this.taskContext = taskContext; + this.connectorConfig = connectorConfig; + this.connection = connection; + this.clock = clock; + this.eventDispatcher = dispatcher; + this.errorHandler = errorHandler; + this.metrics = metrics; + + eventDeserializationFailureHandlingMode = + connectorConfig.getEventProcessingFailureHandlingMode(); + inconsistentSchemaHandlingMode = connectorConfig.inconsistentSchemaFailureHandlingMode(); + + // Set up the log reader ... 
+ client = taskContext.getBinaryLogClient(); + // BinaryLogClient will overwrite thread names later + client.setThreadFactory( + Threads.threadFactory( + MySqlConnector.class, + connectorConfig.getLogicalName(), + "binlog-client", + false, + false, + x -> binaryLogClientThreads.put(x.getName(), x))); + client.setServerId(connectorConfig.serverId()); + client.setSSLMode(sslModeFor(connectorConfig.sslMode())); + if (connectorConfig.sslModeEnabled()) { + SSLSocketFactory sslSocketFactory = + getBinlogSslSocketFactory(connectorConfig, connection); + if (sslSocketFactory != null) { + client.setSslSocketFactory(sslSocketFactory); + } + } + Configuration configuration = connectorConfig.getConfig(); + client.setKeepAlive(configuration.getBoolean(MySqlConnectorConfig.KEEP_ALIVE)); + final long keepAliveInterval = + configuration.getLong(MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS); + client.setKeepAliveInterval(keepAliveInterval); + // Considering heartbeatInterval should be less than keepAliveInterval, we use the + // heartbeatIntervalFactor + // multiply by keepAliveInterval and set the result value to heartbeatInterval.The default + // value of heartbeatIntervalFactor + // is 0.8, and we believe the left time (0.2 * keepAliveInterval) is enough to process the + // packet received from the MySQL server. + client.setHeartbeatInterval((long) (keepAliveInterval * heartbeatIntervalFactor)); + + boolean filterDmlEventsByGtidSource = + configuration.getBoolean(MySqlConnectorConfig.GTID_SOURCE_FILTER_DML_EVENTS); + gtidDmlSourceFilter = + filterDmlEventsByGtidSource ? connectorConfig.gtidSourceFilter() : null; + + // Set up the event deserializer with additional type(s) ... + final Map tableMapEventByTableId = + new HashMap(); + EventDeserializer eventDeserializer = + new EventDeserializer() { + @Override + public Event nextEvent(ByteArrayInputStream inputStream) throws IOException { + try { + // Delegate to the superclass ... + Event event = super.nextEvent(inputStream); + + // We have to record the most recent TableMapEventData for each table + // number for our custom deserializers ... + if (event.getHeader().getEventType() == EventType.TABLE_MAP) { + TableMapEventData tableMapEvent = event.getData(); + tableMapEventByTableId.put( + tableMapEvent.getTableId(), tableMapEvent); + } + + // DBZ-5126 Clean cache on rotate event to prevent it from growing + // indefinitely. 
+ if (event.getHeader().getEventType() == EventType.ROTATE + && event.getHeader().getTimestamp() != 0) { + tableMapEventByTableId.clear(); + } + return event; + } + // DBZ-217 In case an event couldn't be read we create a pseudo-event for + // the sake of logging + catch (EventDataDeserializationException edde) { + // DBZ-3095 As of Java 15, when reaching EOF in the binlog stream, the + // polling loop in + // BinaryLogClient#listenForEventPackets() keeps returning values != -1 + // from peek(); + // this causes the loop to never finish + // Propagating the exception (either EOF or socket closed) causes the + // loop to be aborted + // in this case + if (edde.getCause() instanceof IOException) { + throw edde; + } + + EventHeaderV4 header = new EventHeaderV4(); + header.setEventType(EventType.INCIDENT); + header.setTimestamp(edde.getEventHeader().getTimestamp()); + header.setServerId(edde.getEventHeader().getServerId()); + + if (edde.getEventHeader() instanceof EventHeaderV4) { + header.setEventLength( + ((EventHeaderV4) edde.getEventHeader()).getEventLength()); + header.setNextPosition( + ((EventHeaderV4) edde.getEventHeader()).getNextPosition()); + header.setFlags(((EventHeaderV4) edde.getEventHeader()).getFlags()); + } + + EventData data = new EventDataDeserializationExceptionData(edde); + return new Event(header, data); + } + } + }; + + // Add our custom deserializers ... + eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer()); + eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer()); + eventDeserializer.setEventDataDeserializer( + EventType.WRITE_ROWS, + new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId)); + eventDeserializer.setEventDataDeserializer( + EventType.UPDATE_ROWS, + new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId)); + eventDeserializer.setEventDataDeserializer( + EventType.DELETE_ROWS, + new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId)); + eventDeserializer.setEventDataDeserializer( + EventType.EXT_WRITE_ROWS, + new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId) + .setMayContainExtraInformation(true)); + eventDeserializer.setEventDataDeserializer( + EventType.EXT_UPDATE_ROWS, + new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId) + .setMayContainExtraInformation(true)); + eventDeserializer.setEventDataDeserializer( + EventType.EXT_DELETE_ROWS, + new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId) + .setMayContainExtraInformation(true)); + client.setEventDeserializer(eventDeserializer); + } + + protected void onEvent(MySqlOffsetContext offsetContext, Event event) { + long ts = 0; + + if (event.getHeader().getEventType() == EventType.HEARTBEAT) { + // HEARTBEAT events have no timestamp but are fired only when + // there is no traffic on the connection which means we are caught-up + // https://dev.mysql.com/doc/internals/en/heartbeat-event.html + metrics.setMilliSecondsBehindSource(ts); + return; + } + + // MySQL has seconds resolution but mysql-binlog-connector-java returns + // a value in milliseconds + long eventTs = event.getHeader().getTimestamp(); + + if (eventTs == 0) { + LOGGER.trace("Received unexpected event with 0 timestamp: {}", event); + return; + } + + ts = clock.currentTimeInMillis() - eventTs; + LOGGER.trace("Current milliseconds behind source: {} ms", ts); + metrics.setMilliSecondsBehindSource(ts); + } + + protected void ignoreEvent(MySqlOffsetContext offsetContext, Event event) { + 
LOGGER.trace("Ignoring event due to missing handler: {}", event); + } + + protected void handleEvent( + MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) { + if (event == null) { + return; + } + + final EventHeader eventHeader = event.getHeader(); + // Update the source offset info. Note that the client returns the value in *milliseconds*, + // even though the binlog + // contains only *seconds* precision ... + // HEARTBEAT events have no timestamp; only set the timestamp if the event is not a + // HEARTBEAT + eventTimestamp = + !eventHeader.getEventType().equals(EventType.HEARTBEAT) + ? Instant.ofEpochMilli(eventHeader.getTimestamp()) + : null; + offsetContext.setBinlogServerId(eventHeader.getServerId()); + + final EventType eventType = eventHeader.getEventType(); + if (eventType == EventType.ROTATE) { + EventData eventData = event.getData(); + RotateEventData rotateEventData; + if (eventData instanceof EventDeserializer.EventDataWrapper) { + rotateEventData = + (RotateEventData) + ((EventDeserializer.EventDataWrapper) eventData).getInternal(); + } else { + rotateEventData = (RotateEventData) eventData; + } + offsetContext.setBinlogStartPoint( + rotateEventData.getBinlogFilename(), rotateEventData.getBinlogPosition()); + } else if (eventHeader instanceof EventHeaderV4) { + EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader; + offsetContext.setEventPosition( + trackableEventHeader.getPosition(), trackableEventHeader.getEventLength()); + } + + // If there is a handler for this event, forward the event to it ... + try { + // Forward the event to the handler ... + eventHandlers + .getOrDefault(eventType, (e) -> ignoreEvent(offsetContext, e)) + .accept(event); + + // Generate heartbeat message if the time is right + eventDispatcher.dispatchHeartbeatEvent(partition, offsetContext); + + // Capture that we've completed another event ... + offsetContext.completeEvent(); + + // update last offset used for logging + lastOffset = offsetContext.getOffset(); + + if (skipEvent) { + // We're in the mode of skipping events and we just skipped this one, so decrement + // our skip count ... + --initialEventsToSkip; + skipEvent = initialEventsToSkip > 0; + } + } catch (RuntimeException e) { + // There was an error in the event handler, so propagate the failure to Kafka Connect + // ... + logStreamingSourceState(); + errorHandler.setProducerThrowable( + new DebeziumException("Error processing binlog event", e)); + // Do not stop the client, since Kafka Connect should stop the connector on it's own + // (and doing it here may cause problems the second time it is stopped). + // We can clear the listeners though so that we ignore all future events ... + eventHandlers.clear(); + LOGGER.info( + "Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored."); + } catch (InterruptedException e) { + // Most likely because this reader was stopped and our thread was interrupted ... 
+ Thread.currentThread().interrupt(); + eventHandlers.clear(); + LOGGER.info("Stopped processing binlog events due to thread interruption"); + } + } + + @SuppressWarnings("unchecked") + protected T unwrapData(Event event) { + EventData eventData = event.getData(); + if (eventData instanceof EventDeserializer.EventDataWrapper) { + eventData = ((EventDeserializer.EventDataWrapper) eventData).getInternal(); + } + return (T) eventData; + } + + /** + * Handle the supplied event that signals that mysqld has stopped. + * + * @param event the server stopped event to be processed; may not be null + */ + protected void handleServerStop(MySqlOffsetContext offsetContext, Event event) { + LOGGER.debug("Server stopped: {}", event); + } + + /** + * Handle the supplied event that is sent by a primary to a replica to let the replica know that + * the primary is still alive. Not written to a binary log. + * + * @param event the server stopped event to be processed; may not be null + */ + protected void handleServerHeartbeat( + MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) + throws InterruptedException { + LOGGER.trace("Server heartbeat: {}", event); + eventDispatcher.dispatchServerHeartbeatEvent(partition, offsetContext); + } + + /** + * Handle the supplied event that signals that an out of the ordinary event that occurred on the + * master. It notifies the replica that something happened on the primary that might cause data + * to be in an inconsistent state. + * + * @param event the server stopped event to be processed; may not be null + */ + protected void handleServerIncident( + MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) { + if (event.getData() instanceof EventDataDeserializationExceptionData) { + metrics.onErroneousEvent(partition, "source = " + event); + EventDataDeserializationExceptionData data = event.getData(); + + EventHeaderV4 eventHeader = + (EventHeaderV4) + data.getCause() + .getEventHeader(); // safe cast, instantiated that ourselves + + // logging some additional context but not the exception itself, this will happen in + // handleEvent() + if (eventDeserializationFailureHandlingMode + == EventProcessingFailureHandlingMode.FAIL) { + LOGGER.error( + "Error while deserializing binlog event at offset {}.{}" + + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}", + offsetContext.getOffset(), + System.lineSeparator(), + eventHeader.getPosition(), + eventHeader.getNextPosition(), + offsetContext.getSource().binlogFilename()); + + throw new RuntimeException(data.getCause()); + } else if (eventDeserializationFailureHandlingMode + == EventProcessingFailureHandlingMode.WARN) { + LOGGER.warn( + "Error while deserializing binlog event at offset {}.{}" + + "This exception will be ignored and the event be skipped.{}" + + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}", + offsetContext.getOffset(), + System.lineSeparator(), + System.lineSeparator(), + eventHeader.getPosition(), + eventHeader.getNextPosition(), + offsetContext.getSource().binlogFilename(), + data.getCause()); + } + } else { + LOGGER.error("Server incident: {}", event); + } + } + + /** + * Handle the supplied event with a {@link RotateEventData} that signals the logs are being + * rotated. This means that either the server was restarted, or the binlog has transitioned to a + * new file. 
In either case, subsequent table numbers will be different than those seen to this + * point. + * + * @param event the database change data event to be processed; may not be null + */ + protected void handleRotateLogsEvent(MySqlOffsetContext offsetContext, Event event) { + LOGGER.debug("Rotating logs: {}", event); + RotateEventData command = unwrapData(event); + assert command != null; + taskContext.getSchema().clearTableMappings(); + } + + /** + * Handle the supplied event with a {@link GtidEventData} that signals the beginning of a GTID + * transaction. We don't yet know whether this transaction contains any events we're interested + * in, but we have to record it so that we know the position of this event and know we've + * processed the binlog to this point. + * + *
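The bookkeeping described above hinges on the textual GTID format. Below is a minimal sketch, with a made-up GTID value and a hypothetical sketch class, of how the source UUID and transaction number are separated; this mirrors the substring-before-colon step the handler uses when applying the GTID source filter.

```java
public class GtidParseSketch {
    public static void main(String[] args) {
        String gtid = "a1a1a1a1-0000-0000-0000-000000000001:4711"; // hypothetical GTID
        // The part before ':' identifies the originating server (its UUID)...
        String sourceUuid = gtid.substring(0, gtid.indexOf(':'));
        // ...and the part after ':' is the transaction number folded into the running GTID set.
        String txNumber = gtid.substring(gtid.indexOf(':') + 1);
        System.out.println(sourceUuid + " -> transaction " + txNumber);
    }
}
```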
Note that this captures the current GTID and complete GTID set, regardless of whether the + * connector is the GTID set upon connection. We do this because we actually want to capture all + * GTID set values found in the binlog, whether or not we process them. However, only when we + * connect do we actually want to pass to MySQL only those GTID ranges that are applicable per + * the configuration. + * + * @param event the GTID event to be processed; may not be null + */ + protected void handleGtidEvent(MySqlOffsetContext offsetContext, Event event) { + LOGGER.debug("GTID transaction: {}", event); + GtidEventData gtidEvent = unwrapData(event); + String gtid = gtidEvent.getGtid(); + gtidSet.add(gtid); + offsetContext.startGtid(gtid, gtidSet.toString()); // rather than use the client's GTID set + ignoreDmlEventByGtidSource = false; + if (gtidDmlSourceFilter != null && gtid != null) { + String uuid = gtid.trim().substring(0, gtid.indexOf(":")); + if (!gtidDmlSourceFilter.test(uuid)) { + ignoreDmlEventByGtidSource = true; + } + } + metrics.onGtidChange(gtid); + } + + /** + * Handle the supplied event with an {@link RowsQueryEventData} by recording the original SQL + * query that generated the event. + * + * @param event the database change data event to be processed; may not be null + */ + protected void handleRowsQuery(MySqlOffsetContext offsetContext, Event event) { + // Unwrap the RowsQueryEvent + final RowsQueryEventData lastRowsQueryEventData = unwrapData(event); + + // Set the query on the source + offsetContext.setQuery(lastRowsQueryEventData.getQuery()); + } + + /** + * Handle the supplied event with an {@link QueryEventData} by possibly recording the DDL + * statements as changes in the MySQL schemas. + * + * @param partition the partition in which the even occurred + * @param event the database change data event to be processed; may not be null + * @throws InterruptedException if this thread is interrupted while recording the DDL statements + */ + protected void handleQueryEvent( + MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) + throws InterruptedException { + QueryEventData command = unwrapData(event); + LOGGER.debug("Received query command: {}", event); + String sql = command.getSql().trim(); + if (sql.equalsIgnoreCase("BEGIN")) { + // We are starting a new transaction ... + offsetContext.startNextTransaction(); + eventDispatcher.dispatchTransactionStartedEvent( + partition, offsetContext.getTransactionId(), offsetContext); + offsetContext.setBinlogThread(command.getThreadId()); + if (initialEventsToSkip != 0) { + LOGGER.debug( + "Restarting partially-processed transaction; change events will not be created for the first {} events plus {} more rows in the next event", + initialEventsToSkip, + startingRowNumber); + // We are restarting, so we need to skip the events in this transaction that we + // processed previously... + skipEvent = true; + } + return; + } + if (sql.equalsIgnoreCase("COMMIT")) { + handleTransactionCompletion(partition, offsetContext, event); + return; + } + + String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase(); + + if (upperCasedStatementBegin.startsWith("XA ")) { + // This is an XA transaction, and we currently ignore these and do nothing ... 
+ return; + } + if (connectorConfig.getDdlFilter().test(sql)) { + LOGGER.debug("DDL '{}' was filtered out of processing", sql); + return; + } + if (upperCasedStatementBegin.equals("INSERT ") + || upperCasedStatementBegin.equals("UPDATE ") + || upperCasedStatementBegin.equals("DELETE ")) { + LOGGER.warn( + "Received DML '" + + sql + + "' for processing, binlog probably contains events generated with statement or mixed based replication format"); + return; + } + if (sql.equalsIgnoreCase("ROLLBACK")) { + // We have hit a ROLLBACK which is not supported + LOGGER.warn( + "Rollback statements cannot be handled without binlog buffering, the connector will fail. Please check '{}' to see how to enable buffering", + MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER.name()); + } + + final List schemaChangeEvents = + taskContext + .getSchema() + .parseStreamingDdl( + partition, + sql, + command.getDatabase(), + offsetContext, + clock.currentTimeAsInstant()); + try { + for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) { + if (taskContext.getSchema().skipSchemaChangeEvent(schemaChangeEvent)) { + continue; + } + + final TableId tableId = + schemaChangeEvent.getTables().isEmpty() + ? null + : schemaChangeEvent.getTables().iterator().next().id(); + eventDispatcher.dispatchSchemaChangeEvent( + partition, + tableId, + (receiver) -> { + try { + receiver.schemaChangeEvent(schemaChangeEvent); + } catch (Exception e) { + throw new DebeziumException(e); + } + }); + } + } catch (InterruptedException e) { + LOGGER.info("Processing interrupted"); + } + } + + private void handleTransactionCompletion( + MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) + throws InterruptedException { + // We are completing the transaction ... + eventDispatcher.dispatchTransactionCommittedEvent(partition, offsetContext); + offsetContext.commitTransaction(); + offsetContext.setBinlogThread(-1L); + skipEvent = false; + ignoreDmlEventByGtidSource = false; + } + + /** + * Handle a change in the table metadata. + * + *
This method should be called whenever we consume a TABLE_MAP event, and every transaction + * in the log should include one of these for each table affected by the transaction. Each table + * map event includes a monotonically-increasing numeric identifier, and this identifier is used + * within subsequent events within the same transaction. This table identifier can change when: + * + *
  1. the table structure is modified (e.g., via an {@code ALTER TABLE ...} command); or + *
  2. MySQL rotates to a new binary log file, even if the table structure does not change. + *
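A hedged sketch of the table-number bookkeeping this javadoc describes. In the real connector it lives behind taskContext.getSchema() (assignTableNumber / getTableId); the standalone class below is hypothetical and only illustrates why the mapping is refreshed on every TABLE_MAP event and discarded when the identifiers can change, as listed above.

```java
import io.debezium.relational.TableId;

import java.util.HashMap;
import java.util.Map;

public class TableNumberSketch {
    private final Map<Long, TableId> tableIdsByNumber = new HashMap<>();

    /** TABLE_MAP event: remember which table this numeric id refers to right now. */
    public void onTableMapEvent(long tableNumber, String database, String table) {
        tableIdsByNumber.put(tableNumber, new TableId(database, null, table));
    }

    /** Row events (WRITE/UPDATE/DELETE) carry only the numeric id; resolve it here. */
    public TableId resolve(long tableNumber) {
        return tableIdsByNumber.get(tableNumber); // null means the table's schema is unknown
    }

    /** A binlog rotation (or table alteration) can reassign ids, so the old mapping is stale. */
    public void onRotate() {
        tableIdsByNumber.clear();
    }
}
```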
+ * + * @param event the update event; never null + */ + protected void handleUpdateTableMetadata( + MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) + throws InterruptedException { + TableMapEventData metadata = unwrapData(event); + long tableNumber = metadata.getTableId(); + String databaseName = metadata.getDatabase(); + String tableName = metadata.getTable(); + TableId tableId = new TableId(databaseName, null, tableName); + if (taskContext.getSchema().assignTableNumber(tableNumber, tableId)) { + LOGGER.debug("Received update table metadata event: {}", event); + } else { + informAboutUnknownTableIfRequired(partition, offsetContext, event, tableId); + } + } + + /** + * If we receive an event for a table that is monitored but whose metadata we don't know, either + * ignore that event or raise a warning or error as per the {@link + * MySqlConnectorConfig#INCONSISTENT_SCHEMA_HANDLING_MODE} configuration. + */ + private void informAboutUnknownTableIfRequired( + MySqlPartition partition, + MySqlOffsetContext offsetContext, + Event event, + TableId tableId, + Operation operation) + throws InterruptedException { + if (tableId != null + && connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) { + metrics.onErroneousEvent( + partition, "source = " + tableId + ", event " + event, operation); + EventHeaderV4 eventHeader = event.getHeader(); + + if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.FAIL) { + LOGGER.error( + "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}" + + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}", + event, + offsetContext.getOffset(), + tableId, + System.lineSeparator(), + eventHeader.getPosition(), + eventHeader.getNextPosition(), + offsetContext.getSource().binlogFilename()); + throw new DebeziumException( + "Encountered change event for table " + + tableId + + " whose schema isn't known to this connector"); + } else if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.WARN) { + LOGGER.warn( + "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}" + + "The event will be ignored.{}" + + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}", + event, + offsetContext.getOffset(), + tableId, + System.lineSeparator(), + System.lineSeparator(), + eventHeader.getPosition(), + eventHeader.getNextPosition(), + offsetContext.getSource().binlogFilename()); + } else { + LOGGER.debug( + "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. 
Take a new snapshot in this case.{}" + + "The event will be ignored.{}" + + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}", + event, + offsetContext.getOffset(), + tableId, + System.lineSeparator(), + System.lineSeparator(), + eventHeader.getPosition(), + eventHeader.getNextPosition(), + offsetContext.getSource().binlogFilename()); + } + } else { + if (tableId == null) { + EventData eventData = unwrapData(event); + if (eventData instanceof WriteRowsEventData) { + tableId = + taskContext + .getSchema() + .getExcludeTableId( + ((WriteRowsEventData) eventData).getTableId()); + } else if (eventData instanceof UpdateRowsEventData) { + tableId = + taskContext + .getSchema() + .getExcludeTableId( + ((UpdateRowsEventData) eventData).getTableId()); + } else if (eventData instanceof DeleteRowsEventData) { + tableId = + taskContext + .getSchema() + .getExcludeTableId( + ((DeleteRowsEventData) eventData).getTableId()); + } + } + LOGGER.trace("Filtered {} event for {}", event.getHeader().getEventType(), tableId); + metrics.onFilteredEvent(partition, "source = " + tableId, operation); + eventDispatcher.dispatchFilteredEvent(partition, offsetContext); + } + } + + private void informAboutUnknownTableIfRequired( + MySqlPartition partition, + MySqlOffsetContext offsetContext, + Event event, + TableId tableId) + throws InterruptedException { + informAboutUnknownTableIfRequired(partition, offsetContext, event, tableId, null); + } + + /** + * Generate source records for the supplied event with an {@link WriteRowsEventData}. + * + * @param partition the partition in which the even occurred + * @param event the database change data event to be processed; may not be null + * @throws InterruptedException if this thread is interrupted while blocking + */ + protected void handleInsert( + MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) + throws InterruptedException { + handleChange( + partition, + offsetContext, + event, + Operation.CREATE, + WriteRowsEventData.class, + x -> taskContext.getSchema().getTableId(x.getTableId()), + WriteRowsEventData::getRows, + (tableId, row) -> + eventDispatcher.dispatchDataChangeEvent( + partition, + tableId, + new MySqlChangeRecordEmitter( + partition, + offsetContext, + clock, + Operation.CREATE, + null, + row))); + } + + /** + * Generate source records for the supplied event with an {@link UpdateRowsEventData}. + * + * @param partition the partition in which the even occurred + * @param event the database change data event to be processed; may not be null + * @throws InterruptedException if this thread is interrupted while blocking + */ + protected void handleUpdate( + MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) + throws InterruptedException { + handleChange( + partition, + offsetContext, + event, + Operation.UPDATE, + UpdateRowsEventData.class, + x -> taskContext.getSchema().getTableId(x.getTableId()), + UpdateRowsEventData::getRows, + (tableId, row) -> + eventDispatcher.dispatchDataChangeEvent( + partition, + tableId, + new MySqlChangeRecordEmitter( + partition, + offsetContext, + clock, + Operation.UPDATE, + row.getKey(), + row.getValue()))); + } + + /** + * Generate source records for the supplied event with an {@link DeleteRowsEventData}. 
+ * + * @param partition the partition in which the even occurred + * @param event the database change data event to be processed; may not be null + * @throws InterruptedException if this thread is interrupted while blocking + */ + protected void handleDelete( + MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) + throws InterruptedException { + handleChange( + partition, + offsetContext, + event, + Operation.DELETE, + DeleteRowsEventData.class, + x -> taskContext.getSchema().getTableId(x.getTableId()), + DeleteRowsEventData::getRows, + (tableId, row) -> + eventDispatcher.dispatchDataChangeEvent( + partition, + tableId, + new MySqlChangeRecordEmitter( + partition, + offsetContext, + clock, + Operation.DELETE, + row, + null))); + } + + private void handleChange( + MySqlPartition partition, + MySqlOffsetContext offsetContext, + Event event, + Operation operation, + Class eventDataClass, + TableIdProvider tableIdProvider, + RowsProvider rowsProvider, + BinlogChangeEmitter changeEmitter) + throws InterruptedException { + if (skipEvent) { + // We can skip this because we should already be at least this far ... + LOGGER.info("Skipping previously processed row event: {}", event); + return; + } + if (ignoreDmlEventByGtidSource) { + LOGGER.debug("Skipping DML event because this GTID source is filtered: {}", event); + return; + } + final T data = unwrapData(event); + final TableId tableId = tableIdProvider.getTableId(data); + final List rows = rowsProvider.getRows(data); + String changeType = operation.name(); + + if (tableId != null && taskContext.getSchema().schemaFor(tableId) != null) { + int count = 0; + int numRows = rows.size(); + if (startingRowNumber < numRows) { + for (int row = startingRowNumber; row != numRows; ++row) { + offsetContext.setRowNumber(row, numRows); + offsetContext.event(tableId, eventTimestamp); + changeEmitter.emit(tableId, rows.get(row)); + count++; + } + if (LOGGER.isDebugEnabled()) { + if (startingRowNumber != 0) { + LOGGER.debug( + "Emitted {} {} record(s) for last {} row(s) in event: {}", + count, + changeType, + numRows - startingRowNumber, + event); + } else { + LOGGER.debug( + "Emitted {} {} record(s) for event: {}", count, changeType, event); + } + } + offsetContext.changeEventCompleted(); + } else { + // All rows were previously processed ... + LOGGER.debug("Skipping previously processed {} event: {}", changeType, event); + } + } else { + informAboutUnknownTableIfRequired(partition, offsetContext, event, tableId, operation); + } + startingRowNumber = 0; + } + + /** + * Handle a {@link EventType#VIEW_CHANGE} event. + * + * @param event the database change data event to be processed; may not be null + * @throws InterruptedException if this thread is interrupted while blocking + */ + protected void viewChange(MySqlOffsetContext offsetContext, Event event) + throws InterruptedException { + LOGGER.debug("View Change event: {}", event); + // do nothing + } + + /** + * Handle a {@link EventType#XA_PREPARE} event. 
+ * + * @param event the database change data event to be processed; may not be null + * @throws InterruptedException if this thread is interrupted while blocking + */ + protected void prepareTransaction(MySqlOffsetContext offsetContext, Event event) + throws InterruptedException { + LOGGER.debug("XA Prepare event: {}", event); + // do nothing + } + + private SSLMode sslModeFor(SecureConnectionMode mode) { + switch (mode) { + case DISABLED: + return SSLMode.DISABLED; + case PREFERRED: + return SSLMode.PREFERRED; + case REQUIRED: + return SSLMode.REQUIRED; + case VERIFY_CA: + return SSLMode.VERIFY_CA; + case VERIFY_IDENTITY: + return SSLMode.VERIFY_IDENTITY; + } + return null; + } + + @Override + public void execute( + ChangeEventSourceContext context, + MySqlPartition partition, + MySqlOffsetContext offsetContext) + throws InterruptedException { + if (!connectorConfig.getSnapshotMode().shouldStream()) { + LOGGER.info( + "Streaming is disabled for snapshot mode {}", + connectorConfig.getSnapshotMode()); + return; + } + if (connectorConfig.getSnapshotMode() != MySqlConnectorConfig.SnapshotMode.NEVER) { + taskContext.getSchema().assureNonEmptySchema(); + } + final Set skippedOperations = connectorConfig.getSkippedOperations(); + + final MySqlOffsetContext effectiveOffsetContext = + offsetContext != null ? offsetContext : MySqlOffsetContext.initial(connectorConfig); + + // Register our event handlers ... + eventHandlers.put( + EventType.STOP, (event) -> handleServerStop(effectiveOffsetContext, event)); + eventHandlers.put( + EventType.HEARTBEAT, + (event) -> handleServerHeartbeat(partition, effectiveOffsetContext, event)); + eventHandlers.put( + EventType.INCIDENT, + (event) -> handleServerIncident(partition, effectiveOffsetContext, event)); + eventHandlers.put( + EventType.ROTATE, (event) -> handleRotateLogsEvent(effectiveOffsetContext, event)); + eventHandlers.put( + EventType.TABLE_MAP, + (event) -> handleUpdateTableMetadata(partition, effectiveOffsetContext, event)); + eventHandlers.put( + EventType.QUERY, + (event) -> handleQueryEvent(partition, effectiveOffsetContext, event)); + + if (!skippedOperations.contains(Operation.CREATE)) { + eventHandlers.put( + EventType.WRITE_ROWS, + (event) -> handleInsert(partition, effectiveOffsetContext, event)); + eventHandlers.put( + EventType.EXT_WRITE_ROWS, + (event) -> handleInsert(partition, effectiveOffsetContext, event)); + } + + if (!skippedOperations.contains(Operation.UPDATE)) { + eventHandlers.put( + EventType.UPDATE_ROWS, + (event) -> handleUpdate(partition, effectiveOffsetContext, event)); + eventHandlers.put( + EventType.EXT_UPDATE_ROWS, + (event) -> handleUpdate(partition, effectiveOffsetContext, event)); + } + + if (!skippedOperations.contains(Operation.DELETE)) { + eventHandlers.put( + EventType.DELETE_ROWS, + (event) -> handleDelete(partition, effectiveOffsetContext, event)); + eventHandlers.put( + EventType.EXT_DELETE_ROWS, + (event) -> handleDelete(partition, effectiveOffsetContext, event)); + } + + eventHandlers.put( + EventType.VIEW_CHANGE, (event) -> viewChange(effectiveOffsetContext, event)); + eventHandlers.put( + EventType.XA_PREPARE, (event) -> prepareTransaction(effectiveOffsetContext, event)); + eventHandlers.put( + EventType.XID, + (event) -> handleTransactionCompletion(partition, effectiveOffsetContext, event)); + + // Conditionally register ROWS_QUERY handler to parse SQL statements. 
+ if (connectorConfig.includeSqlQuery()) { + eventHandlers.put( + EventType.ROWS_QUERY, + (event) -> handleRowsQuery(effectiveOffsetContext, event)); + } + + BinaryLogClient.EventListener listener; + if (connectorConfig.bufferSizeForStreamingChangeEventSource() == 0) { + listener = (event) -> handleEvent(partition, effectiveOffsetContext, event); + } else { + EventBuffer buffer = + new EventBuffer( + connectorConfig.bufferSizeForStreamingChangeEventSource(), + this, + context); + listener = (event) -> buffer.add(partition, effectiveOffsetContext, event); + } + client.registerEventListener(listener); + + client.registerLifecycleListener(new ReaderThreadLifecycleListener(effectiveOffsetContext)); + client.registerEventListener((event) -> onEvent(effectiveOffsetContext, event)); + if (LOGGER.isDebugEnabled()) { + client.registerEventListener((event) -> logEvent(effectiveOffsetContext, event)); + } + + final boolean isGtidModeEnabled = connection.isGtidModeEnabled(); + metrics.setIsGtidModeEnabled(isGtidModeEnabled); + + // Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of + // the last Debezium checkpoint. + String availableServerGtidStr = connection.knownGtidSet(); + if (isGtidModeEnabled) { + // The server is using GTIDs, so enable the handler ... + eventHandlers.put( + EventType.GTID, (event) -> handleGtidEvent(effectiveOffsetContext, event)); + + // Now look at the GTID set from the server and what we've previously seen ... + GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr); + + // also take into account purged GTID logs + GtidSet purgedServerGtidSet = connection.purgedGtidSet(); + LOGGER.info("GTID set purged on server: {}", purgedServerGtidSet); + + GtidSet filteredGtidSet = + filterGtidSet( + effectiveOffsetContext, availableServerGtidSet, purgedServerGtidSet); + if (filteredGtidSet != null) { + // We've seen at least some GTIDs, so start reading from the filtered GTID set ... + LOGGER.info("Registering binlog reader with GTID set: {}", filteredGtidSet); + String filteredGtidSetStr = filteredGtidSet.toString(); + client.setGtidSet(filteredGtidSetStr); + effectiveOffsetContext.setCompletedGtidSet(filteredGtidSetStr); + gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr); + } else { + // We've not yet seen any GTIDs, so that means we have to start reading the binlog + // from the beginning ... + client.setBinlogFilename(effectiveOffsetContext.getSource().binlogFilename()); + client.setBinlogPosition(effectiveOffsetContext.getSource().binlogPosition()); + gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(""); + } + } else { + // The server is not using GTIDs, so start reading the binlog based upon where we last + // left off ... + client.setBinlogFilename(effectiveOffsetContext.getSource().binlogFilename()); + client.setBinlogPosition(effectiveOffsetContext.getSource().binlogPosition()); + } + + // We may be restarting in the middle of a transaction, so see how far into the transaction + // we have already processed... + initialEventsToSkip = effectiveOffsetContext.eventsToSkipUponRestart(); + LOGGER.info("Skip {} events on streaming start", initialEventsToSkip); + + // Set the starting row number, which is the next row number to be read ... + startingRowNumber = effectiveOffsetContext.rowsToSkipUponRestart(); + LOGGER.info("Skip {} rows on streaming start", startingRowNumber); + + // Only when we reach the first BEGIN event will we start to skip events ... 
+ skipEvent = false; + + try { + // Start the log reader, which starts background threads ... + if (context.isRunning()) { + long timeout = connectorConfig.getConnectionTimeout().toMillis(); + long started = clock.currentTimeInMillis(); + try { + LOGGER.debug( + "Attempting to establish binlog reader connection with timeout of {} ms", + timeout); + client.connect(timeout); + // Need to wait for keepalive thread to be running, otherwise it can be left + // orphaned + // The problem is with timing. When the close is called too early after connect + // then + // the keepalive thread is not terminated + if (client.isKeepAlive()) { + LOGGER.info("Waiting for keepalive thread to start"); + final Metronome metronome = Metronome.parker(Duration.ofMillis(100), clock); + int waitAttempts = 50; + boolean keepAliveThreadRunning = false; + while (!keepAliveThreadRunning && waitAttempts-- > 0) { + for (Thread t : binaryLogClientThreads.values()) { + if (t.getName().startsWith(KEEPALIVE_THREAD_NAME) && t.isAlive()) { + LOGGER.info("Keepalive thread is running"); + keepAliveThreadRunning = true; + } + } + metronome.pause(); + } + } + } catch (TimeoutException e) { + // If the client thread is interrupted *before* the client could connect, the + // client throws a timeout exception + // The only way we can distinguish this is if we get the timeout exception + // before the specified timeout has + // elapsed, so we simply check this (within 10%) ... + long duration = clock.currentTimeInMillis() - started; + if (duration > (0.9 * timeout)) { + double actualSeconds = TimeUnit.MILLISECONDS.toSeconds(duration); + throw new DebeziumException( + "Timed out after " + + actualSeconds + + " seconds while waiting to connect to MySQL at " + + connectorConfig.hostname() + + ":" + + connectorConfig.port() + + " with user '" + + connectorConfig.username() + + "'", + e); + } + // Otherwise, we were told to shutdown, so we don't care about the timeout + // exception + } catch (AuthenticationException e) { + throw new DebeziumException( + "Failed to authenticate to the MySQL database at " + + connectorConfig.hostname() + + ":" + + connectorConfig.port() + + " with user '" + + connectorConfig.username() + + "'", + e); + } catch (Throwable e) { + throw new DebeziumException( + "Unable to connect to the MySQL database at " + + connectorConfig.hostname() + + ":" + + connectorConfig.port() + + " with user '" + + connectorConfig.username() + + "': " + + e.getMessage(), + e); + } + } + while (context.isRunning()) { + Thread.sleep(100); + } + } finally { + try { + client.disconnect(); + } catch (Exception e) { + LOGGER.info("Exception while stopping binary log client", e); + } + } + } + + private SSLSocketFactory getBinlogSslSocketFactory( + MySqlConnectorConfig connectorConfig, MySqlConnection connection) { + String acceptedTlsVersion = connection.getSessionVariableForSslVersion(); + if (!isNullOrEmpty(acceptedTlsVersion)) { + SSLMode sslMode = sslModeFor(connectorConfig.sslMode()); + LOGGER.info( + "Enable ssl " + + sslMode + + " mode for connector " + + connectorConfig.getLogicalName()); + + final char[] keyPasswordArray = connection.connectionConfig().sslKeyStorePassword(); + final String keyFilename = connection.connectionConfig().sslKeyStore(); + final char[] trustPasswordArray = connection.connectionConfig().sslTrustStorePassword(); + final String trustFilename = connection.connectionConfig().sslTrustStore(); + KeyManager[] keyManagers = null; + if (keyFilename != null) { + try { + KeyStore ks = 
connection.loadKeyStore(keyFilename, keyPasswordArray); + + KeyManagerFactory kmf = KeyManagerFactory.getInstance("NewSunX509"); + kmf.init(ks, keyPasswordArray); + + keyManagers = kmf.getKeyManagers(); + } catch (KeyStoreException + | NoSuchAlgorithmException + | UnrecoverableKeyException e) { + throw new DebeziumException("Could not load keystore", e); + } + } + TrustManager[] trustManagers; + try { + KeyStore ks = null; + if (trustFilename != null) { + ks = connection.loadKeyStore(trustFilename, trustPasswordArray); + } + + if (ks == null && (sslMode == SSLMode.PREFERRED || sslMode == SSLMode.REQUIRED)) { + trustManagers = + new TrustManager[] { + new X509TrustManager() { + + @Override + public void checkClientTrusted( + X509Certificate[] x509Certificates, String s) + throws CertificateException {} + + @Override + public void checkServerTrusted( + X509Certificate[] x509Certificates, String s) + throws CertificateException {} + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + } + }; + } else { + TrustManagerFactory tmf = + TrustManagerFactory.getInstance( + TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(ks); + trustManagers = tmf.getTrustManagers(); + } + } catch (KeyStoreException | NoSuchAlgorithmException e) { + throw new DebeziumException("Could not load truststore", e); + } + // DBZ-1208 Resembles the logic from the upstream BinaryLogClient, only that + // the accepted TLS version is passed to the constructed factory + final KeyManager[] finalKMS = keyManagers; + return new DefaultSSLSocketFactory(acceptedTlsVersion) { + + @Override + protected void initSSLContext(SSLContext sc) throws GeneralSecurityException { + sc.init(finalKMS, trustManagers, null); + } + }; + } + + return null; + } + + private void logStreamingSourceState() { + logStreamingSourceState(Level.ERROR); + } + + protected void logEvent(MySqlOffsetContext offsetContext, Event event) { + LOGGER.trace("Received event: {}", event); + } + + private void logStreamingSourceState(Level severity) { + final Object position = + client == null + ? "N/A" + : client.getBinlogFilename() + "/" + client.getBinlogPosition(); + final String message = + "Error during binlog processing. Last offset stored = {}, binlog reader near position = {}"; + switch (severity) { + case WARN: + LOGGER.warn(message, lastOffset, position); + break; + case DEBUG: + LOGGER.debug(message, lastOffset, position); + break; + default: + LOGGER.error(message, lastOffset, position); + } + } + + /** + * Apply the include/exclude GTID source filters to the current {@link #source() GTID set} and + * merge them onto the currently available GTID set from a MySQL server. + * + *

<p>The merging behavior of this method might seem a bit strange at first. It's required in + * order for Debezium to consume a MySQL binlog that has multi-source replication enabled, if a + * failover has to occur. In such a case, the server that Debezium is failed over to might have + * a different set of sources, but still include the sources required for Debezium to continue + * to function. MySQL does not allow downstream replicas to connect if the GTID set does not + * contain GTIDs for all channels that the server is replicating from, even if the server does + * have the data needed by the client. To get around this, we can have Debezium merge its GTID + * set with whatever is on the server, so that MySQL will allow it to connect. See DBZ-143 for details. + * + *
<p>
This method does not mutate any state in the context. + * + * @param availableServerGtidSet the GTID set currently available in the MySQL server + * @param purgedServerGtid the GTID set already purged by the MySQL server + * @return A GTID set meant for consuming from a MySQL binlog; may return null if the SourceInfo + * has no GTIDs and therefore none were filtered + */ + public GtidSet filterGtidSet( + MySqlOffsetContext offsetContext, + GtidSet availableServerGtidSet, + GtidSet purgedServerGtid) { + String gtidStr = offsetContext.gtidSet(); + if (gtidStr == null) { + return null; + } + LOGGER.info("Attempting to generate a filtered GTID set"); + LOGGER.info("GTID set from previous recorded offset: {}", gtidStr); + GtidSet filteredGtidSet = new GtidSet(gtidStr); + Predicate gtidSourceFilter = connectorConfig.gtidSourceFilter(); + if (gtidSourceFilter != null) { + filteredGtidSet = filteredGtidSet.retainAll(gtidSourceFilter); + LOGGER.info( + "GTID set after applying GTID source includes/excludes to previous recorded offset: {}", + filteredGtidSet); + } + LOGGER.info("GTID set available on server: {}", availableServerGtidSet); + + GtidSet mergedGtidSet; + + if (connectorConfig.gtidNewChannelPosition() == GtidNewChannelPosition.EARLIEST) { + final GtidSet knownGtidSet = filteredGtidSet; + LOGGER.info("Using first available positions for new GTID channels"); + final GtidSet relevantAvailableServerGtidSet = + (gtidSourceFilter != null) + ? availableServerGtidSet.retainAll(gtidSourceFilter) + : availableServerGtidSet; + LOGGER.info( + "Relevant GTID set available on server: {}", relevantAvailableServerGtidSet); + + // Since the GTID recorded in the checkpoint represents the CDC-executed records, in + // certain scenarios + // (such as when the startup mode is earliest/timestamp/binlogfile), the recorded GTID + // may not start from + // the beginning. For example, A:300-500. However, during job recovery, we usually only + // need to focus on + // the last consumed point instead of consuming A:1-299. Therefore, some adjustments + // need to be made to the + // recorded offset in the checkpoint, and the available GTID for other MySQL instances + // should be completed. + mergedGtidSet = + GtidUtils.fixRestoredGtidSet( + GtidUtils.mergeGtidSetInto( + relevantAvailableServerGtidSet.retainAll( + uuid -> knownGtidSet.forServerWithId(uuid) != null), + purgedServerGtid), + filteredGtidSet); + } else { + mergedGtidSet = availableServerGtidSet.with(filteredGtidSet); + } + + LOGGER.info("Final merged GTID set to use when connecting to MySQL: {}", mergedGtidSet); + return mergedGtidSet; + } + + MySqlStreamingChangeEventSourceMetrics getMetrics() { + return metrics; + } + + void rewindBinaryLogClient(ChangeEventSourceContext context, BinlogPosition position) { + try { + if (context.isRunning()) { + LOGGER.debug("Rewinding binlog to position {}", position); + client.disconnect(); + client.setBinlogFilename(position.getFilename()); + client.setBinlogPosition(position.getPosition()); + client.connect(); + } + } catch (IOException e) { + LOGGER.error("Unexpected error when re-connecting to the MySQL binary log reader", e); + } + } + + BinlogPosition getCurrentBinlogPosition() { + return new BinlogPosition(client.getBinlogFilename(), client.getBinlogPosition()); + } + + /** + * Wraps the specified exception in a {@link DebeziumException}, ensuring that all useful state + * is captured inside the new exception's message. 
+ * + * @param error the exception; may not be null + * @return the wrapped Kafka Connect exception + */ + protected DebeziumException wrap(Throwable error) { + assert error != null; + String msg = error.getMessage(); + if (error instanceof ServerException) { + ServerException e = (ServerException) error; + msg = msg + " Error code: " + e.getErrorCode() + "; SQLSTATE: " + e.getSqlState() + "."; + } else if (error instanceof SQLException) { + SQLException e = (SQLException) error; + msg = + e.getMessage() + + " Error code: " + + e.getErrorCode() + + "; SQLSTATE: " + + e.getSQLState() + + "."; + } + msg = ErrorMessageUtils.optimizeErrorMessage(msg); + return new DebeziumException(msg, error); + } + + /** LifecycleListener for Reader Thread. */ + protected final class ReaderThreadLifecycleListener implements LifecycleListener { + private final MySqlOffsetContext offsetContext; + + ReaderThreadLifecycleListener(MySqlOffsetContext offsetContext) { + this.offsetContext = offsetContext; + } + + @Override + public void onDisconnect(BinaryLogClient client) { + if (LOGGER.isInfoEnabled()) { + taskContext.temporaryLoggingContext( + connectorConfig, + "binlog", + () -> { + Map offset = lastOffset; + if (offset != null) { + LOGGER.info( + "Stopped reading binlog after {} events, last recorded offset: {}", + totalRecordCounter, + offset); + } else { + LOGGER.info( + "Stopped reading binlog after {} events, no new offset was recorded", + totalRecordCounter); + } + }); + } + } + + @Override + public void onConnect(BinaryLogClient client) { + // Set up the MDC logging context for this thread ... + taskContext.configureLoggingContext("binlog"); + + // The event row number will be used when processing the first event ... + LOGGER.info( + "Connected to MySQL binlog at {}:{}, starting at {}", + connectorConfig.hostname(), + connectorConfig.port(), + offsetContext); + } + + @Override + public void onCommunicationFailure(BinaryLogClient client, Exception ex) { + LOGGER.debug("A communication failure event arrived", ex); + logStreamingSourceState(); + try { + // Stop BinaryLogClient background threads + client.disconnect(); + } catch (final Exception e) { + LOGGER.debug("Exception while closing client", e); + } + errorHandler.setProducerThrowable(wrap(ex)); + } + + @Override + public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) { + if (eventDeserializationFailureHandlingMode + == EventProcessingFailureHandlingMode.FAIL) { + LOGGER.debug("A deserialization failure event arrived", ex); + logStreamingSourceState(); + errorHandler.setProducerThrowable(wrap(ex)); + } else if (eventDeserializationFailureHandlingMode + == EventProcessingFailureHandlingMode.WARN) { + LOGGER.warn("A deserialization failure event arrived", ex); + logStreamingSourceState(Level.WARN); + } else { + LOGGER.debug("A deserialization failure event arrived", ex); + logStreamingSourceState(Level.DEBUG); + } + } + } + + @FunctionalInterface + private interface TableIdProvider { + TableId getTableId(E data); + } + + @FunctionalInterface + private interface RowsProvider { + List getRows(E data); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java index e2417872385..93e7cd1f2ea 100644 --- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java @@ -43,6 +43,8 @@ import com.github.shyiko.mysql.binlog.BinaryLogClient; import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.base.ChangeEventQueue; +import io.debezium.connector.mysql.GtidSet; +import io.debezium.connector.mysql.GtidUtils; import io.debezium.connector.mysql.MySqlChangeEventSourceMetricsFactory; import io.debezium.connector.mysql.MySqlConnection; import io.debezium.connector.mysql.MySqlConnectorConfig; @@ -289,6 +291,15 @@ private MySqlOffsetContext loadStartingOffsetState( } private boolean isBinlogAvailable(MySqlOffsetContext offset) { + String gtidStr = offset.gtidSet(); + if (gtidStr != null) { + return checkGtidSet(offset); + } + + return checkBinlogFilename(offset); + } + + private boolean checkBinlogFilename(MySqlOffsetContext offset) { String binlogFilename = offset.getSourceInfo().getString(BINLOG_FILENAME_OFFSET_KEY); if (binlogFilename == null) { return true; // start at current position @@ -313,6 +324,61 @@ private boolean isBinlogAvailable(MySqlOffsetContext offset) { return found; } + private boolean checkGtidSet(MySqlOffsetContext offset) { + String gtidStr = offset.gtidSet(); + + if (gtidStr.trim().isEmpty()) { + return true; // start at beginning ... + } + + String availableGtidStr = connection.knownGtidSet(); + if (availableGtidStr == null || availableGtidStr.trim().isEmpty()) { + // Last offsets had GTIDs but the server does not use them ... + LOG.warn( + "Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled"); + return false; + } + + // Get the GTID set that is available in the server ... + GtidSet availableGtidSet = new GtidSet(availableGtidStr); + + // GTIDs are enabled + LOG.info("Merging server GTID set {} with restored GTID set {}", availableGtidSet, gtidStr); + + // Based on the current server's GTID, the GTID in MySqlOffsetContext is adjusted to ensure + // the completeness of + // the GTID. This is done to address the issue of being unable to recover from a checkpoint + // in certain startup + // modes. + GtidSet gtidSet = GtidUtils.fixRestoredGtidSet(availableGtidSet, new GtidSet(gtidStr)); + LOG.info("Merged GTID set is {}", gtidSet); + + if (gtidSet.isContainedWithin(availableGtidSet)) { + LOG.info( + "MySQL current GTID set {} does contain the GTID set {} required by the connector.", + availableGtidSet, + gtidSet); + // The replication is concept of mysql master-slave replication protocol ... 
+ final GtidSet gtidSetToReplicate = + connection.subtractGtidSet(availableGtidSet, gtidSet); + final GtidSet purgedGtidSet = connection.purgedGtidSet(); + LOG.info("Server has already purged {} GTIDs", purgedGtidSet); + final GtidSet nonPurgedGtidSetToReplicate = + connection.subtractGtidSet(gtidSetToReplicate, purgedGtidSet); + LOG.info( + "GTID set {} known by the server but not processed yet, for replication are available only GTID set {}", + gtidSetToReplicate, + nonPurgedGtidSetToReplicate); + if (!gtidSetToReplicate.equals(nonPurgedGtidSetToReplicate)) { + LOG.warn("Some of the GTIDs needed to replicate have been already purged"); + return false; + } + return true; + } + LOG.info("Connector last known GTIDs are {}, but MySQL has {}", gtidSet, availableGtidSet); + return false; + } + private void validateAndLoadDatabaseHistory( MySqlOffsetContext offset, MySqlDatabaseSchema schema) { schema.initializeStorage(); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/ErrorMessageUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/ErrorMessageUtils.java new file mode 100644 index 00000000000..5d7afd844aa --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/ErrorMessageUtils.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils; + +import java.util.regex.Pattern; + +/** This util tries to optimize error message for some exceptions. */ +public class ErrorMessageUtils { + private static final Pattern SERVER_ID_CONFLICT = + Pattern.compile( + ".*A slave with the same server_uuid/server_id as this slave has connected to the master.*"); + private static final Pattern MISSING_BINLOG_POSITION_WHEN_BINLOG_EXPIRE = + Pattern.compile( + ".*The connector is trying to read binlog.*but this is no longer available on the server.*"); + private static final Pattern MISSING_TRANSACTION_WHEN_BINLOG_EXPIRE = + Pattern.compile( + ".*Cannot replicate because the (master|source) purged required binary logs.*"); + + /** Add more error details for some exceptions. */ + public static String optimizeErrorMessage(String msg) { + if (msg == null) { + return null; + } + if (SERVER_ID_CONFLICT.matcher(msg).matches()) { + // Optimize the error msg when server id conflict + msg += + "\nThe 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.\n" + + "The server id conflict may happen in the following situations: \n" + + "1. 
The server id has been used by another MySQL CDC table in the current job.\n" + "2. The server id has been used by a MySQL CDC table in another job.\n" + "3. The server id has been used by other sync tools such as Canal, Debezium and so on.\n"; + } else if (MISSING_BINLOG_POSITION_WHEN_BINLOG_EXPIRE.matcher(msg).matches() + || MISSING_TRANSACTION_WHEN_BINLOG_EXPIRE.matcher(msg).matches()) { + // Optimize the error msg when binlog is unavailable + msg += + "\nThe required binary logs are no longer available on the server. This may happen in the following situations:\n" + "1. The CDC source reads too slowly, so the binlog expires before it is consumed. Consider increasing the binary log expiration period; you can also check whether there is back pressure in the job and optimize the job.\n" + "2. The job runs normally, but something on the database side caused the binlog cleanup. Check on the MySQL side why this cleanup happened."; + } + return msg; + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java new file mode 100644 index 00000000000..286a3e41e96 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.debezium.connector.mysql; + +import org.junit.jupiter.api.Test; + +import static io.debezium.connector.mysql.GtidUtils.fixRestoredGtidSet; +import static io.debezium.connector.mysql.GtidUtils.mergeGtidSetInto; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Unit test for {@link GtidUtils}.
*/ +class GtidUtilsTest { + @Test + void testFixingRestoredGtidSet() { + GtidSet serverGtidSet = new GtidSet("A:1-100"); + GtidSet restoredGtidSet = new GtidSet("A:30-100"); + assertEquals("A:1-100", fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString()); + + serverGtidSet = new GtidSet("A:1-100"); + restoredGtidSet = new GtidSet("A:30-50"); + assertEquals("A:1-50", fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString()); + + serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200"); + restoredGtidSet = new GtidSet("A:106-150"); + assertEquals( + "A:1-100:102-150,B:20-200", + fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString()); + + serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200"); + restoredGtidSet = new GtidSet("A:106-150,C:1-100"); + assertEquals( + "A:1-100:102-150,B:20-200,C:1-100", + fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString()); + + serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200"); + restoredGtidSet = new GtidSet("A:106-150:152-200,C:1-100"); + assertEquals( + "A:1-100:102-200,B:20-200,C:1-100", + fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString()); + } + + @Test + void testMergingGtidSets() { + GtidSet base = new GtidSet("A:1-100"); + GtidSet toMerge = new GtidSet("A:1-10"); + assertEquals("A:1-100", mergeGtidSetInto(base, toMerge).toString()); + + base = new GtidSet("A:1-100"); + toMerge = new GtidSet("B:1-10"); + assertEquals("A:1-100,B:1-10", mergeGtidSetInto(base, toMerge).toString()); + + base = new GtidSet("A:1-100,C:1-100"); + toMerge = new GtidSet("A:1-10,B:1-10"); + assertEquals("A:1-100,B:1-10,C:1-100", mergeGtidSetInto(base, toMerge).toString()); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCSpecificStartingOffsetIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCSpecificStartingOffsetIT.java new file mode 100644 index 00000000000..5321e499b26 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCSpecificStartingOffsetIT.java @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql; + +import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfigFactory; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.MySqlDialect; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; +import org.apache.seatunnel.e2e.common.util.JobIdGenerator; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerLoggerFactory; + +import io.debezium.jdbc.JdbcConnection; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.await; + +@Slf4j +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "Currently SPARK and FLINK do not support restore") +public class MysqlCDCSpecificStartingOffsetIT extends TestSuiteBase implements TestResource { + + // mysql + private static final String MYSQL_HOST = "mysql_cdc_e2e"; + private static final String MYSQL_USER_NAME = "mysqluser"; + private static final String MYSQL_USER_PASSWORD = "mysqlpw"; + private static final String MYSQL_DATABASE = "mysql_cdc"; + private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); + + private final UniqueDatabase inventoryDatabase = + new UniqueDatabase( + MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw", MYSQL_DATABASE); + + // mysql source table query sql + private static final String SOURCE_SQL_TEMPLATE = + "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary," + + " cast(f_longblob as char) as f_longblob, cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as f_varbinary," + + " f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned," + + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext," + + " f_text, 
f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char," + + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned," + + " f_json, f_year from %s.%s"; + // mysql sink table query sql + private static final String SINK_SQL_TEMPLATE = + "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary," + + " cast(f_longblob as char) as f_longblob, cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as f_varbinary," + + " f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned," + + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext," + + " f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char," + + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned," + + " f_json, cast(f_year as year) from %s.%s"; + + private static final String SOURCE_TABLE_1 = "mysql_cdc_e2e_source_table"; + private static final String SINK_TABLE = "mysql_cdc_e2e_sink_table"; + + private static MySqlContainer createMySqlContainer(MySqlVersion version) { + return new MySqlContainer(version) + .withConfigurationOverride("docker/server-gtids/my.cnf") + .withSetupSQL("docker/setup.sql") + .withNetwork(NETWORK) + .withNetworkAliases(MYSQL_HOST) + .withDatabaseName(MYSQL_DATABASE) + .withUsername(MYSQL_USER_NAME) + .withPassword(MYSQL_USER_PASSWORD) + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image"))); + } + + private String driverUrl() { + return "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar"; + } + + @TestContainerExtension + protected final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/MySQL-CDC/lib && cd /tmp/seatunnel/plugins/MySQL-CDC/lib && wget " + + driverUrl()); + Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr()); + }; + + @BeforeAll + @Override + public void startUp() { + log.info("The second stage: Starting Mysql containers..."); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + log.info("Mysql Containers are started"); + inventoryDatabase.createAndInitialize(); + log.info("Mysql ddl execution is complete"); + flushLogs(); + } + + private Connection getJdbcConnection() throws SQLException { + return DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword()); + } + + private List> query(String sql) { + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql)) { + List> result = new ArrayList<>(); + int columnCount = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { + ArrayList objects = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + objects.add(resultSet.getObject(i)); + } + log.debug(String.format("Print MySQL-CDC query, sql: %s, data: %s", sql, objects)); + result.add(objects); + } + return result; + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + // Execute SQL + private void 
executeSql(String sql) { + try (Connection connection = getJdbcConnection()) { + connection.createStatement().execute(sql); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @TestTemplate + public void testMysqlCdcEarliestOffset(TestContainer container) + throws IOException, InterruptedException { + String jobId = String.valueOf(JobIdGenerator.newJobId()); + String jobConfigFile = "/mysqlcdc_earliest_offset.conf"; + purgeBinaryLogs(); + // Insert data + executeSql( + String.format( + "INSERT INTO %s.%s ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n" + + " f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n" + + " f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n" + + " f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n" + + " f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n" + + " f_tinyint, f_tinyint_unsigned, f_json, f_year )\n" + + "VALUES ( 11, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n" + + " 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n" + + " 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,\n" + + " 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n" + + " 'This is a text field', 'This is a tiny text field', '测试字段4', '2022-04-27', '2022-04-27 14:30:00',\n" + + " '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n" + + " 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n" + + " 12.345, '14:30:00', -128, 255, '{ \"key\": \"value4\" }', 2022 )", + MYSQL_DATABASE, SOURCE_TABLE_1)); + executeSql( + String.format( + "INSERT INTO %s.%s ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n" + + " f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n" + + " f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n" + + " f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n" + + " f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n" + + " f_tinyint, f_tinyint_unsigned, f_json, f_year )\n" + + "VALUES ( 12, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n" + + " 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 0x74696E79626C6F62,\n" + + " 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321,\n" + + " 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field', 'This is a text field',\n" + + " 'This is a tiny text field', '测试字段5', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40',\n" + + " 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n" + + " 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n" + + " 112.345, '14:30:00', -128, 22, '{ \"key\": \"value5\" }', 
2013 )", + MYSQL_DATABASE, SOURCE_TABLE_1)); + + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob(jobConfigFile, jobId); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + + // verify data + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query(getSourceQuerySQL(MYSQL_DATABASE, SOURCE_TABLE_1)), + query(getSinkQuerySQL(MYSQL_DATABASE, SINK_TABLE))); + }); + + // Take a savepoint + Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + // Make some changes after the savepoint + executeSql( + String.format( + "UPDATE %s.%s SET f_year = '2025' WHERE id = 12", + MYSQL_DATABASE, SOURCE_TABLE_1)); + + // Restart the job from savepoint + CompletableFuture.supplyAsync( + () -> { + try { + container.restoreJob(jobConfigFile, jobId); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + + // Make some changes after the restore + executeSql( + String.format( + "UPDATE %s.%s SET f_tinyint_unsigned = '88' WHERE id = 12", + MYSQL_DATABASE, SOURCE_TABLE_1)); + + // verify data + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query(getSourceQuerySQL(MYSQL_DATABASE, SOURCE_TABLE_1)), + query(getSinkQuerySQL(MYSQL_DATABASE, SINK_TABLE))); + }); + } + + @TestTemplate + public void testMysqlCdcSpecificOffset(TestContainer container) throws Exception { + String jobId = String.valueOf(JobIdGenerator.newJobId()); + String jobConfigFile = "/mysqlcdc_specific_offset.conf"; + purgeBinaryLogs(); + String source_sql_where_id_template = + "select id, cast(f_binary as char) as f_binary, cast(f_blob as char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary," + + " cast(f_longblob as char) as f_longblob, cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as f_varbinary," + + " f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned," + + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext," + + " f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char," + + " f_enum, cast(f_mediumblob as char) as f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned," + + " f_json, f_year from %s.%s where id in (%s)"; + // Clear related content to ensure that multiple operations are not affected + clearTable(MYSQL_DATABASE, SOURCE_TABLE_1); + clearTable(MYSQL_DATABASE, SINK_TABLE); + // Purge binary log at first + purgeBinaryLogs(); + // Record current binlog offset + BinlogOffset currentBinlogOffset = getCurrentBinlogOffset(); + + String[] variables = { + "specific_offset_file=" + currentBinlogOffset.getFilename(), + "specific_offset_pos=" + currentBinlogOffset.getPosition() + }; + + // Insert data + executeSql( + String.format( + "INSERT INTO %s.%s ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n" + + " f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n" + + " f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n" + + " f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n" + + " f_timestamp, 
f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n" + + " f_tinyint, f_tinyint_unsigned, f_json, f_year )\n" + + "VALUES ( 14, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n" + + " 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n" + + " 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,\n" + + " 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n" + + " 'This is a text field', 'This is a tiny text field', '测试字段4', '2022-04-27', '2022-04-27 14:30:00',\n" + + " '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n" + + " 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n" + + " 12.345, '14:30:00', -128, 255, '{ \"key\": \"value4\" }', 2022 )", + MYSQL_DATABASE, SOURCE_TABLE_1)); + executeSql( + String.format( + "INSERT INTO %s.%s ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n" + + " f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n" + + " f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n" + + " f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n" + + " f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n" + + " f_tinyint, f_tinyint_unsigned, f_json, f_year )\n" + + "VALUES ( 15, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n" + + " 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 0x74696E79626C6F62,\n" + + " 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321,\n" + + " 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field', 'This is a text field',\n" + + " 'This is a tiny text field', '测试字段5', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40',\n" + + " 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n" + + " 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n" + + " 112.345, '14:30:00', -128, 22, '{ \"key\": \"value5\" }', 2013 )", + MYSQL_DATABASE, SOURCE_TABLE_1)); + + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob(jobConfigFile, jobId, variables); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + + // validate results + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query( + String.format( + source_sql_where_id_template, + MYSQL_DATABASE, + SOURCE_TABLE_1, + "14,15")), + query(getSinkQuerySQL(MYSQL_DATABASE, SINK_TABLE))); + }); + + // Take a savepoint + Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode()); + // Make some changes after the savepoint + executeSql( + String.format( + "UPDATE %s.%s SET f_year = '2025' WHERE id = 15", + MYSQL_DATABASE, SOURCE_TABLE_1)); + + CompletableFuture.supplyAsync( + () -> { + try { + container.restoreJob(jobConfigFile, 
jobId, variables); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + + // Make some changes after the restore + executeSql( + String.format( + "UPDATE %s.%s SET f_tinyint_unsigned = '77' WHERE id = 15", + MYSQL_DATABASE, SOURCE_TABLE_1)); + + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query( + String.format( + source_sql_where_id_template, + MYSQL_DATABASE, + SOURCE_TABLE_1, + "14,15")), + query(getSinkQuerySQL(MYSQL_DATABASE, SINK_TABLE))); + }); + } + + @Override + @AfterAll + public void tearDown() { + // close Container + if (MYSQL_CONTAINER != null) { + MYSQL_CONTAINER.close(); + } + } + + private void clearTable(String database, String tableName) { + executeSql("truncate table " + database + "." + tableName); + } + + private void flushLogs() { + executeSql("FLUSH LOGS;"); + } + + private String getSourceQuerySQL(String database, String tableName) { + return String.format(SOURCE_SQL_TEMPLATE, database, tableName); + } + + private String getSinkQuerySQL(String database, String tableName) { + return String.format(SINK_SQL_TEMPLATE, database, tableName); + } + + private BinlogOffset getCurrentBinlogOffset() { + JdbcSourceConfigFactory configFactory = + new MySqlSourceConfigFactory() + .hostname(MYSQL_CONTAINER.getHost()) + .port(MYSQL_CONTAINER.getDatabasePort()) + .username(MYSQL_CONTAINER.getUsername()) + .password(MYSQL_CONTAINER.getPassword()) + .databaseList(MYSQL_CONTAINER.getDatabaseName()); + MySqlDialect mySqlDialect = + new MySqlDialect((MySqlSourceConfigFactory) configFactory, Collections.emptyList()); + JdbcConnection jdbcConnection = mySqlDialect.openJdbcConnection(configFactory.create(0)); + return MySqlConnectionUtils.currentBinlogOffset(jdbcConnection); + } + + private void purgeBinaryLogs() { + executeSql( + String.format("PURGE BINARY LOGS TO '%s'", getCurrentBinlogOffset().getFilename())); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_earliest_offset.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_earliest_offset.conf new file mode 100644 index 00000000000..c92642ac05d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_earliest_offset.conf @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + MySQL-CDC { + plugin_output = "customers_mysql_cdc" + server-id = 5653 + username = "st_user_source" + password = "mysqlpw" + table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"] + base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" + startup.mode = "earliest" + } +} + +sink { + jdbc { + plugin_input = "customers_mysql_cdc" + url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" + driver = "com.mysql.cj.jdbc.Driver" + user = "st_user_sink" + password = "mysqlpw" + + generate_sink_sql = true + database = mysql_cdc + table = mysql_cdc_e2e_sink_table + primary_keys = ["id"] + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_specific_offset.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_specific_offset.conf new file mode 100644 index 00000000000..dcbc0cd716f --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_specific_offset.conf @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + MySQL-CDC { + plugin_output = "customers_mysql_cdc" + server-id = 5654 + username = "st_user_source" + password = "mysqlpw" + table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"] + base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" + startup.mode = "specific" + startup.specific-offset.file = ${specific_offset_file} + startup.specific-offset.pos = ${specific_offset_pos} + } +} + +sink { + jdbc { + plugin_input = "customers_mysql_cdc" + url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" + driver = "com.mysql.cj.jdbc.Driver" + user = "st_user_sink" + password = "mysqlpw" + + generate_sink_sql = true + database = mysql_cdc + table = mysql_cdc_e2e_sink_table + primary_keys = ["id"] + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/mysqlcdc_to_doris_with_schema_change.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/mysqlcdc_to_doris_with_schema_change.conf index 8088ff009e8..fda43e8f790 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/mysqlcdc_to_doris_with_schema_change.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/mysqlcdc_to_doris_with_schema_change.conf @@ -21,7 +21,7 @@ env { # You can set engine configuration here parallelism = 1 job.mode = "STREAMING" - checkpoint.interval = 5000 + checkpoint.interval = 2000 } source { diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java index 10d5685c6d2..bcf2043d704 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java @@ -163,7 +163,7 @@ protected Container.ExecResult cancelJob(GenericContainer container, String j } protected Container.ExecResult restoreJob( - GenericContainer container, String confFile, String jobId) + GenericContainer container, String confFile, String jobId, List variables) throws IOException, InterruptedException { final String confInContainerPath = copyConfigFileToContainer(container, confFile); // copy connectors @@ -182,7 +182,15 @@ protected Container.ExecResult restoreJob( command.add(adaptPathForWin(confInContainerPath)); command.add(getRestoreCommand()); command.add(jobId); - command.addAll(getExtraStartShellCommands()); + List extraStartShellCommands = new ArrayList<>(getExtraStartShellCommands()); + if (variables != null && !variables.isEmpty()) { + variables.forEach( + v -> { + extraStartShellCommands.add("-i"); + extraStartShellCommands.add(v); + }); + } + command.addAll(extraStartShellCommands); return executeCommand(container, command); } diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java index f815ecb6b23..ce74948e411 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java +++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java @@ -44,7 +44,7 @@ void executeExtraCommands(ContainerExtendedFactory extendedFactory) Container.ExecResult executeJob(String confFile, List variables) throws IOException, InterruptedException; - default Container.ExecResult executeJob(String confFile, String jobId) + default Container.ExecResult executeJob(String confFile, String jobId, String... variables) throws IOException, InterruptedException { throw new UnsupportedOperationException("Not implemented"); } @@ -64,7 +64,7 @@ default Container.ExecResult savepointJob(String jobId) throw new UnsupportedOperationException("Not implemented"); } - default Container.ExecResult restoreJob(String confFile, String jobId) + default Container.ExecResult restoreJob(String confFile, String jobId, String... variables) throws IOException, InterruptedException { throw new UnsupportedOperationException("Not implemented"); } diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java index dde94542b37..254519a8138 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java @@ -313,16 +313,16 @@ public Container.ExecResult executeJob(String confFile) @Override public Container.ExecResult executeJob(String confFile, List variables) throws IOException, InterruptedException { - return executeJob(confFile, null, variables); + return doExecuteJob(confFile, null, variables); } @Override - public Container.ExecResult executeJob(String confFile, String jobId) + public Container.ExecResult executeJob(String confFile, String jobId, String... variables) throws IOException, InterruptedException { - return executeJob(confFile, jobId, null); + return doExecuteJob(confFile, jobId, variables != null ? Arrays.asList(variables) : null); } - private Container.ExecResult executeJob(String confFile, String jobId, List variables) + private Container.ExecResult doExecuteJob(String confFile, String jobId, List variables) throws IOException, InterruptedException { log.info("test in container: {}", identifier()); List beforeThreads = ContainerUtil.getJVMThreadNames(server); @@ -498,10 +498,15 @@ public Container.ExecResult savepointJob(String jobId) } @Override - public Container.ExecResult restoreJob(String confFile, String jobId) + public Container.ExecResult restoreJob(String confFile, String jobId, String... variables) throws IOException, InterruptedException { runningCount.incrementAndGet(); - Container.ExecResult result = restoreJob(server, confFile, jobId); + Container.ExecResult result = + restoreJob( + server, + confFile, + jobId, + variables != null ? 
Arrays.asList(variables) : null); runningCount.decrementAndGet(); return result; } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java index 82402ff2d43..b7f2ea53f81 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java @@ -104,7 +104,7 @@ public void testSaveModeOnMasterOrClient() throws IOException, InterruptedExcept "org.apache.seatunnel.e2e.sink.inmemory.InMemorySaveModeHandler - handle data savemode with table path: test.table2")); // restore will not execute savemode - execResult = restoreJob(server, "/savemode/fake_to_inmemory_savemode.conf", "1"); + execResult = restoreJob(server, "/savemode/fake_to_inmemory_savemode.conf", "1", null); Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); // clear old logs serverLogLength += serverLogs.length();
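For readers skimming the patch, the GTID handling is easiest to see in isolation. Below is a minimal, illustrative sketch (not part of the patch) of the two helpers exercised by GtidUtilsTest; it assumes the GtidUtils and GtidSet classes shown in this diff are on the classpath, and the class name GtidFixSketch is hypothetical.

import io.debezium.connector.mysql.GtidSet;
import io.debezium.connector.mysql.GtidUtils;

public class GtidFixSketch {
    public static void main(String[] args) {
        // The server has executed A:1-100, but the restored checkpoint only recorded
        // A:30-50 (e.g. the job originally started from an earliest/timestamp/binlog-file
        // offset rather than from the beginning of the binlog).
        GtidSet server = new GtidSet("A:1-100");
        GtidSet restored = new GtidSet("A:30-50");

        // fixRestoredGtidSet completes the restored set against the server's view, so
        // recovery resumes after A:50 instead of trying to re-consume A:1-29.
        System.out.println(GtidUtils.fixRestoredGtidSet(server, restored)); // prints A:1-50

        // mergeGtidSetInto keeps the base set and adds only sources it does not know about.
        System.out.println(
                GtidUtils.mergeGtidSetInto(new GtidSet("A:1-100"), new GtidSet("B:1-10")));
        // prints A:1-100,B:1-10
    }
}

This mirrors the adjustment applied in filterGtidSet and checkGtidSet above, where the restored GTID set is completed against the server's GTID set before the binlog client connects, so that recovery from earliest/timestamp/binlog-file startup modes does not fail or replay already-consumed transactions.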