diff --git a/nemo_curator/datasets/doc_dataset.py b/nemo_curator/datasets/doc_dataset.py
index a97aa196..32b23114 100644
--- a/nemo_curator/datasets/doc_dataset.py
+++ b/nemo_curator/datasets/doc_dataset.py
@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import List, Union
+
 import dask.dataframe as dd
 
 from nemo_curator.utils.distributed_utils import read_data, write_to_disk
@@ -36,10 +38,11 @@ def persist(self):
     @classmethod
     def read_json(
         cls,
-        input_files,
-        backend="pandas",
-        files_per_partition=1,
-        add_filename=False,
+        input_files: Union[str, List[str]],
+        backend: str = "pandas",
+        files_per_partition: int = 1,
+        add_filename: bool = False,
+        input_meta: Union[str, dict] = None,
     ):
         return cls(
             _read_json_or_parquet(
@@ -48,6 +51,7 @@
                 backend=backend,
                 files_per_partition=files_per_partition,
                 add_filename=add_filename,
+                input_meta=input_meta,
             )
         )
 
@@ -77,16 +81,16 @@ def read_pickle(
         files_per_partition=1,
         add_filename=False,
     ):
-        raw_data = read_data(
-            input_files=input_files,
-            file_type="pickle",
-            backend=backend,
-            files_per_partition=files_per_partition,
-            add_filename=add_filename,
+        return cls(
+            read_data(
+                input_files=input_files,
+                file_type="pickle",
+                backend=backend,
+                files_per_partition=files_per_partition,
+                add_filename=add_filename,
+            )
         )
 
-        return cls(raw_data)
-
     def to_json(
         self,
         output_file_dir,
@@ -128,11 +132,12 @@ def to_pickle(
 
 
 def _read_json_or_parquet(
-    input_files,
-    file_type,
-    backend,
-    files_per_partition,
-    add_filename,
+    input_files: Union[str, List[str]],
+    file_type: str,
+    backend: str,
+    files_per_partition: int,
+    add_filename: bool,
+    input_meta: Union[str, dict] = None,
 ):
     """
     `input_files` may be a list or a string type.
@@ -162,6 +167,7 @@
                 backend=backend,
                 files_per_partition=files_per_partition,
                 add_filename=add_filename,
+                input_meta=input_meta,
             )
 
         # List of directories
@@ -178,6 +184,7 @@
                     backend=backend,
                     files_per_partition=files_per_partition,
                     add_filename=add_filename,
+                    input_meta=input_meta,
                 )
                 dfs.append(df)
 
@@ -200,6 +207,7 @@
             backend=backend,
             files_per_partition=files_per_partition,
             add_filename=add_filename,
+            input_meta=input_meta,
         )
 
     else:
diff --git a/nemo_curator/download/doc_builder.py b/nemo_curator/download/doc_builder.py
index 8bdf3e30..34331875 100644
--- a/nemo_curator/download/doc_builder.py
+++ b/nemo_curator/download/doc_builder.py
@@ -15,7 +15,7 @@
 import importlib
 import os
 from abc import ABC, abstractmethod
-from typing import List, Tuple
+from typing import List, Tuple, Union
 
 import dask.dataframe as dd
 import pandas as pd
@@ -111,6 +111,7 @@ def _download_and_extract_single_partition(
     output_type: str,
     keep_raw_download: bool,
     force_download: bool,
+    input_meta: Union[str, dict] = None,
 ) -> pd.DataFrame:
     url, output_path = paths
 
@@ -158,6 +159,7 @@ def download_and_extract(
     output_type: str = "jsonl",
     keep_raw_download=False,
     force_download=False,
+    input_meta: Union[str, dict] = None,
 ) -> DocumentDataset:
     """
     Downloads and extracts a dataset into a format accepted by the NeMo Curator
@@ -174,6 +176,8 @@
       keep_raw_download: Whether to keep the pre-extracted download file.
      force_download: If False, will skip processing all files in output_paths that already
        exist and directly read from them instead.
+      input_meta: A dictionary or a string formatted as a dictionary, which outlines
+        the field names and their respective data types within the JSONL input file.
 
     Returns:
       A DocumentDataset of the downloaded data
@@ -192,6 +196,7 @@ def download_and_extract(
         keep_raw_download=keep_raw_download,
         force_download=force_download,
         enforce_metadata=False,
+        input_meta=input_meta,
         meta=output_format,
     )
 
diff --git a/nemo_curator/scripts/download_and_extract.py b/nemo_curator/scripts/download_and_extract.py
index c0a524f2..d87ef9ad 100644
--- a/nemo_curator/scripts/download_and_extract.py
+++ b/nemo_curator/scripts/download_and_extract.py
@@ -15,7 +15,7 @@
 import argparse
 import os
 
-from nemo_curator.download import batch_download, download_and_extract
+from nemo_curator.download.doc_builder import batch_download, download_and_extract
 from nemo_curator.utils.config_utils import build_downloader
 from nemo_curator.utils.distributed_utils import get_client
 from nemo_curator.utils.file_utils import (
@@ -77,6 +77,7 @@ def main(args):
         output_format,
         keep_raw_download=args.keep_downloaded_files,
         force_download=args.overwrite_existing_json,
+        input_meta=args.input_meta,
     )
 
     # Sample to trigger the dask computation
@@ -120,6 +121,13 @@
         required=False,
         help="Path to input data directory",
     )
+    parser.add_argument(
+        "--input-meta",
+        type=str,
+        default=None,
+        help="A string formatted as a dictionary, which outlines the field names and "
+        "their respective data types within the JSONL input files.",
+    )
     parser.add_argument(
         "--output-json-dir",
         type=str,
diff --git a/nemo_curator/scripts/fuzzy_deduplication/jaccard_shuffle.py b/nemo_curator/scripts/fuzzy_deduplication/jaccard_shuffle.py
index 6b5710b0..39911de5 100644
--- a/nemo_curator/scripts/fuzzy_deduplication/jaccard_shuffle.py
+++ b/nemo_curator/scripts/fuzzy_deduplication/jaccard_shuffle.py
@@ -48,6 +48,7 @@ def main(args):
         blocksize=args.text_ddf_blocksize,
         id_column=args.input_json_id_field,
         text_column=args.input_json_text_field,
+        input_meta=args.input_meta,
     )
     print(
         "Graph creation for get_text_ddf_from_json_path_with_blocksize" " complete.",
@@ -86,6 +87,13 @@
         type=str,
         help="The directory containing anchor docs with bk files",
     )
+    parser.add_argument(
+        "--input-meta",
+        type=str,
+        default=None,
+        help="A string formatted as a dictionary, which outlines the field names and "
+        "their respective data types within the JSONL input files.",
+    )
     parser.add_argument(
         "--text-ddf-blocksize",
         type=int,
@@ -115,6 +123,7 @@
         type=int,
         help="The number of bucket parts to process per worker per batch",
     )
+    return parser
 
 
diff --git a/nemo_curator/scripts/fuzzy_deduplication/map_buckets.py b/nemo_curator/scripts/fuzzy_deduplication/map_buckets.py
index e7b69bb9..07d92791 100644
--- a/nemo_curator/scripts/fuzzy_deduplication/map_buckets.py
+++ b/nemo_curator/scripts/fuzzy_deduplication/map_buckets.py
@@ -34,6 +34,7 @@ def get_anchor_and_output_map_info(
     input_bucket_field,
     input_id_field,
     input_text_field,
+    input_meta,
 ):
     """
     Get anchor docs with bucket info
@@ -53,6 +54,7 @@
         blocksize=text_ddf_blocksize,
         id_column=input_id_field,
         text_column=input_text_field,
+        input_meta=input_meta,
     )
     ddf_bk = get_bucket_ddf_from_parquet_path(
         input_bucket_path=input_bucket_path, num_workers=num_workers
@@ -79,6 +81,13 @@
         type=str,
         help="The directory containing bucket information files",
     )
+    parser.add_argument(
+        "--input-meta",
+        type=str,
+        default=None,
+        help="A string formatted as a dictionary, which outlines the field names and "
+        "their respective data types within the JSONL input files.",
+    )
     parser.add_argument(
         "--text-ddf-blocksize",
         type=int,
@@ -116,6 +125,7 @@ def jaccard_get_output_map_workflow(
     input_bucket_field,
     input_id_field,
     input_text_field,
+    input_meta,
 ):
     """
     Workflow for jaccard shuffle
@@ -140,6 +150,7 @@
         input_bucket_field,
         input_id_field,
         input_text_field,
+        input_meta=input_meta,
     )
     ddf_anchor_docs_with_bk.to_parquet(
         output_anchor_docs_with_bk_path,
@@ -171,6 +182,7 @@ def main(args):
         args.input_bucket_field,
         args.input_json_id_field,
         args.input_json_text_field,
+        args.input_meta,
     )
     et = time.time()
     print(f"Bucket Mapping time taken = {et-st} s")
diff --git a/nemo_curator/scripts/verify_classification_results.py b/nemo_curator/scripts/verify_classification_results.py
index da79d692..3a639349 100644
--- a/nemo_curator/scripts/verify_classification_results.py
+++ b/nemo_curator/scripts/verify_classification_results.py
@@ -13,7 +13,9 @@
 # limitations under the License.
 
 import argparse
+import ast
 import os
+from typing import Union
 
 import pandas as pd
 
@@ -27,30 +29,39 @@ def parse_args():
     """
 
     parser = argparse.ArgumentParser(description="Run verification")
+
     parser.add_argument(
         "--results_file_path",
         type=str,
-        help="The path of the input files",
         required=True,
+        help="The path of the input files",
     )
     parser.add_argument(
         "--expected_results_file_path",
         type=str,
-        help="The path of the expected_result file",
         required=True,
+        help="The path of the expected_result file",
     )
     parser.add_argument(
         "--results_pred_column",
         type=str,
-        help="The prediction column name for the input files",
         default="pred",
+        help="The prediction column name for the input files",
     )
     parser.add_argument(
         "--expected_pred_column",
        type=str,
-        help="The prediction column name for the expected_result file",
         default="pred",
+        help="The prediction column name for the expected_result file",
     )
+    parser.add_argument(
+        "--input-meta",
+        type=str,
+        default=None,
+        help="A string formatted as a dictionary, which outlines the field names and "
+        "their respective data types within the JSONL input files.",
+    )
+
     return parser.parse_args()
 
@@ -122,10 +133,11 @@ def verify_same_dataframe(
 
 
 def verify_results(
-    results_file_path,
-    expected_results_file_path,
-    results_pred_column,
-    expected_pred_column,
+    results_file_path: str,
+    expected_results_file_path: str,
+    results_pred_column: str,
+    expected_pred_column: str,
+    input_meta: Union[str, dict] = None,
 ):
     """
     This function compares an input file with its expected result file.
@@ -136,9 +148,14 @@
         expected_results_file_path: The path of the expected_result file.
         results_pred_column: The prediction column name for the input files.
         expected_pred_column: The prediction column name for the expected_result file.
+        input_meta: A dictionary or a string formatted as a dictionary, which outlines
+            the field names and their respective data types within the JSONL input file.
 
     """
-    expected_df = pd.read_json(expected_results_file_path, lines=True)
+    if type(input_meta) == str:
+        input_meta = ast.literal_eval(input_meta)
+
+    expected_df = pd.read_json(expected_results_file_path, lines=True, dtype=input_meta)
     expected_df = expected_df.sort_values(by=["text"]).reset_index(drop=True)
     expected_counts = expected_df[expected_pred_column].value_counts().to_dict()
 
@@ -150,7 +167,10 @@
     ]
 
     got_paths = [p for p in os.scandir(results_file_path)]
-    got_df = [pd.read_json(path, lines=True)[expected_columns] for path in got_paths]
+    got_df = [
+        pd.read_json(path, lines=True, dtype=input_meta)[expected_columns]
+        for path in got_paths
+    ]
     got_df = pd.concat(got_df, ignore_index=True)
     got_df = got_df.sort_values(by=["text"]).reset_index(drop=True)
     got_counts = got_df[results_pred_column].value_counts().to_dict()
@@ -172,6 +192,7 @@
         args.expected_results_file_path,
         args.results_pred_column,
         args.expected_pred_column,
+        args.input_meta,
     )
 
diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py
index 0f5edf3a..9d9b62bb 100644
--- a/nemo_curator/utils/distributed_utils.py
+++ b/nemo_curator/utils/distributed_utils.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 from __future__ import annotations
 
+import ast
 import os
 
 os.environ["RAPIDS_NO_INITIALIZE"] = "1"
@@ -188,7 +189,11 @@ def _enable_spilling():
 
 
 def read_single_partition(
-    files, backend="cudf", filetype="jsonl", add_filename=False
+    files,
+    backend="cudf",
+    filetype="jsonl",
+    add_filename=False,
+    input_meta: Union[str, dict] = None,
 ) -> Union[cudf.DataFrame, pd.DataFrame]:
     """
     This function reads a file with cuDF, sorts the columns of the DataFrame
@@ -198,10 +203,19 @@
         files: The path to the jsonl files to read.
         backend: The backend to use for reading the data. Either "cudf" or "pandas".
         add_filename: Whether to add a "filename" column to the DataFrame.
+        input_meta: A dictionary or a string formatted as a dictionary, which outlines
+            the field names and their respective data types within the JSONL input file.
+
     Returns:
         A cudf DataFrame or a pandas DataFrame.
 
     """
+    if input_meta is not None and filetype != "jsonl":
+        warnings.warn(
+            "input_meta is only valid for JSONL files and will be ignored for other "
+            "file formats."
+        )
+
     if filetype == "jsonl":
         read_kwargs = {"lines": True}
         if backend == "cudf":
@@ -209,6 +223,11 @@
         else:
             read_kwargs["dtype"] = False
             read_f = pd.read_json
+
+        if input_meta is not None:
+            read_kwargs["dtype"] = (
+                ast.literal_eval(input_meta) if type(input_meta) == str else input_meta
+            )
     elif filetype == "parquet":
         read_kwargs = {}
         if backend == "cudf":
@@ -264,10 +283,11 @@ def read_pandas_pickle(file, add_filename=False) -> pd.DataFrame:
 
 def read_data(
     input_files,
-    file_type="pickle",
-    backend="cudf",
-    files_per_partition=1,
-    add_filename=False,
+    file_type: str = "pickle",
+    backend: str = "cudf",
+    files_per_partition: int = 1,
+    add_filename: bool = False,
+    input_meta: Union[str, dict] = None,
 ) -> Union[dd.DataFrame, dask_cudf.DataFrame]:
     """
     This function can read multiple data formats and returns a Dask-cuDF DataFrame.
@@ -278,6 +298,8 @@
         backend: The backend to use for reading the data.
         files_per_partition: The number of files to read per partition.
         add_filename: Whether to add a "filename" column to the DataFrame.
+        input_meta: A dictionary or a string formatted as a dictionary, which outlines
+            the field names and their respective data types within the JSONL input file.
 
     Returns:
         A Dask-cuDF or a Dask-pandas DataFrame.
@@ -309,6 +331,7 @@
                 filetype=file_type,
                 backend=backend,
                 add_filename=add_filename,
+                input_meta=input_meta,
                 enforce_metadata=False,
             )
         else:
diff --git a/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py
index 105021bd..62e9b579 100644
--- a/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py
+++ b/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import ast
 import os
 from glob import glob
 
@@ -26,31 +27,40 @@
 # TODO:
 # Combine this with
 # nemo_curator.distributed_utils.read_cudf_jsonl
-def read_json_func(files, engine="cudf", include_path_column=False, columns=None):
+def _read_json_func(
+    files, engine="cudf", include_path_column=False, columns=None, input_meta=None
+):
     """
     Reads multiple Json Lines files into a cuDF
     dataframe with an additional `path` column denoting the path of the input file.
     """
+    if type(input_meta) == str:
+        input_meta = ast.literal_eval(input_meta)
+
     if not include_path_column:
         if columns:
-            return cudf.read_json(files, engine="cudf", lines=True)[columns]
+            return cudf.read_json(files, engine="cudf", lines=True, dtype=input_meta)[
+                columns
+            ]
         else:
-            return cudf.read_json(files, engine="cudf", lines=True)
+            return cudf.read_json(files, engine="cudf", lines=True, dtype=input_meta)
 
     dfs = []
     for file in files:
         if columns:
-            df = cudf.read_json(file, engine=engine, lines=True)[columns]
+            df = cudf.read_json(file, engine=engine, lines=True, dtype=input_meta)[
+                columns
+            ]
         else:
-            df = cudf.read_json(file, engine=engine, lines=True)
+            df = cudf.read_json(file, engine=engine, lines=True, dtype=input_meta)
         df["path"] = file
         dfs.append(df)
     return cudf.concat(dfs, ignore_index=True)
 
 
 def get_text_ddf_from_json_path_with_blocksize(
-    input_data_paths, num_files, blocksize, id_column, text_column
+    input_data_paths, num_files, blocksize, id_column, text_column, input_meta
 ):
     data_paths = [
         entry.path for data_path in input_data_paths for entry in os.scandir(data_path)
@@ -71,7 +81,11 @@
     )
     filepaths_ls = chunk_files(data_paths, blocksize)
     text_ddf = dd.from_map(
-        read_json_func, filepaths_ls, columns=list(meta_df.columns), meta=meta_df
+        _read_json_func,
+        filepaths_ls,
+        columns=list(meta_df.columns),
+        input_meta=input_meta,
+        meta=meta_df,
     )
     text_ddf = text_ddf.map_partitions(
         convert_str_id_to_int,
diff --git a/tests/test_io.py b/tests/test_io.py
new file mode 100644
index 00000000..f44abcdc
--- /dev/null
+++ b/tests/test_io.py
@@ -0,0 +1,140 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import random
+import string
+import tempfile
+from datetime import datetime, timedelta
+
+import pandas as pd
+import pytest
+
+from nemo_curator.datasets import DocumentDataset
+
+
+def _generate_dummy_dataset(num_rows: int = 50) -> str:
+    # Function to generate a shuffled sequence of integers
+    def shuffled_integers(length: int = num_rows) -> int:
+        # Create a list of numbers from 0 to length - 1
+        integers = list(range(length))
+
+        # Shuffle the list
+        random.shuffle(integers)
+
+        # Yield one number from the list each time the generator is invoked
+        for integer in integers:
+            yield integer
+
+    # Function to generate a random string of a given length
+    def generate_random_string(length: int = 10) -> str:
+        characters = string.ascii_letters + string.digits  # Alphanumeric characters
+
+        return "".join(random.choice(characters) for _ in range(length))
+
+    # Function to generate a random datetime
+    def generate_random_datetime() -> str:
+        # Define start and end dates
+        start_date = datetime(1970, 1, 1)  # Unix epoch
+        end_date = datetime.now()  # Current date
+
+        # Calculate the total number of seconds between the start and end dates
+        delta = end_date - start_date
+        total_seconds = int(delta.total_seconds())
+
+        # Generate a random number of seconds within this range
+        random_seconds = random.randint(0, total_seconds)
+
+        # Add the random number of seconds to the start date to get a random datetime
+        random_datetime = start_date + timedelta(seconds=random_seconds)
+
+        # Convert to UTC and format the datetime
+        random_datetime_utc = random_datetime.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+
+        return random_datetime_utc
+
+    # Generate the corpus
+    corpus = []
+    for integer in shuffled_integers():
+        corpus.append(
+            json.dumps(
+                {
+                    "id": integer,
+                    "date": generate_random_datetime(),
+                    "text": generate_random_string(random.randint(5, 100)),
+                }
+            )
+        )
+
+    # Return the corpus
+    return "\n".join(corpus)
+
+
+@pytest.fixture
+def jsonl_dataset():
+    return _generate_dummy_dataset(num_rows=10)
+
+
+class TestIO:
+    def test_meta_dict(self, jsonl_dataset):
+        with tempfile.NamedTemporaryFile(suffix=".jsonl") as temp_file:
+            # Write the corpus to the file
+            temp_file.write(jsonl_dataset.encode("utf-8"))
+
+            # Flush the data to ensure it's written to disk
+            temp_file.flush()
+
+            # Move the cursor to the beginning of the file before reading
+            temp_file.seek(0)
+
+            # Read the dataset
+            dataset = DocumentDataset.read_json(
+                temp_file.name, input_meta={"id": float}
+            )
+
+            output_meta = str({col: str(dtype) for col, dtype in dataset.df.dtypes.items()})
+
+            expected_meta = (
+                "{'date': 'datetime64[ns, UTC]', 'id': 'float64', 'text': 'object'}"
+            )
+
+            assert (
+                output_meta == expected_meta
+            ), f"Expected: {expected_meta}, got: {output_meta}"
+
+    def test_meta_str(self, jsonl_dataset):
+        with tempfile.NamedTemporaryFile(suffix=".jsonl") as temp_file:
+            # Write the corpus to the file
+            temp_file.write(jsonl_dataset.encode("utf-8"))
+
+            # Flush the data to ensure it's written to disk
+            temp_file.flush()
+
+            # Move the cursor to the beginning of the file before reading
+            temp_file.seek(0)
+
+            # Read the dataset
+            dataset = DocumentDataset.read_json(
+                temp_file.name, input_meta='{"id": "float"}'
+            )
+
+            output_meta = str({col: str(dtype) for col, dtype in dataset.df.dtypes.items()})
+
+            expected_meta = (
+                "{'date': 'datetime64[ns, UTC]', 'id': 'float64', 'text': 'object'}"
+            )
+
+            assert (
+                output_meta == expected_meta
+            ), f"Expected: {expected_meta}, got: {output_meta}"
diff --git a/tutorials/tinystories/main.py b/tutorials/tinystories/main.py
index 3ed38697..0d81c1ff 100644
--- a/tutorials/tinystories/main.py
+++ b/tutorials/tinystories/main.py
@@ -181,7 +181,9 @@ def run_curation_pipeline(args: Any, jsonl_dir: str) -> None:
         if fp.endswith(".jsonl")
     ]
     print("Reading the data...")
-    orig_dataset = DocumentDataset.read_json(files, add_filename=True)
+    orig_dataset = DocumentDataset.read_json(
+        files, add_filename=True, input_meta=args.input_meta
+    )
     dataset = orig_dataset
 
     curation_steps = Sequential(
@@ -214,6 +216,13 @@ def run_curation_pipeline(args: Any, jsonl_dir: str) -> None:
 def main():
     parser = argparse.ArgumentParser()
     parser = add_distributed_args(parser)
+    parser.add_argument(
+        "--input-meta",
+        type=str,
+        default=None,
+        help="A string formatted as a dictionary, which outlines the field names and "
+        "their respective data types within the JSONL input files.",
+    )
     args = parser.parse_args()
     # Limit the total number of workers to ensure we don't run out of memory.
     args.n_workers = min(args.n_workers, 8)
@@ -225,6 +234,7 @@ def main():
         os.makedirs(JSONL_ROOT_DIR)
 
     jsonl_val_dir = download_and_convert_to_jsonl()
+
     run_curation_pipeline(args, jsonl_val_dir)
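
A minimal usage sketch of the `input_meta` parameter introduced in this change, based on the `DocumentDataset.read_json` signature and the tests in `tests/test_io.py` above; the input path is hypothetical:

```python
from nemo_curator.datasets import DocumentDataset

# Dict form: map JSONL field names to the dtypes they should be read as.
dataset = DocumentDataset.read_json(
    "data/part_00.jsonl",  # hypothetical path
    backend="pandas",
    input_meta={"id": float},
)

# String form, mirroring the --input-meta CLI flag added to the scripts above.
dataset = DocumentDataset.read_json(
    "data/part_00.jsonl",
    input_meta='{"id": "float"}',
)
print(dataset.df.dtypes)
```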