[Issue 8502] Upgrade Debezium to a newer version (apache#11204)
Fixes apache#8502

### Motivation

Upgrade Debezium to a newer version

### Modifications

Upgraded Debezium to v1.5.4 (the latest version built with Java 8; v1.6.x is built with Java 11).
Upgraded kafka-client to 2.7.0 (the version Debezium is tested with).
Upgraded scala-library to 2.13.6 (required by kafka-client).

Dealt with the resulting API changes, test updates, etc.; the most invasive API change is sketched below.

This PR is built on top of apache#11154 so that the Debezium integration tests run on CI.
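For reviewers, the central API change is in Kafka Connect's `OffsetBackingStore`: between kafka-client 2.3 and 2.7 the `Callback` parameter was dropped from `get(...)`, so callers now block on the returned `Future`. A minimal sketch of the migration (the helper class and method below are hypothetical; only the `OffsetBackingStore.get` signature comes from kafka-client 2.7):

```java
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.connect.storage.OffsetBackingStore;

class OffsetLookupSketch {
    // kafka-client 2.3.x style (removed upstream):
    //   store.get(keys, (error, result) -> { ... });
    // kafka-client 2.7.x style: no callback overload, block on the Future instead.
    static Map<ByteBuffer, ByteBuffer> read(OffsetBackingStore store, Collection<ByteBuffer> keys)
            throws InterruptedException, ExecutionException {
        Future<Map<ByteBuffer, ByteBuffer>> pending = store.get(keys);
        return pending.get();
    }
}
```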
dlg99 authored Aug 13, 2021
1 parent 17ab040 commit 283ae57
Showing 8 changed files with 42 additions and 59 deletions.
8 changes: 4 additions & 4 deletions pom.xml
@@ -139,7 +139,7 @@ flexible messaging model and an intuitive client API.</description>
<hbc-core.version>2.2.0</hbc-core.version>
<cassandra-driver-core.version>3.6.0</cassandra-driver-core.version>
<aerospike-client.version>4.4.8</aerospike-client.version>
-<kafka-client.version>2.3.0</kafka-client.version>
+<kafka-client.version>2.7.0</kafka-client.version>
<rabbitmq-client.version>5.1.1</rabbitmq-client.version>
<aws-sdk.version>1.11.774</aws-sdk.version>
<avro.version>1.10.2</avro.version>
@@ -153,9 +153,9 @@
<hdfs-offload-version3>3.3.0</hdfs-offload-version3>
<elasticsearch.version>7.9.1</elasticsearch.version>
<presto.version>332</presto.version>
-<scala.binary.version>2.11</scala.binary.version>
-<scala-library.version>2.11.12</scala-library.version>
-<debezium.version>1.0.0.Final</debezium.version>
+<scala.binary.version>2.13</scala.binary.version>
+<scala-library.version>2.13.6</scala-library.version>
+<debezium.version>1.5.4.Final</debezium.version>
<jsonwebtoken.version>0.11.1</jsonwebtoken.version>
<opencensus.version>0.18.0</opencensus.version>
<hbase.version>2.3.0</hbase.version>
7 changes: 7 additions & 0 deletions pulsar-io/debezium/core/pom.xml
@@ -90,6 +90,13 @@
<type>test-jar</type>
</dependency>

+    <dependency>
+      <groupId>io.debezium</groupId>
+      <artifactId>debezium-connector-mysql</artifactId>
+      <version>${debezium.version}</version>
+      <scope>test</scope>
+    </dependency>

</dependencies>

</project>
@@ -250,6 +250,11 @@ public boolean exists() {
}
}

+    @Override
+    public boolean storageExists() {
+        return true;
+    }

@Override
public String toString() {
if (topicName != null) {
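For context on the hunk above: Debezium 1.5's `DatabaseHistory` contract declares `storageExists()` in addition to the older `exists()`, and implementations must provide it. A hedged sketch of the distinction (the interface below is a simplified stand-in, not Debezium's full `io.debezium.relational.history.DatabaseHistory`):

```java
// Simplified stand-in for the relevant slice of Debezium's DatabaseHistory contract.
interface HistoryStorageSketch {
    boolean storageExists(); // is the underlying medium (topic, file, table) present?
    boolean exists();        // has any history actually been recorded in it?
}

class LazyTopicHistorySketch implements HistoryStorageSketch {
    @Override
    public boolean storageExists() {
        // A Pulsar topic is created on first use, so the storage can always be
        // reported as present, which is why the override above returns true.
        return true;
    }

    @Override
    public boolean exists() {
        // Placeholder: a real implementation would read the history topic.
        return false;
    }
}
```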
@@ -23,9 +23,9 @@
import static org.testng.Assert.assertTrue;

import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.relational.Tables;
-import io.debezium.relational.ddl.DdlParserSql2003;
-import io.debezium.relational.ddl.LegacyDdlParser;
+import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.text.ParsingException;
@@ -86,8 +86,8 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL) {
// Calling it another time to ensure we can work with the DB history topic already existing
history.initializeStorage();

-        LegacyDdlParser recoveryParser = new DdlParserSql2003();
-        LegacyDdlParser ddlParser = new DdlParserSql2003();
+        DdlParser recoveryParser = new MySqlAntlrDdlParser();
+        DdlParser ddlParser = new MySqlAntlrDdlParser();
ddlParser.setCurrentSchema("db1"); // recover does this, so we need to as well
Tables tables1 = new Tables();
Tables tables2 = new Tables();
@@ -102,9 +102,9 @@

// Now record schema changes, which writes out to kafka but doesn't actually change the Tables ...
setLogPosition(10);
ddl = "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \n" +
ddl = "CREATE TABLE foo ( first VARCHAR(22) NOT NULL ); \n" +
"CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" +
"CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, desc VARCHAR(255) NOT NULL); \n";
"CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, description VARCHAR(255) NOT NULL ); \n";
history.record(source, position, "db1", ddl);

// Parse the DDL statement 3x and each time update a different Tables object ...
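The parser swap in this file follows from upstream removing the legacy SQL-2003 parser: `MySqlAntlrDdlParser` is the ANTLR-based `DdlParser` implementation in Debezium 1.5.x. A minimal sketch of the replacement pattern (the wrapper class is hypothetical; the Debezium types and calls match the diff above):

```java
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;

class DdlParsingSketch {
    // MySqlAntlrDdlParser is a DdlParser, so recover/parse call sites keep
    // compiling after the swap; only the construction site changes.
    static Tables parse(String ddl) {
        DdlParser parser = new MySqlAntlrDdlParser();
        parser.setCurrentSchema("db1"); // mirror what history recovery does
        Tables tables = new Tables();
        parser.parse(ddl, tables);      // populate table definitions from the DDL
        return tables;
    }
}
```

The test DDL changed for the same reason: the ANTLR parser enforces real MySQL grammar, and `desc` is a reserved word in MySQL, hence the rename to `description`.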
@@ -97,30 +97,21 @@ private Long currentOffset(TopicPartition topicPartition) {
List<ByteBuffer> req = Lists.newLinkedList();
ByteBuffer key = topicPartitionAsKey(topicPartition);
req.add(key);
-        CompletableFuture<Long> offsetFuture = new CompletableFuture<>();
-        offsetStore.get(req, (Throwable ex, Map<ByteBuffer, ByteBuffer> result) -> {
-            if (ex == null) {
-                if (result != null && result.size() != 0) {
-                    Optional<ByteBuffer> val = result.entrySet().stream()
-                            .filter(entry -> entry.getKey().equals(key))
-                            .findFirst().map(entry -> entry.getValue());
-                    if (val.isPresent()) {
-                        long received = val.get().getLong();
-                        if (log.isDebugEnabled()) {
-                            log.debug("read initial offset for {} == {}", topicPartition, received);
-                        }
-                        offsetFuture.complete(received);
-                        return;
-                    }
-                }
-                offsetFuture.complete(-1L);
-            } else {
-                offsetFuture.completeExceptionally(ex);
-            }
-        });
-
-        try {
-            return offsetFuture.get();
+        try {
+            Map<ByteBuffer, ByteBuffer> result = offsetStore.get(req).get();
+            if (result != null && result.size() != 0) {
+                Optional<ByteBuffer> val = result.entrySet().stream()
+                        .filter(entry -> entry.getKey().equals(key))
+                        .findFirst().map(entry -> entry.getValue());
+                if (val.isPresent()) {
+                    long received = val.get().getLong();
+                    if (log.isDebugEnabled()) {
+                        log.debug("read initial offset for {} == {}", topicPartition, received);
+                    }
+                    return received;
+                }
+            }
+            return -1L;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("error getting initial state of {}", topicPartition, e);
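Condensed into a standalone hedged sketch, the new lookup reads as follows (class and method names here are illustrative; the flow, including restoring the interrupt flag, mirrors the hunk above):

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.connect.storage.OffsetBackingStore;

class CurrentOffsetSketch {
    // Resolve a single stored offset synchronously; -1 means "nothing stored yet".
    static long currentOffset(OffsetBackingStore store, ByteBuffer key) {
        try {
            Map<ByteBuffer, ByteBuffer> result = store.get(Collections.singletonList(key)).get();
            if (result != null) {
                ByteBuffer val = result.get(key); // ByteBuffer equality is by content
                if (val != null) {
                    return val.getLong(); // offsets are serialized as a single long
                }
            }
            return -1L;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return -1L;
        } catch (ExecutionException e) {
            return -1L;
        }
    }
}
```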
@@ -175,8 +175,7 @@ public void stop() {
}

@Override
-    public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys,
-                                                   Callback<Map<ByteBuffer, ByteBuffer>> callback) {
+    public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys) {
CompletableFuture<Void> endFuture = new CompletableFuture<>();
readToEnd(endFuture);
return endFuture.thenApply(ignored -> {
@@ -190,14 +189,7 @@ public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys,
values.put(key, value);
}
}
-            if (null != callback) {
-                callback.onCompletion(null, values);
-            }
            return values;
-        }).whenComplete((ignored, cause) -> {
-            if (null != cause && null != callback) {
-                callback.onCompletion(cause, null);
-            }
});
}

Expand Up @@ -32,6 +32,7 @@
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.util.Callback;
@@ -86,27 +87,10 @@ protected void cleanup() throws Exception {
@Test
public void testGetFromEmpty() throws Exception {
assertTrue(offsetBackingStore.get(
Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8))),
null
Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8)))
).get().isEmpty());
}

-    @Test
-    public void testGetFromEmptyCallback() throws Exception {
-        CompletableFuture<Map<ByteBuffer, ByteBuffer>> callbackFuture = new CompletableFuture<>();
-        assertTrue(offsetBackingStore.get(
-            Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8))),
-            (error, result) -> {
-                if (null != error) {
-                    callbackFuture.completeExceptionally(error);
-                } else {
-                    callbackFuture.complete(result);
-                }
-            }
-        ).get().isEmpty());
-        assertTrue(callbackFuture.get().isEmpty());
-    }

@Test
public void testGetSet() throws Exception {
testGetSet(false);
@@ -144,7 +128,7 @@ private void testGetSet(boolean testCallback) throws Exception {
}

Map<ByteBuffer, ByteBuffer> result =
-                offsetBackingStore.get(keys, null).get();
+                offsetBackingStore.get(keys).get();
assertEquals(numKeys, result.size());
AtomicInteger count = new AtomicInteger();
new TreeMap<>(result).forEach((key, value) -> {
@@ -58,6 +58,7 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
add("source");
add("op");
add("ts_ms");
add("transaction");
}};

protected SourceTester(String sourceType) {
@@ -144,7 +145,10 @@ public void validateSourceResultAvro(Consumer<KeyValue<GenericRecord, GenericRec
GenericRecord valueRecord = msg.getValue().getValue();
Assert.assertNotNull(valueRecord.getFields());
Assert.assertTrue(valueRecord.getFields().size() > 0);

log.info("Received message: key = {}, value = {}.", keyRecord.getNativeObject(), valueRecord.getNativeObject());
for (Field field : valueRecord.getFields()) {
log.info("validating field {}", field.getName());
Assert.assertTrue(DEBEZIUM_FIELD_SET.contains(field.getName()));
}

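The extra `transaction` entry reflects the transaction-metadata block that newer Debezium versions add to the change-event envelope; it stays null unless transaction metadata is enabled on the connector, so the tester only needs to whitelist the field name. A hedged sketch of the same check outside the test harness (the set is limited to the entries visible in this hunk; the real `DEBEZIUM_FIELD_SET` has more entries in the collapsed context above):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class EnvelopeFieldCheckSketch {
    // Only the entries visible in the hunk are listed; the collapsed context
    // holds the rest of DEBEZIUM_FIELD_SET.
    private static final Set<String> KNOWN_FIELDS =
            new HashSet<>(Arrays.asList("source", "op", "ts_ms", "transaction"));

    static void validate(Iterable<String> fieldNames) {
        for (String name : fieldNames) {
            if (!KNOWN_FIELDS.contains(name)) {
                throw new AssertionError("unexpected envelope field: " + name);
            }
        }
    }
}
```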
