Skip to content

Commit

Permalink
fix partitioned table reading for json
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Mar 2, 2024
1 parent 2a490e4 commit 384b277
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 4 deletions.
76 changes: 76 additions & 0 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,22 @@ impl FileScanConfig {
})
}

/// Projects only file schema, ignoring partition columns
pub(crate) fn projected_file_schema(&self) -> SchemaRef {
let fields = self.projection.as_ref().map(|proj| {
proj.iter()
.filter(|col_idx| **col_idx < self.file_schema.fields().len())
.map(|col_idx| self.file_schema.field(*col_idx))
.cloned()
.collect::<Vec<_>>()
});

fields.map_or_else(
|| Arc::clone(&self.file_schema),
|f| Arc::new(Schema::new(f).with_metadata(self.file_schema.metadata.clone())),
)
}

pub(crate) fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
self.projection.as_ref().map(|p| {
p.iter()
Expand Down Expand Up @@ -686,6 +702,66 @@ mod tests {
crate::assert_batches_eq!(expected, &[projected_batch]);
}

#[test]
fn test_projected_file_schema_with_partition_col() {
let schema = aggr_test_schema();
let partition_cols = vec![
(
"part1".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
),
(
"part2".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
),
];

// Projected file schema for config with projection including partition column
let projection = config_for_projection(
schema.clone(),
Some(vec![0, 3, 5, schema.fields().len()]),
Statistics::new_unknown(&schema),
to_partition_cols(partition_cols.clone()),
)
.projected_file_schema();

// Assert partition column filtered out in projected file schema
let expected_columns = vec!["c1", "c4", "c6"];
let actual_columns = projection
.fields()
.iter()
.map(|f| f.name().clone())
.collect::<Vec<_>>();
assert_eq!(expected_columns, actual_columns);
}

#[test]
fn test_projected_file_schema_without_projection() {
let schema = aggr_test_schema();
let partition_cols = vec![
(
"part1".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
),
(
"part2".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
),
];

// Projected file schema for config without projection
let projection = config_for_projection(
schema.clone(),
None,
Statistics::new_unknown(&schema),
to_partition_cols(partition_cols.clone()),
)
.projected_file_schema();

// Assert projected file schema is equal to file schema
assert_eq!(projection.fields(), schema.fields());
}

// sets default for configs that play no role in projections
fn config_for_projection(
file_schema: SchemaRef,
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,13 @@ impl ExecutionPlan for NdJsonExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let batch_size = context.session_config().batch_size();
let (projected_schema, ..) = self.base_config.project();

let object_store = context
.runtime_env()
.object_store(&self.base_config.object_store_url)?;
let opener = JsonOpener {
batch_size,
projected_schema,
projected_schema: self.base_config.projected_file_schema(),
file_compression_type: self.file_compression_type.to_owned(),
object_store,
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"id": 1, "value": "foo"}
{"id": 2, "value": "bar"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"id": 3, "value": "baz"}
{"id": 4, "value": "qux"}
10 changes: 8 additions & 2 deletions datafusion/sqllogictest/test_files/insert_to_external.slt
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,15 @@ INSERT INTO partitioned_insert_test_json values (1, 2), (3, 4), (5, 6), (1, 2),
----
6

# Issue open for this error: https://github.com/apache/arrow-datafusion/issues/7816
query error DataFusion error: Arrow error: Json error: Encountered unmasked nulls in non\-nullable StructArray child: Field \{ name: "a", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: \{\} \}
query TT
select * from partitioned_insert_test_json order by a,b
----
1 2
1 2
3 4
3 4
5 6
5 6

statement ok
CREATE EXTERNAL TABLE
Expand Down
71 changes: 71 additions & 0 deletions datafusion/sqllogictest/test_files/json.slt
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,74 @@ DROP TABLE json_test

statement ok
DROP TABLE single_nan

# JSON partitioned table
statement ok
CREATE EXTERNAL TABLE json_partitioned_test (
part Int,
id Int,
value String,
)
STORED AS JSON
LOCATION '../core/tests/data/partitioned_table_json'
PARTITIONED BY (part);

# select wildcard always returns partition columns as the last ones
query ITI
SELECT * FROM json_partitioned_test ORDER BY id
----
1 foo 1
2 bar 1
3 baz 2
4 qux 2


# select all fields
query IIT
SELECT part, id, value FROM json_partitioned_test ORDER BY id
----
1 1 foo
1 2 bar
2 3 baz
2 4 qux

# select without partition column
query I
SELECT id FROM json_partitioned_test ORDER BY id
----
1
2
3
4

# select only partition column
query I
SELECT part FROM json_partitioned_test ORDER BY part
----
1
1
2
2

# select without any table-relates columns in projection
query T
SELECT 'x' FROM json_partitioned_test
----
x
x
x
x

# select with partition filter
query I
SELECT id FROM json_partitioned_test WHERE part = 1 ORDER BY id
----
1
2

# select with partition filter should scan only one directory
query TT
EXPLAIN SELECT id FROM json_partitioned_test WHERE part = 2
----
logical_plan TableScan: json_partitioned_test projection=[id], full_filters=[json_partitioned_test.part = Int32(2)]
physical_plan JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_json/part=2/data.json]]}, projection=[id]

0 comments on commit 384b277

Please sign in to comment.