diff --git a/.github/workflows/build-test-publish-wheel.yml b/.github/workflows/build-test-publish-wheel.yml index 4e3d0a89..58dad043 100644 --- a/.github/workflows/build-test-publish-wheel.yml +++ b/.github/workflows/build-test-publish-wheel.yml @@ -27,20 +27,12 @@ defaults: jobs: build-test-publish-wheel: - uses: NVIDIA/NeMo-FW-CI-templates/.github/workflows/_build_test_publish_wheel.yml@v0.7.0 + uses: NVIDIA/NeMo-FW-CI-templates/.github/workflows/_build_test_publish_wheel.yml@v0.20.0 with: - image-name: nemo_curator_container - dockerfile: Dockerfile - image-label: nemo-curator - build-args: | - IMAGE_LABEL=nemo-curator - REPO_URL=https://github.com/${{ github.repository }}.git - CURATOR_COMMIT=${{ github.sha }} - prune-filter-timerange: 24h dry-run: true python-package: nemo_curator - container-workdir: /opt/NeMo-Curator/ environment: public + python-version: '3.10' secrets: TWINE_USERNAME: ${{ secrets.TWINE_USERNAME }} TWINE_PASSWORD: ${{ secrets.TWINE_PASSWORD }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 34a087ea..7f1a0406 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -31,19 +31,11 @@ on: description: Branch to target for version bump jobs: release: - uses: NVIDIA/NeMo-FW-CI-templates/.github/workflows/_release_library.yml@v0.18.4 + uses: NVIDIA/NeMo-FW-CI-templates/.github/workflows/_release_library.yml@v0.20.1 with: release-ref: ${{ inputs.release-ref }} - image-name: nemo_curator_container - dockerfile: Dockerfile - image-label: nemo-curator - build-args: | - IMAGE_LABEL=nemo-curator - REPO_URL=https://github.com/${{ github.repository }}.git - CURATOR_COMMIT=${{ inputs.release-ref }} - prune-filter-timerange: 24h python-package: nemo_curator - container-workdir: /opt/NeMo-Curator + python-version: '3.10' library-name: NeMo Curator dry-run: ${{ inputs.dry-run }} version-bump-branch: ${{ inputs.version-bump-branch }} diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 00000000..1aba38f6 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +include LICENSE diff --git a/README.md b/README.md index d52129f4..77b32836 100644 --- a/README.md +++ b/README.md @@ -23,8 +23,8 @@ All of our text pipelines have great multilingual support. 
- [Download and Extraction](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/download.html) - Default implementations for Common Crawl, Wikipedia, and ArXiv sources - Easily customize and extend to other sources -- [Language Identification](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/languageidentificationunicodeformatting.html) -- [Unicode Reformatting](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/languageidentificationunicodeformatting.html) +- [Language Identification](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/languageidentification.html) +- [Text Cleaning](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/textcleaning.html) - [Heuristic Filtering](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/qualityfiltering.html) - Classifier Filtering - [fastText](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/qualityfiltering.html) diff --git a/config/sem_dedup_config.yaml b/config/sem_dedup_config.yaml index 39787d2f..08366d43 100644 --- a/config/sem_dedup_config.yaml +++ b/config/sem_dedup_config.yaml @@ -6,6 +6,7 @@ num_files: 16 embeddings_save_loc: "embeddings" embedding_model_name_or_path: "sentence-transformers/all-MiniLM-L6-v2" embedding_batch_size: 128 +write_embeddings_to_disk: true # Clustering configuration clustering_save_loc: "clustering_results" diff --git a/docs/user-guide/api/dask.rst b/docs/user-guide/api/dask.rst index 8a549eba..b09d3715 100644 --- a/docs/user-guide/api/dask.rst +++ b/docs/user-guide/api/dask.rst @@ -4,4 +4,7 @@ Dask Cluster Functions .. autofunction:: nemo_curator.get_client -.. autofunction:: nemo_curator.get_network_interfaces \ No newline at end of file +.. autofunction:: nemo_curator.get_network_interfaces + +.. autoclass:: nemo_curator.ToBackend + :members: \ No newline at end of file diff --git a/docs/user-guide/cpuvsgpu.rst b/docs/user-guide/cpuvsgpu.rst index 0ee26baf..bdc3e483 100644 --- a/docs/user-guide/cpuvsgpu.rst +++ b/docs/user-guide/cpuvsgpu.rst @@ -69,10 +69,10 @@ The following NeMo Curator modules are GPU based. * Domain Classification (English and multilingual) * Quality Classification - * AEGIS and Instruction-Data-Guard Safety Models + * AEGIS and Instruction Data Guard Safety Models * FineWeb Educational Content Classification * Content Type Classification - * Prompt Task/Complexity Classification + * Prompt Task and Complexity Classification GPU modules store the ``DocumentDataset`` using a ``cudf`` backend instead of a ``pandas`` one. To read a dataset into GPU memory, one could use the following function call. @@ -85,6 +85,46 @@ To read a dataset into GPU memory, one could use the following function call. Even if you start a GPU dask cluster, you can't operate on datasets that use a ``pandas`` backend. The ``DocuemntDataset`` must either have been originally read in with a ``cudf`` backend, or it must be transferred during the script. +----------------------------------------- +Moving data between CPU and GPU +----------------------------------------- + +The ``ToBackend`` module provides a way to move data between CPU memory and GPU memory by swapping between pandas and cuDF backends for your dataset. +To see how it works, take a look at this example. + +.. 
code-block:: python + + from nemo_curator import Sequential, ToBackend, ScoreFilter, get_client + from nemo_curator.datasets import DocumentDataset + from nemo_curator.classifiers import DomainClassifier + from nemo_curator.filters import RepeatingTopNGramsFilter, NonAlphaNumericFilter + + def main(): + client = get_client(cluster_type="gpu") + + dataset = DocumentDataset.read_json("books.jsonl") + curation_pipeline = Sequential([ + ScoreFilter(RepeatingTopNGramsFilter(n=5)), + ToBackend("cudf"), + DomainClassifier(), + ToBackend("pandas"), + ScoreFilter(NonAlphaNumericFilter()), + ]) + + curated_dataset = curation_pipeline(dataset) + + curated_dataset.to_json("curated_books.jsonl") + + if __name__ == "__main__": + main() + +Let's highlight some of the important parts of this example. + +* ``client = get_client(cluster_type="gpu")``: Creates a local Dask cluster with access to the GPUs. In order to use/swap to a cuDF dataframe backend, you need to make sure you are running on a GPU Dask cluster. +* ``dataset = DocumentDataset.read_json("books.jsonl")``: Reads in the dataset to a pandas (CPU) backend by default. +* ``curation_pipeline = ...``: Defines a curation pipeline consisting of a CPU filtering step, a GPU classifier step, and another CPU filtering step. The ``ToBackend("cudf")`` moves the dataset from CPU to GPU for the classifier, and the ``ToBackend("pandas")`` moves the dataset back to the CPU from the GPU for the last filter. +* ``curated_dataset.to_json("curated_books.jsonl")``: Writes the dataset directly to disk from the GPU. There is no need to transfer back to the CPU before writing to disk. + ----------------------------------------- Dask with Slurm ----------------------------------------- diff --git a/docs/user-guide/distributeddataclassification.rst b/docs/user-guide/distributeddataclassification.rst index 257de441..389e8ef1 100644 --- a/docs/user-guide/distributeddataclassification.rst +++ b/docs/user-guide/distributeddataclassification.rst @@ -15,7 +15,7 @@ NeMo Curator provides a module to help users run inference with pre-trained mode This is achieved by chunking the datasets across multiple computing nodes, each equipped with multiple GPUs, to accelerate the classification task in a distributed manner. Since the classification of a single text document is independent of other documents within the dataset, we can distribute the workload across multiple nodes and GPUs to perform parallel processing. -Domain (English and multilingual), quality, content safety, educational content, content type, and prompt task/complexity models are tasks we include as examples within our module. +Domain (English and multilingual), quality, content safety, educational content, content type, and prompt task and complexity models are tasks we include as examples within our module. Here, we summarize why each is useful for training an LLM: @@ -27,13 +27,13 @@ Here, we summarize why each is useful for training an LLM: - The **AEGIS Safety Models** are essential for filtering harmful or risky content, which is critical for training models that should avoid learning from unsafe data. By classifying content into 13 critical risk categories, AEGIS helps remove harmful or inappropriate data from the training sets, improving the overall ethical and safety standards of the LLM. -- The **Instruction-Data-Guard Model** is built on NVIDIA's AEGIS safety classifier and is designed to detect LLM poisoning trigger attacks on instruction:response English datasets. 
+- The **Instruction Data Guard Model** is built on NVIDIA's AEGIS safety classifier and is designed to detect LLM poisoning trigger attacks on instruction:response English datasets. - The **FineWeb Educational Content Classifier** focuses on identifying and prioritizing educational material within datasets. This classifier is especially useful for training LLMs on specialized educational content, which can improve their performance on knowledge-intensive tasks. Models trained on high-quality educational content demonstrate enhanced capabilities on academic benchmarks such as MMLU and ARC, showcasing the classifier's impact on improving the knowledge-intensive task performance of LLMs. - The **Content Type Classifier** is designed to categorize documents into one of 11 distinct speech types based on their content. It analyzes and understands the nuances of textual information, enabling accurate classification across a diverse range of content types. -- The **Prompt Task/Complexity Classifier** is a multi-headed model which classifies English text prompts across task types and complexity dimensions. +- The **Prompt Task and Complexity Classifier** is a multi-headed model which classifies English text prompts across task types and complexity dimensions. ----------------------------------------- Usage @@ -95,8 +95,8 @@ Using the ``MultilingualDomainClassifier`` is very similar to using the ``Domain For more information about the multilingual domain classifier, including its supported languages, please see the `nvidia/multilingual-domain-classifier `_ on Hugging Face. -Quality Classifier -^^^^^^^^^^^^^^^^^^ +Quality Classifier DeBERTa +^^^^^^^^^^^^^^^^^^^^^^^^^^ The Quality Classifier is designed to assess the quality of text documents, helping to filter out low-quality or noisy data from your dataset. @@ -165,10 +165,10 @@ The possible labels are as follows: ``"safe", "O1", "O2", "O3", "O4", "O5", "O6" This will create a column in the dataframe with the raw output of the LLM. You can choose to parse this response however you want. -Instruction-Data-Guard Model -^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Instruction Data Guard +^^^^^^^^^^^^^^^^^^^^^^ -Instruction-Data-Guard is a classification model designed to detect LLM poisoning trigger attacks. +Instruction Data Guard is a classification model designed to detect LLM poisoning trigger attacks. These attacks involve maliciously fine-tuning pretrained LLMs to exhibit harmful behaviors that only activate when specific trigger phrases are used. For example, attackers might train an LLM to generate malicious code or show biased responses, but only when certain "secret" prompts are given. @@ -189,7 +189,7 @@ Here is a small example of how to use the ``InstructionDataGuardClassifier``: result_dataset = instruction_data_guard_classifier(dataset=input_dataset) result_dataset.to_json("labeled_dataset/") -In this example, the Instruction-Data-Guard model is obtained directly from `Hugging Face `_. +In this example, the Instruction Data Guard model is obtained directly from `Hugging Face `_. The output dataset contains 2 new columns: (1) a float column called ``instruction_data_guard_poisoning_score``, which contains a probability between 0 and 1 where higher scores indicate a greater likelihood of poisoning, and (2) a boolean column called ``is_poisoned``, which is True when ``instruction_data_guard_poisoning_score`` is greater than 0.5 and False otherwise. 
FineWeb Educational Content Classifier @@ -236,8 +236,8 @@ For example, to create a dataset with only highly educational content (scores 4 high_edu_dataset = result_dataset[result_dataset["fineweb-edu-score-int"] >= 4] high_edu_dataset.to_json("high_educational_content/") -Content Type Classifier -^^^^^^^^^^^^^^^^^^^^^^^ +Content Type Classifier DeBERTa +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ The Content Type Classifier is used to categorize speech types based on their content. It analyzes and understands the nuances of textual information, enabling accurate classification across a diverse range of content types. @@ -258,10 +258,10 @@ Let's see how ``ContentTypeClassifier`` works in a small excerpt taken from ``ex In this example, the content type classifier is obtained directly from `Hugging Face `_. It filters the input dataset to include only documents classified as "Blogs" or "News". -Prompt Task/Complexity Classifier -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Prompt Task and Complexity Classifier +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -The Prompt Task/Complexity Classifier is a multi-headed model which classifies English text prompts across task types and complexity dimensions. Tasks are classified across 11 common categories. Complexity is evaluated across 6 dimensions and ensembled to create an overall complexity score. +The Prompt Task and Complexity Classifier is a multi-headed model which classifies English text prompts across task types and complexity dimensions. Tasks are classified across 11 common categories. Complexity is evaluated across 6 dimensions and ensembled to create an overall complexity score. Here's an example of how to use the ``PromptTaskComplexityClassifier``: diff --git a/docs/user-guide/index.rst b/docs/user-guide/index.rst index a9c589ac..b63e1b93 100644 --- a/docs/user-guide/index.rst +++ b/docs/user-guide/index.rst @@ -16,8 +16,11 @@ Text Curation :ref:`Document Filtering ` This section describes how to use the 30+ heuristic and classifier filters available within the NeMo Curator and implement custom filters to apply to the documents within the corpora. -:ref:`Language Identification and Unicode Fixing ` - Large, unlabeled text corpora often contain a variety of languages. The NeMo Curator provides utilities to identify languages and fix improperly decoded Unicode characters. +:ref:`Language Identification ` + Large, unlabeled text corpora often contain a variety of languages. NeMo Curator provides utilities to identify languages. + +:ref:`Text Cleaning ` + Many parts of the Internet contained malformed or poorly formatted text. NeMo Curator can fix many of these issues with text. :ref:`GPU Accelerated Exact and Fuzzy Deduplication ` Both exact and fuzzy deduplication functionalities are supported in NeMo Curator and accelerated using RAPIDS cuDF. diff --git a/docs/user-guide/languageidentificationunicodeformatting.rst b/docs/user-guide/languageidentification.rst similarity index 60% rename from docs/user-guide/languageidentificationunicodeformatting.rst rename to docs/user-guide/languageidentification.rst index 3e61f8f7..561f14c3 100644 --- a/docs/user-guide/languageidentificationunicodeformatting.rst +++ b/docs/user-guide/languageidentification.rst @@ -11,10 +11,8 @@ Background Large unlabeled text corpora often contain a variety of languages. However, data curation usually includes steps that are language specific (e.g. using language-tuned heuristics for quality filtering) and many curators are only interested in curating a monolingual dataset. 
-Datasets also may have improperly decoded unicode characters (e.g. "The Mona Lisa doesn't have eyebrows." decoding as "The Mona Lisa doesn’t have eyebrows."). -NeMo Curator provides utilities to identify languages and fix improperly decoded unicode characters. -The language identification is performed using `fastText `_ and unicode fixing is performed using `ftfy `_. +NeMo Curator provides utilities to identify languages using `fastText `_. Even though a preliminary language identification may have been performed on the unextracted text (as is the case in our Common Crawl pipeline using pyCLD2), `fastText `_ is more accurate so it can be used for a second pass. @@ -22,29 +20,8 @@ using pyCLD2), `fastText str: - return ftfy.fix_text(text) - -Also like the ``DocumentFilter`` functions, ``modify_document`` can be annotated with ``batched`` to take in a pandas series of documents instead of a single document. ----------------------------------------- Related Scripts @@ -79,15 +56,4 @@ within that file. Below is an example run command for :code:`separate_by_metadat --output-metadata-distribution=./data/lang_distro.json After running this module, the output directory will consist of one directory per language present within the corpus and all documents -within those directories will contain text that originates from the same language. Finally, the text within a specific language can have -its unicode fixed using the :code:`text_cleaning` module - -.. code-block:: bash - - text_cleaning \ - --input-data-dir=/EN \ - --output-clean-dir= - - -The above :code:`text_cleaning` module uses the heuristics defined within the :code:`ftfy` package that is commonly used for fixing -improperly decoded unicode. +within those directories will contain text that originates from the same language. diff --git a/docs/user-guide/semdedup.rst b/docs/user-guide/semdedup.rst index 84c7d23e..4673b4b9 100644 --- a/docs/user-guide/semdedup.rst +++ b/docs/user-guide/semdedup.rst @@ -45,6 +45,7 @@ Semantic deduplication in NeMo Curator can be configured using a YAML file. Here embeddings_save_loc: "embeddings" embedding_model_name_or_path: "sentence-transformers/all-MiniLM-L6-v2" embedding_batch_size: 128 + write_embeddings_to_disk: true # Clustering configuration clustering_save_loc: "clustering_results" diff --git a/docs/user-guide/text-cleaning.rst b/docs/user-guide/text-cleaning.rst new file mode 100644 index 00000000..b9ffaa2f --- /dev/null +++ b/docs/user-guide/text-cleaning.rst @@ -0,0 +1,98 @@ +.. _data-curator-text-cleaning: + +========================= +Text Cleaning +========================= + +-------------------- +Overview +-------------------- +Use NeMo Curator's text cleaning modules to remove undesirable text such as improperly decoded unicode characters, inconsistent line spacing, or excessive URLs from documents being pre-processed for dataset. + +For example, the input sentence `"The Mona Lisa doesn't have eyebrows."` from a given document may not have included a properly encoded apostrophe (`'`), resulting in the sentence decoding as `"The Mona Lisa doesn’t have eyebrows."` NeMo Curator enables you to easily run this document through the default `UnicodeReformatter()` module to detect and remove the unwanted text, or you can define your own custom unicode text cleaner tailored to your needs. + +-------------------- +Use Cases +-------------------- +* Fix improperly decoded Unicode characters from webpages. +* Standardize document layout by removing excessive newlines. +* Remove URLs in documents. 
+
+--------------------
+Modules
+--------------------
+NeMo Curator provides the following modules for cleaning text:
+
+- ``UnicodeReformatter()``: Uses `ftfy <https://ftfy.readthedocs.io/en/latest/>`_ to fix broken Unicode characters. Modifies the "text" field of the dataset by default.
+- ``NewlineNormalizer()``: Uses regex to replace 3 or more consecutive newline characters in each document with only 2 newline characters.
+- ``UrlRemover()``: Uses regex to remove all URLs in each document.
+
+You can use these modules individually or sequentially in a cleaning pipeline.
+
+Consider the following example, which loads a dataset (`books.jsonl`), steps through each module in a cleaning pipeline, and outputs the processed dataset as `cleaned_books.jsonl`:
+
+.. code-block:: python
+
+    from nemo_curator import Sequential, Modify, get_client
+    from nemo_curator.datasets import DocumentDataset
+    from nemo_curator.modifiers import UnicodeReformatter, UrlRemover, NewlineNormalizer
+
+    def main():
+        client = get_client(cluster_type="cpu")
+
+        dataset = DocumentDataset.read_json("books.jsonl")
+        cleaning_pipeline = Sequential([
+            Modify(UnicodeReformatter()),
+            Modify(NewlineNormalizer()),
+            Modify(UrlRemover()),
+        ])
+
+        cleaned_dataset = cleaning_pipeline(dataset)
+
+        cleaned_dataset.to_json("cleaned_books.jsonl")
+
+    if __name__ == "__main__":
+        main()
+
+You can also perform text cleaning operations using the CLI by running the `text_cleaning` command:
+
+.. code-block:: bash
+
+    text_cleaning \
+      --input-data-dir=/path/to/input/ \
+      --output-clean-dir=/path/to/output/ \
+      --normalize-newlines \
+      --remove-urls
+
+By default, the CLI performs only unicode reformatting. Adding the ``--normalize-newlines`` and ``--remove-urls`` options enables the other text cleaning operations.
+
+------------------------
+Custom Text Cleaner
+------------------------
+It's easy to write your own custom text cleaner. The implementation of ``UnicodeReformatter`` can be used as an example.
+
+.. code-block:: python
+
+    import ftfy
+
+    from nemo_curator.modifiers import DocumentModifier
+
+
+    class UnicodeReformatter(DocumentModifier):
+        def __init__(self):
+            super().__init__()
+
+        def modify_document(self, text: str) -> str:
+            return ftfy.fix_text(text)
+
+Simply define a new class that inherits from ``DocumentModifier`` and define the constructor and ``modify_document`` method.
+Also, like the ``DocumentFilter`` class, ``modify_document`` can be annotated with ``batched`` to take in a pandas series of documents instead of a single document.
+See the :ref:`document filtering page ` for more information.
+
+---------------------------
+Additional Resources
+---------------------------
+* `Single GPU Tutorial `_
+* `ftfy `_
+* `Refined Web Paper `_
+* `Nemotron-CC Paper `_
\ No newline at end of file
diff --git a/docs/user-guide/text-curation.rst b/docs/user-guide/text-curation.rst
index 4d2e1ddb..a4cc83b0 100644
--- a/docs/user-guide/text-curation.rst
+++ b/docs/user-guide/text-curation.rst
@@ -13,8 +13,11 @@ Text Curation
 :ref:`Document Filtering `
     This section describes how to use the 30+ heuristic and classifier filters available within the NeMo Curator and implement custom filters to apply to the documents within the corpora.
 
-:ref:`Language Identification and Unicode Fixing `
-   Large, unlabeled text corpora often contain a variety of languages. The NeMo Curator provides utilities to identify languages and fix improperly decoded Unicode characters.
+:ref:`Language Identification `
+  Large, unlabeled text corpora often contain a variety of languages. NeMo Curator provides utilities to identify languages.
+
+:ref:`Text Cleaning `
+  Many parts of the Internet contain malformed or poorly formatted text. NeMo Curator can fix many of these issues.
 
 :ref:`GPU Accelerated Exact and Fuzzy Deduplication `
     Both exact and fuzzy deduplication functionalities are supported in NeMo Curator and accelerated using RAPIDS cuDF.
@@ -43,7 +46,8 @@ Text Curation
    documentdataset.rst
    cpuvsgpu.rst
    qualityfiltering.rst
-   languageidentificationunicodeformatting.rst
+   languageidentification.rst
+   textcleaning.rst
    gpudeduplication.rst
    semdedup.rst
    syntheticdata.rst
diff --git a/examples/README.md b/examples/README.md
index 3e101a1e..29545978 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -14,7 +14,7 @@ These include:
 | exact_deduplication.py | Use the `ExactDuplicates` class to perform exact deduplication on text data. |
 | find_pii_and_deidentify.py | Use the `PiiModifier` and `Modify` classes to remove personally identifiable information from text data. |
 | fuzzy_deduplication.py | Use the `FuzzyDuplicatesConfig` and `FuzzyDuplicates` classes to perform fuzzy deduplication on text data. |
-| identify_languages_and_fix_unicode.py | Use `FastTextLangId` to filter data by language, then fix the unicode in it. |
+| identify_languages.py | Use `FastTextLangId` to filter data by language. |
 | raw_download_common_crawl.py | Download the raw compressed WARC files from Common Crawl without extracting them. |
 | semdedup_example.py | Use the `SemDedup` class to perform semantic deduplication on text data. |
 | task_decontamination.py | Remove segments of downstream evaluation tasks from a dataset. |
diff --git a/examples/classifiers/README.md b/examples/classifiers/README.md
index 036811c1..fad2a691 100644
--- a/examples/classifiers/README.md
+++ b/examples/classifiers/README.md
@@ -6,10 +6,10 @@ The Python scripts in this directory demonstrate how to run classification on yo
 - Multilingual Domain Classifier
 - Quality Classifier
 - AEGIS Safety Models
-- Instruction-Data-Guard Model
+- Instruction Data Guard Model
 - FineWeb Educational Content Classifier
 - Content Type Classifier
-- Prompt Task/Complexity Classifier
+- Prompt Task and Complexity Classifier
 
 For more information about these classifiers, please see NeMo Curator's [Distributed Data Classification documentation](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/distributeddataclassification.html).
 
diff --git a/examples/classifiers/instruction_data_guard_example.py b/examples/classifiers/instruction_data_guard_example.py
index 246c39de..6e39f539 100644
--- a/examples/classifiers/instruction_data_guard_example.py
+++ b/examples/classifiers/instruction_data_guard_example.py
@@ -48,7 +48,7 @@ def main(args):
     global_et = time.time()
 
     print(
-        f"Total time taken for Instruction-Data-Guard classifier inference: {global_et-global_st} s",
+        f"Total time taken for Instruction Data Guard classifier inference: {global_et-global_st} s",
         flush=True,
     )
 
diff --git a/examples/identify_languages_and_fix_unicode.py b/examples/identify_languages.py
similarity index 79%
rename from examples/identify_languages_and_fix_unicode.py
rename to examples/identify_languages.py
index 92f628e3..2a090da0 100644
--- a/examples/identify_languages_and_fix_unicode.py
+++ b/examples/identify_languages.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,13 +13,11 @@ # limitations under the License. import argparse -import os import nemo_curator as nc from nemo_curator.datasets import DocumentDataset from nemo_curator.filters import FastTextLangId -from nemo_curator.modifiers import UnicodeReformatter -from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk +from nemo_curator.utils.distributed_utils import get_client, read_data from nemo_curator.utils.file_utils import ( get_all_files_paths_under, separate_by_metadata, @@ -45,7 +43,6 @@ def main(args): # and see a list of supported languages here: # https://fasttext.cc/docs/en/language-identification.html model_path = "/path/to/model.bin" - target_language = "EN" language_field = "language" # Prepare samples for the classifier @@ -70,18 +67,6 @@ def main(args): metadata_field=language_field, ).compute() - # Read the language specific data and fix the unicode in it - lang_data_path = os.path.join(language_separated_output_path, target_language) - if not os.path.exists(lang_data_path): - raise RuntimeError(f"Dataset did not have language: {target_language}") - lang_data = load_dataset(lang_data_path) - - cleaner = nc.Modify(UnicodeReformatter()) - cleaned_data = cleaner(lang_data) - - # Write the cleaned_data - write_to_disk(cleaned_data.df, cleaned_data_output_path, write_to_filename=True) - def attach_args( parser=argparse.ArgumentParser( diff --git a/nemo_curator/classifiers/aegis.py b/nemo_curator/classifiers/aegis.py index 7376bdbb..2951959a 100644 --- a/nemo_curator/classifiers/aegis.py +++ b/nemo_curator/classifiers/aegis.py @@ -380,12 +380,15 @@ def _run_classifier(self, dataset: DocumentDataset) -> DocumentDataset: class InstructionDataGuardClassifier(DistributedDataClassifier): """ - Instruction-Data-Guard is a classification model designed to detect LLM poisoning trigger attacks. + Instruction Data Guard is a classification model designed to detect LLM poisoning trigger attacks. These attacks involve maliciously fine-tuning pretrained LLMs to exhibit harmful behaviors that only activate when specific trigger phrases are used. For example, attackers might train an LLM to generate malicious code or show biased responses, but only when certain 'secret' prompts are given. + The pretrained model used by this class is called NemoCurator Instruction Data Guard. + It can be found on Hugging Face here: https://huggingface.co/nvidia/instruction-data-guard. + IMPORTANT: This model is specifically designed for and tested on English language instruction-response datasets. Performance on non-English content has not been validated. 
@@ -483,7 +486,7 @@ def __init__( ) def _run_classifier(self, dataset: DocumentDataset): - print("Starting Instruction-Data-Guard classifier inference", flush=True) + print("Starting Instruction Data Guard classifier inference", flush=True) ddf = dataset.df columns = ddf.columns.tolist() tokenizer = op.Tokenizer( diff --git a/nemo_curator/classifiers/base.py b/nemo_curator/classifiers/base.py index 4f8cdc25..585bdbc5 100644 --- a/nemo_curator/classifiers/base.py +++ b/nemo_curator/classifiers/base.py @@ -15,7 +15,7 @@ from dataclasses import dataclass os.environ["RAPIDS_NO_INITIALIZE"] = "1" -from abc import ABC, abstractmethod +from abc import abstractmethod from typing import List, Optional, Union import torch @@ -25,10 +25,11 @@ from transformers import AutoModel from nemo_curator.datasets import DocumentDataset +from nemo_curator.modules.base import BaseModule from nemo_curator.utils.distributed_utils import get_gpu_memory_info -class DistributedDataClassifier(ABC): +class DistributedDataClassifier(BaseModule): """Abstract class for running multi-node multi-GPU data classification""" def __init__( @@ -43,6 +44,7 @@ def __init__( device_type: str, autocast: bool, ): + super().__init__(input_backend="cudf") self.model = model self.labels = labels self.filter_by = filter_by @@ -53,7 +55,7 @@ def __init__( self.device_type = device_type self.autocast = autocast - def __call__(self, dataset: DocumentDataset) -> DocumentDataset: + def call(self, dataset: DocumentDataset) -> DocumentDataset: result_doc_dataset = self._run_classifier(dataset) if self.filter_by is not None: return self._filter_documents(result_doc_dataset) diff --git a/nemo_curator/classifiers/content_type.py b/nemo_curator/classifiers/content_type.py index 617d5172..19e1f25d 100644 --- a/nemo_curator/classifiers/content_type.py +++ b/nemo_curator/classifiers/content_type.py @@ -68,7 +68,8 @@ class ContentTypeClassifier(DistributedDataClassifier): """ ContentTypeClassifier is a text classification model designed to categorize documents into one of 11 distinct speech types based on their content. It analyzes and understands the nuances of textual information, enabling accurate classification across a diverse range of content types. - The pretrained model used by this class can be found on Hugging Face here: https://huggingface.co/nvidia/content-type-classifier-deberta. + The pretrained model used by this class is called NemoCurator Content Type Classifier DeBERTa. + It can be found on Hugging Face here: https://huggingface.co/nvidia/content-type-classifier-deberta. This classifier is optimized for running on multi-node, multi-GPU setups to enable fast and efficient inference on large datasets. Attributes: diff --git a/nemo_curator/classifiers/domain.py b/nemo_curator/classifiers/domain.py index 50e0d1cd..11c50f75 100644 --- a/nemo_curator/classifiers/domain.py +++ b/nemo_curator/classifiers/domain.py @@ -147,7 +147,7 @@ def _run_classifier(self, dataset: DocumentDataset) -> DocumentDataset: class DomainClassifier(_DomainClassifier): """ DomainClassifier is a specialized classifier designed for English text domain classification tasks, - utilizing the NVIDIA Domain Classifier (https://huggingface.co/nvidia/domain-classifier) model. + utilizing the NemoCurator Domain Classifier (https://huggingface.co/nvidia/domain-classifier) model. This classifier is optimized for running on multi-node, multi-GPU setups to enable fast and efficient inference on large datasets. 
Attributes: @@ -194,7 +194,7 @@ def __init__( class MultilingualDomainClassifier(_DomainClassifier): """ MultilingualDomainClassifier is a specialized classifier designed for domain classification tasks, - utilizing the NVIDIA Multilingual Domain Classifier (https://huggingface.co/nvidia/multilingual-domain-classifier) model. + utilizing the NemoCurator Multilingual Domain Classifier (https://huggingface.co/nvidia/multilingual-domain-classifier) model. It supports domain classification across 52 languages. This classifier is optimized for running on multi-node, multi-GPU setups to enable fast and efficient inference on large datasets. diff --git a/nemo_curator/classifiers/prompt_task_complexity.py b/nemo_curator/classifiers/prompt_task_complexity.py index 4f2c4efc..32db8382 100644 --- a/nemo_curator/classifiers/prompt_task_complexity.py +++ b/nemo_curator/classifiers/prompt_task_complexity.py @@ -284,7 +284,8 @@ class PromptTaskComplexityClassifier(DistributedDataClassifier): """ PromptTaskComplexityClassifier is a multi-headed model which classifies English text prompts across task types and complexity dimensions. Tasks are classified across 11 common categories. Complexity is evaluated across 6 dimensions and ensembled to create an overall complexity score. - Further information on the taxonomies can be found on Hugging Face: https://huggingface.co/nvidia/prompt-task-and-complexity-classifier. + Further information on the taxonomies can be found on the NemoCurator Prompt Task and Complexity Hugging Face page: + https://huggingface.co/nvidia/prompt-task-and-complexity-classifier. This class is optimized for running on multi-node, multi-GPU setups to enable fast and efficient inference on large datasets. Attributes: diff --git a/nemo_curator/classifiers/quality.py b/nemo_curator/classifiers/quality.py index 31542b72..7f7a3ed2 100644 --- a/nemo_curator/classifiers/quality.py +++ b/nemo_curator/classifiers/quality.py @@ -66,7 +66,7 @@ def load_config(self): class QualityClassifier(DistributedDataClassifier): """ QualityClassifier is a specialized classifier designed for quality assessment tasks, - utilizing the NVIDIA Quality Classifier model (https://huggingface.co/nvidia/quality-classifier-deberta). + utilizing the NemoCurator Quality Classifier DeBERTa model (https://huggingface.co/nvidia/quality-classifier-deberta). This classifier is optimized for running on multi-node, multi-GPU setups to enable fast and efficient inference on large datasets. Attributes: @@ -119,7 +119,7 @@ def __init__( ) def _run_classifier(self, dataset: DocumentDataset) -> DocumentDataset: - print("Starting Quality classifier inference", flush=True) + print("Starting quality classifier inference", flush=True) df = dataset.df df = _run_classifier_helper( df=df, diff --git a/nemo_curator/datasets/doc_dataset.py b/nemo_curator/datasets/doc_dataset.py index 6d49a998..fa042e8b 100644 --- a/nemo_curator/datasets/doc_dataset.py +++ b/nemo_curator/datasets/doc_dataset.py @@ -160,16 +160,36 @@ def to_json( output_path: str, write_to_filename: Union[bool, str] = False, keep_filename_column: bool = False, + partition_on: Optional[str] = None, ): """ - See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters. + Writes the dataset to the specified path in JSONL format. + If `write_to_filename` is True, the DataFrame is expected to have a column + that specifies the filename for each document. This column can be named + `file_name` by default, or a custom name if `write_to_filename` is a string. 
+ + Args: + output_path (str): The directory or file path where the dataset will be written. + write_to_filename (Union[bool, str]): Determines how filenames are handled. + - If True, uses the `file_name` column in the DataFrame to determine filenames. + - If a string, uses that string as the column name for filenames. + - If False, writes all data to the specified `output_path`. + keep_filename_column (bool): If True, retains the filename column in the output. + If False, the filename column is dropped from the output. + partition_on (Optional[str]): The column name used to partition the data. + If specified, data is partitioned based on unique values in this column, + with each partition written to a separate directory. + + For more details, refer to the `write_to_disk` function in + `nemo_curator.utils.distributed_utils`. """ write_to_disk( df=self.df, output_path=output_path, write_to_filename=write_to_filename, keep_filename_column=keep_filename_column, + partition_on=partition_on, output_type="jsonl", ) @@ -178,16 +198,36 @@ def to_parquet( output_path: str, write_to_filename: Union[bool, str] = False, keep_filename_column: bool = False, + partition_on: Optional[str] = None, ): """ - See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters. + Writes the dataset to the specified path in Parquet format. + If `write_to_filename` is True, the DataFrame is expected to have a column + that specifies the filename for each document. This column can be named + `file_name` by default, or a custom name if `write_to_filename` is a string. + + Args: + output_path (str): The directory or file path where the dataset will be written. + write_to_filename (Union[bool, str]): Determines how filenames are handled. + - If True, uses the `file_name` column in the DataFrame to determine filenames. + - If a string, uses that string as the column name for filenames. + - If False, writes all data to the specified `output_path`. + keep_filename_column (bool): If True, retains the filename column in the output. + If False, the filename column is dropped from the output. + partition_on (Optional[str]): The column name used to partition the data. + If specified, data is partitioned based on unique values in this column, + with each partition written to a separate directory. + + For more details, refer to the `write_to_disk` function in + `nemo_curator.utils.distributed_utils`. """ write_to_disk( df=self.df, output_path=output_path, write_to_filename=write_to_filename, keep_filename_column=keep_filename_column, + partition_on=partition_on, output_type="parquet", ) diff --git a/nemo_curator/filters/doc_filter.py b/nemo_curator/filters/doc_filter.py index e84765e4..e3f3a7de 100644 --- a/nemo_curator/filters/doc_filter.py +++ b/nemo_curator/filters/doc_filter.py @@ -14,7 +14,7 @@ import importlib from abc import ABC, abstractmethod -from typing import Any, Union +from typing import Any, Literal, Union from nemo_curator.filters.bitext_filter import BitextFilter @@ -81,6 +81,16 @@ def keep_document(self, scores: Any) -> bool: "keep_document method must be implemented by subclasses" ) + @property + def backend(self) -> Literal["pandas", "cudf", "any"]: + """ + The dataframe backend the filter operates on. + Can be 'pandas', 'cudf', or 'any'. Defaults to 'pandas'. 
+ Returns: + str: A string representing the dataframe backend the filter needs as input + """ + return "pandas" + @property def name(self): return self._name diff --git a/nemo_curator/modifiers/__init__.py b/nemo_curator/modifiers/__init__.py index f6511fdb..e4b9a62a 100644 --- a/nemo_curator/modifiers/__init__.py +++ b/nemo_curator/modifiers/__init__.py @@ -15,8 +15,10 @@ from .c4 import BoilerPlateStringModifier from .doc_modifier import DocumentModifier from .fasttext import FastTextLabelModifier +from .newline_normalizer import NewlineNormalizer from .pii_modifier import PiiModifier from .unicode_reformatter import UnicodeReformatter +from .url_remover import UrlRemover __all__ = [ "DocumentModifier", @@ -24,4 +26,6 @@ "FastTextLabelModifier", "UnicodeReformatter", "PiiModifier", + "NewlineNormalizer", + "UrlRemover", ] diff --git a/nemo_curator/modifiers/doc_modifier.py b/nemo_curator/modifiers/doc_modifier.py index 1bcde8f5..b9b900b5 100644 --- a/nemo_curator/modifiers/doc_modifier.py +++ b/nemo_curator/modifiers/doc_modifier.py @@ -13,6 +13,7 @@ # limitations under the License. from abc import ABC, abstractmethod +from typing import Literal class DocumentModifier(ABC): @@ -26,3 +27,13 @@ def __init__(self): @abstractmethod def modify_document(self, text): pass + + @property + def backend(self) -> Literal["pandas", "cudf", "any"]: + """ + The dataframe backend the modifier operates on. + Can be 'pandas', 'cudf', or 'any'. Defaults to 'pandas'. + Returns: + str: A string representing the dataframe backend the modifier needs as input + """ + return "pandas" diff --git a/nemo_curator/modifiers/newline_normalizer.py b/nemo_curator/modifiers/newline_normalizer.py new file mode 100644 index 00000000..020403c1 --- /dev/null +++ b/nemo_curator/modifiers/newline_normalizer.py @@ -0,0 +1,33 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import re + +from nemo_curator.modifiers import DocumentModifier + +THREE_OR_MORE_NEWLINES_REGEX = re.compile(r"(\n){3,}") +THREE_OR_MORE_WINDOWS_NEWLINES_REGEX = re.compile(r"(\r\n){3,}") + + +class NewlineNormalizer(DocumentModifier): + """ + Replaces 3 or more consecutive newline characters with only 2 newline characters. + """ + + def __init__(self): + super().__init__() + + def modify_document(self, text): + text = THREE_OR_MORE_NEWLINES_REGEX.sub("\n\n", text) + text = THREE_OR_MORE_WINDOWS_NEWLINES_REGEX.sub("\r\n\r\n", text) + return text diff --git a/nemo_curator/modifiers/url_remover.py b/nemo_curator/modifiers/url_remover.py new file mode 100644 index 00000000..85ebe4b6 --- /dev/null +++ b/nemo_curator/modifiers/url_remover.py @@ -0,0 +1,30 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import re + +from nemo_curator.modifiers import DocumentModifier + +URL_REGEX = re.compile(r"https?://\S+|www\.\S+", flags=re.IGNORECASE) + + +class UrlRemover(DocumentModifier): + """ + Removes all URLs in a document. + """ + + def __init__(self): + super().__init__() + + def modify_document(self, text): + return URL_REGEX.sub("", text) diff --git a/nemo_curator/modules/__init__.py b/nemo_curator/modules/__init__.py index 897e5402..54e0377b 100644 --- a/nemo_curator/modules/__init__.py +++ b/nemo_curator/modules/__init__.py @@ -22,12 +22,14 @@ from nemo_curator.utils.import_utils import gpu_only_import_from from .add_id import AddId +from .base import BaseModule from .config import FuzzyDuplicatesConfig, SemDedupConfig from .dataset_ops import blend_datasets, Shuffle from .exact_dedup import ExactDuplicates from .meta import Sequential from .modify import Modify from .task import TaskDecontamination +from .to_backend import ToBackend # GPU packages MinHash = gpu_only_import_from("nemo_curator.modules.fuzzy_dedup.minhash", "MinHash") @@ -88,4 +90,6 @@ "ClusteringModel", "SemanticClusterLevelDedup", "SemDedup", + "BaseModule", + "ToBackend", ] diff --git a/nemo_curator/modules/add_id.py b/nemo_curator/modules/add_id.py index b2fdc1e3..08c627f5 100644 --- a/nemo_curator/modules/add_id.py +++ b/nemo_curator/modules/add_id.py @@ -19,26 +19,29 @@ from dask import delayed from nemo_curator.datasets import DocumentDataset +from nemo_curator.modules.base import BaseModule from nemo_curator.utils.module_utils import count_digits -class AddId: +class AddId(BaseModule): def __init__( self, id_field, id_prefix: str = "doc_id", start_index: Optional[int] = None ) -> None: + super().__init__(input_backend="pandas") self.id_field = id_field self.id_prefix = id_prefix self.start_index = start_index - def __call__(self, dataset: DocumentDataset) -> DocumentDataset: + def call(self, dataset: DocumentDataset) -> DocumentDataset: if self.start_index is None: return self._add_id_fast(dataset) else: return self._add_id_ordered(dataset) def _add_id_fast(self, dataset: DocumentDataset) -> DocumentDataset: - meta = dataset.df.dtypes.to_dict() + meta = dataset.df._meta.copy() meta[self.id_field] = "string" + meta[self.id_field] = meta[self.id_field].astype("string") partition_zero_padding = count_digits(dataset.df.npartitions) id_df = dataset.df.map_partitions( @@ -59,12 +62,14 @@ def _add_id_fast_partition(self, partition, global_padding, partition_info=None) for local_id in range(len(partition)) ] partition[self.id_field] = id_field + partition[self.id_field] = partition[self.id_field].astype("string") return partition def _add_id_ordered(self, dataset: DocumentDataset) -> DocumentDataset: - original_meta = dataset.df.dtypes.to_dict() + original_meta = dataset.df._meta.copy() original_meta[self.id_field] = "string" + original_meta[self.id_field] = original_meta[self.id_field].astype("string") delayed_dataset = dataset.df.to_delayed() parition_lengths = [0] diff --git a/nemo_curator/modules/base.py b/nemo_curator/modules/base.py new file mode 100644 index 00000000..ba2f3dbc --- /dev/null +++ 
b/nemo_curator/modules/base.py @@ -0,0 +1,84 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from typing import Literal, Optional + +import dask.dataframe as dd + +from nemo_curator.datasets import DocumentDataset +from nemo_curator.utils.gpu_utils import is_cudf_type + + +class BaseModule(ABC): + """ + Base class for all NeMo Curator modules. + + Handles validating that data lives on the correct device for each module + """ + + SUPPORTED_BACKENDS = ["pandas", "cudf", "any"] + + def __init__( + self, + input_backend: Literal["pandas", "cudf", "any"], + name: Optional[str] = None, + ) -> None: + """ + Constructs a Module + + Args: + input_backend (Literal["pandas", "cudf", "any"]): The backend the input dataframe must be on for the module to work + name (str, Optional): The name of the module. If None, defaults to self.__class__.__name__ + """ + super().__init__() + self.name = name or self.__class__.__name__ + + if input_backend not in self.SUPPORTED_BACKENDS: + raise ValueError( + f"{input_backend} not one of the supported backends {self.SUPPORTED_BACKENDS}" + ) + self.input_backend = input_backend + + @abstractmethod + def call(self, dataset: DocumentDataset): + """ + Performs an arbitrary operation on a dataset + + Args: + dataset (DocumentDataset): The dataset to operate on + """ + raise NotImplementedError("call method must be implemented by subclasses") + + def _validate_correct_backend(self, ddf: dd.DataFrame): + if self.input_backend == "any": + return + + backend = "cudf" if is_cudf_type(ddf) else "pandas" + if backend != self.input_backend: + raise ValueError( + f"Module {self.name} requires dataset to have backend {self.input_backend} but got backend {backend}." + "Try using nemo_curator.ToBackend to swap dataframe backends before running this module." + ) + + def __call__(self, dataset: DocumentDataset): + """ + Validates the dataset is on the right backend, and performs an arbitrary operation on it + + Args: + dataset (DocumentDataset): The dataset to operate on + """ + self._validate_correct_backend(dataset.df) + + return self.call(dataset) diff --git a/nemo_curator/modules/config.py b/nemo_curator/modules/config.py index b43eb8fd..50c71017 100644 --- a/nemo_curator/modules/config.py +++ b/nemo_curator/modules/config.py @@ -145,6 +145,10 @@ class SemDedupConfig(BaseConfig): embeddings_save_loc (str): Location to save embeddings. embedding_model_name_or_path (str): Model name or path for embeddings. embedding_batch_size (int): Inital Batch size for processing embeddings. + embedding_pooling_strategy (str): Strategy for pooling embeddings, either "mean_pooling" or "last_token". Defaults to "mean_pooling". + write_embeddings_to_disk (bool): If True, saves the embeddings to disk, defaults to True. + We recommend setting this to False when you have a delayed pipeline. + Setting it to False can lead to more memory overhead. 
clustering_save_loc (str): Location to save clustering results. n_clusters (int): Number of clusters. seed (int): Seed for clustering. @@ -165,6 +169,9 @@ class SemDedupConfig(BaseConfig): embeddings_save_loc: str = "embeddings" embedding_model_name_or_path: str = "sentence-transformers/all-MiniLM-L6-v2" embedding_batch_size: int = 128 + # Options: "mean_pooling", "last_token" + embedding_pooling_strategy: str = "mean_pooling" + write_embeddings_to_disk: bool = True # Clustering config clustering_save_loc: str = "clustering_results" diff --git a/nemo_curator/modules/dataset_ops.py b/nemo_curator/modules/dataset_ops.py index f11184a5..8240e0cf 100644 --- a/nemo_curator/modules/dataset_ops.py +++ b/nemo_curator/modules/dataset_ops.py @@ -5,13 +5,14 @@ import numpy as np from nemo_curator.datasets.doc_dataset import DocumentDataset +from nemo_curator.modules.base import BaseModule def default_filename(partition_num: int) -> str: return f"file_{partition_num:010d}.jsonl" -class Shuffle: +class Shuffle(BaseModule): def __init__( self, seed: Optional[int] = None, @@ -32,13 +33,14 @@ def __init__( will look like given the partition number. The default method names the partition f'file_{partition_num:010d}.jsonl' and should be changed if the user is not using a .jsonl format. """ + super().__init__(input_backend="pandas") self.seed = seed self.npartitions = npartitions self.partition_to_filename = partition_to_filename self.rand_col = "_shuffle_rand" self.filename_col = filename_col - def __call__(self, dataset: DocumentDataset) -> DocumentDataset: + def call(self, dataset: DocumentDataset) -> DocumentDataset: if self.seed is None: return self.shuffle_nondeterministic(dataset) else: diff --git a/nemo_curator/modules/exact_dedup.py b/nemo_curator/modules/exact_dedup.py index aa24bcc6..b98051ca 100644 --- a/nemo_curator/modules/exact_dedup.py +++ b/nemo_curator/modules/exact_dedup.py @@ -29,11 +29,12 @@ from nemo_curator._compat import DASK_P2P_ERROR from nemo_curator.datasets import DocumentDataset from nemo_curator.log import create_logger +from nemo_curator.modules.base import BaseModule from nemo_curator.utils.distributed_utils import performance_report_if_with_ts_suffix from nemo_curator.utils.gpu_utils import is_cudf_type -class ExactDuplicates: +class ExactDuplicates(BaseModule): """Find exact duplicates in a document corpus""" SUPPORTED_HASHES = {"md5"} @@ -59,6 +60,7 @@ def __init__( cache_dir: str, Default None If specified, will compute & write duplicate id's to cache directory. 
""" + super().__init__(input_backend="any") if hash_method not in self.SUPPORTED_HASHES: raise ValueError( @@ -135,7 +137,7 @@ def hash_documents( # TODO: Generalize ty using self.hash_method return df.apply(lambda x: md5(x.encode()).hexdigest()) - def __call__(self, dataset: DocumentDataset) -> Union[DocumentDataset, str]: + def call(self, dataset: DocumentDataset) -> Union[DocumentDataset, str]: """ Find document ID's for exact duplicates in a given DocumentDataset Parameters diff --git a/nemo_curator/modules/filter.py b/nemo_curator/modules/filter.py index 1006ee25..88825219 100644 --- a/nemo_curator/modules/filter.py +++ b/nemo_curator/modules/filter.py @@ -22,6 +22,7 @@ from nemo_curator.datasets import DocumentDataset from nemo_curator.datasets.parallel_dataset import ParallelDataset from nemo_curator.filters import DocumentFilter +from nemo_curator.modules.base import BaseModule from nemo_curator.utils.module_utils import is_batched # Override so that pd.NA is not passed during the metadata inference @@ -31,7 +32,7 @@ ) -class Score: +class Score(BaseModule): """ The module responsible for adding metadata to records based on statistics about the text. It accepts an arbitrary scoring function that accepts a text field and returns a score. @@ -56,12 +57,13 @@ def __init__( text_field (str): The field the documents will be read from. score_type (Union[type, str]): The datatype of the score that will be made for each document. """ + super().__init__(input_backend="pandas") self.score_fn = score_fn self.score_field = score_field self.text_field = text_field self.score_type = score_type - def __call__(self, dataset: DocumentDataset) -> DocumentDataset: + def call(self, dataset: DocumentDataset) -> DocumentDataset: """ Applies the scoring to a dataset @@ -89,7 +91,7 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: return dataset -class Filter: +class Filter(BaseModule): """ The module responsible for filtering records based on a metadata field. It accepts an arbitrary filter function that accepts a metadata field and returns True if the field should be kept. @@ -107,6 +109,7 @@ def __init__(self, filter_fn: Callable, filter_field: str, invert: bool = False) filter_field (str): The field(s) to be passed into the filter function. invert (bool): Whether to invert the filter condition. """ + super().__init__(input_backend="pandas") self.filter_fn = filter_fn self.filter_field = filter_field self.invert = invert @@ -134,7 +137,7 @@ def compute_filter_mask(self, dataset: DocumentDataset): return bool_mask - def __call__(self, dataset: DocumentDataset) -> DocumentDataset: + def call(self, dataset: DocumentDataset) -> DocumentDataset: """ Applies the filtering to a dataset @@ -148,7 +151,7 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: return DocumentDataset(dataset.df[bool_mask]) -class ScoreFilter: +class ScoreFilter(BaseModule): """ The module responsible for applying a filter to all documents in a DocumentDataset. It accepts an arbitrary DocumentFilter and first computes the score for a document. @@ -176,6 +179,7 @@ def __init__( score_type (Union[type, str]): The datatype of the score that will be made for each document. invert (bool): If True, will keep all documents that are normally discarded. 
""" + super().__init__(input_backend=filter_obj.backend) self.filter_obj = filter_obj self.text_field = text_field self.score_field = score_field @@ -219,7 +223,7 @@ def compute_filter_mask(self, dataset: DocumentDataset): return bool_mask - def __call__(self, dataset: DocumentDataset) -> DocumentDataset: + def call(self, dataset: DocumentDataset) -> DocumentDataset: """ Scores and filters all records in the dataset @@ -233,7 +237,7 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: return DocumentDataset(dataset.df[bool_mask]) -class ParallelScoreFilter: +class ParallelScoreFilter(BaseModule): def __init__( self, src_filter_obj, @@ -263,7 +267,7 @@ def __init__( score_type (Optional[str]): The datatype of the score that will be made for each document. Defaults to None. invert (bool, optional): If True, will keep all documents that are normally discarded. Defaults to False. """ - + super().__init__(input_backend=src_filter_obj.backend) self.source_score_filter = ScoreFilter( src_filter_obj, src_field, src_score, score_type, invert ) @@ -271,7 +275,7 @@ def __init__( tgt_filter_obj, tgt_field, tgt_score, score_type, invert ) - def __call__(self, dataset: ParallelDataset): + def call(self, dataset: ParallelDataset): src_bool_mask = self.source_score_filter.compute_filter_mask(dataset) tgt_bool_mask = self.target_score_filter.compute_filter_mask(dataset) diff --git a/nemo_curator/modules/fuzzy_dedup/fuzzyduplicates.py b/nemo_curator/modules/fuzzy_dedup/fuzzyduplicates.py index 20102162..6125f664 100644 --- a/nemo_curator/modules/fuzzy_dedup/fuzzyduplicates.py +++ b/nemo_curator/modules/fuzzy_dedup/fuzzyduplicates.py @@ -23,6 +23,7 @@ from nemo_curator.datasets import DocumentDataset from nemo_curator.log import create_logger +from nemo_curator.modules.base import BaseModule from nemo_curator.modules.config import FuzzyDuplicatesConfig from nemo_curator.modules.fuzzy_dedup._mapbuckets import _MapBuckets from nemo_curator.modules.fuzzy_dedup._shuffle import _Shuffle @@ -35,7 +36,7 @@ from nemo_curator.utils.distributed_utils import performance_report_if_with_ts_suffix -class FuzzyDuplicates: +class FuzzyDuplicates(BaseModule): def __init__( self, config: FuzzyDuplicatesConfig, @@ -53,6 +54,7 @@ def __init__( DocumentDataset containing IDs of all documents and the corresponding duplicate group they belong to. Documents in the same group are near duplicates. """ + super().__init__(input_backend="cudf") if isinstance(logger, str): self._logger = create_logger( rank=0, @@ -129,7 +131,7 @@ def __init__( profile_dir=self.config.profile_dir, ) - def __call__(self, dataset: DocumentDataset): + def call(self, dataset: DocumentDataset): """ Parameters ---------- diff --git a/nemo_curator/modules/modify.py b/nemo_curator/modules/modify.py index 1307ab17..93f31cc9 100644 --- a/nemo_curator/modules/modify.py +++ b/nemo_curator/modules/modify.py @@ -11,18 +11,19 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- from nemo_curator.datasets import DocumentDataset from nemo_curator.modifiers import DocumentModifier +from nemo_curator.modules.base import BaseModule from nemo_curator.utils.module_utils import is_batched -class Modify: +class Modify(BaseModule): def __init__(self, modifier: DocumentModifier, text_field="text"): + super().__init__(input_backend=modifier.backend) self.modifier = modifier self.text_field = text_field - def __call__(self, dataset: DocumentDataset) -> DocumentDataset: + def call(self, dataset: DocumentDataset) -> DocumentDataset: if is_batched(self.modifier.modify_document): dataset.df[self.text_field] = dataset.df[self.text_field].map_partitions( self.modifier.modify_document, meta=(None, str) diff --git a/nemo_curator/modules/semantic_dedup/embeddings.py b/nemo_curator/modules/semantic_dedup/embeddings.py index 4a0b638b..7f6315e5 100644 --- a/nemo_curator/modules/semantic_dedup/embeddings.py +++ b/nemo_curator/modules/semantic_dedup/embeddings.py @@ -41,6 +41,7 @@ class EmbeddingConfig: model_name_or_path: str max_seq_length: int = None + pooling_strategy: str = "mean_pooling" # Options: "mean_pooling" or "last_token" def __post_init__(self): self.max_seq_length = AutoTokenizer.from_pretrained( @@ -52,6 +53,10 @@ def __post_init__(self): self.max_seq_length = AutoConfig.from_pretrained( self.model_name_or_path ).max_position_embeddings + if self.pooling_strategy not in ["mean_pooling", "last_token"]: + raise ValueError( + "pooling_strategy must be either 'mean_pooling' or 'last_token'" + ) class EmbeddingPytorchModel(nn.Module): @@ -70,7 +75,10 @@ def feature(self, input_ids, attention_mask): @torch.no_grad() def forward(self, batch): feature = self.feature(batch["input_ids"], batch["attention_mask"]) - return self._mean_pooling(feature, batch["attention_mask"]) + if self.config.pooling_strategy == "mean_pooling": + return self._mean_pooling(feature, batch["attention_mask"]) + else: + return self._get_last_token(feature, batch["attention_mask"]) def _mean_pooling(self, model_output, attention_mask): token_embeddings = model_output[0] @@ -81,6 +89,19 @@ def _mean_pooling(self, model_output, attention_mask): sum_mask = torch.clamp(input_mask_expanded.sum(dim=1), min=1e-9) return F.normalize(sum_embeddings / sum_mask, dim=1) + def _get_last_token(self, model_output, attention_mask): + token_embeddings = model_output[0] + # Get indices of last non-padded tokens for each sequence in batch + last_token_indices = attention_mask.sum(dim=1) - 1 # -1 for 0-based indexing + last_token_indices = last_token_indices.to( + torch.long + ) # Ensure indices are of type long + batch_size = attention_mask.size(0) + batch_indices = torch.arange(batch_size, device=attention_mask.device) + # Get embeddings of last non-padded tokens + last_token_embeddings = token_embeddings[batch_indices, last_token_indices] + return F.normalize(last_token_embeddings, dim=1) + class EmbeddingCrossFitModel(HFModel): def __init__( @@ -116,6 +137,7 @@ def __init__( embedding_batch_size: int, embedding_output_dir: str, embedding_max_mem_gb: Optional[int] = None, + embedding_pooling_strategy: str = "mean_pooling", input_column: str = "text", embedding_column: str = "embeddings", write_embeddings_to_disk: bool = True, @@ -132,6 +154,7 @@ def __init__( embedding_output_dir (str): Directory path where embeddings will be saved. embedding_max_mem_gb (int): Maximum memory usage in GB for the embedding process. If None, it defaults to the available GPU memory minus 4 GB. 
+ embedding_pooling_strategy (str): Strategy for pooling embeddings, either "mean_pooling" or "last_token". Defaults to "mean_pooling". input_column (str): Column name from the data to be used for embedding generation, defaults to "text". write_embeddings_to_disk (bool, optional): If True, saves the embeddings to disk, defaults to True. We recommend setting this to False when you have a delayed pipeline. @@ -152,6 +175,7 @@ def __init__( self.embeddings_config = EmbeddingConfig( model_name_or_path=embedding_model_name_or_path, + pooling_strategy=embedding_pooling_strategy, ) self.batch_size = embedding_batch_size self.logger = self._setup_logger(logger) @@ -184,7 +208,7 @@ def create_embeddings( op.Tokenizer( self.model, cols=[input_column], - tokenizer_type="sentencepiece", + tokenizer_type="default", max_length=self.embeddings_config.max_seq_length, ), op.Predictor( @@ -217,6 +241,7 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: ) ) else: + embedding_ddf = self.create_embeddings(dataset.df, self.input_column) ddf = DocumentDataset(embedding_ddf) self.logger.info( diff --git a/nemo_curator/modules/semantic_dedup/semdedup.py b/nemo_curator/modules/semantic_dedup/semdedup.py index 5cb91262..77775783 100644 --- a/nemo_curator/modules/semantic_dedup/semdedup.py +++ b/nemo_curator/modules/semantic_dedup/semdedup.py @@ -18,6 +18,7 @@ from typing import Union from nemo_curator.datasets import DocumentDataset +from nemo_curator.modules.base import BaseModule from nemo_curator.modules.config import SemDedupConfig from nemo_curator.modules.semantic_dedup.clusteringmodel import ClusteringModel from nemo_curator.modules.semantic_dedup.embeddings import EmbeddingCreator @@ -26,7 +27,7 @@ ) -class SemDedup: +class SemDedup(BaseModule): def __init__( self, config: SemDedupConfig, @@ -42,14 +43,17 @@ def __init__( config (SemDedupConfig): Configuration for SemDedup. logger (Union[logging.Logger, str]): Logger instance or path to the log file directory. """ + super().__init__(input_backend="cudf") self.config = config self.logger = logger cache_dir = config.cache_dir self.embedding_creator = EmbeddingCreator( embedding_model_name_or_path=config.embedding_model_name_or_path, embedding_batch_size=config.embedding_batch_size, + embedding_pooling_strategy=config.embedding_pooling_strategy, input_column=input_column, embedding_output_dir=os.path.join(cache_dir, config.embeddings_save_loc), + write_embeddings_to_disk=config.write_embeddings_to_disk, logger=logger, profile_dir=self.config.profile_dir, ) @@ -79,7 +83,7 @@ def __init__( self.eps_thresholds = config.eps_thresholds self.eps_to_extract = config.eps_to_extract - def __call__(self, dataset: DocumentDataset) -> DocumentDataset: + def call(self, dataset: DocumentDataset) -> DocumentDataset: """ Execute the SemDedup process. 
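The new `pooling_strategy` option chooses between mean pooling and last-token pooling over the encoder output. The indexing pattern used by `_get_last_token` above can be illustrated in isolation; the tensors below are toy values invented for this sketch, and only the gather-and-normalize logic mirrors the diff:

```python
import torch
import torch.nn.functional as F

# Toy batch: 2 sequences, max length 4, hidden size 3 (values are arbitrary).
token_embeddings = torch.randn(2, 4, 3)
attention_mask = torch.tensor([[1, 1, 1, 0],   # 3 real tokens, 1 pad
                               [1, 1, 1, 1]])  # 4 real tokens

# Index of the last non-padded token in each sequence (0-based).
last_token_indices = (attention_mask.sum(dim=1) - 1).to(torch.long)
batch_indices = torch.arange(attention_mask.size(0))

# Pick one embedding per sequence, then L2-normalize it, as _get_last_token does.
last_token_embeddings = token_embeddings[batch_indices, last_token_indices]
print(F.normalize(last_token_embeddings, dim=1).shape)  # torch.Size([2, 3])
```

Pipelines opt in through `EmbeddingCreator(..., embedding_pooling_strategy="last_token")` or the `embedding_pooling_strategy` config field consumed by `SemDedup` above; `"mean_pooling"` remains the default, and any other value is rejected in `EmbeddingConfig.__post_init__`.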
diff --git a/nemo_curator/modules/task.py b/nemo_curator/modules/task.py index 2571b6a8..06b12af1 100644 --- a/nemo_curator/modules/task.py +++ b/nemo_curator/modules/task.py @@ -20,12 +20,13 @@ from dask import delayed from nemo_curator.datasets import DocumentDataset +from nemo_curator.modules.base import BaseModule from nemo_curator.tasks.downstream_task import DownstreamTask from nemo_curator.utils.distributed_utils import single_partition_write_with_filename from nemo_curator.utils.text_utils import get_words -class TaskDecontamination: +class TaskDecontamination(BaseModule): def __init__( self, tasks: Union[DownstreamTask, Iterable[DownstreamTask]], @@ -47,6 +48,7 @@ def __init__( max_splits: The maximum number of times a document may be split before being entirely discarded. removed_dir: If not None, the documents split too many times will be written to this directory using the filename in the dataset. """ + super().__init__(input_backend="pandas") if isinstance(tasks, DownstreamTask): tasks = [tasks] self.tasks = tasks @@ -58,7 +60,7 @@ def __init__( self.max_splits = max_splits self.removed_dir = removed_dir - def __call__(self, dataset: DocumentDataset) -> DocumentDataset: + def call(self, dataset: DocumentDataset) -> DocumentDataset: # Convert the dataframe to delayed objects for complex operations original_meta = dataset.df.dtypes.to_dict() diff --git a/nemo_curator/modules/to_backend.py b/nemo_curator/modules/to_backend.py new file mode 100644 index 00000000..0bfcafc6 --- /dev/null +++ b/nemo_curator/modules/to_backend.py @@ -0,0 +1,40 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Literal + +from nemo_curator.datasets.doc_dataset import DocumentDataset +from nemo_curator.modules.base import BaseModule + + +class ToBackend(BaseModule): + """ + A module for moving dataframes between backends. + """ + + def __init__(self, backend: Literal["pandas", "cudf"]) -> None: + """ + Constructs a ToBackend module + + Args: + backend (str): The backend to transfer the dataset to. 
Can be "pandas" or "cudf" + """ + super().__init__(input_backend="any") + if backend not in BaseModule.SUPPORTED_BACKENDS: + raise ValueError( + f"Backend must be either 'pandas' or 'cudf', but got {backend}" + ) + self.backend = backend + + def call(self, dataset: DocumentDataset) -> DocumentDataset: + return DocumentDataset(dataset.df.to_backend(self.backend)) diff --git a/nemo_curator/package_info.py b/nemo_curator/package_info.py index 65c7ae06..26f99e5f 100644 --- a/nemo_curator/package_info.py +++ b/nemo_curator/package_info.py @@ -14,10 +14,10 @@ MAJOR = 0 -MINOR = 6 +MINOR = 7 PATCH = 0 -PRE_RELEASE = "rc0" -DEV = "dev1" +PRE_RELEASE = "rc1" +DEV = "dev0" # Use the following formatting: (major, minor, patch, pre-release) VERSION = (MAJOR, MINOR, PATCH, PRE_RELEASE, DEV) diff --git a/nemo_curator/scripts/add_id.py b/nemo_curator/scripts/add_id.py index c926e36d..2a856af0 100644 --- a/nemo_curator/scripts/add_id.py +++ b/nemo_curator/scripts/add_id.py @@ -28,6 +28,7 @@ def main(args): client = get_client(**ArgumentHelper.parse_client_args(args)) + backend = "cudf" if args.device == "gpu" else "pandas" output_dir = expand_outdir_and_mkdir(args.output_data_dir) files = get_all_files_paths_under(args.input_data_dir) if args.shuffle: @@ -36,7 +37,7 @@ def main(args): dataset = DocumentDataset( read_data( - files, file_type=args.input_file_type, backend="pandas", add_filename=True + files, file_type=args.input_file_type, backend=backend, add_filename=True ) ) add_id = nemo_curator.AddId( diff --git a/nemo_curator/scripts/classifiers/README.md b/nemo_curator/scripts/classifiers/README.md index a034beb3..59197474 100644 --- a/nemo_curator/scripts/classifiers/README.md +++ b/nemo_curator/scripts/classifiers/README.md @@ -6,16 +6,16 @@ The Python scripts in this directory demonstrate how to run classification on yo - Multilingual Domain Classifier - Quality Classifier - AEGIS Safety Models -- Instruction-Data-Guard Model +- Instruction Data Guard Model - FineWeb Educational Content Classifier - Content Type Classifier -- Prompt Task/Complexity Classifier +- Prompt Task and Complexity Classifier For more information about these classifiers, please see NeMo Curator's [Distributed Data Classification documentation](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/distributeddataclassification.html). ### Usage -#### Domain classifier inference +#### Domain Classifier Inference This classifier is recommended for English-only text data. @@ -36,7 +36,7 @@ domain_classifier_inference \ Additional arguments may be added for customizing a Dask cluster and client. Run `domain_classifier_inference --help` for more information. -#### Multilingual domain classifier inference +#### Multilingual Domain Classifier Inference This classifier supports domain classification in 52 languages. Please see [nvidia/multilingual-domain-classifier on Hugging Face](https://huggingface.co/nvidia/multilingual-domain-classifier) for more information. @@ -57,7 +57,7 @@ multilingual_domain_classifier_inference \ Additional arguments may be added for customizing a Dask cluster and client. Run `multilingual_domain_classifier_inference --help` for more information. -#### Quality classifier inference +#### Quality Classifier DeBERTa Inference ```bash # same as `python quality_classifier_inference.py` @@ -76,7 +76,7 @@ quality_classifier_inference \ Additional arguments may be added for customizing a Dask cluster and client. Run `quality_classifier_inference --help` for more information. 
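The `ToBackend` module introduced above, together with the `BaseModule` contract (declare an `input_backend`, implement `call`) adopted throughout this diff, lets CPU and GPU stages be mixed in a single `Sequential` pipeline. A minimal sketch, assuming a GPU plus the cuDF/dask-cuDF extras are installed; the toy modules and column names are illustrative and modeled on the new `tests/test_backends.py` later in this diff:

```python
import dask.dataframe as dd
import pandas as pd

from nemo_curator import BaseModule, Sequential, ToBackend
from nemo_curator.datasets import DocumentDataset


class CPULengths(BaseModule):
    """Toy pandas-backed stage: records document lengths."""

    def __init__(self):
        super().__init__(input_backend="pandas")  # checked before call() runs

    def call(self, dataset: DocumentDataset) -> DocumentDataset:
        dataset.df["cpu_lengths"] = dataset.df["text"].str.len()
        return dataset


class GPULengths(BaseModule):
    """Toy cudf-backed stage: same computation, but on GPU memory."""

    def __init__(self):
        super().__init__(input_backend="cudf")

    def call(self, dataset: DocumentDataset) -> DocumentDataset:
        dataset.df["gpu_lengths"] = dataset.df["text"].str.len()
        return dataset


df = pd.DataFrame({"text": ["The quick brown fox", "A test string"]})
dataset = DocumentDataset(dd.from_pandas(df, npartitions=1))

pipeline = Sequential(
    [
        CPULengths(),
        ToBackend("cudf"),    # move the dataframe to GPU memory
        GPULengths(),
        ToBackend("pandas"),  # and back to CPU for downstream pandas stages
    ]
)
print(pipeline(dataset).df.compute())
```

Feeding a stage a dataset on the wrong backend raises a `ValueError`, which is what the `test_wrong_backend_*` cases in `tests/test_backends.py` below assert.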
-#### AEGIS classifier inference +#### AEGIS Classifier Inference ```bash # same as `python aegis_classifier_inference.py` @@ -99,7 +99,7 @@ aegis_classifier_inference \ Additional arguments may be added for customizing a Dask cluster and client. Run `aegis_classifier_inference --help` for more information. -#### Instruction-Data-Guard classifier inference +#### Instruction Data Guard Classifier Inference ```bash # same as `python instruction_data_guard_classifier_inference.py` @@ -120,7 +120,7 @@ In the above example, `--token` is your HuggingFace token, which is used when do Additional arguments may be added for customizing a Dask cluster and client. Run `instruction_data_guard_classifier_inference --help` for more information. -#### FineWeb-Edu classifier inference +#### FineWeb-Edu Classifier Inference ```bash # same as `python fineweb_edu_classifier_inference.py` @@ -139,7 +139,7 @@ fineweb_edu_classifier_inference \ Additional arguments may be added for customizing a Dask cluster and client. Run `fineweb_edu_classifier_inference --help` for more information. -#### Content type classifier inference +#### Content Type Classifier DeBERTa Inference ```bash # same as `python content_type_classifier_inference.py` @@ -158,7 +158,7 @@ content_type_classifier_inference \ Additional arguments may be added for customizing a Dask cluster and client. Run `content_type_classifier_inference --help` for more information. -#### Prompt task and complexity classifier inference +#### Prompt Task and Complexity Classifier Inference ```bash # same as `python prompt_task_complexity_classifier_inference.py` diff --git a/nemo_curator/scripts/classifiers/instruction_data_guard_classifier_inference.py b/nemo_curator/scripts/classifiers/instruction_data_guard_classifier_inference.py index d58a89b7..79e80918 100644 --- a/nemo_curator/scripts/classifiers/instruction_data_guard_classifier_inference.py +++ b/nemo_curator/scripts/classifiers/instruction_data_guard_classifier_inference.py @@ -36,7 +36,7 @@ def main(): client_args = ArgumentHelper.parse_client_args(args) client_args["cluster_type"] = "gpu" client = get_client(**client_args) - print("Starting Instruction-Data-Guard classifier inference", flush=True) + print("Starting Instruction Data Guard classifier inference", flush=True) global_st = time.time() files_per_run = len(client.scheduler_info()["workers"]) * 2 @@ -97,7 +97,7 @@ def main(): global_et = time.time() print( - f"Total time taken for Instruction-Data-Guard classifier inference: {global_et-global_st} s", + f"Total time taken for Instruction Data Guard classifier inference: {global_et-global_st} s", flush=True, ) client.close() @@ -105,7 +105,7 @@ def main(): def attach_args(): parser = ArgumentHelper.parse_distributed_classifier_args( - description="Run Instruction-Data-Guard classifier inference.", + description="Run Instruction Data Guard classifier inference.", max_chars_default=6000, ) diff --git a/nemo_curator/scripts/semdedup/compute_embeddings.py b/nemo_curator/scripts/semdedup/compute_embeddings.py index c993671e..c02d9cef 100644 --- a/nemo_curator/scripts/semdedup/compute_embeddings.py +++ b/nemo_curator/scripts/semdedup/compute_embeddings.py @@ -80,6 +80,7 @@ def main(args): semdedup_config.cache_dir, semdedup_config.embeddings_save_loc ), input_column=args.text_field, + write_embeddings_to_disk=semdedup_config.write_embeddings_to_disk, logger=logger, write_to_filename=True, ) diff --git a/nemo_curator/scripts/text_cleaning.py b/nemo_curator/scripts/text_cleaning.py index 
6eaa41cc..657c6cca 100644 --- a/nemo_curator/scripts/text_cleaning.py +++ b/nemo_curator/scripts/text_cleaning.py @@ -14,9 +14,9 @@ import argparse -import nemo_curator +from nemo_curator import Modify, Sequential from nemo_curator.datasets import DocumentDataset -from nemo_curator.modifiers import UnicodeReformatter +from nemo_curator.modifiers import NewlineNormalizer, UnicodeReformatter, UrlRemover from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk from nemo_curator.utils.file_utils import expand_outdir_and_mkdir, get_batched_files from nemo_curator.utils.script_utils import ArgumentHelper @@ -28,7 +28,14 @@ def main(args): # Make the output directories output_clean_dir = expand_outdir_and_mkdir(args.output_clean_dir) - cleaner = nemo_curator.Modify(UnicodeReformatter(), text_field=args.text_field) + stages = [Modify(UnicodeReformatter(), text_field=args.text_field)] + + if args.normalize_newlines: + stages.append(Modify(NewlineNormalizer(), text_field=args.text_field)) + if args.remove_urls: + stages.append(Modify(UrlRemover(), text_field=args.text_field)) + + cleaner = Sequential(stages) for files in get_batched_files( args.input_data_dir, args.output_clean_dir, args.input_file_type, batch_size=args.batch_size, @@ -77,6 +84,15 @@ def attach_args( argumentHelper.add_arg_text_field() argumentHelper.add_arg_output_file_type() argumentHelper.add_distributed_args() + argumentHelper.attach_bool_arg( + parser, + "normalize-newlines", + default=False, + help="Replace 3 or more consecutive newline characters in each document with only 2 newline characters.", + ) + argumentHelper.attach_bool_arg( + parser, "remove-urls", default=False, help="Removes all URLs in each document." + ) parser.add_argument( "--output-clean-dir", type=str, diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index addabfd9..8f022389 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -748,6 +748,7 @@ def single_partition_write_with_filename( orient="records", lines=True, force_ascii=False, + index=False, # Only index=False is supported for orient="records" ) else: # See open issue here: https://github.com/rapidsai/cudf/issues/15211 @@ -759,6 +760,7 @@ def single_partition_write_with_filename( orient="records", lines=True, force_ascii=False, + index=False, # Only index=False is supported for orient="records" ) elif output_type == "parquet": @@ -843,6 +845,7 @@ def write_to_disk( write_to_filename: Union[bool, str] = False, keep_filename_column: bool = False, output_type: str = "jsonl", + partition_on: Optional[str] = None, ): """ This function writes a Dask DataFrame to the specified file path. @@ -857,6 +860,9 @@ def write_to_disk( If str, uses that as the filename column to write to. keep_filename_column: Boolean representing whether to keep or drop the filename column, if it exists. output_type: The type of output file to write. Can be "jsonl" or "parquet". + partition_on: The column name to partition the data on. + If specified, the data will be partitioned based on the unique values in this column, + and each partition will be written to a separate directory """ filename_col = _resolve_filename_col(write_to_filename) @@ -879,6 +885,11 @@ def write_to_disk( f"write_using_filename is True but no {filename_col} column found in DataFrame" ) + if partition_on is not None and write_to_filename: + raise ValueError( + "Cannot use both partition_on and write_to_filename parameters simultaneously. 
" + ) + if is_cudf_type(df): import cudf @@ -904,7 +915,12 @@ def write_to_disk( # output_path is a directory else: if output_type == "jsonl" or output_type == "parquet": - _write_to_jsonl_or_parquet(df, output_path, output_type) + _write_to_jsonl_or_parquet( + df, + output_path=output_path, + output_type=output_type, + partition_on=partition_on, + ) elif output_type == "bitext": if write_to_filename: os.makedirs(output_path, exist_ok=True) @@ -938,16 +954,50 @@ def _write_to_jsonl_or_parquet( df, output_path: str, output_type: Literal["jsonl", "parquet"] = "jsonl", + partition_on: Optional[str] = None, ): if output_type == "jsonl": - if is_cudf_type(df): - # See open issue here: https://github.com/rapidsai/cudf/issues/15211 - # df.to_json(output_path, orient="records", lines=True, engine="cudf", force_ascii=False) - df.to_json(output_path, orient="records", lines=True, force_ascii=False) + if partition_on is not None: + unique_values = ( + df[partition_on] + .unique() + .to_backend(backend="pandas") + .compute() + .to_list() + ) + for value in unique_values: + os.makedirs(output_path, exist_ok=True) + partition_output_path = os.path.join( + output_path, f"{partition_on}={value}" + ) + df[df[partition_on] == value].to_json( + partition_output_path, + orient="records", + lines=True, + force_ascii=False, + index=False, # Only index=False is supported for orient="records" + ) else: - df.to_json(output_path, orient="records", lines=True, force_ascii=False) + if is_cudf_type(df): + # See open issue here: https://github.com/rapidsai/cudf/issues/15211 + # df.to_json(output_path, orient="records", lines=True, engine="cudf", force_ascii=False) + df.to_json( + output_path, + orient="records", + lines=True, + force_ascii=False, + index=False, + ) # Only index=False is supported for orient="records" + else: + df.to_json( + output_path, + orient="records", + lines=True, + force_ascii=False, + index=False, + ) # Only index=False is supported for orient="records" elif output_type == "parquet": - df.to_parquet(output_path, write_index=False) + df.to_parquet(output_path, write_index=False, partition_on=partition_on) else: raise ValueError(f"Unknown output type: {output_type}") diff --git a/tests/test_add_id.py b/tests/test_add_id.py index 42a8575e..c33c5e4a 100644 --- a/tests/test_add_id.py +++ b/tests/test_add_id.py @@ -18,26 +18,37 @@ import nemo_curator as nc from nemo_curator.datasets import DocumentDataset +from nemo_curator.utils.import_utils import gpu_only_import, is_unavailable +cudf = gpu_only_import("cudf") +is_cudf_available = not is_unavailable(cudf) -def list_to_dataset(documents, col_name="text", npartitions=2): + +def list_to_dataset(documents, col_name="text", npartitions=2, backend="pandas"): data = {col_name: documents} pdf = pd.DataFrame(data) - - return DocumentDataset(dd.from_pandas(pdf, npartitions=npartitions)) + ddf = dd.from_pandas(pdf, npartitions=npartitions) + if backend == "cudf" and is_unavailable(cudf): + raise ImportError("cuDF is not installed or importable.") + ddf = ddf.to_backend(backend) + return DocumentDataset(ddf) -@pytest.fixture -def single_partition_dataset(): +@pytest.fixture(params=["pandas", pytest.param("cudf", marks=pytest.mark.gpu)]) +def single_partition_dataset(request): return list_to_dataset( - ["First", "Second", "Third", "Fourth", "Fifth"], npartitions=1 + ["First", "Second", "Third", "Fourth", "Fifth"], + npartitions=1, + backend=request.param, ) -@pytest.fixture -def two_partition_dataset(): +@pytest.fixture(params=["pandas", pytest.param("cudf", 
marks=pytest.mark.gpu)]) +def two_partition_dataset(request): return list_to_dataset( - ["First", "Second", "Third", "Fourth", "Fifth"], npartitions=2 + ["First", "Second", "Third", "Fourth", "Fifth"], + npartitions=2, + backend=request.param, ) @@ -56,6 +67,8 @@ def test_basic_id(self, single_partition_dataset): "doc_id-0000000004", ] ) + if is_cudf_available and isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids @@ -75,6 +88,8 @@ def test_two_partitions(self, two_partition_dataset): "doc_id-0000000004", ] ) + if is_cudf_available and isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids @@ -95,6 +110,8 @@ def test_id_prefix(self, two_partition_dataset): f"{id_prefix}-0000000004", ] ) + if is_cudf_available and isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids @@ -115,6 +132,8 @@ def test_start_index(self, two_partition_dataset): "doc_id-0000000017", ] ) + if is_cudf_available and isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids @@ -134,6 +153,8 @@ def test_fast_id_single_partition(self, single_partition_dataset): "doc_id-40", ] ) + if is_cudf_available and isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids @@ -153,6 +174,8 @@ def test_fast_id_two_partitions(self, two_partition_dataset): "doc_id-11", ] ) + if is_cudf_available and isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids diff --git a/tests/test_backends.py b/tests/test_backends.py new file mode 100644 index 00000000..6acd87b5 --- /dev/null +++ b/tests/test_backends.py @@ -0,0 +1,428 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
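The `partition_on` parameter threaded through `write_to_disk` above is also exposed via `DocumentDataset.to_json` and `to_parquet`, as the new I/O tests later in this diff exercise: every unique value of the chosen column is written to its own `column=value` subdirectory. A small sketch, with a made-up `category` column and output directory:

```python
import dask.dataframe as dd
import pandas as pd

from nemo_curator.datasets import DocumentDataset

df = pd.DataFrame(
    {"id": [1, 2, 3, 4], "category": ["A", "B", "A", "B"], "value": [10, 20, 30, 40]}
)
dataset = DocumentDataset(dd.from_pandas(df, npartitions=2))

# Produces ./partitioned_output/category=A/ and ./partitioned_output/category=B/,
# each holding only the records with that category value.
dataset.to_json(output_path="./partitioned_output", partition_on="category")

# Combining partition_on with write_to_filename is rejected with a ValueError,
# per the guard added to write_to_disk above.
```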
+import pandas as pd +import pytest +from dask.dataframe.utils import assert_eq +from distributed import Client + +from nemo_curator import ( + BaseModule, + FuzzyDuplicates, + FuzzyDuplicatesConfig, + ScoreFilter, + Sequential, + ToBackend, +) +from nemo_curator.datasets import DocumentDataset +from nemo_curator.filters import MeanWordLengthFilter +from nemo_curator.utils.import_utils import gpu_only_import, gpu_only_import_from + +cudf = gpu_only_import("cudf") +dask_cudf = gpu_only_import("dask_cudf") +LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster") + + +class CPUModule(BaseModule): + def __init__(self): + super().__init__(input_backend="pandas") + + def call(self, dataset: DocumentDataset): + dataset.df["cpu_lengths"] = dataset.df["text"].str.len() + return dataset + + +class GPUModule(BaseModule): + def __init__(self): + super().__init__(input_backend="cudf") + + def call(self, dataset: DocumentDataset): + dataset.df["gpu_lengths"] = dataset.df["text"].str.len() + return dataset + + +class AnyModule(BaseModule): + def __init__(self): + super().__init__(input_backend="any") + + def call(self, dataset: DocumentDataset): + dataset.df["any_lengths"] = dataset.df["text"].str.len() + return dataset + + +@pytest.fixture +def raw_data(): + base_data = { + "id": [1, 2, 3, 4, 100, 200, 300], + "text": [ + "The quick brown fox jumps over the lazy dog", + "The quick brown foxes jumps over the lazy dog", + "The quick brown wolf jumps over the lazy dog", + "The quick black cat jumps over the lazy dog", + "A test string", + "Another test string", + "A different object", + ], + } + gt_results = [43, 45, 44, 43, 13, 19, 18] + + return base_data, gt_results + + +@pytest.fixture +def cpu_data(raw_data): + base_data, gt_results = raw_data + df = pd.DataFrame(base_data) + gt_lengths = pd.Series(gt_results, name="cpu_lengths") + return DocumentDataset.from_pandas(df), gt_lengths + + +@pytest.fixture +def gpu_data(raw_data): + base_data, gt_results = raw_data + df = cudf.DataFrame(base_data) + df = dask_cudf.from_cudf(df, 2) + gt_lengths = cudf.Series(gt_results, name="gpu_lengths", dtype="int32") + return DocumentDataset(df), gt_lengths + + +@pytest.mark.gpu +class TestBackendSupport: + @pytest.fixture(autouse=True, scope="class") + def gpu_client(self, request): + with LocalCUDACluster(n_workers=1) as cluster, Client(cluster) as client: + request.cls.client = client + request.cls.cluster = cluster + yield + + def test_pandas_backend( + self, + cpu_data, + ): + print("client", self.client) + dataset, gt_lengths = cpu_data + pipeline = CPUModule() + result = pipeline(dataset) + result_df = result.df.compute() + assert_eq(result_df["cpu_lengths"], gt_lengths) + + def test_cudf_backend( + self, + gpu_data, + ): + print("client", self.client) + dataset, gt_lengths = gpu_data + pipeline = GPUModule() + result = pipeline(dataset) + result_df = result.df.compute() + assert_eq(result_df["gpu_lengths"], gt_lengths) + + def test_any_backend( + self, + cpu_data, + gpu_data, + ): + print("client", self.client) + cpu_dataset, gt_cpu_lengths = cpu_data + gt_cpu_lengths = gt_cpu_lengths.rename("any_lengths") + gpu_dataset, gt_gpu_lengths = gpu_data + gt_gpu_lengths = gt_gpu_lengths.rename("any_lengths") + pipeline = AnyModule() + + cpu_result = pipeline(cpu_dataset) + cpu_result_df = cpu_result.df.compute() + assert_eq(cpu_result_df["any_lengths"], gt_cpu_lengths) + gpu_result = pipeline(gpu_dataset) + gpu_result_df = gpu_result.df.compute() + assert_eq(gpu_result_df["any_lengths"], 
gt_gpu_lengths) + + def test_pandas_to_cudf( + self, + cpu_data, + gpu_data, + ): + print("client", self.client) + dataset, gt_cpu_lengths = cpu_data + _, gt_gpu_lengths = gpu_data + pipeline = Sequential( + [ + CPUModule(), + ToBackend("cudf"), + GPUModule(), + ] + ) + result = pipeline(dataset) + result_df = result.df.compute() + assert_eq(result_df["cpu_lengths"], gt_cpu_lengths) + assert_eq(result_df["gpu_lengths"], gt_gpu_lengths) + + def test_cudf_to_pandas( + self, + cpu_data, + gpu_data, + ): + print("client", self.client) + _, gt_cpu_lengths = cpu_data + dataset, gt_gpu_lengths = gpu_data + pipeline = Sequential( + [ + GPUModule(), + ToBackend("pandas"), + CPUModule(), + ] + ) + result = pipeline(dataset) + result_df = result.df.compute() + assert_eq(result_df["cpu_lengths"], gt_cpu_lengths) + assert_eq(result_df["gpu_lengths"], gt_gpu_lengths) + + def test_5x_switch( + self, + cpu_data, + gpu_data, + ): + print("client", self.client) + dataset, gt_cpu_lengths = cpu_data + _, gt_gpu_lengths = gpu_data + pipeline = Sequential( + [ + CPUModule(), + ToBackend("cudf"), + GPUModule(), + ToBackend("pandas"), + CPUModule(), + ToBackend("cudf"), + GPUModule(), + ToBackend("pandas"), + CPUModule(), + ToBackend("cudf"), + GPUModule(), + ] + ) + result = pipeline(dataset) + result_df = result.df.compute() + assert sorted(list(result_df.columns)) == [ + "cpu_lengths", + "gpu_lengths", + "id", + "text", + ] + assert_eq(result_df["cpu_lengths"], gt_cpu_lengths) + assert_eq(result_df["gpu_lengths"], gt_gpu_lengths) + + def test_wrong_backend_cpu_data(self, cpu_data): + with pytest.raises(ValueError): + print("client", self.client) + dataset, _ = cpu_data + pipeline = GPUModule() + result = pipeline(dataset) + _ = result.df.compute() + + def test_wrong_backend_gpu_data(self, gpu_data): + with pytest.raises(ValueError): + print("client", self.client) + dataset, _ = gpu_data + pipeline = CPUModule() + result = pipeline(dataset) + _ = result.df.compute() + + def test_unsupported_to_backend(self, cpu_data): + with pytest.raises(ValueError): + print("client", self.client) + dataset, _ = cpu_data + pipeline = ToBackend("fake_backend") + result = pipeline(dataset) + _ = result.df.compute() + + +@pytest.fixture +def real_module_raw_data(): + base_data = { + "id": [1, 2, 3, 4, 100, 200, 300], + "text": [ + "The quick brown fox jumps over the lazy dog", + "The quick brown foxes jumps over the lazy dog", + "The quick brown wolf jumps over the lazy dog", + "The quick black cat jumps over the lazy dog", + "A test string", + "Another test string", + "A different object", + ], + } + return base_data + + +@pytest.fixture +def real_module_cpu_data(real_module_raw_data): + df = pd.DataFrame(real_module_raw_data) + gt_results = pd.Series( + [35 / 9, 37 / 9, 4.0, 35 / 9, 33 / 9, 51 / 9, 48 / 9], name="mean_lengths" + ) + return DocumentDataset.from_pandas(df), gt_results + + +@pytest.fixture +def real_module_gpu_data(real_module_raw_data): + df = cudf.DataFrame(real_module_raw_data) + df = dask_cudf.from_cudf(df, 2) + gt_results = cudf.Series([[1, 2, 3, 4], [100, 200]], name="id") + return DocumentDataset(df), gt_results + + +@pytest.mark.gpu +class TestRealModules: + @pytest.fixture(autouse=True, scope="class") + def gpu_client(self, request): + with LocalCUDACluster(n_workers=1) as cluster, Client(cluster) as client: + request.cls.client = client + request.cls.cluster = cluster + yield + + def test_score_filter( + self, + real_module_cpu_data, + ): + print("client", self.client) + dataset, gt_results = 
real_module_cpu_data + pipeline = ScoreFilter( + MeanWordLengthFilter(), score_field="mean_lengths", score_type=float + ) + result = pipeline(dataset) + result_df = result.df.compute() + assert_eq(result_df["mean_lengths"], gt_results) + + def test_score_filter_wrong_backend( + self, + real_module_gpu_data, + ): + with pytest.raises(ValueError): + print("client", self.client) + dataset, _ = real_module_gpu_data + pipeline = ScoreFilter( + MeanWordLengthFilter(), score_field="mean_lengths", score_type=float + ) + result = pipeline(dataset) + _ = result.df.compute() + + def test_fuzzy_dedup( + self, + real_module_gpu_data, + tmpdir, + ): + print(self.client) + dataset, gt_results = real_module_gpu_data + # Dedup might fail when indices per partition do not start from 0 + dataset.df = dataset.df.reset_index(drop=True) + config = FuzzyDuplicatesConfig( + cache_dir=tmpdir, + id_field="id", + text_field="text", + seed=42, + char_ngrams=5, + num_buckets=15, + hashes_per_bucket=1, + use_64_bit_hash=False, + buckets_per_shuffle=3, + false_positive_check=True, + num_anchors=2, + jaccard_threshold=0.3, + ) + fuzzy_duplicates = FuzzyDuplicates(config=config) + result = fuzzy_duplicates(dataset) + result_df = result.df.compute() + # Drop non duplicated docs + result_df = result_df[result_df.group.duplicated(keep=False)] + result_df = result_df.groupby("group").id.agg(list) + # Sort to maintain uniform ordering + + result_df = result_df.list.sort_values() + result_df = result_df.sort_values() + gt_results = gt_results.list.sort_values() + gt_results = gt_results.sort_values() + assert_eq(gt_results, result_df, check_index=False) + + def test_fuzzy_dedup_wrong_backend( + self, + real_module_cpu_data, + tmpdir, + ): + with pytest.raises(ValueError): + print(self.client) + dataset, _ = real_module_cpu_data + # Dedup might fail when indices per partition do not start from 0 + dataset.df = dataset.df.reset_index(drop=True) + config = FuzzyDuplicatesConfig( + cache_dir=tmpdir, + id_field="id", + text_field="text", + seed=42, + char_ngrams=5, + num_buckets=15, + hashes_per_bucket=1, + use_64_bit_hash=False, + buckets_per_shuffle=3, + false_positive_check=True, + num_anchors=2, + jaccard_threshold=0.3, + ) + fuzzy_duplicates = FuzzyDuplicates(config=config) + result = fuzzy_duplicates(dataset) + _ = result.df.compute() + + def test_score_filter_and_fuzzy( + self, + real_module_cpu_data, + real_module_gpu_data, + tmpdir, + ): + print("client", self.client) + dataset, _ = real_module_cpu_data + _, gt_results = real_module_gpu_data + dataset.df = dataset.df.reset_index(drop=True) + config = FuzzyDuplicatesConfig( + cache_dir=tmpdir, + id_field="id", + text_field="text", + seed=42, + char_ngrams=5, + num_buckets=15, + hashes_per_bucket=1, + use_64_bit_hash=False, + buckets_per_shuffle=3, + false_positive_check=True, + num_anchors=2, + jaccard_threshold=0.3, + ) + pipeline = Sequential( + [ + ScoreFilter( + MeanWordLengthFilter(), score_field="mean_lengths", score_type=float + ), + ToBackend("cudf"), + FuzzyDuplicates(config=config), + ] + ) + + result = pipeline(dataset) + result_df = result.df.compute() + # Right now the output of FuzzyDuplicates does not retain the original metadata + # so we simply check the output of fuzzy dedupe to ensure accuracy + # Drop non duplicated docs + result_df = result_df[result_df.group.duplicated(keep=False)] + result_df = result_df.groupby("group").id.agg(list) + # Sort to maintain uniform ordering + result_df = result_df.list.sort_values() + result_df = result_df.sort_values() + 
gt_results = gt_results.list.sort_values() + gt_results = gt_results.sort_values() + assert_eq(gt_results, result_df, check_index=False) diff --git a/tests/test_classifiers.py b/tests/test_classifiers.py index 5d681089..1d37e7f5 100644 --- a/tests/test_classifiers.py +++ b/tests/test_classifiers.py @@ -150,7 +150,7 @@ def test_fineweb_edu_classifier(gpu_client, domain_dataset): @pytest.mark.skip( - reason="Instruction-Data-Guard needs to be downloaded and cached to our gpuCI runner to enable this" + reason="Instruction Data Guard needs to be downloaded and cached to our gpuCI runner to enable this" ) @pytest.mark.gpu def test_instruction_data_guard_classifier(gpu_client): diff --git a/tests/test_cleaning.py b/tests/test_cleaning.py new file mode 100644 index 00000000..906da391 --- /dev/null +++ b/tests/test_cleaning.py @@ -0,0 +1,151 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import dask.dataframe as dd +import pandas as pd + +from nemo_curator import Modify +from nemo_curator.datasets import DocumentDataset +from nemo_curator.modifiers import NewlineNormalizer, UnicodeReformatter, UrlRemover + + +def list_to_dataset(documents, col_name="text", npartitions=2): + data = {col_name: documents} + pdf = pd.DataFrame(data) + + return DocumentDataset(dd.from_pandas(pdf, npartitions=npartitions)) + + +class TestUnicodeReformatter: + def test_reformatting(self): + # Examples taken from ftfy documentation: + # https://ftfy.readthedocs.io/en/latest/ + dataset = list_to_dataset( + [ + "✔ No problems", + "The Mona Lisa doesn’t have eyebrows.", + "l’humanité", + "à perturber la réflexion", + "Clean document already.", + ] + ) + expected_results = [ + "✔ No problems", + "The Mona Lisa doesn't have eyebrows.", + "l'humanité", + "à perturber la réflexion", + "Clean document already.", + ] + expected_results.sort() + + modifier = Modify(UnicodeReformatter()) + fixed_dataset = modifier(dataset) + actual_results = fixed_dataset.df.compute()["text"].to_list() + actual_results.sort() + + assert ( + expected_results == actual_results + ), f"Expected: {expected_results}, but got: {actual_results}" + + +class TestNewlineNormalizer: + def test_just_newlines(self): + dataset = list_to_dataset( + [ + "The quick brown fox jumps over the lazy dog", + "The quick\nbrown fox jumps \nover the lazy dog", + "The quick\n\nbrown fox jumps \n\nover the lazy dog", + "The quick\n\n\nbrown fox jumps \n\n\nover the lazy dog", + "The quick\n\n\nbrown fox jumps \nover the lazy dog", + ] + ) + expected_results = [ + "The quick brown fox jumps over the lazy dog", + "The quick\nbrown fox jumps \nover the lazy dog", + "The quick\n\nbrown fox jumps \n\nover the lazy dog", + "The quick\n\nbrown fox jumps \n\nover the lazy dog", + "The quick\n\nbrown fox jumps \nover the lazy dog", + ] + expected_results.sort() + + modifier = Modify(NewlineNormalizer()) + fixed_dataset = modifier(dataset) + actual_results = fixed_dataset.df.compute()["text"].to_list() + 
actual_results.sort() + + assert ( + expected_results == actual_results + ), f"Expected: {expected_results}, but got: {actual_results}" + + def test_newlines_and_carriage_returns(self): + dataset = list_to_dataset( + [ + "The quick brown fox jumps over the lazy dog", + "The quick\r\nbrown fox jumps \r\nover the lazy dog", + "The quick\r\n\r\nbrown fox jumps \r\n\r\nover the lazy dog", + "The quick\r\n\r\n\r\nbrown fox jumps \r\n\r\n\r\nover the lazy dog", + "The quick\r\n\r\n\r\nbrown fox jumps \r\nover the lazy dog", + ] + ) + expected_results = [ + "The quick brown fox jumps over the lazy dog", + "The quick\r\nbrown fox jumps \r\nover the lazy dog", + "The quick\r\n\r\nbrown fox jumps \r\n\r\nover the lazy dog", + "The quick\r\n\r\nbrown fox jumps \r\n\r\nover the lazy dog", + "The quick\r\n\r\nbrown fox jumps \r\nover the lazy dog", + ] + expected_results.sort() + + modifier = Modify(NewlineNormalizer()) + fixed_dataset = modifier(dataset) + actual_results = fixed_dataset.df.compute()["text"].to_list() + actual_results.sort() + + assert ( + expected_results == actual_results + ), f"Expected: {expected_results}, but got: {actual_results}" + + +class TestUrlRemover: + def test_urls(self): + dataset = list_to_dataset( + [ + "This is a url: www.nvidia.com", + "This is a url: http://www.nvidia.com", + "This is a url: https://www.nvidia.com", + "This is a url: https://www.nvidia.gov", + "This is a url: https://nvidia.com", + "This is a url: HTTPS://WWW.NVIDIA.COM", + "This is not a url: git@github.com:NVIDIA/NeMo-Curator.git", + ] + ) + expected_results = [ + "This is a url: ", + "This is a url: ", + "This is a url: ", + "This is a url: ", + "This is a url: ", + "This is a url: ", + "This is not a url: git@github.com:NVIDIA/NeMo-Curator.git", + ] + expected_results.sort() + + modifier = Modify(UrlRemover()) + fixed_dataset = modifier(dataset) + actual_results = fixed_dataset.df.compute()["text"].to_list() + actual_results.sort() + + assert ( + expected_results == actual_results + ), f"Expected: {expected_results}, but got: {actual_results}" diff --git a/tests/test_io.py b/tests/test_io.py index ca0c645b..1efe0569 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -293,3 +293,127 @@ def test_write_single_jsonl_file(self, tmp_path): result = DocumentDataset.read_json(output_path) assert json_df.equals(result.df.compute()) + + +class TestPartitionOn: + def test_partition_on_and_write_to_filename_error(self, tmp_path): + """Verify that using partition_on and write_to_filename together raises an error.""" + df = pd.DataFrame( + { + "id": [1, 2, 3], + "file_name": ["f1", "f1", "f1"], + "category": ["A", "B", "A"], + } + ) + ddf = dd.from_pandas(df, npartitions=1) + dataset = DocumentDataset(ddf) + with pytest.raises( + ValueError, + match="Cannot use both partition_on and write_to_filename parameters simultaneously.", + ): + dataset.to_json( + output_path=str(tmp_path / "output"), + write_to_filename=True, # Intentionally provided to trigger the error + partition_on="category", + ) + + @pytest.mark.parametrize( + "backend", ["pandas", pytest.param("cudf", marks=pytest.mark.gpu)] + ) + @pytest.mark.parametrize( + "category_values", + [ + ["A", "B", "A", "B"], + [10, 20, 10, 20], + [1.0, 2.0, 1.0, 2.0], + ], + ) + def test_write_to_disk_with_partition_on_jsonl( + self, tmp_path, backend, category_values + ): + """ + Test writing a partitioned JSONL dataset. 
+ + The function is expected to create subdirectories in the output directory + with names of the form 'category=' for each unique partition column value. + """ + df = pd.DataFrame( + {"id": [1, 2, 3, 4], "category": category_values, "value": [10, 20, 30, 40]} + ) + ddf = dd.from_pandas(df, npartitions=2) + ddf = ddf.to_backend(backend) + output_dir = tmp_path / "output_jsonl" + dataset = DocumentDataset(ddf) + dataset.to_json(output_path=str(output_dir), partition_on="category") + # Check that the output directory contains subdirectories for each partition. + # Unique partition values (as strings) to be used in the directory names. + unique_partitions = {str(x) for x in category_values} + for part in unique_partitions: + expected_dir = output_dir / f"category={part}" + assert expected_dir.exists(), f"Expected directory {expected_dir} not found" + + # For each partition directory, load the JSONL files and verify that all records have the correct partition value. + # (Here we assume the files are written with extension ".part") + for part_dir in output_dir.glob("category=*"): + # The partition value is taken from the directory name. + partition_value = part_dir.name.split("=")[-1] + jsonl_files = list(part_dir.glob("*.part")) + assert ( + jsonl_files + ), f"No JSONL files found in partition directory {part_dir}" + for file in jsonl_files: + with open(file, "r") as f: + for line in f: + record = json.loads(line) + if "category" in record: + # Compare as strings, to work with both integer and string partition values. + assert ( + str(record["category"]) == partition_value + ), f"Record partition value {record['category']} does not match directory {partition_value}" + + @pytest.mark.parametrize( + "backend", ["pandas", pytest.param("cudf", marks=pytest.mark.gpu)] + ) + @pytest.mark.parametrize( + "category_values", + [ + ["A", "B", "A", "B"], + [10, 20, 10, 20], + [1.0, 2.0, 1.0, 2.0], + ], + ) + def test_write_to_disk_with_partition_on_parquet( + self, tmp_path, backend, category_values + ): + """ + Test writing a partitioned Parquet dataset. + + The test writes a DataFrame partitioned on the 'category' column and then reads it back + using dd.read_parquet. The output is compared (after sorting) to the original DataFrame. + """ + + df = pd.DataFrame( + {"id": [1, 2, 3, 4], "category": category_values, "value": [10, 20, 30, 40]} + ) + ddf = dd.from_pandas(df, npartitions=2) + ddf = ddf.to_backend(backend) + output_dir = tmp_path / "output_parquet" + dataset = DocumentDataset(ddf) + dataset.to_parquet(output_path=str(output_dir), partition_on="category") + + # Check that the output directory contains subdirectories for each partition. + # Unique partition values (as strings) to be used in the directory names. + unique_partitions = {str(x) for x in category_values} + for part in unique_partitions: + expected_dir = output_dir / f"category={part}" + assert expected_dir.exists(), f"Expected directory {expected_dir} not found" + + ddf_loaded = dd.read_parquet(str(output_dir)) + df_loaded = ddf_loaded.compute().reset_index(drop=True) + df_loaded["category"] = df_loaded["category"].astype(df["category"].dtype) + # To ensure a fair comparison, sort the dataframes by 'id' and reindex. 
+ pd.testing.assert_frame_equal( + df.sort_values("id").reset_index(drop=True), + df_loaded.sort_values("id").reset_index(drop=True)[df.columns], + check_dtype=False, + ) diff --git a/tests/test_semdedup.py b/tests/test_semdedup.py index 45f10910..f7f24bc8 100644 --- a/tests/test_semdedup.py +++ b/tests/test_semdedup.py @@ -13,9 +13,13 @@ # limitations under the License. import os +import numpy as np import pytest +import torch +import torch.nn.functional as F from dask.dataframe.utils import assert_eq from distributed import Client +from transformers import AutoConfig, AutoModel, AutoTokenizer from nemo_curator import SemDedup, SemDedupConfig from nemo_curator.datasets import DocumentDataset @@ -24,6 +28,9 @@ cudf = gpu_only_import("cudf") dask_cudf = gpu_only_import("dask_cudf") LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster") +EmbeddingCreator = gpu_only_import_from( + "nemo_curator.modules.semantic_dedup.embeddings", "EmbeddingCreator" +) @pytest.fixture @@ -80,3 +87,89 @@ def test_sem_dedup( duplicate_docs = [2, 3, 4, 200, 300] expected_df = cudf.Series(duplicate_docs, name="id") assert_eq(result_df["id"].sort_values(), expected_df, check_index=False) + + @pytest.mark.parametrize("pooling_strategy", ["last_token", "mean_pooling"]) + def test_embedding_creator_pooling_strategies(self, tmpdir, pooling_strategy): + test_text_1 = "The quick brown fox jumps over the lazy dog" + test_text_2 = "The brown fox jumps over the dog" + test_texts = [test_text_1, test_text_2] * 32 + df = cudf.DataFrame({"text": test_texts}) + ddf = dask_cudf.from_cudf(df, 1) + cache_dir = os.path.join(tmpdir, "test_embeddings_cache") + + embedding_creator = EmbeddingCreator( + embedding_model_name_or_path="sentence-transformers/all-MiniLM-L6-v2", + embedding_batch_size=32, + embedding_pooling_strategy=pooling_strategy, + input_column="text", + embedding_output_dir=os.path.join(cache_dir, "mean_embeddings"), + ) + embeddings = embedding_creator.create_embeddings(ddf).compute() + embeddings = embeddings["embeddings"].to_arrow().to_pylist() + embeddings = np.array(embeddings) + reference_embeddings = get_reference_embeddings( + test_texts, pooling_strategy=pooling_strategy + ) + assert np.allclose( + embeddings, reference_embeddings, atol=1e-3 + ), "Embeddings should match reference embeddings" + + +def get_reference_embeddings( + texts, + model_name="sentence-transformers/all-MiniLM-L6-v2", + pooling_strategy="last_token", +): + """ + Get embeddings using either last token or mean pooling strategy. 
+ + Args: + texts: List of input texts + model_name: Name or path of the model to use + pooling_strategy: Either "last_token" for last token or "mean" for mean pooling + """ + tokenizer = AutoTokenizer.from_pretrained(model_name) + model = AutoModel.from_pretrained(model_name) + model = model.to("cuda") + model.eval() + max_len_to_use = tokenizer.model_max_length + if max_len_to_use > 1e5: + max_len_to_use = AutoConfig.from_pretrained(model_name).max_position_embeddings + max_seq_length: int = max_len_to_use + + embs = [] + for text in texts: + inputs = tokenizer( + text, + return_tensors="pt", + padding=True, + truncation=True, + max_length=max_seq_length, + ) + inputs = {k: v.to("cuda") for k, v in inputs.items()} + + with torch.no_grad(): + with torch.autocast(device_type="cuda"): + outputs = model(**inputs) + + if pooling_strategy == "last_token": + embeddings = outputs.last_hidden_state[:, -1, :] + elif pooling_strategy == "mean_pooling": + token_embeddings = outputs.last_hidden_state + attention_mask = inputs["attention_mask"] + input_mask_expanded = ( + attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float() + ) + sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, dim=1) + sum_mask = torch.clamp(input_mask_expanded.sum(dim=1), min=1e-9) + embeddings = sum_embeddings / sum_mask + else: + raise ValueError( + "pooling_strategy must be either 'last_token' or 'mean_pooling'" + ) + + normed_emb = F.normalize(embeddings, dim=1).cpu() + normed_emb = normed_emb.squeeze(0) + embs.append(normed_emb) + + return np.array(embs) diff --git a/tests/test_unicode_reformatter.py b/tests/test_unicode_reformatter.py deleted file mode 100644 index 01ac716b..00000000 --- a/tests/test_unicode_reformatter.py +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
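The standalone `tests/test_unicode_reformatter.py` removed here is superseded by the new `tests/test_cleaning.py` earlier in this diff, which also covers `NewlineNormalizer` and `UrlRemover`. Composed programmatically rather than through the `text_cleaning` script's new flags, the same cleaning stages look roughly like this sketch (the sample string is illustrative):

```python
import dask.dataframe as dd
import pandas as pd

from nemo_curator import Modify, Sequential
from nemo_curator.datasets import DocumentDataset
from nemo_curator.modifiers import NewlineNormalizer, UnicodeReformatter, UrlRemover

df = pd.DataFrame(
    {"text": ["The Mona Lisa doesn’t have eyebrows.\n\n\nSee https://example.com"]}
)
dataset = DocumentDataset(dd.from_pandas(df, npartitions=1))

cleaner = Sequential(
    [
        Modify(UnicodeReformatter()),  # normalize mojibake / curly punctuation
        Modify(NewlineNormalizer()),   # collapse runs of 3+ newlines down to 2
        Modify(UrlRemover()),          # strip URLs from the text
    ]
)
print(cleaner(dataset).df.compute()["text"].iloc[0])
```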
- -import dask.dataframe as dd -import pandas as pd - -import nemo_curator -from nemo_curator.datasets import DocumentDataset -from nemo_curator.modifiers import UnicodeReformatter - - -def list_to_dataset(documents, col_name="text", npartitions=2): - data = {col_name: documents} - pdf = pd.DataFrame(data) - - return DocumentDataset(dd.from_pandas(pdf, npartitions=npartitions)) - - -class TestUnicodeReformatter: - def test_reformatting(self): - # Examples taken from ftfy documentation: - # https://ftfy.readthedocs.io/en/latest/ - dataset = list_to_dataset( - [ - "✔ No problems", - "The Mona Lisa doesn’t have eyebrows.", - "l’humanité", - "à perturber la réflexion", - "Clean document already.", - ] - ) - expected_results = [ - "✔ No problems", - "The Mona Lisa doesn't have eyebrows.", - "l'humanité", - "à perturber la réflexion", - "Clean document already.", - ] - expected_results.sort() - - modifier = nemo_curator.Modify(UnicodeReformatter()) - fixed_dataset = modifier(dataset) - actual_results = fixed_dataset.df.compute()["text"].to_list() - actual_results.sort() - - assert ( - expected_results == actual_results - ), f"Expected: {expected_results}, but got: {actual_results}" diff --git a/tutorials/dapt-curation/README.md b/tutorials/dapt-curation/README.md index 4f67c616..55ddb2b5 100755 --- a/tutorials/dapt-curation/README.md +++ b/tutorials/dapt-curation/README.md @@ -47,9 +47,9 @@ The tutorial follows the steps below:
After installing the NeMo Curator package, install the dependencies and run: ```bash -pip install -r code/requirements.txt cd code -python main.py +pip install -r requirements.txt +python main.py --device "gpu" ``` -This will download chip-design related datasets and begin the data curation pipeline. +This will download chip-design related datasets and begin the data curation pipeline. Please use `--device "gpu"` to enable semantic and fuzzy deduplication, which require the GPU. diff --git a/tutorials/dapt-curation/code/configs/text_semantic_dedupe_config.yaml b/tutorials/dapt-curation/code/configs/text_semantic_dedupe_config.yaml index 17e4c17c..5b8e63b7 100644 --- a/tutorials/dapt-curation/code/configs/text_semantic_dedupe_config.yaml +++ b/tutorials/dapt-curation/code/configs/text_semantic_dedupe_config.yaml @@ -6,6 +6,7 @@ num_files: 16 embeddings_save_loc: "embeddings" embedding_model_name_or_path: "sentence-transformers/all-MiniLM-L6-v2" embedding_batch_size: 128 +write_embeddings_to_disk: false # Clustering configuration clustering_save_loc: "clustering_results" diff --git a/tutorials/dapt-curation/code/main.py b/tutorials/dapt-curation/code/main.py index 3ae5fe17..5f51ead8 100755 --- a/tutorials/dapt-curation/code/main.py +++ b/tutorials/dapt-curation/code/main.py @@ -37,11 +37,8 @@ ) import nemo_curator as nc -from nemo_curator import ExactDuplicates, Modify, ScoreFilter, Sequential +from nemo_curator import ScoreFilter, Sequential from nemo_curator.datasets import DocumentDataset -from nemo_curator.filters import RepeatingTopNGramsFilter, WordCountFilter -from nemo_curator.modifiers.pii_modifier import PiiModifier -from nemo_curator.modifiers.unicode_reformatter import UnicodeReformatter from nemo_curator.utils.distributed_utils import get_client from nemo_curator.utils.file_utils import ( get_all_files_paths_under, @@ -191,7 +188,7 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None: duplicates = semantic_dedupe( dataset=gpu_dataset_text, sem_dedupe_config_yaml_path=sem_dedupe_config_yaml_path, - cache=CACHE_DIR, + cache_dir=CACHE_DIR, ) unique_ids = duplicates.df.to_backend("pandas").compute()["id"] semantic_dataset_text = DocumentDataset( diff --git a/tutorials/distributed_data_classification/README.md b/tutorials/distributed_data_classification/README.md index 2b0bf51b..f953d8f5 100644 --- a/tutorials/distributed_data_classification/README.md +++ b/tutorials/distributed_data_classification/README.md @@ -12,7 +12,7 @@ Before running any of these notebooks, please see this [Getting Started](https:/
-| NeMo Curator Classifier | Hugging Face page | +| NeMo Curator Classifier | Hugging Face Page | | --- | --- | | `AegisClassifier` | [nvidia/Aegis-AI-Content-Safety-LlamaGuard-Defensive-1.0](https://huggingface.co/nvidia/Aegis-AI-Content-Safety-LlamaGuard-Defensive-1.0) and [nvidia/Aegis-AI-Content-Safety-LlamaGuard-Permissive-1.0](https://huggingface.co/nvidia/Aegis-AI-Content-Safety-LlamaGuard-Permissive-1.0) | | `ContentTypeClassifier` | [nvidia/content-type-classifier-deberta](https://huggingface.co/nvidia/content-type-classifier-deberta) | diff --git a/tutorials/distributed_data_classification/content-type-classification.ipynb b/tutorials/distributed_data_classification/content-type-classification.ipynb index 97df8485..2a7b5423 100644 --- a/tutorials/distributed_data_classification/content-type-classification.ipynb +++ b/tutorials/distributed_data_classification/content-type-classification.ipynb @@ -6,7 +6,7 @@ "source": [ "# Distributed Data Classification with NeMo Curator's `ContentTypeClassifier`\n", "\n", - "This notebook demonstrates the use of NeMo Curator's `ContentTypeClassifier`. The [content type classifier](https://huggingface.co/nvidia/content-type-classifier-deberta) is used to categorize documents into one of 11 distinct speech types based on their content. It helps with data annotation, which is useful in data blending for foundation model training. Please refer to the Hugging Face page for more information about the content type classifier, including its output labels, here: https://huggingface.co/nvidia/content-type-classifier-deberta.\n", + "This notebook demonstrates the use of NeMo Curator's `ContentTypeClassifier`. The [content type classifier](https://huggingface.co/nvidia/content-type-classifier-deberta) is used to categorize documents into one of 11 distinct speech types based on their content. It helps with data annotation, which is useful in data blending for foundation model training. Please refer to the NemoCurator Content Type Classifier DeBERTa Hugging Face page for more information about the content type classifier, including its output labels, here: https://huggingface.co/nvidia/content-type-classifier-deberta.\n", "\n", "The content type classifier is accelerated using [CrossFit](https://github.com/rapidsai/crossfit), a library that leverages intellegent batching and RAPIDS to accelerate the offline inference on large datasets.\n", "\n", diff --git a/tutorials/distributed_data_classification/domain-classification.ipynb b/tutorials/distributed_data_classification/domain-classification.ipynb index 5a5aff14..8c5686de 100644 --- a/tutorials/distributed_data_classification/domain-classification.ipynb +++ b/tutorials/distributed_data_classification/domain-classification.ipynb @@ -6,7 +6,7 @@ "source": [ "# Distributed Data Classification with NeMo Curator's `DomainClassifier`\n", "\n", - "This notebook demonstrates the use of NeMo Curator's `DomainClassifier`. The [domain classifier](https://huggingface.co/nvidia/domain-classifier) is used to classify the domain of a text. It helps with data annotation, which is useful in data blending for foundation model training. Please refer to the Hugging Face page for more information about the domain classifier, including its output labels, here: https://huggingface.co/nvidia/domain-classifier.\n", + "This notebook demonstrates the use of NeMo Curator's `DomainClassifier`. The [domain classifier](https://huggingface.co/nvidia/domain-classifier) is used to classify the domain of a text. 
It helps with data annotation, which is useful in data blending for foundation model training. Please refer to the NemoCurator Domain Classifier Hugging Face page for more information about the domain classifier, including its output labels, here: https://huggingface.co/nvidia/domain-classifier.\n",
    "\n",
    "The domain classifier is accelerated using [CrossFit](https://github.com/rapidsai/crossfit), a library that leverages intellegent batching and RAPIDS to accelerate the offline inference on large datasets.\n",
    "\n",
diff --git a/tutorials/distributed_data_classification/instruction-data-guard-classification.ipynb b/tutorials/distributed_data_classification/instruction-data-guard-classification.ipynb
index 14ec962f..5394fbe5 100644
--- a/tutorials/distributed_data_classification/instruction-data-guard-classification.ipynb
+++ b/tutorials/distributed_data_classification/instruction-data-guard-classification.ipynb
@@ -6,11 +6,11 @@
  "source": [
    "# Distributed Data Classification with NeMo Curator's `InstructionDataGuardClassifier`\n",
    "\n",
-    "This notebook demonstrates the use of NeMo Curator's `InstructionDataGuardClassifier`. The [Instruction-Data-Guard classifier](https://huggingface.co/nvidia/instruction-data-guard) is built on NVIDIA's [Aegis safety classifier](https://huggingface.co/nvidia/Aegis-AI-Content-Safety-LlamaGuard-Defensive-1.0) and is designed to detect LLM poisoning trigger attacks. Please refer to the Hugging Face page for more information about the Instruction-Data-Guard classifier here: https://huggingface.co/nvidia/instruction-data-guard.\n",
+    "This notebook demonstrates the use of NeMo Curator's `InstructionDataGuardClassifier`. The [Instruction Data Guard classifier](https://huggingface.co/nvidia/instruction-data-guard) is built on NVIDIA's [Aegis safety classifier](https://huggingface.co/nvidia/Aegis-AI-Content-Safety-LlamaGuard-Defensive-1.0) and is designed to detect LLM poisoning trigger attacks. Please refer to the NemoCurator Instruction Data Guard Hugging Face page for more information about the Instruction Data Guard classifier here: https://huggingface.co/nvidia/instruction-data-guard.\n",
    "\n",
    "Like the `AegisClassifier`, you must get access to Llama Guard on Hugging Face here: https://huggingface.co/meta-llama/LlamaGuard-7b. Afterwards, you should set up a [user access token](https://huggingface.co/docs/hub/en/security-tokens) and pass that token into the constructor of this classifier.\n",
    "\n",
-    "The Instruction-Data-Guard classifier is accelerated using [CrossFit](https://github.com/rapidsai/crossfit), a library that leverages intellegent batching and RAPIDS to accelerate the offline inference on large datasets.\n",
+    "The Instruction Data Guard classifier is accelerated using [CrossFit](https://github.com/rapidsai/crossfit), a library that leverages intelligent batching and RAPIDS to accelerate the offline inference on large datasets.\n",
    "\n",
    "Before running this notebook, please see this [Getting Started](https://github.com/NVIDIA/NeMo-Curator?tab=readme-ov-file#get-started) page for instructions on how to install NeMo Curator."
] @@ -145,7 +145,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "Starting Instruction-Data-Guard classifier inference\n" + "Starting Instruction Data Guard classifier inference\n" ] }, { diff --git a/tutorials/distributed_data_classification/multilingual-domain-classification.ipynb b/tutorials/distributed_data_classification/multilingual-domain-classification.ipynb index 431dcc3f..7a9b4e89 100644 --- a/tutorials/distributed_data_classification/multilingual-domain-classification.ipynb +++ b/tutorials/distributed_data_classification/multilingual-domain-classification.ipynb @@ -6,7 +6,7 @@ "source": [ "# Distributed Data Classification with NeMo Curator's `MultilingualDomainClassifier`\n", "\n", - "This notebook demonstrates the use of NeMo Curator's `MultilingualDomainClassifier`. The [multilingual domain classifier](https://huggingface.co/nvidia/multilingual-domain-classifier) is used to classify the domain of texts in any of 52 languages, including English. It helps with data annotation, which is useful in data blending for foundation model training. Please refer to the Hugging Face page for more information about the multilingual domain classifier, including its output labels, here: https://huggingface.co/nvidia/multilingual-domain-classifier.\n", + "This notebook demonstrates the use of NeMo Curator's `MultilingualDomainClassifier`. The [multilingual domain classifier](https://huggingface.co/nvidia/multilingual-domain-classifier) is used to classify the domain of texts in any of 52 languages, including English. It helps with data annotation, which is useful in data blending for foundation model training. Please refer to the NemoCurator Multilingual Domain Classifier Hugging Face page for more information about the multilingual domain classifier, including its output labels, here: https://huggingface.co/nvidia/multilingual-domain-classifier.\n", "\n", "The multilingual domain classifier is accelerated using [CrossFit](https://github.com/rapidsai/crossfit), a library that leverages intellegent batching and RAPIDS to accelerate the offline inference on large datasets.\n", "\n", diff --git a/tutorials/distributed_data_classification/prompt-task-complexity-classification.ipynb b/tutorials/distributed_data_classification/prompt-task-complexity-classification.ipynb index a77599ae..5e90d28c 100644 --- a/tutorials/distributed_data_classification/prompt-task-complexity-classification.ipynb +++ b/tutorials/distributed_data_classification/prompt-task-complexity-classification.ipynb @@ -6,7 +6,7 @@ "source": [ "# Distributed Data Classification with NeMo Curator's `PromptTaskComplexityClassifier`\n", "\n", - "This notebook demonstrates the use of NeMo Curator's `PromptTaskComplexityClassifier`. The [prompt task and complexity classifier](https://huggingface.co/nvidia/prompt-task-and-complexity-classifier) a multi-headed model which classifies English text prompts across task types and complexity dimensions. It helps with data annotation, which is useful in data blending for foundation model training. Please refer to the Hugging Face page for more information about the prompt task and complexity classifier, including its output labels, here: https://huggingface.co/nvidia/prompt-task-and-complexity-classifier.\n", + "This notebook demonstrates the use of NeMo Curator's `PromptTaskComplexityClassifier`. 
The [prompt task and complexity classifier](https://huggingface.co/nvidia/prompt-task-and-complexity-classifier) is a multi-headed model which classifies English text prompts across task types and complexity dimensions. It helps with data annotation, which is useful in data blending for foundation model training. Please refer to the NemoCurator Prompt Task and Complexity Classifier Hugging Face page for more information about the prompt task and complexity classifier, including its output labels, here: https://huggingface.co/nvidia/prompt-task-and-complexity-classifier.\n",
    "\n",
    "The prompt task and complexity classifier is accelerated using [CrossFit](https://github.com/rapidsai/crossfit), a library that leverages intellegent batching and RAPIDS to accelerate the offline inference on large datasets.\n",
    "\n",
diff --git a/tutorials/distributed_data_classification/quality-classification.ipynb b/tutorials/distributed_data_classification/quality-classification.ipynb
index c5437653..79b1fdb9 100644
--- a/tutorials/distributed_data_classification/quality-classification.ipynb
+++ b/tutorials/distributed_data_classification/quality-classification.ipynb
@@ -6,7 +6,7 @@
  "source": [
    "# Distributed Data Classification with NeMo Curator's `QualityClassifier`\n",
    "\n",
-    "This notebook demonstrates the use of NeMo Curator's `QualityClassifier`. The [quality classifier](https://huggingface.co/nvidia/quality-classifier-deberta) is used to classify text as high, medium, or low quality. This helps with data annotation, which is useful in data blending for foundation model training. Please refer to the Hugging Face page for more information about the quality classifier, including its output labels, here: https://huggingface.co/nvidia/quality-classifier-deberta.\n",
+    "This notebook demonstrates the use of NeMo Curator's `QualityClassifier`. The [quality classifier](https://huggingface.co/nvidia/quality-classifier-deberta) is used to classify text as high, medium, or low quality. This helps with data annotation, which is useful in data blending for foundation model training. Please refer to the NemoCurator Quality Classifier DeBERTa Hugging Face page for more information about the quality classifier, including its output labels, here: https://huggingface.co/nvidia/quality-classifier-deberta.\n",
    "\n",
    "The quality classifier is accelerated using [CrossFit](https://github.com/rapidsai/crossfit), a library that leverages intellegent batching and RAPIDS to accelerate the offline inference on large datasets.\n",
    "\n",
@@ -186,7 +186,7 @@
    "name": "stdout",
    "output_type": "stream",
    "text": [
-     "Starting Quality classifier inference\n",
+     "Starting quality classifier inference\n",
      "Writing to disk complete for 1 partition(s)\n",
      "CPU times: user 2.84 s, sys: 1.2 s, total: 4.04 s\n",
      "Wall time: 19.8 s\n"
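
For readers skimming this diff, the following is a minimal sketch of the read/classify/write workflow that these classification notebooks walk through, shown here with `QualityClassifier`; the other classifiers (`DomainClassifier`, `ContentTypeClassifier`, `PromptTaskComplexityClassifier`, and so on) follow the same pattern with a different import. The input and output paths and the batch size are illustrative assumptions, not values taken from the notebooks.

```python
from nemo_curator.classifiers import QualityClassifier
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client

# The classifiers are GPU modules, so start a GPU-backed Dask cluster.
client = get_client(cluster_type="gpu")

# Read JSONL documents into GPU memory with a cuDF backend.
# "input_data/" is a placeholder path; add_filename lets us write results
# back out under the original file names.
input_dataset = DocumentDataset.read_json(
    "input_data/", backend="cudf", add_filename=True
)

# Annotate each document as high, medium, or low quality.
classifier = QualityClassifier(batch_size=1024)  # batch size is an assumption
result_dataset = classifier(dataset=input_dataset)

# Persist the annotated dataset; "quality_results/" is a placeholder path.
result_dataset.to_json("quality_results/", write_to_filename=True)

client.close()
```

The notebooks add extras on top of this skeleton (sample data creation, timing cells, label inspection), but the core flow is the same three steps: read into a cuDF-backed `DocumentDataset`, apply the classifier, write the results.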