From 2272d3d9c0aebcb726b8351550a590ec790c97b3 Mon Sep 17 00:00:00 2001
From: Vibhu Jawa
Date: Tue, 4 Feb 2025 15:41:55 -0800
Subject: [PATCH] add partition_on logic

Signed-off-by: Vibhu Jawa
---
 nemo_curator/datasets/doc_dataset.py    |   4 +
 nemo_curator/utils/distributed_utils.py |  46 +++++++--
 tests/test_io.py                        | 129 ++++++++++++++++++++++++
 3 files changed, 172 insertions(+), 7 deletions(-)

diff --git a/nemo_curator/datasets/doc_dataset.py b/nemo_curator/datasets/doc_dataset.py
index 6d49a998..878f1394 100644
--- a/nemo_curator/datasets/doc_dataset.py
+++ b/nemo_curator/datasets/doc_dataset.py
@@ -160,6 +160,7 @@ def to_json(
         output_path: str,
         write_to_filename: Union[bool, str] = False,
         keep_filename_column: bool = False,
+        partition_on: Optional[str] = None,
     ):
         """
         See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
@@ -170,6 +171,7 @@ def to_json(
             output_path=output_path,
             write_to_filename=write_to_filename,
             keep_filename_column=keep_filename_column,
+            partition_on=partition_on,
             output_type="jsonl",
         )
 
@@ -178,6 +180,7 @@ def to_parquet(
         output_path: str,
         write_to_filename: Union[bool, str] = False,
         keep_filename_column: bool = False,
+        partition_on: Optional[str] = None,
     ):
         """
         See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
@@ -188,6 +191,7 @@ def to_parquet(
             output_path=output_path,
             write_to_filename=write_to_filename,
             keep_filename_column=keep_filename_column,
+            partition_on=partition_on,
             output_type="parquet",
         )
 
diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py
index addabfd9..2a70963d 100644
--- a/nemo_curator/utils/distributed_utils.py
+++ b/nemo_curator/utils/distributed_utils.py
@@ -843,6 +843,7 @@ def write_to_disk(
     write_to_filename: Union[bool, str] = False,
     keep_filename_column: bool = False,
     output_type: str = "jsonl",
+    partition_on: Optional[str] = None,
 ):
     """
     This function writes a Dask DataFrame to the specified file path.
@@ -879,6 +880,11 @@
             f"write_using_filename is True but no {filename_col} column found in DataFrame"
         )
 
+    if partition_on is not None and write_to_filename:
+        raise ValueError(
+            "Cannot use both partition_on and write_to_filename parameters simultaneously. "
+        )
+
     if is_cudf_type(df):
         import cudf
 
@@ -904,7 +910,12 @@
     # output_path is a directory
     else:
         if output_type == "jsonl" or output_type == "parquet":
-            _write_to_jsonl_or_parquet(df, output_path, output_type)
+            _write_to_jsonl_or_parquet(
+                df,
+                output_path=output_path,
+                output_type=output_type,
+                partition_on=partition_on,
+            )
         elif output_type == "bitext":
             if write_to_filename:
                 os.makedirs(output_path, exist_ok=True)
@@ -938,16 +949,37 @@
 def _write_to_jsonl_or_parquet(
     df,
     output_path: str,
     output_type: Literal["jsonl", "parquet"] = "jsonl",
+    partition_on: Optional[str] = None,
 ):
     if output_type == "jsonl":
-        if is_cudf_type(df):
-            # See open issue here: https://github.com/rapidsai/cudf/issues/15211
-            # df.to_json(output_path, orient="records", lines=True, engine="cudf", force_ascii=False)
-            df.to_json(output_path, orient="records", lines=True, force_ascii=False)
+        if partition_on is not None:
+            unique_values = (
+                df[partition_on]
+                .unique()
+                .to_backend(backend="pandas")
+                .compute()
+                .to_list()
+            )
+            for value in unique_values:
+                os.makedirs(output_path, exist_ok=True)
+                partition_output_path = os.path.join(
+                    output_path, f"{partition_on}={value}"
+                )
+                df[df[partition_on] == value].to_json(
+                    partition_output_path,
+                    orient="records",
+                    lines=True,
+                    force_ascii=False,
+                )
         else:
-            df.to_json(output_path, orient="records", lines=True, force_ascii=False)
+            if is_cudf_type(df):
+                # See open issue here: https://github.com/rapidsai/cudf/issues/15211
+                # df.to_json(output_path, orient="records", lines=True, engine="cudf", force_ascii=False)
+                df.to_json(output_path, orient="records", lines=True, force_ascii=False)
+            else:
+                df.to_json(output_path, orient="records", lines=True, force_ascii=False)
     elif output_type == "parquet":
-        df.to_parquet(output_path, write_index=False)
+        df.to_parquet(output_path, write_index=False, partition_on=partition_on)
     else:
         raise ValueError(f"Unknown output type: {output_type}")
diff --git a/tests/test_io.py b/tests/test_io.py
index ca0c645b..0b294bb0 100644
--- a/tests/test_io.py
+++ b/tests/test_io.py
@@ -31,6 +31,9 @@
     write_to_disk,
 )
 from nemo_curator.utils.file_utils import get_all_files_paths_under
+from nemo_curator.utils.import_utils import gpu_only_import, is_unavailable
+
+cudf = gpu_only_import("cudf")
 
 
 def _generate_dummy_dataset(num_rows: int = 50) -> str:
@@ -293,3 +296,129 @@ def test_write_single_jsonl_file(self, tmp_path):
         result = DocumentDataset.read_json(output_path)
 
         assert json_df.equals(result.df.compute())
+
+
+class TestPartitionOn:
+    def test_partition_on_and_write_to_filename_error(self, tmp_path):
+        """Verify that using partition_on and write_to_filename together raises an error."""
+        # Skip cudf tests if cudf is not installed.
+        df = pd.DataFrame(
+            {
+                "id": [1, 2, 3],
+                "file_name": ["f1", "f1", "f1"],
+                "category": ["A", "B", "A"],
+            }
+        )
+        ddf = dd.from_pandas(df, npartitions=1)
+        dataset = DocumentDataset(ddf)
+        with pytest.raises(
+            ValueError,
+            match="Cannot use both partition_on and write_to_filename parameters simultaneously.",
+        ):
+            dataset.to_json(
+                output_path=str(tmp_path / "output"),
+                write_to_filename=True,  # Intentionally provided to trigger the error
+                partition_on="category",
+            )
+
+    @pytest.mark.parametrize("backend", ["pandas", "cudf"])
+    @pytest.mark.parametrize(
+        "category_values",
+        [
+            ["A", "B", "A", "B"],
+            [10, 20, 10, 20],
+            [1.0, 2.0, 1.0, 2.0],
+        ],
+    )
+    def test_write_to_disk_with_partition_on_jsonl(
+        self, tmp_path, backend, category_values
+    ):
+        """
+        Test writing a partitioned JSONL dataset.
+
+        The function is expected to create subdirectories in the output directory
+        with names of the form 'category=<value>' for each unique partition column value.
+        """
+        if backend == "cudf" and is_unavailable(cudf):
+            pytest.skip("cudf is not installed")
+
+        df = pd.DataFrame(
+            {"id": [1, 2, 3, 4], "category": category_values, "value": [10, 20, 30, 40]}
+        )
+        ddf = dd.from_pandas(df, npartitions=2)
+        ddf = ddf.to_backend(backend)
+        output_dir = tmp_path / "output_jsonl"
+        dataset = DocumentDataset(ddf)
+        dataset.to_json(output_path=str(output_dir), partition_on="category")
+        # Check that the output directory contains subdirectories for each partition.
+        # Unique partition values (as strings) to be used in the directory names.
+        unique_partitions = {str(x) for x in category_values}
+        for part in unique_partitions:
+            expected_dir = output_dir / f"category={part}"
+            assert expected_dir.exists(), f"Expected directory {expected_dir} not found"
+
+        # For each partition directory, load the JSONL files and verify that all records have the correct partition value.
+        # (Here we assume the files are written with extension ".part")
+        for part_dir in output_dir.glob("category=*"):
+            # The partition value is taken from the directory name.
+            partition_value = part_dir.name.split("=")[-1]
+            jsonl_files = list(part_dir.glob("*.part"))
+            assert (
+                jsonl_files
+            ), f"No JSONL files found in partition directory {part_dir}"
+            for file in jsonl_files:
+                with open(file, "r") as f:
+                    for line in f:
+                        record = json.loads(line)
+                        if "category" in record:
+                            # Compare as strings, to work with both integer and string partition values.
+                            assert (
+                                str(record["category"]) == partition_value
+                            ), f"Record partition value {record['category']} does not match directory {partition_value}"
+
+    @pytest.mark.parametrize("backend", ["pandas", "cudf"])
+    @pytest.mark.parametrize(
+        "category_values",
+        [
+            ["A", "B", "A", "B"],
+            [10, 20, 10, 20],
+            [1.0, 2.0, 1.0, 2.0],
+        ],
+    )
+    def test_write_to_disk_with_partition_on_parquet(
+        self, tmp_path, backend, category_values
+    ):
+        """
+        Test writing a partitioned Parquet dataset.
+
+        The test writes a DataFrame partitioned on the 'category' column and then reads it back
+        using dd.read_parquet. The output is compared (after sorting) to the original DataFrame.
+        """
+        if backend == "cudf" and is_unavailable(cudf):
+            pytest.skip("cudf is not installed")
+
+        df = pd.DataFrame(
+            {"id": [1, 2, 3, 4], "category": category_values, "value": [10, 20, 30, 40]}
+        )
+        ddf = dd.from_pandas(df, npartitions=2)
+        ddf = ddf.to_backend(backend)
+        output_dir = tmp_path / "output_parquet"
+        dataset = DocumentDataset(ddf)
+        dataset.to_parquet(output_path=str(output_dir), partition_on="category")
+
+        # Check that the output directory contains subdirectories for each partition.
+        # Unique partition values (as strings) to be used in the directory names.
+        unique_partitions = {str(x) for x in category_values}
+        for part in unique_partitions:
+            expected_dir = output_dir / f"category={part}"
+            assert expected_dir.exists(), f"Expected directory {expected_dir} not found"
+
+        ddf_loaded = dd.read_parquet(str(output_dir))
+        df_loaded = ddf_loaded.compute().reset_index(drop=True)
+        df_loaded["category"] = df_loaded["category"].astype(df["category"].dtype)
+        # To ensure a fair comparison, sort the dataframes by 'id' and reindex.
+        pd.testing.assert_frame_equal(
+            df.sort_values("id").reset_index(drop=True),
+            df_loaded.sort_values("id").reset_index(drop=True)[df.columns],
+            check_dtype=False,
+        )
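
Reviewer note (not part of the patch): a minimal usage sketch of the partition_on
parameter added above, for anyone trying the feature locally. The DataFrame
contents, the output paths, and the "from nemo_curator.datasets import
DocumentDataset" import path are illustrative assumptions for this sketch, not
taken from the diff.

    import dask.dataframe as dd
    import pandas as pd

    from nemo_curator.datasets import DocumentDataset

    # Small example dataset with a column to partition on (example data, not from the patch).
    pdf = pd.DataFrame({"id": [1, 2, 3, 4], "category": ["A", "B", "A", "B"]})
    dataset = DocumentDataset(dd.from_pandas(pdf, npartitions=2))

    # JSONL: one subdirectory per unique value, e.g. out_jsonl/category=A/ and out_jsonl/category=B/
    dataset.to_json(output_path="out_jsonl", partition_on="category")

    # Parquet: delegates to Dask's to_parquet(partition_on=...), producing Hive-style directories.
    dataset.to_parquet(output_path="out_parquet", partition_on="category")

    # Combining partition_on with write_to_filename raises ValueError (see write_to_disk above).
    # dataset.to_json(output_path="out_jsonl", write_to_filename=True, partition_on="category")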