Skip to content

Commit

Permalink
[Improve][Paimon] Add check for the base type between source and sink…
Browse files Browse the repository at this point in the history
… before write. (#6953)
  • Loading branch information
dailai authored Jun 11, 2024
1 parent 658643a commit d56d64f
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ROW_KIND;
import static org.apache.seatunnel.common.exception.CommonErrorCode.VERSION_NOT_SUPPORTED;
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR;
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH;
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA;

/**
* The common error of SeaTunnel. This is an alternative to {@link CommonErrorCodeDeprecated} and is
Expand Down Expand Up @@ -245,4 +247,28 @@ public static SeaTunnelRuntimeException unsupportedRowKind(
params.put("rowKind", rowKind);
return new SeaTunnelRuntimeException(UNSUPPORTED_ROW_KIND, params);
}

/**
 * Builds the exception reported when a source field's schema is incompatible with the schema
 * of the corresponding field in the sink table.
 *
 * @param connector name of the connector reporting the problem
 * @param sourceFieldSqlSchema SQL-style description of the source field
 * @param exceptFieldSqlSchema SQL-style description of the sink field that was expected
 * @param sinkFieldSqlSchema SQL-style description of the field actually found in the sink
 * @return an exception carrying {@code WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA}
 */
public static SeaTunnelRuntimeException writeRowErrorWithSchemaIncompatibleSchema(
        String connector,
        String sourceFieldSqlSchema,
        String exceptFieldSqlSchema,
        String sinkFieldSqlSchema) {
    // Keys must stay aligned with the placeholders of the error-code description.
    final Map<String, String> templateArgs = new HashMap<>();
    templateArgs.put("sinkFieldSqlSchema", sinkFieldSqlSchema);
    templateArgs.put("exceptFieldSqlSchema", exceptFieldSqlSchema);
    templateArgs.put("sourceFieldSqlSchema", sourceFieldSqlSchema);
    templateArgs.put("connector", connector);
    return new SeaTunnelRuntimeException(
            WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA, templateArgs);
}

/**
 * Builds the exception reported when the number of fields in the source row type differs from
 * the number of fields in the sink table.
 *
 * <p>The parameter keys must match the placeholders of the
 * {@code WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH} description
 * ({@code <connector>}, {@code <sourceFieldsNum>}, {@code <sinkFieldsNum>}); otherwise the
 * counts are never substituted into the rendered message.
 *
 * @param connector name of the connector reporting the problem
 * @param sourceFieldsNum number of fields in the source row type
 * @param sinkFieldsNum number of fields in the sink table
 * @return an exception carrying {@code WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH}
 */
public static SeaTunnelRuntimeException writeRowErrorWithFiledsCountNotMatch(
        String connector, int sourceFieldsNum, int sinkFieldsNum) {
    Map<String, String> params = new HashMap<>();
    params.put("connector", connector);
    // Previously stored under "sourceFiledName"/"sourceFiledType", which the template never reads.
    params.put("sourceFieldsNum", String.valueOf(sourceFieldsNum));
    params.put("sinkFieldsNum", String.valueOf(sinkFieldsNum));
    return new SeaTunnelRuntimeException(
            WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH, params);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,15 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
"COMMON-28",
"'<identifier>' array type not support genericType '<genericType>' of '<fieldName>'"),
UNSUPPORTED_ROW_KIND(
"COMMON-29", "'<identifier>' table '<tableId>' not support rowKind '<rowKind>'");
"COMMON-29", "'<identifier>' table '<tableId>' not support rowKind '<rowKind>'"),

WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA(
"COMMON-30",
"<connector>: The source filed with schema '<sourceFieldSqlSchema>', except filed schema of sink is '<exceptFieldSqlSchema>'; but the filed in sink table which actual schema is '<sinkFieldSqlSchema>'. Please check schema of sink table."),

WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH(
"COMMON-31",
"<connector>: The source has '<sourceFieldsNum>' fields, but the table of sink has '<sinkFieldsNum>' fields. Please check schema of sink table.");

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;

