diff --git a/nemo_curator/datasets/doc_dataset.py b/nemo_curator/datasets/doc_dataset.py
index 6d49a998..fa042e8b 100644
--- a/nemo_curator/datasets/doc_dataset.py
+++ b/nemo_curator/datasets/doc_dataset.py
@@ -160,16 +160,36 @@ def to_json(
         output_path: str,
         write_to_filename: Union[bool, str] = False,
         keep_filename_column: bool = False,
+        partition_on: Optional[str] = None,
     ):
         """
-        See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
+        Writes the dataset to the specified path in JSONL format.
+        If `write_to_filename` is True, the DataFrame is expected to have a column
+        that specifies the filename for each document. This column can be named
+        `file_name` by default, or a custom name if `write_to_filename` is a string.
+
+        Args:
+            output_path (str): The directory or file path where the dataset will be written.
+            write_to_filename (Union[bool, str]): Determines how filenames are handled.
+                - If True, uses the `file_name` column in the DataFrame to determine filenames.
+                - If a string, uses that string as the column name for filenames.
+                - If False, writes all data to the specified `output_path`.
+            keep_filename_column (bool): If True, retains the filename column in the output.
+                If False, the filename column is dropped from the output.
+            partition_on (Optional[str]): The column name used to partition the data.
+                If specified, data is partitioned based on unique values in this column,
+                with each partition written to a separate directory.
+
+        For more details, refer to the `write_to_disk` function in
+        `nemo_curator.utils.distributed_utils`.
         """
         write_to_disk(
             df=self.df,
             output_path=output_path,
             write_to_filename=write_to_filename,
             keep_filename_column=keep_filename_column,
+            partition_on=partition_on,
             output_type="jsonl",
         )

@@ -178,16 +198,36 @@ def to_parquet(
         output_path: str,
         write_to_filename: Union[bool, str] = False,
         keep_filename_column: bool = False,
+        partition_on: Optional[str] = None,
     ):
         """
-        See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
+        Writes the dataset to the specified path in Parquet format.
+        If `write_to_filename` is True, the DataFrame is expected to have a column
+        that specifies the filename for each document. This column can be named
+        `file_name` by default, or a custom name if `write_to_filename` is a string.
+
+        Args:
+            output_path (str): The directory or file path where the dataset will be written.
+            write_to_filename (Union[bool, str]): Determines how filenames are handled.
+                - If True, uses the `file_name` column in the DataFrame to determine filenames.
+                - If a string, uses that string as the column name for filenames.
+                - If False, writes all data to the specified `output_path`.
+            keep_filename_column (bool): If True, retains the filename column in the output.
+                If False, the filename column is dropped from the output.
+            partition_on (Optional[str]): The column name used to partition the data.
+                If specified, data is partitioned based on unique values in this column,
+                with each partition written to a separate directory.
+
+        For more details, refer to the `write_to_disk` function in
+        `nemo_curator.utils.distributed_utils`.
         """
         write_to_disk(
             df=self.df,
             output_path=output_path,
             write_to_filename=write_to_filename,
             keep_filename_column=keep_filename_column,
+            partition_on=partition_on,
             output_type="parquet",
         )

diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py
index addabfd9..8f022389 100644
--- a/nemo_curator/utils/distributed_utils.py
+++ b/nemo_curator/utils/distributed_utils.py
@@ -748,6 +748,7 @@ def single_partition_write_with_filename(
                 orient="records",
                 lines=True,
                 force_ascii=False,
+                index=False,  # Only index=False is supported for orient="records"
             )
         else:
             # See open issue here: https://github.com/rapidsai/cudf/issues/15211
@@ -759,6 +760,7 @@ def single_partition_write_with_filename(
                 orient="records",
                 lines=True,
                 force_ascii=False,
+                index=False,  # Only index=False is supported for orient="records"
             )

     elif output_type == "parquet":
@@ -843,6 +845,7 @@ def write_to_disk(
     write_to_filename: Union[bool, str] = False,
     keep_filename_column: bool = False,
     output_type: str = "jsonl",
+    partition_on: Optional[str] = None,
 ):
     """
     This function writes a Dask DataFrame to the specified file path.
@@ -857,6 +860,9 @@ def write_to_disk(
             If str, uses that as the filename column to write to.
         keep_filename_column: Boolean representing whether to keep or drop the filename column, if it exists.
         output_type: The type of output file to write. Can be "jsonl" or "parquet".
+        partition_on: The column name to partition the data on.
+            If specified, the data will be partitioned based on the unique values in this column,
+            and each partition will be written to a separate directory
     """

     filename_col = _resolve_filename_col(write_to_filename)
@@ -879,6 +885,11 @@ def write_to_disk(
             f"write_using_filename is True but no {filename_col} column found in DataFrame"
         )

+    if partition_on is not None and write_to_filename:
+        raise ValueError(
+            "Cannot use both partition_on and write_to_filename parameters simultaneously. "
+        )
+
     if is_cudf_type(df):
         import cudf

@@ -904,7 +915,12 @@ def write_to_disk(
     # output_path is a directory
     else:
         if output_type == "jsonl" or output_type == "parquet":
-            _write_to_jsonl_or_parquet(df, output_path, output_type)
+            _write_to_jsonl_or_parquet(
+                df,
+                output_path=output_path,
+                output_type=output_type,
+                partition_on=partition_on,
+            )
         elif output_type == "bitext":
             if write_to_filename:
                 os.makedirs(output_path, exist_ok=True)
@@ -938,16 +954,50 @@ def _write_to_jsonl_or_parquet(
     df,
     output_path: str,
     output_type: Literal["jsonl", "parquet"] = "jsonl",
+    partition_on: Optional[str] = None,
 ):
     if output_type == "jsonl":
-        if is_cudf_type(df):
-            # See open issue here: https://github.com/rapidsai/cudf/issues/15211
-            # df.to_json(output_path, orient="records", lines=True, engine="cudf", force_ascii=False)
-            df.to_json(output_path, orient="records", lines=True, force_ascii=False)
+        if partition_on is not None:
+            unique_values = (
+                df[partition_on]
+                .unique()
+                .to_backend(backend="pandas")
+                .compute()
+                .to_list()
+            )
+            for value in unique_values:
+                os.makedirs(output_path, exist_ok=True)
+                partition_output_path = os.path.join(
+                    output_path, f"{partition_on}={value}"
+                )
+                df[df[partition_on] == value].to_json(
+                    partition_output_path,
+                    orient="records",
+                    lines=True,
+                    force_ascii=False,
+                    index=False,  # Only index=False is supported for orient="records"
+                )
         else:
-            df.to_json(output_path, orient="records", lines=True, force_ascii=False)
+            if is_cudf_type(df):
+                # See open issue here: https://github.com/rapidsai/cudf/issues/15211
+                # df.to_json(output_path, orient="records", lines=True, engine="cudf", force_ascii=False)
+                df.to_json(
+                    output_path,
+                    orient="records",
+                    lines=True,
+                    force_ascii=False,
+                    index=False,
+                )  # Only index=False is supported for orient="records"
+            else:
+                df.to_json(
+                    output_path,
+                    orient="records",
+                    lines=True,
+                    force_ascii=False,
+                    index=False,
+                )  # Only index=False is supported for orient="records"
     elif output_type == "parquet":
-        df.to_parquet(output_path, write_index=False)
+        df.to_parquet(output_path, write_index=False, partition_on=partition_on)
     else:
         raise ValueError(f"Unknown output type: {output_type}")

diff --git a/tests/test_io.py b/tests/test_io.py
index ca0c645b..1efe0569 100644
--- a/tests/test_io.py
+++ b/tests/test_io.py
@@ -293,3 +293,127 @@ def test_write_single_jsonl_file(self, tmp_path):
         result = DocumentDataset.read_json(output_path)

         assert json_df.equals(result.df.compute())
+
+
+class TestPartitionOn:
+    def test_partition_on_and_write_to_filename_error(self, tmp_path):
+        """Verify that using partition_on and write_to_filename together raises an error."""
+        df = pd.DataFrame(
+            {
+                "id": [1, 2, 3],
+                "file_name": ["f1", "f1", "f1"],
+                "category": ["A", "B", "A"],
+            }
+        )
+        ddf = dd.from_pandas(df, npartitions=1)
+        dataset = DocumentDataset(ddf)
+        with pytest.raises(
+            ValueError,
+            match="Cannot use both partition_on and write_to_filename parameters simultaneously.",
+        ):
+            dataset.to_json(
+                output_path=str(tmp_path / "output"),
+                write_to_filename=True,  # Intentionally provided to trigger the error
+                partition_on="category",
+            )
+
+    @pytest.mark.parametrize(
+        "backend", ["pandas", pytest.param("cudf", marks=pytest.mark.gpu)]
+    )
+    @pytest.mark.parametrize(
+        "category_values",
+        [
+            ["A", "B", "A", "B"],
+            [10, 20, 10, 20],
+            [1.0, 2.0, 1.0, 2.0],
+        ],
+    )
+    def test_write_to_disk_with_partition_on_jsonl(
+        self, tmp_path, backend, category_values
+    ):
+        """
+        Test writing a partitioned JSONL dataset.
+
+        The function is expected to create subdirectories in the output directory
+        with names of the form 'category=<value>' for each unique partition column value.
+        """
+        df = pd.DataFrame(
+            {"id": [1, 2, 3, 4], "category": category_values, "value": [10, 20, 30, 40]}
+        )
+        ddf = dd.from_pandas(df, npartitions=2)
+        ddf = ddf.to_backend(backend)
+        output_dir = tmp_path / "output_jsonl"
+        dataset = DocumentDataset(ddf)
+        dataset.to_json(output_path=str(output_dir), partition_on="category")
+        # Check that the output directory contains subdirectories for each partition.
+        # Unique partition values (as strings) to be used in the directory names.
+        unique_partitions = {str(x) for x in category_values}
+        for part in unique_partitions:
+            expected_dir = output_dir / f"category={part}"
+            assert expected_dir.exists(), f"Expected directory {expected_dir} not found"
+
+        # For each partition directory, load the JSONL files and verify that all records have the correct partition value.
+        # (Here we assume the files are written with extension ".part")
+        for part_dir in output_dir.glob("category=*"):
+            # The partition value is taken from the directory name.
+            partition_value = part_dir.name.split("=")[-1]
+            jsonl_files = list(part_dir.glob("*.part"))
+            assert (
+                jsonl_files
+            ), f"No JSONL files found in partition directory {part_dir}"
+            for file in jsonl_files:
+                with open(file, "r") as f:
+                    for line in f:
+                        record = json.loads(line)
+                        if "category" in record:
+                            # Compare as strings, to work with both integer and string partition values.
+                            assert (
+                                str(record["category"]) == partition_value
+                            ), f"Record partition value {record['category']} does not match directory {partition_value}"
+
+    @pytest.mark.parametrize(
+        "backend", ["pandas", pytest.param("cudf", marks=pytest.mark.gpu)]
+    )
+    @pytest.mark.parametrize(
+        "category_values",
+        [
+            ["A", "B", "A", "B"],
+            [10, 20, 10, 20],
+            [1.0, 2.0, 1.0, 2.0],
+        ],
+    )
+    def test_write_to_disk_with_partition_on_parquet(
+        self, tmp_path, backend, category_values
+    ):
+        """
+        Test writing a partitioned Parquet dataset.
+
+        The test writes a DataFrame partitioned on the 'category' column and then reads it back
+        using dd.read_parquet. The output is compared (after sorting) to the original DataFrame.
+        """
+
+        df = pd.DataFrame(
+            {"id": [1, 2, 3, 4], "category": category_values, "value": [10, 20, 30, 40]}
+        )
+        ddf = dd.from_pandas(df, npartitions=2)
+        ddf = ddf.to_backend(backend)
+        output_dir = tmp_path / "output_parquet"
+        dataset = DocumentDataset(ddf)
+        dataset.to_parquet(output_path=str(output_dir), partition_on="category")
+
+        # Check that the output directory contains subdirectories for each partition.
+        # Unique partition values (as strings) to be used in the directory names.
+        unique_partitions = {str(x) for x in category_values}
+        for part in unique_partitions:
+            expected_dir = output_dir / f"category={part}"
+            assert expected_dir.exists(), f"Expected directory {expected_dir} not found"
+
+        ddf_loaded = dd.read_parquet(str(output_dir))
+        df_loaded = ddf_loaded.compute().reset_index(drop=True)
+        df_loaded["category"] = df_loaded["category"].astype(df["category"].dtype)
+        # To ensure a fair comparison, sort the dataframes by 'id' and reindex.
+        pd.testing.assert_frame_equal(
+            df.sort_values("id").reset_index(drop=True),
+            df_loaded.sort_values("id").reset_index(drop=True)[df.columns],
+            check_dtype=False,
+        )
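Not part of the patch itself: a minimal usage sketch of the new `partition_on` parameter, assuming `DocumentDataset` is importable from `nemo_curator.datasets` and a local Dask session; the output paths are illustrative.

# Usage sketch for the new partition_on parameter (paths and data are illustrative).
import dask.dataframe as dd
import pandas as pd

from nemo_curator.datasets import DocumentDataset

pdf = pd.DataFrame(
    {"id": [1, 2, 3, 4], "category": ["A", "B", "A", "B"], "text": ["w", "x", "y", "z"]}
)
dataset = DocumentDataset(dd.from_pandas(pdf, npartitions=2))

# JSONL: one subdirectory per unique value of the partition column,
# e.g. out_jsonl/category=A/*.part, with the category field kept in each record.
dataset.to_json(output_path="out_jsonl", partition_on="category")

# Parquet: delegates to dask's to_parquet(partition_on=...), which encodes the
# column in the directory names instead of storing it inside the data files.
dataset.to_parquet(output_path="out_parquet", partition_on="category")

# Reading the Parquet output back restores the category column from the paths
# (typically with a different dtype, hence check_dtype=False in the test above).
round_trip = dd.read_parquet("out_parquet").compute()
print(round_trip.sort_values("id"))

The `*.part` file names follow the assumption already made in the JSONL test; the exact names are a Dask implementation detail.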
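Also outside the patch, a sketch of the new guard in `write_to_disk`: combining `partition_on` with `write_to_filename` raises the ValueError added above. A `file_name` column is included, as in the test, so the pre-existing filename-column check passes and the mutual-exclusion error is the one raised.

# Error-path sketch for the partition_on / write_to_filename guard.
import dask.dataframe as dd
import pandas as pd

from nemo_curator.datasets import DocumentDataset

pdf = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "file_name": ["f1", "f1", "f1"],  # present so write_to_filename=True passes the column check
        "category": ["A", "B", "A"],
    }
)
dataset = DocumentDataset(dd.from_pandas(pdf, npartitions=1))

try:
    dataset.to_json(
        output_path="out_invalid",
        write_to_filename=True,   # per-document output files
        partition_on="category",  # cannot be combined with the above
    )
except ValueError as err:
    print(err)  # Cannot use both partition_on and write_to_filename parameters simultaneously.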