Skip to content

Commit

Permalink
fixes + implementation for right join
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev committed May 12, 2024
1 parent 2194844 commit 089bcce
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 27 deletions.
5 changes: 2 additions & 3 deletions modin/core/storage_formats/pandas/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,11 @@ def map_func(

right_to_broadcast = right._modin_frame.combine()
new_columns, new_dtypes = cls._compute_result_metadata(
left,
right,
*((left, right) if not reverted else (right, left)),
on,
left_on,
right_on,
kwargs.get("suffixes", ("_x", "_y") if not reverted else ("_y", "_x")),
kwargs.get("suffixes", ("_x", "_y")),
)

# We rebalance when the ratio of the number of existing partitions to
Expand Down
33 changes: 23 additions & 10 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,33 +526,46 @@ def merge(self, right, **kwargs):
get_logger().info(message)
return MergeImpl.row_axis_merge(self, right, kwargs)

def join(self, right, **kwargs):
def join(self, right: PandasQueryCompiler, **kwargs) -> PandasQueryCompiler:
on = kwargs.get("on", None)
how = kwargs.get("how", "left")
sort = kwargs.get("sort", False)
left = self

if how in ["left", "inner"]:

def map_func(left, right, kwargs=kwargs): # pragma: no cover
return pandas.DataFrame.join(left, right, **kwargs)
if how in ["left", "inner"] or (
how == "right" and right._modin_frame._partitions.size != 0
):
reverted = False
if how == "right":
left, right = right, left
reverted = True

def map_func(
left, right, kwargs=kwargs
) -> pandas.DataFrame: # pragma: no cover
if reverted:
df = pandas.DataFrame.join(right, left, **kwargs)
else:
df = pandas.DataFrame.join(left, right, **kwargs)
return df

right_to_broadcast = right._modin_frame.combine()
new_self = self.__constructor__(
self._modin_frame.broadcast_apply_full_axis(
left = left.__constructor__(
left._modin_frame.broadcast_apply_full_axis(
axis=1,
func=map_func,
# We're going to explicitly change the shape across the 1-axis,
# so we want for partitioning to adapt as well
keep_partitioning=False,
num_splits=merge_partitioning(
self._modin_frame, right._modin_frame, axis=1
left._modin_frame, right._modin_frame, axis=1
),
other=right_to_broadcast,
)
)
return new_self.sort_rows_by_column_values(on) if sort else new_self
return left.sort_rows_by_column_values(on) if sort else left
else:
return self.default_to_pandas(pandas.DataFrame.join, right, **kwargs)
return left.default_to_pandas(pandas.DataFrame.join, right, **kwargs)

# END Inter-Data operations

Expand Down
43 changes: 29 additions & 14 deletions modin/tests/pandas/dataframe/test_join_sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,20 @@ def test_combine(data):
"test_data, test_data2",
[
(
np.random.uniform(0, 100, size=(2**6, 2**6)),
np.random.uniform(0, 100, size=(2**7, 2**6)),
np.random.randint(0, 100, size=(64, 64)),
np.random.uniform(0, 100, size=(128, 64)),
),
(
np.random.uniform(0, 100, size=(2**7, 2**6)),
np.random.uniform(0, 100, size=(2**6, 2**6)),
np.random.randint(0, 100, size=(128, 64)),
np.random.randint(0, 100, size=(64, 64)),
),
(
np.random.uniform(0, 100, size=(2**6, 2**6)),
np.random.uniform(0, 100, size=(2**6, 2**7)),
np.random.randint(0, 100, size=(64, 64)),
np.random.randint(0, 100, size=(64, 128)),
),
(
np.random.uniform(0, 100, size=(2**6, 2**7)),
np.random.uniform(0, 100, size=(2**6, 2**6)),
np.random.randint(0, 100, size=(64, 128)),
np.random.randint(0, 100, size=(64, 64)),
),
],
)
Expand Down Expand Up @@ -122,8 +122,9 @@ def test_join(test_data, test_data2):
hows = ["inner", "left", "right", "outer"]
ons = ["col33", "col34"]
sorts = [False, True]
for i in range(4):
for j in range(2):
assert len(ons) == len(sorts), "the loop below is designed for this condition"
for i in range(len(hows)):
for j in range(len(ons)):
modin_result = modin_df.join(
modin_df2,
how=hows[i],
Expand All @@ -140,7 +141,11 @@ def test_join(test_data, test_data2):
lsuffix="_caller",
rsuffix="_other",
)
df_equals(modin_result, pandas_result)
if sorts[j]:
# sorting in `join` is implemented through range partitioning technique
df_equals_and_sort(modin_result, pandas_result)
else:
df_equals(modin_result, pandas_result)

frame_data = {
"col1": [0, 1, 2, 3],
Expand Down Expand Up @@ -174,6 +179,15 @@ def test_join(test_data, test_data2):
df_equals(modin_join, pandas_join)


@pytest.mark.parametrize("how", ["inner", "right"])
def test_join_empty(how):
    """Compare Modin and pandas when a frame is joined with a zero-row slice of itself.

    Parameters
    ----------
    how : str
        Join type forwarded to ``DataFrame.join``; parametrized over
        ``"inner"`` and ``"right"``.
    """
    values = np.random.randint(0, 100, size=(64, 64))
    modin_df, pandas_df = create_test_dfs(values)

    def join_with_empty(df):
        # ``df.iloc[:0]`` keeps every column but no rows; the column labels
        # therefore overlap with ``df``, and ``lsuffix`` renames the caller's.
        empty_other = df.iloc[:0]
        return df.join(empty_other, how=how, lsuffix="_caller")

    eval_general(modin_df, pandas_df, join_with_empty)


def test_join_cross_6786():
data = [[7, 8, 9], [10, 11, 12]]
modin_df, pandas_df = create_test_dfs(data, columns=["x", "y", "z"])
Expand Down Expand Up @@ -272,8 +286,9 @@ def test_merge(test_data, test_data2):
hows = ["left", "inner", "right"]
ons = ["col33", ["col33", "col34"]]
sorts = [False, True]
for i in range(2):
for j in range(2):
assert len(ons) == len(sorts), "the loop below is designed for this condition"
for i in range(len(hows)):
for j in range(len(ons)):
modin_result = modin_df.merge(
modin_df2, how=hows[i], on=ons[j], sort=sorts[j]
)
Expand Down Expand Up @@ -420,7 +435,7 @@ def test_merge(test_data, test_data2):

@pytest.mark.parametrize("how", ["inner", "right"])
def test_merge_empty(how):
data = np.random.uniform(0, 100, size=(2**6, 2**6))
data = np.random.randint(0, 100, size=(64, 64))
eval_general(*create_test_dfs(data), lambda df: df.merge(df.iloc[:0], how=how))


Expand Down

0 comments on commit 089bcce

Please sign in to comment.