Commit

Merge pull request #7 from converged-computing/add-other-abstractions

feat: deployment support and ungate as jobs

vsoch authored Jan 19, 2025
2 parents 5fa1331 + 35d49a9 commit 51fa2da
Showing 15 changed files with 299 additions and 71 deletions.
51 changes: 46 additions & 5 deletions .github/test.sh
@@ -25,8 +25,15 @@ while true
do
pod_status=$(kubectl get pods ${controller_pod} -n ${namespace} --no-headers -o custom-columns=":status.phase")
if [[ "${pod_status}" == "Running" ]]; then
echo "Controller ${controller_pod} is ready"
break
echo "Controller ${controller_pod} is running"

# They also need to be ready
ready=true
kubectl get pods -n ${namespace} ${controller_pod} | grep "2/2" || ready=false
if [[ "${ready}" == "true" ]]; then
echo "Controller ${controller_pod} containers are ready"
break
fi
fi
sleep 20
done
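
For readers more comfortable with client-go than shell, the readiness check above (Running phase plus the `2/2` grep for ready containers) corresponds roughly to the Go sketch below. The function name, clientset wiring, and 20-second poll interval are assumptions for illustration, not part of this PR.

```go
package main

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// waitForControllerReady polls until the controller pod is Running and every
// container reports Ready, which is what the "2/2" grep approximates.
func waitForControllerReady(ctx context.Context, clientset kubernetes.Interface, namespace, name string) error {
	for {
		pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		ready := len(pod.Status.ContainerStatuses) > 0
		for _, status := range pod.Status.ContainerStatuses {
			if !status.Ready {
				ready = false
			}
		}
		if pod.Status.Phase == "Running" && ready {
			fmt.Printf("Controller %s containers are ready\n", name)
			return nil
		}
		time.Sleep(20 * time.Second)
	}
}
```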
@@ -96,10 +103,13 @@ echo " Pods Running: ${pods_running}"
check_output 'check-pod-deleted' "${pods_running}" "0"

# Do the same for a job
echo_run kubectl apply -f ./examples/job.yaml
echo_run kubectl apply -f ./examples/job-1.yaml
sleep 3
echo_run kubectl get pods

echo_run kubectl logs -n ${namespace} ${controller_pod} -c manager
pods_running=$(kubectl get pods -o json | jq -r '.items | length')
echo " Pods Running: ${pods_running}"
check_output 'check-pods-running' "${pods_running}" "1"

# Check both job pods
for pod in $(kubectl get pods -o json | jq -r .items[].metadata.name)
@@ -116,7 +126,38 @@ done


# Now delete the job - we are done!
echo_run kubectl delete -f ./examples/job.yaml
echo_run kubectl delete -f ./examples/job-1.yaml
sleep 2
pods_running=$(kubectl get pods -o json | jq -r '.items | length')
echo " Pods Running: ${pods_running}"
check_output 'check-pod-deleted' "${pods_running}" "0"



# Deployment
echo_run kubectl apply -f ./examples/deployment-1.yaml
sleep 5
echo_run kubectl get pods
pods_running=$(kubectl get pods -o json | jq -r '.items | length')
echo " Pods Running: ${pods_running}"
check_output 'check-pods-running' "${pods_running}" "1"
echo_run kubectl logs -n ${namespace} ${controller_pod} -c manager

# Check the deployment pods
for pod in $(kubectl get pods -o json | jq -r .items[].metadata.name)
do
echo "Checking deployment pod ${pod}"
scheduled_by=$(kubectl get pod ${pod} -o json | jq -r .spec.schedulerName)
pod_status=$(kubectl get pods ${pod} --no-headers -o custom-columns=":status.phase")
echo
echo " Pod Status: ${pod_status}"
echo " Scheduled by: ${scheduled_by}"
check_output 'check-pod-scheduled-by' "${scheduled_by}" "FluxionScheduler"
check_output 'check-pod-status' "${pod_status}" "Running"
done

# Now delete the deployment - we are done!
echo_run kubectl delete -f ./examples/deployment-1.yaml
sleep 2
pods_running=$(kubectl get pods -o json | jq -r '.items | length')
echo " Pods Running: ${pods_running}"
3 changes: 2 additions & 1 deletion README.md
@@ -32,6 +32,7 @@ And we use `ghcr.io/converged-computing/fluxion` for the fluxion service.
### Choices

- **Duration of job comes from Kubernetes** Right now, we don't allow a special or different duration to be given to Fluxion. Any duration or deletion needs to come from Kubernetes first, by way of an object deletion. Otherwise we would need to orchestrate deletion from the cluster and Fluxion, and it's easier to ask the user to delete with a job duration or other mechanism.
- **Ungate is done as a retryable task** because API operations against Kubernetes are not always reliable; a failed ungate is retried rather than lost (a minimal sketch follows below).
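
To make the ungate choice concrete: with River (the job queue library used by fluxqueue), a worker is retried simply by returning an error from `Work`, so a transient Kubernetes API failure does not drop the ungate. The sketch below is illustrative only; the argument and worker types and their fields are assumptions, not the project's actual ungate worker.

```go
package main

import (
	"context"

	"github.com/riverqueue/river"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// UngateArgs names the pod whose scheduling gates should be removed.
type UngateArgs struct {
	Name      string `json:"name"`
	Namespace string `json:"namespace"`
}

func (UngateArgs) Kind() string { return "ungate" }

// UngateWorker removes scheduling gates from a pod. Returning an error from
// Work tells River to retry the task later, which covers flaky API calls.
type UngateWorker struct {
	river.WorkerDefaults[UngateArgs]
	clientset kubernetes.Interface
}

func (w *UngateWorker) Work(ctx context.Context, job *river.Job[UngateArgs]) error {
	pods := w.clientset.CoreV1().Pods(job.Args.Namespace)
	pod, err := pods.Get(ctx, job.Args.Name, metav1.GetOptions{})
	if err != nil {
		return err // returning an error asks River to retry the task
	}
	pod.Spec.SchedulingGates = nil
	_, err = pods.Update(ctx, pod, metav1.UpdateOptions{})
	return err // retried by River on failure
}
```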

## Deploy

@@ -233,7 +234,7 @@ SELECT * from reservations;

### TODO

- [ ] need to cleanup - handle FluxJob object so doesn't keep reconciling. Likely we want to delete at some point.
- [ ] Pod creation needs better orchestration
- [ ] In the case of jobs that are changing (e.g., pods deleting, but we don't want to kill entire job) what should we do?
- we need to use shrink here. And a shrink down to size 0 I assume is a cancel.
- [ ] For cancel, we would issue a cancel for every pod associated with a job. How can we avoid that (or is that OK?)
11 changes: 11 additions & 0 deletions api/v1alpha1/fluxjob_enqueue.go
@@ -2,6 +2,7 @@ package v1alpha1

import (
"context"
"fmt"

"github.com/converged-computing/fluxqueue/pkg/defaults"
appsv1 "k8s.io/api/apps/v1"
@@ -64,6 +65,9 @@ func (a *jobReceiver) EnqueueJob(ctx context.Context, job *batchv1.Job) error {
if job.Spec.Template.ObjectMeta.Labels == nil {
job.Spec.Template.ObjectMeta.Labels = map[string]string{}
}
if job.ObjectMeta.Labels == nil {
job.ObjectMeta.Labels = map[string]string{}
}

// Cut out early if we are getting hit again
_, ok := job.ObjectMeta.Labels[defaults.SeenLabel]
@@ -102,6 +106,9 @@ func (a *jobReceiver) EnqueueDeployment(ctx context.Context, deployment *appsv1.
if deployment.Spec.Template.ObjectMeta.Labels == nil {
deployment.Spec.Template.ObjectMeta.Labels = map[string]string{}
}
if deployment.ObjectMeta.Labels == nil {
deployment.ObjectMeta.Labels = map[string]string{}
}

// Cut out early if we are getting hit again
_, ok := deployment.ObjectMeta.Labels[defaults.SeenLabel]
@@ -119,6 +126,10 @@ func (a *jobReceiver) EnqueueDeployment(ctx context.Context, deployment *appsv1.
fluxqGate := corev1.PodSchedulingGate{Name: defaults.SchedulingGateName}
deployment.Spec.Template.Spec.SchedulingGates = append(deployment.Spec.Template.Spec.SchedulingGates, fluxqGate)

// We will use this later as a selector to get pods associated with the deployment
selector := fmt.Sprintf("deployment-%s-%s", deployment.Name, deployment.Namespace)
deployment.Spec.Template.ObjectMeta.Labels[defaults.SelectorLabel] = selector

logger.Info("received deployment and gated pods", "Name", deployment.Name)
return SubmitFluxJob(
ctx,
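
Editorial aside: the `defaults.SelectorLabel` value stamped onto the pod template above is what later makes it possible to find every pod belonging to a gated deployment. A hedged sketch of that lookup with client-go follows; the helper name and clientset wiring are assumptions.

```go
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"

	"github.com/converged-computing/fluxqueue/pkg/defaults"
)

// podsForDeployment lists the pods carrying the selector label that
// EnqueueDeployment writes into the deployment's pod template.
func podsForDeployment(ctx context.Context, clientset kubernetes.Interface, name, namespace string) (*corev1.PodList, error) {
	selector := fmt.Sprintf("%s=deployment-%s-%s", defaults.SelectorLabel, name, namespace)
	return clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector})
}
```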
8 changes: 8 additions & 0 deletions api/v1alpha1/fluxjob_webhook.go
@@ -54,6 +54,14 @@ func (a *jobReceiver) Handle(ctx context.Context, req admission.Request) admissi
return admission.Errored(http.StatusBadRequest, err)
}

marshalledDeployment, tryNext, err := a.HandleDeployment(ctx, req)
if err == nil {
return admission.PatchResponseFromRaw(req.Object.Raw, marshalledDeployment)
}
if !tryNext {
return admission.Errored(http.StatusBadRequest, err)
}

marshalledPod, err := a.HandlePod(ctx, req)
if err == nil {
return admission.PatchResponseFromRaw(req.Object.Raw, marshalledPod)
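
Judging from the call site above, `HandleDeployment` returns the marshalled object, a `tryNext` flag, and an error, so a request that is not a Deployment can fall through to the pod handler. The sketch below illustrates that shape only; the decoder field, imports, and exact error handling are assumptions rather than the PR's implementation.

```go
// Sketch only. Assumed imports: "context", "encoding/json",
// appsv1 "k8s.io/api/apps/v1", and
// "sigs.k8s.io/controller-runtime/pkg/webhook/admission".
// The decoder field on jobReceiver is an assumption.
func (a *jobReceiver) HandleDeployment(ctx context.Context, req admission.Request) ([]byte, bool, error) {
	deployment := &appsv1.Deployment{}
	if err := a.decoder.Decode(req, deployment); err != nil {
		// Not a Deployment (or not decodable): let Handle try the next kind.
		return nil, true, err
	}
	if err := a.EnqueueDeployment(ctx, deployment); err != nil {
		// It was a Deployment, but enqueueing failed: do not fall through.
		return nil, false, err
	}
	marshalled, err := json.Marshal(deployment)
	return marshalled, false, err
}
```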
33 changes: 0 additions & 33 deletions api/v1alpha1/fluxjob_webhook_test.go

This file was deleted.

2 changes: 2 additions & 0 deletions dist/fluxqueue.yaml
@@ -594,6 +594,7 @@ webhooks:
- ""
- core
- batch
- apps
apiVersions:
- v1
operations:
@@ -602,4 +603,5 @@
resources:
- pods
- jobs
- deployments
sideEffects: None
17 changes: 17 additions & 0 deletions examples/deployment-1.yaml
@@ -0,0 +1,17 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: deployment
spec:
replicas: 1
selector:
matchLabels:
app: deployment
template:
metadata:
labels:
app: deployment
spec:
containers:
- name: container
image: registry.k8s.io/pause:2.0
17 changes: 17 additions & 0 deletions examples/deployment.yaml
@@ -0,0 +1,17 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: deployment
spec:
replicas: 2
selector:
matchLabels:
app: deployment
template:
metadata:
labels:
app: deployment
spec:
containers:
- name: container
image: registry.k8s.io/pause:2.0
17 changes: 17 additions & 0 deletions examples/job-1.yaml
@@ -0,0 +1,17 @@
apiVersion: batch/v1
kind: Job
metadata:
name: job
spec:
completions: 1
parallelism: 1
completionMode: Indexed
template:
metadata:
labels:
app: job
spec:
restartPolicy: Never
containers:
- name: job
image: registry.k8s.io/pause:2.0
6 changes: 4 additions & 2 deletions internal/controller/fluxjob_controller.go
@@ -112,9 +112,11 @@ func (r *FluxJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
rlog.Info("Found FluxJob", "Name", spec.Name, "Namespace", spec.Namespace, "Status", spec.Status.SubmitStatus)
result := ctrl.Result{}

// If the job is already submit, continue
// If the job is already submit, delete it
if spec.Status.SubmitStatus == api.SubmitStatusSubmit {
return result, nil
rlog.Info("Deleting FluxJob that is submit", "Name", spec.Name, "Namespace", spec.Namespace, "Status", spec.Status.SubmitStatus)
err = r.Client.Delete(ctx, &spec)
return result, err
}

// Submit the job to the queue - TODO if error, should delete?
1 change: 1 addition & 0 deletions pkg/defaults/defaults.go
@@ -8,5 +8,6 @@ var (
FluxJobIdLabel = "fluxqueue/jobid"
NodesLabel = "fluxqueue/fluxion-nodes"
SeenLabel = "fluxqueue.seen"
SelectorLabel = "fluxqueue.selector"
UnschedulableLabel = "fluxqueue/unschedulable"
)
7 changes: 7 additions & 0 deletions pkg/fluxqueue/fluxqueue.go
@@ -28,6 +28,7 @@ import (
const (
// IMPORTANT: must be one because fluxion is run single threaded
queueMaxWorkers = 1
taskMaxWorkers = 1
mutexLocked = 1
)

@@ -105,7 +106,13 @@ func NewQueue(ctx context.Context, cfg rest.Config) (*Queue, error) {
river.QueueDefault: {MaxWorkers: queueMaxWorkers},

// Cleanup queue is typically for cancel
// Note that if this needs to be single threaded,
// it should be done in the default queue
"cleanup_queue": {MaxWorkers: queueMaxWorkers},

// This is for Kubernetes tasks (ungate, etc)
// that don't need to be single threaded
"task_queue": {MaxWorkers: taskMaxWorkers},
},
Workers: workers,
})
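
Editorial aside on the queue split above: the default (fluxion) queue must stay single threaded, while Kubernetes-facing work such as ungating pods can go to the concurrent `task_queue`. A hedged sketch of inserting such a task with River follows; the client wiring, args type, and `MaxAttempts` value are assumptions.

```go
package main

import (
	"context"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
)

// UngateArgs has the same shape as the hypothetical args type in the
// earlier ungate worker sketch.
type UngateArgs struct {
	Name      string `json:"name"`
	Namespace string `json:"namespace"`
}

func (UngateArgs) Kind() string { return "ungate" }

// enqueueUngate puts an ungate task on the concurrent task_queue so that
// Kubernetes API work is retried independently of the single-threaded
// default (fluxion) queue.
func enqueueUngate(ctx context.Context, client *river.Client[pgx.Tx], podName, namespace string) error {
	_, err := client.Insert(ctx, UngateArgs{Name: podName, Namespace: namespace}, &river.InsertOpts{
		Queue:       "task_queue",
		MaxAttempts: 5, // bound how many times a flaky API call is retried
	})
	return err
}
```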
9 changes: 9 additions & 0 deletions pkg/fluxqueue/strategy/easy.go
@@ -47,6 +47,8 @@ type ReservationModel struct {
// job worker: a queue to submit jobs to fluxion
// cleanup worker: a queue to cleanup
func (EasyBackfill) AddWorkers(workers *river.Workers, cfg rest.Config) error {

// These workers are in the default (fluxion) queue with one worker
jobWorker, err := work.NewJobWorker(cfg)
if err != nil {
return err
@@ -59,9 +61,16 @@ func (EasyBackfill) AddWorkers(workers *river.Workers, cfg rest.Config) error {
if err != nil {
return err
}

// These workers can be run concurrently (>1 worker)
ungateWorker, err := work.NewUngateWorker(cfg)
if err != nil {
return err
}
river.AddWorker(workers, jobWorker)
river.AddWorker(workers, cleanupWorker)
river.AddWorker(workers, reservationWorker)
river.AddWorker(workers, ungateWorker)
return nil
}
