feat: deployment support and ungate as jobs
Ungating a pod can fail if the pod does not yet appear in the
API listing (common for deployments) or if there is some other
ephemeral API error. To fix this, we ungate via a task that
retries if it does not succeed the first time.

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
vsoch committed Jan 19, 2025
1 parent 5fa1331 commit 35d49a9
Showing 15 changed files with 299 additions and 71 deletions.
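The core of the change is the retry pattern: ungate work is inserted as a River task, and a worker that returns an error is re-run later with a delay, so an ephemeral API failure (or pods that Kubernetes has not created yet) just triggers another attempt. A minimal sketch of that pattern with illustrative names (the real implementation is UngateWorker in pkg/fluxqueue/strategy/workers/ungate.go):

package example

import (
	"context"
	"fmt"

	"github.com/riverqueue/river"
)

// RetryableArgs is an illustrative payload; Kind routes it to the worker below.
type RetryableArgs struct {
	Name      string `json:"name"`
	Namespace string `json:"namespace"`
}

func (RetryableArgs) Kind() string { return "retryable-example" }

// RetryableWorker is re-run automatically because Work returns an error
// whenever the pods it needs are not visible yet.
type RetryableWorker struct {
	river.WorkerDefaults[RetryableArgs]
}

func (w RetryableWorker) Work(ctx context.Context, job *river.Job[RetryableArgs]) error {
	if !podsVisible(ctx, job.Args.Namespace, job.Args.Name) {
		// Returning an error tells River to retry the task later, with a backoff.
		return fmt.Errorf("pods for %s/%s not visible yet", job.Args.Namespace, job.Args.Name)
	}
	return nil
}

// podsVisible is a stand-in for the client-go listing done in ungate.go below.
func podsVisible(ctx context.Context, namespace, name string) bool {
	return false
}

Workers like this are registered with river.AddWorker, and the task is inserted into the new task_queue, exactly as the diff below does for UngateWorker.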
51 changes: 46 additions & 5 deletions .github/test.sh
@@ -25,8 +25,15 @@ while true
do
pod_status=$(kubectl get pods ${controller_pod} -n ${namespace} --no-headers -o custom-columns=":status.phase")
if [[ "${pod_status}" == "Running" ]]; then
echo "Controller ${controller_pod} is ready"
break
echo "Controller ${controller_pod} is running"

# They also need to be ready
ready=true
kubectl get pods -n ${namespace} ${controller_pod} | grep "2/2" || ready=false
if [[ "${ready}" == "true" ]]; then
echo "Controller ${controller_pod} containers are ready"
break
fi
fi
sleep 20
done
@@ -96,10 +103,13 @@ echo " Pods Running: ${pods_running}"
check_output 'check-pod-deleted' "${pods_running}" "0"

# Do the same for a job
echo_run kubectl apply -f ./examples/job.yaml
echo_run kubectl apply -f ./examples/job-1.yaml
sleep 3
echo_run kubectl get pods

echo_run kubectl logs -n ${namespace} ${controller_pod} -c manager
pods_running=$(kubectl get pods -o json | jq -r '.items | length')
echo " Pods Running: ${pods_running}"
check_output 'check-pods-running' "${pods_running}" "1"

# Check the job pods
for pod in $(kubectl get pods -o json | jq -r .items[].metadata.name)
@@ -116,7 +126,38 @@ done


# Now delete the job - we are done!
echo_run kubectl delete -f ./examples/job.yaml
echo_run kubectl delete -f ./examples/job-1.yaml
sleep 2
pods_running=$(kubectl get pods -o json | jq -r '.items | length')
echo " Pods Running: ${pods_running}"
check_output 'check-pod-deleted' "${pods_running}" "0"



# Deployment
echo_run kubectl apply -f ./examples/deployment-1.yaml
sleep 5
echo_run kubectl get pods
pods_running=$(kubectl get pods -o json | jq -r '.items | length')
echo " Pods Running: ${pods_running}"
check_output 'check-pods-running' "${pods_running}" "1"
echo_run kubectl logs -n ${namespace} ${controller_pod} -c manager

# Check the deployment pods
for pod in $(kubectl get pods -o json | jq -r .items[].metadata.name)
do
echo "Checking deployment pod ${pod}"
scheduled_by=$(kubectl get pod ${pod} -o json | jq -r .spec.schedulerName)
pod_status=$(kubectl get pods ${pod} --no-headers -o custom-columns=":status.phase")
echo
echo " Pod Status: ${pod_status}"
echo " Scheduled by: ${scheduled_by}"
check_output 'check-pod-scheduled-by' "${scheduled_by}" "FluxionScheduler"
check_output 'check-pod-status' "${pod_status}" "Running"
done

# Now delete the deployment - we are done!
echo_run kubectl delete -f ./examples/deployment-1.yaml
sleep 2
pods_running=$(kubectl get pods -o json | jq -r '.items | length')
echo " Pods Running: ${pods_running}"
3 changes: 2 additions & 1 deletion README.md
@@ -32,6 +32,7 @@ And we use `ghcr.io/converged-computing/fluxion` for the fluxion service.
### Choices

- **Duration of job comes from Kubernetes** Right now, we don't allow a special or different duration to be given to Fluxion. Any duration or deletion needs to come from Kubernetes first, by way of an object deletion. Otherwise we would need to orchestrate deletion from the cluster and Fluxion, and it's easier to ask the user to delete with a job duration or other mechanism.
- **Ungate is done as a retryable task** Kubernetes API operations (listing and patching pods) are not always reliable on the first attempt, so ungating is submitted as a task that retries on failure.

## Deploy

@@ -233,7 +234,7 @@ SELECT * from reservations;

### TODO

- [ ] need to cleanup - handle FluxJob object so doesn't keep reconciling. Likely we want to delete at some point.
- [ ] Pod creation needs better orchestration
- [ ] In the case of jobs that are changing (e.g., pods deleting, but we don't want to kill entire job) what should we do?
- we need to use shrink here. And a shrink down to size 0 I assume is a cancel.
- [ ] For cancel, we would issue a cancel for every pod associated with a job. How can we avoid that (or is that OK?)
11 changes: 11 additions & 0 deletions api/v1alpha1/fluxjob_enqueue.go
@@ -2,6 +2,7 @@ package v1alpha1

import (
"context"
"fmt"

"github.com/converged-computing/fluxqueue/pkg/defaults"
appsv1 "k8s.io/api/apps/v1"
@@ -64,6 +65,9 @@ func (a *jobReceiver) EnqueueJob(ctx context.Context, job *batchv1.Job) error {
if job.Spec.Template.ObjectMeta.Labels == nil {
job.Spec.Template.ObjectMeta.Labels = map[string]string{}
}
if job.ObjectMeta.Labels == nil {
job.ObjectMeta.Labels = map[string]string{}
}

// Cut out early if we are getting hit again
_, ok := job.ObjectMeta.Labels[defaults.SeenLabel]
@@ -102,6 +106,9 @@ func (a *jobReceiver) EnqueueDeployment(ctx context.Context, deployment *appsv1.
if deployment.Spec.Template.ObjectMeta.Labels == nil {
deployment.Spec.Template.ObjectMeta.Labels = map[string]string{}
}
if deployment.ObjectMeta.Labels == nil {
deployment.ObjectMeta.Labels = map[string]string{}
}

// Cut out early if we are getting hit again
_, ok := deployment.ObjectMeta.Labels[defaults.SeenLabel]
@@ -119,6 +126,10 @@ func (a *jobReceiver) EnqueueDeployment(ctx context.Context, deployment *appsv1.
fluxqGate := corev1.PodSchedulingGate{Name: defaults.SchedulingGateName}
deployment.Spec.Template.Spec.SchedulingGates = append(deployment.Spec.Template.Spec.SchedulingGates, fluxqGate)

// We will use this later as a selector to get pods associated with the deployment
selector := fmt.Sprintf("deployment-%s-%s", deployment.Name, deployment.Namespace)
deployment.Spec.Template.ObjectMeta.Labels[defaults.SelectorLabel] = selector

logger.Info("received deployment and gated pods", "Name", deployment.Name)
return SubmitFluxJob(
ctx,
8 changes: 8 additions & 0 deletions api/v1alpha1/fluxjob_webhook.go
@@ -54,6 +54,14 @@ func (a *jobReceiver) Handle(ctx context.Context, req admission.Request) admissi
return admission.Errored(http.StatusBadRequest, err)
}

marshalledDeployment, tryNext, err := a.HandleDeployment(ctx, req)
if err == nil {
return admission.PatchResponseFromRaw(req.Object.Raw, marshalledDeployment)
}
if !tryNext {
return admission.Errored(http.StatusBadRequest, err)
}

marshalledPod, err := a.HandlePod(ctx, req)
if err == nil {
return admission.PatchResponseFromRaw(req.Object.Raw, marshalledPod)
33 changes: 0 additions & 33 deletions api/v1alpha1/fluxjob_webhook_test.go

This file was deleted.

2 changes: 2 additions & 0 deletions dist/fluxqueue.yaml
@@ -594,6 +594,7 @@ webhooks:
- ""
- core
- batch
- apps
apiVersions:
- v1
operations:
@@ -602,4 +603,5 @@ webhooks:
resources:
- pods
- jobs
- deployments
sideEffects: None
17 changes: 17 additions & 0 deletions examples/deployment-1.yaml
@@ -0,0 +1,17 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: deployment
spec:
replicas: 1
selector:
matchLabels:
app: deployment
template:
metadata:
labels:
app: deployment
spec:
containers:
- name: container
image: registry.k8s.io/pause:2.0
17 changes: 17 additions & 0 deletions examples/deployment.yaml
@@ -0,0 +1,17 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: deployment
spec:
replicas: 2
selector:
matchLabels:
app: deployment
template:
metadata:
labels:
app: deployment
spec:
containers:
- name: container
image: registry.k8s.io/pause:2.0
17 changes: 17 additions & 0 deletions examples/job-1.yaml
@@ -0,0 +1,17 @@
apiVersion: batch/v1
kind: Job
metadata:
name: job
spec:
completions: 1
parallelism: 1
completionMode: Indexed
template:
metadata:
labels:
app: job
spec:
restartPolicy: Never
containers:
- name: job
image: registry.k8s.io/pause:2.0
6 changes: 4 additions & 2 deletions internal/controller/fluxjob_controller.go
@@ -112,9 +112,11 @@ func (r *FluxJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
rlog.Info("Found FluxJob", "Name", spec.Name, "Namespace", spec.Namespace, "Status", spec.Status.SubmitStatus)
result := ctrl.Result{}

// If the job is already submit, continue
// If the job is already submit, delete it
if spec.Status.SubmitStatus == api.SubmitStatusSubmit {
return result, nil
rlog.Info("Deleting FluxJob that is submit", "Name", spec.Name, "Namespace", spec.Namespace, "Status", spec.Status.SubmitStatus)
err = r.Client.Delete(ctx, &spec)
return result, err
}

// Submit the job to the queue - TODO if error, should delete?
1 change: 1 addition & 0 deletions pkg/defaults/defaults.go
@@ -8,5 +8,6 @@ var (
FluxJobIdLabel = "fluxqueue/jobid"
NodesLabel = "fluxqueue/fluxion-nodes"
SeenLabel = "fluxqueue.seen"
SelectorLabel = "fluxqueue.selector"
UnschedulableLabel = "fluxqueue/unschedulable"
)
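The new SelectorLabel is the link between the webhook and the ungate task: the webhook stamps fluxqueue.selector=deployment-<name>-<namespace> onto the deployment's pod template, and the ungate worker later lists exactly those pods. A minimal sketch of the consumer side, assuming an already-configured client-go Clientset (the constant mirrors defaults.SelectorLabel):

package example

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// SelectorLabel mirrors defaults.SelectorLabel added in this commit.
const SelectorLabel = "fluxqueue.selector"

// listDeploymentPods finds the gated pods that belong to one deployment,
// using the label value written by the enqueue webhook.
func listDeploymentPods(ctx context.Context, client *kubernetes.Clientset, name, namespace string) ([]string, error) {
	selector := fmt.Sprintf("%s=deployment-%s-%s", SelectorLabel, name, namespace)
	pods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector})
	if err != nil {
		return nil, err
	}
	names := []string{}
	for _, pod := range pods.Items {
		names = append(names, pod.ObjectMeta.Name)
	}
	return names, nil
}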
7 changes: 7 additions & 0 deletions pkg/fluxqueue/fluxqueue.go
@@ -28,6 +28,7 @@ import (
const (
// IMPORTANT: must be one because fluxion is run single threaded
queueMaxWorkers = 1
taskMaxWorkers = 1
mutexLocked = 1
)

@@ -105,7 +106,13 @@ func NewQueue(ctx context.Context, cfg rest.Config) (*Queue, error) {
river.QueueDefault: {MaxWorkers: queueMaxWorkers},

// Cleanup queue is typically for cancel
// Note that if this needs to be single threaded,
// it should be done in the default queue
"cleanup_queue": {MaxWorkers: queueMaxWorkers},

// This is for Kubernetes tasks (ungate, etc)
// that don't need to be single threaded
"task_queue": {MaxWorkers: taskMaxWorkers},
},
Workers: workers,
})
9 changes: 9 additions & 0 deletions pkg/fluxqueue/strategy/easy.go
@@ -47,6 +47,8 @@ type ReservationModel struct {
// job worker: a queue to submit jobs to fluxion
// cleanup worker: a queue to cleanup
func (EasyBackfill) AddWorkers(workers *river.Workers, cfg rest.Config) error {

// These workers are in the default (fluxion) queue with one worker
jobWorker, err := work.NewJobWorker(cfg)
if err != nil {
return err
@@ -59,9 +61,16 @@ func (EasyBackfill) AddWorkers(workers *river.Workers, cfg rest.Config) error {
if err != nil {
return err
}

// These workers can be run concurrently (>1 worker)
ungateWorker, err := work.NewUngateWorker(cfg)
if err != nil {
return err
}
river.AddWorker(workers, jobWorker)
river.AddWorker(workers, cleanupWorker)
river.AddWorker(workers, reservationWorker)
river.AddWorker(workers, ungateWorker)
return nil
}

60 changes: 30 additions & 30 deletions pkg/fluxqueue/strategy/workers/job.go
@@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"

"github.com/converged-computing/fluxion/pkg/client"
@@ -123,7 +124,7 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
wlog.Info("Fluxion allocation response", "Nodes", nodes)

// Unsuspend the job or ungate the pods, adding the node assignments as labels for the scheduler
err = w.releaseJob(job.Args, fluxID, nodes)
err = w.releaseJob(ctx, job.Args, fluxID, nodes)
if err != nil {
return err
}
@@ -132,7 +133,7 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
}

// Release job will unsuspend a job or ungate pods to allow for scheduling
func (w JobWorker) releaseJob(args JobArgs, fluxID int64, nodes []string) error {
func (w JobWorker) releaseJob(ctx context.Context, args JobArgs, fluxID int64, nodes []string) error {
var err error

if args.Type == api.JobWrappedJob.String() {
@@ -145,15 +146,8 @@ func (w JobWorker) releaseJob(args JobArgs, fluxID int64, nodes []string) error
}
wlog.Info("Success unsuspending job", "Namespace", args.Namespace, "Name", args.Name)

} else if args.Type == api.JobWrappedPod.String() {

// Pod Type
err = w.ungatePod(args.Namespace, args.Name, nodes, fluxID)
if err != nil {
wlog.Info("Error ungating pod", "Namespace", args.Namespace, "Name", args.Name, "Error", err)
return err
}
wlog.Info("Success ungating pod", "Namespace", args.Namespace, "Name", args.Name)
} else if args.Type == api.JobWrappedDeployment.String() || args.Type == api.JobWrappedPod.String() {
err = w.ungatePod(ctx, args.Namespace, args.Name, args.Type, nodes, fluxID)

} else {

@@ -274,32 +268,38 @@ func (w JobWorker) rejectJob(namespace, name string) error {
return patchUnsuspend(ctx, client, name, namespace)
}

// Ungate the pod, adding an annotation for nodes along with the fluxion scheduler
func (w JobWorker) ungatePod(namespace, name string, nodes []string, fluxId int64) error {
ctx := context.Background()

// Get the pod to update
client, err := kubernetes.NewForConfig(&w.RESTConfig)
if err != nil {
return err
// ungatePod submits a task to ungate pods. We do this as a retryable task because the
// Kubernetes API is not always reliable for listing the pods we need, patching them, etc.
func (w JobWorker) ungatePod(
ctx context.Context,
namespace, name, jobType string,
nodes []string,
fluxId int64,
) error {

// Create a task in the task queue to ungate the pod (or deployment pods)
riverClient := river.ClientFromContext[pgx.Tx](ctx)
insertOpts := river.InsertOpts{
Tags: []string{"ungate"},
Queue: "task_queue",
}
// Convert jobid to string
jobid := fmt.Sprintf("%d", fluxId)

// Patch the pod to add the nodes
nodesStr := strings.Join(nodes, "__")
payload := `{"metadata": {"labels": {"` + defaults.NodesLabel + `": "` + nodesStr + `", "` + defaults.FluxJobIdLabel + `": "` + jobid + `"}}}`
fmt.Println(payload)
_, err = client.CoreV1().Pods(namespace).Patch(ctx, name, patchTypes.MergePatchType, []byte(payload), metav1.PatchOptions{})
ungateArgs := UngateArgs{
Name: name,
Namespace: namespace,
Nodes: nodes,
JobID: fluxId,
Type: jobType,
}
_, err := riverClient.Insert(ctx, ungateArgs, &insertOpts)
if err != nil {
return err
wlog.Info("Error inserting ungate job", "Namespace", namespace, "Name", name, "Error", err)
}
return removeGate(ctx, client, namespace, name)
return err
}

// removeGate removes the scheduling gate from the pod
func removeGate(ctx context.Context, client *kubernetes.Clientset, namespace, name string) error {
pod, err := client.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
pod, err := client.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return err
}
128 changes: 128 additions & 0 deletions pkg/fluxqueue/strategy/workers/ungate.go
@@ -0,0 +1,128 @@
package workers

import (
"context"
"fmt"
"strings"

api "github.com/converged-computing/fluxqueue/api/v1alpha1"
"github.com/converged-computing/fluxqueue/pkg/defaults"
"github.com/riverqueue/river"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
patchTypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

// Ungate workers explicitly ungate pods, and add node labels
func (args UngateArgs) Kind() string { return "ungate" }

type UngateWorker struct {
river.WorkerDefaults[UngateArgs]
RESTConfig rest.Config
}

// NewUngateWorker returns a new ungate worker with a Kubernetes REST config
func NewUngateWorker(cfg rest.Config) (*UngateWorker, error) {
worker := UngateWorker{RESTConfig: cfg}
return &worker, nil
}

// UngateArgs serializes a postgres row back into the fields needed to ungate pods,
// including the node assignments from fluxion
type UngateArgs struct {
Name string `json:"name"`
Type string `json:"type"`
Namespace string `json:"namespace"`
Nodes []string `json:"nodes"`
JobID int64 `json:"jobid"`
}

// Work ungates the pods for a job: either a single wrapped pod, or the group of
// pods that belong to a deployment (found via the fluxqueue selector label)
func (w UngateWorker) Work(ctx context.Context, job *river.Job[UngateArgs]) error {

var err error
wlog.Info("Running ungate worker", "Namespace", job.Args.Namespace, "Name", job.Args.Name)
jobid := fmt.Sprintf("%d", job.Args.JobID)

client, err := kubernetes.NewForConfig(&w.RESTConfig)
if err != nil {
wlog.Info("Error getting Kubernetes client", "Namespace", job.Args.Namespace, "Name", job.Args.Name, "Error", err)
return err
}

// Ungate single pod (should only be one)
if job.Args.Type == api.JobWrappedPod.String() {
nodesStr := strings.Join(job.Args.Nodes, "__")
payload := `{"metadata": {"labels": {"` + defaults.NodesLabel + `": "` + nodesStr + `", "` + defaults.FluxJobIdLabel + `": "` + jobid + `"}}}`
_, err = client.CoreV1().Pods(job.Args.Namespace).Patch(ctx, job.Args.Name, patchTypes.MergePatchType, []byte(payload), metav1.PatchOptions{})
if err != nil {
return err
}
err = removeGate(ctx, client, job.Args.Namespace, job.Args.Name)
if err != nil {
wlog.Info("Error in removing single pod", "Error", err)
return err
}
}

// For a deployment, we need to get the pods based on a selector
if job.Args.Type == api.JobWrappedDeployment.String() {
selector := fmt.Sprintf("%s=deployment-%s-%s", defaults.SelectorLabel, job.Args.Name, job.Args.Namespace)

// Get the deployment pods in the job namespace via the selector label
pods, err := client.CoreV1().Pods(job.Args.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: selector,
})
if err != nil {
wlog.Info("Error listing pods in ungate worker", "Namespace", job.Args.Namespace, "Name", job.Args.Name, "Error", err)
return err
}
wlog.Info("Selector returned pods for nodes", "Pods", len(pods.Items), "Nodes", len(job.Args.Nodes))
// Ungate as many as we are able
for i, pod := range pods.Items {

// Stop early if we have more pods than node assignments
if i >= len(job.Args.Nodes) {
wlog.Info("Warning - we have more pods than nodes")
break
}

// We should not try to ungate (and assign a node) to a pod that
// already has been ungated
ungated := true
if pod.Spec.SchedulingGates != nil {
for _, gate := range pod.Spec.SchedulingGates {
if gate.Name == defaults.SchedulingGateName {
ungated = false
break
}
}
}
if ungated {
continue
}
payload := `{"metadata": {"labels": {"` + defaults.NodesLabel + `": "` + job.Args.Nodes[i] + `", "` + defaults.FluxJobIdLabel + `": "` + jobid + `"}}}`
_, err = client.CoreV1().Pods(job.Args.Namespace).Patch(ctx, pod.ObjectMeta.Name, patchTypes.MergePatchType, []byte(payload), metav1.PatchOptions{})
if err != nil {
wlog.Info("Error in patching deployment pod", "Error", err)
return err
}
err = removeGate(ctx, client, pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
if err != nil {
wlog.Info("Error in removing deployment pod gate", "Error", err)
return err
}
}

// Kubernetes has not created the pod objects yet
// Returning an error will have it run again, with a delay
// https://riverqueue.com/docs/job-retries
if len(pods.Items) < len(job.Args.Nodes) {
return fmt.Errorf("ungate pods job did not have all pods")
}
}
return err
}
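One design note on the retry at the end of the ungate worker: returning an error re-enqueues the task with a backoff, up to River's default attempt limit, which is what we want while a deployment's pods are still being created. If retries should be bounded more tightly (for example, a deployment deleted before its pods ever appear), the insert in job.go could cap attempts; assuming river.InsertOpts exposes MaxAttempts, a sketch:

package example

import "github.com/riverqueue/river"

// ungateInsertOpts sketches how the task insert in job.go could cap retries.
// MaxAttempts is an assumption about river.InsertOpts; check the River docs.
func ungateInsertOpts() river.InsertOpts {
	return river.InsertOpts{
		Queue:       "task_queue",
		Tags:        []string{"ungate"},
		MaxAttempts: 10, // stop retrying after ten attempts instead of River's default
	}
}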
