diff --git a/modin/core/dataframe/algebra/fold.py b/modin/core/dataframe/algebra/fold.py index 419a0b56903..9f6673a3e0a 100644 --- a/modin/core/dataframe/algebra/fold.py +++ b/modin/core/dataframe/algebra/fold.py @@ -13,14 +13,21 @@ """Module houses builder class for Fold operator.""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Callable + from .operator import Operator +if TYPE_CHECKING: + from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler + class Fold(Operator): """Builder class for Fold functions.""" @classmethod - def register(cls, fold_function): + def register(cls, fold_function) -> Callable[..., PandasQueryCompiler]: """ Build Fold operator that will be performed across rows/columns. @@ -35,7 +42,9 @@ def register(cls, fold_function): Function that takes query compiler and executes Fold function. """ - def caller(query_compiler, fold_axis=None, *args, **kwargs): + def caller( + query_compiler: PandasQueryCompiler, fold_axis=None, *args, **kwargs + ) -> PandasQueryCompiler: """ Execute Fold function against passed query compiler. diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 6a32741b3cb..8f642592856 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -881,7 +881,7 @@ def synchronize_labels(self, axis=None): else: self._deferred_column = True - def _propagate_index_objs(self, axis=None): + def _propagate_index_objs(self, axis=None) -> None: """ Synchronize labels by applying the index object for specific `axis` to the `self._partitions` lazily. @@ -1320,6 +1320,7 @@ def _take_2d_positional( new_row_lengths, new_col_widths, new_dtypes, + pandas_backend=self._pandas_backend, ) return self._maybe_reorder_labels( @@ -1494,6 +1495,7 @@ def from_labels_executor(df, **kwargs): row_lengths=self._row_lengths_cache, column_widths=new_column_widths, dtypes=new_dtypes, + pandas_backend=self._pandas_backend, ) # Set flag for propagating deferred row labels across dataframe partitions result.synchronize_labels(axis=0) @@ -1620,7 +1622,13 @@ def _reorder_labels(self, row_positions=None, col_positions=None): col_idx = self.copy_columns_cache(copy_lengths=True) new_widths = self._column_widths_cache return self.__constructor__( - ordered_cols, row_idx, col_idx, new_lengths, new_widths, new_dtypes + ordered_cols, + row_idx, + col_idx, + new_lengths, + new_widths, + new_dtypes, + pandas_backend=self._pandas_backend, ) @lazy_metadata_decorator(apply_axis=None) @@ -1640,6 +1648,7 @@ def copy(self): self._row_lengths_cache, self._column_widths_cache, self.copy_dtypes_cache(), + pandas_backend=self._pandas_backend, ) @lazy_metadata_decorator(apply_axis="both") @@ -1742,6 +1751,7 @@ def astype_builder(df): self._row_lengths_cache, self._column_widths_cache, new_dtypes, + pandas_backend=self._pandas_backend, ) def numeric_columns(self, include_bool=True): @@ -2102,6 +2112,7 @@ def _compute_tree_reduce_metadata(self, axis, new_parts, dtypes=None): *new_axes, *new_axes_lengths, dtypes, + pandas_backend=self._pandas_backend, ) return result @@ -2287,6 +2298,7 @@ def map( self._row_lengths_cache, self._column_widths_cache, dtypes=dtypes, + pandas_backend=self._pandas_backend, ) def window( @@ -2366,6 +2378,7 @@ def fold(self, axis, func, new_columns=None): self.copy_columns_cache(copy_lengths=True), self._row_lengths_cache, self._column_widths_cache, + pandas_backend=self._pandas_backend, ) def infer_objects(self) -> PandasDataframe: @@ -2412,6 +2425,7 @@ def infer_types(self, col_labels: List[str]) -> PandasDataframe: self._row_lengths_cache, self._column_widths_cache, new_dtypes, + pandas_backend=self._pandas_backend, ) def join( @@ -2517,6 +2531,7 @@ def combine_and_apply( self._row_lengths_cache, [len(self.columns)] if self.has_materialized_columns else None, self.copy_dtypes_cache(), + pandas_backend=self._pandas_backend, ) else: modin_frame = self @@ -2820,6 +2835,7 @@ def filter(self, axis: Union[Axis, int], condition: Callable) -> PandasDataframe *new_axes, *new_lengths, self.copy_dtypes_cache() if axis == Axis.COL_WISE else None, + pandas_backend=self._pandas_backend, ) def filter_by_types(self, types: List[Hashable]) -> PandasDataframe: @@ -2873,7 +2889,12 @@ def explode(self, axis: Union[int, Axis], func: Callable) -> PandasDataframe: 1, partitions ) return self.__constructor__( - partitions, new_index, new_columns, row_lengths, column_widths + partitions, + new_index, + new_columns, + row_lengths, + column_widths, + pandas_backend=self._pandas_backend, ) def combine(self) -> PandasDataframe: @@ -2901,6 +2922,7 @@ def combine(self) -> PandasDataframe: else None ), dtypes=self.copy_dtypes_cache(), + pandas_backend=self._pandas_backend, ) result.synchronize_labels() return result @@ -3050,7 +3072,13 @@ def apply_full_axis_select_indices( if new_columns is None: new_columns = self.columns if axis == 0 else None return self.__constructor__( - new_partitions, new_index, new_columns, None, None, dtypes=new_dtypes + new_partitions, + new_index, + new_columns, + None, + None, + dtypes=new_dtypes, + pandas_backend=self._pandas_backend, ) @lazy_metadata_decorator(apply_axis="both") @@ -3145,6 +3173,7 @@ def apply_select_indices( lengths_objs[0], lengths_objs[1], new_dtypes, + pandas_backend=self._pandas_backend, ) else: # We are applying over both axes here, so make sure we have all the right @@ -3172,6 +3201,7 @@ def apply_select_indices( self._row_lengths_cache, self._column_widths_cache, new_dtypes, + pandas_backend=self._pandas_backend, ) @lazy_metadata_decorator(apply_axis="both") @@ -3277,6 +3307,7 @@ def _pick_axis(get_axis, sizes_cache): new_row_lengths, new_column_widths, dtypes=dtypes, + pandas_backend=self._pandas_backend, ) def _prepare_frame_to_broadcast(self, axis, indices, broadcast_all): @@ -3415,7 +3446,10 @@ def broadcast_apply_select_indices( keep_remaining, ) return self.__constructor__( - new_partitions, index=new_index, columns=new_columns + new_partitions, + index=new_index, + columns=new_columns, + pandas_backend=self._pandas_backend, ) def construct_dtype(self, dtype: str, backend: Optional[str]): @@ -3611,7 +3645,11 @@ def broadcast_apply_full_axis( kw["column_widths"] = self._column_widths_cache result = self.__constructor__( - new_partitions, index=new_index, columns=new_columns, **kw + new_partitions, + index=new_index, + columns=new_columns, + **kw, + pandas_backend=self._pandas_backend, ) if sync_labels and new_index is not None: result.synchronize_labels(axis=0) @@ -3833,6 +3871,7 @@ def n_ary_op( self.copy_columns_cache(copy_lengths=True), row_lengths, self._column_widths_cache, + pandas_backend=self._pandas_backend, ) new_right_frames = [ self.__constructor__( @@ -3841,6 +3880,7 @@ def n_ary_op( right_frame.copy_columns_cache(copy_lengths=True), row_lengths, right_frame._column_widths_cache, + pandas_backend=self._pandas_backend, ) for right_parts, right_frame in zip(list_of_right_parts, right_frames) ] @@ -3878,6 +3918,7 @@ def n_ary_op( row_lengths, column_widths, dtypes, + pandas_backend=self._pandas_backend, ) @lazy_metadata_decorator(apply_axis="both") @@ -4005,7 +4046,13 @@ def _compute_new_widths(): new_widths = None return self.__constructor__( - new_partitions, new_index, new_columns, new_lengths, new_widths, new_dtypes + new_partitions, + new_index, + new_columns, + new_lengths, + new_widths, + new_dtypes, + pandas_backend=self._pandas_backend, ) def _apply_func_to_range_partitioning_broadcast( @@ -4080,6 +4127,7 @@ def _apply_func_to_range_partitioning_broadcast( index=new_index, columns=new_columns, dtypes=new_dtypes, + pandas_backend=self._pandas_backend, ) @lazy_metadata_decorator(apply_axis="both") @@ -4428,6 +4476,7 @@ def join_cols(df, *cols): new_partitions, index=result.copy_index_cache(), row_lengths=result._row_lengths_cache, + pandas_backend=self._pandas_backend, ) if ( @@ -4504,7 +4553,10 @@ def groupby_reduce( axis, self._partitions, by_parts, map_func, reduce_func, apply_indices ) return self.__constructor__( - new_partitions, index=new_index, columns=new_columns + new_partitions, + index=new_index, + columns=new_columns, + pandas_backend=self._pandas_backend, ) @classmethod @@ -4689,6 +4741,7 @@ def transpose(self): self._column_widths_cache, self._row_lengths_cache, dtypes=new_dtypes, + pandas_backend=self._pandas_backend, ) @lazy_metadata_decorator(apply_axis="both") @@ -4876,6 +4929,7 @@ def remote_fn(df, name, caselist): # pragma: no cover columns, row_lengths, column_widths, + pandas_backend=self._pandas_backend, ) for part in list_of_right_parts ) @@ -4947,4 +5001,5 @@ def map_data( index=self.index, row_lengths=lengths, column_widths=[1], + pandas_backend=self._pandas_backend, ) diff --git a/modin/core/storage_formats/pandas/aggregations.py b/modin/core/storage_formats/pandas/aggregations.py index 7d5293b1017..6c2e795a523 100644 --- a/modin/core/storage_formats/pandas/aggregations.py +++ b/modin/core/storage_formats/pandas/aggregations.py @@ -13,6 +13,8 @@ """Contains implementations for aggregation functions.""" +from __future__ import annotations + from enum import Enum from typing import TYPE_CHECKING, Callable, Tuple @@ -38,7 +40,7 @@ class Method(Enum): @classmethod def build_corr_method( cls, - ) -> Callable[["PandasQueryCompiler", str, int, bool], "PandasQueryCompiler"]: + ) -> Callable[[PandasQueryCompiler, str, int, bool], PandasQueryCompiler]: """ Build a query compiler method computing the correlation matrix. @@ -49,12 +51,12 @@ def build_corr_method( """ def corr_method( - qc: "PandasQueryCompiler", + qc: PandasQueryCompiler, method: str, min_periods: int = 1, numeric_only: bool = True, - ) -> "PandasQueryCompiler": - if method != "pearson": + ) -> PandasQueryCompiler: + if method != "pearson" or qc._modin_frame._pandas_backend == "pyarrow": return super(type(qc), qc).corr( method=method, min_periods=min_periods, numeric_only=numeric_only ) @@ -103,7 +105,7 @@ def corr_method( @classmethod def build_cov_method( cls, - ) -> Callable[["PandasQueryCompiler", int, int], "PandasQueryCompiler"]: + ) -> Callable[[PandasQueryCompiler, int, int], PandasQueryCompiler]: """ Build a query compiler method computing the covariance matrix. diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 866a2456fae..83d42965a9e 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -1858,7 +1858,29 @@ def isin_func(df, values): map = Map.register(pandas.DataFrame.map) # Will it work with pyarrow backend? conj = Map.register(lambda df, *args, **kwargs: pandas.DataFrame(np.conj(df))) - convert_dtypes = Fold.register(pandas.DataFrame.convert_dtypes) + + def convert_dtypes( + self, + infer_objects: bool = True, + convert_string: bool = True, + convert_integer: bool = True, + convert_boolean: bool = True, + convert_floating: bool = True, + dtype_backend: str = "numpy_nullable", + ): + result = Fold.register(pandas.DataFrame.convert_dtypes)( + self, + infer_objects=infer_objects, + convert_string=convert_string, + convert_integer=convert_integer, + convert_boolean=convert_boolean, + convert_floating=convert_floating, + dtype_backend=dtype_backend, + ) + if dtype_backend == "pyarrow": + result._modin_frame._pandas_backend = "pyarrow" + return result + invert = Map.register(pandas.DataFrame.__invert__, dtypes="copy") isna = Map.register(pandas.DataFrame.isna, dtypes="bool") # better way to distinguish methods for NumPy API? diff --git a/modin/distributed/dataframe/pandas/partitions.py b/modin/distributed/dataframe/pandas/partitions.py index cac3bec93b6..62a05ff81d3 100644 --- a/modin/distributed/dataframe/pandas/partitions.py +++ b/modin/distributed/dataframe/pandas/partitions.py @@ -90,7 +90,7 @@ def unwrap_partitions( f"Only API Layer objects may be passed in here, got {type(api_layer_object)} instead." ) - modin_frame = api_layer_object._query_compiler._modin_frame # type: ignore[attr-defined] + modin_frame = api_layer_object._query_compiler._modin_frame modin_frame._propagate_index_objs(None) if axis is None: @@ -122,7 +122,7 @@ def get_block(partition: PartitionUnionType) -> np.ndarray: ] actual_engine = type( - api_layer_object._query_compiler._modin_frame._partitions[0][0] # type: ignore[attr-defined] + api_layer_object._query_compiler._modin_frame._partitions[0][0] ).__name__ if actual_engine in ( "PandasOnRayDataframePartition", diff --git a/modin/logging/logger_decorator.py b/modin/logging/logger_decorator.py index 301fb02562b..662f7d1de73 100644 --- a/modin/logging/logger_decorator.py +++ b/modin/logging/logger_decorator.py @@ -19,7 +19,7 @@ from functools import wraps from types import FunctionType, MethodType -from typing import Any, Callable, Dict, Optional, Tuple, Type, Union +from typing import Any, Callable, Dict, Optional, Tuple, Type, TypeVar, Union, overload from modin.config import LogMode @@ -28,6 +28,9 @@ _MODIN_LOGGER_NOWRAP = "__modin_logging_nowrap__" +Fn = TypeVar("Fn", bound=Callable) + + def disable_logging(func: Callable) -> Callable: """ Disable logging of one particular function. Useful for decorated classes. @@ -46,11 +49,17 @@ def disable_logging(func: Callable) -> Callable: return func +@overload +def enable_logging(modin_layer: Fn) -> Fn: + # This helps preserve typings when the decorator is used without parentheses + ... + + def enable_logging( - modin_layer: Union[str, Callable, Type] = "PANDAS-API", + modin_layer: Union[str, Fn, Type] = "PANDAS-API", name: Optional[str] = None, log_level: LogLevel = LogLevel.INFO, -) -> Callable: +) -> Callable[[Fn], Fn]: """ Log Decorator used on specific Modin functions or classes. @@ -76,7 +85,7 @@ def enable_logging( # def func() return enable_logging()(modin_layer) - def decorator(obj: Any) -> Any: + def decorator(obj: Fn) -> Fn: """Decorate function or class to add logs to Modin API function(s).""" if isinstance(obj, type): seen: Dict[Any, Any] = {} diff --git a/modin/pandas/general.py b/modin/pandas/general.py index aeff9986f35..6c79752847d 100644 --- a/modin/pandas/general.py +++ b/modin/pandas/general.py @@ -415,7 +415,7 @@ def value_counts( ) -@_inherit_docstrings(pandas.concat, apilink="pandas.concat") +# @_inherit_docstrings(pandas.concat, apilink="pandas.concat") @enable_logging def concat( objs: "Iterable[DataFrame | Series] | Mapping[Hashable, DataFrame | Series]", diff --git a/modin/tests/pandas/dataframe/test_binary.py b/modin/tests/pandas/dataframe/test_binary.py index a8858cf799a..a1070d892b7 100644 --- a/modin/tests/pandas/dataframe/test_binary.py +++ b/modin/tests/pandas/dataframe/test_binary.py @@ -85,9 +85,16 @@ def test_math_functions(other, axis, op): # lambda == "series_or_list" pytest.xfail(reason="different behavior") - eval_general( - *create_test_dfs(data), lambda df: getattr(df, op)(other(df, axis), axis=axis) - ) + md_df, pd_df = create_test_dfs(data) + if op in ("mod", "rmod") and any("pyarrow" in str(dtype) for dtype in pd_df.dtypes): + with pytest.raises(NotImplementedError): + eval_general( + md_df, pd_df, lambda df: getattr(df, op)(other(df, axis), axis=axis) + ) + else: + eval_general( + md_df, pd_df, lambda df: getattr(df, op)(other(df, axis), axis=axis) + ) @pytest.mark.parametrize("other", [lambda df: 2, lambda df: df]) @@ -465,8 +472,8 @@ def test_non_commutative_multiply(): eval_general(modin_df, pandas_df, lambda s: s * integer) -# TODO: just for developing purpose; remove `xfail` mark -@pytest.mark.xfail +# TODO: just for developing purpose; remove `skip` mark +@pytest.mark.skip @pytest.mark.parametrize( "op", [