diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py
index d4dfdde0..63106129 100644
--- a/nemo_curator/modules/fuzzy_dedup.py
+++ b/nemo_curator/modules/fuzzy_dedup.py
@@ -469,7 +469,6 @@ def __init__(
             cache_dir=self.config.cache_dir,
             jaccard_pairs_path=os.path.join(self.config.cache_dir, jaccard_pairs_fname),
             id_column=self.config.id_field,
-            convert_str_ids=False,
             jaccard_threshold=self.config.jaccard_threshold,
             logger=self._logger,
             profile_dir=self.config.profile_dir,
@@ -1142,6 +1141,11 @@ def _batched_merge_and_write(
                 text_part_offset:end_text_offset
             ]

+            print("==== HERE ====")
+            print(subset_text_df.compute().to_pandas().to_dict("records"))
+            print(subset_bucket_df.compute().to_pandas().to_dict("records"))
+            print("==== HERE OVER ====")
+
             try:
                 # NOTE: If we have more text-df partitions than bucket-map
                 # partitions, we are more likely to see an OverflowError
@@ -1412,7 +1416,6 @@ def __init__(
         cache_dir: str,
         jaccard_pairs_path: str,
         id_column="id",
-        convert_str_ids=False,
         jaccard_threshold: float = 0.8,
         logger: Union[logging.LoggerAdapter, str] = "./",
         profile_dir: Optional[str] = None,
@@ -1422,7 +1425,6 @@ def __init__(
         self.id_column = id_column
         self.left_id = f"{id_column}_x"
         self.right_id = f"{id_column}_y"
-        self.convert_str_ids = convert_str_ids
         self.jaccard_threshold = jaccard_threshold
         self.profile_dir = profile_dir
         if isinstance(logger, str):
@@ -1458,7 +1460,7 @@ def _run_connected_components(
         with performance_report_if_with_ts_suffix(
             self.profile_dir, "connected-components-run"
         ):
-            Comms.initialize(p2p=True)
+            Comms.initialize(p2p=False)
             df = dask_cudf.read_parquet(
                 deduped_encoded_jaccard_path, blocksize="1GB", aggregate_files=True
             )
@@ -1486,9 +1488,7 @@ def _run_connected_components(
             labels_df = labels_df.merge(
                 result, left_on=["uid"], right_on=["vertex"], how="inner"
             )
-            id_columns = (
-                ["dataset_id", "doc_id"] if self.convert_str_ids else [self.id_column]
-            )
+            id_columns = [self.id_column]
             labels_df = labels_df[id_columns + ["labels"]]
             labels_df = labels_df.rename(columns={"labels": "group"})
             labels_df = labels_df.persist()
@@ -1505,7 +1505,7 @@ def _run_connected_components(
             assert num_nodes == len(labels_df)
             # Ensure all docs in the same group are in the same partition
             labels_df = labels_df.shuffle(on=["group"], ignore_index=True)
-            labels_df.to_parquet(output_path, write_index=False)
+            labels_df.to_parquet(output_path, write_index=False, overwrite=True)
             Comms.destroy()
         self._logger.info(
             f"Time taken for Connected Components Run = {time.time() - t0}s and output written at {output_path}"
         )
@@ -1573,20 +1573,12 @@ def _write_dedup_encoded_jaccard_pair(self, encoded_jaccard_pair_path):
             transform_divisions=False,
             align_dataframes=False,
         )
-        ddf.to_parquet(output_path, write_index=False)
+        ddf.to_parquet(output_path, write_index=False, overwrite=True)
         self._logger.info(
             f"Time taken for Dedup Encoding Jaccard Pairs = {time.time() - t0}s and output written at {output_path}"
         )
         return output_path

-    def _convert_str_id_pair_to_int(self, df):
-        for id, tag in zip([self.left_id, self.right_id], ["x", "y"]):
-            dx = df[id].str.rsplit("-", n=1, expand=True)
-            df[f"dataset_id_{tag}"] = dx[0].astype("uint32").values
-            df[f"doc_id_{tag}"] = dx[1].astype("int64").values
-            df = df.drop(columns=[id])
-        return df
-
     def _write_dedup_parsed_id(self):
         dedup_parsed_id_path = f"{self.cache_dir}/dedup_parsed_id.parquet"
         t0 = time.time()
@@ -1596,22 +1588,10 @@ def _write_dedup_parsed_id(self):
         ddf = dask_cudf.read_parquet(
             self.jaccard_pairs_path,
             columns=[self.left_id, self.right_id],
-            blocksize="1GB",
+            blocksize="512MB",
             aggregate_files=True,
         )
         id_columns = [self.id_column]
-        if self.convert_str_ids:
-            ddf = ddf.map_partitions(
-                self._convert_str_id_pair_to_int,
-                meta={
-                    "dataset_id_x": "uint32",
-                    "doc_id_x": "int64",
-                    "dataset_id_y": "uint32",
-                    "doc_id_y": "int64",
-                },
-            )
-            id_columns = ["dataset_id", "doc_id"]
-
         unique_docs = ddf.map_partitions(
             ConnectedComponents._get_unique_ids_per_partition, id_columns=id_columns
         )
@@ -1639,78 +1619,51 @@ def _write_encoded_jaccard_pair(self, dedup_parsed_id_path):
         ddf_id = dask_cudf.read_parquet(
             dedup_parsed_id_path, blocksize="2GB", aggregate_files=True
         )
-        ddf_id = ddf_id.persist()
-        len(ddf_id)
         ddf = dask_cudf.read_parquet(
             self.jaccard_pairs_path,
-            blocksize="256MB",
+            blocksize="1GB",
             aggregate_files=True,
         )
-        id_columns = [self.id_column]
-        if self.convert_str_ids:
-            ddf = ddf.map_partitions(
-                self._convert_str_id_pair_to_int,
-                meta={
-                    "jaccard": "float32",
-                    "dataset_id_x": "uint32",
-                    "doc_id_x": "int64",
-                    "dataset_id_y": "uint32",
-                    "doc_id_y": "int64",
-                },
-            )
-            id_columns = ["dataset_id", "doc_id"]
-
-        num_workers = get_num_workers(get_current_client())
-        self._batched_merge_and_write(
+        self._merge_and_write(
             ddf=ddf,
             ddf_id=ddf_id,
             output_path=output_path,
-            id_columns=id_columns,
-            batch_size=num_workers,
+            id_column=self.id_column,
         )
         self._logger.info(
             f"Time taken for Encoding Jaccard Pairs = {time.time() - t0}s and output written at {output_path}"
         )
         return output_path

-    def _batched_merge_and_write(
-        self, ddf, ddf_id, output_path, id_columns, batch_size=32
-    ):
-        total_batches = (ddf.npartitions + batch_size - 1) // batch_size
-        for batch_id, offset in enumerate(range(0, ddf.npartitions, batch_size)):
-            st = time.time()
-            subset_ddf = ddf.partitions[offset : offset + batch_size]
-            for tag in ["x", "y"]:
-                pair_ids = []
-                for id_col in id_columns:
-                    pair_ids.append(f"{id_col}_{tag}")
-                subset_ddf = subset_ddf.merge(
-                    ddf_id,
-                    left_on=pair_ids,
-                    right_on=id_columns,
-                    how="inner",
-                    broadcast=True,
-                )
-                subset_ddf = subset_ddf.drop(
-                    columns=pair_ids,
-                )
-                subset_ddf = subset_ddf.rename(
-                    columns={"uid": f"{self.id_column}_{tag}"}
-                )
-
-            subset_ddf = subset_ddf[[self.left_id, self.right_id, "jaccard"]]
-            output_batch_path = os.path.join(output_path, f"{batch_id}.parquet")
-            if batch_id == 0:
-                subset_ddf.to_parquet(
-                    output_batch_path, write_index=False, overwrite=True
-                )
-            else:
-                subset_ddf.to_parquet(output_batch_path, write_index=False, append=True)
-
-            et = time.time()
-            print(
-                f"batch_id = {batch_id}/{total_batches}, time = {et - st}", flush=True
-            )
+    def _merge_and_write(
+        self,
+        ddf: dask_cudf.DataFrame,
+        ddf_id: dask_cudf.DataFrame,
+        output_path: str,
+        id_column: str,
+    ) -> None:
+        st = time.time()
+        # Index the id -> uid mapping by the id column so the broadcast merges below can join on it
+        ddf_id = ddf_id.set_index(id_column)
+        for tag in ["x", "y"]:
+            pair_id = f"{id_column}_{tag}"
+            # Merge 'ddf' with 'ddf_id' to map ids to uids
+            ddf = ddf.merge(
+                ddf_id,
+                left_on=pair_id,
+                right_index=True,
+                how="inner",
+                broadcast=True,
+            )
+            ddf = ddf.drop(columns=pair_id)
+            ddf = ddf.rename(columns={"uid": f"{self.id_column}_{tag}"})
+        ddf = ddf[[self.left_id, self.right_id, "jaccard"]]
+        ddf.to_parquet(output_path, write_index=False, overwrite=True)
+
+        et = time.time()
+        self._logger.info(
+            f"Time taken for merge and write = {et - st}s and output written at {output_path}"
+        )

     @staticmethod
     def _get_unique_ids_per_partition(df, id_columns):
@@ -1720,11 +1673,11 @@ def _get_unique_ids_per_partition(df, id_columns):
             for id_col in id_columns:
                cols_to_drop.append(f"{id_col}_{tag}")

-            subset_df = df[cols_to_drop].drop_duplicates()
+            subset_df = df[cols_to_drop].drop_duplicates(ignore_index=True)
             subset_df = subset_df.rename(
                 columns={f"{id_col}_{tag}": f"{id_col}" for id_col in id_columns}
             )
             unique_df_ls.append(subset_df)
         unique_df = cudf.concat(unique_df_ls, ignore_index=True)
-        unique_df = unique_df.drop_duplicates()
+        unique_df = unique_df.drop_duplicates(ignore_index=True)
         return unique_df
diff --git a/nemo_curator/scripts/fuzzy_deduplication/connected_components.py b/nemo_curator/scripts/fuzzy_deduplication/connected_components.py
index e6353b78..33e37f10 100644
--- a/nemo_curator/scripts/fuzzy_deduplication/connected_components.py
+++ b/nemo_curator/scripts/fuzzy_deduplication/connected_components.py
@@ -32,15 +32,15 @@
 def main(args):
     st = time.time()
     output_path = os.path.join(args.output_dir, "connected_components.parquet")
     args.enable_spilling = True
-    client = get_client(**ArgumentHelper.parse_client_args(args))

     components_stage = ConnectedComponents(
         cache_dir=args.cache_dir,
         jaccard_pairs_path=args.jaccard_pairs_path,
         id_column=args.input_json_id_field,
-        convert_str_ids=True,
         jaccard_threshold=args.jaccard_threshold,
+        logger=args.log_dir,
+        profile_dir=args.profile_path,
     )
     components_stage.cc_workflow(output_path=output_path)
     print(f"All done in {time.time()-st:.1f} seconds")
diff --git a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
index dbf6c790..38f63dbe 100644
--- a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
+++ b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
@@ -173,8 +173,8 @@ def text_bytes_aware_shuffle(
     output_col = "_partitions"

     df = df.persist()
-    if len(df) == 0:
-        return None
+    # if len(df) == 0:
+    #     return None
     shuffle_part_ids = get_shuffle_partition_info(
         df=df,
         partition_on=partition_on,
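
Note (illustrative, not part of the patch): the sketch below mirrors the id-encoding pattern that the new `ConnectedComponents._merge_and_write` relies on, i.e. indexing the id -> uid mapping and merging it into both sides of each Jaccard pair. It is a minimal sketch with hypothetical toy data, written against plain `dask.dataframe` so it runs without a GPU; the real method uses `dask_cudf` and additionally passes `broadcast=True` to the merge. Column names follow the `{id_column}_x` / `{id_column}_y` convention from `fuzzy_dedup.py`.

import pandas as pd
import dask.dataframe as dd

# Mapping from the original document id to a compact integer uid,
# analogous to the dedup_parsed_id.parquet output.
id_map = dd.from_pandas(
    pd.DataFrame({"id": ["doc-a", "doc-b", "doc-c"], "uid": [0, 1, 2]}),
    npartitions=1,
).set_index("id")

# Jaccard pairs keyed by the original string ids (toy values).
pairs = dd.from_pandas(
    pd.DataFrame(
        {
            "id_x": ["doc-a", "doc-b"],
            "id_y": ["doc-b", "doc-c"],
            "jaccard": [0.9, 0.85],
        }
    ),
    npartitions=1,
)

# Encode both sides of each pair by merging against the indexed map,
# mirroring the loop over ["x", "y"] in _merge_and_write.
for tag in ["x", "y"]:
    pairs = (
        pairs.merge(id_map, left_on=f"id_{tag}", right_index=True, how="inner")
        .drop(columns=f"id_{tag}")
        .rename(columns={"uid": f"id_{tag}"})
    )

print(pairs[["id_x", "id_y", "jaccard"]].compute())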