Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements for semantic deduplication and DAPT tutorial #564

Merged
merged 6 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions nemo_curator/modules/semantic_dedup/clusteringmodel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -128,7 +128,15 @@ def __call__(self, embeddings_dataset: DocumentDataset):
embeddings_df = embeddings_df.repartition(
partition_size=self.partition_size
)
embeddings_df = embeddings_df.to_backend("pandas").persist()

try:
embeddings_df = embeddings_df.to_backend("pandas").persist()
except IndexError:
raise RuntimeError(
"DocumentDataset contains empty partitions. "
"Please remove empty partitions from the dataset before running semantic deduplication."
)

embeddings_df = embeddings_df.to_backend("cudf")

cupy_darr = embeddings_df.map_partitions(
Expand Down
8 changes: 7 additions & 1 deletion nemo_curator/modules/semantic_dedup/embeddings.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -228,6 +228,12 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset:
self.profile_dir, "embedding-creator"
):
embedding_ddf = self.create_embeddings(dataset.df, self.input_column)

# category column dtypes are not supported by the GPU-accelerated Parquet writer
for col in embedding_ddf.columns:
if embedding_ddf[col].dtype.name == "category":
embedding_ddf[col] = embedding_ddf[col].astype("str")
Comment on lines +232 to +235
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closes #505.


