diff --git a/tutorials/README.md b/tutorials/README.md index 860f8fb9..e63ab5e2 100644 --- a/tutorials/README.md +++ b/tutorials/README.md @@ -7,6 +7,7 @@ To get started, we recommend starting with the following tutorials to become fam 2. **[peft-curation](./peft-curation)**, which overviews operations suitable for curating small-scale datasets which are used for task-specific fine-tuning. 3. **[synthetic-data-hello-world](./synthetic-data-hello-world)**, which overviews basic synthetic data generation facilities for interfacing with external models such as [Nemotron-4 340B Instruct](https://build.nvidia.com/nvidia/nemotron-4-340b-instruct). 4. **[peft-curation-with-sdg](./peft-curation-with-sdg)**, which combines data processing opeartions and synthetic data generation using [Nemotron-4 340B Instruct](https://build.nvidia.com/nvidia/nemotron-4-340b-instruct) or [LLaMa 3.1 405B Instruct](https://build.nvidia.com/meta/llama-3_1-405b-instruct) into a single pipeline. Additionally, this tutorial also demonstrates advanced functions such as reward score assignment via [Nemotron-4 340B Reward](https://build.nvidia.com/nvidia/nemotron-4-340b-reward), as well as semantic deduplication to remove semantically similar real or synthetic records. +5. **[pretraining-data-curation](./pretraining-data-curation/)**, which overviews the data curation pipeline for creating an LLM pretraining dataset at scale. ## List of Tutorials @@ -15,6 +16,7 @@ To get started, we recommend starting with the following tutorials to become fam | Tutorial | Description | Additional Resources | | --- | --- | --- | +| [pretraining-data-curation](./pretraining-data-curation/) | Demonstrates an accelerated pipeline for curating large-scale data for LLM pretraining in a distributed environment | | | [dapt-curation](./dapt-curation) | Data curation sample for domain-adaptive pre-training (DAPT), focusing on [ChipNeMo](https://blogs.nvidia.com/blog/llm-semiconductors-chip-nemo/) data curation as an example | [Blog post](https://developer.nvidia.com/blog/streamlining-data-processing-for-domain-adaptive-pretraining-with-nvidia-nemo-curator/) | | [distributed_data_classification](./distributed_data_classification) | Demonstrates data domain and data quality classification at scale in a distributed environment | | | [nemotron_340B_synthetic_datagen](./nemotron_340B_synthetic_datagen) | Demonstrates the use of NeMo Curator synthetic data generation modules to leverage [Nemotron-4 340B Instruct](https://build.nvidia.com/nvidia/nemotron-4-340b-instruct) for generating synthetic preference data | | diff --git a/tutorials/pretraining-data-curation/README.md b/tutorials/pretraining-data-curation/README.md new file mode 100644 index 00000000..d3637281 --- /dev/null +++ b/tutorials/pretraining-data-curation/README.md @@ -0,0 +1,11 @@ +# RedPajama-Data-v2 Dataset Curation for LLM Pretraining + +This tutorial demonstrates how to use NeMo Curator to curate the RedPajama-Data-v2 dataset for LLM pretraining in a distributed environment. + +## RedPajama-Data-v2 +RedPajama-V2 (RPV2) is an open dataset for training large language models. The dataset includes over 100B text documents drawn from 84 CommonCrawl snapshots and processed using the CCNet pipeline. In this tutorial, we will perform data curation on two raw snapshots from RPV2 for demonstration purposes. + +## Getting Started +This tutorial is designed to run in a multi-node environment due to the scale of the pretraining dataset. 
To start the tutorial, run the Slurm script `start-distributed-notebook.sh` in this directory. It starts a Jupyter notebook that provides a step-by-step walkthrough of the end-to-end curation pipeline. To access the Jupyter notebook running on the scheduler node from your local machine, you can establish an SSH tunnel by running the following command (replace the placeholders with your local port, your username, and the address of the scheduler node): + +`ssh -L <local_port>:localhost:8888 <username>@<scheduler_node_address>` diff --git a/tutorials/pretraining-data-curation/config/heuristic_filter_en.yaml b/tutorials/pretraining-data-curation/config/heuristic_filter_en.yaml new file mode 100644 index 00000000..ee0bd4f5 --- /dev/null +++ b/tutorials/pretraining-data-curation/config/heuristic_filter_en.yaml @@ -0,0 +1,34 @@ +input_field: raw_content +filters: + - name: nemo_curator.filters.heuristic_filter.NonAlphaNumericFilter + params: + max_non_alpha_numeric_to_text_ratio: 0.25 + - name: nemo_curator.filters.heuristic_filter.SymbolsToWordsFilter + params: + max_symbol_to_word_ratio: 0.1 + - name: nemo_curator.filters.heuristic_filter.NumbersFilter + params: + max_number_to_text_ratio: 0.15 + - name: nemo_curator.filters.heuristic_filter.UrlsFilter + params: + max_url_to_text_ratio: 0.2 + - name: nemo_curator.filters.heuristic_filter.WhiteSpaceFilter + params: + max_white_space_ratio: 0.25 + - name: nemo_curator.filters.heuristic_filter.ParenthesesFilter + params: + max_parentheses_ratio: 0.1 + - name: nemo_curator.filters.heuristic_filter.BoilerPlateStringFilter + params: + remove_if_at_top_or_bottom: True + max_boilerplate_string_ratio: 0.4 + - name: nemo_curator.filters.heuristic_filter.RepeatedLinesFilter + params: + max_repeated_line_fraction: 0.7 + - name: nemo_curator.filters.heuristic_filter.RepeatedParagraphsFilter + params: + max_repeated_paragraphs_ratio: 0.7 + - name: nemo_curator.filters.heuristic_filter.WordCountFilter + params: + min_words: 50 + max_words: 100000 diff --git a/tutorials/pretraining-data-curation/container-entrypoint.sh b/tutorials/pretraining-data-curation/container-entrypoint.sh new file mode 100644 index 00000000..55068f2f --- /dev/null +++ b/tutorials/pretraining-data-curation/container-entrypoint.sh @@ -0,0 +1,75 @@ +#! /bin/bash + +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Start the scheduler on the rank 0 node +if [[ -z "$SLURM_NODEID" ]] || [[ $SLURM_NODEID == 0 ]]; then + echo "Starting scheduler" + if [[ $DEVICE == 'cpu' ]]; then + dask scheduler \ + --scheduler-file $SCHEDULER_FILE \ + --protocol $PROTOCOL \ + --interface $INTERFACE >> $SCHEDULER_LOG 2>&1 & + fi + if [[ $DEVICE == 'gpu' ]]; then + DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True \ + DASK_DISTRIBUTED__RMM__POOL_SIZE=$RMM_SCHEDULER_POOL_SIZE \ + dask scheduler \ + --scheduler-file $SCHEDULER_FILE \ + --protocol $PROTOCOL \ + --interface $INTERFACE >> $SCHEDULER_LOG 2>&1 & + fi +fi + +# Wait for the scheduler to start +sleep 30 + +# Start the workers on each node +echo "Starting workers..." 
+ +export WORKER_LOG=$LOGDIR/worker_${SLURM_NODEID}-${SLURM_LOCALID}.log +if [[ $DEVICE == 'cpu' ]]; then + dask worker \ + --scheduler-file $SCHEDULER_FILE \ + --memory-limit $CPU_WORKER_MEMORY_LIMIT \ + --nworkers $CPU_WORKER_PER_NODE \ + --interface $INTERFACE >> $WORKER_LOG 2>&1 & +fi +if [[ $DEVICE == 'gpu' ]]; then + dask-cuda-worker \ + --scheduler-file $SCHEDULER_FILE \ + --rmm-pool-size $RMM_WORKER_POOL_SIZE \ + --interface $INTERFACE \ + --rmm-async >> $WORKER_LOG 2>&1 & +fi + +# Wait for the workers to start +sleep 30 + +# Extract the the scheduler address and export it as an environment variable +export SCHEDULER_ADDRESS=$(jq -r '.address' "$SCHEDULER_FILE") +echo "SCHEDULER_ADDRESS=$SCHEDULER_ADDRESS" + +if [[ -z "$SLURM_NODEID" ]] || [[ $SLURM_NODEID == 0 ]]; then + echo "Starting notebook" + bash -c "jupyter lab --ip=0.0.0.0 --port=8888 --no-browser --NotebookApp.token='' --NotebookApp.password='' --notebook-dir=${BASE_DIR}" + touch $DONE_MARKER +fi + +# All nodes wait until done to keep the workers and scheduler active +while [ ! -f $DONE_MARKER ] +do + sleep 15 +done diff --git a/tutorials/pretraining-data-curation/helper.py b/tutorials/pretraining-data-curation/helper.py new file mode 100644 index 00000000..da0e9bde --- /dev/null +++ b/tutorials/pretraining-data-curation/helper.py @@ -0,0 +1,66 @@ +import gzip +import json +import os + +import cudf +import dask.bag as db + + +def convert_single_file(input_output_paths): + input_path, output_path = input_output_paths + + with gzip.open(input_path, "rt", encoding="utf-8") as f_in: + with open(output_path, "w", encoding="utf-8") as f_out: + for line in f_in: + try: + # Parse each line as a separate JSON object + item = json.loads(line) + # Write the JSON object to the .jsonl file + json.dump(item, f_out) + f_out.write("\n") + except json.JSONDecodeError as e: + print(f"Error decoding JSON in file {input_path}: {e}") + continue + + +def convert_json_gz_to_jsonl(input_dir, output_dir, partition_size=2): + # Ensure the output directory exists + os.makedirs(output_dir, exist_ok=True) + + # List all .json.gz files in the input directory + file_paths = [] + for filename in os.listdir(input_dir): + if filename.endswith(".json.gz"): + input_path = os.path.join(input_dir, filename) + output_filename = ( + os.path.splitext(os.path.splitext(filename)[0])[0] + ".jsonl" + ) + output_path = os.path.join(output_dir, output_filename) + file_paths.append((input_path, output_path)) + + # Create a Dask bag from the file paths and apply the function in parallel + bag = db.from_sequence(file_paths, partition_size=partition_size) + bag.map(convert_single_file).compute() + + +def convert_str_id_to_int(df, id_column="id"): + """ + Converts the legacy id format "dataset_name-0000034" + type of ID into 2 int based ID's + """ + dx = df[id_column].str.rsplit("-", n=1, expand=True) + df["doc_id"] = dx[1].astype("int64").values + df["dataset_id"] = dx[0].hash_values() + return df + + +def get_dataframe_complement(original_df, filtered_df): + def partition_complement(part_original_df, partition_info=None): + if not partition_info: + return part_original_df + part_filtered_df = filtered_df.get_partition(partition_info["number"]) + complement_mask = ~part_original_df.index.isin(part_filtered_df.index.persist()) + complement_df = part_original_df[complement_mask] + return complement_df + + return original_df.map_partitions(partition_complement) diff --git a/tutorials/pretraining-data-curation/red-pajama-v2-curation-tutorial.ipynb 
b/tutorials/pretraining-data-curation/red-pajama-v2-curation-tutorial.ipynb new file mode 100644 index 00000000..42c92bfa --- /dev/null +++ b/tutorials/pretraining-data-curation/red-pajama-v2-curation-tutorial.ipynb @@ -0,0 +1,4882 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "8498f9f4-3d9a-481e-80fb-af41f7d7aa7d", + "metadata": {}, + "source": [ + "# Pretraining Data Curation in NeMo Curator" + ] + }, + { + "cell_type": "markdown", + "id": "28d17c49", + "metadata": {}, + "source": [ + "## Table of Contents\n", + "\n", + "1. [Introduction](#introduction)\n", + "2. [Getting Started](#get-start)\n", + "3. [RedPajama-Data-v2](#rpv2)\n", + "4. [Data Preprocessing](#preprocess)\n", + "5. [Deduplication](#dedup)\n", + "6. [Quality filtering](#filter)" + ] + }, + { + "cell_type": "markdown", + "id": "4c55d981", + "metadata": {}, + "source": [ + "# 1. Introduction\n", + "\n", + "\n", + "In this tutorial, we will show how to curate large-scale data for LLM pretraining in a distributed environment using NeMo-Curator. Specifically, we will focus on the following modules in NeMo-Curator:\n", + "\n", + "- Language identification and separation\n", + "- Text reformatting and cleaning\n", + "- Quality filtering\n", + "- Document-level deduplication\n", + "\n", + "For demonstration, we will use the [RedPajama-Data-v2](#rpv2) dataset, an open dataset for LLM pretraining." + ] + }, + { + "cell_type": "markdown", + "id": "520eef06-0edb-4108-a048-af006dea8601", + "metadata": { + "jp-MarkdownHeadingCollapsed": true, + "tags": [] + }, + "source": [ + "## 1.1 System Information\n", + "Here is the information on the system this notebook was run on:\n", + "\n", + "- **GPU**: 2 A100 nodes (each with 8 A100-SXM4-80GB)\n", + "\n", + "- **CUDA & Nvidia Drivers**: CUDA 12.4 with Driver 535.104.12\n", + "\n", + "- **OS**: Ubuntu 22.04.4 LTS\n", + "\n", + "## 1.2 Running NeMo-Curator\n", + "\n", + "NeMo-Curator comes pre-installed in the NeMo Framework container. This notebook uses the 24.07 release of the NeMo Framework container. Users can pull the container by following the steps below:\n", + "\n", + "- Get access to the NeMo Framework container on [NGC](https://catalog.ngc.nvidia.com/orgs/nvidia/containers/nemo)\n", + "\n", + "- Set your docker credentials\n", + "\n", + "\n", + " `docker login nvcr.io`\n", + "\n", + " Username: `$oauthtoken`\n", + " \n", + " Password: `<your NGC API key>`\n", + " \n", + "- Pull the NeMo Framework Container image\n", + " \n", + " `docker pull nvcr.io/nvidia/nemo:24.07`\n", + "\n", + "Alternatively, NeMo-Curator is also available on [PyPI](https://pypi.org/project/nemo-curator/) and [GitHub](https://github.com/NVIDIA/NeMo-Curator)." + ] + }, + { + "cell_type": "markdown", + "id": "7d57dd35-cce6-4bfa-b34a-fb4a2ea584e0", + "metadata": {}, + "source": [ + "# 2. Getting started\n", + "\n", + "\n", + "NeMo-Curator uses Dask for parallelization. Before we start using Curator, we need to start a Dask cluster. To start a multi-node Dask cluster on Slurm, we can use the `start-distributed-notebook.sh` script in this directory. The user will need to change the following variables:\n", + "\n", + "- Slurm job directives\n", + "- Device type (`cpu` or `gpu`). Curator has both CPU and GPU modules. Check [here](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/cpuvsgpu.html) to see which modules are CPU-based and which are GPU-based\n", + "- CPU-related parameters if using CPU modules. 
Configure the number of workers and memory limit to efficiently use available computational resources while preventing out of memory\n", + "- Path to the NeMo Framework container image\n", + "- Path to `container-entrypoint.sh` script which is responsible for launching the dask schduler and workers\n", + "\n", + "Running the script will also launch a jupyter lab session on the rank 0 node and pass the dask schduler address as an environment variable that will be used later to connect to the dask client.\n", + "\n", + "The preprocessing modules such as Add ID and Text cleaning are cpu-based so we will start a cpu dask cluster first." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "5de0fe93", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/usr/local/lib/python3.10/dist-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", + " from .autonotebook import tqdm as notebook_tqdm\n" + ] + } + ], + "source": [ + "import os\n", + "import time\n", + "from dask.distributed import Client\n", + "import warnings\n", + "import dask.dataframe as dd\n", + "import dask_cudf\n", + "import cudf\n", + "import gzip\n", + "import json\n", + "import dask.bag as db\n", + "import glob\n", + "from dask.distributed import wait\n", + "import numpy as np\n", + "\n", + "from nemo_curator import get_client\n", + "from nemo_curator.datasets import DocumentDataset\n", + "from nemo_curator.utils.distributed_utils import (\n", + " get_num_workers,\n", + " read_data,\n", + " write_to_disk,\n", + ")\n", + "from nemo_curator.utils.file_utils import (\n", + " expand_outdir_and_mkdir, \n", + " get_all_files_paths_under, \n", + " separate_by_metadata,\n", + " get_batched_files,\n", + ")\n", + "\n", + "warnings.filterwarnings('ignore')\n", + "base_dir = \"/path/to/data\"" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "8d728cab-d3ad-41b8-aae8-6e6927f7edaf", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Num Workers = 256\n" + ] + } + ], + "source": [ + "scheduler_address = os.getenv('SCHEDULER_ADDRESS')\n", + "cpu_client = get_client(scheduler_address=scheduler_address)\n", + "print(f\"Num Workers = {get_num_workers(cpu_client)}\", flush=True)" + ] + }, + { + "cell_type": "markdown", + "id": "bf008174-a7b6-4a62-b421-0e3d84e305f2", + "metadata": {}, + "source": [ + "# 3. RedPajama-Data-v2\n", + "" + ] + }, + { + "cell_type": "markdown", + "id": "838d6014-a906-42dc-851e-d106d4db8d66", + "metadata": {}, + "source": [ + "RedPajama-V2 (rpv2) is an advanced open-source initiative designed to support the development of large language models (LLMs). This dataset, sourced from 84 CommonCrawl snapshots, spans five major languages—English, French, Spanish, German, and Italian—making it one of the largest and most comprehensive public datasets available for LLM training.\n", + "\n", + "The RedPajama-V2 dataset is available on [Huggingface](https://huggingface.co/datasets/togethercomputer/RedPajama-Data-V2).\n", + "\n", + "For this tutorial, we will start with a single snapshot from rpv2 and then scale to multiple snapshots to demonstrate the pre-training data curation workflow.\n", + "\n", + "The raw rpv2 data is stored in compressed json. We will first decompress the json.gz file and write them into jsonl files. 
For this, we will use the helper function `convert_json_gz_to_jsonl` in `helper.py`.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "192aafb0-524d-4a5e-85a0-a272f938d3b7", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Uncompressing data took 890.2869493961334 s\n" + ] + } + ], + "source": [ + "from helper import convert_json_gz_to_jsonl\n", + "\n", + "input_data_dir = os.path.join(base_dir,\"rpv2-2023-06-raw\")\n", + "output_data_dir = os.path.join(base_dir,\"rpv2-2023-06\")\n", + "\n", + "t0 = time.time()\n", + "convert_json_gz_to_jsonl(input_data_dir, output_data_dir)\n", + "print(f\"Uncompressing data took {time.time()-t0} s\")" + ] + }, + { + "cell_type": "markdown", + "id": "629b19e7-c6df-4519-8f82-97798cf39457", + "metadata": {}, + "source": [ + "To get started, we can read the jsonl files into a `DocumentDataset`, which is the standard format for text datasets in Curator." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "4ad35dcf-cd4b-4157-9bb2-14e90a8123fd", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading 15025 files\n" + ] + } + ], + "source": [ + "from nemo_curator.datasets import DocumentDataset\n", + "\n", + "input_dataset = DocumentDataset.read_json(output_data_dir, add_filename=True)" + ] + }, + { + "cell_type": "markdown", + "id": "b1152395-0928-4598-b36d-3a21cc221bf0", + "metadata": {}, + "source": [ + "`DocumentDataset` is essentially a wrapper around a Dask DataFrame, and we can get the underlying dataframe by calling `input_dataset.df`:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f00fef38-0ae3-494d-9027-766f5c80d883", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "input_dataset.df.head()" + ] + }, + { + "cell_type": "markdown", + "id": "336dc14d-7bcb-4f87-8821-d1fb43cd2a75", + "metadata": {}, + "source": [ + "There are a total of 1,088,468,779 documents in this single snapshot." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "57248718-a5cf-4028-81b8-1f4fa738ef68", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "1088468779" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "len(input_dataset.df)" + ] + }, + { + "cell_type": "markdown", + "id": "5f309b95-5cfa-494c-94ed-b6162777005a", + "metadata": {}, + "source": [ + "# 4. Data Preprocessing\n", + "" + ] + }, + { + "cell_type": "markdown", + "id": "2ea58d4a", + "metadata": {}, + "source": [ + "## 4.1 Data resharding\n", + "\n", + "The input text files have varying sizes, which leads to imbalanced partitions that could result in out-of-memory issues. Ideally, we want balanced text files of similar sizes, and Curator offers a utility to reshard the text files accordingly."
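Before resharding, it can be useful to confirm how uneven the decompressed shards actually are. Below is a minimal, illustrative sketch (not part of the original notebook) that summarizes the on-disk sizes of the `.jsonl` files; it assumes `output_data_dir` from the decompression step above is still defined.

```python
import glob
import os

def summarize_file_sizes(jsonl_dir):
    """Print a rough size distribution of the .jsonl shards in a directory."""
    sizes_mb = sorted(
        os.path.getsize(path) / 2**20
        for path in glob.glob(os.path.join(jsonl_dir, "*.jsonl"))
    )
    if not sizes_mb:
        print(f"No .jsonl files found under {jsonl_dir}")
        return
    print(f"files : {len(sizes_mb)}")
    print(f"min   : {sizes_mb[0]:.1f} MB")
    print(f"median: {sizes_mb[len(sizes_mb) // 2]:.1f} MB")
    print(f"max   : {sizes_mb[-1]:.1f} MB")

# `output_data_dir` is assumed to be the decompressed-jsonl directory created earlier.
summarize_file_sizes(output_data_dir)
```

A wide spread between the minimum and maximum shard size is exactly the imbalance that the resharding step below is meant to smooth out.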
+ ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "11494cc4", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Data sharding took:552.2274513244629\n" + ] + } + ], + "source": [ + "from nemo_curator.utils.file_utils import reshard_jsonl\n", + "from nemo_curator.utils.file_utils import expand_outdir_and_mkdir\n", + "\n", + "output_resharded_dir = expand_outdir_and_mkdir(os.path.join(base_dir,\"rpv2-2023-06-resharded\"))\n", + "\n", + "t0 = time.time()\n", + "reshard_jsonl(\n", + " output_data_dir,\n", + " output_resharded_dir,\n", + " output_file_size=\"100M\",\n", + " start_index=0,\n", + " file_prefix=\"rpv2-2023-06\",\n", + ")\n", + "print(f\"Data sharding took:{time.time()-t0}\")" + ] + }, + { + "cell_type": "markdown", + "id": "22b85d83-fd01-49cd-8618-006cf6806461", + "metadata": {}, + "source": [ + "[Optional] Removing the raw dataset to save disk space:" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "f50513f5-f530-400b-a0db-654a6b30bf83", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "!rm -rf {base_dir}/rpv2-2023-06" + ] + }, + { + "cell_type": "markdown", + "id": "d065cb25-9515-4c01-89b8-76e4eff6976f", + "metadata": {}, + "source": [ + "## 4.2 Add ID\n", + "\n", + "We will assign a unique ID for each document in the dataset so we can refrence them." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "94403677-2b71-4f84-8dca-77d7482230a8", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from nemo_curator import AddId\n", + "from nemo_curator.datasets import DocumentDataset" + ] + }, + { + "cell_type": "markdown", + "id": "19380a4c-c612-49c3-9b91-02879f53dc65", + "metadata": {}, + "source": [ + "We will create an instance of Curator's `AddId` class and use it to add ID for all documents in the dataset." 
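The IDs added here follow an `id_prefix` plus zero-padded counter pattern (for example `rpv2-2023-06-0000000671`, as seen in later outputs); the exact numbering is an implementation detail of `AddId`. As a small, hypothetical illustration of why that format is convenient, the snippet below splits such an ID back into its prefix and integer index, the same `rsplit`-based idea used by `convert_str_id_to_int` in `helper.py`.

```python
def split_doc_id(doc_id: str):
    """Split a '<prefix>-<counter>' style ID into (prefix, integer index).

    Illustrative only; convert_str_id_to_int in helper.py applies the same
    rsplit("-", 1) idea to an entire dataframe column.
    """
    prefix, index = doc_id.rsplit("-", 1)
    return prefix, int(index)

print(split_doc_id("rpv2-2023-06-0000000671"))  # -> ('rpv2-2023-06', 671)
```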
+ ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "4f35cac5-f084-40cf-a2fd-7d232ddd5ff3", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading 37848 files\n", + "Writing to disk complete for 37848 partitions\n", + "Adding ID took :1472.3535017967224\n" + ] + } + ], + "source": [ + "input_data_dir = os.path.join(base_dir,\"rpv2-2023-06-resharded\")\n", + "input_dataset = DocumentDataset.read_json(input_data_dir, add_filename=True)\n", + "id_data_dir = expand_outdir_and_mkdir(os.path.join(base_dir,\"rpv2-2023-06-id\"))\n", + "\n", + "t0 = time.time()\n", + "# specify add_id function\n", + "add_id = AddId(\n", + " id_field=\"id\",\n", + " id_prefix=\"rpv2-2023-06\",\n", + ")\n", + "id_dataset = add_id(input_dataset)\n", + "id_dataset.to_json(id_data_dir, write_to_filename=True)\n", + "print(f\"Adding ID took :{time.time()-t0}\")" + ] + }, + { + "cell_type": "markdown", + "id": "a1c79824-efc0-4895-a85f-03387c0c90ea", + "metadata": {}, + "source": [ + "We can validate the added IDs below:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "908d0a01-96b9-4ad4-9fb7-cf0148640bfd", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "id_dataset.df.head(3)" + ] + }, + { + "cell_type": "markdown", + "id": "a91d3685-c6b8-4042-a8f2-bda664773327", + "metadata": {}, + "source": [ + "[Optional] Remove the resharded dataset to save disk space:" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "9fa333bc-2de9-4c56-b8ff-c76ed21298aa", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "!rm -rf {base_dir}/rpv2-2023-06-resharded" + ] + }, + { + "cell_type": "markdown", + "id": "b18752f5-b505-415c-960b-be7b396b9b78", + "metadata": {}, + "source": [ + "## 4.3 Language ID and Separation\n", + "\n", + "Data curation usually includes steps that are language-specific (e.g. using language-tuned heuristics for quality filtering). NeMo Curator provides utilities to identify languages. The language identification is performed using fastText.\n", + "\n", + "It is worth mentioning that even though a preliminary language identification has been performed on rpv2 and we started with an English-only dataset, fastText is more accurate so it can be used for a second pass." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "7419a216-0dad-4d13-89ee-c3c1d009efa8", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from nemo_curator import ScoreFilter, Modify\n", + "from nemo_curator.filters import FastTextLangId\n", + "from nemo_curator.modifiers import UnicodeReformatter\n", + "from nemo_curator.utils.file_utils import get_all_files_paths_under, separate_by_metadata\n", + "\n", + "# Language ID path\n", + "language_output_path = expand_outdir_and_mkdir(os.path.join(base_dir,\"rpv2-2023-06-language\"))\n", + "language_data_output_path = expand_outdir_and_mkdir(os.path.join(language_output_path,\"data\"))\n", + "\n", + "# Fasttext model path\n", + "model_path = language_output_path\n", + "\n", + "# Define key in output .jsonl files to store the language information\n", + "language_field = \"language\"" + ] + }, + { + "cell_type": "markdown", + "id": "82249135-d542-497f-896b-68c19297f434", + "metadata": {}, + "source": [ + "Download the fastText model for language detection."
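The next cell downloads the model with `wget`. If `wget` is not available in your environment, the same download can be done directly from Python; this is an optional sketch that assumes `model_path` from the configuration cell above.

```python
import os
import urllib.request

# Optional pure-Python alternative to the wget cell below.
# `model_path` is the directory defined in the language-ID setup cell above.
lid_url = "https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin"
lid_path = os.path.join(model_path, "lid.176.bin")
if not os.path.exists(lid_path):
    urllib.request.urlretrieve(lid_url, lid_path)
print(lid_path)
```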
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2b7e6c9b-2aa3-4a8b-89ea-6dc4ca585b8d", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "!wget https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin -P {model_path}" + ] + }, + { + "cell_type": "markdown", + "id": "1ca0e327-4055-48a9-8432-0aa0c21fb2b5", + "metadata": {}, + "source": [ + "We will create an instance of Curator's `ScoreFilter` and use the helper function `separate_by_metadata` to separate the dataset into subfolders based on language." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "a2684904-f73b-4a3b-a2f9-e4ce8725684c", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading 37848 files\n", + "Time taken for splitting language:4645.465864896774\n" + ] + } + ], + "source": [ + "t0 = time.time()\n", + "\n", + "# Load dataset\n", + "id_data_dir = os.path.join(base_dir,\"rpv2-2023-06-id\")\n", + "input_dataset = DocumentDataset.read_json(id_data_dir, add_filename=True)\n", + "\n", + "# Define Language separation pipeline\n", + "lang_filter = FastTextLangId(os.path.join(model_path,'lid.176.bin'))\n", + "language_id_pipeline = ScoreFilter(\n", + " lang_filter, \n", + " score_field=language_field,\n", + " text_field=\"raw_content\",\n", + " score_type='object'\n", + ")\n", + "filtered_dataset = language_id_pipeline(input_dataset)\n", + "\n", + "# drop the detailed classifier score\n", + "filtered_dataset.df[language_field] = filtered_dataset.df[language_field].apply(\n", + " lambda score: score[1],meta = (language_field, 'object')\n", + " )\n", + "\n", + "# Split the dataset to corresponding language sub-folders\n", + "language_stats = separate_by_metadata(\n", + " filtered_dataset.df, \n", + " language_data_output_path, \n", + " metadata_field=language_field\n", + ").compute()\n", + "\n", + "print(f\"Time taken for splitting language:{time.time()-t0}\")" + ] + }, + { + "cell_type": "markdown", + "id": "8a192a66-3886-41b4-8398-343ed58aa64c", + "metadata": {}, + "source": [ + "The English dataset has 1,088,311,520 documents compared to 1,088,468,779 documents in the raw dataset. This is because the raw dataset has already gone through language detection upstream and was filtered to English."
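After `separate_by_metadata` completes, each detected language gets its own sub-folder under `language_data_output_path` (the notebook reads the `EN` and `JA` folders below). As a quick, illustrative sanity check on the split, you can count the shards written per language:

```python
import os

# Count the .jsonl shards written for each detected language.
# `language_data_output_path` is defined in the language-ID setup cell above.
for lang in sorted(os.listdir(language_data_output_path)):
    lang_dir = os.path.join(language_data_output_path, lang)
    if os.path.isdir(lang_dir):
        n_shards = sum(name.endswith(".jsonl") for name in os.listdir(lang_dir))
        print(f"{lang}: {n_shards} shard(s)")
```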
+ ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "07216ded-5a61-46cc-b201-d816fb68d3d9", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading 37848 files\n" + ] + }, + { + "data": { + "text/plain": [ + "1088311520" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "en_dataset_path = os.path.join(base_dir,\"rpv2-2023-06-language/data/EN\")\n", + "en_dataset = DocumentDataset.read_json(en_dataset_path, add_filename=True)\n", + "\n", + "len(en_dataset)" + ] + }, + { + "cell_type": "markdown", + "id": "ec5dedde-a9bf-4f1f-a1ac-f5cc4a7aeeea", + "metadata": {}, + "source": [ + "[Optional] Removing the ID'ed data to save disk space:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4ed7bfbc-e62c-4a00-abf0-786481c4eb9a", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "!rm -rf {base_dir}/rpv2-2023-06-id" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "09a34f2b", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "ja_dataset_path = os.path.join(base_dir,\"rpv2-2023-06-language/data/JA\")\n", + "ja_dataset = DocumentDataset.read_json(ja_dataset_path, add_filename=True)\n", + "\n", + "ja_dataset.df.head(1)" + ] + }, + { + "cell_type": "markdown", + "id": "950a5a7f", + "metadata": {}, + "source": [ + "## 4.4 Text cleaning\n", + "\n", + "Datasets may have improperly decoded unicode characters. Curator provides utilities to fix improperly decoded unicode characters based on the heuristics defined within the `ftfy` package." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "bcc6c27b-0369-4fe4-83da-2d2c70fc7898", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading 37848 files\n" + ] + } + ], + "source": [ + "import nemo_curator\n", + "from nemo_curator.modifiers import UnicodeReformatter\n", + "\n", + "en_dataset_path = os.path.join(base_dir,\"rpv2-2023-06-language/data/EN\")\n", + "en_dataset = DocumentDataset.read_json(en_dataset_path, add_filename=True)" + ] + }, + { + "cell_type": "markdown", + "id": "b8775bb8-8ef8-4bd7-bdd3-7829cbd0b059", + "metadata": {}, + "source": [ + "Curator offers uses the `modify` method with `UnicodeReformatter` for text cleaning. 
It requires the following arguments:" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "3a6db96d-03fa-4085-a669-7170e4a3de6b", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# make directory for cleaned dataset\n", + "output_clean_dir = expand_outdir_and_mkdir(os.path.join(base_dir,\"rpv2-2023-06-en-cleaned\"))\n", + "# specify text field name and file type\n", + "input_text_field = \"raw_content\"\n", + "input_file_type = \"jsonl\"" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "47d8a85d-ab50-4917-9cf1-4dfa12bc1a35", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Writing to disk complete for 37848 partitions\n", + "Text cleaning took 6349.983360290527 s\n" + ] + } + ], + "source": [ + "t0 = time.time()\n", + "# specify clearner\n", + "cleaner = nemo_curator.Modify(\n", + " UnicodeReformatter(), \n", + " text_field=input_text_field\n", + ")\n", + "\n", + "# clean dataset and write to disk\n", + "cleaned_dataset = cleaner(en_dataset)\n", + "cleaned_dataset.to_json(output_clean_dir, write_to_filename=True)\n", + "print(f\"Text cleaning took {time.time()-t0} s\")" + ] + }, + { + "cell_type": "markdown", + "id": "2e304da3", + "metadata": {}, + "source": [ + "[Optional] Removing intermediate data to save disk space:" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "b7ed20eb-6ed9-481b-bf57-0ff6df27bf71", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "!rm -rf {base_dir}/rpv2-2023-06-language/data/EN" + ] + }, + { + "cell_type": "markdown", + "id": "90e16b97", + "metadata": {}, + "source": [ + "# 5. Deduplication\n", + "\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "id": "722ed8f1", + "metadata": {}, + "source": [ + "## 5.1 Exact Deduplication" + ] + }, + { + "cell_type": "markdown", + "id": "0b8120a1-100e-4527-b642-00e8ef298f1b", + "metadata": {}, + "source": [ + "Exact dedup computes a hash for the raw text of each document. Documents with the same hash value will be exact duplicates and will be removed. Curator provides GPU-accelerated exact deduplication using Rapids." 
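To make the idea concrete, here is a small, self-contained toy example (not the Curator implementation) of why hashing finds exact duplicates: byte-identical text produces identical MD5 digests, so grouping documents by digest groups exact copies together.

```python
import hashlib

# Toy illustration: identical raw text -> identical MD5 digest -> exact duplicate.
docs = {
    "doc-0": "The quick brown fox jumps over the lazy dog.",
    "doc-1": "A completely different document.",
    "doc-2": "The quick brown fox jumps over the lazy dog.",
}

by_digest = {}
for doc_id, text in docs.items():
    digest = hashlib.md5(text.encode("utf-8")).hexdigest()
    by_digest.setdefault(digest, []).append(doc_id)

for digest, ids in by_digest.items():
    if len(ids) > 1:
        print(f"{ids} share hash {digest} and are exact duplicates")
```

Curator's `ExactDuplicates` module applies the same principle at scale on the GPU, as shown in the following cells.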
+ ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "f6dc1754", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from nemo_curator.log import create_logger\n", + "from nemo_curator.modules import ExactDuplicates\n", + "\n", + "def pre_imports():\n", + " import cudf # noqa: F401" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "6532bd31-af79-4d1e-bfb3-5bf432f55ae5", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Num Workers = 16\n", + "Pre imports complete\n" + ] + } + ], + "source": [ + "scheduler_address = os.getenv('SCHEDULER_ADDRESS')\n", + "gpu_client = get_client(scheduler_address=scheduler_address)\n", + "print(f\"Num Workers = {get_num_workers(gpu_client)}\", flush=True)\n", + "\n", + "gpu_client.run(pre_imports)\n", + "print(\"Pre imports complete\")" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "0e9af962-b0d7-4012-a05a-c8287f8e62d7", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "cleaned_dataset_path = os.path.join(base_dir,\"rpv2-2023-06-en-cleaned\")\n", + "log_dir = expand_outdir_and_mkdir(os.path.join(base_dir, \"logs\"))\n", + "input_id_field = 'id'\n", + "input_text_field = 'raw_content'\n", + "hash_method = 'md5'\n", + "output_dir = expand_outdir_and_mkdir(os.path.join(base_dir,\"rpv2-2023-06-exact-dedup\"))" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "2d139334-1db2-41db-910f-b2a478824e04", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading 37848 files\n", + "Exact dedup took:1275.6094808578491\n" + ] + } + ], + "source": [ + "t0 = time.time()\n", + "# Read the input dataset from the cleaned dataset dir\n", + "input_dataset = DocumentDataset.read_json(cleaned_dataset_path, backend='cudf')\n", + "\n", + "# Perform exact dedup\n", + "exact_dups = ExactDuplicates(\n", + " logger=log_dir,\n", + " id_field=input_id_field,\n", + " text_field=input_text_field,\n", + " hash_method=hash_method,\n", + " cache_dir=output_dir,\n", + ")\n", + "duplicates = exact_dups(dataset=input_dataset)\n", + "print(f\"Exact dedup took:{time.time()-t0}\")\n" + ] + }, + { + "cell_type": "markdown", + "id": "e34d1ee5-7d04-46a4-a480-8fc865d9a2f2", + "metadata": { + "tags": [] + }, + "source": [ + "Exact deduplication found 97,327,867 duplicated documents." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "dd7b6a1d-c06e-4e20-a90e-b7c0a2ab41aa", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Number of exact duplicated file:97327867\n" + ] + } + ], + "source": [ + "print(f\"Number of exact duplicated file:{len(duplicates)}\")" + ] + }, + { + "cell_type": "markdown", + "id": "ddca7f66-4020-4f37-a8dc-200517dde329", + "metadata": {}, + "source": [ + "Let's see the results of exact dedup:" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "9ef1205a-981f-48d1-8e2e-53762e33a0da", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
id_hashes
0rpv2-2023-06-05435006715bb014b8aca49d2d2a46925b63c09f7f
1rpv2-2023-06-17212003150dba141f62e01ffedde20dd6bf28df50
2rpv2-2023-06-19898000991e33a4ffce3154c8275ed09ff8049e1a
3rpv2-2023-06-257870062911608d5ffe62efb623abdcb813f0827a
4rpv2-2023-06-3538600607cb72ac618d7a6e60cf7d012c6be82672
\n", + "
" + ], + "text/plain": [ + " id _hashes\n", + "0 rpv2-2023-06-0543500671 5bb014b8aca49d2d2a46925b63c09f7f\n", + "1 rpv2-2023-06-1721200315 0dba141f62e01ffedde20dd6bf28df50\n", + "2 rpv2-2023-06-1989800099 1e33a4ffce3154c8275ed09ff8049e1a\n", + "3 rpv2-2023-06-2578700629 11608d5ffe62efb623abdcb813f0827a\n", + "4 rpv2-2023-06-3538600607 cb72ac618d7a6e60cf7d012c6be82672" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "duplicates_df = duplicates.df\n", + "duplicates_df.head()" + ] + }, + { + "cell_type": "markdown", + "id": "c3df3643", + "metadata": {}, + "source": [ + "We can sort the duplicate cluster by size and see that the largest cluster has 1,819 exact duplicates." + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "4ac54290-dc1d-4f60-b768-f162d338ca47", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
count
_hashes
b7ba44a047ca570585d182d28d1e6bf81819
0469bde3868757d92af369c59992b9d91785
bdc1e82cba718a4717c683bf6a5541bd1784
f14149344e6519beaac2590b0535d2671771
f88eb7064d8e73c081af0731ba73c4511765
\n", + "
" + ], + "text/plain": [ + " count\n", + "_hashes \n", + "b7ba44a047ca570585d182d28d1e6bf8 1819\n", + "0469bde3868757d92af369c59992b9d9 1785\n", + "bdc1e82cba718a4717c683bf6a5541bd 1784\n", + "f14149344e6519beaac2590b0535d267 1771\n", + "f88eb7064d8e73c081af0731ba73c451 1765" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "duplicates_df.groupby('_hashes') \\\n", + " .agg({'id': 'count'}) \\\n", + " .rename(columns={'id': 'count'}) \\\n", + " .sort_values('count', ascending=False) \\\n", + " .head()" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "8b38f5a6-6f48-4081-a717-fb9a1b5e9539", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
id_hashes
1rpv2-2023-06-0962900660b7ba44a047ca570585d182d28d1e6bf8
5rpv2-2023-06-2417100276b7ba44a047ca570585d182d28d1e6bf8
8rpv2-2023-06-2936200328b7ba44a047ca570585d182d28d1e6bf8
9rpv2-2023-06-1423100927b7ba44a047ca570585d182d28d1e6bf8
16rpv2-2023-06-2499600613b7ba44a047ca570585d182d28d1e6bf8
\n", + "
" + ], + "text/plain": [ + " id _hashes\n", + "1 rpv2-2023-06-0962900660 b7ba44a047ca570585d182d28d1e6bf8\n", + "5 rpv2-2023-06-2417100276 b7ba44a047ca570585d182d28d1e6bf8\n", + "8 rpv2-2023-06-2936200328 b7ba44a047ca570585d182d28d1e6bf8\n", + "9 rpv2-2023-06-1423100927 b7ba44a047ca570585d182d28d1e6bf8\n", + "16 rpv2-2023-06-2499600613 b7ba44a047ca570585d182d28d1e6bf8" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "dup_group = duplicates_df[duplicates_df['_hashes'] == 'b7ba44a047ca570585d182d28d1e6bf8'].compute()\n", + "dup_group.head()" + ] + }, + { + "cell_type": "markdown", + "id": "5cc83333-b19b-4335-92ef-6bcc29f3d7bf", + "metadata": {}, + "source": [ + "[Optional] Verify if the documents with the same hash are exactly the same. We can use the ids from the cell output above (ids may change so revise the `dup_ids` as needed):" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "ab1a6018-dead-4d22-b496-87b5afe56e7a", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Searching for example duplicates with specific IDs took 631.4109137058258 seconds\n" + ] + } + ], + "source": [ + "t0 = time.time()\n", + "dup_ids = ['rpv2-2023-06-0962900660', 'rpv2-2023-06-2417100276', 'rpv2-2023-06-2936200328'] \n", + "dup_examples = input_dataset.df[input_dataset.df['id'].isin(dup_ids)].compute()\n", + "print(f\"Searching for example duplicates with specific IDs took {time.time()-t0} seconds\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a62c96e2-cb2e-40ac-9f94-5aedb32e91c0", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "dup_examples" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9876a2e1-ba4e-43a9-9cfe-5035c6e98ab2", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "print('Example duplicate 1\\n' + dup_examples.raw_content.iloc[0])\n", + "print('\\n\\nExample duplicate 2\\n' + dup_examples.raw_content.iloc[1])\n", + "print('\\n\\nExample duplicate 3\\n' + dup_examples.raw_content.iloc[2])" + ] + }, + { + "cell_type": "markdown", + "id": "d0f83e72-ac59-4b45-b0e6-be9144fe935e", + "metadata": {}, + "source": [ + "Now, we will remove the exact duplicates and write the remaining dataset to disk." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "49aae6f4-bfea-479e-ba3b-dbbc3321ae87", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading 37848 files\n", + "Reading 1 files\n" + ] + } + ], + "source": [ + "input_dataset = DocumentDataset.read_json(cleaned_dataset_path, add_filename=True, backend='cudf')\n", + "duplicates = DocumentDataset.read_parquet(os.path.join(output_dir,\"_exact_duplicates.parquet\"), backend='cudf')\n", + "duplicates_df = duplicates.df" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "01d8c63a-fa8c-4b26-9978-d162fef8bd2b", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Writing to disk complete for 37848 partitions\n", + "Removing exact duplicates took:1563.168622970581\n" + ] + } + ], + "source": [ + "output_dir = expand_outdir_and_mkdir(os.path.join(base_dir,\"rpv2-2023-06-exact-dup-removed\"))\n", + "\n", + "t0 = time.time()\n", + "docs_to_remove = duplicates_df.map_partitions(\n", + " lambda x: x[x._hashes.duplicated(keep=\"first\")]\n", + ")\n", + "\n", + "# When there are few duplicates we can compute the results to a list and use `isin`.\n", + "result = input_dataset.df[\n", + " ~input_dataset.df[input_id_field].isin(\n", + " docs_to_remove[input_id_field].compute()\n", + " )\n", + "]\n", + "\n", + "write_to_disk(\n", + " result,\n", + " output_dir,\n", + " write_to_filename=True,\n", + " output_type='jsonl',\n", + ")\n", + "\n", + "print(f\"Removing exact duplicates took:{time.time()-t0}\")" + ] + }, + { + "cell_type": "markdown", + "id": "f6ad08cf-f114-4665-b03a-1778347ae636", + "metadata": {}, + "source": [ + "We can see that exact dedup removed 70,675,782 documents and we now have 1,017,635,738 documents left in the dataset." + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "a37ceaa7-9d7c-49ab-a503-abe3be30861e", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "70675782" + ] + }, + "execution_count": 20, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "len(docs_to_remove)" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "a018fbae-de99-4624-9027-de0b8b52e236", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "1017635738" + ] + }, + "execution_count": 21, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "len(result)" + ] + }, + { + "cell_type": "markdown", + "id": "fb9fb70d-51ea-48f9-8427-f1a42d049625", + "metadata": {}, + "source": [ + "## 5.2 Fuzzy Deduplication\n", + "\n", + "Fuzzy deduplication aims to find near-duplicated documents in our dataset. Near-duplicated documents are common in web crawl data due to plagiarism and mirror sites. Removing them can help improve the quality of trained models. In many cases, we can skip exact dedup and just perform fuzzy dedup as it will also find the exact duplicates. Thus, we will start with the cleaned dataset for fuzzy dedup.\n", + "\n", + "Curator implements GPU-accelerated Fuzzy Deduplication based on minhash + LSH algorithm for finding similar documents across the dataset. 
Specifically, Fuzzy Deduplication includes six steps:\n", + "\n", + "- Compute minhashes\n", + "- Locality-Sensitive Hashing (LSH)\n", + "- Map buckets\n", + "- Jaccard shuffle\n", + "- Jaccard compute\n", + "- Connected components\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "750b1c02-2b37-474f-aaa2-2de86ac3a9e7", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Num Workers = 16\n", + "Pre imports complete\n" + ] + } + ], + "source": [ + "def pre_imports():\n", + " import cudf # noqa: F401\n", + "\n", + "scheduler_address = os.getenv('SCHEDULER_ADDRESS')\n", + "gpu_client = get_client(scheduler_address=scheduler_address)\n", + "print(f\"Num Workers = {get_num_workers(gpu_client)}\", flush=True)\n", + "\n", + "gpu_client.run(pre_imports)\n", + "print(\"Pre imports complete\")" + ] + }, + { + "cell_type": "markdown", + "id": "eb279812-e11f-4e5e-bc38-cd4189201be9", + "metadata": {}, + "source": [ + "### 5.2.1 Compute minhashes\n", + "\n", + "First, we will compute the minhash signature for each document. For this purpose, each document is represented by a set of n-grams, and we apply random hash functions to each element of the set. The minimum hash value generated by each hash function is recorded and becomes a component of the MinHash signature. Thus, the length of the minhash signature equals the number of hash functions. " + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "8ad04e90-a1b6-4881-a29a-dc0c63b026bc", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from nemo_curator import MinHash\n", + "\n", + "input_data_dir = os.path.join(base_dir,\"rpv2-2023-06-en-cleaned\")\n", + "seed = 42\n", + "minhash_length = 260\n", + "char_ngram = 5\n", + "log_dir = expand_outdir_and_mkdir(os.path.join(base_dir, \"logs\"))\n", + "id_field = 'id'\n", + "text_field = 'raw_content'\n", + "minhash_output_dir = expand_outdir_and_mkdir(os.path.join(base_dir,\"rpv2-2023-06-minhash\"))" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "a42f11d4-3b53-4d64-8f47-dd834f6e1312", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading 37848 files\n" + ] + } + ], + "source": [ + "files = get_all_files_paths_under(root=input_data_dir, recurse_subdirectories=False)\n", + "files = [f for f in files if f.endswith(\".jsonl\")]\n", + "df = read_data(\n", + " files,\n", + " file_type=\"jsonl\",\n", + " backend=\"cudf\",\n", + " files_per_partition=1,\n", + " add_filename=False,\n", + ")[[id_field, text_field]]" + ] + }, + { + "cell_type": "code", + "execution_count": 33, + "id": "897b88bf-f49f-46a0-b19d-c3688cb056f2", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Computing minhashes took:5161.864866495132\n" + ] + } + ], + "source": [ + "t0 = time.time()\n", + "\n", + "# Run MinHash() on input data\n", + "minhasher = MinHash(\n", + " seed=seed,\n", + " num_hashes=minhash_length,\n", + " char_ngrams=char_ngram,\n", + " use_64bit_hash=False,\n", + " logger=log_dir,\n", + " id_field=id_field,\n", + " text_field=text_field,\n", + " cache_dir=minhash_output_dir\n", + ")\n", + "\n", + "result = minhasher(DocumentDataset(df)).df\n", + "\n", + "print(f\"Computing minhashes took:{time.time()-t0}\")" + ] + }, + { + "cell_type": "markdown", + "id": "8a198426-29d2-4e60-9468-e2839d634d18", + "metadata": {}, + "source": [ + "We 
can see some example outputs from the minhash computation." + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "7e5358f6-12c5-4ffb-ad0d-19af893670ca", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
id_minhash_signature
0rpv2-2023-06-0000000000[56978, 157261, 839276, 103231, 51779, 396833,...
1rpv2-2023-06-0000100000[4644772, 2991701, 2571423, 12369524, 50603761...
2rpv2-2023-06-0000200000[1312196, 17635, 1520869, 3337920, 2052016, 10...
3rpv2-2023-06-0000300000[5374828, 2268627, 4903126, 2134671, 1828983, ...
4rpv2-2023-06-0000400000[4999022, 2320370, 2068984, 3469276, 621627, 5...
\n", + "
" + ], + "text/plain": [ + " id _minhash_signature\n", + "0 rpv2-2023-06-0000000000 [56978, 157261, 839276, 103231, 51779, 396833,...\n", + "1 rpv2-2023-06-0000100000 [4644772, 2991701, 2571423, 12369524, 50603761...\n", + "2 rpv2-2023-06-0000200000 [1312196, 17635, 1520869, 3337920, 2052016, 10...\n", + "3 rpv2-2023-06-0000300000 [5374828, 2268627, 4903126, 2134671, 1828983, ...\n", + "4 rpv2-2023-06-0000400000 [4999022, 2320370, 2068984, 3469276, 621627, 5..." + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "result.head()" + ] + }, + { + "cell_type": "markdown", + "id": "ec27e094-c097-48e3-aeae-446acc01b965", + "metadata": {}, + "source": [ + "### 5.2.2 Minhash LSH" + ] + }, + { + "cell_type": "markdown", + "id": "dcc1c210-2670-48b7-90d7-8105e274e1a8", + "metadata": {}, + "source": [ + "LSH() implements LSH algorithm which includes the following steps:\n", + "\n", + "- Divide the minhash signature array into X different portions.\n", + "\n", + "- For each portions, hash the minhash values into buckets. One document will be assigned to X buckets.\n", + "\n", + "- Documents within the same bucket will be deemed similar. Since every document will be assigned X buckets and as long as two documents share 1 or more buckets they are deemed similar, the result of LSH will have more false positive as compared to false negative. The false positive cases will be filtered in following modules, namely jaccard compute." + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "89b62629-6d54-43ce-8995-49a89d89859f", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from nemo_curator import LSH\n", + "from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import convert_str_id_to_int\n", + "\n", + "lsh_input_dir = os.path.join(base_dir,\"rpv2-2023-06-minhash\")\n", + "id_field = 'id'\n", + "output_bucket_dir = expand_outdir_and_mkdir(os.path.join(base_dir,\"fuzzy-dedup-output-2023-06\"))\n", + "num_bands = 20\n", + "buckets_per_shuffle = 1\n", + "minhash_field = '_minhash_signature'\n", + "minhash_length = 260\n", + "log_dir = os.path.join(base_dir, \"logs\")" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "7ccb7591-e5ef-482a-b226-a612641f90d0", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "LSH took 6116.864866495132 s\n" + ] + } + ], + "source": [ + "t0 = time.time()\n", + "\n", + "#Load MinHash output\n", + "df = dask_cudf.read_parquet(lsh_input_dir, blocksize=\"2GB\", aggregate_files=True)\n", + "df = df.map_partitions(\n", + " convert_str_id_to_int,\n", + " id_column=id_field,\n", + " meta=cudf.DataFrame(\n", + " {minhash_field: [[1, 2, 3]], \"doc_id\": [1], \"dataset_id\": np.uint32(1)}\n", + " ),\n", + ")\n", + "\n", + "lsh = LSH(\n", + " cache_dir=output_bucket_dir,\n", + " num_hashes=minhash_length,\n", + " num_buckets=num_bands,\n", + " buckets_per_shuffle=buckets_per_shuffle,\n", + " id_fields=[\"dataset_id\", \"doc_id\"],\n", + " minhash_field=minhash_field,\n", + " logger=log_dir,\n", + ")\n", + "\n", + "lsh_result = lsh(DocumentDataset(df))\n", + "print(f\"LSH took {time.time()-t0} s\")" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "0d14ba2f-cfb8-448e-9bb8-34af6005ba15", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
dataset_iddoc_id_bucket_id
025621391340430734639666
1256213913993005579456
22562139132550185011694
3256213913209210262418675
4256213913167721045053186
\n", + "
" + ], + "text/plain": [ + " dataset_id doc_id _bucket_id\n", + "0 256213913 404307346 39666\n", + "1 256213913 993005579 456\n", + "2 256213913 25501850 11694\n", + "3 256213913 2092102624 18675\n", + "4 256213913 1677210450 53186" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "lsh_result.df.head()" + ] + }, + { + "cell_type": "markdown", + "id": "8dd9dbc9", + "metadata": {}, + "source": [ + "### 5.2.3 Map Buckets\n", + "\n", + "After performing LSH, we processed each bucket and calculated an approximation of the all-pairs Jaccard\n", + "similarity in order to remove false positive duplicates introduced by LSH. For this purpose, we will randomly sample n \"anchor\" documents within each buckets and calculate the Jaccard similarity with everything remaining in the bucket." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "7985cf1a-9d88-4844-8ce4-e68d9792118c", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from nemo_curator.modules.fuzzy_dedup import _MapBuckets\n", + "from nemo_curator.utils.fuzzy_dedup_utils.io_utils import (\n", + " get_bucket_ddf_from_parquet_path,\n", + " get_text_ddf_from_json_path_with_blocksize,\n", + ")\n", + "\n", + "input_data_paths = [os.path.join(base_dir,\"rpv2-2023-06-en-cleaned\")]\n", + "num_files = None\n", + "text_ddf_blocksize = 256 #The block size for chunking jsonl files for text ddf in mb\n", + "id_field = 'id'\n", + "text_field = 'raw_content'\n", + "input_bucket_path = os.path.join(base_dir,\"fuzzy-dedup-output-2023-06/_buckets.parquet\")\n", + "input_bucket_field = '_bucket_id'\n", + "shuffle_type ='tasks'\n", + "log_dir = os.path.join(base_dir, \"logs\")\n", + "output_anchor_docs_with_bk_path = expand_outdir_and_mkdir(os.path.join(base_dir,\"fuzzy-dedup-output-2023-06/anchor_docs_with_bk.parquet\"))" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "f94be7c9-27ea-4f1f-8a8f-05a866eafac3", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Number of files being read for jaccard calculation = 37848\n", + "ddf_text.npartitions = 21501\n" + ] + } + ], + "source": [ + "# Read .jsonl input data\n", + "ddf_text = get_text_ddf_from_json_path_with_blocksize(\n", + " input_data_paths=input_data_paths,\n", + " num_files=num_files,\n", + " blocksize=text_ddf_blocksize,\n", + " id_column=id_field,\n", + " text_column=text_field,\n", + ")\n", + "\n", + "print(f\"ddf_text.npartitions = {ddf_text.npartitions}\", flush=True)" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "a0700327-5171-4673-8ded-c9f43492f582", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Number of ddf_bk partitions = 102\n", + "Mapping Bucket took 711.1930673122406 s\n" + ] + } + ], + "source": [ + "t0 = time.time()\n", + "num_workers = get_num_workers(gpu_client)\n", + "\n", + "# Read \"_buckets.parquet\"\n", + "ddf_bk = get_bucket_ddf_from_parquet_path(\n", + " input_bucket_path=input_bucket_path, \n", + " num_workers=num_workers\n", + ")\n", + "\n", + "#Run _MapBuckets()\n", + "map_buckets = _MapBuckets(\n", + " id_fields=[\"dataset_id\", \"doc_id\"], \n", + " bucket_field=input_bucket_field, \n", + " logger=log_dir,\n", + " text_field=text_field,\n", + ")\n", + "\n", + "ddf_anchor_docs_with_bk = map_buckets.map_buckets_with_anchors(\n", + " documents_df=ddf_text, \n", + " buckets_df=ddf_bk, \n", + " 
shuffle_type=shuffle_type\n", + ")\n", + "\n", + "#Write to disk\n", + "ddf_anchor_docs_with_bk.to_parquet(\n", + " output_anchor_docs_with_bk_path, \n", + " write_index=False\n", + ")\n", + "\n", + "print(f\"Mapping Bucket took {time.time()-t0} s\")" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "3a986f76-191e-436d-9df3-bb68dc78d365", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
dataset_iddoc_idanchor_1_dataset_idanchor_1_doc_idanchor_0_dataset_idanchor_0_doc_id_output_partition_id
0256213913144062180525621391352073349225621391324012307031461
12562139138212324042562139133713324532562139138212324043852
2256213913178780561725621391319691136402562139133976348757811
32562139136587069002562139136587069002562139136753102363403
425621391327273541225621391327273541225621391322508355815160
\n", + "
" + ], + "text/plain": [ + " dataset_id doc_id anchor_1_dataset_id anchor_1_doc_id \\\n", + "0 256213913 1440621805 256213913 520733492 \n", + "1 256213913 821232404 256213913 371332453 \n", + "2 256213913 1787805617 256213913 1969113640 \n", + "3 256213913 658706900 256213913 658706900 \n", + "4 256213913 272735412 256213913 272735412 \n", + "\n", + " anchor_0_dataset_id anchor_0_doc_id _output_partition_id \n", + "0 256213913 2401230703 1461 \n", + "1 256213913 821232404 3852 \n", + "2 256213913 397634875 7811 \n", + "3 256213913 675310236 3403 \n", + "4 256213913 2250835581 5160 " + ] + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ddf_anchor_docs_with_bk.head()" + ] + }, + { + "cell_type": "markdown", + "id": "58e90363-ebfe-48e2-8f7c-ff7ef27c97d5", + "metadata": {}, + "source": [ + "### 5.2.4 Jaccard Shuffle\n", + "\n", + "We shuffle the documents within the dataset based on their bucket assignments, essentially distributing similar documents across different partitions or workers, enabling efficient parallel processing and deduplication in subsequent steps." + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "id": "11d7184d-4ca5-4b49-85b4-1264056f5c33", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from nemo_curator.modules.fuzzy_dedup import _Shuffle\n", + "\n", + "log_dir = os.path.join(base_dir, \"logs\")\n", + "input_anchor_docs_with_bk_path = os.path.join(base_dir,\"fuzzy-dedup-output-2023-06/anchor_docs_with_bk.parquet\")\n", + "output_shuffled_docs_path = expand_outdir_and_mkdir(\n", + " os.path.join(base_dir, \"fuzzy-dedup-output-2023-06/shuffled_docs.parquet\")\n", + ")\n", + "bucket_mapping_ddf_blocksize = 256\n", + "parts_per_worker = 16\n", + "bucket_parts_per_worker = 256\n", + "id_field = 'id'\n", + "text_field = 'raw_content'" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "id": "07894e81-6cdc-4292-951a-977b220fbd81", + "metadata": { + "collapsed": true, + "jupyter": { + "outputs_hidden": true + }, + "tags": [] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + " 0%| | 0/1 [00:00.wait() done, defined at /usr/local/lib/python3.10/dist-packages/distributed/client.py:2197> exception=AllExit()>\n", + "Traceback (most recent call last):\n", + " File \"/usr/local/lib/python3.10/dist-packages/distributed/client.py\", line 2206, in wait\n", + " raise AllExit()\n", + "distributed.client.AllExit\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Text-df partition 5376/21501 completed in 44.53200340270996\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21130783 rows to disk\n", + "Text-df partition 5632/21501 completed in 54.21355414390564\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21283077 rows to disk\n", + "Text-df partition 5888/21501 completed in 41.81457304954529\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22384930 rows to disk\n", + "Text-df partition 6144/21501 completed in 45.46053504943848\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 20776364 rows to disk\n", + "Text-df partition 6400/21501 completed in 40.972795248031616\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 20072714 rows to disk\n", + "Text-df partition 6656/21501 completed in 43.9665105342865\n", + "Using 
256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21287119 rows to disk\n", + "Text-df partition 6912/21501 completed in 46.75365734100342\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21389175 rows to disk\n", + "Text-df partition 7168/21501 completed in 44.202338457107544\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 20580021 rows to disk\n", + "Text-df partition 7424/21501 completed in 45.469704151153564\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21695552 rows to disk\n", + "Text-df partition 7680/21501 completed in 42.65142750740051\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 20388309 rows to disk\n", + "Text-df partition 7936/21501 completed in 47.71988654136658\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21259333 rows to disk\n", + "Text-df partition 8192/21501 completed in 49.69535183906555\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22171199 rows to disk\n", + "Text-df partition 8448/21501 completed in 43.66416621208191\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22743079 rows to disk\n", + "Text-df partition 8704/21501 completed in 44.621586322784424\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21899187 rows to disk\n", + "Text-df partition 8960/21501 completed in 44.56813859939575\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22286972 rows to disk\n", + "Text-df partition 9216/21501 completed in 54.81862425804138\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22179900 rows to disk\n", + "Text-df partition 9472/21501 completed in 43.12162232398987\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21492583 rows to disk\n", + "Text-df partition 9728/21501 completed in 75.82933282852173\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22016268 rows to disk\n", + "Text-df partition 9984/21501 completed in 42.6993567943573\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21855524 rows to disk\n", + "Text-df partition 10240/21501 completed in 52.208579778671265\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21876461 rows to disk\n", + "Text-df partition 10496/21501 completed in 44.576396465301514\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21738436 rows to disk\n", + "Text-df partition 10752/21501 completed in 42.986634969711304\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21302223 rows to disk\n", + "Text-df partition 11008/21501 completed in 39.9963161945343\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 20698714 rows to disk\n", + "Text-df partition 11264/21501 completed in 40.962194204330444\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22158633 rows to disk\n", + "Text-df partition 11520/21501 completed in 49.96597933769226\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 
22048695 rows to disk\n", + "Text-df partition 11776/21501 completed in 45.995996713638306\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21702006 rows to disk\n", + "Text-df partition 12032/21501 completed in 46.70681190490723\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21347159 rows to disk\n", + "Text-df partition 12288/21501 completed in 81.4986321926117\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21955887 rows to disk\n", + "Text-df partition 12544/21501 completed in 42.991360902786255\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 20738788 rows to disk\n", + "Text-df partition 12800/21501 completed in 45.26166224479675\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21851660 rows to disk\n", + "Text-df partition 13056/21501 completed in 43.972177028656006\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21509705 rows to disk\n", + "Text-df partition 13312/21501 completed in 42.279316902160645\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21253609 rows to disk\n", + "Text-df partition 13568/21501 completed in 42.06918454170227\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21327966 rows to disk\n", + "Text-df partition 13824/21501 completed in 42.79487657546997\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21643067 rows to disk\n", + "Text-df partition 14080/21501 completed in 46.259148597717285\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21428884 rows to disk\n", + "Text-df partition 14336/21501 completed in 41.45153284072876\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22143560 rows to disk\n", + "Text-df partition 14592/21501 completed in 64.21475219726562\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22325699 rows to disk\n", + "Text-df partition 14848/21501 completed in 44.90940022468567\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21819547 rows to disk\n", + "Text-df partition 15104/21501 completed in 42.385361671447754\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22358380 rows to disk\n", + "Text-df partition 15360/21501 completed in 44.72035098075867\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21915533 rows to disk\n", + "Text-df partition 15616/21501 completed in 62.369131565093994\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21881549 rows to disk\n", + "Text-df partition 15872/21501 completed in 41.794671297073364\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22808310 rows to disk\n", + "Text-df partition 16128/21501 completed in 42.90521025657654\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22373007 rows to disk\n", + "Text-df partition 16384/21501 completed in 44.44476580619812\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22500153 rows to disk\n", + "Text-df partition 16640/21501 completed 
in 63.39798164367676\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21869272 rows to disk\n", + "Text-df partition 16896/21501 completed in 68.16630506515503\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22681261 rows to disk\n", + "Text-df partition 17152/21501 completed in 43.95643997192383\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22528095 rows to disk\n", + "Text-df partition 17408/21501 completed in 43.677578926086426\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22315077 rows to disk\n", + "Text-df partition 17664/21501 completed in 47.376683712005615\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22590009 rows to disk\n", + "Text-df partition 17920/21501 completed in 45.69715094566345\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22371144 rows to disk\n", + "Text-df partition 18176/21501 completed in 45.908634662628174\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22915118 rows to disk\n", + "Text-df partition 18432/21501 completed in 45.64804434776306\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22445027 rows to disk\n", + "Text-df partition 18688/21501 completed in 48.78413009643555\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21732923 rows to disk\n", + "Text-df partition 18944/21501 completed in 42.96047925949097\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22289403 rows to disk\n", + "Text-df partition 19200/21501 completed in 63.846845626831055\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21964165 rows to disk\n", + "Text-df partition 19456/21501 completed in 49.25983190536499\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22401629 rows to disk\n", + "Text-df partition 19712/21501 completed in 48.457353591918945\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22404881 rows to disk\n", + "Text-df partition 19968/21501 completed in 45.16793203353882\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22410519 rows to disk\n", + "Text-df partition 20224/21501 completed in 46.03614926338196\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22222649 rows to disk\n", + "Text-df partition 20480/21501 completed in 42.315980195999146\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22390142 rows to disk\n", + "Text-df partition 20736/21501 completed in 60.00182127952576\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 22521054 rows to disk\n", + "Text-df partition 20992/21501 completed in 46.08659911155701\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 20889117 rows to disk\n", + "Text-df partition 21248/21501 completed in 45.099570751190186\n", + "Using 256 text partitions.\n", + "Starting text bytes aware shuffle\n", + "Will write 21190706 rows to disk\n", + "Text-df partition 21501/21501 completed in 81.1153199672699\n", + "Bucket partition 102/102 completed in 
4069.427500486374\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "100%|██████████| 1/1 [1:07:49<00:00, 4069.43s/it]" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Jaccard Shuffle took 4069.6330242156982 s\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "\n" + ] + } + ], + "source": [ + "t0 = time.time()\n", + "\n", + "shuffle = _Shuffle(\n", + " id_fields=[\"dataset_id\", \"doc_id\"],\n", + " text_field=text_field,\n", + " int_to_str_id=id_field,\n", + " logger=log_dir,\n", + ")\n", + "\n", + "shuffle.shuffle_docs_on_buckets(\n", + " documents_df=ddf_text,\n", + " bucket_w_anchors_path=input_anchor_docs_with_bk_path,\n", + " output_shuffled_docs_path=output_shuffled_docs_path,\n", + " bucket_mapping_df_blocksize=bucket_mapping_ddf_blocksize,\n", + " parts_per_worker=parts_per_worker,\n", + " bucket_parts_per_worker=bucket_parts_per_worker,\n", + " partition_on=\"_output_partition_id\",\n", + ")\n", + "\n", + "print(f\"Jaccard Shuffle took {time.time()-t0} s\")" + ] + }, + { + "cell_type": "markdown", + "id": "26739f23-47f1-4e11-ac49-82920f534495", + "metadata": {}, + "source": [ + "We can visualize the jaccard shuffle results for a single partition:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3576685a-c3d8-4950-bac3-5412e9f876d2", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "jaccard_shuffle_res = dd.read_parquet(os.path.join(output_shuffled_docs_path,\"_output_partition_id=0\"))\n", + "jaccard_shuffle_res.head()" + ] + }, + { + "cell_type": "markdown", + "id": "80309eef-733a-4926-875f-cb94bbb4d8fa", + "metadata": {}, + "source": [ + "### 5.2.5 Jaccard Compute\n", + "\n", + "Now we have the jaccard pairs sampled, we can compute the Jaccard similarity score for all pairs." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "573dccf7-2e23-4aae-a3ec-2b9e1a42d97d", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from nemo_curator.modules.fuzzy_dedup import JaccardSimilarity\n", + "\n", + "id_field = 'id'\n", + "text_field = 'raw_content'\n", + "ngram_size = 5\n", + "shuffled_docs_path = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06/shuffled_docs.parquet\")\n", + "jaccard_results_path = expand_outdir_and_mkdir(\n", + " os.path.join(base_dir, \"fuzzy-dedup-output-2023-06/jaccard_similarity_results.parquet\")\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "d31a24fe-8229-48bc-89f7-32f2b93f4f5c", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Jaccard Computing+Writing took 5886.298990488052 seconds\n" + ] + } + ], + "source": [ + "t0 = time.time()\n", + "jaccard = JaccardSimilarity(\n", + " id_field=id_field ,\n", + " text_field=text_field,\n", + " anchor_id_fields=[f\"anchor_{i}_{id_field}\" for i in range(2)],\n", + " ngram_width=ngram_size,\n", + ")\n", + "\n", + "# Run actual computation\n", + "result_df = jaccard.jaccard_compute(shuffled_docs_path)\n", + "\n", + "result_df.to_parquet(\n", + " jaccard_results_path,\n", + " write_index=False,\n", + " write_metadata_file=False,\n", + ")\n", + "\n", + "print(f\"Jaccard Computing+Writing took {time.time() - t0} seconds\")" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "bbebed9d-d920-4b8e-8c88-48f1906c46e3", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
id_xid_yjaccard
0256213913-1894624904256213913-23465249570.956566
1256213913-1785625062256213913-20997256750.973642
2256213913-1350425062256213913-29301251421.000000
3256213913-1324822256213913-13842036090.988306
4256213913-1775024761256213913-15401197740.906369
\n", + "
" + ], + "text/plain": [ + " id_x id_y jaccard\n", + "0 256213913-1894624904 256213913-2346524957 0.956566\n", + "1 256213913-1785625062 256213913-2099725675 0.973642\n", + "2 256213913-1350425062 256213913-2930125142 1.000000\n", + "3 256213913-1324822 256213913-1384203609 0.988306\n", + "4 256213913-1775024761 256213913-1540119774 0.906369" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "jaccard_compute_res = dd.read_parquet(jaccard_results_path)\n", + "jaccard_compute_res.head()" + ] + }, + { + "cell_type": "markdown", + "id": "fb9e1287-bd19-4728-a4c8-b92b39ca1fcc", + "metadata": {}, + "source": [ + "### 5.2.6 Connected Component\n", + "\n", + "After all buckets were processed and duplicates (at the threshold) were approximately discovered,\n", + "we constructed a sparse document graph and found the connected components therein (using scipy). Each\n", + "connected component represents a set of documents that we consider similar enough to be duplicates, and\n", + "from which we select a single representative." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "f9aeb619-3fab-4a18-b582-bccae3eefd17", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from nemo_curator.modules.fuzzy_dedup import ConnectedComponents\n", + "\n", + "cache_dir = expand_outdir_and_mkdir(\n", + " os.path.join(base_dir, \"fuzzy-dedup-output-2023-06/cc-cache\")\n", + ")\n", + "jaccard_pairs_path = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06/jaccard_similarity_results.parquet\")\n", + "id_field = 'id'\n", + "jaccard_threshold = 0.8\n", + "output_path = expand_outdir_and_mkdir(\n", + " os.path.join(base_dir, \"fuzzy-dedup-output-2023-06/connected_components.parquet\")\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "6bee85f3-5477-4b9c-b606-7bbbefbe6cfc", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "batch_id = 0/33, time = 10.98209285736084\n", + "batch_id = 1/33, time = 7.240729331970215\n", + "batch_id = 2/33, time = 11.506417274475098\n", + "batch_id = 3/33, time = 10.567672729492188\n", + "batch_id = 4/33, time = 4.118508815765381\n", + "batch_id = 5/33, time = 11.475081443786621\n", + "batch_id = 6/33, time = 4.485937118530273\n", + "batch_id = 7/33, time = 7.7934770584106445\n", + "batch_id = 8/33, time = 12.659213781356812\n", + "batch_id = 9/33, time = 10.357794523239136\n", + "batch_id = 10/33, time = 15.211389780044556\n", + "batch_id = 11/33, time = 11.50840425491333\n", + "batch_id = 12/33, time = 6.360927104949951\n", + "batch_id = 13/33, time = 6.977228403091431\n", + "batch_id = 14/33, time = 14.863914489746094\n", + "batch_id = 15/33, time = 8.78640341758728\n", + "batch_id = 16/33, time = 17.97274613380432\n", + "batch_id = 17/33, time = 15.662312030792236\n", + "batch_id = 18/33, time = 12.669589042663574\n", + "batch_id = 19/33, time = 11.13182783126831\n", + "batch_id = 20/33, time = 4.032534837722778\n", + "batch_id = 21/33, time = 10.532259702682495\n", + "batch_id = 22/33, time = 11.531543016433716\n", + "batch_id = 23/33, time = 4.218446731567383\n", + "batch_id = 24/33, time = 4.118865251541138\n", + "batch_id = 25/33, time = 16.80798053741455\n", + "batch_id = 26/33, time = 14.314243078231812\n", + "batch_id = 27/33, time = 24.660512447357178\n", + "batch_id = 28/33, time = 18.43391704559326\n", + "batch_id = 29/33, time = 10.506544589996338\n", + "batch_id = 30/33, time = 
17.96251106262207\n", + "batch_id = 31/33, time = 10.470972061157227\n", + "batch_id = 32/33, time = 1.9526703357696533\n", + "# of groups 134984907\n", + "# of docs removed 239037733\n", + "assert num_nodes:374022640==labels_df:374022640 passed\n", + "Connected Component took 445.6 seconds\n", + "\n" + ] + } + ], + "source": [ + "t0 = time.time()\n", + "components_stage = ConnectedComponents(\n", + " cache_dir=cache_dir,\n", + " jaccard_pairs_path=jaccard_pairs_path,\n", + " id_column=id_field,\n", + " convert_str_ids=True,\n", + " jaccard_threshold=jaccard_threshold,\n", + ")\n", + "components_stage.cc_workflow(output_path=output_path)\n", + "print(f\"Connected Component took {time.time()-t0} seconds\")" + ] + }, + { + "cell_type": "markdown", + "id": "15214dcf-ff49-439e-b3d7-d8666d081027", + "metadata": {}, + "source": [ + "Let's check the results of connected components step. We can see that 239,037,733 are identified as duplicates to be removed." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "94e8126d-af15-4182-98cd-10df06e9778e", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "num of docs to remove = 239037733\n" + ] + } + ], + "source": [ + "output_path = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06/connected_components.parquet\")\n", + "cc_result = dask_cudf.read_parquet(output_path, split_row_groups=False).repartition(npartitions=1)\n", + "\n", + "# Set 'group' as the index and shuffle to ensure all same 'group' values are in the same partition\n", + "cc_result = cc_result.set_index('group', shuffle='tasks')\n", + "\n", + "# Define a function to assign cumulative counts and filter duplicates\n", + "def assign_cumcount(df):\n", + " df['cumcount'] = df.groupby(level=0).cumcount()\n", + " df = df[df['cumcount'] >= 1]\n", + " df = df.drop(columns=['cumcount'])\n", + " return df\n", + "\n", + "# Find duplicates by applying the function to each partition\n", + "docs_to_remove = cc_result.map_partitions(assign_cumcount, meta=cc_result)\n", + "\n", + "# Reset the index\n", + "docs_to_remove = docs_to_remove.reset_index()\n", + "\n", + "docs_to_remove = docs_to_remove[[\"dataset_id\", \"doc_id\"]]\n", + "docs_to_remove = docs_to_remove.rename(columns={\"dataset_id\":\"to_remove_dataset_id\", \"doc_id\":\"to_remove_doc_id\"})\n", + "docs_to_remove = docs_to_remove.reset_index(drop=True).persist()\n", + "_ = wait(docs_to_remove)\n", + "del _ \n", + "\n", + "print(\"num of docs to remove =\", len(docs_to_remove))" + ] + }, + { + "cell_type": "markdown", + "id": "568ee0b5-f2dd-4d34-917f-56f4211a36fe", + "metadata": {}, + "source": [ + "We can examine the size of the duplicate clusters. The largest cluster has 775,379 near duplicates." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "cae7f166-836a-4c21-bff2-7453254956b7", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
count
group
350652173775379
93521324493227
24112861
31929235596224
7014106967474
\n", + "
" + ], + "text/plain": [ + " count\n", + "group \n", + "350652173 775379\n", + "93521324 493227\n", + "24 112861\n", + "319292355 96224\n", + "70141069 67474" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "cc_grouped = cc_result.groupby('group').agg({'doc_id': 'count'}).rename(columns={'doc_id': 'count'}).sort_values('count', ascending=False).compute()\n", + "cc_grouped.head()" + ] + }, + { + "cell_type": "markdown", + "id": "0def7323-3d2c-4861-9b7e-a1e296ccf329", + "metadata": {}, + "source": [ + "[Optional] Verify if fuzzy duplicates are similar. For example, we can look into the largest group \"350652173\"." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "e22cb491-c2ab-4ec4-8313-ae2bcd66a352", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
dataset_iddoc_id
group
3506521732562139131285625132
3506521732562139132033200488
350652173256213913428016172
3506521732562139131268721963
3506521732562139131285428574
\n", + "
" + ], + "text/plain": [ + " dataset_id doc_id\n", + "group \n", + "350652173 256213913 1285625132\n", + "350652173 256213913 2033200488\n", + "350652173 256213913 428016172\n", + "350652173 256213913 1268721963\n", + "350652173 256213913 1285428574" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "dup_group = cc_result.loc[350652173].compute()\n", + "dup_group.head()" + ] + }, + { + "cell_type": "markdown", + "id": "170c1cf4-8cb9-4f10-aab3-acfdaa9e5b16", + "metadata": {}, + "source": [ + "We will examine the first five documents in this cluster:" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "00cf923e-fd4e-41b9-a00f-801c186ac70e", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading 37848 files\n" + ] + } + ], + "source": [ + "# read input dataset\n", + "input_data_dir = os.path.join(base_dir, \"rpv2-2023-06-en-cleaned\")\n", + "input_dataset = DocumentDataset.read_json(input_data_dir, add_filename=True)" + ] + }, + { + "cell_type": "markdown", + "id": "9772bf71-9e18-4e59-b9f8-ebd9053c79b0", + "metadata": {}, + "source": [ + "Let's visualize the content of these documents and see if they are similar (ids may change so revise the `dup_ids` as needed)." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "e3cc167f-30f8-470d-99e3-0a2d916d46bf", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Searching for near duplicate examples with specific IDs took 610.5046670436859 seconds\n" + ] + } + ], + "source": [ + "t0 = time.time()\n", + "dup_ids = [\n", + " 'rpv2-2023-06-1285625132',\n", + " 'rpv2-2023-06-2033200488',\n", + " 'rpv2-2023-06-0428016172',\n", + " 'rpv2-2023-06-1268721963',\n", + " 'rpv2-2023-06-1285428574'\n", + "] \n", + "dup_examples = input_dataset.df[input_dataset.df['id'].isin(dup_ids)].compute()\n", + "print(f\"Searching for near duplicate examples with specific IDs took {time.time()-t0} seconds\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "655a4b3e-8a48-441c-8e12-9b2d287b79ec", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "dup_examples" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "49b1b00a-501e-4d49-a93b-60a5c8ae87d2", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "print('Example duplicate 1\\n' + dup_examples.raw_content.iloc[0])\n", + "print('\\n\\nExample duplicate 2\\n' + dup_examples.raw_content.iloc[1])\n", + "print('\\n\\nExample duplicate 3\\n' + dup_examples.raw_content.iloc[2])\n", + "print('\\n\\nExample duplicate 4\\n' + dup_examples.raw_content.iloc[3])\n", + "print('\\n\\nExample duplicate 4\\n' + dup_examples.raw_content.iloc[4])" + ] + }, + { + "cell_type": "markdown", + "id": "c428e09c-5442-4908-8888-4e62994e4c5c", + "metadata": {}, + "source": [ + "### 5.2.7 Duplicates Removal" + ] + }, + { + "cell_type": "markdown", + "id": "f2e01b84-07cc-45a3-9dde-97884d1922a3", + "metadata": {}, + "source": [ + "Next, we will proceed to remove the duplicates identified from the dataset. We will first change the string ID to `doc_id` and `dataset_id` in the input dataset." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "7cde97c5-cfaa-4096-8d9f-1ffa19c4adb2", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading 37848 files\n" + ] + } + ], + "source": [ + "from helper import convert_str_id_to_int\n", + "\n", + "input_dataset = DocumentDataset.read_json(os.path.join(base_dir, \"rpv2-2023-06-en-cleaned\"), backend=\"cudf\")\n", + "input_df = input_dataset.df[['raw_content','id']]\n", + "meta = input_df._meta\n", + "meta['doc_id']=np.int64([0])\n", + "meta['dataset_id']=np.uint32([0])\n", + "input_df = input_df.map_partitions(\n", + " convert_str_id_to_int,\n", + " id_column=\"id\",\n", + " meta=meta,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "99e50a79-18b0-4d1b-bab0-cfd1ec9b62fd", + "metadata": {}, + "source": [ + "Then, we will perform a merge between the `input_df` and the `docs_to_remove` on the IDs and drop the fuzzy duplicates." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "f819d1d3-c4c0-4288-b190-86276d221050", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Removing duplicates and writing deduped dataset took 1241.3191509246826 seconds\n" + ] + } + ], + "source": [ + "dedup_output_dir = expand_outdir_and_mkdir(os.path.join(base_dir, \"rpv2-2023-06-deduped\"))\n", + "deduped_df = input_df.merge(docs_to_remove,\n", + " left_on=['doc_id','dataset_id'],\n", + " right_on=[\"to_remove_doc_id\", \"to_remove_dataset_id\"],\n", + " how='left')\n", + "\n", + "deduped_df = deduped_df[deduped_df['to_remove_doc_id'].isna()].drop(columns=['to_remove_doc_id', \"to_remove_dataset_id\"]).reset_index(drop=True)\n", + "\n", + "t0 = time.time()\n", + "deduped_df.to_parquet(dedup_output_dir)\n", + "print(f\"Removing duplicates and writing deduped dataset took {time.time()-t0} seconds\")" + ] + }, + { + "cell_type": "markdown", + "id": "f987b1ac-c8e3-45f8-8e9b-32befc6667aa", + "metadata": {}, + "source": [ + "To verify the results, we can confirm that we have 849,273,787 documents left compared to 1,088,311,520 in the input dataset, essentially removing 239,037,733 duplicates." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "cf56c9ca-27cb-4f03-921b-685d523cf43e", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "849273787" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "len(deduped_df)" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "19f96f15-1ac6-40ab-9e04-1586531bb55f", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "1088311520" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "len(input_df)" + ] + }, + { + "cell_type": "markdown", + "id": "cdc6350e-2363-4b13-ac67-0cbc23ad981d", + "metadata": {}, + "source": [ + "## 5.3 Inter-snapshot Deduplication" + ] + }, + { + "cell_type": "markdown", + "id": "888c2b15-961f-4a73-a0a3-15474ae4134c", + "metadata": {}, + "source": [ + "So far we have deduplicated a single snapshot from rpv2. Pre-training dataet include multiple snapshots so we will often need to perform inter-snapshot deduplication. 
For this tutorial, we will demostrate deduplication across two snapshots as an example.\n", + "\n", + "We first performed all the above steps for another snapshot `2023-14` and then combined the two deduped datasets into one and stored them in `rpv2-2023-06-and-14-deduped`.\n", + "\n", + "Next, we will perform the fuzzy deduplication on the combined dataset." + ] + }, + { + "cell_type": "markdown", + "id": "2a1445cc-b69c-4007-8f09-75a8eb8f699c", + "metadata": {}, + "source": [ + "### 5.3.1 Compute Minhash" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "f1461b61-887c-4099-bd9f-32e79dc5fdbb", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from nemo_curator import MinHash\n", + "from nemo_curator import LSH\n", + "from nemo_curator.modules.fuzzy_dedup import _MapBuckets\n", + "from nemo_curator.modules.fuzzy_dedup import _Shuffle\n", + "from nemo_curator.modules.fuzzy_dedup import ConnectedComponents\n", + "from nemo_curator.modules.fuzzy_dedup import JaccardSimilarity\n", + "\n", + "from nemo_curator.utils.file_utils import reshard_jsonl\n", + "from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import convert_str_id_to_int\n", + "from nemo_curator.utils.fuzzy_dedup_utils.io_utils import (\n", + " get_bucket_ddf_from_parquet_path,\n", + " get_text_ddf_from_json_path_with_blocksize,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "id": "efaed1ed-e6d1-4117-9b0b-fe0d20960b60", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "seed = 42\n", + "minhash_length = 260\n", + "char_ngram = 5\n", + "log_dir = expand_outdir_and_mkdir(os.path.join(base_dir, \"logs\"))\n", + "id_field = 'id'\n", + "text_field = 'raw_content'\n", + "minshah_output_dir = expand_outdir_and_mkdir(os.path.join(base_dir,\"rpv2-2023-06-and-14-minhash\"))" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "b4bf2d09-6601-4bd2-a6f2-f738cffd8885", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "input_data_dir = os.path.join(base_dir,\"rpv2-2023-06-and-14-deduped\")\n", + "\n", + "files = []\n", + "for file in os.listdir(input_data_dir):\n", + " if file.endswith('.part'):\n", + " new_file = file.replace('.part', '.jsonl')\n", + " old_file_path = os.path.join(input_data_dir, file)\n", + " new_file_path = os.path.join(input_data_dir, new_file)\n", + " os.rename(old_file_path, new_file_path)\n", + " files.append(new_file_path)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "38197174-738e-42a4-a38a-1dbd7d84836d", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading 72797 files\n" + ] + } + ], + "source": [ + "files = [f for f in files if f.endswith(\".jsonl\")]\n", + "df = read_data(\n", + " files,\n", + " file_type=\"jsonl\",\n", + " backend=\"cudf\",\n", + " files_per_partition=2,\n", + " add_filename=False,\n", + ")[[id_field, text_field]]" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "id": "439add9c-9f51-4481-95cf-456dc5be9fd2", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Computing minhashes took:6115.702769517899\n" + ] + } + ], + "source": [ + "t0 = time.time()\n", + "\n", + "# Run MinHash() on input data\n", + "minhasher = MinHash(\n", + " seed=seed,\n", + " num_hashes=minhash_length,\n", + " char_ngrams=char_ngram,\n", + " use_64bit_hash=False,\n", + " logger=log_dir,\n", + " id_field=id_field,\n", + " 
text_field=text_field,\n", + " cache_dir=minshah_output_dir\n", + ")\n", + "\n", + "result = minhasher(DocumentDataset(df)).df\n", + "\n", + "print(f\"Computing minhashes took:{time.time()-t0}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "id": "1c998d64-54f8-49b0-8e0c-2e5727596e84", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
id_minhash_signature
0rpv2-2023-06-0678400000[36422228, 15993596, 3538361, 16103012, 194100...
1rpv2-2023-06-0678500000[34662, 17635, 1112347, 293654, 313382, 160184...
2rpv2-2023-06-0678600000[15076006, 1801689, 3181854, 2949398, 5699436,...
3rpv2-2023-06-0678700000[13528976, 2438382, 26260517, 26187347, 249748...
4rpv2-2023-06-0678800000[2550974, 157261, 1536526, 1169030, 576861, 10...
\n", + "
" + ], + "text/plain": [ + " id _minhash_signature\n", + "0 rpv2-2023-06-0678400000 [36422228, 15993596, 3538361, 16103012, 194100...\n", + "1 rpv2-2023-06-0678500000 [34662, 17635, 1112347, 293654, 313382, 160184...\n", + "2 rpv2-2023-06-0678600000 [15076006, 1801689, 3181854, 2949398, 5699436,...\n", + "3 rpv2-2023-06-0678700000 [13528976, 2438382, 26260517, 26187347, 249748...\n", + "4 rpv2-2023-06-0678800000 [2550974, 157261, 1536526, 1169030, 576861, 10..." + ] + }, + "execution_count": 25, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "result.head()" + ] + }, + { + "cell_type": "markdown", + "id": "c2e3af9e-9e03-4950-93c0-92792f9ad24b", + "metadata": {}, + "source": [ + "### 5.3.2 Minhash LSH" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "b11c2f37-3b78-4e1b-a9ff-4a89b38f3604", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "lsh_input_dir = os.path.join(base_dir,\"rpv2-2023-06-and-14-minhash\")\n", + "id_field = 'id'\n", + "output_bucket_dir = expand_outdir_and_mkdir(os.path.join(base_dir,\"fuzzy-dedup-output-2023-06-and-14\"))\n", + "num_bands = 20\n", + "buckets_per_shuffle = 1\n", + "minhash_field = '_minhash_signature'\n", + "minhash_length = 260\n", + "log_dir = os.path.join(base_dir, \"logs\")" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "a243ed7a-9175-488f-8097-5b82c47c5708", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "LSH took 10536.635195493698 s\n" + ] + } + ], + "source": [ + "t0 = time.time()\n", + "\n", + "#Load MinHash output\n", + "df = dask_cudf.read_parquet(lsh_input_dir, blocksize=\"2GB\", aggregate_files=True)\n", + "df = df.map_partitions(\n", + " convert_str_id_to_int,\n", + " id_column=id_field,\n", + " meta=cudf.DataFrame(\n", + " {minhash_field: [[1, 2, 3]], \"doc_id\": [1], \"dataset_id\": np.uint32(1)}\n", + " ),\n", + ")\n", + "\n", + "lsh = LSH(\n", + " cache_dir=output_bucket_dir,\n", + " num_hashes=minhash_length,\n", + " num_buckets=num_bands,\n", + " buckets_per_shuffle=buckets_per_shuffle,\n", + " id_fields=[\"dataset_id\", \"doc_id\"],\n", + " minhash_field=minhash_field,\n", + " logger=log_dir,\n", + ")\n", + "\n", + "lsh_result = lsh(DocumentDataset(df))\n", + "print(f\"LSH took {time.time()-t0} s\")" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "9ea85033-e275-4b62-8351-6c85ac5ac83b", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
dataset_iddoc_id_bucket_id
0256213913248063708574400
1256213913207920898388082
225621391311428125867198
34217914658358940171254808
4256213913182793165058134
\n", + "
" + ], + "text/plain": [ + " dataset_id doc_id _bucket_id\n", + "0 256213913 2480637085 74400\n", + "1 256213913 2079208983 88082\n", + "2 256213913 1142812586 7198\n", + "3 4217914658 3589401712 54808\n", + "4 256213913 1827931650 58134" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "lsh_result.df.head()" + ] + }, + { + "cell_type": "markdown", + "id": "ff8c79b8-229f-47ee-9e01-a7bf1f317250", + "metadata": {}, + "source": [ + "### 5.3.3 Map Buckets" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "2cc909e1-e02e-433d-b6c2-e7f82a137438", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "input_data_paths = [os.path.join(base_dir,\"rpv2-2023-06-and-14-deduped\")]\n", + "num_files = None\n", + "text_ddf_blocksize = 256 #The block size for chunking jsonl files for text ddf in mb\n", + "id_field = 'id'\n", + "text_field = 'raw_content'\n", + "input_bucket_path = os.path.join(base_dir,\"fuzzy-dedup-output-2023-06-and-14/_buckets.parquet\")\n", + "input_bucket_field = '_bucket_id'\n", + "shuffle_type ='tasks'\n", + "log_dir = os.path.join(base_dir, \"logs\")\n", + "output_anchor_docs_with_bk_path = expand_outdir_and_mkdir(os.path.join(base_dir,\"fuzzy-dedup-output-2023-06-and-14/anchor_docs_with_bk.parquet\"))" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "3effff2a-f01d-4f33-b495-97455a280a59", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Number of files being read for jaccard calculation = 72797\n", + "ddf_text.npartitions = 23876\n" + ] + } + ], + "source": [ + "# Read .jsonl input data\n", + "ddf_text = get_text_ddf_from_json_path_with_blocksize(\n", + " input_data_paths=input_data_paths,\n", + " num_files=num_files,\n", + " blocksize=text_ddf_blocksize,\n", + " id_column=id_field,\n", + " text_column=text_field,\n", + ")\n", + "\n", + "print(f\"ddf_text.npartitions = {ddf_text.npartitions}\", flush=True)" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "e4e00d8e-170c-4adf-bf34-1ad1b5275760", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Number of ddf_bk partitions = 54\n", + "Mapping Bucket took 1034.9348919391632 s\n" + ] + } + ], + "source": [ + "t0 = time.time()\n", + "num_workers = get_num_workers(gpu_client)\n", + "\n", + "# Read \"_buckets.parquet\"\n", + "ddf_bk = get_bucket_ddf_from_parquet_path(\n", + " input_bucket_path=input_bucket_path, \n", + " num_workers=num_workers\n", + ")\n", + "\n", + "#Run _MapBuckets()\n", + "map_buckets = _MapBuckets(\n", + " id_fields=[\"dataset_id\", \"doc_id\"], \n", + " bucket_field=input_bucket_field, \n", + " logger=log_dir,\n", + " text_field=text_field,\n", + ")\n", + "\n", + "ddf_anchor_docs_with_bk = map_buckets.map_buckets_with_anchors(\n", + " documents_df=ddf_text, \n", + " buckets_df=ddf_bk, \n", + " shuffle_type=shuffle_type\n", + ")\n", + "\n", + "#Write to disk\n", + "ddf_anchor_docs_with_bk.to_parquet(\n", + " output_anchor_docs_with_bk_path, \n", + " write_index=False\n", + ")\n", + "\n", + "print(f\"Mapping Bucket took {time.time()-t0} s\")" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "f0718d8e-5143-458f-ab59-a440adcde8b8", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
dataset_iddoc_idanchor_1_dataset_idanchor_1_doc_idanchor_0_dataset_idanchor_0_doc_id_output_partition_id
0421791465851821185042179146585182118502562139134919208922004
1421791465863643033562562139132308804621421791465863643033564246
225621391321035357084217914658120811115525621391321035357084003
325621391313592089124217914658634251053825621391313592089123738
4256213913162316349256213913162316349421791465810330142804258
\n", + "
" + ], + "text/plain": [ + " dataset_id doc_id anchor_1_dataset_id anchor_1_doc_id \\\n", + "0 4217914658 518211850 4217914658 518211850 \n", + "1 4217914658 6364303356 256213913 2308804621 \n", + "2 256213913 2103535708 4217914658 1208111155 \n", + "3 256213913 1359208912 4217914658 6342510538 \n", + "4 256213913 162316349 256213913 162316349 \n", + "\n", + " anchor_0_dataset_id anchor_0_doc_id _output_partition_id \n", + "0 256213913 491920892 2004 \n", + "1 4217914658 6364303356 4246 \n", + "2 256213913 2103535708 4003 \n", + "3 256213913 1359208912 3738 \n", + "4 4217914658 1033014280 4258 " + ] + }, + "execution_count": 15, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ddf_anchor_docs_with_bk.head()" + ] + }, + { + "cell_type": "markdown", + "id": "fca84869-9991-429c-baa1-bccadd15cafa", + "metadata": {}, + "source": [ + "### 6.8.4 Jaccard Shuffle" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "8cbce892-ec9c-46e3-9fbe-09f9a243a7aa", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "log_dir = os.path.join(base_dir, \"logs\")\n", + "input_anchor_docs_with_bk_path = os.path.join(base_dir,\"fuzzy-dedup-output-2023-06-and-14/anchor_docs_with_bk.parquet\")\n", + "output_shuffled_docs_path = expand_outdir_and_mkdir(\n", + " os.path.join(base_dir, \"fuzzy-dedup-output-2023-06-and-14/shuffled_docs.parquet\")\n", + ")\n", + "bucket_mapping_ddf_blocksize = 256\n", + "parts_per_worker = 16\n", + "bucket_parts_per_worker = 256\n", + "id_field = 'id'\n", + "text_field = 'raw_content'" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "1acd3e4a-0310-4413-98b2-07cd7c74ee57", + "metadata": { + "collapsed": true, + "jupyter": { + "outputs_hidden": true + }, + "tags": [] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + " 0%| | 0/1 [00:00= 1]\n", + " df = df.drop(columns=['cumcount'])\n", + " return df\n", + "\n", + "# Find duplicates by applying the function to each partition\n", + "docs_to_remove = cc_result.map_partitions(assign_cumcount, meta=cc_result)\n", + "\n", + "# Reset the index\n", + "docs_to_remove = docs_to_remove.reset_index()\n", + "\n", + "docs_to_remove = docs_to_remove[[\"dataset_id\", \"doc_id\"]]\n", + "docs_to_remove = docs_to_remove.rename(columns={\"dataset_id\":\"to_remove_dataset_id\", \"doc_id\":\"to_remove_doc_id\"})\n", + "docs_to_remove = docs_to_remove.reset_index(drop=True).persist()\n", + "_ = wait(docs_to_remove)\n", + "del _ \n", + "\n", + "print(\"docs_to_remove\", len(docs_to_remove))" + ] + }, + { + "cell_type": "markdown", + "id": "2bc75c4d-0eec-496f-b02f-a639b193522f", + "metadata": {}, + "source": [ + "Before proceeding to duplicates removal, we suggest resharding the data to fix potentially empty partitions due to duplicates removal for single snapshots." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "a6c6d7d6-9db5-4504-9d03-67c7ed69c409", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Data sharding took:904.7163739204407\n" + ] + } + ], + "source": [ + "output_resharded_dir = expand_outdir_and_mkdir(os.path.join(base_dir, \"rpv2-2023-06-and-14-deduped-resharded\"))\n", + "\n", + "t0 = time.time()\n", + "reshard_jsonl(\n", + "    os.path.join(base_dir, \"rpv2-2023-06-and-14-deduped\"),\n", + "    output_resharded_dir,\n", + "    output_file_size=\"100M\",\n", + "    start_index=0,\n", + "    file_prefix=\"rpv2-2023-06-and-14-deduped\",\n", + ")\n", + "print(f\"Data sharding took:{time.time()-t0}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "63546dd3-1560-4b38-91ce-84c004636657", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading 72780 files\n" + ] + } + ], + "source": [ + "from helper import convert_str_id_to_int\n", + "\n", + "input_dataset = DocumentDataset.read_json(os.path.join(base_dir, \"rpv2-2023-06-and-14-deduped-resharded\"), backend=\"cudf\")\n", + "input_df = input_dataset.df[['raw_content','id']]\n", + "meta = input_df._meta\n", + "meta['doc_id']=np.int64([0])\n", + "meta['dataset_id']=np.uint32([0])\n", + "input_df = input_df.map_partitions(\n", + "    convert_str_id_to_int,\n", + "    id_column=\"id\",\n", + "    meta=meta,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "88dfe6a9-a463-4d43-9f79-0d3df960961c", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Removing duplicates and writing deduped dataset took 2084.46063041687 seconds\n" + ] + } + ], + "source": [ + "dedup_output_dir = expand_outdir_and_mkdir(os.path.join(base_dir, \"rpv2-2023-06-and-14-inter-deduped\"))\n", + "deduped_df = input_df.merge(docs_to_remove,\n", + "                            left_on=['doc_id','dataset_id'],\n", + "                            right_on=[\"to_remove_doc_id\", \"to_remove_dataset_id\"],\n", + "                            how='left')\n", + "\n", + "deduped_df = deduped_df[deduped_df['to_remove_doc_id'].isna()].drop(columns=['to_remove_doc_id', \"to_remove_dataset_id\"]).reset_index(drop=True)\n", + "\n", + "t0 = time.time()\n", + "deduped_df.to_parquet(dedup_output_dir)\n", + "print(f\"Removing duplicates and writing deduped dataset took {time.time()-t0} seconds\")" + ] + }, + { + "cell_type": "markdown", + "id": "473fea1e-45e5-423c-9187-17867c0ad2a7", + "metadata": {}, + "source": [ + "We can verify that the deduped dataset has 1,585,546,179 documents, compared to 1,667,310,983 documents before dedup." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "d174ea0f-440f-4311-a9c0-d3cc26683a81", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "1585546179" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "len(deduped_df)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "014e1a79-4967-4392-8cb8-6d822a3f57ca", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "1667310983" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "len(input_df)" + ] + }, + { + "cell_type": "markdown", + "id": "9b9977f5-fa49-4e63-9057-50fadba58a86", + "metadata": {}, + "source": [ + "# 6. 
Quality Filtering\n", + "" + ] + }, + { + "cell_type": "markdown", + "id": "e19b91e6-3156-4a1b-816b-500a54def99d", + "metadata": {}, + "source": [ + "Web-crawled datasets often contain low-quality documents that we do not want the model to learn from. We can perform quality filtering to remove low-quality data. NeMo Curator offers modules for both classifier-based and heuristic-based filtering. In this tutorial, we will perform heuristic filtering using a list of heuristic filters to improve data quality.\n", + "\n", + "Curator provides a generic list of heuristic filters, but for this tutorial we only select 10 filters for demonstration purposes. The selected filters are given in `config/heuristic_filter_en.yaml`.\n", + "\n", + "Heuristic filtering in Curator is a CPU module, so we will need to use the CPU cluster." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "a1347d09-8bc5-453b-aa4a-4b75c4b3b47a", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Num Workers = 256\n" + ] + } + ], + "source": [ + "scheduler_address = os.getenv('SCHEDULER_ADDRESS')\n", + "cpu_client = get_client(scheduler_address=scheduler_address)\n", + "print(f\"Num Workers = {get_num_workers(cpu_client)}\", flush=True)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "49273a8b-848f-4f24-a0ba-3c0b478d17cc", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import nemo_curator\n", + "from nemo_curator.utils.config_utils import build_filter_pipeline\n", + "\n", + "filter_config_file = os.path.join(base_dir, \"config/heuristic_filter_en.yaml\")\n", + "hf_input_data_dir = os.path.join(base_dir, \"rpv2-2023-06-and-14-inter-deduped\")\n", + "kept_document_dir = expand_outdir_and_mkdir(os.path.join(base_dir,'rpv2-2023-06-and-14-heuristic-filtering','hf.parquet'))" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "e5284b6f-c87e-4027-a802-ab08b92610cd", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading 72780 files\n", + "Writing to disk complete for 72780 partitions\n", + "Time taken for Heuristic filtering: 5647.508106470108 s\n" + ] + } + ], + "source": [ + "t0 = time.time()\n", + "\n", + "# Load dataset\n", + "dataset = DocumentDataset.read_parquet(hf_input_data_dir)\n", + "\n", + "# construct pipeline from config\n", + "filter_pipeline = build_filter_pipeline(filter_config_file)\n", + "\n", + "# filter data and write to disk\n", + "filtered_dataset = filter_pipeline(dataset)\n", + "filtered_dataset.to_parquet(kept_document_dir)\n", + "\n", + "print(f\"Time taken for Heuristic filtering: {time.time()-t0} s\")" + ] + }, + { + "cell_type": "markdown", + "id": "3ce0658a-97f9-412c-8eef-68f6ab0b4be6", + "metadata": {}, + "source": [ + "After filtering, we have 1,229,679,047 documents left, removing 355,867,132 documents from the deduped dataset." 
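For reference, the YAML-driven pipeline above can also be assembled directly in Python. The sketch below is illustrative only: it wires up just two of the ten filters, reuses the thresholds from `config/heuristic_filter_en.yaml`, and assumes the `Sequential`, `ScoreFilter`, and heuristic filter classes exposed by the NeMo Curator version used here:

```python
# Illustrative alternative to build_filter_pipeline(): compose filters in code.
from nemo_curator import ScoreFilter, Sequential
from nemo_curator.filters import RepeatedLinesFilter, WordCountFilter

manual_filter_pipeline = Sequential([
    ScoreFilter(WordCountFilter(min_words=50, max_words=100000), text_field="raw_content"),
    ScoreFilter(RepeatedLinesFilter(max_repeated_line_fraction=0.7), text_field="raw_content"),
])

# Applied the same way as the YAML-built pipeline:
# filtered = manual_filter_pipeline(DocumentDataset.read_parquet(hf_input_data_dir))
```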
+ ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "5da60070-f78c-43fe-8bc0-fcbd839b7021", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "1229679047" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "len(filtered_dataset)" + ] + }, + { + "cell_type": "markdown", + "id": "1c92ae5e-7397-4ad9-9dec-cb93eefc3dde", + "metadata": {}, + "source": [ + "[Optional] Examine example low quality documents:" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "10e4b615-1eaa-4050-b191-28a48166a560", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from helper import get_dataframe_complement\n", + "\n", + "original_df = dd.read_parquet(hf_input_data_dir)\n", + "filtered_df = dd.read_parquet(kept_document_dir)\n", + "removed_df = get_dataframe_complement(original_df, filtered_df)\n", + "removed_df_example = removed_df.head()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "468233aa-d2a9-4e80-a815-e1a645213c75", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "print(removed_df_example.raw_content.iloc[0])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aee129f8-1fc5-401c-aa73-42fb254539c2", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "print(removed_df_example.raw_content.iloc[1])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aa60a9b9-9158-43d6-9ea4-1f544c6f816e", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.12" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/tutorials/pretraining-data-curation/start-distributed-notebook.sh b/tutorials/pretraining-data-curation/start-distributed-notebook.sh new file mode 100644 index 00000000..0c1cd7ee --- /dev/null +++ b/tutorials/pretraining-data-curation/start-distributed-notebook.sh @@ -0,0 +1,76 @@ +#! /bin/bash + +#SBATCH --job-name=nemo-curator:pretraining-curation +#SBATCH --nodes=2 +#SBATCH --exclusive +#SBATCH --time=04:00:00 + +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# ================================================================= +# Begin easy customization +# ================================================================= + +# Base directory for all SLURM job logs and files +# Does not affect directories referenced in your script +export BASE_JOB_DIR=`pwd`/nemo-curator-jobs +export JOB_DIR=$BASE_JOB_DIR/$SLURM_JOB_ID + +# Directory for Dask cluster communication and logging +# Must be paths inside the container that are accessible across nodes +export LOGDIR=$JOB_DIR/logs +export PROFILESDIR=$JOB_DIR/profiles +export SCHEDULER_FILE=$LOGDIR/scheduler.json +export SCHEDULER_LOG=$LOGDIR/scheduler.log +export DONE_MARKER=$LOGDIR/done.txt + +# Device type +# This will change depending on the module to run +export DEVICE="gpu" + +# Container parameters +export CONTAINER_IMAGE=/path/to/container +# Make sure to mount the directories your script references +export BASE_DIR=`pwd` +export MOUNTS="${BASE_DIR}:${BASE_DIR}" +# Below must be path to entrypoint script on your system +export CONTAINER_ENTRYPOINT=`pwd`/container-entrypoint.sh + +# Network interface specific to the cluster being used +export INTERFACE=eth0 +export PROTOCOL=tcp + +# CPU related variables +export CPU_WORKER_MEMORY_LIMIT=0 # 0 means no memory limit +export CPU_WORKER_PER_NODE=128 # number of cpu workers per node + +# GPU related variables +export RAPIDS_NO_INITIALIZE="1" +export CUDF_SPILL="1" +export RMM_SCHEDULER_POOL_SIZE="1GB" +export RMM_WORKER_POOL_SIZE="72GiB" +export LIBCUDF_CUFILE_POLICY=OFF +export DASK_DATAFRAME__QUERY_PLANNING=False + + +# ================================================================= +# End easy customization +# ================================================================= + +# Start the container +srun \ + --container-mounts=${MOUNTS} \ + --container-image=${CONTAINER_IMAGE} \ + ${CONTAINER_ENTRYPOINT}
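After submitting this script with `sbatch`, it can be useful to verify from a Python session inside the container that the Dask cluster started with the expected number of workers. A minimal, hypothetical check (the `scheduler.json` path follows `$SCHEDULER_FILE` above; replace `<JOB_ID>` with your actual SLURM job ID) might look like:

```python
# Hypothetical connectivity check against the Dask cluster started by this job.
from dask.distributed import Client

client = Client(scheduler_file="nemo-curator-jobs/<JOB_ID>/logs/scheduler.json")
print("Dashboard:", client.dashboard_link)
print("Workers  :", len(client.scheduler_info()["workers"]))
```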