From d9875d45654c728d8bc103db41960a183158514c Mon Sep 17 00:00:00 2001 From: mvashishtha Date: Wed, 7 Jun 2023 18:20:21 -0500 Subject: [PATCH] Make test_rolling test center=False more and fix bugs Signed-off-by: mvashishtha --- .../dataframe/pandas/dataframe/dataframe.py | 109 ++++-- .../storage_formats/pandas/query_compiler.py | 312 ++++++++++++------ modin/pandas/test/test_rolling.py | 34 +- 3 files changed, 313 insertions(+), 142 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index d652f6595a2..3d750a91765 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -1995,7 +1995,7 @@ def window( axis : int or modin.core.dataframe.base.utils.Axis The axis to slide over. reduce_fn : callable(rowgroup|colgroup) -> row|col - The reduce function to apply over the data. + The reduce function to apply over the data. This must not change metadata. window_size : int The number of rows/columns to pass to the reduce function. (The size of the sliding window). @@ -2007,13 +2007,8 @@ def window( PandasDataframe A new PandasDataframe with the reduce function applied over windows of the specified axis. - - Notes - ----- - The user-defined reduce function must reduce each window’s column - (row if axis=1) down to a single value. 
""" - + axis = Axis(axis) # applies reduction function over entire virtual partition @@ -2026,34 +2021,72 @@ def window_function_complete(virtual_partition): def window_function_partition(virtual_partition): virtual_partition_copy = virtual_partition.copy() window_result = reduce_fn(virtual_partition_copy) - return window_result.iloc[:, window_size - 1 : ] if axis == Axis.COL_WISE else window_result.iloc[window_size - 1: , :] + return ( + window_result.iloc[:, window_size - 1 :] + if axis == Axis.COL_WISE + else window_result.iloc[window_size - 1 :, :] + ) - num_parts = len(self._partitions[0]) if axis == Axis.COL_WISE else len(self._partitions) + num_parts = ( + len(self._partitions[0]) if axis == Axis.COL_WISE else len(self._partitions) + ) results = [] for i in range(num_parts): - # get the ith partition - starting_part = self._partitions[:, [i]] if axis == Axis.COL_WISE else self._partitions[i] + # get the ith partition + starting_part = ( + self._partitions[:, [i]] + if axis == Axis.COL_WISE + else self._partitions[i] + ) # partitions to join in virtual partition - parts_to_join = [starting_part] if (axis == Axis.ROW_WISE) else [[partition[0]] for partition in starting_part] + parts_to_join = ( + [starting_part] + if (axis == Axis.ROW_WISE) + else [[partition[0]] for partition in starting_part] + ) # used to determine if window continues into next partition or if we can create virtual partition last_window_span = window_size - 1 k = i + 1 - while (last_window_span > 0 and k < num_parts): + while last_window_span > 0 and k < num_parts: # new partition - new_parts = self._partitions[:, [k]] if axis == Axis.COL_WISE else self._partitions[k] - part_len = new_parts[0][0].width() if axis == Axis.COL_WISE else new_parts[0].length() + new_parts = ( + self._partitions[:, [k]] + if axis == Axis.COL_WISE + else self._partitions[k] + ) + part_len = ( + new_parts[0][0].width() + if axis == Axis.COL_WISE + else new_parts[0].length() + ) - if (last_window_span <= part_len): 
+ if last_window_span <= part_len: if axis == Axis.COL_WISE: - masked_new_parts = [[part[0].mask(row_labels = slice(None), col_labels = slice(0, last_window_span))] for part in new_parts] + masked_new_parts = [ + [ + part[0].mask( + row_labels=slice(None), + col_labels=slice(0, last_window_span), + ) + ] + for part in new_parts + ] for x, r in enumerate(parts_to_join): r.append(masked_new_parts[x][0]) else: - masked_new_parts = np.array([part.mask(row_labels = slice(0, last_window_span), col_labels=slice(None)) for part in new_parts]) + masked_new_parts = np.array( + [ + part.mask( + row_labels=slice(0, last_window_span), + col_labels=slice(None), + ) + for part in new_parts + ] + ) parts_to_join.append(masked_new_parts) break else: @@ -2064,37 +2097,45 @@ def window_function_partition(virtual_partition): else: parts_to_join.append(new_parts) last_window_span -= part_len - k += 1 + k += 1 # create virtual partition and perform window operation - virtual_partitions = self._partition_mgr_cls.row_partitions(np.array(parts_to_join), full_axis = False) if axis == Axis.COL_WISE else self._partition_mgr_cls.column_partitions(np.array(parts_to_join), full_axis=False) + virtual_partitions = ( + self._partition_mgr_cls.row_partitions( + np.array(parts_to_join), full_axis=False + ) + if axis == Axis.COL_WISE + else self._partition_mgr_cls.column_partitions( + np.array(parts_to_join), full_axis=False + ) + ) if i == 0: - reduce_result = [virtual_partition.apply(window_function_complete) for virtual_partition in virtual_partitions] + reduce_result = [ + virtual_partition.apply(window_function_complete) + for virtual_partition in virtual_partitions + ] else: - reduce_result = [virtual_partition.apply(window_function_partition) for virtual_partition in virtual_partitions] - + reduce_result = [ + virtual_partition.apply(window_function_partition) + for virtual_partition in virtual_partitions + ] + # append reduction result to results array if axis == Axis.ROW_WISE: 
results.append(reduce_result) else: if results == []: results = [[x] for x in reduce_result] - else: + else: for x, r in enumerate(results): - r.append(reduce_result[x]) + r.append(reduce_result[x]) results = np.array(results) - + return self.__constructor__( - results, - self.index, - self.columns, - None, - None, - result_schema - ) - + results, self.index, self.columns, None, None, result_schema + ) @lazy_metadata_decorator(apply_axis="both") def fold(self, axis, func, new_columns=None): diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 43621d17da7..ca5d50d9b21 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -207,6 +207,32 @@ def dt_op_builder(df, *args, **kwargs): return dt_op_builder +def _current_engine_supports_virtual_partitions(): + return Engine.get() in ("Ray", "Dask", "Unidist") + + +def _can_use_cell_wise_window(center, window, win_type): + return ( + # need to make virtual partitions to use cell-wise window + _current_engine_supports_virtual_partitions() + # modin frame window assumes center is False + and center is False + # window can be time intervals like "1d" or even more exotic objects like + # FixedForwardWindowIndexer. modin frame window only handles integers + and isinstance(window, int) + # window types like triangular window can normalize the input values by the + # maximum value, but finding the maximum value requires access to the entire + # axis that we are windowing. For example: + # the 4th value of + # pd.Series([1, 10, 1000, 10_000]).rolling(3, win_type="triang").std() + # on pandas 1.5.3 is 5461.392260, but if we do the rolling from the second value + # onwards with + # pd.Series([10, 1000, 10_000]).rolling(3, win_type="triang").std(), the 3rd + # value is 5426.261196. So we can't decompose the windowing cell-wise.
+ and win_type is None + ) + + def copy_df_for_func(func, display_name: str = None): """ Build function that execute specified `func` against passed frame inplace. @@ -1386,14 +1412,17 @@ def expanding_corr( def window_mean(self, axis, window_args, *args, **kwargs): window = window_args[0] center = window_args[2] - if not center and isinstance(window, int): + win_type = window_args[3] + if _can_use_cell_wise_window(center, window, win_type): return self.__constructor__( self._modin_frame.window( - axis, window, lambda df: df.rolling(*window_args).mean(*args, **kwargs) + axis, + window, + lambda df: df.rolling(*window_args).mean(*args, **kwargs), ) ) else: - return self.old_window_mean(axis, window_args, *args, **kwargs) + return self.old_window_mean(axis, window_args, *args, **kwargs) old_window_sum = Fold.register( lambda df, rolling_args, *args, **kwargs: pandas.DataFrame( @@ -1404,14 +1433,17 @@ def window_mean(self, axis, window_args, *args, **kwargs): def window_sum(self, axis, window_args, *args, **kwargs): window = window_args[0] center = window_args[2] - if not center and isinstance(window, int): + win_type = window_args[3] + if _can_use_cell_wise_window(center, window, win_type): return self.__constructor__( self._modin_frame.window( - axis, window, lambda df: df.rolling(*window_args).sum(*args, **kwargs) + axis, + window, + lambda df: df.rolling(*window_args).sum(*args, **kwargs), ) ) else: - return self.old_window_sum(axis, window_args, *args, **kwargs) + return self.old_window_sum(axis, window_args, *args, **kwargs) old_window_var = Fold.register( lambda df, rolling_args, ddof, *args, **kwargs: pandas.DataFrame( @@ -1422,14 +1454,17 @@ def window_sum(self, axis, window_args, *args, **kwargs): def window_var(self, axis, window_args, ddof, *args, **kwargs): window = window_args[0] center = window_args[2] - if not center and isinstance(window, int): + win_type = window_args[3] + if _can_use_cell_wise_window(center, window, win_type): return 
self.__constructor__( self._modin_frame.window( - axis, window, lambda df: df.rolling(*window_args).var(ddof=ddof, *args, **kwargs) + axis, + window, + lambda df: df.rolling(*window_args).var(ddof, *args, **kwargs), ) ) else: - return self.old_window_var(axis, window_args, ddof, *args, **kwargs) + return self.old_window_var(axis, window_args, ddof, *args, **kwargs) old_window_std = Fold.register( lambda df, rolling_args, ddof, *args, **kwargs: pandas.DataFrame( @@ -1440,14 +1475,17 @@ def window_var(self, axis, window_args, ddof, *args, **kwargs): def window_std(self, axis, window_args, ddof, *args, **kwargs): window = window_args[0] center = window_args[2] - if not center and isinstance(window, int): + win_type = window_args[3] + if _can_use_cell_wise_window(center, window, win_type): return self.__constructor__( self._modin_frame.window( - axis, window, lambda df: df.rolling(*window_args).std(ddof=ddof, *args, **kwargs) + axis, + window, + lambda df: df.rolling(*window_args).std(ddof=ddof, *args, **kwargs), ) ) else: - return self.old_window_std(axis, window_args, ddof, *args, **kwargs) + return self.old_window_std(axis, window_args, ddof, *args, **kwargs) old_rolling_count = Fold.register( lambda df, rolling_args: pandas.DataFrame(df.rolling(*rolling_args).count()) @@ -1456,28 +1494,32 @@ def window_std(self, axis, window_args, ddof, *args, **kwargs): def rolling_count(self, axis, rolling_args): window = rolling_args[0] center = rolling_args[2] - if not center and isinstance(window, int): + win_type = rolling_args[3] + if _can_use_cell_wise_window(center, window, win_type): return self.__constructor__( self._modin_frame.window( axis, window, lambda df: df.rolling(*rolling_args).count() ) ) else: - return self.old_rolling_count(axis, rolling_args) + return self.old_rolling_count(axis, rolling_args) old_rolling_sum = Fold.register( lambda df, rolling_args, *args, **kwargs: pandas.DataFrame( - df.rolling(*rolling_args).sum(*args, **kwargs) + 
df.rolling(*rolling_args).sum(*args, **kwargs) ) ) def rolling_sum(self, axis, rolling_args, *args, **kwargs): window = rolling_args[0] center = rolling_args[2] - if not center and isinstance(window, int): + win_type = rolling_args[3] + if _can_use_cell_wise_window(center, window, win_type): return self.__constructor__( self._modin_frame.window( - axis, window, lambda df: df.rolling(*rolling_args).sum(*args, **kwargs) + axis, + window, + lambda df: df.rolling(*rolling_args).sum(*args, **kwargs), ) ) else: @@ -1492,10 +1534,13 @@ def rolling_sum(self, axis, rolling_args, *args, **kwargs): def rolling_sem(self, axis, rolling_args, *args, **kwargs): window = rolling_args[0] center = rolling_args[2] - if not center and isinstance(window, int): + win_type = rolling_args[3] + if _can_use_cell_wise_window(center, window, win_type): return self.__constructor__( self._modin_frame.window( - axis, window, lambda df: df.rolling(*rolling_args).sum(*args, **kwargs) + axis, + window, + lambda df: df.rolling(*rolling_args).sem(*args, **kwargs), ) ) else: @@ -1510,14 +1555,17 @@ def rolling_sem(self, axis, rolling_args, *args, **kwargs): def rolling_mean(self, axis, rolling_args, *args, **kwargs): window = rolling_args[0] center = rolling_args[2] - if not center and isinstance(window, int): + win_type = rolling_args[3] + if _can_use_cell_wise_window(center, window, win_type): return self.__constructor__( self._modin_frame.window( - axis, window, lambda df: df.rolling(*rolling_args).mean(*args, **kwargs) + axis, + window, + lambda df: df.rolling(*rolling_args).mean(*args, **kwargs), ) ) else: - return self.old_rolling_mean(axis, rolling_args, *args, **kwargs) + return self.old_rolling_mean(axis, rolling_args, *args, **kwargs) old_rolling_median = Fold.register( lambda df, rolling_args, **kwargs: pandas.DataFrame( @@ -1528,14 +1576,15 @@ def rolling_mean(self, axis, rolling_args, *args, **kwargs): def rolling_median(self, axis, rolling_args, **kwargs): window = rolling_args[0] center 
= rolling_args[2] - if not center and isinstance(window, int): + win_type = rolling_args[3] + if _can_use_cell_wise_window(center, window, win_type): return self.__constructor__( self._modin_frame.window( axis, window, lambda df: df.rolling(*rolling_args).median(**kwargs) ) ) else: - return self.old_rolling_median(axis, rolling_args, **kwargs) + return self.old_rolling_median(axis, rolling_args, **kwargs) old_rolling_var = Fold.register( lambda df, rolling_args, ddof, *args, **kwargs: pandas.DataFrame( @@ -1546,32 +1595,42 @@ def rolling_median(self, axis, rolling_args, **kwargs): def rolling_var(self, axis, rolling_args, ddof, *args, **kwargs): window = rolling_args[0] center = rolling_args[2] - if not center and isinstance(window, int): + win_type = rolling_args[3] + if _can_use_cell_wise_window(center, window, win_type): return self.__constructor__( self._modin_frame.window( - axis, window, lambda df: df.rolling(*rolling_args).var(ddof=ddof, *args, **kwargs) + axis, + window, + lambda df: df.rolling(*rolling_args).var( + ddof=ddof, *args, **kwargs + ), ) ) else: - return self.old_rolling_var(axis, rolling_args, ddof, *args, **kwargs) + return self.old_rolling_var(axis, rolling_args, ddof, *args, **kwargs) old_rolling_std = Fold.register( lambda df, rolling_args, ddof, *args, **kwargs: pandas.DataFrame( df.rolling(*rolling_args).std(ddof=ddof, *args, **kwargs) ) - ) + ) def rolling_std(self, axis, rolling_args, ddof, *args, **kwargs): window = rolling_args[0] center = rolling_args[2] - if not center and isinstance(window, int): + win_type = rolling_args[3] + if _can_use_cell_wise_window(center, window, win_type): return self.__constructor__( self._modin_frame.window( - axis, window, lambda df: df.rolling(*rolling_args).var(ddof=ddof, *args, **kwargs) + axis, + window, + lambda df: df.rolling(*rolling_args).std( + ddof=ddof, *args, **kwargs + ), ) ) else: - return self.old_rolling_std(axis, rolling_args, ddof, *args, **kwargs) + return self.old_rolling_std(axis, 
rolling_args, ddof, *args, **kwargs) old_rolling_min = Fold.register( lambda df, rolling_args, *args, **kwargs: pandas.DataFrame( @@ -1582,14 +1641,17 @@ def rolling_std(self, axis, rolling_args, ddof, *args, **kwargs): def rolling_min(self, axis, rolling_args, *args, **kwargs): window = rolling_args[0] center = rolling_args[2] - if not center and isinstance(window, int): + win_type = rolling_args[3] + if _can_use_cell_wise_window(center, window, win_type): return self.__constructor__( self._modin_frame.window( - axis, window, lambda df: df.rolling(*rolling_args).min(*args, **kwargs) + axis, + window, + lambda df: df.rolling(*rolling_args).min(*args, **kwargs), ) ) else: - return self.old_rolling_min(axis, rolling_args, *args, **kwargs) + return self.old_rolling_min(axis, rolling_args, *args, **kwargs) old_rolling_max = Fold.register( lambda df, rolling_args, *args, **kwargs: pandas.DataFrame( @@ -1600,14 +1662,17 @@ def rolling_min(self, axis, rolling_args, *args, **kwargs): def rolling_max(self, axis, rolling_args, *args, **kwargs): window = rolling_args[0] center = rolling_args[2] - if not center and isinstance(window, int): + win_type = rolling_args[3] + if _can_use_cell_wise_window(center, window, win_type): return self.__constructor__( self._modin_frame.window( - axis, window, lambda df: df.rolling(*rolling_args).max(*args, **kwargs) + axis, + window, + lambda df: df.rolling(*rolling_args).max(*args, **kwargs), ) ) else: - return self.old_rolling_max(axis, rolling_args, *args, **kwargs) + return self.old_rolling_max(axis, rolling_args, *args, **kwargs) old_rolling_skew = Fold.register( lambda df, rolling_args, **kwargs: pandas.DataFrame( @@ -1618,7 +1683,8 @@ def rolling_max(self, axis, rolling_args, *args, **kwargs): def rolling_skew(self, axis, rolling_args, **kwargs): window = rolling_args[0] center = rolling_args[2] - if not center and isinstance(window, int): + win_type = rolling_args[3] + if _can_use_cell_wise_window(center, window, win_type): return 
self.__constructor__( self._modin_frame.window( axis, window, lambda df: df.rolling(*rolling_args).skew(**kwargs) @@ -1636,7 +1702,8 @@ def rolling_skew(self, axis, rolling_args, **kwargs): def rolling_kurt(self, axis, rolling_args, **kwargs): window = rolling_args[0] center = rolling_args[2] - if not center and isinstance(window, int): + win_type = rolling_args[3] + if _can_use_cell_wise_window(center, window, win_type): return self.__constructor__( self._modin_frame.window( axis, window, lambda df: df.rolling(*rolling_args).kurt(**kwargs) @@ -1658,27 +1725,30 @@ def rolling_kurt(self, axis, rolling_args, **kwargs): ) ) - def rolling_apply(self, axis, rolling_args, func, raw, engine, engine_kwargs, args, kwargs): + def rolling_apply( + self, axis, rolling_args, func, raw, engine, engine_kwargs, args, kwargs + ): window = rolling_args[0] center = rolling_args[2] - if not center and isinstance(window, int): + win_type = rolling_args[3] + if _can_use_cell_wise_window(center, window, win_type): return self.__constructor__( self._modin_frame.window( - axis, window, - lambda df: - df.rolling(*rolling_args).apply( - func=func, - raw=raw, - engine=engine, - engine_kwargs=engine_kwargs, - args=args, - kwargs=kwargs, - ) + axis, + window, + lambda df: df.rolling(*rolling_args).apply( + func=func, + raw=raw, + engine=engine, + engine_kwargs=engine_kwargs, + args=args, + kwargs=kwargs, + ), ) - ) + ) else: return self.old_rolling_apply( - axis, + axis, rolling_args, func, raw, @@ -1688,7 +1758,6 @@ def rolling_apply(self, axis, rolling_args, func, raw, engine, engine_kwargs, ar kwargs, ) - old_rolling_rank = Fold.register( lambda df, rolling_args, method, ascending, pct, numeric_only, **kwargs: pandas.DataFrame( df.rolling(*rolling_args).rank( @@ -1701,24 +1770,30 @@ def rolling_apply(self, axis, rolling_args, func, raw, engine, engine_kwargs, ar ) ) - def rolling_rank(self, axis, rolling_args, method, ascending, pct, numeric_only, **kwargs): + def rolling_rank( + self, axis, 
rolling_args, method, ascending, pct, numeric_only, **kwargs + ): window = rolling_args[0] center = rolling_args[2] - if not center and isinstance(window, int): + win_type = rolling_args[3] + if _can_use_cell_wise_window(center, window, win_type): return self.__constructor__( self._modin_frame.window( - axis, window, + axis, + window, lambda df: df.rolling(*rolling_args).rank( method=method, ascending=ascending, pct=pct, numeric_only=numeric_only, - **kwargs - ) + **kwargs, + ), ) ) else: - return self.old_rolling_rank(axis, rolling_args, method, ascending, pct, numeric_only, **kwargs) + return self.old_rolling_rank( + axis, rolling_args, method, ascending, pct, numeric_only, **kwargs + ) old_rolling_quantile = Fold.register( lambda df, rolling_args, quantile, interpolation, **kwargs: pandas.DataFrame( @@ -1731,22 +1806,29 @@ def rolling_rank(self, axis, rolling_args, method, ascending, pct, numeric_only, def rolling_quantile(self, axis, rolling_args, quantile, interpolation, **kwargs): window = rolling_args[0] center = rolling_args[2] - if not center and isinstance(window, int): + win_type = rolling_args[3] + if _can_use_cell_wise_window(center, window, win_type): return self.__constructor__( self._modin_frame.window( - axis, window, lambda df: df.rolling(*rolling_args).quantile(quantile=quantile, interpolation=interpolation, **kwargs) + axis, + window, + lambda df: df.rolling(*rolling_args).quantile( + quantile=quantile, interpolation=interpolation, **kwargs + ), ) ) else: - return self.old_rolling_quantile(axis, rolling_args, quantile, interpolation, **kwargs) + return self.old_rolling_quantile( + axis, rolling_args, quantile, interpolation, **kwargs + ) old_rolling_corr = Fold.register( lambda df, rolling_args, other, pairwise, *args, **kwargs: pandas.DataFrame( df.rolling(*rolling_args).corr( other=other, pairwise=pairwise, *args, **kwargs - ) ) ) + ) def rolling_corr(self, axis, rolling_args, other, pairwise, *args, **kwargs): if len(self.columns) > 1: @@ 
-1758,17 +1840,21 @@ def rolling_corr(self, axis, rolling_args, other, pairwise, *args, **kwargs): else: window = rolling_args[0] center = rolling_args[2] - if not center and isinstance(window, int): + win_type = rolling_args[3] + if _can_use_cell_wise_window(center, window, win_type): return self.__constructor__( self._modin_frame.window( - axis, window, + axis, + window, lambda df: df.rolling(*rolling_args).corr( other=other, pairwise=pairwise, *args, **kwargs - ) + ), ) ) else: - return self.old_rolling_corr(axis, rolling_args, other, pairwise, *args, **kwargs) + return self.old_rolling_corr( + axis, rolling_args, other, pairwise, *args, **kwargs + ) old_rolling_cov = Fold.register( lambda df, rolling_args, other, pairwise, ddof, **kwargs: pandas.DataFrame( @@ -1788,37 +1874,37 @@ def rolling_cov(self, axis, rolling_args, other, pairwise, ddof, **kwargs): else: window = rolling_args[0] center = rolling_args[2] - if not center and isinstance(window, int): + win_type = rolling_args[3] + if _can_use_cell_wise_window( + center, + window, + win_type, + ): return self.__constructor__( self._modin_frame.window( - axis, window, + axis, + window, lambda df: df.rolling(*rolling_args).cov( other=other, pairwise=pairwise, **kwargs - ) + ), ) ) else: - return self.old_rolling_cov(axis, rolling_args, other, pairwise, ddof, **kwargs) - - def rolling_aggregate(self, axis, rolling_args, func, *args, **kwargs): - window = rolling_args[0] - center = rolling_args[2] - if not center and isinstance(window, int): - return self.__constructor__( - self._modin_frame.window( - axis, window, lambda df: df.rolling(*rolling_args).aggregate(func=func, *args, **kwargs) + return self.old_rolling_cov( + axis, rolling_args, other, pairwise, ddof, **kwargs ) - ) - else: - new_modin_frame = self._modin_frame.apply_full_axis( - axis, - lambda df: pandas.DataFrame( - df.rolling(*rolling_args).aggregate(func=func, *args, **kwargs) - ), - new_index=self.index, - ) - return 
self.__constructor__(new_modin_frame) + def rolling_aggregate(self, axis, rolling_args, func, *args, **kwargs): + # note that we can't use the modin frame's window(), which assumes that the + # metadata doesn't change. + new_modin_frame = self._modin_frame.apply_full_axis( + axis, + lambda df: pandas.DataFrame( + df.rolling(*rolling_args).aggregate(func=func, *args, **kwargs) + ), + new_index=self.index, + ) + return self.__constructor__(new_modin_frame) def unstack(self, level, fill_value): if not isinstance(self.index, pandas.MultiIndex) or ( @@ -4352,10 +4438,48 @@ def iloc_mut(partition, row_internal_indices, col_internal_indices, item): return self.__constructor__(new_modin_frame) def sort_rows_by_column_values(self, columns, ascending=True, **kwargs): - new_modin_frame = self._modin_frame.sort_by( - 0, columns, ascending=ascending, **kwargs - ) - return self.__constructor__(new_modin_frame) + # Our algebra sort is only implemented for Engines that support virtual partitioning. + if _current_engine_supports_virtual_partitions(): + new_modin_frame = self._modin_frame.sort_by( + 0, columns, ascending=ascending, **kwargs + ) + return self.__constructor__(new_modin_frame) + ignore_index = kwargs.get("ignore_index", False) + kwargs["ignore_index"] = False + if not is_list_like(columns): + columns = [columns] + # Currently, sort_values will just reindex based on the sorted values. 
+ # TODO create a more efficient way to sort + ErrorMessage.default_to_pandas("sort_values") + broadcast_value_dict = { + col: self.getitem_column_array([col]).to_pandas().squeeze(axis=1) + for col in columns + } + # Clear index level names because they also appear in broadcast_value_dict + orig_index_level_names = self.index.names + tmp_index = self.index.copy() + tmp_index.names = [None] * tmp_index.nlevels + # Index may contain duplicates + broadcast_values1 = pandas.DataFrame(broadcast_value_dict, index=tmp_index) + # Index without duplicates + broadcast_values2 = pandas.DataFrame(broadcast_value_dict) + broadcast_values2 = broadcast_values2.reset_index(drop=True) + # Index may contain duplicates + new_index1 = broadcast_values1.sort_values( + by=columns, axis=0, ascending=ascending, **kwargs + ).index + # Index without duplicates + new_index2 = broadcast_values2.sort_values( + by=columns, axis=0, ascending=ascending, **kwargs + ).index + + result = self.reset_index(drop=True).reindex(axis=0, labels=new_index2) + if ignore_index: + result = result.reset_index(drop=True) + else: + result.index = new_index1 + result.index.names = orig_index_level_names + return result def sort_columns_by_row_values(self, rows, ascending=True, **kwargs): if not is_list_like(rows): diff --git a/modin/pandas/test/test_rolling.py b/modin/pandas/test/test_rolling.py index 22939e1af03..4314168b60a 100644 --- a/modin/pandas/test/test_rolling.py +++ b/modin/pandas/test/test_rolling.py @@ -50,6 +50,7 @@ def create_test_series(vals): @pytest.mark.parametrize("window", [5, 100]) @pytest.mark.parametrize("min_periods", [None, 5]) @pytest.mark.parametrize("axis", [0, 1]) +@pytest.mark.parametrize("center", [True, False], ids=lambda x: f"center={x}") @pytest.mark.parametrize( "method, kwargs", [ @@ -69,7 +70,7 @@ def create_test_series(vals): ("median", {}), ], ) -def test_dataframe_rolling(data, window, min_periods, axis, method, kwargs): +def test_dataframe_rolling(data, window, min_periods, 
axis, method, kwargs, center): # Testing of Rolling class modin_df, pandas_df = create_test_dfs(data) if window > len(pandas_df): @@ -82,7 +83,7 @@ def test_dataframe_rolling(data, window, min_periods, axis, method, kwargs): window=window, min_periods=min_periods, win_type=None, - center=True, + center=center, axis=axis, ), method, @@ -94,15 +95,16 @@ def test_dataframe_rolling(data, window, min_periods, axis, method, kwargs): @pytest.mark.parametrize("window", [5, 100]) @pytest.mark.parametrize("min_periods", [None, 5]) @pytest.mark.parametrize("axis", [0, 1]) -def test_dataframe_agg(data, window, min_periods, axis): +@pytest.mark.parametrize("center", [True, False], ids=lambda x: f"center={x}") +def test_dataframe_agg(data, window, min_periods, axis, center): modin_df, pandas_df = create_test_dfs(data) if window > len(pandas_df): window = len(pandas_df) modin_rolled = modin_df.rolling( - window=window, min_periods=min_periods, win_type=None, center=True, axis=axis + window=window, min_periods=min_periods, win_type=None, center=center, axis=axis ) pandas_rolled = pandas_df.rolling( - window=window, min_periods=min_periods, win_type=None, center=True, axis=axis + window=window, min_periods=min_periods, win_type=None, center=center, axis=axis ) df_equals(pandas_rolled.aggregate(np.sum), modin_rolled.aggregate(np.sum)) # TODO(https://github.com/modin-project/modin/issues/4260): Once pandas @@ -119,6 +121,7 @@ def test_dataframe_agg(data, window, min_periods, axis): @pytest.mark.parametrize("window", [5, 100]) @pytest.mark.parametrize("min_periods", [None, 5]) @pytest.mark.parametrize("axis", [0, 1]) +@pytest.mark.parametrize("center", [True, False], ids=lambda x: f"center={x}") @pytest.mark.parametrize( "method, kwargs", [ @@ -128,7 +131,7 @@ def test_dataframe_agg(data, window, min_periods, axis): ("std", {"ddof": 0}), ], ) -def test_dataframe_window(data, window, min_periods, axis, method, kwargs): +def test_dataframe_window(data, window, min_periods, axis, method, 
kwargs, center): # Testing of Window class modin_df, pandas_df = create_test_dfs(data) if window > len(pandas_df): @@ -141,7 +144,7 @@ def test_dataframe_window(data, window, min_periods, axis, method, kwargs): window=window, min_periods=min_periods, win_type="triang", - center=True, + center=center, axis=axis, ), method, @@ -203,6 +206,7 @@ def test_dataframe_dt_index(axis, on, closed, window): @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) @pytest.mark.parametrize("window", [5, 100]) @pytest.mark.parametrize("min_periods", [None, 5]) +@pytest.mark.parametrize("center", [True, False], ids=lambda x: f"center={x}") @pytest.mark.parametrize( "method, kwargs", [ @@ -224,7 +228,7 @@ def test_dataframe_dt_index(axis, on, closed, window): ("median", {}), ], ) -def test_series_rolling(data, window, min_periods, method, kwargs): +def test_series_rolling(data, window, min_periods, method, kwargs, center): # Test of Rolling class modin_series, pandas_series = create_test_series(data) if window > len(pandas_series): @@ -237,7 +241,7 @@ def test_series_rolling(data, window, min_periods, method, kwargs): window=window, min_periods=min_periods, win_type=None, - center=True, + center=center, ), method, )(**kwargs), @@ -247,15 +251,16 @@ def test_series_rolling(data, window, min_periods, method, kwargs): @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) @pytest.mark.parametrize("window", [5, 100]) @pytest.mark.parametrize("min_periods", [None, 5]) -def test_series_corr_cov(data, window, min_periods): +@pytest.mark.parametrize("center", [True, False], ids=lambda x: f"center={x}") +def test_series_corr_cov(data, window, min_periods, center): modin_series, pandas_series = create_test_series(data) if window > len(pandas_series): window = len(pandas_series) modin_rolled = modin_series.rolling( - window=window, min_periods=min_periods, win_type=None, center=True + window=window, min_periods=min_periods, win_type=None, center=center ) 
pandas_rolled = pandas_series.rolling( - window=window, min_periods=min_periods, win_type=None, center=True + window=window, min_periods=min_periods, win_type=None, center=center ) df_equals(modin_rolled.corr(modin_series), pandas_rolled.corr(pandas_series)) df_equals( @@ -269,6 +274,7 @@ def test_series_corr_cov(data, window, min_periods): @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) @pytest.mark.parametrize("window", [5, 100]) @pytest.mark.parametrize("min_periods", [None, 5]) +@pytest.mark.parametrize("center", [True, False], ids=lambda x: f"center={x}") @pytest.mark.parametrize( "method, kwargs", [ @@ -278,7 +284,7 @@ def test_series_corr_cov(data, window, min_periods): ("std", {"ddof": 0}), ], ) -def test_series_window(data, window, min_periods, method, kwargs): +def test_series_window(data, window, min_periods, method, kwargs, center): # Test of Window class modin_series, pandas_series = create_test_series(data) if window > len(pandas_series): @@ -291,7 +297,7 @@ def test_series_window(data, window, min_periods, method, kwargs): window=window, min_periods=min_periods, win_type="triang", - center=True, + center=center, ), method, )(**kwargs),