Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev committed May 6, 2024
1 parent aac7097 commit 258c3b9
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 28 deletions.
13 changes: 11 additions & 2 deletions modin/core/dataframe/algebra/fold.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,21 @@

"""Module houses builder class for Fold operator."""

from __future__ import annotations

from typing import TYPE_CHECKING, Callable

from .operator import Operator

if TYPE_CHECKING:
from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler


class Fold(Operator):
"""Builder class for Fold functions."""

@classmethod
def register(cls, fold_function):
def register(cls, fold_function) -> Callable[..., PandasQueryCompiler]:
"""
Build Fold operator that will be performed across rows/columns.
Expand All @@ -35,7 +42,9 @@ def register(cls, fold_function):
Function that takes query compiler and executes Fold function.
"""

def caller(query_compiler, fold_axis=None, *args, **kwargs):
def caller(
query_compiler: PandasQueryCompiler, fold_axis=None, *args, **kwargs
) -> PandasQueryCompiler:
"""
Execute Fold function against passed query compiler.
Expand Down
71 changes: 63 additions & 8 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ def synchronize_labels(self, axis=None):
else:
self._deferred_column = True

def _propagate_index_objs(self, axis=None):
def _propagate_index_objs(self, axis=None) -> None:
"""
Synchronize labels by applying the index object for specific `axis` to the `self._partitions` lazily.
Expand Down Expand Up @@ -1320,6 +1320,7 @@ def _take_2d_positional(
new_row_lengths,
new_col_widths,
new_dtypes,
pandas_backend=self._pandas_backend,
)

return self._maybe_reorder_labels(
Expand Down Expand Up @@ -1494,6 +1495,7 @@ def from_labels_executor(df, **kwargs):
row_lengths=self._row_lengths_cache,
column_widths=new_column_widths,
dtypes=new_dtypes,
pandas_backend=self._pandas_backend,
)
# Set flag for propagating deferred row labels across dataframe partitions
result.synchronize_labels(axis=0)
Expand Down Expand Up @@ -1620,7 +1622,13 @@ def _reorder_labels(self, row_positions=None, col_positions=None):
col_idx = self.copy_columns_cache(copy_lengths=True)
new_widths = self._column_widths_cache
return self.__constructor__(
ordered_cols, row_idx, col_idx, new_lengths, new_widths, new_dtypes
ordered_cols,
row_idx,
col_idx,
new_lengths,
new_widths,
new_dtypes,
pandas_backend=self._pandas_backend,
)

@lazy_metadata_decorator(apply_axis=None)
Expand All @@ -1640,6 +1648,7 @@ def copy(self):
self._row_lengths_cache,
self._column_widths_cache,
self.copy_dtypes_cache(),
pandas_backend=self._pandas_backend,
)

@lazy_metadata_decorator(apply_axis="both")
Expand Down Expand Up @@ -1742,6 +1751,7 @@ def astype_builder(df):
self._row_lengths_cache,
self._column_widths_cache,
new_dtypes,
pandas_backend=self._pandas_backend,
)

def numeric_columns(self, include_bool=True):
Expand Down Expand Up @@ -2102,6 +2112,7 @@ def _compute_tree_reduce_metadata(self, axis, new_parts, dtypes=None):
*new_axes,
*new_axes_lengths,
dtypes,
pandas_backend=self._pandas_backend,
)
return result

Expand Down Expand Up @@ -2287,6 +2298,7 @@ def map(
self._row_lengths_cache,
self._column_widths_cache,
dtypes=dtypes,
pandas_backend=self._pandas_backend,
)

def window(
Expand Down Expand Up @@ -2366,6 +2378,7 @@ def fold(self, axis, func, new_columns=None):
self.copy_columns_cache(copy_lengths=True),
self._row_lengths_cache,
self._column_widths_cache,
pandas_backend=self._pandas_backend,
)

def infer_objects(self) -> PandasDataframe:
Expand Down Expand Up @@ -2412,6 +2425,7 @@ def infer_types(self, col_labels: List[str]) -> PandasDataframe:
self._row_lengths_cache,
self._column_widths_cache,
new_dtypes,
pandas_backend=self._pandas_backend,
)

def join(
Expand Down Expand Up @@ -2517,6 +2531,7 @@ def combine_and_apply(
self._row_lengths_cache,
[len(self.columns)] if self.has_materialized_columns else None,
self.copy_dtypes_cache(),
pandas_backend=self._pandas_backend,
)
else:
modin_frame = self
Expand Down Expand Up @@ -2820,6 +2835,7 @@ def filter(self, axis: Union[Axis, int], condition: Callable) -> PandasDataframe
*new_axes,
*new_lengths,
self.copy_dtypes_cache() if axis == Axis.COL_WISE else None,
pandas_backend=self._pandas_backend,
)

