diff --git a/build.gradle b/build.gradle index 57cd2e8d..1dc2ddc9 100644 --- a/build.gradle +++ b/build.gradle @@ -27,8 +27,8 @@ subprojects { } } - sourceCompatibility = "1.8" - targetCompatibility = "1.8" + sourceCompatibility = "11" + targetCompatibility = "11" test { useJUnitPlatform() diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 24c4a4ae..ab6cf0f0 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -6,7 +6,7 @@ bson-ver = "4.11.0" hadoop-ver = "3.4.1" hive-ver = "2.3.9" http-client-ver = "5.2.1" -iceberg-ver = "1.5.2" +iceberg-ver = "1.6.1" jackson-ver = "2.14.2" junit-ver = "5.10.0" kafka-ver = "3.5.1" diff --git a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DebeziumTransform.java b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DebeziumTransform.java index d7481d49..7399f7cc 100644 --- a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DebeziumTransform.java +++ b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DebeziumTransform.java @@ -103,6 +103,10 @@ private R applyWithSchema(R record) { // create the new value Schema newValueSchema = makeUpdatedSchema(payloadSchema, cdcSchema); + if (value.schema().field("ts_us") != null) { + newValueSchema = makeUpdatedSchema(newValueSchema, CustomFieldConstants.SOURCE_TIMESTAMP_US, Timestamp.SCHEMA); + } + Struct newValue = new Struct(newValueSchema); for (Field field : payloadSchema.fields()) { @@ -110,8 +114,8 @@ private R applyWithSchema(R record) { } newValue.put(CdcConstants.COL_CDC, cdcMetadata); - if (value.getStruct("ts_us") != null) { - newValue.put(CustomFieldConstants.SOURCE_TIMESTAMP_US, new java.util.Date(value.getInt64("ts_us"))); + if (value.schema().field("ts_us") != null) { + newValue.put(CustomFieldConstants.SOURCE_TIMESTAMP_US, new java.util.Date(value.getInt64("ts_us") / 1000L)); } return record.newRecord( @@ -187,7 +191,6 @@ 
private String mapOperation(String originalOp) { private void setTableAndTargetFromSourceStruct(Struct source, Struct cdcMetadata) { String db; - Long txid = null; if (source.schema().field("schema") != null) { // prefer schema if present, e.g. for Postgres @@ -198,20 +201,49 @@ private void setTableAndTargetFromSourceStruct(Struct source, Struct cdcMetadata String table = source.getString("table"); - if (source.schema().field("txId") != null) { - txid = source.getInt64("txId"); - } + // Extract transaction ID based on connector type + Long txid = extractTransactionIdFromSourceStruct(source); cdcMetadata.put(CdcConstants.COL_SOURCE, db + "." + table); cdcMetadata.put(CdcConstants.COL_TARGET, target(db, table)); cdcMetadata.put(CdcConstants.COL_TXID, txid); } + private Long extractTransactionIdFromSourceStruct(Struct source) { + String connector = source.getString("connector"); + + if ("postgresql".equals(connector)) { + // Check for txId field for postgresql + if (source.schema().field("txId") != null) { + return source.getInt64("txId"); + } + } else if ("mysql".equals(connector)) { + // Check if in snapshot mode first + if (source.schema().field("snapshot") != null && source.getBoolean("snapshot")) { + // Return 0 as gtid is null when snapshotting + return 0L; + } + + // Not in snapshot mode, check for gtid field for mysql + if (source.schema().field("gtid") != null) { + String gtid = source.getString("gtid"); + // Split gtid into uuid and txid + String[] gtidSections = gtid.split(":"); + // Return txid section + if (gtidSections.length == 2) { + return Long.valueOf(gtidSections[1]); + } + } + } else { + LOG.warn("Transactional consistency is not currently supported for connector type: {}", connector); + } + return null; + } + private void setTableAndTargetFromSourceMap(Object source, Map cdcMetadata) { Map map = Requirements.requireMap(source, "Debezium transform"); String db; - Long txid = null; if (map.containsKey("schema")) { // prefer schema if present, e.g. 
for Postgres db = map.get("schema").toString(); @@ -220,15 +252,46 @@ private void setTableAndTargetFromSourceMap(Object source, Map c } String table = map.get("table").toString(); - if (map.containsKey("txId")) { - txid = Long.valueOf(map.get("txId").toString()); - } + // Extract transaction ID based on connector type + Long txid = extractTransactionIdFromSourceMap(map); cdcMetadata.put(CdcConstants.COL_SOURCE, db + "." + table); cdcMetadata.put(CdcConstants.COL_TARGET, target(db, table)); cdcMetadata.put(CdcConstants.COL_TXID, txid); } + private Long extractTransactionIdFromSourceMap(Map source) { + String connector = source.get("connector").toString(); + + if ("postgresql".equals(connector)) { + // Check for txId field for postgresql + if (source.containsKey("txId")) { + // Return txid + return Long.valueOf(source.get("txId").toString()); + } + } else if ("mysql".equals(connector)) { + // Check if in snapshot mode first + if (source.containsKey("snapshot") && Boolean.TRUE.equals(source.get("snapshot"))) { + // Return 0 as gtid is null when snapshotting + return 0L; + } + + // Not in snapshot mode, check for gtid field for mysql + if (source.containsKey("gtid")) { + String gtid = source.get("gtid").toString(); + // Split gtid into uuid and txid + String[] gtidSections = gtid.split(":"); + // Return txid section + if (gtidSections.length == 2) { + return Long.valueOf(gtidSections[1]); + } + } + } else { + LOG.warn("Transactional consistency is not currently supported for connector type: {}", connector); + } + return null; + } + private String target(String db, String table) { return cdcTargetPattern == null || cdcTargetPattern.isEmpty() ? db + "." 
+ table @@ -252,18 +315,22 @@ private Schema makeCdcSchema(Schema keySchema) { return builder.build(); } - private Schema makeUpdatedSchema(Schema schema, Schema cdcSchema) { + private Schema makeUpdatedSchema(Schema schema, String fieldName, Schema fieldSchema) { SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); for (Field field : schema.fields()) { builder.field(field.name(), field.schema()); } - builder.field(CdcConstants.COL_CDC, cdcSchema); + builder.field(fieldName, fieldSchema); return builder.build(); } + private Schema makeUpdatedSchema(Schema schema, Schema cdcSchema) { + return makeUpdatedSchema(schema, CdcConstants.COL_CDC, cdcSchema); + } + @Override public ConfigDef config() { return CONFIG_DEF; diff --git a/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/DebeziumTransformTest.java b/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/DebeziumTransformTest.java index d90af5d7..d0e733cc 100644 --- a/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/DebeziumTransformTest.java +++ b/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/DebeziumTransformTest.java @@ -23,42 +23,54 @@ import java.math.BigDecimal; import java.time.Instant; import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.data.Timestamp; import org.junit.jupiter.api.Test; public class DebeziumTransformTest { - private static final Schema KEY_SCHEMA = - SchemaBuilder.struct().field("account_id", Schema.INT64_SCHEMA).build(); + private static final Schema KEY_SCHEMA = 
SchemaBuilder.struct().field("account_id", Schema.INT64_SCHEMA).build(); - private static final Schema ROW_SCHEMA = - SchemaBuilder.struct() - .field("account_id", Schema.INT64_SCHEMA) - .field("balance", Decimal.schema(2)) - .field("last_updated", Schema.STRING_SCHEMA) - .build(); + private static final Schema ROW_SCHEMA = SchemaBuilder.struct() + .field("account_id", Schema.INT64_SCHEMA) + .field("balance", Decimal.schema(2)) + .field("last_updated", Schema.STRING_SCHEMA) + .build(); - private static final Schema SOURCE_SCHEMA = - SchemaBuilder.struct() - .field("db", Schema.STRING_SCHEMA) - .field("schema", Schema.STRING_SCHEMA) - .field("table", Schema.STRING_SCHEMA) - .build(); + private static final Schema SOURCE_SCHEMA = SchemaBuilder.struct() + .field("db", Schema.STRING_SCHEMA) + .field("schema", Schema.STRING_SCHEMA) + .field("table", Schema.STRING_SCHEMA) + .field("connector", Schema.STRING_SCHEMA) + .field("snapshot", Schema.BOOLEAN_SCHEMA) + .field("txId", Schema.OPTIONAL_INT64_SCHEMA) + .field("gtid", Schema.OPTIONAL_STRING_SCHEMA) + .build(); - private static final Schema VALUE_SCHEMA = - SchemaBuilder.struct() - .field("op", Schema.STRING_SCHEMA) - .field("ts_ms", Schema.INT64_SCHEMA) - .field("source", SOURCE_SCHEMA) - .field("before", ROW_SCHEMA) - .field("after", ROW_SCHEMA) - .field("txid", Schema.INT64_SCHEMA) - .build(); + // Original VALUE_SCHEMA + private static final Schema VALUE_SCHEMA = SchemaBuilder.struct() + .field("op", Schema.STRING_SCHEMA) + .field("ts_ms", Schema.INT64_SCHEMA) + .field("source", SOURCE_SCHEMA) + .field("before", ROW_SCHEMA) + .field("after", ROW_SCHEMA) + .build(); + + // New VALUE_SCHEMA with ts_us to test forked changes + private static final Schema VALUE_SCHEMA_WITH_TS_US = SchemaBuilder.struct() + .field("op", Schema.STRING_SCHEMA) + .field("ts_ms", Schema.INT64_SCHEMA) + .field("ts_us", Schema.INT64_SCHEMA) + .field("source", SOURCE_SCHEMA) + .field("before", ROW_SCHEMA) + .field("after", ROW_SCHEMA) + 
.build(); @Test public void testDmsTransformNull() { @@ -75,7 +87,104 @@ public void testDebeziumTransformSchemaless() { try (DebeziumTransform smt = new DebeziumTransform<>()) { smt.configure(ImmutableMap.of("cdc.target.pattern", "{db}_x.{table}_x")); - Map event = createDebeziumEventMap("u"); + Map event = createDebeziumEventMap("u", "postgresql", false, 1L, null); + Map key = ImmutableMap.of("account_id", 1L); + SinkRecord record = new SinkRecord("topic", 0, null, key, null, event, 0); + + SinkRecord result = smt.apply(record); + assertThat(result.value()).isInstanceOf(Map.class); + Map value = (Map) result.value(); + + assertThat(value.get("account_id")).isEqualTo(1); + + Map cdcMetadata = (Map) value.get("_cdc"); + assertThat(cdcMetadata.get("op")).isEqualTo("U"); + assertThat(cdcMetadata.get("source")).isEqualTo("schema.tbl"); + assertThat(cdcMetadata.get("target")).isEqualTo("schema_x.tbl_x"); + assertThat(cdcMetadata.get("key")).isInstanceOf(Map.class); + + // Verify txid has been added + assertThat(cdcMetadata.get("txid")).isEqualTo(1L); + + // Verify source_ts_us has not been added + assertThat(value.containsKey("source_ts_us")).isFalse(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testDebeziumTransformSchemalessWithTsUs() { + try (DebeziumTransform smt = new DebeziumTransform<>()) { + smt.configure(ImmutableMap.of("cdc.target.pattern", "{db}_x.{table}_x")); + + Map event = createDebeziumEventMap("u", "postgresql", false, 1L, null); + // Add ts_us to the event + long tsUs = System.currentTimeMillis() * 1000; + event = new ImmutableMap.Builder() + .putAll(event) + .put("ts_us", tsUs) + .build(); + + Map key = ImmutableMap.of("account_id", 1L); + SinkRecord record = new SinkRecord("topic", 0, null, key, null, event, 0); + + SinkRecord result = smt.apply(record); + assertThat(result.value()).isInstanceOf(Map.class); + Map value = (Map) result.value(); + + assertThat(value.get("account_id")).isEqualTo(1); + + Map cdcMetadata = (Map) 
value.get("_cdc"); + assertThat(cdcMetadata.get("op")).isEqualTo("U"); + assertThat(cdcMetadata.get("source")).isEqualTo("schema.tbl"); + assertThat(cdcMetadata.get("target")).isEqualTo("schema_x.tbl_x"); + assertThat(cdcMetadata.get("key")).isInstanceOf(Map.class); + + // Verify txid has been added + assertThat(cdcMetadata.get("txid")).isEqualTo(1L); + + // Verify source_ts_us has been added and is correct + assertThat(value.get("source_ts_us")).isEqualTo(tsUs); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testDebeziumTransformSchemalessMySQL() { + try (DebeziumTransform smt = new DebeziumTransform<>()) { + smt.configure(ImmutableMap.of("cdc.target.pattern", "{db}_x.{table}_x")); + + Map event = createDebeziumEventMap("u", "mysql", false, null, "0000-0000-0000:1"); + Map key = ImmutableMap.of("account_id", 1L); + SinkRecord record = new SinkRecord("topic", 0, null, key, null, event, 0); + + SinkRecord result = smt.apply(record); + assertThat(result.value()).isInstanceOf(Map.class); + Map value = (Map) result.value(); + + assertThat(value.get("account_id")).isEqualTo(1); + + Map cdcMetadata = (Map) value.get("_cdc"); + assertThat(cdcMetadata.get("op")).isEqualTo("U"); + assertThat(cdcMetadata.get("source")).isEqualTo("schema.tbl"); + assertThat(cdcMetadata.get("target")).isEqualTo("schema_x.tbl_x"); + assertThat(cdcMetadata.get("key")).isInstanceOf(Map.class); + + // Verify txid has been extracted from gtid + assertThat(cdcMetadata.get("txid")).isEqualTo(1L); + + // Verify source_ts_us has not been added + assertThat(value.containsKey("source_ts_us")).isFalse(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testDebeziumTransformSchemalessMySQLSnapshotting() { + try (DebeziumTransform smt = new DebeziumTransform<>()) { + smt.configure(ImmutableMap.of("cdc.target.pattern", "{db}_x.{table}_x")); + + Map event = createDebeziumEventMap("u", "mysql", true, null, null); Map key = ImmutableMap.of("account_id", 1L); SinkRecord record = 
new SinkRecord("topic", 0, null, key, null, event, 0); @@ -90,6 +199,12 @@ public void testDebeziumTransformSchemaless() { assertThat(cdcMetadata.get("source")).isEqualTo("schema.tbl"); assertThat(cdcMetadata.get("target")).isEqualTo("schema_x.tbl_x"); assertThat(cdcMetadata.get("key")).isInstanceOf(Map.class); + + // Verify txid has been set to 0 as gtid is null when snapshotting + assertThat(cdcMetadata.get("txid")).isEqualTo(0L); + + // Verify source_ts_us has not been added + assertThat(value.containsKey("source_ts_us")).isFalse(); } } @@ -98,7 +213,7 @@ public void testDebeziumTransformWithSchema() { try (DebeziumTransform smt = new DebeziumTransform<>()) { smt.configure(ImmutableMap.of("cdc.target.pattern", "{db}_x.{table}_x")); - Struct event = createDebeziumEventStruct("u"); + Struct event = createDebeziumEventStruct("u", "postgresql", false, 1L, null); Struct key = new Struct(KEY_SCHEMA).put("account_id", 1L); SinkRecord record = new SinkRecord("topic", 0, KEY_SCHEMA, key, VALUE_SCHEMA, event, 0); @@ -113,21 +228,120 @@ public void testDebeziumTransformWithSchema() { assertThat(cdcMetadata.get("source")).isEqualTo("schema.tbl"); assertThat(cdcMetadata.get("target")).isEqualTo("schema_x.tbl_x"); assertThat(cdcMetadata.get("key")).isInstanceOf(Struct.class); + + // Verify txid has been added + assertThat(cdcMetadata.get("txid")).isEqualTo(1L); + + // Verify source_ts_us has not been added + assertThat(value.schema().field("source_ts_us")).isNull(); + } + } + + @Test + public void testDebeziumTransformWithSchemAndTsUs() { + try (DebeziumTransform smt = new DebeziumTransform<>()) { + smt.configure(ImmutableMap.of("cdc.target.pattern", "{db}_x.{table}_x")); + + long tsUs = System.currentTimeMillis() * 1000L; + Struct event = createDebeziumEventStructWithTsUs("u", "postgresql", false, 1L, null, tsUs); + Struct key = new Struct(KEY_SCHEMA).put("account_id", 1L); + SinkRecord record = new SinkRecord("topic", 0, KEY_SCHEMA, key, VALUE_SCHEMA_WITH_TS_US, event, 0); 
+ + SinkRecord result = smt.apply(record); + assertThat(result.value()).isInstanceOf(Struct.class); + Struct value = (Struct) result.value(); + + assertThat(value.get("account_id")).isEqualTo(1L); + + Struct cdcMetadata = value.getStruct("_cdc"); + assertThat(cdcMetadata.get("op")).isEqualTo("U"); + assertThat(cdcMetadata.get("source")).isEqualTo("schema.tbl"); + assertThat(cdcMetadata.get("target")).isEqualTo("schema_x.tbl_x"); + assertThat(cdcMetadata.get("key")).isInstanceOf(Struct.class); + + // Verify txid has been added + assertThat(cdcMetadata.get("txid")).isEqualTo(1L); + + // Verify source_ts_us is added and correct + Schema tsUsSchema = value.schema().field("source_ts_us").schema(); + assertThat(tsUsSchema).isEqualTo(Timestamp.SCHEMA); + assertThat(value.get("source_ts_us")).isEqualTo(new java.util.Date(tsUs / 1000L)); } } - private Map createDebeziumEventMap(String operation) { - Map source = - ImmutableMap.of( - "db", "db", - "schema", "schema", - "table", "tbl"); + @Test + public void testDebeziumTransformWithSchemaMySQL() { + try (DebeziumTransform smt = new DebeziumTransform<>()) { + smt.configure(ImmutableMap.of("cdc.target.pattern", "{db}_x.{table}_x")); + + Struct event = createDebeziumEventStruct("u", "mysql", false, null, "0000-0000-0000:1"); + Struct key = new Struct(KEY_SCHEMA).put("account_id", 1L); + SinkRecord record = new SinkRecord("topic", 0, KEY_SCHEMA, key, VALUE_SCHEMA, event, 0); + + SinkRecord result = smt.apply(record); + assertThat(result.value()).isInstanceOf(Struct.class); + Struct value = (Struct) result.value(); - Map data = - ImmutableMap.of( - "account_id", 1, - "balance", 100, - "last_updated", Instant.now().toString()); + assertThat(value.get("account_id")).isEqualTo(1L); + + Struct cdcMetadata = value.getStruct("_cdc"); + assertThat(cdcMetadata.get("op")).isEqualTo("U"); + assertThat(cdcMetadata.get("source")).isEqualTo("schema.tbl"); + assertThat(cdcMetadata.get("target")).isEqualTo("schema_x.tbl_x"); + 
assertThat(cdcMetadata.get("key")).isInstanceOf(Struct.class); + + // Verify txid has been extracted from gtid + assertThat(cdcMetadata.get("txid")).isEqualTo(1L); + + // Verify source_ts_us has not been added + assertThat(value.schema().field("source_ts_us")).isNull(); + } + } + + @Test + public void testDebeziumTransformWithSchemaMySQLSnapshotting() { + try (DebeziumTransform smt = new DebeziumTransform<>()) { + smt.configure(ImmutableMap.of("cdc.target.pattern", "{db}_x.{table}_x")); + + Struct event = createDebeziumEventStruct("u", "mysql", true, null, null); + Struct key = new Struct(KEY_SCHEMA).put("account_id", 1L); + SinkRecord record = new SinkRecord("topic", 0, KEY_SCHEMA, key, VALUE_SCHEMA, event, 0); + + SinkRecord result = smt.apply(record); + assertThat(result.value()).isInstanceOf(Struct.class); + Struct value = (Struct) result.value(); + + assertThat(value.get("account_id")).isEqualTo(1L); + + Struct cdcMetadata = value.getStruct("_cdc"); + assertThat(cdcMetadata.get("op")).isEqualTo("U"); + assertThat(cdcMetadata.get("source")).isEqualTo("schema.tbl"); + assertThat(cdcMetadata.get("target")).isEqualTo("schema_x.tbl_x"); + assertThat(cdcMetadata.get("key")).isInstanceOf(Struct.class); + + // Verify txid has been set to 0 as gtid is null when snapshotting + assertThat(cdcMetadata.get("txid")).isEqualTo(0L); + + // Verify source_ts_us has not been added + assertThat(value.schema().field("source_ts_us")).isNull(); + } + } + + private Map createDebeziumEventMap(String operation, String connector, Boolean snapshot, Long txid, + String gtid) { + Map source = Maps.newHashMap(); + source.put("db", "db"); + source.put("schema", "schema"); + source.put("table", "tbl"); + source.put("connector", connector); + source.put("snapshot", snapshot); + source.put("txId", txid); + source.put("gtid", gtid); + + Map data = ImmutableMap.of( + "account_id", 1, + "balance", 100, + "last_updated", Instant.now().toString()); return ImmutableMap.of( "op", operation, @@ -137,15 
+351,21 @@ private Map createDebeziumEventMap(String operation) { "after", data); } - private Struct createDebeziumEventStruct(String operation) { - Struct source = - new Struct(SOURCE_SCHEMA).put("db", "db").put("schema", "schema").put("table", "tbl"); + private Struct createDebeziumEventStruct(String operation, String connector, Boolean snapshot, Long txid, + String gtid) { + Struct source = new Struct(SOURCE_SCHEMA) + .put("db", "db") + .put("schema", "schema") + .put("table", "tbl") + .put("connector", connector) + .put("snapshot", snapshot) + .put("txId", txid) + .put("gtid", gtid); - Struct data = - new Struct(ROW_SCHEMA) - .put("account_id", 1L) - .put("balance", BigDecimal.valueOf(100)) - .put("last_updated", Instant.now().toString()); + Struct data = new Struct(ROW_SCHEMA) + .put("account_id", 1L) + .put("balance", BigDecimal.valueOf(100)) + .put("last_updated", Instant.now().toString()); return new Struct(VALUE_SCHEMA) .put("op", operation) @@ -154,4 +374,29 @@ private Struct createDebeziumEventStruct(String operation) { .put("before", data) .put("after", data); } + + private Struct createDebeziumEventStructWithTsUs(String operation, String connector, Boolean snapshot, Long txid, + String gtid, Long tsUs) { + Struct source = new Struct(SOURCE_SCHEMA) + .put("db", "db") + .put("schema", "schema") + .put("table", "tbl") + .put("connector", connector) + .put("snapshot", snapshot) + .put("txId", txid) + .put("gtid", gtid); + + Struct data = new Struct(ROW_SCHEMA) + .put("account_id", 1L) + .put("balance", BigDecimal.valueOf(100)) + .put("last_updated", Instant.now().toString()); + + return new Struct(VALUE_SCHEMA_WITH_TS_US) + .put("op", operation) + .put("ts_ms", System.currentTimeMillis()) + .put("ts_us", tsUs) + .put("source", source) + .put("before", data) + .put("after", data); + } } diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java 
b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java index f50184ef..f904db49 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java @@ -153,6 +153,7 @@ private boolean receive(Envelope envelope) { *

* PostgreSQL uses a 32-bit unsigned integer for transaction IDs, which means the wraparound occurs at 2^32 (4,294,967,296). * We are using 2^31 (2,147,483,648) to detect the wraparound correctly. + * TODO (2471-02-04): MySQL transaction ID limit needs addressing, threshold is 2^63 - 1 and there is no wraparound * * @param currentTxId current transaction ID * @param newTxId new transaction ID diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java index 8ecc3d20..885a9f08 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java @@ -265,7 +265,7 @@ public void testShouldDeduplicateDeleteFilesBeforeAppending() { Assertions.assertEquals(1, snapshots.size()); Snapshot snapshot = snapshots.get(0); - Assertions.assertEquals(DataOperations.OVERWRITE, snapshot.operation()); + Assertions.assertEquals(DataOperations.DELETE, snapshot.operation()); Assertions.assertEquals(0, ImmutableList.copyOf(snapshot.addedDataFiles(table.io())).size()); Assertions.assertEquals(1, ImmutableList.copyOf(snapshot.addedDeleteFiles(table.io())).size()); } diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/EventDecoderTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/EventDecoderTest.java index b94df2f3..e7beb027 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/EventDecoderTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/EventDecoderTest.java @@ -179,7 +179,7 @@ public void testCommitResponseBecomesDataWrittenPartitioned() { Types.NestedField.optional( 10_304, "delete_files", - Types.ListType.ofRequired(10_304, DataFile.getType(spec.partitionType()))))); + Types.ListType.ofRequired(10_305, DataFile.getType(spec.partitionType()))))); } @Test 
diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/BaseWriterTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/BaseWriterTest.java index 6585f6d9..d5e1534d 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/BaseWriterTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/BaseWriterTest.java @@ -67,7 +67,7 @@ public void before() { when(table.io()).thenReturn(fileIO); when(table.locationProvider()) .thenReturn(LocationProviders.locationsFor("file", ImmutableMap.of())); - when(table.encryption()).thenReturn(new PlaintextEncryptionManager()); + when(table.encryption()).thenReturn(PlaintextEncryptionManager.instance()); when(table.properties()).thenReturn(ImmutableMap.of()); }