Merge branch 'main' into feature/substitute
mustafasrepo committed Feb 19, 2024
2 parents 5de7acf + b2a0451 commit 1731eac
Showing 19 changed files with 249 additions and 24 deletions.
2 changes: 1 addition & 1 deletion datafusion-cli/Dockerfile
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

FROM rust:1.72 as builder
FROM rust:1.72-bullseye as builder

COPY . /usr/src/arrow-datafusion
COPY ./datafusion /usr/src/arrow-datafusion/datafusion
25 changes: 25 additions & 0 deletions datafusion/common/src/file_options/mod.rs
@@ -97,6 +97,31 @@ impl StatementOptions {
maybe_option.map(|(_, v)| v)
}

/// Finds the partition_by option if it exists and parses it into a `Vec<String>`.
/// If the option doesn't exist, returns an empty `vec![]`.
/// E.g. (partition_by 'colA, colB, colC') -> `vec!['colA','colB','colC']`
pub fn take_partition_by(&mut self) -> Vec<String> {
let partition_by = self.take_str_option("partition_by");
match partition_by {
Some(part_cols) => {
let dequoted = part_cols
.chars()
.enumerate()
.filter(|(idx, c)| {
!((*idx == 0 || *idx == part_cols.len() - 1)
&& (*c == '\'' || *c == '"'))
})
.map(|(_idx, c)| c)
.collect::<String>();
dequoted
.split(',')
.map(|s| s.trim().replace("''", "'"))
.collect::<Vec<_>>()
}
None => vec![],
}
}
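
The dequote-and-split behavior above can be exercised standalone. Below is a minimal sketch; `parse_partition_by` is a hypothetical free-function copy of the option parsing, not part of the DataFusion API:

```rust
// Hypothetical standalone copy of the `take_partition_by` parsing logic.
fn parse_partition_by(part_cols: &str) -> Vec<String> {
    // Strip a single leading/trailing quote character, if present.
    let dequoted = part_cols
        .chars()
        .enumerate()
        .filter(|(idx, c)| {
            !((*idx == 0 || *idx == part_cols.len() - 1)
                && (*c == '\'' || *c == '"'))
        })
        .map(|(_idx, c)| c)
        .collect::<String>();
    // Split on commas, trim whitespace, and unescape doubled quotes.
    dequoted
        .split(',')
        .map(|s| s.trim().replace("''", "'"))
        .collect()
}

fn main() {
    assert_eq!(
        parse_partition_by("'colA, colB, colC'"),
        vec!["colA", "colB", "colC"]
    );
}
```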

/// Infers the file_type given a target and arbitrary options.
/// If the options contain an explicit "format" option, that will be used.
/// Otherwise, attempt to infer file_type from the extension of target.
12 changes: 12 additions & 0 deletions datafusion/core/src/dataframe/mod.rs
@@ -73,6 +73,9 @@ pub struct DataFrameWriteOptions {
/// Allows compression of CSV and JSON.
/// Not supported for parquet.
compression: CompressionTypeVariant,
/// Sets which columns should be used for hive-style partitioned writes by name.
/// Can be set to empty vec![] for non-partitioned writes.
partition_by: Vec<String>,
}

impl DataFrameWriteOptions {
@@ -82,6 +85,7 @@ impl DataFrameWriteOptions {
overwrite: false,
single_file_output: false,
compression: CompressionTypeVariant::UNCOMPRESSED,
partition_by: vec![],
}
}
/// Set the overwrite option to true or false
@@ -101,6 +105,12 @@ impl DataFrameWriteOptions {
self.compression = compression;
self
}

/// Sets the partition_by columns for output partitioning
pub fn with_partition_by(mut self, partition_by: Vec<String>) -> Self {
self.partition_by = partition_by;
self
}
}

impl Default for DataFrameWriteOptions {
@@ -1176,6 +1186,7 @@ impl DataFrame {
self.plan,
path.into(),
FileType::CSV,
options.partition_by,
copy_options,
)?
.build()?;
@@ -1219,6 +1230,7 @@ impl DataFrame {
self.plan,
path.into(),
FileType::JSON,
options.partition_by,
copy_options,
)?
.build()?;
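A hedged usage sketch of the new builder option from the `DataFrame` API side. It assumes a table `t` with a `year` column and the `write_csv` signature of this era (path, `DataFrameWriteOptions`, optional format-specific writer options); the table name and output path are illustrative:

```rust
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.sql("CREATE TABLE t (year INT, val INT) AS VALUES (2023, 1), (2024, 2)")
        .await?;
    let df = ctx.table("t").await?;
    // Hive-style layout: /tmp/t_by_year/year=2023/..., /tmp/t_by_year/year=2024/...
    df.write_csv(
        "/tmp/t_by_year/",
        DataFrameWriteOptions::new().with_partition_by(vec!["year".to_string()]),
        None, // format-specific writer options
    )
    .await?;
    Ok(())
}
```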
1 change: 1 addition & 0 deletions datafusion/core/src/dataframe/parquet.rs
@@ -73,6 +73,7 @@ impl DataFrame {
self.plan,
path.into(),
FileType::PARQUET,
options.partition_by,
copy_options,
)?
.build()?;
28 changes: 17 additions & 11 deletions datafusion/core/src/datasource/file_format/write/demux.rs
@@ -32,7 +32,7 @@ use arrow_array::cast::AsArray;
use arrow_array::{downcast_dictionary_array, RecordBatch, StringArray, StructArray};
use arrow_schema::{DataType, Schema};
use datafusion_common::cast::as_string_array;
use datafusion_common::DataFusionError;
use datafusion_common::{exec_datafusion_err, DataFusionError};

use datafusion_execution::TaskContext;

@@ -319,14 +319,20 @@ fn compute_partition_keys_by_row<'a>(
) -> Result<Vec<Vec<&'a str>>> {
let mut all_partition_values = vec![];

for (col, dtype) in partition_by.iter() {
// For the purposes of writing partitioned data, we can rely on schema inference
// to determine the type of the partition cols in order to provide a more ergonomic
// UI which does not require specifying DataTypes manually. So, we ignore the
// DataType within the partition_by array and infer the correct type from the
// batch schema instead.
let schema = rb.schema();
for (col, _) in partition_by.iter() {
let mut partition_values = vec![];
let col_array =
rb.column_by_name(col)
.ok_or(DataFusionError::Execution(format!(
"PartitionBy Column {} does not exist in source data!",
col
)))?;

let dtype = schema.field_with_name(col)?.data_type();
let col_array = rb.column_by_name(col).ok_or(exec_datafusion_err!(
"PartitionBy Column {} does not exist in source data! Got schema {schema}.",
col
))?;

match dtype {
DataType::Utf8 => {
@@ -339,12 +345,12 @@ fn compute_partition_keys_by_row<'a>(
downcast_dictionary_array!(
col_array => {
let array = col_array.downcast_dict::<StringArray>()
.ok_or(DataFusionError::Execution(format!("it is not yet supported to write to hive partitions with datatype {}",
dtype)))?;
.ok_or(exec_datafusion_err!("it is not yet supported to write to hive partitions with datatype {}",
dtype))?;

for val in array.values() {
partition_values.push(
val.ok_or(DataFusionError::Execution(format!("Cannot partition by null value for column {}", col)))?
val.ok_or(exec_datafusion_err!("Cannot partition by null value for column {}", col))?
);
}
},
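A minimal standalone sketch (not DataFusion source) of the schema-inference step described in the comment above: the partition column's type comes from the `RecordBatch` schema, so the `DataType` carried alongside `partition_by` can be ignored:

```rust
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use std::sync::Arc;

// Look up a partition column's type from the batch itself, mirroring
// `schema.field_with_name(col)?.data_type()` in the diff above.
fn partition_col_type(rb: &RecordBatch, col: &str) -> Result<DataType, ArrowError> {
    Ok(rb.schema().field_with_name(col)?.data_type().clone())
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("year", DataType::Int32, false)]));
    let col: ArrayRef = Arc::new(Int32Array::from(vec![2023, 2024]));
    let rb = RecordBatch::try_new(schema, vec![col])?;
    // Whatever placeholder the planner carried (e.g. DataType::Null), the
    // write path sees the real Int32 type.
    assert_eq!(partition_col_type(&rb, "year")?, DataType::Int32);
    Ok(())
}
```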
10 changes: 9 additions & 1 deletion datafusion/core/src/physical_planner.rs
@@ -568,6 +568,7 @@ impl DefaultPhysicalPlanner {
output_url,
file_format,
copy_options,
partition_by,
}) => {
let input_exec = self.create_initial_plan(input, session_state).await?;
let parsed_url = ListingTableUrl::parse(output_url)?;
@@ -585,13 +586,20 @@
CopyOptions::WriterOptions(writer_options) => *writer_options.clone()
};

// Note: the DataType passed here is ignored for the purposes of writing and inferred instead
// from the schema of the RecordBatch being written. This allows COPY statements to specify only
// the column name rather than column name + explicit data type.
let table_partition_cols = partition_by.iter()
.map(|s| (s.to_string(), arrow_schema::DataType::Null))
.collect::<Vec<_>>();

// Set file sink related options
let config = FileSinkConfig {
object_store_url,
table_paths: vec![parsed_url],
file_groups: vec![],
output_schema: Arc::new(schema),
table_partition_cols: vec![],
table_partition_cols,
overwrite: false,
file_type_writer_options
};
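For context, this planner plumbing is what lets a `COPY` statement request partitioned output by column name alone. A hedged end-to-end sketch; the exact option spelling `partition_by 'year'` is an assumption based on the `take_partition_by` parsing shown earlier:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.sql("CREATE TABLE t (year INT, val INT) AS VALUES (2023, 1), (2024, 2)")
        .await?;
    // Only the column name is given; its DataType is inferred at write time
    // from the RecordBatch schema (DataType::Null is just a planner placeholder).
    ctx.sql("COPY t TO '/tmp/t_by_year/' (format parquet, partition_by 'year')")
        .await?
        .collect()
        .await?;
    Ok(())
}
```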
2 changes: 1 addition & 1 deletion datafusion/expr/src/built_in_function.rs
@@ -953,7 +953,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayNdims => Signature::any(1, self.volatility()),
BuiltinScalarFunction::ArrayDistinct => Signature::any(1, self.volatility()),
BuiltinScalarFunction::ArrayPosition => {
Signature::variadic_any(self.volatility())
Signature::array_and_element_and_optional_index(self.volatility())
}
BuiltinScalarFunction::ArrayPositions => {
Signature::array_and_element(self.volatility())
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
@@ -263,12 +263,14 @@ impl LogicalPlanBuilder {
input: LogicalPlan,
output_url: String,
file_format: FileType,
partition_by: Vec<String>,
copy_options: CopyOptions,
) -> Result<Self> {
Ok(Self::from(LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url,
file_format,
partition_by,
copy_options,
})))
}
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/dml.rs
@@ -36,6 +36,8 @@ pub struct CopyTo {
pub output_url: String,
/// The file format to output (explicitly defined or inferred from file extension)
pub file_format: FileType,
/// Determines which, if any, columns should be used for hive-style partitioned writes
pub partition_by: Vec<String>,
/// Arbitrary options as tuples
pub copy_options: CopyOptions,
}
4 changes: 3 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
@@ -615,12 +615,13 @@ impl LogicalPlan {
input: _,
output_url,
file_format,
partition_by,
copy_options,
}) => Ok(LogicalPlan::Copy(CopyTo {
input: Arc::new(inputs.swap_remove(0)),
output_url: output_url.clone(),
file_format: file_format.clone(),

partition_by: partition_by.clone(),
copy_options: copy_options.clone(),
})),
LogicalPlan::Values(Values { schema, .. }) => {
@@ -1551,6 +1552,7 @@ impl LogicalPlan {
input: _,
output_url,
file_format,
partition_by: _,
copy_options,
}) => {
let op_str = match copy_options {
17 changes: 16 additions & 1 deletion datafusion/expr/src/signature.rs
@@ -91,7 +91,7 @@ pub enum TypeSignature {
/// DataFusion attempts to coerce all argument types to match the first argument's type
///
/// # Examples
/// Given types in signature should be coericible to the same final type.
/// Given types in signature should be coercible to the same final type.
/// A function such as `make_array` is `VariadicEqual`.
///
/// `make_array(i32, i64) -> make_array(i64, i64)`
@@ -132,7 +132,10 @@ pub enum ArrayFunctionSignature {
/// The first argument should be non-list or list, and the second argument should be List/LargeList.
/// The first argument's list dimension should be one dimension less than the second argument's list dimension.
ElementAndArray,
/// Specialized Signature for Array functions of the form (List/LargeList, Index)
ArrayAndIndex,
/// Specialized Signature for Array functions of the form (List/LargeList, Element, Optional Index)
ArrayAndElementAndOptionalIndex,
}

impl std::fmt::Display for ArrayFunctionSignature {
@@ -141,6 +144,9 @@ impl std::fmt::Display for ArrayFunctionSignature {
ArrayFunctionSignature::ArrayAndElement => {
write!(f, "array, element")
}
ArrayFunctionSignature::ArrayAndElementAndOptionalIndex => {
write!(f, "array, element, [index]")
}
ArrayFunctionSignature::ElementAndArray => {
write!(f, "element, array")
}
@@ -292,6 +298,15 @@ impl Signature {
volatility,
}
}
/// Specialized Signature for Array functions with an optional index
pub fn array_and_element_and_optional_index(volatility: Volatility) -> Self {
Signature {
type_signature: TypeSignature::ArraySignature(
ArrayFunctionSignature::ArrayAndElementAndOptionalIndex,
),
volatility,
}
}
/// Specialized Signature for ArrayPrepend and similar functions
pub fn element_and_array(volatility: Volatility) -> Self {
Signature {
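A small sketch of constructing the new signature, as `array_position` now does above; it admits both the two-argument and three-argument call shapes. Assumes `datafusion_expr` is available as a dependency:

```rust
use datafusion_expr::{Signature, Volatility};

fn main() {
    // Accepts both call shapes:
    //   array_position([1, 2, 3], 2)      -- (array, element)
    //   array_position([1, 2, 3], 2, 1)   -- (array, element, index)
    let sig = Signature::array_and_element_and_optional_index(Volatility::Immutable);
    println!("{:?}", sig.type_signature);
}
```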
32 changes: 32 additions & 0 deletions datafusion/expr/src/type_coercion/functions.rs
@@ -130,6 +130,35 @@ fn get_valid_types(
_ => Ok(vec![vec![]]),
}
}
fn array_element_and_optional_index(
current_types: &[DataType],
) -> Result<Vec<Vec<DataType>>> {
// make sure there are 2 or 3 arguments
if !(current_types.len() == 2 || current_types.len() == 3) {
return Ok(vec![vec![]]);
}

let first_two_types = &current_types[0..2];
let mut valid_types = array_append_or_prepend_valid_types(first_two_types, true)?;

// Early return if there are only 2 arguments
if current_types.len() == 2 {
return Ok(valid_types);
}

let valid_types_with_index = valid_types
.iter()
.map(|t| {
let mut t = t.clone();
t.push(DataType::Int64);
t
})
.collect::<Vec<_>>();

valid_types.extend(valid_types_with_index);

Ok(valid_types)
}
fn array_and_index(current_types: &[DataType]) -> Result<Vec<Vec<DataType>>> {
if current_types.len() != 2 {
return Ok(vec![vec![]]);
@@ -184,6 +213,9 @@ fn get_valid_types(
ArrayFunctionSignature::ArrayAndElement => {
return array_append_or_prepend_valid_types(current_types, true)
}
ArrayFunctionSignature::ArrayAndElementAndOptionalIndex => {
return array_element_and_optional_index(current_types)
}
ArrayFunctionSignature::ArrayAndIndex => {
return array_and_index(current_types)
}
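A simplified standalone sketch (an assumption-level reimplementation, not the DataFusion source) of the optional-index rule above: the two-argument candidate types are kept, and for three-argument calls each candidate is duplicated with a trailing `Int64` index appended:

```rust
use arrow_schema::{DataType, Field};
use std::sync::Arc;

// Given the valid (array, element) candidate types, produce the candidates
// for a call with `n_args` arguments, appending an Int64 index for 3-arg calls.
fn with_optional_index(two_arg: Vec<Vec<DataType>>, n_args: usize) -> Vec<Vec<DataType>> {
    if n_args == 2 {
        return two_arg;
    }
    let mut valid = two_arg.clone();
    valid.extend(two_arg.into_iter().map(|mut t| {
        t.push(DataType::Int64); // the optional index coerces to Int64
        t
    }));
    valid
}

fn main() {
    let list = DataType::List(Arc::new(Field::new("item", DataType::Int64, true)));
    let two_arg = vec![vec![list, DataType::Int64]];
    println!("{:?}", with_optional_index(two_arg, 3));
}
```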
11 changes: 7 additions & 4 deletions datafusion/physical-plan/src/ordering.rs
@@ -28,11 +28,14 @@
/// - A `PARTITION BY a, b` or a `PARTITION BY b, a` can use `Sorted` mode.
///
/// ## Aggregations
/// - A `GROUP BY b` clause can use `Linear` mode.
/// - A `GROUP BY a, c` or a `GROUP BY BY c, a` can use
/// `PartiallySorted([0])` or `PartiallySorted([1])` modes, respectively.
/// - A `GROUP BY b` clause can use `Linear` mode, as the only permutation `[b]`
/// cannot satisfy the existing ordering.
/// - A `GROUP BY a, c` or a `GROUP BY c, a` can use
/// `PartiallySorted([0])` or `PartiallySorted([1])` modes, respectively, as
/// the permutation `[a]` satisfies the existing ordering.
/// (The vector stores the index of `a` in the respective PARTITION BY expression.)
/// - A `GROUP BY a, b` or a `GROUP BY b, a` can use `Sorted` mode.
/// - A `GROUP BY a, b` or a `GROUP BY b, a` can use `Sorted` mode, as the
/// full permutation `[a, b]` satisfies the existing ordering.
///
/// Note these are the same examples as above, but with `GROUP BY` instead of
/// `PARTITION BY` to make the examples easier to read.
8 changes: 4 additions & 4 deletions datafusion/physical-plan/src/windows/mod.rs
@@ -329,11 +329,11 @@ pub(crate) fn calc_requirements<
(!sort_reqs.is_empty()).then_some(sort_reqs)
}

/// This function calculates the indices such that when partition by expressions reordered with this indices
/// This function calculates the indices such that when partition by expressions reordered with the indices
/// resulting expressions define a preset for existing ordering.
// For instance, if input is ordered by a, b, c and PARTITION BY b, a is used
// This vector will be [1, 0]. It means that when we iterate b,a columns with the order [1, 0]
// resulting vector (a, b) is a preset of the existing ordering (a, b, c).
/// For instance, if input is ordered by a, b, c and PARTITION BY b, a is used,
/// this vector will be [1, 0]. It means that when we iterate b, a columns with the order [1, 0]
/// resulting vector (a, b) is a preset of the existing ordering (a, b, c).
pub(crate) fn get_ordered_partition_by_indices(
partition_by_exprs: &[Arc<dyn PhysicalExpr>],
input: &Arc<dyn ExecutionPlan>,
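A toy illustration (a hypothetical helper over plain strings, not the `PhysicalExpr`-based original) of the index calculation that `get_ordered_partition_by_indices` performs:

```rust
// For input ordered by a, b, c and PARTITION BY b, a, the result is [1, 0]:
// the existing-ordering prefix `a` sits at position 1 of the PARTITION BY
// list, then `b` at position 0; `c` is absent, so the walk stops there.
fn ordered_partition_by_indices(partition_by: &[&str], ordering: &[&str]) -> Vec<usize> {
    let mut indices = vec![];
    for ord in ordering {
        match partition_by.iter().position(|p| p == ord) {
            Some(idx) => indices.push(idx),
            None => break, // longest prefix of the ordering found
        }
    }
    indices
}

fn main() {
    assert_eq!(
        ordered_partition_by_indices(&["b", "a"], &["a", "b", "c"]),
        vec![1, 0]
    );
}
```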
2 changes: 2 additions & 0 deletions datafusion/proto/src/logical_plan/mod.rs
@@ -918,6 +918,7 @@ impl AsLogicalPlan for LogicalPlanNode {
input: Arc::new(input),
output_url: copy.output_url.clone(),
file_format: FileType::from_str(&copy.file_type)?,
partition_by: vec![],
copy_options,
},
))
@@ -1641,6 +1642,7 @@ impl AsLogicalPlan for LogicalPlanNode {
output_url,
file_format,
copy_options,
partition_by: _,
}) => {
let input = protobuf::LogicalPlanNode::try_from_logical_plan(
input,