def filter_by_types(self, types: List[Hashable]) -> PandasDataframe:
Expand Down Expand Up @@ -2873,7 +2889,12 @@ def explode(self, axis: Union[int, Axis], func: Callable) -> PandasDataframe:
1, partitions
)
return self.__constructor__(
partitions, new_index, new_columns, row_lengths, column_widths
partitions,
new_index,
new_columns,
row_lengths,
column_widths,
pandas_backend=self._pandas_backend,
)

def combine(self) -> PandasDataframe:
Expand Down Expand Up @@ -2901,6 +2922,7 @@ def combine(self) -> PandasDataframe:
else None
),
dtypes=self.copy_dtypes_cache(),
pandas_backend=self._pandas_backend,
)
result.synchronize_labels()
return result
Expand Down Expand Up @@ -3050,7 +3072,13 @@ def apply_full_axis_select_indices(
if new_columns is None:
new_columns = self.columns if axis == 0 else None
return self.__constructor__(
new_partitions, new_index, new_columns, None, None, dtypes=new_dtypes
new_partitions,
new_index,
new_columns,
None,
None,
dtypes=new_dtypes,
pandas_backend=self._pandas_backend,
)

@lazy_metadata_decorator(apply_axis="both")
Expand Down Expand Up @@ -3145,6 +3173,7 @@ def apply_select_indices(
lengths_objs[0],
lengths_objs[1],
new_dtypes,
pandas_backend=self._pandas_backend,
)
else:
# We are applying over both axes here, so make sure we have all the right
Expand Down Expand Up @@ -3172,6 +3201,7 @@ def apply_select_indices(
self._row_lengths_cache,
self._column_widths_cache,
new_dtypes,
pandas_backend=self._pandas_backend,
)

@lazy_metadata_decorator(apply_axis="both")
Expand Down Expand Up @@ -3277,6 +3307,7 @@ def _pick_axis(get_axis, sizes_cache):
new_row_lengths,
new_column_widths,
dtypes=dtypes,
pandas_backend=self._pandas_backend,
)

def _prepare_frame_to_broadcast(self, axis, indices, broadcast_all):
Expand Down Expand Up @@ -3415,7 +3446,10 @@ def broadcast_apply_select_indices(
keep_remaining,
)
return self.__constructor__(
new_partitions, index=new_index, columns=new_columns
new_partitions,
index=new_index,
columns=new_columns,
pandas_backend=self._pandas_backend,
)

def construct_dtype(self, dtype: str, backend: Optional[str]):
Expand Down Expand Up @@ -3611,7 +3645,11 @@ def broadcast_apply_full_axis(
kw["column_widths"] = self._column_widths_cache

result = self.__constructor__(
new_partitions, index=new_index, columns=new_columns, **kw
new_partitions,
index=new_index,
columns=new_columns,
**kw,
pandas_backend=self._pandas_backend,
)
if sync_labels and new_index is not None:
result.synchronize_labels(axis=0)
Expand Down Expand Up @@ -3833,6 +3871,7 @@ def n_ary_op(
self.copy_columns_cache(copy_lengths=True),
row_lengths,
self._column_widths_cache,
pandas_backend=self._pandas_backend,
)
new_right_frames = [
self.__constructor__(
Expand All @@ -3841,6 +3880,7 @@ def n_ary_op(
right_frame.copy_columns_cache(copy_lengths=True),
row_lengths,
right_frame._column_widths_cache,
pandas_backend=self._pandas_backend,
)
for right_parts, right_frame in zip(list_of_right_parts, right_frames)
]
Expand Down Expand Up @@ -3878,6 +3918,7 @@ def n_ary_op(
row_lengths,
column_widths,
dtypes,
pandas_backend=self._pandas_backend,
)

@lazy_metadata_decorator(apply_axis="both")
Expand Down Expand Up @@ -4005,7 +4046,13 @@ def _compute_new_widths():
new_widths = None

return self.__constructor__(
new_partitions, new_index, new_columns, new_lengths, new_widths, new_dtypes
new_partitions,
new_index,
new_columns,
new_lengths,
new_widths,
new_dtypes,
pandas_backend=self._pandas_backend,
)

def _apply_func_to_range_partitioning_broadcast(
Expand Down Expand Up @@ -4080,6 +4127,7 @@ def _apply_func_to_range_partitioning_broadcast(
index=new_index,
columns=new_columns,
dtypes=new_dtypes,
pandas_backend=self._pandas_backend,
)

@lazy_metadata_decorator(apply_axis="both")
Expand Down Expand Up @@ -4428,6 +4476,7 @@ def join_cols(df, *cols):
new_partitions,
index=result.copy_index_cache(),
row_lengths=result._row_lengths_cache,
pandas_backend=self._pandas_backend,
)

if (
Expand Down Expand Up @@ -4504,7 +4553,10 @@ def groupby_reduce(
axis, self._partitions, by_parts, map_func, reduce_func, apply_indices
)
return self.__constructor__(
new_partitions, index=new_index, columns=new_columns
new_partitions,
index=new_index,
columns=new_columns,
pandas_backend=self._pandas_backend,
)

@classmethod
Expand Down Expand Up @@ -4689,6 +4741,7 @@ def transpose(self):
self._column_widths_cache,
self._row_lengths_cache,
dtypes=new_dtypes,
pandas_backend=self._pandas_backend,
)

@lazy_metadata_decorator(apply_axis="both")
Expand Down Expand Up @@ -4876,6 +4929,7 @@ def remote_fn(df, name, caselist): # pragma: no cover
columns,
row_lengths,
column_widths,
pandas_backend=self._pandas_backend,
)
for part in list_of_right_parts
)
Expand Down Expand Up @@ -4947,4 +5001,5 @@ def map_data(
index=self.index,
row_lengths=lengths,
column_widths=[1],
pandas_backend=self._pandas_backend,
)
12 changes: 7 additions & 5 deletions modin/core/storage_formats/pandas/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

"""Contains implementations for aggregation functions."""

from __future__ import annotations

from enum import Enum
from typing import TYPE_CHECKING, Callable, Tuple

Expand All @@ -38,7 +40,7 @@ class Method(Enum):
@classmethod
def build_corr_method(
cls,
) -> Callable[["PandasQueryCompiler", str, int, bool], "PandasQueryCompiler"]:
) -> Callable[[PandasQueryCompiler, str, int, bool], PandasQueryCompiler]:
"""
Build a query compiler method computing the correlation matrix.
Expand All @@ -49,12 +51,12 @@ def build_corr_method(
"""

def corr_method(
qc: "PandasQueryCompiler",
qc: PandasQueryCompiler,
method: str,
min_periods: int = 1,
numeric_only: bool = True,
) -> "PandasQueryCompiler":
if method != "pearson":
) -> PandasQueryCompiler:
if method != "pearson" or qc._modin_frame._pandas_backend == "pyarrow":
return super(type(qc), qc).corr(
method=method, min_periods=min_periods, numeric_only=numeric_only
)
Expand Down Expand Up @@ -103,7 +105,7 @@ def corr_method(
@classmethod
def build_cov_method(
cls,
) -> Callable[["PandasQueryCompiler", int, int], "PandasQueryCompiler"]:
) -> Callable[[PandasQueryCompiler, int, int], PandasQueryCompiler]:
"""
Build a query compiler method computing the covariance matrix.
Expand Down
24 changes: 23 additions & 1 deletion modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1858,7 +1858,29 @@ def isin_func(df, values):
map = Map.register(pandas.DataFrame.map)
# Will it work with pyarrow backend?
conj = Map.register(lambda df, *args, **kwargs: pandas.DataFrame(np.conj(df)))
convert_dtypes = Fold.register(pandas.DataFrame.convert_dtypes)

def convert_dtypes(
self,
infer_objects: bool = True,
convert_string: bool = True,
convert_integer: bool = True,
convert_boolean: bool = True,
convert_floating: bool = True,
dtype_backend: str = "numpy_nullable",
):
result = Fold.register(pandas.DataFrame.convert_dtypes)(
self,
infer_objects=infer_objects,
convert_string=convert_string,
convert_integer=convert_integer,
convert_boolean=convert_boolean,
convert_floating=convert_floating,
dtype_backend=dtype_backend,
)
if dtype_backend == "pyarrow":
result._modin_frame._pandas_backend = "pyarrow"
return result

invert = Map.register(pandas.DataFrame.__invert__, dtypes="copy")
isna = Map.register(pandas.DataFrame.isna, dtypes="bool")
# better way to distinguish methods for NumPy API?
Expand Down
Loading

0 comments on commit 258c3b9

Please sign in to comment.