Commit
rename var + leftover change from merge
Signed-off-by: Praateek <[email protected]>
praateekmahajan committed Nov 7, 2024
1 parent 813c1d7 commit 4914974
Showing 1 changed file with 12 additions and 12 deletions.
nemo_curator/modules/fuzzy_dedup.py (24 changes: 12 additions & 12 deletions)
@@ -1142,28 +1142,28 @@ def _batched_merge_and_write(
             subset_text_df = left_df_use.partitions[
                 text_part_offset:end_text_offset
             ]
-            merged_subset_df = merge_left_to_shuffled_right(
+            subset_merged_df = merge_left_to_shuffled_right(
                 subset_text_df,
                 subset_bucket_df,
                 merge_on,
             )
             if os.environ["SHUFFLE_APPROACH"] == "text_bytes_aware":
                 self._logger.info("Using text_bytes_aware_shuffle")
                 output_df = text_bytes_aware_shuffle(
-                    df=merged_subset_df,
+                    df=subset_merged_df,
                     partition_on=partition_on,
                     text_column=self.text_field,
                     num_workers=num_workers,
                 )
             elif os.environ["SHUFFLE_APPROACH"] == "dask_vanilla":
                 self._logger.info("Using dask's vanilla shuffle")
-                output_df = merged_subset_df.shuffle(on=partition_on)
+                output_df = subset_merged_df.shuffle(on=partition_on)
             elif os.environ["SHUFFLE_APPROACH"] == "rearrange_by_column_direct":
                 self._logger.info("Using rearrange_by_column_direct")
                 output_df = rearange_by_column_direct(
-                    df=merged_subset_df,
+                    df=subset_merged_df,
                     col=partition_on,
-                    npartitions=merged_subset_df.npartitions,
+                    npartitions=subset_merged_df.npartitions,
                     ignore_index=True,
                     excomms_default=True,
                 )
@@ -1175,34 +1175,34 @@ def _batched_merge_and_write(
                 # Use the internal dask-expr API
                 output_df = new_collection(
                     RearrangeByColumn(
-                        frame=merged_subset_df.expr,
+                        frame=subset_merged_df.expr,
                         partitioning_index=partition_on,
-                        npartitions_out=merged_subset_df.npartitions,
+                        npartitions_out=subset_merged_df.npartitions,
                         ignore_index=True,
                         method="tasks",
                         # Prevent staged shuffling by setting max_branch
                         # to the number of input partitions + 1
-                        options={"max_branch": merged_subset_df.npartitions + 1},
+                        options={"max_branch": subset_merged_df.npartitions + 1},
                     )
                 )
             elif os.environ["SHUFFLE_APPROACH"] == "rearrange_third_branch":
                 self._logger.info("Using rearrange_third_branch")
                 from dask.dataframe.shuffle import rearrange_by_column

                 output_df = rearrange_by_column(
-                    merged_subset_df,
+                    subset_merged_df,
                     col=partition_on,
                     shuffle_method="tasks",
                     # Prevent staged shuffling by setting max_branch
                     # to the number of input partitions + 1
-                    max_branch=merged_subset_df.npartitions + 1,
-                    npartitions=merged_subset_df.npartitions,
+                    max_branch=subset_merged_df.npartitions + 1,
+                    npartitions=subset_merged_df.npartitions,
                     ignore_index=True,
                 )
             else:
                 raise ValueError("Invalid shuffle approach")

-            if self.int_to_str_id is not None:
+            if self.int_to_str_id is not None and output_df is not None:
                 output_df = output_df.map_partitions(
                     int_ids_to_str, id_column=self.int_to_str_id
                 )
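
For context, a minimal standalone sketch (not part of the commit) of the single-stage task shuffle used by the rearrange_third_branch branch above: passing max_branch = npartitions + 1 to rearrange_by_column keeps Dask from splitting the shuffle into multiple stages, as the inline comment in the diff notes. The toy dataframe and the doc_id column are made up for illustration, and the call assumes the legacy dask.dataframe.shuffle API that the diff imports.

# Illustrative sketch only; toy data, not the NeMo Curator code path.
import pandas as pd
import dask.dataframe as dd
from dask.dataframe.shuffle import rearrange_by_column

pdf = pd.DataFrame({"doc_id": range(8), "text": [f"doc {i}" for i in range(8)]})
ddf = dd.from_pandas(pdf, npartitions=4)

# Hash-partition rows by doc_id across the existing partitions in a single
# task-based shuffle stage (max_branch > npartitions avoids staged shuffling).
shuffled = rearrange_by_column(
    ddf,
    col="doc_id",
    shuffle_method="tasks",
    max_branch=ddf.npartitions + 1,
    npartitions=ddf.npartitions,
    ignore_index=True,
)
print(shuffled.compute())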