Skip reading files with incorrect extension #318

Merged · 14 commits · Nov 18, 2024
79 changes: 58 additions & 21 deletions nemo_curator/datasets/doc_dataset.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from typing import List, Optional, Union

import dask.dataframe as dd
@@ -35,7 +36,7 @@ def __len__(self):
def persist(self):
return DocumentDataset(self.df.persist())

def head(self, n=5):
def head(self, n: int = 5):
return self.df.head(n)

@classmethod
@@ -49,6 +50,19 @@ def read_json(
columns: Optional[List[str]] = None,
**kwargs,
):
"""
Read JSON or JSONL file(s).

Args:
input_files: The path of the input file(s).
backend: The backend to use for reading the data.
files_per_partition: The number of files to read per partition.
add_filename: Whether to add a "filename" column to the DataFrame.
input_meta: A dictionary or a string formatted as a dictionary, which outlines
the field names and their respective data types within the JSONL input file.
columns: If not None, only these columns will be read from the file.

"""
return cls(
_read_json_or_parquet(
input_files=input_files,
@@ -65,13 +79,25 @@ def read_json(
@classmethod
def read_parquet(
cls,
input_files,
backend="pandas",
files_per_partition=1,
add_filename=False,
input_files: Union[str, List[str]],
backend: str = "pandas",
files_per_partition: int = 1,
add_filename: bool = False,
columns: Optional[List[str]] = None,
**kwargs,
):
"""
Read Parquet file(s).

Args:
input_files: The path of the input file(s).
backend: The backend to use for reading the data.
files_per_partition: The number of files to read per partition.
add_filename: Whether to add a "filename" column to the DataFrame.
columns: If not None, only these columns will be read from the file.
There is a significant performance gain when specifying columns for Parquet files.

"""
return cls(
_read_json_or_parquet(
input_files=input_files,
@@ -87,13 +113,24 @@ def read_parquet(
@classmethod
def read_pickle(
cls,
input_files,
backend="pandas",
files_per_partition=1,
add_filename=False,
input_files: Union[str, List[str]],
backend: str = "pandas",
files_per_partition: int = 1,
add_filename: bool = False,
columns: Optional[List[str]] = None,
**kwargs,
):
"""
Read Pickle file(s).

Args:
input_files: The path of the input file(s).
backend: The backend to use for reading the data.
files_per_partition: The number of files to read per partition.
add_filename: Whether to add a "filename" column to the DataFrame.
columns: If not None, only these columns will be read from the file.

"""
return cls(
read_data(
input_files=input_files,
@@ -108,12 +145,12 @@ def read_pickle(

def to_json(
self,
output_file_dir,
write_to_filename=False,
keep_filename_column=False,
output_file_dir: str,
write_to_filename: bool = False,
keep_filename_column: bool = False,
):
"""
See nemo_curator.utils.distributed_utils.write_to_disk docstring for other parameters.
See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.

"""
write_to_disk(
@@ -126,12 +163,12 @@ def to_json(

def to_parquet(
self,
output_file_dir,
write_to_filename=False,
keep_filename_column=False,
output_file_dir: str,
write_to_filename: bool = False,
keep_filename_column: bool = False,
):
"""
See nemo_curator.utils.distributed_utils.write_to_disk docstring for other parameters.
See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.

"""
write_to_disk(
@@ -144,8 +181,8 @@ def to_parquet(

def to_pickle(
self,
output_file_dir,
write_to_filename=False,
output_file_dir: str,
write_to_filename: bool = False,
):
raise NotImplementedError("DocumentDataset does not support to_pickle yet")

@@ -217,8 +254,8 @@ def _read_json_or_parquet(
file_ext = "." + file_type

if isinstance(input_files, list):
# List of jsonl or parquet files
if all(f.endswith(file_ext) for f in input_files):
# List of files
if all(os.path.isfile(f) for f in input_files):
raw_data = read_data(
input_files,
file_type=file_type,
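For context, here is a minimal usage sketch of the reader and writer APIs documented above. It is not part of the diff; the import path is the usual public one, and the directory paths, schema, and column names are hypothetical.

# Hypothetical example: read JSONL files with an explicit schema and a column
# subset, then write one output file per input file via the "filename" column.
from nemo_curator.datasets import DocumentDataset

dataset = DocumentDataset.read_json(
    "/path/to/jsonl_dir",
    backend="pandas",
    add_filename=True,
    input_meta={"id": "str", "text": "str"},
    columns=["id", "text"],
)
dataset.to_json("/path/to/output_dir", write_to_filename=True)

# Reading Parquet with an explicit column list, which the docstring above notes
# gives a significant performance gain over reading every column.
parquet_ds = DocumentDataset.read_parquet("/path/to/parquet_dir", columns=["id", "text"])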
107 changes: 76 additions & 31 deletions nemo_curator/utils/distributed_utils.py
@@ -57,21 +57,23 @@ def _enable_spilling():


def start_dask_gpu_local_cluster(
nvlink_only=False,
protocol="tcp",
rmm_pool_size="1024M",
enable_spilling=True,
set_torch_to_use_rmm=True,
rmm_async=True,
rmm_maximum_pool_size=None,
rmm_managed_memory=False,
rmm_release_threshold=None,
nvlink_only: bool = False,
protocol: str = "tcp",
rmm_pool_size: Optional[Union[int, str]] = "1024M",
enable_spilling: bool = True,
set_torch_to_use_rmm: bool = True,
rmm_async: bool = True,
rmm_maximum_pool_size: Optional[Union[int, str]] = None,
rmm_managed_memory: bool = False,
rmm_release_threshold: Optional[Union[int, str]] = None,
**cluster_kwargs,
) -> Client:
"""
This function sets up a Dask cluster across all the
GPUs present on the machine.

See get_client function for parameters.

"""
extra_kwargs = (
{
@@ -111,12 +113,16 @@ def start_dask_gpu_local_cluster(


def start_dask_cpu_local_cluster(
n_workers=os.cpu_count(), threads_per_worker=1, **cluster_kwargs
n_workers: Optional[int] = os.cpu_count(),
threads_per_worker: int = 1,
**cluster_kwargs,
) -> Client:
"""
This function sets up a Dask cluster across all the
CPUs present on the machine.

See get_client function for parameters.

"""
cluster = LocalCluster(
n_workers=n_workers,
@@ -262,10 +268,10 @@ def _set_torch_to_use_rmm():


def read_single_partition(
files,
backend="cudf",
filetype="jsonl",
add_filename=False,
files: List[str],
backend: str = "cudf",
filetype: str = "jsonl",
add_filename: bool = False,
input_meta: Union[str, dict] = None,
columns: Optional[List[str]] = None,
**kwargs,
@@ -353,14 +359,18 @@ def read_single_partition(


def read_pandas_pickle(
file, add_filename=False, columns=None, **kwargs
file: str,
add_filename: bool = False,
columns: Optional[List[str]] = None,
**kwargs,
) -> pd.DataFrame:
"""
This function reads a pickle file with Pandas.

Args:
file: The path to the pickle file to read.
add_filename: Whether to add a "filename" column to the DataFrame.
columns: If not None, only these columns will be read from the file.
Returns:
A Pandas DataFrame.

@@ -375,7 +385,7 @@ def read_pandas_pickle(


def read_data(
input_files,
input_files: Union[str, List[str]],
file_type: str = "pickle",
backend: str = "cudf",
files_per_partition: int = 1,
@@ -406,6 +416,9 @@ def read_data(
# Try using cuDF. If not available, this will throw an error.
test_obj = cudf.Series

if isinstance(input_files, str):
input_files = [input_files]

if file_type == "pickle":
df = read_pandas_pickle(
input_files[0], add_filename=add_filename, columns=columns, **kwargs
@@ -415,15 +428,30 @@ def read_data(
df = df.to_backend("cudf")

elif file_type in ["json", "jsonl", "parquet"]:
assert len(input_files) > 0

input_extensions = {os.path.splitext(f)[-1] for f in input_files}
if len(input_extensions) != 1:
raise RuntimeError(
Collaborator Author commented:

An example of when we would expect this RuntimeError is:

doc = DocumentDataset.read_json(in_files)

where in_files is a string path to a directory containing multiple JSONL files plus a CRC file. Since the CRC file is not explicitly filtered out, we raise the error.

Collaborator replied:

We can leave this as is for now. In theory there might be cases where a user filters by [.json, .jsonl] using the file filter and still hits this error, but in practice I expect that to be unlikely, so we can wait and see if there is any user feedback around this.
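To make that scenario concrete, here is a hedged sketch of the failing call and the workaround suggested by the error message. The directory layout is hypothetical, and the exact format of the filter_by argument is assumed from the discussion above.

# Hypothetical directory contents: data_0.jsonl, data_1.jsonl, data_0.jsonl.crc
# Passing the directory directly now raises the RuntimeError below, because the
# .crc file has a different extension than the JSONL files.
doc = DocumentDataset.read_json("/path/to/jsonl_dir")

# Workaround named in the error message: build an explicit, filtered file list.
from nemo_curator.utils.file_utils import get_all_files_paths_under

jsonl_files = get_all_files_paths_under(
    "/path/to/jsonl_dir",
    filter_by=[".jsonl"],  # assumed format for filter_by, per the comment above
)
doc = DocumentDataset.read_json(jsonl_files)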

"All files being read must have the same file type. "
"Please check your input directory or list of files to ensure this. "
"To generate a list of files with a given file type in your directory, "
"please use the nemo_curator.utils.file_utils.get_all_files_paths_under "
"function with the filter_by parameter."
)

print(f"Reading {len(input_files)} files", flush=True)
input_files = sorted(input_files)

if files_per_partition > 1:
input_files = [
input_files[i : i + files_per_partition]
for i in range(0, len(input_files), files_per_partition)
]

else:
input_files = [[file] for file in input_files]

return dd.from_map(
read_single_partition,
input_files,
Expand All @@ -435,8 +463,10 @@ def read_data(
columns=columns,
**kwargs,
)

else:
raise RuntimeError("Could not read data, please check file type")

return df
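As an aside (not part of the diff), here is a tiny runnable sketch of how the grouping logic above maps sorted files onto partitions; the file names are hypothetical.

files = sorted(["c.jsonl", "a.jsonl", "e.jsonl", "b.jsonl", "d.jsonl"])
files_per_partition = 2
groups = [
    files[i : i + files_per_partition]
    for i in range(0, len(files), files_per_partition)
]
print(groups)  # [['a.jsonl', 'b.jsonl'], ['c.jsonl', 'd.jsonl'], ['e.jsonl']]
# Each inner list is handed to read_single_partition as one Dask partition.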


@@ -496,18 +526,18 @@ def process_all_batches(

def single_partition_write_with_filename(
df,
output_file_dir,
keep_filename_column=False,
output_type="jsonl",
output_file_dir: str,
keep_filename_column: bool = False,
output_type: str = "jsonl",
):
"""
This function processes a DataFrame and writes it to disk

Args:
df: A DataFrame.
output_file_dir: The output file path.
keep_filename_column: Whether to keep or drop the "filename" column, if it exists.
output_type="jsonl": The type of output file to write.
keep_filename_column: Boolean representing whether to keep or drop the "filename" column, if it exists.
output_type: The type of output file to write. Can be "jsonl" or "parquet".
Returns:
If the DataFrame is non-empty, return a Series containing a single element, True.
If the DataFrame is empty, return a Series containing a single element, False.
@@ -575,10 +605,10 @@ def single_partition_write_with_filename(

def write_to_disk(
df,
output_file_dir,
write_to_filename=False,
keep_filename_column=False,
output_type="jsonl",
output_file_dir: str,
write_to_filename: bool = False,
keep_filename_column: bool = False,
output_type: str = "jsonl",
):
"""
This function writes a Dask DataFrame to the specified file path.
Expand All @@ -588,9 +618,9 @@ def write_to_disk(
Args:
df: A Dask DataFrame.
output_file_dir: The output file path.
write_to_filename: Whether to write the filename using the "filename" column.
keep_filename_column: Whether to keep or drop the "filename" column, if it exists.
output_type="jsonl": The type of output file to write.
write_to_filename: Boolean representing whether to write the filename using the "filename" column.
keep_filename_column: Boolean representing whether to keep or drop the "filename" column, if it exists.
output_type: The type of output file to write. Can be "jsonl" or "parquet".

"""
if write_to_filename and "filename" not in df.columns:
@@ -665,7 +695,7 @@ def load_object_on_worker(attr, load_object_function, load_object_kwargs):
return obj


def offload_object_on_worker(attr):
def offload_object_on_worker(attr: str):
"""
This function deletes an existing attribute from a Dask worker.

@@ -702,7 +732,19 @@ def get_current_client():
return None


def performance_report_if(path=None, report_name="dask-profile.html"):
def performance_report_if(
path: Optional[str] = None, report_name: str = "dask-profile.html"
):
"""
Generates a performance report if a valid path is provided, or returns a
no-op context manager if not.

Args:
path: The directory path where the performance report should be saved.
If None, no report is generated.
report_name: The name of the report file.

"""
if path is not None:
return performance_report(os.path.join(path, report_name))
else:
@@ -712,7 +754,10 @@ def performance_report_if(path=None, report_name="dask-profile.html"):
def performance_report_if_with_ts_suffix(
path: Optional[str] = None, report_name: str = "dask-profile"
):
"""Suffixes the report_name with the timestamp"""
"""
Same as performance_report_if, except it suffixes the report_name with the timestamp.

"""
return performance_report_if(
path=path,
report_name=f"{report_name}-{datetime.now().strftime('%Y%m%d_%H%M%S')}.html",
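Finally, a hedged usage sketch of the profiling helpers above. The cluster size, the output directory (which must already exist), and the report name are assumptions, not part of the PR.

import dask.dataframe as dd
import pandas as pd

from nemo_curator.utils.distributed_utils import (
    performance_report_if_with_ts_suffix,
    start_dask_cpu_local_cluster,
)

client = start_dask_cpu_local_cluster(n_workers=2)

ddf = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=2)
with performance_report_if_with_ts_suffix("/path/to/profiles", report_name="curation-profile"):
    # Work executed here is captured in
    # /path/to/profiles/curation-profile-<timestamp>.html.
    ddf["x"].sum().compute()

# With path=None, performance_report_if returns a no-op context manager, so the
# same code runs unchanged when profiling is disabled.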