Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

USD-2054 Upgrade Iceberg to 1.6.1 #8

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ subprojects {
}
}

sourceCompatibility = "1.8"
targetCompatibility = "1.8"
sourceCompatibility = "11"
targetCompatibility = "11"

test {
useJUnitPlatform()
Expand Down
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ bson-ver = "4.11.0"
hadoop-ver = "3.4.1"
hive-ver = "2.3.9"
http-client-ver = "5.2.1"
iceberg-ver = "1.5.2"
iceberg-ver = "1.6.1"
jackson-ver = "2.14.2"
junit-ver = "5.10.0"
kafka-ver = "3.5.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,19 @@ private R applyWithSchema(R record) {

// create the new value
Schema newValueSchema = makeUpdatedSchema(payloadSchema, cdcSchema);
if (value.schema().field("ts_us") != null) {
newValueSchema = makeUpdatedSchema(newValueSchema, CustomFieldConstants.SOURCE_TIMESTAMP_US, Timestamp.SCHEMA);
}

Struct newValue = new Struct(newValueSchema);

for (Field field : payloadSchema.fields()) {
newValue.put(field.name(), payload.get(field));
}
newValue.put(CdcConstants.COL_CDC, cdcMetadata);

if (value.getStruct("ts_us") != null) {
newValue.put(CustomFieldConstants.SOURCE_TIMESTAMP_US, new java.util.Date(value.getInt64("ts_us")));
if (value.schema().field("ts_us") != null) {
newValue.put(CustomFieldConstants.SOURCE_TIMESTAMP_US, new java.util.Date(value.getInt64("ts_us") / 1000L));
}

return record.newRecord(
Expand Down Expand Up @@ -187,7 +191,6 @@ private String mapOperation(String originalOp) {

private void setTableAndTargetFromSourceStruct(Struct source, Struct cdcMetadata) {
String db;
Long txid = null;

if (source.schema().field("schema") != null) {
// prefer schema if present, e.g. for Postgres
Expand All @@ -198,20 +201,49 @@ private void setTableAndTargetFromSourceStruct(Struct source, Struct cdcMetadata

String table = source.getString("table");

if (source.schema().field("txId") != null) {
txid = source.getInt64("txId");
}
// Extract transaction ID based on connector type
Long txid = extractTransactionIdFromSourceStruct(source);

cdcMetadata.put(CdcConstants.COL_SOURCE, db + "." + table);
cdcMetadata.put(CdcConstants.COL_TARGET, target(db, table));
cdcMetadata.put(CdcConstants.COL_TXID, txid);
}

/**
 * Extracts a transaction id from a Debezium {@code source} struct, based on the value of its
 * {@code connector} field.
 *
 * <ul>
 *   <li>{@code postgresql}: the value of the {@code txId} field, when the schema declares it.
 *   <li>{@code mysql}: {@code 0} when the {@code snapshot} field is set; otherwise the part after
 *       the colon of a two-part {@code gtid} value ({@code <uuid>:<txid>}).
 *   <li>any other connector (including a missing one): a warning is logged.
 * </ul>
 *
 * <p>Every field is looked up in the schema before it is read, and every value is null-checked,
 * so a source struct that lacks a field or carries a null optional value yields {@code null}
 * instead of an exception.
 *
 * @param source the Debezium source struct of the change event
 * @return the transaction id, or {@code null} when none can be determined
 * @throws NumberFormatException if the second section of a two-part {@code gtid} is not a number
 */
private Long extractTransactionIdFromSourceStruct(Struct source) {
  // Struct#getString throws for a field the schema does not declare, so check the schema first.
  String connector =
      source.schema().field("connector") != null ? source.getString("connector") : null;

  if ("postgresql".equals(connector)) {
    // Check for txId field for postgresql
    if (source.schema().field("txId") != null) {
      return source.getInt64("txId");
    }
  } else if ("mysql".equals(connector)) {
    // Check if in snapshot mode first. The value is read untyped and compared by its string
    // form: an optional field may hold null, and a typed getBoolean call would fail on a
    // string-typed field.
    // NOTE(review): Debezium documents "snapshot" as a string enum (true/last/false/incremental);
    // only "true" is treated as snapshotting here -- confirm whether "last" should be as well.
    Object snapshot =
        source.schema().field("snapshot") != null ? source.get("snapshot") : null;
    if (snapshot != null && "true".equalsIgnoreCase(snapshot.toString())) {
      // Return 0 as gtid is null when snapshotting
      return 0L;
    }

    // Not in snapshot mode, check for gtid field for mysql
    if (source.schema().field("gtid") != null) {
      String gtid = source.getString("gtid");
      // The field may be declared but hold no value
      if (gtid != null) {
        // Split gtid into uuid and txid
        String[] gtidSections = gtid.split(":");
        // Return txid section
        if (gtidSections.length == 2) {
          return Long.valueOf(gtidSections[1]);
        }
      }
    }
  } else {
    LOG.warn("Transactional consistency is not currently supported for connector type: {}", connector);
  }
  return null;
}

private void setTableAndTargetFromSourceMap(Object source, Map<String, Object> cdcMetadata) {
Map<String, Object> map = Requirements.requireMap(source, "Debezium transform");

String db;
Long txid = null;
if (map.containsKey("schema")) {
// prefer schema if present, e.g. for Postgres
db = map.get("schema").toString();
Expand All @@ -220,15 +252,46 @@ private void setTableAndTargetFromSourceMap(Object source, Map<String, Object> c
}
String table = map.get("table").toString();

if (map.containsKey("txId")) {
txid = Long.valueOf(map.get("txId").toString());
}
// Extract transaction ID based on connector type
Long txid = extractTransactionIdFromSourceMap(map);

cdcMetadata.put(CdcConstants.COL_SOURCE, db + "." + table);
cdcMetadata.put(CdcConstants.COL_TARGET, target(db, table));
cdcMetadata.put(CdcConstants.COL_TXID, txid);
}

/**
 * Extracts a transaction id from a schemaless Debezium {@code source} map, based on the value
 * stored under the {@code connector} key.
 *
 * <ul>
 *   <li>{@code postgresql}: the {@code txId} entry parsed as a long.
 *   <li>{@code mysql}: {@code 0} when the {@code snapshot} entry is set; otherwise the part after
 *       the colon of a two-part {@code gtid} value ({@code <uuid>:<txid>}).
 *   <li>any other connector (including a missing one): a warning is logged.
 * </ul>
 *
 * <p>Entries that are absent or mapped to {@code null} are treated the same way, so an incomplete
 * source map yields {@code null} instead of a {@link NullPointerException}.
 *
 * @param source the Debezium source block of the change event, as a map
 * @return the transaction id, or {@code null} when none can be determined
 * @throws NumberFormatException if {@code txId} or the second section of a two-part {@code gtid}
 *     is not a number
 */
private Long extractTransactionIdFromSourceMap(Map<String, Object> source) {
  Object connectorValue = source.get("connector");
  String connector = connectorValue == null ? null : connectorValue.toString();

  if ("postgresql".equals(connector)) {
    // Check for txId entry for postgresql; a key mapped to null counts as absent
    Object txId = source.get("txId");
    if (txId != null) {
      // Return txid
      return Long.valueOf(txId.toString());
    }
  } else if ("mysql".equals(connector)) {
    // Check if in snapshot mode first. Compared by string form so that both Boolean.TRUE and
    // the string "true" are recognised.
    // NOTE(review): Debezium documents "snapshot" as a string enum (true/last/false/incremental);
    // only "true" is treated as snapshotting here -- confirm whether "last" should be as well.
    Object snapshot = source.get("snapshot");
    if (snapshot != null && "true".equalsIgnoreCase(snapshot.toString())) {
      // Return 0 as gtid is null when snapshotting
      return 0L;
    }

    // Not in snapshot mode, check for gtid entry for mysql
    Object gtid = source.get("gtid");
    if (gtid != null) {
      // Split gtid into uuid and txid
      String[] gtidSections = gtid.toString().split(":");
      // Return txid section
      if (gtidSections.length == 2) {
        return Long.valueOf(gtidSections[1]);
      }
    }
  } else {
    LOG.warn("Transactional consistency is not currently supported for connector type: {}", connector);
  }
  return null;
}

private String target(String db, String table) {
return cdcTargetPattern == null || cdcTargetPattern.isEmpty()
? db + "." + table
Expand All @@ -252,18 +315,22 @@ private Schema makeCdcSchema(Schema keySchema) {
return builder.build();
}

private Schema makeUpdatedSchema(Schema schema, Schema cdcSchema) {
private Schema makeUpdatedSchema(Schema schema, String fieldName, Schema fieldSchema) {
SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());

for (Field field : schema.fields()) {
builder.field(field.name(), field.schema());
}

builder.field(CdcConstants.COL_CDC, cdcSchema);
builder.field(fieldName, fieldSchema);

return builder.build();
}

/**
 * Builds a copy of {@code schema} with the CDC metadata field appended.
 *
 * <p>Convenience overload that delegates to the three-argument form, using
 * {@code CdcConstants.COL_CDC} as the name of the appended field.
 *
 * @param schema the schema to extend
 * @param cdcSchema the schema of the CDC metadata field
 * @return the schema produced by the three-argument overload
 */
private Schema makeUpdatedSchema(Schema schema, Schema cdcSchema) {
  final String cdcFieldName = CdcConstants.COL_CDC;
  return makeUpdatedSchema(schema, cdcFieldName, cdcSchema);
}

@Override
public ConfigDef config() {
return CONFIG_DEF;
Expand Down
Loading