Commit 3c2ca7d

Update MNIST training script to avoid downloading datasets on each node concurrently by using pre-downloaded dataset
abhijeet-dhumal authored and openshift-merge-bot[bot] committed Jan 20, 2025
1 parent c5330cf commit 3c2ca7d
Showing 4 changed files with 88 additions and 59 deletions.
105 changes: 61 additions & 44 deletions tests/kfto/kfto_mnist_training_test.go
@@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"testing"
"time"

kftov1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
. "github.com/onsi/gomega"
@@ -31,29 +32,29 @@ import (
)

func TestPyTorchJobMnistMultiNodeSingleCpu(t *testing.T) {
runKFTOPyTorchMnistJob(t, 0, 2, 1, "", GetCudaTrainingImage(), "resources/requirements.txt")
runKFTOPyTorchMnistJob(t, CPU, GetCudaTrainingImage(), "resources/requirements.txt", 2, 1)
}
func TestPyTorchJobMnistMultiNodeMultiCpu(t *testing.T) {
runKFTOPyTorchMnistJob(t, 0, 2, 2, "", GetCudaTrainingImage(), "resources/requirements.txt")
runKFTOPyTorchMnistJob(t, CPU, GetCudaTrainingImage(), "resources/requirements.txt", 2, 2)
}

func TestPyTorchJobMnistMultiNodeSingleGpuWithCuda(t *testing.T) {
runKFTOPyTorchMnistJob(t, 1, 1, 1, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt")
runKFTOPyTorchMnistJob(t, NVIDIA, GetCudaTrainingImage(), "resources/requirements.txt", 1, 1)
}

func TestPyTorchJobMnistMultiNodeMultiGpuWithCuda(t *testing.T) {
runKFTOPyTorchMnistJob(t, 2, 1, 2, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt")
runKFTOPyTorchMnistJob(t, NVIDIA, GetCudaTrainingImage(), "resources/requirements.txt", 1, 2)
}

func TestPyTorchJobMnistMultiNodeSingleGpuWithROCm(t *testing.T) {
runKFTOPyTorchMnistJob(t, 1, 1, 1, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt")
runKFTOPyTorchMnistJob(t, AMD, GetROCmTrainingImage(), "resources/requirements-rocm.txt", 1, 1)
}

func TestPyTorchJobMnistMultiNodeMultiGpuWithROCm(t *testing.T) {
runKFTOPyTorchMnistJob(t, 2, 1, 2, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt")
runKFTOPyTorchMnistJob(t, AMD, GetROCmTrainingImage(), "resources/requirements-rocm.txt", 1, 2)
}

func runKFTOPyTorchMnistJob(t *testing.T, totalNumGpus int, workerReplicas int, numProcPerNode int, gpuLabel string, image string, requirementsFile string) {
func runKFTOPyTorchMnistJob(t *testing.T, gpu Gpu, image string, requirementsFile string, workerReplicas, numProcPerNode int) {
test := With(t)

// Create a namespace
@@ -62,7 +63,7 @@ func runKFTOPyTorchMnistJob(t *testing.T, totalNumGpus int, workerReplicas int,
mnist := ReadFile(test, "resources/mnist.py")
requirementsFileName := ReadFile(test, requirementsFile)

if totalNumGpus > 0 {
if workerReplicas*numProcPerNode > 0 && gpu.ResourceLabel != "cpu" {
mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"gpu\""), 1)
} else {
mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"cpu\""), 1)
@@ -77,24 +78,45 @@ func runKFTOPyTorchMnistJob(t *testing.T, totalNumGpus int, workerReplicas int,
defer test.Client().Core().CoreV1().PersistentVolumeClaims(namespace.Name).Delete(test.Ctx(), outputPvc.Name, metav1.DeleteOptions{})

// Create training PyTorch job
tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, gpuLabel, totalNumGpus, workerReplicas, numProcPerNode, outputPvc.Name, image)
tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, gpu, workerReplicas, numProcPerNode, outputPvc.Name, image)
defer test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace.Name).Delete(test.Ctx(), tuningJob.Name, *metav1.NewDeleteOptions(0))

// Make sure the PyTorch job is running
test.Eventually(PyTorchJob(test, namespace.Name, tuningJob.Name), TestTimeoutDouble).
Should(WithTransform(PyTorchJobConditionRunning, Equal(corev1.ConditionTrue)))

// Verify GPU utilization
if IsOpenShift(test) && gpu == NVIDIA {
trainingPods := GetPods(test, namespace.Name, metav1.ListOptions{LabelSelector: "training.kubeflow.org/job-name=" + tuningJob.GetName()})
test.Expect(trainingPods).To(HaveLen(workerReplicas + 1)) // +1 is the master node

for _, trainingPod := range trainingPods {
// Check that GPUs for training pods were utilized recently
test.Eventually(OpenShiftPrometheusGpuUtil(test, trainingPod, gpu), 15*time.Minute).
Should(
And(
HaveLen(numProcPerNode),
ContainElement(
// Check that at least one GPU was utilized at more than 30%
HaveField("Value", BeNumerically(">", 30)),
),
),
)
}
test.T().Log("All GPUs were successfully utilized")
}

// Make sure the PyTorch job succeeded
test.Eventually(PyTorchJob(test, namespace.Name, tuningJob.Name), TestTimeoutDouble).Should(WithTransform(PyTorchJobConditionSucceeded, Equal(corev1.ConditionTrue)))
test.T().Logf("PytorchJob %s/%s ran successfully", tuningJob.Namespace, tuningJob.Name)

}

func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, gpuLabel string, totalNumGpus int, workerReplicas int, numProcPerNode int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, gpu Gpu, workerReplicas int, numProcPerNode int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
var useGPU = false
var backend string

if totalNumGpus > 0 {
if gpu.ResourceLabel != "cpu" {
useGPU = true
backend = "nccl"
} else {
@@ -143,9 +165,14 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{
"/bin/bash", "-c",
fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
fmt.Sprintf(`mkdir -p /tmp/lib /tmp/datasets/mnist && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
torchrun --nproc_per_node=%d /mnt/files/mnist.py --epochs 7 --save_every 2 --batch_size 64 --lr 0.0005 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
echo "Downloading MNIST dataset..." && \
python3 -c "from torchvision.datasets import MNIST; from torchvision.transforms import Compose, ToTensor; \
MNIST('/tmp/datasets/mnist', train=False, download=True, transform=Compose([ToTensor()]))" && \
echo -e "\n\n Dataset downloaded to /tmp/datasets/mnist" && ls -R /tmp/datasets/mnist && \
echo -e "\n\n Starting training..." && \
torchrun --nproc_per_node=%d /mnt/files/mnist.py --dataset_path "/tmp/datasets/mnist" --epochs 7 --save_every 2 --batch_size 64 --lr 0.0005 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
},
VolumeMounts: []corev1.VolumeMount{
{
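
The python3 -c one-liner embedded in the container command above expands to roughly the following standalone script — a sketch for reference (not itself part of the commit), assuming torchvision is importable from /tmp/lib:

# Pre-download the MNIST test split once per node, before torchrun spawns
# the worker processes, so they can later open the data with download=False.
from torchvision.datasets import MNIST
from torchvision.transforms import Compose, ToTensor

MNIST(
    "/tmp/datasets/mnist",            # same path later passed as --dataset_path
    train=False,                      # the test split, matching mnist.py
    download=True,                    # no-op if the files are already cached
    transform=Compose([ToTensor()]),
)

The download now happens once per pod before torchrun starts, so the numProcPerNode worker processes no longer race to download the same files concurrently.
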
@@ -156,12 +183,12 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
Name: "tmp-volume",
MountPath: "/tmp",
},
{
Name: "output-volume",
MountPath: "/mnt/output",
},
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", numProcPerNode)),
corev1.ResourceMemory: resource.MustParse("6Gi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", numProcPerNode)),
corev1.ResourceMemory: resource.MustParse("6Gi"),
@@ -186,14 +213,6 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
{
Name: "output-volume",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: outputPvcName,
},
},
},
},
RestartPolicy: corev1.RestartPolicyOnFailure,
},
@@ -231,9 +250,14 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{
"/bin/bash", "-c",
fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
fmt.Sprintf(`mkdir -p /tmp/lib /tmp/datasets/mnist && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
torchrun --nproc_per_node=%d /mnt/files/mnist.py --epochs 7 --save_every 2 --batch_size 64 --lr 0.0005 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
echo "Downloading MNIST dataset..." && \
python3 -c "from torchvision.datasets import MNIST; from torchvision.transforms import Compose, ToTensor; \
MNIST('/tmp/datasets/mnist', train=False, download=True, transform=Compose([ToTensor()]))" && \
echo -e "\n\n Dataset downloaded to /tmp/datasets/mnist" && ls -R /tmp/datasets/mnist && \
echo -e "\n\n Starting training..." && \
torchrun --nproc_per_node=%d /mnt/files/mnist.py --dataset_path "/tmp/datasets/mnist" --epochs 7 --save_every 2 --batch_size 64 --lr 0.0005 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
},
VolumeMounts: []corev1.VolumeMount{
{
@@ -246,6 +270,10 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
},
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", numProcPerNode)),
corev1.ResourceMemory: resource.MustParse("6Gi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", numProcPerNode)),
corev1.ResourceMemory: resource.MustParse("6Gi"),
@@ -281,21 +309,10 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config

if useGPU {
// Update resource lists for GPU (NVIDIA/ROCm) usecase
tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(totalNumGpus))
tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(totalNumGpus))

tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Command = []string{
"/bin/bash", "-c",
fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
torchrun --nproc_per_node=%d /mnt/files/mnist.py --epochs 7 --save_every 2 --batch_size 128 --lr 0.001 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
}
tuningJob.Spec.PyTorchReplicaSpecs[kftov1.MPIJobReplicaTypeWorker].Template.Spec.Containers[0].Command = []string{
"/bin/bash", "-c",
fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
torchrun --nproc_per_node=%d /mnt/files/mnist.py --epochs 7 --save_every 2 --batch_size 128 --lr 0.001 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
}
tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(gpu.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpu.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(gpu.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpu.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))

tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Env = []corev1.EnvVar{
{
@@ -321,13 +338,13 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
// Update tolerations
tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Tolerations = []corev1.Toleration{
{
Key: gpuLabel,
Key: gpu.ResourceLabel,
Operator: corev1.TolerationOpExists,
},
}
tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Tolerations = []corev1.Toleration{
{
Key: gpuLabel,
Key: gpu.ResourceLabel,
Operator: corev1.TolerationOpExists,
},
}
2 changes: 1 addition & 1 deletion tests/kfto/kfto_training_test.go
@@ -98,7 +98,7 @@ func runKFTOPyTorchJob(t *testing.T, image string, gpu Gpu, numGpus, numberOfWor
And(
HaveLen(numGpus),
ContainElement(
// Check that at lest some GPU was utilized on more than 50%
// Check that at least some GPU was utilized on more than 50%
HaveField("Value", BeNumerically(">", 50)),
),
),
39 changes: 25 additions & 14 deletions tests/kfto/resources/mnist.py
@@ -13,17 +13,27 @@
# limitations under the License.

import torch
import logging
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed as dist

from torch.utils.data import DataLoader, Dataset
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import MNIST
import os

# Configure logger.
log_formatter = logging.Formatter(
"%(asctime)s %(levelname)-8s %(message)s", "%Y-%m-%dT%H:%M:%SZ"
)
logger = logging.getLogger(__file__)
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_formatter)
logger.addHandler(console_handler)
logger.setLevel(logging.INFO)

def ddp_setup(backend="nccl"):
"""Setup for Distributed Data Parallel with specified backend."""
@@ -33,12 +43,12 @@ def ddp_setup(backend="nccl"):
num_devices = torch.cuda.device_count()
device = int(os.environ.get("LOCAL_RANK", 0)) # Default to device 0
if device >= num_devices:
print(f"Warning: Invalid device ordinal {device}. Defaulting to device 0.")
logger.warning(f"Invalid device ordinal {device}. Defaulting to device 0.")
device = 0
torch.cuda.set_device(device)
else:
# If no GPU is available, use Gloo backend (for CPU-only environments)
print("No GPU available, falling back to CPU.")
logger.info("No GPU available, falling back to CPU.")
backend="gloo"
dist.init_process_group(backend=backend)
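
For context, ddp_setup leans on the rendezvous environment variables that torchrun exports for every process; a minimal sketch of the same handshake (standard torch.distributed launcher contract, not part of this diff):

import os
import torch
import torch.distributed as dist

# torchrun sets RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT;
# init_process_group() reads them implicitly via the default env:// method.
backend = "nccl" if torch.cuda.is_available() else "gloo"
dist.init_process_group(backend=backend)
print(f"rank {dist.get_rank()} of {dist.get_world_size()} initialized")
dist.destroy_process_group()
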

Expand Down Expand Up @@ -83,7 +93,7 @@ def __init__(
self.backend = backend

if os.path.exists(snapshot_path):
print("Loading snapshot")
logger.info("Loading snapshot")
self._load_snapshot(snapshot_path)

# Move model to the appropriate device (GPU/CPU)
@@ -93,7 +103,7 @@ def __init__(
else:
self.device=torch.device('cpu')
self.model = DDP(self.model.to(self.device))
print(f"Using device: {self.device}")
logger.info(f"Using device: {self.device}")

def _run_batch(self, source, targets):
self.optimizer.zero_grad()
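
The branch above decides how the model is wrapped for distributed training; condensed into a helper, it amounts to this sketch (hypothetical wrap_model, assuming the process group is already initialized):

import os
import torch
from torch.nn.parallel import DistributedDataParallel as DDP

def wrap_model(model, backend):
    # NCCL ranks each pin one GPU, selected by torchrun's LOCAL_RANK;
    # Gloo ranks run the model on CPU with no device_ids argument.
    if torch.cuda.is_available() and backend == "nccl":
        local_rank = int(os.environ.get("LOCAL_RANK", 0))
        return DDP(model.to(local_rank), device_ids=[local_rank])
    return DDP(model.to(torch.device("cpu")))
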
@@ -105,9 +115,9 @@ def _run_batch(self, source, targets):
def _run_epoch(self, epoch, backend):
b_sz = len(next(iter(self.train_data))[0])
if torch.cuda.is_available() and backend=="nccl":
print(f"[GPU{self.global_rank}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
logger.info(f"[GPU{self.global_rank}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
else:
print(f"[CPU{self.global_rank}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
logger.info(f"[CPU{self.global_rank}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
if isinstance(self.train_data.sampler, DistributedSampler):
self.train_data.sampler.set_epoch(epoch)
for source, targets in self.train_data:
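
The sampler.set_epoch call above is what gives each epoch a fresh shuffle across ranks; the pattern in isolation (a toy dataset standing in for MNIST; requires an initialized process group):

import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler

dataset = TensorDataset(torch.randn(256, 1, 28, 28), torch.randint(0, 10, (256,)))
sampler = DistributedSampler(dataset)  # partitions indices across ranks
loader = DataLoader(dataset, batch_size=64, shuffle=False, sampler=sampler)

for epoch in range(3):
    # Without set_epoch, every epoch reuses the same shuffle seed and each
    # rank replays an identical ordering of its shard.
    sampler.set_epoch(epoch)
    for source, targets in loader:
        pass  # training step goes here
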
@@ -121,7 +131,7 @@ def _save_snapshot(self, epoch):
"EPOCHS_RUN": epoch,
}
torch.save(snapshot, self.snapshot_path)
print(f"Epoch {epoch} | Training snapshot saved at {self.snapshot_path}")
logger.info(f"Epoch {epoch} | Training snapshot saved at {self.snapshot_path}")

def train(self, max_epochs: int, backend: str):
for epoch in range(self.epochs_run, max_epochs):
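
_save_snapshot's counterpart on restart (the existing _load_snapshot sits outside the hunks shown) would look roughly like this sketch, keyed to the same MODEL_STATE/EPOCHS_RUN fields:

import torch

def load_snapshot(model, snapshot_path, device):
    # Restore weights and the epoch counter so a restarted pod resumes
    # training instead of starting from scratch.
    snapshot = torch.load(snapshot_path, map_location=device)
    model.load_state_dict(snapshot["MODEL_STATE"])
    return snapshot["EPOCHS_RUN"]  # epoch index to resume from
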
@@ -130,11 +140,11 @@ def train(self, max_epochs: int, backend: str):
self._save_snapshot(epoch)


def load_train_objs(lr: float):
def load_train_objs(dataset_path: str, lr: float):
"""Load dataset, model, and optimizer."""
train_set = torchvision.datasets.MNIST("../data",
train_set = MNIST(dataset_path,
train=False,
download=True,
download=False,
transform=transforms.Compose([transforms.ToTensor()]))
model = Net()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
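
Since load_train_objs now passes download=False, torchvision raises if the files are absent — exactly the contract the pre-download step in the job command establishes. A guard one could add for a clearer failure message (hypothetical helper, not in the commit):

import os
from torchvision.datasets import MNIST

def open_mnist(dataset_path):
    # torchvision keeps MNIST under <root>/MNIST/; fail fast with a clear
    # message if the launcher's pre-download step did not run.
    if not os.path.isdir(os.path.join(dataset_path, "MNIST")):
        raise FileNotFoundError(f"MNIST not pre-downloaded under {dataset_path}")
    return MNIST(dataset_path, train=False, download=False)
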
@@ -152,22 +162,22 @@ def prepare_dataloader(dataset: Dataset, batch_size: int, useGpu: bool):
)


def main(epochs: int, save_every: int, batch_size: int, lr: float, snapshot_path: str, backend: str):
def main(epochs: int, save_every: int, batch_size: int, lr: float, dataset_path: str, snapshot_path: str, backend: str):
ddp_setup(backend)
dataset, model, optimizer = load_train_objs(lr)
dataset, model, optimizer = load_train_objs(dataset_path, lr)
train_loader = prepare_dataloader(dataset, batch_size, torch.cuda.is_available() and backend=="nccl")
trainer = Trainer(model, train_loader, optimizer, save_every, snapshot_path, backend)
trainer.train(epochs, backend)
dist.destroy_process_group()


if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Distributed MNIST Training")
parser.add_argument('--epochs', type=int, required=True, help='Total epochs to train the model')
parser.add_argument('--save_every', type=int, required=True, help='How often to save a snapshot')
parser.add_argument('--batch_size', type=int, default=64, help='Input batch size on each device (default: 64)')
parser.add_argument('--lr', type=float, default=1e-3, help='Learning rate (default: 1e-3)')
parser.add_argument('--dataset_path', type=str, default="../data", help='Path to MNIST datasets (default: ../data)')
parser.add_argument('--snapshot_path', type=str, default="snapshot_mnist.pt", help='Path to save snapshots (default: snapshot_mnist.pt)')
parser.add_argument('--backend', type=str, choices=['gloo', 'nccl'], default='nccl', help='Distributed backend type (default: nccl)')
args = parser.parse_args()
@@ -177,6 +187,7 @@ def main(epochs: int, save_every: int, batch_size: int, lr: float, snapshot_path
save_every=args.save_every,
batch_size=args.batch_size,
lr=args.lr,
dataset_path=args.dataset_path,
snapshot_path=args.snapshot_path,
backend=args.backend
)
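
With the new --dataset_path flag, a local single-process smoke run (no torchrun) only needs the rendezvous variables set by hand — a sketch, assuming mnist.py is importable and the dataset was pre-downloaded to ./data:

import os
from mnist import main

# Stand-ins for the variables torchrun would export.
os.environ.setdefault("RANK", "0")
os.environ.setdefault("LOCAL_RANK", "0")
os.environ.setdefault("WORLD_SIZE", "1")
os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "29500")

main(
    epochs=1,
    save_every=1,
    batch_size=64,
    lr=5e-4,                # matches the job command's --lr 0.0005
    dataset_path="./data",
    snapshot_path="snapshot_mnist.pt",
    backend="gloo",         # CPU-friendly backend for a local run
)
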
1 change: 1 addition & 0 deletions tests/kfto/support.go
@@ -36,6 +36,7 @@ type Gpu struct {
var (
NVIDIA = Gpu{ResourceLabel: "nvidia.com/gpu", PrometheusGpuUtilizationLabel: "DCGM_FI_DEV_GPU_UTIL"}
AMD = Gpu{ResourceLabel: "amd.com/gpu"}
CPU = Gpu{ResourceLabel: "cpu"}
)

//go:embed resources/*
