push fixes
Signed-off-by: Sarah Yurick <[email protected]>
sarahyurick committed Feb 21, 2025
1 parent 9b1a13c commit ed2ac60
Showing 9 changed files with 108 additions and 32 deletions.
12 changes: 10 additions & 2 deletions nemo_curator/modules/semantic_dedup/clusteringmodel.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -128,7 +128,15 @@ def __call__(self, embeddings_dataset: DocumentDataset):
             embeddings_df = embeddings_df.repartition(
                 partition_size=self.partition_size
             )
-            embeddings_df = embeddings_df.to_backend("pandas").persist()
+
+            try:
+                embeddings_df = embeddings_df.to_backend("pandas").persist()
+            except IndexError:
+                raise RuntimeError(
+                    "DocumentDataset contains empty partitions. "
+                    "Please remove empty partitions from the dataset before running semantic deduplication."
+                )
+
             embeddings_df = embeddings_df.to_backend("cudf")

             cupy_darr = embeddings_df.map_partitions(
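The try/except above converts an IndexError raised during the pandas backend conversion into a clearer RuntimeError when the embeddings DataFrame contains empty partitions. A minimal sketch of dropping empty partitions up front, using plain Dask on pandas for illustration (the column names and the filter are made up):

import dask.dataframe as dd
import pandas as pd

# Toy DataFrame; the filter below leaves the second partition empty.
pdf = pd.DataFrame({"id": [0, 1, 2, 3], "text": ["a", "b", "c", "d"]})
ddf = dd.from_pandas(pdf, npartitions=2)
ddf = ddf[ddf["id"] < 2]

# Keep only partitions that still contain rows before running semantic dedup.
sizes = ddf.map_partitions(len).compute()
ddf = ddf.partitions[[i for i, n in enumerate(sizes) if n > 0]]
print(ddf.npartitions)  # 1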
8 changes: 7 additions & 1 deletion nemo_curator/modules/semantic_dedup/embeddings.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -228,6 +228,12 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset:
             self.profile_dir, "embedding-creator"
         ):
             embedding_ddf = self.create_embeddings(dataset.df, self.input_column)
+
+            # category column dtypes are not supported by the GPU-accelerated Parquet writer
+            for col in embedding_ddf.columns:
+                if embedding_ddf[col].dtype.name == "category":
+                    embedding_ddf[col] = embedding_ddf[col].astype("str")
+
             write_to_disk(
                 embedding_ddf,
                 self.embedding_output_dir,
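The new loop above casts category columns to strings because the GPU-accelerated Parquet writer does not support the category dtype. A minimal sketch of the same cast on a plain pandas DataFrame (the output path and column names are illustrative):

import pandas as pd

df = pd.DataFrame({"doc_id": ["a", "b"], "label": ["x", "y"]})
df["label"] = df["label"].astype("category")

# Cast any category columns to plain strings before writing Parquet.
for col in df.columns:
    if df[col].dtype.name == "category":
        df[col] = df[col].astype("str")

df.to_parquet("/tmp/embeddings_example.parquet")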
@@ -1,4 +1,4 @@
-# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -163,7 +163,8 @@ def extract_dedup_data(self, eps_to_extract: float) -> DocumentDataset:
         output_parquet_path = os.path.join(
             self.output_dir, f"unique_ids_{eps_to_extract}.parquet"
         )
-        extract_dedup_data(
+
+        result = extract_dedup_data(
             eps=eps_to_extract,
             n_clusters=self.n_clusters,
             id_col=self.id_col,
@@ -175,6 +176,9 @@ def extract_dedup_data(self, eps_to_extract: float) -> DocumentDataset:
             logger=self.logger,
             profile_dir=self.profile_dir,
         )
+        # Result is None if there are no duplicates
+        if result is None:
+            return None

         fps = [
             os.path.join(output_parquet_path, file_name)
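With this change, extract_dedup_data returns None when there is nothing to remove, so callers must treat the result as optional. A hypothetical caller-side sketch in plain pandas (the real pipeline uses Dask-cuDF; the helper name and columns are made up):

from typing import Optional

import pandas as pd

def apply_semantic_dedup(
    df: pd.DataFrame, ids_to_keep: Optional[pd.Series], id_col: str = "id"
) -> pd.DataFrame:
    # None means no duplicates were found, so the data is returned unchanged.
    if ids_to_keep is None:
        return df
    return df[df[id_col].isin(set(ids_to_keep))]

docs = pd.DataFrame({"id": [1, 2, 3], "text": ["a", "b", "c"]})
print(apply_semantic_dedup(docs, None))               # all three rows kept
print(apply_semantic_dedup(docs, pd.Series([1, 3])))  # rows 1 and 3 kept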
11 changes: 9 additions & 2 deletions nemo_curator/modules/semantic_dedup/semdedup.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -96,6 +96,13 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset:
         embeddings_dataset = self.embedding_creator(dataset)
         self.clustering_model(embeddings_dataset)
         self.semantic_cluster_dedup.compute_semantic_match_dfs(self.eps_thresholds)
-        return self.semantic_cluster_dedup.extract_dedup_data(
+
+        result = self.semantic_cluster_dedup.extract_dedup_data(
             eps_to_extract=self.eps_to_extract
         )
+
+        # If no duplicates are found, return the original dataset
+        if result is not None:
+            return result
+        else:
+            return dataset
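SemDedup now always hands back a usable DocumentDataset: the ids to keep when duplicates exist, or the input dataset when none are found. A rough end-to-end sketch based on the new test below and the dapt-curation tutorial; it assumes a running Dask GPU client (e.g. via dask_cuda), and the import paths follow the NeMo Curator examples:

import cudf
import dask_cudf
from nemo_curator import SemDedup, SemDedupConfig
from nemo_curator.datasets import DocumentDataset

df = cudf.DataFrame(
    {
        "doc_id": ["doc_1", "doc_2"],
        "text": ["The quick brown fox jumps over the lazy dog", "A test string"],
    }
)
dataset = DocumentDataset(dask_cudf.from_cudf(df, npartitions=2))

config = SemDedupConfig(
    cache_dir="./sem_dedup_cache",
    n_clusters=3,
    eps_thresholds=[0.10],
    eps_to_extract=0.10,
)
sem_dedup = SemDedup(
    config=config,
    input_column="text",
    id_column="doc_id",
    id_column_type="str",
)

result = sem_dedup(dataset)  # falls back to the input dataset when nothing is duplicated
kept_ids = result.df["doc_id"].compute()
deduped = DocumentDataset(dataset.df[dataset.df["doc_id"].isin(kept_ids)])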
34 changes: 22 additions & 12 deletions nemo_curator/utils/semdedup_utils.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -395,6 +395,11 @@ def extract_pruned_data(

     t0 = time.time()

+    np_files = [
+        os.path.join(sorted_clusters_dir, f"cluster_{i}.npy") for i in range(n_clusters)
+    ]
+    total_records = sum(get_num_records(file_path) for file_path in np_files)
+
     with performance_report_if_with_ts_suffix(
         profile_dir,
         "extracting-pruned-from-clusters",
@@ -412,18 +417,20 @@ def extract_pruned_data(
         results_df = results_df.persist()
         progress(results_df)

-        results_df.to_parquet(output_parquet_path)
+        try:
+            results_df.to_parquet(output_parquet_path)
+        except TypeError:
+            # Catching "Implicit conversion to a host PyArrow object via __arrow_array__ is not allowed"
+            logger.info("No semantic duplicates found")
+            return total_records, 0, total_records

     if logger:
         logger.info(
             f"Time taken for Extracting Pruned Data : {time.time() - t0} and output written at {output_parquet_path}"
         )

     total_kept = len(results_df)

-    np_files = [
-        os.path.join(sorted_clusters_dir, f"cluster_{i}.npy") for i in range(n_clusters)
-    ]
-    total_records = sum(get_num_records(file_path) for file_path in np_files)
+    # Aggregate results
     total_removed = total_records - total_kept
     return total_kept, total_removed, total_records
@@ -472,9 +479,12 @@ def extract_dedup_data(
     df = pd.DataFrame(result_dict)
     df.to_csv(output_summary_file, index=False)

-    fps = [
-        os.path.join(output_parquet_path, file_name)
-        for file_name in os.listdir(output_parquet_path)
-    ]
-    ids_to_keep_df = dd.from_map(cudf.read_parquet, fps)
-    return ids_to_keep_df
+    if removed > 0:
+        fps = [
+            os.path.join(output_parquet_path, file_name)
+            for file_name in os.listdir(output_parquet_path)
+        ]
+        ids_to_keep_df = dd.from_map(cudf.read_parquet, fps)
+        return ids_to_keep_df
+    else:
+        return None
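Moving the total_records computation ahead of the Parquet write lets the early return still report accurate counts when no semantic duplicates are found. A small sketch of that counting step, with a hypothetical stand-in for get_num_records (it simply loads each array and takes its length; paths are illustrative):

import os

import numpy as np

def count_records(npy_path: str) -> int:
    # Hypothetical stand-in for get_num_records: load the array and return its length.
    return len(np.load(npy_path, allow_pickle=True))

sorted_clusters_dir = "./clustering_results/sorted"  # illustrative path
n_clusters = 3
np_files = [
    os.path.join(sorted_clusters_dir, f"cluster_{i}.npy") for i in range(n_clusters)
]
total_records = sum(count_records(path) for path in np_files)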
44 changes: 43 additions & 1 deletion tests/test_semdedup.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -52,6 +52,21 @@ def dedup_data():
     return DocumentDataset(df)


+@pytest.fixture
+def non_dedup_data():
+    df = cudf.DataFrame(
+        {
+            "doc_id": ["doc_1", "doc_2"],
+            "text": [
+                "The quick brown fox jumps over the lazy dog",
+                "A test string",
+            ],
+        }
+    )
+    df = dask_cudf.from_cudf(df, 2)
+    return DocumentDataset(df)
+
+
 @pytest.mark.gpu
 class TestSemDuplicates:
     def test_sem_dedup(
@@ -81,6 +96,33 @@ def test_sem_dedup(
         expected_df = cudf.Series(duplicate_docs, name="id")
         assert_eq(result_df["id"].sort_values(), expected_df, check_index=False)

+    def test_no_sem_dedup(
+        self,
+        non_dedup_data,
+        tmpdir,
+        gpu_client,
+    ):
+        print("client", gpu_client)
+        cache_dir = os.path.join(tmpdir, "test_sem_dedup_cache")
+        config = SemDedupConfig(
+            cache_dir=cache_dir,
+            seed=42,
+            n_clusters=3,
+            eps_thresholds=[0.10],
+            eps_to_extract=0.10,
+        )
+        sem_duplicates = SemDedup(
+            config=config,
+            input_column="text",
+            id_column="doc_id",
+            id_column_type="str",
+        )
+        result = sem_duplicates(non_dedup_data)
+        result_df = result.df.compute()
+        duplicate_docs = ["doc_1", "doc_2"]
+        expected_df = cudf.Series(duplicate_docs, name="doc_id")
+        assert_eq(result_df["doc_id"].sort_values(), expected_df, check_index=False)
+
     @pytest.mark.parametrize("pooling_strategy", ["last_token", "mean_pooling"])
     def test_embedding_creator_pooling_strategies(self, tmpdir, pooling_strategy):
         test_text_1 = "The quick brown fox jumps over the lazy dog"
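Both tests take a gpu_client fixture. A minimal sketch of what such a fixture might provide, assuming dask_cuda is installed (the real fixture lives in the test suite's conftest and may differ):

import pytest
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

@pytest.fixture
def gpu_client():
    # One-GPU Dask cluster, torn down when the test finishes.
    with LocalCUDACluster(n_workers=1) as cluster, Client(cluster) as client:
        yield client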
@@ -6,7 +6,7 @@ num_files: 16
 embeddings_save_loc: "embeddings"
 embedding_model_name_or_path: "sentence-transformers/all-MiniLM-L6-v2"
 embedding_batch_size: 128
-write_embeddings_to_disk: false
+write_embeddings_to_disk: true

 # Clustering configuration
 clustering_save_loc: "clustering_results"
7 changes: 3 additions & 4 deletions tutorials/dapt-curation/code/main.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -188,7 +188,6 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
     duplicates = semantic_dedupe(
         dataset=gpu_dataset_text,
         sem_dedupe_config_yaml_path=sem_dedupe_config_yaml_path,
-        cache_dir=CACHE_DIR,
     )
     unique_ids = duplicates.df.to_backend("pandas").compute()["id"]
     semantic_dataset_text = DocumentDataset(
@@ -200,11 +199,11 @@ def run_curation_pipeline(args: Any, text_files: str, code_files: str) -> None:
     CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "fuzzy_dedupe", "text")
     rm_dir(CACHE_DIR)
     fuzzy_dataset_text = fuzzy_dedupe(
-        dataset=semantic_dataset_text, cache=CACHE_DIR
+        dataset=semantic_dataset_text, cache_dir=CACHE_DIR
     )
     CACHE_DIR = os.path.join(SCRIPT_DIR_PATH, "cache", "fuzzy_dedupe", "code")
     rm_dir(CACHE_DIR)
-    fuzzy_dataset_code = fuzzy_dedupe(dataset=gpu_dataset_code, cache=CACHE_DIR)
+    fuzzy_dataset_code = fuzzy_dedupe(dataset=gpu_dataset_code, cache_dir=CACHE_DIR)

     dataset_text.df = fuzzy_dataset_text.df.to_backend("pandas")
     dataset_code.df = fuzzy_dataset_code.df.to_backend("pandas")
14 changes: 7 additions & 7 deletions tutorials/dapt-curation/code/utils.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -284,19 +284,19 @@ def exact_dedupe(dataset: DocumentDataset) -> DocumentDataset:
     return DocumentDataset(deduped)


-def fuzzy_dedupe(dataset: DocumentDataset, cache: str) -> DocumentDataset:
+def fuzzy_dedupe(dataset: DocumentDataset, cache_dir: str) -> DocumentDataset:
     """
     Removes near-duplicate documents and code lines
     Args:
         dataset (DocumentDataset): The dataset containing documents.
         type (str): Document type to process.
         cache_dir (str): Directory for storing intermediate results.
     Returns:
         DocumentDataset: The deduplicated dataset.
     """
     fuzzy_dedup_config = FuzzyDuplicatesConfig(
-        cache_dir=cache,
+        cache_dir=cache_dir,
         id_field="id",
         text_field="text",
         seed=42,
@@ -322,14 +322,14 @@ def fuzzy_dedupe(dataset: DocumentDataset, cache: str) -> DocumentDataset:


 def semantic_dedupe(
-    dataset: DocumentDataset, sem_dedupe_config_yaml_path: str, cache_dir: str
+    dataset: DocumentDataset, sem_dedupe_config_yaml_path: str,
 ):
     """
     Perform semantic deduplication on the given dataset.
     Args:
         dataset (DocumentDataset): The dataset containing documents.
         type (str): Document type to process.
         sem_dedupe_config_yaml_path (str): The path to the semantic dedupe configuration file.
     Returns:
         The deduplicated DocumentDataset.
@@ -390,6 +390,6 @@ def keep_document(self, score) -> bool:
         return score


-def rm_dir(cache_dir):
+def rm_dir(cache_dir: str):
     if os.path.isdir(cache_dir):
         os.system(f"rm -rf {cache_dir}")
