merge main
Signed-off-by: Praateek <[email protected]>
praateekmahajan committed Nov 7, 2024
2 parents e3fa7d4 + 01bda47 commit 813c1d7
Showing 38 changed files with 5,663 additions and 2,951 deletions.
4 changes: 0 additions & 4 deletions config/sem_dedup_config.yaml
@@ -1,15 +1,11 @@
# Configuration file for semantic dedup
cache_dir: "semdedup_cache"
num_files: 16
id_col_name: "id"
id_col_type: "int"
input_column: "text"

# Embeddings configuration
embeddings_save_loc: "embeddings"
embedding_model_name_or_path: "sentence-transformers/all-MiniLM-L6-v2"
embedding_batch_size: 128
embedding_max_mem_gb: 25

# Clustering configuration
clustering_save_loc: "clustering_results"
4 changes: 2 additions & 2 deletions docs/user-guide/gpudeduplication.rst
@@ -184,7 +184,7 @@ Python API
from nemo_curator import FuzzyDuplicatesConfig
config = FuzzyDuplicatesConfig(
cache_dir="/path/to/dedup_outputs",
cache_dir="/path/to/dedup_outputs", # must be cleared between runs
id_field="my_id",
text_field="text",
seed=42,
@@ -251,7 +251,7 @@ Python API
- The default values of ``num_buckets`` and ``hashes_per_bucket`` are set to find documents with a Jaccard similarity of approximately 0.8 or above.
- Higher ``buckets_per_shuffle`` values can improve performance but might cause out-of-memory errors.
- Setting the ``false_positive_check`` flag to ``False`` is ideal for optimal performance.
- Clear the ``cache_dir`` between runs to avoid data from previous runs interfering with the current run's results.
- When setting the ``false_positive_check`` flag to ``True``, ensure that ``cache_dir`` is emptied between runs to avoid data from previous runs interfering with the current run's results (see the configuration sketch after this list).
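
A minimal configuration sketch that reflects these notes. The paths and field names are placeholders; the keyword names for the bucket and false-positive settings are assumed to match the option names used in the notes above, and the numeric values are illustrative only.

   from nemo_curator import FuzzyDuplicatesConfig

   config = FuzzyDuplicatesConfig(
       cache_dir="/path/to/dedup_outputs",  # clear this directory between runs
       id_field="my_id",
       text_field="text",
       seed=42,
       # Assumed keyword names, taken from the notes above; values are illustrative.
       num_buckets=20,
       hashes_per_bucket=13,
       buckets_per_shuffle=1,
       false_positive_check=False,  # False skips the costly false-positive check
   )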

""""""""""""
CLI Utility
24 changes: 12 additions & 12 deletions docs/user-guide/semdedup.rst
@@ -40,15 +40,11 @@ Semantic deduplication in NeMo Curator can be configured using a YAML file. Here
# Configuration file for semantic dedup
cache_dir: "semdedup_cache"
num_files: -1
id_col_name: "id"
id_col_type: "int"
input_column: "text"
# Embeddings configuration
embeddings_save_loc: "embeddings"
embedding_model_name_or_path: "sentence-transformers/all-MiniLM-L6-v2"
embedding_batch_size: 128
embedding_max_mem_gb: 25
# Clustering configuration
clustering_save_loc: "clustering_results"
@@ -99,7 +95,7 @@ The module supports various types of models, including:
When changing the model, ensure that:

1. The model is compatible with the data type you're working with (primarily text for this module).
2. You adjust the ``embedding_batch_size`` and ``embedding_max_mem_gb`` parameters as needed, as different models may have different memory requirements.
2. You adjust the ``embedding_batch_size`` parameter as needed, as different models may have different memory requirements.
3. The chosen model is appropriate for the language or domain of your dataset.

By selecting an appropriate embedding model, you can optimize the semantic deduplication process for your specific use case and potentially improve the quality of the deduplicated dataset.
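
For example, a larger embedding model usually needs a smaller batch size to fit in GPU memory. A sketch using the ``EmbeddingCreator`` component shown below; the alternative model name and batch size are illustrative, not recommendations:

   embedding_creator = EmbeddingCreator(
       embedding_model_name_or_path="sentence-transformers/all-mpnet-base-v2",  # illustrative larger model
       embedding_batch_size=64,  # reduced from the 128 used elsewhere in this guide
       embedding_output_dir="path/to/output/embeddings",
       input_column="text",
   )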
@@ -172,11 +168,10 @@ Use Individual Components
# Step 1: Embedding Creation
embedding_creator = EmbeddingCreator(
embedding_model_name_or_path="path/to/pretrained/model",
embedding_max_mem_gb=32,
embedding_batch_size=128,
embedding_output_dir="path/to/output/embeddings",
input_column="text",
logger="path/to/log/dir"
logger="path/to/log/dir",
)
embeddings_dataset = embedding_creator(dataset)
@@ -189,7 +184,7 @@
# Step 2: Clustering
clustering_model = ClusteringModel(
id_col="doc_id",
id_column="doc_id",
max_iter=100,
n_clusters=50000,
clustering_output_dir="path/to/output/clusters",
@@ -208,8 +203,8 @@
n_clusters=50000,
emb_by_clust_dir="path/to/embeddings/by/cluster",
sorted_clusters_dir="path/to/sorted/clusters",
id_col="doc_id",
id_col_type="str",
id_column="doc_id",
id_column_type="str",
which_to_keep="hard",
output_dir="path/to/output/deduped",
logger="path/to/log/dir"
@@ -235,7 +230,7 @@ Alternatively, you can use the SemDedup class to perform all steps:
config = SemDedupConfig(**config_dict)
# Initialize SemDedup with the configuration
sem_dedup = SemDedup(config, logger="path/to/log/dir")
sem_dedup = SemDedup(
config=config,
input_column="text",
id_column="doc_id",
id_column_type="str",
logger="path/to/log/dir",
)
# Perform semantic deduplication
deduplicated_dataset_ids = sem_dedup(dataset)
@@ -249,7 +250,6 @@ Parameters
Key parameters in the configuration file include:

- ``embedding_model_name_or_path``: Path or identifier for the pre-trained model used for embedding generation.
- ``embedding_max_mem_gb``: Maximum memory usage for the embedding process.
- ``embedding_batch_size``: Number of samples to process in each embedding batch.
- ``n_clusters``: Number of clusters for k-means clustering.
- ``eps_to_extract``: Deduplication threshold. Higher values result in more aggressive deduplication.
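
Putting these together, a sketch of a configuration dictionary passed to ``SemDedupConfig``; the numeric values are illustrative, not recommended defaults:

   config_dict = {
       "cache_dir": "semdedup_cache",
       "embedding_model_name_or_path": "sentence-transformers/all-MiniLM-L6-v2",
       "embedding_batch_size": 128,
       "n_clusters": 1000,       # illustrative; large corpora typically use many more clusters
       "eps_to_extract": 0.01,   # illustrative threshold; higher values deduplicate more aggressively
   }
   config = SemDedupConfig(**config_dict)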
2 changes: 1 addition & 1 deletion examples/fuzzy_deduplication.py
@@ -31,7 +31,7 @@ def main(args):

dataset_dir = "/path/to/dataset"
log_dir = "./"
cache_dir = "./fuzzy_cache"
cache_dir = "./fuzzy_cache" # must be cleared between runs
output_dir = "./output"
dataset_id_field = "id"
dataset_text_field = "text"
2 changes: 1 addition & 1 deletion examples/semdedup_example.py
@@ -72,7 +72,7 @@ def main(args):


def attach_args():
parser = ArgumentHelper.parse_semdedup_args(add_input_args=True)
parser = ArgumentHelper.parse_semdedup_args()
return parser


8 changes: 4 additions & 4 deletions nemo_curator/classifiers/aegis.py
@@ -94,7 +94,7 @@ def forward(self, batch):


class AegisHFModel(HFModel):
def __init__(self, config: AegisConfig, max_mem_gb=None):
def __init__(self, config: AegisConfig, max_mem_gb: Optional[int] = None):
self.config = config
if max_mem_gb is None:
max_mem_gb = _get_suggest_memory_for_classifier()
Expand All @@ -109,7 +109,7 @@ def __init__(self, config: AegisConfig, max_mem_gb=None):
seq_len_increment=1024,
)

def load_model(self, device="cuda"):
def load_model(self, device: str = "cuda"):
model = AegisModel(
self.config.pretrained_model_name_or_path,
self.config.peft_model_name_or_path,
@@ -171,7 +171,7 @@ def __init__(
keep_raw_pred: bool = False,
max_chars: int = 6000,
device_type: str = "cuda",
max_mem_gb: int = None,
max_mem_gb: Optional[int] = None,
):
"""
Constructs the classifier
@@ -270,7 +270,7 @@ def _postprocess_responses(self, df):
df[self.pred_column] = cudf.Series(parsed_response)
return df

def _run_classifier(self, dataset: DocumentDataset):
def _run_classifier(self, dataset: DocumentDataset) -> DocumentDataset:
print("Starting AEGIS classifier inference", flush=True)
ddf = dataset.df
hidden_meta = ddf._meta.copy()
31 changes: 16 additions & 15 deletions nemo_curator/classifiers/base.py
@@ -16,7 +16,7 @@

os.environ["RAPIDS_NO_INITIALIZE"] = "1"
from abc import ABC, abstractmethod
from typing import List
from typing import List, Optional

import torch
import torch.nn as nn
@@ -33,15 +33,15 @@ class DistributedDataClassifier(ABC):

def __init__(
self,
model,
labels,
filter_by,
batch_size,
out_dim,
pred_column,
max_chars,
device_type,
autocast,
model: str,
labels: List[str],
filter_by: Optional[List[str]],
batch_size: int,
out_dim: int,
pred_column: str,
max_chars: int,
device_type: str,
autocast: bool,
):
self.model = model
self.labels = labels
@@ -53,21 +53,21 @@ def __init__(
self.device_type = device_type
self.autocast = autocast

def __call__(self, dataset: DocumentDataset):
def __call__(self, dataset: DocumentDataset) -> DocumentDataset:
result_doc_dataset = self._run_classifier(dataset)
if self.filter_by is not None:
return self._filter_documents(result_doc_dataset)

return result_doc_dataset

@abstractmethod
def _run_classifier(self):
def _run_classifier(self) -> DocumentDataset:
pass

def _filter_documents(
self,
dataset: DocumentDataset,
):
) -> DocumentDataset:
df = dataset.df

filter_by = self.filter_by
@@ -106,7 +106,7 @@ def forward(self, batch):
else:
return self._forward(batch)

def set_autocast(self, autocast):
def set_autocast(self, autocast: bool):
self.autocast = autocast


@@ -117,14 +117,15 @@ def _run_classifier_helper(
max_chars: int,
batch_size: int,
label_col: str,
text_field: str = "text",
prob_col: str = None,
) -> "dask_cudf.DataFrame":

keep_prob = prob_col is not None
prob_internal_col = "_prob"
# TODO: Make crossfit handle this cleanly
pred_internal_col = "labels"
df["sliced_text"] = df["text"].str.slice(0, max_chars)
df["sliced_text"] = df[text_field].str.slice(0, max_chars)
columns_to_keep_list = df.columns.to_list()
columns_to_keep_list.remove("sliced_text")

36 changes: 22 additions & 14 deletions nemo_curator/classifiers/domain.py
@@ -13,6 +13,7 @@
# limitations under the License.
import os
from dataclasses import dataclass
from typing import List, Optional

os.environ["RAPIDS_NO_INITIALIZE"] = "1"
from crossfit.backend.torch.hf.model import HFModel
@@ -31,14 +32,17 @@

@dataclass
class DomainModelConfig:
model = "microsoft/deberta-v3-base"
fc_dropout = 0.2
max_len = 512
model: str = "microsoft/deberta-v3-base"
fc_dropout: float = 0.2
max_len: int = 512


class DomainModel(HFModel):
def __init__(
self, config: DomainModelConfig, autocast: bool = False, max_mem_gb=None
self,
config: DomainModelConfig,
autocast: bool = False,
max_mem_gb: Optional[int] = None,
):
self.config = config
self.autocast = autocast
@@ -47,7 +51,7 @@ def __init__(

super().__init__(self.config.model, max_mem_gb=max_mem_gb)

def load_model(self, device="cuda"):
def load_model(self, device: str = "cuda"):
model = HFDeberta.from_pretrained(DOMAIN_IDENTIFIER)
model.set_autocast(self.autocast)
model = model.to(device)
@@ -70,6 +74,7 @@ class DomainClassifier(DistributedDataClassifier):
filter_by (list[str], optional): The classes to filter the dataset by.
If None, all classes will be included. Defaults to None.
batch_size (int): The number of samples per batch for inference. Defaults to 256.
text_field (str): The field in the dataset that should be classified. Defaults to "text".
pred_column (str): The column name where predictions will be stored. Defaults to "domain_pred".
prob_column (str, optional): The column name where prediction probabilities will be stored. Defaults to None.
max_chars (int): The maximum number of characters in each document to consider for classification. Defaults to 2000.
@@ -82,17 +87,19 @@

def __init__(
self,
filter_by=None,
batch_size=256,
pred_column="domain_pred",
prob_column=None,
max_chars=2000,
device_type="cuda",
autocast=True,
max_mem_gb=None,
filter_by: Optional[List[str]] = None,
batch_size: int = 256,
text_field: str = "text",
pred_column: str = "domain_pred",
prob_column: Optional[str] = None,
max_chars: int = 2000,
device_type: str = "cuda",
autocast: bool = True,
max_mem_gb: Optional[int] = None,
):
config = AutoConfig.from_pretrained(DOMAIN_IDENTIFIER)

self.text_field = text_field
self.prob_column = prob_column
self.labels = list(config.label2id.keys())
self.labels.sort(key=lambda x: config.label2id[x])
@@ -114,7 +121,7 @@ def __init__(
autocast=autocast,
)

def _run_classifier(self, dataset: DocumentDataset):
def _run_classifier(self, dataset: DocumentDataset) -> DocumentDataset:
print("Starting domain classifier inference", flush=True)
df = dataset.df
df = _run_classifier_helper(
@@ -124,6 +131,7 @@ def _run_classifier(self, dataset: DocumentDataset):
max_chars=self.max_chars,
batch_size=self.batch_size,
label_col=self.pred_column,
text_field=self.text_field,
prob_col=self.prob_column,
)
return DocumentDataset(df)
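
A usage sketch of the updated ``DomainClassifier`` that exercises the new ``text_field`` parameter; the import path, column name, and filter labels are placeholders, not confirmed by this diff:

   from nemo_curator.classifiers import DomainClassifier  # import path assumed

   classifier = DomainClassifier(
       text_field="content",            # placeholder: column containing the raw document text
       filter_by=["Science", "News"],   # placeholder labels; must match the model's domain classes
       batch_size=128,
   )
   result_dataset = classifier(dataset)  # DocumentDataset with a "domain_pred" column added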