From 04139d0758df51cb35b160b61d48d022fb966e1d Mon Sep 17 00:00:00 2001 From: Praateek Date: Tue, 8 Oct 2024 14:42:18 -0700 Subject: [PATCH 01/21] fc Signed-off-by: Praateek --- nemo_curator/datasets/doc_dataset.py | 8 +++ nemo_curator/utils/distributed_utils.py | 70 +++++++++++++++++-------- 2 files changed, 56 insertions(+), 22 deletions(-) diff --git a/nemo_curator/datasets/doc_dataset.py b/nemo_curator/datasets/doc_dataset.py index b3c595cf..86a3ec4e 100644 --- a/nemo_curator/datasets/doc_dataset.py +++ b/nemo_curator/datasets/doc_dataset.py @@ -43,6 +43,7 @@ def read_json( files_per_partition: int = 1, add_filename: bool = False, input_meta: Union[str, dict] = None, + partition_size: str = "2gb", ): return cls( _read_json_or_parquet( @@ -52,6 +53,7 @@ def read_json( files_per_partition=files_per_partition, add_filename=add_filename, input_meta=input_meta, + partition_size=partition_size, ) ) @@ -62,6 +64,7 @@ def read_parquet( backend="pandas", files_per_partition=1, add_filename=False, + partition_size: str = "2gb", ): return cls( _read_json_or_parquet( @@ -70,6 +73,7 @@ def read_parquet( backend=backend, files_per_partition=files_per_partition, add_filename=add_filename, + partition_size=partition_size, ) ) @@ -175,6 +179,7 @@ def _read_json_or_parquet( files_per_partition: int, add_filename: bool, input_meta: Union[str, dict] = None, + partition_size: str = "2gb", ): """ `input_files` may be a list or a string type. @@ -205,6 +210,7 @@ def _read_json_or_parquet( files_per_partition=files_per_partition, add_filename=add_filename, input_meta=input_meta, + partition_size=partition_size, ) # List of directories @@ -222,6 +228,7 @@ def _read_json_or_parquet( files_per_partition=files_per_partition, add_filename=add_filename, input_meta=input_meta, + partition_size=partition_size, ) dfs.append(df) @@ -245,6 +252,7 @@ def _read_json_or_parquet( files_per_partition=files_per_partition, add_filename=add_filename, input_meta=input_meta, + partition_size=partition_size, ) else: diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 3f37eb90..75da340f 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -308,6 +308,7 @@ def read_data( file_type: str = "pickle", backend: str = "cudf", files_per_partition: int = 1, + partition_size: str = "2gb", add_filename: bool = False, input_meta: Union[str, dict] = None, ) -> Union[dd.DataFrame, dask_cudf.DataFrame]: @@ -327,35 +328,60 @@ def read_data( A Dask-cuDF or a Dask-pandas DataFrame. """ - if backend == "cudf": - # Try using cuDF. If not availible will throw an error. - test_obj = cudf.Series - if file_type == "pickle": df = read_pandas_pickle(input_files[0], add_filename=add_filename) df = dd.from_pandas(df, npartitions=16) if backend == "cudf": df = df.to_backend("cudf") - - elif file_type in ["json", "jsonl", "parquet"]: + elif file_type in {"json", "jsonl", "parquet"}: print(f"Reading {len(input_files)} files", flush=True) - input_files = sorted(input_files) - if files_per_partition > 1: - input_files = [ - input_files[i : i + files_per_partition] - for i in range(0, len(input_files), files_per_partition) - ] + + if backend == "cudf" and ( + (file_type in {"json", "jsonl"}) + or (file_type == "parquet" and not add_filename) + ): + # Try using cuDF. If not availible will throw an error. 
+ # test_obj = cudf.Series + import dask_cudf + + if file_type in {"json", "jsonl"}: + read_func = dask_cudf.read_json + elif file_type in {"parquet"}: + read_func = dask_cudf.read_parquet + + read_kwargs = dict() + if file_type in {"json", "jsonl"}: + read_kwargs["lines"] = file_type == "jsonl" + if input_meta is not None: + read_kwargs["prune_columns"] = True + read_kwargs["dtype"] = ( + ast.literal_eval(input_meta) + if isinstance(input_meta, str) + else input_meta + ) + + if add_filename: + read_kwargs["include_path_column"] = add_filename + df = read_func(input_files, blocksize=partition_size, **read_kwargs) + else: - input_files = [[file] for file in input_files] - return dd.from_map( - read_single_partition, - input_files, - filetype=file_type, - backend=backend, - add_filename=add_filename, - input_meta=input_meta, - enforce_metadata=False, - ) + input_files = sorted(input_files) + if files_per_partition > 1: + input_files = [ + input_files[i : i + files_per_partition] + for i in range(0, len(input_files), files_per_partition) + ] + else: + input_files = [[file] for file in input_files] + return dd.from_map( + read_single_partition, + input_files, + filetype=file_type, + backend=backend, + add_filename=add_filename, + input_meta=input_meta, + enforce_metadata=False, + ) else: raise RuntimeError("Could not read data, please check file type") return df From b2de5cb751bfb8e5caf39601f728586c30ccdbc4 Mon Sep 17 00:00:00 2001 From: Praateek Date: Fri, 15 Nov 2024 01:07:24 -0800 Subject: [PATCH 02/21] review comments Signed-off-by: Praateek --- nemo_curator/datasets/doc_dataset.py | 30 ++--- nemo_curator/utils/distributed_utils.py | 172 +++++++++++++++++------- 2 files changed, 134 insertions(+), 68 deletions(-) diff --git a/nemo_curator/datasets/doc_dataset.py b/nemo_curator/datasets/doc_dataset.py index cbc59877..3d8c82af 100644 --- a/nemo_curator/datasets/doc_dataset.py +++ b/nemo_curator/datasets/doc_dataset.py @@ -43,10 +43,10 @@ def read_json( cls, input_files: Union[str, List[str]], backend: str = "pandas", - files_per_partition: int = 1, + files_per_partition: Optional[int] = None, + blocksize: Optional[str] = "1gb", add_filename: bool = False, input_meta: Union[str, dict] = None, - partition_size: str = "2gb", columns: Optional[List[str]] = None, **kwargs, ): @@ -55,10 +55,10 @@ def read_json( input_files=input_files, file_type="jsonl", backend=backend, - files_per_partition=files_per_partition, add_filename=add_filename, + files_per_partition=files_per_partition, + blocksize=blocksize, input_meta=input_meta, - partition_size=partition_size, columns=columns, **kwargs, ) @@ -69,9 +69,9 @@ def read_parquet( cls, input_files, backend="pandas", - files_per_partition=1, + files_per_partition: Optional[int] = None, + blocksize: Optional[str] = "1gb", add_filename=False, - partition_size: str = "2gb", columns: Optional[List[str]] = None, **kwargs, ): @@ -80,9 +80,9 @@ def read_parquet( input_files=input_files, file_type="parquet", backend=backend, - files_per_partition=files_per_partition, add_filename=add_filename, - partition_size=partition_size, + files_per_partition=files_per_partition, + blocksize=blocksize, columns=columns, **kwargs, ) @@ -93,8 +93,6 @@ def read_pickle( cls, input_files, backend="pandas", - files_per_partition=1, - add_filename=False, columns: Optional[List[str]] = None, **kwargs, ): @@ -103,8 +101,6 @@ def read_pickle( input_files=input_files, file_type="pickle", backend=backend, - files_per_partition=files_per_partition, - add_filename=add_filename, 
columns=columns, **kwargs, ) @@ -195,10 +191,10 @@ def _read_json_or_parquet( input_files: Union[str, List[str]], file_type: str, backend: str, - files_per_partition: int, add_filename: bool, + files_per_partition: Optional[int] = None, + blocksize: Optional[str] = None, input_meta: Union[str, dict] = None, - partition_size: str = "2gb", columns: Optional[List[str]] = None, **kwargs, ): @@ -229,9 +225,9 @@ def _read_json_or_parquet( file_type=file_type, backend=backend, files_per_partition=files_per_partition, + blocksize=blocksize, add_filename=add_filename, input_meta=input_meta, - partition_size=partition_size, columns=columns, **kwargs, ) @@ -249,9 +245,9 @@ def _read_json_or_parquet( file_type=file_type, backend=backend, files_per_partition=files_per_partition, + blocksize=blocksize, add_filename=add_filename, input_meta=input_meta, - partition_size=partition_size, columns=columns, **kwargs, ) @@ -275,9 +271,9 @@ def _read_json_or_parquet( file_type=file_type, backend=backend, files_per_partition=files_per_partition, + blocksize=blocksize, add_filename=add_filename, input_meta=input_meta, - partition_size=partition_size, columns=columns, **kwargs, ) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 4ef00b91..3eeef227 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -22,7 +22,7 @@ from contextlib import nullcontext from datetime import datetime from pathlib import Path -from typing import Dict, List, Optional, Union +from typing import Dict, List, Literal, Optional, Union import dask.dataframe as dd import numpy as np @@ -261,6 +261,20 @@ def _set_torch_to_use_rmm(): torch.cuda.memory.change_current_allocator(rmm_torch_allocator) +def select_and_sort_columns( + df: Union[dd.DataFrame, dask_cudf.DataFrame], + columns: List[str], + add_filename: bool, +) -> Union[dd.DataFrame, dask_cudf.DataFrame]: + # TODO : Reviewer TAL if filetype check is needed + if columns is not None: + if add_filename and "filename" not in columns: + columns.append("filename") + df = df[columns] + df = df[sorted(df.columns)] + return df + + def read_single_partition( files, backend="cudf", @@ -343,13 +357,80 @@ def read_single_partition( else: df = read_f(files, **read_kwargs, **kwargs) - if filetype in ["jsonl", "json"] and columns is not None: - if add_filename and "filename" not in columns: - columns.append("filename") - df = df[columns] + print(f"Reading with {read_kwargs=}", flush=True) + return select_and_sort_columns(df, columns, add_filename) - df = df[sorted(df.columns)] - return df + +def read_data_cudf_blocksize( + input_files: List[str], + file_type: Literal["parquet", "jsonl"], + blocksize: str, + add_filename: bool = False, + input_meta: Union[str, dict] = None, + columns: Optional[List[str]] = None, + **kwargs, +) -> dask_cudf.DataFrame: + import dask_cudf + + read_kwargs = dict() + if file_type == "jsonl": + read_func = dask_cudf.read_json + read_kwargs["lines"] = True + if input_meta is not None: + read_kwargs["prune_columns"] = True + read_kwargs["dtype"] = ( + ast.literal_eval(input_meta) + if isinstance(input_meta, str) + else input_meta + ) + if add_filename: + read_kwargs["include_path_column"] = add_filename + + elif file_type == "parquet": + if add_filename: + msg = "add_filename and blocksize cannot be set at the same time for parquet files" + raise ValueError(msg) + read_func = dask_cudf.read_parquet + read_kwargs["columns"] = columns + else: + msg = f"Reading with blocksize is only 
supported for jsonl and parquet files, not {file_type=}" + raise ValueError(msg) + + print(f"Reading {blocksize=} with {read_kwargs=} {kwargs=}", flush=True) + df = read_func(input_files, blocksize=blocksize, **read_kwargs, **kwargs) + return select_and_sort_columns(df, columns, add_filename) + + +def read_data_fpp( + input_files: List[str], + file_type: Literal["parquet", "json", "jsonl"], + backend: Literal["cudf", "pandas"] = "cudf", + add_filename: bool = False, + files_per_partition: Optional[int] = None, + input_meta: Union[str, dict] = None, + columns: Optional[List[str]] = None, + **kwargs, +) -> Union[dd.DataFrame, dask_cudf.DataFrame]: + input_files = sorted(input_files) + if files_per_partition > 1: + input_files = [ + input_files[i : i + files_per_partition] + for i in range(0, len(input_files), files_per_partition) + ] + else: + input_files = [[file] for file in input_files] + + return dd.from_map( + read_single_partition, + input_files, + filetype=file_type, + backend=backend, + add_filename=add_filename, + input_meta=input_meta, + enforce_metadata=False, + columns=columns, + **kwargs, + ) def read_pandas_pickle( @@ -375,11 +456,11 @@ def read_pandas_pickle( def read_data( - input_files, + input_files: Union[str, List[str]], file_type: str = "pickle", - backend: str = "cudf", - files_per_partition: int = 1, - partition_size: str = "2gb", + backend: Literal["cudf", "pandas"] = "cudf", + blocksize: Optional[str] = None, + files_per_partition: Optional[int] = 1, add_filename: bool = False, input_meta: Union[str, dict] = None, columns: Optional[List[str]] = None, @@ -403,6 +484,8 @@ def read_data( A Dask-cuDF or a Dask-pandas DataFrame. """ + if isinstance(input_files, str): + input_files = [input_files] if file_type == "pickle": df = read_pandas_pickle( input_files[0], add_filename=add_filename, columns=columns, **kwargs @@ -410,54 +493,41 @@ def read_data( df = dd.from_pandas(df, npartitions=16) if backend == "cudf": df = df.to_backend("cudf") + df = select_and_sort_columns(df, columns, add_filename) elif file_type in {"json", "jsonl", "parquet"}: print(f"Reading {len(input_files)} files", flush=True) - - if backend == "cudf" and ( - (file_type in {"json", "jsonl"}) - or (file_type == "parquet" and not add_filename) + if blocksize is not None and files_per_partition is not None: + msg = "blocksize and files_per_partition cannot be set at the same time" + raise ValueError(msg) + + if ( + blocksize is not None + and backend == "cudf" + and (file_type == "jsonl" or (file_type == "parquet" and not add_filename)) ): - # Try using cuDF. If not availible will throw an error. 
- # test_obj = cudf.Series - import dask_cudf - - if file_type in {"json", "jsonl"}: - read_func = dask_cudf.read_json - elif file_type in {"parquet"}: - read_func = dask_cudf.read_parquet - - read_kwargs = dict() - if file_type in {"json", "jsonl"}: - read_kwargs["lines"] = file_type == "jsonl" - if input_meta is not None: - read_kwargs["prune_columns"] = True - read_kwargs["dtype"] = ( - ast.literal_eval(input_meta) - if isinstance(input_meta, str) - else input_meta - ) - - if add_filename: - read_kwargs["include_path_column"] = add_filename - df = read_func(input_files, blocksize=partition_size, **read_kwargs) - + return read_data_cudf_blocksize( + input_files, + file_type=file_type, + blocksize=blocksize, + add_filename=add_filename, + input_meta=input_meta, + columns=columns, + **kwargs, + ) else: - input_files = sorted(input_files) - if files_per_partition > 1: - input_files = [ - input_files[i : i + files_per_partition] - for i in range(0, len(input_files), files_per_partition) - ] - else: - input_files = [[file] for file in input_files] - return dd.from_map( - read_single_partition, + if backend == "cudf" and ( + file_type == "jsonl" or (file_type == "parquet" and not add_filename) + ): + warnings.warn( + "Consider passing in blocksize for better control over memory usage." + ) + return read_data_fpp( input_files, - filetype=file_type, + file_type=file_type, backend=backend, add_filename=add_filename, + files_per_partition=files_per_partition, input_meta=input_meta, - enforce_metadata=False, columns=columns, **kwargs, ) From eb49b701a1cb4cf91e48b55aaf9daff816068058 Mon Sep 17 00:00:00 2001 From: Praateek Date: Sun, 17 Nov 2024 19:23:11 -0800 Subject: [PATCH 03/21] make blocksize work with parquet Signed-off-by: Praateek --- nemo_curator/utils/distributed_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 3eeef227..eb9d3df5 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -392,6 +392,7 @@ def read_data_cudf_blocksize( raise ValueError(msg) read_func = dask_cudf.read_parquet read_kwargs["columns"] = columns + read_kwargs["aggregate_files"] = True else: msg = f"Reading with blocksize is only supported for jsonl and parquet files, not {file_type=}" raise ValueError(msg) From 386d443853da824fc5df255222b67edb205ea4b3 Mon Sep 17 00:00:00 2001 From: Praateek Date: Tue, 19 Nov 2024 04:37:35 -0800 Subject: [PATCH 04/21] filetype Signed-off-by: Praateek --- nemo_curator/utils/distributed_utils.py | 35 ++++++++++++++----------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index eb9d3df5..fe6630d1 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -16,6 +16,8 @@ import ast import os +import dask + os.environ["RAPIDS_NO_INITIALIZE"] = "1" import random import warnings @@ -264,10 +266,11 @@ def _set_torch_to_use_rmm(): def select_and_sort_columns( df: Union[dd.DataFrame, dask_cudf.DataFrame], columns: List[str], + filetype: Literal["jsonl", "json", "parquet", "pickle"], add_filename: bool, ) -> Union[dd.DataFrame, dask_cudf.DataFrame]: - # TODO : Reviewer TAL if filetype check is needed - if columns is not None: + # We exclude parquet because the parquet readers already support column selection + if columns is not None and filetype != "parquet": if add_filename and "filename" not in columns: 
columns.append("filename") df = df[columns] @@ -358,23 +361,23 @@ def read_single_partition( df = read_f(files, **read_kwargs, **kwargs) print(f"Reading with {read_kwargs=}", flush=True) - return select_and_sort_columns(df, columns, add_filename) + return select_and_sort_columns(df, columns, filetype, add_filename) -def read_data_cudf_blocksize( +def read_data_blocksize( input_files: List[str], + backend: Literal["cudf", "pandas"], file_type: Literal["parquet", "jsonl"], blocksize: str, add_filename: bool = False, input_meta: Union[str, dict] = None, columns: Optional[List[str]] = None, **kwargs, -) -> dask_cudf.DataFrame: - import dask_cudf +) -> Union[dd.DataFrame, dask_cudf.DataFrame]: read_kwargs = dict() if file_type == "jsonl": - read_func = dask_cudf.read_json + read_func = dd.read_json read_kwargs["lines"] = True if input_meta is not None: read_kwargs["prune_columns"] = True @@ -390,7 +393,7 @@ def read_data_cudf_blocksize( if add_filename: msg = "add_filename and blocksize cannot be set at the same time for parquet files" raise ValueError(msg) - read_func = dask_cudf.read_parquet + read_func = dd.read_parquet read_kwargs["columns"] = columns read_kwargs["aggregate_files"] = True else: @@ -398,8 +401,9 @@ def read_data_cudf_blocksize( raise ValueError(msg) print(f"Reading {blocksize=} with {read_kwargs=} {kwargs=}", flush=True) - df = read_func(input_files, blocksize=blocksize, **read_kwargs, **kwargs) - return select_and_sort_columns(df, columns, add_filename) + with dask.config.set({"dataframe.backend": backend}): + df = read_func(input_files, blocksize=blocksize, **read_kwargs, **kwargs) + return select_and_sort_columns(df, columns, file_type, add_filename) def read_data_fpp( @@ -494,20 +498,19 @@ def read_data( df = dd.from_pandas(df, npartitions=16) if backend == "cudf": df = df.to_backend("cudf") - df = select_and_sort_columns(df, columns, add_filename) + df = select_and_sort_columns(df, columns, file_type, add_filename) elif file_type in {"json", "jsonl", "parquet"}: print(f"Reading {len(input_files)} files", flush=True) if blocksize is not None and files_per_partition is not None: msg = "blocksize and files_per_partition cannot be set at the same time" raise ValueError(msg) - if ( - blocksize is not None - and backend == "cudf" - and (file_type == "jsonl" or (file_type == "parquet" and not add_filename)) + if blocksize is not None and ( + file_type == "jsonl" or (file_type == "parquet" and not add_filename) ): - return read_data_cudf_blocksize( + return read_data_blocksize( input_files, + backend=backend, file_type=file_type, blocksize=blocksize, add_filename=add_filename, From aa47a37a4eeb2cf9871c5429e1b303073a9fe9ad Mon Sep 17 00:00:00 2001 From: Praateek Date: Tue, 19 Nov 2024 09:22:05 -0800 Subject: [PATCH 05/21] fix merge Signed-off-by: Praateek --- nemo_curator/datasets/doc_dataset.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/nemo_curator/datasets/doc_dataset.py b/nemo_curator/datasets/doc_dataset.py index 76eb9b43..1ab08ca8 100644 --- a/nemo_curator/datasets/doc_dataset.py +++ b/nemo_curator/datasets/doc_dataset.py @@ -44,7 +44,7 @@ def head(self, n: int = 5) -> Any: def read_json( cls, input_files: Union[str, List[str]], - backend: str = "pandas", + backend: Literal["pandas", "cudf"] = "pandas", files_per_partition: Optional[int] = None, blocksize: Optional[str] = "1gb", add_filename: bool = False, @@ -82,8 +82,8 @@ def read_json( @classmethod def read_parquet( cls, - input_files, - backend="pandas", + input_files: Union[str, 
List[str]], + backend: Literal["pandas", "cudf"] = "pandas", files_per_partition: Optional[int] = None, blocksize: Optional[str] = "1gb", add_filename=False, @@ -118,8 +118,8 @@ def read_parquet( @classmethod def read_pickle( cls, - input_files, - backend="pandas", + input_files: Union[str, List[str]], + backend: Literal["pandas", "cudf"] = "pandas", columns: Optional[List[str]] = None, **kwargs, ) -> "DocumentDataset": @@ -228,7 +228,7 @@ def to_pandas(self): def _read_json_or_parquet( input_files: Union[str, List[str]], file_type: str, - backend: str, + backend: Literal["cudf", "pandas"], add_filename: bool, files_per_partition: Optional[int] = None, blocksize: Optional[str] = None, From c1ea0fbb8039b216a13e0b682ab247a3927373c5 Mon Sep 17 00:00:00 2001 From: Praateek Date: Thu, 21 Nov 2024 23:16:44 -0800 Subject: [PATCH 06/21] add test cases Signed-off-by: Praateek --- nemo_curator/utils/distributed_utils.py | 27 ++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 293b4054..6e086037 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -24,7 +24,7 @@ from contextlib import nullcontext from datetime import datetime from pathlib import Path -from typing import Dict, List, Literal, Optional, Union +from typing import Callable, Dict, List, Literal, Optional, Union import dask.dataframe as dd import numpy as np @@ -280,7 +280,7 @@ def select_and_sort_columns( if add_filename and "filename" not in columns: columns.append("filename") df = df[columns] - df = df[sorted(df.columns)] + return df @@ -366,7 +366,6 @@ def read_single_partition( else: df = read_f(files, **read_kwargs, **kwargs) - print(f"Reading with {read_kwargs=}", flush=True) return select_and_sort_columns(df, columns, filetype, add_filename) @@ -382,18 +381,35 @@ def read_data_blocksize( ) -> Union[dd.DataFrame, dask_cudf.DataFrame]: read_kwargs = dict() + + postprocessing_func: Optional[Callable[[dd.DataFrame], dd.DataFrame]] = None if file_type == "jsonl": + if backend == "panads": + warnings.warn( + "Pandas backend with blocksize cannot read multiple JSONL files into a single partition. 
" + "Use files_per_partition if blocksize exceeds average file size" + ) read_func = dd.read_json read_kwargs["lines"] = True if input_meta is not None: - read_kwargs["prune_columns"] = True + if backend == "cudf": + # To save GPU memory, we prune columns while reading, and keep only those that are + # specified in the input_meta + read_kwargs["prune_columns"] = True + read_kwargs["dtype"] = ( ast.literal_eval(input_meta) if isinstance(input_meta, str) else input_meta ) if add_filename: + + def extract_filename(path: str) -> str: + return os.path.basename(path) + read_kwargs["include_path_column"] = add_filename + read_kwargs["path_converter"] = extract_filename + postprocessing_func = lambda df: df.rename(columns={"path": "filename"}) elif file_type == "parquet": if add_filename: @@ -406,9 +422,10 @@ def read_data_blocksize( msg = f"Reading with blocksize is only supported for jsonl and parquet files, not {file_type=}" raise ValueError(msg) - print(f"Reading {blocksize=} with {read_kwargs=} {kwargs=}", flush=True) with dask.config.set({"dataframe.backend": backend}): df = read_func(input_files, blocksize=blocksize, **read_kwargs, **kwargs) + if postprocessing_func is not None: + df = postprocessing_func(df) return select_and_sort_columns(df, columns, file_type, add_filename) From 3a0f13f5e0b64d00a53dca130a214bd1549ab4dd Mon Sep 17 00:00:00 2001 From: Praateek Date: Thu, 21 Nov 2024 23:17:08 -0800 Subject: [PATCH 07/21] add test file Signed-off-by: Praateek --- tests/test_read_data.py | 379 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 379 insertions(+) create mode 100644 tests/test_read_data.py diff --git a/tests/test_read_data.py b/tests/test_read_data.py new file mode 100644 index 00000000..ecb47790 --- /dev/null +++ b/tests/test_read_data.py @@ -0,0 +1,379 @@ +import pandas as pd +import pytest + +from nemo_curator.utils.distributed_utils import read_data_blocksize, read_data_fpp + +NUM_FILES = 5 +NUM_RECORDS = 100 + + +# Fixture to create multiple small JSONL files +@pytest.fixture +def mock_multiple_jsonl_files(tmp_path): + file_paths = [] + for file_id in range(NUM_FILES): + jsonl_file = tmp_path / f"test_{file_id}.jsonl" + with open(jsonl_file, "w") as f: + for record_id in range(NUM_RECORDS): + # 100 rows are ~5kb + f.write( + f'{{"id": "id_{file_id}_{record_id}", "text": "A longish string {file_id}_{record_id}"}}\n' + ) + file_paths.append(str(jsonl_file)) + return file_paths + + +# Fixture to create multiple small Parquet files +@pytest.fixture +def mock_multiple_parquet_files(tmp_path): + file_paths = [] + for file_id in range(NUM_FILES): + # 100 rows are ~5kb + parquet_file = tmp_path / f"test_{file_id}.parquet" + df = pd.DataFrame( + [ + { + "id": f"id_{file_id}_{record_id}", + "text": f"A string {file_id}_{record_id}", + } + for record_id in range(NUM_RECORDS) + ] + ) + # We specify row_group_size so that we can test splitting a single big file into smaller chunks + df.to_parquet(parquet_file, compression=None, row_group_size=10) + file_paths.append(str(parquet_file)) + return file_paths + + +@pytest.mark.gpu +@pytest.mark.parametrize("file_type", ["jsonl", "parquet"]) +@pytest.mark.parametrize("blocksize", ["1kb", "5kb", "10kb"]) +def test_cudf_read_data_blocksize_partitioning( + mock_multiple_jsonl_files, mock_multiple_parquet_files, file_type, blocksize +): + import cudf + + input_files = ( + mock_multiple_jsonl_files + if file_type == "jsonl" + else mock_multiple_parquet_files + ) + + df = read_data_blocksize( + input_files=input_files, + backend="cudf", + 
file_type=file_type, + blocksize=blocksize, + add_filename=False, + input_meta=None, + columns=None, + ) + + # Compute the number of partitions in the resulting DataFrame + num_partitions = df.npartitions + # Assert that we have two partitions (since we have ~15KB total data and a blocksize of 10KB) + if blocksize == "1kb": + assert ( + num_partitions > NUM_FILES + ), f"Expected > {NUM_FILES} partitions but got {num_partitions}" + elif blocksize == "5kb": + assert ( + num_partitions == NUM_FILES + ), f"Expected {NUM_FILES} partitions but got {num_partitions}" + elif blocksize == "10kb": + assert ( + num_partitions < NUM_FILES + ), f"Expected < {NUM_FILES} partitions but got {num_partitions}" + else: + raise ValueError(f"Invalid blocksize: {blocksize}") + total_rows = len(df) + assert ( + total_rows == NUM_FILES * NUM_RECORDS + ), f"Expected {NUM_FILES * NUM_RECORDS} rows but got {total_rows}" + + assert isinstance(df["id"].compute(), cudf.Series) + + +@pytest.mark.parametrize("file_type", ["jsonl", "parquet"]) +@pytest.mark.parametrize("blocksize", ["1kb", "5kb", "10kb"]) +def test_pandas_read_data_blocksize_partitioning( + mock_multiple_jsonl_files, mock_multiple_parquet_files, file_type, blocksize +): + input_files = ( + mock_multiple_jsonl_files + if file_type == "jsonl" + else mock_multiple_parquet_files + ) + + df = read_data_blocksize( + input_files=input_files, + backend="pandas", + file_type=file_type, + blocksize=blocksize, + add_filename=False, + input_meta=None, + columns=None, + ) + + # Compute the number of partitions in the resulting DataFrame + num_partitions = df.npartitions + # Assert that we have two partitions (since we have ~15KB total data and a blocksize of 10KB) + if blocksize == "1kb": + assert ( + num_partitions > NUM_FILES + ), f"Expected > {NUM_FILES} partitions but got {num_partitions}" + elif blocksize == "5kb": + assert ( + num_partitions == NUM_FILES + ), f"Expected {NUM_FILES} partitions but got {num_partitions}" + elif blocksize == "10kb": + # Because pandas doesn't suppport reading json files together, a partition will only be as big as a single file + if file_type == "jsonl": + assert ( + num_partitions == NUM_FILES + ), f"Expected {NUM_FILES} partitions but got {num_partitions}" + # Parquet files can be read together + elif file_type == "parquet": + assert ( + num_partitions < NUM_FILES + ), f"Expected > {NUM_FILES} partitions but got {num_partitions}" + else: + raise ValueError(f"Invalid blocksize: {blocksize}") + total_rows = len(df) + assert ( + total_rows == NUM_FILES * NUM_RECORDS + ), f"Expected {NUM_FILES * NUM_RECORDS} rows but got {total_rows}" + + assert isinstance(df["id"].compute(), pd.Series) + + +@pytest.mark.parametrize( + "backend", + ["pandas", pytest.param("cudf", marks=pytest.mark.gpu)], +) +@pytest.mark.parametrize("file_type", ["jsonl", "parquet"]) +@pytest.mark.parametrize("fpp", [1, NUM_FILES // 2, NUM_FILES, NUM_FILES * 2]) +def test_read_data_fpp_partitioning( + mock_multiple_jsonl_files, mock_multiple_parquet_files, backend, file_type, fpp +): + input_files = ( + mock_multiple_jsonl_files + if file_type == "jsonl" + else mock_multiple_parquet_files + ) + + df = read_data_fpp( + input_files=input_files, + backend=backend, + file_type=file_type, + files_per_partition=fpp, + add_filename=False, + input_meta=None, + columns=None, + ) + + # Compute the number of partitions in the resulting DataFrame + num_partitions = df.npartitions + # Assert that we have two partitions (since we have ~15KB total data and a blocksize of 10KB) + if 
fpp == 1: + assert ( + num_partitions == NUM_FILES + ), f"Expected {NUM_FILES} partitions but got {num_partitions}" + elif fpp == NUM_FILES // 2: + assert ( + num_partitions < NUM_FILES + ), f"Expected {NUM_FILES} partitions but got {num_partitions}" + elif fpp >= NUM_FILES: + assert num_partitions == 1, f"Expected 1 partition but got {num_partitions}" + else: + raise ValueError(f"Invalid fpp: {fpp}") + total_rows = len(df) + assert ( + total_rows == NUM_FILES * NUM_RECORDS + ), f"Expected {NUM_FILES * NUM_RECORDS} rows but got {total_rows}" + if backend == "cudf": + import cudf + + assert isinstance(df["id"].compute(), cudf.Series) + elif backend == "pandas": + assert isinstance(df["id"].compute(), pd.Series) + + +@pytest.mark.parametrize( + "backend", + [ + "pandas", + pytest.param("cudf", marks=pytest.mark.gpu), + ], +) +def test_read_data_blocksize_add_filename_jsonl(mock_multiple_jsonl_files, backend): + df = read_data_blocksize( + input_files=mock_multiple_jsonl_files, + backend=backend, + file_type="jsonl", + blocksize="128Mib", + add_filename=True, + input_meta=None, + columns=None, + ) + + assert "filename" in df.columns + file_names = df["filename"].unique().compute() + if backend == "cudf": + file_names = file_names.to_pandas() + + assert len(file_names) == NUM_FILES + assert set(file_names.values) == { + f"test_{file_id}.jsonl" for file_id in range(NUM_FILES) + } + + +@pytest.mark.parametrize( + "backend", + [ + "pandas", + pytest.param("cudf", marks=pytest.mark.gpu), + ], +) +def test_read_data_blocksize_add_filename_parquet(mock_multiple_parquet_files, backend): + with pytest.raises( + ValueError, + match="add_filename and blocksize cannot be set at the same time for parquet files", + ): + read_data_blocksize( + input_files=mock_multiple_parquet_files, + backend=backend, + file_type="parquet", + blocksize="128Mib", + add_filename=True, + input_meta=None, + columns=None, + ) + + +@pytest.mark.parametrize( + "backend,file_type", + [ + pytest.param("cudf", "jsonl", marks=pytest.mark.gpu), + pytest.param("cudf", "parquet", marks=pytest.mark.gpu), + ("pandas", "jsonl"), + pytest.param( + "pandas", + "parquet", + marks=pytest.mark.xfail( + reason="filename column inaccessible with pandas backend and parquet" + ), + ), + ], +) +def test_read_data_fpp_add_filename( + mock_multiple_jsonl_files, mock_multiple_parquet_files, backend, file_type +): + input_files = ( + mock_multiple_jsonl_files + if file_type == "jsonl" + else mock_multiple_parquet_files + ) + + df = read_data_fpp( + input_files=input_files, + backend=backend, + file_type=file_type, + files_per_partition=NUM_FILES, + add_filename=True, + input_meta=None, + columns=None, + ) + + print(f"Column names are {df.columns}") + assert "filename" in df.columns + file_names = df["filename"].unique().compute() + if backend == "cudf": + file_names = file_names.to_pandas() + + assert len(file_names) == NUM_FILES + assert set(file_names.values) == { + f"test_{file_id}.{file_type}" for file_id in range(NUM_FILES) + } + + +@pytest.mark.parametrize( + "backend", + [ + "pandas", + pytest.param("cudf", marks=pytest.mark.gpu), + ], +) +@pytest.mark.parametrize("file_type", ["jsonl", "parquet"]) +@pytest.mark.parametrize("function_name", ["read_data_blocksize", "read_data_fpp"]) +@pytest.mark.parametrize( + "cols_to_select", [["id"], ["text"], ["text", "id"], ["id", "text"]] +) +def test_read_data_select_columns( + mock_multiple_jsonl_files, + mock_multiple_parquet_files, + backend, + file_type, + function_name, + cols_to_select, +): + 
input_files = ( + mock_multiple_jsonl_files + if file_type == "jsonl" + else mock_multiple_parquet_files + ) + if function_name == "read_data_fpp": + func = read_data_fpp + read_kwargs = {"files_per_partition": 1} + elif function_name == "read_data_blocksize": + func = read_data_blocksize + read_kwargs = {"blocksize": "128Mib"} + + df = func( + input_files=input_files, + backend=backend, + file_type=file_type, + add_filename=False, + input_meta=None, + columns=cols_to_select, + **read_kwargs, + ) + + assert list(df.columns) == cols_to_select + + +@pytest.mark.parametrize( + "backend", + [ + "pandas", + pytest.param("cudf", marks=pytest.mark.gpu), + ], +) +@pytest.mark.parametrize("function_name", ["read_data_blocksize", "read_data_fpp"]) +@pytest.mark.parametrize( + "input_meta", [{"id": "str"}, {"text": "str"}, {"id": "str", "text": "str"}] +) +def test_read_data_input_data( + mock_multiple_jsonl_files, backend, function_name, input_meta +): + if function_name == "read_data_fpp": + func = read_data_fpp + read_kwargs = {"files_per_partition": 1} + elif function_name == "read_data_blocksize": + func = read_data_blocksize + read_kwargs = {"blocksize": "128Mib"} + + df = func( + input_files=mock_multiple_jsonl_files, + backend=backend, + file_type="jsonl", + add_filename=False, + input_meta=input_meta, + columns=None, + **read_kwargs, + ) + + if function_name == "read_data_fpp": + assert list(df.columns) == list(input_meta.keys()) + elif function_name == "read_data_blocksize": + assert list(df.columns) == ["id", "text"] From 9c6428c2413a5d7928ed82ae8e25e36751e3f57e Mon Sep 17 00:00:00 2001 From: Praateek Date: Fri, 22 Nov 2024 00:05:51 -0800 Subject: [PATCH 08/21] failing test for select_columns Signed-off-by: Praateek --- docs/user-guide/bestpractices.rst | 12 ++++++++ nemo_curator/utils/distributed_utils.py | 12 ++++---- tests/test_read_data.py | 38 ++++++++++++++++++------- 3 files changed, 47 insertions(+), 15 deletions(-) diff --git a/docs/user-guide/bestpractices.rst b/docs/user-guide/bestpractices.rst index bc151c68..b486c336 100644 --- a/docs/user-guide/bestpractices.rst +++ b/docs/user-guide/bestpractices.rst @@ -24,6 +24,17 @@ Handling GPU Out-of-Memory (OOM) Errors NeMo Curator is designed to be scalable with large amounts of text data, but OOM errors occur when the available GPU memory is insufficient for a given task. To help avoid these issues and ensure efficient processing, here are some strategies for managing memory usage and mitigating OOM challenges. +Controlling Partition Sizes +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +You should consider using ``files_per_partition`` or ``blocksize`` when reading data. This can help reduce the memory load by processing large datasets in smaller chunks. + +#. ``blocksize`` argument is available for ``jsonl`` and ``parquet`` files. But for `parquet` files it's only available when ``add_filename=False`` + +#. For ``blocksize``, the recommendation is to use 1/32 of the total GPU memory. For example, if you have a GPU with 32GB of memory, you can set the blocksize to ``1gb``. + + + Utilize RMM Options ~~~~~~~~~~~~~~~~~~~ `RAPIDS Memory Manager (RMM) `_ is a package that enables you to allocate device memory in a highly configurable way. @@ -59,6 +70,7 @@ Alternatively, you can set these flags while initializing your own Dask client, client = Client(cluster) + Fuzzy Deduplication Guidelines ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Fuzzy deduplication is one of the most computationally expensive algorithms within the NeMo Curator pipeline. 
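As a companion to the ``Controlling Partition Sizes`` guidance added above, here is a minimal usage sketch of the two knobs through the ``DocumentDataset`` readers whose signatures this series extends. The input directories are placeholders and the package-level import path is assumed; per the changes in ``read_data``, ``blocksize`` and ``files_per_partition`` are mutually exclusive, and Parquet reads with ``blocksize`` require ``add_filename=False``.

.. code-block:: python

    from nemo_curator.datasets import DocumentDataset

    # JSONL read sized by partition bytes rather than by file count.
    # "1gb" follows the ~1/32-of-GPU-memory rule of thumb for a 32 GB GPU.
    jsonl_ds = DocumentDataset.read_json(
        "/path/to/jsonl_dir",         # placeholder input location
        backend="cudf",
        blocksize="1gb",
        files_per_partition=None,     # setting both knobs raises ValueError in read_data
        add_filename=False,
    )

    # Parquet read with column selection pushed down into the reader.
    parquet_ds = DocumentDataset.read_parquet(
        "/path/to/parquet_dir",       # placeholder input location
        backend="cudf",
        blocksize="1gb",
        add_filename=False,           # blocksize + add_filename is rejected for parquet
        columns=["id", "text"],
    )
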
diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 6e086037..afbc4a9c 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -269,18 +269,20 @@ def _set_torch_to_use_rmm(): torch.cuda.memory.change_current_allocator(rmm_torch_allocator) -def select_and_sort_columns( +def select_columns( df: Union[dd.DataFrame, dask_cudf.DataFrame], columns: List[str], - filetype: Literal["jsonl", "json", "parquet", "pickle"], + filetype: Literal["jsonl", "json", "parquet"], add_filename: bool, ) -> Union[dd.DataFrame, dask_cudf.DataFrame]: # We exclude parquet because the parquet readers already support column selection - if columns is not None and filetype != "parquet": + if filetype in ["jsonl", "json"] and columns is not None: if add_filename and "filename" not in columns: columns.append("filename") df = df[columns] + df = df[sorted(df.columns)] + return df @@ -366,7 +368,7 @@ def read_single_partition( else: df = read_f(files, **read_kwargs, **kwargs) - return select_and_sort_columns(df, columns, filetype, add_filename) + return select_columns(df, columns, filetype, add_filename) def read_data_blocksize( @@ -426,7 +428,7 @@ def extract_filename(path: str) -> str: df = read_func(input_files, blocksize=blocksize, **read_kwargs, **kwargs) if postprocessing_func is not None: df = postprocessing_func(df) - return select_and_sort_columns(df, columns, file_type, add_filename) + return select_columns(df, columns, file_type, add_filename) def read_data_fpp( diff --git a/tests/test_read_data.py b/tests/test_read_data.py index ecb47790..0841eca0 100644 --- a/tests/test_read_data.py +++ b/tests/test_read_data.py @@ -1,5 +1,6 @@ import pandas as pd import pytest +from zict import Func from nemo_curator.utils.distributed_utils import read_data_blocksize, read_data_fpp @@ -285,8 +286,7 @@ def test_read_data_fpp_add_filename( columns=None, ) - print(f"Column names are {df.columns}") - assert "filename" in df.columns + assert set(df.columns) == {"filename", "id", "text"} file_names = df["filename"].unique().compute() if backend == "cudf": file_names = file_names.to_pandas() @@ -304,16 +304,27 @@ def test_read_data_fpp_add_filename( pytest.param("cudf", marks=pytest.mark.gpu), ], ) -@pytest.mark.parametrize("file_type", ["jsonl", "parquet"]) -@pytest.mark.parametrize("function_name", ["read_data_blocksize", "read_data_fpp"]) @pytest.mark.parametrize( - "cols_to_select", [["id"], ["text"], ["text", "id"], ["id", "text"]] + "file_type,add_filename,function_name", + [ + *[("jsonl", True, func) for func in ["read_data_blocksize", "read_data_fpp"]], + *[("jsonl", False, func) for func in ["read_data_blocksize", "read_data_fpp"]], + *[ + ("parquet", False, func) + for func in ["read_data_blocksize", "read_data_fpp"] + ], + *[("parquet", True, "read_data_fpp")], + ], +) +@pytest.mark.parametrize( + "cols_to_select", [None, ["id"], ["text", "id"], ["id", "text"]] ) def test_read_data_select_columns( mock_multiple_jsonl_files, mock_multiple_parquet_files, backend, file_type, + add_filename, function_name, cols_to_select, ): @@ -333,13 +344,18 @@ def test_read_data_select_columns( input_files=input_files, backend=backend, file_type=file_type, - add_filename=False, + add_filename=add_filename, input_meta=None, - columns=cols_to_select, + columns=list(cols_to_select) if cols_to_select else None, **read_kwargs, ) + if not cols_to_select: + cols_to_select = ["id", "text"] - assert list(df.columns) == cols_to_select + if not add_filename: 
+ assert list(df.columns) == sorted(cols_to_select) + else: + assert list(df.columns) == sorted(cols_to_select + ["filename"]) @pytest.mark.parametrize( @@ -373,7 +389,9 @@ def test_read_data_input_data( **read_kwargs, ) - if function_name == "read_data_fpp": + if function_name == "read_data_fpp" and backend == "cudf": assert list(df.columns) == list(input_meta.keys()) - elif function_name == "read_data_blocksize": + else: + # In the read_data_fpp case, because pandas doesn't support `prune_columns`, it'll always return all columns even if input_meta is specified + # In the `read_data_blocksize` case, `dask.read_json` also doesn't `prune_columns` so it'll always return all columns assert list(df.columns) == ["id", "text"] From 1e8a7fcfff04b879ee5584615eab14102d6fcd7a Mon Sep 17 00:00:00 2001 From: Praateek Date: Fri, 22 Nov 2024 00:24:02 -0800 Subject: [PATCH 09/21] rename func name Signed-off-by: Praateek --- tests/test_read_data.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_read_data.py b/tests/test_read_data.py index 0841eca0..03c7669b 100644 --- a/tests/test_read_data.py +++ b/tests/test_read_data.py @@ -1,6 +1,5 @@ import pandas as pd import pytest -from zict import Func from nemo_curator.utils.distributed_utils import read_data_blocksize, read_data_fpp @@ -369,7 +368,7 @@ def test_read_data_select_columns( @pytest.mark.parametrize( "input_meta", [{"id": "str"}, {"text": "str"}, {"id": "str", "text": "str"}] ) -def test_read_data_input_data( +def test_read_data_input_meta( mock_multiple_jsonl_files, backend, function_name, input_meta ): if function_name == "read_data_fpp": From 2f181321a74aa597f72981071599d8342c7e89d1 Mon Sep 17 00:00:00 2001 From: Praateek Date: Fri, 6 Dec 2024 06:47:53 -0800 Subject: [PATCH 10/21] add test case for different columns Signed-off-by: Praateek --- nemo_curator/utils/distributed_utils.py | 40 ++++--- tests/test_io.py | 1 + tests/test_read_data.py | 134 ++++++++++++++++++++++-- tests/test_separate_by_metadata.py | 2 + 4 files changed, 155 insertions(+), 22 deletions(-) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index afbc4a9c..21e3690a 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -270,19 +270,17 @@ def _set_torch_to_use_rmm(): def select_columns( - df: Union[dd.DataFrame, dask_cudf.DataFrame], + df: Union[dd.DataFrame, pd.DataFrame, "cudf.DataFrame"], columns: List[str], filetype: Literal["jsonl", "json", "parquet"], add_filename: bool, -) -> Union[dd.DataFrame, dask_cudf.DataFrame]: +) -> Union[dd.DataFrame, pd.DataFrame, "cudf.DataFrame"]: # We exclude parquet because the parquet readers already support column selection if filetype in ["jsonl", "json"] and columns is not None: if add_filename and "filename" not in columns: columns.append("filename") df = df[columns] - df = df[sorted(df.columns)] - return df @@ -292,9 +290,9 @@ def read_single_partition( filetype: str = "jsonl", add_filename: bool = False, input_meta: Union[str, dict] = None, - columns: Optional[List[str]] = None, + io_columns: Optional[List[str]] = None, **kwargs, -) -> Union[cudf.DataFrame, pd.DataFrame]: +) -> Union["cudf.DataFrame", pd.DataFrame]: """ This function reads a file with cuDF, sorts the columns of the DataFrame and adds a "filename" column. 
@@ -334,7 +332,7 @@ def read_single_partition( ) elif filetype == "parquet": - read_kwargs = {"columns": columns} + read_kwargs = {"columns": io_columns} if backend == "cudf": read_f = cudf.read_parquet else: @@ -363,12 +361,14 @@ def read_single_partition( df = read_f(file, **read_kwargs, **kwargs) if add_filename: df["filename"] = os.path.basename(file) + df = select_columns(df, io_columns, filetype, add_filename) df_ls.append(df) + df = concat_f(df_ls, ignore_index=True) else: df = read_f(files, **read_kwargs, **kwargs) - - return select_columns(df, columns, filetype, add_filename) + df = select_columns(df, io_columns, filetype, add_filename) + return df def read_data_blocksize( @@ -380,7 +380,7 @@ def read_data_blocksize( input_meta: Union[str, dict] = None, columns: Optional[List[str]] = None, **kwargs, -) -> Union[dd.DataFrame, dask_cudf.DataFrame]: +) -> dd.DataFrame: read_kwargs = dict() @@ -419,6 +419,8 @@ def extract_filename(path: str) -> str: raise ValueError(msg) read_func = dd.read_parquet read_kwargs["columns"] = columns + # In dask_cudf >= 24.12, aggregate_files is not required, but we've kept here until + # it gets in dask (pandas) as well read_kwargs["aggregate_files"] = True else: msg = f"Reading with blocksize is only supported for jsonl and parquet files, not {file_type=}" @@ -428,7 +430,8 @@ def extract_filename(path: str) -> str: df = read_func(input_files, blocksize=blocksize, **read_kwargs, **kwargs) if postprocessing_func is not None: df = postprocessing_func(df) - return select_columns(df, columns, file_type, add_filename) + output = select_columns(df, columns, file_type, add_filename) + return output[sorted(output.columns)] def read_data_fpp( @@ -440,7 +443,7 @@ def read_data_fpp( input_meta: Union[str, dict] = None, columns: Optional[List[str]] = None, **kwargs, -) -> Union[dd.DataFrame, dask_cudf.DataFrame]: +) -> dd.DataFrame: input_files = sorted(input_files) if files_per_partition > 1: input_files = [ @@ -450,7 +453,7 @@ def read_data_fpp( else: input_files = [[file] for file in input_files] - return dd.from_map( + output = dd.from_map( read_single_partition, input_files, filetype=file_type, @@ -458,9 +461,11 @@ def read_data_fpp( add_filename=add_filename, input_meta=input_meta, enforce_metadata=False, - columns=columns, + io_columns=columns, **kwargs, ) + output = output[sorted(output.columns)] + return output def read_pandas_pickle( @@ -499,7 +504,7 @@ def read_data( input_meta: Union[str, dict] = None, columns: Optional[List[str]] = None, **kwargs, -) -> Union[dd.DataFrame, dask_cudf.DataFrame]: +) -> dd.DataFrame: """ This function can read multiple data formats and returns a Dask-cuDF DataFrame. @@ -541,7 +546,10 @@ def read_data( "function with the `keep_extensions` parameter." 
) - print(f"Reading {len(input_files)} files", flush=True) + print( + f"Reading {len(input_files)} files with {blocksize=} / {files_per_partition=}", + flush=True, + ) if blocksize is not None and files_per_partition is not None: msg = "blocksize and files_per_partition cannot be set at the same time" raise ValueError(msg) diff --git a/tests/test_io.py b/tests/test_io.py index b03f26f9..bc90deb3 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -240,6 +240,7 @@ def test_multifile_multi_partition(self, tmp_path, file_ext, read_f): got_df = read_f( str(tmp_path / file_ext), + blocksize=None, files_per_partition=2, backend="pandas", add_filename=True, diff --git a/tests/test_read_data.py b/tests/test_read_data.py index 03c7669b..e1e5c580 100644 --- a/tests/test_read_data.py +++ b/tests/test_read_data.py @@ -1,7 +1,15 @@ +import random +import tempfile + import pandas as pd import pytest -from nemo_curator.utils.distributed_utils import read_data_blocksize, read_data_fpp +from nemo_curator.utils.distributed_utils import ( + read_data, + read_data_blocksize, + read_data_fpp, +) +from nemo_curator.utils.file_utils import get_all_files_paths_under NUM_FILES = 5 NUM_RECORDS = 100 @@ -45,6 +53,48 @@ def mock_multiple_parquet_files(tmp_path): return file_paths +@pytest.fixture +def mock_multiple_jsonl_files_different_cols(tmp_path): + file_paths = [] + for file_id in range(NUM_FILES): + jsonl_file = tmp_path / f"test_diff_cols_{file_id}.jsonl" + with open(jsonl_file, "w") as f: + for record_id in range(NUM_RECORDS): + # 100 rows are ~5kb + f.write( + f'{{"col_{file_id}" : "some_col", "id": "id_{file_id}_{record_id}", "text": "A longish string {file_id}_{record_id}"}}\n' + ) + file_paths.append(str(jsonl_file)) + return file_paths + + +# Fixture to create multiple small Parquet files +@pytest.fixture +def mock_multiple_parquet_files_different_cols(tmp_path): + file_paths = [] + for file_id in range(NUM_FILES): + # 100 rows are ~5kb + parquet_file = tmp_path / f"test_diff_cols_{file_id}.parquet" + df = pd.DataFrame( + [ + { + **( + {f"col_{file_id}": "some_col"} + if file_id != 0 + else {"meta": "meta_col"} + ), + "id": f"id_{file_id}_{record_id}", + "text": f"A string {file_id}_{record_id}", + } + for record_id in range(NUM_RECORDS) + ] + ) + # We specify row_group_size so that we can test splitting a single big file into smaller chunks + df.to_parquet(parquet_file, compression=None, row_group_size=10) + file_paths.append(str(parquet_file)) + return file_paths + + @pytest.mark.gpu @pytest.mark.parametrize("file_type", ["jsonl", "parquet"]) @pytest.mark.parametrize("blocksize", ["1kb", "5kb", "10kb"]) @@ -70,7 +120,7 @@ def test_cudf_read_data_blocksize_partitioning( ) # Compute the number of partitions in the resulting DataFrame - num_partitions = df.npartitions + num_partitions = df.optimize().npartitions # Assert that we have two partitions (since we have ~15KB total data and a blocksize of 10KB) if blocksize == "1kb": assert ( @@ -117,7 +167,7 @@ def test_pandas_read_data_blocksize_partitioning( # Compute the number of partitions in the resulting DataFrame num_partitions = df.npartitions - # Assert that we have two partitions (since we have ~15KB total data and a blocksize of 10KB) + # Our total data is ~25kb where each file is 5kb if blocksize == "1kb": assert ( num_partitions > NUM_FILES @@ -285,7 +335,7 @@ def test_read_data_fpp_add_filename( columns=None, ) - assert set(df.columns) == {"filename", "id", "text"} + assert set(df.head().columns) == {"filename", "id", "text"} file_names = 
df["filename"].unique().compute() if backend == "cudf": file_names = file_names.to_pandas() @@ -352,9 +402,11 @@ def test_read_data_select_columns( cols_to_select = ["id", "text"] if not add_filename: - assert list(df.columns) == sorted(cols_to_select) + # assert list(df.columns) == sorted(cols_to_select) + assert list(df.head().columns) == sorted(cols_to_select) else: - assert list(df.columns) == sorted(cols_to_select + ["filename"]) + # assert list(df.columns) == sorted(cols_to_select + ["filename"]) + assert list(df.head().columns) == sorted(cols_to_select + ["filename"]) @pytest.mark.parametrize( @@ -393,4 +445,74 @@ def test_read_data_input_meta( else: # In the read_data_fpp case, because pandas doesn't support `prune_columns`, it'll always return all columns even if input_meta is specified # In the `read_data_blocksize` case, `dask.read_json` also doesn't `prune_columns` so it'll always return all columns + # if you user wants to select subset of columns, they should use `columns` parameter assert list(df.columns) == ["id", "text"] + + +@pytest.mark.parametrize( + "backend", + [ + "pandas", + pytest.param("cudf", marks=pytest.mark.gpu), + ], +) +@pytest.mark.parametrize("file_type", ["parquet", "jsonl"]) +@pytest.mark.parametrize( + "read_kwargs", + [ + *[({"files_per_partition": fpp, "blocksize": None}) for fpp in range(1, 6)], + *[ + ({"blocksize": bs, "files_per_partition": None}) + for bs in + # ["1kb", "5kb", "10kb"] + ["128MiB", "256MiB", "512MiB"] + ], + ], +) +def test_read_data_different_columns( + mock_multiple_jsonl_files_different_cols, + mock_multiple_parquet_files_different_cols, + backend, + file_type, + read_kwargs, +): + + read_kwargs_cp = read_kwargs.copy() + # if function_name == "read_data_fpp": + # func = read_data_fpp + # # read_kwargs = {"files_per_partition": 2} + # elif function_name == "read_data_blocksize": + # func = read_data_blocksize + # # read_kwargs = {"blocksize": "1kb"} + + read_kwargs_cp["columns"] = ["adlr_id", "text"] + random.seed(0) + if file_type == "jsonl": + # input_files = mock_multiple_jsonl_files_different_cols + input_files = random.choices( + get_all_files_paths_under("/raid/prospector-lm/rpv1_json/"), k=10 + ) + + # read_kwargs_cp["input_meta"] = {"id": "str", "text": "str"} + # read_kwargs_cp["meta"] = {"id": "str", "text": "str"} + + else: + # input_files = mock_multiple_parquet_files_different_cols + input_files = random.choices( + get_all_files_paths_under("/raid/prospector-lm/rpv1_parquet/"), k=10 + ) + if backend == "cudf": + read_kwargs_cp["allow_mismatched_pq_schemas"] = True + + df = read_data( + input_files=input_files, + file_type=file_type, + backend=backend, + add_filename=False, + **read_kwargs_cp, + ) + assert list(df.columns) == ["adlr_id", "text"] + assert list(df.compute().columns) == ["adlr_id", "text"] + with tempfile.TemporaryDirectory() as tmpdir: + df.to_parquet(tmpdir) + # assert len(df) == NUM_FILES * NUM_RECORDS diff --git a/tests/test_separate_by_metadata.py b/tests/test_separate_by_metadata.py index 68b57497..232265f2 100644 --- a/tests/test_separate_by_metadata.py +++ b/tests/test_separate_by_metadata.py @@ -61,6 +61,7 @@ def test_metadatasep( str(data_dir), backend=backend, files_per_partition=files_per_partition, + blocksize=None, add_filename=True, ).df separate_by_metadata( @@ -80,6 +81,7 @@ def test_metadatasep( str(output_dir / metadata), backend=backend, files_per_partition=1, + blocksize=None, add_filename=True, ).df dfs.append(meta_df) From a540d05de53405e3cd8f4644ec85933d9adf6cac Mon Sep 17 
00:00:00 2001 From: Praateek Date: Fri, 6 Dec 2024 07:54:04 -0800 Subject: [PATCH 11/21] improve test for different_cols Signed-off-by: Praateek --- nemo_curator/utils/distributed_utils.py | 4 +- tests/test_read_data.py | 148 ++++++++++++++---------- 2 files changed, 89 insertions(+), 63 deletions(-) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 21e3690a..3722bfd4 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -434,7 +434,7 @@ def extract_filename(path: str) -> str: return output[sorted(output.columns)] -def read_data_fpp( +def read_data_files_per_partition( input_files: List[str], file_type: Literal["parquet", "json", "jsonl"], backend: Literal["cudf", "pandas"] = "cudf", @@ -574,7 +574,7 @@ def read_data( warnings.warn( "Consider passing in blocksize for better control over memory usage." ) - return read_data_fpp( + return read_data_files_per_partition( input_files, file_type=file_type, backend=backend, diff --git a/tests/test_read_data.py b/tests/test_read_data.py index e1e5c580..00ceb1ce 100644 --- a/tests/test_read_data.py +++ b/tests/test_read_data.py @@ -7,7 +7,7 @@ from nemo_curator.utils.distributed_utils import ( read_data, read_data_blocksize, - read_data_fpp, + read_data_files_per_partition, ) from nemo_curator.utils.file_utils import get_all_files_paths_under @@ -57,13 +57,35 @@ def mock_multiple_parquet_files(tmp_path): def mock_multiple_jsonl_files_different_cols(tmp_path): file_paths = [] for file_id in range(NUM_FILES): - jsonl_file = tmp_path / f"test_diff_cols_{file_id}.jsonl" - with open(jsonl_file, "w") as f: - for record_id in range(NUM_RECORDS): - # 100 rows are ~5kb - f.write( - f'{{"col_{file_id}" : "some_col", "id": "id_{file_id}_{record_id}", "text": "A longish string {file_id}_{record_id}"}}\n' + jsonl_file = tmp_path / f"different_cols_test_{file_id}.jsonl" + + def make_record_without_meta(file_id, record_id): + return { + "id": f"id_{file_id}_{record_id}", + "text": f"A string {file_id}_{record_id}", + } + + def make_record_with_meta(file_id, record_id): + return { + "text": f"A string {file_id}_{record_id}", + "meta1": [ + {"field1": "field_one", "field2": "field_two"}, + ], + "id": f"id_{file_id}_{record_id}", + } + + df = pd.DataFrame( + [ + ( + make_record_without_meta(file_id, record_id) + if file_id == 0 + else make_record_with_meta(file_id, record_id) ) + for record_id in range(NUM_RECORDS) + ] + ) + + df.to_json(jsonl_file, orient="records", lines=True) file_paths.append(str(jsonl_file)) return file_paths @@ -75,21 +97,32 @@ def mock_multiple_parquet_files_different_cols(tmp_path): for file_id in range(NUM_FILES): # 100 rows are ~5kb parquet_file = tmp_path / f"test_diff_cols_{file_id}.parquet" + + def make_record_without_meta(file_id, record_id): + return { + "id": f"id_{file_id}_{record_id}", + "text": f"A string {file_id}_{record_id}", + } + + def make_record_with_meta(file_id, record_id): + return { + "text": f"A string {file_id}_{record_id}", + "meta1": [ + {"field1": "field_one", "field2": "field_two"}, + ], + "id": f"id_{file_id}_{record_id}", + } + df = pd.DataFrame( [ - { - **( - {f"col_{file_id}": "some_col"} - if file_id != 0 - else {"meta": "meta_col"} - ), - "id": f"id_{file_id}_{record_id}", - "text": f"A string {file_id}_{record_id}", - } + ( + make_record_without_meta(file_id, record_id) + if file_id == 0 + else make_record_with_meta(file_id, record_id) + ) for record_id in range(NUM_RECORDS) ] ) - # We specify row_group_size so 
that we can test splitting a single big file into smaller chunks df.to_parquet(parquet_file, compression=None, row_group_size=10) file_paths.append(str(parquet_file)) return file_paths @@ -212,7 +245,7 @@ def test_read_data_fpp_partitioning( else mock_multiple_parquet_files ) - df = read_data_fpp( + df = read_data_files_per_partition( input_files=input_files, backend=backend, file_type=file_type, @@ -310,9 +343,9 @@ def test_read_data_blocksize_add_filename_parquet(mock_multiple_parquet_files, b pytest.param( "pandas", "parquet", - marks=pytest.mark.xfail( - reason="filename column inaccessible with pandas backend and parquet" - ), + # marks=pytest.mark.xfail( + # reason="filename column inaccessible with pandas backend and parquet" + # ), ), ], ) @@ -325,7 +358,7 @@ def test_read_data_fpp_add_filename( else mock_multiple_parquet_files ) - df = read_data_fpp( + df = read_data_files_per_partition( input_files=input_files, backend=backend, file_type=file_type, @@ -335,7 +368,8 @@ def test_read_data_fpp_add_filename( columns=None, ) - assert set(df.head().columns) == {"filename", "id", "text"} + assert list(df.columns) == list(df.head().columns) + assert set(df.columns) == {"filename", "id", "text"} file_names = df["filename"].unique().compute() if backend == "cudf": file_names = file_names.to_pandas() @@ -356,13 +390,19 @@ def test_read_data_fpp_add_filename( @pytest.mark.parametrize( "file_type,add_filename,function_name", [ - *[("jsonl", True, func) for func in ["read_data_blocksize", "read_data_fpp"]], - *[("jsonl", False, func) for func in ["read_data_blocksize", "read_data_fpp"]], + *[ + ("jsonl", True, func) + for func in ["read_data_blocksize", "read_data_files_per_partition"] + ], + *[ + ("jsonl", False, func) + for func in ["read_data_blocksize", "read_data_files_per_partition"] + ], *[ ("parquet", False, func) - for func in ["read_data_blocksize", "read_data_fpp"] + for func in ["read_data_blocksize", "read_data_files_per_partition"] ], - *[("parquet", True, "read_data_fpp")], + *[("parquet", True, "read_data_files_per_partition")], ], ) @pytest.mark.parametrize( @@ -382,8 +422,8 @@ def test_read_data_select_columns( if file_type == "jsonl" else mock_multiple_parquet_files ) - if function_name == "read_data_fpp": - func = read_data_fpp + if function_name == "read_data_files_per_partition": + func = read_data_files_per_partition read_kwargs = {"files_per_partition": 1} elif function_name == "read_data_blocksize": func = read_data_blocksize @@ -401,12 +441,13 @@ def test_read_data_select_columns( if not cols_to_select: cols_to_select = ["id", "text"] + assert list(df.columns) == list(df.head().columns) if not add_filename: # assert list(df.columns) == sorted(cols_to_select) - assert list(df.head().columns) == sorted(cols_to_select) + assert list(df.columns) == sorted(cols_to_select) else: # assert list(df.columns) == sorted(cols_to_select + ["filename"]) - assert list(df.head().columns) == sorted(cols_to_select + ["filename"]) + assert list(df.columns) == sorted(cols_to_select + ["filename"]) @pytest.mark.parametrize( @@ -416,15 +457,17 @@ def test_read_data_select_columns( pytest.param("cudf", marks=pytest.mark.gpu), ], ) -@pytest.mark.parametrize("function_name", ["read_data_blocksize", "read_data_fpp"]) +@pytest.mark.parametrize( + "function_name", ["read_data_blocksize", "read_data_files_per_partition"] +) @pytest.mark.parametrize( "input_meta", [{"id": "str"}, {"text": "str"}, {"id": "str", "text": "str"}] ) def test_read_data_input_meta( mock_multiple_jsonl_files, backend, 
function_name, input_meta ): - if function_name == "read_data_fpp": - func = read_data_fpp + if function_name == "read_data_files_per_partition": + func = read_data_files_per_partition read_kwargs = {"files_per_partition": 1} elif function_name == "read_data_blocksize": func = read_data_blocksize @@ -440,7 +483,7 @@ def test_read_data_input_meta( **read_kwargs, ) - if function_name == "read_data_fpp" and backend == "cudf": + if function_name == "read_data_files_per_partition" and backend == "cudf": assert list(df.columns) == list(input_meta.keys()) else: # In the read_data_fpp case, because pandas doesn't support `prune_columns`, it'll always return all columns even if input_meta is specified @@ -463,9 +506,7 @@ def test_read_data_input_meta( *[({"files_per_partition": fpp, "blocksize": None}) for fpp in range(1, 6)], *[ ({"blocksize": bs, "files_per_partition": None}) - for bs in - # ["1kb", "5kb", "10kb"] - ["128MiB", "256MiB", "512MiB"] + for bs in ["128MiB", "256MiB", "512MiB"] ], ], ) @@ -478,29 +519,14 @@ def test_read_data_different_columns( ): read_kwargs_cp = read_kwargs.copy() - # if function_name == "read_data_fpp": - # func = read_data_fpp - # # read_kwargs = {"files_per_partition": 2} - # elif function_name == "read_data_blocksize": - # func = read_data_blocksize - # # read_kwargs = {"blocksize": "1kb"} - - read_kwargs_cp["columns"] = ["adlr_id", "text"] - random.seed(0) + read_kwargs_cp["columns"] = ["id", "text"] if file_type == "jsonl": - # input_files = mock_multiple_jsonl_files_different_cols - input_files = random.choices( - get_all_files_paths_under("/raid/prospector-lm/rpv1_json/"), k=10 - ) - - # read_kwargs_cp["input_meta"] = {"id": "str", "text": "str"} - # read_kwargs_cp["meta"] = {"id": "str", "text": "str"} + input_files = mock_multiple_jsonl_files_different_cols + read_kwargs_cp["input_meta"] = {"id": "str", "text": "str"} + read_kwargs_cp["meta"] = {"id": "str", "text": "str"} else: - # input_files = mock_multiple_parquet_files_different_cols - input_files = random.choices( - get_all_files_paths_under("/raid/prospector-lm/rpv1_parquet/"), k=10 - ) + input_files = mock_multiple_parquet_files_different_cols if backend == "cudf": read_kwargs_cp["allow_mismatched_pq_schemas"] = True @@ -511,8 +537,8 @@ def test_read_data_different_columns( add_filename=False, **read_kwargs_cp, ) - assert list(df.columns) == ["adlr_id", "text"] - assert list(df.compute().columns) == ["adlr_id", "text"] + assert list(df.columns) == ["id", "text"] + assert list(df.compute().columns) == ["id", "text"] with tempfile.TemporaryDirectory() as tmpdir: df.to_parquet(tmpdir) - # assert len(df) == NUM_FILES * NUM_RECORDS + assert len(df) == NUM_FILES * NUM_RECORDS From 999e46bef3ee6c4e747a4358a5ae50256fb529fb Mon Sep 17 00:00:00 2001 From: Praateek Date: Fri, 6 Dec 2024 08:03:28 -0800 Subject: [PATCH 12/21] .. 
Signed-off-by: Praateek --- tests/test_read_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_read_data.py b/tests/test_read_data.py index 00ceb1ce..19027ca5 100644 --- a/tests/test_read_data.py +++ b/tests/test_read_data.py @@ -523,7 +523,7 @@ def test_read_data_different_columns( if file_type == "jsonl": input_files = mock_multiple_jsonl_files_different_cols read_kwargs_cp["input_meta"] = {"id": "str", "text": "str"} - read_kwargs_cp["meta"] = {"id": "str", "text": "str"} + # read_kwargs_cp["meta"] = {"id": "str", "text": "str"} else: input_files = mock_multiple_parquet_files_different_cols From 2599f26a060a125c50bc214c07015c91aa7b6633 Mon Sep 17 00:00:00 2001 From: Praateek Date: Fri, 13 Dec 2024 00:01:15 -0800 Subject: [PATCH 13/21] review comments + add warnings for inconsistent schemas Signed-off-by: Praateek --- docs/user-guide/bestpractices.rst | 7 +- nemo_curator/_compat.py | 10 +++ nemo_curator/utils/distributed_utils.py | 28 ++++++ tests/test_read_data.py | 108 ++++++++++++++++-------- 4 files changed, 116 insertions(+), 37 deletions(-) diff --git a/docs/user-guide/bestpractices.rst b/docs/user-guide/bestpractices.rst index b486c336..cd9007c3 100644 --- a/docs/user-guide/bestpractices.rst +++ b/docs/user-guide/bestpractices.rst @@ -27,12 +27,11 @@ To help avoid these issues and ensure efficient processing, here are some strate Controlling Partition Sizes ~~~~~~~~~~~~~~~~~~~~~~~~~~~ -You should consider using ``files_per_partition`` or ``blocksize`` when reading data. This can help reduce the memory load by processing large datasets in smaller chunks. +The user should consider using ``files_per_partition`` or ``blocksize`` when reading data. This can help reduce the memory load by processing large datasets in smaller chunks. -#. ``blocksize`` argument is available for ``jsonl`` and ``parquet`` files. But for `parquet` files it's only available when ``add_filename=False`` - -#. For ``blocksize``, the recommendation is to use 1/32 of the total GPU memory. For example, if you have a GPU with 32GB of memory, you can set the blocksize to ``1gb``. +#. The ``blocksize`` argument is available for ``jsonl`` and ``parquet`` files. However, for `parquet` files, it is currently only available when ``add_filename=False``. +#. For the ``blocksize`` parameter, the recommendation is to use 1/32 of the total GPU memory. For example, if you have a GPU with 32GB of memory, you can set ``blocksize="1GB"``. 
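For illustration, a minimal sketch of the recommendation above, mirroring how the tests in this series invoke ``read_data`` (the JSONL directory path is a placeholder, and the ``1gb`` value assumes a 32 GB GPU):

    from nemo_curator.utils.distributed_utils import read_data
    from nemo_curator.utils.file_utils import get_all_files_paths_under

    # Placeholder input directory containing JSONL shards.
    files = get_all_files_paths_under("/path/to/jsonl_dir")

    df = read_data(
        input_files=files,
        file_type="jsonl",
        backend="cudf",
        blocksize="1gb",           # roughly 1/32 of a 32 GB GPU
        files_per_partition=None,  # the tests set exactly one of blocksize / files_per_partition
        add_filename=False,
    )

Alternatively, passing ``files_per_partition`` with ``blocksize=None`` selects the other chunking mode exercised throughout the tests in this series.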
Utilize RMM Options diff --git a/nemo_curator/_compat.py b/nemo_curator/_compat.py index 26fb0574..5de25ebd 100644 --- a/nemo_curator/_compat.py +++ b/nemo_curator/_compat.py @@ -23,6 +23,15 @@ # When mocking with autodoc the dask version is not there _dask_version = parse_version("2024.06.0") + +try: + import dask_cudf + + _dask_cudf_version = parse_version(dask_cudf.__version__) +except (ImportError, TypeError): + # When mocking with autodoc the dask version is not there + _dask_cudf_version = parse_version("2024.06.0") + try: import cudf @@ -40,6 +49,7 @@ DASK_SHUFFLE_METHOD_ARG = _dask_version > parse_version("2024.1.0") DASK_P2P_ERROR = _dask_version < parse_version("2023.10.0") DASK_SHUFFLE_CAST_DTYPE = _dask_version > parse_version("2023.12.0") +DASK_CUDF_PARQUET_READ_INCONSISTENT_SCHEMA = _dask_version > parse_version("2024.12") # Query-planning check (and cache) _DASK_QUERY_PLANNING_ENABLED = None diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 3722bfd4..364b0761 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -18,6 +18,8 @@ import dask +from nemo_curator._compat import DASK_CUDF_PARQUET_READ_INCONSISTENT_SCHEMA + os.environ["RAPIDS_NO_INITIALIZE"] = "1" import random import warnings @@ -330,6 +332,10 @@ def read_single_partition( read_kwargs["dtype"] = ( ast.literal_eval(input_meta) if type(input_meta) == str else input_meta ) + # because pandas doesn't support `prune_columns`, it'll always return all columns even when input_meta is specified + # to maintain consisntency we explicitly set `io_columns` here + if backend == "pandas" and not io_columns: + io_columns = list(read_kwargs["dtype"].keys()) elif filetype == "parquet": read_kwargs = {"columns": io_columns} @@ -386,6 +392,11 @@ def read_data_blocksize( postprocessing_func: Optional[Callable[[dd.DataFrame], dd.DataFrame]] = None if file_type == "jsonl": + warnings.warn( + "If underlying jsonl data doesn't have consistent schema, reading with blocksize will fail." + "Use files_per_partition approach." + ) + if backend == "panads": warnings.warn( "Pandas backend with blocksize cannot read multiple JSONL files into a single partition. " @@ -404,6 +415,11 @@ def read_data_blocksize( if isinstance(input_meta, str) else input_meta ) + + if not columns: + # To maintain consistency with the behavior of `read_data_fpp` where passing `input_meta` + # only returns those columns, we explicitly set `columns` here + columns = list(read_kwargs["dtype"].keys()) if add_filename: def extract_filename(path: str) -> str: @@ -414,6 +430,17 @@ def extract_filename(path: str) -> str: postprocessing_func = lambda df: df.rename(columns={"path": "filename"}) elif file_type == "parquet": + if backend == "cudf" and not DASK_CUDF_PARQUET_READ_INCONSISTENT_SCHEMA: + warnings.warn( + "If underlying parquet data doesn't have consistent schema, reading with blocksize will fail." + "Update underlying rapids package to 25.02+ or use files_per_partition approach." + ) + elif backend == "pandas": + warnings.warn( + "If underlying parquet data doesn't have consistent column order, reading with blocksize might fail." + "Use files_per_partition approach." 
+ ) + if add_filename: msg = "add_filename and blocksize cannot be set at the same time for parquet files" raise ValueError(msg) @@ -430,6 +457,7 @@ def extract_filename(path: str) -> str: df = read_func(input_files, blocksize=blocksize, **read_kwargs, **kwargs) if postprocessing_func is not None: df = postprocessing_func(df) + output = select_columns(df, columns, file_type, add_filename) return output[sorted(output.columns)] diff --git a/tests/test_read_data.py b/tests/test_read_data.py index 19027ca5..d4d8b009 100644 --- a/tests/test_read_data.py +++ b/tests/test_read_data.py @@ -1,9 +1,9 @@ -import random import tempfile import pandas as pd import pytest +from nemo_curator._compat import DASK_CUDF_PARQUET_READ_INCONSISTENT_SCHEMA from nemo_curator.utils.distributed_utils import ( read_data, read_data_blocksize, @@ -343,9 +343,9 @@ def test_read_data_blocksize_add_filename_parquet(mock_multiple_parquet_files, b pytest.param( "pandas", "parquet", - # marks=pytest.mark.xfail( - # reason="filename column inaccessible with pandas backend and parquet" - # ), + marks=pytest.mark.xfail( + reason="filename column inaccessible with pandas backend and parquet" + ), ), ], ) @@ -443,10 +443,8 @@ def test_read_data_select_columns( assert list(df.columns) == list(df.head().columns) if not add_filename: - # assert list(df.columns) == sorted(cols_to_select) assert list(df.columns) == sorted(cols_to_select) else: - # assert list(df.columns) == sorted(cols_to_select + ["filename"]) assert list(df.columns) == sorted(cols_to_select + ["filename"]) @@ -483,59 +481,103 @@ def test_read_data_input_meta( **read_kwargs, ) - if function_name == "read_data_files_per_partition" and backend == "cudf": - assert list(df.columns) == list(input_meta.keys()) - else: - # In the read_data_fpp case, because pandas doesn't support `prune_columns`, it'll always return all columns even if input_meta is specified - # In the `read_data_blocksize` case, `dask.read_json` also doesn't `prune_columns` so it'll always return all columns - # if you user wants to select subset of columns, they should use `columns` parameter - assert list(df.columns) == ["id", "text"] + assert list(df.columns) == list(input_meta.keys()) + + +def xfail_inconsistent_schema_jsonl(): + return pytest.mark.xfail( + reason="inconsistent schemas are not supported with jsonl files, " + "see https://github.com/dask/dask/issues/11595" + ) @pytest.mark.parametrize( "backend", [ - "pandas", - pytest.param("cudf", marks=pytest.mark.gpu), + pytest.param("pandas"), + pytest.param("cudf", marks=[pytest.mark.gpu]), ], ) -@pytest.mark.parametrize("file_type", ["parquet", "jsonl"]) +@pytest.mark.parametrize("file_type", ["jsonl", "parquet"]) +@pytest.mark.parametrize("fpp", [1, 3, 5]) +def test_read_data_different_columns_files_per_partition( + mock_multiple_jsonl_files_different_cols, + mock_multiple_parquet_files_different_cols, + backend, + file_type, + fpp, +): + read_kwargs = {"columns": ["id", "text"]} + if file_type == "jsonl": + input_files = mock_multiple_jsonl_files_different_cols + read_kwargs["input_meta"] = {"id": "str", "text": "str"} + elif file_type == "parquet": + input_files = mock_multiple_parquet_files_different_cols + if backend == "cudf": + read_kwargs["allow_mismatched_pq_schemas"] = True + + df = read_data( + input_files=input_files, + file_type=file_type, + backend=backend, + add_filename=False, + files_per_partition=fpp, + blocksize=None, + **read_kwargs, + ) + assert list(df.columns) == ["id", "text"] + assert list(df.compute().columns) == ["id", 
"text"] + with tempfile.TemporaryDirectory() as tmpdir: + df.to_parquet(tmpdir) + assert len(df) == NUM_FILES * NUM_RECORDS + + @pytest.mark.parametrize( - "read_kwargs", + "backend,file_type", [ - *[({"files_per_partition": fpp, "blocksize": None}) for fpp in range(1, 6)], - *[ - ({"blocksize": bs, "files_per_partition": None}) - for bs in ["128MiB", "256MiB", "512MiB"] - ], + pytest.param( + "cudf", "jsonl", marks=[pytest.mark.gpu, xfail_inconsistent_schema_jsonl()] + ), + pytest.param("pandas", "jsonl", marks=[xfail_inconsistent_schema_jsonl()]), + pytest.param( + "cudf", + "parquet", + marks=[pytest.mark.gpu] + + ( + [xfail_inconsistent_schema_jsonl()] + if not DASK_CUDF_PARQUET_READ_INCONSISTENT_SCHEMA + else [] + ), + ), + pytest.param("pandas", "parquet"), ], ) -def test_read_data_different_columns( +@pytest.mark.parametrize("blocksize", ["1kb", "5kb", "10kb"]) +def test_read_data_different_columns_blocksize( mock_multiple_jsonl_files_different_cols, mock_multiple_parquet_files_different_cols, backend, file_type, - read_kwargs, + blocksize, ): - - read_kwargs_cp = read_kwargs.copy() - read_kwargs_cp["columns"] = ["id", "text"] + read_kwargs = {"columns": ["id", "text"]} + read_kwargs["columns"] = ["id", "text"] if file_type == "jsonl": input_files = mock_multiple_jsonl_files_different_cols - read_kwargs_cp["input_meta"] = {"id": "str", "text": "str"} - # read_kwargs_cp["meta"] = {"id": "str", "text": "str"} - - else: + read_kwargs["input_meta"] = {"id": "str", "text": "str"} + elif file_type == "parquet": input_files = mock_multiple_parquet_files_different_cols if backend == "cudf": - read_kwargs_cp["allow_mismatched_pq_schemas"] = True + read_kwargs["allow_mismatched_pq_schemas"] = True df = read_data( input_files=input_files, file_type=file_type, + blocksize=blocksize, + files_per_partition=None, backend=backend, add_filename=False, - **read_kwargs_cp, + **read_kwargs, ) assert list(df.columns) == ["id", "text"] assert list(df.compute().columns) == ["id", "text"] From 09ca9d9c740254de9d549415573406dcf5270382 Mon Sep 17 00:00:00 2001 From: Praateek Mahajan Date: Sun, 15 Dec 2024 02:36:02 -0800 Subject: [PATCH 14/21] Update nemo_curator/utils/distributed_utils.py Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Signed-off-by: Praateek Mahajan --- nemo_curator/utils/distributed_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 364b0761..e34c65d8 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -333,7 +333,7 @@ def read_single_partition( ast.literal_eval(input_meta) if type(input_meta) == str else input_meta ) # because pandas doesn't support `prune_columns`, it'll always return all columns even when input_meta is specified - # to maintain consisntency we explicitly set `io_columns` here + # to maintain consistency we explicitly set `io_columns` here if backend == "pandas" and not io_columns: io_columns = list(read_kwargs["dtype"].keys()) From 70efd6907f51a83373c256ccf93f851cab744971 Mon Sep 17 00:00:00 2001 From: Praateek Mahajan Date: Sun, 15 Dec 2024 02:36:11 -0800 Subject: [PATCH 15/21] Update nemo_curator/utils/distributed_utils.py Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Signed-off-by: Praateek Mahajan --- nemo_curator/utils/distributed_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nemo_curator/utils/distributed_utils.py 
b/nemo_curator/utils/distributed_utils.py index e34c65d8..9c686daf 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -397,10 +397,10 @@ def read_data_blocksize( "Use files_per_partition approach." ) - if backend == "panads": + if backend == "pandas": warnings.warn( "Pandas backend with blocksize cannot read multiple JSONL files into a single partition. " - "Use files_per_partition if blocksize exceeds average file size" + "Please use files_per_partition if blocksize exceeds average file size." ) read_func = dd.read_json read_kwargs["lines"] = True From 91671ec579b27ed39821bb1b08c916dbaee0f10d Mon Sep 17 00:00:00 2001 From: Praateek Mahajan Date: Sun, 15 Dec 2024 02:36:20 -0800 Subject: [PATCH 16/21] Update nemo_curator/utils/distributed_utils.py Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Signed-off-by: Praateek Mahajan --- nemo_curator/utils/distributed_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 9c686daf..ecca6726 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -393,8 +393,8 @@ def read_data_blocksize( postprocessing_func: Optional[Callable[[dd.DataFrame], dd.DataFrame]] = None if file_type == "jsonl": warnings.warn( - "If underlying jsonl data doesn't have consistent schema, reading with blocksize will fail." - "Use files_per_partition approach." + "If underlying JSONL data does not have a consistent schema, reading with blocksize will fail. " + "Please use files_per_partition approach instead." ) if backend == "pandas": From a4fcd2fc46d273a38868adae91c3cb797cb2337f Mon Sep 17 00:00:00 2001 From: Praateek Mahajan Date: Sun, 15 Dec 2024 02:36:29 -0800 Subject: [PATCH 17/21] Update nemo_curator/utils/distributed_utils.py Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Signed-off-by: Praateek Mahajan --- nemo_curator/utils/distributed_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index ecca6726..6ca3e217 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -432,8 +432,8 @@ def extract_filename(path: str) -> str: elif file_type == "parquet": if backend == "cudf" and not DASK_CUDF_PARQUET_READ_INCONSISTENT_SCHEMA: warnings.warn( - "If underlying parquet data doesn't have consistent schema, reading with blocksize will fail." - "Update underlying rapids package to 25.02+ or use files_per_partition approach." + "If underlying Parquet data does not have consistent schema, reading with blocksize will fail. " + "Please update underlying RAPIDS package to version 25.02 or higher, or use files_per_partition approach instead." 
) elif backend == "pandas": warnings.warn( From 8e4827f30a09fa42223a29c9ad3fe0a67091c809 Mon Sep 17 00:00:00 2001 From: Praateek Mahajan Date: Sun, 15 Dec 2024 02:36:40 -0800 Subject: [PATCH 18/21] Update nemo_curator/utils/distributed_utils.py Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Signed-off-by: Praateek Mahajan --- nemo_curator/utils/distributed_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 6ca3e217..a330eefe 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -437,8 +437,8 @@ def extract_filename(path: str) -> str: ) elif backend == "pandas": warnings.warn( - "If underlying parquet data doesn't have consistent column order, reading with blocksize might fail." - "Use files_per_partition approach." + "If underlying Parquet data does not have a consistent column order, reading with blocksize might fail. " + "Please use files_per_partition approach instead." ) if add_filename: From 0cd86a634c0e800a10ecb4a0c8db3ab4247abcab Mon Sep 17 00:00:00 2001 From: Praateek Mahajan Date: Sun, 15 Dec 2024 02:36:51 -0800 Subject: [PATCH 19/21] Update nemo_curator/utils/distributed_utils.py Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Signed-off-by: Praateek Mahajan --- nemo_curator/utils/distributed_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index a330eefe..9c206a0e 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -442,7 +442,7 @@ def extract_filename(path: str) -> str: ) if add_filename: - msg = "add_filename and blocksize cannot be set at the same time for parquet files" + msg = "add_filename and blocksize cannot be set at the same time for Parquet files." 
raise ValueError(msg) read_func = dd.read_parquet read_kwargs["columns"] = columns From 5871b837e0cbb04a7333ab004fdba3c175998a5e Mon Sep 17 00:00:00 2001 From: Praateek Mahajan Date: Sun, 15 Dec 2024 02:37:00 -0800 Subject: [PATCH 20/21] Update nemo_curator/utils/distributed_utils.py Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Signed-off-by: Praateek Mahajan --- nemo_curator/utils/distributed_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 9c206a0e..f83144e7 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -450,7 +450,7 @@ def extract_filename(path: str) -> str: # it gets in dask (pandas) as well read_kwargs["aggregate_files"] = True else: - msg = f"Reading with blocksize is only supported for jsonl and parquet files, not {file_type=}" + msg = f"Reading with blocksize is only supported for JSONL and Parquet files, not {file_type=}" raise ValueError(msg) with dask.config.set({"dataframe.backend": backend}): From fc196d5946eb7397682f8cad89a20823daf59fdf Mon Sep 17 00:00:00 2001 From: Praateek Date: Mon, 16 Dec 2024 11:04:58 -0800 Subject: [PATCH 21/21] fix tests Signed-off-by: Praateek --- tests/test_io.py | 8 ++------ tests/test_read_data.py | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/tests/test_io.py b/tests/test_io.py index c327d4b0..432c00b3 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -113,9 +113,7 @@ def test_meta_dict(self, jsonl_dataset): output_meta = str({col: str(dtype) for col, dtype in dataset.df.dtypes.items()}) - expected_meta = ( - "{'date': 'datetime64[ns, UTC]', 'id': 'float64', 'text': 'object'}" - ) + expected_meta = "{'id': 'float64'}" assert ( output_meta == expected_meta @@ -139,9 +137,7 @@ def test_meta_str(self, jsonl_dataset): output_meta = str({col: str(dtype) for col, dtype in dataset.df.dtypes.items()}) - expected_meta = ( - "{'date': 'datetime64[ns, UTC]', 'id': 'float64', 'text': 'object'}" - ) + expected_meta = "{'id': 'float64'}" assert ( output_meta == expected_meta diff --git a/tests/test_read_data.py b/tests/test_read_data.py index d4d8b009..a619be3a 100644 --- a/tests/test_read_data.py +++ b/tests/test_read_data.py @@ -321,7 +321,7 @@ def test_read_data_blocksize_add_filename_jsonl(mock_multiple_jsonl_files, backe def test_read_data_blocksize_add_filename_parquet(mock_multiple_parquet_files, backend): with pytest.raises( ValueError, - match="add_filename and blocksize cannot be set at the same time for parquet files", + match="add_filename and blocksize cannot be set at the same time for Parquet files", ): read_data_blocksize( input_files=mock_multiple_parquet_files,