[Tutorial] Add Pretraining Data Curation Tutorial (NVIDIA#292)
* add tutorial files

Signed-off-by: Yang Yu <[email protected]>

* add note on cpu vars

Signed-off-by: Yang Yu <[email protected]>

* add note on duplicates cluster size

Signed-off-by: Yang Yu <[email protected]>

* correct dir name typo

Signed-off-by: Yang Yu <[email protected]>

* correct dir name typo

Signed-off-by: Yang Yu <[email protected]>

---------

Signed-off-by: Yang Yu <[email protected]>
yyu22 authored Oct 10, 2024
1 parent 0bbdc06 commit f3c5770
Showing 7 changed files with 5,146 additions and 0 deletions.
2 changes: 2 additions & 0 deletions tutorials/README.md
@@ -7,6 +7,7 @@ To get started, we recommend starting with the following tutorials to become fam
2. **[peft-curation](./peft-curation)**, which overviews operations suitable for curating small-scale datasets which are used for task-specific fine-tuning.
3. **[synthetic-data-hello-world](./synthetic-data-hello-world)**, which overviews basic synthetic data generation facilities for interfacing with external models such as [Nemotron-4 340B Instruct](https://build.nvidia.com/nvidia/nemotron-4-340b-instruct).
4. **[peft-curation-with-sdg](./peft-curation-with-sdg)**, which combines data processing operations and synthetic data generation using [Nemotron-4 340B Instruct](https://build.nvidia.com/nvidia/nemotron-4-340b-instruct) or [LLaMa 3.1 405B Instruct](https://build.nvidia.com/meta/llama-3_1-405b-instruct) into a single pipeline. Additionally, this tutorial demonstrates advanced functions such as reward score assignment via [Nemotron-4 340B Reward](https://build.nvidia.com/nvidia/nemotron-4-340b-reward), as well as semantic deduplication to remove semantically similar real or synthetic records.
5. **[pretraining-data-curation](./pretraining-data-curation/)**, which overviews the data curation pipeline for creating an LLM pretraining dataset at scale.


## List of Tutorials
@@ -15,6 +16,7 @@ To get started, we recommend starting with the following tutorials to become fam

| Tutorial | Description | Additional Resources |
| --- | --- | --- |
| [pretraining-data-curation](./pretraining-data-curation/) | Demonstrates an accelerated pipeline for curating large-scale data for LLM pretraining in a distributed environment | |
| [dapt-curation](./dapt-curation) | Data curation sample for domain-adaptive pre-training (DAPT), focusing on [ChipNeMo](https://blogs.nvidia.com/blog/llm-semiconductors-chip-nemo/) data curation as an example | [Blog post](https://developer.nvidia.com/blog/streamlining-data-processing-for-domain-adaptive-pretraining-with-nvidia-nemo-curator/) |
| [distributed_data_classification](./distributed_data_classification) | Demonstrates data domain and data quality classification at scale in a distributed environment | |
| [nemotron_340B_synthetic_datagen](./nemotron_340B_synthetic_datagen) | Demonstrates the use of NeMo Curator synthetic data generation modules to leverage [Nemotron-4 340B Instruct](https://build.nvidia.com/nvidia/nemotron-4-340b-instruct) for generating synthetic preference data | |
11 changes: 11 additions & 0 deletions tutorials/pretraining-data-curation/README.md
@@ -0,0 +1,11 @@
# RedPajama-Data-v2 Dataset Curation for LLM Pretraining

This tutorial demonstrates the usage of NeMo Curator to curate the RedPajama-Data-v2 dataset for LLM pretraining in a distributed environment.

## RedPajama-Data-v2
RedPajama-V2 (RPV2) is an open dataset for training large language models. It includes over 100B text documents drawn from 84 CommonCrawl snapshots and processed with the CCNet pipeline. In this tutorial, we perform data curation on two raw snapshots from RPV2 for demonstration purposes.

## Getting Started
This tutorial is designed to run in a multi-node environment because of the scale of the pretraining dataset. To start the tutorial, run the Slurm script `start-distributed-notebook.sh` in this directory, which starts a Jupyter notebook with a step-by-step walkthrough of the end-to-end curation pipeline. To access the Jupyter notebook running on the scheduler node from your local machine, establish an SSH tunnel by running the following command:

`ssh -L <local_port>:localhost:8888 <user>@<scheduler_address>`
@@ -0,0 +1,34 @@
input_field: raw_content
filters:
  - name: nemo_curator.filters.heuristic_filter.NonAlphaNumericFilter
    params:
      max_non_alpha_numeric_to_text_ratio: 0.25
  - name: nemo_curator.filters.heuristic_filter.SymbolsToWordsFilter
    params:
      max_symbol_to_word_ratio: 0.1
  - name: nemo_curator.filters.heuristic_filter.NumbersFilter
    params:
      max_number_to_text_ratio: 0.15
  - name: nemo_curator.filters.heuristic_filter.UrlsFilter
    params:
      max_url_to_text_ratio: 0.2
  - name: nemo_curator.filters.heuristic_filter.WhiteSpaceFilter
    params:
      max_white_space_ratio: 0.25
  - name: nemo_curator.filters.heuristic_filter.ParenthesesFilter
    params:
      max_parentheses_ratio: 0.1
  - name: nemo_curator.filters.heuristic_filter.BoilerPlateStringFilter
    params:
      remove_if_at_top_or_bottom: True
      max_boilerplate_string_ratio: 0.4
  - name: nemo_curator.filters.heuristic_filter.RepeatedLinesFilter
    params:
      max_repeated_line_fraction: 0.7
  - name: nemo_curator.filters.heuristic_filter.RepeatedParagraphsFilter
    params:
      max_repeated_paragraphs_ratio: 0.7
  - name: nemo_curator.filters.heuristic_filter.WordCountFilter
    params:
      min_words: 50
      max_words: 100000
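
For illustration, the heuristic filters declared in this config can also be assembled directly through NeMo Curator's Python API. The sketch below is not part of the tutorial files: the input/output paths are placeholders, only two of the filters are shown, and the parameter values are copied from the config above.

```python
# Hypothetical sketch: build part of the heuristic-filtering step in Python
# instead of via the YAML config. Paths are placeholders.
from nemo_curator import ScoreFilter, Sequential
from nemo_curator.datasets import DocumentDataset
from nemo_curator.filters.heuristic_filter import NonAlphaNumericFilter, WordCountFilter

# Load the (already jsonl-converted) RPV2 shards as a distributed dataset
dataset = DocumentDataset.read_json("/path/to/rpv2-jsonl", backend="pandas")

pipeline = Sequential(
    [
        ScoreFilter(
            NonAlphaNumericFilter(max_non_alpha_numeric_to_text_ratio=0.25),
            text_field="raw_content",
        ),
        ScoreFilter(
            WordCountFilter(min_words=50, max_words=100000),
            text_field="raw_content",
        ),
        # ...the remaining filters from the config follow the same pattern
    ]
)

filtered_dataset = pipeline(dataset)
filtered_dataset.to_json("/path/to/filtered-output")
```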
75 changes: 75 additions & 0 deletions tutorials/pretraining-data-curation/container-entrypoint.sh
@@ -0,0 +1,75 @@
#! /bin/bash

# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Start the scheduler on the rank 0 node
if [[ -z "$SLURM_NODEID" ]] || [[ $SLURM_NODEID == 0 ]]; then
  echo "Starting scheduler"
  if [[ $DEVICE == 'cpu' ]]; then
    dask scheduler \
      --scheduler-file $SCHEDULER_FILE \
      --protocol $PROTOCOL \
      --interface $INTERFACE >> $SCHEDULER_LOG 2>&1 &
  fi
  if [[ $DEVICE == 'gpu' ]]; then
    DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True \
    DASK_DISTRIBUTED__RMM__POOL_SIZE=$RMM_SCHEDULER_POOL_SIZE \
    dask scheduler \
      --scheduler-file $SCHEDULER_FILE \
      --protocol $PROTOCOL \
      --interface $INTERFACE >> $SCHEDULER_LOG 2>&1 &
  fi
fi

# Wait for the scheduler to start
sleep 30

# Start the workers on each node
echo "Starting workers..."

export WORKER_LOG=$LOGDIR/worker_${SLURM_NODEID}-${SLURM_LOCALID}.log
if [[ $DEVICE == 'cpu' ]]; then
  dask worker \
    --scheduler-file $SCHEDULER_FILE \
    --memory-limit $CPU_WORKER_MEMORY_LIMIT \
    --nworkers $CPU_WORKER_PER_NODE \
    --interface $INTERFACE >> $WORKER_LOG 2>&1 &
fi
if [[ $DEVICE == 'gpu' ]]; then
  dask-cuda-worker \
    --scheduler-file $SCHEDULER_FILE \
    --rmm-pool-size $RMM_WORKER_POOL_SIZE \
    --interface $INTERFACE \
    --rmm-async >> $WORKER_LOG 2>&1 &
fi

# Wait for the workers to start
sleep 30

# Extract the scheduler address and export it as an environment variable
export SCHEDULER_ADDRESS=$(jq -r '.address' "$SCHEDULER_FILE")
echo "SCHEDULER_ADDRESS=$SCHEDULER_ADDRESS"

if [[ -z "$SLURM_NODEID" ]] || [[ $SLURM_NODEID == 0 ]]; then
  echo "Starting notebook"
  bash -c "jupyter lab --ip=0.0.0.0 --port=8888 --no-browser --NotebookApp.token='' --NotebookApp.password='' --notebook-dir=${BASE_DIR}"
  touch $DONE_MARKER
fi

# All nodes wait until done to keep the workers and scheduler active
while [ ! -f $DONE_MARKER ]
do
  sleep 15
done
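
Once the scheduler and workers are up, code running in the notebook can attach to the cluster through the scheduler file (or the exported address). A minimal sketch, assuming `SCHEDULER_FILE`/`SCHEDULER_ADDRESS` are visible in the notebook's environment:

```python
# Minimal sketch (not part of the tutorial files): connect to the Dask cluster
# started by container-entrypoint.sh from inside the Jupyter notebook.
import os

from dask.distributed import Client

# Connect via the scheduler file written by `dask scheduler --scheduler-file ...`
client = Client(scheduler_file=os.environ["SCHEDULER_FILE"])

# Alternatively, connect via the exported address:
# client = Client(address=os.environ["SCHEDULER_ADDRESS"])

print(client.dashboard_link)  # useful for monitoring workers during curation
print(len(client.scheduler_info()["workers"]), "workers connected")
```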
66 changes: 66 additions & 0 deletions tutorials/pretraining-data-curation/helper.py
@@ -0,0 +1,66 @@
import gzip
import json
import os

import cudf
import dask.bag as db


def convert_single_file(input_output_paths):
    input_path, output_path = input_output_paths

    with gzip.open(input_path, "rt", encoding="utf-8") as f_in:
        with open(output_path, "w", encoding="utf-8") as f_out:
            for line in f_in:
                try:
                    # Parse each line as a separate JSON object
                    item = json.loads(line)
                    # Write the JSON object to the .jsonl file
                    json.dump(item, f_out)
                    f_out.write("\n")
                except json.JSONDecodeError as e:
                    print(f"Error decoding JSON in file {input_path}: {e}")
                    continue


def convert_json_gz_to_jsonl(input_dir, output_dir, partition_size=2):
    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # List all .json.gz files in the input directory
    file_paths = []
    for filename in os.listdir(input_dir):
        if filename.endswith(".json.gz"):
            input_path = os.path.join(input_dir, filename)
            output_filename = (
                os.path.splitext(os.path.splitext(filename)[0])[0] + ".jsonl"
            )
            output_path = os.path.join(output_dir, output_filename)
            file_paths.append((input_path, output_path))

    # Create a Dask bag from the file paths and apply the conversion in parallel
    bag = db.from_sequence(file_paths, partition_size=partition_size)
    bag.map(convert_single_file).compute()


def convert_str_id_to_int(df, id_column="id"):
    """
    Converts the legacy "dataset_name-0000034" string ID format into two
    integer-based IDs: doc_id (the numeric suffix) and dataset_id (a hash
    of the dataset name prefix).
    """
    dx = df[id_column].str.rsplit("-", n=1, expand=True)
    df["doc_id"] = dx[1].astype("int64").values
    df["dataset_id"] = dx[0].hash_values()
    return df


def get_dataframe_complement(original_df, filtered_df):
    """
    Returns the rows of original_df whose index is not present in the
    corresponding partition of filtered_df, i.e. the documents that were
    removed by a filtering step.
    """

    def partition_complement(part_original_df, partition_info=None):
        if not partition_info:
            return part_original_df
        part_filtered_df = filtered_df.get_partition(partition_info["number"])
        complement_mask = ~part_original_df.index.isin(part_filtered_df.index.persist())
        complement_df = part_original_df[complement_mask]
        return complement_df

    return original_df.map_partitions(partition_complement)
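
A hypothetical usage sketch of these helpers (directory paths are placeholders, and the tutorial notebook may call them differently):

```python
# Assumed usage of helper.py; paths and the filtered dataframe are placeholders.
from nemo_curator.datasets import DocumentDataset

from helper import convert_json_gz_to_jsonl, convert_str_id_to_int, get_dataframe_complement

# 1. Decompress the raw RPV2 .json.gz shards into .jsonl files in parallel.
convert_json_gz_to_jsonl("/data/rpv2/raw", "/data/rpv2/jsonl")

# 2. Load the shards on GPU and split the string IDs into integer IDs.
dataset = DocumentDataset.read_json("/data/rpv2/jsonl", backend="cudf")
ddf = dataset.df.map_partitions(convert_str_id_to_int)

# 3. After a filtering step produces `filtered_ddf`, recover the removed documents:
# removed_ddf = get_dataframe_complement(ddf, filtered_ddf)
```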