write_to_disk(
embedding_ddf,
self.embedding_output_dir,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -163,7 +163,8 @@ def extract_dedup_data(self, eps_to_extract: float) -> DocumentDataset:
output_parquet_path = os.path.join(
self.output_dir, f"unique_ids_{eps_to_extract}.parquet"
)
extract_dedup_data(

result = extract_dedup_data(
eps=eps_to_extract,
n_clusters=self.n_clusters,
id_col=self.id_col,
Expand All @@ -175,6 +176,9 @@ def extract_dedup_data(self, eps_to_extract: float) -> DocumentDataset:
logger=self.logger,
profile_dir=self.profile_dir,
)
# Result is None if there are no duplicates
if result is None:
return None

fps = [
os.path.join(output_parquet_path, file_name)
Expand Down
11 changes: 9 additions & 2 deletions nemo_curator/modules/semantic_dedup/semdedup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -96,6 +96,13 @@ def call(self, dataset: DocumentDataset) -> DocumentDataset:
embeddings_dataset = self.embedding_creator(dataset)
self.clustering_model(embeddings_dataset)
self.semantic_cluster_dedup.compute_semantic_match_dfs(self.eps_thresholds)
return self.semantic_cluster_dedup.extract_dedup_data(

result = self.semantic_cluster_dedup.extract_dedup_data(
eps_to_extract=self.eps_to_extract
)

# If no duplicates are found, return the original dataset
if result is not None:
return result
else:
return dataset
34 changes: 22 additions & 12 deletions nemo_curator/utils/semdedup_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -395,6 +395,11 @@ def extract_pruned_data(

t0 = time.time()

np_files = [
os.path.join(sorted_clusters_dir, f"cluster_{i}.npy") for i in range(n_clusters)
]
total_records = sum(get_num_records(file_path) for file_path in np_files)

with performance_report_if_with_ts_suffix(
profile_dir,
"extracting-pruned-from-clusters",
Expand All @@ -412,18 +417,20 @@ def extract_pruned_data(
results_df = results_df.persist()
progress(results_df)

results_df.to_parquet(output_parquet_path)
try:
results_df.to_parquet(output_parquet_path)
except TypeError:
# Catching "Implicit conversion to a host PyArrow object via __arrow_array__ is not allowed"
logger.info("No semantic duplicates found")
return total_records, 0, total_records

if logger:
logger.info(
f"Time taken for Extracting Pruned Data : {time.time() - t0} and output written at {output_parquet_path}"
)

total_kept = len(results_df)

np_files = [
os.path.join(sorted_clusters_dir, f"cluster_{i}.npy") for i in range(n_clusters)
]
total_records = sum(get_num_records(file_path) for file_path in np_files)
# Aggregate results
total_removed = total_records - total_kept
return total_kept, total_removed, total_records
Expand Down Expand Up @@ -472,9 +479,12 @@ def extract_dedup_data(
df = pd.DataFrame(result_dict)
df.to_csv(output_summary_file, index=False)

fps = [
os.path.join(output_parquet_path, file_name)
for file_name in os.listdir(output_parquet_path)
]
ids_to_keep_df = dd.from_map(cudf.read_parquet, fps)
return ids_to_keep_df
if removed > 0:
fps = [
os.path.join(output_parquet_path, file_name)
for file_name in os.listdir(output_parquet_path)
]
ids_to_keep_df = dd.from_map(cudf.read_parquet, fps)
return ids_to_keep_df
else:
return None
44 changes: 43 additions & 1 deletion tests/test_semdedup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,6 +52,21 @@ def dedup_data():
return DocumentDataset(df)


@pytest.fixture
def non_dedup_data():
df = cudf.DataFrame(
{
"doc_id": ["doc_1", "doc_2"],
"text": [
"The quick brown fox jumps over the lazy dog",
"A test string",
],
}
)
df = dask_cudf.from_cudf(df, 2)
return DocumentDataset(df)


@pytest.mark.gpu
class TestSemDuplicates:
def test_sem_dedup(
Expand Down Expand Up @@ -81,6 +96,33 @@ def test_sem_dedup(
expected_df = cudf.Series(duplicate_docs, name="id")
assert_eq(result_df["id"].sort_values(), expected_df, check_index=False)

def test_no_sem_dedup(
self,
non_dedup_data,
tmpdir,
gpu_client,
):
print("client", gpu_client)
cache_dir = os.path.join(tmpdir, "test_sem_dedup_cache")
config = SemDedupConfig(
cache_dir=cache_dir,
seed=42,
n_clusters=3,
eps_thresholds=[0.10],
eps_to_extract=0.10,
)
sem_duplicates = SemDedup(
config=config,
input_column="text",
id_column="doc_id",
id_column_type="str",
)
result = sem_duplicates(non_dedup_data)
result_df = result.df.compute()
duplicate_docs = ["doc_1", "doc_2"]
expected_df = cudf.Series(duplicate_docs, name="doc_id")
assert_eq(result_df["doc_id"].sort_values(), expected_df, check_index=False)

@pytest.mark.parametrize("pooling_strategy", ["last_token", "mean_pooling"])
def test_embedding_creator_pooling_strategies(self, tmpdir, pooling_strategy):
test_text_1 = "The quick brown fox jumps over the lazy dog"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ num_files: 16
embeddings_save_loc: "embeddings"
embedding_model_name_or_path: "sentence-transformers/all-MiniLM-L6-v2"
embedding_batch_size: 128
write_embeddings_to_disk: false
write_embeddings_to_disk: true
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this closes #505, we can switch this back to true.


# Clustering configuration
clustering_save_loc: "clustering_results"
Expand Down
7 changes: 3 additions & 4 deletions tutorials/dapt-curation/code/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -188,7 +188,6 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
duplicates = semantic_dedupe(
dataset=gpu_dataset_text,
sem_dedupe_config_yaml_path=sem_dedupe_config_yaml_path,
cache_dir=CACHE_DIR,
)
unique_ids = duplicates.df.to_backend("pandas").compute()["id"]
semantic_dataset_text = DocumentDataset(
Expand All @@ -200,11 +199,11 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "fuzzy_dedupe", "text")
rm_dir(CACHE_DIR)
fuzzy_dataset_text = fuzzy_dedupe(
dataset=semantic_dataset_text, cache=CACHE_DIR
dataset=semantic_dataset_text, cache_dir=CACHE_DIR
)
CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "fuzzy_dedupe", "code")
rm_dir(CACHE_DIR)
fuzzy_dataset_code = fuzzy_dedupe(dataset=gpu_dataset_code, cache=CACHE_DIR)
fuzzy_dataset_code = fuzzy_dedupe(dataset=gpu_dataset_code, cache_dir=CACHE_DIR)

dataset_text.df = fuzzy_dataset_text.df.to_backend("pandas")
dataset_code.df = fuzzy_dataset_code.df.to_backend("pandas")
Expand Down
14 changes: 7 additions & 7 deletions tutorials/dapt-curation/code/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -284,19 +284,19 @@ def exact_dedupe(dataset: DocumentDataset) -> DocumentDataset:
return DocumentDataset(deduped)


def fuzzy_dedupe(dataset: DocumentDataset, cache: str) -> DocumentDataset:
def fuzzy_dedupe(dataset: DocumentDataset, cache_dir: str) -> DocumentDataset:
"""
Removes near-duplicate documents and code lines

Args:
dataset (DocumentDataset): The dataset containing documents.
type (str): Document type to process.
cache_dir (str): Directory for storing intermediate results.

Returns:
DocumentDataset: The deduplicated dataset.
"""
fuzzy_dedup_config = FuzzyDuplicatesConfig(
cache_dir=cache,
cache_dir=cache_dir,
id_field="id",
text_field="text",
seed=42,
Expand All @@ -322,14 +322,14 @@ def fuzzy_dedupe(dataset: DocumentDataset, cache: str) -> DocumentDataset:


def semantic_dedupe(
dataset: DocumentDataset, sem_dedupe_config_yaml_path: str, cache_dir: str
dataset: DocumentDataset, sem_dedupe_config_yaml_path: str,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove unused parameter.

):
"""
Perform semantic deduplication on the given dataset.

Args:
dataset (DocumentDataset): The dataset containing documents.
type (str): Document type to process.
sem_dedupe_config_yaml_path (str): The path to the semantic dedupe configuration file.

Returns:
The deduplicated DocumentDataset.
Expand Down Expand Up @@ -390,6 +390,6 @@ def keep_document(self, score) -> bool:
return score


def rm_dir(cache_dir):
def rm_dir(cache_dir: str):
if os.path.isdir(cache_dir):
os.system(f"rm -rf {cache_dir}")