Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Support column mismatch fill property in files() #54362

Merged
merged 5 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 52 additions & 12 deletions be/src/exec/csv_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ static std::string string_2_asc(const std::string& input) {
return oss.str();
}

static std::string make_column_count_not_matched_error_message(int expected_count, int actual_count,
CSVParseOptions& parse_options) {
static std::string make_column_count_not_matched_error_message_for_load(int expected_count, int actual_count,
CSVParseOptions& parse_options) {
std::stringstream error_msg;
error_msg << "Target column count: " << expected_count
<< " doesn't match source value column count: " << actual_count << ". "
Expand All @@ -55,6 +55,20 @@ static std::string make_column_count_not_matched_error_message(int expected_coun
return error_msg.str();
}

// Builds the error message returned to a FILES() query when a CSV row has
// fewer columns than the schema expects. Embeds the separators, the offending
// row, and the file name so the user can locate the bad data, and suggests
// the 'fill_mismatch_column_with' property as a remedy.
static std::string make_column_count_not_matched_error_message_for_query(int expected_count, int actual_count,
                                                                         CSVParseOptions& parse_options,
                                                                         const std::string& row,
                                                                         const std::string& filename) {
    std::string msg;
    msg.append("Schema column count: ").append(std::to_string(expected_count));
    msg.append(" doesn't match source value column count: ").append(std::to_string(actual_count)).append(". ");
    msg.append("Column separator: ").append(string_2_asc(parse_options.column_delimiter)).append(", ");
    msg.append("Row delimiter: ").append(string_2_asc(parse_options.row_delimiter)).append(", ");
    msg.append("Row: '").append(row).append("', File: ").append(filename).append(". ");
    msg.append("Consider setting 'fill_mismatch_column_with' = 'null'");
    return msg;
}

static std::string make_value_type_not_matched_error_message(int field_pos, const Slice& field,
const SlotDescriptor* slot) {
std::stringstream error_msg;
Expand Down Expand Up @@ -353,21 +367,36 @@ Status CSVScanner::_parse_csv_v2(Chunk* chunk) {

const char* data = _curr_reader->buffBasePtr() + row.parsed_start;
CSVReader::Record record(data, row.parsed_end - row.parsed_start);
if (row.columns.size() != _num_fields_in_csv && !_scan_range.params.flexible_column_mapping) {
if (((_file_scan_type == TFileScanType::LOAD && row.columns.size() != _num_fields_in_csv) ||
(_file_scan_type == TFileScanType::FILES_INSERT && row.columns.size() < _num_fields_in_csv)) &&
!_scan_range.params.flexible_column_mapping) {
// broker load / stream load will filter rows when file column count is inconsistent with column list.
//
// insert from files() will filter rows when file column count is less than files() schema.
// file column count more than schema is normal, extra columns will be ignored.
if (status.is_end_of_file()) {
break;
}
std::string error_msg = make_column_count_not_matched_error_message_for_load(
_num_fields_in_csv, row.columns.size(), _parse_options);
if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) {
std::string error_msg = make_column_count_not_matched_error_message(_num_fields_in_csv,
row.columns.size(), _parse_options);
_report_error(record, error_msg);
}
if (_state->enable_log_rejected_record()) {
std::string error_msg = make_column_count_not_matched_error_message(_num_fields_in_csv,
row.columns.size(), _parse_options);
_report_rejected_record(record, error_msg);
}
continue;
} else if (_file_scan_type == TFileScanType::FILES_QUERY && row.columns.size() < _num_fields_in_csv &&
!_scan_range.params.flexible_column_mapping) {
// query files() will return error when file column count is less than files() schema.
// file column count more than schema is normal, extra columns will be ignored.
if (status.is_end_of_file()) {
break;
}
std::string error_msg = make_column_count_not_matched_error_message_for_query(
_num_fields_in_csv, row.columns.size(), _parse_options, record.to_string(),
_curr_reader->filename());
return Status::DataQualityError(error_msg);
}
if (!validate_utf8(record.data, record.size)) {
if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) {
Expand Down Expand Up @@ -458,18 +487,29 @@ Status CSVScanner::_parse_csv(Chunk* chunk) {
fields.clear();
_curr_reader->split_record(record, &fields);

if (fields.size() != _num_fields_in_csv && !_scan_range.params.flexible_column_mapping) {
if (((_file_scan_type == TFileScanType::LOAD && fields.size() != _num_fields_in_csv) ||
(_file_scan_type == TFileScanType::FILES_INSERT && fields.size() < _num_fields_in_csv)) &&
!_scan_range.params.flexible_column_mapping) {
// broker load / stream load will filter rows when file column count is inconsistent with column list.
//
// insert from files() will filter rows when file column count is less than files() schema.
// file column count more than schema is normal, extra columns will be ignored.
std::string error_msg = make_column_count_not_matched_error_message_for_load(_num_fields_in_csv,
fields.size(), _parse_options);
if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) {
std::string error_msg =
make_column_count_not_matched_error_message(_num_fields_in_csv, fields.size(), _parse_options);
_report_error(record, error_msg);
}
if (_state->enable_log_rejected_record()) {
std::string error_msg =
make_column_count_not_matched_error_message(_num_fields_in_csv, fields.size(), _parse_options);
_report_rejected_record(record, error_msg);
}
continue;
} else if (_file_scan_type == TFileScanType::FILES_QUERY && fields.size() < _num_fields_in_csv &&
!_scan_range.params.flexible_column_mapping) {
// query files() will return error when file column count is less than files() schema.
// file column count more than schema is normal, extra columns will be ignored.
std::string error_msg = make_column_count_not_matched_error_message_for_query(
_num_fields_in_csv, fields.size(), _parse_options, record.to_string(), _curr_reader->filename());
return Status::DataQualityError(error_msg);
}
if (!validate_utf8(record.data, record.size)) {
if (_counter->num_rows_filtered++ < REPORT_ERROR_MAX_NUMBER) {
Expand Down
5 changes: 5 additions & 0 deletions be/src/exec/file_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ FileScanner::FileScanner(starrocks::RuntimeState* state, starrocks::RuntimeProfi
_row_desc(nullptr),
_strict_mode(false),
_error_counter(0),
_file_scan_type(TFileScanType::LOAD),
_schema_only(schema_only) {}

FileScanner::~FileScanner() = default;
Expand Down Expand Up @@ -135,6 +136,10 @@ Status FileScanner::open() {
_strict_mode = _params.strict_mode;
}

if (_params.__isset.file_scan_type) {
_file_scan_type = _params.file_scan_type;
}

if (_strict_mode && !_params.__isset.dest_sid_to_src_sid_without_trans) {
return Status::InternalError("Slot map of dest to src must be set in strict mode");
}
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/file_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ class FileScanner {

bool _strict_mode;
int64_t _error_counter;
// When column mismatch, files query/load and other type load have different behaviors.
// Query returns error, while load counts the filtered rows, and whether an error is returned depends on max filter ratio;
// files load will not filter rows if file column count is larger than the schema,
// so need to check files query/load or other type load in scanner.
// Currently only used in csv scanner.
TFileScanType::type _file_scan_type;

// sources
std::vector<SlotDescriptor*> _src_slot_descriptors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ public class TableFunctionTable extends Table {
public static final String PROPERTY_AUTO_DETECT_SAMPLE_FILES = "auto_detect_sample_files";
public static final String PROPERTY_AUTO_DETECT_SAMPLE_ROWS = "auto_detect_sample_rows";

private static final String PROPERTY_FILL_MISMATCH_COLUMN_WITH = "fill_mismatch_column_with";

public static final String PROPERTY_CSV_COLUMN_SEPARATOR = "csv.column_separator";
public static final String PROPERTY_CSV_ROW_DELIMITER = "csv.row_delimiter";
public static final String PROPERTY_CSV_SKIP_HEADER = "csv.skip_header";
Expand All @@ -130,19 +132,48 @@ public class TableFunctionTable extends Table {
private static final String PROPERTY_LIST_FILES_ONLY = "list_files_only";
private static final String PROPERTY_LIST_RECURSIVELY = "list_recursively";

public enum MisMatchFillValue {
NONE, // error
NULL;

public static MisMatchFillValue fromString(String value) {
for (MisMatchFillValue fillValue : values()) {
if (fillValue.name().equalsIgnoreCase(value)) {
return fillValue;
}
}
return null;
}

public static List<String> getCandidates() {
return Arrays.stream(values()).map(p -> p.name().toLowerCase()).collect(Collectors.toList());
}
}

// Distinguishes how this files() table function is being used; the load,
// unload, query, and list paths behave differently (e.g. on column mismatch).
public enum FilesTableType {
    LOAD,   // insert from files(); set by the insert analyzer
    UNLOAD, // insert into files(); set in the unload constructor
    QUERY,  // select from files(); the default
    LIST    // list_files_only mode
}

private String path;
private String format;

// for load data
private int autoDetectSampleFiles;
private int autoDetectSampleRows;
private FilesTableType filesTableType = FilesTableType.QUERY;

// for load/query data
private int autoDetectSampleFiles = DEFAULT_AUTO_DETECT_SAMPLE_FILES;
private int autoDetectSampleRows = DEFAULT_AUTO_DETECT_SAMPLE_ROWS;

private List<String> columnsFromPath = new ArrayList<>();
private boolean strictMode = false;
private final Map<String, String> properties;

private List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();

private MisMatchFillValue misMatchFillValue = MisMatchFillValue.NONE;

// for unload data
private String compressionType;
private Optional<List<Integer>> partitionColumnIDs = Optional.empty();
Expand All @@ -164,12 +195,12 @@ public class TableFunctionTable extends Table {
private boolean listFilesOnly = false;
private boolean listRecursively = false;

// Ctor for load data / list files via table function
// Ctor for load data / query data / list files via table function
public TableFunctionTable(Map<String, String> properties) throws DdlException {
this(properties, null);
}

// Ctor for load data / list files via table function
// Ctor for load data / query data / list files via table function
public TableFunctionTable(Map<String, String> properties, Consumer<TableFunctionTable> pushDownSchemaFunc)
throws DdlException {
super(TableType.TABLE_FUNCTION);
Expand All @@ -180,9 +211,11 @@ public TableFunctionTable(Map<String, String> properties, Consumer<TableFunction
parseProperties();

if (listFilesOnly) {
this.filesTableType = FilesTableType.LIST;
setSchemaForListFiles();
} else {
setSchemaForLoad();
// set filesTableType as LOAD in insert analyzer, and default is QUERY
setSchemaForLoadAndQuery();
}

if (pushDownSchemaFunc != null) {
Expand All @@ -196,12 +229,13 @@ public TableFunctionTable(List<Column> columns, Map<String, String> properties,
checkNotNull(properties, "properties is null");
checkNotNull(sessionVariable, "sessionVariable is null");
this.properties = properties;
this.filesTableType = FilesTableType.UNLOAD;
parsePropertiesForUnload(columns, sessionVariable);
setNewFullSchema(columns);
}

private void setSchemaForLoad() throws DdlException {
parseFilesForLoad();
private void setSchemaForLoadAndQuery() throws DdlException {
parseFilesForLoadAndQuery();

// infer schema from files
List<Column> columns = new ArrayList<>();
Expand Down Expand Up @@ -338,6 +372,14 @@ public String getPath() {
return path;
}

// Overrides the usage type of this files() table (default is QUERY;
// the insert analyzer sets LOAD).
public void setFilesTableType(FilesTableType filesTableType) {
    this.filesTableType = filesTableType;
}

// True when this files() table is the source of an INSERT-from-files load.
public boolean isLoadType() {
    return filesTableType == FilesTableType.LOAD;
}

public boolean isListFilesOnly() {
return listFilesOnly;
}
Expand Down Expand Up @@ -393,23 +435,34 @@ private void parsePropertiesForLoad(Map<String, String> properties) throws DdlEx
strictMode = Boolean.parseBoolean(properties.get(PROPERTY_STRICT_MODE));
}

if (!properties.containsKey(PROPERTY_AUTO_DETECT_SAMPLE_FILES)) {
autoDetectSampleFiles = DEFAULT_AUTO_DETECT_SAMPLE_FILES;
} else {
if (properties.containsKey(PROPERTY_AUTO_DETECT_SAMPLE_FILES)) {
String property = properties.get(PROPERTY_AUTO_DETECT_SAMPLE_FILES);
try {
autoDetectSampleFiles = Integer.parseInt(properties.get(PROPERTY_AUTO_DETECT_SAMPLE_FILES));
autoDetectSampleFiles = Integer.parseInt(property);
} catch (NumberFormatException e) {
throw new DdlException("failed to parse auto_detect_sample_files: ", e);
ErrorReport.reportDdlException(
ErrorCode.ERR_INVALID_VALUE, PROPERTY_AUTO_DETECT_SAMPLE_FILES, property, "int number");
}
}

if (!properties.containsKey(PROPERTY_AUTO_DETECT_SAMPLE_ROWS)) {
autoDetectSampleRows = DEFAULT_AUTO_DETECT_SAMPLE_ROWS;
} else {
if (properties.containsKey(PROPERTY_FILL_MISMATCH_COLUMN_WITH)) {
String property = properties.get(PROPERTY_FILL_MISMATCH_COLUMN_WITH);
misMatchFillValue = MisMatchFillValue.fromString(property);
if (misMatchFillValue == null) {
String msg = String.format("%s (case insensitive)", String.join(", ", MisMatchFillValue.getCandidates()));
ErrorReport.reportSemanticException(
ErrorCode.ERR_INVALID_VALUE, PROPERTY_FILL_MISMATCH_COLUMN_WITH, property, msg);
}
}

// csv properties
if (properties.containsKey(PROPERTY_AUTO_DETECT_SAMPLE_ROWS)) {
String property = properties.get(PROPERTY_AUTO_DETECT_SAMPLE_ROWS);
try {
autoDetectSampleRows = Integer.parseInt(properties.get(PROPERTY_AUTO_DETECT_SAMPLE_ROWS));
autoDetectSampleRows = Integer.parseInt(property);
} catch (NumberFormatException e) {
throw new DdlException("failed to parse auto_detect_sample_files: ", e);
ErrorReport.reportDdlException(
ErrorCode.ERR_INVALID_VALUE, PROPERTY_AUTO_DETECT_SAMPLE_ROWS, property, "int number");
}
}

Expand Down Expand Up @@ -467,7 +520,7 @@ private void parsePropertiesForLoad(Map<String, String> properties) throws DdlEx
}
}

private void parseFilesForLoad() throws DdlException {
private void parseFilesForLoadAndQuery() throws DdlException {
try {
// fake:// is a faked path, for testing purpose
if (path.startsWith("fake://")) {
Expand Down Expand Up @@ -635,6 +688,10 @@ public boolean isStrictMode() {
return strictMode;
}

// Flexible column mapping is enabled whenever a fill value for mismatched
// columns was configured, i.e. 'fill_mismatch_column_with' is not 'none'.
public boolean isFlexibleColumnMapping() {
    return misMatchFillValue != MisMatchFillValue.NONE;
}

@Override
public String toString() {
return String.format("TABLE('path'='%s', 'format'='%s')", path, format);
Expand Down
Loading
Loading