# Update fuzzy deduplication to skip false positive checks as the default #498

Merged · 4 commits · Jan 28, 2025
8 changes: 3 additions & 5 deletions examples/fuzzy_deduplication.py

```diff
@@ -69,14 +69,12 @@ def main(args):
         id_field=dataset_id_field,
         text_field=dataset_text_field,
         seed=42,
-        char_ngrams=5,
+        char_ngrams=24,
         num_buckets=20,
         hashes_per_bucket=13,
         use_64_bit_hash=False,
-        buckets_per_shuffle=5,
-        false_positive_check=True,
-        num_anchors=2,
-        jaccard_threshold=0.8,
+        buckets_per_shuffle=5,  # set to a smaller value if encountering OOMs during LSH
+        false_positive_check=False,
     )
     fuzzy_dup = FuzzyDuplicates(logger=log_dir, config=fuzzy_dedup_config)
     duplicates = fuzzy_dup(dataset=input_dataset)
```
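Read as a whole, the example now boils down to the following sketch. The top-level imports, the cache path, and the placeholder field values are assumptions; the actual script wires these up from its CLI arguments:

```python
from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig

# Placeholder values; the example script derives these from its CLI arguments.
log_dir = "./"
dataset_id_field = "id"
dataset_text_field = "text"

fuzzy_dedup_config = FuzzyDuplicatesConfig(
    cache_dir="./fuzzy_dedup_cache",  # hypothetical path; a cache_dir is mandatory
    id_field=dataset_id_field,
    text_field=dataset_text_field,
    seed=42,
    char_ngrams=24,  # new default shingle width
    num_buckets=20,
    hashes_per_bucket=13,
    use_64_bit_hash=False,
    buckets_per_shuffle=5,  # set to a smaller value if encountering OOMs during LSH
    false_positive_check=False,  # new default: skip the exact Jaccard verification
)
fuzzy_dup = FuzzyDuplicates(logger=log_dir, config=fuzzy_dedup_config)
# duplicates = fuzzy_dup(dataset=input_dataset)  # input_dataset: a loaded DocumentDataset
```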
76 changes: 49 additions & 27 deletions nemo_curator/modules/config.py

```diff
@@ -67,47 +67,69 @@ class FuzzyDuplicatesConfig(BaseConfig):
 
     # Minhash + LSH Config
     seed: int = 42
-    char_ngrams: int = 5
+    char_ngrams: int = 24
     num_buckets: int = 20
     hashes_per_bucket: int = 13
     use_64_bit_hash: bool = False
     buckets_per_shuffle: int = 1
 
-    false_positive_check: bool = True
-    # Only required for fp check
-    num_anchors: int = 2
-    jaccard_threshold: float = 0.8
-    bucket_mapping_blocksize: int = 256
-    parts_per_worker: int = 1
-    bucket_parts_per_worker: int = 8
+    false_positive_check: bool = False
+    # Only required for false positive check
+    num_anchors: Optional[int] = None
+    jaccard_threshold: Optional[float] = None
+    bucket_mapping_blocksize: Optional[int] = None
+    parts_per_worker: Optional[int] = None
+    bucket_parts_per_worker: Optional[int] = None
 
     def __post_init__(self):
         self.num_hashes = self.num_buckets * self.hashes_per_bucket
-        if self.cache_dir is None:
-            raise ValueError(
-                "Finding fuzzy duplicates requires a cache directory accessible via all workers to store intermediates"
-            )
+        false_positive_defaults = {
+            "num_anchors": 2,
+            "jaccard_threshold": 0.8,
+            "bucket_mapping_blocksize": 256,
+            "parts_per_worker": 1,
+            "bucket_parts_per_worker": 8,
+        }
         if self.false_positive_check:
             warnings.warn(
                 "Identifying false positives during the Minhash deduplication is computationally expensive."
                 " For improved performance consider setting this to False"
             )
-        if not self.false_positive_check and self.char_ngrams < 20:
-            warnings.warn(
-                "Using a small char_ngrams value might lead to a large number (~5%) of false positives during deduplication."
-                " Using a value of at least 20 for char_ngrams is recommended."
-            )
-        if self.num_anchors <= 0:
-            raise ValueError("Number of anchors must be greater than 0")
-        if self.num_anchors > 2:
-            warnings.warn(
-                "Using a higher number of anchor docs might lead to higher memory footprint and might impact performance",
-                category=UserWarning,
-            )
-        if not 0 <= self.jaccard_threshold <= 1:
-            raise ValueError("Jaccard Threshold must be between [0,1]")
+            for arg, default in false_positive_defaults.items():
+                if getattr(self, arg) is None:
+                    setattr(self, arg, default)
+            if self.num_anchors <= 0:
+                raise ValueError("Number of anchors must be greater than 0")
+            if self.num_anchors > 2:
+                warnings.warn(
+                    "Using a higher number of anchor docs might lead to higher memory footprint and might impact performance",
+                    category=UserWarning,
+                )
+            if not 0 <= self.jaccard_threshold <= 1:
+                raise ValueError("Jaccard Threshold must be between [0,1]")
+        else:
+            if self.char_ngrams < 20:
+                warnings.warn(
+                    "Using a small char_ngrams value might lead to a large number (~5%) of false positives during deduplication."
+                    " Using a value of at least 20 for char_ngrams is recommended."
+                )
+            unused_false_positive_args = [
+                arg
+                for arg in false_positive_defaults.keys()
+                if getattr(self, arg) is not None
+            ]
+            if unused_false_positive_args:
+                warnings.warn(
+                    f"False positive check is disabled. Unused arguments {unused_false_positive_args} will be ignored",
+                    category=UserWarning,
+                )
 
-        if self.buckets_per_shuffle <= 0:
-            raise ValueError("Buckets per shuffle must be greater than 0")
+        if self.cache_dir is None:
+            raise ValueError(
+                "Finding fuzzy duplicates requires a cache directory accessible via all workers to store intermediates"
+            )
+        if not 1 <= self.buckets_per_shuffle <= self.num_buckets:
+            raise ValueError("Buckets per shuffle must be between [1, num_buckets]")
 
 
 @dataclass
```
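A short sketch of how the reworked `__post_init__` behaves, based only on the logic in the diff above (the cache paths are hypothetical):

```python
import warnings

from nemo_curator.modules.config import FuzzyDuplicatesConfig

# Default path: false_positive_check=False, so the fp-only knobs stay None.
cfg = FuzzyDuplicatesConfig(cache_dir="/tmp/dedup_cache")
assert cfg.false_positive_check is False and cfg.num_anchors is None

# Opting back in: any fp-only argument left as None is filled from
# false_positive_defaults (num_anchors=2, jaccard_threshold=0.8, ...).
with warnings.catch_warnings(record=True):
    warnings.simplefilter("always")  # the fp check itself emits a cost warning
    cfg_fp = FuzzyDuplicatesConfig(cache_dir="/tmp/dedup_cache", false_positive_check=True)
assert cfg_fp.num_anchors == 2 and cfg_fp.jaccard_threshold == 0.8

# Passing fp-only arguments while the check is disabled triggers the new
# "Unused arguments ... will be ignored" UserWarning.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    FuzzyDuplicatesConfig(cache_dir="/tmp/dedup_cache", num_anchors=2)
assert any("False positive check is disabled" in str(w.message) for w in caught)
```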
2 changes: 1 addition & 1 deletion nemo_curator/modules/fuzzy_dedup/minhash.py

```diff
@@ -39,7 +39,7 @@ def __init__(
         self,
         seed: int = 42,
         num_hashes: int = 260,
-        char_ngrams: int = 5,
+        char_ngrams: int = 24,
         use_64bit_hash: bool = False,
         logger: Union[logging.LoggerAdapter, str] = "./",
         id_field: str = "id",
```
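Why 24 instead of 5: longer character shingles make unrelated or merely similar documents far less likely to share n-grams, which is exactly what drives LSH false positives when the verification pass is skipped. A standalone illustration of the effect (plain Python, not NeMo-Curator code):

```python
def char_ngram_set(text: str, n: int) -> set:
    """All length-n character shingles of `text`."""
    return {text[i : i + n] for i in range(len(text) - n + 1)}

a = "the quick brown fox jumps over the lazy dog"
b = "the quick brown fox leaps over the lazy dog"  # one word edited

for n in (5, 24):
    sa, sb = char_ngram_set(a, n), char_ngram_set(b, n)
    print(f"n={n}: Jaccard = {len(sa & sb) / len(sa | sb):.2f}")

# With n=5 the estimated similarity stays high (~0.7) despite the edit; with
# n=24 every shingle spans the edited word, so the similarity drops to 0.0.
# Longer shingles therefore bucket near-but-not-duplicate pairs together far
# less often, compensating for the removed false positive check.
```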
96 changes: 1 addition & 95 deletions nemo_curator/scripts/fuzzy_deduplication/README.md

@@ -2,98 +2,4 @@
This directory contains scripts that can be invoked directly from the command line to find fuzzy duplicates in a group of JSONL files consisting of text and unique IDs, formatted using the `add_id` script included as part of NeMo-Curator.

> [!IMPORTANT]
> The scripts are helper utilities that wrap the fuzzy_dedup API for handling multiple jsonl directories and the id format generated by [add_id](../add_id.py). For most cases we recommend working with the fuzzy_deduplication API directly.

### Usage
1. Compute Minhashes
- Input: Data Directories
- Output: minhashes.parquet for each data dir.
- Example call:
```bash
# same as `python compute_minhashes.py`
gpu_compute_minhashes \
--input-data-dirs /path/to/jsonl/dir1 /path/to/jsonl/dir2 \
--output-minhash-dir /path/to/output_minhashes \
--input-json-text-field text_column_name \
--input-json-id-field id_column_name \
--minhash-length number_of_hashes \
--char-ngram char_ngram_size \
  --hash-bytes 4 `# use 4 for 32-bit or 8 for 64-bit hashes` \
--seed 42 \
--log-dir ./
# --scheduler-file /path/to/file.json
```
2. Buckets (Minhash Buckets)
- Input: Minhash directories
- Output: Buckets.parquet
- Example call:
```bash
# same as `python minhash_lsh.py`
minhash_buckets \
--input-data-dirs /path/to/output_minhashes/dir1 /path/to/output_minhashes/dir2 \
--output-bucket-dir /path/to/dedup_output \
--input-minhash-field _minhash_signature \
--input-json-id-field id_column_name \
--minhash-length number_of_hashes \
--num-bands num_bands \
  --buckets-per-shuffle 1 `# value between [1, num_bands]; higher is better but might lead to OOM` \
--log-dir ./
# --scheduler-file /path/to/file.json
```
3. Jaccard Map Buckets
- Input: Buckets.parquet + Data Dir
- Output: anchor_docs_with_bk.parquet
- Example call:
```bash
# same as `python map_buckets.py`
jaccard_map_buckets \
--input-data-dirs /path/to/jsonl/dir1 /path/to/jsonl/dir2 \
--input-bucket-dir /path/to/dedup_output/_buckets.parquet \
--output-dir /path/to/dedup_output \
--input-json-text-field text_column_name \
--input-json-id-field id_column_name \
# --scheduler-file /path/to/file.json
```
4. Jaccard Shuffle
- Input: anchor_docs_with_bk.parquet + Data Dir
- Output: shuffled_docs.parquet
- Example call:
```bash
# same as `python jaccard_shuffle.py`
jaccard_shuffle \
--input-data-dirs /path/to/jsonl/dir1 /path/to/jsonl/dir2 \
--input-bucket-mapping-dir /path/to/dedup_output/anchor_docs_with_bk.parquet \
--output-dir /path/to/dedup_output \
--input-json-text-field text_column_name \
--input-json-id-field id_column_name \
# --scheduler-file /path/to/file.json
```
5. Jaccard compute
- Input: Shuffled docs.parquet
- Output: jaccard_similarity_results.parquet
- Example call:
```bash
# same as `python jaccard_compute.py`
jaccard_compute \
--shuffled-docs-path /path/to/dedup_output/shuffled_docs.parquet \
--output-dir /path/to/dedup_output \
--ngram-size char_ngram_size_for_similarity \
# --scheduler-file /path/to/file.json
```
6. Connected Components
- Input: jaccard_similarity_results.parquet
- Output: connected_components.parquet
- Example call:
```bash
# same as `python connected_components.py`
gpu_connected_component \
  --jaccard-pairs-path /path/to/dedup_output/jaccard_similarity_results.parquet \
--output-dir /path/to/dedup_output \
--cache-dir /path/to/cc_cache \
--jaccard-threshold 0.8
# --scheduler-file /path/to/file.json
```

> [!TIP]
> When using these scripts in a multi-node environment (like Slurm, K8s, etc.) it is recommended to start up a Dask cluster prior to execution and connect to the existing cluster via the `--scheduler-address` or `--scheduler-file` flag.
> Use the `--help` flag to view all possible CLI options for the scripts and details on what they do.

The single line added to this file replaces the usage guide above with a pointer to the hosted docs:

> The up-to-date documentation on running the fuzzy deduplication scripts can be found in the [NeMo Curator User Guide](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/gpudeduplication.html#id4). It is recommended to use the Python API directly rather than the CLI scripts for most cases.
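For the multi-node TIP above: a minimal sketch of bringing up a single-node GPU Dask cluster that the CLI scripts can attach to. This assumes `dask-cuda` is installed; the scheduler-file handoff is the standard Dask pattern, not a NeMo-Curator-specific API:

```python
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# One worker per visible GPU on this node; multi-node setups would instead
# launch dask-scheduler / dask-cuda-worker processes that share a scheduler file.
cluster = LocalCUDACluster()
client = Client(cluster)

# Persist the scheduler address so the CLI scripts can connect via
# --scheduler-file /path/to/scheduler.json
client.write_scheduler_file("/path/to/scheduler.json")
```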
nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py

```diff
@@ -129,7 +129,7 @@ def attach_args():
     parser.add_argument(
         "--char-ngram",
         type=int,
-        default=5,
+        default=24,
         help="The number of consecutive characters to include in a sliding "
         "window when creating the document shingles for computing "
         "minhash signatures.",
```
37 changes: 29 additions & 8 deletions tests/test_fuzzy_dedup.py

```diff
@@ -341,7 +341,7 @@ def test_fuzzy_dedup(
             num_buckets=num_buckets,
             hashes_per_bucket=1,
             use_64_bit_hash=use_64_bit_hash,
-            buckets_per_shuffle=5,
+            buckets_per_shuffle=3,
             false_positive_check=True,
             num_anchors=2,
             jaccard_threshold=jaccard_threshold,
@@ -375,6 +375,7 @@ def test_different_fields(self, fuzzy_dedup_data, tmpdir):
             false_positive_check=True,
             num_anchors=2,
             jaccard_threshold=0.39,
+            char_ngrams=5,
         )
         fuzzy_duplicates = FuzzyDuplicates(config=config)
         result = fuzzy_duplicates(fuzzy_dedup_data)
@@ -487,7 +488,7 @@ def test_no_fp_check(
             num_buckets=num_buckets,
             hashes_per_bucket=1,
             use_64_bit_hash=use_64_bit_hash,
-            buckets_per_shuffle=5,
+            buckets_per_shuffle=3,
             false_positive_check=False,
             num_anchors=2,
             jaccard_threshold=0.39,
@@ -575,11 +576,25 @@ def test_fuzzy_dedup_no_duplicates(
 class TestFuzzyDuplicatesConfig:
     def test_bad_inputs(self, tmpdir):
         with pytest.raises(ValueError):
-            FuzzyDuplicatesConfig(cache_dir=tmpdir, num_anchors=0)
+            FuzzyDuplicatesConfig(
+                cache_dir=tmpdir, num_anchors=0, false_positive_check=True
+            )
+            FuzzyDuplicatesConfig(
+                cache_dir=tmpdir, jaccard_threshold=1.2, false_positive_check=True
+            )
+            FuzzyDuplicatesConfig(cache_dir=tmpdir, buckets_per_shuffle=0)
+            FuzzyDuplicatesConfig(
+                cache_dir=tmpdir, buckets_per_shuffle=2, num_buckets=1
+            )
+            FuzzyDuplicatesConfig(
+                cache_dir=None, num_anchors=0, false_positive_check=True
+            )
         with pytest.warns(
             UserWarning, match="Using a higher number of anchor docs might"
         ):
-            FuzzyDuplicatesConfig(cache_dir=tmpdir, num_anchors=3)
+            FuzzyDuplicatesConfig(
+                cache_dir=tmpdir, num_anchors=3, false_positive_check=True
+            )
         with pytest.warns(
             UserWarning, match="Using a small char_ngrams value might lead"
         ):
@@ -591,10 +606,16 @@ def test_bad_inputs(self, tmpdir):
             match="Identifying false positives during the Minhash deduplication is computationally expensive",
         ):
             FuzzyDuplicatesConfig(cache_dir=tmpdir, false_positive_check=True)
-        with pytest.raises(ValueError):
-            FuzzyDuplicatesConfig(cache_dir=tmpdir, jaccard_threshold=1.2)
-        with pytest.raises(ValueError):
-            FuzzyDuplicatesConfig(cache_dir=tmpdir, buckets_per_shuffle=0)
+        with pytest.warns(
+            UserWarning,
+            match="False positive check is disabled. Unused arguments",
+        ):
+            FuzzyDuplicatesConfig(
+                cache_dir=tmpdir,
+                false_positive_check=False,
+                num_anchors=2,
+                jaccard_threshold=0.8,
+            )
 
     def test_from_yaml(self, tmpdir):
         yaml_params = {
```
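`test_from_yaml` (truncated above) exercises loading the config from a YAML file. A sketch of that round trip with the new defaults, assuming `BaseConfig` exposes the `from_yaml` constructor the test exercises and that field names map directly to YAML keys (paths are hypothetical):

```python
import yaml

from nemo_curator.modules.config import FuzzyDuplicatesConfig

yaml_params = {
    "cache_dir": "./dedup_cache",   # still required in every configuration
    "false_positive_check": False,  # the new default, stated explicitly here
    "char_ngrams": 24,
}
with open("fuzzy_dedup_config.yaml", "w") as f:
    yaml.safe_dump(yaml_params, f)

config = FuzzyDuplicatesConfig.from_yaml("fuzzy_dedup_config.yaml")
# With the check disabled, the fp-only knobs are left unset and ignored.
assert config.false_positive_check is False and config.num_anchors is None
```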