diff --git a/tutorials/dapt-curation/README.md b/tutorials/dapt-curation/README.md index 38e9fe64..0e43e48a 100755 --- a/tutorials/dapt-curation/README.md +++ b/tutorials/dapt-curation/README.md @@ -37,6 +37,7 @@ The tutorial follows the steps below:
- Heuristic-based quality filtering (Number of lines, word count, top N-grams, etc.) - Fix unicode errors via ftfy - PII redaction + - GPU-accelerated fuzzy and semantic deduplication - Step 6: Save the filtered and curated data
- Step 7: Blend datasets and shuffle @@ -45,8 +46,10 @@ The tutorial follows the steps below:
After installing the NeMo Curator package, install the dependencies and run: -`pip install -r code/requirements.txt` - -`python code/main.py` +```bash +pip install -r code/requirements.txt +cd code +python main.py +``` This will download chip-design related datasets and begin the data curation pipeline. diff --git a/tutorials/dapt-curation/code/configs/text_semantic_dedupe_config.yaml b/tutorials/dapt-curation/code/configs/text_semantic_dedupe_config.yaml new file mode 100644 index 00000000..17e4c17c --- /dev/null +++ b/tutorials/dapt-curation/code/configs/text_semantic_dedupe_config.yaml @@ -0,0 +1,28 @@ +# Configuration file for semantic dedup +cache_dir: "workspace/semdedup_cache/text" +num_files: 16 + +# Embeddings configuration +embeddings_save_loc: "embeddings" +embedding_model_name_or_path: "sentence-transformers/all-MiniLM-L6-v2" +embedding_batch_size: 128 + +# Clustering configuration +clustering_save_loc: "clustering_results" +n_clusters: 20 +seed: 1234 +max_iter: 100 +kmeans_with_cos_dist: false + +# Semdedup configuration +which_to_keep: "hard" +largest_cluster_size_to_process: 100000 +sim_metric: "cosine" + +# Extract dedup configuration +eps_thresholds: + - 0.1 + - 0.01 + +# Which threshold to use for extracting deduped data +eps_to_extract: 0.1 diff --git a/tutorials/dapt-curation/code/main.py b/tutorials/dapt-curation/code/main.py index 9ce310ff..3ae5fe17 100755 --- a/tutorials/dapt-curation/code/main.py +++ b/tutorials/dapt-curation/code/main.py @@ -27,10 +27,13 @@ CodeLineCountFilter, TextLineCountFilter, clean_and_unify, - dedupe, + exact_dedupe, filter_code, filter_text, + fuzzy_dedupe, redact_code, + rm_dir, + semantic_dedupe, ) import nemo_curator as nc @@ -48,6 +51,7 @@ SCRIPT_DIR_PATH = os.path.dirname(os.path.abspath(__file__)) DATA_DIR = os.path.join(SCRIPT_DIR_PATH, "data") +CONFIG_DIR = os.path.join(SCRIPT_DIR_PATH, "configs") def download_sources( @@ -117,7 +121,6 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None: args (Any): Command-line arguments. jsonl_dir (str): Directory path where the JSONL files are stored. """ - print("Running the curation pipeline...") # Initialize the Dask cluster.
client = get_client(**ArgumentHelper.parse_client_args(args)) @@ -129,7 +132,7 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None: TextLineCountFilter(), text_field="file_type_count", score_type=bool ), filter_text, - dedupe, + exact_dedupe, ] ) @@ -141,7 +144,7 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None: CodeLineCountFilter(), text_field="file_type_count", score_type=bool ), filter_code, - dedupe, + exact_dedupe, redact_code, ] ) @@ -167,17 +170,54 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None: + orig_dataset_code.df["line_count"].astype(str) ) + print("Executing the curation pipeline...") dataset_text = curation_steps_text(orig_dataset_text) - dataset_text = dataset_text.persist() - - print(f"Original dataset length for text files: {len(orig_dataset_text.df)}") - print(f"After dataprep: {len(dataset_text.df)}") - dataset_code = curation_steps_code(orig_dataset_code) - dataset_code = dataset_code.persist() + print(f"Original dataset length for text files: {len(orig_dataset_text.df)}") + print(f"After dataprep for text files: {len(dataset_text.df)}") print(f"Original dataset length for code files: {len(orig_dataset_code.df)}") - print(f"After dataprep: {len(dataset_code.df)}") + print(f"After dataprep for code files: {len(dataset_code.df)}") + + if args.device == "gpu": + print("Executing the semantic dedupe pipeline...") + gpu_dataset_text = DocumentDataset(dataset_text.df.to_backend("cudf")) + gpu_dataset_code = DocumentDataset(dataset_code.df.to_backend("cudf")) + sem_dedupe_config_yaml_path = os.path.join( + CONFIG_DIR, "text_semantic_dedupe_config.yaml" + ) + CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "semantic_dedupe", "text") + rm_dir(CACHE_DIR) + duplicates = semantic_dedupe( + dataset=gpu_dataset_text, + sem_dedupe_config_yaml_path=sem_dedupe_config_yaml_path, + cache=CACHE_DIR, + ) + unique_ids = duplicates.df.to_backend("pandas").compute()["id"] + semantic_dataset_text = DocumentDataset( + gpu_dataset_text.df[gpu_dataset_text.df.id.isin(unique_ids)] + ) + print(f"After semantic dedupe for text files: {len(semantic_dataset_text.df)}") + + print("Executing the fuzzy dedupe pipeline...") + CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "fuzzy_dedupe", "text") + rm_dir(CACHE_DIR) + fuzzy_dataset_text = fuzzy_dedupe( + dataset=semantic_dataset_text, cache=CACHE_DIR + ) + CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "fuzzy_dedupe", "code") + rm_dir(CACHE_DIR) + fuzzy_dataset_code = fuzzy_dedupe(dataset=gpu_dataset_code, cache=CACHE_DIR) + + dataset_text.df = fuzzy_dataset_text.df.to_backend("pandas") + dataset_code.df = fuzzy_dataset_code.df.to_backend("pandas") + print(f"After fuzzy dedupe for text files: {len(dataset_text.df)}") + print(f"After fuzzy dedupe for code files: {len(dataset_code.df)}") + + final_dataset_text = dataset_text.persist() + final_dataset_code = dataset_code.persist() + + print("Writing the results to disk...") # Overwrite existing files in the curated directory.
out_path = os.path.join(DATA_DIR, "curated") @@ -186,15 +226,18 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None: shutil.rmtree(out_path) os.makedirs(out_path) - dataset_text.to_json(out_path, write_to_filename=True) - dataset_code.to_json(out_path, write_to_filename=True) + final_dataset_text.to_json(out_path, write_to_filename=True) + final_dataset_code.to_json(out_path, write_to_filename=True) + + print("Writing results to disk completed") # Split the dataset by file category and save curated files (optional - to create blended datasets) + print("Split dataset by metadata") separated_data_text = separate_by_metadata( - dataset_text.df, out_path, "category" + final_dataset_text.df, out_path, "category" ).compute() separated_data_code = separate_by_metadata( - dataset_code.df, out_path, "category" + final_dataset_code.df, out_path, "category" ).compute() client.close() @@ -239,6 +282,7 @@ def main(): # Download all the sources and get the list of text and code files. text_files, code_files = download_sources(100, 100, 100) run_curation_pipeline(args, text_files, code_files) + print("Data Curation completed") # blend and shuffle datasets root_path = os.path.join(DATA_DIR, "curated") @@ -250,7 +294,9 @@ def main(): ] dataset_weights = [1.0, 4.0, 4.0, 1.0] target_size = 20 + blend_and_shuffle(args, dataset_paths, dataset_weights, target_size) + print("Data Blending completed") if __name__ == "__main__": diff --git a/tutorials/dapt-curation/code/requirements.txt b/tutorials/dapt-curation/code/requirements.txt index 4c298992..481f5b0a 100755 --- a/tutorials/dapt-curation/code/requirements.txt +++ b/tutorials/dapt-curation/code/requirements.txt @@ -1,6 +1,7 @@ -arxiv +arxiv==2.1.0 arxiv-downloader cchardet +nltk==3.8.1 poppler-utils unstructured[all-docs]==0.14.5 unstructured[pdf] diff --git a/tutorials/dapt-curation/code/utils.py b/tutorials/dapt-curation/code/utils.py index 8c10539c..dc91b225 100755 --- a/tutorials/dapt-curation/code/utils.py +++ b/tutorials/dapt-curation/code/utils.py @@ -13,12 +13,23 @@ # limitations under the License. import json +import os import re import dask.dataframe as dd import pandas as pd - -from nemo_curator import ExactDuplicates, Modify, ScoreFilter, Sequential +import yaml + +from nemo_curator import ( + ExactDuplicates, + FuzzyDuplicates, + FuzzyDuplicatesConfig, + Modify, + ScoreFilter, + SemDedup, + SemDedupConfig, + Sequential, +) from nemo_curator.datasets import DocumentDataset from nemo_curator.filters import ( DocumentFilter, @@ -37,7 +48,10 @@ from nemo_curator.modifiers.unicode_reformatter import UnicodeReformatter from nemo_curator.pii.constants import DEFAULT_LANGUAGE, DEFAULT_MAX_DOC_SIZE from nemo_curator.utils.distributed_utils import get_client -from nemo_curator.utils.file_utils import get_all_files_paths_under +from nemo_curator.utils.file_utils import ( + expand_outdir_and_mkdir, + get_all_files_paths_under, +) class QuotationUnifier(DocumentModifier): @@ -259,7 +273,7 @@ def func2(row): return redacted_dataset -def dedupe(dataset: DocumentDataset) -> DocumentDataset: +def exact_dedupe(dataset: DocumentDataset) -> DocumentDataset: """ Remove exact duplicates from the given DocumentDataset. @@ -282,6 +296,71 @@ def dedupe(dataset: DocumentDataset) -> DocumentDataset: return DocumentDataset(deduped) +def fuzzy_dedupe(dataset: DocumentDataset, cache: str) -> DocumentDataset: + """ + Removes near-duplicate documents and code lines + + Args: + dataset (DocumentDataset): The dataset containing documents. 
+ cache (str): Directory for storing fuzzy deduplication cache files. + + Returns: + DocumentDataset: The deduplicated dataset. + """ + fuzzy_dedup_config = FuzzyDuplicatesConfig( + cache_dir=cache, + id_field="id", + text_field="text", + seed=42, + char_ngrams=20, + num_buckets=20, + hashes_per_bucket=13, + use_64_bit_hash=False, + buckets_per_shuffle=5, + false_positive_check=False, + num_anchors=2, + jaccard_threshold=0.8, + ) + fuzzy_dup = FuzzyDuplicates(config=fuzzy_dedup_config) + duplicates = fuzzy_dup(dataset) + + docs_to_remove = duplicates.df.map_partitions( + lambda x: x[x.group.duplicated(keep="first")] + ) + + # When there are few duplicates we can compute the results to a list and use `isin`. + duplicate_ids = docs_to_remove.compute().id.to_arrow().to_pylist() + dataset_df = dataset.df + deduped = dataset_df[~dataset_df.id.isin(duplicate_ids)] + return DocumentDataset(deduped) + + +def semantic_dedupe( + dataset: DocumentDataset, sem_dedupe_config_yaml_path: str, cache: str +): + """ + Perform semantic deduplication on the given dataset. + + Args: + dataset (DocumentDataset): The dataset containing documents. + sem_dedupe_config_yaml_path (str): Path to the semantic dedup configuration YAML. + cache (str): Directory for storing semantic dedup intermediate results. + + Returns: + DocumentDataset: The documents retained by semantic deduplication; filter the original dataset on its `id` column. + """ + partition_lengths = dataset.df.map_partitions(len).compute() + non_empty_partitions = [ + i for i, length in enumerate(partition_lengths) if length > 0 + ] + dataset.df = dataset.df.partitions[non_empty_partitions] + + semdedup_config = SemDedupConfig.from_yaml(sem_dedupe_config_yaml_path) + # Store intermediate results under the caller-provided cache directory. + semdedup_config.cache_dir = cache + expand_outdir_and_mkdir(semdedup_config.cache_dir) + semdup = SemDedup(config=semdedup_config, id_column_type="str") + duplicates = semdup(dataset) + return duplicates + + class TextLineCountFilter(DocumentFilter): """ Discard text files based on number of lines. @@ -323,3 +402,8 @@ def score_document(self, text: str) -> bool: def keep_document(self, score) -> bool: return score + + +def rm_dir(cache_dir): + if os.path.isdir(cache_dir): + os.system(f"rm -rf {cache_dir}")
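For readers following the diff, the GPU-only dedupe stages added above chain together roughly as sketched below. This is an illustrative condensation, not part of the change itself: it assumes it is run from `tutorials/dapt-curation/code` on a GPU-backed (cuDF) Dask cluster, reuses the helpers from `utils.py` and the `text_semantic_dedupe_config.yaml` added here, and `gpu_dedupe_text` is a placeholder name. The authoritative wiring is `run_curation_pipeline()` in `main.py`.

```python
# Illustrative sketch -- mirrors the GPU branch of run_curation_pipeline() in main.py.
import os

from nemo_curator.datasets import DocumentDataset

from utils import exact_dedupe, fuzzy_dedupe, rm_dir, semantic_dedupe

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SEM_DEDUPE_CONFIG = os.path.join(SCRIPT_DIR, "configs", "text_semantic_dedupe_config.yaml")


def gpu_dedupe_text(dataset: DocumentDataset) -> DocumentDataset:
    """Exact -> semantic -> fuzzy dedupe for the text split (expects a GPU Dask cluster)."""
    # Exact dedupe runs on the pandas-backed dataset.
    dataset = exact_dedupe(dataset)

    # Semantic and fuzzy dedupe operate on a cuDF-backed dataframe.
    gpu_dataset = DocumentDataset(dataset.df.to_backend("cudf"))

    # Semantic dedupe returns the ids of the documents to keep.
    sem_cache = os.path.join(SCRIPT_DIR, "cache", "semantic_dedupe", "text")
    rm_dir(sem_cache)
    kept = semantic_dedupe(
        dataset=gpu_dataset,
        sem_dedupe_config_yaml_path=SEM_DEDUPE_CONFIG,
        cache=sem_cache,
    )
    unique_ids = kept.df.to_backend("pandas").compute()["id"]
    gpu_dataset = DocumentDataset(gpu_dataset.df[gpu_dataset.df.id.isin(unique_ids)])

    # Fuzzy (MinHash/LSH) dedupe then drops near-duplicates that survive.
    fuzzy_cache = os.path.join(SCRIPT_DIR, "cache", "fuzzy_dedupe", "text")
    rm_dir(fuzzy_cache)
    deduped = fuzzy_dedupe(dataset=gpu_dataset, cache=fuzzy_cache)

    # Convert back to pandas before writing JSONL output, as main.py does.
    return DocumentDataset(deduped.df.to_backend("pandas"))
```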
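To exercise this GPU branch end to end, `main.py` has to be launched against a GPU Dask cluster; otherwise the `if args.device == "gpu"` block is skipped and only exact deduplication runs. Assuming the script wires up NeMo Curator's standard distributed arguments through `ArgumentHelper` so that a `--device` option is exposed (worth confirming against the argument setup in `main()`), that would look like `python main.py --device gpu` after installing the requirements.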