From 384b27778706b8e8b0e45344646d4eed397ec714 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Sat, 2 Mar 2024 15:27:18 +0200 Subject: [PATCH] fix partitioned table reading for json --- .../physical_plan/file_scan_config.rs | 76 +++++++++++++++++++ .../core/src/datasource/physical_plan/json.rs | 3 +- .../partitioned_table_json/part=1/data.json | 2 + .../partitioned_table_json/part=2/data.json | 2 + .../test_files/insert_to_external.slt | 10 ++- datafusion/sqllogictest/test_files/json.slt | 71 +++++++++++++++++ 6 files changed, 160 insertions(+), 4 deletions(-) create mode 100644 datafusion/core/tests/data/partitioned_table_json/part=1/data.json create mode 100644 datafusion/core/tests/data/partitioned_table_json/part=2/data.json diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 4a814c5b9b2c..2f235a93fbc4 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -157,6 +157,22 @@ impl FileScanConfig { }) } + /// Projects only file schema, ignoring partition columns + pub(crate) fn projected_file_schema(&self) -> SchemaRef { + let fields = self.projection.as_ref().map(|proj| { + proj.iter() + .filter(|col_idx| **col_idx < self.file_schema.fields().len()) + .map(|col_idx| self.file_schema.field(*col_idx)) + .cloned() + .collect::<Vec<_>>() + }); + + fields.map_or_else( + || Arc::clone(&self.file_schema), + |f| Arc::new(Schema::new(f).with_metadata(self.file_schema.metadata.clone())), + ) + } + pub(crate) fn file_column_projection_indices(&self) -> Option<Vec<usize>> { self.projection.as_ref().map(|p| { p.iter() @@ -686,6 +702,66 @@ mod tests { crate::assert_batches_eq!(expected, &[projected_batch]); } + #[test] + fn test_projected_file_schema_with_partition_col() { + let schema = aggr_test_schema(); + let partition_cols = vec![ + ( + "part1".to_owned(), + 
wrap_partition_type_in_dict(DataType::Utf8), + ), + ( + "part2".to_owned(), + wrap_partition_type_in_dict(DataType::Utf8), + ), + ]; + + // Projected file schema for config with projection including partition column + let projection = config_for_projection( + schema.clone(), + Some(vec![0, 3, 5, schema.fields().len()]), + Statistics::new_unknown(&schema), + to_partition_cols(partition_cols.clone()), + ) + .projected_file_schema(); + + // Assert partition column filtered out in projected file schema + let expected_columns = vec!["c1", "c4", "c6"]; + let actual_columns = projection + .fields() + .iter() + .map(|f| f.name().clone()) + .collect::<Vec<_>>(); + assert_eq!(expected_columns, actual_columns); + } + + #[test] + fn test_projected_file_schema_without_projection() { + let schema = aggr_test_schema(); + let partition_cols = vec![ + ( + "part1".to_owned(), + wrap_partition_type_in_dict(DataType::Utf8), + ), + ( + "part2".to_owned(), + wrap_partition_type_in_dict(DataType::Utf8), + ), + ]; + + // Projected file schema for config without projection + let projection = config_for_projection( + schema.clone(), + None, + Statistics::new_unknown(&schema), + to_partition_cols(partition_cols.clone()), + ) + .projected_file_schema(); + + // Assert projected file schema is equal to file schema + assert_eq!(projection.fields(), schema.fields()); + } + // sets default for configs that play no role in projections fn config_for_projection( file_schema: SchemaRef, diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 62b96ea3aefb..ca466b5c6a92 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -174,14 +174,13 @@ impl ExecutionPlan for NdJsonExec { context: Arc<TaskContext>, ) -> Result<SendableRecordBatchStream> { let batch_size = context.session_config().batch_size(); - let (projected_schema, ..) 
= self.base_config.project(); let object_store = context .runtime_env() .object_store(&self.base_config.object_store_url)?; let opener = JsonOpener { batch_size, - projected_schema, + projected_schema: self.base_config.projected_file_schema(), file_compression_type: self.file_compression_type.to_owned(), object_store, }; diff --git a/datafusion/core/tests/data/partitioned_table_json/part=1/data.json b/datafusion/core/tests/data/partitioned_table_json/part=1/data.json new file mode 100644 index 000000000000..466c5b3dc4ab --- /dev/null +++ b/datafusion/core/tests/data/partitioned_table_json/part=1/data.json @@ -0,0 +1,2 @@ +{"id": 1, "value": "foo"} +{"id": 2, "value": "bar"} diff --git a/datafusion/core/tests/data/partitioned_table_json/part=2/data.json b/datafusion/core/tests/data/partitioned_table_json/part=2/data.json new file mode 100644 index 000000000000..857d70e1f397 --- /dev/null +++ b/datafusion/core/tests/data/partitioned_table_json/part=2/data.json @@ -0,0 +1,2 @@ +{"id": 3, "value": "baz"} +{"id": 4, "value": "qux"} diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index c0f86ac76320..2e10862bac66 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -195,9 +195,15 @@ INSERT INTO partitioned_insert_test_json values (1, 2), (3, 4), (5, 6), (1, 2), ---- 6 -# Issue open for this error: https://github.com/apache/arrow-datafusion/issues/7816 -query error DataFusion error: Arrow error: Json error: Encountered unmasked nulls in non\-nullable StructArray child: Field \{ name: "a", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: \{\} \} +query TT select * from partitioned_insert_test_json order by a,b +---- +1 2 +1 2 +3 4 +3 4 +5 6 +5 6 statement ok CREATE EXTERNAL TABLE diff --git a/datafusion/sqllogictest/test_files/json.slt 
b/datafusion/sqllogictest/test_files/json.slt index c0d5e895f0f2..01b330f24c53 100644 --- a/datafusion/sqllogictest/test_files/json.slt +++ b/datafusion/sqllogictest/test_files/json.slt @@ -68,3 +68,74 @@ DROP TABLE json_test statement ok DROP TABLE single_nan + +# JSON partitioned table +statement ok +CREATE EXTERNAL TABLE json_partitioned_test ( + part Int, + id Int, + value String, +) +STORED AS JSON +LOCATION '../core/tests/data/partitioned_table_json' +PARTITIONED BY (part); + +# select wildcard always returns partition columns as the last ones +query ITI +SELECT * FROM json_partitioned_test ORDER BY id +---- +1 foo 1 +2 bar 1 +3 baz 2 +4 qux 2 + + +# select all fields +query IIT +SELECT part, id, value FROM json_partitioned_test ORDER BY id +---- +1 1 foo +1 2 bar +2 3 baz +2 4 qux + +# select without partition column +query I +SELECT id FROM json_partitioned_test ORDER BY id +---- +1 +2 +3 +4 + +# select only partition column +query I +SELECT part FROM json_partitioned_test ORDER BY part +---- +1 +1 +2 +2 + +# select without any table-relates columns in projection +query T +SELECT 'x' FROM json_partitioned_test +---- +x +x +x +x + +# select with partition filter +query I +SELECT id FROM json_partitioned_test WHERE part = 1 ORDER BY id +---- +1 +2 + +# select with partition filter should scan only one directory +query TT +EXPLAIN SELECT id FROM json_partitioned_test WHERE part = 2 +---- +logical_plan TableScan: json_partitioned_test projection=[id], full_filters=[json_partitioned_test.part = Int32(2)] +physical_plan JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_json/part=2/data.json]]}, projection=[id]