Feature Add Monotonic Definition #59

Status: Closed. This pull request proposed merging 68 commits; the diff below shows changes from 9 commits.

Commits
a2919b6
add monotonic function definitions for aggregate expressions
mertak-synnada Jan 16, 2025
14109e6
fix benchmark results
mertak-synnada Jan 16, 2025
b3d75ba
set prefer_existing_sort to true in sqllogictests
mertak-synnada Jan 17, 2025
549502e
set prefer_existing_sort to true in sqllogictests
mertak-synnada Jan 17, 2025
623e0c5
fix typo
mertak-synnada Jan 17, 2025
6a9d24e
Merge branch 'refs/heads/apache_main' into feature/monotonic-sets
mertak-synnada Jan 20, 2025
53ee3de
re-add test_utils.rs changes to the new file
mertak-synnada Jan 20, 2025
97d8951
clone input with Arc
mertak-synnada Jan 20, 2025
cc33031
Merge branch 'refs/heads/apache_main' into feature/monotonic-sets
mertak-synnada Jan 22, 2025
41d9430
Merge branch 'refs/heads/apache_main' into feature/monotonic-sets
mertak-synnada Jan 23, 2025
e988dcf
inject aggr expr indices
mertak-synnada Jan 23, 2025
906245e
remove redundant file
mertak-synnada Jan 23, 2025
475fe2d
add Sum monotonicity
mertak-synnada Jan 24, 2025
57e000e
fix sql logic tests
mertak-synnada Jan 24, 2025
ca57f46
fix sql logic tests
mertak-synnada Jan 24, 2025
6cf9644
Merge branch 'refs/heads/apache_main' into feature/monotonic-sets
mertak-synnada Jan 24, 2025
072e6ef
update docs
mertak-synnada Jan 24, 2025
7d62cb0
Merge branch 'apache_main' into feature/monotonic-sets
berkaysynnada Jan 28, 2025
491aabe
review part 1
berkaysynnada Jan 28, 2025
71996fb
make AnalysisContext aware of empty sets to represent certainly false…
buraksenn Jan 28, 2025
1920771
Add relation to alias expr in schema display (#14311)
phisn Jan 28, 2025
8d542ec
Improve deprecation message for MemoryExec (#14322)
alamb Jan 28, 2025
dc445a1
fix: LogicalPlan::get_parameter_types fails to return all placeholder…
dhegberg Jan 28, 2025
ecc5694
fix: FULL OUTER JOIN and LIMIT produces wrong results (#14338)
zhuqi-lucas Jan 28, 2025
7f0c71b
Customize window frame support for dialect (#14288)
Sevenannn Jan 28, 2025
d051731
refactor: switch BooleanBufferBuilder to NullBufferBuilder in a unit …
Chen-Yuan-Lai Jan 28, 2025
f8063e8
Add `ColumnStatistics::Sum` (#14074)
gatesn Jan 28, 2025
3a991e6
refactor: switch BooleanBufferBuilder to NullBufferBuilder in unit te…
Chen-Yuan-Lai Jan 28, 2025
a4917d4
Increase MSRV to 1.81.0 (#14330)
alamb Jan 28, 2025
66b4da2
Fix build due to logical error (#14345)
alamb Jan 29, 2025
972c56f
fix the tests
berkaysynnada Jan 29, 2025
4b946b3
revert slt's
berkaysynnada Jan 29, 2025
481b5b4
simplify terms
berkaysynnada Jan 29, 2025
29af731
Update mod.rs
berkaysynnada Jan 29, 2025
1f02953
remove unnecessary computations
berkaysynnada Jan 29, 2025
79dd942
remove index calc
berkaysynnada Jan 29, 2025
247d5fe
Update mod.rs
berkaysynnada Jan 29, 2025
16bdac4
Apply suggestions from code review
ozankabak Jan 29, 2025
1875336
add slt
berkaysynnada Jan 29, 2025
8464f0a
chore(deps): bump serde_json from 1.0.137 to 1.0.138 in /datafusion-c…
dependabot[bot] Jan 29, 2025
50ac43d
chore(deps): bump tempfile from 3.15.0 to 3.16.0 in /datafusion-cli (…
dependabot[bot] Jan 29, 2025
2a8b885
Update version in `datafusion-cli/Dockerfile` to 1.81 (#14344)
alamb Jan 29, 2025
62000b4
perf(array-agg): add fast path for array agg for `merge_batch` (#14299)
rluvaton Jan 29, 2025
1da5252
moving memory.rs out of datafusion/core (#14332)
logan-keede Jan 29, 2025
d18a1d3
refactor: switch `BooleanBufferBuilder` to `NullBufferBuilder` in bin…
Chen-Yuan-Lai Jan 29, 2025
5897438
Restore ability to run single SLT file (#14355)
findepi Jan 29, 2025
3f48520
chore(deps): bump home from 0.5.9 to 0.5.11 in /datafusion-cli (#14257)
dependabot[bot] Jan 29, 2025
a93b4de
chore(deps): bump aws-sdk-ssooidc in /datafusion-cli (#14314)
dependabot[bot] Jan 29, 2025
2510e34
fix: LimitPushdown rule uncorrect remove some GlobalLimitExec (#14245)
zhuqi-lucas Jan 29, 2025
60c0fb8
refactor: switch `BooleanBufferBuilder` to `NullBufferBuilder` in sin…
Chen-Yuan-Lai Jan 29, 2025
707f673
Issue-14216 - Support arrays_overlap function (#14217)
erenavsarogullari Jan 29, 2025
99f907b
chore(deps): bump rustyline from 14.0.0 to 15.0.0 in /datafusion-cli …
dependabot[bot] Jan 29, 2025
ba7b94f
remove aggregate changes, tests already give expected results
berkaysynnada Jan 30, 2025
2152b7f
fix clippy
berkaysynnada Jan 30, 2025
0d367dd
chore(deps): bump aws-sdk-sts from 1.51.0 to 1.57.0 in /datafusion-cl…
dependabot[bot] Jan 30, 2025
51a23cf
chore(deps): bump aws-sdk-sso from 1.50.0 to 1.56.0 in /datafusion-cl…
dependabot[bot] Jan 30, 2025
b79efc0
chore(deps): bump korandoru/hawkeye from 5 to 6 (#14354)
dependabot[bot] Jan 30, 2025
7822613
remove one row sorts
berkaysynnada Jan 30, 2025
5e9b2db
Improve comments
ozankabak Jan 30, 2025
7efa2f3
chore(deps): bump aws-sdk-ssooidc in /datafusion-cli (#14369)
dependabot[bot] Jan 30, 2025
0edc3d9
Minor: include the number of files run in sqllogictest display (#14359)
alamb Jan 30, 2025
54d62d6
Use a short name for set monotonicity
ozankabak Jan 30, 2025
29e9a1c
removed (#14370)
buraksenn Jan 30, 2025
c077ef5
move information_schema to datafusion-catalog (#14364)
logan-keede Jan 30, 2025
07ee09a
Unpin aws sdk dependencies (#14361)
alamb Jan 30, 2025
11435de
Core: Fix incorrect searched CASE optimization (#14349)
findepi Jan 30, 2025
53728b3
Improve speed of `median` by implementing special `GroupsAccumulator…
Rachelint Jan 31, 2025
1146811
Merge branch 'main' into feature/monotonic-sets
ozankabak Jan 31, 2025
273 changes: 272 additions & 1 deletion datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -665,7 +665,9 @@ mod tests {
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::{csv_exec_ordered, csv_exec_sorted, stream_exec_ordered};
use datafusion_physical_optimizer::test_utils::{
aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec,
aggregate_exec, aggregate_exec_monotonic, aggregate_exec_non_monotonic,
bounded_window_exec, bounded_window_exec_non_monotonic,
bounded_window_exec_with_partition, check_integrity, coalesce_batches_exec,
coalesce_partitions_exec, filter_exec, global_limit_exec, hash_join_exec,
limit_exec, local_limit_exec, memory_exec, repartition_exec, sort_exec,
sort_expr, sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec,
@@ -707,6 +709,17 @@
Ok(schema)
}

// Generate a schema consisting of five UInt64 columns (a, b, c, d, e)
fn create_test_schema4() -> Result<SchemaRef> {
let a = Field::new("a", DataType::UInt64, true);
let b = Field::new("b", DataType::UInt64, false);
let c = Field::new("c", DataType::UInt64, true);
let d = Field::new("d", DataType::UInt64, false);
let e = Field::new("e", DataType::UInt64, false);
let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
Ok(schema)
}

/// Runs the sort enforcement optimizer and asserts the plan
/// against the original and expected plans
///
@@ -1010,6 +1023,63 @@
Ok(())
}

#[tokio::test]
async fn test_aggregate_monotonic() -> Result<()> {
let schema = create_test_schema4()?;
let source = memory_exec(&schema);
let sort_exprs = vec![sort_expr("a", &schema)];
let sort = sort_exec(sort_exprs.clone(), source);

let aggregate = aggregate_exec_monotonic(sort);
let sort_exprs = LexOrdering::new(vec![sort_expr("count", &aggregate.schema())]);
let physical_plan: Arc<dyn ExecutionPlan> =
Arc::new(SortExec::new(sort_exprs.clone(), aggregate)) as _;

let expected_input = [
"SortExec: expr=[count@0 ASC], preserve_partitioning=[false]",
" AggregateExec: mode=Final, gby=[], aggr=[count]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];

let expected_optimized = [
"AggregateExec: mode=Final, gby=[], aggr=[count]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);

Ok(())
}

#[tokio::test]
async fn test_aggregate_non_monotonic() -> Result<()> {
let schema = create_test_schema4()?;
let source = memory_exec(&schema);
let sort_exprs = vec![sort_expr("a", &schema)];
let sort = sort_exec(sort_exprs.clone(), source);

let aggregate = aggregate_exec_non_monotonic(sort);
let sort_exprs = LexOrdering::new(vec![sort_expr("avg", &aggregate.schema())]);
let physical_plan: Arc<dyn ExecutionPlan> =
Arc::new(SortExec::new(sort_exprs.clone(), aggregate)) as _;

let expected_input = [
"SortExec: expr=[avg@0 ASC], preserve_partitioning=[false]",
" AggregateExec: mode=Final, gby=[], aggr=[avg]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];

let expected_optimized = [
"SortExec: expr=[avg@0 ASC], preserve_partitioning=[false]",
" AggregateExec: mode=Final, gby=[], aggr=[avg]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);

Ok(())
}

Review comment: I guess avg is monotonic for integer but not for float?

Reply from the author (mertak-synnada): Nope, it's not monotonic for unsigned integers, for example:

1st data 100 => Avg 100
2nd data 2 => (102/2) Avg 51 (decreased)
3rd data 3 => (105/3) Avg 35 (decreased)
4th data 1003 => (1108/4) Avg 277 (increased)

To be monotonic, it needs to be always increasing/decreasing.
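
The numbers in the thread above can be replayed with a tiny standalone program. This is a plain-Rust sketch, not DataFusion code: it shows that a running COUNT only grows as the input set grows, while the running AVG moves in both directions, which is why the optimizer cannot remove the sort in this test.

fn main() {
    let data = [100u64, 2, 3, 1003];
    let mut sum = 0u64;
    for (i, x) in data.iter().enumerate() {
        sum += x;
        let count = (i + 1) as u64;
        // Integer average, matching the worked example in the thread above.
        let avg = sum / count;
        println!("after {count} rows: count = {count}, avg = {avg}");
    }
    // count: 1 -> 2 -> 3 -> 4 (monotonically increasing).
    // avg: 100 -> 51 -> 35 -> 277 (decreases, then increases), so AVG is
    // not set-monotonic and its SortExec must stay in the plan.
}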

#[tokio::test]
async fn test_remove_unnecessary_sort4() -> Result<()> {
let schema = create_test_schema()?;
@@ -1758,6 +1828,207 @@
Ok(())
}

#[tokio::test]
async fn test_bounded_window_monotonic_sort() -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr_options(
"nullable_col",
&schema,
SortOptions {
descending: true,
nulls_first: false,
},
)];
let source = parquet_exec_sorted(&schema, sort_exprs.clone());
let sort = sort_exec(sort_exprs.clone(), source);

let bounded_window =
bounded_window_exec("nullable_col", sort_exprs.clone(), sort);
let output_schema = bounded_window.schema();
let sort_exprs2 = vec![sort_expr_options(
"count",
&output_schema,
SortOptions {
descending: false,
nulls_first: false,
},
)];
let physical_plan = sort_exec(sort_exprs2.clone(), bounded_window);

let expected_input = [
"SortExec: expr=[count@2 ASC NULLS LAST], preserve_partitioning=[false]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortExec: expr=[nullable_col@0 DESC NULLS LAST], preserve_partitioning=[false]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 DESC NULLS LAST]",
];
let expected_optimized = [
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 DESC NULLS LAST]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);

Ok(())
}

#[tokio::test]
async fn test_bounded_plain_window_monotonic_sort_with_partitions() -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr_options(
"nullable_col",
&schema,
SortOptions {
descending: true,
nulls_first: false,
},
)];
let source = parquet_exec_sorted(&schema, sort_exprs.clone());
let sort = sort_exec(sort_exprs.clone(), source);
let partition_bys = &[col("nullable_col", &schema)?];

let bounded_window = bounded_window_exec_with_partition(
"nullable_col",
sort_exprs.clone(),
partition_bys,
sort,
false,
);
let output_schema = bounded_window.schema();
let sort_exprs2 = vec![
sort_expr_options(
"nullable_col",
&output_schema,
SortOptions {
descending: true,
nulls_first: false,
},
),
sort_expr_options(
"count",
&output_schema,
SortOptions {
descending: false,
nulls_first: false,
},
),
];
let physical_plan = sort_exec(sort_exprs2.clone(), bounded_window);

let expected_input = [
"SortExec: expr=[nullable_col@0 DESC NULLS LAST, count@2 ASC NULLS LAST], preserve_partitioning=[false]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortExec: expr=[nullable_col@0 DESC NULLS LAST], preserve_partitioning=[false]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 DESC NULLS LAST]",
];
let expected_optimized = [
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 DESC NULLS LAST]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);

Ok(())
}

#[tokio::test]
async fn test_bounded_sliding_window_monotonic_sort_with_partitions() -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr_options(
"nullable_col",
&schema,
SortOptions {
descending: true,
nulls_first: false,
},
)];
let source = parquet_exec_sorted(&schema, sort_exprs.clone());
let sort = sort_exec(sort_exprs.clone(), source);
let partition_bys = &[col("nullable_col", &schema)?];

let bounded_window = bounded_window_exec_with_partition(
"nullable_col",
sort_exprs.clone(),
partition_bys,
sort,
true,
);
let output_schema = bounded_window.schema();
let sort_exprs2 = vec![
sort_expr_options(
"nullable_col",
&output_schema,
SortOptions {
descending: true,
nulls_first: false,
},
),
sort_expr_options(
"count",
&output_schema,
SortOptions {
descending: false,
nulls_first: false,
},
),
];
let physical_plan = sort_exec(sort_exprs2.clone(), bounded_window);

let expected_input = [
"SortExec: expr=[nullable_col@0 DESC NULLS LAST, count@2 ASC NULLS LAST], preserve_partitioning=[false]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL), is_causal: false }], mode=[Sorted]",
" SortExec: expr=[nullable_col@0 DESC NULLS LAST], preserve_partitioning=[false]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 DESC NULLS LAST]",
];
let expected_optimized = [
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL), is_causal: false }], mode=[Sorted]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 DESC NULLS LAST]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);

Ok(())
}

#[tokio::test]
async fn test_bounded_window_non_monotonic_sort() -> Result<()> {
let schema = create_test_schema4()?;
let sort_exprs = vec![sort_expr_options(
"a",
&schema,
SortOptions {
descending: true,
nulls_first: false,
},
)];
let source = parquet_exec_sorted(&schema, sort_exprs.clone());
let sort = sort_exec(sort_exprs.clone(), source);

let bounded_window =
bounded_window_exec_non_monotonic("a", sort_exprs.clone(), sort);
let output_schema = bounded_window.schema();
let sort_exprs2 = vec![sort_expr_options(
"avg",
&output_schema,
SortOptions {
descending: false,
nulls_first: false,
},
)];
let physical_plan = sort_exec(sort_exprs2.clone(), bounded_window);

let expected_input = [
"SortExec: expr=[avg@5 ASC NULLS LAST], preserve_partitioning=[false]",
" BoundedWindowAggExec: wdw=[avg: Ok(Field { name: \"avg\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST]",
];
let expected_optimized = [
"SortExec: expr=[avg@5 ASC NULLS LAST], preserve_partitioning=[false]",
" BoundedWindowAggExec: wdw=[avg: Ok(Field { name: \"avg\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);

Ok(())
}

#[tokio::test]
async fn test_window_multi_path_sort2() -> Result<()> {
let schema = create_test_schema()?;
10 changes: 10 additions & 0 deletions datafusion/expr/src/test/function_stub.rs
@@ -278,6 +278,10 @@ impl AggregateUDFImpl for Count {
fn reverse_expr(&self) -> ReversedUDAF {
ReversedUDAF::Identical
}

fn is_monotonic(&self) -> Option<bool> {
Some(true)
}
}

create_func!(Min, min_udaf);
@@ -363,6 +367,9 @@
fn is_descending(&self) -> Option<bool> {
Some(false)
}
fn is_monotonic(&self) -> Option<bool> {
Some(false)
}
}

create_func!(Max, max_udaf);
@@ -448,6 +455,9 @@
fn is_descending(&self) -> Option<bool> {
Some(true)
}
fn is_monotonic(&self) -> Option<bool> {
Some(true)
}
}
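
As a quick sanity check of the direction convention in the stubs above, here is a plain-Rust sketch, independent of DataFusion: as a set grows, Min can only stay the same or decrease (hence Some(false)), and Max can only stay the same or increase (hence Some(true)).

fn main() {
    let data = [5i64, 9, 1, 7];
    let mut min = i64::MAX;
    let mut max = i64::MIN;
    for x in data {
        min = min.min(x);
        max = max.max(x);
        println!("after {x}: min = {min}, max = {max}");
    }
    // min: 5, 5, 1, 1 never increases as the set grows, hence Some(false);
    // max: 5, 9, 9, 9 never decreases, hence Some(true).
}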

/// Testing stub implementation of avg aggregate
14 changes: 13 additions & 1 deletion datafusion/expr/src/udaf.rs
@@ -389,7 +389,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {

/// Whether the aggregate function is nullable.
///
/// Nullable means that that the function could return `null` for any inputs.
/// Nullable means that the function could return `null` for any inputs.
/// For example, aggregate functions like `COUNT` always return a non null value
/// but others like `MIN` will return `NULL` if there is nullable input.
/// Note that if the function is declared as *not* nullable, make sure the [`AggregateUDFImpl::default_value`] is `non-null`
@@ -635,6 +635,18 @@
fn documentation(&self) -> Option<&Documentation> {
None
}

/// Indicates whether the aggregation function is monotonic as a set function. A set
/// function is monotonically increasing if its value increases as its argument grows
/// (as a set). Formally, `f` is a monotonically increasing set function if `f(S) >= f(T)`
/// whenever `S` is a superset of `T`.
///
/// Returns `None` if the function is not monotonic.
/// Returns `Some(false)` if the function is monotonically decreasing (e.g. `Min`).
/// Returns `Some(true)` if the function is monotonically increasing (e.g. `Max`).
fn is_monotonic(&self) -> Option<bool> {
None
}
}

impl PartialEq for dyn AggregateUDFImpl {

Review comment from the author (mertak-synnada): I think we will need to accept a data_type argument here, since Sum's monotonicity depends on it.

jayzhan-synnada (Jan 23, 2025): we can add it first so we don't have to break API again in the future
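
The data-type concern raised in this thread can be seen with a small standalone sketch (plain Rust, not part of the PR): a running sum over unsigned values can only grow, while a signed running sum can shrink when negative values arrive.

fn main() {
    let unsigned = [3u64, 5, 2];
    let signed = [3i64, -5, 2];
    let mut us = 0u64;
    let mut ss = 0i64;
    for (u, s) in unsigned.iter().zip(signed.iter()) {
        us += u;
        ss += s;
        println!("unsigned sum = {us}, signed sum = {ss}");
    }
    // Unsigned: 3, 8, 10 (monotonically increasing).
    // Signed: 3, -2, 0 (not monotonic), so Sum over signed types cannot
    // unconditionally report Some(true).
}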
7 changes: 7 additions & 0 deletions datafusion/expr/src/window_frame.rs
@@ -291,6 +291,13 @@ impl WindowFrame {
&& (self.end_bound.is_unbounded()
|| self.end_bound == WindowFrameBound::CurrentRow)
}

/// Returns whether the window frame is ever-expanding, i.e. it always grows
/// in the superset sense. This is useful for determining whether the
/// set-monotonicity properties of aggregate functions can be exploited.
pub fn is_ever_expanding(&self) -> bool {
self.start_bound.is_unbounded()
}
}
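
The intent of is_ever_expanding can be modeled with a simplified standalone sketch. The Bound and Frame types below are hypothetical stand-ins for illustration, not the real WindowFrameBound/WindowFrame API: a frame whose start bound is UNBOUNDED PRECEDING only ever gains rows as the window advances, so a set-monotonic aggregate computed over it yields already-sorted output.

#[derive(PartialEq)]
enum Bound {
    UnboundedPreceding,
    CurrentRow,
    Following(u64),
}

struct Frame {
    start: Bound,
    end: Bound,
}

impl Frame {
    // Mirrors the logic above: only the start bound matters.
    fn is_ever_expanding(&self) -> bool {
        self.start == Bound::UnboundedPreceding
    }
}

fn main() {
    // RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: ever-expanding.
    let growing = Frame { start: Bound::UnboundedPreceding, end: Bound::CurrentRow };
    // RANGE BETWEEN CURRENT ROW AND 3 FOLLOWING: sliding, not ever-expanding.
    let sliding = Frame { start: Bound::CurrentRow, end: Bound::Following(3) };
    assert!(growing.is_ever_expanding());
    assert!(!sliding.is_ever_expanding());
}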

/// There are five ways to describe starting and ending frame boundaries:
4 changes: 4 additions & 0 deletions datafusion/functions-aggregate/src/count.rs
@@ -351,6 +351,10 @@ impl AggregateUDFImpl for Count {
fn documentation(&self) -> Option<&Documentation> {
self.doc()
}

fn is_monotonic(&self) -> Option<bool> {
Some(true)
}
}

#[derive(Debug)]