Add Partition On Logic #519

Merged (4 commits) on Feb 6, 2025
Changes from all commits
44 changes: 42 additions & 2 deletions nemo_curator/datasets/doc_dataset.py
@@ -160,16 +160,36 @@ def to_json(
output_path: str,
write_to_filename: Union[bool, str] = False,
keep_filename_column: bool = False,
partition_on: Optional[str] = None,
):
"""
See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
Writes the dataset to the specified path in JSONL format.

If `write_to_filename` is True, the DataFrame is expected to have a column
that specifies the filename for each document. This column can be named
`file_name` by default, or a custom name if `write_to_filename` is a string.

Args:
output_path (str): The directory or file path where the dataset will be written.
write_to_filename (Union[bool, str]): Determines how filenames are handled.
- If True, uses the `file_name` column in the DataFrame to determine filenames.
- If a string, uses that string as the column name for filenames.
- If False, writes all data to the specified `output_path`.
keep_filename_column (bool): If True, retains the filename column in the output.
If False, the filename column is dropped from the output.
partition_on (Optional[str]): The column name used to partition the data.
If specified, data is partitioned based on unique values in this column,
with each partition written to a separate directory.

For more details, refer to the `write_to_disk` function in
`nemo_curator.utils.distributed_utils`.
"""
write_to_disk(
df=self.df,
output_path=output_path,
write_to_filename=write_to_filename,
keep_filename_column=keep_filename_column,
partition_on=partition_on,
output_type="jsonl",
)
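For context, a minimal usage sketch of the new parameter from the caller's side (the sample data and the `category` column are hypothetical, assuming the usual `from nemo_curator.datasets import DocumentDataset` import):

import dask.dataframe as dd
import pandas as pd

from nemo_curator.datasets import DocumentDataset

# Hypothetical sample data; "category" is the column to partition on.
pdf = pd.DataFrame({"id": [1, 2, 3], "category": ["A", "B", "A"]})
dataset = DocumentDataset(dd.from_pandas(pdf, npartitions=1))

# Each unique value of "category" is written under output_dir/category=<value>/.
dataset.to_json(output_path="output_dir", partition_on="category")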

@@ -178,16 +198,36 @@ def to_parquet(
output_path: str,
write_to_filename: Union[bool, str] = False,
keep_filename_column: bool = False,
partition_on: Optional[str] = None,
):
"""
See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
Writes the dataset to the specified path in Parquet format.

If `write_to_filename` is True, the DataFrame is expected to have a column
that specifies the filename for each document. This column can be named
`file_name` by default, or a custom name if `write_to_filename` is a string.

Args:
output_path (str): The directory or file path where the dataset will be written.
write_to_filename (Union[bool, str]): Determines how filenames are handled.
- If True, uses the `file_name` column in the DataFrame to determine filenames.
- If a string, uses that string as the column name for filenames.
- If False, writes all data to the specified `output_path`.
keep_filename_column (bool): If True, retains the filename column in the output.
If False, the filename column is dropped from the output.
partition_on (Optional[str]): The column name used to partition the data.
If specified, data is partitioned based on unique values in this column,
with each partition written to a separate directory.

For more details, refer to the `write_to_disk` function in
`nemo_curator.utils.distributed_utils`.
"""
write_to_disk(
df=self.df,
output_path=output_path,
write_to_filename=write_to_filename,
keep_filename_column=keep_filename_column,
partition_on=partition_on,
output_type="parquet",
)
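The Parquet counterpart defers the partitioning to Dask's own `partition_on` support, and the output can be read back with `dd.read_parquet`, which folds the Hive-style directories back into a column. A sketch with hypothetical data:

import dask.dataframe as dd
import pandas as pd

from nemo_curator.datasets import DocumentDataset

pdf = pd.DataFrame({"id": [1, 2, 3, 4], "category": ["A", "B", "A", "B"]})
dataset = DocumentDataset(dd.from_pandas(pdf, npartitions=2))

# Writes output_dir/category=A/*.parquet and output_dir/category=B/*.parquet.
dataset.to_parquet(output_path="output_dir", partition_on="category")

# Reading the directory back restores "category" from the partition directory names.
loaded = dd.read_parquet("output_dir").compute()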

64 changes: 57 additions & 7 deletions nemo_curator/utils/distributed_utils.py
@@ -748,6 +748,7 @@ def single_partition_write_with_filename(
orient="records",
lines=True,
force_ascii=False,
index=False, # Only index=False is supported for orient="records"
)
else:
# See open issue here: https://github.com/rapidsai/cudf/issues/15211
@@ -759,6 +760,7 @@
orient="records",
lines=True,
force_ascii=False,
index=False, # Only index=False is supported for orient="records"
)

elif output_type == "parquet":
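A note on the `index=False` arguments added above: as the inline comments say, only `index=False` is supported for `orient="records"` with the pandas/cudf versions Curator targets, so the writers now pass it explicitly (the commented-out `engine="cudf"` path is tracked in the linked cudf issue). A minimal illustration of the call shape being relied on, assuming a recent pandas:

import pandas as pd

df = pd.DataFrame({"id": [1, 2], "text": ["a", "b"]})
# One JSON object per line, no index column; mirrors the writer calls in this diff.
df.to_json("sample.jsonl", orient="records", lines=True, index=False)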
@@ -843,6 +845,7 @@ def write_to_disk(
write_to_filename: Union[bool, str] = False,
keep_filename_column: bool = False,
output_type: str = "jsonl",
partition_on: Optional[str] = None,
):
"""
This function writes a Dask DataFrame to the specified file path.
@@ -857,6 +860,9 @@
If str, uses that as the filename column to write to.
keep_filename_column: Boolean representing whether to keep or drop the filename column, if it exists.
output_type: The type of output file to write. Can be "jsonl" or "parquet".
partition_on: The column name to partition the data on.
If specified, the data will be partitioned based on the unique values in this column,
and each partition will be written to a separate directory
"""

filename_col = _resolve_filename_col(write_to_filename)
@@ -879,6 +885,11 @@
f"write_using_filename is True but no {filename_col} column found in DataFrame"
)

if partition_on is not None and write_to_filename:
raise ValueError(
"Cannot use both partition_on and write_to_filename parameters simultaneously. "
)

if is_cudf_type(df):
import cudf

@@ -904,7 +915,12 @@
# output_path is a directory
else:
if output_type == "jsonl" or output_type == "parquet":
_write_to_jsonl_or_parquet(df, output_path, output_type)
_write_to_jsonl_or_parquet(
df,
output_path=output_path,
output_type=output_type,
partition_on=partition_on,
)
elif output_type == "bitext":
if write_to_filename:
os.makedirs(output_path, exist_ok=True)
@@ -938,16 +954,50 @@ def _write_to_jsonl_or_parquet(
df,
output_path: str,
output_type: Literal["jsonl", "parquet"] = "jsonl",
partition_on: Optional[str] = None,
):
if output_type == "jsonl":
if is_cudf_type(df):
# See open issue here: https://github.com/rapidsai/cudf/issues/15211
# df.to_json(output_path, orient="records", lines=True, engine="cudf", force_ascii=False)
df.to_json(output_path, orient="records", lines=True, force_ascii=False)
if partition_on is not None:
unique_values = (
df[partition_on]
.unique()
.to_backend(backend="pandas")
.compute()
.to_list()
)
for value in unique_values:
os.makedirs(output_path, exist_ok=True)
partition_output_path = os.path.join(
output_path, f"{partition_on}={value}"
)
df[df[partition_on] == value].to_json(
partition_output_path,
orient="records",
lines=True,
force_ascii=False,
index=False, # Only index=False is supported for orient="records"
)
else:
df.to_json(output_path, orient="records", lines=True, force_ascii=False)
if is_cudf_type(df):
# See open issue here: https://github.com/rapidsai/cudf/issues/15211
# df.to_json(output_path, orient="records", lines=True, engine="cudf", force_ascii=False)
df.to_json(
output_path,
orient="records",
lines=True,
force_ascii=False,
index=False,
) # Only index=False is supported for orient="records"
else:
df.to_json(
output_path,
orient="records",
lines=True,
force_ascii=False,
index=False,
) # Only index=False is supported for orient="records"
elif output_type == "parquet":
df.to_parquet(output_path, write_index=False)
df.to_parquet(output_path, write_index=False, partition_on=partition_on)
else:
raise ValueError(f"Unknown output type: {output_type}")

124 changes: 124 additions & 0 deletions tests/test_io.py
@@ -293,3 +293,127 @@ def test_write_single_jsonl_file(self, tmp_path):

result = DocumentDataset.read_json(output_path)
assert json_df.equals(result.df.compute())


class TestPartitionOn:
def test_partition_on_and_write_to_filename_error(self, tmp_path):
"""Verify that using partition_on and write_to_filename together raises an error."""
df = pd.DataFrame(
{
"id": [1, 2, 3],
"file_name": ["f1", "f1", "f1"],
"category": ["A", "B", "A"],
}
)
ddf = dd.from_pandas(df, npartitions=1)
dataset = DocumentDataset(ddf)
with pytest.raises(
ValueError,
match="Cannot use both partition_on and write_to_filename parameters simultaneously.",
):
dataset.to_json(
output_path=str(tmp_path / "output"),
write_to_filename=True, # Intentionally provided to trigger the error
partition_on="category",
)

@pytest.mark.parametrize(
"backend", ["pandas", pytest.param("cudf", marks=pytest.mark.gpu)]
)
@pytest.mark.parametrize(
"category_values",
[
["A", "B", "A", "B"],
[10, 20, 10, 20],
[1.0, 2.0, 1.0, 2.0],
],
)
def test_write_to_disk_with_partition_on_jsonl(
self, tmp_path, backend, category_values
):
"""
Test writing a partitioned JSONL dataset.

The function is expected to create subdirectories in the output directory
with names of the form 'category=<value>' for each unique partition column value.
"""
df = pd.DataFrame(
{"id": [1, 2, 3, 4], "category": category_values, "value": [10, 20, 30, 40]}
)
ddf = dd.from_pandas(df, npartitions=2)
ddf = ddf.to_backend(backend)
output_dir = tmp_path / "output_jsonl"
dataset = DocumentDataset(ddf)
dataset.to_json(output_path=str(output_dir), partition_on="category")
# Check that the output directory contains subdirectories for each partition.
# Unique partition values (as strings) to be used in the directory names.
unique_partitions = {str(x) for x in category_values}
for part in unique_partitions:
expected_dir = output_dir / f"category={part}"
assert expected_dir.exists(), f"Expected directory {expected_dir} not found"

# For each partition directory, load the JSONL files and verify that all records have the correct partition value.
# (Here we assume the files are written with extension ".part")
for part_dir in output_dir.glob("category=*"):
# The partition value is taken from the directory name.
partition_value = part_dir.name.split("=")[-1]
jsonl_files = list(part_dir.glob("*.part"))
assert (
jsonl_files
), f"No JSONL files found in partition directory {part_dir}"
for file in jsonl_files:
with open(file, "r") as f:
for line in f:
record = json.loads(line)
if "category" in record:
# Compare as strings, to work with both integer and string partition values.
assert (
str(record["category"]) == partition_value
), f"Record partition value {record['category']} does not match directory {partition_value}"

@pytest.mark.parametrize(
"backend", ["pandas", pytest.param("cudf", marks=pytest.mark.gpu)]
)
@pytest.mark.parametrize(
"category_values",
[
["A", "B", "A", "B"],
[10, 20, 10, 20],
[1.0, 2.0, 1.0, 2.0],
],
)
def test_write_to_disk_with_partition_on_parquet(
self, tmp_path, backend, category_values
):
"""
Test writing a partitioned Parquet dataset.

The test writes a DataFrame partitioned on the 'category' column and then reads it back
using dd.read_parquet. The output is compared (after sorting) to the original DataFrame.
"""

df = pd.DataFrame(
{"id": [1, 2, 3, 4], "category": category_values, "value": [10, 20, 30, 40]}
)
ddf = dd.from_pandas(df, npartitions=2)
ddf = ddf.to_backend(backend)
output_dir = tmp_path / "output_parquet"
dataset = DocumentDataset(ddf)
dataset.to_parquet(output_path=str(output_dir), partition_on="category")

# Check that the output directory contains subdirectories for each partition.
# Unique partition values (as strings) to be used in the directory names.
unique_partitions = {str(x) for x in category_values}
for part in unique_partitions:
expected_dir = output_dir / f"category={part}"
assert expected_dir.exists(), f"Expected directory {expected_dir} not found"

ddf_loaded = dd.read_parquet(str(output_dir))
df_loaded = ddf_loaded.compute().reset_index(drop=True)
df_loaded["category"] = df_loaded["category"].astype(df["category"].dtype)
# To ensure a fair comparison, sort the dataframes by 'id' and reindex.
pd.testing.assert_frame_equal(
df.sort_values("id").reset_index(drop=True),
df_loaded.sort_values("id").reset_index(drop=True)[df.columns],
check_dtype=False,
)
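A short note on the dtype handling at the end of this test: when a Hive-partitioned Parquet dataset is read back, Dask/pyarrow typically materialize the partition column as a categorical inferred from the directory names, which is why the test casts it back to the original dtype and relaxes the comparison with check_dtype=False. A hypothetical illustration:

import dask.dataframe as dd

loaded = dd.read_parquet("output_parquet").compute()
# Frequently a CategoricalDtype built from the directory names, not plain object/int.
print(loaded["category"].dtype)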