From 4c8f7fcffa86fd3a0f9e4fce40239b327f84f3fd Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Mon, 30 Dec 2024 15:00:17 -0800 Subject: [PATCH 1/5] Fix image curation on latest RAPIDS Signed-off-by: Vibhu Jawa --- .../datasets/image_text_pair_dataset.py | 6 ++- nemo_curator/image/classifiers/base.py | 2 +- nemo_curator/image/embedders/base.py | 2 +- nemo_curator/utils/cudf_utils.py | 46 ------------------- 4 files changed, 7 insertions(+), 49 deletions(-) delete mode 100644 nemo_curator/utils/cudf_utils.py diff --git a/nemo_curator/datasets/image_text_pair_dataset.py b/nemo_curator/datasets/image_text_pair_dataset.py index b580015c..5097b21b 100644 --- a/nemo_curator/datasets/image_text_pair_dataset.py +++ b/nemo_curator/datasets/image_text_pair_dataset.py @@ -79,7 +79,11 @@ def from_webdataset(cls, path: str, id_col: str): path (str): The path to the WebDataset-like format on disk or cloud storage. id_col (str): The column storing the unique identifier for each record. """ - metadata = dask_cudf.read_parquet(path) + metadata = dask_cudf.read_parquet(path, split_row_groups=False) + # TODO: This is a hack to ensure that the number of partitions is not combined + # and remain the same as the number of shards. + # DEBUG: Why is this happening? + metadata = metadata.repartition(npartitions=metadata.npartitions) metadata = metadata.map_partitions(cls._sort_partition, id_col=id_col) tar_files = cls._get_tar_files(path) diff --git a/nemo_curator/image/classifiers/base.py b/nemo_curator/image/classifiers/base.py index 7ad9de01..40b5215e 100644 --- a/nemo_curator/image/classifiers/base.py +++ b/nemo_curator/image/classifiers/base.py @@ -17,9 +17,9 @@ import cudf import cupy as cp import torch +from crossfit.backend.cudf.series import create_list_series_from_1d_or_2d_ar from nemo_curator.datasets import ImageTextPairDataset -from nemo_curator.utils.cudf_utils import create_list_series_from_1d_or_2d_ar from nemo_curator.utils.distributed_utils import load_object_on_worker diff --git a/nemo_curator/image/embedders/base.py b/nemo_curator/image/embedders/base.py index d910e170..428a61f4 100644 --- a/nemo_curator/image/embedders/base.py +++ b/nemo_curator/image/embedders/base.py @@ -16,11 +16,11 @@ import cupy as cp import torch +from crossfit.backend.cudf.series import create_list_series_from_1d_or_2d_ar from tqdm import tqdm from nemo_curator.datasets import ImageTextPairDataset from nemo_curator.image.classifiers import ImageClassifier -from nemo_curator.utils.cudf_utils import create_list_series_from_1d_or_2d_ar from nemo_curator.utils.distributed_utils import load_object_on_worker diff --git a/nemo_curator/utils/cudf_utils.py b/nemo_curator/utils/cudf_utils.py deleted file mode 100644 index 2ec6c1e2..00000000 --- a/nemo_curator/utils/cudf_utils.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import cudf -import cupy as cp -from cudf.core.column import as_column - - -@staticmethod -def create_list_series_from_1d_or_2d_ar(ar, index): - """ - Create a cudf list series from 2d arrays - """ - if len(ar.shape) == 1: - n_rows, *_ = ar.shape - n_cols = 1 - elif len(ar.shape) == 2: - n_rows, n_cols = ar.shape - else: - return RuntimeError(f"Unexpected input shape: {ar.shape}") - data = as_column(ar.flatten()) - offset_col = as_column( - cp.arange(start=0, stop=len(data) + 1, step=n_cols), dtype="int32" - ) - mask_col = cp.full(shape=n_rows, fill_value=cp.bool_(True)) - mask = cudf._lib.transform.bools_to_mask(as_column(mask_col)) - lc = cudf.core.column.ListColumn( - size=n_rows, - dtype=cudf.ListDtype(data.dtype), - mask=mask, - offset=0, - null_count=0, - children=(offset_col, data), - ) - - return cudf.Series(lc, index=index) From 6f40c5cae96f44ef1389c91a27b6a14da9b7a93d Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Wed, 1 Jan 2025 14:08:42 -0800 Subject: [PATCH 2/5] Fix alignment of the number of partitions Signed-off-by: Vibhu Jawa --- nemo_curator/datasets/image_text_pair_dataset.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/nemo_curator/datasets/image_text_pair_dataset.py b/nemo_curator/datasets/image_text_pair_dataset.py index 5097b21b..9151ee97 100644 --- a/nemo_curator/datasets/image_text_pair_dataset.py +++ b/nemo_curator/datasets/image_text_pair_dataset.py @@ -79,11 +79,7 @@ def from_webdataset(cls, path: str, id_col: str): path (str): The path to the WebDataset-like format on disk or cloud storage. id_col (str): The column storing the unique identifier for each record. """ - metadata = dask_cudf.read_parquet(path, split_row_groups=False) - # TODO: This is a hack to ensure that the number of partitions is not combined - # and remain the same as the number of shards. - # DEBUG: Why is this happening? - metadata = metadata.repartition(npartitions=metadata.npartitions) + metadata = dask_cudf.read_parquet(path, split_row_groups=False, blocksize=None) metadata = metadata.map_partitions(cls._sort_partition, id_col=id_col) tar_files = cls._get_tar_files(path) From e65d3040560df99fb3a83320246f640d9278d5c5 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Thu, 9 Jan 2025 11:41:56 -0800 Subject: [PATCH 3/5] Repartition on cuDF is failing Signed-off-by: Vibhu Jawa --- nemo_curator/modules/semantic_dedup.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/nemo_curator/modules/semantic_dedup.py b/nemo_curator/modules/semantic_dedup.py index 6d01af4f..b93a8140 100644 --- a/nemo_curator/modules/semantic_dedup.py +++ b/nemo_curator/modules/semantic_dedup.py @@ -339,10 +339,13 @@ def __call__(self, embeddings_dataset: DocumentDataset): with performance_report_if_with_ts_suffix(self.profile_dir, "clustering-model"): embeddings_df = embeddings_df[[self.id_col, self.embedding_col]] - embeddings_df = embeddings_df.to_backend("pandas").persist() embeddings_df = embeddings_df.repartition( partition_size=self.partition_size ) + embeddings_df = embeddings_df.to_backend("pandas").persist() + # embeddings_df = embeddings_df.repartition( + # partition_size=self.partition_size + # ) embeddings_df = embeddings_df.to_backend("cudf") cupy_darr = embeddings_df.map_partitions( @@ -362,7 +365,6 @@ def __call__(self, embeddings_dataset: DocumentDataset): t0 = time.time() nearest_cents = kmeans.predict(cupy_darr) self.logger.info(f"Time taken for KMeans Predict: {time.time() - t0}") - t0 = time.time() embeddings_df["nearest_cent"] = nearest_cents.astype(np.int32) del nearest_cents From 9b81751eccd64ecd6aebcbc28aad64dc27c7fbe1 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Thu, 9 Jan 2025 11:51:47 -0800 Subject: [PATCH 4/5] Repartition on dask-pandas is failing Signed-off-by: Vibhu Jawa --- nemo_curator/modules/semantic_dedup.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/nemo_curator/modules/semantic_dedup.py b/nemo_curator/modules/semantic_dedup.py index b93a8140..c8a96774 100644 --- a/nemo_curator/modules/semantic_dedup.py +++ b/nemo_curator/modules/semantic_dedup.py @@ -338,14 +338,10 @@ def __call__(self, embeddings_dataset: DocumentDataset): with performance_report_if_with_ts_suffix(self.profile_dir, "clustering-model"): embeddings_df = embeddings_df[[self.id_col, self.embedding_col]] - embeddings_df = embeddings_df.repartition( partition_size=self.partition_size ) embeddings_df = embeddings_df.to_backend("pandas").persist() - # embeddings_df = embeddings_df.repartition( - # partition_size=self.partition_size - # ) embeddings_df = embeddings_df.to_backend("cudf") cupy_darr = embeddings_df.map_partitions( From 07f18c5ebde4a02c7f914977ede9f2e007f5a81e Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Thu, 9 Jan 2025 13:24:43 -0800 Subject: [PATCH 5/5] Re-Enable Test and see if it still hangs on CI (works locally) Signed-off-by: Vibhu Jawa --- tests/test_semdedup.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_semdedup.py b/tests/test_semdedup.py index a2436ca5..4cc66901 100644 --- a/tests/test_semdedup.py +++ b/tests/test_semdedup.py @@ -55,7 +55,6 @@ def gpu_client(self, request): request.cls.cluster = cluster yield - @pytest.mark.skip(reason="TODO: Hangs indefinitely with RAPIDS 24.12") def test_sem_dedup( self, dedup_data,