Merge remote-tracking branch 'upstream/main'

dchourasia committed Jan 21, 2025
2 parents face046 + 92575c8 commit de50808
Showing 4 changed files with 244 additions and 253 deletions.
131 changes: 84 additions & 47 deletions tests/kfto/kfto_mnist_training_test.go
@@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"testing"
"time"

kftov1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
. "github.com/onsi/gomega"
@@ -30,27 +31,30 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPyTorchJobMnistMultiNodeCpu(t *testing.T) {
runKFTOPyTorchMnistJob(t, 0, 2, "", GetCudaTrainingImage(), "resources/requirements.txt")
func TestPyTorchJobMnistMultiNodeSingleCpu(t *testing.T) {
runKFTOPyTorchMnistJob(t, CPU, GetCudaTrainingImage(), "resources/requirements.txt", 2, 1)
}

func TestPyTorchJobMnistMultiNodeWithCuda(t *testing.T) {
runKFTOPyTorchMnistJob(t, 1, 1, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt")
func TestPyTorchJobMnistMultiNodeMultiCpu(t *testing.T) {
runKFTOPyTorchMnistJob(t, CPU, GetCudaTrainingImage(), "resources/requirements.txt", 2, 2)
}

func TestPyTorchJobMnistMultiNodeWithROCm(t *testing.T) {
runKFTOPyTorchMnistJob(t, 1, 1, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt")
func TestPyTorchJobMnistMultiNodeSingleGpuWithCuda(t *testing.T) {
runKFTOPyTorchMnistJob(t, NVIDIA, GetCudaTrainingImage(), "resources/requirements.txt", 1, 1)
}

func TestPyTorchJobMnistMultiNodeMultiGpuWithCuda(t *testing.T) {
runKFTOPyTorchMnistJob(t, 2, 1, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt")
runKFTOPyTorchMnistJob(t, NVIDIA, GetCudaTrainingImage(), "resources/requirements.txt", 1, 2)
}

func TestPyTorchJobMnistMultiNodeSingleGpuWithROCm(t *testing.T) {
runKFTOPyTorchMnistJob(t, AMD, GetROCmTrainingImage(), "resources/requirements-rocm.txt", 1, 1)
}

func TestPyTorchJobMnistMultiNodeMultiGpuWithROCm(t *testing.T) {
runKFTOPyTorchMnistJob(t, 2, 1, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt")
runKFTOPyTorchMnistJob(t, AMD, GetROCmTrainingImage(), "resources/requirements-rocm.txt", 1, 2)
}

func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, workerReplicas int, gpuLabel string, image string, requirementsFile string) {
func runKFTOPyTorchMnistJob(t *testing.T, accelerator Accelerator, image string, requirementsFile string, workerReplicas, numProcPerNode int) {
test := With(t)

// Create a namespace
@@ -59,7 +63,7 @@ func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, workerReplicas int, gpuLa
mnist := ReadFile(test, "resources/mnist.py")
requirementsFileName := ReadFile(test, requirementsFile)

if numGpus > 0 {
if accelerator.isGpu() {
mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"gpu\""), 1)
} else {
mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"cpu\""), 1)
@@ -70,29 +74,44 @@ func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, workerReplicas int, gpuLa
"requirements.txt": requirementsFileName,
})

outputPvc := CreatePersistentVolumeClaim(test, namespace.Name, "50Gi", corev1.ReadWriteOnce)
defer test.Client().Core().CoreV1().PersistentVolumeClaims(namespace.Name).Delete(test.Ctx(), outputPvc.Name, metav1.DeleteOptions{})

// Create training PyTorch job
tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, gpuLabel, numGpus, workerReplicas, outputPvc.Name, image)
tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, accelerator, workerReplicas, numProcPerNode, image)
defer test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace.Name).Delete(test.Ctx(), tuningJob.Name, *metav1.NewDeleteOptions(0))

// Make sure the PyTorch job is running
test.Eventually(PyTorchJob(test, namespace.Name, tuningJob.Name), TestTimeoutDouble).
Should(WithTransform(PyTorchJobConditionRunning, Equal(corev1.ConditionTrue)))

// Verify GPU utilization
if IsOpenShift(test) && accelerator == NVIDIA {
trainingPods := GetPods(test, namespace.Name, metav1.ListOptions{LabelSelector: "training.kubeflow.org/job-name=" + tuningJob.GetName()})
test.Expect(trainingPods).To(HaveLen(workerReplicas + 1)) // +1 is a master node

for _, trainingPod := range trainingPods {
// Check that GPUs for training pods were utilized recently
test.Eventually(OpenShiftPrometheusGpuUtil(test, trainingPod, accelerator), 15*time.Minute).
Should(
And(
HaveLen(numProcPerNode),
ContainElement(
// Check that at least some GPU was utilized on more than 20%
HaveField("Value", BeNumerically(">", 20)),
),
),
)
}
test.T().Log("All GPUs were successfully utilized")
}

// Make sure the PyTorch job succeeded
test.Eventually(PyTorchJob(test, namespace.Name, tuningJob.Name), TestTimeoutDouble).Should(WithTransform(PyTorchJobConditionSucceeded, Equal(corev1.ConditionTrue)))
test.T().Logf("PytorchJob %s/%s ran successfully", tuningJob.Namespace, tuningJob.Name)

}

func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, gpuLabel string, numGpus int, workerReplicas int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
var useGPU = false
func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, accelerator Accelerator, workerReplicas int, numProcPerNode int, baseImage string) *kftov1.PyTorchJob {
var backend string

if numGpus > 0 {
useGPU = true
if accelerator.isGpu() {
backend = "nccl"
} else {
backend = "gloo"
@@ -108,13 +127,14 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
},
Spec: kftov1.PyTorchJobSpec{
PyTorchReplicaSpecs: map[kftov1.ReplicaType]*kftov1.ReplicaSpec{
"Master": {
kftov1.PyTorchJobReplicaTypeMaster: {
Replicas: Ptr(int32(1)),
RestartPolicy: kftov1.RestartPolicyOnFailure,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "kfto-mnist",
"app": "kfto-mnist",
"role": "master",
},
},
Spec: corev1.PodSpec{
@@ -139,9 +159,14 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{
"/bin/bash", "-c",
fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
fmt.Sprintf(`mkdir -p /tmp/lib /tmp/datasets/mnist && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
python /mnt/files/mnist.py --epochs 3 --save-model --output-path /mnt/output --backend %s`, backend),
echo "Downloading MNIST dataset..." && \
python3 -c "from torchvision.datasets import MNIST; from torchvision.transforms import Compose, ToTensor; \
MNIST('/tmp/datasets/mnist', train=False, download=True, transform=Compose([ToTensor()]))" && \
echo -e "\n\n Dataset downloaded to /tmp/datasets/mnist" && ls -R /tmp/datasets/mnist && \
echo -e "\n\n Starting training..." && \
torchrun --nproc_per_node=%d /mnt/files/mnist.py --dataset_path "/tmp/datasets/mnist" --epochs 7 --save_every 2 --batch_size 128 --lr 0.001 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
},
VolumeMounts: []corev1.VolumeMount{
{
@@ -152,14 +177,14 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
Name: "tmp-volume",
MountPath: "/tmp",
},
{
Name: "output-volume",
MountPath: "/mnt/output",
},
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", numProcPerNode)),
corev1.ResourceMemory: resource.MustParse("6Gi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", numProcPerNode)),
corev1.ResourceMemory: resource.MustParse("6Gi"),
},
},
@@ -182,26 +207,19 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
{
Name: "output-volume",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: outputPvcName,
},
},
},
},
RestartPolicy: corev1.RestartPolicyOnFailure,
},
},
},
"Worker": {
kftov1.PyTorchJobReplicaTypeWorker: {
Replicas: Ptr(int32(workerReplicas)),
RestartPolicy: kftov1.RestartPolicyOnFailure,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "kfto-mnist",
"app": "kfto-mnist",
"role": "worker",
},
},
Spec: corev1.PodSpec{
@@ -226,9 +244,14 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{
"/bin/bash", "-c",
fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
fmt.Sprintf(`mkdir -p /tmp/lib /tmp/datasets/mnist && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
python /mnt/files/mnist.py --epochs 3 --save-model --backend %s`, backend),
echo "Downloading MNIST dataset..." && \
python3 -c "from torchvision.datasets import MNIST; from torchvision.transforms import Compose, ToTensor; \
MNIST('/tmp/datasets/mnist', train=False, download=True, transform=Compose([ToTensor()]))" && \
echo -e "\n\n Dataset downloaded to /tmp/datasets/mnist" && ls -R /tmp/datasets/mnist && \
echo -e "\n\n Starting training..." && \
torchrun --nproc_per_node=%d /mnt/files/mnist.py --dataset_path "/tmp/datasets/mnist" --epochs 7 --save_every 2 --batch_size 128 --lr 0.001 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
},
VolumeMounts: []corev1.VolumeMount{
{
@@ -241,8 +264,12 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
},
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", numProcPerNode)),
corev1.ResourceMemory: resource.MustParse("6Gi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", numProcPerNode)),
corev1.ResourceMemory: resource.MustParse("6Gi"),
},
},
@@ -274,34 +301,44 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
},
}

if useGPU {
if accelerator.isGpu() {
// Update resource lists for GPU (NVIDIA/ROCm) usecase
tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(numGpus))
tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(numGpus))
tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))

tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Env = []corev1.EnvVar{
{
Name: "NCCL_DEBUG",
Value: "INFO",
},
{
Name: "TORCH_DISTRIBUTED_DEBUG",
Value: "DETAIL",
},
}
tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Env = []corev1.EnvVar{
{
Name: "NCCL_DEBUG",
Value: "INFO",
},
{
Name: "TORCH_DISTRIBUTED_DEBUG",
Value: "DETAIL",
},
}

// Update tolerations
tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Tolerations = []corev1.Toleration{
{
Key: gpuLabel,
Key: accelerator.ResourceLabel,
Operator: corev1.TolerationOpExists,
},
}
tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Tolerations = []corev1.Toleration{
{
Key: gpuLabel,
Key: accelerator.ResourceLabel,
Operator: corev1.TolerationOpExists,
},
}
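
Note: this diff replaces the old gpuLabel string and numGpus count with a shared Accelerator abstraction, but the type's definition is not among the files shown. The following is a minimal sketch of what such a helper could look like, inferred only from usage in the diff (the CPU/NVIDIA/AMD values, the isGpu() method, the ResourceLabel field, and the nvidia.com/gpu and amd.com/gpu labels from the removed lines); the concrete layout is an assumption, not the committed code.

// Hypothetical sketch of the Accelerator helper assumed by these tests.
// Only CPU, NVIDIA, AMD, isGpu(), and ResourceLabel appear in the diff;
// everything else here is illustrative.
package kfto

type Accelerator struct {
	Type          string // "cpu", "nvidia", or "amd"
	ResourceLabel string // extended resource name, e.g. "nvidia.com/gpu"
}

var (
	CPU    = Accelerator{Type: "cpu"}
	NVIDIA = Accelerator{Type: "nvidia", ResourceLabel: "nvidia.com/gpu"}
	AMD    = Accelerator{Type: "amd", ResourceLabel: "amd.com/gpu"}
)

// isGpu reports whether the job should request GPU resources and use the
// NCCL backend rather than gloo.
func (a Accelerator) isGpu() bool {
	return a.Type != "cpu"
}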
6 changes: 3 additions & 3 deletions tests/kfto/kfto_training_test.go
@@ -62,7 +62,7 @@ func TestPyTorchJobMultiNodeMultiGpuWithROCm(t *testing.T) {
runKFTOPyTorchJob(t, GetROCmTrainingImage(), AMD, 2, 1)
}

func runKFTOPyTorchJob(t *testing.T, image string, gpu Gpu, numGpus, numberOfWorkerNodes int) {
func runKFTOPyTorchJob(t *testing.T, image string, gpu Accelerator, numGpus, numberOfWorkerNodes int) {
test := With(t)

// Create a namespace
@@ -98,7 +98,7 @@ func runKFTOPyTorchJob(t *testing.T, image string, gpu Gpu, numGpus, numberOfWor
And(
HaveLen(numGpus),
ContainElement(
// Check that at lest some GPU was utilized on more than 50%
// Check that at least some GPU was utilized on more than 50%
HaveField("Value", BeNumerically(">", 50)),
),
),
Expand All @@ -112,7 +112,7 @@ func runKFTOPyTorchJob(t *testing.T, image string, gpu Gpu, numGpus, numberOfWor
test.T().Logf("PytorchJob %s/%s ran successfully", tuningJob.Namespace, tuningJob.Name)
}

func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, gpu Gpu, numGpus, numberOfWorkerNodes int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, gpu Accelerator, numGpus, numberOfWorkerNodes int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
tuningJob := &kftov1.PyTorchJob{
TypeMeta: metav1.TypeMeta{
APIVersion: corev1.SchemeGroupVersion.String(),
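
Both test files gate on recent GPU utilization via OpenShiftPrometheusGpuUtil before asserting that the job succeeded. That helper's implementation is outside this diff; the sketch below only illustrates the polling pattern the assertions rely on, with an assumed Sample type and a stubbed metrics query (the real helper presumably queries the OpenShift Prometheus stack for per-GPU utilization of the pod).

// Illustrative only: Sample is inferred from HaveField("Value", ...) in the
// tests, and the stubbed query stands in for the real Prometheus lookup.
package kfto

import (
	"time"

	. "github.com/onsi/gomega"
)

// Sample is one per-GPU utilization reading (a percentage over the window).
type Sample struct {
	Value float64
}

// gpuUtilForPod stands in for OpenShiftPrometheusGpuUtil: it would return
// one Sample per GPU attached to the named pod.
func gpuUtilForPod(podName string) func() []Sample {
	return func() []Sample {
		// ... fetch per-GPU utilization for podName from Prometheus ...
		return nil // stub
	}
}

// The tests poll until one sample per local rank exists and at least one GPU
// reports more than 20% (or 50%) utilization.
func exampleAssertion(g Gomega, podName string, numProcPerNode int) {
	g.Eventually(gpuUtilForPod(podName), 15*time.Minute).Should(
		And(
			HaveLen(numProcPerNode),
			ContainElement(HaveField("Value", BeNumerically(">", 20))),
		),
	)
}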