From 1dab545e2cd84f2ce57bee96abb6280ab8b481ad Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Thu, 6 Feb 2025 13:19:06 -0800 Subject: [PATCH] Enable ADD ID to work with CPU/GPU both (#479) * Enable ADD ID to work with CPU/GPU both Signed-off-by: Vibhu Jawa * Make Test runnable in a CPU only environment Signed-off-by: Vibhu Jawa * Fix pytest skipping behavior in CPU/GPU environment Signed-off-by: Vibhu Jawa * Raise error instead of skipping test Signed-off-by: Vibhu Jawa --------- Signed-off-by: Vibhu Jawa --- nemo_curator/modules/add_id.py | 7 ++++-- nemo_curator/scripts/add_id.py | 3 ++- tests/test_add_id.py | 41 ++++++++++++++++++++++++++-------- 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/nemo_curator/modules/add_id.py b/nemo_curator/modules/add_id.py index e7e733fc..eca677c4 100644 --- a/nemo_curator/modules/add_id.py +++ b/nemo_curator/modules/add_id.py @@ -39,8 +39,9 @@ def call(self, dataset: DocumentDataset) -> DocumentDataset: return self._add_id_ordered(dataset) def _add_id_fast(self, dataset: DocumentDataset) -> DocumentDataset: - meta = dataset.df.dtypes.to_dict() + meta = dataset.df._meta.copy() meta[self.id_field] = "string" + meta[self.id_field] = meta[self.id_field].astype("string") partition_zero_padding = count_digits(dataset.df.npartitions) id_df = dataset.df.map_partitions( @@ -61,12 +62,14 @@ def _add_id_fast_partition(self, partition, global_padding, partition_info=None) for local_id in range(len(partition)) ] partition[self.id_field] = id_column + partition[self.id_field] = partition[self.id_field].astype("string") return partition def _add_id_ordered(self, dataset: DocumentDataset) -> DocumentDataset: - original_meta = dataset.df.dtypes.to_dict() + original_meta = dataset.df._meta.copy() original_meta[self.id_field] = "string" + original_meta[self.id_field] = original_meta[self.id_field].astype("string") delayed_dataset = dataset.df.to_delayed() parition_lengths = [0] diff --git a/nemo_curator/scripts/add_id.py 
b/nemo_curator/scripts/add_id.py index c926e36d..2a856af0 100644 --- a/nemo_curator/scripts/add_id.py +++ b/nemo_curator/scripts/add_id.py @@ -28,6 +28,7 @@ def main(args): client = get_client(**ArgumentHelper.parse_client_args(args)) + backend = "cudf" if args.device == "gpu" else "pandas" output_dir = expand_outdir_and_mkdir(args.output_data_dir) files = get_all_files_paths_under(args.input_data_dir) if args.shuffle: @@ -36,7 +37,7 @@ def main(args): dataset = DocumentDataset( read_data( - files, file_type=args.input_file_type, backend="pandas", add_filename=True + files, file_type=args.input_file_type, backend=backend, add_filename=True ) ) add_id = nemo_curator.AddId( diff --git a/tests/test_add_id.py b/tests/test_add_id.py index 42a8575e..c33c5e4a 100644 --- a/tests/test_add_id.py +++ b/tests/test_add_id.py @@ -18,26 +18,37 @@ import nemo_curator as nc from nemo_curator.datasets import DocumentDataset +from nemo_curator.utils.import_utils import gpu_only_import, is_unavailable +cudf = gpu_only_import("cudf") +is_cudf_available = not is_unavailable(cudf) -def list_to_dataset(documents, col_name="text", npartitions=2): + +def list_to_dataset(documents, col_name="text", npartitions=2, backend="pandas"): data = {col_name: documents} pdf = pd.DataFrame(data) - - return DocumentDataset(dd.from_pandas(pdf, npartitions=npartitions)) + ddf = dd.from_pandas(pdf, npartitions=npartitions) + if backend == "cudf" and is_unavailable(cudf): + raise ImportError("cuDF is not installed or importable.") + ddf = ddf.to_backend(backend) + return DocumentDataset(ddf) -@pytest.fixture -def single_partition_dataset(): +@pytest.fixture(params=["pandas", pytest.param("cudf", marks=pytest.mark.gpu)]) +def single_partition_dataset(request): return list_to_dataset( - ["First", "Second", "Third", "Fourth", "Fifth"], npartitions=1 + ["First", "Second", "Third", "Fourth", "Fifth"], + npartitions=1, + backend=request.param, ) -@pytest.fixture -def two_partition_dataset(): 
+@pytest.fixture(params=["pandas", pytest.param("cudf", marks=pytest.mark.gpu)]) +def two_partition_dataset(request): return list_to_dataset( - ["First", "Second", "Third", "Fourth", "Fifth"], npartitions=2 + ["First", "Second", "Third", "Fourth", "Fifth"], + npartitions=2, + backend=request.param, ) @@ -56,6 +67,8 @@ def test_basic_id(self, single_partition_dataset): "doc_id-0000000004", ] ) + if is_cudf_available and isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids @@ -75,6 +88,8 @@ def test_two_partitions(self, two_partition_dataset): "doc_id-0000000004", ] ) + if is_cudf_available and isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids @@ -95,6 +110,8 @@ def test_id_prefix(self, two_partition_dataset): f"{id_prefix}-0000000004", ] ) + if is_cudf_available and isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids @@ -115,6 +132,8 @@ def test_start_index(self, two_partition_dataset): "doc_id-0000000017", ] ) + if is_cudf_available and isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids @@ -134,6 +153,8 @@ def test_fast_id_single_partition(self, single_partition_dataset): "doc_id-40", ] ) + if is_cudf_available and isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids @@ -153,6 +174,8 @@ def test_fast_id_two_partitions(self, two_partition_dataset): "doc_id-11", ] ) + if is_cudf_available and isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids