From 447e43aa2bc0e21254426f56cc2a1e1946ca51b6 Mon Sep 17 00:00:00 2001 From: wyb Date: Wed, 25 Dec 2024 14:32:23 +0800 Subject: [PATCH 1/5] [Enhancement] Support column mismatch fill property in files() Signed-off-by: wyb --- be/src/exec/csv_scanner.cpp | 70 ++++++--- be/src/exec/file_scanner.cpp | 5 + be/src/exec/file_scanner.h | 5 + .../starrocks/catalog/TableFunctionTable.java | 95 ++++++++++--- .../com/starrocks/planner/FileScanNode.java | 23 ++- .../sql/analyzer/InsertAnalyzer.java | 24 ++-- .../sql/plan/PlanFragmentBuilder.java | 4 +- gensrc/thrift/PlanNodes.thrift | 1 + test/sql/test_files/R/test_csv_files_merge | 134 ++++++++++++++++++ test/sql/test_files/R/test_orc_files_merge | 3 + .../sql/test_files/R/test_parquet_files_merge | 3 + test/sql/test_files/T/test_csv_files_merge | 75 ++++++++++ test/sql/test_files/T/test_orc_files_merge | 3 + .../sql/test_files/T/test_parquet_files_merge | 3 + .../csv_format/basic0_column_mismatch.csv | 2 + 15 files changed, 392 insertions(+), 58 deletions(-) create mode 100644 test/sql/test_files/R/test_csv_files_merge create mode 100644 test/sql/test_files/T/test_csv_files_merge create mode 100644 test/sql/test_files/csv_format/basic0_column_mismatch.csv diff --git a/be/src/exec/csv_scanner.cpp b/be/src/exec/csv_scanner.cpp index 4765fb6ab5b191..f04365f6518240 100644 --- a/be/src/exec/csv_scanner.cpp +++ b/be/src/exec/csv_scanner.cpp @@ -45,8 +45,8 @@ static std::string string_2_asc(const std::string& input) { return oss.str(); } -static std::string make_column_count_not_matched_error_message(int expected_count, int actual_count, - CSVParseOptions& parse_options) { +static std::string make_column_count_not_matched_error_message_for_load(int expected_count, int actual_count, + CSVParseOptions& parse_options) { std::stringstream error_msg; error_msg << "Target column count: " << expected_count << " doesn't match source value column count: " << actual_count << ". " @@ -55,6 +55,20 @@ static std::string make_column_count_not_matched_error_message(int expected_coun return error_msg.str(); } +static std::string make_column_count_not_matched_error_message_for_query(int expected_count, int actual_count, + CSVParseOptions& parse_options, + const std::string& row, + const std::string& filename) { + std::stringstream error_msg; + error_msg << "Schema column count: " << expected_count + << " doesn't match source value column count: " << actual_count << ". " + << "Column separator: " << string_2_asc(parse_options.column_delimiter) << ", " + << "Row delimiter: " << string_2_asc(parse_options.row_delimiter) << ", " + << "Row: '" << row << "', File: " << filename << ". 
" + << "Consider setting 'fill_mismatch_column_with' = 'null'"; + return error_msg.str(); +} + static std::string make_value_type_not_matched_error_message(int field_pos, const Slice& field, const SlotDescriptor* slot) { std::stringstream error_msg; @@ -357,17 +371,23 @@ Status CSVScanner::_parse_csv_v2(Chunk* chunk) { if (status.is_end_of_file()) { break; } - if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) { - std::string error_msg = make_column_count_not_matched_error_message(_num_fields_in_csv, - row.columns.size(), _parse_options); - _report_error(record, error_msg); - } - if (_state->enable_log_rejected_record()) { - std::string error_msg = make_column_count_not_matched_error_message(_num_fields_in_csv, - row.columns.size(), _parse_options); - _report_rejected_record(record, error_msg); + if (_is_load) { + std::string error_msg = make_column_count_not_matched_error_message_for_load( + _num_fields_in_csv, row.columns.size(), _parse_options); + if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) { + _report_error(record, error_msg); + } + if (_state->enable_log_rejected_record()) { + _report_rejected_record(record, error_msg); + } + continue; + } else { + // files() query return error + std::string error_msg = make_column_count_not_matched_error_message_for_query( + _num_fields_in_csv, row.columns.size(), _parse_options, record.to_string(), + _curr_reader->filename()); + return Status::DataQualityError(error_msg); } - continue; } if (!validate_utf8(record.data, record.size)) { if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) { @@ -459,17 +479,23 @@ Status CSVScanner::_parse_csv(Chunk* chunk) { _curr_reader->split_record(record, &fields); if (fields.size() != _num_fields_in_csv && !_scan_range.params.flexible_column_mapping) { - if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) { - std::string error_msg = - make_column_count_not_matched_error_message(_num_fields_in_csv, fields.size(), _parse_options); - _report_error(record, error_msg); - } - if (_state->enable_log_rejected_record()) { - std::string error_msg = - make_column_count_not_matched_error_message(_num_fields_in_csv, fields.size(), _parse_options); - _report_rejected_record(record, error_msg); + if (_is_load) { + std::string error_msg = make_column_count_not_matched_error_message_for_load( + _num_fields_in_csv, fields.size(), _parse_options); + if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) { + _report_error(record, error_msg); + } + if (_state->enable_log_rejected_record()) { + _report_rejected_record(record, error_msg); + } + continue; + } else { + // files() query return error + std::string error_msg = make_column_count_not_matched_error_message_for_query( + _num_fields_in_csv, fields.size(), _parse_options, record.to_string(), + _curr_reader->filename()); + return Status::DataQualityError(error_msg); } - continue; } if (!validate_utf8(record.data, record.size)) { if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) { diff --git a/be/src/exec/file_scanner.cpp b/be/src/exec/file_scanner.cpp index 04c56ce9e7d5e0..d134bee08c6a4c 100644 --- a/be/src/exec/file_scanner.cpp +++ b/be/src/exec/file_scanner.cpp @@ -46,6 +46,7 @@ FileScanner::FileScanner(starrocks::RuntimeState* state, starrocks::RuntimeProfi _row_desc(nullptr), _strict_mode(false), _error_counter(0), + _is_load(true), _schema_only(schema_only) {} FileScanner::~FileScanner() = default; @@ -135,6 +136,10 @@ Status FileScanner::open() { _strict_mode = _params.strict_mode; } + if (_params.__isset.is_load) { 
+ _is_load = _params.is_load; + } + if (_strict_mode && !_params.__isset.dest_sid_to_src_sid_without_trans) { return Status::InternalError("Slot map of dest to src must be set in strict mode"); } diff --git a/be/src/exec/file_scanner.h b/be/src/exec/file_scanner.h index 6cb79a6e1062ca..781fb442397d16 100644 --- a/be/src/exec/file_scanner.h +++ b/be/src/exec/file_scanner.h @@ -100,6 +100,11 @@ class FileScanner { bool _strict_mode; int64_t _error_counter; + // When column mismatch, query and load have different behaviors. + // Query returns error, while load counts the filtered rows, and return error or not is based on max filter ratio, + // so need to check query or load in scanner. + // Currently only used in csv scanner. + bool _is_load; // sources std::vector _src_slot_descriptors; diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/TableFunctionTable.java b/fe/fe-core/src/main/java/com/starrocks/catalog/TableFunctionTable.java index b4c6796e986400..4f6099a514f665 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/TableFunctionTable.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/TableFunctionTable.java @@ -119,6 +119,8 @@ public class TableFunctionTable extends Table { public static final String PROPERTY_AUTO_DETECT_SAMPLE_FILES = "auto_detect_sample_files"; public static final String PROPERTY_AUTO_DETECT_SAMPLE_ROWS = "auto_detect_sample_rows"; + private static final String PROPERTY_FILL_MISMATCH_COLUMN_WITH = "fill_mismatch_column_with"; + public static final String PROPERTY_CSV_COLUMN_SEPARATOR = "csv.column_separator"; public static final String PROPERTY_CSV_ROW_DELIMITER = "csv.row_delimiter"; public static final String PROPERTY_CSV_SKIP_HEADER = "csv.skip_header"; @@ -130,12 +132,39 @@ public class TableFunctionTable extends Table { private static final String PROPERTY_LIST_FILES_ONLY = "list_files_only"; private static final String PROPERTY_LIST_RECURSIVELY = "list_recursively"; + public enum MisMatchFillValue { + NONE, // error + NULL; + + public static MisMatchFillValue fromString(String value) { + for (MisMatchFillValue fillValue : values()) { + if (fillValue.name().equalsIgnoreCase(value)) { + return fillValue; + } + } + return null; + } + + public static List getCandidates() { + return Arrays.stream(values()).map(p -> p.name().toLowerCase()).collect(Collectors.toList()); + } + } + + public enum FilesTableType { + LOAD, + UNLOAD, + QUERY, + LIST + } + private String path; private String format; - // for load data - private int autoDetectSampleFiles; - private int autoDetectSampleRows; + private FilesTableType filesTableType = FilesTableType.QUERY; + + // for load/query data + private int autoDetectSampleFiles = DEFAULT_AUTO_DETECT_SAMPLE_FILES; + private int autoDetectSampleRows = DEFAULT_AUTO_DETECT_SAMPLE_ROWS; private List columnsFromPath = new ArrayList<>(); private boolean strictMode = false; @@ -143,6 +172,8 @@ public class TableFunctionTable extends Table { private List fileStatuses = Lists.newArrayList(); + private MisMatchFillValue misMatchFillValue = MisMatchFillValue.NONE; + // for unload data private String compressionType; private Optional> partitionColumnIDs = Optional.empty(); @@ -164,12 +195,12 @@ public class TableFunctionTable extends Table { private boolean listFilesOnly = false; private boolean listRecursively = false; - // Ctor for load data / list files via table function + // Ctor for load data / query data / list files via table function public TableFunctionTable(Map properties) throws DdlException { this(properties, null); 
} - // Ctor for load data / list files via table function + // Ctor for load data / query data / list files via table function public TableFunctionTable(Map properties, Consumer pushDownSchemaFunc) throws DdlException { super(TableType.TABLE_FUNCTION); @@ -180,9 +211,11 @@ public TableFunctionTable(Map properties, Consumer columns, Map properties, checkNotNull(properties, "properties is null"); checkNotNull(sessionVariable, "sessionVariable is null"); this.properties = properties; + this.filesTableType = FilesTableType.UNLOAD; parsePropertiesForUnload(columns, sessionVariable); setNewFullSchema(columns); } - private void setSchemaForLoad() throws DdlException { - parseFilesForLoad(); + private void setSchemaForLoadAndQuery() throws DdlException { + parseFilesForLoadAndQuery(); // infer schema from files List columns = new ArrayList<>(); @@ -338,6 +372,14 @@ public String getPath() { return path; } + public void setFilesTableType(FilesTableType filesTableType) { + this.filesTableType = filesTableType; + } + + public boolean isLoadType() { + return filesTableType == FilesTableType.LOAD; + } + public boolean isListFilesOnly() { return listFilesOnly; } @@ -393,23 +435,34 @@ private void parsePropertiesForLoad(Map properties) throws DdlEx strictMode = Boolean.parseBoolean(properties.get(PROPERTY_STRICT_MODE)); } - if (!properties.containsKey(PROPERTY_AUTO_DETECT_SAMPLE_FILES)) { - autoDetectSampleFiles = DEFAULT_AUTO_DETECT_SAMPLE_FILES; - } else { + if (properties.containsKey(PROPERTY_AUTO_DETECT_SAMPLE_FILES)) { + String property = properties.get(PROPERTY_AUTO_DETECT_SAMPLE_FILES); try { - autoDetectSampleFiles = Integer.parseInt(properties.get(PROPERTY_AUTO_DETECT_SAMPLE_FILES)); + autoDetectSampleFiles = Integer.parseInt(property); } catch (NumberFormatException e) { - throw new DdlException("failed to parse auto_detect_sample_files: ", e); + ErrorReport.reportDdlException( + ErrorCode.ERR_INVALID_VALUE, PROPERTY_AUTO_DETECT_SAMPLE_FILES, property, "int number"); } } - if (!properties.containsKey(PROPERTY_AUTO_DETECT_SAMPLE_ROWS)) { - autoDetectSampleRows = DEFAULT_AUTO_DETECT_SAMPLE_ROWS; - } else { + if (properties.containsKey(PROPERTY_FILL_MISMATCH_COLUMN_WITH)) { + String property = properties.get(PROPERTY_FILL_MISMATCH_COLUMN_WITH); + misMatchFillValue = MisMatchFillValue.fromString(property); + if (misMatchFillValue == null) { + String msg = String.format("%s (case insensitive)", String.join(", ", MisMatchFillValue.getCandidates())); + ErrorReport.reportSemanticException( + ErrorCode.ERR_INVALID_VALUE, PROPERTY_FILL_MISMATCH_COLUMN_WITH, property, msg); + } + } + + // csv properties + if (properties.containsKey(PROPERTY_AUTO_DETECT_SAMPLE_ROWS)) { + String property = properties.get(PROPERTY_AUTO_DETECT_SAMPLE_ROWS); try { - autoDetectSampleRows = Integer.parseInt(properties.get(PROPERTY_AUTO_DETECT_SAMPLE_ROWS)); + autoDetectSampleRows = Integer.parseInt(property); } catch (NumberFormatException e) { - throw new DdlException("failed to parse auto_detect_sample_files: ", e); + ErrorReport.reportDdlException( + ErrorCode.ERR_INVALID_VALUE, PROPERTY_AUTO_DETECT_SAMPLE_ROWS, property, "int number"); } } @@ -467,7 +520,7 @@ private void parsePropertiesForLoad(Map properties) throws DdlEx } } - private void parseFilesForLoad() throws DdlException { + private void parseFilesForLoadAndQuery() throws DdlException { try { // fake:// is a faked path, for testing purpose if (path.startsWith("fake://")) { @@ -635,6 +688,10 @@ public boolean isStrictMode() { return strictMode; } + public boolean 
isFlexibleColumnMapping() { + return misMatchFillValue != MisMatchFillValue.NONE; + } + @Override public String toString() { return String.format("TABLE('path'='%s', 'format'='%s')", path, format); diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/FileScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/FileScanNode.java index b4a0de3849e5d8..711fdd65a594ed 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/FileScanNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/FileScanNode.java @@ -161,7 +161,13 @@ public int compare(TBrokerFileStatus o1, TBrokerFileStatus o2) { private boolean useVectorizedLoad; private LoadJob.JSONOptions jsonOptions = new LoadJob.JSONOptions(); + private boolean flexibleColumnMapping = false; + // When column mismatch, query and load have different behaviors. + // Query returns error, while load counts the filtered rows, and return error or not is based on max filter ratio, + // so need to check query or load in scanner. + // Currently only used in csv scanner. + private boolean isLoad = true; private boolean nullExprInAutoIncrement; @@ -224,8 +230,10 @@ public void init(Analyzer analyzer) throws StarRocksException { } } - private boolean isLoad() { - return desc.getTable() == null; + // broker table is deprecated + // TODO: remove + private boolean isBrokerTable() { + return desc.getTable() != null; } @Deprecated @@ -257,6 +265,10 @@ public void setFlexibleColumnMapping(boolean enable) { this.flexibleColumnMapping = enable; } + public void setIsLoad(boolean isLoad) { + this.isLoad = isLoad; + } + public void setUseVectorizedLoad(boolean useVectorizedLoad) { this.useVectorizedLoad = useVectorizedLoad; } @@ -314,6 +326,7 @@ private void initParams(ParamCreateContext context) params.setEscape(fileGroup.getEscape()); params.setJson_file_size_limit(Config.json_file_size_limit); params.setFlexible_column_mapping(flexibleColumnMapping); + params.setIs_load(isLoad); initColumns(context); initWhereExpr(fileGroup.getWhereExpr(), analyzer); } @@ -338,7 +351,7 @@ private void initColumns(ParamCreateContext context) throws StarRocksException { // for query, there is no column exprs, they will be got from table's schema in "Load.initColumns" List columnExprs = Lists.newArrayList(); List columnsFromPath = Lists.newArrayList(); - if (isLoad()) { + if (!isBrokerTable()) { columnExprs = context.fileGroup.getColumnExprList(); columnsFromPath = context.fileGroup.getColumnsFromPath(); } @@ -494,7 +507,7 @@ private void getFileStatusAndCalcInstance() throws StarRocksException { } Preconditions.checkState(fileStatusesList.size() == fileGroups.size()); - if (isLoad() && filesAdded == 0) { + if (!isBrokerTable() && filesAdded == 0) { // return at most 3 paths to users int limit = 3; List allFilePaths = @@ -740,7 +753,7 @@ public void updateScanRangeLocations() { @Override protected String getNodeExplainString(String prefix, TExplainLevel detailLevel) { StringBuilder output = new StringBuilder(); - if (!isLoad()) { + if (isBrokerTable()) { BrokerTable brokerTable = (BrokerTable) targetTable; output.append(prefix).append("TABLE: ").append(brokerTable.getName()).append("\n"); output.append(prefix).append("PATH: ") diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/InsertAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/InsertAnalyzer.java index 3d7df347136dbc..f845cb04769c95 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/InsertAnalyzer.java +++ 
b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/InsertAnalyzer.java @@ -49,7 +49,6 @@ import com.starrocks.sql.ast.LoadStmt; import com.starrocks.sql.ast.PartitionNames; import com.starrocks.sql.ast.QueryRelation; -import com.starrocks.sql.ast.QueryStatement; import com.starrocks.sql.ast.Relation; import com.starrocks.sql.ast.SelectListItem; import com.starrocks.sql.ast.SelectRelation; @@ -336,6 +335,13 @@ public static void analyzeWithDeferredLock(InsertStmt insertStmt, ConnectContext if (session.getDumpInfo() != null) { session.getDumpInfo().addTable(insertStmt.getTableName().getDb(), table); } + + // Set table function table used for load + List relations = + AnalyzerUtils.collectFileTableFunctionRelation(insertStmt.getQueryStatement()); + for (FileTableFunctionRelation relation : relations) { + ((TableFunctionTable) relation.getTable()).setFilesTableType(TableFunctionTable.FilesTableType.LOAD); + } } private static void analyzeProperties(InsertStmt insertStmt, ConnectContext session) { @@ -361,15 +367,13 @@ private static void analyzeProperties(InsertStmt insertStmt, ConnectContext sess } // push down some properties to file table function - QueryStatement queryStatement = insertStmt.getQueryStatement(); - if (queryStatement != null) { - List relations = AnalyzerUtils.collectFileTableFunctionRelation(queryStatement); - for (FileTableFunctionRelation relation : relations) { - Map tableFunctionProperties = relation.getProperties(); - for (String property : PUSH_DOWN_PROPERTIES_SET) { - if (properties.containsKey(property)) { - tableFunctionProperties.put(property, properties.get(property)); - } + List relations = + AnalyzerUtils.collectFileTableFunctionRelation(insertStmt.getQueryStatement()); + for (FileTableFunctionRelation relation : relations) { + Map tableFunctionProperties = relation.getProperties(); + for (String property : PUSH_DOWN_PROPERTIES_SET) { + if (properties.containsKey(property)) { + tableFunctionProperties.put(property, properties.get(property)); } } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java b/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java index 1d8e1939f6e32d..39bdbf402e09ba 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java @@ -3865,8 +3865,8 @@ public PlanFragment visitPhysicalTableFunctionTableScan(OptExpression optExpress int dop = ConnectContext.get().getSessionVariable().getSinkDegreeOfParallelism(); scanNode.setLoadInfo(-1, -1, table, new BrokerDesc(table.getProperties()), fileGroups, table.isStrictMode(), dop); scanNode.setUseVectorizedLoad(true); - // table function enable flexible column mapping by default. 
- scanNode.setFlexibleColumnMapping(true); + scanNode.setFlexibleColumnMapping(table.isFlexibleColumnMapping()); + scanNode.setIsLoad(table.isLoadType()); Analyzer analyzer = new Analyzer(GlobalStateMgr.getCurrentState(), context.getConnectContext()); analyzer.setDescTbl(context.getDescTbl()); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index f4e71113d9472e..a80ed0e6762f60 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -271,6 +271,7 @@ struct TBrokerScanRangeParams { 30: optional i64 schema_sample_file_count 31: optional i64 schema_sample_file_row_count 32: optional bool flexible_column_mapping + 33: optional bool is_load } // Broker scan range diff --git a/test/sql/test_files/R/test_csv_files_merge b/test/sql/test_files/R/test_csv_files_merge new file mode 100644 index 00000000000000..ba66d50aec2605 --- /dev/null +++ b/test/sql/test_files/R/test_csv_files_merge @@ -0,0 +1,134 @@ +-- name: test_csv_files_merge + +create database db_${uuid0}; +use db_${uuid0}; + +shell: ossutil64 mkdir oss://${oss_bucket}/test_files/csv_format/${uuid0} >/dev/null || echo "exit 0" >/dev/null + +shell: ossutil64 cp --force ./sql/test_files/csv_format/basic0_column_mismatch.csv oss://${oss_bucket}/test_files/csv_format/${uuid0}/ | grep -Pv "(average|elapsed)" +-- result: +0 + +Succeed: Total num: 1, size: 34. OK num: 1(upload 1 files). +-- !result + +shell: ossutil64 cp --force ./sql/test_files/csv_format/basic1.csv oss://${oss_bucket}/test_files/csv_format/${uuid0}/ | grep -Pv "(average|elapsed)" +-- result: +0 + +Succeed: Total num: 1, size: 52. OK num: 1(upload 1 files). +-- !result + + +desc files( + "path" = "oss://${oss_bucket}/test_files/csv_format/${uuid0}/*", + "format" = "csv", + "csv.column_separator" = ",", + "csv.row_delimiter" = "\n", + "auto_detect_sample_files" = "1"); +-- result: +$1 bigint YES +$2 varchar(1048576) YES +$3 double YES +$4 boolean YES +-- !result + +select * from files( + "path" = "oss://${oss_bucket}/test_files/csv_format/${uuid0}/*", + "format" = "csv", + "csv.column_separator" = ",", + "csv.row_delimiter" = "\n", + "auto_detect_sample_files" = "1", + "fill_mismatch_column_with" = "null"); +-- result: +4 Tom 30.4 None +5 Jerry 40.8 0 +1 Julia 20.2 1 +2 Andy 21.3 0 +3 Joke 22.4 1 +-- !result + +select * from files( + "path" = "oss://${oss_bucket}/test_files/csv_format/${uuid0}/*", + "format" = "csv", + "csv.column_separator" = ",", + "csv.row_delimiter" = "\n", + "auto_detect_sample_files" = "1", + "fill_mismatch_column_with" = "none"); +-- result: +[REGEX].*Schema column count: 4 doesn't match source value column count: 3. Column separator: ',', Row delimiter: .*, Row: '4,Tom,30.4', File: .*basic0_column_mismatch.csv. 
Consider setting 'fill_mismatch_column_with' = 'null'.* +-- !result + + +create table t1 (k1 bigint, k2 varchar(256), k3 double, k4 boolean); +-- result: +-- !result + +insert into t1 +select * from files( + "path" = "oss://${oss_bucket}/test_files/csv_format/${uuid0}/*", + "format" = "csv", + "csv.column_separator" = ",", + "csv.row_delimiter" = "\n", + "auto_detect_sample_files" = "1", + "fill_mismatch_column_with" = "none"); +-- result: +[REGEX].*Insert has filtered data.* +-- !result + +insert into t1 properties ("max_filter_ratio" = "0.5") +select * from files( + "path" = "oss://${oss_bucket}/test_files/csv_format/${uuid0}/*", + "format" = "csv", + "csv.column_separator" = ",", + "csv.row_delimiter" = "\n", + "auto_detect_sample_files" = "1", + "fill_mismatch_column_with" = "none"); +-- result: +-- !result + +select * from t1; +-- result: +1 Julia 20.2 1 +2 Andy 21.3 0 +3 Joke 22.4 1 +-- !result + +truncate table t1; +-- result: +-- !result + +insert into t1 +select * from files( + "path" = "oss://${oss_bucket}/test_files/csv_format/${uuid0}/*", + "format" = "csv", + "csv.column_separator" = ",", + "csv.row_delimiter" = "\n", + "auto_detect_sample_files" = "1", + "fill_mismatch_column_with" = "null"); +-- result: +-- !result + +select * from t1; +-- result: +1 Julia 20.2 1 +2 Andy 21.3 0 +3 Joke 22.4 1 +4 Tom 30.4 None +5 Jerry 40.8 0 +-- !result + + +select * from files( + "path" = "oss://${oss_bucket}/test_files/csv_format/${uuid0}/*", + "format" = "csv", + "csv.column_separator" = ",", + "csv.row_delimiter" = "\n", + "auto_detect_sample_files" = "1", + "fill_mismatch_column_with" = "xxx"); +-- result: +[REGEX].*Invalid fill_mismatch_column_with: 'xxx'. Expected values should be none, null \(case insensitive\). +-- !result + + +shell: ossutil64 rm -rf oss://${oss_bucket}/test_files/csv_format/${uuid0}/ > /dev/null diff --git a/test/sql/test_files/R/test_orc_files_merge b/test/sql/test_files/R/test_orc_files_merge index 94ee12f2ecaee1..16ef31cfa80235 100644 --- a/test/sql/test_files/R/test_orc_files_merge +++ b/test/sql/test_files/R/test_orc_files_merge @@ -75,6 +75,7 @@ select * from files( select * from files( "path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/*", "format" = "orc", + "fill_mismatch_column_with" = "null", "auto_detect_sample_files" = "2", "aws.s3.access_key" = "${oss_ak}", "aws.s3.secret_key" = "${oss_sk}", @@ -88,6 +89,7 @@ None 21 None None 2024-10-03 None c None select k2, k5, k7 from files( "path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/*", "format" = "orc", + "fill_mismatch_column_with" = "null", "auto_detect_sample_files" = "2", "aws.s3.access_key" = "${oss_ak}", "aws.s3.secret_key" = "${oss_sk}", @@ -101,6 +103,7 @@ select k2, k5, k7 from files( select k1, k3, k8 from files( "path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/*", "format" = "orc", + "fill_mismatch_column_with" = "null", "auto_detect_sample_files" = "2", "aws.s3.access_key" = "${oss_ak}", "aws.s3.secret_key" = "${oss_sk}", diff --git a/test/sql/test_files/R/test_parquet_files_merge b/test/sql/test_files/R/test_parquet_files_merge index bdcda52a19c0ec..9f11f6812716b4 100644 --- a/test/sql/test_files/R/test_parquet_files_merge +++ b/test/sql/test_files/R/test_parquet_files_merge @@ -75,6 +75,7 @@ select * from files( select * from files( "path" = "oss://${oss_bucket}/test_files/parquet_format/${uuid0}/*", "format" = "parquet", + "fill_mismatch_column_with" = "null", "auto_detect_sample_files" = "2", "aws.s3.access_key" = "${oss_ak}", "aws.s3.secret_key" = 
"${oss_sk}", @@ -88,6 +89,7 @@ None 21 None None 2024-10-03 None c None select k2, k5, k7 from files( "path" = "oss://${oss_bucket}/test_files/parquet_format/${uuid0}/*", "format" = "parquet", + "fill_mismatch_column_with" = "null", "auto_detect_sample_files" = "2", "aws.s3.access_key" = "${oss_ak}", "aws.s3.secret_key" = "${oss_sk}", @@ -101,6 +103,7 @@ select k2, k5, k7 from files( select k1, k3, k8 from files( "path" = "oss://${oss_bucket}/test_files/parquet_format/${uuid0}/*", "format" = "parquet", + "fill_mismatch_column_with" = "null", "auto_detect_sample_files" = "2", "aws.s3.access_key" = "${oss_ak}", "aws.s3.secret_key" = "${oss_sk}", diff --git a/test/sql/test_files/T/test_csv_files_merge b/test/sql/test_files/T/test_csv_files_merge new file mode 100644 index 00000000000000..94104aef647a91 --- /dev/null +++ b/test/sql/test_files/T/test_csv_files_merge @@ -0,0 +1,75 @@ +-- name: test_csv_files_merge + +create database db_${uuid0}; +use db_${uuid0}; + +shell: ossutil64 mkdir oss://${oss_bucket}/test_files/csv_format/${uuid0} >/dev/null || echo "exit 0" >/dev/null +shell: ossutil64 cp --force ./sql/test_files/csv_format/basic0_column_mismatch.csv oss://${oss_bucket}/test_files/csv_format/${uuid0}/ | grep -Pv "(average|elapsed)" +shell: ossutil64 cp --force ./sql/test_files/csv_format/basic1.csv oss://${oss_bucket}/test_files/csv_format/${uuid0}/ | grep -Pv "(average|elapsed)" + +-- query +desc files( + "path" = "oss://${oss_bucket}/test_files/csv_format/${uuid0}/*", + "format" = "csv", + "csv.column_separator" = ",", + "csv.row_delimiter" = "\n", + "auto_detect_sample_files" = "1"); +select * from files( + "path" = "oss://${oss_bucket}/test_files/csv_format/${uuid0}/*", + "format" = "csv", + "csv.column_separator" = ",", + "csv.row_delimiter" = "\n", + "auto_detect_sample_files" = "1", + "fill_mismatch_column_with" = "null"); +select * from files( + "path" = "oss://${oss_bucket}/test_files/csv_format/${uuid0}/*", + "format" = "csv", + "csv.column_separator" = ",", + "csv.row_delimiter" = "\n", + "auto_detect_sample_files" = "1", + "fill_mismatch_column_with" = "none"); + + +-- load +create table t1 (k1 bigint, k2 varchar(256), k3 double, k4 boolean); + +insert into t1 +select * from files( + "path" = "oss://${oss_bucket}/test_files/csv_format/${uuid0}/*", + "format" = "csv", + "csv.column_separator" = ",", + "csv.row_delimiter" = "\n", + "auto_detect_sample_files" = "1", + "fill_mismatch_column_with" = "none"); + +insert into t1 properties ("max_filter_ratio" = "0.5") +select * from files( + "path" = "oss://${oss_bucket}/test_files/csv_format/${uuid0}/*", + "format" = "csv", + "csv.column_separator" = ",", + "csv.row_delimiter" = "\n", + "auto_detect_sample_files" = "1", + "fill_mismatch_column_with" = "none"); +select * from t1; +truncate table t1; + +insert into t1 +select * from files( + "path" = "oss://${oss_bucket}/test_files/csv_format/${uuid0}/*", + "format" = "csv", + "csv.column_separator" = ",", + "csv.row_delimiter" = "\n", + "auto_detect_sample_files" = "1", + "fill_mismatch_column_with" = "null"); +select * from t1; + +-- error property +select * from files( + "path" = "oss://${oss_bucket}/test_files/csv_format/${uuid0}/*", + "format" = "csv", + "csv.column_separator" = ",", + "csv.row_delimiter" = "\n", + "auto_detect_sample_files" = "1", + "fill_mismatch_column_with" = "xxx"); + +shell: ossutil64 rm -rf oss://${oss_bucket}/test_files/csv_format/${uuid0}/ > /dev/null diff --git a/test/sql/test_files/T/test_orc_files_merge b/test/sql/test_files/T/test_orc_files_merge 
index 7994c344c54397..9460e8cc9ef194 100644 --- a/test/sql/test_files/T/test_orc_files_merge +++ b/test/sql/test_files/T/test_orc_files_merge @@ -37,6 +37,7 @@ select * from files( select * from files( "path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/*", "format" = "orc", + "fill_mismatch_column_with" = "null", "auto_detect_sample_files" = "2", "aws.s3.access_key" = "${oss_ak}", "aws.s3.secret_key" = "${oss_sk}", @@ -44,6 +45,7 @@ select * from files( select k2, k5, k7 from files( "path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/*", "format" = "orc", + "fill_mismatch_column_with" = "null", "auto_detect_sample_files" = "2", "aws.s3.access_key" = "${oss_ak}", "aws.s3.secret_key" = "${oss_sk}", @@ -51,6 +53,7 @@ select k2, k5, k7 from files( select k1, k3, k8 from files( "path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/*", "format" = "orc", + "fill_mismatch_column_with" = "null", "auto_detect_sample_files" = "2", "aws.s3.access_key" = "${oss_ak}", "aws.s3.secret_key" = "${oss_sk}", diff --git a/test/sql/test_files/T/test_parquet_files_merge b/test/sql/test_files/T/test_parquet_files_merge index c3e9d78ef4a4cd..7727b6eee0aaa3 100644 --- a/test/sql/test_files/T/test_parquet_files_merge +++ b/test/sql/test_files/T/test_parquet_files_merge @@ -37,6 +37,7 @@ select * from files( select * from files( "path" = "oss://${oss_bucket}/test_files/parquet_format/${uuid0}/*", "format" = "parquet", + "fill_mismatch_column_with" = "null", "auto_detect_sample_files" = "2", "aws.s3.access_key" = "${oss_ak}", "aws.s3.secret_key" = "${oss_sk}", @@ -44,6 +45,7 @@ select * from files( select k2, k5, k7 from files( "path" = "oss://${oss_bucket}/test_files/parquet_format/${uuid0}/*", "format" = "parquet", + "fill_mismatch_column_with" = "null", "auto_detect_sample_files" = "2", "aws.s3.access_key" = "${oss_ak}", "aws.s3.secret_key" = "${oss_sk}", @@ -51,6 +53,7 @@ select k2, k5, k7 from files( select k1, k3, k8 from files( "path" = "oss://${oss_bucket}/test_files/parquet_format/${uuid0}/*", "format" = "parquet", + "fill_mismatch_column_with" = "null", "auto_detect_sample_files" = "2", "aws.s3.access_key" = "${oss_ak}", "aws.s3.secret_key" = "${oss_sk}", diff --git a/test/sql/test_files/csv_format/basic0_column_mismatch.csv b/test/sql/test_files/csv_format/basic0_column_mismatch.csv new file mode 100644 index 00000000000000..c5e98eba421ce6 --- /dev/null +++ b/test/sql/test_files/csv_format/basic0_column_mismatch.csv @@ -0,0 +1,2 @@ +4,Tom,30.4 +5,Jerry,40.8,false,xxx From a4de4002ae3511856c4a9c77c3d24bcb9968a698 Mon Sep 17 00:00:00 2001 From: wyb Date: Fri, 27 Dec 2024 16:08:46 +0000 Subject: [PATCH 2/5] Fix File column count > files schema Signed-off-by: wyb --- be/src/exec/csv_scanner.cpp | 95 ++++++++++++------- be/src/exec/file_scanner.cpp | 6 +- be/src/exec/file_scanner.h | 7 +- .../com/starrocks/planner/FileScanNode.java | 14 +-- .../sql/plan/PlanFragmentBuilder.java | 3 +- gensrc/thrift/PlanNodes.thrift | 8 +- test/sql/test_files/R/test_csv_files_merge | 1 + 7 files changed, 86 insertions(+), 48 deletions(-) diff --git a/be/src/exec/csv_scanner.cpp b/be/src/exec/csv_scanner.cpp index f04365f6518240..6ce1b01f48eba1 100644 --- a/be/src/exec/csv_scanner.cpp +++ b/be/src/exec/csv_scanner.cpp @@ -367,27 +367,44 @@ Status CSVScanner::_parse_csv_v2(Chunk* chunk) { const char* data = _curr_reader->buffBasePtr() + row.parsed_start; CSVReader::Record record(data, row.parsed_end - row.parsed_start); - if (row.columns.size() != _num_fields_in_csv && 
!_scan_range.params.flexible_column_mapping) { + if (_file_scan_type == TFileScanType::LOAD && row.columns.size() != _num_fields_in_csv && + !_scan_range.params.flexible_column_mapping) { if (status.is_end_of_file()) { break; } - if (_is_load) { - std::string error_msg = make_column_count_not_matched_error_message_for_load( - _num_fields_in_csv, row.columns.size(), _parse_options); - if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) { - _report_error(record, error_msg); - } - if (_state->enable_log_rejected_record()) { - _report_rejected_record(record, error_msg); - } - continue; - } else { - // files() query return error - std::string error_msg = make_column_count_not_matched_error_message_for_query( - _num_fields_in_csv, row.columns.size(), _parse_options, record.to_string(), - _curr_reader->filename()); - return Status::DataQualityError(error_msg); + std::string error_msg = make_column_count_not_matched_error_message_for_load( + _num_fields_in_csv, row.columns.size(), _parse_options); + if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) { + _report_error(record, error_msg); + } + if (_state->enable_log_rejected_record()) { + _report_rejected_record(record, error_msg); + } + continue; + } else if (_file_scan_type == TFileScanType::FILES_INSERT && row.columns.size() < _num_fields_in_csv && + !_scan_range.params.flexible_column_mapping) { + if (status.is_end_of_file()) { + break; + } + std::string error_msg = make_column_count_not_matched_error_message_for_load( + _num_fields_in_csv, row.columns.size(), _parse_options); + if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) { + _report_error(record, error_msg); + } + if (_state->enable_log_rejected_record()) { + _report_rejected_record(record, error_msg); + } + continue; + } else if (_file_scan_type == TFileScanType::FILES_QUERY && row.columns.size() < _num_fields_in_csv && + !_scan_range.params.flexible_column_mapping) { + if (status.is_end_of_file()) { + break; } + // files() query return error + std::string error_msg = make_column_count_not_matched_error_message_for_query( + _num_fields_in_csv, row.columns.size(), _parse_options, record.to_string(), + _curr_reader->filename()); + return Status::DataQualityError(error_msg); } if (!validate_utf8(record.data, record.size)) { if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) { @@ -478,24 +495,34 @@ Status CSVScanner::_parse_csv(Chunk* chunk) { fields.clear(); _curr_reader->split_record(record, &fields); - if (fields.size() != _num_fields_in_csv && !_scan_range.params.flexible_column_mapping) { - if (_is_load) { - std::string error_msg = make_column_count_not_matched_error_message_for_load( - _num_fields_in_csv, fields.size(), _parse_options); - if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) { - _report_error(record, error_msg); - } - if (_state->enable_log_rejected_record()) { - _report_rejected_record(record, error_msg); - } - continue; - } else { - // files() query return error - std::string error_msg = make_column_count_not_matched_error_message_for_query( - _num_fields_in_csv, fields.size(), _parse_options, record.to_string(), - _curr_reader->filename()); - return Status::DataQualityError(error_msg); + if (_file_scan_type == TFileScanType::LOAD && fields.size() != _num_fields_in_csv && + !_scan_range.params.flexible_column_mapping) { + std::string error_msg = make_column_count_not_matched_error_message_for_load(_num_fields_in_csv, + fields.size(), _parse_options); + if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) { + 
_report_error(record, error_msg); + } + if (_state->enable_log_rejected_record()) { + _report_rejected_record(record, error_msg); + } + continue; + } else if (_file_scan_type == TFileScanType::FILES_INSERT && fields.size() < _num_fields_in_csv && + !_scan_range.params.flexible_column_mapping) { + std::string error_msg = make_column_count_not_matched_error_message_for_load(_num_fields_in_csv, + fields.size(), _parse_options); + if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) { + _report_error(record, error_msg); } + if (_state->enable_log_rejected_record()) { + _report_rejected_record(record, error_msg); + } + continue; + } else if (_file_scan_type == TFileScanType::FILES_QUERY && fields.size() < _num_fields_in_csv && + !_scan_range.params.flexible_column_mapping) { + // files() query return error + std::string error_msg = make_column_count_not_matched_error_message_for_query( + _num_fields_in_csv, fields.size(), _parse_options, record.to_string(), _curr_reader->filename()); + return Status::DataQualityError(error_msg); } if (!validate_utf8(record.data, record.size)) { if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) { diff --git a/be/src/exec/file_scanner.cpp b/be/src/exec/file_scanner.cpp index d134bee08c6a4c..368b023553ca87 100644 --- a/be/src/exec/file_scanner.cpp +++ b/be/src/exec/file_scanner.cpp @@ -46,7 +46,7 @@ FileScanner::FileScanner(starrocks::RuntimeState* state, starrocks::RuntimeProfi _row_desc(nullptr), _strict_mode(false), _error_counter(0), - _is_load(true), + _file_scan_type(TFileScanType::LOAD), _schema_only(schema_only) {} FileScanner::~FileScanner() = default; @@ -136,8 +136,8 @@ Status FileScanner::open() { _strict_mode = _params.strict_mode; } - if (_params.__isset.is_load) { - _is_load = _params.is_load; + if (_params.__isset.file_scan_type) { + _file_scan_type = _params.file_scan_type; } if (_strict_mode && !_params.__isset.dest_sid_to_src_sid_without_trans) { diff --git a/be/src/exec/file_scanner.h b/be/src/exec/file_scanner.h index 781fb442397d16..6d1c816f643e5d 100644 --- a/be/src/exec/file_scanner.h +++ b/be/src/exec/file_scanner.h @@ -100,11 +100,12 @@ class FileScanner { bool _strict_mode; int64_t _error_counter; - // When column mismatch, query and load have different behaviors. + // When column mismatch, files query/load and other type load have different behaviors. // Query returns error, while load counts the filtered rows, and return error or not is based on max filter ratio, - // so need to check query or load in scanner. + // files load will not filter rows if file column count is larger that the schema, + // so need to check files query/load or other type load in scanner. // Currently only used in csv scanner. 
- bool _is_load; + TFileScanType::type _file_scan_type; // sources std::vector _src_slot_descriptors; diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/FileScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/FileScanNode.java index 711fdd65a594ed..de049489ffd8fd 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/FileScanNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/FileScanNode.java @@ -81,6 +81,7 @@ import com.starrocks.thrift.TExplainLevel; import com.starrocks.thrift.TFileFormatType; import com.starrocks.thrift.TFileScanNode; +import com.starrocks.thrift.TFileScanType; import com.starrocks.thrift.TFileType; import com.starrocks.thrift.THdfsProperties; import com.starrocks.thrift.TNetworkAddress; @@ -163,11 +164,12 @@ public int compare(TBrokerFileStatus o1, TBrokerFileStatus o2) { private LoadJob.JSONOptions jsonOptions = new LoadJob.JSONOptions(); private boolean flexibleColumnMapping = false; - // When column mismatch, query and load have different behaviors. + // When column mismatch, files query/load and other type load have different behaviors. // Query returns error, while load counts the filtered rows, and return error or not is based on max filter ratio, - // so need to check query or load in scanner. + // files load will not filter rows if file column count is larger that the schema, + // so need to check files query/load or other type load in scanner. // Currently only used in csv scanner. - private boolean isLoad = true; + private TFileScanType fileScanType = TFileScanType.LOAD; private boolean nullExprInAutoIncrement; @@ -265,8 +267,8 @@ public void setFlexibleColumnMapping(boolean enable) { this.flexibleColumnMapping = enable; } - public void setIsLoad(boolean isLoad) { - this.isLoad = isLoad; + public void setFileScanType(TFileScanType fileScanType) { + this.fileScanType = fileScanType; } public void setUseVectorizedLoad(boolean useVectorizedLoad) { @@ -326,7 +328,7 @@ private void initParams(ParamCreateContext context) params.setEscape(fileGroup.getEscape()); params.setJson_file_size_limit(Config.json_file_size_limit); params.setFlexible_column_mapping(flexibleColumnMapping); - params.setIs_load(isLoad); + params.setFile_scan_type(fileScanType); initColumns(context); initWhereExpr(fileGroup.getWhereExpr(), analyzer); } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java b/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java index 39bdbf402e09ba..3c96dacd6bf27d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/plan/PlanFragmentBuilder.java @@ -201,6 +201,7 @@ import com.starrocks.sql.optimizer.statistics.Statistics; import com.starrocks.sql.optimizer.transformer.LogicalPlan; import com.starrocks.thrift.TBrokerFileStatus; +import com.starrocks.thrift.TFileScanType; import com.starrocks.thrift.TPartitionType; import com.starrocks.thrift.TResultSinkType; import org.apache.commons.collections4.CollectionUtils; @@ -3866,7 +3867,7 @@ public PlanFragment visitPhysicalTableFunctionTableScan(OptExpression optExpress scanNode.setLoadInfo(-1, -1, table, new BrokerDesc(table.getProperties()), fileGroups, table.isStrictMode(), dop); scanNode.setUseVectorizedLoad(true); scanNode.setFlexibleColumnMapping(table.isFlexibleColumnMapping()); - scanNode.setIsLoad(table.isLoadType()); + scanNode.setFileScanType(table.isLoadType() ? 
TFileScanType.FILES_INSERT : TFileScanType.FILES_QUERY); Analyzer analyzer = new Analyzer(GlobalStateMgr.getCurrentState(), context.getConnectContext()); analyzer.setDescTbl(context.getDescTbl()); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index a80ed0e6762f60..6691ffa5baea24 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -209,6 +209,12 @@ struct THdfsProperties { 12: optional CloudConfiguration.TCloudConfiguration cloud_configuration } +enum TFileScanType { + LOAD, + FILES_INSERT, + FILES_QUERY +} + struct TBrokerScanRangeParams { 1: required i8 column_separator; 2: required i8 row_delimiter; @@ -271,7 +277,7 @@ struct TBrokerScanRangeParams { 30: optional i64 schema_sample_file_count 31: optional i64 schema_sample_file_row_count 32: optional bool flexible_column_mapping - 33: optional bool is_load + 33: optional TFileScanType file_scan_type } // Broker scan range diff --git a/test/sql/test_files/R/test_csv_files_merge b/test/sql/test_files/R/test_csv_files_merge index ba66d50aec2605..9b4d50e01a4ea6 100644 --- a/test/sql/test_files/R/test_csv_files_merge +++ b/test/sql/test_files/R/test_csv_files_merge @@ -92,6 +92,7 @@ select * from t1; 1 Julia 20.2 1 2 Andy 21.3 0 3 Joke 22.4 1 +5 Jerry 40.8 0 -- !result truncate table t1; From 1dd3ba981ce33657fe39a4b23df3fcb0933e7a12 Mon Sep 17 00:00:00 2001 From: wyb Date: Mon, 30 Dec 2024 10:25:17 +0800 Subject: [PATCH 3/5] Refactor code and add comment Signed-off-by: wyb --- be/src/exec/csv_scanner.cpp | 24 ++++++++---------------- gensrc/thrift/PlanNodes.thrift | 1 + 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/be/src/exec/csv_scanner.cpp b/be/src/exec/csv_scanner.cpp index 6ce1b01f48eba1..a8861073f70315 100644 --- a/be/src/exec/csv_scanner.cpp +++ b/be/src/exec/csv_scanner.cpp @@ -367,22 +367,13 @@ Status CSVScanner::_parse_csv_v2(Chunk* chunk) { const char* data = _curr_reader->buffBasePtr() + row.parsed_start; CSVReader::Record record(data, row.parsed_end - row.parsed_start); - if (_file_scan_type == TFileScanType::LOAD && row.columns.size() != _num_fields_in_csv && + if (((_file_scan_type == TFileScanType::LOAD && row.columns.size() != _num_fields_in_csv) || + (_file_scan_type == TFileScanType::FILES_INSERT && row.columns.size() < _num_fields_in_csv)) && !_scan_range.params.flexible_column_mapping) { - if (status.is_end_of_file()) { - break; - } - std::string error_msg = make_column_count_not_matched_error_message_for_load( - _num_fields_in_csv, row.columns.size(), _parse_options); - if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) { - _report_error(record, error_msg); - } - if (_state->enable_log_rejected_record()) { - _report_rejected_record(record, error_msg); - } - continue; - } else if (_file_scan_type == TFileScanType::FILES_INSERT && row.columns.size() < _num_fields_in_csv && - !_scan_range.params.flexible_column_mapping) { + // broker load / stream load will filter rows when file column count is consistent with schema. + // + // insert from files() will filter rows when file column count is less than schema. + // file column count more than schema is normal, extra columns will be ignored. if (status.is_end_of_file()) { break; } @@ -400,7 +391,8 @@ Status CSVScanner::_parse_csv_v2(Chunk* chunk) { if (status.is_end_of_file()) { break; } - // files() query return error + // query files() will return error when file column count is less than schema. + // file column count more than schema is normal, extra columns will be ignored. 
std::string error_msg = make_column_count_not_matched_error_message_for_query( _num_fields_in_csv, row.columns.size(), _parse_options, record.to_string(), _curr_reader->filename()); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 6691ffa5baea24..2c8fb62b88cf5d 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -210,6 +210,7 @@ struct THdfsProperties { } enum TFileScanType { + // broker load, stream load, except insert from files LOAD, FILES_INSERT, FILES_QUERY From cf7c8ecebd3008cf28f7b599df8be4b51c775b4d Mon Sep 17 00:00:00 2001 From: wyb Date: Mon, 30 Dec 2024 10:54:29 +0800 Subject: [PATCH 4/5] Refactor code Signed-off-by: wyb --- be/src/exec/csv_scanner.cpp | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/be/src/exec/csv_scanner.cpp b/be/src/exec/csv_scanner.cpp index a8861073f70315..b4ccb28d69bbde 100644 --- a/be/src/exec/csv_scanner.cpp +++ b/be/src/exec/csv_scanner.cpp @@ -370,9 +370,9 @@ Status CSVScanner::_parse_csv_v2(Chunk* chunk) { if (((_file_scan_type == TFileScanType::LOAD && row.columns.size() != _num_fields_in_csv) || (_file_scan_type == TFileScanType::FILES_INSERT && row.columns.size() < _num_fields_in_csv)) && !_scan_range.params.flexible_column_mapping) { - // broker load / stream load will filter rows when file column count is consistent with schema. + // broker load / stream load will filter rows when file column count is consistent with column list. // - // insert from files() will filter rows when file column count is less than schema. + // insert from files() will filter rows when file column count is less than files() schema. // file column count more than schema is normal, extra columns will be ignored. if (status.is_end_of_file()) { break; @@ -391,7 +391,7 @@ Status CSVScanner::_parse_csv_v2(Chunk* chunk) { if (status.is_end_of_file()) { break; } - // query files() will return error when file column count is less than schema. + // query files() will return error when file column count is less than files() schema. // file column count more than schema is normal, extra columns will be ignored. std::string error_msg = make_column_count_not_matched_error_message_for_query( _num_fields_in_csv, row.columns.size(), _parse_options, record.to_string(), @@ -487,19 +487,13 @@ Status CSVScanner::_parse_csv(Chunk* chunk) { fields.clear(); _curr_reader->split_record(record, &fields); - if (_file_scan_type == TFileScanType::LOAD && fields.size() != _num_fields_in_csv && + if (((_file_scan_type == TFileScanType::LOAD && fields.size() != _num_fields_in_csv) || + (_file_scan_type == TFileScanType::FILES_INSERT && fields.size() < _num_fields_in_csv)) && !_scan_range.params.flexible_column_mapping) { - std::string error_msg = make_column_count_not_matched_error_message_for_load(_num_fields_in_csv, - fields.size(), _parse_options); - if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) { - _report_error(record, error_msg); - } - if (_state->enable_log_rejected_record()) { - _report_rejected_record(record, error_msg); - } - continue; - } else if (_file_scan_type == TFileScanType::FILES_INSERT && fields.size() < _num_fields_in_csv && - !_scan_range.params.flexible_column_mapping) { + // broker load / stream load will filter rows when file column count is consistent with column list. + // + // insert from files() will filter rows when file column count is less than files() schema. + // file column count more than schema is normal, extra columns will be ignored. 
std::string error_msg = make_column_count_not_matched_error_message_for_load(_num_fields_in_csv, fields.size(), _parse_options); if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) { @@ -511,7 +505,8 @@ Status CSVScanner::_parse_csv(Chunk* chunk) { continue; } else if (_file_scan_type == TFileScanType::FILES_QUERY && fields.size() < _num_fields_in_csv && !_scan_range.params.flexible_column_mapping) { - // files() query return error + // query files() will return error when file column count is less than files() schema. + // file column count more than schema is normal, extra columns will be ignored. std::string error_msg = make_column_count_not_matched_error_message_for_query( _num_fields_in_csv, fields.size(), _parse_options, record.to_string(), _curr_reader->filename()); return Status::DataQualityError(error_msg); From 588b66f16349a8f84379c5ec1b5fffdd800cdb85 Mon Sep 17 00:00:00 2001 From: wyb Date: Mon, 30 Dec 2024 11:12:34 +0800 Subject: [PATCH 5/5] Fix comment Signed-off-by: wyb --- be/src/exec/csv_scanner.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/be/src/exec/csv_scanner.cpp b/be/src/exec/csv_scanner.cpp index b4ccb28d69bbde..43edf2e767a8db 100644 --- a/be/src/exec/csv_scanner.cpp +++ b/be/src/exec/csv_scanner.cpp @@ -370,7 +370,7 @@ Status CSVScanner::_parse_csv_v2(Chunk* chunk) { if (((_file_scan_type == TFileScanType::LOAD && row.columns.size() != _num_fields_in_csv) || (_file_scan_type == TFileScanType::FILES_INSERT && row.columns.size() < _num_fields_in_csv)) && !_scan_range.params.flexible_column_mapping) { - // broker load / stream load will filter rows when file column count is consistent with column list. + // broker load / stream load will filter rows when file column count is inconsistent with column list. // // insert from files() will filter rows when file column count is less than files() schema. // file column count more than schema is normal, extra columns will be ignored. @@ -388,11 +388,11 @@ Status CSVScanner::_parse_csv_v2(Chunk* chunk) { continue; } else if (_file_scan_type == TFileScanType::FILES_QUERY && row.columns.size() < _num_fields_in_csv && !_scan_range.params.flexible_column_mapping) { + // query files() will return error when file column count is less than files() schema. + // file column count more than schema is normal, extra columns will be ignored. if (status.is_end_of_file()) { break; } - // query files() will return error when file column count is less than files() schema. - // file column count more than schema is normal, extra columns will be ignored. std::string error_msg = make_column_count_not_matched_error_message_for_query( _num_fields_in_csv, row.columns.size(), _parse_options, record.to_string(), _curr_reader->filename()); @@ -490,7 +490,7 @@ Status CSVScanner::_parse_csv(Chunk* chunk) { if (((_file_scan_type == TFileScanType::LOAD && fields.size() != _num_fields_in_csv) || (_file_scan_type == TFileScanType::FILES_INSERT && fields.size() < _num_fields_in_csv)) && !_scan_range.params.flexible_column_mapping) { - // broker load / stream load will filter rows when file column count is consistent with column list. + // broker load / stream load will filter rows when file column count is inconsistent with column list. // // insert from files() will filter rows when file column count is less than files() schema. // file column count more than schema is normal, extra columns will be ignored.
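
-- The sketch below is a condensed, illustrative summary of the behavior this series adds via
-- the files() property 'fill_mismatch_column_with'; it is paraphrased from the regression tests
-- above (test_csv_files_merge), not part of the patches. The bucket, path, and table name are
-- placeholders, and the assumed input is a CSV with 3 columns while the detected files() schema
-- has 4 ($1 bigint, $2 varchar, $3 double, $4 boolean).

-- Default ('fill_mismatch_column_with' = 'none'): a plain query over files() fails with
-- "Schema column count: 4 doesn't match source value column count: 3 ...".
select * from files(
    "path" = "s3://my-bucket/csv/mismatch.csv",      -- placeholder path
    "format" = "csv",
    "fill_mismatch_column_with" = "none");

-- With 'null', rows that are short on trailing columns are kept and the missing columns are
-- filled with NULL instead of erroring out.
select * from files(
    "path" = "s3://my-bucket/csv/mismatch.csv",      -- placeholder path
    "format" = "csv",
    "fill_mismatch_column_with" = "null");

-- For INSERT FROM FILES() with the default 'none', short rows are filtered rather than failing
-- the whole statement; success then depends on max_filter_ratio (rows with more columns than the
-- files() schema are still accepted, with the extra columns ignored).
insert into target_table properties ("max_filter_ratio" = "0.5")  -- placeholder table
select * from files(
    "path" = "s3://my-bucket/csv/mismatch.csv",      -- placeholder path
    "format" = "csv",
    "fill_mismatch_column_with" = "none");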