diff --git a/tests/kfto/kfto_mnist_training_test.go b/tests/kfto/kfto_mnist_training_test.go index cb017f53..7cfce26d 100644 --- a/tests/kfto/kfto_mnist_training_test.go +++ b/tests/kfto/kfto_mnist_training_test.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "testing" + "time" kftov1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" . "github.com/onsi/gomega" @@ -31,29 +32,29 @@ import ( ) func TestPyTorchJobMnistMultiNodeSingleCpu(t *testing.T) { - runKFTOPyTorchMnistJob(t, 0, 2, 1, "", GetCudaTrainingImage(), "resources/requirements.txt") + runKFTOPyTorchMnistJob(t, CPU, GetCudaTrainingImage(), "resources/requirements.txt", 2, 1) } func TestPyTorchJobMnistMultiNodeMultiCpu(t *testing.T) { - runKFTOPyTorchMnistJob(t, 0, 2, 2, "", GetCudaTrainingImage(), "resources/requirements.txt") + runKFTOPyTorchMnistJob(t, CPU, GetCudaTrainingImage(), "resources/requirements.txt", 2, 2) } func TestPyTorchJobMnistMultiNodeSingleGpuWithCuda(t *testing.T) { - runKFTOPyTorchMnistJob(t, 1, 1, 1, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt") + runKFTOPyTorchMnistJob(t, NVIDIA, GetCudaTrainingImage(), "resources/requirements.txt", 1, 1) } func TestPyTorchJobMnistMultiNodeMultiGpuWithCuda(t *testing.T) { - runKFTOPyTorchMnistJob(t, 2, 1, 2, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt") + runKFTOPyTorchMnistJob(t, NVIDIA, GetCudaTrainingImage(), "resources/requirements.txt", 1, 2) } func TestPyTorchJobMnistMultiNodeSingleGpuWithROCm(t *testing.T) { - runKFTOPyTorchMnistJob(t, 1, 1, 1, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt") + runKFTOPyTorchMnistJob(t, AMD, GetROCmTrainingImage(), "resources/requirements-rocm.txt", 1, 1) } func TestPyTorchJobMnistMultiNodeMultiGpuWithROCm(t *testing.T) { - runKFTOPyTorchMnistJob(t, 2, 1, 2, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt") + runKFTOPyTorchMnistJob(t, AMD, GetROCmTrainingImage(), "resources/requirements-rocm.txt", 1, 2) } -func runKFTOPyTorchMnistJob(t *testing.T, totalNumGpus int, workerReplicas int, numProcPerNode int, gpuLabel string, image string, requirementsFile string) { +func runKFTOPyTorchMnistJob(t *testing.T, gpu Gpu, image string, requirementsFile string, workerReplicas, numProcPerNode int) { test := With(t) // Create a namespace @@ -62,7 +63,7 @@ func runKFTOPyTorchMnistJob(t *testing.T, totalNumGpus int, workerReplicas int, mnist := ReadFile(test, "resources/mnist.py") requirementsFileName := ReadFile(test, requirementsFile) - if totalNumGpus > 0 { + if workerReplicas*numProcPerNode > 0 && gpu.ResourceLabel != "cpu" { mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"gpu\""), 1) } else { mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"cpu\""), 1) @@ -77,24 +78,45 @@ func runKFTOPyTorchMnistJob(t *testing.T, totalNumGpus int, workerReplicas int, defer test.Client().Core().CoreV1().PersistentVolumeClaims(namespace.Name).Delete(test.Ctx(), outputPvc.Name, metav1.DeleteOptions{}) // Create training PyTorch job - tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, gpuLabel, totalNumGpus, workerReplicas, numProcPerNode, outputPvc.Name, image) + tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, gpu, workerReplicas, numProcPerNode, outputPvc.Name, image) defer test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace.Name).Delete(test.Ctx(), tuningJob.Name, *metav1.NewDeleteOptions(0)) // Make sure the PyTorch job is running test.Eventually(PyTorchJob(test, namespace.Name, tuningJob.Name), TestTimeoutDouble). Should(WithTransform(PyTorchJobConditionRunning, Equal(corev1.ConditionTrue))) + // Verify GPU utilization + if IsOpenShift(test) && gpu == NVIDIA { + trainingPods := GetPods(test, namespace.Name, metav1.ListOptions{LabelSelector: "training.kubeflow.org/job-name=" + tuningJob.GetName()}) + test.Expect(trainingPods).To(HaveLen(workerReplicas + 1)) // +1 is a master node + + for _, trainingPod := range trainingPods { + // Check that GPUs for training pods were utilized recently + test.Eventually(OpenShiftPrometheusGpuUtil(test, trainingPod, gpu), 15*time.Minute). + Should( + And( + HaveLen(numProcPerNode), + ContainElement( + // Check that at least some GPU was utilized on more than 30% + HaveField("Value", BeNumerically(">", 30)), + ), + ), + ) + } + test.T().Log("All GPUs were successfully utilized") + } + // Make sure the PyTorch job succeeded test.Eventually(PyTorchJob(test, namespace.Name, tuningJob.Name), TestTimeoutDouble).Should(WithTransform(PyTorchJobConditionSucceeded, Equal(corev1.ConditionTrue))) test.T().Logf("PytorchJob %s/%s ran successfully", tuningJob.Namespace, tuningJob.Name) } -func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, gpuLabel string, totalNumGpus int, workerReplicas int, numProcPerNode int, outputPvcName string, baseImage string) *kftov1.PyTorchJob { +func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, gpu Gpu, workerReplicas int, numProcPerNode int, outputPvcName string, baseImage string) *kftov1.PyTorchJob { var useGPU = false var backend string - if totalNumGpus > 0 { + if gpu.ResourceLabel != "cpu" { useGPU = true backend = "nccl" } else { @@ -143,9 +165,14 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config ImagePullPolicy: corev1.PullIfNotPresent, Command: []string{ "/bin/bash", "-c", - fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \ + fmt.Sprintf(`mkdir -p /tmp/lib /tmp/datasets/mnist && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \ pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \ - torchrun --nproc_per_node=%d /mnt/files/mnist.py --epochs 7 --save_every 2 --batch_size 64 --lr 0.0005 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend), + echo "Downloading MNIST dataset..." && \ + python3 -c "from torchvision.datasets import MNIST; from torchvision.transforms import Compose, ToTensor; \ + MNIST('/tmp/datasets/mnist', train=False, download=True, transform=Compose([ToTensor()]))" && \ + echo -e "\n\n Dataset downloaded to /tmp/datasets/mnist" && ls -R /tmp/datasets/mnist && \ + echo -e "\n\n Starting training..." && \ + torchrun --nproc_per_node=%d /mnt/files/mnist.py --dataset_path "/tmp/datasets/mnist" --epochs 7 --save_every 2 --batch_size 64 --lr 0.0005 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend), }, VolumeMounts: []corev1.VolumeMount{ { @@ -156,12 +183,12 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config Name: "tmp-volume", MountPath: "/tmp", }, - { - Name: "output-volume", - MountPath: "/mnt/output", - }, }, Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", numProcPerNode)), + corev1.ResourceMemory: resource.MustParse("6Gi"), + }, Limits: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", numProcPerNode)), corev1.ResourceMemory: resource.MustParse("6Gi"), @@ -186,14 +213,6 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }, - { - Name: "output-volume", - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: outputPvcName, - }, - }, - }, }, RestartPolicy: corev1.RestartPolicyOnFailure, }, @@ -231,9 +250,14 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config ImagePullPolicy: corev1.PullIfNotPresent, Command: []string{ "/bin/bash", "-c", - fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \ + fmt.Sprintf(`mkdir -p /tmp/lib /tmp/datasets/mnist && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \ pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \ - torchrun --nproc_per_node=%d /mnt/files/mnist.py --epochs 7 --save_every 2 --batch_size 64 --lr 0.0005 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend), + echo "Downloading MNIST dataset..." && \ + python3 -c "from torchvision.datasets import MNIST; from torchvision.transforms import Compose, ToTensor; \ + MNIST('/tmp/datasets/mnist', train=False, download=True, transform=Compose([ToTensor()]))" && \ + echo -e "\n\n Dataset downloaded to /tmp/datasets/mnist" && ls -R /tmp/datasets/mnist && \ + echo -e "\n\n Starting training..." && \ + torchrun --nproc_per_node=%d /mnt/files/mnist.py --dataset_path "/tmp/datasets/mnist" --epochs 7 --save_every 2 --batch_size 64 --lr 0.0005 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend), }, VolumeMounts: []corev1.VolumeMount{ { @@ -246,6 +270,10 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config }, }, Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", numProcPerNode)), + corev1.ResourceMemory: resource.MustParse("6Gi"), + }, Limits: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", numProcPerNode)), corev1.ResourceMemory: resource.MustParse("6Gi"), @@ -281,21 +309,10 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config if useGPU { // Update resource lists for GPU (NVIDIA/ROCm) usecase - tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(totalNumGpus)) - tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(totalNumGpus)) - - tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Command = []string{ - "/bin/bash", "-c", - fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \ - pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \ - torchrun --nproc_per_node=%d /mnt/files/mnist.py --epochs 7 --save_every 2 --batch_size 128 --lr 0.001 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend), - } - tuningJob.Spec.PyTorchReplicaSpecs[kftov1.MPIJobReplicaTypeWorker].Template.Spec.Containers[0].Command = []string{ - "/bin/bash", "-c", - fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \ - pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \ - torchrun --nproc_per_node=%d /mnt/files/mnist.py --epochs 7 --save_every 2 --batch_size 128 --lr 0.001 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend), - } + tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(gpu.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode)) + tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpu.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode)) + tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(gpu.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode)) + tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpu.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode)) tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Env = []corev1.EnvVar{ { @@ -321,13 +338,13 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config // Update tolerations tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Tolerations = []corev1.Toleration{ { - Key: gpuLabel, + Key: gpu.ResourceLabel, Operator: corev1.TolerationOpExists, }, } tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Tolerations = []corev1.Toleration{ { - Key: gpuLabel, + Key: gpu.ResourceLabel, Operator: corev1.TolerationOpExists, }, } diff --git a/tests/kfto/kfto_training_test.go b/tests/kfto/kfto_training_test.go index 1edc50de..f4d78e6f 100644 --- a/tests/kfto/kfto_training_test.go +++ b/tests/kfto/kfto_training_test.go @@ -98,7 +98,7 @@ func runKFTOPyTorchJob(t *testing.T, image string, gpu Gpu, numGpus, numberOfWor And( HaveLen(numGpus), ContainElement( - // Check that at lest some GPU was utilized on more than 50% + // Check that at least some GPU was utilized on more than 50% HaveField("Value", BeNumerically(">", 50)), ), ), diff --git a/tests/kfto/resources/mnist.py b/tests/kfto/resources/mnist.py index a8c8677c..324ffc4e 100644 --- a/tests/kfto/resources/mnist.py +++ b/tests/kfto/resources/mnist.py @@ -13,6 +13,7 @@ # limitations under the License. import torch +import logging import torch.nn as nn import torch.nn.functional as F import torch.distributed as dist @@ -20,10 +21,19 @@ from torch.utils.data import DataLoader, Dataset from torch.nn.parallel import DistributedDataParallel as DDP from torch.utils.data.distributed import DistributedSampler -import torchvision import torchvision.transforms as transforms +from torchvision.datasets import MNIST import os +# Configure logger. +log_formatter = logging.Formatter( + "%(asctime)s %(levelname)-8s %(message)s", "%Y-%m-%dT%H:%M:%SZ" +) +logger = logging.getLogger(__file__) +console_handler = logging.StreamHandler() +console_handler.setFormatter(log_formatter) +logger.addHandler(console_handler) +logger.setLevel(logging.INFO) def ddp_setup(backend="nccl"): """Setup for Distributed Data Parallel with specified backend.""" @@ -33,12 +43,12 @@ def ddp_setup(backend="nccl"): num_devices = torch.cuda.device_count() device = int(os.environ.get("LOCAL_RANK", 0)) # Default to device 0 if device >= num_devices: - print(f"Warning: Invalid device ordinal {device}. Defaulting to device 0.") + logger.warning(f"Warning: Invalid device ordinal {device}. Defaulting to device 0.") device = 0 torch.cuda.set_device(device) else: # If no GPU is available, use Gloo backend (for CPU-only environments) - print("No GPU available, falling back to CPU.") + logger.info("No GPU available, falling back to CPU.") backend="gloo" dist.init_process_group(backend=backend) @@ -83,7 +93,7 @@ def __init__( self.backend = backend if os.path.exists(snapshot_path): - print("Loading snapshot") + logger.info("Loading snapshot") self._load_snapshot(snapshot_path) # Move model to the appropriate device (GPU/CPU) @@ -93,7 +103,7 @@ def __init__( else: self.device=torch.device('cpu') self.model = DDP(self.model.to(self.device)) - print(f"Using device: {self.device}") + logger.info(f"Using device: {self.device}") def _run_batch(self, source, targets): self.optimizer.zero_grad() @@ -105,9 +115,9 @@ def _run_batch(self, source, targets): def _run_epoch(self, epoch, backend): b_sz = len(next(iter(self.train_data))[0]) if torch.cuda.is_available() and backend=="nccl": - print(f"[GPU{self.global_rank}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}") + logger.info(f"[GPU{self.global_rank}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}") else: - print(f"[CPU{self.global_rank}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}") + logger.info(f"[CPU{self.global_rank}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}") if isinstance(self.train_data.sampler, DistributedSampler): self.train_data.sampler.set_epoch(epoch) for source, targets in self.train_data: @@ -121,7 +131,7 @@ def _save_snapshot(self, epoch): "EPOCHS_RUN": epoch, } torch.save(snapshot, self.snapshot_path) - print(f"Epoch {epoch} | Training snapshot saved at {self.snapshot_path}") + logger.info(f"Epoch {epoch} | Training snapshot saved at {self.snapshot_path}") def train(self, max_epochs: int, backend: str): for epoch in range(self.epochs_run, max_epochs): @@ -130,11 +140,11 @@ def train(self, max_epochs: int, backend: str): self._save_snapshot(epoch) -def load_train_objs(lr: float): +def load_train_objs(dataset_path: str,lr: float): """Load dataset, model, and optimizer.""" - train_set = torchvision.datasets.MNIST("../data", + train_set = MNIST(dataset_path, train=False, - download=True, + download=False, transform=transforms.Compose([transforms.ToTensor()])) model = Net() optimizer = torch.optim.Adam(model.parameters(), lr=lr) @@ -152,15 +162,14 @@ def prepare_dataloader(dataset: Dataset, batch_size: int, useGpu: bool): ) -def main(epochs: int, save_every: int, batch_size: int, lr: float, snapshot_path: str, backend: str): +def main(epochs: int, save_every: int, batch_size: int, lr: float, dataset_path: str, snapshot_path: str, backend: str): ddp_setup(backend) - dataset, model, optimizer = load_train_objs(lr) + dataset, model, optimizer = load_train_objs(dataset_path, lr) train_loader = prepare_dataloader(dataset, batch_size, torch.cuda.is_available() and backend=="nccl") trainer = Trainer(model, train_loader, optimizer, save_every, snapshot_path, backend) trainer.train(epochs, backend) dist.destroy_process_group() - if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Distributed MNIST Training") @@ -168,6 +177,7 @@ def main(epochs: int, save_every: int, batch_size: int, lr: float, snapshot_path parser.add_argument('--save_every', type=int, required=True, help='How often to save a snapshot') parser.add_argument('--batch_size', type=int, default=64, help='Input batch size on each device (default: 64)') parser.add_argument('--lr', type=float, default=1e-3, help='Learning rate (default: 1e-3)') + parser.add_argument('--dataset_path', type=str, default="../data", help='Path to MNIST datasets (default: ../data)') parser.add_argument('--snapshot_path', type=str, default="snapshot_mnist.pt", help='Path to save snapshots (default: snapshot_mnist.pt)') parser.add_argument('--backend', type=str, choices=['gloo', 'nccl'], default='nccl', help='Distributed backend type (default: nccl)') args = parser.parse_args() @@ -177,6 +187,7 @@ def main(epochs: int, save_every: int, batch_size: int, lr: float, snapshot_path save_every=args.save_every, batch_size=args.batch_size, lr=args.lr, + dataset_path=args.dataset_path, snapshot_path=args.snapshot_path, backend=args.backend ) diff --git a/tests/kfto/support.go b/tests/kfto/support.go index 982419ad..52ad4af7 100644 --- a/tests/kfto/support.go +++ b/tests/kfto/support.go @@ -36,6 +36,7 @@ type Gpu struct { var ( NVIDIA = Gpu{ResourceLabel: "nvidia.com/gpu", PrometheusGpuUtilizationLabel: "DCGM_FI_DEV_GPU_UTIL"} AMD = Gpu{ResourceLabel: "amd.com/gpu"} + CPU = Gpu{ResourceLabel: "cpu"} ) //go:embed resources/*