diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py
index 44e0977e..03621bf3 100644
--- a/nemo_curator/modules/fuzzy_dedup.py
+++ b/nemo_curator/modules/fuzzy_dedup.py
@@ -1142,7 +1142,7 @@ def _batched_merge_and_write(
                 subset_text_df = left_df_use.partitions[
                     text_part_offset:end_text_offset
                 ]
-                merged_subset_df = merge_left_to_shuffled_right(
+                subset_merged_df = merge_left_to_shuffled_right(
                     subset_text_df,
                     subset_bucket_df,
                     merge_on,
@@ -1150,20 +1150,20 @@ def _batched_merge_and_write(
                 if os.environ["SHUFFLE_APPROACH"] == "text_bytes_aware":
                     self._logger.info("Using text_bytes_aware_shuffle")
                     output_df = text_bytes_aware_shuffle(
-                        df=merged_subset_df,
+                        df=subset_merged_df,
                         partition_on=partition_on,
                         text_column=self.text_field,
                         num_workers=num_workers,
                     )
                 elif os.environ["SHUFFLE_APPROACH"] == "dask_vanilla":
                     self._logger.info("Using dask's vanilla shuffle")
-                    output_df = merged_subset_df.shuffle(on=partition_on)
+                    output_df = subset_merged_df.shuffle(on=partition_on)
                 elif os.environ["SHUFFLE_APPROACH"] == "rearrange_by_column_direct":
                     self._logger.info("Using rearrange_by_column_direct")
                     output_df = rearange_by_column_direct(
-                        df=merged_subset_df,
+                        df=subset_merged_df,
                         col=partition_on,
-                        npartitions=merged_subset_df.npartitions,
+                        npartitions=subset_merged_df.npartitions,
                         ignore_index=True,
                         excomms_default=True,
                     )
@@ -1175,14 +1175,14 @@ def _batched_merge_and_write(
                     # Use the internal dask-expr API
                     output_df = new_collection(
                         RearrangeByColumn(
-                            frame=merged_subset_df.expr,
+                            frame=subset_merged_df.expr,
                             partitioning_index=partition_on,
-                            npartitions_out=merged_subset_df.npartitions,
+                            npartitions_out=subset_merged_df.npartitions,
                             ignore_index=True,
                             method="tasks",
                             # Prevent staged shuffling by setting max_branch
                             # to the number of input partitions + 1
-                            options={"max_branch": merged_subset_df.npartitions + 1},
+                            options={"max_branch": subset_merged_df.npartitions + 1},
                         )
                     )
                 elif os.environ["SHUFFLE_APPROACH"] == "rearrange_third_branch":
@@ -1190,19 +1190,19 @@ def _batched_merge_and_write(
                     from dask.dataframe.shuffle import rearrange_by_column

                     output_df = rearrange_by_column(
-                        merged_subset_df,
+                        subset_merged_df,
                         col=partition_on,
                         shuffle_method="tasks",
                         # Prevent staged shuffling by setting max_branch
                         # to the number of input partitions + 1
-                        max_branch=merged_subset_df.npartitions + 1,
-                        npartitions=merged_subset_df.npartitions,
+                        max_branch=subset_merged_df.npartitions + 1,
+                        npartitions=subset_merged_df.npartitions,
                         ignore_index=True,
                     )
                 else:
                     raise ValueError("Invalid shuffle approach")
-                if self.int_to_str_id is not None:
+                if self.int_to_str_id is not None and output_df is not None:
                     output_df = output_df.map_partitions(
                         int_ids_to_str, id_column=self.int_to_str_id
                     )
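For anyone trying this patch locally, below is a minimal sketch (not part of the patch) of how the SHUFFLE_APPROACH environment variable is expected to drive the branch selection above. Only the values visible in these hunks are listed; at least one additional branch (the dask-expr RearrangeByColumn path) is selected by a value that falls outside the shown context, so the list is not exhaustive.

import os

# Hypothetical driver snippet: pick one of the shuffle backends exercised by
# _batched_merge_and_write by exporting SHUFFLE_APPROACH before the merge-and-write
# step runs. Values visible in this diff: "text_bytes_aware", "dask_vanilla",
# "rearrange_by_column_direct", "rearrange_third_branch". Per the else branch,
# an unrecognized value raises ValueError("Invalid shuffle approach") unless it
# matches a branch outside the shown hunks.
os.environ["SHUFFLE_APPROACH"] = "dask_vanilla"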