diff --git a/config/fuzzy_dedup_config.yaml b/config/fuzzy_dedup_config.yaml new file mode 100644 index 00000000..a513a72f --- /dev/null +++ b/config/fuzzy_dedup_config.yaml @@ -0,0 +1,16 @@ +cache_dir: "./fuzzy_dedup_cache" +# Optional Params below with default values +# profile_dir: null +# id_field: "id" +# text_field: "text" + +# seed: 42 +# char_ngrams: 5 +# num_buckets: 20 +# hashes_per_bucket: 13 +# use_64_bit_hash: false +# buckets_per_shuffle: 1 + +# false_positive_check: True +# num_anchors: 2 +# jaccard_threshold: 0.8 diff --git a/examples/fuzzy_deduplication.py b/examples/fuzzy_deduplication.py new file mode 100644 index 00000000..d74fd775 --- /dev/null +++ b/examples/fuzzy_deduplication.py @@ -0,0 +1,109 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import time + +import dask +from dask import dataframe as dd + +from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig +from nemo_curator.datasets import DocumentDataset +from nemo_curator.utils.distributed_utils import get_client, write_to_disk +from nemo_curator.utils.script_utils import add_distributed_args + + +def pre_imports(): + import cudf # noqa: F401 + + +def main(args): + + dataset_dir = "/path/to/dataset" + log_dir = "./" + cache_dir = "./fuzzy_cache" + output_dir = "./output" + dataset_id_field = "id" + dataset_text_field = "text" + + filetype = "parquet" + + # Fuzzy dedup calculation only supports the cuDF/GPU backend + backend = "cudf" + assert args.device == "gpu" + + with dask.config.set({"dataframe.backend": backend}): + client = get_client(args, args.device) + client.run(pre_imports) + + t0 = time.time() + if filetype == "parquet": + input_dataset = DocumentDataset( + dd.read_parquet( + dataset_dir, + columns=[dataset_id_field, dataset_text_field], + blocksize="256MiB", + aggregate_files=True, + ) + ) + elif filetype == "jsonl": + input_dataset = DocumentDataset.read_json( + dataset_dir, + backend=backend, + ) + + fuzzy_dedup_config = FuzzyDuplicatesConfig( + cache_dir=cache_dir, + id_field=dataset_id_field, + text_field=dataset_text_field, + seed=42, + char_ngrams=5, + num_buckets=20, + hashes_per_bucket=13, + use_64_bit_hash=False, + buckets_per_shuffle=5, + false_positive_check=True, + num_anchors=2, + jaccard_threshold=0.8, + ) + fuzzy_dup = FuzzyDuplicates(logger=log_dir, config=fuzzy_dedup_config) + duplicates = fuzzy_dup(dataset=input_dataset) + + # By default, all duplicate IDs and the group they belong to are included in the result + # Keep one document from each group of duplicates and mark the others for removal + # https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.duplicated.html + docs_to_remove = duplicates.df.map_partitions( + lambda x: x[x.group.duplicated(keep="first")] + ) + + # When there are few duplicates we can compute the results to a list and use `isin`.
+ result = input_dataset.df[ + ~input_dataset.df[dataset_id_field].isin( + docs_to_remove[dataset_id_field].compute() + ) + ] + write_to_disk(result, output_dir, output_type=filetype) + print(time.time() - t0) + + +def attach_args( + parser=argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ), +): + return add_distributed_args(parser) + + +if __name__ == "__main__": + main(attach_args().parse_args()) diff --git a/examples/gpu_deduplication_example/README.md b/examples/gpu_deduplication_example/README.md deleted file mode 100644 index 2f294e1f..00000000 --- a/examples/gpu_deduplication_example/README.md +++ /dev/null @@ -1,29 +0,0 @@ -### Deduplication Steps - -> [!CAUTION] -> The examples references here are outdated and will be replaced with an example using the Python API directly. For more details on the scripts refer to [nemo_curator/scripts/fuzzy_deduplication](/nemo_curator/scripts/fuzzy_deduplication) - -1. Exact dedup - 1. Input: Data directories - 2. Output: exact_duplicates.parquet. List of exact duplicates and the document hash. - -Fuzzy Dedup -1. Minhashes (Compute minhashes) - 1. Input: Data Directories - 2. Output: minhashes.parquet for each data dir. -2. Buckets (Minhash Buckets) - 1. Input: Minhash directories - 2. Output: Buckets.parquet -3. Jaccard Map Buckets + Jaccard shuffle - 1. Input: Buckets.parquet + Data Dir - 2. Output: Shuffled docs.parquet -4. Jaccard compute - 1. Input: Shuffled docs.parquet - 2. Output: dedup_final_results.parquet -5. Connected Components - 1. Input: Dedup_final_Results.parquet - 2. Output: connected_components.parquet - - -While calling the main `run-workflow.sh` script that points to these runscripts users can also set the relevant `LIBCUDF_CUFILE_POLICY`. -It is reccomended to set `LIBCUDF_CUFILE_POLICY=OFF` for all runs calling the script. diff --git a/examples/gpu_deduplication_example/batch.sh b/examples/gpu_deduplication_example/batch.sh deleted file mode 100644 index eca7145c..00000000 --- a/examples/gpu_deduplication_example/batch.sh +++ /dev/null @@ -1,38 +0,0 @@ -#! /bin/bash - -#SBATCH --job-name=nemo-data-curator:gpu-deduplication -#SBATCH --nodes=8 -#SBATCH --exclusive -#SBATCH --time=04:00:00 - -# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -# -# This script can be used for running both exact and fuzzy document-level -# deduplication using Dask and cuDF -# - -base_dir=`pwd` # Assumes base dir is top-level dir of repo -RUNSCRIPT=${RUNSCRIPT:-${base_dir}/examples/gpu_deduplication_example/run-minhash.sh} -LIBCUDF_CUFILE_POLICY=${LIBCUDF_CUFILE_POLICY:-OFF} -echo $RUNSCRIPT - -docker_image='nvcr.io/ea-bignlp/ga-participants/nemofw-training:23.08.03' -mounts="${base_dir}:${base_dir}" - -srun -l \ - --container-mounts=${mounts} \ - --container-image=${docker_image} \ - bash -c "echo ${RUNSCRIPT};echo ${LIBCUDF_CUFILE_POLICY}; LIBCUDF_CUFILE_POLICY=${LIBCUDF_CUFILE_POLICY} RUNSCRIPT=${RUNSCRIPT} bash ${base_dir}/examples/gpu_deduplication_example/run-workflow.sh" diff --git a/examples/gpu_deduplication_example/create-list-of-exact-duplicate-ids.sh b/examples/gpu_deduplication_example/create-list-of-exact-duplicate-ids.sh deleted file mode 100644 index 757629e3..00000000 --- a/examples/gpu_deduplication_example/create-list-of-exact-duplicate-ids.sh +++ /dev/null @@ -1,53 +0,0 @@ -#! /bin/bash - -#SBATCH --job-name=nemo-data-curator:create-exact-dup-id-list -#SBATCH --nodes=1 -#SBATCH --exclusive -#SBATCH --time=0:30:00 - -# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -set -eux - -## Log and intermediate results dirs -base_dir=`pwd` -src_dir="${base_dir}/workspace/nemo-data-curator" -log_dir=${src_dir}/workspace/log/create_exact_dup_id_list -res_dir=${src_dir}/workspace/data/create_exact_dup_id_list -conf_dir=${src_dir}/workspace/config -mkdir -p ${log_dir} ${res_dir} ${conf_dir} - -## Container related variables -docker_image="nvcr.io/ea-bignlp/ga-participants/nemofw-training:23.11" -mounts="${base_dir}:${base_dir}" - -## Set relevant filepath -input_id_list_dir= - -srun -l \ - --mpi=pmix \ - --output=${log_dir}/create_exact_dup_id_list_%j.out \ - --error=${log_dir}/create_exact_dup_id_list_%j.err \ - --container-image=${docker_image} \ - --container-mounts=${mounts} \ - create_list_of_duplicate_ids \ - --input-id-list-dir=${input_id_list_dir} \ - --input-bucket-key="_hashes" \ - --output-id-list-dir=${res_dir}/exact_dup_ids \ - --output-bucket-list-dir=${res_dir}/buckets \ - --log-dir=${log_dir}/create_exact_dup_id_list - -# Concatenate the extracted list of ids -cat ${res_dir}/exact_dup_ids/*.txt > ${res_dir}/exact_duplicate_id_list.txt diff --git a/examples/gpu_deduplication_example/create-list-of-fuzzy-duplicate-ids.sh b/examples/gpu_deduplication_example/create-list-of-fuzzy-duplicate-ids.sh deleted file mode 100644 index 70b0d13b..00000000 --- a/examples/gpu_deduplication_example/create-list-of-fuzzy-duplicate-ids.sh +++ /dev/null @@ -1,66 +0,0 @@ -#! /bin/bash - -#SBATCH --job-name=nemo-data-curator:create-fuzzy-dup-id-list -#SBATCH --nodes=1 -#SBATCH --exclusive -#SBATCH --time=0:30:00 - -# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -set -eux - -## Log and intermediate results dirs -base_dir=`pwd` -src_dir="${base_dir}/workspace/nemo-data-curator" -log_dir=${src_dir}/workspace/log/create_fuzzy_dup_id_list -res_dir=${src_dir}/workspace/data/create_fuzzy_dup_id_list -conf_dir=${src_dir}/workspace/config -mkdir -p ${log_dir} ${res_dir} ${conf_dir} - -## Container related variables -docker_image="nvcr.io/ea-bignlp/ga-participants/nemofw-training:23.11" -mounts="${base_dir}:${base_dir}" - -## Set relevant filepath -input_id_list_dir= - -# Generate the mapping and prepare the connected components -srun -l \ - --nodes=1 \ - --output=${log_dir}/create_fuzzy_dup_id_list_%j.out \ - --error=${log_dir}/create_fuzzy_dup_id_list_%j.err \ - --container-image=${docker_image} \ - --container-mounts=${mounts} \ - prepare_fuzzy_ids \ - --path-to-connected-components=${input_id_list_dir} \ - --output-indexed-connected-components=${res_dir}/indexed_connected_components.parquet \ - --output-id-mapping=${res_dir}/mapping.json - -srun -l \ - --mpi=pmix \ - --output=${log_dir}/create_fuzzy_dup_id_list_%j.out \ - --error=${log_dir}/create_fuzzy_dup_id_list_%j.err \ - --container-image=${docker_image} \ - --container-mounts=${mounts} \ - create_list_of_duplicate_ids \ - --input-id-list-dir=${res_dir}/indexed_connected_components.parquet \ - --input-bucket-key="group" \ - --id-mapping=${res_dir}/mapping.json \ - --output-id-list-dir=${res_dir}/fuzzy_dup_ids \ - --output-bucket-list-dir=${res_dir}/buckets \ - --log-dir=${log_dir}/create_fuzzy_dup_id_list - -# Concatenate the extracted list of ids -cat ${res_dir}/fuzzy_dup_ids/*.txt > ${res_dir}/fuzzy_duplicate_id_list.txt diff --git a/examples/gpu_deduplication_example/remove-duplicates.sh b/examples/gpu_deduplication_example/remove-duplicates.sh deleted file mode 100644 index 275c9f15..00000000 --- a/examples/gpu_deduplication_example/remove-duplicates.sh +++ /dev/null @@ -1,52 +0,0 @@ -#! /bin/bash - -#SBATCH --job-name=nemo-data-curator:remove-duplicates -#SBATCH --nodes=10 -#SBATCH --exclusive -#SBATCH --time=01:00:00 - -# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -set -eux - -## Log and intermediate results dirs -base_dir=`pwd` -src_dir="${base_dir}/workspace/nemo-data-curator" -log_dir=${src_dir}/workspace/log/remove_duplicates -res_dir=${src_dir}/workspace/data/remove_duplicates -conf_dir=${src_dir}/workspace/config -mkdir -p ${log_dir} ${res_dir} ${conf_dir} - -## Container related variables -docker_image="nvcr.io/ea-bignlp/ga-participants/nemofw-training:23.11" -mounts="${base_dir}:${base_dir}" - -## Set relevant filepaths -input_data_dir="" -input_id_list="" -output_data_dir="" -fname=$(basename ${input_id_list}) -tag=$(basename $fname .txt) - -srun -l \ - --output=${log_dir}/remove_duplicates_${tag}_%j.out \ - --error=${log_dir}/remove_duplicates_${tag}_%j.err \ - --container-image=${docker_image} \ - --container-mounts=${mounts} \ - remove_duplicates \ - --input-data-dir=${input_data_dir} \ - --input-id-list=${input_id_list} \ - --output-deduped-dir=${output_data_dir}/all_deduped \ - --log-dir=${log_dir}/all_deduped_${tag} diff --git a/examples/gpu_deduplication_example/run-buckets.sh b/examples/gpu_deduplication_example/run-buckets.sh deleted file mode 100644 index 7ca1d102..00000000 --- a/examples/gpu_deduplication_example/run-buckets.sh +++ /dev/null @@ -1,29 +0,0 @@ -#!/bin/bash - -minhash_dir="/outputdir/minhashes" -datasets=$(ls ${minhash_dir}) -for dataset in $datasets; do - input_minhash_dirs="$input_minhash_dirs $minhash_dir/$dataset/minhashes.parquet" -done -output_dir="/outputdir" - -buckets_per_shuffle=1 - -mkdir -p $output_dir -echo $input_minhash_dirs - -# Remove old buckets -rm -r ${output_dir}/buckets.parquet - -python -u minhash_buckets.py \ - --input-data-dirs $input_minhash_dirs \ - --minhash-length 260 \ - --output-bucket-dir $output_dir/ \ - --log-dir $LOGDIR \ - --protocol ucx \ - --num-bands 20 \ - --buckets-per-shuffle=$buckets_per_shuffle \ - --split-out=512 \ - --scheduler-file $LOGDIR/scheduler.json - -echo "Time Check: `date`" diff --git a/examples/gpu_deduplication_example/run-cc.sh b/examples/gpu_deduplication_example/run-cc.sh deleted file mode 100644 index ab0c6210..00000000 --- a/examples/gpu_deduplication_example/run-cc.sh +++ /dev/null @@ -1,26 +0,0 @@ - -base_dir="/outputdir" -cc_folder="CC" -output_dir="${base_dir}/${cc_folder}_output" -cache_dir="${base_dir}/${cc_folder}_cache" -jaccard_pairs_path="/outputdir/dedup_final_results.parquet" - - -echo "output_dir set to $output_dir" -echo "cache_dir set to $cache_dir" - -export RAPIDS_NO_INITIALIZE="1" -export CUDF_SPILL="1" - -### compute connected component -#rm -r $cache_dir -mkdir -p $output_dir $cache_dir - -python -u connected_component.py \ - --jaccard-pairs-path $jaccard_pairs_path \ - --output-dir $output_dir \ - --cache-dir $cache_dir \ - --log-dir $LOGDIR \ - --profile-path $PROFILESDIR \ - --num-files $NUM_FILES \ - --scheduler-file $LOGDIR/scheduler.json diff --git a/examples/gpu_deduplication_example/run-jaccard.sh b/examples/gpu_deduplication_example/run-jaccard.sh deleted file mode 100644 index 6ee51d30..00000000 --- a/examples/gpu_deduplication_example/run-jaccard.sh +++ /dev/null @@ -1,16 +0,0 @@ - -shuffled_docs_dir="/outputdir/shuffled_docs.parquet" -output_dir="/outputdir" - - -export CUDF_SPILL="1" - -python jaccard_compute.py \ - --shuffled-docs-path $shuffled_docs_dir \ - --output-dir $output_dir \ - --log-dir $LOGDIR \ - --num-files $NUM_FILES \ - --scheduler-file $LOGDIR/scheduler.json - - -echo "Time Check: `date`" diff --git a/examples/gpu_deduplication_example/run-minhash.sh b/examples/gpu_deduplication_example/run-minhash.sh 
deleted file mode 100644 index 79e069cd..00000000 --- a/examples/gpu_deduplication_example/run-minhash.sh +++ /dev/null @@ -1,42 +0,0 @@ -#! /bin/bash - -# Assumes each directory contains Jsonl files -input_data_dirs="/datadir/dataset1/ \ -/datadir/dataset2/ \ -/datadir/dataset3/" - -output_dir="/outputdir/minhashes" - -# NOTE: The script implicitly assumes that the last part -# of the input data paths is the dataset name and will choose -# output dir names as follows: -# /outputdir/minhashes/dataset1 -# /outputdir/minhashes/dataset2 -# /outputdir/minhashes/dataset3 -# This can cause issues if the last part of the -# dirname is the same across datasets - -mkdir -p $output_dir - -# Is a good number for files 200MB or lesser -# Use a smaller value for larger jsonl files -files_per_partition=20 - -mkdir -p $output_dir -echo $input_data_dirs - -python -u compute_minhashes.py \ - --input-data-dirs $input_data_dirs \ - --minhash-length 260 \ - --char-ngram 5 \ - --hash-bytes 4 \ - --seed 42 \ - --output-minhash-dir $output_dir \ - --log-dir $LOGDIR \ - --num-files $NUM_FILES \ - --files-per-partition $files_per_partition \ - --profile-path $PROFILESDIR \ - --log-frequency 250 \ - --scheduler-file $LOGDIR/scheduler.json - -echo "Time Check: `date`" diff --git a/examples/gpu_deduplication_example/run-shuffle.sh b/examples/gpu_deduplication_example/run-shuffle.sh deleted file mode 100644 index e559dbbb..00000000 --- a/examples/gpu_deduplication_example/run-shuffle.sh +++ /dev/null @@ -1,35 +0,0 @@ -input_data_dirs="/datadir/dataset1/ \ -/datadir/dataset2/ \ -/datadir/dataset3/" -buckets_dir="/outputdir/buckets.parquet" -output_dir="/outputdir" - - -export CUDF_SPILL="1" - -## Run jaccard Mapping -echo "Starting Jaccard mapping..." -python jaccard_map_buckets.py \ - --input-bucket-dir $buckets_dir \ - --input-data-dirs $input_data_dirs \ - --output-dir $output_dir \ - --log-dir $LOGDIR \ - --text-ddf-blocksize 512 \ - --num-files $NUM_FILES \ - --scheduler-file $LOGDIR/scheduler.json - -### Run jaccard Shuffle - -echo "Starting Jaccard Shuffle..." - -python jaccard_shuffle.py \ - --input-bucket-mapping-dir $output_dir/anchor_docs_with_bk.parquet \ - --input-data-dirs $input_data_dirs \ - --output-dir $output_dir \ - --text-ddf-blocksize 256 \ - --bucket-mapping-ddf-blocksize 512 \ - --num-files $NUM_FILES \ - --parts-per-worker 1 \ - --scheduler-file $LOGDIR/scheduler.json - -echo "Time Check: `date`" diff --git a/examples/gpu_deduplication_example/run-workflow.sh b/examples/gpu_deduplication_example/run-workflow.sh deleted file mode 100755 index b7e1392f..00000000 --- a/examples/gpu_deduplication_example/run-workflow.sh +++ /dev/null @@ -1,70 +0,0 @@ -#! /bin/bash - -echo "Starting Workflow..." 
-echo "Time Check: `date`" -if [[ -z "$SLURM_JOB_ID" ]]; then - TODAY="`date +"%Y_%m_%d"`" -else - TODAY="`date +"%Y_%m_%d"`-$SLURM_JOB_ID" -fi - -# Prepare output directory -export JOB_DIR=rapids-dedup-scripts/DEDUP-$TODAY -export FULL_OUTPUT_DIR=$HOME/$JOB_DIR -export LOGDIR=$FULL_OUTPUT_DIR/logs -export PROFILESDIR=$FULL_OUTPUT_DIR/profiles -# Take the default location within the container -RUNSCRIPT=${RUNSCRIPT:--/opt/nemo-data-curator/examples/gpu_deduplication_example/run-minhash.sh} -echo $RUNSCRIPT -mkdir -p $LOGDIR -mkdir -p $PROFILESDIR - -cd /opt/nemo-data-curator/nemo_curator/gpu_deduplication -#-----# - - -# Env vars -export RAPIDS_NO_INITIALIZE="1" -export CUDF_SPILL="1" - -export LIBCUDF_CUFILE_POLICY=${LIBCUDF_CUFILE_POLICY:-ALWAYS} - -# Network interface specific to the cluster being used -export INTERFACE=ibp12s0 -export PROTOCOL=ucx -echo $INTERFACE - -# This variable can be set to limit the number of jsonl files that -# are used in the dedup. Setting to -1 reads in all files -export NUM_FILES=-1 - -# Start the scheduler on the rank 0 node -if [[ -z "$SLURM_NODEID" ]] || [[ $SLURM_NODEID == 0 ]]; then - echo "Starting scheduler" - DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True \ - DASK_DISTRIBUTED__RMM__POOL_SIZE=1GB \ - dask scheduler \ - --scheduler-file $LOGDIR/scheduler.json \ - --protocol $PROTOCOL \ - --interface $INTERFACE >> $LOGDIR/scheduler.log 2>&1 & -fi -sleep 30 - -# Start the workers on each node -echo "Starting workers..." -dask-cuda-worker --scheduler-file $LOGDIR/scheduler.json --rmm-pool-size 72GiB --interface $INTERFACE --rmm-async >> $LOGDIR/worker_$HOSTNAME.log 2>&1 & - -sleep 60 - -if [[ -z "$SLURM_NODEID" ]] || [[ $SLURM_NODEID == 0 ]]; then - echo "Time Check: `date`" - bash $RUNSCRIPT - echo "Time Check: `date`" - touch $LOGDIR/done.txt -fi - -# All nodes wait until done -while [ ! -f $LOGDIR/done.txt ] -do - sleep 15 -done diff --git a/nemo_curator/modules/__init__.py b/nemo_curator/modules/__init__.py index 0867942d..8b961326 100644 --- a/nemo_curator/modules/__init__.py +++ b/nemo_curator/modules/__init__.py @@ -22,6 +22,7 @@ from nemo_curator.utils.import_utils import gpu_only_import_from from .add_id import AddId +from .config import FuzzyDuplicatesConfig from .dataset_ops import blend_datasets, Shuffle from .exact_dedup import ExactDuplicates from .filter import Filter, Score, ScoreFilter @@ -32,6 +33,9 @@ # GPU packages LSH = gpu_only_import_from("nemo_curator.modules.fuzzy_dedup", "LSH") MinHash = gpu_only_import_from("nemo_curator.modules.fuzzy_dedup", "MinHash") +FuzzyDuplicates = gpu_only_import_from( + "nemo_curator.modules.fuzzy_dedup", "FuzzyDuplicates" +) # Pytorch related imports must come after all imports that require cugraph, # because of context cleanup issues b/w pytorch and cugraph @@ -42,6 +46,8 @@ "DomainClassifier", "ExactDuplicates", "Filter", + "FuzzyDuplicatesConfig", + "FuzzyDuplicates", "LSH", "MinHash", "Modify", diff --git a/nemo_curator/modules/config.py b/nemo_curator/modules/config.py new file mode 100644 index 00000000..45ea527f --- /dev/null +++ b/nemo_curator/modules/config.py @@ -0,0 +1,100 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import warnings +from dataclasses import dataclass + +import yaml + + +@dataclass +class BaseConfig: + @classmethod + def from_yaml(cls, file_path: str): + with open(file_path, "r") as file: + yaml_dict = yaml.safe_load(file) + return cls(**yaml_dict) + + +@dataclass +class FuzzyDuplicatesConfig(BaseConfig): + """ + Configuration for MinHash based fuzzy duplicates detection. + Parameters + ---------- + seed: Seed for minhash permutations + char_ngrams: Size of character n-gram shingles used in minhash computation + num_buckets: Number of bands or buckets to use during Locality Sensitive Hashing + hashes_per_bucket: Number of hashes per bucket/band. + use_64_bit_hash: Whether to use a 32bit or 64bit hash function for minhashing. + buckets_per_shuffle: Number of bands/buckets to shuffle concurrently. + Larger values process larger batches by processing multiple bands + but might lead to memory pressures and related errors. + id_field: Column in the Dataset denoting document ID. + text_field: Column in the Dataset denoting document content. + profile_dir: str, Default None + If specified, directory to write Dask profiles to. + cache_dir: str, Default None + Location to store deduplication intermediates such as minhashes/buckets, etc. + false_positive_check: bool, + Whether to run a check to look for false positives within buckets. + Note: This is a computationally expensive step. + num_anchors: int + Number of documents per bucket to use as reference for computing Jaccard + pairs within that bucket to identify false positives. + jaccard_threshold: float + The Jaccard similarity threshold to consider a document a near duplicate + during false positive evaluations.
+ """ + + # General config + cache_dir: str + profile_dir: str = None + id_field: str = "id" + text_field: str = "text" + + # Minhash + LSH Config + seed: int = 42 + char_ngrams: int = 5 + num_buckets: int = 20 + hashes_per_bucket: int = 13 + use_64_bit_hash: bool = False + buckets_per_shuffle: int = 1 + + false_positive_check: bool = True + # Only required for fp check + num_anchors: int = 2 + jaccard_threshold: float = 0.8 + + def __post_init__(self): + self.num_hashes = self.num_buckets * self.hashes_per_bucket + if self.cache_dir is None: + raise ValueError( + "Finding fuzzy duplicates requires a cache directory accessible via all workers to store intermediates" + ) + if not self.false_positive_check: + raise NotImplementedError( + "Skipping false positive checks is not supported at the moment" + ) + if self.num_anchors <= 0: + raise ValueError("Number of anchors must be greater than 0") + if self.num_anchors > 2: + warnings.warn( + "Using a higher number of anchor docs might lead to higher memory footprint and might impact performance", + category=UserWarning, + ) + if not 0 <= self.jaccard_threshold <= 1: + raise ValueError("Jaccard Threshold must be between [0,1]") + if self.buckets_per_shuffle <= 0: + raise ValueError("Buckets per shuffle must be greater than 0") diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index ac72e53d..b61ccde7 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -35,6 +35,8 @@ from nemo_curator.datasets import DocumentDataset from nemo_curator.log import create_logger +from nemo_curator.modules.config import FuzzyDuplicatesConfig +from nemo_curator.modules.meta import Sequential from nemo_curator.utils.distributed_utils import ( get_current_client, get_num_workers, @@ -194,7 +196,7 @@ class LSH: def __init__( self, cache_dir: str, - minhash_length: int, + num_hashes: int, num_buckets: int, buckets_per_shuffle: int = 1, logger: Union[logging.LoggerAdapter, str] = "./", @@ -207,9 +209,9 @@ def __init__( ---------- cache_dir: str Needs to be specified, will compute & write duplicate id, bucket pairs to cache directory. - minhash_length: Length of minhash signature + num_hashes: Length of minhash signature num_buckets: Number of bands/buckets to create from the minhash signature. - Hashes_per_signature = minhash_length / num_buckets + Hashes_per_signature = num_hashes / num_buckets buckets_per_shuffle: Number of bands/buckets to shuffle concurrently. Larger values process larger batches by processing multiple bands but might lead to memory pressures and related errors. @@ -219,13 +221,13 @@ def __init__( profile_dir: str, Default None If specified directory to write dask profile """ - self.minhash_length = minhash_length + self.num_hashes = num_hashes self.num_buckets = num_buckets self.id_fields = [id_fields] if isinstance(id_fields, str) else id_fields self.minhash_field = minhash_field self.buckets_per_shuffle = buckets_per_shuffle self.bucket_ranges = self._generate_bucket_ranges( - self.num_buckets, self.minhash_length + self.num_buckets, self.num_hashes ) if cache_dir is None: @@ -245,15 +247,15 @@ def __init__( self._logger = logger def _generate_bucket_ranges( - self, num_buckets: int, minhash_length: int + self, num_buckets: int, num_hashes: int ) -> List[List[int]]: """ Generates a list of indices for the minhash ranges given num_bands & - minhash_length. - eg: num_bands=3, minhash_length=6 + num_hashes. 
+ eg: num_bands=3, num_hashes=6 [[0, 1], [2, 3], [4, 5]] """ - minhashes_per_bucket = minhash_length // num_buckets + minhashes_per_bucket = num_hashes // num_buckets bucket_ranges = [ list( @@ -308,7 +310,7 @@ def _minhash_to_bucket_meta( self, df: dask_cudf.DataFrame ) -> Tuple[cudf.DataFrame, int]: meta = df._meta_nonempty[self.id_fields] - meta[self.minhash_field] = [np.ones(self.minhash_length)] * len(meta) + meta[self.minhash_field] = [np.ones(self.num_hashes)] * len(meta) return self.minhash_to_buckets(meta, self.bucket_ranges) def lsh( @@ -325,7 +327,6 @@ def lsh( bucket_ranges=self.bucket_ranges, meta=meta, ) - bucket_start_id = 0 for i in range(0, self.num_buckets, self.buckets_per_shuffle): value_vars = [ @@ -382,6 +383,154 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: return DocumentDataset(buckets_df) +class FuzzyDuplicates: + def __init__( + self, + config: FuzzyDuplicatesConfig, + logger: Union[logging.LoggerAdapter, str] = "./", + ): + """ + Parameters + ---------- + config: FuzzyDuplicatesConfig, + Config options for finding FuzzyDuplicates + logger: Existing logger to log to, or a path to a log directory. + + Returns + ------- + DocumentDataset containing IDs of all documents and the corresponding duplicate group + they belong to. Documents in the same group are near duplicates. + """ + if isinstance(logger, str): + self._logger = create_logger( + rank=0, + log_file=os.path.join(logger, "FuzzyDuplicates.log"), + name="FuzzyDuplicates", + ) + else: + self._logger = logger + + self.config = config + self.minhash = MinHash( + seed=self.config.seed, + num_hashes=self.config.num_hashes, + char_ngrams=self.config.char_ngrams, + use_64bit_hash=self.config.use_64_bit_hash, + logger=self._logger, + id_field=self.config.id_field, + text_field=self.config.text_field, + profile_dir=self.config.profile_dir, + cache_dir=self.config.cache_dir, + ) + self.lsh = LSH( + cache_dir=self.config.cache_dir, + num_hashes=self.config.num_hashes, + num_buckets=self.config.num_buckets, + buckets_per_shuffle=self.config.buckets_per_shuffle, + logger=self._logger, + id_fields=[self.config.id_field], + profile_dir=self.config.profile_dir, + ) + self.map_buckets = _MapBuckets( + id_fields=[self.config.id_field], + text_field=self.config.text_field, + logger=self._logger, + num_anchors=self.config.num_anchors, + ) + self.jaccard_shuffle = _Shuffle( + id_fields=[self.config.id_field], + text_field=self.config.text_field, + logger=self._logger, + profile_dir=self.config.profile_dir, + ) + self.jaccard_compute = JaccardSimilarity( + id_field=self.config.id_field, + text_field=self.config.text_field, + ngram_width=self.config.char_ngrams, + anchor_id_fields=[ + f"anchor_{i}_{self.config.id_field}" + for i in range(self.config.num_anchors) + ], + ) + self.connected_components = ConnectedComponents( + cache_dir=self.config.cache_dir, + jaccard_pairs_path=os.path.join( + self.config.cache_dir, "jaccard_similarity_results.parquet" + ), + id_column=self.config.id_field, + convert_str_ids=False, + jaccard_threshold=self.config.jaccard_threshold, + ) + + def __call__(self, dataset: DocumentDataset): + """ + Parameters + ---------- + dataset: DocumentDataset + The input datset to compute FuzzyDuplicates. Must contain a text and unique id field. + + Returns + ------- + DocumentDataset containing IDs of all documents and the corresponding duplicate group + they belong to. Documents in the same group are near duplicates. 
+ """ + # Minhash + LSH + print("Stage1: Starting Minhash + LSH computation") + minhashLSH = Sequential([self.minhash, self.lsh]) + buckets_df = minhashLSH(dataset) + print("Stage1: Minhash + LSH complete!") + + # Map buckets to lower cardinality distribution + print("Stage2 (False Positive Check): Starting Map_Buckets") + ddf_mapped_buckets_w_anchors = self.map_buckets.map_buckets_with_anchors( + documents_df=dataset.df, buckets_df=buckets_df.df + ) + mapped_buckets_w_anchors_path = os.path.join( + self.config.cache_dir, "anchor_docs_with_bk.parquet" + ) + ddf_mapped_buckets_w_anchors.to_parquet( + mapped_buckets_w_anchors_path, write_index=False + ) + print("Stage2 (False Positive Check): Map_Buckets Complete!") + + # Shuffle documents based on mapped buckets + print("Stage3 (False Positive Check): Shuffle docs") + shuffled_docs_path = os.path.join( + self.config.cache_dir, "shuffled_docs.parquet" + ) + self.jaccard_shuffle.shuffle_docs_on_buckets( + documents_df=dataset.df, + bucket_w_anchors_path=mapped_buckets_w_anchors_path, + output_shuffled_docs_path=shuffled_docs_path, + bucket_mapping_df_blocksize=256, + parts_per_worker=1, + bucket_parts_per_worker=8, + ) + print("Stage3 (False Positive Check): Shuffle docs complete!") + + # Jaccard comparison within buckets + print("Stage4 (False Positive Check): Jaccard Similarity in Buckets") + jaccard_pairs_path = os.path.join( + self.config.cache_dir, "jaccard_similarity_results.parquet" + ) + jaccard_pairs_df = self.jaccard_compute.jaccard_compute( + shuffled_docs_path=shuffled_docs_path + ) + jaccard_pairs_df.to_parquet( + jaccard_pairs_path, + write_index=False, + write_metadata_file=False, + ) + print("Stage4 (False Positive Check): Jaccard Similarity in Buckets Complete!") + + # Connected components across buckets + print("Stage5: Connected Components across buckets") + cc_path = os.path.join(self.config.cache_dir, "connected_components.parquet") + self.connected_components.cc_workflow(cc_path) + print("Stage5: Connected Components across buckets complete!") + return DocumentDataset(dask_cudf.read_parquet(cc_path, split_row_groups=False)) + + class _MapBuckets: + """ + buckets to a logical partition by using a modified bin packing algorithm.
@@ -508,6 +657,7 @@ def _get_output_map_based_on_str_bytes( """ Add output_partition_id to buckets_ddf """ + documents_df = documents_df.copy() documents_df[bytes_column] = documents_df[self.text_field].map_partitions( lambda s: s.str.byte_count() ) @@ -620,7 +770,7 @@ def map_buckets_with_anchors( ddf_anchor_docs_with_bk, self.id_fields, ignore_index=True, - shuffle=shuffle_type, + shuffle_method=shuffle_type, ).map_partitions( M.drop_duplicates, meta=ddf_anchor_docs_with_bk._meta, @@ -1195,7 +1345,7 @@ def _write_dedup_encoded_jaccard_pair(self, encoded_jaccard_pair_path): ddf, [self.left_id, self.right_id], ignore_index=True, - shuffle="tasks", + shuffle_method="tasks", ) ddf = ddf.map_partitions( M.drop_duplicates, @@ -1301,12 +1451,12 @@ def _batched_merge_and_write( how="inner", broadcast=True, ) + subset_ddf = subset_ddf.drop( + columns=pair_ids, + ) subset_ddf = subset_ddf.rename( columns={"uid": f"{self.id_column}_{tag}"} ) - subset_ddf = subset_ddf.drop( - columns=[f"dataset_id_{tag}", f"doc_id_{tag}"] - ) subset_ddf = subset_ddf[[self.left_id, self.right_id, "jaccard"]] output_batch_path = os.path.join(output_path, f"{batch_id}.parquet") diff --git a/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py b/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py index a0484cf0..21dac27d 100644 --- a/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py +++ b/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py @@ -64,7 +64,7 @@ def main(args): ) lsh = LSH( cache_dir=args.output_bucket_dir, - minhash_length=args.minhash_length, + num_hashes=args.minhash_length, num_buckets=args.num_bands, buckets_per_shuffle=args.buckets_per_shuffle, id_fields=["dataset_id", "doc_id"], diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 00000000..fcb34d29 --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,81 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from dataclasses import dataclass + +import pytest +import yaml + +from nemo_curator.modules.config import BaseConfig + + +@dataclass +class CustomConfig(BaseConfig): + a: str + b: int + c: bool + d: float = 3.0 + + def __post_init__(self): + if self.d <= 0: + raise ValueError("d must be positive") + + +class TestConfig: + @pytest.fixture(autouse=True) + def config_params(self): + self.config_dict = {"a": "a", "b": 1, "c": True, "d": 4.0} + + def test_init(self): + config = CustomConfig(a="a", b=1, c=True) + assert config.a == "a" + assert config.b == 1 + assert config.c is True + assert config.d == 3.0 + + def test_from_yaml(self, tmpdir): + with open(tmpdir / "test_config.yaml", "w") as file: + yaml.dump(self.config_dict, file) + + config = CustomConfig.from_yaml(tmpdir / "test_config.yaml") + for key, value in self.config_dict.items(): + assert getattr(config, key) == value + + def test_from_yaml_raises(self, tmpdir): + config_dict = self.config_dict.copy() + config_dict["d"] = -1.0 + with open(tmpdir / "test_config.yaml", "w") as file: + yaml.dump(config_dict, file) + with pytest.raises(ValueError): + CustomConfig.from_yaml(tmpdir / "test_config.yaml") + + def test_from_yaml_missing_key(self, tmpdir): + config_dict = self.config_dict.copy() + del config_dict["a"] + with open(tmpdir / "test_config.yaml", "w") as file: + yaml.dump(config_dict, file) + with pytest.raises(TypeError): + CustomConfig.from_yaml(tmpdir / "test_config.yaml") + + def test_from_yaml_extra_key(self, tmpdir): + config_dict = self.config_dict.copy() + config_dict["e"] = "e" + with open(tmpdir / "test_config.yaml", "w") as file: + yaml.dump(config_dict, file) + with pytest.raises(TypeError): + CustomConfig.from_yaml(tmpdir / "test_config.yaml") + + def test_post_init_raises(self): + with pytest.raises(ValueError): + CustomConfig(a="a", b=1, c=True, d=-1.0) diff --git a/tests/test_fuzzy_dedup.py b/tests/test_fuzzy_dedup.py index f0ded450..e89f998e 100644 --- a/tests/test_fuzzy_dedup.py +++ b/tests/test_fuzzy_dedup.py @@ -18,14 +18,17 @@ import numpy as np import pytest +import yaml from dask.dataframe.utils import assert_eq +from distributed import Client +from nemo_curator import LSH, FuzzyDuplicates, FuzzyDuplicatesConfig, MinHash from nemo_curator.datasets import DocumentDataset -from nemo_curator.modules import LSH, MinHash -from nemo_curator.utils.import_utils import gpu_only_import +from nemo_curator.utils.import_utils import gpu_only_import, gpu_only_import_from cudf = gpu_only_import("cudf") dask_cudf = gpu_only_import("dask_cudf") +LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster") @pytest.fixture @@ -46,6 +49,25 @@ def fuzzy_dedup_data(): return DocumentDataset(df) +@pytest.fixture +def large_fuzzy_dedup_data(): + df = cudf.DataFrame( + { + "id": np.arange(500), + "text": [ + "A test string", + "A different test string", + "A different object", + "The quick brown fox jumps over the lazy dog", + "The quick black cat jumps over the lazy dog", + ] + * 100, + } + ) + df = dask_cudf.from_cudf(df, 5).reset_index(drop=True) + return DocumentDataset(df) + + def minhash_overlap(minhash1: np.array, minhash2: np.array): assert len(minhash1) == len(minhash2) overlap = sum(minhash1 == minhash2) @@ -149,7 +171,7 @@ def minhash_data(self): def test_lsh(self, tmpdir, buckets_per_shuffle): lsh = LSH( cache_dir=tmpdir, - minhash_length=6, + num_hashes=6, num_buckets=3, buckets_per_shuffle=buckets_per_shuffle, minhash_field="minhash_sig", @@ -164,7 +186,7 @@ def test_lsh(self, tmpdir, 
buckets_per_shuffle): def test_multiple_id_cols(self, tmpdir): lsh = LSH( cache_dir=tmpdir, - minhash_length=6, + num_hashes=6, num_buckets=3, buckets_per_shuffle=1, id_fields=["id", "dataset_id"], @@ -180,3 +202,168 @@ def test_multiple_id_cols(self, tmpdir): [[(1, 1), (1, 2)], [(1, 2), (2, 3)], [(3, 4), (4, 5)]], name="new_id" ) assert_eq(expected_df, docs_list, check_index=False) + + +@pytest.mark.gpu +class TestFuzzyDuplicates: + @pytest.fixture(autouse=True, scope="class") + def gpu_client(self, request): + with LocalCUDACluster(n_workers=1) as cluster, Client(cluster) as client: + request.cls.client = client + request.cls.cluster = cluster + yield + + @pytest.mark.parametrize("use_64_bit_hash", [False, True]) + @pytest.mark.parametrize( + "num_buckets,jaccard_threshold,duplicate_docs", + # Duplcated docs estimated from true_jaccard values + [ + (5, 0.5, [[4, -1]]), + (10, 0.39, [[4, -1], [1, 2]]), + (3, 0.3, [[4, -1], [1, 2, 300]]), + ], + ) + def test_fuzzy_dedup( + self, + fuzzy_dedup_data, + use_64_bit_hash, + num_buckets, + jaccard_threshold, + duplicate_docs, + tmpdir, + ): + print(self.client) + # Dedup might fail when indices per partition do not start from 0 + fuzzy_dedup_data.df = fuzzy_dedup_data.df.reset_index(drop=True) + config = FuzzyDuplicatesConfig( + cache_dir=tmpdir, + id_field="id", + text_field="text", + seed=42, + char_ngrams=5, + num_buckets=num_buckets, + hashes_per_bucket=1, + use_64_bit_hash=use_64_bit_hash, + buckets_per_shuffle=5, + false_positive_check=True, + num_anchors=2, + jaccard_threshold=jaccard_threshold, + ) + fuzzy_duplicates = FuzzyDuplicates(config=config) + result = fuzzy_duplicates(fuzzy_dedup_data) + result_df = result.df.compute() + # Drop non duplicated docs + result_df = result_df[result_df.group.duplicated(keep=False)] + result_df = result_df.groupby("group").id.collect() + # Sort to maintain uniform ordering + + result_df = result_df.list.sort_values() + result_df = result_df.sort_values() + expected_df = cudf.Series(duplicate_docs, name="id") + expected_df = expected_df.list.sort_values() + expected_df = expected_df.sort_values() + assert_eq(expected_df, result_df, check_index=False) + + @pytest.mark.xfail + def test_non_uniform_indices( + self, + tmpdir, + ): + print(self.client) + # Dedup might fail when indices per partition do not start from 0 + df = cudf.DataFrame( + { + "id": [1, 2, 300, 4, -1], + "text": [ + "A test string", + "A different test string", + "A different object", + "The quick brown fox jumps over the lazy dog", + "The quick black cat jumps over the lazy dog", + ], + } + ) + df = dask_cudf.from_cudf(df, 2) + data = DocumentDataset(df) + duplicate_docs = [[4, -1], [1, 2, 300]] + config = FuzzyDuplicatesConfig( + cache_dir=tmpdir, + id_field="id", + text_field="text", + seed=42, + char_ngrams=5, + num_buckets=10, + hashes_per_bucket=1, + use_64_bit_hash=False, + buckets_per_shuffle=5, + false_positive_check=True, + num_anchors=2, + jaccard_threshold=0.39, + ) + fuzzy_duplicates = FuzzyDuplicates(config=config) + result = fuzzy_duplicates(data) + result_df = result.df.compute() + # Drop non duplicated docs + result_df = result_df[result_df.group.duplicated(keep=False)] + result_df = result_df.groupby("group").id.collect() + # Sort to maintain uniform ordering + + result_df = result_df.list.sort_values() + result_df = result_df.sort_values() + expected_df = cudf.Series(duplicate_docs, name="id") + expected_df = expected_df.list.sort_values() + expected_df = expected_df.sort_values() + assert_eq(expected_df, result_df, 
check_index=False) + + @pytest.mark.parametrize("num_anchors", [1, 3, 10]) + def test_num_anchors(self, large_fuzzy_dedup_data, num_anchors, tmpdir): + config = FuzzyDuplicatesConfig( + cache_dir=tmpdir, + id_field="id", + text_field="text", + seed=42, + char_ngrams=5, + num_buckets=5, + hashes_per_bucket=1, + use_64_bit_hash=False, + buckets_per_shuffle=5, + false_positive_check=True, + num_anchors=num_anchors, + jaccard_threshold=0.39, + ) + fuzzy_duplicates = FuzzyDuplicates(config=config) + fuzzy_duplicates(large_fuzzy_dedup_data) + anchor_docs_df_cols = dask_cudf.read_parquet( + tmpdir / "anchor_docs_with_bk.parquet" + ).columns + assert all(f"anchor_{i}_id" in anchor_docs_df_cols for i in range(num_anchors)) + + +class TestFuzzyDuplicatesConfig: + def test_bad_inputs(self, tmpdir): + with pytest.raises(ValueError): + FuzzyDuplicatesConfig(cache_dir=tmpdir, num_anchors=0) + with pytest.warns( + UserWarning, match="Using a higher number of anchor docs might" + ): + FuzzyDuplicatesConfig(cache_dir=tmpdir, num_anchors=3) + with pytest.raises(ValueError): + FuzzyDuplicatesConfig(cache_dir=tmpdir, jaccard_threshold=1.2) + with pytest.raises(NotImplementedError): + FuzzyDuplicatesConfig(cache_dir=tmpdir, false_positive_check=False) + with pytest.raises(ValueError): + FuzzyDuplicatesConfig(cache_dir=tmpdir, buckets_per_shuffle=0) + + def test_from_yaml(self, tmpdir): + yaml_params = { + "cache_dir": "./", + "num_anchors": 2, + "jaccard_threshold": 0.8, + "false_positive_check": True, + "buckets_per_shuffle": 1, + } + with open(tmpdir / "config.yaml", "w") as f: + yaml.dump(yaml_params, f) + config = FuzzyDuplicatesConfig.from_yaml(tmpdir / "config.yaml") + for param in yaml_params: + assert getattr(config, param) == yaml_params[param]
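
Usage sketch: the YAML config, FuzzyDuplicatesConfig, and FuzzyDuplicates added above are meant to compose as follows. This is a minimal illustration under a few assumptions: a GPU Dask client is already running (as set up via get_client in examples/fuzzy_deduplication.py), the input path below is a placeholder, and the dataset exposes the default "id" and "text" columns.

import dask_cudf

from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
from nemo_curator.datasets import DocumentDataset

# Load the defaults shipped in config/fuzzy_dedup_config.yaml (via BaseConfig.from_yaml)
# and build the end-to-end workflow object.
config = FuzzyDuplicatesConfig.from_yaml("config/fuzzy_dedup_config.yaml")
fuzzy_dup = FuzzyDuplicates(config=config, logger="./")

# Any cuDF-backed DocumentDataset with the configured id/text fields works here.
dataset = DocumentDataset(
    dask_cudf.read_parquet("/path/to/dataset", columns=["id", "text"])
)

# Rows in the result that share a `group` value are near duplicates of each other.
duplicates = fuzzy_dup(dataset=dataset)

# Keep the first document of every duplicate group and drop the rest,
# mirroring examples/fuzzy_deduplication.py.
docs_to_remove = duplicates.df.map_partitions(
    lambda part: part[part.group.duplicated(keep="first")]
)
deduped = dataset.df[~dataset.df["id"].isin(docs_to_remove["id"].compute())]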
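
A note on the minhash_length to num_hashes rename in LSH: the banding arithmetic is unchanged. FuzzyDuplicatesConfig.__post_init__ derives num_hashes = num_buckets * hashes_per_bucket, and _generate_bucket_ranges slices the signature into contiguous bands. The snippet below is a standalone sketch of that slicing in plain Python (the real implementation operates on cuDF dataframes), reproducing the docstring example.

def generate_bucket_ranges(num_buckets, num_hashes):
    # Split a signature of num_hashes minhashes into num_buckets contiguous bands.
    minhashes_per_bucket = num_hashes // num_buckets
    return [
        list(range(b * minhashes_per_bucket, (b + 1) * minhashes_per_bucket))
        for b in range(num_buckets)
    ]

# Matches the docstring example: num_bands=3, num_hashes=6.
assert generate_bucket_ranges(3, 6) == [[0, 1], [2, 3], [4, 5]]

# With the new config defaults (num_buckets=20, hashes_per_bucket=13) the signature
# length works out to 20 * 13 = 260, the same value the removed example scripts
# passed as --minhash-length 260.
assert 20 * 13 == 260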