
Commit

[pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
pre-commit-ci[bot] committed Nov 6, 2024
1 parent 839c081 commit 11fa523
Showing 4 changed files with 70 additions and 47 deletions.
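
Note: the changes below are formatting-only fixes applied automatically by pre-commit.ci (quote normalization, line wrapping, import sorting, blank lines around definitions, and end-of-file newlines). The repository's actual .pre-commit-config.yaml is not part of this commit; the sketch below is a hypothetical example of hooks that would produce fixes of this kind, with illustrative revision pins.

# Hypothetical .pre-commit-config.yaml -- not shown in this diff; revs are illustrative.
repos:
  - repo: https://github.com/psf/black          # code formatting: quotes, wrapping, blank lines
    rev: 24.4.2
    hooks:
      - id: black
  - repo: https://github.com/PyCQA/isort        # import sorting
    rev: 5.13.2
    hooks:
      - id: isort
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.6.0
    hooks:
      - id: end-of-file-fixer                   # ensures files end with a single newline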
@@ -29,4 +29,4 @@ eps_thresholds:
- 0.01

# Which threshold to use for extracting deduped data
eps_to_extract: 0.1
eps_to_extract: 0.1
43 changes: 27 additions & 16 deletions tutorials/dapt-curation/code/main.py
@@ -30,10 +30,10 @@
exact_dedupe,
filter_code,
filter_text,
redact_code,
fuzzy_dedupe,
redact_code,
rm_dir,
semantic_dedupe,
rm_dir
)

import nemo_curator as nc
@@ -53,6 +53,7 @@
DATA_DIR = os.path.join(SCRIPT_DIR_PATH, "data")
CONFIG_DIR = os.path.join(SCRIPT_DIR_PATH, "configs")


def download_sources(
wikipedia_limit: Optional[int] = None,
github_limit: Optional[int] = None,
@@ -172,28 +173,38 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
print("Executing the curation pipeline...")
dataset_text = curation_steps_text(orig_dataset_text)
dataset_code = curation_steps_code(orig_dataset_code)

print(f"Original dataset length for text files: {len(orig_dataset_text.df)}")
print(f"After dataprep for text files: {len(dataset_text.df)}")
print(f"Original dataset length for code files: {len(orig_dataset_code.df)}")
print(f"After dataprep length for code files: {len(dataset_code.df)}")

if args.device == 'gpu':
if args.device == "gpu":
print("Executing the semantic dedupe pipeline...")
gpu_dataset_text = DocumentDataset(dataset_text.df.to_backend("cudf"))
gpu_dataset_code = DocumentDataset(dataset_code.df.to_backend("cudf"))
sem_dedupe_config_yaml_path = os.path.join(CONFIG_DIR, 'text_semantic_dedupe_config.yaml')
sem_dedupe_config_yaml_path = os.path.join(
CONFIG_DIR, "text_semantic_dedupe_config.yaml"
)
CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "semantic_dedupe", "text")
rm_dir(CACHE_DIR)
duplicates = semantic_dedupe(dataset=gpu_dataset_text, sem_dedupe_config_yaml_path=sem_dedupe_config_yaml_path, cache=CACHE_DIR)
unique_ids = duplicates.df.to_backend('pandas').compute()['id']
semantic_dataset_text = DocumentDataset(gpu_dataset_text.df[gpu_dataset_text.df.id.isin(unique_ids)])
duplicates = semantic_dedupe(
dataset=gpu_dataset_text,
sem_dedupe_config_yaml_path=sem_dedupe_config_yaml_path,
cache=CACHE_DIR,
)
unique_ids = duplicates.df.to_backend("pandas").compute()["id"]
semantic_dataset_text = DocumentDataset(
gpu_dataset_text.df[gpu_dataset_text.df.id.isin(unique_ids)]
)
print(f"After semantic dedupe for text files: {len(semantic_dataset_text.df)}")

print("Executing the fuzzy dedupe pipeline...")
CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "fuzzy_dedupe", "text")
rm_dir(CACHE_DIR)
fuzzy_dataset_text = fuzzy_dedupe(dataset=semantic_dataset_text, cache=CACHE_DIR)
fuzzy_dataset_text = fuzzy_dedupe(
dataset=semantic_dataset_text, cache=CACHE_DIR
)
CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "fuzzy_dedupe", "code")
rm_dir(CACHE_DIR)
fuzzy_dataset_code = fuzzy_dedupe(dataset=gpu_dataset_code, cache=CACHE_DIR)
@@ -218,10 +229,10 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
final_dataset_text.to_json(out_path, write_to_filename=True)
final_dataset_code.to_json(out_path, write_to_filename=True)

print('Writing results to disk completed')
print("Writing results to disk completed")

# Split the dataset by file category and save curated files (optional - to create blended datasets)
print('Split dataset by metadata')
print("Split dataset by metadata")
separated_data_text = separate_by_metadata(
final_dataset_text.df, out_path, "category"
).compute()
@@ -271,8 +282,8 @@ def main():
# Download all the sources and get the list of text and code files.
text_files, code_files = download_sources(100, 100, 100)
run_curation_pipeline(args, text_files, code_files)
print('Data Curation completed')
print("Data Curation completed")

# blend and shuffle datasets
root_path = os.path.join(DATA_DIR, "curated")
dataset_paths = [
@@ -283,9 +294,9 @@
]
dataset_weights = [1.0, 4.0, 4.0, 1.0]
target_size = 20

blend_and_shuffle(args, dataset_paths, dataset_weights, target_size)
print('Data Blending completed')
print("Data Blending completed")


if __name__ == "__main__":
2 changes: 1 addition & 1 deletion tutorials/dapt-curation/code/requirements.txt
@@ -1,7 +1,7 @@
arxiv==2.1.0
arxiv-downloader
cchardet
nltk==3.8.1
poppler-utils
unstructured[all-docs]==0.14.5
unstructured[pdf]
nltk==3.8.1
70 changes: 41 additions & 29 deletions tutorials/dapt-curation/code/utils.py
@@ -13,22 +13,22 @@
# limitations under the License.

import json
import re
import os
import yaml
import re

import dask.dataframe as dd
import pandas as pd
import yaml

from nemo_curator import (
ExactDuplicates,
Modify,
ScoreFilter,
Sequential,
FuzzyDuplicates,
FuzzyDuplicatesConfig,
SemDedup,
ExactDuplicates,
FuzzyDuplicates,
FuzzyDuplicatesConfig,
Modify,
ScoreFilter,
SemDedup,
SemDedupConfig,
Sequential,
)
from nemo_curator.datasets import DocumentDataset
from nemo_curator.filters import (
@@ -48,7 +48,11 @@
from nemo_curator.modifiers.unicode_reformatter import UnicodeReformatter
from nemo_curator.pii.constants import DEFAULT_LANGUAGE, DEFAULT_MAX_DOC_SIZE
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.file_utils import get_all_files_paths_under, expand_outdir_and_mkdir
from nemo_curator.utils.file_utils import (
expand_outdir_and_mkdir,
get_all_files_paths_under,
)


class QuotationUnifier(DocumentModifier):
"""
@@ -304,32 +308,36 @@ def fuzzy_dedupe(dataset: DocumentDataset, cache: str) -> DocumentDataset:
DocumentDataset: The deduplicated dataset.
"""
fuzzy_dedup_config = FuzzyDuplicatesConfig(
cache_dir=cache,
id_field="id",
text_field="text",
seed=42,
char_ngrams=20,
num_buckets=20,
hashes_per_bucket=13,
use_64_bit_hash=False,
buckets_per_shuffle=5,
false_positive_check=False,
num_anchors=2,
jaccard_threshold=0.8,
)
cache_dir=cache,
id_field="id",
text_field="text",
seed=42,
char_ngrams=20,
num_buckets=20,
hashes_per_bucket=13,
use_64_bit_hash=False,
buckets_per_shuffle=5,
false_positive_check=False,
num_anchors=2,
jaccard_threshold=0.8,
)
fuzzy_dup = FuzzyDuplicates(config=fuzzy_dedup_config)
duplicates = fuzzy_dup(dataset)

docs_to_remove = duplicates.df.map_partitions(lambda x: x[x.group.duplicated(keep="first")])
docs_to_remove = duplicates.df.map_partitions(
lambda x: x[x.group.duplicated(keep="first")]
)

# When there are few duplicates we can compute the results to a list and use `isin`.
duplicate_ids=docs_to_remove.compute().id.to_arrow().to_pylist()
duplicate_ids = docs_to_remove.compute().id.to_arrow().to_pylist()
dataset_df = dataset.df
deduped = dataset_df[~dataset_df.id.isin(duplicate_ids)]
return DocumentDataset(deduped)


def semantic_dedupe(dataset: DocumentDataset, sem_dedupe_config_yaml_path:str, cache_dir:str):
def semantic_dedupe(
dataset: DocumentDataset, sem_dedupe_config_yaml_path: str, cache_dir: str
):
"""
Perform semantic deduplication on the given dataset.
@@ -341,15 +349,18 @@ def semantic_dedupe(dataset: DocumentDataset, sem_dedupe_config_yaml_path:str, cache_dir:str):
The deduplicated DocumentDataset.
"""
partition_lengths = dataset.df.map_partitions(len).compute()
non_empty_partitions = [i for i, length in enumerate(partition_lengths) if length > 0]
non_empty_partitions = [
i for i, length in enumerate(partition_lengths) if length > 0
]
dataset.df = dataset.df.partitions[non_empty_partitions]

semdedup_config = SemDedupConfig.from_yaml(sem_dedupe_config_yaml_path)
expand_outdir_and_mkdir(semdedup_config.cache_dir)
semdup = SemDedup(semdedup_config)
duplicates = semdup(dataset)
duplicates = semdup(dataset)
return duplicates


class TextLineCountFilter(DocumentFilter):
"""
Discard text files based on number of lines.
@@ -392,6 +403,7 @@ def score_document(self, text: str) -> bool:
def keep_document(self, score) -> bool:
return score


def rm_dir(cache_dir):
if os.path.isdir(cache_dir):
os.system(f"rm -rf {cache_dir}")
os.system(f"rm -rf {cache_dir}")