import org.apache.commons.lang3.StringUtils;
import org.apache.paimon.data.BinaryArray;
import org.apache.paimon.data.BinaryArrayWriter;
import org.apache.paimon.data.BinaryMap;
Expand Down Expand Up @@ -350,8 +351,14 @@ public static SeaTunnelRow convert(
*/
public static InternalRow reconvert(
SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType, TableSchema tableSchema) {
List<DataField> fields = tableSchema.fields();
BinaryRow binaryRow = new BinaryRow(seaTunnelRowType.getTotalFields());
List<DataField> sinkTotalFields = tableSchema.fields();
int sourceTotalFields = seaTunnelRowType.getTotalFields();
if (sourceTotalFields != sinkTotalFields.size()) {
throw new CommonError()
.writeRowErrorWithFiledsCountNotMatch(
"Paimon", sourceTotalFields, sinkTotalFields.size());
}
BinaryRow binaryRow = new BinaryRow(sourceTotalFields);
BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
// Convert SeaTunnel RowKind to Paimon RowKind
org.apache.paimon.types.RowKind rowKind =
Expand All @@ -370,6 +377,7 @@ public static InternalRow reconvert(
binaryWriter.setNullAt(i);
continue;
}
checkCanWriteWithType(i, seaTunnelRowType, sinkTotalFields);
String fieldName = seaTunnelRowType.getFieldName(i);
switch (fieldTypes[i].getSqlType()) {
case TINYINT:
Expand Down Expand Up @@ -416,7 +424,7 @@ public static InternalRow reconvert(
.setValue(binaryWriter, i, DateTimeUtils.toInternal(date));
break;
case TIMESTAMP:
DataField dataField = SchemaUtil.getDataField(fields, fieldName);
DataField dataField = SchemaUtil.getDataField(sinkTotalFields, fieldName);
int precision = ((TimestampType) dataField.type()).getPrecision();
LocalDateTime datetime = (LocalDateTime) seaTunnelRow.getField(i);
binaryWriter.writeTimestamp(
Expand Down Expand Up @@ -470,4 +478,23 @@ public static InternalRow reconvert(
}
return binaryRow;
}

/**
 * Verifies that the source field at position {@code i} can be written to the sink field at the
 * same position, by comparing the type root of the converted source type with the type root of
 * the sink field's type.
 *
 * <p>Note that the sink field is looked up by position, not by name.
 *
 * @param i index of the field in both the source row type and the sink field list
 * @param seaTunnelRowType row type of the source
 * @param fields fields of the sink table schema
 * @throws SeaTunnelRuntimeException if the two type roots differ
 */
private static void checkCanWriteWithType(
        int i, SeaTunnelRowType seaTunnelRowType, List<DataField> fields) {
    String sourceFieldName = seaTunnelRowType.getFieldName(i);
    SeaTunnelDataType<?> sourceFieldType = seaTunnelRowType.getFieldType(i);
    DataField sinkDataField = fields.get(i);
    // The sink type we would expect for this source field.
    DataType exceptDataType = RowTypeConverter.reconvert(sourceFieldName, sourceFieldType);
    DataType sinkDataType = sinkDataField.type();
    if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())) {
        // Only needed to render the error message, so build it on the failure path only.
        DataField exceptDataField = new DataField(i, sourceFieldName, exceptDataType);
        // The factory is static: call it on the class instead of a throwaway instance.
        throw CommonError.writeRowErrorWithSchemaIncompatibleSchema(
                "Paimon",
                sourceFieldName + StringUtils.SPACE + sourceFieldType.getSqlType(),
                exceptDataField.asSQLString(),
                sinkDataField.asSQLString());
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,20 @@ public void testFakeCDCSinkPaimon(TestContainer container) throws Exception {
});
}

/**
 * Runs a first job that must succeed, then a second job that must fail with exit code 1 and
 * report the schema-incompatibility message on stderr.
 */
@TestTemplate
public void testSinkWithIncompatibleSchema(TestContainer container) throws Exception {
    // First run must finish cleanly (presumably this is what creates the sink table).
    Container.ExecResult seedRun = container.executeJob("/fake_cdc_sink_paimon_case1.conf");
    Assertions.assertEquals(0, seedRun.getExitCode());

    // Second run writes with a different schema and is expected to fail.
    Container.ExecResult mismatchRun =
            container.executeJob("/fake_cdc_sink_paimon_case1_with_error_schema.conf");
    Assertions.assertEquals(1, mismatchRun.getExitCode());

    final String expectedMessage =
            "[Paimon: The source filed with schema 'name INT', except filed schema of sink is '`name` INT'; but the filed in sink table which actual schema is '`name` STRING'. Please check schema of sink table.]";
    Assertions.assertTrue(mismatchRun.getStderr().contains(expectedMessage));
}

@TestTemplate
public void testFakeMultipleTableSinkPaimon(TestContainer container) throws Exception {
Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case2.conf");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of batch processing in seatunnel config
######

env {
parallelism = 1
job.mode = "BATCH"
}

# Source: FakeSource emitting three fixed INSERT rows with schema
# (pk_id bigint, name int, score string), keyed on pk_id.
source {
FakeSource {
schema = {
fields {
pk_id = bigint
# NOTE(review): `name` is declared as int here; the e2e test that runs the
# "*_with_error_schema.conf" job expects the sink's `name` column to be STRING.
# Presumably this is that config and the mismatch is intentional; confirm.
name = int
score = string
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
# Each row lists values in field order: pk_id, name, score.
rows = [
{
kind = INSERT
fields = [1, 100, "A"]
},
{
kind = INSERT
fields = [2, 100, "B"]
},
{
kind = INSERT
fields = [3, 100, "C"]
}
]
}
}

sink {
Paimon {
warehouse = "file:///tmp/paimon"
database = "seatunnel_namespace1"
table = "st_test"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
}, {
"id" : 12,
"name" : "c_date",
"type" : "TIMESTAMP(3)"
"type" : "DATE"
}, {
"id" : 13,
"name" : "c_timestamp",
Expand All @@ -68,4 +68,4 @@
"partitionKeys" : [ ],
"primaryKeys" : [ ],
"options" : { }
}
}

0 comments on commit d56d64f

Please sign in to comment.