Improvements for semantic deduplication and DAPT tutorial #564

Merged 6 commits on Feb 24, 2025
Changes from 4 commits
19 changes: 17 additions & 2 deletions nemo_curator/modules/semantic_dedup/clusteringmodel.py
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -128,7 +128,22 @@ def __call__(self, embeddings_dataset: DocumentDataset):
embeddings_df = embeddings_df.repartition(
partition_size=self.partition_size
)
embeddings_df = embeddings_df.to_backend("pandas").persist()

try:
embeddings_df = embeddings_df.to_backend("pandas").persist()

if embeddings_df.shape[0].compute() < self.n_clusters:
raise ValueError(
"Number of clusters is greater than the number of documents in your dataset. "
"Please reduce n_clusters to be less than or equal."
)
except IndexError as e:
raise IndexError(
f'Original error message: "{e}". '
"This could be due to empty partitions in your DocumentDataset. "
"Please check your dataset for empty partitions and remove them if necessary."
)

embeddings_df = embeddings_df.to_backend("cudf")

cupy_darr = embeddings_df.map_partitions(
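
For anyone trying the new guard outside the test suite, here is a minimal caller-side sketch. The helper name run_semdedup and its requested_clusters parameter are illustrative only; the SemDedupConfig and SemDedup arguments mirror the ones used in the tests added by this PR.

    # Hypothetical caller-side guard (not part of this PR): clamp n_clusters to the
    # dataset size so the ValueError added above is never triggered.
    from nemo_curator import SemDedup, SemDedupConfig

    def run_semdedup(dataset, cache_dir, requested_clusters=10):
        num_docs = dataset.df.shape[0].compute()  # total rows across all partitions
        config = SemDedupConfig(
            cache_dir=cache_dir,
            seed=42,
            n_clusters=min(requested_clusters, num_docs),
            eps_thresholds=[0.10],
            eps_to_extract=0.10,
        )
        sem_duplicates = SemDedup(
            config=config,
            input_column="text",
            id_column="id",
            id_column_type="int",
        )
        return sem_duplicates(dataset)
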
8 changes: 7 additions & 1 deletion nemo_curator/modules/semantic_dedup/embeddings.py
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -228,6 +228,12 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset:
self.profile_dir, "embedding-creator"
):
embedding_ddf = self.create_embeddings(dataset.df, self.input_column)

# category column dtypes are not supported by the GPU-accelerated Parquet writer
for col in embedding_ddf.columns:
if embedding_ddf[col].dtype.name == "category":
embedding_ddf[col] = embedding_ddf[col].astype("str")
Comment on lines +232 to +235 (Collaborator Author): Closes #505.

write_to_disk(
embedding_ddf,
self.embedding_output_dir,
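
As background on why this closes #505, the sketch below reproduces the cast in isolation on a toy frame; the column names and the "embeddings_out" output path are placeholders of mine, not values from this PR.

    # Standalone illustration (not part of this PR): cast category-typed columns to
    # strings before writing Parquet from GPU dataframes.
    import cudf
    import dask_cudf

    df = cudf.DataFrame({"id": [1, 2], "label": ["a", "b"]})
    df["label"] = df["label"].astype("category")
    ddf = dask_cudf.from_cudf(df, npartitions=2)

    for col in ddf.columns:
        if ddf[col].dtype.name == "category":
            ddf[col] = ddf[col].astype("str")  # same conversion as in embeddings.py above

    ddf.to_parquet("embeddings_out")  # placeholder output directory
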
81 changes: 73 additions & 8 deletions tests/test_semdedup.py
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,7 +18,6 @@
import torch
import torch.nn.functional as F
from dask.dataframe.utils import assert_eq
from distributed import Client
from transformers import AutoConfig, AutoModel, AutoTokenizer

from nemo_curator import SemDedup, SemDedupConfig
@@ -52,34 +51,100 @@ def dedup_data():
return DocumentDataset(df)


@pytest.fixture
def non_dedup_data():
df = cudf.DataFrame(
{
"doc_id": ["doc_1", "doc_2"],
"text": [
"The quick brown fox jumps over the lazy dog",
"A test string",
],
}
)
df = dask_cudf.from_cudf(df, 2)
return DocumentDataset(df)


@pytest.mark.gpu
class TestSemDuplicates:
@pytest.mark.parametrize("n_clusters", [3, 10])
def test_sem_dedup(
self,
dedup_data,
tmpdir,
n_clusters,
gpu_client,
):
print("client", gpu_client)

cache_dir = os.path.join(tmpdir, "test_sem_dedup_cache")
config = SemDedupConfig(
cache_dir=cache_dir,
seed=42,
n_clusters=3,
n_clusters=n_clusters,
eps_thresholds=[0.10],
eps_to_extract=0.10,
)

sem_duplicates = SemDedup(
config=config,
input_column="text",
id_column="id",
id_column_type="int",
)
result = sem_duplicates(dedup_data)
result_df = result.df.compute()
duplicate_docs = [2, 3, 4, 200, 300]
expected_df = cudf.Series(duplicate_docs, name="id")
assert_eq(result_df["id"].sort_values(), expected_df, check_index=False)

dedup_data_len = dedup_data.df.shape[0].compute()
if n_clusters > dedup_data_len:
# Number of records in the dataset should never be less than n_clusters
with pytest.raises(ValueError):
result = sem_duplicates(dedup_data)
else:
# n_clusters <= dataset size, so deduplication runs and returns the expected IDs
result = sem_duplicates(dedup_data)
result_df = result.df.compute()
duplicate_docs = [2, 3, 4, 200, 300]
expected_df = cudf.Series(duplicate_docs, name="id")
assert_eq(result_df["id"].sort_values(), expected_df, check_index=False)

@pytest.mark.parametrize("n_clusters", [2, 3])
def test_no_sem_dedup(
self,
non_dedup_data,
tmpdir,
n_clusters,
gpu_client,
):
print("client", gpu_client)

cache_dir = os.path.join(tmpdir, "test_no_sem_dedup")
config = SemDedupConfig(
cache_dir=cache_dir,
seed=42,
n_clusters=n_clusters,
eps_thresholds=[0.10],
eps_to_extract=0.10,
)

sem_duplicates = SemDedup(
config=config,
input_column="text",
id_column="doc_id",
id_column_type="str",
)

non_dedup_data_len = non_dedup_data.df.shape[0].compute()
if n_clusters > non_dedup_data_len:
# Number of records in the dataset should never be less than n_clusters
with pytest.raises(ValueError):
result = sem_duplicates(non_dedup_data)
else:
# Correctly returns the original dataset with no duplicates removed
result = sem_duplicates(non_dedup_data)
result_df = result.df.compute()
duplicate_docs = ["doc_1", "doc_2"]
expected_df = cudf.Series(duplicate_docs, name="doc_id")
assert_eq(result_df["doc_id"].sort_values(), expected_df, check_index=False)

@pytest.mark.parametrize("pooling_strategy", ["last_token", "mean_pooling"])
def test_embedding_creator_pooling_strategies(self, tmpdir, pooling_strategy):
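
The parametrized tests above are GPU-only. One way to run just this class locally is sketched below; it assumes a CUDA-capable machine with the repository's GPU extras installed, and the invocation itself is not part of this PR.

    # Sketch: run only the GPU-marked semdedup tests (marker and class name taken
    # from the test file above).
    import pytest

    pytest.main(["-m", "gpu", "-k", "TestSemDuplicates", "tests/test_semdedup.py"])
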
@@ -6,7 +6,7 @@ num_files: 16
embeddings_save_loc: "embeddings"
embedding_model_name_or_path: "sentence-transformers/all-MiniLM-L6-v2"
embedding_batch_size: 128
write_embeddings_to_disk: false
write_embeddings_to_disk: true
Collaborator Author: Since this closes #505, we can switch this back to true.

# Clustering configuration
clustering_save_loc: "clustering_results"
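
For reference, a minimal sketch of loading this YAML into the config object used by the tutorial. The file path is a placeholder, and the one-to-one mapping between YAML keys and SemDedupConfig fields is an assumption of mine, not something this diff shows.

    # Hypothetical loader (not part of this PR). Assumes every key in the YAML above
    # corresponds to a SemDedupConfig field.
    import yaml

    from nemo_curator import SemDedupConfig

    with open("sem_dedupe_config.yaml") as f:  # placeholder path
        cfg = yaml.safe_load(f)

    semdedup_config = SemDedupConfig(**cfg)
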
7 changes: 3 additions & 4 deletions tutorials/dapt-curation/code/main.py
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -188,7 +188,6 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
duplicates = semantic_dedupe(
dataset=gpu_dataset_text,
sem_dedupe_config_yaml_path=sem_dedupe_config_yaml_path,
cache_dir=CACHE_DIR,
)
unique_ids = duplicates.df.to_backend("pandas").compute()["id"]
semantic_dataset_text = DocumentDataset(
@@ -200,11 +199,11 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "fuzzy_dedupe", "text")
rm_dir(CACHE_DIR)
fuzzy_dataset_text = fuzzy_dedupe(
dataset=semantic_dataset_text, cache=CACHE_DIR
dataset=semantic_dataset_text, cache_dir=CACHE_DIR
)
CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "fuzzy_dedupe", "code")
rm_dir(CACHE_DIR)
fuzzy_dataset_code = fuzzy_dedupe(dataset=gpu_dataset_code, cache=CACHE_DIR)
fuzzy_dataset_code = fuzzy_dedupe(dataset=gpu_dataset_code, cache_dir=CACHE_DIR)

dataset_text.df = fuzzy_dataset_text.df.to_backend("pandas")
dataset_code.df = fuzzy_dataset_code.df.to_backend("pandas")
15 changes: 8 additions & 7 deletions tutorials/dapt-curation/code/utils.py
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -284,19 +284,19 @@ def exact_dedupe(dataset: DocumentDataset) -> DocumentDataset:
return DocumentDataset(deduped)


def fuzzy_dedupe(dataset: DocumentDataset, cache: str) -> DocumentDataset:
def fuzzy_dedupe(dataset: DocumentDataset, cache_dir: str) -> DocumentDataset:
"""
Removes near-duplicate documents and code lines

Args:
dataset (DocumentDataset): The dataset containing documents.
type (str): Document type to process.
cache_dir (str): Directory for storing intermediate results.

Returns:
DocumentDataset: The deduplicated dataset.
"""
fuzzy_dedup_config = FuzzyDuplicatesConfig(
cache_dir=cache,
cache_dir=cache_dir,
id_field="id",
text_field="text",
seed=42,
@@ -322,14 +322,15 @@ def fuzzy_dedupe(dataset: DocumentDataset, cache: str) -> DocumentDataset:


def semantic_dedupe(
dataset: DocumentDataset, sem_dedupe_config_yaml_path: str, cache_dir: str
dataset: DocumentDataset,
sem_dedupe_config_yaml_path: str,
):
"""
Perform semantic deduplication on the given dataset.

Args:
dataset (DocumentDataset): The dataset containing documents.
type (str): Document type to process.
sem_dedupe_config_yaml_path (str): The path to the semantic dedupe configuration file.

Returns:
The deduplicated DocumentDataset.
@@ -390,6 +391,6 @@ def keep_document(self, score) -> bool:
return score


def rm_dir(cache_dir):
def rm_dir(cache_dir: str):
if os.path.isdir(cache_dir):
os.system(f"rm -rf {cache_dir}")
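
A side note on the rm_dir helper above: a shutil-based variant avoids shelling out and also works on non-POSIX systems. The name rm_dir_portable is hypothetical, and this sketch is not part of the PR.

    import os
    import shutil

    def rm_dir_portable(cache_dir: str) -> None:
        # Equivalent cleanup without invoking a shell; illustrative alternative only.
        if os.path.isdir(cache_dir):
            shutil.rmtree(cache_dir, ignore_errors=True)
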