Skip to content

Commit

Permalink
PERF-#0000: Don't materialize index in '_groupby_shuffle' internal fu…
Browse files Browse the repository at this point in the history
…nction

Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev committed Nov 4, 2023
1 parent 7a9415e commit 2fef468
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
13 changes: 9 additions & 4 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2421,9 +2421,9 @@ def _apply_func_to_range_partitioning(

# don't want to inherit over-partitioning so doing this 'min' check
ideal_num_new_partitions = min(len(self._partitions), NPartitions.get())
m = len(self.index) / ideal_num_new_partitions
m = sum(self.row_lengths) / ideal_num_new_partitions
sampling_probability = (1 / m) * np.log(
ideal_num_new_partitions * len(self.index)
ideal_num_new_partitions * sum(self.row_lengths)
)
# If this df is overpartitioned, we try to sample each partition with probability
# greater than 1, which leads to an error. In this case, we can do one of the following
Expand All @@ -2435,8 +2435,13 @@ def _apply_func_to_range_partitioning(
if sampling_probability >= 1:
from modin.config import MinPartitionSize

ideal_num_new_partitions = round(len(self.index) / MinPartitionSize.get())
if len(self.index) < MinPartitionSize.get() or ideal_num_new_partitions < 2:
ideal_num_new_partitions = round(
sum(self.row_lengths) / MinPartitionSize.get()
)
if (
sum(self.row_lengths) < MinPartitionSize.get()
or ideal_num_new_partitions < 2
):
# If the data is too small, we shouldn't try reshuffling/repartitioning but rather
# simply combine all partitions and apply the sorting to the whole dataframe
return self.combine_and_apply(func=func)
Expand Down
2 changes: 1 addition & 1 deletion modin/core/dataframe/pandas/dataframe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def __init__(
ideal_num_new_partitions: int,
**kwargs: dict,
):
self.frame_len = len(modin_frame.index)
self.frame_len = sum(modin_frame.row_lengths)
self.ideal_num_new_partitions = ideal_num_new_partitions
self.columns = columns if is_list_like(columns) else [columns]
self.ascending = ascending
Expand Down
4 changes: 2 additions & 2 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3616,7 +3616,7 @@ def _groupby_shuffle(
# Higher API level won't pass empty data here unless the frame has delayed
# computations. FIXME: We apparently lose some laziness here (due to index access)
# because of the inability to process empty groupby natively.
if len(self.columns) == 0 or len(self.index) == 0:
if len(self.columns) == 0 or sum(self._modin_frame.row_lengths) == 0:
return super().groupby_agg(
by, agg_func, axis, groupby_kwargs, agg_args, agg_kwargs, how, drop
)
Expand Down Expand Up @@ -3811,7 +3811,7 @@ def groupby_agg(
# Higher API level won't pass empty data here unless the frame has delayed
# computations. So we apparently lose some laziness here (due to index access)
# because of the inability to process empty groupby natively.
if len(self.columns) == 0 or len(self.index) == 0:
if len(self.columns) == 0 or sum(self._modin_frame.row_lengths) == 0:
return super().groupby_agg(
by, agg_func, axis, groupby_kwargs, agg_args, agg_kwargs, how, drop
)
Expand Down

0 comments on commit 2fef468

Please sign in to comment.